Dcorch Integration with Optimized OpenStackDriver

Integrate dcorch master clients with the optimized OpenStackDriver.

Update the methods of creating subcloud keystone, dcdbsync and sysinv
clients.

Add subcloud management ip parameter to RPC calls between dcorch-engine
master and workers services to construct client endpoints.

Test Plan:
PASS: Change the admin password on the system controller using
      the command "openstack --os-region-name SystemController user
      password set". Verify that the admin password is synchronized
      to the subcloud and the dcorch receives the corresponding sync
      request, followed by successful execution of sync resources for
      the subcloud.
PASS: Unmanage and then manage a subcloud, and verify that the initial
      sync is executed successfully for that subcloud.
PASS: Verify successful dcorch audits every 5 minutes.

Story: 2011106
Task: 50113

Change-Id: Idfa493068dc7d2bac21aac2871238b9f0de12c9d
Signed-off-by: lzhu1 <li.zhu@windriver.com>
This commit is contained in:
Li Zhu 2024-05-17 09:38:04 -04:00
parent fbd78b235c
commit 4b1a277d85
15 changed files with 430 additions and 328 deletions

View File

@ -442,7 +442,7 @@ class OptimizedOpenStackDriver(object):
:param clients: The list of client names.
:type clients: list
"""
os_clients = OpenStackDriver.os_clients_dict
os_clients = OptimizedOpenStackDriver.os_clients_dict
for client in clients:
client_obj = (

View File

@ -325,6 +325,23 @@ def build_subcloud_endpoints(subcloud_mgmt_ips: dict) -> dict:
return subcloud_endpoints
def build_subcloud_endpoint(ip: str, service: str) -> str:
"""Builds a service endpoint for a given IP address.
:param ip: The IP address for constructing the service endpoint.
:type ip: str
:param service: The service of the endpoint
:type service: str
:return: The service endpoint URL.
:type: str
"""
endpoint = ENDPOINT_URLS.get(service, None)
if endpoint:
formatted_ip = f"[{ip}]" if netaddr.IPAddress(ip).version == 6 else ip
endpoint = endpoint.format(formatted_ip)
return endpoint
class OptimizedEndpointCache(object):
"""Cache for storing endpoint information.
@ -466,7 +483,7 @@ class OptimizedEndpointCache(object):
user_project: str,
user_project_domain: str,
timeout: float = None,
) -> None:
) -> session.Session:
"""Get the admin session.
:param auth_url: The authentication URL.

View File

@ -460,7 +460,10 @@ def subcloud_capabilities_get_all(context, region_name=None,
initial_sync_state=None):
results = subcloud_get_all(context, region_name, management_state,
availability_status, initial_sync_state)
return {result['region_name']: result['capabilities'] for result in results}
return {
result['region_name']: (result['capabilities'], result['management_ip'])
for result in results
}
@require_context
@ -471,7 +474,8 @@ def subcloud_sync_update_all_to_in_progress(context,
sync_requests):
with write_session() as session:
# Fetch the records of subcloud_sync that meet the update criteria
subcloud_sync_rows = session.query(models.SubcloudSync).join(
subcloud_sync_rows = session.query(models.SubcloudSync,
models.Subcloud.management_ip).join(
models.Subcloud,
models.Subcloud.region_name == models.SubcloudSync.subcloud_name
).filter(
@ -484,10 +488,11 @@ def subcloud_sync_update_all_to_in_progress(context,
# Update the sync status to in-progress for the selected subcloud_sync
# records
updated_rows = []
for subcloud_sync in subcloud_sync_rows:
for subcloud_sync, management_ip in subcloud_sync_rows:
subcloud_sync.sync_request = consts.SYNC_STATUS_IN_PROGRESS
updated_rows.append((subcloud_sync.subcloud_name,
subcloud_sync.endpoint_type))
subcloud_sync.endpoint_type,
management_ip))
return updated_rows
@ -502,7 +507,8 @@ def subcloud_audit_update_all_to_in_progress(context,
with write_session() as session:
# Fetch the records of subcloud_sync that meet the update criteria
subcloud_sync_rows = session.query(models.SubcloudSync).join(
subcloud_sync_rows = session.query(models.SubcloudSync,
models.Subcloud.management_ip).join(
models.Subcloud,
models.Subcloud.region_name == models.SubcloudSync.subcloud_name
).filter(
@ -525,11 +531,12 @@ def subcloud_audit_update_all_to_in_progress(context,
# Update the audit status to in-progress for the selected subcloud_sync
# records
updated_rows = []
for subcloud_sync in subcloud_sync_rows:
for subcloud_sync, management_ip in subcloud_sync_rows:
subcloud_sync.audit_status = consts.AUDIT_STATUS_IN_PROGRESS
subcloud_sync.last_audit_time = timeutils.utcnow()
updated_rows.append((subcloud_sync.subcloud_name,
subcloud_sync.endpoint_type))
subcloud_sync.endpoint_type,
management_ip))
return updated_rows

View File

@ -45,7 +45,7 @@ class GenericSyncWorkerManager(object):
self.audit_thread_group_manager = scheduler.ThreadGroupManager(
thread_pool_size=100)
def create_sync_objects(self, subcloud_name, capabilities):
def create_sync_objects(self, subcloud_name, capabilities, management_ip):
"""Create sync object objects for the subcloud
The objects handle the syncing of the subcloud's endpoint_types
@ -55,9 +55,11 @@ class GenericSyncWorkerManager(object):
if endpoint_type_list:
for endpoint_type in endpoint_type_list:
LOG.debug(f"Engine id:({self.engine_id}) create "
f"{subcloud_name}/{endpoint_type} sync obj")
f"{subcloud_name}/{endpoint_type}/{management_ip} "
f"sync obj")
sync_obj = sync_object_class_map[endpoint_type](subcloud_name,
endpoint_type)
endpoint_type,
management_ip)
sync_objs[endpoint_type] = sync_obj
return sync_objs
@ -67,12 +69,13 @@ class GenericSyncWorkerManager(object):
LOG.debug(f"Engine id:({self.engine_id}) Start to sync "
f"{subcloud_sync_list}.")
for sc_region_name, ept in subcloud_sync_list:
for sc_region_name, ept, ip in subcloud_sync_list:
try:
self.sync_thread_group_manager.start(self._sync_subcloud,
self.context,
sc_region_name,
ept)
ept,
ip)
except exceptions.SubcloudSyncNotFound:
# The endpoint in subcloud_sync has been removed
LOG.debug(f"Engine id:({self.engine_id}/{sc_region_name}/{ept}) "
@ -85,10 +88,11 @@ class GenericSyncWorkerManager(object):
self.context, sc_region_name, ept,
values={'sync_request': dco_consts.SYNC_STATUS_FAILED})
def _sync_subcloud(self, context, subcloud_name, endpoint_type):
def _sync_subcloud(self, context, subcloud_name, endpoint_type, management_ip):
LOG.info(f"Start to sync subcloud {subcloud_name}/{endpoint_type}.")
sync_obj = sync_object_class_map[endpoint_type](subcloud_name,
endpoint_type)
endpoint_type,
management_ip)
new_state = dco_consts.SYNC_STATUS_COMPLETED
timeout = eventlet.timeout.Timeout(SYNC_TIMEOUT)
try:
@ -124,7 +128,7 @@ class GenericSyncWorkerManager(object):
# pylint: disable-next=no-member
values={'subcloud_id': sc.id})
# Create the sync object for this engine
self.create_sync_objects(name, capabilities)
self.create_sync_objects(name, capabilities, management_ip)
def del_subcloud(self, context, subcloud_name):
# first update the state of the subcloud
@ -235,7 +239,7 @@ class GenericSyncWorkerManager(object):
pass
sync_obj = sync_object_class_map[endpoint_type](
subcloud_name, endpoint_type=endpoint_type)
subcloud_name, endpoint_type, sc.management_ip)
# create the subcloud_sync !!!
db_api.subcloud_sync_create(
@ -337,16 +341,16 @@ class GenericSyncWorkerManager(object):
LOG.debug(f"Engine id:({self.engine_id}) Start to audit "
f"{subcloud_sync_list}.")
for sc_region_name, ept in subcloud_sync_list:
for sc_region_name, ept, ip in subcloud_sync_list:
LOG.debug(f"Attempt audit_subcloud: "
f"{self.engine_id}/{sc_region_name}/{ept}")
try:
sync_obj = sync_object_class_map[ept](sc_region_name, ept)
self.sync_thread_group_manager.start(self._audit_subcloud,
self.context,
sc_region_name,
ept,
sync_obj)
sync_obj = sync_object_class_map[ept](sc_region_name, ept, ip)
self.audit_thread_group_manager.start(self._audit_subcloud,
self.context,
sc_region_name,
ept,
sync_obj)
except exceptions.SubcloudSyncNotFound:
# The endpoint in subcloud_sync has been removed
LOG.debug(f"Engine id:({self.engine_id}/{sc_region_name}/{ept}) "

View File

@ -90,8 +90,8 @@ class InitialSyncManager(object):
chunksize = (len(subclouds) + CONF.workers) // (CONF.workers)
subcloud_capabilities = {}
for region_name, capabilities in subclouds.items():
subcloud_capabilities[region_name] = capabilities
for region_name, capabilities_and_ip in subclouds.items():
subcloud_capabilities[region_name] = capabilities_and_ip
if len(subcloud_capabilities) == chunksize:
# We've gathered a batch of subclouds, send it to engine worker
# to process.

View File

@ -41,7 +41,7 @@ class InitialSyncWorkerManager(object):
LOG.debug(f"Engine id:({self.engine_id}) Start initial sync for "
f"subclouds {list(subcloud_capabilities.keys())}.")
for sc_region_name, sc_capabilities in subcloud_capabilities.items():
for sc_region_name, sc_capabilities_and_ip in subcloud_capabilities.items():
# Create a new greenthread for each subcloud to allow the
# initial syncs to be done in parallel. If there are not enough
# greenthreads in the pool, this will block until one becomes
@ -51,12 +51,14 @@ class InitialSyncWorkerManager(object):
self._initial_sync_subcloud,
self.context,
sc_region_name,
sc_capabilities)
sc_capabilities_and_ip[0],
sc_capabilities_and_ip[1])
except Exception as e:
LOG.error(f"Exception occurred when running initial_sync for "
f"subcloud {sc_region_name}: {e}")
def _initial_sync_subcloud(self, context, subcloud_name, subcloud_capabilities):
def _initial_sync_subcloud(self, context, subcloud_name, subcloud_capabilities,
management_ip):
"""Perform initial sync for a subcloud.
This runs in a separate greenthread for each subcloud.
@ -78,7 +80,7 @@ class InitialSyncWorkerManager(object):
# sync_objs stores the sync object per endpoint
sync_objs = self.gswm.create_sync_objects(
subcloud_name, subcloud_capabilities)
subcloud_name, subcloud_capabilities, management_ip)
# Initial sync. It's synchronous so that identity
# get synced before fernet token keys are synced. This is

View File

@ -18,14 +18,17 @@ import base64
from collections import namedtuple
from keystoneauth1 import exceptions as keystone_exceptions
from keystoneclient import client as keystoneclient
from oslo_log import log as logging
from oslo_serialization import jsonutils
from dccommon import consts as dccommon_consts
from dccommon.drivers.openstack import sdk_platform as sdk
from dcdbsync.dbsyncclient.client import Client
from dcdbsync.dbsyncclient import exceptions as dbsync_exceptions
from dcorch.common import consts
from dcorch.common import exceptions
from dcorch.engine.sync_thread import get_os_client
from dcorch.engine.sync_thread import SyncThread
from dcorch.objects import resource
@ -35,9 +38,11 @@ LOG = logging.getLogger(__name__)
class IdentitySyncThread(SyncThread):
"""Manages tasks related to resource management for keystone."""
def __init__(self, subcloud_name, endpoint_type=None, engine_id=None):
def __init__(self, subcloud_name, endpoint_type=None, management_ip=None,
engine_id=None):
super(IdentitySyncThread, self).__init__(subcloud_name,
endpoint_type=endpoint_type,
management_ip=management_ip,
engine_id=engine_id)
self.region_name = subcloud_name
if not self.endpoint_type:
@ -85,35 +90,48 @@ class IdentitySyncThread(SyncThread):
[]
}
# Subcloud clients
self.sc_ks_client = None
self.sc_dbs_client = None
self.log_extra = {"instance": "{}/{}: ".format(
self.region_name, self.endpoint_type)}
LOG.info("IdentitySyncThread initialized", extra=self.log_extra)
@staticmethod
def get_os_client(region):
try:
os_client = sdk.OpenStackDriver(
region_name=region,
region_clients=['dbsync'])
except Exception as e:
LOG.error("Failed to get os_client for region {} {}."
.format(region, str(e)))
raise e
return os_client
def initialize_sc_clients(self):
super().initialize_sc_clients()
def get_ks_client(self, region):
return self.get_os_client(region).keystone_client.keystone_client
self.sc_ks_client = keystoneclient.Client(
session=self.sc_admin_session,
region_name=self.region_name)
self.sc_dbs_client = Client(
endpoint_type=consts.DBS_ENDPOINT_ADMIN,
session=self.sc_admin_session)
def get_dbs_client(self, region):
return self.get_os_client(region).dbsync_client
def get_master_ks_client(self):
return get_os_client(self.master_region_name, ['dbsync']).\
keystone_client.keystone_client
def get_master_dbs_client(self):
return get_os_client(self.master_region_name, ['dbsync']).dbsync_client
def get_sc_ks_client(self):
if self.sc_ks_client is None:
self.initialize_sc_clients()
return self.sc_ks_client
def get_sc_dbs_client(self):
if self.sc_dbs_client is None:
self.initialize_sc_clients()
return self.sc_dbs_client
def _initial_sync_users(self, m_users, sc_users):
# Particularly sync users with same name but different ID. admin,
# sysinv, and dcmanager users are special cases as the id's will match
# (as this is forced during the subcloud deploy) but the details will
# not so we still need to sync them here.
m_client = self.get_dbs_client(self.master_region_name).identity_user_manager
sc_client = self.get_dbs_client(self.region_name).identity_user_manager
m_client = self.get_master_dbs_client().identity_user_manager
sc_client = self.get_sc_dbs_client().identity_user_manager
for m_user in m_users:
for sc_user in sc_users:
@ -140,11 +158,11 @@ class IdentitySyncThread(SyncThread):
LOG.info("Update user {} request failed for {}: {}."
.format(sc_user.id,
self.region_name, str(e)))
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
# Recreate the subcloud clients so that the token will be
# refreshed
self.initialize_sc_clients()
# Retry with a new token
sc_client = self.get_dbs_client(
self.region_name).identity_user_manager
sc_client = self.get_sc_dbs_client().identity_user_manager
user_ref = sc_client.update_user(sc_user.id,
user_records)
if not user_ref:
@ -154,9 +172,8 @@ class IdentitySyncThread(SyncThread):
def _initial_sync_groups(self, m_groups, sc_groups):
# Particularly sync groups with same name but different ID.
m_client = self.get_dbs_client(
self.master_region_name).identity_group_manager
sc_client = self.get_dbs_client(self.region_name).identity_group_manager
m_client = self.get_master_dbs_client().identity_group_manager
sc_client = self.get_sc_dbs_client().identity_group_manager
for m_group in m_groups:
for sc_group in sc_groups:
@ -179,10 +196,10 @@ class IdentitySyncThread(SyncThread):
LOG.info("Update group {} request failed for {}: {}."
.format(sc_group.id,
self.region_name, str(e)))
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
sc_client = self.get_dbs_client(
self.region_name).identity_group_manager
# Recreate the subcloud clients so that the token will be
# refreshed
self.initialize_sc_clients()
sc_client = self.get_sc_dbs_client().identity_group_manager
group_ref = sc_client.update_group(sc_group.id,
group_records)
@ -194,8 +211,8 @@ class IdentitySyncThread(SyncThread):
def _initial_sync_projects(self, m_projects, sc_projects):
# Particularly sync projects with same name but different ID.
m_client = self.get_dbs_client(self.master_region_name).project_manager
sc_client = self.get_dbs_client(self.region_name).project_manager
m_client = self.get_master_dbs_client().project_manager
sc_client = self.get_sc_dbs_client().project_manager
for m_project in m_projects:
for sc_project in sc_projects:
@ -218,10 +235,10 @@ class IdentitySyncThread(SyncThread):
LOG.info("Update project {} request failed for {}: {}."
.format(sc_project.id,
self.region_name, str(e)))
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
sc_client = self.get_dbs_client(
self.region_name).project_manager
# Recreate the subcloud clients so that the token will be
# refreshed
self.initialize_sc_clients()
sc_client = self.get_sc_dbs_client().project_manager
project_ref = sc_client.update_project(sc_project.id,
project_records)
@ -233,8 +250,8 @@ class IdentitySyncThread(SyncThread):
def _initial_sync_roles(self, m_roles, sc_roles):
# Particularly sync roles with same name but different ID
m_client = self.get_dbs_client(self.master_region_name).role_manager
sc_client = self.get_dbs_client(self.region_name).role_manager
m_client = self.get_master_dbs_client().role_manager
sc_client = self.get_sc_dbs_client().role_manager
for m_role in m_roles:
for sc_role in sc_roles:
@ -257,10 +274,10 @@ class IdentitySyncThread(SyncThread):
LOG.info("Update role {} request failed for {}: {}."
.format(sc_role.id,
self.region_name, str(e)))
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
sc_client = self.get_dbs_client(
self.region_name).role_manager
# Recreate the subcloud clients so that the token will be
# refreshed
self.initialize_sc_clients()
sc_client = self.get_sc_dbs_client().role_manager
role_ref = sc_client.update_role(sc_role.id,
role_record)
@ -429,7 +446,7 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the user just created. The records is in JSON
# format
try:
user_records = self.get_dbs_client(self.master_region_name).\
user_records = self.get_master_dbs_client().\
identity_user_manager.user_detail(user_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@ -440,8 +457,7 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create the user on subcloud by pushing the DB records to subcloud
user_ref = self.get_dbs_client(
self.region_name).identity_user_manager.add_user(
user_ref = self.get_sc_dbs_client().identity_user_manager.add_user(
user_records)
if not user_ref:
LOG.error("No user data returned when creating user {} in"
@ -482,7 +498,7 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the user. The records is in JSON
# format
try:
user_records = self.get_dbs_client(self.master_region_name).\
user_records = self.get_master_dbs_client().\
identity_user_manager.user_detail(user_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@ -494,8 +510,8 @@ class IdentitySyncThread(SyncThread):
# Update the corresponding user on subcloud by pushing the DB records
# to subcloud
user_ref = self.get_dbs_client(self.region_name).identity_user_manager.\
update_user(sc_user_id, user_records)
user_ref = self.get_sc_dbs_client().identity_user_manager.update_user(
sc_user_id, user_records)
if not user_ref:
LOG.error("No user data returned when updating user {} in"
" subcloud.".format(sc_user_id), extra=self.log_extra)
@ -536,9 +552,8 @@ class IdentitySyncThread(SyncThread):
user_id = user_subcloud_rsrc.subcloud_resource_id
original_user_ref = UserReferenceWrapper(id=user_id)
sc_ks_client = self.get_ks_client(self.region_name)
# Update the user in the subcloud
user_ref = sc_ks_client.users.update(
user_ref = self.get_sc_ks_client().users.update(
original_user_ref,
name=user_update_dict.pop('name', None),
domain=user_update_dict.pop('domain', None),
@ -576,8 +591,7 @@ class IdentitySyncThread(SyncThread):
# Delete the user in the subcloud
try:
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.users.delete(original_user_ref)
self.get_sc_ks_client().users.delete(original_user_ref)
except keystone_exceptions.NotFound:
LOG.info("Delete user: user {} not found in {}, "
"considered as deleted.".
@ -608,7 +622,7 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the group just created. The records is in JSON
# format
try:
group_records = self.get_dbs_client(self.master_region_name).\
group_records = self.get_master_dbs_client().\
identity_group_manager.group_detail(group_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@ -619,8 +633,7 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create the group on subcloud by pushing the DB records to subcloud
group_ref = self.get_dbs_client(
self.region_name).identity_group_manager.add_group(
group_ref = self.get_sc_dbs_client().identity_group_manager.add_group(
group_records)
if not group_ref:
LOG.error("No group data returned when creating group {} in"
@ -661,7 +674,7 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the group. The records is in JSON
# format
try:
group_records = self.get_dbs_client(self.master_region_name).\
group_records = self.get_master_dbs_client().\
identity_group_manager.group_detail(group_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
@ -673,8 +686,8 @@ class IdentitySyncThread(SyncThread):
# Update the corresponding group on subcloud by pushing the DB records
# to subcloud
group_ref = self.get_dbs_client(self.region_name).identity_group_manager.\
update_group(sc_group_id, group_records)
group_ref = self.get_sc_dbs_client().identity_group_manager.update_group(
sc_group_id, group_records)
if not group_ref:
LOG.error("No group data returned when updating group {} in"
" subcloud.".format(sc_group_id), extra=self.log_extra)
@ -715,9 +728,8 @@ class IdentitySyncThread(SyncThread):
group_id = group_subcloud_rsrc.subcloud_resource_id
original_group_ref = GroupReferenceWrapper(id=group_id)
sc_ks_client = self.get_ks_client(self.region_name)
# Update the group in the subcloud
group_ref = sc_ks_client.groups.update(
group_ref = self.get_sc_ks_client().groups.update(
original_group_ref,
name=group_update_dict.pop('name', None),
domain=group_update_dict.pop('domain', None),
@ -750,8 +762,7 @@ class IdentitySyncThread(SyncThread):
# Delete the group in the subcloud
try:
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.groups.delete(original_group_ref)
self.get_sc_ks_client().groups.delete(original_group_ref)
except keystone_exceptions.NotFound:
LOG.info("Delete group: group {} not found in {}, "
"considered as deleted.".
@ -782,9 +793,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the project just created.
# The records is in JSON format.
try:
m_dbs_client = self.get_dbs_client(self.master_region_name)
project_records = m_dbs_client.project_manager.\
project_detail(project_id)
project_records = self.get_master_dbs_client().\
project_manager.project_detail(project_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not project_records:
@ -794,9 +804,8 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create the project on subcloud by pushing the DB records to subcloud
sc_dbs_client = self.get_dbs_client(self.region_name)
project_ref = sc_dbs_client.project_manager.\
add_project(project_records)
project_ref = self.get_sc_dbs_client().project_manager.add_project(
project_records)
if not project_ref:
LOG.error("No project data returned when creating project {} in"
" subcloud.".format(project_id), extra=self.log_extra)
@ -836,9 +845,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the project. The records is in JSON
# format
try:
m_dbs_client = self.get_dbs_client(self.master_region_name)
project_records = m_dbs_client.project_manager.\
project_detail(project_id)
project_records = self.get_master_dbs_client().\
project_manager.project_detail(project_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not project_records:
@ -849,9 +857,8 @@ class IdentitySyncThread(SyncThread):
# Update the corresponding project on subcloud by pushing the DB
# records to subcloud
sc_dbs_client = self.get_dbs_client(self.region_name)
project_ref = sc_dbs_client.project_manager.\
update_project(sc_project_id, project_records)
project_ref = self.get_sc_dbs_client().project_manager.update_project(
sc_project_id, project_records)
if not project_ref:
LOG.error("No project data returned when updating project {} in"
" subcloud.".format(sc_project_id), extra=self.log_extra)
@ -892,8 +899,7 @@ class IdentitySyncThread(SyncThread):
original_proj_ref = ProjectReferenceWrapper(id=proj_id)
# Update the project in the subcloud
sc_ks_client = self.get_ks_client(self.region_name)
project_ref = sc_ks_client.projects.update(
project_ref = self.get_sc_ks_client().projects.update(
original_proj_ref,
name=project_update_dict.pop('name', None),
domain=project_update_dict.pop('domain_id', None),
@ -927,8 +933,7 @@ class IdentitySyncThread(SyncThread):
# Delete the project in the subcloud
try:
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.projects.delete(original_proj_ref)
self.get_sc_ks_client().projects.delete(original_proj_ref)
except keystone_exceptions.NotFound:
LOG.info("Delete project: project {} not found in {}, "
"considered as deleted.".
@ -959,9 +964,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the role just created. The records is in JSON
# format.
try:
m_dbs_client = self.get_dbs_client(self.master_region_name)
role_records = m_dbs_client.role_manager.\
role_detail(role_id)
role_records = self.get_master_dbs_client().\
role_manager.role_detail(role_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not role_records:
@ -971,8 +975,7 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
# Create the role on subcloud by pushing the DB records to subcloud
sc_dbs_client = self.get_dbs_client(self.region_name)
role_ref = sc_dbs_client.role_manager.add_role(role_records)
role_ref = self.get_sc_dbs_client().role_manager.add_role(role_records)
if not role_ref:
LOG.error("No role data returned when creating role {} in"
" subcloud.".format(role_id), extra=self.log_extra)
@ -1012,8 +1015,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the role. The records is in JSON
# format
try:
m_dbs_client = self.get_dbs_client(self.master_region_name)
role_records = m_dbs_client.role_manager.role_detail(role_id)
role_records = self.get_master_dbs_client().\
role_manager.role_detail(role_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not role_records:
@ -1024,9 +1027,8 @@ class IdentitySyncThread(SyncThread):
# Update the corresponding role on subcloud by pushing the DB records
# to subcloud
sc_dbs_client = self.get_dbs_client(self.region_name)
role_ref = sc_dbs_client.role_manager.\
update_role(sc_role_id, role_records)
role_ref = self.get_sc_dbs_client().role_manager.update_role(
sc_role_id, role_records)
if not role_ref:
LOG.error("No role data returned when updating role {} in"
" subcloud.".format(sc_role_id), extra=self.log_extra)
@ -1067,8 +1069,7 @@ class IdentitySyncThread(SyncThread):
original_role_ref = RoleReferenceWrapper(id=role_id)
# Update the role in the subcloud
sc_ks_client = self.get_ks_client(self.region_name)
role_ref = sc_ks_client.roles.update(
role_ref = self.get_sc_ks_client().roles.update(
original_role_ref,
name=role_update_dict.pop('name', None))
@ -1099,8 +1100,7 @@ class IdentitySyncThread(SyncThread):
# Delete the role in the subcloud
try:
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.roles.delete(original_role_ref)
self.get_sc_ks_client().roles.delete(original_role_ref)
except keystone_exceptions.NotFound:
LOG.info("Delete role: role {} not found in {}, "
"considered as deleted.".
@ -1135,8 +1135,7 @@ class IdentitySyncThread(SyncThread):
# Ensure that we have already synced the project, user and role
# prior to syncing the assignment
sc_role = None
sc_ks_client = self.get_ks_client(self.region_name)
sc_role_list = sc_ks_client.roles.list()
sc_role_list = self.get_sc_ks_client().roles.list()
for role in sc_role_list:
if role.id == role_id:
sc_role = role
@ -1150,7 +1149,7 @@ class IdentitySyncThread(SyncThread):
sc_proj = None
# refresh client in case the token expires in between API calls
sc_proj_list = sc_ks_client.projects.list()
sc_proj_list = self.get_sc_ks_client().projects.list()
for proj in sc_proj_list:
if proj.id == project_id:
sc_proj = proj
@ -1163,13 +1162,13 @@ class IdentitySyncThread(SyncThread):
raise exceptions.SyncRequestFailed
sc_user = None
sc_user_list = self._get_all_users(sc_ks_client)
sc_user_list = self._get_all_users(self.get_sc_ks_client())
for user in sc_user_list:
if user.id == actor_id:
sc_user = user
break
sc_group = None
sc_group_list = self._get_all_groups(sc_ks_client)
sc_group_list = self._get_all_groups(self.get_sc_ks_client())
for group in sc_group_list:
if group.id == actor_id:
sc_group = group
@ -1183,20 +1182,20 @@ class IdentitySyncThread(SyncThread):
# Create role assignment
if sc_user:
sc_ks_client.roles.grant(
self.get_sc_ks_client().roles.grant(
sc_role,
user=sc_user,
project=sc_proj)
role_ref = sc_ks_client.role_assignments.list(
role_ref = self.get_sc_ks_client().role_assignments.list(
user=sc_user,
project=sc_proj,
role=sc_role)
elif sc_group:
sc_ks_client.roles.grant(
self.get_sc_ks_client().roles.grant(
sc_role,
group=sc_group,
project=sc_proj)
role_ref = sc_ks_client.role_assignments.list(
role_ref = self.get_sc_ks_client().role_assignments.list(
group=sc_group,
project=sc_proj,
role=sc_role)
@ -1254,8 +1253,7 @@ class IdentitySyncThread(SyncThread):
# Revoke role assignment
actor = None
try:
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.roles.revoke(
self.get_sc_ks_client().roles.revoke(
role_id,
user=actor_id,
project=project_id)
@ -1267,8 +1265,7 @@ class IdentitySyncThread(SyncThread):
self.region_name),
extra=self.log_extra)
try:
sc_ks_client = self.get_ks_client(self.region_name)
sc_ks_client.roles.revoke(
self.get_sc_ks_client().roles.revoke(
role_id,
group=actor_id,
project=project_id)
@ -1282,12 +1279,12 @@ class IdentitySyncThread(SyncThread):
role_ref = None
if actor == 'user':
role_ref = sc_ks_client.role_assignments.list(
role_ref = self.get_sc_ks_client().role_assignments.list(
user=actor_id,
project=project_id,
role=role_id)
elif actor == 'group':
role_ref = sc_ks_client.role_assignments.list(
role_ref = self.get_sc_ks_client().role_assignments.list(
group=actor_id,
project=project_id,
role=role_id)
@ -1322,9 +1319,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the revoke event just created. The records
# is in JSON format.
try:
m_dbs_client = self.get_dbs_client(self.master_region_name)
revoke_event_records = m_dbs_client.revoke_event_manager.\
revoke_event_detail(audit_id=audit_id)
revoke_event_records = self.get_master_dbs_client().\
revoke_event_manager.revoke_event_detail(audit_id=audit_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not revoke_event_records:
@ -1336,8 +1332,7 @@ class IdentitySyncThread(SyncThread):
# Create the revoke event on subcloud by pushing the DB records to
# subcloud
sc_dbs_client = self.get_dbs_client(self.region_name)
revoke_event_ref = sc_dbs_client.revoke_event_manager.\
revoke_event_ref = self.get_sc_dbs_client().revoke_event_manager.\
add_revoke_event(revoke_event_records)
if not revoke_event_ref:
LOG.error("No token revocation event data returned when creating"
@ -1365,8 +1360,7 @@ class IdentitySyncThread(SyncThread):
# subcloud resource id is the audit_id
subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id
try:
sc_dbs_client = self.get_dbs_client(self.region_name)
sc_dbs_client.revoke_event_manager.delete_revoke_event(
self.get_sc_dbs_client().revoke_event_manager.delete_revoke_event(
audit_id=subcloud_resource_id)
except dbsync_exceptions.NotFound:
LOG.info("Delete token revocation event: event {} not found in {},"
@ -1399,9 +1393,8 @@ class IdentitySyncThread(SyncThread):
# Retrieve DB records of the revoke event just created. The records
# is in JSON format.
try:
m_dbs_client = self.get_dbs_client(self.master_region_name)
revoke_event_records = m_dbs_client.revoke_event_manager.\
revoke_event_detail(user_id=event_id)
revoke_event_records = self.get_master_dbs_client().\
revoke_event_manager.revoke_event_detail(user_id=event_id)
except dbsync_exceptions.Unauthorized:
raise dbsync_exceptions.UnauthorizedMaster
if not revoke_event_records:
@ -1413,8 +1406,7 @@ class IdentitySyncThread(SyncThread):
# Create the revoke event on subcloud by pushing the DB records to
# subcloud
sc_dbs_client = self.get_dbs_client(self.region_name)
revoke_event_ref = sc_dbs_client.revoke_event_manager.\
revoke_event_ref = self.get_sc_dbs_client().revoke_event_manager.\
add_revoke_event(revoke_event_records)
if not revoke_event_ref:
LOG.error("No token revocation event data returned when creating"
@ -1440,8 +1432,7 @@ class IdentitySyncThread(SyncThread):
# subcloud resource id is <user_id>_<issued_before> encoded in base64
subcloud_resource_id = revoke_event_subcloud_rsrc.subcloud_resource_id
try:
sc_dbs_client = self.get_dbs_client(self.region_name)
sc_dbs_client.revoke_event_manager.delete_revoke_event(
self.get_sc_dbs_client().revoke_event_manager.delete_revoke_event(
user_id=subcloud_resource_id)
except dbsync_exceptions.NotFound:
LOG.info("Delete token revocation event: event {} not found in {},"
@ -1920,18 +1911,19 @@ class IdentitySyncThread(SyncThread):
try:
return self._get_resource_audit_handler(
resource_type,
self.get_dbs_client(self.master_region_name))
self.get_master_dbs_client())
except dbsync_exceptions.Unauthorized as e:
LOG.info("Get master resource [{}] request failed for {}: {}."
.format(resource_type,
dccommon_consts.VIRTUAL_MASTER_CLOUD,
str(e)), extra=self.log_extra)
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.master_region_name)
sdk.OptimizedOpenStackDriver.delete_region_clients(
self.master_region_name)
# Retry will get a new token
return self._get_resource_audit_handler(
resource_type,
self.get_dbs_client(self.master_region_name))
self.get_master_dbs_client())
except Exception as e:
LOG.exception(e)
return None
@ -1939,18 +1931,19 @@ class IdentitySyncThread(SyncThread):
try:
return self._get_resource_audit_handler(
resource_type,
self.get_ks_client(self.master_region_name))
self.get_master_ks_client())
except keystone_exceptions.Unauthorized as e:
LOG.info("Get master resource [{}] request failed for {}: {}."
.format(resource_type,
dccommon_consts.VIRTUAL_MASTER_CLOUD,
str(e)), extra=self.log_extra)
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.master_region_name)
sdk.OptimizedOpenStackDriver.delete_region_clients(
self.master_region_name)
# Retry with get a new token
return self._get_resource_audit_handler(
resource_type,
self.get_ks_client(self.master_region_name))
self.get_master_ks_client())
except Exception as e:
LOG.exception(e)
return None
@ -1963,34 +1956,36 @@ class IdentitySyncThread(SyncThread):
if self.is_resource_handled_by_dbs_client(resource_type):
try:
return self._get_resource_audit_handler(
resource_type, self.get_dbs_client(self.region_name))
resource_type, self.get_sc_dbs_client())
except dbsync_exceptions.Unauthorized as e:
LOG.info("Get subcloud resource [{}] request failed for {}: {}."
.format(resource_type,
self.region_name,
str(e)), extra=self.log_extra)
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
# Recreate the subcloud clients so that the token will be
# refreshed
self.initialize_sc_clients()
# Retry with re-authenticated dbsync client
return self._get_resource_audit_handler(
resource_type, self.get_dbs_client(self.region_name))
resource_type, self.get_sc_dbs_client())
except Exception as e:
LOG.exception(e)
return None
else:
try:
return self._get_resource_audit_handler(
resource_type, self.get_ks_client(self.region_name))
resource_type, self.get_sc_ks_client())
except keystone_exceptions.Unauthorized as e:
LOG.info("Get subcloud resource [{}] request failed for {}: {}."
.format(resource_type,
self.region_name,
str(e)), extra=self.log_extra)
# Clear the cache so that the old token will not be validated
sdk.OpenStackDriver.delete_region_clients(self.region_name)
# Recreate the subcloud clients so that the token will be
# refreshed
self.initialize_sc_clients()
# Retry with re-authenticated ks client
return self._get_resource_audit_handler(
resource_type, self.get_ks_client(self.region_name))
resource_type, self.get_sc_ks_client())
except Exception as e:
LOG.exception(e)
return None

View File

@ -25,6 +25,8 @@ from oslo_utils import timeutils
from dccommon import consts as dccommon_consts
from dccommon.drivers.openstack import sdk_platform as sdk
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
from dccommon.endpoint_cache import build_subcloud_endpoint
from dccommon import exceptions as dccommon_exceptions
from dcorch.common import consts
from dcorch.common import exceptions
@ -33,6 +35,7 @@ from dcorch.engine.fernet_key_manager import FERNET_REPO_MASTER_ID
from dcorch.engine.fernet_key_manager import FernetKeyManager
from dcorch.engine.sync_thread import AUDIT_RESOURCE_EXTRA
from dcorch.engine.sync_thread import AUDIT_RESOURCE_MISSING
from dcorch.engine.sync_thread import get_os_client
from dcorch.engine.sync_thread import SyncThread
LOG = logging.getLogger(__name__)
@ -54,9 +57,11 @@ class SysinvSyncThread(SyncThread):
SYNC_CERTIFICATES = ["ssl_ca", "openstack_ca"]
def __init__(self, subcloud_name, endpoint_type=None, engine_id=None):
def __init__(self, subcloud_name, endpoint_type=None, management_ip=None,
engine_id=None):
super(SysinvSyncThread, self).__init__(subcloud_name,
endpoint_type=endpoint_type,
management_ip=management_ip,
engine_id=engine_id)
if not self.endpoint_type:
self.endpoint_type = dccommon_consts.ENDPOINT_TYPE_PLATFORM
@ -81,19 +86,37 @@ class SysinvSyncThread(SyncThread):
consts.RESOURCE_TYPE_SYSINV_FERNET_REPO,
]
self.sc_sysinv_client = None
LOG.info("SysinvSyncThread initialized", extra=self.log_extra)
def initialize_sc_clients(self):
super().initialize_sc_clients()
sc_sysinv_url = build_subcloud_endpoint(self.management_ip, 'sysinv')
LOG.debug(f"Built sc_sysinv_url {sc_sysinv_url} for subcloud "
f"{self.subcloud_name}")
self.sc_sysinv_client = SysinvClient(
region=self.subcloud_name,
session=self.sc_admin_session,
endpoint=sc_sysinv_url)
def get_master_sysinv_client(self):
return get_os_client(self.master_region_name, ['sysinv']).sysinv_client
def get_sc_sysinv_client(self):
if self.sc_sysinv_client is None:
self.initialize_sc_clients()
return self.sc_sysinv_client
def sync_platform_resource(self, request, rsrc):
try:
s_os_client = sdk.OpenStackDriver(
region_name=self.region_name,
thread_name='sync',
region_clients=["sysinv"])
# invoke the sync method for the requested resource_type
# I.e. sync_idns
s_func_name = "sync_" + rsrc.resource_type
LOG.info("Obj:%s, func:%s" % (type(self), s_func_name))
getattr(self, s_func_name)(s_os_client, request, rsrc)
getattr(self, s_func_name)(self.get_sc_sysinv_client(), request, rsrc)
except AttributeError:
LOG.error("{} not implemented for {}"
.format(request.orch_job.operation_type,
@ -121,16 +144,16 @@ class SysinvSyncThread(SyncThread):
LOG.exception(e)
raise exceptions.SyncRequestFailedRetry
def update_dns(self, s_os_client, nameservers):
def update_dns(self, sysinv_client, nameservers):
try:
idns = s_os_client.sysinv_client.update_dns(nameservers)
idns = sysinv_client.update_dns(nameservers)
return idns
except (AttributeError, TypeError) as e:
LOG.info("update_dns error {}".format(e),
extra=self.log_extra)
raise exceptions.SyncRequestFailedRetry
def sync_idns(self, s_os_client, request, rsrc):
def sync_idns(self, sysinv_client, request, rsrc):
# The system is created with default dns; thus there
# is a prepopulated dns entry.
LOG.info("sync_idns resource_info={}".format(
@ -158,7 +181,7 @@ class SysinvSyncThread(SyncThread):
extra=self.log_extra)
nameservers = ""
idns = self.update_dns(s_os_client, nameservers)
idns = self.update_dns(sysinv_client, nameservers)
# Ensure subcloud resource is persisted to the DB for later
subcloud_rsrc_id = self.persist_db_subcloud_resource(
@ -167,11 +190,11 @@ class SysinvSyncThread(SyncThread):
.format(rsrc.id, subcloud_rsrc_id, nameservers),
extra=self.log_extra)
def update_certificate(self, s_os_client, signature,
def update_certificate(self, sysinv_client, signature,
certificate=None, data=None):
try:
icertificate = s_os_client.sysinv_client.update_certificate(
icertificate = sysinv_client.update_certificate(
signature, certificate=certificate, data=data)
return icertificate
except (AttributeError, TypeError) as e:
@ -210,7 +233,7 @@ class SysinvSyncThread(SyncThread):
metadata))
return certificate, metadata
def create_certificate(self, s_os_client, request, rsrc):
def create_certificate(self, sysinv_client, request, rsrc):
LOG.info("create_certificate resource_info={}".format(
request.orch_job.resource_info),
extra=self.log_extra)
@ -249,7 +272,7 @@ class SysinvSyncThread(SyncThread):
signature = rsrc.master_id
if signature and signature != self.CERTIFICATE_SIG_NULL:
icertificate = self.update_certificate(
s_os_client,
sysinv_client,
signature,
certificate=certificate,
data=metadata)
@ -269,13 +292,13 @@ class SysinvSyncThread(SyncThread):
sub_certs_updated),
extra=self.log_extra)
def delete_certificate(self, s_os_client, request, rsrc):
def delete_certificate(self, sysinv_client, request, rsrc):
subcloud_rsrc = self.get_db_subcloud_resource(rsrc.id)
if not subcloud_rsrc:
return
try:
certificates = self.get_certificates_resources(s_os_client)
certificates = self.get_certificates_resources(sysinv_client)
cert_to_delete = None
for certificate in certificates:
if certificate.signature == subcloud_rsrc.subcloud_resource_id:
@ -285,7 +308,7 @@ class SysinvSyncThread(SyncThread):
raise dccommon_exceptions.CertificateNotFound(
region_name=self.region_name,
signature=subcloud_rsrc.subcloud_resource_id)
s_os_client.sysinv_client.delete_certificate(cert_to_delete)
sysinv_client.delete_certificate(cert_to_delete)
except dccommon_exceptions.CertificateNotFound:
# Certificate already deleted in subcloud, carry on.
LOG.info("Certificate not in subcloud, may be already deleted",
@ -303,7 +326,7 @@ class SysinvSyncThread(SyncThread):
subcloud_rsrc.subcloud_resource_id),
extra=self.log_extra)
def sync_certificates(self, s_os_client, request, rsrc):
def sync_certificates(self, sysinv_client, request, rsrc):
switcher = {
consts.OPERATION_TYPE_POST: self.create_certificate,
consts.OPERATION_TYPE_CREATE: self.create_certificate,
@ -312,7 +335,7 @@ class SysinvSyncThread(SyncThread):
func = switcher[request.orch_job.operation_type]
try:
func(s_os_client, request, rsrc)
func(sysinv_client, request, rsrc)
except (keystone_exceptions.connection.ConnectTimeout,
keystone_exceptions.ConnectFailure) as e:
LOG.info("sync_certificates: subcloud {} is not reachable [{}]"
@ -326,23 +349,23 @@ class SysinvSyncThread(SyncThread):
LOG.exception(e)
raise exceptions.SyncRequestFailedRetry
def update_user(self, s_os_client, passwd_hash,
def update_user(self, sysinv_client, passwd_hash,
root_sig, passwd_expiry_days):
LOG.info("update_user={} {} {}".format(
passwd_hash, root_sig, passwd_expiry_days),
extra=self.log_extra)
try:
iuser = s_os_client.sysinv_client.update_user(passwd_hash,
root_sig,
passwd_expiry_days)
iuser = sysinv_client.update_user(passwd_hash,
root_sig,
passwd_expiry_days)
return iuser
except (AttributeError, TypeError) as e:
LOG.info("update_user error {} region_name".format(e),
extra=self.log_extra)
raise exceptions.SyncRequestFailedRetry
def sync_iuser(self, s_os_client, request, rsrc):
def sync_iuser(self, sysinv_client, request, rsrc):
# The system is populated with user entry for sysadmin.
LOG.info("sync_user resource_info={}".format(
request.orch_job.resource_info),
@ -375,7 +398,7 @@ class SysinvSyncThread(SyncThread):
extra=self.log_extra)
return
iuser = self.update_user(s_os_client, passwd_hash, root_sig,
iuser = self.update_user(sysinv_client, passwd_hash, root_sig,
passwd_expiry_days)
# Ensure subcloud resource is persisted to the DB for later
@ -385,7 +408,7 @@ class SysinvSyncThread(SyncThread):
.format(rsrc.id, subcloud_rsrc_id, passwd_hash),
extra=self.log_extra)
def sync_fernet_repo(self, s_os_client, request, rsrc):
def sync_fernet_repo(self, sysinv_client, request, rsrc):
switcher = {
consts.OPERATION_TYPE_PUT: self.update_fernet_repo,
consts.OPERATION_TYPE_PATCH: self.update_fernet_repo,
@ -394,7 +417,7 @@ class SysinvSyncThread(SyncThread):
func = switcher[request.orch_job.operation_type]
try:
func(s_os_client, request, rsrc)
func(sysinv_client, request, rsrc)
except (keystone_exceptions.connection.ConnectTimeout,
keystone_exceptions.ConnectFailure) as e:
LOG.info("sync_fernet_resources: subcloud {} is not reachable [{}]"
@ -405,7 +428,7 @@ class SysinvSyncThread(SyncThread):
LOG.exception(e)
raise exceptions.SyncRequestFailedRetry
def create_fernet_repo(self, s_os_client, request, rsrc):
def create_fernet_repo(self, sysinv_client, request, rsrc):
LOG.info("create_fernet_repo region {} resource_info={}".format(
self.region_name,
request.orch_job.resource_info),
@ -413,7 +436,7 @@ class SysinvSyncThread(SyncThread):
resource_info = jsonutils.loads(request.orch_job.resource_info)
try:
s_os_client.sysinv_client.post_fernet_repo(
sysinv_client.post_fernet_repo(
FernetKeyManager.from_resource_info(resource_info))
# Ensure subcloud resource is persisted to the DB for later
subcloud_rsrc_id = self.persist_db_subcloud_resource(
@ -427,7 +450,7 @@ class SysinvSyncThread(SyncThread):
subcloud_rsrc_id, resource_info),
extra=self.log_extra)
def update_fernet_repo(self, s_os_client, request, rsrc):
def update_fernet_repo(self, sysinv_client, request, rsrc):
LOG.info("update_fernet_repo region {} resource_info={}".format(
self.region_name,
request.orch_job.resource_info),
@ -435,7 +458,7 @@ class SysinvSyncThread(SyncThread):
resource_info = jsonutils.loads(request.orch_job.resource_info)
try:
s_os_client.sysinv_client.put_fernet_repo(
sysinv_client.put_fernet_repo(
FernetKeyManager.from_resource_info(resource_info))
# Ensure subcloud resource is persisted to the DB for later
subcloud_rsrc_id = self.persist_db_subcloud_resource(
@ -454,18 +477,15 @@ class SysinvSyncThread(SyncThread):
LOG.debug("get_master_resources thread:{}".format(
threading.currentThread().getName()), extra=self.log_extra)
try:
os_client = sdk.OpenStackDriver(
region_name=dccommon_consts.CLOUD_0,
thread_name='audit',
region_clients=["sysinv"])
if resource_type == consts.RESOURCE_TYPE_SYSINV_DNS:
return [self.get_dns_resource(os_client)]
return [self.get_dns_resource(self.get_master_sysinv_client())]
elif resource_type == consts.RESOURCE_TYPE_SYSINV_CERTIFICATE:
return self.get_certificates_resources(os_client)
return self.get_certificates_resources(
self.get_master_sysinv_client())
elif resource_type == consts.RESOURCE_TYPE_SYSINV_USER:
return [self.get_user_resource(os_client)]
return [self.get_user_resource(self.get_master_sysinv_client())]
elif resource_type == consts.RESOURCE_TYPE_SYSINV_FERNET_REPO:
return [self.get_fernet_resources(os_client)]
return [self.get_fernet_resources(self.get_master_sysinv_client())]
else:
LOG.error("Wrong resource type {}".format(resource_type),
extra=self.log_extra)
@ -478,17 +498,14 @@ class SysinvSyncThread(SyncThread):
LOG.debug("get_subcloud_resources thread:{}".format(
threading.currentThread().getName()), extra=self.log_extra)
try:
os_client = sdk.OpenStackDriver(region_name=self.region_name,
thread_name='audit',
region_clients=["sysinv"])
if resource_type == consts.RESOURCE_TYPE_SYSINV_DNS:
return [self.get_dns_resource(os_client)]
return [self.get_dns_resource(self.get_sc_sysinv_client())]
elif resource_type == consts.RESOURCE_TYPE_SYSINV_CERTIFICATE:
return self.get_certificates_resources(os_client)
return self.get_certificates_resources(self.get_sc_sysinv_client())
elif resource_type == consts.RESOURCE_TYPE_SYSINV_USER:
return [self.get_user_resource(os_client)]
return [self.get_user_resource(self.get_sc_sysinv_client())]
elif resource_type == consts.RESOURCE_TYPE_SYSINV_FERNET_REPO:
return [self.get_fernet_resources(os_client)]
return [self.get_fernet_resources(self.get_sc_sysinv_client())]
else:
LOG.error("Wrong resource type {}".format(resource_type),
extra=self.log_extra)
@ -525,11 +542,11 @@ class SysinvSyncThread(SyncThread):
sdk.OpenStackDriver.delete_region_clients_for_thread(
dccommon_consts.CLOUD_0, 'audit')
def get_dns_resource(self, os_client):
return os_client.sysinv_client.get_dns()
def get_dns_resource(self, sysinv_client):
return sysinv_client.get_dns()
def get_certificates_resources(self, os_client):
certificate_list = os_client.sysinv_client.get_certificates()
def get_certificates_resources(self, sysinv_client):
certificate_list = sysinv_client.get_certificates()
# Only sync the specified certificates to subclouds
filtered_list = [certificate
for certificate in certificate_list
@ -537,11 +554,11 @@ class SysinvSyncThread(SyncThread):
self.SYNC_CERTIFICATES]
return filtered_list
def get_user_resource(self, os_client):
return os_client.sysinv_client.get_user()
def get_user_resource(self, sysinv_client):
return sysinv_client.get_user()
def get_fernet_resources(self, os_client):
keys = os_client.sysinv_client.get_fernet_keys()
def get_fernet_resources(self, sysinv_client):
keys = sysinv_client.get_fernet_keys()
return FernetKeyManager.to_resource_info(keys)
def get_resource_id(self, resource_type, resource):

View File

@ -23,7 +23,9 @@ from oslo_log import log as logging
from oslo_utils import timeutils
from dccommon import consts as dccommon_consts
from dccommon.endpoint_cache import EndpointCache
from dccommon.drivers.openstack import sdk_platform as sdk
from dccommon.endpoint_cache import build_subcloud_endpoint
from dccommon.endpoint_cache import OptimizedEndpointCache
from dcdbsync.dbsyncclient import client as dbsyncclient
from dcmanager.rpc import client as dcmanager_rpc_client
from dcorch.common import consts
@ -62,6 +64,21 @@ AUDIT_RESOURCE_EXTRA = 'extra_resource'
AUDIT_LOCK_NAME = 'dcorch-audit'
def get_os_client(region, region_clients):
# Used by the master clients only. The subcloud clients don't need to be
# cached in the openstack driver, because we don't want to hold the admin
# sessions for the subclouds.
try:
os_client = sdk.OptimizedOpenStackDriver(
region_name=region,
region_clients=region_clients)
except Exception as e:
LOG.error(
f"Failed to get os_client for {region}/{region_clients}: {e}.")
raise e
return os_client
class SyncThread(object):
"""Manages tasks related to resource management."""
@ -75,10 +92,11 @@ class SyncThread(object):
# used by the audit to cache the master resources
master_resources_dict = collections.defaultdict(dict)
def __init__(self, subcloud_name, endpoint_type=None, engine_id=None):
super(SyncThread, self).__init__()
def __init__(self, subcloud_name, endpoint_type=None, management_ip=None,
engine_id=None):
self.endpoint_type = endpoint_type # endpoint type
self.subcloud_name = subcloud_name # subcloud name
self.management_ip = management_ip
self.engine_id = engine_id
self.ctxt = context.get_admin_context()
self.sync_handler_map = {}
@ -91,6 +109,7 @@ class SyncThread(object):
self.dcmanager_rpc_client = dcmanager_rpc_client.ManagerClient()
self.sc_admin_session = None
self.sc_auth_url = None
self.admin_session = None
self.ks_client = None
self.dbs_client = None
@ -129,7 +148,7 @@ class SyncThread(object):
if self.endpoint_type in dccommon_consts.ENDPOINT_TYPES_LIST:
config = cfg.CONF.endpoint_cache
self.admin_session = EndpointCache.get_admin_session(
self.admin_session = OptimizedEndpointCache.get_admin_session(
config.auth_uri,
config.username,
config.user_domain_name,
@ -139,7 +158,7 @@ class SyncThread(object):
timeout=60)
elif self.endpoint_type in dccommon_consts.ENDPOINT_TYPES_LIST_OS:
config = cfg.CONF.openstack_cache
self.admin_session = EndpointCache.get_admin_session(
self.admin_session = OptimizedEndpointCache.get_admin_session(
config.auth_uri,
config.admin_username,
config.admin_user_domain_name,
@ -167,27 +186,16 @@ class SyncThread(object):
# The specific SyncThread subclasses may extend this
if (not self.sc_admin_session):
# Subclouds will use token from the Subcloud specific Keystone,
# so define a session against that subcloud's identity
identity_service = self.ks_client.services.list(
name='keystone', type='identity')
sc_auth_url = self.ks_client.endpoints.list(
service=identity_service[0].id,
interface=dccommon_consts.KS_ENDPOINT_ADMIN,
region=self.subcloud_name)
try:
LOG.info("Found sc_auth_url: {}".format(sc_auth_url))
sc_auth_url = sc_auth_url[0].url
except IndexError:
# It may happen that this subcloud was not managed
LOG.info("Cannot find identity auth_url",
extra=self.log_extra)
return
# so define a session against that subcloud's keystone endpoint
self.sc_auth_url = build_subcloud_endpoint(
self.management_ip, 'keystone')
LOG.debug(f"Built sc_auth_url {self.sc_auth_url} for subcloud "
f"{self.subcloud_name}")
config = None
if self.endpoint_type in dccommon_consts.ENDPOINT_TYPES_LIST:
config = cfg.CONF.endpoint_cache
self.sc_admin_session = EndpointCache.get_admin_session(
sc_auth_url,
self.sc_admin_session = OptimizedEndpointCache.get_admin_session(
self.sc_auth_url,
config.username,
config.user_domain_name,
config.password,
@ -196,8 +204,8 @@ class SyncThread(object):
timeout=60)
elif self.endpoint_type in dccommon_consts.ENDPOINT_TYPES_LIST_OS:
config = cfg.CONF.openstack_cache
self.sc_admin_session = EndpointCache.get_admin_session(
sc_auth_url,
self.sc_admin_session = OptimizedEndpointCache.get_admin_session(
self.sc_auth_url,
config.admin_username,
config.admin_user_domain_name,
config.admin_password,

View File

@ -108,11 +108,38 @@ class OrchestratorTestCase(base.BaseTestCase):
self.addCleanup(mock_patch.stop)
def _mock_openstack_driver(self):
mock_patch = \
mock.patch('dccommon.drivers.openstack.sdk_platform.OpenStackDriver')
mock_patch = mock.patch(
'dccommon.drivers.openstack.sdk_platform.OptimizedOpenStackDriver')
self.mock_openstack_driver = mock_patch.start()
self.addCleanup(mock_patch.stop)
def _mock_keystone_client(self):
mock_patch = mock.patch('keystoneclient.client.Client')
self.mock_keystone_client = mock_patch.start()
self.addCleanup(mock_patch.stop)
def _mock_endpoint_cache_from_keystone(self):
mock_patch = mock.patch(
'dccommon.drivers.openstack.keystone_v3.OptimizedEndpointCache')
self.mock_endpoint_cache_from_keystone = mock_patch.start()
self.addCleanup(mock_patch.stop)
def _mock_endpoint_cache(self):
mock_patch = mock.patch(
'dccommon.endpoint_cache.OptimizedEndpointCache')
self.mock_endpoint_cache = mock_patch.start()
self.addCleanup(mock_patch.stop)
def _mock_m_dbs_client(self):
mock_patch = mock.patch('dcorch.engine.sync_thread.dbsyncclient.Client')
self.mock_m_dbs_client = mock_patch.start()
self.addCleanup(mock_patch.stop)
def _mock_sc_dbs_client(self):
mock_patch = mock.patch('dcorch.engine.sync_services.identity.Client')
self.mock_sc_dbs_client = mock_patch.start()
self.addCleanup(mock_patch.stop)
def _mock_sysinv_client(self, target):
mock_patch = mock.patch.object(target, 'SysinvClient')
self.mock_sysinv_client = mock_patch.start()

View File

@ -29,6 +29,11 @@ class BaseTestIdentitySyncThread(OrchestratorTestCase, mixins.BaseMixin):
super().setUp()
self._mock_openstack_driver()
self._mock_keystone_client()
self._mock_endpoint_cache_from_keystone()
self._mock_endpoint_cache()
self._mock_m_dbs_client()
self._mock_sc_dbs_client()
self._mock_rpc_client_subcloud_state_client()
self._mock_rpc_client_manager()
self._mock_log(identity_service)
@ -37,7 +42,7 @@ class BaseTestIdentitySyncThread(OrchestratorTestCase, mixins.BaseMixin):
self._create_subcloud_and_subcloud_resource()
self.identity_sync_thread = identity_service.IdentitySyncThread(
self.subcloud.region_name
self.subcloud.region_name, management_ip=self.subcloud.management_ip
)
self.method = lambda *args: None
@ -146,7 +151,7 @@ class BaseTestIdentitySyncThreadUsers(BaseTestIdentitySyncThread):
'local_user': {'name': 'fake value'}
}
self.resource_ref_name = self.resource_ref.get('local_user').get('name')
self.resource_detail = self.mock_openstack_driver().dbsync_client.\
self.resource_detail = self.identity_sync_thread.get_master_dbs_client().\
identity_user_manager.user_detail
@ -159,7 +164,7 @@ class TestIdentitySyncThreadUsersPost(
super().setUp()
self.method = self.identity_sync_thread.post_users
self.resource_add = self.mock_openstack_driver().dbsync_client.\
self.resource_add = self.identity_sync_thread.get_sc_dbs_client().\
identity_user_manager.add_user
@ -172,7 +177,7 @@ class TestIdentitySyncThreadUsersPut(
super().setUp()
self.method = self.identity_sync_thread.put_users
self.resource_update = self.mock_openstack_driver().dbsync_client.\
self.resource_update = self.identity_sync_thread.get_sc_dbs_client().\
identity_user_manager.update_user
@ -186,8 +191,8 @@ class TestIdentitySyncThreadUsersPatch(
self.method = self.identity_sync_thread.patch_users
self.request.orch_job.resource_info = f'{{"{self.resource_name}": {{}}}}'
self.resource_keystone_update = self.mock_openstack_driver().\
keystone_client.keystone_client.users.update
self.resource_keystone_update = self.identity_sync_thread.\
get_sc_ks_client().users.update
class TestIdentitySyncThreadUsersDelete(
@ -199,8 +204,8 @@ class TestIdentitySyncThreadUsersDelete(
super().setUp()
self.method = self.identity_sync_thread.delete_users
self.resource_keystone_delete = self.mock_openstack_driver().\
keystone_client.keystone_client.users.delete
self.resource_keystone_delete = self.identity_sync_thread.\
get_sc_ks_client().users.delete
class BaseTestIdentitySyncThreadGroups(BaseTestIdentitySyncThread):
@ -214,7 +219,7 @@ class BaseTestIdentitySyncThreadGroups(BaseTestIdentitySyncThread):
{self.resource_name: {'id': RESOURCE_ID, 'name': 'fake value'}}
self.resource_ref_name = \
self.resource_ref.get(self.resource_name).get('name')
self.resource_detail = self.mock_openstack_driver().dbsync_client.\
self.resource_detail = self.identity_sync_thread.get_master_dbs_client().\
identity_group_manager.group_detail
@ -227,7 +232,7 @@ class TestIdentitySyncThreadGroupsPost(
super().setUp()
self.method = self.identity_sync_thread.post_groups
self.resource_add = self.mock_openstack_driver().dbsync_client.\
self.resource_add = self.identity_sync_thread.get_sc_dbs_client().\
identity_group_manager.add_group
@ -240,7 +245,7 @@ class TestIdentitySyncThreadGroupsPut(
super().setUp()
self.method = self.identity_sync_thread.put_groups
self.resource_update = self.mock_openstack_driver().dbsync_client.\
self.resource_update = self.identity_sync_thread.get_sc_dbs_client().\
identity_group_manager.update_group
@ -254,8 +259,8 @@ class TestIdentitySyncThreadGroupsPatch(
self.method = self.identity_sync_thread.patch_groups
self.request.orch_job.resource_info = f'{{"{self.resource_name}": {{}}}}'
self.resource_keystone_update = self.mock_openstack_driver().\
keystone_client.keystone_client.groups.update
self.resource_keystone_update = self.identity_sync_thread.\
get_sc_ks_client().groups.update
class TestIdentitySyncThreadGroupsDelete(
@ -267,8 +272,8 @@ class TestIdentitySyncThreadGroupsDelete(
super().setUp()
self.method = self.identity_sync_thread.delete_groups
self.resource_keystone_delete = self.mock_openstack_driver().\
keystone_client.keystone_client.groups.delete
self.resource_keystone_delete = self.identity_sync_thread.\
get_sc_ks_client().groups.delete
class BaseTestIdentitySyncThreadProjects(BaseTestIdentitySyncThread):
@ -283,7 +288,7 @@ class BaseTestIdentitySyncThreadProjects(BaseTestIdentitySyncThread):
}
self.resource_ref_name = \
self.resource_ref.get(self.resource_name).get('name')
self.resource_detail = self.mock_openstack_driver().dbsync_client.\
self.resource_detail = self.identity_sync_thread.get_master_dbs_client().\
project_manager.project_detail
@ -296,7 +301,7 @@ class TestIdentitySyncThreadProjectsPost(
super().setUp()
self.method = self.identity_sync_thread.post_projects
self.resource_add = self.mock_openstack_driver().dbsync_client.\
self.resource_add = self.identity_sync_thread.get_sc_dbs_client().\
project_manager.add_project
@ -309,7 +314,7 @@ class TestIdentitySyncThreadProjectsPut(
super().setUp()
self.method = self.identity_sync_thread.put_projects
self.resource_update = self.mock_openstack_driver().dbsync_client.\
self.resource_update = self.identity_sync_thread.get_sc_dbs_client().\
project_manager.update_project
@ -323,8 +328,8 @@ class TestIdentitySyncThreadProjectsPatch(
self.method = self.identity_sync_thread.patch_projects
self.request.orch_job.resource_info = f'{{"{self.resource_name}": {{}}}}'
self.resource_keystone_update = self.mock_openstack_driver().\
keystone_client.keystone_client.projects.update
self.resource_keystone_update = self.identity_sync_thread.\
get_sc_ks_client().projects.update
class TestIdentitySyncThreadProjectsDelete(
@ -336,8 +341,8 @@ class TestIdentitySyncThreadProjectsDelete(
super().setUp()
self.method = self.identity_sync_thread.delete_projects
self.resource_keystone_delete = self.mock_openstack_driver().\
keystone_client.keystone_client.projects.delete
self.resource_keystone_delete = self.identity_sync_thread.\
get_sc_ks_client().projects.delete
class BaseTestIdentitySyncThreadRoles(BaseTestIdentitySyncThread):
@ -352,7 +357,7 @@ class BaseTestIdentitySyncThreadRoles(BaseTestIdentitySyncThread):
}
self.resource_ref_name = \
self.resource_ref.get(self.resource_name).get('name')
self.resource_detail = self.mock_openstack_driver().dbsync_client.\
self.resource_detail = self.identity_sync_thread.get_master_dbs_client().\
role_manager.role_detail
@ -365,7 +370,7 @@ class TestIdentitySyncThreadRolesPost(
super().setUp()
self.method = self.identity_sync_thread.post_roles
self.resource_add = self.mock_openstack_driver().dbsync_client.\
self.resource_add = self.identity_sync_thread.get_sc_dbs_client().\
role_manager.add_role
@ -378,7 +383,7 @@ class TestIdentitySyncThreadRolesPut(
super().setUp()
self.method = self.identity_sync_thread.put_roles
self.resource_update = self.mock_openstack_driver().dbsync_client.\
self.resource_update = self.identity_sync_thread.get_sc_dbs_client().\
role_manager.update_role
@ -392,8 +397,8 @@ class TestIdentitySyncThreadRolesPatch(
self.method = self.identity_sync_thread.patch_roles
self.request.orch_job.resource_info = f'{{"{self.resource_name}": {{}}}}'
self.resource_keystone_update = self.mock_openstack_driver().\
keystone_client.keystone_client.roles.update
self.resource_keystone_update = self.identity_sync_thread.\
get_sc_ks_client().roles.update
class TestIdentitySyncThreadRolesDelete(
@ -405,8 +410,8 @@ class TestIdentitySyncThreadRolesDelete(
super().setUp()
self.method = self.identity_sync_thread.delete_roles
self.resource_keystone_delete = self.mock_openstack_driver().\
keystone_client.keystone_client.roles.delete
self.resource_keystone_delete = self.identity_sync_thread.\
get_sc_ks_client().roles.delete
class BaseTestIdentitySyncThreadProjectRoleAssignments(BaseTestIdentitySyncThread):
@ -435,13 +440,13 @@ class TestIdentitySyncThreadProjectRoleAssignmentsPost(
self.rsrc.master_id = self.resource_tags
self.mock_sc_role = self._create_mock_object(self.role_id)
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
roles.list.return_value = [self.mock_sc_role]
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
projects.list.return_value = [self._create_mock_object(self.project_id)]
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
domains.list.return_value = [self._create_mock_object(self.project_id)]
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
users.list.return_value = [self._create_mock_object(self.actor_id)]
def _create_mock_object(self, id):
@ -462,9 +467,9 @@ class TestIdentitySyncThreadProjectRoleAssignmentsPost(
def test_post_succeeds_with_sc_group(self):
"""Test post succeeds with sc group"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
users.list.return_value = []
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
groups.list.return_value = [self._create_mock_object(self.actor_id)]
self._execute()
@ -487,7 +492,7 @@ class TestIdentitySyncThreadProjectRoleAssignmentsPost(
def test_post_fails_without_sc_role(self):
"""Test post fails without sc role"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
roles.list.return_value = []
self._execute_and_assert_exception(exceptions.SyncRequestFailed)
@ -500,7 +505,7 @@ class TestIdentitySyncThreadProjectRoleAssignmentsPost(
def test_post_fails_without_sc_proj(self):
"""Test post fails without sc proj"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
projects.list.return_value = []
self._execute_and_assert_exception(exceptions.SyncRequestFailed)
@ -513,7 +518,7 @@ class TestIdentitySyncThreadProjectRoleAssignmentsPost(
def test_post_fails_wihtout_sc_user_and_sc_group(self):
"""Test post fails without sc user and sc group"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
users.list.return_value = []
self._execute_and_assert_exception(exceptions.SyncRequestFailed)
@ -526,7 +531,7 @@ class TestIdentitySyncThreadProjectRoleAssignmentsPost(
def test_post_fails_without_role_ref(self):
"""Test post fails without role ref"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
role_assignments.list.return_value = []
self._execute_and_assert_exception(exceptions.SyncRequestFailed)
@ -576,7 +581,7 @@ class TestIdentitySyncThreadProjectRoleAssignmentsDelete(
def test_delete_succeeds(self):
"""Test delete succeeds"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
role_assignments.list.return_value = []
self._execute()
@ -615,9 +620,9 @@ class TestIdentitySyncThreadProjectRoleAssignmentsDelete(
def test_delete_for_user_succeeds_with_keystone_not_found_exception(self):
"""Test delete fails for user with keystone not found exception"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
roles.revoke.side_effect = [keystone_exceptions.NotFound, None]
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
role_assignments.list.return_value = []
self._execute()
@ -639,7 +644,7 @@ class TestIdentitySyncThreadProjectRoleAssignmentsDelete(
def test_delete_for_group_succeeds_with_keystone_not_found_exception(self):
"""Test delete fails for group with keystone not found exception"""
self.mock_openstack_driver().keystone_client.keystone_client.\
self.identity_sync_thread.get_sc_ks_client().\
roles.revoke.side_effect = keystone_exceptions.NotFound
self._execute()
@ -681,7 +686,7 @@ class BaseTestIdentitySyncThreadRevokeEvents(BaseTestIdentitySyncThread):
}
self.resource_ref_name = \
self.resource_ref.get('revocation_event').get('name')
self.resource_detail = self.mock_openstack_driver().dbsync_client.\
self.resource_detail = self.identity_sync_thread.get_master_dbs_client().\
revoke_event_manager.revoke_event_detail
@ -696,7 +701,7 @@ class BaseTestIdentitySyncThreadRevokeEventsPost(
self.resource_info = {"token_revoke_event": {"audit_id": RESOURCE_ID}}
self.request.orch_job.resource_info = jsonutils.dumps(self.resource_info)
self.method = self.identity_sync_thread.post_revoke_events
self.resource_add = self.mock_openstack_driver().dbsync_client.\
self.resource_add = self.identity_sync_thread.get_sc_dbs_client().\
revoke_event_manager.add_revoke_event
def test_post_succeeds(self):
@ -762,8 +767,8 @@ class BaseTestIdentitySyncThreadRevokeEventsDelete(
super().setUp()
self.method = self.identity_sync_thread.delete_revoke_events
self.resource_keystone_delete = self.mock_openstack_driver().dbsync_client.\
revoke_event_manager.delete_revoke_event
self.resource_keystone_delete = self.identity_sync_thread.\
get_sc_dbs_client().revoke_event_manager.delete_revoke_event
def test_delete_succeeds_with_keystone_not_found_exception(self):
"""Test delete succeeds with keystone's not found exception
@ -810,7 +815,7 @@ class BaseTestIdentitySyncThreadRevokeEventsForUser(BaseTestIdentitySyncThread):
}
self.resource_ref_name = \
self.resource_ref.get('revocation_event').get('name')
self.resource_detail = self.mock_openstack_driver().dbsync_client.\
self.resource_detail = self.identity_sync_thread.get_master_dbs_client().\
revoke_event_manager.revoke_event_detail
@ -823,7 +828,7 @@ class TestIdentitySyncThreadRevokeEventsForUserPost(
super().setUp()
self.method = self.identity_sync_thread.post_revoke_events_for_user
self.resource_add = self.mock_openstack_driver().dbsync_client.\
self.resource_add = self.identity_sync_thread.get_sc_dbs_client().\
revoke_event_manager.add_revoke_event
def test_post_succeeds(self):
@ -888,8 +893,8 @@ class TestIdentitySyncThreadRevokeEventsForUserDelete(
super().setUp()
self.method = self.identity_sync_thread.delete_revoke_events_for_user
self.resource_keystone_delete = self.mock_openstack_driver().dbsync_client.\
revoke_event_manager.delete_revoke_event
self.resource_keystone_delete = self.identity_sync_thread.\
get_sc_dbs_client().revoke_event_manager.delete_revoke_event
def test_delete_succeeds_with_keystone_not_found_exception(self):
"""Test delete succeeds with keystone's not found exception

View File

@ -113,21 +113,24 @@ class TestGenericSyncManager(base.OrchestratorTestCase):
name='subcloud' + str(i),
management_state=dccommon_consts.MANAGEMENT_MANAGED,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_COMPLETED)
initial_sync_state=consts.INITIAL_SYNC_STATE_COMPLETED,
management_ip='10.10.10.' + str(i))
utils.create_subcloud_sync_static(
self.ctx,
name='subcloud' + str(i),
endpoint_type=dccommon_consts.ENDPOINT_TYPE_IDENTITY,
sync_request='requested')
subcloud_sync_list.append((subcloud.region_name,
dccommon_consts.ENDPOINT_TYPE_IDENTITY))
dccommon_consts.ENDPOINT_TYPE_IDENTITY,
subcloud.management_ip))
utils.create_subcloud_sync_static(
self.ctx,
name='subcloud' + str(i),
endpoint_type=dccommon_consts.ENDPOINT_TYPE_PLATFORM,
sync_request='requested')
subcloud_sync_list.append((subcloud.region_name,
dccommon_consts.ENDPOINT_TYPE_PLATFORM))
dccommon_consts.ENDPOINT_TYPE_PLATFORM,
subcloud.management_ip))
gsm = generic_sync_manager.GenericSyncManager()
gsm._process_subclouds = mock.MagicMock()
@ -193,7 +196,8 @@ class TestGenericSyncManager(base.OrchestratorTestCase):
name='subcloud' + str(i),
management_state=dccommon_consts.MANAGEMENT_MANAGED,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_COMPLETED)
initial_sync_state=consts.INITIAL_SYNC_STATE_COMPLETED,
management_ip='10.10.10.' + str(i))
last_audit_time = timeutils.utcnow() - \
timedelta(seconds=generic_sync_manager.AUDIT_INTERVAL)
utils.create_subcloud_sync_static(
@ -203,7 +207,8 @@ class TestGenericSyncManager(base.OrchestratorTestCase):
audit_status=consts.AUDIT_STATUS_COMPLETED,
last_audit_time=last_audit_time)
subcloud_sync_list.append((subcloud.region_name,
dccommon_consts.ENDPOINT_TYPE_IDENTITY))
dccommon_consts.ENDPOINT_TYPE_IDENTITY,
subcloud.management_ip))
utils.create_subcloud_sync_static(
self.ctx,
name='subcloud' + str(i),
@ -211,7 +216,8 @@ class TestGenericSyncManager(base.OrchestratorTestCase):
audit_status=consts.AUDIT_STATUS_COMPLETED,
last_audit_time=last_audit_time)
subcloud_sync_list.append((subcloud.region_name,
dccommon_consts.ENDPOINT_TYPE_PLATFORM))
dccommon_consts.ENDPOINT_TYPE_PLATFORM,
subcloud.management_ip))
gsm = generic_sync_manager.GenericSyncManager()
gsm._process_subclouds = mock.MagicMock()

View File

@ -15,10 +15,10 @@ from dcorch.tests import base
from dcorch.tests import utils
SUBCLOUD_SYNC_LIST = [
('subcloud1', dccommon_consts.ENDPOINT_TYPE_IDENTITY),
('subcloud1', dccommon_consts.ENDPOINT_TYPE_PLATFORM),
('subcloud2', dccommon_consts.ENDPOINT_TYPE_IDENTITY),
('subcloud2', dccommon_consts.ENDPOINT_TYPE_PLATFORM)
('subcloud1', dccommon_consts.ENDPOINT_TYPE_IDENTITY, '192.168.1.11'),
('subcloud1', dccommon_consts.ENDPOINT_TYPE_PLATFORM, '192.168.1.11'),
('subcloud2', dccommon_consts.ENDPOINT_TYPE_IDENTITY, '192.168.1.12'),
('subcloud2', dccommon_consts.ENDPOINT_TYPE_PLATFORM, '192.168.1.12')
]
@ -56,7 +56,8 @@ class TestGenericSyncWorkerManager(base.OrchestratorTestCase):
self.assertIsNotNone(self.gswm)
def test_create_sync_objects(self):
sync_objs = self.gswm.create_sync_objects('subcloud1', base.CAPABILITES)
sync_objs = self.gswm.create_sync_objects(
'subcloud1', base.CAPABILITES, '192.168.1.11')
# Verify both endpoint types have corresponding sync object
self.assertEqual(len(sync_objs), 2)
@ -107,12 +108,13 @@ class TestGenericSyncWorkerManager(base.OrchestratorTestCase):
self.gswm.sync_subclouds(self.ctx, SUBCLOUD_SYNC_LIST)
# Verify 4 threads started, one for each endpoint_type of a subcloud
for subcloud_name, endpoint_type in SUBCLOUD_SYNC_LIST:
for subcloud_name, endpoint_type, ip in SUBCLOUD_SYNC_LIST:
self.mock_thread_start.assert_any_call(
self.gswm._sync_subcloud,
mock.ANY,
subcloud_name,
endpoint_type)
endpoint_type,
ip)
def test_run_sync_audit(self):
self.gswm._audit_subcloud = mock.MagicMock()
@ -120,7 +122,7 @@ class TestGenericSyncWorkerManager(base.OrchestratorTestCase):
self.gswm.run_sync_audit(self.ctx, SUBCLOUD_SYNC_LIST)
# Verify 4 threads started, one for each endpoint_type of a subcloud
for subcloud_name, endpoint_type in SUBCLOUD_SYNC_LIST:
for subcloud_name, endpoint_type, ip in SUBCLOUD_SYNC_LIST:
self.mock_thread_start.assert_any_call(
self.gswm._audit_subcloud,
mock.ANY,

View File

@ -100,8 +100,10 @@ class TestInitialSyncManager(base.OrchestratorTestCase):
subcloud = utils.create_subcloud_static(
self.ctx,
name='subcloud' + str(i),
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
chunks[chunk_num][subcloud.region_name] = base.CAPABILITES
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED,
management_ip='192.168.1.' + str(i))
chunks[chunk_num][subcloud.region_name] = \
(base.CAPABILITES, subcloud.management_ip)
ism = initial_sync_manager.InitialSyncManager()

View File

@ -34,7 +34,7 @@ class FakeGSWM(object):
subcloud_name,
values={'initial_sync_state': initial_sync_state})
def create_sync_objects(self, subcloud_name, capabilities):
def create_sync_objects(self, subcloud_name, capabilities, management_ip):
sync_objs = {}
endpoint_type_list = capabilities.get('endpoint_types', None)
if endpoint_type_list:
@ -104,13 +104,15 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
subcloud = utils.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED,
management_ip="192.168.1.11")
self.assertIsNotNone(subcloud)
# Initial sync the subcloud
self.iswm._initial_sync_subcloud(self.ctx,
subcloud.region_name,
base.CAPABILITES)
base.CAPABILITES,
subcloud.management_ip)
self.mock_distribute_keys.assert_called_once()
@ -126,7 +128,8 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
subcloud = utils.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state='')
initial_sync_state='',
management_ip='192.168.1.11')
self.assertIsNotNone(subcloud)
self.iswm.initial_sync = mock.MagicMock()
@ -134,7 +137,8 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
# Initial sync the subcloud
self.iswm._initial_sync_subcloud(self.ctx,
subcloud.region_name,
base.CAPABILITES)
base.CAPABILITES,
subcloud.management_ip)
# Verify that the initial sync steps were not done
self.iswm.initial_sync.assert_not_called()
@ -149,7 +153,8 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
subcloud = utils.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED,
management_ip='192.168.1.11')
self.assertIsNotNone(subcloud)
self.iswm.enable_subcloud = mock.MagicMock()
@ -159,7 +164,8 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
# Initial sync the subcloud
self.iswm._initial_sync_subcloud(self.ctx,
subcloud.region_name,
base.CAPABILITES)
base.CAPABILITES,
subcloud.management_ip)
# Verify the initial sync was failed
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
@ -203,13 +209,17 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
subcloud1 = utils.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state='')
initial_sync_state='',
management_ip='192.168.1.11')
subcloud2 = utils.create_subcloud_static(
self.ctx,
name='subcloud2',
initial_sync_state='')
subcloud_capabilities = {subcloud1.region_name: base.CAPABILITES,
subcloud2.region_name: base.CAPABILITES}
initial_sync_state='',
management_ip='192.168.1.12')
subcloud_capabilities = {
subcloud1.region_name: (base.CAPABILITES, subcloud1.management_ip),
subcloud2.region_name: (base.CAPABILITES, subcloud2.management_ip)
}
self.iswm.initial_sync_subclouds(self.ctx, subcloud_capabilities)
@ -217,10 +227,10 @@ class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
self.mock_thread_start.assert_any_call(self.iswm._initial_sync_subcloud,
mock.ANY,
subcloud1.region_name,
subcloud_capabilities.get(
subcloud1.region_name))
base.CAPABILITES,
subcloud1.management_ip)
self.mock_thread_start.assert_called_with(self.iswm._initial_sync_subcloud,
mock.ANY,
subcloud2.region_name,
subcloud_capabilities.get(
subcloud2.region_name))
base.CAPABILITES,
subcloud2.management_ip)