Merge "Fix subcloud manage/unmanage issues caused by identity sync"

This commit is contained in:
Zuul 2020-02-18 18:17:56 +00:00 committed by Gerrit Code Review
commit 5929ff1116
20 changed files with 1002 additions and 89 deletions

View File

@ -143,9 +143,14 @@ class PatchAuditManager(manager.Manager):
try:
sc_ks_client = KeystoneClient(subcloud.name)
except (keystone_exceptions.EndpointNotFound, IndexError) as e:
LOG.warn("Identity endpoint for online subcloud %s not found."
" %s" % (subcloud.name, e))
except (keystone_exceptions.EndpointNotFound,
keystone_exceptions.ConnectFailure,
keystone_exceptions.ConnectTimeout,
IndexError):
# Since it takes some time to detect that a subcloud has gone
# offline, these errors are expected from time to time.
LOG.info("Identity endpoint for online subcloud %s not found."
% subcloud.name)
continue
try:

View File

@ -21,6 +21,7 @@
#
from keystoneauth1 import exceptions as keystone_exceptions
from oslo_config import cfg
from oslo_log import log as logging
from fm_api import constants as fm_const
@ -40,8 +41,14 @@ from dcmanager.db import api as db_api
from dcmanager.drivers.openstack.sysinv_v1 import SysinvClient
from dcmanager.manager import scheduler
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
# We will update the state of each subcloud in the dcorch about once per hour.
# Calculate how many iterations that will be.
SUBCLOUD_STATE_UPDATE_ITERATIONS = \
dcorch_consts.SECONDS_IN_HOUR / CONF.scheduler.subcloud_audit_interval
class SubcloudAuditManager(manager.Manager):
"""Manages tasks related to audits."""
@ -60,6 +67,8 @@ class SubcloudAuditManager(manager.Manager):
thread_pool_size=100)
# Track workers created for each subcloud.
self.subcloud_workers = dict()
# Number of audits since last subcloud state update
self.audit_count = 0
def periodic_subcloud_audit(self):
"""Audit availability of subclouds."""
@ -76,6 +85,13 @@ class SubcloudAuditManager(manager.Manager):
# We will be running in our own green thread here.
LOG.info('Triggered subcloud audit.')
self.audit_count += 1
# Determine whether to trigger a state update to each subcloud
if self.audit_count >= SUBCLOUD_STATE_UPDATE_ITERATIONS:
update_subcloud_state = True
else:
update_subcloud_state = False
# Determine whether OpenStack is installed in central cloud
ks_client = KeystoneClient()
@ -106,6 +122,7 @@ class SubcloudAuditManager(manager.Manager):
self.subcloud_workers[subcloud.name] = \
self.thread_group_manager.start(self._audit_subcloud,
subcloud.name,
update_subcloud_state,
openstack_installed)
# Wait for all greenthreads to complete
@ -117,7 +134,8 @@ class SubcloudAuditManager(manager.Manager):
self.subcloud_workers = dict()
LOG.info('All subcloud audits have completed.')
def _audit_subcloud(self, subcloud_name, audit_openstack):
def _audit_subcloud(self, subcloud_name, update_subcloud_state,
audit_openstack):
"""Audit a single subcloud."""
# Retrieve the subcloud
@ -322,6 +340,16 @@ class SubcloudAuditManager(manager.Manager):
'audit_fail_count update: %s' % subcloud_name)
return
elif update_subcloud_state:
# Nothing has changed, but we want to send a state update for this
# subcloud as an audit. Get the most up-to-date data.
subcloud = db_api.subcloud_get_by_name(self.context, subcloud_name)
self.dcorch_rpc_client. \
update_subcloud_states(self.context,
subcloud_name,
subcloud.management_state,
subcloud.availability_status)
if audit_openstack and sysinv_client:
# get a list of installed apps in the subcloud
try:
@ -345,6 +373,8 @@ class SubcloudAuditManager(manager.Manager):
dco_update_func = None
if openstack_installed_current and not openstack_installed:
dcm_update_func = db_api.subcloud_status_create
# TODO(andy.ning): This RPC will block for the duration of the
# initial sync. It needs to be made non-blocking.
dco_update_func = self.dcorch_rpc_client.\
add_subcloud_sync_endpoint_type
elif not openstack_installed_current and openstack_installed:

View File

@ -10,7 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2017 Wind River Systems, Inc.
# Copyright (c) 2017-2020 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
@ -253,9 +253,77 @@ class TestAuditManager(base.DCManagerTestCase):
self.fake_sysinv_client.get_application_results = []
# Audit the subcloud
am._audit_subcloud(subcloud.name, audit_openstack=False)
am._audit_subcloud(subcloud.name, update_subcloud_state=False,
audit_openstack=False)
# Verify the subcloud was set to online
self.fake_dcorch_api.update_subcloud_states.assert_called_with(
mock.ANY, subcloud.name, consts.MANAGEMENT_UNMANAGED,
consts.AVAILABILITY_ONLINE)
# Verify the openstack endpoints were not added
self.fake_dcorch_api.add_subcloud_sync_endpoint_type.\
assert_not_called()
# Verify the subcloud openstack_installed was not updated
updated_subcloud = db_api.subcloud_get_by_name(self.ctx, 'subcloud1')
self.assertEqual(updated_subcloud.openstack_installed, False)
def test_audit_subcloud_online_no_change(self):
subcloud = self.create_subcloud_static(self.ctx, name='subcloud1')
self.assertIsNotNone(subcloud)
mock_sm = mock.Mock()
am = subcloud_audit_manager.SubcloudAuditManager(
subcloud_manager=mock_sm)
# No stx-openstack application
self.fake_sysinv_client.get_application_results = []
# Set the subcloud to online
db_api.subcloud_update(
self.ctx, subcloud.id,
availability_status=consts.AVAILABILITY_ONLINE)
# Audit the subcloud
am._audit_subcloud(subcloud.name, update_subcloud_state=False,
audit_openstack=False)
# Verify the subcloud state was not updated
self.fake_dcorch_api.update_subcloud_states.assert_not_called()
# Verify the openstack endpoints were not added
self.fake_dcorch_api.add_subcloud_sync_endpoint_type.\
assert_not_called()
# Verify the subcloud openstack_installed was not updated
updated_subcloud = db_api.subcloud_get_by_name(self.ctx, 'subcloud1')
self.assertEqual(updated_subcloud.openstack_installed, False)
def test_audit_subcloud_online_no_change_force_update(self):
subcloud = self.create_subcloud_static(self.ctx, name='subcloud1')
self.assertIsNotNone(subcloud)
mock_sm = mock.Mock()
am = subcloud_audit_manager.SubcloudAuditManager(
subcloud_manager=mock_sm)
# No stx-openstack application
self.fake_sysinv_client.get_application_results = []
# Set the subcloud to online
db_api.subcloud_update(
self.ctx, subcloud.id,
availability_status=consts.AVAILABILITY_ONLINE)
# Audit the subcloud and force a state update
am._audit_subcloud(subcloud.name, update_subcloud_state=True,
audit_openstack=False)
# Verify the subcloud state was updated even though no change
self.fake_dcorch_api.update_subcloud_states.assert_called_with(
mock.ANY, 'subcloud1', consts.MANAGEMENT_UNMANAGED,
consts.AVAILABILITY_ONLINE)
@ -288,7 +356,8 @@ class TestAuditManager(base.DCManagerTestCase):
self.fake_sysinv_client.get_service_groups_result[3].state = 'inactive'
# Audit the subcloud
am._audit_subcloud(subcloud.name, audit_openstack=False)
am._audit_subcloud(subcloud.name, update_subcloud_state=False,
audit_openstack=False)
# Verify the subcloud was not set to offline
self.fake_dcorch_api.update_subcloud_states.assert_not_called()
@ -298,7 +367,8 @@ class TestAuditManager(base.DCManagerTestCase):
self.assertEqual(updated_subcloud.audit_fail_count, 1)
# Audit the subcloud again
am._audit_subcloud(subcloud.name, audit_openstack=False)
am._audit_subcloud(subcloud.name, update_subcloud_state=False,
audit_openstack=False)
# Verify the subcloud was set to offline
self.fake_dcorch_api.update_subcloud_states.assert_called_with(
@ -320,7 +390,8 @@ class TestAuditManager(base.DCManagerTestCase):
subcloud_manager=mock_sm)
# Audit the subcloud
am._audit_subcloud(subcloud.name, audit_openstack=True)
am._audit_subcloud(subcloud.name, update_subcloud_state=False,
audit_openstack=True)
# Verify the subcloud was set to online
self.fake_dcorch_api.update_subcloud_states.assert_called_with(

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2017-2019 Wind River Systems, Inc.
# Copyright (c) 2017-2020 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
@ -188,3 +188,10 @@ ALARM_DEGRADED_STATUS = "degraded"
ALARM_CRITICAL_STATUS = "critical"
SECONDS_IN_HOUR = 3600
# Subcloud initial sync state
INITIAL_SYNC_STATE_NONE = "none"
INITIAL_SYNC_STATE_REQUESTED = "requested"
INITIAL_SYNC_STATE_IN_PROGRESS = "in-progress"
INITIAL_SYNC_STATE_COMPLETED = "completed"
INITIAL_SYNC_STATE_FAILED = "failed"

View File

@ -13,6 +13,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
"""
DC Orchestrator base exception handling.
@ -137,6 +140,10 @@ class InvalidParameterValue(Invalid):
message = _("%(err)s")
class SubcloudAlreadyExists(Conflict):
message = _("Subcloud with region_name=%(region_name)s already exists")
class SubcloudResourceAlreadyExists(Conflict):
message = _("Subcloud resource with subcloud_id=%(subcloud_id)s "
"resource_id=%(resource_id)s already exists")

View File

@ -12,6 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
"""
Interface for database access.
@ -139,10 +142,12 @@ def subcloud_get(context, region_id):
def subcloud_get_all(context, region_name=None,
management_state=None,
availability_status=None):
availability_status=None,
initial_sync_state=None):
return IMPL.subcloud_get_all(context, region_name=region_name,
management_state=management_state,
availability_status=availability_status)
availability_status=availability_status,
initial_sync_state=initial_sync_state)
def subcloud_create(context, region_name, values):

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2017-2018 Wind River Systems, Inc.
# Copyright (c) 2017-2020 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
@ -403,14 +403,15 @@ def add_filter_by_many_identities(query, model, values):
:return: tuple (Modified query, filter field name).
"""
if not values:
raise exception.InvalidIdentity(identity=values)
raise exception.Invalid()
value = values[0]
if strutils.is_int_like(value):
return query.filter(getattr(model, 'id').in_(values)), 'id'
elif uuidutils.is_uuid_like(value):
return query.filter(getattr(model, 'uuid').in_(values)), 'uuid'
else:
raise exception.InvalidIdentity(identity=value)
raise exception.InvalidParameterValue(
err="Invalid identity filter value %s" % value)
@require_context
@ -436,7 +437,8 @@ def subcloud_get(context, region_id):
@require_context
def subcloud_get_all(context, region_name=None,
management_state=None,
availability_status=None):
availability_status=None,
initial_sync_state=None):
query = model_query(context, models.Subcloud). \
filter_by(deleted=0)
@ -446,6 +448,8 @@ def subcloud_get_all(context, region_name=None,
query = query.filter_by(management_state=management_state)
if availability_status:
query = query.filter_by(availability_status=availability_status)
if initial_sync_state:
query = query.filter_by(initial_sync_state=initial_sync_state)
return query.all()

View File

@ -0,0 +1,33 @@
# Copyright (c) 2020 Wind River Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy
def upgrade(migrate_engine):
meta = sqlalchemy.MetaData()
meta.bind = migrate_engine
subcloud = sqlalchemy.Table('subcloud', meta, autoload=True)
# Add the initial_sync_state attribute
subcloud.create_column(sqlalchemy.Column('initial_sync_state',
sqlalchemy.String(64),
default="none"))
def downgrade(migrate_engine):
raise NotImplementedError('Database downgrade not supported - '
'would drop all tables')

View File

@ -1,16 +0,0 @@
# Copyright (c) 2018 Wind River Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def upgrade(migrate_engine):
pass

View File

@ -1,4 +1,4 @@
# Copyright (c) 2017-2018 Wind River Systems, Inc.
# Copyright (c) 2017-2020 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
@ -182,6 +182,8 @@ class Subcloud(BASE, OrchestratorBase):
availability_status = Column('availability_status', String(64),
default=dcm_consts.AVAILABILITY_OFFLINE)
capabilities = Column(JSONEncodedDict)
initial_sync_state = Column('initial_sync_state', String(64),
default=consts.INITIAL_SYNC_STATE_NONE)
class SubcloudAlarmSummary(BASE, OrchestratorBase):

View File

@ -12,6 +12,9 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
from oslo_log import log as logging
@ -59,6 +62,38 @@ class GenericSyncManager(object):
for subcloud_engine in self.subcloud_engines.values():
subcloud_engine.wake(endpoint_type)
def subcloud_state_matches(self, subcloud_name,
management_state=None,
availability_status=None,
initial_sync_state=None):
try:
subcloud_engine = self.subcloud_engines[subcloud_name]
return subcloud_engine.state_matches(
management_state=management_state,
availability_status=availability_status,
initial_sync_state=initial_sync_state)
except KeyError:
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
def update_subcloud_state(self, subcloud_name,
management_state=None,
availability_status=None,
initial_sync_state=None):
try:
subcloud_engine = self.subcloud_engines[subcloud_name]
LOG.info('updating state for subcloud %(sc)s - '
'management_state: %(mgmt)s '
'availability_status: %(avail)s '
'initial_sync_state: %(iss)s' %
{'sc': subcloud_name, 'mgmt': management_state,
'avail': availability_status, 'iss': initial_sync_state})
subcloud_engine.update_state(
management_state=management_state,
availability_status=availability_status,
initial_sync_state=initial_sync_state)
except KeyError:
raise exceptions.SubcloudNotFound(region_name=subcloud_name)
def enable_subcloud(self, context, subcloud_name):
try:
subcloud_engine = self.subcloud_engines[subcloud_name]

View File

@ -0,0 +1,192 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
import eventlet
from oslo_log import log as logging
from dcorch.common import consts
from dcorch.common import context
from dcorch.db import api as db_api
from dcorch.engine import scheduler
LOG = logging.getLogger(__name__)
# How often the initial sync thread will wake up
SYNC_INTERVAL = 10
# How long to wait after a failed sync before retrying
SYNC_FAIL_HOLD_OFF = 60
class InitialSyncManager(object):
"""Manages the initial sync for each subcloud."""
def __init__(self, gsm, fkm, aam, *args, **kwargs):
super(InitialSyncManager, self).__init__()
self.gsm = gsm
self.fkm = fkm
self.aam = aam
self.context = context.get_admin_context()
# Keeps track of greenthreads we create to do work.
self.thread_group_manager = scheduler.ThreadGroupManager(
thread_pool_size=50)
# Track greenthreads created for each subcloud.
self.subcloud_threads = dict()
def init_actions(self):
"""Perform actions on initialization"""
# Since we are starting up, any initial syncs that were in progress
# should be considered failed and must be redone.
for subcloud in db_api.subcloud_get_all(
self.context,
initial_sync_state=consts.INITIAL_SYNC_STATE_IN_PROGRESS):
LOG.info('Initial sync for subcloud %s was in progress and will '
'be re-attempted' % subcloud.region_name)
self.gsm.update_subcloud_state(
subcloud.region_name,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
# Since we are starting up, any failed syncs won't be re-attempted
# because the timer will not be running. Reattempt them.
for subcloud in db_api.subcloud_get_all(
self.context,
initial_sync_state=consts.INITIAL_SYNC_STATE_FAILED):
LOG.info('Initial sync for subcloud %s was failed and will '
'be re-attempted' % subcloud.region_name)
self.gsm.update_subcloud_state(
subcloud.region_name,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
def initial_sync_thread(self):
"""Perform initial sync for subclouds as required."""
while True:
# Catch exceptions so the thread does not die.
try:
eventlet.greenthread.sleep(SYNC_INTERVAL)
self._initial_sync_subclouds()
except eventlet.greenlet.GreenletExit:
# We have been told to exit
return
except Exception as e:
LOG.exception(e)
def _initial_sync_subclouds(self):
"""Perform initial sync for subclouds that require it."""
LOG.debug('Starting initial sync loop.')
for subcloud in db_api.subcloud_get_all(
self.context,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED):
# Create a new greenthread for each subcloud to allow the
# initial syncs to be done in parallel. If there are not enough
# greenthreads in the pool, this will block until one becomes
# available.
self.subcloud_threads[subcloud.region_name] = \
self.thread_group_manager.start(
self._initial_sync_subcloud, subcloud.region_name)
# Wait for all greenthreads to complete. This both throttles the
# initial syncs and ensures we don't attempt to do an initial sync
# for a subcloud before a previous initial sync completes.
LOG.debug('Waiting for initial syncs to complete.')
for thread in self.subcloud_threads.values():
thread.wait()
# Clear the list of threads before next audit
self.subcloud_threads = dict()
LOG.debug('All subcloud initial syncs have completed.')
def _initial_sync_subcloud(self, subcloud_name):
"""Perform initial sync for a subcloud.
This runs in a separate greenthread for each subcloud.
"""
LOG.info('Initial sync for subcloud %s' % subcloud_name)
# Verify that the sync state hasn't changed (there can be a delay
# before the greenthread runs).
if not self.gsm.subcloud_state_matches(
subcloud_name,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED):
# Sync is no longer required
LOG.info('Initial sync for subcloud %s no longer required' %
subcloud_name)
return
# Indicate that initial sync has started
self.gsm.update_subcloud_state(
subcloud_name,
initial_sync_state=consts.INITIAL_SYNC_STATE_IN_PROGRESS)
# Initial sync. It's synchronous so that identity
# get synced before fernet token keys are synced. This is
# necessary since we want to revoke all existing tokens on
# this subcloud after its services user IDs and project
# IDs are changed. Otherwise subcloud services will fail
# authentication since they keep on using their existing tokens
# issued before these IDs change, until these tokens expires.
new_state = consts.INITIAL_SYNC_STATE_COMPLETED
try:
self.gsm.initial_sync(self.context, subcloud_name)
self.fkm.distribute_keys(self.context, subcloud_name)
self.aam.enable_snmp(self.context, subcloud_name)
except Exception as e:
LOG.exception('Initial sync failed for %s: %s', subcloud_name, e)
# We need to try again
new_state = consts.INITIAL_SYNC_STATE_FAILED
# Verify that the sync wasn't cancelled while we did the sync (for
# example, the subcloud could have been unmanaged).
if self.gsm.subcloud_state_matches(
subcloud_name,
initial_sync_state=consts.INITIAL_SYNC_STATE_IN_PROGRESS):
# Update initial sync state
self.gsm.update_subcloud_state(subcloud_name,
initial_sync_state=new_state)
if new_state == consts.INITIAL_SYNC_STATE_COMPLETED:
# The initial sync was completed and we have updated the
# subcloud state. Now we can enable syncing for the subcloud.
self.gsm.enable_subcloud(self.context, subcloud_name)
elif new_state == consts.INITIAL_SYNC_STATE_FAILED:
# Start a "timer" to wait a bit before re-attempting the sync.
# This thread is not taken from the thread pool, because we
# don't want a large number of failed syncs to prevent new
# subclouds from syncing.
eventlet.greenthread.spawn_after(SYNC_FAIL_HOLD_OFF,
self._reattempt_sync,
subcloud_name)
pass
else:
LOG.error('Unexpected new_state %s for subcloud %s' %
(new_state, subcloud_name))
else:
LOG.info('Initial sync was cancelled for subcloud %s while in '
'progress' % subcloud_name)
def _reattempt_sync(self, subcloud_name):
# Verify that the sync state hasn't changed since the last attempt.
if not self.gsm.subcloud_state_matches(
subcloud_name,
initial_sync_state=consts.INITIAL_SYNC_STATE_FAILED):
# Sync is no longer required
LOG.info('Reattempt initial sync for subcloud %s no longer '
'required' % subcloud_name)
return
self.gsm.update_subcloud_state(
subcloud_name,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)

View File

@ -9,6 +9,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
import time
@ -26,10 +29,10 @@ wallclock = time.time
class ThreadGroupManager(object):
"""Thread group manager."""
def __init__(self):
def __init__(self, *args, **kwargs):
super(ThreadGroupManager, self).__init__()
self.threads = {}
self.group = threadgroup.ThreadGroup()
self.group = threadgroup.ThreadGroup(*args, **kwargs)
# Create dummy service task, because when there is nothing queued
# on self.tg the process exits

View File

@ -9,6 +9,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
import six
import time
@ -24,10 +27,10 @@ from dcorch.common import context
from dcorch.common import exceptions
from dcorch.common.i18n import _
from dcorch.common import messaging as rpc_messaging
from dcorch.db import api as db_api
from dcorch.engine.alarm_aggregate_manager import AlarmAggregateManager
from dcorch.engine.fernet_key_manager import FernetKeyManager
from dcorch.engine.generic_sync_manager import GenericSyncManager
from dcorch.engine.initial_sync_manager import InitialSyncManager
from dcorch.engine.quota_manager import QuotaManager
from dcorch.engine import scheduler
from dcorch.objects import service as service_obj
@ -80,6 +83,7 @@ class EngineService(service.Service):
self.gsm = None
self.aam = None
self.fkm = None
self.ism = None
def init_tgm(self):
self.TG = scheduler.ThreadGroupManager()
@ -98,6 +102,11 @@ class EngineService(service.Service):
def init_fkm(self):
self.fkm = FernetKeyManager(self.gsm)
def init_ism(self):
self.ism = InitialSyncManager(self.gsm, self.fkm, self.aam)
self.ism.init_actions()
self.TG.start(self.ism.initial_sync_thread)
def start(self):
self.engine_id = uuidutils.generate_uuid()
self.init_tgm()
@ -105,6 +114,7 @@ class EngineService(service.Service):
self.init_gsm()
self.init_aam()
self.init_fkm()
self.init_ism()
target = oslo_messaging.Target(version=self.rpc_api_version,
server=self.host,
topic=self.topic)
@ -190,45 +200,39 @@ class EngineService(service.Service):
def update_subcloud_states(self, ctxt, subcloud_name,
management_state,
availability_status):
# keep equivalent functionality for now
"""Handle subcloud state updates from dcmanager
These state updates must be processed quickly. Any work triggered by
these state updates must be done asynchronously, without delaying the
reply to the dcmanager. For example, it is not acceptable to
communicate with a subcloud while handling the state update.
"""
# Check if state has changed before doing anything
if self.gsm.subcloud_state_matches(
subcloud_name,
management_state=management_state,
availability_status=availability_status):
# No change in state - nothing to do.
LOG.debug('Ignoring unchanged state update for %s' % subcloud_name)
return
# Check if the subcloud is ready to sync.
if (management_state == dcm_consts.MANAGEMENT_MANAGED) and \
(availability_status == dcm_consts.AVAILABILITY_ONLINE):
# Initial identity sync. It's synchronous so that identity
# get synced before fernet token keys are synced. This is
# necessary since we want to revoke all existing tokens on
# this subcloud after its services user IDs and project
# IDs are changed. Otherwise subcloud services will fail
# authentication since they keep on using their existing tokens
# issued before these IDs change, until these tokens expires.
try:
self.gsm.initial_sync(ctxt, subcloud_name)
self.fkm.distribute_keys(ctxt, subcloud_name)
self.aam.enable_snmp(ctxt, subcloud_name)
self.gsm.enable_subcloud(ctxt, subcloud_name)
except Exception as ex:
LOG.warning('Update subcloud state failed for %s: %s',
subcloud_name, six.text_type(ex))
raise
# Update the subcloud state and schedule an initial sync
self.gsm.update_subcloud_state(
subcloud_name,
management_state=management_state,
availability_status=availability_status,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
else:
subcloud = db_api.subcloud_get(context.get_admin_context(),
subcloud_name)
reset_fernet_keys = False
# disable_subcloud unmanages the subcloud so we need this check
# here instead of later. We need to prevent reset of fernet keys
# when the subcloud goes online for the first time. Fernet keys
# should be reset only when the user unmanages the subcloud.
# Resetting fernet keys before that can result in failures with
# keystone operations while the keys are being reset. Without this
# check, the initial state of "unmanaged" will also trigger a
# fernet key reset unintentionally.
if subcloud['management_state'] == dcm_consts.MANAGEMENT_MANAGED\
and management_state == dcm_consts.MANAGEMENT_UNMANAGED:
reset_fernet_keys = True
self.gsm.disable_subcloud(ctxt, subcloud_name)
if reset_fernet_keys:
self.fkm.reset_keys(subcloud_name)
# Update the subcloud state and cancel the initial sync
self.gsm.update_subcloud_state(
subcloud_name,
management_state=management_state,
availability_status=availability_status,
initial_sync_state=consts.INITIAL_SYNC_STATE_NONE)
@request_context
def add_subcloud_sync_endpoint_type(self, ctxt, subcloud_name,

View File

@ -1,4 +1,4 @@
# Copyright 2017 Wind River
# Copyright 2017-2020 Wind River
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -88,20 +88,52 @@ class SubCloudEngine(object):
# is this subcloud enabled
self.lock.acquire()
status = self.subcloud.availability_status
initial_sync_state = self.subcloud.initial_sync_state
self.lock.release()
return status == dcm_consts.AVAILABILITY_ONLINE
# We only enable syncing if the subcloud is online and the initial
# sync has completed.
if status == dcm_consts.AVAILABILITY_ONLINE and \
initial_sync_state == dco_consts.INITIAL_SYNC_STATE_COMPLETED:
return True
else:
return False
def is_ready(self):
# is this subcloud ready for synchronization
return self.is_managed() and self.is_enabled()
def enable(self):
# set subcloud availability to online
def state_matches(self, management_state=None, availability_status=None,
initial_sync_state=None):
# compare subcloud states
match = True
self.lock.acquire()
self.subcloud.management_state = dcm_consts.MANAGEMENT_MANAGED
self.subcloud.availability_status = dcm_consts.AVAILABILITY_ONLINE
if management_state is not None:
if self.subcloud.management_state != management_state:
match = False
if match and availability_status is not None:
if self.subcloud.availability_status != availability_status:
match = False
if match and initial_sync_state is not None:
if self.subcloud.initial_sync_state != initial_sync_state:
match = False
self.lock.release()
return match
def update_state(self, management_state=None, availability_status=None,
initial_sync_state=None):
# set subcloud states
self.lock.acquire()
if management_state is not None:
self.subcloud.management_state = management_state
if availability_status is not None:
self.subcloud.availability_status = availability_status
if initial_sync_state is not None:
self.subcloud.initial_sync_state = initial_sync_state
self.subcloud.save()
self.lock.release()
def enable(self):
# enable syncing for this subcloud
for thread in self.sync_threads:
thread.enable()
@ -113,23 +145,18 @@ class SubCloudEngine(object):
thread.wake()
def disable(self):
# set subcloud availability to offline
self.lock.acquire()
self.subcloud.management_state = dcm_consts.MANAGEMENT_UNMANAGED
self.subcloud.availability_status = dcm_consts.AVAILABILITY_OFFLINE
self.subcloud.save()
self.lock.release()
# nothing to do here at the moment
pass
def shutdown(self):
def delete(self):
# first update the state of the subcloud
self.update_state(management_state=dcm_consts.MANAGEMENT_UNMANAGED,
availability_status=dcm_consts.AVAILABILITY_OFFLINE)
# shutdown, optionally deleting queued work
self.disable()
while self.sync_threads:
thread = self.sync_threads.pop()
thread.shutdown()
def delete(self):
# delete this subcloud
self.shutdown()
self.subcloud.delete()
def initial_sync(self):

View File

@ -12,6 +12,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
"""Subcloud object."""
@ -35,6 +38,7 @@ class Subcloud(base.OrchestratorObject, base.VersionedObjectDictCompat):
'management_state': fields.StringField(nullable=True),
'availability_status': fields.StringField(),
'capabilities': fields.DictOfListOfStringsField(),
'initial_sync_state': fields.StringField(),
}
def create(self):

View File

@ -12,9 +12,50 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
import sqlalchemy
from oslo_config import cfg
from oslo_db import options
from dcorch.db import api as api
from dcorch.db.sqlalchemy import api as db_api
from dcorch.tests import utils
from oslotest import base
get_engine = api.get_engine
class OrchestratorTestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""
def setup_dummy_db(self):
options.cfg.set_defaults(options.database_opts,
sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://")
engine = get_engine()
db_api.db_sync(engine)
@staticmethod
def reset_dummy_db():
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
def setUp(self):
super(OrchestratorTestCase, self).setUp()
self.setup_dummy_db()
self.addCleanup(self.reset_dummy_db)
self.ctx = utils.dummy_context()

View File

@ -0,0 +1,213 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import mock
from dcmanager.common import consts as dcm_consts
from dcorch.common import consts
from dcorch.common import exceptions
from dcorch.db.sqlalchemy import api as db_api
from dcorch.engine import generic_sync_manager
from dcorch.tests import base
class TestGenericSyncManager(base.OrchestratorTestCase):
def setUp(self):
super(TestGenericSyncManager, self).setUp()
# Mock SubCloudEngine methods
p = mock.patch(
'dcorch.engine.subcloud.SubCloudEngine.spawn_sync_threads')
self.mock_spawn_sync_threads = p.start()
self.addCleanup(p.stop)
@staticmethod
def create_subcloud_static(ctxt, name, **kwargs):
values = {
'software_version': '10.04',
'management_state': dcm_consts.MANAGEMENT_MANAGED,
'availability_status': dcm_consts.AVAILABILITY_ONLINE,
'initial_sync_state': '',
'capabilities': {},
}
values.update(kwargs)
return db_api.subcloud_create(ctxt, name, values=values)
def test_init(self):
gsm = generic_sync_manager.GenericSyncManager()
self.assertIsNotNone(gsm)
def test_init_from_db(self):
self.create_subcloud_static(
self.ctx,
name='subcloud1')
self.create_subcloud_static(
self.ctx,
name='subcloud2')
self.create_subcloud_static(
self.ctx,
name='subcloud3')
gsm = generic_sync_manager.GenericSyncManager()
# Initialize from the DB
gsm.init_from_db(self.ctx)
# Verify the engines were created
self.assertEqual(
gsm.subcloud_engines['subcloud1'].subcloud.region_name,
'subcloud1')
self.assertEqual(
gsm.subcloud_engines['subcloud2'].subcloud.region_name,
'subcloud2')
self.assertEqual(
gsm.subcloud_engines['subcloud3'].subcloud.region_name,
'subcloud3')
def test_subcloud_state_matches(self):
self.create_subcloud_static(
self.ctx,
name='subcloud1',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
gsm = generic_sync_manager.GenericSyncManager()
# Initialize from the DB
gsm.init_from_db(self.ctx)
# Compare all states (match)
match = gsm.subcloud_state_matches(
'subcloud1',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
self.assertTrue(match)
# Compare all states (not a match)
match = gsm.subcloud_state_matches(
'subcloud1',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_OFFLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
self.assertFalse(match)
# Compare one state (match)
match = gsm.subcloud_state_matches(
'subcloud1',
availability_status=dcm_consts.AVAILABILITY_ONLINE)
self.assertTrue(match)
# Compare one state (not a match)
match = gsm.subcloud_state_matches(
'subcloud1',
initial_sync_state='')
self.assertFalse(match)
def test_subcloud_state_matches_missing(self):
self.create_subcloud_static(
self.ctx,
name='subcloud1',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
gsm = generic_sync_manager.GenericSyncManager()
# Initialize from the DB
gsm.init_from_db(self.ctx)
# Compare all states for missing subcloud
self.assertRaises(
exceptions.SubcloudNotFound,
gsm.subcloud_state_matches,
'subcloud2',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
def test_update_subcloud_state(self):
self.create_subcloud_static(
self.ctx,
name='subcloud1',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
gsm = generic_sync_manager.GenericSyncManager()
# Initialize from the DB
gsm.init_from_db(self.ctx)
# Update all states
gsm.update_subcloud_state(
'subcloud1',
management_state=dcm_consts.MANAGEMENT_UNMANAGED,
availability_status=dcm_consts.AVAILABILITY_OFFLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_COMPLETED)
# Compare all states (match)
match = gsm.subcloud_state_matches(
'subcloud1',
management_state=dcm_consts.MANAGEMENT_UNMANAGED,
availability_status=dcm_consts.AVAILABILITY_OFFLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_COMPLETED)
self.assertTrue(match)
# Update one state
gsm.update_subcloud_state(
'subcloud1',
availability_status=dcm_consts.AVAILABILITY_ONLINE)
# Compare all states (match)
match = gsm.subcloud_state_matches(
'subcloud1',
management_state=dcm_consts.MANAGEMENT_UNMANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_COMPLETED)
self.assertTrue(match)
def test_update_subcloud_state_missing(self):
self.create_subcloud_static(
self.ctx,
name='subcloud1',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
gsm = generic_sync_manager.GenericSyncManager()
# Initialize from the DB
gsm.init_from_db(self.ctx)
# Update all states for missing subcloud
self.assertRaises(
exceptions.SubcloudNotFound,
gsm.update_subcloud_state,
'subcloud2',
management_state=dcm_consts.MANAGEMENT_MANAGED,
availability_status=dcm_consts.AVAILABILITY_ONLINE,
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)

View File

@ -0,0 +1,246 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2020 Wind River Systems, Inc.
#
# The right to copy, distribute, modify, or otherwise make use
# of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement.
#
import mock
from dcmanager.common import consts as dcm_consts
from dcorch.common import consts
from dcorch.db.sqlalchemy import api as db_api
from dcorch.engine import initial_sync_manager
from dcorch.tests import base
class FakeGSM(object):
def __init__(self, ctx):
self.ctx = ctx
self.initial_sync = mock.MagicMock()
self.enable_subcloud = mock.MagicMock()
def update_subcloud_state(self, name, initial_sync_state):
db_api.subcloud_update(
self.ctx,
name,
values={'initial_sync_state': initial_sync_state})
def subcloud_state_matches(self, name, initial_sync_state):
subcloud = db_api.subcloud_get(self.ctx, name)
return subcloud.initial_sync_state == initial_sync_state
class FakeFKM(object):
def __init__(self):
self.distribute_keys = mock.MagicMock()
class FakeAAM(object):
def __init__(self):
self.enable_snmp = mock.MagicMock()
class TestInitialSyncManager(base.OrchestratorTestCase):
def setUp(self):
super(TestInitialSyncManager, self).setUp()
# Mock eventlet
p = mock.patch('eventlet.greenthread.spawn_after')
self.mock_eventlet_spawn_after = p.start()
self.addCleanup(p.stop)
# Mock the context
p = mock.patch.object(initial_sync_manager, 'context')
self.mock_context = p.start()
self.mock_context.get_admin_context.return_value = self.ctx
self.addCleanup(p.stop)
# Mock the GSM, FKM and AAM
self.fake_gsm = FakeGSM(self.ctx)
self.fake_fkm = FakeFKM()
self.fake_aam = FakeAAM()
@staticmethod
def create_subcloud_static(ctxt, name, **kwargs):
values = {
'software_version': '10.04',
'availability_status': dcm_consts.AVAILABILITY_ONLINE,
}
values.update(kwargs)
return db_api.subcloud_create(ctxt, name, values=values)
def test_init(self):
ism = initial_sync_manager.InitialSyncManager(self.fake_gsm,
self.fake_fkm,
self.fake_aam)
self.assertIsNotNone(ism)
self.assertEqual(self.ctx, ism.context)
def test_init_actions(self):
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state=consts.INITIAL_SYNC_STATE_NONE)
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud2',
initial_sync_state=consts.INITIAL_SYNC_STATE_IN_PROGRESS)
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud3',
initial_sync_state=consts.INITIAL_SYNC_STATE_FAILED)
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud4',
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
ism = initial_sync_manager.InitialSyncManager(self.fake_gsm,
self.fake_fkm,
self.fake_aam)
# Perform init actions
ism.init_actions()
# Verify the subclouds are in the correct initial sync state
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_NONE)
subcloud = db_api.subcloud_get(self.ctx, 'subcloud2')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_REQUESTED)
subcloud = db_api.subcloud_get(self.ctx, 'subcloud3')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_REQUESTED)
subcloud = db_api.subcloud_get(self.ctx, 'subcloud4')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_REQUESTED)
def test_initial_sync_subcloud(self):
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
self.assertIsNotNone(subcloud)
ism = initial_sync_manager.InitialSyncManager(self.fake_gsm,
self.fake_fkm,
self.fake_aam)
# Initial sync the subcloud
ism._initial_sync_subcloud(subcloud.region_name)
# Verify that the initial sync steps were done
self.fake_gsm.initial_sync.assert_called_with(self.ctx,
subcloud.region_name)
self.fake_fkm.distribute_keys.assert_called_with(self.ctx,
subcloud.region_name)
self.fake_aam.enable_snmp.assert_called_with(self.ctx,
subcloud.region_name)
# Verify that the subcloud was enabled
self.fake_gsm.enable_subcloud.assert_called_with(self.ctx,
subcloud.region_name)
# Verify the initial sync was completed
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_COMPLETED)
def test_initial_sync_subcloud_not_required(self):
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state='')
self.assertIsNotNone(subcloud)
ism = initial_sync_manager.InitialSyncManager(self.fake_gsm,
self.fake_fkm,
self.fake_aam)
# Initial sync the subcloud
ism._initial_sync_subcloud(subcloud.region_name)
# Verify that the initial sync steps were not done
self.fake_gsm.initial_sync.assert_not_called()
# Verify the initial sync state was not changed
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
self.assertEqual(subcloud.initial_sync_state, '')
def test_initial_sync_subcloud_failed(self):
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
self.assertIsNotNone(subcloud)
ism = initial_sync_manager.InitialSyncManager(self.fake_gsm,
self.fake_fkm,
self.fake_aam)
# Force a failure
self.fake_gsm.initial_sync.side_effect = Exception('fake_exception')
# Initial sync the subcloud
ism._initial_sync_subcloud(subcloud.region_name)
# Verify the initial sync was failed
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_FAILED)
# Verify that the subcloud was not enabled
self.fake_gsm.enable_subcloud.assert_not_called()
# Verify the initial sync was retried
self.mock_eventlet_spawn_after.assert_called_with(
initial_sync_manager.SYNC_FAIL_HOLD_OFF, mock.ANY, 'subcloud1')
def test_reattempt_sync(self):
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud1',
initial_sync_state=consts.INITIAL_SYNC_STATE_NONE)
subcloud = self.create_subcloud_static(
self.ctx,
name='subcloud2',
initial_sync_state=consts.INITIAL_SYNC_STATE_FAILED)
ism = initial_sync_manager.InitialSyncManager(self.fake_gsm,
self.fake_fkm,
self.fake_aam)
# Reattempt sync success
ism._reattempt_sync('subcloud2')
# Verify the subcloud is in the correct initial sync state
subcloud = db_api.subcloud_get(self.ctx, 'subcloud2')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_REQUESTED)
# Reattempt sync when not needed
ism._reattempt_sync('subcloud1')
# Verify the subcloud is in the correct initial sync state
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
self.assertEqual(subcloud.initial_sync_state,
consts.INITIAL_SYNC_STATE_NONE)