diff --git a/dcmanager/api/controllers/v1/subclouds.py b/dcmanager/api/controllers/v1/subclouds.py index 668ce7a49..28d4ab1cf 100644 --- a/dcmanager/api/controllers/v1/subclouds.py +++ b/dcmanager/api/controllers/v1/subclouds.py @@ -381,7 +381,9 @@ class SubcloudsController(object): ('vim', 'vim'), ('mtce', 'mtce'), ('fm', 'fm'), - ('barbican', 'barbican') + ('barbican', 'barbican'), + ('smapi', 'smapi'), + ('dcdbsync', 'dcdbsync') ] user_list = list() diff --git a/dcmanager/manager/subcloud_manager.py b/dcmanager/manager/subcloud_manager.py index ff3369af0..7b5217655 100644 --- a/dcmanager/manager/subcloud_manager.py +++ b/dcmanager/manager/subcloud_manager.py @@ -310,6 +310,7 @@ class SubcloudManager(manager.Manager): # Get the subcloud details from the database subcloud = db_api.subcloud_get(context, subcloud_id) + original_management_state = subcloud.management_state # Semantic checking if management_state: @@ -357,7 +358,14 @@ class SubcloudManager(manager.Manager): except Exception as e: LOG.exception(e) LOG.warn('Problem informing dcorch of subcloud ' - 'state change, subcloud: %s' % subcloud.name) + 'state change, resume to original state, subcloud: %s' + % subcloud.name) + management_state = original_management_state + subcloud = \ + db_api.subcloud_update(context, subcloud_id, + management_state=management_state, + description=description, + location=location) if management_state == consts.MANAGEMENT_UNMANAGED: diff --git a/dcorch/api/proxy/apps/controller.py b/dcorch/api/proxy/apps/controller.py index 673a1786c..e57fb1521 100644 --- a/dcorch/api/proxy/apps/controller.py +++ b/dcorch/api/proxy/apps/controller.py @@ -343,6 +343,7 @@ class ComputeAPIController(APIController): resource_tag = self._get_resource_tag_from_header(request_header, operation_type, resource_type) + handler = self._resource_handler[resource_tag] operation_type, resource_id, resource_info = handler( environ=environ, @@ -454,7 +455,7 @@ class IdentityAPIController(APIController): return response def _generate_assignment_rid(self, url, environ): - resource_id = '' + resource_id = None # for role assignment or revocation, the URL is of format: # /v3/projects/{project_id}/users/{user_id}/roles/{role_id} # We need to extract all ID parameters from the URL @@ -468,6 +469,23 @@ class IdentityAPIController(APIController): resource_id = "{}_{}_{}".format(proj_id, user_id, role_id) return resource_id + def _retrieve_token_revoke_event_rid(self, url, environ): + resource_id = None + # for token revocation event, we need to retrieve the audit_id + # from the token being revoked. 
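+ # The token being revoked is carried in the X-Subject-Token header of
+ # the DELETE /v3/auth/tokens request, so it can be decoded locally from
+ # the WSGI environ without an extra call to keystone.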
+ revoked_token = environ.get('HTTP_X_SUBJECT_TOKEN', None) + + if not revoked_token: + LOG.error("Malformed Token Revocation URL: %s", url) + else: + try: + resource_id = proxy_utils.\ + retrieve_token_audit_id(revoked_token) + except Exception as e: + LOG.error("Failed to retrieve token audit id: %s" % e) + + return resource_id + def _enqueue_work(self, environ, request_body, response): LOG.info("enqueue_work") resource_info = {} @@ -482,6 +500,26 @@ class IdentityAPIController(APIController): consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS): resource_id = self._generate_assignment_rid(request_header, environ) + # grant a role to a user (PUT) creates a project role assignment + if operation_type == consts.OPERATION_TYPE_PUT: + operation_type = consts.OPERATION_TYPE_POST + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS): + resource_id = self._retrieve_token_revoke_event_rid(request_header, + environ) + # delete (revoke) a token (DELETE) creates a token revoke event. + if operation_type == consts.OPERATION_TYPE_DELETE and resource_id: + operation_type = consts.OPERATION_TYPE_POST + resource_info = {'token_revoke_event': + {'audit_id': resource_id}} + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_USERS_PASSWORD): + resource_id = self.get_resource_id_from_link(request_header. + strip('/password')) + # user change password (POST) is an update to the user + if operation_type == consts.OPERATION_TYPE_POST: + operation_type = consts.OPERATION_TYPE_PATCH + resource_type = consts.RESOURCE_TYPE_IDENTITY_USERS else: if operation_type == consts.OPERATION_TYPE_POST: # Retrieve the ID from the response @@ -490,20 +528,25 @@ class IdentityAPIController(APIController): else: resource_id = self.get_resource_id_from_link(request_header) - if (operation_type != consts.OPERATION_TYPE_DELETE and request_body): + if (operation_type != consts.OPERATION_TYPE_DELETE and + request_body and (not resource_info)): resource_info = json.loads(request_body) LOG.info("%s: Resource id: (%s), type: (%s), info: (%s)", operation_type, resource_id, resource_type, resource_info) - try: - utils.enqueue_work(self.ctxt, - self.ENDPOINT_TYPE, - resource_type, - resource_id, - operation_type, - json.dumps(resource_info)) - except exception.ResourceNotFound as e: - raise webob.exc.HTTPNotFound(explanation=e.format_message()) + + if resource_id: + try: + utils.enqueue_work(self.ctxt, + self.ENDPOINT_TYPE, + resource_type, + resource_id, + operation_type, + json.dumps(resource_info)) + except exception.ResourceNotFound as e: + raise webob.exc.HTTPNotFound(explanation=e.format_message()) + else: + LOG.warning("Empty resource id for resource: %s", operation_type) class CinderAPIController(APIController): diff --git a/dcorch/api/proxy/common/constants.py b/dcorch/api/proxy/common/constants.py index 0d4e33c8d..e1373f7c7 100755 --- a/dcorch/api/proxy/common/constants.py +++ b/dcorch/api/proxy/common/constants.py @@ -297,7 +297,10 @@ IDENTITY_PROJECTS_PATH = [ IDENTITY_PROJECTS_ROLE_PATH = [ '/v3/projects/{project_id}/users/{user_id}/roles/{role_id}', +] +IDENTITY_TOKEN_REVOKE_EVENTS_PATH = [ + '/v3/auth/tokens', ] IDENTITY_PATH_MAP = { @@ -306,7 +309,9 @@ IDENTITY_PATH_MAP = { consts.RESOURCE_TYPE_IDENTITY_ROLES: IDENTITY_ROLES_PATH, consts.RESOURCE_TYPE_IDENTITY_PROJECTS: IDENTITY_PROJECTS_PATH, consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS: - IDENTITY_PROJECTS_ROLE_PATH + IDENTITY_PROJECTS_ROLE_PATH, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS: + 
IDENTITY_TOKEN_REVOKE_EVENTS_PATH, } ROUTE_METHOD_MAP = { @@ -362,7 +367,9 @@ ROUTE_METHOD_MAP = { consts.RESOURCE_TYPE_IDENTITY_PROJECTS: ['POST', 'PATCH', 'DELETE'], consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS: - ['PUT', 'DELETE'] + ['PUT', 'DELETE'], + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS: + ['DELETE'] } } diff --git a/dcorch/api/proxy/common/utils.py b/dcorch/api/proxy/common/utils.py index a457cd1a8..33c267728 100644 --- a/dcorch/api/proxy/common/utils.py +++ b/dcorch/api/proxy/common/utils.py @@ -13,10 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from dcorch.common import consts -from oslo_log import log as logging +import base64 +from cryptography import fernet +import msgpack +import six from six.moves.urllib.parse import urlparse +from keystoneauth1 import exceptions as keystone_exceptions +from oslo_log import log as logging + +from dcorch.common import consts +from dcorch.drivers.openstack import sdk_platform as sdk + LOG = logging.getLogger(__name__) @@ -108,3 +116,84 @@ def set_request_forward_environ(req, remote_host, remote_port): if ('REMOTE_ADDR' in req.environ and 'HTTP_X_FORWARDED_FOR' not in req.environ): req.environ['HTTP_X_FORWARDED_FOR'] = req.environ['REMOTE_ADDR'] + + +def _get_fernet_keys(): + """Get fernet keys from sysinv.""" + os_client = sdk.OpenStackDriver(consts.CLOUD_0) + try: + key_list = os_client.sysinv_client.get_fernet_keys() + return [str(getattr(key, 'key')) for key in key_list] + except (keystone_exceptions.connection.ConnectTimeout, + keystone_exceptions.ConnectFailure) as e: + LOG.info("get_fernet_keys: cloud {} is not reachable [{}]" + .format(consts.CLOUD_0, str(e))) + os_client.delete_region_clients(consts.CLOUD_0) + return None + except (AttributeError, TypeError) as e: + LOG.info("get_fernet_keys error {}".format(e)) + os_client.delete_region_clients(consts.CLOUD_0, clear_token=True) + return None + except Exception as e: + LOG.exception(e) + return None + + +def _restore_padding(token): + """Restore padding based on token size. + + :param token: token to restore padding on + :returns: token with correct padding + """ + + # Re-inflate the padding + mod_returned = len(token) % 4 + if mod_returned: + missing_padding = 4 - mod_returned + token += b'=' * missing_padding + return token + + +def _unpack_token(fernet_token, fernet_keys): + """Attempt to unpack a token using the supplied Fernet keys. + + :param fernet_token: token to unpack + :type fernet_token: string + :param fernet_keys: a list consisting of keys in the repository + :type fernet_keys: list + :returns: the token payload + """ + + # create a list of fernet instances + fernet_instances = [fernet.Fernet(key) for key in fernet_keys] + # create a encryption/decryption object from the fernet keys + crypt = fernet.MultiFernet(fernet_instances) + + # attempt to decode the token + token = _restore_padding(six.binary_type(fernet_token)) + serialized_payload = crypt.decrypt(token) + payload = msgpack.unpackb(serialized_payload) + + # present token values + return payload + + +def retrieve_token_audit_id(fernet_token): + """Attempt to retrieve the audit id from the fernet token. 
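+
+ Keystone fernet tokens are encrypted msgpack payloads whose last field
+ holds the audit ids; the first audit id is returned URL-safe base64
+ encoded with its padding stripped, i.e. roughly
+ base64.urlsafe_b64encode(payload[-1][0]).rstrip('=') where payload is
+ the result of _unpack_token().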
+ + :param fernet_token: + :param keys_repository: + :return: audit id in base64 encoded (without paddings) + """ + + audit_id = None + fernet_keys = _get_fernet_keys() + LOG.info("fernet_keys: {}".format(fernet_keys)) + + if fernet_keys: + unpacked_token = _unpack_token(fernet_token, fernet_keys) + if unpacked_token: + audit_id = unpacked_token[-1][0] + audit_id = base64.urlsafe_b64encode(audit_id).rstrip('=') + + return audit_id diff --git a/dcorch/common/consts.py b/dcorch/common/consts.py index 1b0a36052..53bb11b43 100644 --- a/dcorch/common/consts.py +++ b/dcorch/common/consts.py @@ -122,6 +122,8 @@ RESOURCE_TYPE_IDENTITY_USERS_PASSWORD = "users_password" RESOURCE_TYPE_IDENTITY_ROLES = "roles" RESOURCE_TYPE_IDENTITY_PROJECTS = "projects" RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS = "project_role_assignments" +RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS = "revoke_events" +RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER = "revoke_events_for_user" KEYPAIR_ID_DELIM = "/" @@ -148,6 +150,10 @@ ENDPOINT_QUOTA_MAPPING = { KS_ENDPOINT_INTERNAL = "internal" KS_ENDPOINT_DEFAULT = KS_ENDPOINT_INTERNAL +# DB sync agent endpoint +DBS_ENDPOINT_INTERNAL = "internal" +DBS_ENDPOINT_DEFAULT = DBS_ENDPOINT_INTERNAL + # Do we need separate patch/put operations or could we just use # create/update/delete and have the sync code know which HTTP # operation to use? diff --git a/dcorch/common/exceptions.py b/dcorch/common/exceptions.py index 73f66b9b5..6d5ba1f9a 100644 --- a/dcorch/common/exceptions.py +++ b/dcorch/common/exceptions.py @@ -170,6 +170,10 @@ class SubcloudNotFound(NotFound): message = _("Subcloud %(region_name)s not found") +class ThreadNotFound(NotFound): + message = _("Thread %(thread_name)s of %(region_name)s not found") + + class OrchJobNotFound(NotFound): message = _("OrchJob %(orch_job)s not found") diff --git a/dcorch/drivers/openstack/sysinv_v1.py b/dcorch/drivers/openstack/sysinv_v1.py index 2251019e6..01500b6e6 100644 --- a/dcorch/drivers/openstack/sysinv_v1.py +++ b/dcorch/drivers/openstack/sysinv_v1.py @@ -74,6 +74,10 @@ class SysinvClient(base.DriverBase): token = session.get_token() client = cgts_client.Client( api_version, + username=session.auth._username, + password=session.auth._password, + tenant_name=session.auth._project_name, + auth_url=session.auth.auth_url, endpoint=endpoint, token=token) except exceptions.ServiceUnavailable: @@ -710,7 +714,7 @@ class SysinvClient(base.DriverBase): return iuser - def create_fernet_repo(self, key_list): + def post_fernet_repo(self, key_list=None): """Add the fernet keys for this region :param: key list payload @@ -721,26 +725,26 @@ class SysinvClient(base.DriverBase): # [{"id": 0, "key": "GgDAOfmyr19u0hXdm5r_zMgaMLjglVFpp5qn_N4GBJQ="}, # {"id": 1, "key": "7WfL_z54p67gWAkOmQhLA9P0ZygsbbJcKgff0uh28O8="}, # {"id": 2, "key": ""5gsUQeOZ2FzZP58DN32u8pRKRgAludrjmrZFJSOHOw0="}] - LOG.info("create_fernet_repo driver region={} " + LOG.info("post_fernet_repo driver region={} " "fernet_repo_list={}".format(self.region_name, key_list)) try: self.client.fernet.create(key_list) except Exception as e: - LOG.error("create_fernet_repo exception={}".format(e)) + LOG.error("post_fernet_repo exception={}".format(e)) raise exceptions.SyncRequestFailedRetry() - def update_fernet_repo(self, key_list): + def put_fernet_repo(self, key_list): """Update the fernet keys for this region :param: key list payload :return: Nothing """ - LOG.info("update_fernet_repo driver region={} " + LOG.info("put_fernet_repo driver region={} " 
"fernet_repo_list={}".format(self.region_name, key_list)) try: self.client.fernet.put(key_list) except Exception as e: - LOG.error("update_fernet_repo exception={}".format(e)) + LOG.error("put_fernet_repo exception={}".format(e)) raise exceptions.SyncRequestFailedRetry() def get_fernet_keys(self): diff --git a/dcorch/engine/fernet_key_manager.py b/dcorch/engine/fernet_key_manager.py index a35ebdc5d..ccf1651b8 100644 --- a/dcorch/engine/fernet_key_manager.py +++ b/dcorch/engine/fernet_key_manager.py @@ -27,7 +27,6 @@ from dcorch.common.i18n import _ from dcorch.common import manager from dcorch.common import utils from dcorch.drivers.openstack import sdk_platform as sdk -from dcorch.objects import subcloud as subcloud_obj FERNET_REPO_MASTER_ID = "keys" @@ -117,9 +116,26 @@ class FernetKeyManager(manager.Manager): self._schedule_work(consts.OPERATION_TYPE_PUT) def distribute_keys(self, ctxt, subcloud_name): - subclouds = subcloud_obj.SubcloudList.get_all(ctxt) - for sc in subclouds: - if sc.region_name == subcloud_name: - subcloud = sc - self._schedule_work(consts.OPERATION_TYPE_CREATE, subcloud) - break + keys = self._get_master_keys() + if not keys: + LOG.info(_("No fernet keys returned from %s") % consts.CLOUD_0) + return + resource_info = FernetKeyManager.to_resource_info(keys) + key_list = FernetKeyManager.from_resource_info(resource_info) + self.update_fernet_repo(subcloud_name, key_list) + + def reset_keys(self, subcloud_name): + self.update_fernet_repo(subcloud_name) + + @staticmethod + def update_fernet_repo(subcloud_name, key_list=None): + try: + os_client = sdk.OpenStackDriver(subcloud_name) + os_client.sysinv_client.post_fernet_repo(key_list) + except (exceptions.ConnectionRefused, exceptions.NotAuthorized, + exceptions.TimeOut): + LOG.info(_("Update the fernet repo on %s timeout") % + subcloud_name) + except Exception as e: + error_msg = "subcloud: {}, {}".format(subcloud_name, e.message) + LOG.info(_("Fail to update fernet repo %s") % error_msg) diff --git a/dcorch/engine/generic_sync_manager.py b/dcorch/engine/generic_sync_manager.py index 9de9226a7..d08435451 100644 --- a/dcorch/engine/generic_sync_manager.py +++ b/dcorch/engine/generic_sync_manager.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from oslo_log import log as logging from dcorch.common import exceptions @@ -85,6 +84,14 @@ class GenericSyncManager(object): except KeyError: raise exceptions.SubcloudNotFound(region_name=subcloud_name) + def initial_sync(self, context, subcloud_name): + try: + subcloud_engine = self.subcloud_engines[subcloud_name] + LOG.info('Initial sync subcloud %(sc)s' % {'sc': subcloud_name}) + subcloud_engine.initial_sync() + except KeyError: + raise exceptions.SubcloudNotFound(region_name=subcloud_name) + def run_sync_audit(self): for subcloud_engine in self.subcloud_engines.values(): subcloud_engine.run_sync_audit() diff --git a/dcorch/engine/service.py b/dcorch/engine/service.py index bd1288aef..c5399d5ff 100644 --- a/dcorch/engine/service.py +++ b/dcorch/engine/service.py @@ -191,11 +191,26 @@ class EngineService(service.Service): # keep equivalent functionality for now if (management_state == dcm_consts.MANAGEMENT_MANAGED) and \ (availability_status == dcm_consts.AVAILABILITY_ONLINE): - self.fkm.distribute_keys(ctxt, subcloud_name) - self.aam.enable_snmp(ctxt, subcloud_name) - self.gsm.enable_subcloud(ctxt, subcloud_name) + # Initial identity sync. 
It's synchronous so that identity + # get synced before fernet token keys are synced. This is + # necessary since we want to revoke all existing tokens on + # this subcloud after its services user IDs and project + # IDs are changed. Otherwise subcloud services will fail + # authentication since they keep on using their existing tokens + # issued before these IDs change, until these tokens expires. + try: + self.gsm.initial_sync(ctxt, subcloud_name) + self.fkm.distribute_keys(ctxt, subcloud_name) + self.aam.enable_snmp(ctxt, subcloud_name) + self.gsm.enable_subcloud(ctxt, subcloud_name) + except Exception as ex: + LOG.warning('Update subcloud state failed for %s: %s', + subcloud_name, six.text_type(ex)) + raise else: self.gsm.disable_subcloud(ctxt, subcloud_name) + if (management_state == dcm_consts.MANAGEMENT_UNMANAGED): + self.fkm.reset_keys(subcloud_name) @request_context # todo: add authentication since ctxt not actually needed later diff --git a/dcorch/engine/subcloud.py b/dcorch/engine/subcloud.py index 28c6d4611..5fa5b5fa0 100644 --- a/dcorch/engine/subcloud.py +++ b/dcorch/engine/subcloud.py @@ -106,6 +106,11 @@ class SubCloudEngine(object): self.shutdown() self.subcloud.delete() + def initial_sync(self): + # initial synchronization of the subcloud + for thread in self.sync_threads: + thread.initial_sync() + def run_sync_audit(self): # run periodic sync audit on all threads in this subcloud if self.is_enabled(): diff --git a/dcorch/engine/sync_services/identity.py b/dcorch/engine/sync_services/identity.py index 6658b7eb7..6a6a45d76 100644 --- a/dcorch/engine/sync_services/identity.py +++ b/dcorch/engine/sync_services/identity.py @@ -13,18 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
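+
+# This sync thread replicates keystone identity data (users, projects,
+# roles, role assignments and token revocation events) from the master
+# cloud to each subcloud. Most resources are now pushed at the DB-record
+# level through the dcdbsync client; role assignments still go through
+# the regular keystone API.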
-import keyring +import base64 from collections import namedtuple + +from dcdbsync.dbsyncclient import client as dbsyncclient +from dcdbsync.dbsyncclient import exceptions as dbsync_exceptions +from dcorch.common import consts +from dcorch.common import exceptions +from dcorch.engine.sync_thread import SyncThread + from keystoneauth1 import exceptions as keystone_exceptions from keystoneclient import client as keystoneclient from oslo_log import log as logging from oslo_serialization import jsonutils -from dcorch.common import consts -from dcorch.common import exceptions -from dcorch.engine.sync_thread import SyncThread LOG = logging.getLogger(__name__) @@ -46,41 +50,63 @@ class IdentitySyncThread(SyncThread): self.sync_identity_resource, consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS: self.sync_identity_resource, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS: + self.sync_identity_resource, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER: + self.sync_identity_resource, } # Since services may use unscoped tokens, it is essential to ensure # that users are replicated prior to assignment data (roles/projects) self.audit_resources = [ consts.RESOURCE_TYPE_IDENTITY_USERS, - consts.RESOURCE_TYPE_IDENTITY_ROLES, consts.RESOURCE_TYPE_IDENTITY_PROJECTS, - consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS + consts.RESOURCE_TYPE_IDENTITY_ROLES, + consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS, + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER ] # For all the resource types, we need to filter out certain # resources self.filtered_audit_resources = { consts.RESOURCE_TYPE_IDENTITY_USERS: - ['admin', 'mtce', 'heat_admin', - 'cinder' + self.subcloud_engine.subcloud.region_name], + ['dcdbsync', 'dcorch', 'dcmanager', 'heat_admin', 'smapi', + 'fm', 'cinder' + self.subcloud_engine.subcloud.region_name], consts.RESOURCE_TYPE_IDENTITY_ROLES: ['heat_stack_owner', 'heat_stack_user', 'ResellerAdmin'], consts.RESOURCE_TYPE_IDENTITY_PROJECTS: - ['admin', 'services'] + [] } self.log_extra = {"instance": "{}/{}: ".format( self.subcloud_engine.subcloud.region_name, self.endpoint_type)} self.sc_ks_client = None + self.sc_dbs_client = None self.initialize() LOG.info("IdentitySyncThread initialized", extra=self.log_extra) def initialize_sc_clients(self): super(IdentitySyncThread, self).initialize_sc_clients() + # create a keystone client for the subcloud if (not self.sc_ks_client and self.sc_admin_session): self.sc_ks_client = keystoneclient.Client( session=self.sc_admin_session, endpoint_type=consts.KS_ENDPOINT_INTERNAL, region_name=self.subcloud_engine.subcloud.region_name) + # create a dbsync client for the subcloud + if (not self.sc_dbs_client and self.sc_admin_session): + self.sc_dbs_client = dbsyncclient.Client( + session=self.sc_admin_session, + endpoint_type=consts.DBS_ENDPOINT_INTERNAL, + region_name=self.subcloud_engine.subcloud.region_name) + + def reinitialize_m_clients(self): + if self.m_dbs_client and self.admin_session: + self.m_dbs_client.update(session=self.admin_session) + + def reinitialize_sc_clients(self): + if self.sc_dbs_client and self.sc_admin_session: + self.sc_dbs_client.update(session=self.sc_admin_session) def initialize(self): # Subcloud may be enabled a while after being added. 
@@ -93,9 +119,152 @@ class IdentitySyncThread(SyncThread): # subcloud specific version self.m_ks_client = self.ks_client + # We initialize a master version of the dbsync client, and a + # subcloud specific version + self.m_dbs_client = self.dbs_client + LOG.info("Identity session and clients initialized", extra=self.log_extra) + def _initial_sync_users(self, m_users, sc_users): + # Particularly sync users with same name but different ID + m_client = self.m_dbs_client.identity_manager + sc_client = self.sc_dbs_client.identity_manager + + for m_user in m_users: + for sc_user in sc_users: + if (m_user.local_user.name == sc_user.local_user.name and + m_user.domain_id == sc_user.domain_id and + m_user.id != sc_user.id): + user_records = m_client.user_detail(m_user.id) + if not user_records: + LOG.error("No data retrieved from master cloud for" + " user {} to update its equivalent in" + " subcloud.".format(m_user.id)) + raise exceptions.SyncRequestFailed + # update the user by pushing down the DB records to + # subcloud + try: + user_ref = sc_client.update_user(sc_user.id, + user_records) + # Retry once if unauthorized + except dbsync_exceptions.Unauthorized as e: + LOG.info("Update user {} request failed for {}: {}." + .format(sc_user.id, + self.subcloud_engine.subcloud. + region_name, str(e))) + self.reinitialize_sc_clients() + user_ref = sc_client.update_user(sc_user.id, + user_records) + + if not user_ref: + LOG.error("No user data returned when updating user {}" + " in subcloud.".format(sc_user.id)) + raise exceptions.SyncRequestFailed + # If admin user get synced, the client need to + # re-authenticate. + if sc_user.local_user.name == "admin": + self.reinitialize_sc_clients() + + def _initial_sync_projects(self, m_projects, sc_projects): + # Particularly sync projects with same name but different ID. + m_client = self.m_dbs_client.project_manager + sc_client = self.sc_dbs_client.project_manager + + for m_project in m_projects: + for sc_project in sc_projects: + if (m_project.name == sc_project.name and + m_project.domain_id == sc_project.domain_id and + m_project.id != sc_project.id): + project_records = m_client.project_detail(m_project.id) + if not project_records: + LOG.error("No data retrieved from master cloud for" + " project {} to update its equivalent in" + " subcloud.".format(m_project.id)) + raise exceptions.SyncRequestFailed + # update the project by pushing down the DB records to + # subcloud + try: + project_ref = sc_client.update_project(sc_project.id, + project_records) + # Retry once if unauthorized + except dbsync_exceptions.Unauthorized as e: + LOG.info("Update project {} request failed for {}: {}." + .format(sc_project.id, + self.subcloud_engine.subcloud. + region_name, str(e))) + self.reinitialize_sc_clients() + project_ref = sc_client.update_project(sc_project.id, + project_records) + + if not project_ref: + LOG.error("No project data returned when updating" + " project {} in subcloud.". + format(sc_project.id)) + raise exceptions.SyncRequestFailed + # If admin project get synced, the client need to + # re-authenticate. + if sc_project.name == "admin": + self.reinitialize_sc_clients() + + def initial_sync(self): + # Service users and projects are created at deployment time. They exist + # before dcorch starts to audit resources. 
Later on when dcorch audits + # and sync them over(including their IDs) to the subcloud, running + # services at the subcloud with tokens issued before their ID are + # changed will get user/project not found error since their IDs are + # changed. This will continue until their tokens expire in up to + # 1 hour. Before that these services basically stop working. + # By an initial synchronization on existing users/projects, + # synchronously followed by a fernet keys synchronization, existing + # tokens at subcloud are revoked and services are forced to + # re-authenticate to get new tokens. This significantly decreases + # service recovery time at subcloud. + self.initialize_sc_clients() + + # get users from master cloud + m_users = self.get_master_resources( + consts.RESOURCE_TYPE_IDENTITY_USERS) + + if not m_users: + LOG.error("No users returned from {}". + format(consts.VIRTUAL_MASTER_CLOUD)) + raise exceptions.SyncRequestFailed + + # get users from the subcloud + sc_users = self.get_subcloud_resources( + consts.RESOURCE_TYPE_IDENTITY_USERS) + + if not sc_users: + LOG.error("No users returned from subcloud {}". + format(self.subcloud_engine.subcloud.region_name)) + raise exceptions.SyncRequestFailed + + self._initial_sync_users(m_users, sc_users) + + # get projects from master cloud + m_projects = self.get_master_resources( + consts.RESOURCE_TYPE_IDENTITY_PROJECTS) + + if not m_projects: + LOG.error("No projects returned from {}". + format(consts.VIRTUAL_MASTER_CLOUD)) + raise exceptions.SyncRequestFailed + + # get projects from the subcloud + sc_projects = self.get_subcloud_resources( + consts.RESOURCE_TYPE_IDENTITY_PROJECTS) + + if not sc_projects: + LOG.error("No projects returned from subcloud {}". + format(self.subcloud_engine.subcloud.region_name)) + raise exceptions.SyncRequestFailed + + self._initial_sync_projects(m_projects, sc_projects) + + # Return True if no exceptions + return True + def sync_identity_resource(self, request, rsrc): self.initialize_sc_clients() # Invoke function with name format "operationtype_resourcetype" @@ -106,28 +275,31 @@ class IdentitySyncThread(SyncThread): # We therefore recognize those triggers and convert them to # POST operations operation_type = request.orch_job.operation_type - rtype_role_assignments = \ - consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS if operation_type == consts.OPERATION_TYPE_CREATE: - if (rsrc.resource_type == rtype_role_assignments): - operation_type = consts.OPERATION_TYPE_PUT - else: - operation_type = consts.OPERATION_TYPE_POST + operation_type = consts.OPERATION_TYPE_POST - func_name = operation_type + \ - "_" + rsrc.resource_type + func_name = operation_type + "_" + rsrc.resource_type getattr(self, func_name)(request, rsrc) except AttributeError: LOG.error("{} not implemented for {}" - .format(operation_type, + .format(request.orch_job.operation_type, rsrc.resource_type)) raise exceptions.SyncRequestFailed except (keystone_exceptions.connection.ConnectTimeout, - keystone_exceptions.ConnectFailure) as e: + keystone_exceptions.ConnectFailure, + dbsync_exceptions.ConnectTimeout, + dbsync_exceptions.ConnectFailure) as e: LOG.error("sync_identity_resource: {} is not reachable [{}]" .format(self.subcloud_engine.subcloud.region_name, str(e)), extra=self.log_extra) raise exceptions.SyncRequestTimeout + except dbsync_exceptions.Unauthorized as e: + LOG.info("Request [{}] failed for {}: {}" + .format(request.orch_job.operation_type, + self.subcloud_engine.subcloud.region_name, + str(e)), extra=self.log_extra) + 
self.reinitialize_sc_clients() + raise exceptions.SyncRequestFailedRetry except exceptions.SyncRequestFailed: raise except Exception as e: @@ -136,96 +308,90 @@ class IdentitySyncThread(SyncThread): def post_users(self, request, rsrc): # Create this user on this subcloud - user_dict = jsonutils.loads(request.orch_job.resource_info) - if 'user' in user_dict.keys(): - user_dict = user_dict['user'] - - # (NOTE: knasim-wrs): If the user create request contains - # "default_project_id" or "domain_id" then we need to remove - # both these fields, since it is highly unlikely that these - # IDs would exist on the subcloud, i.e. the ID for the "services" - # project on subcloud-X will be different to the ID for the - # project on Central Region. - # These fields are optional anyways since a subsequent role - # assignment will give the same scoping - # - # If these do need to be synced in the future then - # procure the project / domain list for this subcloud first - # and use IDs from that. - user_dict.pop('default_project_id', None) - user_dict.pop('domain_id', None) - username = user_dict.pop('name', None) # compulsory - if not username: + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + user_id = request.orch_job.source_resource_id + if not user_id: LOG.error("Received user create request without required " - "'name' field", extra=self.log_extra) + "'source_resource_id' field", extra=self.log_extra) raise exceptions.SyncRequestFailed - password = user_dict.pop('password', None) # compulsory - if not password: - # this user creation request may have been generated - # from the Identity Audit, in which case this password - # would not be present in the resource info. We will - # attempt to retrieve it from Keyring, failing which - # we cannot proceed. + # Retrieve DB records of the user just created. The records is in JSON + # format + user_records = self.m_dbs_client.identity_manager.user_detail(user_id) + if not user_records: + LOG.error("No data retrieved from master cloud for user {} to" + " create its equivalent in subcloud.".format(user_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed - # TODO(knasim-wrs): Set Service as constant - password = keyring.get_password('CGCS', username) - if not password: - LOG.error("Received user create request without required " - "'password' field and cannot retrieve from " - "Keyring either", extra=self.log_extra) - raise exceptions.SyncRequestFailed - - # Create the user in the subcloud - user_ref = self.sc_ks_client.users.create( - name=username, - domain=user_dict.pop('domain', None), - password=password, - email=user_dict.pop('email', None), - description=user_dict.pop('description', None), - enabled=user_dict.pop('enabled', True), - project=user_dict.pop('project', None), - default_project=user_dict.pop('default_project', None)) - - user_ref_id = user_ref.id + # Create the user on subcloud by pushing the DB records to subcloud + user_ref = self.sc_dbs_client.identity_manager.add_user(user_records) + if not user_ref: + LOG.error("No user data returned when creating user {} in" + " subcloud.".format(user_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed # Persist the subcloud resource. 
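+ # user_ref is the JSON document returned by dcdbsync, keyed by table
+ # (e.g. 'user', 'local_user'), so the new ID and name are read from
+ # those sections.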
+ user_ref_id = user_ref.get('user').get('id') subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, user_ref_id) + username = user_ref.get('local_user').get('name') LOG.info("Created Keystone user {}:{} [{}]" .format(rsrc.id, subcloud_rsrc_id, username), extra=self.log_extra) - def post_users_password(self, request, rsrc): - # Update this user's password on this subcloud + def put_users(self, request, rsrc): + # Update this user on this subcloud + # The DB level resource update process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then updates the resource records in its DB tables. + user_id = request.orch_job.source_resource_id + if not user_id: + LOG.error("Received user update request without required " + "source resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + user_dict = jsonutils.loads(request.orch_job.resource_info) - oldpw = user_dict.pop('original_password', None) - newpw = user_dict.pop('password', None) - if (not oldpw or not newpw): - LOG.error("Received users password change request without " - "required original password or new password field", + if 'user' in user_dict.keys(): + user_dict = user_dict['user'] + + sc_user_id = user_dict.pop('id', None) + if not sc_user_id: + LOG.error("Received user update request without required " + "subcloud resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the user. The records is in JSON + # format + user_records = self.m_dbs_client.identity_manager.user_detail(user_id) + if not user_records: + LOG.error("No data retrieved from master cloud for user {} to" + " update its equivalent in subcloud.".format(user_id), extra=self.log_extra) raise exceptions.SyncRequestFailed - # NOTE (knasim-wrs): We can only update the password of the ADMIN - # user, that is the one used to establish this subcloud session, - # since the default behavior within the keystone client is to - # take the user_id from within the client context (client.user_id) - - # user_id for this resource was passed in via URL and extracted - # into the resource_id - if (self.sc_ks_client.user_id == rsrc.id): - self.sc_ks_client.users.update_password(oldpw, newpw) - LOG.info("Updated password for user {}".format(rsrc.id), - extra=self.log_extra) - - else: - LOG.error("User {} requested a modification to its password. " - "Can only self-modify for user {}. Consider updating " - "the password for {} using the Admin user" - .format(rsrc.id, self.sc_ks_client.user_id, rsrc.id)) + # Update the corresponding user on subcloud by pushing the DB records + # to subcloud + user_ref = self.sc_dbs_client.identity_manager.\ + update_user(sc_user_id, user_records) + if not user_ref: + LOG.error("No user data returned when updating user {} in" + " subcloud.".format(sc_user_id), extra=self.log_extra) raise exceptions.SyncRequestFailed + # Persist the subcloud resource. 
+ user_ref_id = user_ref.get('user').get('id') + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + user_ref_id) + username = user_ref.get('local_user').get('name') + LOG.info("Updated Keystone user {}:{} [{}]" + .format(rsrc.id, subcloud_rsrc_id, username), + extra=self.log_extra) + def patch_users(self, request, rsrc): # Update user reference on this subcloud user_update_dict = jsonutils.loads(request.orch_job.resource_info) @@ -290,7 +456,15 @@ class IdentitySyncThread(SyncThread): original_user_ref = UserReferenceWrapper(id=user_id) # Delete the user in the subcloud - self.sc_ks_client.users.delete(original_user_ref) + try: + self.sc_ks_client.users.delete(original_user_ref) + except keystone_exceptions.NotFound: + LOG.info("Delete user: user {} not found in {}, " + "considered as deleted.". + format(original_user_ref.id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + # Master Resource can be deleted only when all subcloud resources # are deleted along with corresponding orch_job and orch_requests. LOG.info("Keystone user {}:{} [{}] deleted" @@ -301,31 +475,90 @@ class IdentitySyncThread(SyncThread): def post_projects(self, request, rsrc): # Create this project on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + project_id = request.orch_job.source_resource_id + if not project_id: + LOG.error("Received project create request without required " + "'source_resource_id' field", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the project just created. + # The records is in JSON format. + project_records = self.m_dbs_client.project_manager.\ + project_detail(project_id) + if not project_records: + LOG.error("No data retrieved from master cloud for project {} to" + " create its equivalent in subcloud.".format(project_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the project on subcloud by pushing the DB records to subcloud + project_ref = self.sc_dbs_client.project_manager.\ + add_project(project_records) + if not project_ref: + LOG.error("No project data returned when creating project {} in" + " subcloud.".format(project_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Persist the subcloud resource. + project_ref_id = project_ref.get('project').get('id') + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + project_ref_id) + projectname = project_ref.get('project').get('name') + LOG.info("Created Keystone project {}:{} [{}]" + .format(rsrc.id, subcloud_rsrc_id, projectname), + extra=self.log_extra) + + def put_projects(self, request, rsrc): + # Update this project on this subcloud + # The DB level resource update process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then updates the resource records in its DB tables. 
+ project_id = request.orch_job.source_resource_id + if not project_id: + LOG.error("Received project update request without required " + "source resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + project_dict = jsonutils.loads(request.orch_job.resource_info) if 'project' in project_dict.keys(): project_dict = project_dict['project'] - projectname = project_dict.pop('name', None) # compulsory - projectdomain = project_dict.pop('domain_id', 'default') # compulsory - if not projectname: - LOG.error("Received project create request without required " - "'name' field", extra=self.log_extra) + sc_project_id = project_dict.pop('id', None) + if not sc_project_id: + LOG.error("Received project update request without required " + "subcloud resource id", extra=self.log_extra) raise exceptions.SyncRequestFailed - # Create the project in the subcloud - project_ref = self.sc_ks_client.projects.create( - name=projectname, - domain=projectdomain, - description=project_dict.pop('description', None), - enabled=project_dict.pop('enabled', True), - parent=project_dict.pop('parent_id', None)) + # Retrieve DB records of the project. The records is in JSON + # format + project_records = self.m_dbs_client.project_manager.\ + project_detail(project_id) + if not project_records: + LOG.error("No data retrieved from master cloud for project {} to" + " update its equivalent in subcloud.".format(project_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed - project_ref_id = project_ref.id + # Update the corresponding project on subcloud by pushing the DB + # records to subcloud + project_ref = self.sc_dbs_client.project_manager.\ + update_project(sc_project_id, project_records) + if not project_ref: + LOG.error("No project data returned when updating project {} in" + " subcloud.".format(sc_project_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed # Persist the subcloud resource. + project_ref_id = project_ref.get('project').get('id') subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, project_ref_id) - LOG.info("Created Keystone project {}:{} [{}]" + projectname = project_ref.get('project').get('name') + LOG.info("Updated Keystone project {}:{} [{}]" .format(rsrc.id, subcloud_rsrc_id, projectname), extra=self.log_extra) @@ -349,7 +582,7 @@ class IdentitySyncThread(SyncThread): # instead of stowing the entire project reference or # retrieving it, we build an opaque wrapper for the # v3 ProjectManager, containing the ID field which is - # needed to update this user reference + # needed to update this project reference ProjectReferenceWrapper = namedtuple('ProjectReferenceWrapper', 'id') proj_id = project_subcloud_rsrc.subcloud_resource_id original_proj_ref = ProjectReferenceWrapper(id=proj_id) @@ -388,7 +621,15 @@ class IdentitySyncThread(SyncThread): original_proj_ref = ProjectReferenceWrapper(id=proj_id) # Delete the project in the subcloud - self.sc_ks_client.projects.delete(original_proj_ref) + try: + self.sc_ks_client.projects.delete(original_proj_ref) + except keystone_exceptions.NotFound: + LOG.info("Delete project: project {} not found in {}, " + "considered as deleted.". + format(original_proj_ref.id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + # Master Resource can be deleted only when all subcloud resources # are deleted along with corresponding orch_job and orch_requests. 
LOG.info("Keystone project {}:{} [{}] deleted" @@ -399,27 +640,90 @@ class IdentitySyncThread(SyncThread): def post_roles(self, request, rsrc): # Create this role on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + role_id = request.orch_job.source_resource_id + if not role_id: + LOG.error("Received role create request without required " + "'source_resource_id' field", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the role just created. The records is in JSON + # format. + role_records = self.m_dbs_client.role_manager.\ + role_detail(role_id) + if not role_records: + LOG.error("No data retrieved from master cloud for role {} to" + " create its equivalent in subcloud.".format(role_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the role on subcloud by pushing the DB records to subcloud + role_ref = self.sc_dbs_client.role_manager.\ + add_role(role_records) + if not role_ref: + LOG.error("No role data returned when creating role {} in" + " subcloud.".format(role_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Persist the subcloud resource. + role_ref_id = role_ref.get('role').get('id') + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + role_ref_id) + rolename = role_ref.get('role').get('name') + LOG.info("Created Keystone role {}:{} [{}]" + .format(rsrc.id, subcloud_rsrc_id, rolename), + extra=self.log_extra) + + def put_roles(self, request, rsrc): + # Update this role on this subcloud + # The DB level resource update process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then updates the resource records in its DB tables. + role_id = request.orch_job.source_resource_id + if not role_id: + LOG.error("Received role update request without required " + "source resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + role_dict = jsonutils.loads(request.orch_job.resource_info) if 'role' in role_dict.keys(): role_dict = role_dict['role'] - rolename = role_dict.pop('name', None) # compulsory - if not rolename: - LOG.error("Received role create request without required " - "'name' field", extra=self.log_extra) + sc_role_id = role_dict.pop('id', None) + if not sc_role_id: + LOG.error("Received role update request without required " + "subcloud resource id", extra=self.log_extra) raise exceptions.SyncRequestFailed - # Create the role in the subcloud - role_ref = self.sc_ks_client.roles.create( - name=rolename, - domain=role_dict.pop('domain_id', None)) + # Retrieve DB records of the role. 
The records is in JSON + # format + role_records = self.m_dbs_client.role_manager.\ + role_detail(role_id) + if not role_records: + LOG.error("No data retrieved from master cloud for role {} to" + " update its equivalent in subcloud.".format(role_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed - role_ref_id = role_ref.id + # Update the corresponding role on subcloud by pushing the DB records + # to subcloud + role_ref = self.sc_dbs_client.role_manager.\ + update_role(sc_role_id, role_records) + if not role_ref: + LOG.error("No role data returned when updating role {} in" + " subcloud.".format(role_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed # Persist the subcloud resource. + role_ref_id = role_ref.get('role').get('id') subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, role_ref_id) - LOG.info("Created Keystone role {}:{} [{}]" + rolename = role_ref.get('role').get('name') + LOG.info("Updated Keystone role {}:{} [{}]" .format(rsrc.id, subcloud_rsrc_id, rolename), extra=self.log_extra) @@ -479,7 +783,15 @@ class IdentitySyncThread(SyncThread): original_role_ref = RoleReferenceWrapper(id=role_id) # Delete the role in the subcloud - self.sc_ks_client.roles.delete(original_role_ref) + try: + self.sc_ks_client.roles.delete(original_role_ref) + except keystone_exceptions.NotFound: + LOG.info("Delete role: role {} not found in {}, " + "considered as deleted.". + format(original_role_ref.id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + # Master Resource can be deleted only when all subcloud resources # are deleted along with corresponding orch_job and orch_requests. LOG.info("Keystone role {}:{} [{}] deleted" @@ -488,8 +800,10 @@ class IdentitySyncThread(SyncThread): extra=self.log_extra) role_subcloud_rsrc.delete() - def put_project_role_assignments(self, request, rsrc): + def post_project_role_assignments(self, request, rsrc): # Assign this role to user on project on this subcloud + # Project role assignments creation is still using keystone APIs since + # the APIs can be used to sync them. resource_tags = rsrc.master_id.split('_') if len(resource_tags) < 3: LOG.error("Malformed resource tag {} expected to be in " @@ -569,6 +883,12 @@ class IdentitySyncThread(SyncThread): else: LOG.error("Unable to update Keystone role assignment {}:{} " .format(rsrc.id, sc_role), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + def put_project_role_assignments(self, request, rsrc): + # update the project role assignment on this subcloud + # For project role assignment, there is nothing to update. + return def delete_project_role_assignments(self, request, rsrc): # Revoke this role for user on project on this subcloud @@ -587,7 +907,7 @@ class IdentitySyncThread(SyncThread): subcloud_rid = assignment_subcloud_rsrc.subcloud_resource_id resource_tags = subcloud_rid.split('_') if len(resource_tags) < 3: - LOG.error("Malformed subcloud resource tag {} expected to be in " + LOG.error("Malformed subcloud resource tag {}, expected to be in " "format: ProjectID_UserID_RoleID." 
.format(assignment_subcloud_rsrc), extra=self.log_extra) assignment_subcloud_rsrc.delete() @@ -598,10 +918,17 @@ class IdentitySyncThread(SyncThread): role_id = resource_tags[2] # Revoke role assignment - self.sc_ks_client.roles.revoke( - role_id, - user=user_id, - project=project_id) + try: + self.sc_ks_client.roles.revoke( + role_id, + user=user_id, + project=project_id) + except keystone_exceptions.NotFound: + LOG.info("Revoke role assignment: (role {}, user {}, project {})" + " not found in {}, considered as deleted.". + format(role_id, user_id, project_id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) role_ref = self.sc_ks_client.role_assignments.list( user=user_id, @@ -615,20 +942,173 @@ class IdentitySyncThread(SyncThread): else: LOG.error("Unable to delete Keystone role assignment {}:{} " .format(rsrc.id, role_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + assignment_subcloud_rsrc.delete() - # ---- Override common audit functions ---- + def post_revoke_events(self, request, rsrc): + # Create token revoke event on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + revoke_event_dict = jsonutils.loads(request.orch_job.resource_info) + if 'token_revoke_event' in revoke_event_dict.keys(): + revoke_event_dict = revoke_event_dict['token_revoke_event'] + audit_id = revoke_event_dict.pop('audit_id', None) + if not audit_id: + LOG.error("Received token revocation event create request without " + "required subcloud resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the revoke event just created. The records + # is in JSON format. + revoke_event_records = self.m_dbs_client.revoke_event_manager.\ + revoke_event_detail(audit_id=audit_id) + if not revoke_event_records: + LOG.error("No data retrieved from master cloud for token" + " revocation event with audit_id {} to create its" + " equivalent in subcloud.".format(audit_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the revoke event on subcloud by pushing the DB records to + # subcloud + revoke_event_ref = self.sc_dbs_client.revoke_event_manager.\ + add_revoke_event(revoke_event_records) + if not revoke_event_ref: + LOG.error("No token revocation event data returned when creating" + " token revocation event with audit_id {} in subcloud." 
+ .format(audit_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + revoke_event_ref_id = revoke_event_ref.\ + get('revocation_event').get('audit_id') + subcloud_rsrc_id = self.\ + persist_db_subcloud_resource(rsrc.id, revoke_event_ref_id) + LOG.info("Created Keystone token revoke event {}:{}" + .format(rsrc.id, subcloud_rsrc_id), + extra=self.log_extra) + + def delete_revoke_events(self, request, rsrc): + # Delete token revocation event reference on this subcloud + revoke_event_subcloud_rsrc = self.get_db_subcloud_resource(rsrc.id) + if not revoke_event_subcloud_rsrc: + LOG.error("Unable to delete token revocation event reference {}, " + "cannot find equivalent Keystone token revocation event " + "in subcloud.".format(rsrc), extra=self.log_extra) + return + + # subcloud resource id is the audit_id + subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id + + try: + self.sc_dbs_client.revoke_event_manager.delete_revoke_event( + audit_id=subcloud_resource_id) + except dbsync_exceptions.NotFound: + LOG.info("Delete token revocation event: event {} not found in {}," + " considered as deleted.". + format(revoke_event_subcloud_rsrc.subcloud_resource_id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + + # Master Resource can be deleted only when all subcloud resources + # are deleted along with corresponding orch_job and orch_requests. + LOG.info("Keystone token revocation event {}:{} [{}] deleted" + .format(rsrc.id, revoke_event_subcloud_rsrc.id, + revoke_event_subcloud_rsrc.subcloud_resource_id), + extra=self.log_extra) + revoke_event_subcloud_rsrc.delete() + + def post_revoke_events_for_user(self, request, rsrc): + # Create token revoke event on this subcloud + # The DB level resource creation process is, retrieve the resource + # records from master cloud by its ID, send the records in its original + # JSON format by REST call to the DB synchronization service on this + # subcloud, which then inserts the resource records into DB tables. + event_id = request.orch_job.source_resource_id + if not event_id: + LOG.error("Received token revocation event create request without " + "required subcloud resource id", extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Retrieve DB records of the revoke event just created. The records + # is in JSON format. + revoke_event_records = self.m_dbs_client.revoke_event_manager.\ + revoke_event_detail(user_id=event_id) + if not revoke_event_records: + LOG.error("No data retrieved from master cloud for token" + " revocation event with event_id {} to create its" + " equivalent in subcloud.".format(event_id), + extra=self.log_extra) + raise exceptions.SyncRequestFailed + + # Create the revoke event on subcloud by pushing the DB records to + # subcloud + revoke_event_ref = self.sc_dbs_client.revoke_event_manager.\ + add_revoke_event(revoke_event_records) + if not revoke_event_ref: + LOG.error("No token revocation event data returned when creating" + " token revocation event with event_id {} in subcloud." 
+ .format(event_id), extra=self.log_extra) + raise exceptions.SyncRequestFailed + + subcloud_rsrc_id = self.persist_db_subcloud_resource(rsrc.id, + event_id) + LOG.info("Created Keystone token revoke event {}:{}" + .format(rsrc.id, subcloud_rsrc_id), + extra=self.log_extra) + + def delete_revoke_events_for_user(self, request, rsrc): + # Delete token revocation event reference on this subcloud + revoke_event_subcloud_rsrc = self.get_db_subcloud_resource(rsrc.id) + if not revoke_event_subcloud_rsrc: + LOG.error("Unable to delete token revocation event reference {}, " + "cannot find equivalent Keystone token revocation event " + "in subcloud.".format(rsrc), extra=self.log_extra) + return + + # subcloud resource id is _ encoded in base64 + subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id + + try: + self.sc_dbs_client.revoke_event_manager.delete_revoke_event( + user_id=subcloud_resource_id) + except dbsync_exceptions.NotFound: + LOG.info("Delete token revocation event: event {} not found in {}," + " considered as deleted.". + format(revoke_event_subcloud_rsrc.subcloud_resource_id, + self.subcloud_engine.subcloud.region_name), + extra=self.log_extra) + + # Master Resource can be deleted only when all subcloud resources + # are deleted along with corresponding orch_job and orch_requests. + LOG.info("Keystone token revocation event {}:{} [{}] deleted" + .format(rsrc.id, revoke_event_subcloud_rsrc.id, + revoke_event_subcloud_rsrc.subcloud_resource_id), + extra=self.log_extra) + revoke_event_subcloud_rsrc.delete() + + # ---- Override common audit functions ---- def _get_resource_audit_handler(self, resource_type, client): if resource_type == consts.RESOURCE_TYPE_IDENTITY_USERS: - return self._get_users_resource(client) + return self._get_users_resource(client.identity_manager) elif resource_type == consts.RESOURCE_TYPE_IDENTITY_ROLES: - return self._get_roles_resource(client) + return self._get_roles_resource(client.role_manager) elif resource_type == consts.RESOURCE_TYPE_IDENTITY_PROJECTS: - return self._get_projects_resource(client) + return self._get_projects_resource(client.project_manager) elif (resource_type == consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS): return self._get_assignments_resource(client) + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS): + return self._get_revoke_events_resource(client. + revoke_event_manager) + elif (resource_type == + consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER): + return self._get_revoke_events_for_user_resource( + client.revoke_event_manager) else: LOG.error("Wrong resource type {}".format(resource_type), extra=self.log_extra) @@ -636,36 +1116,53 @@ class IdentitySyncThread(SyncThread): def _get_users_resource(self, client): try: - users = client.users.list() - # NOTE (knasim-wrs): We need to filter out services users, - # as some of these users may be for optional services - # (such as Magnum, Murano etc) which will be picked up by - # the Sync Audit and created on subclouds, later when these - # optional services are enabled on the subcloud - services = client.services.list() + services = [] + filtered_list = self.filtered_audit_resources[ consts.RESOURCE_TYPE_IDENTITY_USERS] - filtered_users = [user for user in users if - (all(user.name != service.name for - service in services) and - all(user.name != filtered for - filtered in filtered_list))] + # Filter out services users and some predefined users. These users + # are not to be synced to the subcloud. 
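+ # The handler may be passed either a dcdbsync identity manager
+ # (list_users) or a plain keystone client (users.list), so the API
+ # is selected by duck-typing below.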
+ filtered_users = [] + # get users from DB API + if hasattr(client, 'list_users'): + users = client.list_users() + for user in users: + user_name = user.local_user.name + if all(user_name != service.name for service in services)\ + and all(user_name != filtered for filtered in + filtered_list): + filtered_users.append(user) + # get users from keystone API + else: + users = client.users.list() + for user in users: + user_name = user.name + if all(user_name != service.name for service in services)\ + and all(user_name != filtered for filtered in + filtered_list): + filtered_users.append(user) + return filtered_users except (keystone_exceptions.connection.ConnectTimeout, - keystone_exceptions.ConnectFailure) as e: + keystone_exceptions.ConnectFailure, + dbsync_exceptions.ConnectTimeout, + dbsync_exceptions.ConnectFailure) as e: LOG.info("User Audit: subcloud {} is not reachable [{}]" .format(self.subcloud_engine.subcloud.region_name, str(e)), extra=self.log_extra) # None will force skip of audit return None - except Exception as e: - LOG.exception(e) - return None def _get_roles_resource(self, client): try: - roles = client.roles.list() + # get roles from DB API + if hasattr(client, 'list_roles'): + roles = client.list_roles() + # get roles from keystone API + else: + roles = client.roles.list() + # Filter out system roles filtered_list = self.filtered_audit_resources[ consts.RESOURCE_TYPE_IDENTITY_ROLES] @@ -675,19 +1172,24 @@ class IdentitySyncThread(SyncThread): filtered in filtered_list))] return filtered_roles except (keystone_exceptions.connection.ConnectTimeout, - keystone_exceptions.ConnectFailure) as e: + keystone_exceptions.ConnectFailure, + dbsync_exceptions.ConnectTimeout, + dbsync_exceptions.ConnectFailure) as e: LOG.info("Role Audit: subcloud {} is not reachable [{}]" .format(self.subcloud_engine.subcloud.region_name, str(e)), extra=self.log_extra) # None will force skip of audit return None - except Exception as e: - LOG.exception(e) - return None def _get_projects_resource(self, client): try: - projects = client.projects.list() + # get projects from DB API + if hasattr(client, 'list_projects'): + projects = client.list_projects() + # get roles from keystone API + else: + projects = client.projects.list() + # Filter out admin or services projects filtered_list = self.filtered_audit_resources[ consts.RESOURCE_TYPE_IDENTITY_PROJECTS] @@ -697,15 +1199,14 @@ class IdentitySyncThread(SyncThread): filtered in filtered_list)] return filtered_projects except (keystone_exceptions.connection.ConnectTimeout, - keystone_exceptions.ConnectFailure) as e: + keystone_exceptions.ConnectFailure, + dbsync_exceptions.ConnectTimeout, + dbsync_exceptions.ConnectFailure) as e: LOG.info("Project Audit: subcloud {} is not reachable [{}]" .format(self.subcloud_engine.subcloud.region_name, str(e)), extra=self.log_extra) # None will force skip of audit return None - except Exception as e: - LOG.exception(e) - return None def _get_assignments_resource(self, client): try: @@ -762,82 +1263,357 @@ class IdentitySyncThread(SyncThread): return refactored_assignments except (keystone_exceptions.connection.ConnectTimeout, - keystone_exceptions.ConnectFailure) as e: + keystone_exceptions.ConnectFailure, + dbsync_exceptions.ConnectTimeout, + dbsync_exceptions.ConnectFailure) as e: LOG.info("Assignment Audit: subcloud {} is not reachable [{}]" .format(self.subcloud_engine.subcloud.region_name, str(e)), extra=self.log_extra) # None will force skip of audit return None - except Exception as e: - LOG.exception(e) + + 
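+ # Token revocation events come in two flavours: events created by an
+ # explicit token revocation carry an audit_id, while events created by
+ # a user password change carry a user_id; each is audited as its own
+ # resource type.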
+    def _get_revoke_events_resource(self, client):
+        try:
+            # get token revoke events from DB API
+            revoke_events = client.list_revoke_events()
+            # Events with audit_id are generated by openstack token
+            # revocation command. audit_id will be the unique id of
+            # the resource.
+            filtered_revoke_events = [event for event in revoke_events if
+                                      event.audit_id is not None]
+            return filtered_revoke_events
+
+        except (keystone_exceptions.connection.ConnectTimeout,
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
+            LOG.info("Token revoke events Audit: subcloud {} is not reachable"
+                     " [{}]".format(self.subcloud_engine.subcloud.region_name,
+                                    str(e)), extra=self.log_extra)
+            # None will force skip of audit
+            return None
+
+    def _get_revoke_events_for_user_resource(self, client):
+        try:
+            # get token revoke events from DB API
+            revoke_events = client.list_revoke_events()
+            # Events with user_id are generated when user password is changed.
+            # <user_id>_<issued_before> will be the unique id of
+            # the resource.
+            filtered_revoke_events = [event for event in revoke_events if
+                                      event.user_id is not None]
+            return filtered_revoke_events
+
+        except (keystone_exceptions.connection.ConnectTimeout,
+                keystone_exceptions.ConnectFailure,
+                dbsync_exceptions.ConnectTimeout,
+                dbsync_exceptions.ConnectFailure) as e:
+            LOG.info("Token revoke events Audit: subcloud {} is not reachable"
+                     " [{}]".format(self.subcloud_engine.subcloud.region_name,
+                                    str(e)), extra=self.log_extra)
+            # None will force skip of audit
             return None

     def _same_identity_resource(self, m, sc):
         LOG.debug("master={}, subcloud={}".format(m, sc),
                   extra=self.log_extra)
-        # Any Keystone resource can be system wide or domain scoped,
-        # If the domains are different then these resources
-        # are instantly unique since the same resource name can be
-        # mapped in different domains
-        return (m.name == sc.name and
-                m.domain_id == sc.domain_id)
+        # For user the comparison is DB records by DB records.
+        # The user DB records are from multiple tables, including user,
+        # local_user, and password tables. If any of them are not matched,
+        # it is considered not a same identity resource.
+        # Note that the user id is compared, since user id is to be synced
+        # to subcloud too.
+        same_user = (m.id == sc.id and
+                     m.domain_id == sc.domain_id and
+                     m.default_project_id == sc.default_project_id and
+                     m.enabled == sc.enabled and
+                     m.created_at == sc.created_at and
+                     m.last_active_at == sc.last_active_at and
+                     m.extra == sc.extra)
+        if not same_user:
+            return False
+
+        same_local_user = (m.local_user.domain_id ==
+                           sc.local_user.domain_id and
+                           m.local_user.name == sc.local_user.name and
+                           # Foreign key to user.id
+                           m.local_user.user_id == sc.local_user.user_id and
+                           m.local_user.failed_auth_count ==
+                           sc.local_user.failed_auth_count and
+                           m.local_user.failed_auth_at ==
+                           sc.local_user.failed_auth_at)
+        if not same_local_user:
+            return False
+
+        result = False
+        if len(m.local_user.passwords) == len(sc.local_user.passwords):
+            for m_password in m.local_user.passwords:
+                for sc_password in sc.local_user.passwords:
+                    if m_password.password_hash == sc_password.password_hash:
+                        break
+                # m_password is not found in sc_passwords
+                else:
+                    break
+            # All are found
+            else:
+                result = True
+
+        return result
+
+    def _has_same_identity_ids(self, m, sc):
+        # If (user name + domain name) or user id is the same,
+        # the resources are considered to be the same resource.
+ # Any difference in other attributes will trigger an update (PUT) + # to that resource in subcloud. + return ((m.local_user.name == sc.local_user.name and + m.domain_id == sc.domain_id) or m.id == sc.id) + + def _same_project_resource(self, m, sc): + LOG.debug("master={}, subcloud={}".format(m, sc), + extra=self.log_extra) + # For project the comparison is DB records by DB records. + # The project DB records are from project tables. If any of + # them are not matched, it is considered not the same. + # Note that the project id is compared, since project id is to + # be synced to subcloud too. + return (m.id == sc.id and + m.domain_id == sc.domain_id and + m.name == sc.name and + m.extra == sc.extra and + m.description == sc.description and + m.enabled == sc.enabled and + m.parent_id == sc.parent_id and + m.is_domain == sc.is_domain) + + def _has_same_project_ids(self, m, sc): + # If (project name + domain name) or project id is the same, + # the resources are considered to be the same resource. + # Any difference in other attributes will trigger an update (PUT) + # to that resource in subcloud. + return ((m.name == sc.name and m.domain_id == sc.domain_id) + or m.id == sc.id) + + def _same_role_resource(self, m, sc): + LOG.debug("master={}, subcloud={}".format(m, sc), + extra=self.log_extra) + # For role the comparison is DB records by DB records. + # The role DB records are from role tables. If any of + # them are not matched, it is considered not the same. + # Note that the role id is compared, since role id is to + # be synced to subcloud too. + return (m.id == sc.id and + m.domain_id == sc.domain_id and + m.name == sc.name and + m.extra == sc.extra) + + def _has_same_role_ids(self, m, sc): + # If (role name + domain name) or role id is the same, + # the resources are considered to be the same resource. + # Any difference in other attributes will trigger an update (PUT) + # to that resource in subcloud. + return ((m.name == sc.name and m.domain_id == sc.domain_id) + or m.id == sc.id) def _same_assignment_resource(self, m, sc): LOG.debug("same_assignment master={}, subcloud={}".format(m, sc), extra=self.log_extra) # For an assignment to be the same, all 3 of its role, project and # user information must match up - is_same = (self._same_identity_resource(m.user, sc.user) and - self._same_identity_resource(m.role, sc.role) and - self._same_identity_resource(m.project, sc.project)) - return is_same + return((m.user.name == sc.user.name and + m.user.domain_id == sc.user.domain_id) and + (m.role.name == sc.role.name and + m.role.domain_id == sc.role.domain_id) and + (m.project.name == sc.project.name and + m.project.domain_id == sc.project.domain_id)) + + def _has_same_assignment_ids(self, m, sc): + # For assignment the triple(user, project, role) is the unique id. + # The two resources have same id only when all of them are identical. + return self._same_assignment_resource(m, sc) + + def _same_revoke_event_resource(self, m, sc): + LOG.debug("same_revoke_event master={}, subcloud={}".format(m, sc), + extra=self.log_extra) + # For token revocation event the comparison is DB records by + # DB records. The DB records are from revocation_event tables. + # Token revocation events are considered the same when all columns + # match up. 
+        return(m.domain_id == sc.domain_id and
+               m.project_id == sc.project_id and
+               m.user_id == sc.user_id and
+               m.role_id == sc.role_id and
+               m.trust_id == sc.trust_id and
+               m.consumer_id == sc.consumer_id and
+               m.access_token_id == sc.access_token_id and
+               m.issued_before == sc.issued_before and
+               m.expires_at == sc.expires_at and
+               m.revoked_at == sc.revoked_at and
+               m.audit_id == sc.audit_id and
+               m.audit_chain_id == sc.audit_chain_id)
+
+    def _has_same_revoke_event_ids(self, m, sc):
+        # For token revoke events to have the same ids, all columns must
+        # match up.
+        return self._same_revoke_event_resource(m, sc)

     def get_master_resources(self, resource_type):
-        return self._get_resource_audit_handler(resource_type,
-                                                self.m_ks_client)
+        # Retrieve master resources from DB or through Keystone.
+        # users, projects, roles, and token revocation events use
+        # dbsync client, other resources use keystone client.
+        try:
+            if resource_type == consts.RESOURCE_TYPE_IDENTITY_USERS or \
+                    resource_type == consts.RESOURCE_TYPE_IDENTITY_PROJECTS or \
+                    resource_type == consts.RESOURCE_TYPE_IDENTITY_ROLES or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER:
+                return self._get_resource_audit_handler(resource_type,
+                                                        self.m_dbs_client)
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.m_ks_client)
+        except dbsync_exceptions.Unauthorized as e:
+            LOG.info("Get resource [{}] request failed for {}: {}."
+                     .format(resource_type, consts.VIRTUAL_MASTER_CLOUD,
+                             str(e)), extra=self.log_extra)
+            # If the token has expired, re-authenticate and retry once
+            self.reinitialize_m_clients()
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.m_dbs_client)
+        except Exception as e:
+            LOG.exception(e)
+            return None

     def get_subcloud_resources(self, resource_type):
         self.initialize_sc_clients()
-        return self._get_resource_audit_handler(resource_type,
-                                                self.sc_ks_client)
+        # Retrieve subcloud resources from DB or through keystone.
+        # users, projects, roles, and token revocation events use
+        # dbsync client, other resources use keystone client.
+        try:
+            if resource_type == consts.RESOURCE_TYPE_IDENTITY_USERS or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_PROJECTS or \
+                    resource_type == consts.RESOURCE_TYPE_IDENTITY_ROLES or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS or \
+                    resource_type == \
+                    consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER:
+                return self._get_resource_audit_handler(resource_type,
+                                                        self.sc_dbs_client)
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.sc_ks_client)
+
+        except dbsync_exceptions.Unauthorized as e:
+            LOG.info("Get resource [{}] request failed for {}: {}."
+                     .format(resource_type,
+                             self.subcloud_engine.subcloud.region_name,
+                             str(e)), extra=self.log_extra)
+            # If the token has expired, re-authenticate and retry once
+            self.reinitialize_sc_clients()
+            return self._get_resource_audit_handler(resource_type,
+                                                    self.sc_dbs_client)
+        except Exception as e:
+            LOG.exception(e)
+            return None

     def same_resource(self, resource_type, m_resource, sc_resource):
         if (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_USERS):
+            return self._same_identity_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_PROJECTS):
+            return self._same_project_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_ROLES):
+            return self._same_role_resource(m_resource, sc_resource)
+        elif (resource_type ==
                 consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS):
             return self._same_assignment_resource(m_resource, sc_resource)
-        else:
-            return self._same_identity_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS):
+            return self._same_revoke_event_resource(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER):
+            return self._same_revoke_event_resource(m_resource, sc_resource)
+
+    def has_same_ids(self, resource_type, m_resource, sc_resource):
+        if (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_USERS):
+            return self._has_same_identity_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_PROJECTS):
+            return self._has_same_project_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_ROLES):
+            return self._has_same_role_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS):
+            return self._has_same_assignment_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS):
+            return self._has_same_revoke_event_ids(m_resource, sc_resource)
+        elif (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER):
+            return self._has_same_revoke_event_ids(m_resource, sc_resource)

     def get_resource_id(self, resource_type, resource):
         if hasattr(resource, 'master_id'):
             # If resource from DB, return master resource id
             # from master cloud
             return resource.master_id
-
-        # Else, it is OpenStack resource retrieved from master cloud
+        # For token revocation event, use audit_id if it is present.
+        if (resource_type ==
+                consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS)\
+                and resource.audit_id:
+            return resource.audit_id
+        # For user token revocation event, the id is
+        # <user_id>_<issued_before>, then base64 encoded
+        elif (resource_type ==
+              consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER)\
+                and resource.user_id and resource.issued_before:
+            event_id = "{}_{}".format(resource.user_id, resource.issued_before)
+            return base64.urlsafe_b64encode(event_id)
+        # Default id field retrieved from master cloud
         return resource.id

     def get_resource_info(self, resource_type, resource,
                           operation_type=None):
         rtype = consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS
-        if (operation_type == consts.OPERATION_TYPE_CREATE and
-                resource_type != rtype):
+        if ((operation_type == consts.OPERATION_TYPE_CREATE or
+                operation_type == consts.OPERATION_TYPE_POST or
+                operation_type == consts.OPERATION_TYPE_PUT)
+                and resource_type != rtype):
             # With the exception of role assignments, for all create
             # requests the resource_info needs to be extracted
             # from the master resource
-            return jsonutils.dumps(resource._info)
+            resource_info = resource.info()
+            return jsonutils.dumps(resource_info)
         else:
             super(IdentitySyncThread, self).get_resource_info(
                 resource_type, resource, operation_type)

     def audit_discrepancy(self, resource_type, m_resource, sc_resources):
-        # It could be that the details are different
-        # between master cloud and subcloud now.
-        # Thus, delete the resource before creating it again.
-        self.schedule_work(self.endpoint_type, resource_type,
-                           self.get_resource_id(resource_type, m_resource),
-                           consts.OPERATION_TYPE_DELETE)
-        # Return true to try creating the resource again
+        # Check whether the resource is indeed missing or its details are
+        # mismatched with the master cloud. If missing, return True to create
+        # the resource. If mismatched, queue work to update this resource
+        # and return False.
+ mismatched_resource = False + for sc_r in sc_resources: + if self.has_same_ids(resource_type, m_resource, sc_r): + mismatched_resource = True + break + + if mismatched_resource: + LOG.info("Subcloud res {}:{} is found but diverse in details," + " will update".format(resource_type, sc_r.id), + extra=self.log_extra) + self.schedule_work(self.endpoint_type, resource_type, + self.get_resource_id(resource_type, m_resource), + consts.OPERATION_TYPE_PUT, + self.get_resource_info( + resource_type, sc_r, + consts.OPERATION_TYPE_PUT)) + return False + return True def map_subcloud_resource(self, resource_type, m_r, m_rsrc_db, @@ -850,10 +1626,9 @@ class IdentitySyncThread(SyncThread): # manifest on the Subclouds and the Central Region should not try # to create these on the subclouds for sc_r in sc_resources: - if self.same_resource(resource_type, m_r, sc_r): - LOG.info( - "Mapping resource {} to existing subcloud resource {}" - .format(m_r, sc_r), extra=self.log_extra) + if self.has_same_ids(resource_type, m_r, sc_r): + LOG.info("Mapping resource {} to existing subcloud resource {}" + .format(m_r, sc_r), extra=self.log_extra) self.persist_db_subcloud_resource(m_rsrc_db.id, self.get_resource_id( resource_type, diff --git a/dcorch/engine/sync_services/sysinv.py b/dcorch/engine/sync_services/sysinv.py index 43628cc5e..d6c8fa345 100644 --- a/dcorch/engine/sync_services/sysinv.py +++ b/dcorch/engine/sync_services/sysinv.py @@ -799,7 +799,7 @@ class SysinvSyncThread(SyncThread): s_os_client = sdk.OpenStackDriver(self.region_name) try: - s_os_client.sysinv_client.create_fernet_repo( + s_os_client.sysinv_client.post_fernet_repo( FernetKeyManager.from_resource_info(resource_info)) # Ensure subcloud resource is persisted to the DB for later subcloud_rsrc_id = self.persist_db_subcloud_resource( @@ -831,7 +831,7 @@ class SysinvSyncThread(SyncThread): s_os_client = sdk.OpenStackDriver(self.region_name) try: - s_os_client.sysinv_client.update_fernet_repo( + s_os_client.sysinv_client.put_fernet_repo( FernetKeyManager.from_resource_info(resource_info)) # Ensure subcloud resource is persisted to the DB for later subcloud_rsrc_id = self.persist_db_subcloud_resource( @@ -1309,7 +1309,7 @@ class SysinvSyncThread(SyncThread): resource_type), extra=self.log_extra) - def audit_discrepancy(self, resource_type, m_resource, sc_resources): + def audit_discrepancy(self, resource_type, m_resource): # Return true to try the audit_action if resource_type in self.SYSINV_ADD_DELETE_RESOURCES: # It could be that the details are different @@ -1329,7 +1329,7 @@ class SysinvSyncThread(SyncThread): extra=self.log_extra) return False - def audit_action(self, resource_type, finding, resource): + def audit_action(self, resource_type, finding, resource, sc_source=None): if resource_type in self.SYSINV_MODIFY_RESOURCES: LOG.info("audit_action: {}/{}" .format(finding, resource_type), diff --git a/dcorch/engine/sync_thread.py b/dcorch/engine/sync_thread.py index c54ae4eca..5634fa39f 100644 --- a/dcorch/engine/sync_thread.py +++ b/dcorch/engine/sync_thread.py @@ -15,12 +15,10 @@ import threading +from oslo_config import cfg from oslo_log import log as logging -from keystoneauth1 import loading -from keystoneauth1 import session -from keystoneclient import client as keystoneclient - +from dcdbsync.dbsyncclient import client as dbsyncclient from dcmanager.common import consts as dcmanager_consts from dcmanager.rpc import client as dcmanager_rpc_client from dcorch.common import consts @@ -30,7 +28,11 @@ from dcorch.common import utils from 
dcorch.objects import orchrequest from dcorch.objects import resource from dcorch.objects import subcloud_resource -from oslo_config import cfg + +from keystoneauth1 import loading +from keystoneauth1 import session +from keystoneclient import client as keystoneclient + LOG = logging.getLogger(__name__) @@ -81,6 +83,7 @@ class SyncThread(object): self.sc_admin_session = None self.admin_session = None self.ks_client = None + self.dbs_client = None def start(self): if self.status == STATUS_NEW: @@ -124,9 +127,15 @@ class SyncThread(object): user_domain_name=cfg.CONF.cache.admin_user_domain_name) self.admin_session = session.Session( auth=auth, timeout=60, additional_headers=consts.USER_HEADER) + # keystone client self.ks_client = keystoneclient.Client( session=self.admin_session, region_name=consts.VIRTUAL_MASTER_CLOUD) + # dcdbsync client + self.dbs_client = dbsyncclient.Client( + endpoint_type=consts.DBS_ENDPOINT_INTERNAL, + session=self.admin_session, + region_name=consts.VIRTUAL_MASTER_CLOUD) def initialize_sc_clients(self): # base implementation of initializing the subcloud specific @@ -164,6 +173,10 @@ class SyncThread(object): auth=sc_auth, timeout=60, additional_headers=consts.USER_HEADER) + def initial_sync(self): + # Return True to indicate initial sync success + return True + def enable(self): # Called when DC manager thinks this subcloud is good to go. self.initialize() @@ -530,6 +543,7 @@ class SyncThread(object): extra=self.log_extra) # Subcloud resource is present in DB, but the check # for same_resource() was negative. Either the resource + # disappeared from subcloud or the resource details # are different from that of master cloud. Let the # resource implementation decide on the audit action. @@ -556,6 +570,7 @@ class SyncThread(object): # Resource is missing from subcloud, take action num_of_audit_jobs += self.audit_action( resource_type, AUDIT_RESOURCE_MISSING, m_r) + # As the subcloud resource is missing, invoke # the hook for dependants with no subcloud resource. # Resource implementation should handle this. @@ -667,6 +682,9 @@ class SyncThread(object): def same_resource(self, resource_type, m_resource, sc_resource): return True + def has_same_ids(self, resource_type, m_resource, sc_resource): + return False + def map_subcloud_resource(self, resource_type, m_r, m_rsrc_db, sc_resources): # Child classes can override this function to map an existing subcloud
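For reference, the composite id that the identity audit assigns to a password-change revocation event (see get_resource_id above) can be reproduced with a few lines of standalone Python. The helper name and the explicit encode/decode round-trip for Python 3 are illustrative assumptions, not part of the patch:

    import base64

    def user_revoke_event_id(user_id, issued_before):
        # Composite key: the user whose password changed plus the
        # issued-before cut-off of the tokens being revoked.
        event_id = "{}_{}".format(user_id, issued_before)
        # urlsafe base64 keeps the id safe to embed in REST paths.
        return base64.urlsafe_b64encode(event_id.encode()).decode()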