From b700e3f4a1ccd906c664773a611b586ce955896c Mon Sep 17 00:00:00 2001 From: Bin Qian Date: Wed, 28 Feb 2024 17:03:07 +0000 Subject: [PATCH] Create 2nd thread to handle USM REST API requests This change is to create 2nd thread to provide concurrent service. In a different commit [1], the haproxy is to be configured to distribute the slow requests to the 2nd thread, and the fast requests to the primiary thread. TCs: passed: concurrent keystone requests of "software upload/ deploy precheck/deploy start" and "software list/deploy show/ deploy host-list" passed: keystone authenticated "software deploy precheck" request completed. Story: 2010676 Task: 49647 [1] https://review.opendev.org/c/starlingx/stx-puppet/+/910644 Change-Id: I0e8e8ac1b5177f1bbf40e047335c075b0a471fc1 Signed-off-by: Bin Qian --- software/software/authapi/__init__.py | 7 ++- software/software/authapi/config.py | 8 +-- software/software/db/api.py | 69 ++++++++++++++++++++---- software/software/software_controller.py | 25 +++++---- 4 files changed, 82 insertions(+), 27 deletions(-) diff --git a/software/software/authapi/__init__.py b/software/software/authapi/__init__.py index b80ec7b2..e0b2f9b7 100755 --- a/software/software/authapi/__init__.py +++ b/software/software/authapi/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 Wind River Systems, Inc. +# Copyright (c) 2023-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -11,7 +11,10 @@ API_SERVICE_OPTS = [ help='IP for the authenticated Unified Software Management API server to bind to'), cfg.IntOpt('auth_api_port', default=5497, - help='The port for the authenticated Unified Software Management API server'), + help='The port for the authenticated Unified Software Management API server for GET operations'), + cfg.IntOpt('auth_api_alt_port', + default=5499, + help='The port for the authenticated Unified Software Management API server for update and slow operations'), cfg.IntOpt('api_limit_max', default=1000, help='the maximum number of items returned in a single ' diff --git a/software/software/authapi/config.py b/software/software/authapi/config.py index 0126b827..6d0f6fd5 100755 --- a/software/software/authapi/config.py +++ b/software/software/authapi/config.py @@ -1,16 +1,10 @@ """ -Copyright (c) 2023 Wind River Systems, Inc. +Copyright (c) 2023-2024 Wind River Systems, Inc. SPDX-License-Identifier: Apache-2.0 """ -# Server Specific Configurations -server = { - 'port': '5497', - 'host': '0.0.0.0' -} - # Pecan Application Configurations app = { 'root': 'software.api.controllers.root.RootController', diff --git a/software/software/db/api.py b/software/software/db/api.py index 42832595..253d0d46 100644 --- a/software/software/db/api.py +++ b/software/software/db/api.py @@ -1,14 +1,18 @@ """ -Copyright (c) 2023 Wind River Systems, Inc. +Copyright (c) 2023-2024 Wind River Systems, Inc. SPDX-License-Identifier: Apache-2.0 """ +import logging +import threading from software.software_entities import DeployHandler from software.software_entities import DeployHostHandler from software.constants import DEPLOY_STATES +LOG = logging.getLogger('main_logger') + def get_instance(): """Return a Software API instance.""" @@ -17,6 +21,7 @@ def get_instance(): class SoftwareAPI: _instance = None + _lock = threading.RLock() def __new__(cls): if cls._instance is None: @@ -28,28 +33,74 @@ class SoftwareAPI: self.deploy_host_handler = DeployHostHandler() def create_deploy(self, from_release, to_release, reboot_required: bool): + self.begin_update() self.deploy_handler.create(from_release, to_release, reboot_required) + self.end_update() def get_deploy(self): - return self.deploy_handler.query() + self.begin_update() + try: + return self.deploy_handler.query() + finally: + self.end_update() def update_deploy(self, state: DEPLOY_STATES): - self.deploy_handler.update(state) + self.begin_update() + try: + self.deploy_handler.update(state) + finally: + self.end_update() def delete_deploy(self): - self.deploy_handler.delete() + self.begin_update() + try: + self.deploy_handler.delete() + finally: + self.end_update() def create_deploy_host(self, hostname): - self.deploy_host_handler.create(hostname) + self.begin_update() + try: + self.deploy_host_handler.create(hostname) + finally: + self.end_update() def get_deploy_host(self): - return self.deploy_host_handler.query_all() + self.begin_update() + try: + return self.deploy_host_handler.query_all() + finally: + self.end_update() def update_deploy_host(self, hostname, state): - return self.deploy_host_handler.update(hostname, state) + self.begin_update() + try: + return self.deploy_host_handler.update(hostname, state) + finally: + self.end_update() def delete_deploy_host(self, hostname): - self.deploy_host_handler.delete(hostname) + self.begin_update() + try: + self.deploy_host_handler.delete(hostname) + finally: + self.end_update() def delete_deploy_host_all(self): - self.deploy_host_handler.delete_all() + self.begin_update() + try: + self.deploy_host_handler.delete_all() + finally: + self.end_update() + + def begin_update(self): + tid = threading.get_native_id() + msg = f"{tid} is to acquire lock." + LOG.info(msg) + SoftwareAPI._lock.acquire() + + def end_update(self): + SoftwareAPI._lock.release() + tid = threading.get_native_id() + msg = f"{tid} released lock." + LOG.info(msg) diff --git a/software/software/software_controller.py b/software/software/software_controller.py index 54753ffb..acf89cbf 100644 --- a/software/software/software_controller.py +++ b/software/software/software_controller.py @@ -2267,10 +2267,15 @@ class PatchController(PatchService): if self._deploy_upgrade_start(to_release): collect_current_load_for_hosts() - self.update_and_sync_deploy_state(self.db_api_instance.create_deploy, - SW_VERSION, to_release, True) - self.update_and_sync_deploy_state(self.db_api_instance.update_deploy, - DEPLOY_STATES.START) + self.db_api_instance.begin_update() + try: + self.update_and_sync_deploy_state(self.db_api_instance.create_deploy, + SW_VERSION, to_release, True) + self.update_and_sync_deploy_state(self.db_api_instance.update_deploy, + DEPLOY_STATES.START) + finally: + self.db_api_instance.end_update() + sw_rel = self.release_collection.get_release_by_id(deployment) if sw_rel is None: raise InternalError("%s cannot be found" % to_release) @@ -2931,7 +2936,6 @@ class PatchController(PatchService): return query_hosts return deploy_host_list - def update_and_sync_deploy_state(self, func, *args, **kwargs): """ :param func: SoftwareApi method @@ -3001,14 +3005,14 @@ class PatchControllerApiThread(threading.Thread): class PatchControllerAuthApiThread(threading.Thread): - def __init__(self): + def __init__(self, port): threading.Thread.__init__(self) # LOG.info ("Initializing Authenticated API thread") self.wsgi = None + self.port = port def run(self): host = CONF.auth_api_bind_ip - port = CONF.auth_api_port if host is None: host = utils.get_versioned_address_all() try: @@ -3027,7 +3031,7 @@ class PatchControllerAuthApiThread(threading.Thread): server_class.address_family = utils.get_management_family() self.wsgi = simple_server.make_server( - host, port, + host, self.port, auth_app.VersionSelectorApplication(), server_class=server_class) @@ -3286,11 +3290,13 @@ def main(): LOG.info("launching") api_thread = PatchControllerApiThread() - auth_api_thread = PatchControllerAuthApiThread() + auth_api_thread = PatchControllerAuthApiThread(CONF.auth_api_port) + auth_api_alt_thread = PatchControllerAuthApiThread(CONF.auth_api_alt_port) main_thread = PatchControllerMainThread() api_thread.start() auth_api_thread.start() + auth_api_alt_thread.start() main_thread.start() thread_death.wait() @@ -3299,4 +3305,5 @@ def main(): api_thread.join() auth_api_thread.join() + auth_api_alt_thread.join() main_thread.join()