From 67215f30fec0a3b5b4ad07c3cefea26207f0bc4c Mon Sep 17 00:00:00 2001 From: Andy Ning Date: Fri, 19 Oct 2018 11:06:40 -0400 Subject: [PATCH] Keystone DB sync - update dcorch to use dcdbsync This commit updates dcorch to use the newly introduced dbsync service APIs to synchronize identity resources from central cloud to subclouds. The following identity resources are synced: - users (local users only) - user passwords - projects - roles - project role assignments - token revocation events Story: 2002842 Task: 22787 Signed-off-by: Andy Ning (cherry picked from commit e9096c7a23abeebc1119b5ba3ed1e7b3a357a2ed) Depends-On: https://review.opendev.org/#/c/655921 Depends-On: https://review.opendev.org/#/c/655773 Depends-On: https://review.opendev.org/#/c/655776 Depends-On: https://review.opendev.org/#/c/655927 Change-Id: I77c2cc712a1c3dc8a228883c3fea1423e5207dea --- dcmanager/api/controllers/v1/subclouds.py | 4 +- dcmanager/manager/subcloud_manager.py | 10 +- dcorch/api/proxy/apps/controller.py | 65 +- dcorch/api/proxy/common/constants.py | 11 +- dcorch/api/proxy/common/utils.py | 93 +- dcorch/common/consts.py | 6 + dcorch/common/exceptions.py | 4 + dcorch/drivers/openstack/sysinv_v1.py | 16 +- dcorch/engine/fernet_key_manager.py | 30 +- dcorch/engine/generic_sync_manager.py | 9 +- dcorch/engine/service.py | 21 +- dcorch/engine/subcloud.py | 5 + dcorch/engine/sync_services/identity.py | 1155 +++++++++++++++++---- dcorch/engine/sync_services/sysinv.py | 8 +- dcorch/engine/sync_thread.py | 28 +- 15 files changed, 1232 insertions(+), 233 deletions(-) diff --git a/dcmanager/api/controllers/v1/subclouds.py b/dcmanager/api/controllers/v1/subclouds.py index 668ce7a49..28d4ab1cf 100644 --- a/dcmanager/api/controllers/v1/subclouds.py +++ b/dcmanager/api/controllers/v1/subclouds.py @@ -381,7 +381,9 @@ class SubcloudsController(object): ('vim', 'vim'), ('mtce', 'mtce'), ('fm', 'fm'), - ('barbican', 'barbican') + ('barbican', 'barbican'), + ('smapi', 'smapi'), + ('dcdbsync', 'dcdbsync') ] user_list = list() diff --git a/dcmanager/manager/subcloud_manager.py b/dcmanager/manager/subcloud_manager.py index ff3369af0..7b5217655 100644 --- a/dcmanager/manager/subcloud_manager.py +++ b/dcmanager/manager/subcloud_manager.py @@ -310,6 +310,7 @@ class SubcloudManager(manager.Manager): # Get the subcloud details from the database subcloud = db_api.subcloud_get(context, subcloud_id) + original_management_state = subcloud.management_state # Semantic checking if management_state: @@ -357,7 +358,14 @@ class SubcloudManager(manager.Manager): except Exception as e: LOG.exception(e) LOG.warn('Problem informing dcorch of subcloud ' - 'state change, subcloud: %s' % subcloud.name) + 'state change, resume to original state, subcloud: %s' + % subcloud.name) + management_state = original_management_state + subcloud = \ + db_api.subcloud_update(context, subcloud_id, + management_state=management_state, + description=description, + location=location) if management_state == consts.MANAGEMENT_UNMANAGED: diff --git a/dcorch/api/proxy/apps/controller.py b/dcorch/api/proxy/apps/controller.py index 673a1786c..e57fb1521 100644 --- a/dcorch/api/proxy/apps/controller.py +++ b/dcorch/api/proxy/apps/controller.py @@ -343,6 +343,7 @@ class ComputeAPIController(APIController): resource_tag = self._get_resource_tag_from_header(request_header, operation_type, resource_type) + handler = self._resource_handler[resource_tag] operation_type, resource_id, resource_info = handler( environ=environ, @@ -454,7 +455,7 @@ class 
IdentityAPIController(APIController): return response def _generate_assignment_rid(self, url, environ): - resource_id = '' + resource_id = None # for role assignment or revocation, the URL is of format: # /v3/projects/{project_id}/users/{user_id}/roles/{role_id} # We need to extract all ID parameters from the URL @@ -468,6 +469,23 @@ class IdentityAPIController(APIController): resource_id = "{}_{}_{}".format(proj_id, user_id, role_id) return resource_id + def _retrieve_token_revoke_event_rid(self, url, environ): + resource_id = None + # for token revocation event, we need to retrieve the audit_id + # from the token being revoked. + revoked_token = environ.get('HTTP_X_SUBJECT_TOKEN', None) + + if not revoked_token: + LOG.error("Malformed Token Revocation URL: %s", url) + else: + try: + resource_id = proxy_utils.\ + retrieve_token_audit_id(revoked_token) + except Exception as e: + LOG.error("Failed to retrieve token audit id: %s" % e) + + return resource_id + def _enqueue_work(self, environ, request_body, response): LOG.info("enqueue_work") resource_info = {} @@ -482,6 +500,26 @@ class IdentityAPIController(APIController): consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS): resource_id = self._generate_assignment_rid(request_header, environ) + # grant a role to a user (PUT) creates a project role assignment + if operation_type == consts.OPERATION_TYPE_PUT: + operation_type = consts.OPERATION_TYPE_POST + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS): + resource_id = self._retrieve_token_revoke_event_rid(request_header, + environ) + # delete (revoke) a token (DELETE) creates a token revoke event. + if operation_type == consts.OPERATION_TYPE_DELETE and resource_id: + operation_type = consts.OPERATION_TYPE_POST + resource_info = {'token_revoke_event': + {'audit_id': resource_id}} + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_USERS_PASSWORD): + resource_id = self.get_resource_id_from_link(request_header. 
+ strip('/password')) + # user change password (POST) is an update to the user + if operation_type == consts.OPERATION_TYPE_POST: + operation_type = consts.OPERATION_TYPE_PATCH + resource_type = consts.RESOURCE_TYPE_IDENTITY_USERS else: if operation_type == consts.OPERATION_TYPE_POST: # Retrieve the ID from the response @@ -490,20 +528,25 @@ class IdentityAPIController(APIController): else: resource_id = self.get_resource_id_from_link(request_header) - if (operation_type != consts.OPERATION_TYPE_DELETE and request_body): + if (operation_type != consts.OPERATION_TYPE_DELETE and + request_body and (not resource_info)): resource_info = json.loads(request_body) LOG.info("%s: Resource id: (%s), type: (%s), info: (%s)", operation_type, resource_id, resource_type, resource_info) - try: - utils.enqueue_work(self.ctxt, - self.ENDPOINT_TYPE, - resource_type, - resource_id, - operation_type, - json.dumps(resource_info)) - except exception.ResourceNotFound as e: - raise webob.exc.HTTPNotFound(explanation=e.format_message()) + + if resource_id: + try: + utils.enqueue_work(self.ctxt, + self.ENDPOINT_TYPE, + resource_type, + resource_id, + operation_type, + json.dumps(resource_info)) + except exception.ResourceNotFound as e: + raise webob.exc.HTTPNotFound(explanation=e.format_message()) + else: + LOG.warning("Empty resource id for resource: %s", operation_type) class CinderAPIController(APIController): diff --git a/dcorch/api/proxy/common/constants.py b/dcorch/api/proxy/common/constants.py index 0d4e33c8d..e1373f7c7 100755 --- a/dcorch/api/proxy/common/constants.py +++ b/dcorch/api/proxy/common/constants.py @@ -297,7 +297,10 @@ IDENTITY_PROJECTS_PATH = [ IDENTITY_PROJECTS_ROLE_PATH = [ '/v3/projects/{project_id}/users/{user_id}/roles/{role_id}', +] +IDENTITY_TOKEN_REVOKE_EVENTS_PATH = [ + '/v3/auth/tokens', ] IDENTITY_PATH_MAP = { @@ -306,7 +309,9 @@ IDENTITY_PATH_MAP = { consts.RESOURCE_TYPE_IDENTITY_ROLES: IDENTITY_ROLES_PATH, consts.RESOURCE_TYPE_IDENTITY_PROJECTS: IDENTITY_PROJECTS_PATH, consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS: - IDENTITY_PROJECTS_ROLE_PATH + IDENTITY_PROJECTS_ROLE_PATH, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS: + IDENTITY_TOKEN_REVOKE_EVENTS_PATH, } ROUTE_METHOD_MAP = { @@ -362,7 +367,9 @@ ROUTE_METHOD_MAP = { consts.RESOURCE_TYPE_IDENTITY_PROJECTS: ['POST', 'PATCH', 'DELETE'], consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS: - ['PUT', 'DELETE'] + ['PUT', 'DELETE'], + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS: + ['DELETE'] } } diff --git a/dcorch/api/proxy/common/utils.py b/dcorch/api/proxy/common/utils.py index a457cd1a8..33c267728 100644 --- a/dcorch/api/proxy/common/utils.py +++ b/dcorch/api/proxy/common/utils.py @@ -13,10 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from dcorch.common import consts -from oslo_log import log as logging +import base64 +from cryptography import fernet +import msgpack +import six from six.moves.urllib.parse import urlparse +from keystoneauth1 import exceptions as keystone_exceptions +from oslo_log import log as logging + +from dcorch.common import consts +from dcorch.drivers.openstack import sdk_platform as sdk + LOG = logging.getLogger(__name__) @@ -108,3 +116,84 @@ def set_request_forward_environ(req, remote_host, remote_port): if ('REMOTE_ADDR' in req.environ and 'HTTP_X_FORWARDED_FOR' not in req.environ): req.environ['HTTP_X_FORWARDED_FOR'] = req.environ['REMOTE_ADDR'] + + +def _get_fernet_keys(): + """Get fernet keys from sysinv.""" + os_client = sdk.OpenStackDriver(consts.CLOUD_0) + try: + key_list = os_client.sysinv_client.get_fernet_keys() + return [str(getattr(key, 'key')) for key in key_list] + except (keystone_exceptions.connection.ConnectTimeout, + keystone_exceptions.ConnectFailure) as e: + LOG.info("get_fernet_keys: cloud {} is not reachable [{}]" + .format(consts.CLOUD_0, str(e))) + os_client.delete_region_clients(consts.CLOUD_0) + return None + except (AttributeError, TypeError) as e: + LOG.info("get_fernet_keys error {}".format(e)) + os_client.delete_region_clients(consts.CLOUD_0, clear_token=True) + return None + except Exception as e: + LOG.exception(e) + return None + + +def _restore_padding(token): + """Restore padding based on token size. + + :param token: token to restore padding on + :returns: token with correct padding + """ + + # Re-inflate the padding + mod_returned = len(token) % 4 + if mod_returned: + missing_padding = 4 - mod_returned + token += b'=' * missing_padding + return token + + +def _unpack_token(fernet_token, fernet_keys): + """Attempt to unpack a token using the supplied Fernet keys. + + :param fernet_token: token to unpack + :type fernet_token: string + :param fernet_keys: a list consisting of keys in the repository + :type fernet_keys: list + :returns: the token payload + """ + + # create a list of fernet instances + fernet_instances = [fernet.Fernet(key) for key in fernet_keys] + # create a encryption/decryption object from the fernet keys + crypt = fernet.MultiFernet(fernet_instances) + + # attempt to decode the token + token = _restore_padding(six.binary_type(fernet_token)) + serialized_payload = crypt.decrypt(token) + payload = msgpack.unpackb(serialized_payload) + + # present token values + return payload + + +def retrieve_token_audit_id(fernet_token): + """Attempt to retrieve the audit id from the fernet token. 
+ + :param fernet_token: + :param keys_repository: + :return: audit id in base64 encoded (without paddings) + """ + + audit_id = None + fernet_keys = _get_fernet_keys() + LOG.info("fernet_keys: {}".format(fernet_keys)) + + if fernet_keys: + unpacked_token = _unpack_token(fernet_token, fernet_keys) + if unpacked_token: + audit_id = unpacked_token[-1][0] + audit_id = base64.urlsafe_b64encode(audit_id).rstrip('=') + + return audit_id diff --git a/dcorch/common/consts.py b/dcorch/common/consts.py index 1b0a36052..53bb11b43 100644 --- a/dcorch/common/consts.py +++ b/dcorch/common/consts.py @@ -122,6 +122,8 @@ RESOURCE_TYPE_IDENTITY_USERS_PASSWORD = "users_password" RESOURCE_TYPE_IDENTITY_ROLES = "roles" RESOURCE_TYPE_IDENTITY_PROJECTS = "projects" RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS = "project_role_assignments" +RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS = "revoke_events" +RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER = "revoke_events_for_user" KEYPAIR_ID_DELIM = "/" @@ -148,6 +150,10 @@ ENDPOINT_QUOTA_MAPPING = { KS_ENDPOINT_INTERNAL = "internal" KS_ENDPOINT_DEFAULT = KS_ENDPOINT_INTERNAL +# DB sync agent endpoint +DBS_ENDPOINT_INTERNAL = "internal" +DBS_ENDPOINT_DEFAULT = DBS_ENDPOINT_INTERNAL + # Do we need separate patch/put operations or could we just use # create/update/delete and have the sync code know which HTTP # operation to use? diff --git a/dcorch/common/exceptions.py b/dcorch/common/exceptions.py index 73f66b9b5..6d5ba1f9a 100644 --- a/dcorch/common/exceptions.py +++ b/dcorch/common/exceptions.py @@ -170,6 +170,10 @@ class SubcloudNotFound(NotFound): message = _("Subcloud %(region_name)s not found") +class ThreadNotFound(NotFound): + message = _("Thread %(thread_name)s of %(region_name)s not found") + + class OrchJobNotFound(NotFound): message = _("OrchJob %(orch_job)s not found") diff --git a/dcorch/drivers/openstack/sysinv_v1.py b/dcorch/drivers/openstack/sysinv_v1.py index 2251019e6..01500b6e6 100644 --- a/dcorch/drivers/openstack/sysinv_v1.py +++ b/dcorch/drivers/openstack/sysinv_v1.py @@ -74,6 +74,10 @@ class SysinvClient(base.DriverBase): token = session.get_token() client = cgts_client.Client( api_version, + username=session.auth._username, + password=session.auth._password, + tenant_name=session.auth._project_name, + auth_url=session.auth.auth_url, endpoint=endpoint, token=token) except exceptions.ServiceUnavailable: @@ -710,7 +714,7 @@ class SysinvClient(base.DriverBase): return iuser - def create_fernet_repo(self, key_list): + def post_fernet_repo(self, key_list=None): """Add the fernet keys for this region :param: key list payload @@ -721,26 +725,26 @@ class SysinvClient(base.DriverBase): # [{"id": 0, "key": "GgDAOfmyr19u0hXdm5r_zMgaMLjglVFpp5qn_N4GBJQ="}, # {"id": 1, "key": "7WfL_z54p67gWAkOmQhLA9P0ZygsbbJcKgff0uh28O8="}, # {"id": 2, "key": ""5gsUQeOZ2FzZP58DN32u8pRKRgAludrjmrZFJSOHOw0="}] - LOG.info("create_fernet_repo driver region={} " + LOG.info("post_fernet_repo driver region={} " "fernet_repo_list={}".format(self.region_name, key_list)) try: self.client.fernet.create(key_list) except Exception as e: - LOG.error("create_fernet_repo exception={}".format(e)) + LOG.error("post_fernet_repo exception={}".format(e)) raise exceptions.SyncRequestFailedRetry() - def update_fernet_repo(self, key_list): + def put_fernet_repo(self, key_list): """Update the fernet keys for this region :param: key list payload :return: Nothing """ - LOG.info("update_fernet_repo driver region={} " + LOG.info("put_fernet_repo driver region={} " 
"fernet_repo_list={}".format(self.region_name, key_list)) try: self.client.fernet.put(key_list) except Exception as e: - LOG.error("update_fernet_repo exception={}".format(e)) + LOG.error("put_fernet_repo exception={}".format(e)) raise exceptions.SyncRequestFailedRetry() def get_fernet_keys(self): diff --git a/dcorch/engine/fernet_key_manager.py b/dcorch/engine/fernet_key_manager.py index a35ebdc5d..ccf1651b8 100644 --- a/dcorch/engine/fernet_key_manager.py +++ b/dcorch/engine/fernet_key_manager.py @@ -27,7 +27,6 @@ from dcorch.common.i18n import _ from dcorch.common import manager from dcorch.common import utils from dcorch.drivers.openstack import sdk_platform as sdk -from dcorch.objects import subcloud as subcloud_obj FERNET_REPO_MASTER_ID = "keys" @@ -117,9 +116,26 @@ class FernetKeyManager(manager.Manager): self._schedule_work(consts.OPERATION_TYPE_PUT) def distribute_keys(self, ctxt, subcloud_name): - subclouds = subcloud_obj.SubcloudList.get_all(ctxt) - for sc in subclouds: - if sc.region_name == subcloud_name: - subcloud = sc - self._schedule_work(consts.OPERATION_TYPE_CREATE, subcloud) - break + keys = self._get_master_keys() + if not keys: + LOG.info(_("No fernet keys returned from %s") % consts.CLOUD_0) + return + resource_info = FernetKeyManager.to_resource_info(keys) + key_list = FernetKeyManager.from_resource_info(resource_info) + self.update_fernet_repo(subcloud_name, key_list) + + def reset_keys(self, subcloud_name): + self.update_fernet_repo(subcloud_name) + + @staticmethod + def update_fernet_repo(subcloud_name, key_list=None): + try: + os_client = sdk.OpenStackDriver(subcloud_name) + os_client.sysinv_client.post_fernet_repo(key_list) + except (exceptions.ConnectionRefused, exceptions.NotAuthorized, + exceptions.TimeOut): + LOG.info(_("Update the fernet repo on %s timeout") % + subcloud_name) + except Exception as e: + error_msg = "subcloud: {}, {}".format(subcloud_name, e.message) + LOG.info(_("Fail to update fernet repo %s") % error_msg) diff --git a/dcorch/engine/generic_sync_manager.py b/dcorch/engine/generic_sync_manager.py index 9de9226a7..d08435451 100644 --- a/dcorch/engine/generic_sync_manager.py +++ b/dcorch/engine/generic_sync_manager.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from oslo_log import log as logging from dcorch.common import exceptions @@ -85,6 +84,14 @@ class GenericSyncManager(object): except KeyError: raise exceptions.SubcloudNotFound(region_name=subcloud_name) + def initial_sync(self, context, subcloud_name): + try: + subcloud_engine = self.subcloud_engines[subcloud_name] + LOG.info('Initial sync subcloud %(sc)s' % {'sc': subcloud_name}) + subcloud_engine.initial_sync() + except KeyError: + raise exceptions.SubcloudNotFound(region_name=subcloud_name) + def run_sync_audit(self): for subcloud_engine in self.subcloud_engines.values(): subcloud_engine.run_sync_audit() diff --git a/dcorch/engine/service.py b/dcorch/engine/service.py index bd1288aef..c5399d5ff 100644 --- a/dcorch/engine/service.py +++ b/dcorch/engine/service.py @@ -191,11 +191,26 @@ class EngineService(service.Service): # keep equivalent functionality for now if (management_state == dcm_consts.MANAGEMENT_MANAGED) and \ (availability_status == dcm_consts.AVAILABILITY_ONLINE): - self.fkm.distribute_keys(ctxt, subcloud_name) - self.aam.enable_snmp(ctxt, subcloud_name) - self.gsm.enable_subcloud(ctxt, subcloud_name) + # Initial identity sync. 
It's synchronous so that identity + # get synced before fernet token keys are synced. This is + # necessary since we want to revoke all existing tokens on + # this subcloud after its services user IDs and project + # IDs are changed. Otherwise subcloud services will fail + # authentication since they keep on using their existing tokens + # issued before these IDs change, until these tokens expires. + try: + self.gsm.initial_sync(ctxt, subcloud_name) + self.fkm.distribute_keys(ctxt, subcloud_name) + self.aam.enable_snmp(ctxt, subcloud_name) + self.gsm.enable_subcloud(ctxt, subcloud_name) + except Exception as ex: + LOG.warning('Update subcloud state failed for %s: %s', + subcloud_name, six.text_type(ex)) + raise else: self.gsm.disable_subcloud(ctxt, subcloud_name) + if (management_state == dcm_consts.MANAGEMENT_UNMANAGED): + self.fkm.reset_keys(subcloud_name) @request_context # todo: add authentication since ctxt not actually needed later diff --git a/dcorch/engine/subcloud.py b/dcorch/engine/subcloud.py index 28c6d4611..5fa5b5fa0 100644 --- a/dcorch/engine/subcloud.py +++ b/dcorch/engine/subcloud.py @@ -106,6 +106,11 @@ class SubCloudEngine(object): self.shutdown() self.subcloud.delete() + def initial_sync(self): + # initial synchronization of the subcloud + for thread in self.sync_threads: + thread.initial_sync() + def run_sync_audit(self): # run periodic sync audit on all threads in this subcloud if self.is_enabled(): diff --git a/dcorch/engine/sync_services/identity.py b/dcorch/engine/sync_services/identity.py index 6658b7eb7..6a6a45d76 100644 --- a/dcorch/engine/sync_services/identity.py +++ b/dcorch/engine/sync_services/identity.py @@ -13,18 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import keyring +import base64 from collections import namedtuple + +from dcdbsync.dbsyncclient import client as dbsyncclient +from dcdbsync.dbsyncclient import exceptions as dbsync_exceptions +from dcorch.common import consts +from dcorch.common import exceptions +from dcorch.engine.sync_thread import SyncThread + from keystoneauth1 import exceptions as keystone_exceptions from keystoneclient import client as keystoneclient from oslo_log import log as logging from oslo_serialization import jsonutils -from dcorch.common import consts -from dcorch.common import exceptions -from dcorch.engine.sync_thread import SyncThread LOG = logging.getLogger(__name__) @@ -46,41 +50,63 @@ class IdentitySyncThread(SyncThread): self.sync_identity_resource, consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS: self.sync_identity_resource, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS: + self.sync_identity_resource, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER: + self.sync_identity_resource, } # Since services may use unscoped tokens, it is essential to ensure # that users are replicated prior to assignment data (roles/projects) self.audit_resources = [ consts.RESOURCE_TYPE_IDENTITY_USERS, - consts.RESOURCE_TYPE_IDENTITY_ROLES, consts.RESOURCE_TYPE_IDENTITY_PROJECTS, - consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS + consts.RESOURCE_TYPE_IDENTITY_ROLES, + consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER ] # For all the resource types, we need to filter out certain # resources self.filtered_audit_resources = { consts.RESOURCE_TYPE_IDENTITY_USERS: - ['admin', 'mtce', 'heat_admin', - 'cinder' + self.subcloud_engine.subcloud.region_name], + ['dcdbsync', 'dcorch', 'dcmanager', 'heat_admin', 'smapi', + 'fm', 'cinder' + self.subcloud_engine.subcloud.region_name], consts.RESOURCE_TYPE_IDENTITY_ROLES: ['heat_stack_owner', 'heat_stack_user', 'ResellerAdmin'], consts.RESOURCE_TYPE_IDENTITY_PROJECTS: - ['admin', 'services'] + [] } self.log_extra = {"instance": "{}/{}: ".format( self.subcloud_engine.subcloud.region_name, self.endpoint_type)} self.sc_ks_client = None + self.sc_dbs_client = None self.initialize() LOG.info("IdentitySyncThread initialized", extra=self.log_extra) def initialize_sc_clients(self): super(IdentitySyncThread, self).initialize_sc_clients() + # create a keystone client for the subcloud if (not self.sc_ks_client and self.sc_admin_session): self.sc_ks_client = keystoneclient.Client( session=self.sc_admin_session, endpoint_type=consts.KS_ENDPOINT_INTERNAL, region_name=self.subcloud_engine.subcloud.region_name) + # create a dbsync client for the subcloud + if (not self.sc_dbs_client and self.sc_admin_session): + self.sc_dbs_client = dbsyncclient.Client( + session=self.sc_admin_session, + endpoint_type=consts.DBS_ENDPOINT_INTERNAL, + region_name=self.subcloud_engine.subcloud.region_name) + + def reinitialize_m_clients(self): + if self.m_dbs_client and self.admin_session: + self.m_dbs_client.update(session=self.admin_session) + + def reinitialize_sc_clients(self): + if self.sc_dbs_client and self.sc_admin_session: + self.sc_dbs_client.update(session=self.sc_admin_session) def initialize(self): # Subcloud may be enabled a while after being added. 
@@ -93,9 +119,152 @@ class IdentitySyncThread(SyncThread): # subcloud specific version self.m_ks_client = self.ks_client + # We initialize a master version of the dbsync client, and a + # subcloud specific version + self.m_dbs_client = self.dbs_client + LOG.info("Identity session and clients initialized", extra=self.log_extra) + def _initial_sync_users(self, m_users, sc_users): + # Particularly sync users with same name but different ID + m_client = self.m_dbs_client.identity_manager + sc_client = self.sc_dbs_client.identity_manager + + for m_user in m_users: + for sc_user in sc_users: + if (m_user.local_user.name == sc_user.local_user.name and + m_user.domain_id == sc_user.domain_id and + m_user.id != sc_user.id): + user_records = m_client.user_detail(m_user.id) + if not user_records: + LOG.error("No data retrieved from master cloud for" + " user {} to update its equivalent in" + " subcloud.".format(m_user.id)) + raise exceptions.SyncRequestFailed + # update the user by pushing down the DB records to + # subcloud + try: + user_ref = sc_client.update_user(sc_user.id, + user_records) + # Retry once if unauthorized + except dbsync_exceptions.Unauthorized as e: + LOG.info("Update user {} request failed for {}: {}." + .format(sc_user.id, + self.subcloud_engine.subcloud. + region_name, str(e))) + self.reinitialize_sc_clients() + user_ref = sc_client.update_user(sc_user.id, + user_records) + + if not user_ref: + LOG.error("No user data returned when updating user {}" + " in subcloud.".format(sc_user.id)) + raise exceptions.SyncRequestFailed + # If admin user get synced, the client need to + # re-authenticate. + if sc_user.local_user.name == "admin": + self.reinitialize_sc_clients() + + def _initial_sync_projects(self, m_projects, sc_projects): + # Particularly sync projects with same name but different ID. + m_client = self.m_dbs_client.project_manager + sc_client = self.sc_dbs_client.project_manager + + for m_project in m_projects: + for sc_project in sc_projects: + if (m_project.name == sc_project.name and + m_project.domain_id == sc_project.domain_id and + m_project.id != sc_project.id): + project_records = m_client.project_detail(m_project.id) + if not project_records: + LOG.error("No data retrieved from master cloud for" + " project {} to update its equivalent in" + " subcloud.".format(m_project.id)) + raise exceptions.SyncRequestFailed + # update the project by pushing down the DB records to + # subcloud + try: + project_ref = sc_client.update_project(sc_project.id, + project_records) + # Retry once if unauthorized + except dbsync_exceptions.Unauthorized as e: + LOG.info("Update project {} request failed for {}: {}." + .format(sc_project.id, + self.subcloud_engine.subcloud. + region_name, str(e))) + self.reinitialize_sc_clients() + project_ref = sc_client.update_project(sc_project.id, + project_records) + + if not project_ref: + LOG.error("No project data returned when updating" + " project {} in subcloud.". + format(sc_project.id)) + raise exceptions.SyncRequestFailed + # If admin project get synced, the client need to + # re-authenticate. + if sc_project.name == "admin": + self.reinitialize_sc_clients() + + def initial_sync(self): + # Service users and projects are created at deployment time. They exist + # before dcorch starts to audit resources. 
Later on when dcorch audits + # and sync them over(including their IDs) to the subcloud, running + # services at the subcloud with tokens issued before their ID are + # changed will get user/project not found error since their IDs are + # changed. This will continue until their tokens expire in up to + # 1 hour. Before that these services basically stop working. + # By an initial synchronization on existing users/projects, + # synchronously followed by a fernet keys synchronization, existing + # tokens at subcloud are revoked and services are forced to + # re-authenticate to get new tokens. This significantly decreases + # service recovery time at subcloud. + self.initialize_sc_clients() + + # get users from master cloud + m_users = self.get_master_resources( + consts.RESOURCE_TYPE_IDENTITY_USERS) + + if not m_users: + LOG.error("No users returned from {}". + format(consts.VIRTUAL_MASTER_CLOUD)) + raise exceptions.SyncRequestFailed + + # get users from the subcloud + sc_users = self.get_subcloud_resources( + consts.RESOURCE_TYPE_IDENTITY_USERS) + + if not sc_users: + LOG.error("No users returned from subcloud {}". + format(self.subcloud_engine.subcloud.region_name)) + raise exceptions.SyncRequestFailed + + self._initial_sync_users(m_users, sc_users) + + # get projects from master cloud + m_projects = self.get_master_resources( + consts.RESOURCE_TYPE_IDENTITY_PROJECTS) + + if not m_projects: + LOG.error("No projects returned from {}". + format(consts.VIRTUAL_MASTER_CLOUD)) + raise exceptions.SyncRequestFailed + + # get projects from the subcloud + sc_projects = self.get_subcloud_resources( + consts.RESOURCE_TYPE_IDENTITY_PROJECTS) + + if not sc_projects: + LOG.error("No projects returned from subcloud {}". + format(self.subcloud_engine.subcloud.region_name)) + raise exceptions.SyncRequestFailed + + self._initial_sync_projects(m_projects, sc_projects) + + # Return True if no exceptions + return True + def sync_identity_resource(self, request, rsrc): self.initialize_sc_clients() # Invoke function with name format "operationtype_resourcetype" @@ -106,28 +275,31 @@ class IdentitySyncThread(SyncThread): # We therefore recognize those triggers and convert them to # POST operations operation_type = request.orch_job.operation_type - rtype_role_assignments = \ - consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS if operation_type == consts.OPERATION_TYPE_CREATE: - if (rsrc.resource_type == rtype_role_assignments): - operation_type = consts.OPERATION_TYPE_PUT - else: - operation_type = consts.OPERATION_TYPE_POST + operation_type = consts.OPERATION_TYPE_POST - func_name = operation_type + \ - "_" + rsrc.resource_type + func_name = operation_type + "_" + rsrc.resource_type getattr(self, func_name)(request, rsrc) except AttributeError: LOG.error("{} not implemented for {}" - .format(operation_type, + .format(request.orch_job.operation_type, rsrc.resource_type)) raise exceptions.SyncRequestFailed except (keystone_exceptions.connection.ConnectTimeout, - keystone_exceptions.ConnectFailure) as e: + keystone_exceptions.ConnectFailure, + dbsync_exceptions.ConnectTimeout, + dbsync_exceptions.ConnectFailure) as e: LOG.error("sync_identity_resource: {} is not reachable [{}]" .format(self.subcloud_engine.subcloud.region_name, str(e)), extra=self.log_extra) raise exceptions.SyncRequestTimeout + except dbsync_exceptions.Unauthorized as e: + LOG.info("Request [{}] failed for {}: {}" + .format(request.orch_job.operation_type, + self.subcloud_engine.subcloud.region_name, + str(e)), extra=self.log_extra) + 
self.reinitialize_sc_clients() + raise exceptions.SyncRequestFailedRetry except exceptions.SyncRequestFailed: raise except Exception as e: @@ -136,96 +308,90 @@ class IdentitySyncThread(SyncThread): def post_users(self, request, rsrc): # Create this user on this subcloud - user_dict = jsonutils.loads(request.orch_job.resource_info) - if 'user' in user_dict.keys(): - user_dict = user_dict['user'] - - # (NOTE: knasim-wrs): If the user create request contains - # "default_project_id" or "domain_id" then we need to remove - # both these fields, since it is highly unlikely that these - # IDs would exist on the subcloud, i.e. the ID for the "services" - # project on subcloud-X will be different to the ID for the - # project on Central Region. - # These fields are optional anyways since a subsequent role - # assignment will give the same scoping - # - # If these do need to be synced in the future then - # procure the project / domain list for this subcloud first - # and use IDs from that. - user_dict.pop('default_project_id', None) - user_dict.pop('domain_id', None) - username = user_dict.pop('name', None) # compulsory - if not username: + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + user_id = request.orch_job.source_resource_id + if not user_id: LOG.error("Received user create request without required " - "'name' field", extra=self.log_extra) + "'source_resource_id' field", extra=self.log_extra) raise exceptions.SyncRequestFailed - password = user_dict.pop('password', None) # compulsory - if not password: - # this user creation request may have been generated - # from the Identity Audit, in which case this password - # would not be present in the resource info. We will - # attempt to retrieve it from Keyring, failing which - # we cannot proceed. + # Retrieve DB records of the user just created. The records is in JSON + # format + user_records = self.m_dbs_client.identity_manager.user_detail(user_id) + if not user_records: + LOG.error("No data retrieved from master cloud for user {} to" + " create its equivalent in subcloud.".format(user_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed - # TODO(knasim-wrs): Set Service as constant - password = keyring.get_password('CGCS', username) - if not password: - LOG.error("Received user create request without required " - "'password' field and cannot retrieve from " - "Keyring either", extra=self.log_extra) - raise exceptions.SyncRequestFailed - - # Create the user in the subcloud - user_ref = self.sc_ks_client.users.create( - name=username, - domain=user_dict.pop('domain', None), - password=password, - email=user_dict.pop('email', None), - description=user_dict.pop('description', None), - enabled=user_dict.pop('enabled', True), - project=user_dict.pop('project', None), - default_project=user_dict.pop('default_project', None)) - - user_ref_id = user_ref.id + # Create the user on subcloud by pushing the DB records to subcloud + user_ref = self.sc_dbs_client.identity_manager.add_user(user_records) + if not user_ref: + LOG.error("No user data returned when creating user {} in" + " subcloud.".format(user_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed # Persist the subcloud resource. 
+ user_ref_id = user_ref.get('user').get('id') subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, user_ref_id) + username = user_ref.get('local_user').get('name') LOG.info("Created Keystone user {}:{} [{}]" .format(rsrc.id, subcloud_rsrc_id, username), extra=self.log_extra) - def post_users_password(self, request, rsrc): - # Update this user's password on this subcloud + def put_users(self, request, rsrc): + # Update this user on this subcloud + # The DB level resource update process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then updates the resource records in its DB tables. + user_id = request.orch_job.source_resource_id + if not user_id: + LOG.error("Received user update request without required " + "source resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + user_dict = jsonutils.loads(request.orch_job.resource_info) - oldpw = user_dict.pop('original_password', None) - newpw = user_dict.pop('password', None) - if (not oldpw or not newpw): - LOG.error("Received users password change request without " - "required original password or new password field", + if 'user' in user_dict.keys(): + user_dict = user_dict['user'] + + sc_user_id = user_dict.pop('id', None) + if not sc_user_id: + LOG.error("Received user update request without required " + "subcloud resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the user. The records is in JSON + # format + user_records = self.m_dbs_client.identity_manager.user_detail(user_id) + if not user_records: + LOG.error("No data retrieved from master cloud for user {} to" + " update its equivalent in subcloud.".format(user_id), extra=self.log_extra) raise exceptions.SyncRequestFailed - # NOTE (knasim-wrs): We can only update the password of the ADMIN - # user, that is the one used to establish this subcloud session, - # since the default behavior within the keystone client is to - # take the user_id from within the client context (client.user_id) - - # user_id for this resource was passed in via URL and extracted - # into the resource_id - if (self.sc_ks_client.user_id == rsrc.id): - self.sc_ks_client.users.update_password(oldpw, newpw) - LOG.info("Updated password for user {}".format(rsrc.id), - extra=self.log_extra) - - else: - LOG.error("User {} requested a modification to its password. " - "Can only self-modify for user {}. Consider updating " - "the password for {} using the Admin user" - .format(rsrc.id, self.sc_ks_client.user_id, rsrc.id)) + # Update the corresponding user on subcloud by pushing the DB records + # to subcloud + user_ref = self.sc_dbs_client.identity_manager.\ + update_user(sc_user_id, user_records) + if not user_ref: + LOG.error("No user data returned when updating user {} in" + " subcloud.".format(sc_user_id), extra=self.log_extra) raise exceptions.SyncRequestFailed + # Persist the subcloud resource. 
+ user_ref_id = user_ref.get('user').get('id') + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + user_ref_id) + username = user_ref.get('local_user').get('name') + LOG.info("Updated Keystone user {}:{} [{}]" + .format(rsrc.id, subcloud_rsrc_id, username), + extra=self.log_extra) + def patch_users(self, request, rsrc): # Update user reference on this subcloud user_update_dict = jsonutils.loads(request.orch_job.resource_info) @@ -290,7 +456,15 @@ class IdentitySyncThread(SyncThread): original_user_ref = UserReferenceWrapper(id=user_id) # Delete the user in the subcloud - self.sc_ks_client.users.delete(original_user_ref) + try: + self.sc_ks_client.users.delete(original_user_ref) + except keystone_exceptions.NotFound: + LOG.info("Delete user: user {} not found in {}, " + "considered as deleted.". + format(original_user_ref.id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + # Master Resource can be deleted only when all subcloud resources # are deleted along with corresponding orch_job and orch_requests. LOG.info("Keystone user {}:{} [{}] deleted" @@ -301,31 +475,90 @@ class IdentitySyncThread(SyncThread): def post_projects(self, request, rsrc): # Create this project on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + project_id = request.orch_job.source_resource_id + if not project_id: + LOG.error("Received project create request without required " + "'source_resource_id' field", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the project just created. + # The records is in JSON format. + project_records = self.m_dbs_client.project_manager.\ + project_detail(project_id) + if not project_records: + LOG.error("No data retrieved from master cloud for project {} to" + " create its equivalent in subcloud.".format(project_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the project on subcloud by pushing the DB records to subcloud + project_ref = self.sc_dbs_client.project_manager.\ + add_project(project_records) + if not project_ref: + LOG.error("No project data returned when creating project {} in" + " subcloud.".format(project_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Persist the subcloud resource. + project_ref_id = project_ref.get('project').get('id') + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + project_ref_id) + projectname = project_ref.get('project').get('name') + LOG.info("Created Keystone project {}:{} [{}]" + .format(rsrc.id, subcloud_rsrc_id, projectname), + extra=self.log_extra) + + def put_projects(self, request, rsrc): + # Update this project on this subcloud + # The DB level resource update process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then updates the resource records in its DB tables. 
+ project_id = request.orch_job.source_resource_id + if not project_id: + LOG.error("Received project update request without required " + "source resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + project_dict = jsonutils.loads(request.orch_job.resource_info) if 'project' in project_dict.keys(): project_dict = project_dict['project'] - projectname = project_dict.pop('name', None) # compulsory - projectdomain = project_dict.pop('domain_id', 'default') # compulsory - if not projectname: - LOG.error("Received project create request without required " - "'name' field", extra=self.log_extra) + sc_project_id = project_dict.pop('id', None) + if not sc_project_id: + LOG.error("Received project update request without required " + "subcloud resource id", extra=self.log_extra) raise exceptions.SyncRequestFailed - # Create the project in the subcloud - project_ref = self.sc_ks_client.projects.create( - name=projectname, - domain=projectdomain, - description=project_dict.pop('description', None), - enabled=project_dict.pop('enabled', True), - parent=project_dict.pop('parent_id', None)) + # Retrieve DB records of the project. The records is in JSON + # format + project_records = self.m_dbs_client.project_manager.\ + project_detail(project_id) + if not project_records: + LOG.error("No data retrieved from master cloud for project {} to" + " update its equivalent in subcloud.".format(project_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed - project_ref_id = project_ref.id + # Update the corresponding project on subcloud by pushing the DB + # records to subcloud + project_ref = self.sc_dbs_client.project_manager.\ + update_project(sc_project_id, project_records) + if not project_ref: + LOG.error("No project data returned when updating project {} in" + " subcloud.".format(sc_project_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed # Persist the subcloud resource. + project_ref_id = project_ref.get('project').get('id') subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, project_ref_id) - LOG.info("Created Keystone project {}:{} [{}]" + projectname = project_ref.get('project').get('name') + LOG.info("Updated Keystone project {}:{} [{}]" .format(rsrc.id, subcloud_rsrc_id, projectname), extra=self.log_extra) @@ -349,7 +582,7 @@ class IdentitySyncThread(SyncThread): # instead of stowing the entire project reference or # retrieving it, we build an opaque wrapper for the # v3 ProjectManager, containing the ID field which is - # needed to update this user reference + # needed to update this project reference ProjectReferenceWrapper = namedtuple('ProjectReferenceWrapper', 'id') proj_id = project_subcloud_rsrc.subcloud_resource_id original_proj_ref = ProjectReferenceWrapper(id=proj_id) @@ -388,7 +621,15 @@ class IdentitySyncThread(SyncThread): original_proj_ref = ProjectReferenceWrapper(id=proj_id) # Delete the project in the subcloud - self.sc_ks_client.projects.delete(original_proj_ref) + try: + self.sc_ks_client.projects.delete(original_proj_ref) + except keystone_exceptions.NotFound: + LOG.info("Delete project: project {} not found in {}, " + "considered as deleted.". + format(original_proj_ref.id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + # Master Resource can be deleted only when all subcloud resources # are deleted along with corresponding orch_job and orch_requests. 
LOG.info("Keystone project {}:{} [{}] deleted" @@ -399,27 +640,90 @@ class IdentitySyncThread(SyncThread): def post_roles(self, request, rsrc): # Create this role on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + role_id = request.orch_job.source_resource_id + if not role_id: + LOG.error("Received role create request without required " + "'source_resource_id' field", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the role just created. The records is in JSON + # format. + role_records = self.m_dbs_client.role_manager.\ + role_detail(role_id) + if not role_records: + LOG.error("No data retrieved from master cloud for role {} to" + " create its equivalent in subcloud.".format(role_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the role on subcloud by pushing the DB records to subcloud + role_ref = self.sc_dbs_client.role_manager.\ + add_role(role_records) + if not role_ref: + LOG.error("No role data returned when creating role {} in" + " subcloud.".format(role_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Persist the subcloud resource. + role_ref_id = role_ref.get('role').get('id') + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + role_ref_id) + rolename = role_ref.get('role').get('name') + LOG.info("Created Keystone role {}:{} [{}]" + .format(rsrc.id, subcloud_rsrc_id, rolename), + extra=self.log_extra) + + def put_roles(self, request, rsrc): + # Update this role on this subcloud + # The DB level resource update process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then updates the resource records in its DB tables. + role_id = request.orch_job.source_resource_id + if not role_id: + LOG.error("Received role update request without required " + "source resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + role_dict = jsonutils.loads(request.orch_job.resource_info) if 'role' in role_dict.keys(): role_dict = role_dict['role'] - rolename = role_dict.pop('name', None) # compulsory - if not rolename: - LOG.error("Received role create request without required " - "'name' field", extra=self.log_extra) + sc_role_id = role_dict.pop('id', None) + if not sc_role_id: + LOG.error("Received role update request without required " + "subcloud resource id", extra=self.log_extra) raise exceptions.SyncRequestFailed - # Create the role in the subcloud - role_ref = self.sc_ks_client.roles.create( - name=rolename, - domain=role_dict.pop('domain_id', None)) + # Retrieve DB records of the role. 
The records is in JSON + # format + role_records = self.m_dbs_client.role_manager.\ + role_detail(role_id) + if not role_records: + LOG.error("No data retrieved from master cloud for role {} to" + " update its equivalent in subcloud.".format(role_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed - role_ref_id = role_ref.id + # Update the corresponding role on subcloud by pushing the DB records + # to subcloud + role_ref = self.sc_dbs_client.role_manager.\ + update_role(sc_role_id, role_records) + if not role_ref: + LOG.error("No role data returned when updating role {} in" + " subcloud.".format(role_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed # Persist the subcloud resource. + role_ref_id = role_ref.get('role').get('id') subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, role_ref_id) - LOG.info("Created Keystone role {}:{} [{}]" + rolename = role_ref.get('role').get('name') + LOG.info("Updated Keystone role {}:{} [{}]" .format(rsrc.id, subcloud_rsrc_id, rolename), extra=self.log_extra) @@ -479,7 +783,15 @@ class IdentitySyncThread(SyncThread): original_role_ref = RoleReferenceWrapper(id=role_id) # Delete the role in the subcloud - self.sc_ks_client.roles.delete(original_role_ref) + try: + self.sc_ks_client.roles.delete(original_role_ref) + except keystone_exceptions.NotFound: + LOG.info("Delete role: role {} not found in {}, " + "considered as deleted.". + format(original_role_ref.id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + # Master Resource can be deleted only when all subcloud resources # are deleted along with corresponding orch_job and orch_requests. LOG.info("Keystone role {}:{} [{}] deleted" @@ -488,8 +800,10 @@ class IdentitySyncThread(SyncThread): extra=self.log_extra) role_subcloud_rsrc.delete() - def put_project_role_assignments(self, request, rsrc): + def post_project_role_assignments(self, request, rsrc): # Assign this role to user on project on this subcloud + # Project role assignments creation is still using keystone APIs since + # the APIs can be used to sync them. resource_tags = rsrc.master_id.split('_') if len(resource_tags) < 3: LOG.error("Malformed resource tag {} expected to be in " @@ -569,6 +883,12 @@ class IdentitySyncThread(SyncThread): else: LOG.error("Unable to update Keystone role assignment {}:{} " .format(rsrc.id, sc_role), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + def put_project_role_assignments(self, request, rsrc): + # update the project role assignment on this subcloud + # For project role assignment, there is nothing to update. + return def delete_project_role_assignments(self, request, rsrc): # Revoke this role for user on project on this subcloud @@ -587,7 +907,7 @@ class IdentitySyncThread(SyncThread): subcloud_rid = assignment_subcloud_rsrc.subcloud_resource_id resource_tags = subcloud_rid.split('_') if len(resource_tags) < 3: - LOG.error("Malformed subcloud resource tag {} expected to be in " + LOG.error("Malformed subcloud resource tag {}, expected to be in " "format: ProjectID_UserID_RoleID." 
.format(assignment_subcloud_rsrc), extra=self.log_extra) assignment_subcloud_rsrc.delete() @@ -598,10 +918,17 @@ class IdentitySyncThread(SyncThread): role_id = resource_tags[2] # Revoke role assignment - self.sc_ks_client.roles.revoke( - role_id, - user=user_id, - project=project_id) + try: + self.sc_ks_client.roles.revoke( + role_id, + user=user_id, + project=project_id) + except keystone_exceptions.NotFound: + LOG.info("Revoke role assignment: (role {}, user {}, project {})" + " not found in {}, considered as deleted.". + format(role_id, user_id, project_id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) role_ref = self.sc_ks_client.role_assignments.list( user=user_id, @@ -615,20 +942,173 @@ class IdentitySyncThread(SyncThread): else: LOG.error("Unable to delete Keystone role assignment {}:{} " .format(rsrc.id, role_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + assignment_subcloud_rsrc.delete() - # ---- Override common audit functions ---- + def post_revoke_events(self, request, rsrc): + # Create token revoke event on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + revoke_event_dict = jsonutils.loads(request.orch_job.resource_info) + if 'token_revoke_event' in revoke_event_dict.keys(): + revoke_event_dict = revoke_event_dict['token_revoke_event'] + audit_id = revoke_event_dict.pop('audit_id', None) + if not audit_id: + LOG.error("Received token revocation event create request without " + "required subcloud resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the revoke event just created. The records + # is in JSON format. + revoke_event_records = self.m_dbs_client.revoke_event_manager.\ + revoke_event_detail(audit_id=audit_id) + if not revoke_event_records: + LOG.error("No data retrieved from master cloud for token" + " revocation event with audit_id {} to create its" + " equivalent in subcloud.".format(audit_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the revoke event on subcloud by pushing the DB records to + # subcloud + revoke_event_ref = self.sc_dbs_client.revoke_event_manager.\ + add_revoke_event(revoke_event_records) + if not revoke_event_ref: + LOG.error("No token revocation event data returned when creating" + " token revocation event with audit_id {} in subcloud." 
+ .format(audit_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + revoke_event_ref_id = revoke_event_ref.\ + get('revocation_event').get('audit_id') + subcloud_rsrc_id = self.\ + persist_db_subcloud_resource(rsrc.id, revoke_event_ref_id) + LOG.info("Created Keystone token revoke event {}:{}" + .format(rsrc.id, subcloud_rsrc_id), + extra=self.log_extra) + + def delete_revoke_events(self, request, rsrc): + # Delete token revocation event reference on this subcloud + revoke_event_subcloud_rsrc = self.get_db_subcloud_resource(rsrc.id) + if not revoke_event_subcloud_rsrc: + LOG.error("Unable to delete token revocation event reference {}, " + "cannot find equivalent Keystone token revocation event " + "in subcloud.".format(rsrc), extra=self.log_extra) + return + + # subcloud resource id is the audit_id + subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id + + try: + self.sc_dbs_client.revoke_event_manager.delete_revoke_event( + audit_id=subcloud_resource_id) + except dbsync_exceptions.NotFound: + LOG.info("Delete token revocation event: event {} not found in {}," + " considered as deleted.". + format(revoke_event_subcloud_rsrc.subcloud_resource_id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + + # Master Resource can be deleted only when all subcloud resources + # are deleted along with corresponding orch_job and orch_requests. + LOG.info("Keystone token revocation event {}:{} [{}] deleted" + .format(rsrc.id, revoke_event_subcloud_rsrc.id, + revoke_event_subcloud_rsrc.subcloud_resource_id), + extra=self.log_extra) + revoke_event_subcloud_rsrc.delete() + + def post_revoke_events_for_user(self, request, rsrc): + # Create token revoke event on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + event_id = request.orch_job.source_resource_id + if not event_id: + LOG.error("Received token revocation event create request without " + "required subcloud resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the revoke event just created. The records + # is in JSON format. + revoke_event_records = self.m_dbs_client.revoke_event_manager.\ + revoke_event_detail(user_id=event_id) + if not revoke_event_records: + LOG.error("No data retrieved from master cloud for token" + " revocation event with event_id {} to create its" + " equivalent in subcloud.".format(event_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the revoke event on subcloud by pushing the DB records to + # subcloud + revoke_event_ref = self.sc_dbs_client.revoke_event_manager.\ + add_revoke_event(revoke_event_records) + if not revoke_event_ref: + LOG.error("No token revocation event data returned when creating" + " token revocation event with event_id {} in subcloud." 
+ .format(event_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + event_id) + LOG.info("Created Keystone token revoke event {}:{}" + .format(rsrc.id, subcloud_rsrc_id), + extra=self.log_extra) + + def delete_revoke_events_for_user(self, request, rsrc): + # Delete token revocation event reference on this subcloud + revoke_event_subcloud_rsrc = self.get_db_subcloud_resource(rsrc.id) + if not revoke_event_subcloud_rsrc: + LOG.error("Unable to delete token revocation event reference {}, " + "cannot find equivalent Keystone token revocation event " + "in subcloud.".format(rsrc), extra=self.log_extra) + return + + # subcloud resource id is _ encoded in base64 + subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id + + try: + self.sc_dbs_client.revoke_event_manager.delete_revoke_event( + user_id=subcloud_resource_id) + except dbsync_exceptions.NotFound: + LOG.info("Delete token revocation event: event {} not found in {}," + " considered as deleted.". + format(revoke_event_subcloud_rsrc.subcloud_resource_id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + + # Master Resource can be deleted only when all subcloud resources + # are deleted along with corresponding orch_job and orch_requests. + LOG.info("Keystone token revocation event {}:{} [{}] deleted" + .format(rsrc.id, revoke_event_subcloud_rsrc.id, + revoke_event_subcloud_rsrc.subcloud_resource_id), + extra=self.log_extra) + revoke_event_subcloud_rsrc.delete() + + # ---- Override common audit functions ---- def _get_resource_audit_handler(self, resource_type, client): if resource_type == consts.RESOURCE_TYPE_IDENTITY_USERS: - return self._get_users_resource(client) + return self._get_users_resource(client.identity_manager) elif resource_type == consts.RESOURCE_TYPE_IDENTITY_ROLES: - return self._get_roles_resource(client) + return self._get_roles_resource(client.role_manager) elif resource_type == consts.RESOURCE_TYPE_IDENTITY_PROJECTS: - return self._get_projects_resource(client) + return self._get_projects_resource(client.project_manager) elif (resource_type == consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS): return self._get_assignments_resource(client) + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS): + return self._get_revoke_events_resource(client. + revoke_event_manager) + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER): + return self._get_revoke_events_for_user_resource( + client.revoke_event_manager) else: LOG.error("Wrong resource type {}".format(resource_type), extra=self.log_extra) @@ -636,36 +1116,53 @@ class IdentitySyncThread(SyncThread): def _get_users_resource(self, client): try: - users = client.users.list() - # NOTE (knasim-wrs): We need to filter out services users, - # as some of these users may be for optional services - # (such as Magnum, Murano etc) which will be picked up by - # the Sync Audit and created on subclouds, later when these - # optional services are enabled on the subcloud - services = client.services.list() + services = [] + filtered_list = self.filtered_audit_resources[ consts.RESOURCE_TYPE_IDENTITY_USERS] - filtered_users = [user for user in users if - (all(user.name != service.name for - service in services) and - all(user.name != filtered for - filtered in filtered_list))] + # Filter out services users and some predefined users. These users + # are not to be synced to the subcloud. 
+            filtered_users = []
+            # get users from DB API
+            if hasattr(client, 'list_users'):
+                users = client.list_users()
+                for user in users:
+                    user_name = user.local_user.name
+                    if all(user_name != service.name for service in services)\
+                            and all(user_name != filtered for filtered in
+                                    filtered_list):
+                        filtered_users.append(user)
+            # get users from keystone API
+            else:
+                users = client.users.list()
+                for user in users:
+                    user_name = user.name
+                    if all(user_name != service.name for service in services)\
+                            and all(user_name != filtered for filtered in
+                                    filtered_list):
+                        filtered_users.append(user)
+
             return filtered_users
         except (keystone_exceptions.connection.ConnectTimeout,
-                keystone_exceptions.ConnectFailure) as e:
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
             LOG.info("User Audit: subcloud {} is not reachable [{}]"
                      .format(self.subcloud_engine.subcloud.region_name,
                              str(e)), extra=self.log_extra)
             # None will force skip of audit
             return None
-        except Exception as e:
-            LOG.exception(e)
-            return None
 
     def _get_roles_resource(self, client):
         try:
-            roles = client.roles.list()
+            # get roles from DB API
+            if hasattr(client, 'list_roles'):
+                roles = client.list_roles()
+            # get roles from keystone API
+            else:
+                roles = client.roles.list()
+
             # Filter out system roles
             filtered_list = self.filtered_audit_resources[
                 consts.RESOURCE_TYPE_IDENTITY_ROLES]
@@ -675,19 +1172,24 @@ class IdentitySyncThread(SyncThread):
                             filtered in filtered_list))]
             return filtered_roles
         except (keystone_exceptions.connection.ConnectTimeout,
-                keystone_exceptions.ConnectFailure) as e:
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
             LOG.info("Role Audit: subcloud {} is not reachable [{}]"
                      .format(self.subcloud_engine.subcloud.region_name,
                              str(e)), extra=self.log_extra)
             # None will force skip of audit
             return None
-        except Exception as e:
-            LOG.exception(e)
-            return None
 
     def _get_projects_resource(self, client):
         try:
-            projects = client.projects.list()
+            # get projects from DB API
+            if hasattr(client, 'list_projects'):
+                projects = client.list_projects()
+            # get projects from keystone API
+            else:
+                projects = client.projects.list()
+
             # Filter out admin or services projects
             filtered_list = self.filtered_audit_resources[
                 consts.RESOURCE_TYPE_IDENTITY_PROJECTS]
@@ -697,15 +1199,14 @@ class IdentitySyncThread(SyncThread):
                            filtered in filtered_list)]
             return filtered_projects
         except (keystone_exceptions.connection.ConnectTimeout,
-                keystone_exceptions.ConnectFailure) as e:
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
             LOG.info("Project Audit: subcloud {} is not reachable [{}]"
                      .format(self.subcloud_engine.subcloud.region_name,
                              str(e)), extra=self.log_extra)
             # None will force skip of audit
             return None
-        except Exception as e:
-            LOG.exception(e)
-            return None
 
     def _get_assignments_resource(self, client):
         try:
@@ -762,82 +1263,357 @@ class IdentitySyncThread(SyncThread):
 
             return refactored_assignments
         except (keystone_exceptions.connection.ConnectTimeout,
-                keystone_exceptions.ConnectFailure) as e:
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
             LOG.info("Assignment Audit: subcloud {} is not reachable [{}]"
                      .format(self.subcloud_engine.subcloud.region_name,
                              str(e)), extra=self.log_extra)
             # None will force skip of audit
             return None
-        except Exception as e:
-            LOG.exception(e)
+
+    def _get_revoke_events_resource(self, client):
+        try:
+            # get token revoke events from DB API
+            revoke_events = client.list_revoke_events()
+            # Events with audit_id are generated by the openstack token
+            # revocation command. audit_id will be the unique id of
+            # the resource.
+            filtered_revoke_events = [event for event in revoke_events if
+                                      event.audit_id is not None]
+            return filtered_revoke_events
+
+        except (keystone_exceptions.connection.ConnectTimeout,
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
+            LOG.info("Token revoke events Audit: subcloud {} is not reachable"
+                     " [{}]".format(self.subcloud_engine.subcloud.region_name,
+                                    str(e)), extra=self.log_extra)
+            # None will force skip of audit
+            return None
+
+    def _get_revoke_events_for_user_resource(self, client):
+        try:
+            # get token revoke events from DB API
+            revoke_events = client.list_revoke_events()
+            # Events with user_id are generated when a user password is
+            # changed. user_id and issued_before together form the unique id
+            # of the resource.
+            filtered_revoke_events = [event for event in revoke_events if
+                                      event.user_id is not None]
+            return filtered_revoke_events
+
+        except (keystone_exceptions.connection.ConnectTimeout,
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
+            LOG.info("Token revoke events Audit: subcloud {} is not reachable"
+                     " [{}]".format(self.subcloud_engine.subcloud.region_name,
+                                    str(e)), extra=self.log_extra)
+            # None will force skip of audit
             return None
 
     def _same_identity_resource(self, m, sc):
         LOG.debug("master={}, subcloud={}".format(m, sc),
                   extra=self.log_extra)
-        # Any Keystone resource can be system wide or domain scoped,
-        # If the domains are different then these resources
-        # are instantly unique since the same resource name can be
-        # mapped in different domains
-        return (m.name == sc.name and
-                m.domain_id == sc.domain_id)
+        # For user the comparison is DB records by DB records.
+        # The user DB records are from multiple tables, including user,
+        # local_user, and password tables. If any of them are not matched,
+        # it is not considered the same identity resource.
+        # Note that the user id is compared, since user id is to be synced
+        # to subcloud too.
+        same_user = (m.id == sc.id and
+                     m.domain_id == sc.domain_id and
+                     m.default_project_id == sc.default_project_id and
+                     m.enabled == sc.enabled and
+                     m.created_at == sc.created_at and
+                     m.last_active_at == sc.last_active_at and
+                     m.extra == sc.extra)
+        if not same_user:
+            return False
+
+        same_local_user = (m.local_user.domain_id ==
+                           sc.local_user.domain_id and
+                           m.local_user.name == sc.local_user.name and
+                           # Foreign key to user.id
+                           m.local_user.user_id == sc.local_user.user_id and
+                           m.local_user.failed_auth_count ==
+                           sc.local_user.failed_auth_count and
+                           m.local_user.failed_auth_at ==
+                           sc.local_user.failed_auth_at)
+        if not same_local_user:
+            return False
+
+        result = False
+        if len(m.local_user.passwords) == len(sc.local_user.passwords):
+            for m_password in m.local_user.passwords:
+                for sc_password in sc.local_user.passwords:
+                    if m_password.password_hash == sc_password.password_hash:
+                        break
+                # m_password is not found in sc_passwords
+                else:
+                    break
+            # All are found
+            else:
+                result = True
+
+        return result
+
+    def _has_same_identity_ids(self, m, sc):
+        # If (user name + domain name) or user id is the same,
+        # the resources are considered to be the same resource.
+ # Any difference in other attributes will trigger an update (PUT) + # to that resource in subcloud. + return ((m.local_user.name == sc.local_user.name and + m.domain_id == sc.domain_id) or m.id == sc.id) + + def _same_project_resource(self, m, sc): + LOG.debug("master={}, subcloud={}".format(m, sc), + extra=self.log_extra) + # For project the comparison is DB records by DB records. + # The project DB records are from project tables. If any of + # them are not matched, it is considered not the same. + # Note that the project id is compared, since project id is to + # be synced to subcloud too. + return (m.id == sc.id and + m.domain_id == sc.domain_id and + m.name == sc.name and + m.extra == sc.extra and + m.description == sc.description and + m.enabled == sc.enabled and + m.parent_id == sc.parent_id and + m.is_domain == sc.is_domain) + + def _has_same_project_ids(self, m, sc): + # If (project name + domain name) or project id is the same, + # the resources are considered to be the same resource. + # Any difference in other attributes will trigger an update (PUT) + # to that resource in subcloud. + return ((m.name == sc.name and m.domain_id == sc.domain_id) + or m.id == sc.id) + + def _same_role_resource(self, m, sc): + LOG.debug("master={}, subcloud={}".format(m, sc), + extra=self.log_extra) + # For role the comparison is DB records by DB records. + # The role DB records are from role tables. If any of + # them are not matched, it is considered not the same. + # Note that the role id is compared, since role id is to + # be synced to subcloud too. + return (m.id == sc.id and + m.domain_id == sc.domain_id and + m.name == sc.name and + m.extra == sc.extra) + + def _has_same_role_ids(self, m, sc): + # If (role name + domain name) or role id is the same, + # the resources are considered to be the same resource. + # Any difference in other attributes will trigger an update (PUT) + # to that resource in subcloud. + return ((m.name == sc.name and m.domain_id == sc.domain_id) + or m.id == sc.id) def _same_assignment_resource(self, m, sc): LOG.debug("same_assignment master={}, subcloud={}".format(m, sc), extra=self.log_extra) # For an assignment to be the same, all 3 of its role, project and # user information must match up - is_same = (self._same_identity_resource(m.user, sc.user) and - self._same_identity_resource(m.role, sc.role) and - self._same_identity_resource(m.project, sc.project)) - return is_same + return((m.user.name == sc.user.name and + m.user.domain_id == sc.user.domain_id) and + (m.role.name == sc.role.name and + m.role.domain_id == sc.role.domain_id) and + (m.project.name == sc.project.name and + m.project.domain_id == sc.project.domain_id)) + + def _has_same_assignment_ids(self, m, sc): + # For assignment the triple(user, project, role) is the unique id. + # The two resources have same id only when all of them are identical. + return self._same_assignment_resource(m, sc) + + def _same_revoke_event_resource(self, m, sc): + LOG.debug("same_revoke_event master={}, subcloud={}".format(m, sc), + extra=self.log_extra) + # For token revocation event the comparison is DB records by + # DB records. The DB records are from revocation_event tables. + # Token revocation events are considered the same when all columns + # match up. 
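+        # For example (hypothetical values): two events that share an
+        # audit_id but differ in revoked_at are not considered the same
+        # resource. Both the audit_id flavored events and the user_id
+        # (password change) flavored events are compared with this same
+        # column-by-column check, see same_resource() below.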
+        return(m.domain_id == sc.domain_id and
+               m.project_id == sc.project_id and
+               m.user_id == sc.user_id and
+               m.role_id == sc.role_id and
+               m.trust_id == sc.trust_id and
+               m.consumer_id == sc.consumer_id and
+               m.access_token_id == sc.access_token_id and
+               m.issued_before == sc.issued_before and
+               m.expires_at == sc.expires_at and
+               m.revoked_at == sc.revoked_at and
+               m.audit_id == sc.audit_id and
+               m.audit_chain_id == sc.audit_chain_id)
+
+    def _has_same_revoke_event_ids(self, m, sc):
+        # For token revoke events to have the same ids, all columns must
+        # match up.
+        return self._same_revoke_event_resource(m, sc)
 
     def get_master_resources(self, resource_type):
-        return self._get_resource_audit_handler(resource_type,
-                                                self.m_ks_client)
+        # Retrieve master resources from DB or through Keystone.
+        # users, projects, roles, and token revocation events use the
+        # dbsync client, other resources use the keystone client.
+        try:
+            if resource_type == consts.RESOURCE_TYPE_IDENTITY_USERS or \
+                    resource_type == consts.RESOURCE_TYPE_IDENTITY_PROJECTS or \
+                    resource_type == consts.RESOURCE_TYPE_IDENTITY_ROLES or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER:
+                return self._get_resource_audit_handler(resource_type,
+                                                        self.m_dbs_client)
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.m_ks_client)
+        except dbsync_exceptions.Unauthorized as e:
+            LOG.info("Get resource [{}] request failed for {}: {}."
+                     .format(resource_type, consts.VIRTUAL_MASTER_CLOUD,
+                             str(e)), extra=self.log_extra)
+            # In case the token expired, re-authenticate and retry once
+            self.reinitialize_m_clients()
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.m_dbs_client)
+        except Exception as e:
+            LOG.exception(e)
+            return None
 
     def get_subcloud_resources(self, resource_type):
         self.initialize_sc_clients()
-        return self._get_resource_audit_handler(resource_type,
-                                                self.sc_ks_client)
+        # Retrieve subcloud resources from DB or through keystone.
+        # users, projects, roles, and token revocation events use the
+        # dbsync client, other resources use the keystone client.
+        try:
+            if resource_type == consts.RESOURCE_TYPE_IDENTITY_USERS or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_PROJECTS or \
+                    resource_type == consts.RESOURCE_TYPE_IDENTITY_ROLES or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER:
+                return self._get_resource_audit_handler(resource_type,
+                                                        self.sc_dbs_client)
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.sc_ks_client)
+
+        except dbsync_exceptions.Unauthorized as e:
+            LOG.info("Get resource [{}] request failed for {}: {}."
+                     .format(resource_type,
+                             self.subcloud_engine.subcloud.region_name,
+                             str(e)), extra=self.log_extra)
+            # In case the token expired, re-authenticate and retry once
+            self.reinitialize_sc_clients()
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.sc_dbs_client)
+        except Exception as e:
+            LOG.exception(e)
+            return None
 
     def same_resource(self, resource_type, m_resource, sc_resource):
         if (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_USERS):
+            return self._same_identity_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_PROJECTS):
+            return self._same_project_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_ROLES):
+            return self._same_role_resource(m_resource, sc_resource)
+        elif (resource_type ==
                 consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS):
             return self._same_assignment_resource(m_resource, sc_resource)
-        else:
-            return self._same_identity_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS):
+            return self._same_revoke_event_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER):
+            return self._same_revoke_event_resource(m_resource, sc_resource)
+
+    def has_same_ids(self, resource_type, m_resource, sc_resource):
+        if (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_USERS):
+            return self._has_same_identity_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_PROJECTS):
+            return self._has_same_project_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_ROLES):
+            return self._has_same_role_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS):
+            return self._has_same_assignment_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS):
+            return self._has_same_revoke_event_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER):
+            return self._has_same_revoke_event_ids(m_resource, sc_resource)
 
     def get_resource_id(self, resource_type, resource):
         if hasattr(resource, 'master_id'):
             # If resource from DB, return master resource id
             # from master cloud
             return resource.master_id
-
-        # Else, it is OpenStack resource retrieved from master cloud
+        # For a token revocation event, use the audit_id if it is present.
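+        # Illustrative example (values are made up): an event created by the
+        # openstack token revocation command carries an audit_id such as
+        # 'vU5W8dOkTkiV1O_ramPQrQ', which is returned directly; a
+        # password-change event has no audit_id, so user_id and issued_before
+        # are combined and base64 encoded below instead.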
+        if (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS)\
+                and resource.audit_id:
+            return resource.audit_id
+        # For a user token revocation event, the id is user_id and
+        # issued_before joined by '_', then base64 encoded
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER)\
+                and resource.user_id and resource.issued_before:
+            event_id = "{}_{}".format(resource.user_id, resource.issued_before)
+            return base64.urlsafe_b64encode(event_id)
+        # Default id field retrieved from master cloud
         return resource.id
 
     def get_resource_info(self, resource_type, resource,
                           operation_type=None):
         rtype = consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS
-        if (operation_type == consts.OPERATION_TYPE_CREATE and
-                resource_type != rtype):
+        if ((operation_type == consts.OPERATION_TYPE_CREATE or
+                operation_type == consts.OPERATION_TYPE_POST or
+                operation_type == consts.OPERATION_TYPE_PUT)
+                and resource_type != rtype):
             # With the exception of role assignments, for all create
             # requests the resource_info needs to be extracted
             # from the master resource
-            return jsonutils.dumps(resource._info)
+            resource_info = resource.info()
+            return jsonutils.dumps(resource_info)
         else:
             super(IdentitySyncThread, self).get_resource_info(
                 resource_type, resource, operation_type)
 
     def audit_discrepancy(self, resource_type, m_resource, sc_resources):
-        # It could be that the details are different
-        # between master cloud and subcloud now.
-        # Thus, delete the resource before creating it again.
-        self.schedule_work(self.endpoint_type, resource_type,
-                           self.get_resource_id(resource_type, m_resource),
-                           consts.OPERATION_TYPE_DELETE)
-        # Return true to try creating the resource again
+        # Check if the resource is indeed missing or its details are
+        # mismatched with the master cloud. If missing, return True to create
+        # the resource. If mismatched, queue work to update this resource and
+        # return False.
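+        # Hypothetical example: a role that exists in the subcloud with the
+        # same name and domain but different 'extra' data has matching ids,
+        # so a PUT is queued to update it in place rather than deleting and
+        # recreating it; if no subcloud resource shares its ids, True is
+        # returned and the resource gets created.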
+ mismatched_resource = False + for sc_r in sc_resources: + if self.has_same_ids(resource_type, m_resource, sc_r): + mismatched_resource = True + break + + if mismatched_resource: + LOG.info("Subcloud res {}:{} is found but diverse in details," + " will update".format(resource_type, sc_r.id), + extra=self.log_extra) + self.schedule_work(self.endpoint_type, resource_type, + self.get_resource_id(resource_type, m_resource), + consts.OPERATION_TYPE_PUT, + self.get_resource_info( + resource_type, sc_r, + consts.OPERATION_TYPE_PUT)) + return False + return True def map_subcloud_resource(self, resource_type, m_r, m_rsrc_db, @@ -850,10 +1626,9 @@ class IdentitySyncThread(SyncThread): # manifest on the Subclouds and the Central Region should not try # to create these on the subclouds for sc_r in sc_resources: - if self.same_resource(resource_type, m_r, sc_r): - LOG.info( - "Mapping resource {} to existing subcloud resource {}" - .format(m_r, sc_r), extra=self.log_extra) + if self.has_same_ids(resource_type, m_r, sc_r): + LOG.info("Mapping resource {} to existing subcloud resource {}" + .format(m_r, sc_r), extra=self.log_extra) self.persist_db_subcloud_resource(m_rsrc_db.id, self.get_resource_id( resource_type, diff --git a/dcorch/engine/sync_services/sysinv.py b/dcorch/engine/sync_services/sysinv.py index 43628cc5e..d6c8fa345 100644 --- a/dcorch/engine/sync_services/sysinv.py +++ b/dcorch/engine/sync_services/sysinv.py @@ -799,7 +799,7 @@ class SysinvSyncThread(SyncThread): s_os_client = sdk.OpenStackDriver(self.region_name) try: - s_os_client.sysinv_client.create_fernet_repo( + s_os_client.sysinv_client.post_fernet_repo( FernetKeyManager.from_resource_info(resource_info)) # Ensure subcloud resource is persisted to the DB for later subcloud_rsrc_id = self.persist_db_subcloud_resource( @@ -831,7 +831,7 @@ class SysinvSyncThread(SyncThread): s_os_client = sdk.OpenStackDriver(self.region_name) try: - s_os_client.sysinv_client.update_fernet_repo( + s_os_client.sysinv_client.put_fernet_repo( FernetKeyManager.from_resource_info(resource_info)) # Ensure subcloud resource is persisted to the DB for later subcloud_rsrc_id = self.persist_db_subcloud_resource( @@ -1309,7 +1309,7 @@ class SysinvSyncThread(SyncThread): resource_type), extra=self.log_extra) - def audit_discrepancy(self, resource_type, m_resource, sc_resources): + def audit_discrepancy(self, resource_type, m_resource): # Return true to try the audit_action if resource_type in self.SYSINV_ADD_DELETE_RESOURCES: # It could be that the details are different @@ -1329,7 +1329,7 @@ class SysinvSyncThread(SyncThread): extra=self.log_extra) return False - def audit_action(self, resource_type, finding, resource): + def audit_action(self, resource_type, finding, resource, sc_source=None): if resource_type in self.SYSINV_MODIFY_RESOURCES: LOG.info("audit_action: {}/{}" .format(finding, resource_type), diff --git a/dcorch/engine/sync_thread.py b/dcorch/engine/sync_thread.py index c54ae4eca..5634fa39f 100644 --- a/dcorch/engine/sync_thread.py +++ b/dcorch/engine/sync_thread.py @@ -15,12 +15,10 @@ import threading +from oslo_config import cfg from oslo_log import log as logging -from keystoneauth1 import loading -from keystoneauth1 import session -from keystoneclient import client as keystoneclient - +from dcdbsync.dbsyncclient import client as dbsyncclient from dcmanager.common import consts as dcmanager_consts from dcmanager.rpc import client as dcmanager_rpc_client from dcorch.common import consts @@ -30,7 +28,11 @@ from dcorch.common import utils from 
dcorch.objects import orchrequest from dcorch.objects import resource from dcorch.objects import subcloud_resource -from oslo_config import cfg + +from keystoneauth1 import loading +from keystoneauth1 import session +from keystoneclient import client as keystoneclient + LOG = logging.getLogger(__name__) @@ -81,6 +83,7 @@ class SyncThread(object): self.sc_admin_session = None self.admin_session = None self.ks_client = None + self.dbs_client = None def start(self): if self.status == STATUS_NEW: @@ -124,9 +127,15 @@ class SyncThread(object): user_domain_name=cfg.CONF.cache.admin_user_domain_name) self.admin_session = session.Session( auth=auth, timeout=60, additional_headers=consts.USER_HEADER) + # keystone client self.ks_client = keystoneclient.Client( session=self.admin_session, region_name=consts.VIRTUAL_MASTER_CLOUD) + # dcdbsync client + self.dbs_client = dbsyncclient.Client( + endpoint_type=consts.DBS_ENDPOINT_INTERNAL, + session=self.admin_session, + region_name=consts.VIRTUAL_MASTER_CLOUD) def initialize_sc_clients(self): # base implementation of initializing the subcloud specific @@ -164,6 +173,10 @@ class SyncThread(object): auth=sc_auth, timeout=60, additional_headers=consts.USER_HEADER) + def initial_sync(self): + # Return True to indicate initial sync success + return True + def enable(self): # Called when DC manager thinks this subcloud is good to go. self.initialize() @@ -530,6 +543,7 @@ class SyncThread(object): extra=self.log_extra) # Subcloud resource is present in DB, but the check # for same_resource() was negative. Either the resource + # disappeared from subcloud or the resource details # are different from that of master cloud. Let the # resource implementation decide on the audit action. @@ -556,6 +570,7 @@ class SyncThread(object): # Resource is missing from subcloud, take action num_of_audit_jobs += self.audit_action( resource_type, AUDIT_RESOURCE_MISSING, m_r) + # As the subcloud resource is missing, invoke # the hook for dependants with no subcloud resource. # Resource implementation should handle this. @@ -667,6 +682,9 @@ class SyncThread(object): def same_resource(self, resource_type, m_resource, sc_resource): return True + def has_same_ids(self, resource_type, m_resource, sc_resource): + return False + def map_subcloud_resource(self, resource_type, m_r, m_rsrc_db, sc_resources): # Child classes can override this function to map an existing subcloud