distcloud/distributedcloud/dcorch/engine/generic_sync_manager.py

143 lines
5.6 KiB
Python

# Copyright 2017 Ericsson AB.
# Copyright (c) 2020-2024 Wind River Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
from oslo_config import cfg
from oslo_log import log as logging
from dccommon import consts as dccommon_consts
from dcorch.common import consts as dco_consts
from dcorch.common import context
from dcorch.db import api as db_api
from dcorch.engine import scheduler
from dcorch.rpc import client
# Module logger.
LOG = logging.getLogger(__name__)
# Global oslo.config registry; read below for options such as worker_workers.
CONF = cfg.CONF
# Seconds to sleep between sync audit runs (used by sync_audit_thread).
CHECK_AUDIT_INTERVAL = 300
class GenericSyncManager(object):
    """Manages tasks related to resource management.

    Periodically selects the eligible subclouds and dispatches them, in
    batches, to the engine workers over RPC for sync and sync-audit.
    """

    def __init__(self, *args, **kwargs):
        super(GenericSyncManager, self).__init__()
        self.context = context.get_admin_context()
        self.engine_worker_rpc_client = client.EngineWorkerClient()
        # Keeps track of greenthreads for dispatching the subclouds to
        # engine workers. The thread pool size needs to account for both
        # sync_job_thread and sync_audit_thread.
        self.thread_group_manager = scheduler.ThreadGroupManager(
            thread_pool_size=20)

    def _run_periodically(self, task, interval):
        """Call ``task()`` forever, sleeping ``interval`` seconds between runs.

        Exceptions raised by ``task`` are logged and the loop continues.
        The sleep happens whether or not ``task`` succeeded, so a persistent
        failure does not degrade into a tight retry/log loop.
        Returns when the greenthread is told to exit (GreenletExit).

        :param task: zero-argument callable to run each cycle.
        :param interval: seconds to sleep after each run.
        """
        while True:
            try:
                task()
            except eventlet.greenlet.GreenletExit:
                # We have been told to exit
                return
            except Exception as e:
                LOG.exception(e)
            # Sleep outside the task's try block so that failures are also
            # followed by a pause before the next attempt.
            try:
                eventlet.greenthread.sleep(interval)
            except eventlet.greenlet.GreenletExit:
                # We have been told to exit
                return

    def sync_job_thread(self):
        """Request a sync of eligible subclouds every 5 seconds."""
        self._run_periodically(self.sync_subclouds, 5)

    def sync_audit_thread(self):
        """Run a sync audit of eligible subclouds periodically.

        Runs every CHECK_AUDIT_INTERVAL seconds.
        """
        self._run_periodically(self.run_sync_audit, CHECK_AUDIT_INTERVAL)

    @staticmethod
    def _get_chunksize(num_subclouds, num_workers):
        """Return how many subclouds to put in each engine-worker request.

        Computed as ``num_subclouds // num_workers + 1``. The result is
        always at least 1, even with no subclouds, and the subclouds are
        split into at most ``num_workers`` requests.

        :param num_subclouds: number of subclouds to dispatch.
        :param num_workers: configured number of engine workers; values
            below 1 are treated as 1.
        :returns: positive batch size.
        """
        # Guard against a zero/negative worker count, which would otherwise
        # raise ZeroDivisionError.
        num_workers = max(num_workers, 1)
        # Adding num_workers before dividing yields a chunksize of at
        # least 1.
        return (num_subclouds + num_workers) // num_workers

    def _dispatch_batch(self, rpc_method, subcloud_capabilities):
        """Start a greenthread running ``rpc_method`` for one batch.

        :returns: the started thread, for a later ``wait()``.
        """
        return self.thread_group_manager.start(
            rpc_method,
            self.context,
            subcloud_capabilities)

    @staticmethod
    def _wait_for_workers(method_name, worker_threads):
        """Block until every thread in ``worker_threads`` has finished.

        A failure in one batch is logged and does not stop us from waiting
        on the remaining batches. This preserves the guarantee that a round
        has fully completed before the caller starts another one.
        """
        for thread in worker_threads:
            try:
                thread.wait()
            except Exception:
                # NOTE(review): assumes wait() re-raises the exception the
                # worker thread ended with (eventlet GreenThread semantics).
                LOG.exception('A %s worker thread failed.', method_name)

    def _process_subclouds(self, rpc_method):
        """Dispatch ``rpc_method`` for every eligible subcloud, in batches.

        Eligible subclouds are those that are managed, online and whose
        initial sync has completed. The list is re-queried on every call:
        a subcloud that becomes managed is included in the next cycle, and
        one that becomes unmanaged drops out of it.

        Subclouds are grouped into ``{region_name: capabilities}`` batches.
        Each batch is sent from its own greenthread. This method returns
        only after all of those threads finish, so a new round of sync or
        audit never overlaps the previous one.

        :param rpc_method: bound engine-worker RPC client method, invoked as
            ``rpc_method(context, subcloud_capabilities)``.
        """
        method_name = rpc_method.__name__
        LOG.info('Start %s', method_name)

        # get a list of subclouds that are enabled
        subclouds = db_api.subcloud_get_all(
            self.context,
            management_state=dccommon_consts.MANAGEMENT_MANAGED,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            initial_sync_state=dco_consts.INITIAL_SYNC_STATE_COMPLETED)

        chunksize = self._get_chunksize(len(subclouds), CONF.worker_workers)
        worker_threads = []
        subcloud_capabilities = {}
        for sc in subclouds:
            subcloud_capabilities[sc.region_name] = sc.capabilities
            if len(subcloud_capabilities) == chunksize:
                # We've gathered a batch of subclouds, send it to engine
                # worker to process.
                worker_threads.append(
                    self._dispatch_batch(rpc_method, subcloud_capabilities))
                LOG.debug(
                    "Sent %s request message for subclouds: %s",
                    method_name, list(subcloud_capabilities.keys()))
                # Rebind (never clear()) -- the dispatched thread still
                # holds a reference to the previous dict.
                subcloud_capabilities = {}

        if subcloud_capabilities:
            # We've got a partial batch...send it off for processing.
            worker_threads.append(
                self._dispatch_batch(rpc_method, subcloud_capabilities))
            LOG.debug(
                "Sent final %s request message for subclouds: %s",
                method_name, list(subcloud_capabilities.keys()))
        else:
            LOG.debug("Done sending %s request messages.", method_name)

        # Wait for all workers to complete. This ensures we don't attempt to
        # do another round of audit or sync before the previous completes.
        LOG.debug('Waiting for %s to complete.', method_name)
        self._wait_for_workers(method_name, worker_threads)
        LOG.info('All subclouds have completed for %s.', method_name)

    def sync_subclouds(self):
        """Send a sync request for all eligible subclouds and wait."""
        self._process_subclouds(self.engine_worker_rpc_client.sync_subclouds)

    def run_sync_audit(self):
        """Send a sync-audit request for all eligible subclouds and wait."""
        self._process_subclouds(self.engine_worker_rpc_client.run_sync_audit)

    def sync_request(self, ctxt, endpoint_type):
        """Mark ``endpoint_type`` as having a requested sync.

        Called when someone has enqueued a sync job. Sets ``sync_request``
        to ``SYNC_STATUS_REQUESTED`` through
        ``db_api.subcloud_sync_update_all``, passing ``MANAGEMENT_MANAGED``.
        NOTE(review): presumably this limits the update to managed
        subclouds; confirm against db_api.

        :param ctxt: request context used for the DB update.
        :param endpoint_type: endpoint whose sync_request is set.
        """
        db_api.subcloud_sync_update_all(
            ctxt, dccommon_consts.MANAGEMENT_MANAGED, endpoint_type,
            values={'sync_request': dco_consts.SYNC_STATUS_REQUESTED})