Implement optimized OpenStackDriver

This commit implements an optimized OpenStackDriver that builds the
endpoints for subclouds directly using their management IPs instead of
retrieving them from the keystone database. Subcloud endpoints will be
removed from Keystone for performance reasons in a future commit.
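
For reference, this is roughly the endpoint map that gets built for a
subcloud from its management IP. This is only a sketch with an example
IP address, using the port templates from the ENDPOINT_URLS table
added by this change:

  # Illustrative: roughly what build_subcloud_endpoint_map would return
  # for an example IP, based on the ENDPOINT_URLS templates in this change.
  subcloud1_endpoints = {
      "fm": "https://192.168.101.2:18003",
      "keystone": "https://192.168.101.2:5001/v3",
      "patching": "https://192.168.101.2:5492",
      "sysinv": "https://192.168.101.2:6386/v1",
      "usm": "https://192.168.101.2:5498",
      "vim": "https://192.168.101.2:4546",
  }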

- The driver now accepts a fetch_subcloud_ips function as an argument.
- When called without a region argument, this function returns a
  dictionary mapping subcloud region names to their management IPs;
  when called with a region argument, it returns that subcloud's
  management IP (see the sketch after this list).
- Dcmanager services and dcorch should implement their own
  fetch_subcloud_ips function to provide the driver with subcloud
  IP information.
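
As an illustration of the expected wiring (not the final per-service
integration, which lands in separate commits), a service could pass its
own lookup function to the driver roughly as sketched below. The module
path, helper names, region name, and client list are assumptions made
only for this example:

  # Sketch only: a service-side fetch_subcloud_ips callable handed to the
  # optimized driver. The lookup helpers are hypothetical placeholders.
  from dccommon.drivers.openstack import sdk_platform

  def fetch_subcloud_ips(region_name=None):
      if region_name:
          # Management IP of a single subcloud
          return get_subcloud_mgmt_ip(region_name)  # hypothetical helper
      # Mapping of {region_name: management_ip} for all subclouds
      return get_all_subcloud_mgmt_ips()  # hypothetical helper

  driver = sdk_platform.OptimizedOpenStackDriver(
      region_name="subcloud1",                # example subcloud region
      fetch_subcloud_ips=fetch_subcloud_ips,
      region_clients=["sysinv", "fm"],        # subset of supported clients
  )
  # The sysinv endpoint is built from the subcloud management IP,
  # not fetched from the Keystone catalog.
  sysinv_client = driver.sysinv_client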

This approach improves performance and prepares for the removal of
subcloud endpoints from Keystone.

NOTE: The original OpenStackDriver, KeystoneClient and EndpointCache
will be removed in a future commit, after the DC services are updated
to use the new optimized OpenStackDriver. The optimized one will be
integrated with the DC services in separate commits.

Test Plan:
Remove the subcloud endpoints from the keystone DB, modify the
dcmanager-audit service to use the new classes, and then run the
following tests:
1. PASS - Verify that audit is able to get both the RegionOne and
          subclouds endpoints without issues using the new driver.
2. PASS - Verify that the hourly token refresh only triggers the
          refresh of central region token and endpoints.
3. PASS - Verify that when adding a new subcloud, the endpoint cache
          is updated to include the endpoints for the new subcloud.

Story: 2011106
Task: 50035

Change-Id: I146592eb17f6a5433eae25f20e8de2f01c813055
Signed-off-by: Gustavo Herzmann <gustavo.herzmann@windriver.com>

@@ -1,5 +1,5 @@
# Copyright 2012-2013 OpenStack Foundation
# Copyright (c) 2017-2021 Wind River Systems, Inc.
# Copyright (c) 2017-2021, 2024 Wind River Systems, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -20,6 +20,7 @@ from oslo_utils import importutils
from dccommon import consts
from dccommon.drivers import base
from dccommon.endpoint_cache import EndpointCache
from dccommon.endpoint_cache import OptimizedEndpointCache
from dccommon import exceptions
# Ensure keystonemiddleware options are imported
@@ -139,3 +140,109 @@ class KeystoneClient(base.DriverBase):
self.keystone_client.regions.delete(region_name)
except keystone_exceptions.NotFound:
pass
class OptimizedKeystoneClient(base.DriverBase):
"""Keystone V3 Driver.
:param region_name: The name of the region.
:type region_name: str
:param auth_url: The authentication URL.
:type auth_url: str
:param fetch_subcloud_ips: A function to fetch subcloud IPs.
:type fetch_subcloud_ips: Callable
"""
def __init__(self, region_name=None, auth_url=None, fetch_subcloud_ips=None):
self.endpoint_cache = OptimizedEndpointCache(
region_name, auth_url, fetch_subcloud_ips
)
self.session = self.endpoint_cache.admin_session
self.keystone_client = self.endpoint_cache.keystone_client
if region_name in [consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD]:
self.services_list = OptimizedEndpointCache.master_services_list
else:
self.services_list = self.keystone_client.services.list()
def get_enabled_projects(self, id_only=True):
project_list = self.keystone_client.projects.list()
if id_only:
return [
current_project.id
for current_project in project_list
if current_project.enabled
]
else:
return [
current_project
for current_project in project_list
if current_project.enabled
]
def get_project_by_id(self, projectid):
if not projectid:
return None
return self.keystone_client.projects.get(projectid)
def get_project_by_name(self, projectname):
if not projectname:
return None
project_list = self.get_enabled_projects(id_only=False)
for project in project_list:
if project.name == projectname:
return project
def get_enabled_users(self, id_only=True):
user_list = self.keystone_client.users.list()
if id_only:
return [
current_user.id for current_user in user_list if current_user.enabled
]
else:
return [
current_user for current_user in user_list if current_user.enabled
]
def get_user_by_id(self, userid):
if not userid:
return None
return self.keystone_client.users.get(userid)
def get_user_by_name(self, username):
if not username:
return None
user_list = self.get_enabled_users(id_only=False)
for user in user_list:
if user.name == username:
return user
def is_service_enabled(self, service):
for current_service in self.services_list:
if service in current_service.type:
return True
return False
# Returns list of regions if endpoint filter is applied for the project
def get_filtered_region(self, project_id):
try:
region_list = []
endpoint_manager = endpoint_filter.EndpointFilterManager(
self.keystone_client
)
endpoint_lists = endpoint_manager.list_endpoints_for_project(project_id)
for endpoint in endpoint_lists:
region_list.append(endpoint.region)
return region_list
except keystone_exceptions.NotFound:
raise exceptions.ProjectNotFound(project_id=project_id)
def delete_endpoints(self, region_name):
endpoints = self.keystone_client.endpoints.list(region=region_name)
for endpoint in endpoints:
self.keystone_client.endpoints.delete(endpoint)
def delete_region(self, region_name):
try:
self.keystone_client.regions.delete(region_name)
except keystone_exceptions.NotFound:
pass

@@ -16,6 +16,9 @@
OpenStack Driver
"""
import collections
from typing import Callable
from typing import List
from keystoneauth1 import exceptions as keystone_exceptions
from oslo_concurrency import lockutils
from oslo_log import log
@@ -24,6 +27,7 @@ from dccommon import consts
from dccommon.drivers.openstack.barbican import BarbicanClient
from dccommon.drivers.openstack.fm import FmClient
from dccommon.drivers.openstack.keystone_v3 import KeystoneClient
from dccommon.drivers.openstack.keystone_v3 import OptimizedKeystoneClient
from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
from dccommon import exceptions
from dccommon.utils import is_token_expiring_soon
@@ -40,12 +44,12 @@ LOG = log.getLogger(__name__)
LOCK_NAME = 'dc-openstackdriver-platform'
SUPPORTED_REGION_CLIENTS = [
SUPPORTED_REGION_CLIENTS = (
SYSINV_CLIENT_NAME,
FM_CLIENT_NAME,
BARBICAN_CLIENT_NAME,
DBSYNC_CLIENT_NAME,
]
)
# Region client type and class mappings
region_client_class_map = {
@@ -246,3 +250,321 @@ class OpenStackDriver(object):
return False
else:
return True
class OptimizedOpenStackDriver(object):
"""An OpenStack driver for managing external services clients.
:param region_name: The name of the region. Defaults to "RegionOne".
:type region_name: str
:param thread_name: The name of the thread. Defaults to "dc".
:type thread_name: str
:param auth_url: The authentication URL.
:type auth_url: str
:param region_clients: The list of region clients to initialize.
:type region_clients: list
:param endpoint_type: The type of endpoint. Defaults to "admin".
:type endpoint_type: str
:param fetch_subcloud_ips: A function to fetch subcloud management IPs.
:type fetch_subcloud_ips: Callable
"""
os_clients_dict = collections.defaultdict(dict)
_identity_tokens = {}
def __init__(
self,
region_name: str = consts.CLOUD_0,
thread_name: str = "dc",
auth_url: str = None,
region_clients: List[str] = SUPPORTED_REGION_CLIENTS,
endpoint_type: str = consts.KS_ENDPOINT_DEFAULT,
fetch_subcloud_ips: Callable = None,
):
self.region_name = region_name
self.keystone_client = None
# These clients are created dynamically by initialize_region_clients
self.sysinv_client = None
self.fm_client = None
self.barbican_client = None
self.dbsync_client = None
self.get_cached_keystone_client(region_name, auth_url, fetch_subcloud_ips)
if self.keystone_client is None:
self.initialize_keystone_client(auth_url, fetch_subcloud_ips)
OptimizedOpenStackDriver.update_region_clients_cache(
region_name, KEYSTONE_CLIENT_NAME, self.keystone_client
)
# Clear client object cache
if region_name != consts.CLOUD_0:
OptimizedOpenStackDriver.os_clients_dict[region_name] = (
collections.defaultdict(dict)
)
if region_clients:
self.initialize_region_clients(
region_clients, thread_name, endpoint_type
)
def initialize_region_clients(
self, region_clients: List[str], thread_name: str, endpoint_type: str
) -> None:
"""Initialize region clients dynamically setting them as attributes
:param region_clients: The list of region clients to initialize.
:type region_clients: list
:param thread_name: The name of the thread.
:type thread_name: str
:param endpoint_type: The type of endpoint.
:type endpoint_type: str
"""
self.get_cached_region_clients_for_thread(
self.region_name, thread_name, region_clients
)
for client_name in region_clients:
client_obj_name = f"{client_name}_client"
# If the client object already exists, do nothing
if getattr(self, client_obj_name) is not None:
continue
# Create new client object and cache it
try:
try:
client_class = region_client_class_map[client_name]
except KeyError as e:
msg = f"Requested region client is not supported: {client_name}"
LOG.error(msg)
raise exceptions.InvalidInputError from e
args = {
"region": self.region_name,
"session": self.keystone_client.session,
"endpoint_type": endpoint_type,
}
# Since SysinvClient (cgtsclient) does not support session,
# also pass the cached endpoint so it does not need to
# retrieve it from keystone.
if client_name == "sysinv":
args["endpoint"] = (
self.keystone_client.endpoint_cache.get_endpoint("sysinv")
)
client_object = client_class(**args)
# Store the new client
setattr(self, client_obj_name, client_object)
OptimizedOpenStackDriver.update_region_clients_cache(
self.region_name, client_name, client_object, thread_name
)
except Exception as exception:
LOG.error(
f"Region {self.region_name} client {client_name} "
f"thread {thread_name} error: {str(exception)}"
)
raise exception
def initialize_keystone_client(
self, auth_url: str, fetch_subcloud_ips: Callable
) -> None:
"""Initialize a new Keystone client.
:param auth_url: The authentication URL.
:type auth_url: str
:param fetch_subcloud_ips: A function to fetch subcloud management IPs.
:type fetch_subcloud_ips: Callable
"""
LOG.debug(f"get new keystone client for region {self.region_name}")
try:
self.keystone_client = OptimizedKeystoneClient(
self.region_name, auth_url, fetch_subcloud_ips
)
except (
keystone_exceptions.ConnectFailure,
keystone_exceptions.ConnectTimeout,
keystone_exceptions.NotFound,
keystone_exceptions.ServiceUnavailable,
) as exception:
LOG.error(
f"keystone_client region {self.region_name} error: {str(exception)}"
)
raise exception
except Exception as exception:
LOG.exception(
f"Unable to get a new keystone client for region: {self.region_name}"
)
raise exception
@lockutils.synchronized(LOCK_NAME)
def get_cached_keystone_client(
self, region_name: str, auth_url: str, fetch_subcloud_ips: Callable
) -> None:
"""Get the cached Keystone client if it exists
:param region_name: The name of the region.
:type region_name: str
:param auth_url: The authentication URL.
:type auth_url: str
:param fetch_subcloud_ips: A function to fetch subcloud management IPs.
:type fetch_subcloud_ips: Callable
"""
os_clients_dict = OptimizedOpenStackDriver.os_clients_dict
keystone_client = os_clients_dict.get(region_name, {}).get(
KEYSTONE_CLIENT_NAME
)
# If there's a cached keystone client and the token is valid, use it
if keystone_client and self._is_token_valid(region_name):
self.keystone_client = keystone_client
# Else if master region, create a new keystone client
elif region_name in (consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD):
self.initialize_keystone_client(auth_url, fetch_subcloud_ips)
os_clients_dict[region_name][KEYSTONE_CLIENT_NAME] = self.keystone_client
@lockutils.synchronized(LOCK_NAME)
def get_cached_region_clients_for_thread(
self, region_name: str, thread_name: str, clients: List[str]
) -> None:
"""Get and assign the cached region clients as object attributes.
Also initializes the os_clients_dict region and
thread dictionaries if they don't already exist.
:param region_name: The name of the region.
:type region_name: str
:param thread_name: The name of the thread.
:type thread_name: str
:param clients: The list of client names.
:type clients: list
"""
os_clients = OptimizedOpenStackDriver.os_clients_dict
for client in clients:
client_obj = (
os_clients.setdefault(region_name, {})
.setdefault(thread_name, {})
.get(client)
)
if client_obj is not None:
LOG.debug(
"Using cached OS {client} client objects "
f"{region_name} {thread_name}"
)
setattr(self, f"{client}_client", client_obj)
@classmethod
@lockutils.synchronized(LOCK_NAME)
def update_region_clients_cache(
cls,
region_name: str,
client_name: str,
client_object: object,
thread_name: str = None,
) -> None:
"""Update the region clients cache.
:param region_name: The name of the region.
:type region_name: str
:param client_name: The name of the client.
:type client_name: str
:param client_object: The client object.
:param thread_name: The name of the thread. Defaults to None.
:type thread_name: str
"""
region_dict = cls.os_clients_dict[region_name]
if thread_name is None:
region_dict[client_name] = client_object
else:
region_dict[thread_name][client_name] = client_object
@classmethod
@lockutils.synchronized(LOCK_NAME)
def delete_region_clients(
cls, region_name: str, clear_token: bool = False
) -> None:
"""Delete region clients from cache.
:param region_name: The name of the region.
:type region_name: str
:param clear_token: Whether to clear the token cache. Defaults to False.
:type clear_token: bool
"""
LOG.warn(f"delete_region_clients={region_name}, clear_token={clear_token}")
try:
del cls.os_clients_dict[region_name]
except KeyError:
pass
if clear_token:
cls._identity_tokens[region_name] = None
@classmethod
@lockutils.synchronized(LOCK_NAME)
def delete_region_clients_for_thread(
cls, region_name: str, thread_name: str
) -> None:
"""Delete region clients for a specific thread from cache.
:param region_name: The name of the region.
:type region_name: str
:param thread_name: The name of the thread.
:type thread_name: str
"""
LOG.debug(f"delete_region_clients={region_name}, thread_name={thread_name}")
try:
del cls.os_clients_dict[region_name][thread_name]
except KeyError:
pass
@staticmethod
def _reset_cached_clients_and_token(region_name: str) -> None:
OptimizedOpenStackDriver.os_clients_dict[region_name] = (
collections.defaultdict(dict)
)
OptimizedOpenStackDriver._identity_tokens[region_name] = None
def _is_token_valid(self, region_name: str) -> bool:
"""Check if the cached token is valid.
:param region_name: The name of the region.
:type region_name: str
"""
cached_os_clients = OptimizedOpenStackDriver.os_clients_dict
# If the token is not cached, validate the session token and cache it
try:
keystone = cached_os_clients[region_name]["keystone"].keystone_client
cached_tokens = OptimizedOpenStackDriver._identity_tokens
if not cached_tokens.get(region_name):
cached_tokens[region_name] = keystone.tokens.validate(
keystone.session.get_token(), include_catalog=False
)
LOG.info(
f"Token for subcloud {region_name} expires_at="
f"{cached_tokens[region_name]['expires_at']}"
)
except Exception as exception:
LOG.info(
f"_is_token_valid handle: region: {region_name} "
f"error: {str(exception)}"
)
self._reset_cached_clients_and_token(region_name)
return False
# If token is expiring soon, reset cached data and return False.
if is_token_expiring_soon(token=cached_tokens[region_name]):
LOG.info(
f"The cached keystone token for subcloud {region_name} will "
f"expire soon {cached_tokens[region_name]['expires_at']}"
)
# Reset the cached dictionary
self._reset_cached_clients_and_token(region_name)
return False
return True

@@ -17,9 +17,14 @@
import collections
import threading
from typing import Callable
from typing import List
from typing import Tuple
from typing import Union
from keystoneauth1 import loading
from keystoneauth1 import session
import netaddr
from keystoneclient.v3 import client as ks_client
@@ -30,11 +35,21 @@ from oslo_log import log as logging
from dccommon import consts
from dccommon.utils import is_token_expiring_soon
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
LOCK_NAME = 'dc-keystone-endpoint-cache'
LOCK_NAME = "dc-keystone-endpoint-cache"
ENDPOINT_URLS = {
"fm": "https://{}:18003",
"keystone": "https://{}:5001/v3",
"patching": "https://{}:5492",
"sysinv": "https://{}:6386/v1",
"usm": "https://{}:5498",
"vim": "https://{}:4546",
}
class EndpointCache(object):
@@ -276,3 +291,452 @@ class EndpointCache(object):
EndpointCache.master_keystone_client.services.list())
EndpointCache.master_service_endpoint_map = (
self._generate_master_service_endpoint_map(self))
def build_subcloud_endpoint_map(ip):
"""Builds a mapping of service endpoints for a given IP address.
:param ip: The IP address for which service endpoints need to be mapped.
:type ip: str
:return: A dictionary containing service names as keys and formatted
endpoint URLs as values.
:rtype: dict
"""
# Wrap an IPv6 address in brackets once, before building the URLs,
# so it is not re-parsed after the brackets are added
if netaddr.IPAddress(ip).version == 6:
ip = f"[{ip}]"
endpoint_map = {}
for service, endpoint in ENDPOINT_URLS.items():
endpoint_map[service] = endpoint.format(ip)
return endpoint_map
def build_subcloud_endpoints(subcloud_mgmt_ips: dict) -> dict:
"""Builds a dictionary of service endpoints for multiple subcloud management IPs.
:param subcloud_mgmt_ips: A dictionary containing subcloud regions as keys
and the corresponding management IP as value.
:type subcloud_mgmt_ips: dict
:return: A dictionary with subcloud regions as keys and their respective
service endpoints as values.
:rtype: dict
"""
subcloud_endpoints = {}
for region, ip in subcloud_mgmt_ips.items():
subcloud_endpoints[region] = build_subcloud_endpoint_map(ip)
return subcloud_endpoints
class OptimizedEndpointCache(object):
"""Cache for storing endpoint information.
:param region_name: The name of the region.
:type region_name: str
:param auth_url: The authentication URL.
:type auth_url: str
:param fetch_subcloud_ips: A function to fetch subcloud IPs. It should
accept the region_name as an optional argument. If it's called without
the region_name, it should return a dictionary where the key is the
region_name and the value is the subcloud's management IP. If it's called
with the region_name, it should return the management IP of the
specified region.
:type fetch_subcloud_ips: Callable[[str], Union[str, dict]]
"""
plugin_loader = None
plugin_lock = threading.Lock()
master_keystone_client = None
master_token = {}
master_services_list = None
master_service_endpoint_map = collections.defaultdict(dict)
subcloud_endpoints: dict = None
fetch_subcloud_ips: Callable[[str], Union[str, dict]] = None
def __init__(
self,
region_name: str = None,
auth_url: str = None,
fetch_subcloud_ips: Callable[[str], Union[str, dict]] = None,
):
# Region specific service endpoint map
self.service_endpoint_map = collections.defaultdict(dict)
self.admin_session = None
self.keystone_client = None
# Cache the fetch_subcloud_ips function
if fetch_subcloud_ips:
OptimizedEndpointCache.fetch_subcloud_ips = fetch_subcloud_ips
self._initialize_subcloud_endpoints()
# If auth_url is provided, use it; otherwise use the one
# defined in the config
if auth_url:
self.external_auth_url = auth_url
else:
self.external_auth_url = CONF.endpoint_cache.auth_uri
self._initialize_keystone_client(region_name, auth_url)
@lockutils.synchronized("subcloud_endpoints")
def _initialize_subcloud_endpoints(self):
# Initialize and cache the subcloud endpoints
if (
OptimizedEndpointCache.subcloud_endpoints is None
and OptimizedEndpointCache.fetch_subcloud_ips
):
LOG.info("Initializing and caching subcloud endpoints")
# pylint: disable=not-callable
OptimizedEndpointCache.subcloud_endpoints = build_subcloud_endpoints(
OptimizedEndpointCache.fetch_subcloud_ips()
)
def _initialize_keystone_client(
self, region_name: str = None, auth_url: str = None
) -> None:
"""Initialize the Keystone client.
:param region_name: The name of the region.
:type region_name: str
:param auth_url: The authentication URL.
:type auth_url: str
"""
self.admin_session = OptimizedEndpointCache.get_admin_session(
self.external_auth_url,
CONF.endpoint_cache.username,
CONF.endpoint_cache.user_domain_name,
CONF.endpoint_cache.password,
CONF.endpoint_cache.project_name,
CONF.endpoint_cache.project_domain_name,
)
self.keystone_client, self.service_endpoint_map = (
self.get_cached_master_keystone_client_and_region_endpoint_map(
region_name
)
)
# If endpoint cache is intended for a subcloud then we need to
# retrieve the subcloud token and session. Skip this if auth_url
# was provided, as it is assumed that the auth_url corresponds
# to a subcloud, so the session was already set up above
if (
not auth_url
and region_name
and region_name not in [consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD]
):
try:
sc_auth_url = self.service_endpoint_map["keystone"]
except KeyError:
# Should not be here...
LOG.exception(
f"Endpoint not found for {region_name=}."
"Refreshing cached data..."
)
self.re_initialize_master_keystone_client()
raise
# We assume that the dcmanager user names and passwords are the
# same on this subcloud since this is an audited resource
self.admin_session = OptimizedEndpointCache.get_admin_session(
sc_auth_url,
CONF.endpoint_cache.username,
CONF.endpoint_cache.user_domain_name,
CONF.endpoint_cache.password,
CONF.endpoint_cache.project_name,
CONF.endpoint_cache.project_domain_name,
)
try:
self.keystone_client = ks_client.Client(
session=self.admin_session, region_name=region_name
)
except Exception:
LOG.error(f"Retrying keystone client creation for {region_name}")
self.keystone_client = ks_client.Client(
session=self.admin_session, region_name=region_name
)
self.external_auth_url = sc_auth_url
@classmethod
def get_admin_session(
cls,
auth_url: str,
user_name: str,
user_domain_name: str,
user_password: str,
user_project: str,
user_project_domain: str,
timeout: float = None,
) -> session.Session:
"""Get the admin session.
:param auth_url: The authentication URL.
:type auth_url: str
:param user_name: The user name.
:type user_name: str
:param user_domain_name: The user domain name.
:type user_domain_name: str
:param user_password: The user password.
:type user_password: str
:param user_project: The user project.
:type user_project: str
:param user_project_domain: The user project domain.
:type user_project_domain: str
:param timeout: The request timeout in seconds.
:type timeout: float
:return: The admin session.
:rtype: session.Session
"""
with OptimizedEndpointCache.plugin_lock:
if OptimizedEndpointCache.plugin_loader is None:
OptimizedEndpointCache.plugin_loader = loading.get_plugin_loader(
CONF.endpoint_cache.auth_plugin
)
user_auth = OptimizedEndpointCache.plugin_loader.load_from_options(
auth_url=auth_url,
username=user_name,
user_domain_name=user_domain_name,
password=user_password,
project_name=user_project,
project_domain_name=user_project_domain,
)
timeout = (
CONF.endpoint_cache.http_connect_timeout if timeout is None else timeout
)
return session.Session(
auth=user_auth, additional_headers=consts.USER_HEADER, timeout=timeout
)
@staticmethod
def _is_central_cloud(region_name: str) -> bool:
"""Check if the region is a central cloud.
:param region_name: The name of the region.
:type region_name: str
:return: True if the region is a central cloud, False otherwise.
:rtype: bool
"""
central_cloud_regions = [consts.CLOUD_0, consts.VIRTUAL_MASTER_CLOUD]
return region_name in central_cloud_regions
@staticmethod
def _get_master_endpoint_map() -> dict:
service_id_name_map = {}
# pylint: disable-next=not-an-iterable
for service in OptimizedEndpointCache.master_services_list:
service_id_name_map[service.id] = service.name
service_endpoint_map = collections.defaultdict(dict)
for (
endpoint
) in OptimizedEndpointCache.master_keystone_client.endpoints.list():
# Within central cloud, use only internal endpoints
if OptimizedEndpointCache._is_central_cloud(endpoint.region):
if endpoint.interface != consts.KS_ENDPOINT_INTERNAL:
continue
# For other regions store only admin endpoints
elif endpoint.interface != consts.KS_ENDPOINT_ADMIN:
continue
# Add the endpoint url to the service endpoint map
service_name = service_id_name_map[endpoint.service_id]
service_endpoint_map[endpoint.region][service_name] = endpoint.url
return service_endpoint_map
@staticmethod
def _generate_master_service_endpoint_map() -> dict:
LOG.info("Generating service endpoint map")
# Get the master endpoint map using keystone
service_endpoint_map = OptimizedEndpointCache._get_master_endpoint_map()
# Insert the subcloud endpoints into the service_endpoint_map
if OptimizedEndpointCache.subcloud_endpoints:
LOG.debug("Inserting subcloud endpoints into service_endpoint_map")
service_endpoint_map.update(OptimizedEndpointCache.subcloud_endpoints)
return service_endpoint_map
def get_endpoint(self, service: str) -> Union[str, None]:
"""Get the endpoint for the specified service.
:param service: The service name.
:type service: str
:return: The service URL or None
"""
try:
endpoint = self.service_endpoint_map[service]
except KeyError:
LOG.error(f"Unknown service: {service}")
endpoint = None
return endpoint
@lockutils.synchronized(LOCK_NAME)
def get_all_regions(self) -> List[str]:
"""Get region list.
:return: List of regions
"""
return list(OptimizedEndpointCache.master_service_endpoint_map.keys())
def get_session_from_token(self, token: str, project_id: str) -> session.Session:
"""Get session based on token to communicate with openstack services.
:param token: token with which the request is triggered.
:type token: str
:param project_id: UUID of the project.
:type project_id: str
:return: session object.
"""
loader = loading.get_plugin_loader("token")
auth = loader.load_from_options(
auth_url=self.external_auth_url, token=token, project_id=project_id
)
return session.Session(auth=auth)
@lockutils.synchronized(LOCK_NAME)
def update_master_service_endpoint_region(
self, region_name: str, endpoint_values: dict
) -> None:
"""Update the master endpoint map for a specific region.
:param region_name: The name of the region.
:type region_name: str
:param endpoint_values: The endpoint values.
:type endpoint_values: dict
"""
LOG.info(
"Updating service endpoint map for region: "
f"{region_name} with endpoints: {endpoint_values}"
)
# Update the current endpoint map
OptimizedEndpointCache.master_service_endpoint_map[region_name] = (
endpoint_values
)
# Update the cached subcloud endpoint map
if OptimizedEndpointCache.subcloud_endpoints and not self._is_central_cloud(
region_name
):
LOG.debug(
"Updating subcloud_endpoints for region: "
f"{region_name} with endpoints: {endpoint_values}"
)
# pylint: disable-next=unsupported-assignment-operation
OptimizedEndpointCache.subcloud_endpoints[region_name] = endpoint_values
def refresh_subcloud_endpoints(self, region_name: str) -> None:
"""Refresh the subcloud endpoints.
:param region_name: The name of the region.
:type region_name: str
"""
LOG.info(f"Refreshing subcloud endpoinds of region_name: {region_name}")
if not OptimizedEndpointCache.fetch_subcloud_ips:
raise Exception(
f"Unable to fetch endpoints for region {region_name}: "
"missing fetch_subcloud_ips"
)
# pylint: disable-next=not-callable
subcloud_ip = OptimizedEndpointCache.fetch_subcloud_ips(region_name)
endpoint_map = build_subcloud_endpoint_map(subcloud_ip)
# pylint: disable-next=unsupported-assignment-operation
OptimizedEndpointCache.subcloud_endpoints[region_name] = endpoint_map
@lockutils.synchronized(LOCK_NAME)
def get_cached_master_keystone_client_and_region_endpoint_map(
self, region_name: str
) -> Tuple[ks_client.Client, dict]:
"""Get the cached master Keystone client and region endpoint map.
:param region_name: The name of the region.
:type region_name: str
:return: The master Keystone client and region endpoint map.
:rtype: tuple
"""
if OptimizedEndpointCache.master_keystone_client is None:
self._create_master_cached_data()
LOG.info(
"Generated Master keystone client and master token the "
"very first time"
)
else:
token_expiring_soon = is_token_expiring_soon(
token=OptimizedEndpointCache.master_token
)
# If token is expiring soon, initialize a new master keystone
# client
if token_expiring_soon:
LOG.info(
f"The cached keystone token for '{consts.CLOUD_0}' will expire "
f"soon: {OptimizedEndpointCache.master_token['expires_at']}"
)
self._create_master_cached_data()
LOG.info(
"Generated Master keystone client and master token as they "
"are expiring soon"
)
else:
# Check if the cached master service endpoint map needs to be
# refreshed
if region_name not in self.master_service_endpoint_map:
previous_size = len(
OptimizedEndpointCache.master_service_endpoint_map
)
if not self._is_central_cloud(region_name):
self.refresh_subcloud_endpoints(region_name)
OptimizedEndpointCache.master_service_endpoint_map = (
self._generate_master_service_endpoint_map()
)
current_size = len(
OptimizedEndpointCache.master_service_endpoint_map
)
LOG.info(
"Master endpoints list refreshed to include "
f"region {region_name}: "
f"prev_size={previous_size}, current_size={current_size}"
)
if region_name is not None:
region_service_endpoint_map = (
OptimizedEndpointCache.master_service_endpoint_map[region_name]
)
else:
region_service_endpoint_map = collections.defaultdict(dict)
return (
OptimizedEndpointCache.master_keystone_client,
region_service_endpoint_map,
)
@lockutils.synchronized(LOCK_NAME)
def re_initialize_master_keystone_client(self) -> None:
"""Reinitialize the master Keystone client."""
self._create_master_cached_data()
LOG.info("Generated Master keystone client and master token upon exception")
def _create_master_cached_data(self) -> None:
OptimizedEndpointCache.master_keystone_client = ks_client.Client(
session=self.admin_session, region_name=consts.CLOUD_0
)
OptimizedEndpointCache.master_token = (
OptimizedEndpointCache.master_keystone_client.tokens.validate(
OptimizedEndpointCache.master_keystone_client.session.get_token(),
include_catalog=False,
)
)
if OptimizedEndpointCache.master_services_list is None:
OptimizedEndpointCache.master_services_list = (
OptimizedEndpointCache.master_keystone_client.services.list()
)
OptimizedEndpointCache.master_service_endpoint_map = (
self._generate_master_service_endpoint_map()
)

@@ -46,6 +46,7 @@ from dccommon.drivers.openstack import vim
from dccommon import exceptions as dccommon_exceptions
from dccommon import kubeoperator
from dcmanager.common import consts
from dcmanager.common import context
from dcmanager.common import exceptions
from dcmanager.common.i18n import _
from dcmanager.db import api as db_api
@@ -1613,3 +1614,23 @@ def generate_sync_info_message(association_ids):
info_message += (f"$ dcmanager peer-group-association"
f" sync {association_id}\n")
return info_message
def fetch_subcloud_mgmt_ips(region_name: str = None):
"""Fetch the subcloud(s) management IP(s).
:param region_name: The subcloud region name, defaults to None
:return: A dictionary of region names to IPs (if no region provided)
or a single IP string (for specific region).
"""
LOG.info(f"Fetching subcloud(s) management IP(s) ({region_name=})")
ctx = context.get_admin_context()
if region_name:
subcloud = db_api.subcloud_get_by_region_name(ctx, region_name)
return subcloud.management_start_ip
ip_map = {}
subclouds = db_api.subcloud_get_all(ctx)
for subcloud in subclouds:
ip_map[subcloud.region_name] = subcloud.management_start_ip
return ip_map