diff --git a/distributedcloud-client/.pylintrc b/distributedcloud-client/.pylintrc index 60b7b79..3d25182 100644 --- a/distributedcloud-client/.pylintrc +++ b/distributedcloud-client/.pylintrc @@ -29,14 +29,22 @@ load-plugins= # https://pylint.readthedocs.io/en/latest/user_guide/output.html#source-code-analysis-section # R detect Refactor for a "good practice" metric violation # W detect Warning for stylistic problems, or minor programming issues -disable=R, - broad-except, +disable=broad-except, invalid-name, missing-class-docstring, missing-function-docstring, missing-module-docstring, protected-access, + too-few-public-methods, + too-many-ancestors, + too-many-arguments, + too-many-branches, + too-many-instance-attributes, too-many-lines, + too-many-locals, + too-many-public-methods, + too-many-statements, + duplicate-code, [REPORTS] # Set the output format. Available formats are text, parseable, colorized, msvs diff --git a/distributedcloud-client/dcmanagerclient/api/base.py b/distributedcloud-client/dcmanagerclient/api/base.py index fd84988..b7c9c36 100644 --- a/distributedcloud-client/dcmanagerclient/api/base.py +++ b/distributedcloud-client/dcmanagerclient/api/base.py @@ -21,7 +21,7 @@ from bs4 import BeautifulSoup from dcmanagerclient import exceptions -class Resource(object): +class Resource: # This will be overridden by the actual resource resource_name = "Something" @@ -140,14 +140,14 @@ class Subcloud(Resource): @classmethod def from_payloads(cls, manager, payloads): """Returns a list of class instances from a payload list.""" - subclouds = list() + subclouds = [] for payload in payloads: subcloud = cls.from_payload(manager, payload) subclouds.append(subcloud) return subclouds -class ResourceManager(object): +class ResourceManager: resource_class = None def __init__(self, http_client): @@ -236,5 +236,4 @@ def get_json(response): json_field_or_function = getattr(response, "json", None) if callable(json_field_or_function): return response.json() - else: - return json.loads(response.content) + return json.loads(response.content) diff --git a/distributedcloud-client/dcmanagerclient/api/httpclient.py b/distributedcloud-client/dcmanagerclient/api/httpclient.py index 056d9f6..f26e849 100644 --- a/distributedcloud-client/dcmanagerclient/api/httpclient.py +++ b/distributedcloud-client/dcmanagerclient/api/httpclient.py @@ -42,7 +42,7 @@ def log_request(func): return decorator -class HTTPClient(object): +class HTTPClient: def __init__( self, base_url, diff --git a/distributedcloud-client/dcmanagerclient/api/v1/client.py b/distributedcloud-client/dcmanagerclient/api/v1/client.py index 2e6279a..93c69f8 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/client.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/client.py @@ -45,7 +45,7 @@ from dcmanagerclient.api.v1 import system_peer_manager as sp _DEFAULT_DCMANAGER_URL = "http://localhost:8119/v1.0" -class Client(object): +class Client: """Class where the communication from KB to Keystone happens.""" def __init__( diff --git a/distributedcloud-client/dcmanagerclient/api/v1/fw_update_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/fw_update_manager.py index 4df4d48..722eee6 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/fw_update_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/fw_update_manager.py @@ -1,5 +1,5 @@ # Copyright (c) 2017 Ericsson AB. -# Copyright (c) 2020-2021 Wind River Systems, Inc. +# Copyright (c) 2020-2021, 2024 Wind River Systems, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,12 +16,10 @@ # from dcmanagerclient.api.v1.sw_update_manager import sw_update_manager -SW_UPDATE_TYPE_FIRMWARE = 'firmware' +SW_UPDATE_TYPE_FIRMWARE = "firmware" class fw_update_manager(sw_update_manager): def __init__(self, http_client): - super(fw_update_manager, self).__init__( - http_client, - update_type=SW_UPDATE_TYPE_FIRMWARE) + super().__init__(http_client, update_type=SW_UPDATE_TYPE_FIRMWARE) diff --git a/distributedcloud-client/dcmanagerclient/api/v1/kube_rootca_update_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/kube_rootca_update_manager.py index a8e604f..12d95a6 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/kube_rootca_update_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/kube_rootca_update_manager.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -11,7 +11,5 @@ SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE = "kube-rootca-update" class kube_rootca_update_manager(sw_update_manager): def __init__(self, http_client): - super(kube_rootca_update_manager, self).__init__( - http_client, - update_type=SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE) - self.extra_args = ['subject', 'expiry-date', 'cert-file'] + super().__init__(http_client, update_type=SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE) + self.extra_args = ["subject", "expiry-date", "cert-file"] diff --git a/distributedcloud-client/dcmanagerclient/api/v1/kube_upgrade_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/kube_upgrade_manager.py index 66b696e..a4af8e7 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/kube_upgrade_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/kube_upgrade_manager.py @@ -1,5 +1,5 @@ # Copyright (c) 2017 Ericsson AB. -# Copyright (c) 2020-2021 Wind River Systems, Inc. +# Copyright (c) 2020-2021, 2024 Wind River Systems, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,7 +22,5 @@ SW_UPDATE_TYPE_KUBERNETES = "kubernetes" class kube_upgrade_manager(sw_update_manager): def __init__(self, http_client): - super(kube_upgrade_manager, self).__init__( - http_client, - update_type=SW_UPDATE_TYPE_KUBERNETES) - self.extra_args = ['to-version'] + super().__init__(http_client, update_type=SW_UPDATE_TYPE_KUBERNETES) + self.extra_args = ["to-version"] diff --git a/distributedcloud-client/dcmanagerclient/api/v1/peer_group_association_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/peer_group_association_manager.py index 7454e4d..4e4a6de 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/peer_group_association_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/peer_group_association_manager.py @@ -78,7 +78,7 @@ class peer_group_association_manager(base.ResourceManager): self._raise_api_exception(resp) json_response_key = get_json(resp) json_objects = json_response_key["peer_group_associations"] - resource = list() + resource = [] for json_object in json_objects: resource.append(self._json_to_resource(json_object)) return resource diff --git a/distributedcloud-client/dcmanagerclient/api/v1/phased_subcloud_deploy_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/phased_subcloud_deploy_manager.py index 7d3bbfd..a5c91c7 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/phased_subcloud_deploy_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/phased_subcloud_deploy_manager.py @@ -25,16 +25,10 @@ class phased_subcloud_deploy_manager(base.ResourceManager): return getattr(self.http_client, method)(url, body, headers) def _deploy_operation(self, url, body, data, method="post"): - fields = dict() + fields = {} for k, v in body.items(): - fields.update( - { - k: ( - v, - open(v, "rb"), - ) - } - ) + with open(v, "rb") as file: + fields.update({k: (v, file)}) fields.update(data) enc = MultipartEncoder(fields=fields) headers = {"content-type": enc.content_type} diff --git a/distributedcloud-client/dcmanagerclient/api/v1/strategy_step_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/strategy_step_manager.py index d20b628..32c2df2 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/strategy_step_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/strategy_step_manager.py @@ -47,7 +47,7 @@ class StrategyStep(base.Resource): class strategy_step_manager(base.ResourceManager): def __init__(self, http_client): - super(strategy_step_manager, self).__init__(http_client) + super().__init__(http_client) self.resource_class = StrategyStep self.steps_url = "/sw-update-strategy/steps" self.response_key = "strategy-steps" @@ -88,6 +88,6 @@ class strategy_step_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.build_from_json(json_object)) return resource diff --git a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_backup_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_backup_manager.py index d797be0..8280105 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_backup_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_backup_manager.py @@ -20,18 +20,11 @@ class subcloud_backup_manager(base.ResourceManager): return self.resource_class.from_payload(self, json_object) def subcloud_backup_create(self, url, files, data): - - fields = dict() + fields = {} if files: for k, v in files.items(): - fields.update( - { - k: ( - v, - open(v, "rb"), - ) - } - ) + with open(v, "rb") as file: + fields.update({k: (v, file)}) fields.update(data) enc = MultipartEncoder(fields=fields) headers = {"content-type": enc.content_type} @@ -47,8 +40,7 @@ class subcloud_backup_manager(base.ResourceManager): return resource def subcloud_backup_delete(self, url, data): - - fields = dict() + fields = {} fields.update(data) enc = MultipartEncoder(fields=fields) headers = {"content-type": enc.content_type} @@ -61,18 +53,11 @@ class subcloud_backup_manager(base.ResourceManager): return None def subcloud_backup_restore(self, url, files, data): - - fields = dict() + fields = {} if files: for k, v in files.items(): - fields.update( - { - k: ( - v, - open(v, "rb"), - ) - } - ) + with open(v, "rb") as file: + fields.update({k: (v, file)}) fields.update(data) enc = MultipartEncoder(fields=fields) headers = {"content-type": enc.content_type} diff --git a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_deploy_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_deploy_manager.py index 8bdd160..dd19363 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_deploy_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_deploy_manager.py @@ -21,11 +21,16 @@ from dcmanagerclient.api.base import get_json class SubcloudDeploy(base.Resource): - resource_name = 'subcloud_deploy' + resource_name = "subcloud_deploy" - def __init__(self, deploy_playbook=None, deploy_overrides=None, - deploy_chart=None, prestage_images=None, - software_version=None): + def __init__( + self, + deploy_playbook=None, + deploy_overrides=None, + deploy_chart=None, + prestage_images=None, + software_version=None, + ): self.deploy_playbook = deploy_playbook self.deploy_overrides = deploy_overrides self.deploy_chart = deploy_chart @@ -37,17 +42,22 @@ class subcloud_deploy_manager(base.ResourceManager): resource_class = SubcloudDeploy def _process_json_response(self, json_object): - resource = list() - deploy_playbook = json_object.get('deploy_playbook') - deploy_overrides = json_object.get('deploy_overrides') - deploy_chart = json_object.get('deploy_chart') - prestage_images = json_object.get('prestage_images') - software_version = json_object.get('software_version') + resource = [] + deploy_playbook = json_object.get("deploy_playbook") + deploy_overrides = json_object.get("deploy_overrides") + deploy_chart = json_object.get("deploy_chart") + prestage_images = json_object.get("prestage_images") + software_version = json_object.get("software_version") resource.append( - self.resource_class(deploy_playbook, deploy_overrides, - deploy_chart, prestage_images, - software_version)) + self.resource_class( + deploy_playbook, + deploy_overrides, + deploy_chart, + prestage_images, + software_version, + ) + ) return resource @@ -56,17 +66,18 @@ class subcloud_deploy_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_response_key = get_json(resp) - json_object = json_response_key['subcloud_deploy'] + json_object = json_response_key["subcloud_deploy"] resource = self._process_json_response(json_object) return resource def _deploy_upload(self, url, files, data): - fields = dict() + fields = {} for k, v in files.items(): - fields.update({k: (v, open(v, 'rb'),)}) + with open(v, "rb") as file: + fields.update({k: (v, file)}) fields.update(data) enc = MultipartEncoder(fields=fields) - headers = {'content-type': enc.content_type} + headers = {"content-type": enc.content_type} resp = self.http_client.post(url, enc, headers=headers) if resp.status_code != 200: self._raise_api_exception(resp) @@ -78,25 +89,24 @@ class subcloud_deploy_manager(base.ResourceManager): resp = self.http_client.delete(url) if resp.status_code != 200: self._raise_api_exception(resp) - return None def subcloud_deploy_show(self, release): - url = '/subcloud-deploy/' + url = "/subcloud-deploy/" if release: url += release return self._subcloud_deploy_detail(url) def subcloud_deploy_upload(self, **kwargs): - files = kwargs.get('files') - data = kwargs.get('data') - url = '/subcloud-deploy/' + files = kwargs.get("files") + data = kwargs.get("data") + url = "/subcloud-deploy/" return self._deploy_upload(url, files, data) def subcloud_deploy_delete(self, release, **kwargs): - url = '/subcloud-deploy/' - data = kwargs.get('data') + url = "/subcloud-deploy/" + data = kwargs.get("data") if release: - url += release + '/' - url += '?deployment_files=' + data['deployment_files'] - url += '&prestage_images=' + data['prestage_images'] + url += release + "/" + url += "?deployment_files=" + data["deployment_files"] + url += "&prestage_images=" + data["prestage_images"] return self._deploy_delete(url) diff --git a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_group_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_group_manager.py index 0e4173f..b6808ca 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_group_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_group_manager.py @@ -49,7 +49,7 @@ class subcloud_group_manager(base.ResourceManager): resource_class = SubcloudGroup def __init__(self, http_client, subcloud_manager): - super(subcloud_group_manager, self).__init__(http_client) + super().__init__(http_client) self.subcloud_manager = subcloud_manager def _json_to_resource(self, json_object): @@ -70,7 +70,7 @@ class subcloud_group_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._json_to_resource(json_object)) return resource @@ -80,7 +80,7 @@ class subcloud_group_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._json_to_resource(json_object)) return resource @@ -100,7 +100,7 @@ class subcloud_group_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._json_to_resource(json_object)) return resource diff --git a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_manager.py index 4713a25..977fd92 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_manager.py @@ -30,16 +30,10 @@ class subcloud_manager(base.ResourceManager): return self.resource_class.from_payload(self, json_object) def subcloud_create(self, url, body, data): - fields = dict() + fields = {} for k, v in body.items(): - fields.update( - { - k: ( - v, - open(v, "rb"), - ) - } - ) + with open(v, "rb") as file: + fields.update({k: (v, file)}) fields.update(data) enc = MultipartEncoder(fields=fields) headers = {"content-type": enc.content_type} @@ -47,22 +41,16 @@ class subcloud_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.json_to_resource(json_object)) return resource def subcloud_update(self, url, body, data): - fields = dict() - if body is not None: + fields = {} + if body: for k, v in body.items(): - fields.update( - { - k: ( - v, - open(v, "rb"), - ) - } - ) + with open(v, "rb") as file: + fields.update({k: (v, file)}) fields.update(data) enc = MultipartEncoder(fields=fields) headers = {"content-type": enc.content_type} @@ -70,21 +58,15 @@ class subcloud_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.json_to_resource(json_object)) return resource def subcloud_redeploy(self, url, body, data): - fields = dict() + fields = {} for k, v in body.items(): - fields.update( - { - k: ( - v, - open(v, "rb"), - ) - } - ) + with open(v, "rb") as file: + fields.update({k: (v, file)}) fields.update(data) enc = MultipartEncoder(fields=fields) headers = {"content-type": enc.content_type} @@ -92,7 +74,7 @@ class subcloud_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.json_to_resource(json_object)) return resource @@ -102,7 +84,7 @@ class subcloud_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.json_to_resource(json_object)) if json_object.get("prestage_software_version"): resource[0].prestage_software_version = json_object[ diff --git a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_peer_group_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_peer_group_manager.py index 5c72966..e13e4ac 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/subcloud_peer_group_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/subcloud_peer_group_manager.py @@ -44,7 +44,7 @@ class subcloud_peer_group_manager(base.ResourceManager): resource_class = SubcloudPeerGroup def __init__(self, http_client, subcloud_manager): - super(subcloud_peer_group_manager, self).__init__(http_client) + super().__init__(http_client) self.subcloud_manager = subcloud_manager def json_to_resource(self, json_object): @@ -66,7 +66,7 @@ class subcloud_peer_group_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.json_to_resource(json_object)) return resource @@ -75,7 +75,7 @@ class subcloud_peer_group_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(json_object) return resource @@ -85,7 +85,7 @@ class subcloud_peer_group_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.json_to_resource(json_object)) return resource @@ -95,7 +95,7 @@ class subcloud_peer_group_manager(base.ResourceManager): self._raise_api_exception(resp) json_response_key = get_json(resp) json_objects = json_response_key["subcloud_peer_groups"] - resource = list() + resource = [] for json_object in json_objects: resource.append(self.json_to_resource(json_object)) return resource @@ -106,7 +106,7 @@ class subcloud_peer_group_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self.json_to_resource(json_object)) return resource @@ -116,7 +116,7 @@ class subcloud_peer_group_manager(base.ResourceManager): self._raise_api_exception(resp) json_response_key = get_json(resp) json_objects = json_response_key["subclouds"] - resource = list() + resource = [] for json_object in json_objects: resource.append(self.subcloud_manager.json_to_resource(json_object)) return resource @@ -128,7 +128,7 @@ class subcloud_peer_group_manager(base.ResourceManager): self._raise_api_exception(resp) json_response_key = get_json(resp) json_objects = json_response_key["subclouds"] - resource = list() + resource = [] for json_object in json_objects: resource.append(self.subcloud_manager.json_to_resource(json_object)) return resource diff --git a/distributedcloud-client/dcmanagerclient/api/v1/sw_patch_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/sw_patch_manager.py index 6c4346e..051a5b4 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/sw_patch_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/sw_patch_manager.py @@ -1,5 +1,5 @@ # Copyright (c) 2017 Ericsson AB. -# Copyright (c) 2017-2023 Wind River Systems, Inc. +# Copyright (c) 2017-2024 Wind River Systems, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,12 +16,11 @@ # from dcmanagerclient.api.v1.sw_update_manager import sw_update_manager -SW_UPDATE_TYPE_PATCH = 'patch' +SW_UPDATE_TYPE_PATCH = "patch" class sw_patch_manager(sw_update_manager): def __init__(self, http_client): - super(sw_patch_manager, self).__init__( - http_client, - update_type=SW_UPDATE_TYPE_PATCH, - extra_args=['upload-only']) + super().__init__( + http_client, update_type=SW_UPDATE_TYPE_PATCH, extra_args=["upload-only"] + ) diff --git a/distributedcloud-client/dcmanagerclient/api/v1/sw_prestage_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/sw_prestage_manager.py index 6ac178a..983c1ae 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/sw_prestage_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/sw_prestage_manager.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023 Wind River Systems, Inc. +# Copyright (c) 2022-2024 Wind River Systems, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,13 +15,14 @@ # from dcmanagerclient.api.v1.sw_update_manager import sw_update_manager -SW_UPDATE_TYPE_PRESTAGE = 'prestage' +SW_UPDATE_TYPE_PRESTAGE = "prestage" class sw_prestage_manager(sw_update_manager): def __init__(self, http_client): - super(sw_prestage_manager, self).__init__( + super().__init__( http_client, update_type=SW_UPDATE_TYPE_PRESTAGE, - extra_args=['prestage-software-version']) + extra_args=["prestage-software-version"], + ) diff --git a/distributedcloud-client/dcmanagerclient/api/v1/sw_strategy_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/sw_strategy_manager.py index 11ef960..5b913af 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/sw_strategy_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/sw_strategy_manager.py @@ -19,7 +19,7 @@ from dcmanagerclient.api.v1.sw_update_manager import sw_update_manager class sw_strategy_manager(sw_update_manager): def __init__(self, http_client, url="sw-update-strategy"): - super(sw_strategy_manager, self).__init__(http_client, update_type=None) + super().__init__(http_client, update_type=None) # Removing strategy type from base class parameters self.get_url = f"/{url}" diff --git a/distributedcloud-client/dcmanagerclient/api/v1/sw_update_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/sw_update_manager.py index 325aee8..9373914 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/sw_update_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/sw_update_manager.py @@ -63,7 +63,7 @@ class sw_update_manager(base.ResourceManager): url="sw-update-strategy", extra_args=None, ): - super(sw_update_manager, self).__init__(http_client) + super().__init__(http_client) self.resource_class = resource_class self.update_type = update_type # create_url is typically // @@ -111,8 +111,7 @@ class sw_update_manager(base.ResourceManager): args_dict[x] = arg if args_dict: return args_dict - else: - return None + return None def _build_from_json(self, json_object): return self.resource_class( @@ -133,7 +132,7 @@ class sw_update_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._build_from_json(json_object)) return resource @@ -142,7 +141,7 @@ class sw_update_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._build_from_json(json_object)) return resource @@ -151,7 +150,7 @@ class sw_update_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._build_from_json(json_object)) return resource @@ -161,6 +160,6 @@ class sw_update_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._build_from_json(json_object)) return resource diff --git a/distributedcloud-client/dcmanagerclient/api/v1/sw_update_options_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/sw_update_options_manager.py index dbbaf75..e364ba6 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/sw_update_options_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/sw_update_options_manager.py @@ -83,7 +83,7 @@ class sw_update_options_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append( self.resource_class( self, @@ -133,7 +133,7 @@ class sw_update_options_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append( self.resource_class( self, diff --git a/distributedcloud-client/dcmanagerclient/api/v1/sw_upgrade_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/sw_upgrade_manager.py index 54379c6..0c7533b 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/sw_upgrade_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/sw_upgrade_manager.py @@ -22,6 +22,4 @@ SW_UPDATE_TYPE_UPGRADE = "upgrade" class sw_upgrade_manager(sw_update_manager): def __init__(self, http_client): - super(sw_upgrade_manager, self).__init__( - http_client, update_type=SW_UPDATE_TYPE_UPGRADE - ) + super().__init__(http_client, update_type=SW_UPDATE_TYPE_UPGRADE) diff --git a/distributedcloud-client/dcmanagerclient/api/v1/system_peer_manager.py b/distributedcloud-client/dcmanagerclient/api/v1/system_peer_manager.py index d375a3d..f8765ba 100644 --- a/distributedcloud-client/dcmanagerclient/api/v1/system_peer_manager.py +++ b/distributedcloud-client/dcmanagerclient/api/v1/system_peer_manager.py @@ -54,7 +54,7 @@ class system_peer_manager(base.ResourceManager): resource_class = SystemPeer def __init__(self, http_client, subcloud_peer_group_manager): - super(system_peer_manager, self).__init__(http_client) + super().__init__(http_client) self.subcloud_peer_group_manager = subcloud_peer_group_manager def _json_to_resource(self, json_object): @@ -86,7 +86,7 @@ class system_peer_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._json_to_resource(json_object)) return resource @@ -96,7 +96,7 @@ class system_peer_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._json_to_resource(json_object)) return resource @@ -106,7 +106,7 @@ class system_peer_manager(base.ResourceManager): self._raise_api_exception(resp) json_response_key = get_json(resp) json_objects = json_response_key["system_peers"] - resource = list() + resource = [] for json_object in json_objects: resource.append(self._json_to_resource(json_object)) return resource @@ -116,7 +116,7 @@ class system_peer_manager(base.ResourceManager): if resp.status_code != 200: self._raise_api_exception(resp) json_object = get_json(resp) - resource = list() + resource = [] resource.append(self._json_to_resource(json_object)) return resource @@ -126,7 +126,7 @@ class system_peer_manager(base.ResourceManager): self._raise_api_exception(resp) json_response_key = get_json(resp) json_objects = json_response_key["subcloud_peer_groups"] - resource = list() + resource = [] for json_object in json_objects: resource.append( self.subcloud_peer_group_manager.json_to_resource(json_object) diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/alarm_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/alarm_manager.py index d59b721..c1e360f 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/alarm_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/alarm_manager.py @@ -50,7 +50,7 @@ class ListAlarmSummary(base.DCManagerLister): return basic_format def get_parser(self, prog_name): - parser = super(ListAlarmSummary, self).get_parser(prog_name) + parser = super().get_parser(prog_name) return parser def _get_resources(self, parsed_args): diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/base.py b/distributedcloud-client/dcmanagerclient/commands/v1/base.py index 728b128..c1c769b 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/base.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/base.py @@ -1,5 +1,5 @@ # Copyright (c) 2016 Ericsson AB -# Copyright (c) 2017, 2019, 2021-2022 Wind River Systems, Inc. +# Copyright (c) 2017, 2019, 2021-2022, 2024 Wind River Systems, Inc. # All Rights Reserved # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -18,8 +18,8 @@ import abc -from osc_lib.command import command import six +from osc_lib.command import command @six.add_metaclass(abc.ABCMeta) @@ -49,8 +49,7 @@ class DCManagerLister(command.Lister): if data: return f()[0], data - else: - return f() + return f() @six.add_metaclass(abc.ABCMeta) @@ -81,8 +80,7 @@ class DCManagerShowOne(command.ShowOne): if data: return (columns[0], data[0]) - else: - return f() + return f() @six.add_metaclass(abc.ABCMeta) @@ -98,15 +96,13 @@ class DCManagerShow(DCManagerLister, DCManagerShowOne): if self.should_list(parsed_args): return DCManagerLister.take_action(self, parsed_args) - else: - return DCManagerShowOne.take_action(self, parsed_args) + return DCManagerShowOne.take_action(self, parsed_args) def produce_output(self, parsed_args, column_names, data): """Overrides method from cliff.Lister/cliff.ShowOne.""" if self.should_list(parsed_args): - return DCManagerLister.produce_output(self, parsed_args, - column_names, data) - else: - return DCManagerShowOne.produce_output(self, parsed_args, - column_names, data) + return DCManagerLister.produce_output( + self, parsed_args, column_names, data + ) + return DCManagerShowOne.produce_output(self, parsed_args, column_names, data) diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/fw_update_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/fw_update_manager.py index f27c868..d87f493 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/fw_update_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/fw_update_manager.py @@ -16,7 +16,7 @@ from dcmanagerclient.commands.v1 import sw_update_manager -class FwUpdateManagerMixin(object): +class FwUpdateManagerMixin: """This Mixin provides the update manager used for firmware updates.""" def get_sw_update_manager(self): diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/kube_rootca_update_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/kube_rootca_update_manager.py index e91298f..c4baa89 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/kube_rootca_update_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/kube_rootca_update_manager.py @@ -8,7 +8,7 @@ import os from dcmanagerclient.commands.v1 import sw_update_manager -class KubeRootcaUpdateManagerMixin(object): +class KubeRootcaUpdateManagerMixin: """This Mixin provides the update manager used for kube rootca updates.""" def get_sw_update_manager(self): @@ -25,7 +25,7 @@ class CreateKubeRootcaUpdateStrategy( """ def get_parser(self, prog_name): - parser = super(CreateKubeRootcaUpdateStrategy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--subject", required=False, diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/kube_upgrade_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/kube_upgrade_manager.py index a5e2cc4..5325134 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/kube_upgrade_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/kube_upgrade_manager.py @@ -16,7 +16,7 @@ from dcmanagerclient.commands.v1 import sw_update_manager -class KubeUpgradeManagerMixin(object): +class KubeUpgradeManagerMixin: """This Mixin provides the update manager used for kubernetes upgrades.""" def get_sw_update_manager(self): @@ -30,7 +30,7 @@ class CreateKubeUpgradeStrategy( """Create a kubernetes upgrade strategy.""" def get_parser(self, prog_name): - parser = super(CreateKubeUpgradeStrategy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--to-version", required=False, diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/peer_group_association_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/peer_group_association_manager.py index d8f0bdf..ed49161 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/peer_group_association_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/peer_group_association_manager.py @@ -76,7 +76,7 @@ class AddPeerGroupAssociation(base.DCManagerShowOne): return detail_association_format def get_parser(self, prog_name): - parser = super(AddPeerGroupAssociation, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--peer-group-id", required=True, help="Subcloud peer group ID." @@ -112,7 +112,7 @@ class ListPeerGroupAssociation(base.DCManagerLister): return association_format def get_parser(self, prog_name): - parser = super(ListPeerGroupAssociation, self).get_parser(prog_name) + parser = super().get_parser(prog_name) return parser def _get_resources(self, parsed_args): @@ -128,7 +128,7 @@ class ShowPeerGroupAssociation(base.DCManagerShowOne): return detail_association_format def get_parser(self, prog_name): - parser = super(ShowPeerGroupAssociation, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "id", help="ID of the peer group association to view the details." @@ -150,7 +150,7 @@ class SyncPeerGroupAssociation(base.DCManagerShowOne): return detail_association_format def get_parser(self, prog_name): - parser = super(SyncPeerGroupAssociation, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("id", help="ID of the peer group association to sync.") @@ -167,7 +167,7 @@ class DeletePeerGroupAssociation(command.Command): """Delete peer group association from the database.""" def get_parser(self, prog_name): - parser = super(DeletePeerGroupAssociation, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("id", help="ID of the peer group association to delete.") return parser @@ -190,7 +190,7 @@ class UpdatePeerGroupAssociation(base.DCManagerShowOne): return detail_association_format def get_parser(self, prog_name): - parser = super(UpdatePeerGroupAssociation, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("id", help="ID of the peer group association to update.") diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/phased_subcloud_deploy_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/phased_subcloud_deploy_manager.py index 47a5938..a07fbf0 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/phased_subcloud_deploy_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/phased_subcloud_deploy_manager.py @@ -18,7 +18,7 @@ class AbortPhasedSubcloudDeploy(base.DCManagerShowOne): return utils.subcloud_detail_format def get_parser(self, prog_name): - parser = super(AbortPhasedSubcloudDeploy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "subcloud", @@ -108,8 +108,8 @@ class PhasedSubcloudDeployResume(base.DCManagerShowOne): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.phased_subcloud_deploy_manager.\ phased_subcloud_deploy_manager - files = dict() - data = dict() + files = {} + data = {} if parsed_args.bootstrap_address: data["bootstrap-address"] = parsed_args.bootstrap_address @@ -228,8 +228,8 @@ class CreatePhasedSubcloudDeploy(base.DCManagerShowOne): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.phased_subcloud_deploy_manager.\ phased_subcloud_deploy_manager - files = dict() - data = dict() + files = {} + data = {} data["bootstrap-address"] = parsed_args.bootstrap_address @@ -283,7 +283,7 @@ class InstallPhasedSubcloudDeploy(base.DCManagerShowOne): return utils.subcloud_detail_format def get_parser(self, prog_name): - parser = super(InstallPhasedSubcloudDeploy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "subcloud", help="Name or ID of the subcloud to install." @@ -324,8 +324,8 @@ class InstallPhasedSubcloudDeploy(base.DCManagerShowOne): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.phased_subcloud_deploy_manager.\ phased_subcloud_deploy_manager - files = dict() - data = dict() + files = {} + data = {} # Prompt the user for the subcloud's password if it isn't provided if parsed_args.sysadmin_password is not None: @@ -402,8 +402,8 @@ class BootstrapPhasedSubcloudDeploy(base.DCManagerShowOne): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.phased_subcloud_deploy_manager.\ phased_subcloud_deploy_manager - files = dict() - data = dict() + files = {} + data = {} if parsed_args.bootstrap_address: data["bootstrap-address"] = parsed_args.bootstrap_address @@ -441,7 +441,7 @@ class ConfigPhasedSubcloudDeploy(base.DCManagerShowOne): return utils.subcloud_detail_format def get_parser(self, prog_name): - parser = super(ConfigPhasedSubcloudDeploy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("subcloud", help="Name or ID of the subcloud to update.") @@ -465,8 +465,8 @@ class ConfigPhasedSubcloudDeploy(base.DCManagerShowOne): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.phased_subcloud_deploy_manager.\ phased_subcloud_deploy_manager - files = dict() - data = dict() + files = {} + data = {} # Get the deploy config yaml file if parsed_args.deploy_config is not None: diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_backup_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_backup_manager.py index 2f4442a..3d7b02a 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_backup_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_backup_manager.py @@ -82,7 +82,7 @@ class CreateSubcloudBackup(base.DCManagerShow): return parsed_args.group def get_parser(self, prog_name): - parser = super(CreateSubcloudBackup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--local-only", @@ -132,13 +132,11 @@ class CreateSubcloudBackup(base.DCManagerShow): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_backup_manager - data = dict() - files = dict() + data = {} + files = {} if not parsed_args.subcloud and not parsed_args.group: - error_msg = ( - "Please provide the subcloud or subcloud group name or id." - ) + error_msg = "Please provide the subcloud or subcloud group name or id." raise exceptions.DCManagerClientException(error_msg) if parsed_args.subcloud and parsed_args.group: @@ -210,7 +208,7 @@ class DeleteSubcloudBackup(command.Command): return detail_format def get_parser(self, prog_name): - parser = super(DeleteSubcloudBackup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "release", help="Release version that the user is trying to delete." @@ -250,14 +248,12 @@ class DeleteSubcloudBackup(command.Command): dcmanager_client = self.app.client_manager.subcloud_backup_manager release_version = parsed_args.release subcloud_ref = parsed_args.subcloud - data = dict() + data = {} data["release"] = parsed_args.release if not parsed_args.subcloud and not parsed_args.group: - error_msg = ( - "Please provide the subcloud or subcloud group name or id." - ) + error_msg = "Please provide the subcloud or subcloud group name or id." raise exceptions.DCManagerClientException(error_msg) if parsed_args.subcloud and parsed_args.group: @@ -309,7 +305,7 @@ class RestoreSubcloudBackup(base.DCManagerShow): return parsed_args.group def get_parser(self, prog_name): - parser = super(RestoreSubcloudBackup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--with-install", @@ -377,13 +373,11 @@ class RestoreSubcloudBackup(base.DCManagerShow): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_backup_manager - data = dict() - files = dict() + data = {} + files = {} if not parsed_args.subcloud and not parsed_args.group: - error_msg = ( - "Please provide the subcloud or subcloud group name or id." - ) + error_msg = "Please provide the subcloud or subcloud group name or id." raise exceptions.DCManagerClientException(error_msg) if parsed_args.subcloud and parsed_args.group: diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_deploy_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_deploy_manager.py index 40cbd1b..54a69bf 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_deploy_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_deploy_manager.py @@ -17,19 +17,19 @@ import os from osc_lib.command import command -from dcmanagerclient.commands.v1 import base from dcmanagerclient import exceptions +from dcmanagerclient.commands.v1 import base def _format(subcloud_deploy=None): columns = ( - 'deploy_playbook', - 'deploy_overrides', - 'deploy_chart', - 'prestage_images', - 'software_version' + "deploy_playbook", + "deploy_overrides", + "deploy_chart", + "prestage_images", + "software_version", ) - temp = list() + temp = [] try: temp.append(subcloud_deploy.deploy_playbook) except Exception: @@ -63,74 +63,77 @@ class SubcloudDeployUpload(base.DCManagerShowOne): return _format def get_parser(self, prog_name): - parser = super(SubcloudDeployUpload, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( - '--deploy-playbook', + "--deploy-playbook", required=False, - help='An ansible playbook to be run after the subcloud ' - 'has been successfully bootstrapped. It will be run with the ' - 'subcloud as the target and authentication is ' - 'handled automatically. ' - 'Must be a local file path' + help="An ansible playbook to be run after the subcloud " + "has been successfully bootstrapped. It will be run with the " + "subcloud as the target and authentication is " + "handled automatically. " + "Must be a local file path", ) parser.add_argument( - '--deploy-overrides', + "--deploy-overrides", required=False, - help='YAML file containing subcloud variables to be passed to the ' - 'deploy playbook.' - 'Must be a local file path' + help="YAML file containing subcloud variables to be passed to the " + "deploy playbook." + "Must be a local file path", ) parser.add_argument( - '--deploy-chart', + "--deploy-chart", required=False, - help='Deployment Manager helm chart to be passed to the ' - 'deploy playbook.' - 'Must be a local file path' + help="Deployment Manager helm chart to be passed to the " + "deploy playbook." + "Must be a local file path", ) parser.add_argument( - '--prestage-images', + "--prestage-images", required=False, - help='Container image list to be passed to ' - 'prestage_images playbook. ' - 'Must be a local file path' + help="Container image list to be passed to " + "prestage_images playbook. " + "Must be a local file path", ) parser.add_argument( - '--release', + "--release", required=False, - help='software release used to install, bootstrap and/or deploy ' - 'the subcloud with. If not specified, the current software ' - 'release of the system controller will be used.' + help="software release used to install, bootstrap and/or deploy " + "the subcloud with. If not specified, the current software " + "release of the system controller will be used.", ) return parser def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_deploy_manager - data = dict() - files = dict() - variable_dict = {'deploy_playbook': parsed_args.deploy_playbook, - 'deploy_overrides': parsed_args.deploy_overrides, - 'deploy_chart': parsed_args.deploy_chart, - 'prestage_images': parsed_args.prestage_images} + data = {} + files = {} + variable_dict = { + "deploy_playbook": parsed_args.deploy_playbook, + "deploy_overrides": parsed_args.deploy_overrides, + "deploy_chart": parsed_args.deploy_chart, + "prestage_images": parsed_args.prestage_images, + } for key, val in variable_dict.items(): if val is None: continue - elif not os.path.isfile(val): + if not os.path.isfile(val): error_msg = f"{key} file does not exist: {val}" raise exceptions.DCManagerClientException(error_msg) files[key] = val if parsed_args.release is not None: - data['release'] = parsed_args.release + data["release"] = parsed_args.release try: - return dcmanager_client.subcloud_deploy_manager.\ - subcloud_deploy_upload(files=files, data=data) + return dcmanager_client.subcloud_deploy_manager.subcloud_deploy_upload( + files=files, data=data + ) except Exception as e: print(e) error_msg = "Unable to upload subcloud deploy files" @@ -144,34 +147,39 @@ class SubcloudDeployShow(base.DCManagerShowOne): return _format def get_parser(self, prog_name): - parser = super(SubcloudDeployShow, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( - '--release', + "--release", required=False, - help='software release used to install, bootstrap and/or deploy ' - 'the subcloud with. If not specified, the current software ' - 'release of the system controller will be used.' + help="software release used to install, bootstrap and/or deploy " + "the subcloud with. If not specified, the current software " + "release of the system controller will be used.", ) return parser def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_deploy_manager return dcmanager_client.subcloud_deploy_manager.subcloud_deploy_show( - parsed_args.release) + parsed_args.release + ) class DeprecatedSubcloudDeployShow(SubcloudDeployShow): def _get_resources(self, parsed_args): - deprecation_msg = ('This command has been deprecated. Please use ' - 'subcloud deploy show instead.') + deprecation_msg = ( + "This command has been deprecated. Please use " + "subcloud deploy show instead." + ) raise exceptions.DCManagerClientException(deprecation_msg) class DeprecatedSubcloudDeployUpload(SubcloudDeployUpload): def _get_resources(self, parsed_args): - deprecation_msg = ('This command has been deprecated. Please use ' - 'subcloud deploy upload instead.') + deprecation_msg = ( + "This command has been deprecated. Please use " + "subcloud deploy upload instead." + ) raise exceptions.DCManagerClientException(deprecation_msg) @@ -179,29 +187,28 @@ class SubcloudDeployDelete(command.Command): """Delete the uploaded subcloud deployment files""" def get_parser(self, prog_name): - parser = super(SubcloudDeployDelete, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( - '--release', + "--release", required=False, - help='Release version that the user is trying to delete ' - 'If not specified, the current software ' - 'release of the system controller will be used.' + help="Release version that the user is trying to delete " + "If not specified, the current software " + "release of the system controller will be used.", ) parser.add_argument( - '--prestage-images', + "--prestage-images", required=False, - action='store_true', - help='Delete prestage images list file only ' + action="store_true", + help="Delete prestage images list file only ", ) parser.add_argument( - '--deployment-files', + "--deployment-files", required=False, - action='store_true', - help='Delete deploy playbook, deploy overrides, ' - 'deploy chart files ' + action="store_true", + help="Delete deploy playbook, deploy overrides, deploy chart files ", ) return parser @@ -209,15 +216,16 @@ class SubcloudDeployDelete(command.Command): def take_action(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_deploy_manager release = parsed_args.release - data = dict() + data = {} if parsed_args.prestage_images is not None: - data['prestage_images'] = str(parsed_args.prestage_images) + data["prestage_images"] = str(parsed_args.prestage_images) if parsed_args.deployment_files is not None: - data['deployment_files'] = str(parsed_args.deployment_files) + data["deployment_files"] = str(parsed_args.deployment_files) try: - dcmanager_client.subcloud_deploy_manager.\ - subcloud_deploy_delete(release, data=data) + dcmanager_client.subcloud_deploy_manager.subcloud_deploy_delete( + release, data=data + ) except Exception as e: print(e) error_msg = "Unable to delete subcloud deploy files" diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_group_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_group_manager.py index 362d914..a5f79d6 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_group_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_group_manager.py @@ -80,7 +80,7 @@ class AddSubcloudGroup(base.DCManagerShowOne): return detail_group_format def get_parser(self, prog_name): - parser = super(AddSubcloudGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--name", required=True, help="Name for the new subcloud group." @@ -110,7 +110,7 @@ class AddSubcloudGroup(base.DCManagerShowOne): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_group_manager - kwargs = dict() + kwargs = {} if parsed_args.name is not None: kwargs["name"] = parsed_args.name @@ -133,7 +133,7 @@ class ListSubcloudGroup(base.DCManagerLister): return group_format def get_parser(self, prog_name): - parser = super(ListSubcloudGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) return parser def _get_resources(self, parsed_args): @@ -148,7 +148,7 @@ class ListSubcloudGroupSubclouds(base.DCManagerLister): return detail_format def get_parser(self, prog_name): - parser = super(ListSubcloudGroupSubclouds, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of subcloud group to list associated subclouds.", @@ -174,7 +174,7 @@ class ShowSubcloudGroup(base.DCManagerShowOne): return detail_group_format def get_parser(self, prog_name): - parser = super(ShowSubcloudGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of subcloud group to view the details." @@ -194,7 +194,7 @@ class DeleteSubcloudGroup(command.Command): """Delete subcloud group details from the database.""" def get_parser(self, prog_name): - parser = super(DeleteSubcloudGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of the subcloud group to delete." @@ -221,7 +221,7 @@ class UpdateSubcloudGroup(base.DCManagerShowOne): return detail_group_format def get_parser(self, prog_name): - parser = super(UpdateSubcloudGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of the subcloud group to update." @@ -251,7 +251,7 @@ class UpdateSubcloudGroup(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_group_ref = parsed_args.group dcmanager_client = self.app.client_manager.subcloud_group_manager - kwargs = dict() + kwargs = {} if parsed_args.name: kwargs["name"] = parsed_args.name if parsed_args.description: diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_manager.py index 414504b..51b3bbe 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_manager.py @@ -195,7 +195,7 @@ class AddSubcloud(base.DCManagerShowOne): return detail_format def get_parser(self, prog_name): - parser = super(AddSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("--name", required=False, help="Subcloud name") @@ -264,8 +264,8 @@ class AddSubcloud(base.DCManagerShowOne): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_manager - files = dict() - data = dict() + files = {} + data = {} data["bootstrap-address"] = parsed_args.bootstrap_address # Get the install values yaml file @@ -345,21 +345,19 @@ class ListSubcloud(base.DCManagerLister): """List subclouds.""" def __init__(self, app, app_args): - super(ListSubcloud, self).__init__(app, app_args) + super().__init__(app, app_args) # Set a flag to indicate displaying a basic column list or # a list with customized or all columns self.show_basic_list = True def _validate_parsed_args(self, parsed_args): - self.show_basic_list = ( - False if parsed_args.columns or parsed_args.detail else True - ) + self.show_basic_list = not (parsed_args.columns or parsed_args.detail) def _get_format_function(self): return basic_format if self.show_basic_list else detail_list_format def get_parser(self, prog_name): - parser = super(ListSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--all", required=False, @@ -399,7 +397,7 @@ class ShowSubcloud(base.DCManagerShowOne): return detail_show_format def get_parser(self, prog_name): - parser = super(ShowSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "subcloud", help="Name or ID of subcloud to view the details." @@ -421,15 +419,14 @@ class ShowSubcloud(base.DCManagerShowOne): return dcmanager_client.subcloud_manager.subcloud_additional_details( subcloud_ref ) - else: - return dcmanager_client.subcloud_manager.subcloud_detail(subcloud_ref) + return dcmanager_client.subcloud_manager.subcloud_detail(subcloud_ref) class ShowSubcloudError(command.Command): """Show the error of the last failed operation.""" def get_parser(self, prog_name): - parser = super(ShowSubcloudError, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "subcloud", help="Name or ID of subcloud to view the errors details." @@ -448,7 +445,7 @@ class DeleteSubcloud(command.Command): """Delete subcloud details from the database.""" def get_parser(self, prog_name): - parser = super(DeleteSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("subcloud", help="Name or ID of the subcloud to delete.") return parser @@ -471,7 +468,7 @@ class UnmanageSubcloud(base.DCManagerShowOne): return detail_format def get_parser(self, prog_name): - parser = super(UnmanageSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "subcloud", help="Name or ID of the subcloud to unmanage." @@ -489,7 +486,7 @@ class UnmanageSubcloud(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.subcloud_manager - kwargs = dict() + kwargs = {} kwargs["management-state"] = "unmanaged" if parsed_args.migrate: @@ -514,7 +511,7 @@ class ManageSubcloud(base.DCManagerShowOne): return detail_format def get_parser(self, prog_name): - parser = super(ManageSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("subcloud", help="Name or ID of the subcloud to manage.") @@ -530,7 +527,7 @@ class ManageSubcloud(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.subcloud_manager - kwargs = dict() + kwargs = {} kwargs["management-state"] = "managed" if parsed_args.force: kwargs["force"] = "true" @@ -554,7 +551,7 @@ class UpdateSubcloud(base.DCManagerShowOne): return detail_format def get_parser(self, prog_name): - parser = super(UpdateSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("subcloud", help="Name or ID of the subcloud to update.") @@ -635,8 +632,8 @@ class UpdateSubcloud(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.subcloud_manager - files = dict() - data = dict() + files = {} + data = {} if parsed_args.name: data["name"] = parsed_args.name @@ -773,7 +770,7 @@ class RedeploySubcloud(base.DCManagerShowOne): return detail_format def get_parser(self, prog_name): - parser = super(RedeploySubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "subcloud", help="Name or ID of the subcloud to redeploy." @@ -827,8 +824,8 @@ class RedeploySubcloud(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.subcloud_manager - files = dict() - data = dict() + files = {} + data = {} # Get the install values yaml file if parsed_args.install_values is not None: @@ -908,7 +905,7 @@ class RestoreSubcloud(base.DCManagerShowOne): return detail_format def get_parser(self, prog_name): - parser = super(RestoreSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--restore-values", @@ -951,7 +948,7 @@ class PrestageSubcloud(base.DCManagerShowOne): return detail_prestage_format def get_parser(self, prog_name): - parser = super(PrestageSubcloud, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--sysadmin-password", @@ -984,7 +981,7 @@ class PrestageSubcloud(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.subcloud_manager - data = dict() + data = {} if parsed_args.force: data["force"] = "true" diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_peer_group_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_peer_group_manager.py index bdb667d..47474c0 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_peer_group_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/subcloud_peer_group_manager.py @@ -49,7 +49,7 @@ class MigrateSubcloudPeerGroup(base.DCManagerLister): return utils.subcloud_detail_format def get_parser(self, prog_name): - parser = super(MigrateSubcloudPeerGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of the subcloud peer group to migrate." @@ -66,7 +66,7 @@ class MigrateSubcloudPeerGroup(base.DCManagerLister): def _get_resources(self, parsed_args): subcloud_peer_group_ref = parsed_args.group dcmanager_client = self.app.client_manager.subcloud_peer_group_manager - kwargs = dict() + kwargs = {} if parsed_args.sysadmin_password is not None: kwargs["sysadmin_password"] = base64.b64encode( @@ -94,7 +94,7 @@ class AddSubcloudPeerGroup(base.DCManagerShowOne): return group_format def get_parser(self, prog_name): - parser = super(AddSubcloudPeerGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--peer-group-name", @@ -121,7 +121,7 @@ class AddSubcloudPeerGroup(base.DCManagerShowOne): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.subcloud_peer_group_manager - kwargs = dict() + kwargs = {} kwargs["peer-group-name"] = parsed_args.peer_group_name @@ -140,7 +140,7 @@ class DeleteSubcloudPeerGroup(command.Command): """Delete subcloud peer group details from the database.""" def get_parser(self, prog_name): - parser = super(DeleteSubcloudPeerGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of the subcloud peer group to delete." @@ -167,7 +167,7 @@ class ShowSubcloudPeerGroup(base.DCManagerShowOne): return group_format def get_parser(self, prog_name): - parser = super(ShowSubcloudPeerGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of subcloud peer group to view the details." @@ -192,7 +192,7 @@ class ListSubcloudPeerGroup(base.DCManagerLister): return group_format def get_parser(self, prog_name): - parser = super(ListSubcloudPeerGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) return parser def _get_resources(self, parsed_args): @@ -209,7 +209,7 @@ class ListSubcloudPeerGroupSubclouds(base.DCManagerLister): return utils.subcloud_detail_format def get_parser(self, prog_name): - parser = super(ListSubcloudPeerGroupSubclouds, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of subcloud peer group to list " @@ -232,7 +232,7 @@ class UpdateSubcloudPeerGroup(base.DCManagerShowOne): return group_format def get_parser(self, prog_name): - parser = super(UpdateSubcloudPeerGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of the subcloud peer group to update." @@ -262,7 +262,7 @@ class UpdateSubcloudPeerGroup(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_peer_group_ref = parsed_args.group dcmanager_client = self.app.client_manager.subcloud_peer_group_manager - kwargs = dict() + kwargs = {} if parsed_args.peer_group_name is not None: kwargs["peer-group-name"] = parsed_args.peer_group_name @@ -324,7 +324,7 @@ class StatusSubcloudPeerGroup(base.DCManagerShowOne): return detail_status_format def get_parser(self, prog_name): - parser = super(StatusSubcloudPeerGroup, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "group", help="Name or ID of subcloud peer group to view the status." diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/sw_deploy_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/sw_deploy_manager.py index d3f1dfe..4b4ef12 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/sw_deploy_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/sw_deploy_manager.py @@ -7,7 +7,7 @@ from dcmanagerclient.commands.v1 import sw_update_manager -class SwDeployManagerMixin(object): +class SwDeployManagerMixin: """This Mixin provides the manager used for software deploy releases.""" def get_sw_update_manager(self): @@ -24,7 +24,7 @@ class SwDeployManagerMixin(object): # Find the index of 'stop on failure' in the tuple failure_status_index = columns.index("stop on failure") - # Insert the 'release_id' field before the 'stop on failure', + # Insert the 'release_id' field after the 'stop on failure', columns = ( columns[:failure_status_index + 1] + ("release_id",) @@ -55,7 +55,7 @@ class CreateSwDeployStrategy( ) def get_parser(self, prog_name): - parser = super(CreateSwDeployStrategy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--release-id", diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/sw_patch_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/sw_patch_manager.py index def8881..d4d26b4 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/sw_patch_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/sw_patch_manager.py @@ -16,7 +16,7 @@ from dcmanagerclient.commands.v1 import sw_update_manager -class SwPatchManagerMixin(object): +class SwPatchManagerMixin: """This Mixin provides the update manager used for sw patch.""" def get_sw_update_manager(self): @@ -49,7 +49,7 @@ class CreatePatchUpdateStrategy( """Create a patch update strategy.""" def get_parser(self, prog_name): - parser = super(CreatePatchUpdateStrategy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--upload-only", diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/sw_prestage_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/sw_prestage_manager.py index b4a9034..4426c0a 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/sw_prestage_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/sw_prestage_manager.py @@ -19,7 +19,7 @@ from dcmanagerclient import utils from dcmanagerclient.commands.v1 import sw_update_manager -class SwPrestageManagerMixin(object): +class SwPrestageManagerMixin: """This Mixin provides the update manager used for sw prestage.""" def get_sw_update_manager(self): @@ -58,12 +58,11 @@ class CreateSwPrestageStrategy( "--force", required=False, action="store_true", - help="Skip checking the subcloud for \ - management affecting alarms. ", + help="Skip checking the subcloud for management affecting alarms.", ) def get_parser(self, prog_name): - parser = super(CreateSwPrestageStrategy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--sysadmin-password", required=False, @@ -72,9 +71,11 @@ class CreateSwPrestageStrategy( parser.add_argument( "--release", required=False, - help="software release used to prestage the subcloud with. " - "If not specified, the current software release of " - "the subcloud will be used.", + help=( + "software release used to prestage the subcloud with. " + "If not specified, the current software release of " + "the subcloud will be used." + ), ) return parser diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_manager.py index 90467cf..611efac 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_manager.py @@ -128,7 +128,7 @@ class CreateSwUpdateStrategy(base.DCManagerShowOne): ) def get_parser(self, prog_name): - parser = super(CreateSwUpdateStrategy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--subcloud-apply-type", @@ -201,7 +201,7 @@ class CreateSwUpdateStrategy(base.DCManagerShowOne): """Updates kwargs dictionary from parsed_args based on the subclass""" def _get_resources(self, parsed_args): - kwargs = dict() + kwargs = {} if parsed_args.subcloud_apply_type: kwargs["subcloud-apply-type"] = parsed_args.subcloud_apply_type if parsed_args.max_parallel_subclouds: @@ -305,7 +305,7 @@ class ShowSwUpdateStrategyStep(base.DCManagerShowOne): return detail_strategy_step_format def get_parser(self, prog_name): - parser = super(ShowSwUpdateStrategyStep, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("cloud_name", help="Name of cloud to view the details.") return parser diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_options_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_options_manager.py index b4d7712..877a4ea 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_options_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/sw_update_options_manager.py @@ -85,7 +85,7 @@ class UpdateSwUpdateOptions(base.DCManagerShowOne): return options_detail_format def get_parser(self, prog_name): - parser = super(UpdateSwUpdateOptions, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--storage-apply-type", @@ -135,7 +135,7 @@ class UpdateSwUpdateOptions(base.DCManagerShowOne): def _get_resources(self, parsed_args): subcloud_ref = parsed_args.subcloud dcmanager_client = self.app.client_manager.sw_update_options_manager - kwargs = dict() + kwargs = {} kwargs["storage-apply-type"] = parsed_args.storage_apply_type kwargs["worker-apply-type"] = parsed_args.worker_apply_type kwargs["max-parallel-workers"] = parsed_args.max_parallel_workers @@ -161,7 +161,7 @@ class ListSwUpdateOptions(base.DCManagerLister): return options_list_format def get_parser(self, prog_name): - parser = super(ListSwUpdateOptions, self).get_parser(prog_name) + parser = super().get_parser(prog_name) return parser def _get_resources(self, parsed_args): @@ -176,7 +176,7 @@ class ShowSwUpdateOptions(base.DCManagerShowOne): return options_detail_format def get_parser(self, prog_name): - parser = super(ShowSwUpdateOptions, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "subcloud", @@ -199,7 +199,7 @@ class DeleteSwUpdateOptions(command.Command): """Delete per subcloud patch options.""" def get_parser(self, prog_name): - parser = super(DeleteSwUpdateOptions, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("subcloud", help="Subcloud name or id") diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/sw_upgrade_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/sw_upgrade_manager.py index af5ee9f..80a8b95 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/sw_upgrade_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/sw_upgrade_manager.py @@ -16,7 +16,7 @@ from dcmanagerclient.commands.v1 import sw_update_manager -class SwUpgradeManagerMixin(object): +class SwUpgradeManagerMixin: """This Mixin provides the update manager used for software upgrades.""" def get_sw_update_manager(self): @@ -41,7 +41,7 @@ class CreateSwUpgradeStrategy( ) def get_parser(self, prog_name): - parser = super(CreateSwUpgradeStrategy, self).get_parser(prog_name) + parser = super().get_parser(prog_name) return parser diff --git a/distributedcloud-client/dcmanagerclient/commands/v1/system_peer_manager.py b/distributedcloud-client/dcmanagerclient/commands/v1/system_peer_manager.py index e7b2d8e..f7323c1 100644 --- a/distributedcloud-client/dcmanagerclient/commands/v1/system_peer_manager.py +++ b/distributedcloud-client/dcmanagerclient/commands/v1/system_peer_manager.py @@ -114,7 +114,7 @@ class AddSystemPeer(base.DCManagerShowOne): return detail_peer_format def get_parser(self, prog_name): - parser = super(AddSystemPeer, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "--peer-uuid", required=True, help="UUID of the new system peer." @@ -192,7 +192,7 @@ class AddSystemPeer(base.DCManagerShowOne): def _get_resources(self, parsed_args): dcmanager_client = self.app.client_manager.system_peer_manager - kwargs = dict() + kwargs = {} if parsed_args.peer_uuid is not None: kwargs["peer_uuid"] = parsed_args.peer_uuid @@ -252,7 +252,7 @@ class ListSystemPeer(base.DCManagerLister): return peer_format def get_parser(self, prog_name): - parser = super(ListSystemPeer, self).get_parser(prog_name) + parser = super().get_parser(prog_name) return parser def _get_resources(self, parsed_args): @@ -267,11 +267,13 @@ class ListSystemPeerSubcloudPeerGroups(base.DCManagerLister): return group_format def get_parser(self, prog_name): - parser = super(ListSystemPeerSubcloudPeerGroups, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "peer", - help="Name or ID or UUID of system peer to list \ - associated subcloud peer groups.", + help=( + "Name or ID or UUID of system peer to list " + "associated subcloud peer groups." + ), ) return parser @@ -290,7 +292,7 @@ class ShowSystemPeer(base.DCManagerShowOne): return detail_peer_format def get_parser(self, prog_name): - parser = super(ShowSystemPeer, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument( "peer", help="UUID or ID of system peer to view the details." @@ -310,7 +312,7 @@ class DeleteSystemPeer(command.Command): """Delete system peer details from the database.""" def get_parser(self, prog_name): - parser = super(DeleteSystemPeer, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("peer", help="UUID or ID of the system peer to delete.") return parser @@ -333,7 +335,7 @@ class UpdateSystemPeer(base.DCManagerShowOne): return detail_peer_format def get_parser(self, prog_name): - parser = super(UpdateSystemPeer, self).get_parser(prog_name) + parser = super().get_parser(prog_name) parser.add_argument("peer", help="UUID or ID of the system peer to update.") @@ -381,15 +383,16 @@ class UpdateSystemPeer(base.DCManagerShowOne): parser.add_argument( "--heartbeat-interval", required=False, - help="Interval between heartbeat messages (in seconds) (default \ - 60).", + help=("Interval between heartbeat messages (in seconds) (default 60)."), ) parser.add_argument( "--heartbeat-failure-threshold", required=False, - help="Consecutive heartbeat failures before failure declared \ - (default 3).", + help=( + "Consecutive heartbeat failures before failure declared " + "(default 3)." + ), ) parser.add_argument( @@ -402,8 +405,10 @@ class UpdateSystemPeer(base.DCManagerShowOne): parser.add_argument( "--heartbeat-maintenance-timeout", required=False, - help="Overall failure timeout during maintenance state (in \ - seconds) (default 600).", + help=( + "Overall failure timeout during maintenance state (in seconds) " + "(default 600)." + ), ) return parser @@ -411,7 +416,7 @@ class UpdateSystemPeer(base.DCManagerShowOne): def _get_resources(self, parsed_args): system_peer_ref = parsed_args.peer dcmanager_client = self.app.client_manager.system_peer_manager - kwargs = dict() + kwargs = {} if parsed_args.peer_uuid: kwargs["peer_uuid"] = parsed_args.peer_uuid if parsed_args.peer_name: diff --git a/distributedcloud-client/dcmanagerclient/exceptions.py b/distributedcloud-client/dcmanagerclient/exceptions.py index f198236..4fab2b4 100644 --- a/distributedcloud-client/dcmanagerclient/exceptions.py +++ b/distributedcloud-client/dcmanagerclient/exceptions.py @@ -30,9 +30,7 @@ class DCManagerClientException(Exception): def __init__(self, message=message): self.message = message - super(DCManagerClientException, self).__init__( - f"{self.code}: {self.message}" - ) + super().__init__(f"{self.code}: {self.message}") class IllegalArgumentException(DCManagerClientException): @@ -40,7 +38,7 @@ class IllegalArgumentException(DCManagerClientException): code = "ILLEGAL_ARGUMENT_EXCEPTION" def __init__(self, message=None): - super(IllegalArgumentException, self).__init__(message) + super().__init__(message) if message: self.message = message @@ -50,13 +48,13 @@ class CommandError(DCManagerClientException): code = "COMMAND_ERROR_EXCEPTION" def __init__(self, message=None): - super(CommandError, self).__init__(message) + super().__init__(message) if message: self.message = message class APIException(Exception): def __init__(self, error_code=None, error_message=None): - super(APIException, self).__init__(error_message) + super().__init__(error_message) self.error_code = error_code self.error_message = error_message diff --git a/distributedcloud-client/dcmanagerclient/shell.py b/distributedcloud-client/dcmanagerclient/shell.py index c8381fb..2232967 100644 --- a/distributedcloud-client/dcmanagerclient/shell.py +++ b/distributedcloud-client/dcmanagerclient/shell.py @@ -70,14 +70,12 @@ class OpenStackHelpFormatter(argparse.HelpFormatter): max_help_position=32, width=None, ): - super(OpenStackHelpFormatter, self).__init__( - prog, indent_increment, max_help_position, width - ) + super().__init__(prog, indent_increment, max_help_position, width) def start_section(self, heading): # Title-case the headings. heading = f"{heading[0].upper()}{heading[1:]}" - super(OpenStackHelpFormatter, self).start_section(heading) + super().start_section(heading) class HelpCommand(cliff_help.HelpCommand): @@ -149,7 +147,7 @@ class BashCompletionCommand(command.Command): class DCManagerShell(app.App): def __init__(self): - super(DCManagerShell, self).__init__( + super().__init__( description=__doc__.strip(), version=dcmanager_version, command_manager=commandmanager.CommandManager("dcmanager.cli"), @@ -404,14 +402,15 @@ class DCManagerShell(app.App): "--profile", dest="profile", metavar="HMAC_KEY", - help="HMAC key to use for encrypting context data for performance " - "profiling of operation. This key should be one of the " - "values configured for the osprofiler middleware in " - "dcmanager, it is specified in the profiler section of the " - "dcmanager configuration " - "(i.e. /etc/dcmanager/dcmanager.conf). " - "Without the key, profiling will not be triggered even if " - "osprofiler is enabled on the server side.", + help=( + "HMAC key to use for encrypting context data for performance " + "profiling of operation. This key should be one of the " + "values configured for the osprofiler middleware in " + "dcmanager, it is specified in the profiler section of the " + "dcmanager configuration (i.e. /etc/dcmanager/dcmanager.conf). " + "Without the key, profiling will not be triggered even if " + "osprofiler is enabled on the server side." + ), ) return parser @@ -493,25 +492,25 @@ class DCManagerShell(app.App): ClientManager = type( "ClientManager", (object,), - dict( - subcloud_manager=self.client, - subcloud_backup_manager=self.client, - subcloud_group_manager=self.client, - subcloud_deploy_manager=self.client, - system_peer_manager=self.client, - alarm_manager=self.client, - fw_update_manager=self.client, - sw_patch_manager=self.client, - strategy_step_manager=self.client, - sw_update_options_manager=self.client, - sw_upgrade_manager=self.client, - kube_upgrade_manager=self.client, - kube_rootca_update_manager=self.client, - sw_prestage_manager=self.client, - phased_subcloud_deploy_manager=self.client, - subcloud_peer_group_manager=self.client, - peer_group_association_manager=self.client, - ), + { + "subcloud_manager": self.client, + "subcloud_backup_manager": self.client, + "subcloud_group_manager": self.client, + "subcloud_deploy_manager": self.client, + "system_peer_manager": self.client, + "alarm_manager": self.client, + "fw_update_manager": self.client, + "sw_patch_manager": self.client, + "strategy_step_manager": self.client, + "sw_update_options_manager": self.client, + "sw_upgrade_manager": self.client, + "kube_upgrade_manager": self.client, + "kube_rootca_update_manager": self.client, + "sw_prestage_manager": self.client, + "phased_subcloud_deploy_manager": self.client, + "subcloud_peer_group_manager": self.client, + "peer_group_association_manager": self.client, + }, ) self.client_manager = ClientManager() diff --git a/distributedcloud-client/dcmanagerclient/tests/base.py b/distributedcloud-client/dcmanagerclient/tests/base.py index 56fa725..616f8f0 100644 --- a/distributedcloud-client/dcmanagerclient/tests/base.py +++ b/distributedcloud-client/dcmanagerclient/tests/base.py @@ -260,7 +260,7 @@ FAKE_INSTALL_VALUES = { } -class FakeResponse(object): +class FakeResponse: """Fake response for testing DC Manager Client.""" def __init__(self, status_code, content=None): @@ -316,7 +316,7 @@ class BaseClientTest(testtools.TestCase): class BaseCommandTest(testtools.TestCase): def setUp(self): - super(BaseCommandTest, self).setUp() + super().setUp() self.app = mock.Mock() self.client = self.app.client_manager.subcloud_manager self.parsed_args = None diff --git a/distributedcloud-client/dcmanagerclient/tests/test_httpclient.py b/distributedcloud-client/dcmanagerclient/tests/test_httpclient.py index 6658ccb..4a69907 100644 --- a/distributedcloud-client/dcmanagerclient/tests/test_httpclient.py +++ b/distributedcloud-client/dcmanagerclient/tests/test_httpclient.py @@ -19,10 +19,10 @@ import copy import uuid import mock -from osprofiler import _utils as osprofiler_utils import osprofiler.profiler import requests import testtools +from osprofiler import _utils as osprofiler_utils from dcmanagerclient.api import httpclient @@ -48,12 +48,12 @@ EXPECTED_REQ_OPTIONS = {"headers": EXPECTED_AUTH_HEADERS} EXPECTED_BODY = {"k1": "abc", "k2": 123, "k3": True} -class FakeRequest(object): +class FakeRequest: def __init__(self, method): self.method = method -class FakeResponse(object): +class FakeResponse: def __init__(self, method, url, status_code): self.request = FakeRequest(method) self.url = url @@ -63,7 +63,7 @@ class FakeResponse(object): class HTTPClientTest(testtools.TestCase): def setUp(self): - super(HTTPClientTest, self).setUp() + super().setUp() osprofiler.profiler.init(None) self.client = httpclient.HTTPClient( API_BASE_URL, AUTH_TOKEN, PROJECT_ID, USER_ID diff --git a/distributedcloud-client/dcmanagerclient/tests/test_utils.py b/distributedcloud-client/dcmanagerclient/tests/test_utils.py index 2b9aa1b..514fc8e 100644 --- a/distributedcloud-client/dcmanagerclient/tests/test_utils.py +++ b/distributedcloud-client/dcmanagerclient/tests/test_utils.py @@ -1,5 +1,5 @@ # Copyright 2015 - StackStorm, Inc. -# Copyright (c) 2017, 2019, 2021 Wind River Systems, Inc. +# Copyright (c) 2017, 2019, 2021, 2024 Wind River Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,13 +15,13 @@ # import json + import testtools import yaml from dcmanagerclient import utils - -ENV_DICT = {'k1': 'abc', 'k2': 123, 'k3': True} +ENV_DICT = {"k1": "abc", "k2": 123, "k3": True} ENV_STR = json.dumps(ENV_DICT) ENV_YAML = yaml.safe_dump(ENV_DICT, default_flow_style=False) @@ -29,10 +29,10 @@ ENV_YAML = yaml.safe_dump(ENV_DICT, default_flow_style=False) class UtilityTest(testtools.TestCase): def test_load_empty(self): - self.assertDictEqual(dict(), utils.load_content(None)) - self.assertDictEqual(dict(), utils.load_content('')) - self.assertDictEqual(dict(), utils.load_content('{}')) - self.assertListEqual(list(), utils.load_content('[]')) + self.assertDictEqual({}, utils.load_content(None)) + self.assertDictEqual({}, utils.load_content("")) + self.assertDictEqual({}, utils.load_content("{}")) + self.assertListEqual([], utils.load_content("[]")) def test_load_json_content(self): self.assertDictEqual(ENV_DICT, utils.load_content(ENV_STR)) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/mixins.py b/distributedcloud-client/dcmanagerclient/tests/v1/mixins.py index 36539a5..b693d9a 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/mixins.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/mixins.py @@ -6,7 +6,7 @@ from dcmanagerclient.tests.v1 import utils -class UpdateStrategyMixin(object): +class UpdateStrategyMixin: """Mixin for testing the different types of dcmanager update strategies. Used by concrete testsuites of strategy types such as patch, upgrade, etc.. diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_alarm_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_alarm_manager.py index 13866f2..0469863 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_alarm_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_alarm_manager.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -9,12 +9,12 @@ from dcmanagerclient.commands.v1 import alarm_manager as alarm_cmd from dcmanagerclient.tests import base FAKE_MANAGER = None -FAKE_NAME = 'subcloud1' -FAKE_CRITICAL = '0' -FAKE_MAJOR = '1' -FAKE_MINOR = '0' -FAKE_WARNINGS = '0' -FAKE_STATUS = 'degraded' +FAKE_NAME = "subcloud1" +FAKE_CRITICAL = "0" +FAKE_MAJOR = "1" +FAKE_MINOR = "0" +FAKE_WARNINGS = "0" +FAKE_STATUS = "degraded" ALARM_SUMMARY = AlarmSummary( FAKE_MANAGER, @@ -23,24 +23,30 @@ ALARM_SUMMARY = AlarmSummary( FAKE_MAJOR, FAKE_MINOR, FAKE_WARNINGS, - FAKE_STATUS + FAKE_STATUS, ) class TestCLIAlarmSummaryV1(base.BaseCommandTest): def setUp(self): - super(TestCLIAlarmSummaryV1, self).setUp() + super().setUp() # The client is the alarm_manager self.client = self.app.client_manager.alarm_manager def test_list_alarm_summary(self): self.client.alarm_manager.list_alarms.return_value = [ALARM_SUMMARY] actual_call = self.call(alarm_cmd.ListAlarmSummary) - self.assertEqual([(FAKE_NAME, - FAKE_CRITICAL, - FAKE_MAJOR, - FAKE_MINOR, - FAKE_WARNINGS, - FAKE_STATUS)], - actual_call[1]) + self.assertEqual( + [ + ( + FAKE_NAME, + FAKE_CRITICAL, + FAKE_MAJOR, + FAKE_MINOR, + FAKE_WARNINGS, + FAKE_STATUS, + ) + ], + actual_call[1], + ) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_fw_update_strategy.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_fw_update_strategy.py index 7aaa0bf..122c427 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_fw_update_strategy.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_fw_update_strategy.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2020-2021 Wind River Systems, Inc. +# Copyright (c) 2020-2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -12,9 +12,10 @@ from dcmanagerclient.tests.v1.mixins import UpdateStrategyMixin class TestFwUpdateStrategy(UpdateStrategyMixin, base.BaseCommandTest): def setUp(self): - super(TestFwUpdateStrategy, self).setUp() - self.sw_update_manager = \ + super().setUp() + self.sw_update_manager = ( self.app.client_manager.fw_update_manager.fw_update_manager + ) self.create_command = cli_cmd.CreateFwUpdateStrategy self.show_command = cli_cmd.ShowFwUpdateStrategy self.delete_command = cli_cmd.DeleteFwUpdateStrategy diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_rootca_update_strategy.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_rootca_update_strategy.py index 0725ae8..03a0510 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_rootca_update_strategy.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_rootca_update_strategy.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -12,9 +12,9 @@ from dcmanagerclient.tests.v1.mixins import UpdateStrategyMixin class TestKubeRootcaUpdateStrategy(UpdateStrategyMixin, base.BaseCommandTest): def setUp(self): - super(TestKubeRootcaUpdateStrategy, self).setUp() - self.sw_update_manager = self.app.client_manager.\ - kube_rootca_update_manager.kube_rootca_update_manager + super().setUp() + self.sw_update_manager = self.app.client_manager.kube_rootca_update_manager.\ + kube_rootca_update_manager self.create_command = cli_cmd.CreateKubeRootcaUpdateStrategy self.show_command = cli_cmd.ShowKubeRootcaUpdateStrategy self.delete_command = cli_cmd.DeleteKubeRootcaUpdateStrategy diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_upgrade_strategy.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_upgrade_strategy.py index f5faea8..94f53ad 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_upgrade_strategy.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_kube_upgrade_strategy.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2020-2021 Wind River Systems, Inc. +# Copyright (c) 2020-2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -12,9 +12,10 @@ from dcmanagerclient.tests.v1.mixins import UpdateStrategyMixin class TestKubeUpgradeStrategy(UpdateStrategyMixin, base.BaseCommandTest): def setUp(self): - super(TestKubeUpgradeStrategy, self).setUp() - self.sw_update_manager = \ + super().setUp() + self.sw_update_manager = ( self.app.client_manager.kube_upgrade_manager.kube_upgrade_manager + ) self.create_command = cli_cmd.CreateKubeUpgradeStrategy self.show_command = cli_cmd.ShowKubeUpgradeStrategy self.delete_command = cli_cmd.DeleteKubeUpgradeStrategy diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_patch_update_strategy.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_patch_update_strategy.py index e940f5b..84d5fd6 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_patch_update_strategy.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_patch_update_strategy.py @@ -13,13 +13,14 @@ from dcmanagerclient.tests.v1.mixins import UpdateStrategyMixin class TestPatchUpdateStrategy(UpdateStrategyMixin, base.BaseCommandTest): def setUp(self): - super(TestPatchUpdateStrategy, self).setUp() + super().setUp() # Increase results_length due to the 'upload only' field self.results_length += 1 - self.sw_update_manager = self.app.client_manager.sw_patch_manager.\ - sw_patch_manager + self.sw_update_manager = ( + self.app.client_manager.sw_patch_manager.sw_patch_manager + ) self.create_command = cli_cmd.CreatePatchUpdateStrategy self.show_command = cli_cmd.ShowPatchUpdateStrategy self.delete_command = cli_cmd.DeletePatchUpdateStrategy @@ -34,15 +35,16 @@ class TestPatchUpdateStrategy(UpdateStrategyMixin, base.BaseCommandTest): expected_strategy_type = manager_to_test.update_type # mock the result of the API call - strategy = utils.make_strategy(strategy_type=expected_strategy_type, - extra_args={"upload-only": True}) + strategy = utils.make_strategy( + strategy_type=expected_strategy_type, extra_args={"upload-only": True} + ) # mock that there is no pre-existing strategy manager_to_test.create_sw_update_strategy.return_value = strategy # invoke the backend method for the CLI. # Returns a tuple of field descriptions, and a second tuple of values - fields, results = self.call(self.create_command, ['--upload-only']) + fields, results = self.call(self.create_command, ["--upload-only"]) # results is a tuple of expected length self.assertEqual(len(results), self.results_length) @@ -57,5 +59,5 @@ class TestPatchUpdateStrategy(UpdateStrategyMixin, base.BaseCommandTest): # - updated_at self.assertEqual(results[0], expected_strategy_type) - self.assertEqual(fields[-4], 'upload only') + self.assertEqual(fields[-4], "upload only") self.assertEqual(results[-4], True) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_peer_group_association.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_peer_group_association.py index 810001f..bb723dd 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_peer_group_association.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_peer_group_association.py @@ -1,17 +1,20 @@ # -# Copyright (c) 2023 Wind River Systems, Inc. +# Copyright (c) 2023-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import copy + import mock from oslo_utils import timeutils -from dcmanagerclient.api.v1.peer_group_association_manager \ - import PeerGroupAssociation as PeerAssociation -from dcmanagerclient.commands.v1 import peer_group_association_manager \ - as peer_group_association_cmd +from dcmanagerclient.api.v1.peer_group_association_manager import ( + PeerGroupAssociation as PeerAssociation, +) +from dcmanagerclient.commands.v1 import ( + peer_group_association_manager as peer_group_association_cmd, +) from dcmanagerclient.tests import base PEER_GROUP_ASSOCIATION_ID = "1" @@ -36,134 +39,154 @@ PEER_GROUP_ASSOCIATION = PeerAssociation( SYNC_STATUS, SYNC_MESSAGE, CREATED_AT, - UPDATED_AT + UPDATED_AT, ) -PEER_GROUP_ASSOCIATION_TUPLE = (PEER_GROUP_ASSOCIATION_ID, - PEER_GROUP_ID, - SYSTEM_PEER_ID, - ASSOCIATION_TYPE, - SYNC_STATUS, - PG_GROUP_PRIORITY) +PEER_GROUP_ASSOCIATION_TUPLE = ( + PEER_GROUP_ASSOCIATION_ID, + PEER_GROUP_ID, + SYSTEM_PEER_ID, + ASSOCIATION_TYPE, + SYNC_STATUS, + PG_GROUP_PRIORITY, +) -PEER_GROUP_ASSOCIATION_DETAIL_TUPLE = \ - PEER_GROUP_ASSOCIATION_TUPLE + (SYNC_MESSAGE, CREATED_AT, UPDATED_AT) +PEER_GROUP_ASSOCIATION_DETAIL_TUPLE = PEER_GROUP_ASSOCIATION_TUPLE + ( + SYNC_MESSAGE, + CREATED_AT, + UPDATED_AT, +) -PEER_GROUP_ASSOCIATION_TUPLE_UPDATED = (PEER_GROUP_ASSOCIATION_ID, - PEER_GROUP_ID, - SYSTEM_PEER_ID, - ASSOCIATION_TYPE, - SYNC_STATUS, - PG_GROUP_PRIORITY_UPDATED, - SYNC_MESSAGE, - CREATED_AT, - UPDATED_AT) +PEER_GROUP_ASSOCIATION_TUPLE_UPDATED = ( + PEER_GROUP_ASSOCIATION_ID, + PEER_GROUP_ID, + SYSTEM_PEER_ID, + ASSOCIATION_TYPE, + SYNC_STATUS, + PG_GROUP_PRIORITY_UPDATED, + SYNC_MESSAGE, + CREATED_AT, + UPDATED_AT, +) class TestCLIPeerGroupAssociationV1(base.BaseCommandTest): def setUp(self): - super(TestCLIPeerGroupAssociationV1, self).setUp() + super().setUp() # The client is the peer_group_association_manager self.client = self.app.client_manager.peer_group_association_manager def test_list_peer_group_association(self): - self.client.peer_group_association_manager.\ - list_peer_group_associations.return_value =\ - [PEER_GROUP_ASSOCIATION] - actual_call = self.call(peer_group_association_cmd. - ListPeerGroupAssociation) - self.assertEqual([PEER_GROUP_ASSOCIATION_TUPLE], - actual_call[1]) + self.client.peer_group_association_manager.list_peer_group_associations.\ + return_value = [PEER_GROUP_ASSOCIATION] + actual_call = self.call(peer_group_association_cmd.ListPeerGroupAssociation) + self.assertEqual([PEER_GROUP_ASSOCIATION_TUPLE], actual_call[1]) def test_list_peer_group_association_empty(self): - self.client.peer_group_association_manager.\ - list_peer_group_associations.return_value = [] - actual_call = self.call(peer_group_association_cmd. - ListPeerGroupAssociation) - self.assertEqual((tuple('' for _ in range( - len(PEER_GROUP_ASSOCIATION_TUPLE))),), actual_call[1]) + self.client.peer_group_association_manager.list_peer_group_associations.\ + return_value = ([]) + actual_call = self.call(peer_group_association_cmd.ListPeerGroupAssociation) + self.assertEqual( + (tuple("" for _ in range(len(PEER_GROUP_ASSOCIATION_TUPLE))),), + actual_call[1], + ) def test_add_peer_group_association(self): self.client.peer_group_association_manager.add_peer_group_association.\ return_value = [PEER_GROUP_ASSOCIATION] actual_call = self.call( - peer_group_association_cmd.AddPeerGroupAssociation, app_args=[ - '--peer-group-id', PEER_GROUP_ID, - '--system-peer-id', SYSTEM_PEER_ID, - '--peer-group-priority', PG_GROUP_PRIORITY - ]) - self.assertEqual( - PEER_GROUP_ASSOCIATION_DETAIL_TUPLE, - actual_call[1]) + peer_group_association_cmd.AddPeerGroupAssociation, + app_args=[ + "--peer-group-id", + PEER_GROUP_ID, + "--system-peer-id", + SYSTEM_PEER_ID, + "--peer-group-priority", + PG_GROUP_PRIORITY, + ], + ) + self.assertEqual(PEER_GROUP_ASSOCIATION_DETAIL_TUPLE, actual_call[1]) def test_show_peer_group_association(self): - self.client.peer_group_association_manager.\ - peer_group_association_detail.return_value =\ - [PEER_GROUP_ASSOCIATION] - actual_call = self.call(peer_group_association_cmd. - ShowPeerGroupAssociation, - app_args=[PEER_GROUP_ASSOCIATION_ID]) - self.assertEqual((PEER_GROUP_ASSOCIATION_ID, - PEER_GROUP_ID, - SYSTEM_PEER_ID, - ASSOCIATION_TYPE, - SYNC_STATUS, - PG_GROUP_PRIORITY, - SYNC_MESSAGE, - CREATED_AT, - UPDATED_AT), actual_call[1]) + self.client.peer_group_association_manager.peer_group_association_detail.\ + return_value = [PEER_GROUP_ASSOCIATION] + actual_call = self.call( + peer_group_association_cmd.ShowPeerGroupAssociation, + app_args=[PEER_GROUP_ASSOCIATION_ID], + ) + self.assertEqual( + ( + PEER_GROUP_ASSOCIATION_ID, + PEER_GROUP_ID, + SYSTEM_PEER_ID, + ASSOCIATION_TYPE, + SYNC_STATUS, + PG_GROUP_PRIORITY, + SYNC_MESSAGE, + CREATED_AT, + UPDATED_AT, + ), + actual_call[1], + ) def test_show_peer_group_association_without_id(self): - self.client.peer_group_association_manager.\ - peer_group_association_detail.return_value = [] - self.assertRaises(SystemExit, self.call, - peer_group_association_cmd.ShowPeerGroupAssociation, - app_args=[]) + self.client.peer_group_association_manager.peer_group_association_detail.\ + return_value = ([]) + self.assertRaises( + SystemExit, + self.call, + peer_group_association_cmd.ShowPeerGroupAssociation, + app_args=[], + ) def test_delete_peer_group_association(self): - self.call(peer_group_association_cmd.DeletePeerGroupAssociation, - app_args=[PEER_GROUP_ASSOCIATION_ID]) - self.client.peer_group_association_manager. \ - delete_peer_group_association.\ + self.call( + peer_group_association_cmd.DeletePeerGroupAssociation, + app_args=[PEER_GROUP_ASSOCIATION_ID], + ) + self.client.peer_group_association_manager.delete_peer_group_association.\ assert_called_once_with(PEER_GROUP_ASSOCIATION_ID) def test_delete_peer_group_association_without_id(self): - self.assertRaises(SystemExit, self.call, - peer_group_association_cmd. - DeletePeerGroupAssociation, app_args=[]) + self.assertRaises( + SystemExit, + self.call, + peer_group_association_cmd.DeletePeerGroupAssociation, + app_args=[], + ) def test_update_peer_group_association(self): - UPDATED_PEER_GROUP_ASSOCIATION = copy.copy(PEER_GROUP_ASSOCIATION) - UPDATED_PEER_GROUP_ASSOCIATION.peer_group_priority =\ + updated_peed_group_association = copy.copy(PEER_GROUP_ASSOCIATION) + updated_peed_group_association.peer_group_priority = ( PG_GROUP_PRIORITY_UPDATED - self.client.peer_group_association_manager.\ - update_peer_group_association.\ - return_value = [UPDATED_PEER_GROUP_ASSOCIATION] + ) + self.client.peer_group_association_manager.update_peer_group_association.\ + return_value = [updated_peed_group_association] actual_call = self.call( peer_group_association_cmd.UpdatePeerGroupAssociation, - app_args=[PEER_GROUP_ASSOCIATION_ID, - '--peer-group-priority', PG_GROUP_PRIORITY_UPDATED]) - self.assertEqual( - (PEER_GROUP_ASSOCIATION_TUPLE_UPDATED), - actual_call[1]) + app_args=[ + PEER_GROUP_ASSOCIATION_ID, + "--peer-group-priority", + PG_GROUP_PRIORITY_UPDATED, + ], + ) + self.assertEqual((PEER_GROUP_ASSOCIATION_TUPLE_UPDATED), actual_call[1]) def test_update_peer_group_association_without_priority(self): - self.client.peer_group_association_manager.\ - update_peer_group_association.\ + self.client.peer_group_association_manager.update_peer_group_association.\ return_value = [PEER_GROUP_ASSOCIATION] - self.assertRaises(SystemExit, - self.call, - peer_group_association_cmd. - UpdatePeerGroupAssociation, - app_args=[PEER_GROUP_ID]) + self.assertRaises( + SystemExit, + self.call, + peer_group_association_cmd.UpdatePeerGroupAssociation, + app_args=[PEER_GROUP_ID], + ) def test_sync_peer_group_association(self): - self.client.peer_group_association_manager.\ - sync_peer_group_association.\ + self.client.peer_group_association_manager.sync_peer_group_association.\ return_value = [PEER_GROUP_ASSOCIATION] actual_call = self.call( peer_group_association_cmd.SyncPeerGroupAssociation, - app_args=[PEER_GROUP_ASSOCIATION_ID]) - self.assertEqual( - (PEER_GROUP_ASSOCIATION_DETAIL_TUPLE), - actual_call[1]) + app_args=[PEER_GROUP_ASSOCIATION_ID], + ) + self.assertEqual((PEER_GROUP_ASSOCIATION_DETAIL_TUPLE), actual_call[1]) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_deploy_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_deploy_manager.py index 7ae8b57..b01d789 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_deploy_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_deploy_manager.py @@ -18,8 +18,7 @@ import tempfile import mock from dcmanagerclient.api.v1 import subcloud_deploy_manager as sdm -from dcmanagerclient.commands.v1 import subcloud_deploy_manager as \ - subcloud_deploy_cmd +from dcmanagerclient.commands.v1 import subcloud_deploy_manager from dcmanagerclient.exceptions import DCManagerClientException from dcmanagerclient.tests import base @@ -79,7 +78,7 @@ SUBCLOUD_DEPLOY_NO_OVERRIDES_CHART = sdm.SubcloudDeploy( class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): def setUp(self): - super(TestCLISubcloudDeployManagerV1, self).setUp() + super().setUp() # The client is the subcloud_deploy_manager self.client = self.app.client_manager.subcloud_deploy_manager @@ -89,7 +88,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): ] # Without "--release" parameter - actual_call1 = self.call(subcloud_deploy_cmd.SubcloudDeployShow) + actual_call1 = self.call(subcloud_deploy_manager.SubcloudDeployShow) self.assertEqual( ( @@ -104,7 +103,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): # With "--release" parameter actual_call2 = self.call( - subcloud_deploy_cmd.SubcloudDeployShow, + subcloud_deploy_manager.SubcloudDeployShow, app_args=["--release", base.SOFTWARE_VERSION], ) @@ -133,7 +132,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): file_path_3 = os.path.abspath(f3.name) file_path_4 = os.path.abspath(f4.name) actual_call = self.call( - subcloud_deploy_cmd.SubcloudDeployUpload, + subcloud_deploy_manager.SubcloudDeployUpload, app_args=[ "--deploy-playbook", file_path_1, @@ -169,7 +168,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): file_path_2 = os.path.abspath(f2.name) file_path_3 = os.path.abspath(f3.name) actual_call = self.call( - subcloud_deploy_cmd.SubcloudDeployUpload, + subcloud_deploy_manager.SubcloudDeployUpload, app_args=[ "--deploy-playbook", file_path_1, @@ -199,7 +198,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): with tempfile.NamedTemporaryFile() as f1: file_path_1 = os.path.abspath(f1.name) actual_call = self.call( - subcloud_deploy_cmd.SubcloudDeployUpload, + subcloud_deploy_manager.SubcloudDeployUpload, app_args=["--prestage-images", file_path_1], ) self.assertEqual( @@ -219,7 +218,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): file_path_2 = os.path.abspath(f2.name) file_path_3 = os.path.abspath(f3.name) actual_call = self.call( - subcloud_deploy_cmd.SubcloudDeployUpload, + subcloud_deploy_manager.SubcloudDeployUpload, app_args=[ "--deploy-overrides", file_path_1, @@ -250,7 +249,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): file_path_1 = os.path.abspath(f1.name) file_path_2 = os.path.abspath(f2.name) actual_call = self.call( - subcloud_deploy_cmd.SubcloudDeployUpload, + subcloud_deploy_manager.SubcloudDeployUpload, app_args=[ "--deploy-chart", file_path_1, @@ -275,7 +274,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): file_path_1 = os.path.abspath(f1.name) file_path_2 = os.path.abspath(f2.name) actual_call = self.call( - subcloud_deploy_cmd.SubcloudDeployUpload, + subcloud_deploy_manager.SubcloudDeployUpload, app_args=[ "--deploy-playbook", file_path_1, @@ -311,7 +310,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): e = self.assertRaises( DCManagerClientException, self.call, - subcloud_deploy_cmd.SubcloudDeployUpload, + subcloud_deploy_manager.SubcloudDeployUpload, app_args=[ "--deploy-playbook", file_path_1, @@ -332,14 +331,14 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest): data = {"prestage_images": "False", "deployment_files": "False"} app_args = ["--release", release_version] - self.call(subcloud_deploy_cmd.SubcloudDeployDelete, app_args=app_args) + self.call(subcloud_deploy_manager.SubcloudDeployDelete, app_args=app_args) self.client.subcloud_deploy_manager.subcloud_deploy_delete.\ assert_called_once_with(release_version, data=data) def test_subcloud_deploy_delete_without_release(self): - self.call(subcloud_deploy_cmd.SubcloudDeployDelete) + self.call(subcloud_deploy_manager.SubcloudDeployDelete) data = {"prestage_images": "False", "deployment_files": "False"} self.client.subcloud_deploy_manager.subcloud_deploy_delete.\ assert_called_once_with(None, data=data) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_group_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_group_manager.py index 42bcf3f..ba31a20 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_group_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_group_manager.py @@ -1,5 +1,5 @@ # Copyright (c) 2017 Ericsson AB. -# Copyright (c) 2017-2023 Wind River Systems, Inc. +# Copyright (c) 2017-2024 Wind River Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,78 +15,74 @@ # import copy -import mock +import mock from oslo_utils import timeutils from dcmanagerclient.api.v1 import subcloud_group_manager as zm -from dcmanagerclient.commands.v1 \ - import subcloud_group_manager as subcloud_group_cmd +from dcmanagerclient.commands.v1 import subcloud_group_manager as subcloud_group_cmd from dcmanagerclient.tests import base - -ID = '2' -NAME = 'GroupX' -DESCRIPTION = 'Custom subcloud group' -APPLY_TYPE = 'parallel' +ID = "2" +NAME = "GroupX" +DESCRIPTION = "Custom subcloud group" +APPLY_TYPE = "parallel" MAX_PARALLEL_SUBCLOUDS = 3 TIME_NOW = timeutils.utcnow().isoformat() -NEW_DESCRIPTION = 'Slightly different subcloud group' +NEW_DESCRIPTION = "Slightly different subcloud group" SUBCLOUD_GROUP_DICT = { - 'GROUP_ID': ID, - 'NAME': NAME, - 'DESCRIPTION': DESCRIPTION, - 'APPLY_TYPE': APPLY_TYPE, - 'MAX_PARALLEL_SUBCLOUDS': MAX_PARALLEL_SUBCLOUDS, - 'CREATED_AT': TIME_NOW, - 'UPDATED_AT': TIME_NOW + "GROUP_ID": ID, + "NAME": NAME, + "DESCRIPTION": DESCRIPTION, + "APPLY_TYPE": APPLY_TYPE, + "MAX_PARALLEL_SUBCLOUDS": MAX_PARALLEL_SUBCLOUDS, + "CREATED_AT": TIME_NOW, + "UPDATED_AT": TIME_NOW, } SUBCLOUD_GROUP = zm.SubcloudGroup( mock, - group_id=SUBCLOUD_GROUP_DICT['GROUP_ID'], - name=SUBCLOUD_GROUP_DICT['NAME'], - description=SUBCLOUD_GROUP_DICT['DESCRIPTION'], - update_apply_type=SUBCLOUD_GROUP_DICT['APPLY_TYPE'], - max_parallel_subclouds=SUBCLOUD_GROUP_DICT['MAX_PARALLEL_SUBCLOUDS'], - created_at=SUBCLOUD_GROUP_DICT['CREATED_AT'], - updated_at=SUBCLOUD_GROUP_DICT['UPDATED_AT'] + group_id=SUBCLOUD_GROUP_DICT["GROUP_ID"], + name=SUBCLOUD_GROUP_DICT["NAME"], + description=SUBCLOUD_GROUP_DICT["DESCRIPTION"], + update_apply_type=SUBCLOUD_GROUP_DICT["APPLY_TYPE"], + max_parallel_subclouds=SUBCLOUD_GROUP_DICT["MAX_PARALLEL_SUBCLOUDS"], + created_at=SUBCLOUD_GROUP_DICT["CREATED_AT"], + updated_at=SUBCLOUD_GROUP_DICT["UPDATED_AT"], ) class TestCLISubcloudGroupManagerV1(base.BaseCommandTest): def setUp(self): - super(TestCLISubcloudGroupManagerV1, self).setUp() + super().setUp() # The client is the subcloud_group_manager self.client = self.app.client_manager.subcloud_group_manager def test_list_subcloud_groups(self): - self.client.subcloud_group_manager.\ - list_subcloud_groups.return_value = [SUBCLOUD_GROUP] + self.client.subcloud_group_manager.list_subcloud_groups.return_value = [ + SUBCLOUD_GROUP + ] actual_call = self.call(subcloud_group_cmd.ListSubcloudGroup) - self.assertEqual([(ID, NAME, DESCRIPTION)], - actual_call[1]) + self.assertEqual([(ID, NAME, DESCRIPTION)], actual_call[1]) def test_list_subcloud_groups_empty(self): - self.client.subcloud_group_manager.\ - list_subcloud_groups.return_value = [] + self.client.subcloud_group_manager.list_subcloud_groups.return_value = [] actual_call = self.call(subcloud_group_cmd.ListSubcloudGroup) - self.assertEqual((('', '', ''),), - actual_call[1]) + self.assertEqual((("", "", ""),), actual_call[1]) def test_list_subcloud_group_subclouds(self): self.client.subcloud_group_manager.\ - subcloud_group_list_subclouds.return_value = [ - base.SUBCLOUD_RESOURCE] - actual_call = self.call(subcloud_group_cmd.ListSubcloudGroupSubclouds, - app_args=[ID]) + subcloud_group_list_subclouds.return_value = [base.SUBCLOUD_RESOURCE] + actual_call = self.call( + subcloud_group_cmd.ListSubcloudGroupSubclouds, app_args=[ID] + ) self.client.subcloud_group_manager.subcloud_group_list_subclouds.\ assert_called_once_with(ID) - self.assertEqual([ - base.SUBCLOUD_FIELD_RESULT_LIST_WITH_PEERID], - actual_call[1]) + self.assertEqual( + [base.SUBCLOUD_FIELD_RESULT_LIST_WITH_PEERID], actual_call[1] + ) def test_delete_subcloud_group_by_id(self): self.call(subcloud_group_cmd.DeleteSubcloudGroup, app_args=[ID]) @@ -94,64 +90,90 @@ class TestCLISubcloudGroupManagerV1(base.BaseCommandTest): assert_called_once_with(ID) def test_delete_subcloud_group_without_id(self): - self.assertRaises(SystemExit, self.call, - subcloud_group_cmd.DeleteSubcloudGroup, app_args=[]) + self.assertRaises( + SystemExit, + self.call, + subcloud_group_cmd.DeleteSubcloudGroup, + app_args=[], + ) def test_show_subcloud_group_with_id(self): - self.client.subcloud_group_manager.subcloud_group_detail.\ - return_value = [SUBCLOUD_GROUP] - actual_call = self.call(subcloud_group_cmd.ShowSubcloudGroup, - app_args=[ID]) - self.assertEqual((ID, - NAME, - DESCRIPTION, - APPLY_TYPE, - MAX_PARALLEL_SUBCLOUDS, - TIME_NOW, - TIME_NOW), - actual_call[1]) + self.client.subcloud_group_manager.subcloud_group_detail.return_value = [ + SUBCLOUD_GROUP + ] + actual_call = self.call(subcloud_group_cmd.ShowSubcloudGroup, app_args=[ID]) + self.assertEqual( + ( + ID, + NAME, + DESCRIPTION, + APPLY_TYPE, + MAX_PARALLEL_SUBCLOUDS, + TIME_NOW, + TIME_NOW, + ), + actual_call[1], + ) def test_show_subcloud_group_without_id(self): - self.client.subcloud_group_manager.subcloud_group_detail.\ - return_value = [] - actual_call = self.call(subcloud_group_cmd.ShowSubcloudGroup, - app_args=[ID]) - self.assertEqual((('', '', '', '', - '', '', ''),), - actual_call[1]) + self.client.subcloud_group_manager.subcloud_group_detail.return_value = [] + actual_call = self.call(subcloud_group_cmd.ShowSubcloudGroup, app_args=[ID]) + self.assertEqual( + ( + ( + "", + "", + "", + "", + "", + "", + "", + ), + ), + actual_call[1], + ) def test_add_subcloud_group(self): - self.client.subcloud_group_manager.add_subcloud_group.\ - return_value = [SUBCLOUD_GROUP] + self.client.subcloud_group_manager.add_subcloud_group.return_value = [ + SUBCLOUD_GROUP + ] actual_call = self.call( subcloud_group_cmd.AddSubcloudGroup, - app_args=['--name', NAME, - '--description', DESCRIPTION] + app_args=["--name", NAME, "--description", DESCRIPTION], + ) + self.assertEqual( + ( + ID, + NAME, + DESCRIPTION, + APPLY_TYPE, + MAX_PARALLEL_SUBCLOUDS, + TIME_NOW, + TIME_NOW, + ), + actual_call[1], ) - self.assertEqual((ID, - NAME, - DESCRIPTION, - APPLY_TYPE, - MAX_PARALLEL_SUBCLOUDS, - TIME_NOW, - TIME_NOW), - actual_call[1]) def test_update_subcloud_group(self): - UPDATED_SUBCLOUD = copy.copy(SUBCLOUD_GROUP) - UPDATED_SUBCLOUD.description = NEW_DESCRIPTION - self.client.subcloud_group_manager.update_subcloud_group.\ - return_value = [UPDATED_SUBCLOUD] + updated_subloud = copy.copy(SUBCLOUD_GROUP) + updated_subloud.description = NEW_DESCRIPTION + self.client.subcloud_group_manager.update_subcloud_group.return_value = [ + updated_subloud + ] actual_call = self.call( subcloud_group_cmd.UpdateSubcloudGroup, - app_args=[SUBCLOUD_GROUP.group_id, - '--description', NEW_DESCRIPTION]) - self.assertEqual((ID, - NAME, - NEW_DESCRIPTION, - APPLY_TYPE, - MAX_PARALLEL_SUBCLOUDS, - TIME_NOW, - TIME_NOW), - actual_call[1]) + app_args=[SUBCLOUD_GROUP.group_id, "--description", NEW_DESCRIPTION], + ) + self.assertEqual( + ( + ID, + NAME, + NEW_DESCRIPTION, + APPLY_TYPE, + MAX_PARALLEL_SUBCLOUDS, + TIME_NOW, + TIME_NOW, + ), + actual_call[1], + ) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_peer_group_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_peer_group_manager.py index e315e97..9808d7f 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_peer_group_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_peer_group_manager.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023 Wind River Systems, Inc. +# Copyright (c) 2023-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -6,10 +6,12 @@ import mock from oslo_utils import timeutils -from dcmanagerclient.api.v1.subcloud_peer_group_manager \ - import SubcloudPeerGroup as Peergroup -from dcmanagerclient.commands.v1 \ - import subcloud_peer_group_manager as subcloud_peer_group_cmd +from dcmanagerclient.api.v1.subcloud_peer_group_manager import ( + SubcloudPeerGroup as Peergroup, +) +from dcmanagerclient.commands.v1 import ( + subcloud_peer_group_manager as subcloud_peer_group_cmd, +) from dcmanagerclient.exceptions import DCManagerClientException from dcmanagerclient.tests import base @@ -34,77 +36,76 @@ SubcloudPeerGroup = Peergroup( PG_SYSTEM_LEADER_NAME, PG_MAX_SUBCLOUD_REHOMING, PG_CREATED_AT, - PG_UPDATED_AT + PG_UPDATED_AT, ) -PG_TUPLE = (PG_ID, - PG_NAME, - PG_GROUP_PRIORITY, - PG_GROUP_STATE, - PG_SYSTEM_LEADER_ID, - PG_SYSTEM_LEADER_NAME, - PG_MAX_SUBCLOUD_REHOMING, - ) +PG_TUPLE = ( + PG_ID, + PG_NAME, + PG_GROUP_PRIORITY, + PG_GROUP_STATE, + PG_SYSTEM_LEADER_ID, + PG_SYSTEM_LEADER_NAME, + PG_MAX_SUBCLOUD_REHOMING, +) PG_TUPLE_WITH_DATE = PG_TUPLE + (PG_CREATED_AT, PG_UPDATED_AT) class TestCLISubcloudPeerGroupManager(base.BaseCommandTest): def setUp(self): - super(TestCLISubcloudPeerGroupManager, self).setUp() + super().setUp() # The client is the subcloud_peer_group_manager self.client = self.app.client_manager.subcloud_peer_group_manager def test_list_subcloud_peer_groups(self): self.client.subcloud_peer_group_manager.\ - list_subcloud_peer_groups.return_value =\ - [SubcloudPeerGroup] + list_subcloud_peer_groups.return_value = [SubcloudPeerGroup] actual_call = self.call(subcloud_peer_group_cmd.ListSubcloudPeerGroup) - self.assertEqual([PG_TUPLE_WITH_DATE], - actual_call[1]) + self.assertEqual([PG_TUPLE_WITH_DATE], actual_call[1]) def test_show_subcloud_peer_group(self): self.client.subcloud_peer_group_manager.\ - subcloud_peer_group_detail.return_value =\ - [SubcloudPeerGroup] - actual_call = self.call(subcloud_peer_group_cmd.ShowSubcloudPeerGroup, - app_args=[PG_ID]) + subcloud_peer_group_detail.return_value = [SubcloudPeerGroup] + actual_call = self.call( + subcloud_peer_group_cmd.ShowSubcloudPeerGroup, app_args=[PG_ID] + ) self.assertEqual(PG_TUPLE_WITH_DATE, actual_call[1]) def test_list_subcloud_peer_group_subclouds(self): - self.client.subcloud_peer_group_manager.\ - subcloud_peer_group_list_subclouds.return_value = \ - [base.SUBCLOUD_RESOURCE_WITH_PEERID] + self.client.subcloud_peer_group_manager.subcloud_peer_group_list_subclouds.\ + return_value = [base.SUBCLOUD_RESOURCE_WITH_PEERID] actual_call = self.call( subcloud_peer_group_cmd.ListSubcloudPeerGroupSubclouds, - app_args=[base.ID]) - self.assertEqual([ - base.SUBCLOUD_FIELD_RESULT_LIST_WITH_PEERID], - actual_call[1]) + app_args=[base.ID], + ) + self.assertEqual( + [base.SUBCLOUD_FIELD_RESULT_LIST_WITH_PEERID], actual_call[1] + ) def test_add_subcloud_peer_group(self): self.client.subcloud_peer_group_manager.add_subcloud_peer_group.\ return_value = [SubcloudPeerGroup] actual_call1 = self.call( - subcloud_peer_group_cmd.AddSubcloudPeerGroup, app_args=[ - '--peer-group-name', PG_NAME - ]) + subcloud_peer_group_cmd.AddSubcloudPeerGroup, + app_args=["--peer-group-name", PG_NAME], + ) actual_call2 = self.call( - subcloud_peer_group_cmd.AddSubcloudPeerGroup, app_args=[ - '--peer-group-name', PG_NAME, - '--group-state', PG_GROUP_STATE, - '--max-subcloud-rehoming', PG_MAX_SUBCLOUD_REHOMING - ]) - self.assertEqual( - PG_TUPLE_WITH_DATE, - actual_call1[1]) - self.assertEqual( - PG_TUPLE_WITH_DATE, - actual_call2[1]) + subcloud_peer_group_cmd.AddSubcloudPeerGroup, + app_args=[ + "--peer-group-name", + PG_NAME, + "--group-state", + PG_GROUP_STATE, + "--max-subcloud-rehoming", + PG_MAX_SUBCLOUD_REHOMING, + ], + ) + self.assertEqual(PG_TUPLE_WITH_DATE, actual_call1[1]) + self.assertEqual(PG_TUPLE_WITH_DATE, actual_call2[1]) def test_delete_subcloud_peer_group(self): - self.call(subcloud_peer_group_cmd.DeleteSubcloudPeerGroup, - app_args=[PG_ID]) + self.call(subcloud_peer_group_cmd.DeleteSubcloudPeerGroup, app_args=[PG_ID]) self.client.subcloud_peer_group_manager.delete_subcloud_peer_group.\ assert_called_once_with(PG_ID) @@ -115,15 +116,20 @@ class TestCLISubcloudPeerGroupManager(base.BaseCommandTest): subcloud_peer_group_cmd.UpdateSubcloudPeerGroup, app_args=[ base.ID, - '--peer-group-name', PG_NAME, - '--group-state', PG_GROUP_STATE, - '--max-subcloud-rehoming', PG_MAX_SUBCLOUD_REHOMING]) - self.assertEqual( - (PG_TUPLE_WITH_DATE), - actual_call[1]) + "--peer-group-name", + PG_NAME, + "--group-state", + PG_GROUP_STATE, + "--max-subcloud-rehoming", + PG_MAX_SUBCLOUD_REHOMING, + ], + ) + self.assertEqual((PG_TUPLE_WITH_DATE), actual_call[1]) - e = self.assertRaises(DCManagerClientException, - self.call, - subcloud_peer_group_cmd.UpdateSubcloudPeerGroup, - app_args=[base.ID]) - self.assertTrue('Nothing to update' in str(e)) + e = self.assertRaises( + DCManagerClientException, + self.call, + subcloud_peer_group_cmd.UpdateSubcloudPeerGroup, + app_args=[base.ID], + ) + self.assertTrue("Nothing to update" in str(e)) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_prestage_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_prestage_manager.py index 69e7e75..820bc45 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_prestage_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_prestage_manager.py @@ -1,16 +1,17 @@ # -# Copyright (c) 2022-2023 Wind River Systems, Inc. +# Copyright (c) 2022-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import getpass + import mock from dcmanagerclient.commands.v1 import sw_prestage_manager as cli_cmd from dcmanagerclient.tests import base -from dcmanagerclient.tests.v1.mixins import UpdateStrategyMixin from dcmanagerclient.tests.v1 import utils +from dcmanagerclient.tests.v1.mixins import UpdateStrategyMixin FAKE_RELEASE = "21.12" @@ -18,13 +19,14 @@ FAKE_RELEASE = "21.12" class TestSwPrestageStrategy(UpdateStrategyMixin, base.BaseCommandTest): def setUp(self): - super(TestSwPrestageStrategy, self).setUp() - self.sw_update_manager = \ + super().setUp() + self.sw_update_manager = ( self.app.client_manager.sw_prestage_manager.sw_prestage_manager + ) - p = mock.patch.object(getpass, 'getpass') + p = mock.patch.object(getpass, "getpass") self.mock_prompt = p.start() - self.mock_prompt.return_value = str('testpassword') + self.mock_prompt.return_value = str("testpassword") self.addCleanup(p.stop) self.create_command = cli_cmd.CreateSwPrestageStrategy @@ -41,17 +43,17 @@ class TestSwPrestageStrategy(UpdateStrategyMixin, base.BaseCommandTest): expected_strategy_type = manager_to_test.update_type # mock the result of the API call - strategy = utils.make_strategy(strategy_type=expected_strategy_type, - extra_args={"prestage-software-version": - FAKE_RELEASE}) + strategy = utils.make_strategy( + strategy_type=expected_strategy_type, + extra_args={"prestage-software-version": FAKE_RELEASE}, + ) # mock that there is no pre-existing strategy manager_to_test.create_sw_update_strategy.return_value = strategy # invoke the backend method for the CLI. # Returns a tuple of field descriptions, and a second tuple of values - fields, results = self.call(self.create_command, - ['--release', FAKE_RELEASE]) + fields, results = self.call(self.create_command, ["--release", FAKE_RELEASE]) # results is a tuple of expected length self.assertEqual(len(results), self.results_length + 1) @@ -66,5 +68,5 @@ class TestSwPrestageStrategy(UpdateStrategyMixin, base.BaseCommandTest): # - updated_at self.assertEqual(results[0], expected_strategy_type) - self.assertEqual(fields[-4], 'prestage software version') + self.assertEqual(fields[-4], "prestage software version") self.assertEqual(results[-4], FAKE_RELEASE) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_manager.py index e380082..097aa3b 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_manager.py @@ -1,20 +1,18 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # -from dcmanagerclient.api.v1.strategy_step_manager \ - import StrategyStep -from dcmanagerclient.commands.v1 \ - import sw_update_manager as sw_update_cmd +from dcmanagerclient.api.v1.strategy_step_manager import StrategyStep +from dcmanagerclient.commands.v1 import sw_update_manager as sw_update_cmd from dcmanagerclient.tests import base FAKE_MANAGER = None -FAKE_CLOUD = 'subcloud1' -FAKE_STAGE = '1' -FAKE_STATE = 'initial' -FAKE_DETAILS = '' +FAKE_CLOUD = "subcloud1" +FAKE_STAGE = "1" +FAKE_STATE = "initial" +FAKE_DETAILS = "" FAKE_STARTED_AT = None FAKE_FINISHED_AT = None FAKE_CREATED_AT = None @@ -29,30 +27,32 @@ STRATEGY_STEP = StrategyStep( FAKE_STARTED_AT, FAKE_FINISHED_AT, FAKE_CREATED_AT, - FAKE_UPDATED_AT + FAKE_UPDATED_AT, ) class TestCLISWUpdateManagerV1(base.BaseCommandTest): def setUp(self): - super(TestCLISWUpdateManagerV1, self).setUp() + super().setUp() self.client = self.app.client_manager.strategy_step_manager def test_show_sw_update_strategy_step(self): - results = list() + results = [] results.append(STRATEGY_STEP) - self.client.strategy_step_manager.\ - strategy_step_detail.return_value = results + self.client.strategy_step_manager.strategy_step_detail.return_value = results actual_call = self.call( - sw_update_cmd.ShowSwUpdateStrategyStep, - app_args=[FAKE_CLOUD] + sw_update_cmd.ShowSwUpdateStrategyStep, app_args=[FAKE_CLOUD] + ) + self.assertEqual( + ( + FAKE_CLOUD, + FAKE_STAGE, + FAKE_STATE, + FAKE_DETAILS, + FAKE_STARTED_AT, + FAKE_FINISHED_AT, + FAKE_CREATED_AT, + FAKE_UPDATED_AT, + ), + actual_call[1], ) - self.assertEqual((FAKE_CLOUD, - FAKE_STAGE, - FAKE_STATE, - FAKE_DETAILS, - FAKE_STARTED_AT, - FAKE_FINISHED_AT, - FAKE_CREATED_AT, - FAKE_UPDATED_AT), - actual_call[1]) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_options_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_options_manager.py index c73583f..21dd699 100755 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_options_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_update_options_manager.py @@ -1,23 +1,23 @@ # -# Copyright (c) 2021 Wind River Systems, Inc. +# Copyright (c) 2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # -from dcmanagerclient.api.v1.sw_update_options_manager \ - import SwUpdateOptions -from dcmanagerclient.commands.v1 \ - import sw_update_options_manager as sw_update_options_cmd +from dcmanagerclient.api.v1.sw_update_options_manager import SwUpdateOptions +from dcmanagerclient.commands.v1 import ( + sw_update_options_manager as sw_update_options_cmd, +) from dcmanagerclient.tests import base FAKE_MANAGER = None -FAKE_CLOUD = '1' +FAKE_CLOUD = "1" FAKE_STORAGE_APPLY_TYPE = "parallel" NEW_FAKE_STORAGE_APPLY_TYPE = "serial" FAKE_WORKER_APPLY_TYPE = "serial" -FAKE_MAX_PARALLEL = '5' -FAKE_ALARM_RESTRICTIONS = 'strict' -FAKE_DEFAULT_INSTANCE_ACTION = 'migrate' +FAKE_MAX_PARALLEL = "5" +FAKE_ALARM_RESTRICTIONS = "strict" +FAKE_DEFAULT_INSTANCE_ACTION = "migrate" FAKE_CREATED_AT = None FAKE_UPDATED_AT = None @@ -30,108 +30,132 @@ SW_UPDATE_OPTION = SwUpdateOptions( FAKE_ALARM_RESTRICTIONS, FAKE_DEFAULT_INSTANCE_ACTION, FAKE_CREATED_AT, - FAKE_UPDATED_AT + FAKE_UPDATED_AT, +) + +UPDATED_SW_UPDATE_OPTION = SwUpdateOptions( + FAKE_MANAGER, + FAKE_CLOUD, + NEW_FAKE_STORAGE_APPLY_TYPE, + FAKE_WORKER_APPLY_TYPE, + FAKE_MAX_PARALLEL, + FAKE_ALARM_RESTRICTIONS, + FAKE_DEFAULT_INSTANCE_ACTION, + FAKE_CREATED_AT, + FAKE_UPDATED_AT, ) class TestCLISWUpdateOptionsManagerV1(base.BaseCommandTest): def setUp(self): - super(TestCLISWUpdateOptionsManagerV1, self).setUp() + super().setUp() # The client is the subcloud_group_manager self.client = self.app.client_manager.sw_update_options_manager def test_list_sw_update_options(self): - self.client.sw_update_options_manager.\ - sw_update_options_list.return_value = [SW_UPDATE_OPTION] + self.client.sw_update_options_manager.sw_update_options_list.return_value = [ + SW_UPDATE_OPTION + ] actual_call = self.call(sw_update_options_cmd.ListSwUpdateOptions) - self.assertEqual([(FAKE_CLOUD, - FAKE_STORAGE_APPLY_TYPE, - FAKE_WORKER_APPLY_TYPE, - FAKE_MAX_PARALLEL, - FAKE_ALARM_RESTRICTIONS, - FAKE_DEFAULT_INSTANCE_ACTION)], - actual_call[1]) + self.assertEqual( + [ + ( + FAKE_CLOUD, + FAKE_STORAGE_APPLY_TYPE, + FAKE_WORKER_APPLY_TYPE, + FAKE_MAX_PARALLEL, + FAKE_ALARM_RESTRICTIONS, + FAKE_DEFAULT_INSTANCE_ACTION, + ) + ], + actual_call[1], + ) def test_list_sw_update_options_empty(self): - self.client.sw_update_options_manager.\ - sw_update_options_list.return_value = [] + self.client.sw_update_options_manager.sw_update_options_list.return_value = ( + [] + ) actual_call = self.call(sw_update_options_cmd.ListSwUpdateOptions) - self.assertEqual((('', '', '', - '', '', ''),), - actual_call[1]) + self.assertEqual( + (("", "", "", "", "", ""),), + actual_call[1], + ) def test_show_sw_update_options_default(self): - self.client.sw_update_options_manager.\ - sw_update_options_detail.return_value = [SW_UPDATE_OPTION] + self.client.sw_update_options_manager.sw_update_options_detail.\ + return_value = [SW_UPDATE_OPTION] actual_call = self.call(sw_update_options_cmd.ShowSwUpdateOptions) - self.assertEqual((FAKE_CLOUD, - FAKE_STORAGE_APPLY_TYPE, - FAKE_WORKER_APPLY_TYPE, - FAKE_MAX_PARALLEL, - FAKE_ALARM_RESTRICTIONS, - FAKE_DEFAULT_INSTANCE_ACTION, - FAKE_CREATED_AT, - FAKE_UPDATED_AT), - actual_call[1]) + self.assertEqual( + ( + FAKE_CLOUD, + FAKE_STORAGE_APPLY_TYPE, + FAKE_WORKER_APPLY_TYPE, + FAKE_MAX_PARALLEL, + FAKE_ALARM_RESTRICTIONS, + FAKE_DEFAULT_INSTANCE_ACTION, + FAKE_CREATED_AT, + FAKE_UPDATED_AT, + ), + actual_call[1], + ) def test_show_sw_update_options_by_ref(self): - self.client.sw_update_options_manager.\ - sw_update_options_detail.return_value = [SW_UPDATE_OPTION] - actual_call = self.call(sw_update_options_cmd.ShowSwUpdateOptions, - app_args=[FAKE_CLOUD]) - self.assertEqual((FAKE_CLOUD, - FAKE_STORAGE_APPLY_TYPE, - FAKE_WORKER_APPLY_TYPE, - FAKE_MAX_PARALLEL, - FAKE_ALARM_RESTRICTIONS, - FAKE_DEFAULT_INSTANCE_ACTION, - FAKE_CREATED_AT, - FAKE_UPDATED_AT), - actual_call[1]) + self.client.sw_update_options_manager.sw_update_options_detail.\ + return_value = [SW_UPDATE_OPTION] + actual_call = self.call( + sw_update_options_cmd.ShowSwUpdateOptions, app_args=[FAKE_CLOUD] + ) + self.assertEqual( + ( + FAKE_CLOUD, + FAKE_STORAGE_APPLY_TYPE, + FAKE_WORKER_APPLY_TYPE, + FAKE_MAX_PARALLEL, + FAKE_ALARM_RESTRICTIONS, + FAKE_DEFAULT_INSTANCE_ACTION, + FAKE_CREATED_AT, + FAKE_UPDATED_AT, + ), + actual_call[1], + ) def test_update_sw_update_options(self): - UPDATED_SW_UPDATE_OPTION = SwUpdateOptions( - FAKE_MANAGER, - FAKE_CLOUD, - NEW_FAKE_STORAGE_APPLY_TYPE, - FAKE_WORKER_APPLY_TYPE, - FAKE_MAX_PARALLEL, - FAKE_ALARM_RESTRICTIONS, - FAKE_DEFAULT_INSTANCE_ACTION, - FAKE_CREATED_AT, - FAKE_UPDATED_AT - ) - self.client.sw_update_options_manager.\ - sw_update_options_update.return_value = [UPDATED_SW_UPDATE_OPTION] - print(UPDATED_SW_UPDATE_OPTION.storage_apply_type) + self.client.sw_update_options_manager.sw_update_options_update.\ + return_value = [UPDATED_SW_UPDATE_OPTION] actual_call = self.call( sw_update_options_cmd.UpdateSwUpdateOptions, app_args=[ - '--storage-apply-type=' + NEW_FAKE_STORAGE_APPLY_TYPE, - '--worker-apply-type=' + FAKE_WORKER_APPLY_TYPE, - '--max-parallel-workers=' + FAKE_MAX_PARALLEL, - '--alarm-restriction-type=' + FAKE_ALARM_RESTRICTIONS, - '--default-instance-action=' + FAKE_DEFAULT_INSTANCE_ACTION - ]) - self.assertEqual((FAKE_CLOUD, - NEW_FAKE_STORAGE_APPLY_TYPE, - FAKE_WORKER_APPLY_TYPE, - FAKE_MAX_PARALLEL, - FAKE_ALARM_RESTRICTIONS, - FAKE_DEFAULT_INSTANCE_ACTION, - FAKE_CREATED_AT, - FAKE_UPDATED_AT), - actual_call[1]) + "--storage-apply-type=" + NEW_FAKE_STORAGE_APPLY_TYPE, + "--worker-apply-type=" + FAKE_WORKER_APPLY_TYPE, + "--max-parallel-workers=" + FAKE_MAX_PARALLEL, + "--alarm-restriction-type=" + FAKE_ALARM_RESTRICTIONS, + "--default-instance-action=" + FAKE_DEFAULT_INSTANCE_ACTION, + ], + ) + self.assertEqual( + ( + FAKE_CLOUD, + NEW_FAKE_STORAGE_APPLY_TYPE, + FAKE_WORKER_APPLY_TYPE, + FAKE_MAX_PARALLEL, + FAKE_ALARM_RESTRICTIONS, + FAKE_DEFAULT_INSTANCE_ACTION, + FAKE_CREATED_AT, + FAKE_UPDATED_AT, + ), + actual_call[1], + ) def test_delete_sw_update_options_by_ref(self): - self.call(sw_update_options_cmd.DeleteSwUpdateOptions, - app_args=[FAKE_CLOUD]) - self.client.sw_update_options_manager.\ - sw_update_options_delete.assert_called_once_with(FAKE_CLOUD) + self.call(sw_update_options_cmd.DeleteSwUpdateOptions, app_args=[FAKE_CLOUD]) + self.client.sw_update_options_manager.sw_update_options_delete.\ + assert_called_once_with(FAKE_CLOUD) def test_delete_sw_update_options_without_ref(self): - self.assertRaises(SystemExit, - self.call, - sw_update_options_cmd.DeleteSwUpdateOptions, - app_args=[]) + self.assertRaises( + SystemExit, + self.call, + sw_update_options_cmd.DeleteSwUpdateOptions, + app_args=[], + ) diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_upgrade_strategy.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_upgrade_strategy.py index 8ae4453..6a89763 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_upgrade_strategy.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_sw_upgrade_strategy.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2020-2021 Wind River Systems, Inc. +# Copyright (c) 2020-2021, 2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -12,9 +12,10 @@ from dcmanagerclient.tests.v1.mixins import UpdateStrategyMixin class TestSwUpgradeStrategy(UpdateStrategyMixin, base.BaseCommandTest): def setUp(self): - super(TestSwUpgradeStrategy, self).setUp() - self.sw_update_manager = \ + super().setUp() + self.sw_update_manager = ( self.app.client_manager.sw_upgrade_manager.sw_upgrade_manager + ) self.create_command = cli_cmd.CreateSwUpgradeStrategy self.show_command = cli_cmd.ShowSwUpgradeStrategy self.delete_command = cli_cmd.DeleteSwUpgradeStrategy diff --git a/distributedcloud-client/dcmanagerclient/tests/v1/test_system_peer_manager.py b/distributedcloud-client/dcmanagerclient/tests/v1/test_system_peer_manager.py index bead0b9..b387afc 100644 --- a/distributedcloud-client/dcmanagerclient/tests/v1/test_system_peer_manager.py +++ b/distributedcloud-client/dcmanagerclient/tests/v1/test_system_peer_manager.py @@ -1,37 +1,37 @@ # -# Copyright (c) 2023 Wind River Systems, Inc. +# Copyright (c) 2023-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import copy -import mock +import mock from oslo_utils import timeutils -from dcmanagerclient.api.v1.subcloud_peer_group_manager \ - import SubcloudPeerGroup as Peergroup from dcmanagerclient.api.v1 import system_peer_manager as spm -from dcmanagerclient.commands.v1 \ - import system_peer_manager as system_peer_cmd +from dcmanagerclient.api.v1.subcloud_peer_group_manager import ( + SubcloudPeerGroup as Peergroup, +) +from dcmanagerclient.commands.v1 import system_peer_manager as system_peer_cmd from dcmanagerclient.tests import base # Sample System Peer data -ID = '2' -SYSTEM_PEER_UUID = 'test1234-0dfd-46cd-9a93-e3c2b74ef20f' -SYSTEM_PEER_NAME = 'SystemPeer1' -MANAGER_ENDPOINT = 'http://127.0.0.1:5000' -MANAGER_USERNAME = 'admin' -MANAGER_PASSWORD = 'password' -ADMINISTRATIVE_STATE = 'enabled' +ID = "2" +SYSTEM_PEER_UUID = "test1234-0dfd-46cd-9a93-e3c2b74ef20f" +SYSTEM_PEER_NAME = "SystemPeer1" +MANAGER_ENDPOINT = "http://127.0.0.1:5000" +MANAGER_USERNAME = "admin" +MANAGER_PASSWORD = "password" +ADMINISTRATIVE_STATE = "enabled" HEARTBEAT_INTERVAL = 10 HEARTBEAT_FAILURE_THRESHOLD = 3 -HEARTBEAT_FAILURES_POLICY = 'alarm' +HEARTBEAT_FAILURES_POLICY = "alarm" HEARTBEAT_MAINTENANCE_TIMEOUT = 600 -AVAILABILITY_STATE = 'available' -PEER_CONTROLLER_GATEWAY_IP = '128.128.128.1' +AVAILABILITY_STATE = "available" +PEER_CONTROLLER_GATEWAY_IP = "128.128.128.1" TIME_NOW = timeutils.utcnow().isoformat() -NEW_PEER_CONTROLLER_GATEWAY_IP = '128.1.1.1' +NEW_PEER_CONTROLLER_GATEWAY_IP = "128.1.1.1" # Sample Subcloud Peer Group data PG_ID = "1" @@ -45,42 +45,39 @@ PG_CREATED_AT = TIME_NOW PG_UPDATED_AT = TIME_NOW SYSTEM_PEER_DICT = { - 'PEER_ID': ID, - 'PEER_UUID': SYSTEM_PEER_UUID, - 'PEER_NAME': SYSTEM_PEER_NAME, - 'MANAGER_ENDPOINT': MANAGER_ENDPOINT, - 'MANAGER_USERNAME': MANAGER_USERNAME, - 'ADMINISTRATIVE_STATE': ADMINISTRATIVE_STATE, - 'HEARTBEAT_INTERVAL': HEARTBEAT_INTERVAL, - 'HEARTBEAT_FAILURE_THRESHOLD': HEARTBEAT_FAILURE_THRESHOLD, - 'HEARTBEAT_FAILURE_POLICY': HEARTBEAT_FAILURES_POLICY, - 'HEARTBEAT_MAINTENANCE_TIMEOUT': HEARTBEAT_MAINTENANCE_TIMEOUT, - 'AVAILABILITY_STATE': AVAILABILITY_STATE, - 'PEER_CONTROLLER_GATEWAY_IP': PEER_CONTROLLER_GATEWAY_IP, - 'CREATED_AT': TIME_NOW, - 'UPDATED_AT': TIME_NOW + "PEER_ID": ID, + "PEER_UUID": SYSTEM_PEER_UUID, + "PEER_NAME": SYSTEM_PEER_NAME, + "MANAGER_ENDPOINT": MANAGER_ENDPOINT, + "MANAGER_USERNAME": MANAGER_USERNAME, + "ADMINISTRATIVE_STATE": ADMINISTRATIVE_STATE, + "HEARTBEAT_INTERVAL": HEARTBEAT_INTERVAL, + "HEARTBEAT_FAILURE_THRESHOLD": HEARTBEAT_FAILURE_THRESHOLD, + "HEARTBEAT_FAILURE_POLICY": HEARTBEAT_FAILURES_POLICY, + "HEARTBEAT_MAINTENANCE_TIMEOUT": HEARTBEAT_MAINTENANCE_TIMEOUT, + "AVAILABILITY_STATE": AVAILABILITY_STATE, + "PEER_CONTROLLER_GATEWAY_IP": PEER_CONTROLLER_GATEWAY_IP, + "CREATED_AT": TIME_NOW, + "UPDATED_AT": TIME_NOW, } # System Peer CLI resource object SYSTEM_PEER = spm.SystemPeer( mock, - peer_id=SYSTEM_PEER_DICT['PEER_ID'], - peer_name=SYSTEM_PEER_DICT['PEER_NAME'], - peer_uuid=SYSTEM_PEER_DICT['PEER_UUID'], - manager_endpoint=SYSTEM_PEER_DICT['MANAGER_ENDPOINT'], - manager_username=SYSTEM_PEER_DICT['MANAGER_USERNAME'], - administrative_state=SYSTEM_PEER_DICT['ADMINISTRATIVE_STATE'], - heartbeat_interval=SYSTEM_PEER_DICT['HEARTBEAT_INTERVAL'], - heartbeat_failure_threshold=SYSTEM_PEER_DICT[ - 'HEARTBEAT_FAILURE_THRESHOLD'], - heartbeat_failure_policy=SYSTEM_PEER_DICT['HEARTBEAT_FAILURE_POLICY'], - heartbeat_maintenance_timeout=SYSTEM_PEER_DICT[ - 'HEARTBEAT_MAINTENANCE_TIMEOUT'], - availability_state=SYSTEM_PEER_DICT['AVAILABILITY_STATE'], - peer_controller_gateway_address=SYSTEM_PEER_DICT[ - 'PEER_CONTROLLER_GATEWAY_IP'], - created_at=SYSTEM_PEER_DICT['CREATED_AT'], - updated_at=SYSTEM_PEER_DICT['UPDATED_AT'] + peer_id=SYSTEM_PEER_DICT["PEER_ID"], + peer_name=SYSTEM_PEER_DICT["PEER_NAME"], + peer_uuid=SYSTEM_PEER_DICT["PEER_UUID"], + manager_endpoint=SYSTEM_PEER_DICT["MANAGER_ENDPOINT"], + manager_username=SYSTEM_PEER_DICT["MANAGER_USERNAME"], + administrative_state=SYSTEM_PEER_DICT["ADMINISTRATIVE_STATE"], + heartbeat_interval=SYSTEM_PEER_DICT["HEARTBEAT_INTERVAL"], + heartbeat_failure_threshold=SYSTEM_PEER_DICT["HEARTBEAT_FAILURE_THRESHOLD"], + heartbeat_failure_policy=SYSTEM_PEER_DICT["HEARTBEAT_FAILURE_POLICY"], + heartbeat_maintenance_timeout=SYSTEM_PEER_DICT["HEARTBEAT_MAINTENANCE_TIMEOUT"], + availability_state=SYSTEM_PEER_DICT["AVAILABILITY_STATE"], + peer_controller_gateway_address=SYSTEM_PEER_DICT["PEER_CONTROLLER_GATEWAY_IP"], + created_at=SYSTEM_PEER_DICT["CREATED_AT"], + updated_at=SYSTEM_PEER_DICT["UPDATED_AT"], ) # Subcloud Peer Group CLI resource object @@ -94,146 +91,181 @@ PEER_GROUP = Peergroup( PG_SYSTEM_LEADER_NAME, PG_MAX_SUBCLOUD_REHOMING, PG_CREATED_AT, - PG_UPDATED_AT + PG_UPDATED_AT, +) +PG_TUPLE = ( + PG_ID, + PG_NAME, + PG_GROUP_PRIORITY, + PG_GROUP_STATE, + PG_SYSTEM_LEADER_ID, + PG_SYSTEM_LEADER_NAME, + PG_MAX_SUBCLOUD_REHOMING, ) -PG_TUPLE = (PG_ID, - PG_NAME, - PG_GROUP_PRIORITY, - PG_GROUP_STATE, - PG_SYSTEM_LEADER_ID, - PG_SYSTEM_LEADER_NAME, - PG_MAX_SUBCLOUD_REHOMING) class TestCLISystemPeerManagerV1(base.BaseCommandTest): def setUp(self): - super(TestCLISystemPeerManagerV1, self).setUp() + super().setUp() # The client is the system_peer_manager self.client = self.app.client_manager.system_peer_manager def test_list_system_peers(self): - self.client.system_peer_manager.\ - list_system_peers.return_value = [SYSTEM_PEER] + self.client.system_peer_manager.list_system_peers.return_value = [ + SYSTEM_PEER + ] actual_call = self.call(system_peer_cmd.ListSystemPeer) - self.assertEqual([(ID, SYSTEM_PEER_UUID, SYSTEM_PEER_NAME, - MANAGER_ENDPOINT, PEER_CONTROLLER_GATEWAY_IP)], - actual_call[1]) + self.assertEqual( + [ + ( + ID, + SYSTEM_PEER_UUID, + SYSTEM_PEER_NAME, + MANAGER_ENDPOINT, + PEER_CONTROLLER_GATEWAY_IP, + ) + ], + actual_call[1], + ) def test_list_system_peers_empty(self): - self.client.system_peer_manager.\ - list_system_peers.return_value = [] + self.client.system_peer_manager.list_system_peers.return_value = [] actual_call = self.call(system_peer_cmd.ListSystemPeer) - self.assertEqual((tuple('' for _ in range(5)),), - actual_call[1]) + self.assertEqual((tuple("" for _ in range(5)),), actual_call[1]) def test_delete_system_peer_by_id(self): self.call(system_peer_cmd.DeleteSystemPeer, app_args=[ID]) - self.client.system_peer_manager.delete_system_peer.\ - assert_called_once_with(ID) + self.client.system_peer_manager.delete_system_peer.assert_called_once_with( + ID + ) def test_delete_system_peer_without_id(self): - self.assertRaises(SystemExit, self.call, - system_peer_cmd.DeleteSystemPeer, app_args=[]) + self.assertRaises( + SystemExit, self.call, system_peer_cmd.DeleteSystemPeer, app_args=[] + ) def test_show_system_peer_with_id(self): - self.client.system_peer_manager.system_peer_detail.\ - return_value = [SYSTEM_PEER] - actual_call = self.call(system_peer_cmd.ShowSystemPeer, - app_args=[ID]) - self.assertEqual((ID, - SYSTEM_PEER_UUID, - SYSTEM_PEER_NAME, - MANAGER_ENDPOINT, - MANAGER_USERNAME, - PEER_CONTROLLER_GATEWAY_IP, - ADMINISTRATIVE_STATE, - HEARTBEAT_INTERVAL, - HEARTBEAT_FAILURE_THRESHOLD, - HEARTBEAT_FAILURES_POLICY, - HEARTBEAT_MAINTENANCE_TIMEOUT, - AVAILABILITY_STATE, - TIME_NOW, - TIME_NOW), - actual_call[1]) + self.client.system_peer_manager.system_peer_detail.return_value = [ + SYSTEM_PEER + ] + actual_call = self.call(system_peer_cmd.ShowSystemPeer, app_args=[ID]) + self.assertEqual( + ( + ID, + SYSTEM_PEER_UUID, + SYSTEM_PEER_NAME, + MANAGER_ENDPOINT, + MANAGER_USERNAME, + PEER_CONTROLLER_GATEWAY_IP, + ADMINISTRATIVE_STATE, + HEARTBEAT_INTERVAL, + HEARTBEAT_FAILURE_THRESHOLD, + HEARTBEAT_FAILURES_POLICY, + HEARTBEAT_MAINTENANCE_TIMEOUT, + AVAILABILITY_STATE, + TIME_NOW, + TIME_NOW, + ), + actual_call[1], + ) def test_show_system_peer_without_id(self): - self.client.system_peer_manager.system_peer_detail.\ - return_value = [] - actual_call = self.call(system_peer_cmd.ShowSystemPeer, - app_args=[ID]) - self.assertEqual((tuple('' for _ in range(14)),), - actual_call[1]) + self.client.system_peer_manager.system_peer_detail.return_value = [] + actual_call = self.call(system_peer_cmd.ShowSystemPeer, app_args=[ID]) + self.assertEqual((tuple("" for _ in range(14)),), actual_call[1]) def test_list_system_peer_subcloud_peer_groups(self): - self.client.system_peer_manager.\ - system_peer_list_peer_groups.return_value = [PEER_GROUP] + self.client.system_peer_manager.system_peer_list_peer_groups.return_value = [ + PEER_GROUP + ] actual_call = self.call( - system_peer_cmd.ListSystemPeerSubcloudPeerGroups, - app_args=[ID]) + system_peer_cmd.ListSystemPeerSubcloudPeerGroups, app_args=[ID] + ) self.assertEqual([PG_TUPLE], actual_call[1]) def test_add_system_peer(self): - self.client.system_peer_manager.add_system_peer.\ - return_value = [SYSTEM_PEER] + self.client.system_peer_manager.add_system_peer.return_value = [SYSTEM_PEER] actual_call = self.call( system_peer_cmd.AddSystemPeer, - app_args=['--peer-uuid', SYSTEM_PEER_UUID, - '--peer-name', SYSTEM_PEER_NAME, - '--manager-endpoint', MANAGER_ENDPOINT, - '--manager-username', MANAGER_USERNAME, - '--manager-password', MANAGER_PASSWORD, - '--heartbeat-interval', str(HEARTBEAT_INTERVAL), - '--heartbeat-failure-threshold', - str(HEARTBEAT_FAILURE_THRESHOLD), - '--heartbeat-failure-policy', - HEARTBEAT_FAILURES_POLICY, - '--heartbeat-maintenance-timeout', - str(HEARTBEAT_MAINTENANCE_TIMEOUT), - '--peer-controller-gateway-address', - PEER_CONTROLLER_GATEWAY_IP, - '--administrative-state', ADMINISTRATIVE_STATE] + app_args=[ + "--peer-uuid", + SYSTEM_PEER_UUID, + "--peer-name", + SYSTEM_PEER_NAME, + "--manager-endpoint", + MANAGER_ENDPOINT, + "--manager-username", + MANAGER_USERNAME, + "--manager-password", + MANAGER_PASSWORD, + "--heartbeat-interval", + str(HEARTBEAT_INTERVAL), + "--heartbeat-failure-threshold", + str(HEARTBEAT_FAILURE_THRESHOLD), + "--heartbeat-failure-policy", + HEARTBEAT_FAILURES_POLICY, + "--heartbeat-maintenance-timeout", + str(HEARTBEAT_MAINTENANCE_TIMEOUT), + "--peer-controller-gateway-address", + PEER_CONTROLLER_GATEWAY_IP, + "--administrative-state", + ADMINISTRATIVE_STATE, + ], + ) + self.assertEqual( + ( + ID, + SYSTEM_PEER_UUID, + SYSTEM_PEER_NAME, + MANAGER_ENDPOINT, + MANAGER_USERNAME, + PEER_CONTROLLER_GATEWAY_IP, + ADMINISTRATIVE_STATE, + HEARTBEAT_INTERVAL, + HEARTBEAT_FAILURE_THRESHOLD, + HEARTBEAT_FAILURES_POLICY, + HEARTBEAT_MAINTENANCE_TIMEOUT, + AVAILABILITY_STATE, + TIME_NOW, + TIME_NOW, + ), + actual_call[1], ) - self.assertEqual((ID, - SYSTEM_PEER_UUID, - SYSTEM_PEER_NAME, - MANAGER_ENDPOINT, - MANAGER_USERNAME, - PEER_CONTROLLER_GATEWAY_IP, - ADMINISTRATIVE_STATE, - HEARTBEAT_INTERVAL, - HEARTBEAT_FAILURE_THRESHOLD, - HEARTBEAT_FAILURES_POLICY, - HEARTBEAT_MAINTENANCE_TIMEOUT, - AVAILABILITY_STATE, - TIME_NOW, - TIME_NOW), - actual_call[1]) def test_update_system_peer(self): UPDATED_SYSTEM_PEER = copy.copy(SYSTEM_PEER) - UPDATED_SYSTEM_PEER.peer_controller_gateway_ip = \ + UPDATED_SYSTEM_PEER.peer_controller_gateway_ip = ( NEW_PEER_CONTROLLER_GATEWAY_IP - self.client.system_peer_manager.update_system_peer.\ - return_value = [UPDATED_SYSTEM_PEER] + ) + self.client.system_peer_manager.update_system_peer.return_value = [ + UPDATED_SYSTEM_PEER + ] actual_call = self.call( system_peer_cmd.UpdateSystemPeer, - app_args=[SYSTEM_PEER.peer_id, - '--peer-controller-gateway-address', - NEW_PEER_CONTROLLER_GATEWAY_IP]) - self.assertEqual((ID, - SYSTEM_PEER_UUID, - SYSTEM_PEER_NAME, - MANAGER_ENDPOINT, - MANAGER_USERNAME, - PEER_CONTROLLER_GATEWAY_IP, - ADMINISTRATIVE_STATE, - HEARTBEAT_INTERVAL, - HEARTBEAT_FAILURE_THRESHOLD, - HEARTBEAT_FAILURES_POLICY, - HEARTBEAT_MAINTENANCE_TIMEOUT, - AVAILABILITY_STATE, - TIME_NOW, - TIME_NOW), - actual_call[1]) + app_args=[ + SYSTEM_PEER.peer_id, + "--peer-controller-gateway-address", + NEW_PEER_CONTROLLER_GATEWAY_IP, + ], + ) + self.assertEqual( + ( + ID, + SYSTEM_PEER_UUID, + SYSTEM_PEER_NAME, + MANAGER_ENDPOINT, + MANAGER_USERNAME, + PEER_CONTROLLER_GATEWAY_IP, + ADMINISTRATIVE_STATE, + HEARTBEAT_INTERVAL, + HEARTBEAT_FAILURE_THRESHOLD, + HEARTBEAT_FAILURES_POLICY, + HEARTBEAT_MAINTENANCE_TIMEOUT, + AVAILABILITY_STATE, + TIME_NOW, + TIME_NOW, + ), + actual_call[1], + ) diff --git a/distributedcloud-client/dcmanagerclient/utils.py b/distributedcloud-client/dcmanagerclient/utils.py index f2306b1..08afbd6 100644 --- a/distributedcloud-client/dcmanagerclient/utils.py +++ b/distributedcloud-client/dcmanagerclient/utils.py @@ -44,7 +44,7 @@ def do_action_on_many(action, resources, success_msg, error_msg): def load_content(content): if content is None or content == "": - return dict() + return {} try: data = yaml.safe_load(content)