From 7a2b6211ec03967a3adcf444baccbb680638618c Mon Sep 17 00:00:00 2001 From: albailey Date: Wed, 9 Jun 2021 10:04:01 -0500 Subject: [PATCH] Refactor DC upgrade orch thread The orch thread classes all have nearly identical code since the upgrade orch thread predated the generic orch thread class. This change makes the upgrade orch thread a subclass of orch thread. This aligns upgrade orchestration with kube and firmware orchestration. Patch orchestration alignment will be done in a separate change. This helps enforce common logging, error handling, apply, abort, delete. This prevents further diversion of these classes, and having to propagate common fixes across multiple files. Story: 2008943 Task: 42596 Signed-off-by: albailey Change-Id: Ie7f302664127a0560f4616b34edec42a68c22534 --- .../orchestrator/sw_upgrade_orch_thread.py | 513 +++--------------- 1 file changed, 61 insertions(+), 452 deletions(-) diff --git a/distributedcloud/dcmanager/orchestrator/sw_upgrade_orch_thread.py b/distributedcloud/dcmanager/orchestrator/sw_upgrade_orch_thread.py index 7499c8baa..c34e5c926 100644 --- a/distributedcloud/dcmanager/orchestrator/sw_upgrade_orch_thread.py +++ b/distributedcloud/dcmanager/orchestrator/sw_upgrade_orch_thread.py @@ -13,87 +13,59 @@ # See the License for the specific language governing permissions and # limitations under the License. # -# Copyright (c) 2017-2020 Wind River Systems, Inc. +# Copyright (c) 2017-2021 Wind River Systems, Inc. # # The right to copy, distribute, modify, or otherwise make use # of this software may be licensed only pursuant to the terms # of an applicable Wind River license agreement. # -import datetime -import threading -import time - -from keystoneauth1 import exceptions as keystone_exceptions -from oslo_log import log as logging - -from dccommon.drivers.openstack.sdk_platform import OpenStackDriver from dccommon.drivers.openstack import vim from dcmanager.common import consts -from dcmanager.common import context -from dcmanager.common import exceptions -from dcmanager.common import scheduler -from dcmanager.db import api as db_api -from dcmanager.orchestrator.states.upgrade.activating import ActivatingUpgradeState +from dcmanager.orchestrator.orch_thread import OrchThread + +from dcmanager.orchestrator.states.upgrade.activating \ + import ActivatingUpgradeState from dcmanager.orchestrator.states.upgrade.applying_vim_upgrade_strategy \ import ApplyingVIMUpgradeStrategyState -from dcmanager.orchestrator.states.upgrade.completing import CompletingUpgradeState +from dcmanager.orchestrator.states.upgrade.completing \ + import CompletingUpgradeState from dcmanager.orchestrator.states.upgrade.creating_vim_upgrade_strategy \ import CreatingVIMUpgradeStrategyState -from dcmanager.orchestrator.states.upgrade.deleting_load import DeletingLoadState +from dcmanager.orchestrator.states.upgrade.deleting_load \ + import DeletingLoadState from dcmanager.orchestrator.states.upgrade.finishing_patch_strategy \ import FinishingPatchStrategyState -from dcmanager.orchestrator.states.upgrade.importing_load import ImportingLoadState +from dcmanager.orchestrator.states.upgrade.importing_load \ + import ImportingLoadState from dcmanager.orchestrator.states.upgrade.installing_license \ import InstallingLicenseState -from dcmanager.orchestrator.states.upgrade.lock_duplex import LockDuplexState -from dcmanager.orchestrator.states.upgrade.lock_simplex import LockSimplexState +from dcmanager.orchestrator.states.upgrade.lock_duplex \ + import LockDuplexState +from dcmanager.orchestrator.states.upgrade.lock_simplex \ + import LockSimplexState from dcmanager.orchestrator.states.upgrade.migrating_data \ import MigratingDataState -from dcmanager.orchestrator.states.upgrade.pre_check import PreCheckState +from dcmanager.orchestrator.states.upgrade.pre_check \ + import PreCheckState from dcmanager.orchestrator.states.upgrade.starting_upgrade \ import StartingUpgradeState from dcmanager.orchestrator.states.upgrade.swact_to_controller_0 \ import SwactToController0State from dcmanager.orchestrator.states.upgrade.swact_to_controller_1 \ import SwactToController1State -from dcmanager.orchestrator.states.upgrade.unlock_duplex import UnlockDuplexState -from dcmanager.orchestrator.states.upgrade.unlock_simplex import UnlockSimplexState -from dcmanager.orchestrator.states.upgrade.updating_patches import UpdatingPatchesState +from dcmanager.orchestrator.states.upgrade.unlock_duplex \ + import UnlockDuplexState +from dcmanager.orchestrator.states.upgrade.unlock_simplex \ + import UnlockSimplexState +from dcmanager.orchestrator.states.upgrade.updating_patches \ + import UpdatingPatchesState from dcmanager.orchestrator.states.upgrade.upgrading_duplex \ import UpgradingDuplexState from dcmanager.orchestrator.states.upgrade.upgrading_simplex \ import UpgradingSimplexState -LOG = logging.getLogger(__name__) -# every state should have an operator -STATE_OPERATORS = { - consts.STRATEGY_STATE_PRE_CHECK: PreCheckState, - consts.STRATEGY_STATE_INSTALLING_LICENSE: InstallingLicenseState, - consts.STRATEGY_STATE_IMPORTING_LOAD: ImportingLoadState, - consts.STRATEGY_STATE_UPDATING_PATCHES: UpdatingPatchesState, - consts.STRATEGY_STATE_FINISHING_PATCH_STRATEGY: FinishingPatchStrategyState, - consts.STRATEGY_STATE_STARTING_UPGRADE: StartingUpgradeState, - consts.STRATEGY_STATE_LOCKING_CONTROLLER_0: LockSimplexState, - consts.STRATEGY_STATE_LOCKING_CONTROLLER_1: LockDuplexState, - consts.STRATEGY_STATE_UPGRADING_SIMPLEX: UpgradingSimplexState, - consts.STRATEGY_STATE_UPGRADING_DUPLEX: UpgradingDuplexState, - consts.STRATEGY_STATE_MIGRATING_DATA: MigratingDataState, - consts.STRATEGY_STATE_SWACTING_TO_CONTROLLER_0: SwactToController0State, - consts.STRATEGY_STATE_SWACTING_TO_CONTROLLER_1: SwactToController1State, - consts.STRATEGY_STATE_UNLOCKING_CONTROLLER_0: UnlockSimplexState, - consts.STRATEGY_STATE_UNLOCKING_CONTROLLER_1: UnlockDuplexState, - consts.STRATEGY_STATE_ACTIVATING_UPGRADE: ActivatingUpgradeState, - consts.STRATEGY_STATE_COMPLETING_UPGRADE: CompletingUpgradeState, - consts.STRATEGY_STATE_CREATING_VIM_UPGRADE_STRATEGY: - CreatingVIMUpgradeStrategyState, - consts.STRATEGY_STATE_APPLYING_VIM_UPGRADE_STRATEGY: - ApplyingVIMUpgradeStrategyState, - consts.STRATEGY_STATE_DELETING_LOAD: DeletingLoadState, -} - - -class SwUpgradeOrchThread(threading.Thread): +class SwUpgradeOrchThread(OrchThread): """SwUpgrade Orchestration Thread This thread is responsible for executing the upgrade orchestration strategy. @@ -109,406 +81,43 @@ class SwUpgradeOrchThread(threading.Thread): so, it executes the strategy, updating the strategy and steps in the database as it goes, with state and progress information. """ + # every state in sw upgrade orchestration should have an operator + STATE_OPERATORS = { + consts.STRATEGY_STATE_PRE_CHECK: PreCheckState, + consts.STRATEGY_STATE_INSTALLING_LICENSE: InstallingLicenseState, + consts.STRATEGY_STATE_IMPORTING_LOAD: ImportingLoadState, + consts.STRATEGY_STATE_UPDATING_PATCHES: UpdatingPatchesState, + consts.STRATEGY_STATE_FINISHING_PATCH_STRATEGY: + FinishingPatchStrategyState, + consts.STRATEGY_STATE_STARTING_UPGRADE: StartingUpgradeState, + consts.STRATEGY_STATE_LOCKING_CONTROLLER_0: LockSimplexState, + consts.STRATEGY_STATE_LOCKING_CONTROLLER_1: LockDuplexState, + consts.STRATEGY_STATE_UPGRADING_SIMPLEX: UpgradingSimplexState, + consts.STRATEGY_STATE_UPGRADING_DUPLEX: UpgradingDuplexState, + consts.STRATEGY_STATE_MIGRATING_DATA: MigratingDataState, + consts.STRATEGY_STATE_SWACTING_TO_CONTROLLER_0: + SwactToController0State, + consts.STRATEGY_STATE_SWACTING_TO_CONTROLLER_1: + SwactToController1State, + consts.STRATEGY_STATE_UNLOCKING_CONTROLLER_0: UnlockSimplexState, + consts.STRATEGY_STATE_UNLOCKING_CONTROLLER_1: UnlockDuplexState, + consts.STRATEGY_STATE_ACTIVATING_UPGRADE: ActivatingUpgradeState, + consts.STRATEGY_STATE_COMPLETING_UPGRADE: CompletingUpgradeState, + consts.STRATEGY_STATE_CREATING_VIM_UPGRADE_STRATEGY: + CreatingVIMUpgradeStrategyState, + consts.STRATEGY_STATE_APPLYING_VIM_UPGRADE_STRATEGY: + ApplyingVIMUpgradeStrategyState, + consts.STRATEGY_STATE_DELETING_LOAD: DeletingLoadState, + } def __init__(self, strategy_lock, audit_rpc_client): - super(SwUpgradeOrchThread, self).__init__() - self.context = context.get_admin_context() - self._stop = threading.Event() - # Used to protect strategy when an atomic read/update is required. - self.strategy_lock = strategy_lock - # Used to notify dcmanager-audit to trigger a patch audit - self.audit_rpc_client = audit_rpc_client - # Keeps track of greenthreads we create to do work. - self.thread_group_manager = scheduler.ThreadGroupManager( - thread_pool_size=100) - # Track worker created for each subcloud. - self.subcloud_workers = dict() + super(SwUpgradeOrchThread, self).__init__( + strategy_lock, + audit_rpc_client, + consts.SW_UPDATE_TYPE_UPGRADE, # software update strategy type + vim.STRATEGY_NAME_SW_UPGRADE, # strategy type used by vim + consts.STRATEGY_STATE_PRE_CHECK) # starting state - # When an upgrade is initiated, this is the first state - self.starting_state = consts.STRATEGY_STATE_PRE_CHECK - - def stopped(self): - return self._stop.isSet() - - def stop(self): - LOG.info("SwUpgradeOrchThread Stopping") - self._stop.set() - - def run(self): - self.upgrade_orch() - # Stop any greenthreads that are still running - self.thread_group_manager.stop() - LOG.info("SwUpgradeOrchThread Stopped") - - @staticmethod - def get_region_name(strategy_step): - """Get the region name for a strategy step""" - if strategy_step.subcloud_id is None: - # This is the SystemController. - return consts.DEFAULT_REGION_NAME - return strategy_step.subcloud.name - - @staticmethod - def get_ks_client(region_name=consts.DEFAULT_REGION_NAME): - """This will get a cached keystone client (and token)""" - try: - os_client = OpenStackDriver( - region_name=region_name, - region_clients=None) - return os_client.keystone_client - except Exception: - LOG.warn('Failure initializing KeystoneClient') - raise - - def get_vim_client(self, region_name=consts.DEFAULT_REGION_NAME): - ks_client = self.get_ks_client(region_name) - return vim.VimClient(region_name, ks_client.session) - - @staticmethod - def format_update_details(last_state, info): - # include the last state, since the current state is likely 'failed' - details = "%s: %s" % (last_state, info) - # details cannot exceed 255 chars. truncate and add '..' - if len(details) > 255: - details = details[:253] + '..' - return details - - @staticmethod - def determine_state_operator(strategy_step): - """Return the state operator for the current state""" - state_operator = STATE_OPERATORS.get(strategy_step.state) - # instantiate and return the state_operator class - return state_operator(region_name=SwUpgradeOrchThread.get_region_name(strategy_step)) - - def strategy_step_update(self, subcloud_id, state=None, details=None): - """Update the strategy step in the DB - - Sets the start and finished timestamp if necessary, based on state. - """ - started_at = None - finished_at = None - if state == self.starting_state: - started_at = datetime.datetime.now() - elif state in [consts.STRATEGY_STATE_COMPLETE, - consts.STRATEGY_STATE_ABORTED, - consts.STRATEGY_STATE_FAILED]: - finished_at = datetime.datetime.now() - # Return the updated object, in case we need to use its updated values - return db_api.strategy_step_update(self.context, - subcloud_id, - state=state, - details=details, - started_at=started_at, - finished_at=finished_at) - - def upgrade_orch(self): - while not self.stopped(): - try: - LOG.debug('Running upgrade orchestration') - - sw_update_strategy = db_api.sw_update_strategy_get( - self.context, - update_type=consts.SW_UPDATE_TYPE_UPGRADE) - - if sw_update_strategy.type == consts.SW_UPDATE_TYPE_UPGRADE: - if sw_update_strategy.state in [ - consts.SW_UPDATE_STATE_APPLYING, - consts.SW_UPDATE_STATE_ABORTING]: - self.apply(sw_update_strategy) - elif sw_update_strategy.state == \ - consts.SW_UPDATE_STATE_ABORT_REQUESTED: - self.abort(sw_update_strategy) - elif sw_update_strategy.state == \ - consts.SW_UPDATE_STATE_DELETING: - self.delete(sw_update_strategy) - - except exceptions.NotFound: - # Nothing to do if a strategy doesn't exist - pass - - except Exception as e: - # We catch all exceptions to avoid terminating the thread. - LOG.exception(e) - - # Wake up every 10 seconds to see if there is work to do. - time.sleep(10) - - LOG.info("SwUpgradeOrchThread ended main loop") - - def apply(self, sw_update_strategy): - """Apply an upgrade strategy""" - - LOG.debug("Applying upgrade strategy") - strategy_steps = db_api.strategy_step_get_all(self.context) - - # Figure out which stage we are working on - current_stage = None - stop_after_stage = None - failure_detected = False - abort_detected = False - for strategy_step in strategy_steps: - if strategy_step.state == consts.STRATEGY_STATE_COMPLETE: - # This step is complete - continue - elif strategy_step.state == consts.STRATEGY_STATE_ABORTED: - # This step was aborted - abort_detected = True - continue - elif strategy_step.state == consts.STRATEGY_STATE_FAILED: - failure_detected = True - # This step has failed and needs no further action - if strategy_step.subcloud_id is None: - # Strategy on SystemController failed. We are done. - LOG.info("Stopping strategy due to failure while " - "processing upgrade step on SystemController") - with self.strategy_lock: - db_api.sw_update_strategy_update( - self.context, - state=consts.SW_UPDATE_STATE_FAILED, - update_type=consts.SW_UPDATE_TYPE_UPGRADE) - # Trigger audit to update the sync status for - # each subcloud. - self.audit_rpc_client.trigger_patch_audit(self.context) - return - elif sw_update_strategy.stop_on_failure: - # We have been told to stop on failures - stop_after_stage = strategy_step.stage - current_stage = strategy_step.stage - break - continue - # We have found the first step that isn't complete or failed. - # This is the stage we are working on now. - current_stage = strategy_step.stage - break - else: - # The strategy application is complete - if failure_detected: - LOG.info("Strategy application has failed.") - with self.strategy_lock: - db_api.sw_update_strategy_update( - self.context, - state=consts.SW_UPDATE_STATE_FAILED, - update_type=consts.SW_UPDATE_TYPE_UPGRADE) - elif abort_detected: - LOG.info("Strategy application was aborted.") - with self.strategy_lock: - db_api.sw_update_strategy_update( - self.context, - state=consts.SW_UPDATE_STATE_ABORTED, - update_type=consts.SW_UPDATE_TYPE_UPGRADE) - else: - LOG.info("Strategy application is complete.") - with self.strategy_lock: - db_api.sw_update_strategy_update( - self.context, - state=consts.SW_UPDATE_STATE_COMPLETE, - update_type=consts.SW_UPDATE_TYPE_UPGRADE) - # Trigger audit to update the sync status for each subcloud. - self.audit_rpc_client.trigger_patch_audit(self.context) - return - - if stop_after_stage is not None: - work_remaining = False - # We are going to stop after the steps in this stage have finished. - for strategy_step in strategy_steps: - if strategy_step.stage == stop_after_stage: - if strategy_step.state != consts.STRATEGY_STATE_COMPLETE \ - and strategy_step.state != \ - consts.STRATEGY_STATE_FAILED: - # There is more work to do in this stage - work_remaining = True - break - - if not work_remaining: - # We have completed the stage that failed - LOG.info("Stopping strategy due to failure in stage %d" % - stop_after_stage) - with self.strategy_lock: - db_api.sw_update_strategy_update( - self.context, - state=consts.SW_UPDATE_STATE_FAILED, - update_type=consts.SW_UPDATE_TYPE_UPGRADE) - # Trigger audit to update the sync status for each subcloud. - self.audit_rpc_client.trigger_patch_audit(self.context) - return - - LOG.debug("Working on stage %d" % current_stage) - for strategy_step in strategy_steps: - if strategy_step.stage == current_stage: - region = self.get_region_name(strategy_step) - if self.stopped(): - LOG.info("Exiting because task is stopped") - return - if strategy_step.state == \ - consts.STRATEGY_STATE_FAILED: - LOG.debug("Intermediate step is failed") - continue - elif strategy_step.state == \ - consts.STRATEGY_STATE_COMPLETE: - LOG.debug("Intermediate step is complete") - continue - elif strategy_step.state == \ - consts.STRATEGY_STATE_INITIAL: - # Don't start upgrading this subcloud if it has been - # unmanaged by the user. If orchestration was already - # started, it will be allowed to complete. - if strategy_step.subcloud_id is not None and \ - strategy_step.subcloud.management_state == \ - consts.MANAGEMENT_UNMANAGED: - message = ("Subcloud %s is unmanaged." % - strategy_step.subcloud.name) - LOG.warn(message) - self.strategy_step_update( - strategy_step.subcloud_id, - state=consts.STRATEGY_STATE_FAILED, - details=message) - continue - - # We are just getting started, enter the first state - # Use the updated value for calling process_upgrade_step - strategy_step = self.strategy_step_update( - strategy_step.subcloud_id, - state=self.starting_state) - # Starting state should log an error if greenthread exists - self.process_upgrade_step(region, - strategy_step, - log_error=True) - else: - self.process_upgrade_step(region, - strategy_step, - log_error=False) - - def abort(self, sw_update_strategy): - """Abort an upgrade strategy""" - - LOG.info("Aborting upgrade strategy") - - # Mark any steps that have not yet started as aborted, - # so we will not run them later. - strategy_steps = db_api.strategy_step_get_all(self.context) - - for strategy_step in strategy_steps: - if strategy_step.state == consts.STRATEGY_STATE_INITIAL: - LOG.info("Aborting step for subcloud %s" % - self.get_region_name(strategy_step)) - self.strategy_step_update( - strategy_step.subcloud_id, - state=consts.STRATEGY_STATE_ABORTED, - details="") - - with self.strategy_lock: - db_api.sw_update_strategy_update( - self.context, - state=consts.SW_UPDATE_STATE_ABORTING, - update_type=consts.SW_UPDATE_TYPE_UPGRADE) - - def delete(self, sw_update_strategy): - """Delete an upgrade strategy""" - - LOG.info("Deleting upgrade strategy") - - strategy_steps = db_api.strategy_step_get_all(self.context) - - for strategy_step in strategy_steps: - self.delete_subcloud_strategy(strategy_step) - - if self.stopped(): - LOG.info("Exiting because task is stopped") - return - - # Remove the strategy from the database - try: - db_api.strategy_step_destroy_all(self.context) - db_api.sw_update_strategy_destroy(self.context) - except Exception as e: - LOG.exception(e) - raise e - - # todo(abailey): refactor delete to reuse patch orch code - def delete_subcloud_strategy(self, strategy_step): - """Delete the vim strategy in this subcloud""" - - strategy_name = vim.STRATEGY_NAME_FW_UPDATE - region = self.get_region_name(strategy_step) - - LOG.info("Deleting vim strategy %s for %s" % (strategy_name, region)) - - # First check if the strategy has been created. - try: - subcloud_strategy = self.get_vim_client(region).get_strategy( - strategy_name=strategy_name) - except (keystone_exceptions.EndpointNotFound, IndexError): - message = ("Endpoint for subcloud: %s not found." % - region) - LOG.error(message) - self.strategy_step_update( - strategy_step.subcloud_id, - state=consts.STRATEGY_STATE_FAILED, - details=message) - return - except Exception: - # Strategy doesn't exist so there is nothing to do - return - - if subcloud_strategy.state in [vim.STATE_BUILDING, - vim.STATE_APPLYING, - vim.STATE_ABORTING]: - # Can't delete a strategy in these states - message = ("Strategy for %s in wrong state (%s)for delete" % - (region, subcloud_strategy.state)) - LOG.warn(message) - raise Exception(message) - - # If we are here, we need to delete the strategy - try: - self.get_vim_client(region).delete_strategy( - strategy_name=strategy_name) - except Exception: - message = "Strategy delete failed for %s" % region - LOG.warn(message) - raise - - def process_upgrade_step(self, region, strategy_step, log_error=False): - """manage the green thread for calling perform_state_action""" - if region in self.subcloud_workers: - # A worker already exists. Let it finish whatever it was doing. - if log_error: - LOG.error("Worker should not exist for %s." % region) - else: - LOG.debug("Update worker exists for %s." % region) - else: - # Create a greenthread to start processing the upgrade for the - # subcloud and invoke the specified upgrade_thread_method - self.subcloud_workers[region] = \ - self.thread_group_manager.start(self.perform_state_action, - strategy_step) - - def perform_state_action(self, strategy_step): - """Extensible state handler for processing and transitioning states """ - try: - LOG.info("Stage: %s, State: %s, Subcloud: %s" - % (strategy_step.stage, - strategy_step.state, - self.get_region_name(strategy_step))) - # Instantiate the state operator and perform the state actions - state_operator = self.determine_state_operator(strategy_step) - state_operator.registerStopEvent(self._stop) - next_state = state_operator.perform_state_action(strategy_step) - # If we get here without an exception raised, proceed to next state - self.strategy_step_update(strategy_step.subcloud_id, - state=next_state) - except Exception as e: - # Catch ALL exceptions and set the strategy to failed - LOG.exception("Failed! Stage: %s, State: %s, Subcloud: %s" - % (strategy_step.stage, - strategy_step.state, - self.get_region_name(strategy_step))) - details = self.format_update_details(strategy_step.state, str(e)) - self.strategy_step_update(strategy_step.subcloud_id, - state=consts.STRATEGY_STATE_FAILED, - details=details) - finally: - # The worker is done. - region = self.get_region_name(strategy_step) - if region in self.subcloud_workers: - del self.subcloud_workers[region] + def trigger_audit(self): + """Trigger an audit for upgrade (which is combined with patch audit)""" + self.audit_rpc_client.trigger_patch_audit(self.context)