Merge "dcorch for containerized openstack services - service"
This commit is contained in:
commit
a550473d9e
|
@ -215,6 +215,10 @@ pecan_group = cfg.OptGroup(name='pecan',
|
|||
cache_opt_group = cfg.OptGroup(name='cache',
|
||||
title='OpenStack Credentials')
|
||||
|
||||
openstack_cache_opt_group = cfg.OptGroup(name='openstack_cache',
|
||||
title='Containerized OpenStack'
|
||||
' Credentials')
|
||||
|
||||
snmp_opt_group = cfg.OptGroup(name='snmp',
|
||||
title='SNMP Options')
|
||||
|
||||
|
@ -227,6 +231,7 @@ def list_opts():
|
|||
yield default_quota_group.name, neutron_quotas
|
||||
yield default_quota_group.name, cinder_quotas
|
||||
yield cache_opt_group.name, cache_opts
|
||||
yield openstack_cache_opt_group.name, cache_opts
|
||||
yield scheduler_opt_group.name, scheduler_opts
|
||||
yield pecan_group.name, pecan_opts
|
||||
yield snmp_opt_group.name, snmp_server_opts
|
||||
|
|
|
@ -137,10 +137,14 @@ ENDPOINT_TYPE_PATCHING = "patching"
|
|||
ENDPOINT_TYPE_IDENTITY = "identity"
|
||||
ENDPOINT_TYPE_FM = "faultmanagement"
|
||||
ENDPOINT_TYPE_NFV = "nfv"
|
||||
ENDPOINT_TYPE_IDENTITY_OS = "identity_openstack"
|
||||
|
||||
# platform endpoint types
|
||||
ENDPOINT_TYPES_LIST = [ENDPOINT_TYPE_PLATFORM,
|
||||
ENDPOINT_TYPE_PATCHING,
|
||||
ENDPOINT_TYPE_IDENTITY]
|
||||
# openstack endpoint types
|
||||
ENDPOINT_TYPES_LIST_OS = [ENDPOINT_TYPE_IDENTITY_OS]
|
||||
|
||||
ENDPOINT_QUOTA_MAPPING = {
|
||||
ENDPOINT_TYPE_COMPUTE: NOVA_QUOTA_FIELDS,
|
||||
|
@ -148,6 +152,7 @@ ENDPOINT_QUOTA_MAPPING = {
|
|||
ENDPOINT_TYPE_VOLUME: CINDER_QUOTA_FIELDS,
|
||||
}
|
||||
|
||||
KS_ENDPOINT_ADMIN = "admin"
|
||||
KS_ENDPOINT_INTERNAL = "internal"
|
||||
KS_ENDPOINT_DEFAULT = KS_ENDPOINT_INTERNAL
|
||||
|
||||
|
|
|
@ -150,6 +150,11 @@ class EndpointNotReachable(OrchestratorException):
|
|||
message = _("The specified resource endpoint is not reachable")
|
||||
|
||||
|
||||
class EndpointNotSupported(OrchestratorException):
|
||||
message = _("The specified resource endpoint %(endpoint)s is not"
|
||||
" supported")
|
||||
|
||||
|
||||
class SyncRequestFailed(OrchestratorException):
|
||||
message = _("The sync operation failed")
|
||||
|
||||
|
|
|
@ -75,6 +75,31 @@ class GenericSyncManager(object):
|
|||
except KeyError:
|
||||
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
|
||||
|
||||
def add_subcloud_sync_endpoint_type(self, context, subcloud_name,
|
||||
endpoint_type_list=None):
|
||||
try:
|
||||
subcloud_engine = self.subcloud_engines[subcloud_name]
|
||||
except KeyError:
|
||||
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
|
||||
|
||||
LOG.info('adding sync endpoint type for subcloud %(sc)s' %
|
||||
{'sc': subcloud_name})
|
||||
try:
|
||||
subcloud_engine.add_sync_endpoint_type(endpoint_type_list)
|
||||
except Exception:
|
||||
subcloud_engine.remove_sync_endpoint_type(endpoint_type_list)
|
||||
raise
|
||||
|
||||
def remove_subcloud_sync_endpoint_type(self, context, subcloud_name,
|
||||
endpoint_type_list=None):
|
||||
try:
|
||||
subcloud_engine = self.subcloud_engines[subcloud_name]
|
||||
LOG.info('removing sync endpoint type for subcloud %(sc)s' %
|
||||
{'sc': subcloud_name})
|
||||
subcloud_engine.remove_sync_endpoint_type(endpoint_type_list)
|
||||
except KeyError:
|
||||
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
|
||||
|
||||
def update_subcloud_version(self, context, subcloud_name, sw_version):
|
||||
try:
|
||||
subcloud_engine = self.subcloud_engines[subcloud_name]
|
||||
|
|
|
@ -176,6 +176,7 @@ class EngineService(service.Service):
|
|||
|
||||
@request_context
|
||||
def add_subcloud(self, ctxt, subcloud_name, sw_version):
|
||||
|
||||
self.gsm.add_subcloud(ctxt, subcloud_name, sw_version)
|
||||
|
||||
@request_context
|
||||
|
@ -212,6 +213,30 @@ class EngineService(service.Service):
|
|||
if (management_state == dcm_consts.MANAGEMENT_UNMANAGED):
|
||||
self.fkm.reset_keys(subcloud_name)
|
||||
|
||||
@request_context
|
||||
def add_subcloud_sync_endpoint_type(self, ctxt, subcloud_name,
|
||||
endpoint_type_list=None):
|
||||
try:
|
||||
self.gsm.add_subcloud_sync_endpoint_type(
|
||||
ctxt, subcloud_name,
|
||||
endpoint_type_list=endpoint_type_list)
|
||||
except Exception as ex:
|
||||
LOG.warning('Add subcloud endpoint type failed for %s: %s',
|
||||
subcloud_name, six.text_type(ex))
|
||||
raise
|
||||
|
||||
@request_context
|
||||
def remove_subcloud_sync_endpoint_type(self, ctxt, subcloud_name,
|
||||
endpoint_type_list=None):
|
||||
try:
|
||||
self.gsm.remove_subcloud_sync_endpoint_type(
|
||||
ctxt, subcloud_name,
|
||||
endpoint_type_list=endpoint_type_list)
|
||||
except Exception as ex:
|
||||
LOG.warning('Remove subcloud endpoint type failed for %s: %s',
|
||||
subcloud_name, six.text_type(ex))
|
||||
raise
|
||||
|
||||
@request_context
|
||||
# todo: add authentication since ctxt not actually needed later
|
||||
def update_subcloud_version(self, ctxt, subcloud_name, sw_version):
|
||||
|
|
|
@ -16,12 +16,21 @@
|
|||
import threading
|
||||
|
||||
from dcmanager.common import consts as dcm_consts
|
||||
from dcorch.engine.sync_thread import SyncThread
|
||||
from dcorch.common import consts as dco_consts
|
||||
from dcorch.engine.sync_services.identity import IdentitySyncThread
|
||||
from dcorch.engine.sync_services.sysinv import SysinvSyncThread
|
||||
from dcorch.objects.subcloud import Subcloud
|
||||
from oslo_log import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# sync thread endpoint type and subclass mappings
|
||||
syncthread_subclass_map = {
|
||||
dco_consts.ENDPOINT_TYPE_PLATFORM: SysinvSyncThread,
|
||||
dco_consts.ENDPOINT_TYPE_IDENTITY: IdentitySyncThread,
|
||||
dco_consts.ENDPOINT_TYPE_IDENTITY_OS: IdentitySyncThread
|
||||
}
|
||||
|
||||
|
||||
class SubCloudEngine(object):
|
||||
def __init__(self, context=None, name=None,
|
||||
|
@ -37,10 +46,18 @@ class SubCloudEngine(object):
|
|||
if subcloud is not None:
|
||||
self.subcloud = subcloud
|
||||
else:
|
||||
capabilities = {}
|
||||
endpoint_type_list = dco_consts.ENDPOINT_TYPES_LIST[:]
|
||||
# patching is handled by dcmanager
|
||||
endpoint_type_list.remove(dco_consts.ENDPOINT_TYPE_PATCHING)
|
||||
capabilities.update({'endpoint_types': endpoint_type_list})
|
||||
self.subcloud = Subcloud(
|
||||
context, region_name=name, software_version=version)
|
||||
context, region_name=name, software_version=version,
|
||||
capabilities=capabilities)
|
||||
self.subcloud.create()
|
||||
self.lock = threading.Lock() # protects the status
|
||||
self.capabilities_lock = threading.Lock() # protects capabilities
|
||||
self.sync_threads_lock = threading.Lock() # protects sync_threads
|
||||
self.sync_threads = [] # the individual SyncThread objects
|
||||
|
||||
def set_version(self, version):
|
||||
|
@ -50,10 +67,15 @@ class SubCloudEngine(object):
|
|||
|
||||
def spawn_sync_threads(self):
|
||||
# spawn the threads that actually handle syncing this subcloud
|
||||
for subclass in SyncThread.__subclasses__():
|
||||
thread = subclass(self)
|
||||
self.sync_threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
capabilities = self.subcloud.capabilities
|
||||
# start sync threads
|
||||
endpoint_type_list = capabilities.get('endpoint_types', None)
|
||||
if endpoint_type_list:
|
||||
for endpoint_type in endpoint_type_list:
|
||||
thread = syncthread_subclass_map[endpoint_type](self)
|
||||
self.sync_threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
def is_managed(self):
|
||||
# is this subcloud managed
|
||||
|
@ -69,6 +91,10 @@ class SubCloudEngine(object):
|
|||
self.lock.release()
|
||||
return status == dcm_consts.AVAILABILITY_ONLINE
|
||||
|
||||
def is_ready(self):
|
||||
# is this subcloud ready for synchronization
|
||||
return self.is_managed() and self.is_enabled()
|
||||
|
||||
def enable(self):
|
||||
# set subcloud availability to online
|
||||
self.lock.acquire()
|
||||
|
@ -116,3 +142,62 @@ class SubCloudEngine(object):
|
|||
if self.is_enabled():
|
||||
for thread in self.sync_threads:
|
||||
thread.run_sync_audit()
|
||||
|
||||
def add_sync_endpoint_type(self, endpoint_type_list):
|
||||
# add the endpoint types into subcloud capabilities
|
||||
with self.capabilities_lock:
|
||||
capabilities = self.subcloud.capabilities
|
||||
c_endpoint_type_list = capabilities.get('endpoint_types', [])
|
||||
|
||||
if endpoint_type_list:
|
||||
for endpoint_type in endpoint_type_list:
|
||||
if endpoint_type not in c_endpoint_type_list:
|
||||
c_endpoint_type_list.append(endpoint_type)
|
||||
if capabilities.get('endpoint_types') is None:
|
||||
# assign back if 'endpoint_types' is not in capabilities
|
||||
capabilities['endpoint_types'] = c_endpoint_type_list
|
||||
self.subcloud.save()
|
||||
|
||||
# Start threads for the endpoint types
|
||||
if endpoint_type_list:
|
||||
with self.sync_threads_lock:
|
||||
for endpoint_type in endpoint_type_list:
|
||||
# skip creation if a thread of this endpoint type already
|
||||
# exists
|
||||
endpoint_thread_exist = False
|
||||
for exist_thread in self.sync_threads:
|
||||
if endpoint_type == exist_thread.endpoint_type:
|
||||
endpoint_thread_exist = True
|
||||
break
|
||||
if endpoint_thread_exist:
|
||||
continue
|
||||
|
||||
thread = syncthread_subclass_map[endpoint_type](
|
||||
self, endpoint_type=endpoint_type)
|
||||
|
||||
self.sync_threads.append(thread)
|
||||
thread.start()
|
||||
if self.is_ready():
|
||||
thread.enable()
|
||||
thread.initial_sync()
|
||||
|
||||
def remove_sync_endpoint_type(self, endpoint_type_list):
|
||||
# Stop threads for endpoint types to be removed
|
||||
if endpoint_type_list:
|
||||
with self.sync_threads_lock:
|
||||
for endpoint_type in endpoint_type_list:
|
||||
for thread in self.sync_threads:
|
||||
if thread.endpoint_type == endpoint_type:
|
||||
self.sync_threads.remove(thread)
|
||||
thread.shutdown()
|
||||
|
||||
# remove the endpoint types from subcloud capabilities
|
||||
with self.capabilities_lock:
|
||||
capabilities = self.subcloud.capabilities
|
||||
c_endpoint_type_list = capabilities.get('endpoint_types', [])
|
||||
|
||||
if endpoint_type_list and c_endpoint_type_list:
|
||||
for endpoint_type in endpoint_type_list:
|
||||
if endpoint_type in c_endpoint_type_list:
|
||||
c_endpoint_type_list.remove(endpoint_type)
|
||||
self.subcloud.save()
|
||||
|
|
|
@ -37,9 +37,11 @@ LOG = logging.getLogger(__name__)
|
|||
class IdentitySyncThread(SyncThread):
|
||||
"""Manages tasks related to resource management for keystone."""
|
||||
|
||||
def __init__(self, subcloud_engine):
|
||||
super(IdentitySyncThread, self).__init__(subcloud_engine)
|
||||
self.endpoint_type = consts.ENDPOINT_TYPE_IDENTITY
|
||||
def __init__(self, subcloud_engine, endpoint_type=None):
|
||||
super(IdentitySyncThread, self).__init__(subcloud_engine,
|
||||
endpoint_type=endpoint_type)
|
||||
if not self.endpoint_type:
|
||||
self.endpoint_type = consts.ENDPOINT_TYPE_IDENTITY
|
||||
self.sync_handler_map = {
|
||||
consts.RESOURCE_TYPE_IDENTITY_USERS:
|
||||
self.sync_identity_resource,
|
||||
|
@ -92,7 +94,7 @@ class IdentitySyncThread(SyncThread):
|
|||
if (not self.sc_ks_client and self.sc_admin_session):
|
||||
self.sc_ks_client = keystoneclient.Client(
|
||||
session=self.sc_admin_session,
|
||||
endpoint_type=consts.KS_ENDPOINT_INTERNAL,
|
||||
endpoint_type=consts.KS_ENDPOINT_ADMIN,
|
||||
region_name=self.subcloud_engine.subcloud.region_name)
|
||||
# create a dbsync client for the subcloud
|
||||
if (not self.sc_dbs_client and self.sc_admin_session):
|
||||
|
@ -845,7 +847,7 @@ class IdentitySyncThread(SyncThread):
|
|||
raise exceptions.SyncRequestFailed
|
||||
|
||||
sc_user = None
|
||||
sc_user_list = self.sc_ks_client.users.list()
|
||||
sc_user_list = self._get_all_users(self.sc_ks_client)
|
||||
for user in sc_user_list:
|
||||
if user.id == user_id:
|
||||
sc_user = user
|
||||
|
@ -1111,6 +1113,14 @@ class IdentitySyncThread(SyncThread):
|
|||
extra=self.log_extra)
|
||||
return None
|
||||
|
||||
def _get_all_users(self, client):
|
||||
domains = client.domains.list()
|
||||
users = []
|
||||
for domain in domains:
|
||||
domain_users = client.users.list(domain=domain)
|
||||
users = users + domain_users
|
||||
return users
|
||||
|
||||
def _get_users_resource(self, client):
|
||||
try:
|
||||
services = []
|
||||
|
@ -1132,7 +1142,7 @@ class IdentitySyncThread(SyncThread):
|
|||
filtered_users.append(user)
|
||||
# get users from keystone API
|
||||
else:
|
||||
users = client.users.list()
|
||||
users = self._get_all_users(client)
|
||||
for user in users:
|
||||
user_name = user.name
|
||||
if all(user_name != service.name for service in services)\
|
||||
|
@ -1333,12 +1343,7 @@ class IdentitySyncThread(SyncThread):
|
|||
same_local_user = (m.local_user.domain_id ==
|
||||
sc.local_user.domain_id and
|
||||
m.local_user.name == sc.local_user.name and
|
||||
# Foreign key to user.id
|
||||
m.local_user.user_id == sc.local_user.user_id and
|
||||
m.local_user.failed_auth_count ==
|
||||
sc.local_user.failed_auth_count and
|
||||
m.local_user.failed_auth_at ==
|
||||
sc.local_user.failed_auth_at)
|
||||
m.local_user.user_id == sc.local_user.user_id)
|
||||
if not same_local_user:
|
||||
return False
|
||||
|
||||
|
@ -1653,3 +1658,17 @@ class IdentitySyncThread(SyncThread):
|
|||
sc_r))
|
||||
return True
|
||||
return False
|
||||
|
||||
# check if the subcloud resource (from dcorch subcloud_resource table)
|
||||
# exists in subcloud resources.
|
||||
def resource_exists_in_subcloud(self, subcloud_rsrc, sc_resources):
|
||||
exist = False
|
||||
for sc_r in sc_resources:
|
||||
if subcloud_rsrc.subcloud_resource_id == sc_r.id:
|
||||
LOG.debug("Resource {} exists in subcloud {}"
|
||||
.format(subcloud_rsrc.subcloud_resource_id,
|
||||
self.subcloud_engine.subcloud.region_name),
|
||||
extra=self.log_extra)
|
||||
exist = True
|
||||
break
|
||||
return exist
|
||||
|
|
|
@ -60,9 +60,9 @@ class SyncThread(object):
|
|||
|
||||
MAX_RETRY = 2
|
||||
|
||||
def __init__(self, subcloud_engine):
|
||||
def __init__(self, subcloud_engine, endpoint_type=None):
|
||||
super(SyncThread, self).__init__()
|
||||
self.endpoint_type = None # endpoint type in keystone
|
||||
self.endpoint_type = endpoint_type # endpoint type
|
||||
self.subcloud_engine = subcloud_engine # engine that owns this obj
|
||||
self.thread = None # thread running sync()
|
||||
self.audit_thread = None
|
||||
|
@ -118,13 +118,23 @@ class SyncThread(object):
|
|||
# The specific SyncThread subclasses may extend this.
|
||||
loader = loading.get_plugin_loader(
|
||||
cfg.CONF.keystone_authtoken.auth_type)
|
||||
|
||||
config = None
|
||||
if self.endpoint_type in consts.ENDPOINT_TYPES_LIST:
|
||||
config = cfg.CONF.cache
|
||||
elif self.endpoint_type in consts.ENDPOINT_TYPES_LIST_OS:
|
||||
config = cfg.CONF.openstack_cache
|
||||
else:
|
||||
raise exceptions.EndpointNotSupported(
|
||||
endpoint=self.endpoint_type)
|
||||
|
||||
auth = loader.load_from_options(
|
||||
auth_url=cfg.CONF.cache.auth_uri,
|
||||
username=cfg.CONF.cache.admin_username,
|
||||
password=cfg.CONF.cache.admin_password,
|
||||
project_name=cfg.CONF.cache.admin_tenant,
|
||||
project_domain_name=cfg.CONF.cache.admin_project_domain_name,
|
||||
user_domain_name=cfg.CONF.cache.admin_user_domain_name)
|
||||
auth_url=config.auth_uri,
|
||||
username=config.admin_username,
|
||||
password=config.admin_password,
|
||||
project_name=config.admin_tenant,
|
||||
project_domain_name=config.admin_project_domain_name,
|
||||
user_domain_name=config.admin_user_domain_name)
|
||||
self.admin_session = session.Session(
|
||||
auth=auth, timeout=60, additional_headers=consts.USER_HEADER)
|
||||
# keystone client
|
||||
|
@ -148,7 +158,7 @@ class SyncThread(object):
|
|||
name='keystone', type='identity')
|
||||
sc_auth_url = self.ks_client.endpoints.list(
|
||||
service=identity_service[0].id,
|
||||
interface=consts.KS_ENDPOINT_INTERNAL,
|
||||
interface=consts.KS_ENDPOINT_ADMIN,
|
||||
region=self.subcloud_engine.subcloud.region_name)
|
||||
try:
|
||||
LOG.info("Found sc_auth_url: {}".format(sc_auth_url))
|
||||
|
@ -161,13 +171,20 @@ class SyncThread(object):
|
|||
|
||||
loader = loading.get_plugin_loader(
|
||||
cfg.CONF.keystone_authtoken.auth_type)
|
||||
|
||||
config = None
|
||||
if self.endpoint_type in consts.ENDPOINT_TYPES_LIST:
|
||||
config = cfg.CONF.cache
|
||||
elif self.endpoint_type in consts.ENDPOINT_TYPES_LIST_OS:
|
||||
config = cfg.CONF.openstack_cache
|
||||
|
||||
sc_auth = loader.load_from_options(
|
||||
auth_url=sc_auth_url,
|
||||
username=cfg.CONF.cache.admin_username,
|
||||
password=cfg.CONF.cache.admin_password,
|
||||
project_name=cfg.CONF.cache.admin_tenant,
|
||||
project_domain_name=cfg.CONF.cache.admin_project_domain_name,
|
||||
user_domain_name=cfg.CONF.cache.admin_user_domain_name)
|
||||
username=config.admin_username,
|
||||
password=config.admin_password,
|
||||
project_name=config.admin_tenant,
|
||||
project_domain_name=config.admin_project_domain_name,
|
||||
user_domain_name=config.admin_user_domain_name)
|
||||
|
||||
self.sc_admin_session = session.Session(
|
||||
auth=sc_auth, timeout=60,
|
||||
|
@ -616,6 +633,19 @@ class SyncThread(object):
|
|||
.format(subcloud_rsrc.subcloud_resource_id),
|
||||
extra=self.log_extra)
|
||||
continue
|
||||
|
||||
# check if the resource exists in subcloud, no need to
|
||||
# schedule work if it doesn't exist in subcloud.
|
||||
# This is a precautionary action in case the resource
|
||||
# has already be deleted in the subcloud which can happen
|
||||
# for example, user deletes the resource from master right
|
||||
# after an audit (not through api-proxy), then user deletes
|
||||
# that resource manually in the subcloud before the
|
||||
# next audit.
|
||||
if not self.resource_exists_in_subcloud(subcloud_rsrc,
|
||||
sc_resources):
|
||||
continue
|
||||
|
||||
LOG.info("Resource ({}) and subcloud resource ({}) "
|
||||
"not in sync with master cloud"
|
||||
.format(db_resource.master_id,
|
||||
|
@ -749,3 +779,8 @@ class SyncThread(object):
|
|||
|
||||
def get_resource_info(self, resource_type, resource, operation_type=None):
|
||||
return ""
|
||||
|
||||
# check if the subcloud resource (from dcorch subcloud_resource table)
|
||||
# exists in subcloud resources.
|
||||
def resource_exists_in_subcloud(self, subcloud_rsrc, sc_resources):
|
||||
return True
|
||||
|
|
|
@ -34,6 +34,7 @@ class Subcloud(base.OrchestratorObject, base.VersionedObjectDictCompat):
|
|||
'software_version': fields.StringField(),
|
||||
'management_state': fields.StringField(nullable=True),
|
||||
'availability_status': fields.StringField(),
|
||||
'capabilities': fields.DictOfListOfStringsField(),
|
||||
}
|
||||
|
||||
def create(self):
|
||||
|
|
|
@ -98,6 +98,22 @@ class EngineClient(object):
|
|||
management_state=management_state,
|
||||
availability_status=availability_status))
|
||||
|
||||
def add_subcloud_sync_endpoint_type(self, ctxt, subcloud_name,
|
||||
endpoint_type_list):
|
||||
return self.call(
|
||||
ctxt,
|
||||
self.make_msg('add_subcloud_sync_endpoint_type',
|
||||
subcloud_name=subcloud_name,
|
||||
endpoint_type_list=endpoint_type_list))
|
||||
|
||||
def remove_subcloud_sync_endpoint_type(self, ctxt, subcloud_name,
|
||||
endpoint_type_list):
|
||||
return self.call(
|
||||
ctxt,
|
||||
self.make_msg('remove_subcloud_sync_endpoint_type',
|
||||
subcloud_name=subcloud_name,
|
||||
endpoint_type_list=endpoint_type_list))
|
||||
|
||||
def update_subcloud_version(self, ctxt, subcloud_name, sw_version):
|
||||
return self.call(
|
||||
ctxt,
|
||||
|
|
Loading…
Reference in New Issue