Merge "Keystone DB sync - update dcorch to use dcdbsync" into f/keystone-db
This commit is contained in:
commit
5ea242f444
|
@ -356,7 +356,9 @@ class SubcloudsController(object):
|
|||
('heat_admin', 'heat-domain'),
|
||||
('gnocchi', 'gnocchi'),
|
||||
('fm', 'fm'),
|
||||
('barbican', 'barbican')
|
||||
('barbican', 'barbican'),
|
||||
('smapi', 'smapi'),
|
||||
('dcdbsync', 'dcdbsync')
|
||||
]
|
||||
|
||||
user_list = list()
|
||||
|
|
|
@ -310,6 +310,7 @@ class SubcloudManager(manager.Manager):
|
|||
|
||||
# Get the subcloud details from the database
|
||||
subcloud = db_api.subcloud_get(context, subcloud_id)
|
||||
original_management_state = subcloud.management_state
|
||||
|
||||
# Semantic checking
|
||||
if management_state:
|
||||
|
@ -357,7 +358,14 @@ class SubcloudManager(manager.Manager):
|
|||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
LOG.warn('Problem informing dcorch of subcloud '
|
||||
'state change, subcloud: %s' % subcloud.name)
|
||||
'state change, resume to original state, subcloud: %s'
|
||||
% subcloud.name)
|
||||
management_state = original_management_state
|
||||
subcloud = \
|
||||
db_api.subcloud_update(context, subcloud_id,
|
||||
management_state=management_state,
|
||||
description=description,
|
||||
location=location)
|
||||
|
||||
if management_state == consts.MANAGEMENT_UNMANAGED:
|
||||
|
||||
|
|
|
@ -343,6 +343,7 @@ class ComputeAPIController(APIController):
|
|||
resource_tag = self._get_resource_tag_from_header(request_header,
|
||||
operation_type,
|
||||
resource_type)
|
||||
|
||||
handler = self._resource_handler[resource_tag]
|
||||
operation_type, resource_id, resource_info = handler(
|
||||
environ=environ,
|
||||
|
@ -454,7 +455,7 @@ class IdentityAPIController(APIController):
|
|||
return response
|
||||
|
||||
def _generate_assignment_rid(self, url, environ):
|
||||
resource_id = ''
|
||||
resource_id = None
|
||||
# for role assignment or revocation, the URL is of format:
|
||||
# /v3/projects/{project_id}/users/{user_id}/roles/{role_id}
|
||||
# We need to extract all ID parameters from the URL
|
||||
|
@ -468,6 +469,23 @@ class IdentityAPIController(APIController):
|
|||
resource_id = "{}_{}_{}".format(proj_id, user_id, role_id)
|
||||
return resource_id
|
||||
|
||||
def _retrieve_token_revoke_event_rid(self, url, environ):
|
||||
resource_id = None
|
||||
# for token revocation event, we need to retrieve the audit_id
|
||||
# from the token being revoked.
|
||||
revoked_token = environ.get('HTTP_X_SUBJECT_TOKEN', None)
|
||||
|
||||
if not revoked_token:
|
||||
LOG.error("Malformed Token Revocation URL: %s", url)
|
||||
else:
|
||||
try:
|
||||
resource_id = proxy_utils.\
|
||||
retrieve_token_audit_id(revoked_token)
|
||||
except Exception as e:
|
||||
LOG.error("Failed to retrieve token audit id: %s" % e)
|
||||
|
||||
return resource_id
|
||||
|
||||
def _enqueue_work(self, environ, request_body, response):
|
||||
LOG.info("enqueue_work")
|
||||
resource_info = {}
|
||||
|
@ -482,6 +500,26 @@ class IdentityAPIController(APIController):
|
|||
consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS):
|
||||
resource_id = self._generate_assignment_rid(request_header,
|
||||
environ)
|
||||
# grant a role to a user (PUT) creates a project role assignment
|
||||
if operation_type == consts.OPERATION_TYPE_PUT:
|
||||
operation_type = consts.OPERATION_TYPE_POST
|
||||
elif (resource_type ==
|
||||
consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS):
|
||||
resource_id = self._retrieve_token_revoke_event_rid(request_header,
|
||||
environ)
|
||||
# delete (revoke) a token (DELETE) creates a token revoke event.
|
||||
if operation_type == consts.OPERATION_TYPE_DELETE and resource_id:
|
||||
operation_type = consts.OPERATION_TYPE_POST
|
||||
resource_info = {'token_revoke_event':
|
||||
{'audit_id': resource_id}}
|
||||
elif (resource_type ==
|
||||
consts.RESOURCE_TYPE_IDENTITY_USERS_PASSWORD):
|
||||
resource_id = self.get_resource_id_from_link(request_header.
|
||||
strip('/password'))
|
||||
# user change password (POST) is an update to the user
|
||||
if operation_type == consts.OPERATION_TYPE_POST:
|
||||
operation_type = consts.OPERATION_TYPE_PATCH
|
||||
resource_type = consts.RESOURCE_TYPE_IDENTITY_USERS
|
||||
else:
|
||||
if operation_type == consts.OPERATION_TYPE_POST:
|
||||
# Retrieve the ID from the response
|
||||
|
@ -490,20 +528,25 @@ class IdentityAPIController(APIController):
|
|||
else:
|
||||
resource_id = self.get_resource_id_from_link(request_header)
|
||||
|
||||
if (operation_type != consts.OPERATION_TYPE_DELETE and request_body):
|
||||
if (operation_type != consts.OPERATION_TYPE_DELETE and
|
||||
request_body and (not resource_info)):
|
||||
resource_info = json.loads(request_body)
|
||||
|
||||
LOG.info("%s: Resource id: (%s), type: (%s), info: (%s)",
|
||||
operation_type, resource_id, resource_type, resource_info)
|
||||
try:
|
||||
utils.enqueue_work(self.ctxt,
|
||||
self.ENDPOINT_TYPE,
|
||||
resource_type,
|
||||
resource_id,
|
||||
operation_type,
|
||||
json.dumps(resource_info))
|
||||
except exception.ResourceNotFound as e:
|
||||
raise webob.exc.HTTPNotFound(explanation=e.format_message())
|
||||
|
||||
if resource_id:
|
||||
try:
|
||||
utils.enqueue_work(self.ctxt,
|
||||
self.ENDPOINT_TYPE,
|
||||
resource_type,
|
||||
resource_id,
|
||||
operation_type,
|
||||
json.dumps(resource_info))
|
||||
except exception.ResourceNotFound as e:
|
||||
raise webob.exc.HTTPNotFound(explanation=e.format_message())
|
||||
else:
|
||||
LOG.warning("Empty resource id for resource: %s", operation_type)
|
||||
|
||||
|
||||
class CinderAPIController(APIController):
|
||||
|
|
|
@ -297,7 +297,10 @@ IDENTITY_PROJECTS_PATH = [
|
|||
|
||||
IDENTITY_PROJECTS_ROLE_PATH = [
|
||||
'/v3/projects/{project_id}/users/{user_id}/roles/{role_id}',
|
||||
]
|
||||
|
||||
IDENTITY_TOKEN_REVOKE_EVENTS_PATH = [
|
||||
'/v3/auth/tokens',
|
||||
]
|
||||
|
||||
IDENTITY_PATH_MAP = {
|
||||
|
@ -306,7 +309,9 @@ IDENTITY_PATH_MAP = {
|
|||
consts.RESOURCE_TYPE_IDENTITY_ROLES: IDENTITY_ROLES_PATH,
|
||||
consts.RESOURCE_TYPE_IDENTITY_PROJECTS: IDENTITY_PROJECTS_PATH,
|
||||
consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS:
|
||||
IDENTITY_PROJECTS_ROLE_PATH
|
||||
IDENTITY_PROJECTS_ROLE_PATH,
|
||||
consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS:
|
||||
IDENTITY_TOKEN_REVOKE_EVENTS_PATH,
|
||||
}
|
||||
|
||||
ROUTE_METHOD_MAP = {
|
||||
|
@ -362,7 +367,9 @@ ROUTE_METHOD_MAP = {
|
|||
consts.RESOURCE_TYPE_IDENTITY_PROJECTS:
|
||||
['POST', 'PATCH', 'DELETE'],
|
||||
consts.RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS:
|
||||
['PUT', 'DELETE']
|
||||
['PUT', 'DELETE'],
|
||||
consts.RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS:
|
||||
['DELETE']
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,10 +13,18 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from dcorch.common import consts
|
||||
from oslo_log import log as logging
|
||||
import base64
|
||||
from cryptography import fernet
|
||||
import msgpack
|
||||
import six
|
||||
from six.moves.urllib.parse import urlparse
|
||||
|
||||
from keystoneauth1 import exceptions as keystone_exceptions
|
||||
from oslo_log import log as logging
|
||||
|
||||
from dcorch.common import consts
|
||||
from dcorch.drivers.openstack import sdk_platform as sdk
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -108,3 +116,84 @@ def set_request_forward_environ(req, remote_host, remote_port):
|
|||
if ('REMOTE_ADDR' in req.environ and 'HTTP_X_FORWARDED_FOR' not in
|
||||
req.environ):
|
||||
req.environ['HTTP_X_FORWARDED_FOR'] = req.environ['REMOTE_ADDR']
|
||||
|
||||
|
||||
def _get_fernet_keys():
|
||||
"""Get fernet keys from sysinv."""
|
||||
os_client = sdk.OpenStackDriver(consts.CLOUD_0)
|
||||
try:
|
||||
key_list = os_client.sysinv_client.get_fernet_keys()
|
||||
return [str(getattr(key, 'key')) for key in key_list]
|
||||
except (keystone_exceptions.connection.ConnectTimeout,
|
||||
keystone_exceptions.ConnectFailure) as e:
|
||||
LOG.info("get_fernet_keys: cloud {} is not reachable [{}]"
|
||||
.format(consts.CLOUD_0, str(e)))
|
||||
os_client.delete_region_clients(consts.CLOUD_0)
|
||||
return None
|
||||
except (AttributeError, TypeError) as e:
|
||||
LOG.info("get_fernet_keys error {}".format(e))
|
||||
os_client.delete_region_clients(consts.CLOUD_0, clear_token=True)
|
||||
return None
|
||||
except Exception as e:
|
||||
LOG.exception(e)
|
||||
return None
|
||||
|
||||
|
||||
def _restore_padding(token):
|
||||
"""Restore padding based on token size.
|
||||
|
||||
:param token: token to restore padding on
|
||||
:returns: token with correct padding
|
||||
"""
|
||||
|
||||
# Re-inflate the padding
|
||||
mod_returned = len(token) % 4
|
||||
if mod_returned:
|
||||
missing_padding = 4 - mod_returned
|
||||
token += b'=' * missing_padding
|
||||
return token
|
||||
|
||||
|
||||
def _unpack_token(fernet_token, fernet_keys):
|
||||
"""Attempt to unpack a token using the supplied Fernet keys.
|
||||
|
||||
:param fernet_token: token to unpack
|
||||
:type fernet_token: string
|
||||
:param fernet_keys: a list consisting of keys in the repository
|
||||
:type fernet_keys: list
|
||||
:returns: the token payload
|
||||
"""
|
||||
|
||||
# create a list of fernet instances
|
||||
fernet_instances = [fernet.Fernet(key) for key in fernet_keys]
|
||||
# create a encryption/decryption object from the fernet keys
|
||||
crypt = fernet.MultiFernet(fernet_instances)
|
||||
|
||||
# attempt to decode the token
|
||||
token = _restore_padding(six.binary_type(fernet_token))
|
||||
serialized_payload = crypt.decrypt(token)
|
||||
payload = msgpack.unpackb(serialized_payload)
|
||||
|
||||
# present token values
|
||||
return payload
|
||||
|
||||
|
||||
def retrieve_token_audit_id(fernet_token):
|
||||
"""Attempt to retrieve the audit id from the fernet token.
|
||||
|
||||
:param fernet_token:
|
||||
:param keys_repository:
|
||||
:return: audit id in base64 encoded (without paddings)
|
||||
"""
|
||||
|
||||
audit_id = None
|
||||
fernet_keys = _get_fernet_keys()
|
||||
LOG.info("fernet_keys: {}".format(fernet_keys))
|
||||
|
||||
if fernet_keys:
|
||||
unpacked_token = _unpack_token(fernet_token, fernet_keys)
|
||||
if unpacked_token:
|
||||
audit_id = unpacked_token[-1][0]
|
||||
audit_id = base64.urlsafe_b64encode(audit_id).rstrip('=')
|
||||
|
||||
return audit_id
|
||||
|
|
|
@ -122,6 +122,8 @@ RESOURCE_TYPE_IDENTITY_USERS_PASSWORD = "users_password"
|
|||
RESOURCE_TYPE_IDENTITY_ROLES = "roles"
|
||||
RESOURCE_TYPE_IDENTITY_PROJECTS = "projects"
|
||||
RESOURCE_TYPE_IDENTITY_PROJECT_ROLE_ASSIGNMENTS = "project_role_assignments"
|
||||
RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS = "revoke_events"
|
||||
RESOURCE_TYPE_IDENTITY_TOKEN_REVOKE_EVENTS_FOR_USER = "revoke_events_for_user"
|
||||
|
||||
KEYPAIR_ID_DELIM = "/"
|
||||
|
||||
|
@ -151,6 +153,10 @@ ENDPOINT_QUOTA_MAPPING = {
|
|||
KS_ENDPOINT_INTERNAL = "internal"
|
||||
KS_ENDPOINT_DEFAULT = KS_ENDPOINT_INTERNAL
|
||||
|
||||
# DB sync agent endpoint
|
||||
DBS_ENDPOINT_INTERNAL = "internal"
|
||||
DBS_ENDPOINT_DEFAULT = DBS_ENDPOINT_INTERNAL
|
||||
|
||||
# Do we need separate patch/put operations or could we just use
|
||||
# create/update/delete and have the sync code know which HTTP
|
||||
# operation to use?
|
||||
|
|
|
@ -170,6 +170,10 @@ class SubcloudNotFound(NotFound):
|
|||
message = _("Subcloud %(region_name)s not found")
|
||||
|
||||
|
||||
class ThreadNotFound(NotFound):
|
||||
message = _("Thread %(thread_name)s of %(region_name)s not found")
|
||||
|
||||
|
||||
class OrchJobNotFound(NotFound):
|
||||
message = _("OrchJob %(orch_job)s not found")
|
||||
|
||||
|
|
|
@ -74,6 +74,10 @@ class SysinvClient(base.DriverBase):
|
|||
token = session.get_token()
|
||||
client = cgts_client.Client(
|
||||
api_version,
|
||||
username=session.auth._username,
|
||||
password=session.auth._password,
|
||||
tenant_name=session.auth._project_name,
|
||||
auth_url=session.auth.auth_url,
|
||||
endpoint=endpoint,
|
||||
token=token)
|
||||
except exceptions.ServiceUnavailable:
|
||||
|
@ -710,7 +714,7 @@ class SysinvClient(base.DriverBase):
|
|||
|
||||
return iuser
|
||||
|
||||
def create_fernet_repo(self, key_list):
|
||||
def post_fernet_repo(self, key_list=None):
|
||||
"""Add the fernet keys for this region
|
||||
|
||||
:param: key list payload
|
||||
|
@ -721,26 +725,26 @@ class SysinvClient(base.DriverBase):
|
|||
# [{"id": 0, "key": "GgDAOfmyr19u0hXdm5r_zMgaMLjglVFpp5qn_N4GBJQ="},
|
||||
# {"id": 1, "key": "7WfL_z54p67gWAkOmQhLA9P0ZygsbbJcKgff0uh28O8="},
|
||||
# {"id": 2, "key": ""5gsUQeOZ2FzZP58DN32u8pRKRgAludrjmrZFJSOHOw0="}]
|
||||
LOG.info("create_fernet_repo driver region={} "
|
||||
LOG.info("post_fernet_repo driver region={} "
|
||||
"fernet_repo_list={}".format(self.region_name, key_list))
|
||||
try:
|
||||
self.client.fernet.create(key_list)
|
||||
except Exception as e:
|
||||
LOG.error("create_fernet_repo exception={}".format(e))
|
||||
LOG.error("post_fernet_repo exception={}".format(e))
|
||||
raise exceptions.SyncRequestFailedRetry()
|
||||
|
||||
def update_fernet_repo(self, key_list):
|
||||
def put_fernet_repo(self, key_list):
|
||||
"""Update the fernet keys for this region
|
||||
|
||||
:param: key list payload
|
||||
:return: Nothing
|
||||
"""
|
||||
LOG.info("update_fernet_repo driver region={} "
|
||||
LOG.info("put_fernet_repo driver region={} "
|
||||
"fernet_repo_list={}".format(self.region_name, key_list))
|
||||
try:
|
||||
self.client.fernet.put(key_list)
|
||||
except Exception as e:
|
||||
LOG.error("update_fernet_repo exception={}".format(e))
|
||||
LOG.error("put_fernet_repo exception={}".format(e))
|
||||
raise exceptions.SyncRequestFailedRetry()
|
||||
|
||||
def get_fernet_keys(self):
|
||||
|
|
|
@ -27,7 +27,6 @@ from dcorch.common.i18n import _
|
|||
from dcorch.common import manager
|
||||
from dcorch.common import utils
|
||||
from dcorch.drivers.openstack import sdk_platform as sdk
|
||||
from dcorch.objects import subcloud as subcloud_obj
|
||||
|
||||
|
||||
FERNET_REPO_MASTER_ID = "keys"
|
||||
|
@ -117,9 +116,26 @@ class FernetKeyManager(manager.Manager):
|
|||
self._schedule_work(consts.OPERATION_TYPE_PUT)
|
||||
|
||||
def distribute_keys(self, ctxt, subcloud_name):
|
||||
subclouds = subcloud_obj.SubcloudList.get_all(ctxt)
|
||||
for sc in subclouds:
|
||||
if sc.region_name == subcloud_name:
|
||||
subcloud = sc
|
||||
self._schedule_work(consts.OPERATION_TYPE_CREATE, subcloud)
|
||||
break
|
||||
keys = self._get_master_keys()
|
||||
if not keys:
|
||||
LOG.info(_("No fernet keys returned from %s") % consts.CLOUD_0)
|
||||
return
|
||||
resource_info = FernetKeyManager.to_resource_info(keys)
|
||||
key_list = FernetKeyManager.from_resource_info(resource_info)
|
||||
self.update_fernet_repo(subcloud_name, key_list)
|
||||
|
||||
def reset_keys(self, subcloud_name):
|
||||
self.update_fernet_repo(subcloud_name)
|
||||
|
||||
@staticmethod
|
||||
def update_fernet_repo(subcloud_name, key_list=None):
|
||||
try:
|
||||
os_client = sdk.OpenStackDriver(subcloud_name)
|
||||
os_client.sysinv_client.post_fernet_repo(key_list)
|
||||
except (exceptions.ConnectionRefused, exceptions.NotAuthorized,
|
||||
exceptions.TimeOut):
|
||||
LOG.info(_("Update the fernet repo on %s timeout") %
|
||||
subcloud_name)
|
||||
except Exception as e:
|
||||
error_msg = "subcloud: {}, {}".format(subcloud_name, e.message)
|
||||
LOG.info(_("Fail to update fernet repo %s") % error_msg)
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from dcorch.common import exceptions
|
||||
|
@ -85,6 +84,14 @@ class GenericSyncManager(object):
|
|||
except KeyError:
|
||||
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
|
||||
|
||||
def initial_sync(self, context, subcloud_name):
|
||||
try:
|
||||
subcloud_engine = self.subcloud_engines[subcloud_name]
|
||||
LOG.info('Initial sync subcloud %(sc)s' % {'sc': subcloud_name})
|
||||
subcloud_engine.initial_sync()
|
||||
except KeyError:
|
||||
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
|
||||
|
||||
def run_sync_audit(self):
|
||||
for subcloud_engine in self.subcloud_engines.values():
|
||||
subcloud_engine.run_sync_audit()
|
||||
|
|
|
@ -193,11 +193,26 @@ class EngineService(service.Service):
|
|||
# keep equivalent functionality for now
|
||||
if (management_state == dcm_consts.MANAGEMENT_MANAGED) and \
|
||||
(availability_status == dcm_consts.AVAILABILITY_ONLINE):
|
||||
self.fkm.distribute_keys(ctxt, subcloud_name)
|
||||
self.aam.enable_snmp(ctxt, subcloud_name)
|
||||
self.gsm.enable_subcloud(ctxt, subcloud_name)
|
||||
# Initial identity sync. It's synchronous so that identity
|
||||
# get synced before fernet token keys are synced. This is
|
||||
# necessary since we want to revoke all existing tokens on
|
||||
# this subcloud after its services user IDs and project
|
||||
# IDs are changed. Otherwise subcloud services will fail
|
||||
# authentication since they keep on using their existing tokens
|
||||
# issued before these IDs change, until these tokens expires.
|
||||
try:
|
||||
self.gsm.initial_sync(ctxt, subcloud_name)
|
||||
self.fkm.distribute_keys(ctxt, subcloud_name)
|
||||
self.aam.enable_snmp(ctxt, subcloud_name)
|
||||
self.gsm.enable_subcloud(ctxt, subcloud_name)
|
||||
except Exception as ex:
|
||||
LOG.warning('Update subcloud state failed for %s: %s',
|
||||
subcloud_name, six.text_type(ex))
|
||||
raise
|
||||
else:
|
||||
self.gsm.disable_subcloud(ctxt, subcloud_name)
|
||||
if (management_state == dcm_consts.MANAGEMENT_UNMANAGED):
|
||||
self.fkm.reset_keys(subcloud_name)
|
||||
|
||||
@request_context
|
||||
# todo: add authentication since ctxt not actually needed later
|
||||
|
|
|
@ -106,6 +106,11 @@ class SubCloudEngine(object):
|
|||
self.shutdown()
|
||||
self.subcloud.delete()
|
||||
|
||||
def initial_sync(self):
|
||||
# initial synchronization of the subcloud
|
||||
for thread in self.sync_threads:
|
||||
thread.initial_sync()
|
||||
|
||||
def run_sync_audit(self):
|
||||
# run periodic sync audit on all threads in this subcloud
|
||||
if self.is_enabled():
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -799,7 +799,7 @@ class SysinvSyncThread(SyncThread):
|
|||
|
||||
s_os_client = sdk.OpenStackDriver(self.region_name)
|
||||
try:
|
||||
s_os_client.sysinv_client.create_fernet_repo(
|
||||
s_os_client.sysinv_client.post_fernet_repo(
|
||||
FernetKeyManager.from_resource_info(resource_info))
|
||||
# Ensure subcloud resource is persisted to the DB for later
|
||||
subcloud_rsrc_id = self.persist_db_subcloud_resource(
|
||||
|
@ -831,7 +831,7 @@ class SysinvSyncThread(SyncThread):
|
|||
|
||||
s_os_client = sdk.OpenStackDriver(self.region_name)
|
||||
try:
|
||||
s_os_client.sysinv_client.update_fernet_repo(
|
||||
s_os_client.sysinv_client.put_fernet_repo(
|
||||
FernetKeyManager.from_resource_info(resource_info))
|
||||
# Ensure subcloud resource is persisted to the DB for later
|
||||
subcloud_rsrc_id = self.persist_db_subcloud_resource(
|
||||
|
@ -1309,7 +1309,7 @@ class SysinvSyncThread(SyncThread):
|
|||
resource_type),
|
||||
extra=self.log_extra)
|
||||
|
||||
def audit_discrepancy(self, resource_type, m_resource, sc_resources):
|
||||
def audit_discrepancy(self, resource_type, m_resource):
|
||||
# Return true to try the audit_action
|
||||
if resource_type in self.SYSINV_ADD_DELETE_RESOURCES:
|
||||
# It could be that the details are different
|
||||
|
@ -1329,7 +1329,7 @@ class SysinvSyncThread(SyncThread):
|
|||
extra=self.log_extra)
|
||||
return False
|
||||
|
||||
def audit_action(self, resource_type, finding, resource):
|
||||
def audit_action(self, resource_type, finding, resource, sc_source=None):
|
||||
if resource_type in self.SYSINV_MODIFY_RESOURCES:
|
||||
LOG.info("audit_action: {}/{}"
|
||||
.format(finding, resource_type),
|
||||
|
|
|
@ -15,12 +15,10 @@
|
|||
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1 import session
|
||||
from keystoneclient import client as keystoneclient
|
||||
|
||||
from dcdbsync.dbsyncclient import client as dbsyncclient
|
||||
from dcmanager.common import consts as dcmanager_consts
|
||||
from dcmanager.rpc import client as dcmanager_rpc_client
|
||||
from dcorch.common import consts
|
||||
|
@ -30,7 +28,11 @@ from dcorch.common import utils
|
|||
from dcorch.objects import orchrequest
|
||||
from dcorch.objects import resource
|
||||
from dcorch.objects import subcloud_resource
|
||||
from oslo_config import cfg
|
||||
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1 import session
|
||||
from keystoneclient import client as keystoneclient
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
@ -81,6 +83,7 @@ class SyncThread(object):
|
|||
self.sc_admin_session = None
|
||||
self.admin_session = None
|
||||
self.ks_client = None
|
||||
self.dbs_client = None
|
||||
|
||||
def start(self):
|
||||
if self.status == STATUS_NEW:
|
||||
|
@ -124,9 +127,15 @@ class SyncThread(object):
|
|||
user_domain_name=cfg.CONF.cache.admin_user_domain_name)
|
||||
self.admin_session = session.Session(
|
||||
auth=auth, timeout=60, additional_headers=consts.USER_HEADER)
|
||||
# keystone client
|
||||
self.ks_client = keystoneclient.Client(
|
||||
session=self.admin_session,
|
||||
region_name=consts.VIRTUAL_MASTER_CLOUD)
|
||||
# dcdbsync client
|
||||
self.dbs_client = dbsyncclient.Client(
|
||||
endpoint_type=consts.DBS_ENDPOINT_INTERNAL,
|
||||
session=self.admin_session,
|
||||
region_name=consts.VIRTUAL_MASTER_CLOUD)
|
||||
|
||||
def initialize_sc_clients(self):
|
||||
# base implementation of initializing the subcloud specific
|
||||
|
@ -164,6 +173,10 @@ class SyncThread(object):
|
|||
auth=sc_auth, timeout=60,
|
||||
additional_headers=consts.USER_HEADER)
|
||||
|
||||
def initial_sync(self):
|
||||
# Return True to indicate initial sync success
|
||||
return True
|
||||
|
||||
def enable(self):
|
||||
# Called when DC manager thinks this subcloud is good to go.
|
||||
self.initialize()
|
||||
|
@ -530,6 +543,7 @@ class SyncThread(object):
|
|||
extra=self.log_extra)
|
||||
# Subcloud resource is present in DB, but the check
|
||||
# for same_resource() was negative. Either the resource
|
||||
|
||||
# disappeared from subcloud or the resource details
|
||||
# are different from that of master cloud. Let the
|
||||
# resource implementation decide on the audit action.
|
||||
|
@ -556,6 +570,7 @@ class SyncThread(object):
|
|||
# Resource is missing from subcloud, take action
|
||||
num_of_audit_jobs += self.audit_action(
|
||||
resource_type, AUDIT_RESOURCE_MISSING, m_r)
|
||||
|
||||
# As the subcloud resource is missing, invoke
|
||||
# the hook for dependants with no subcloud resource.
|
||||
# Resource implementation should handle this.
|
||||
|
@ -667,6 +682,9 @@ class SyncThread(object):
|
|||
def same_resource(self, resource_type, m_resource, sc_resource):
|
||||
return True
|
||||
|
||||
def has_same_ids(self, resource_type, m_resource, sc_resource):
|
||||
return False
|
||||
|
||||
def map_subcloud_resource(self, resource_type, m_r, m_rsrc_db,
|
||||
sc_resources):
|
||||
# Child classes can override this function to map an existing subcloud
|
||||
|
|
Loading…
Reference in New Issue