Add 'subcloud deploy create' command to dcmanager

This commit adds the subcloud deploy create command to dcmanager
client. It accepts the same parameters as the subcloud add command,
but only performs the pre-deployment phase, where all parameters are
validated and the subcloud database entry is created. It does not
perform the install, bootstrap or config phases.

Some sample test data are also moved into a common file so that they
can be used by different tests without code duplication.

Test Plan:
1. PASS - Create a subcloud using all the parameters and verify that the
          correct API call is made;
2. PASS - Create a subcloud using only the required parameters and
          verify that the correct API call is made;
3. PASS - Verify that it's not possible to create a subcloud without the
          required parameters;
4. PASS - Verify that the dcmanager help subcloud deploy create shows
          the correct help message containing all options.

Story: 2010756
Task: 48031

Depends-On: https://review.opendev.org/c/starlingx/distcloud/+/883373

Change-Id: I1e37081cba8572143c1284204909a21d7da2aab2
Signed-off-by: Gustavo Herzmann <gustavo.herzmann@windriver.com>
This commit is contained in:
Gustavo Herzmann 2023-05-17 10:50:24 -03:00
parent 2f2ad5fc78
commit dbb950d3ff
11 changed files with 508 additions and 446 deletions

View File

@ -1,7 +1,7 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# Copyright 2016 - Ericsson AB.
# Copyright (c) 2017-2022 Wind River Systems, Inc.
# Copyright (c) 2017-2023 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -26,6 +26,7 @@ from dcmanagerclient.api.v1 import alarm_manager as am
from dcmanagerclient.api.v1 import fw_update_manager as fum
from dcmanagerclient.api.v1 import kube_rootca_update_manager as krum
from dcmanagerclient.api.v1 import kube_upgrade_manager as kupm
from dcmanagerclient.api.v1 import phased_subcloud_deploy_manager as psdm
from dcmanagerclient.api.v1 import strategy_step_manager as ssm
from dcmanagerclient.api.v1 import subcloud_backup_manager as sbm
from dcmanagerclient.api.v1 import subcloud_deploy_manager as sdm
@ -115,6 +116,8 @@ class Client(object):
self.strategy_step_manager = \
ssm.strategy_step_manager(self.http_client)
self.sw_strategy_manager = sstm.sw_strategy_manager(self.http_client)
self.phased_subcloud_deploy_manager = \
psdm.phased_subcloud_deploy_manager(self.http_client)
def authenticate(dcmanager_url=None, username=None,

View File

@ -0,0 +1,44 @@
#
# Copyright (c) 2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from requests_toolbelt import MultipartEncoder
from dcmanagerclient.api import base
from dcmanagerclient.api.base import get_json
BASE_URL = '/phased-subcloud-deploy/'
class phased_subcloud_deploy_manager(base.ResourceManager):
resource_class = base.Subcloud
def json_to_resource(self, json_object):
return self.resource_class.from_payload(self, json_object)
def _request_method(self, method, url, body, headers):
method = method.lower()
if method not in ("post", "patch", "put", "get", "delete"):
raise ValueError("Invalid request method: %s" % method)
return getattr(self.http_client, method)(url, body, headers)
def _deploy_operation(self, url, body, data, method="post"):
fields = dict()
for k, v in body.items():
fields.update({k: (v, open(v, 'rb'),)})
fields.update(data)
enc = MultipartEncoder(fields=fields)
headers = {'content-type': enc.content_type}
resp = self._request_method(method, url, enc, headers)
if resp.status_code != 200:
self._raise_api_exception(resp)
json_object = get_json(resp)
resource = [self.json_to_resource(json_object)]
return resource
def subcloud_deploy_create(self, **kwargs):
data = kwargs.get('data')
files = kwargs.get('files')
return self._deploy_operation(BASE_URL, files, data)

View File

@ -0,0 +1,120 @@
#
# Copyright (c) 2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import base64
import os
from dcmanagerclient.commands.v1 import base
from dcmanagerclient import exceptions
from dcmanagerclient import utils
class CreatePhasedSubcloudDeploy(base.DCManagerShowOne):
"""Creates a new subcloud."""
def _get_format_function(self):
return utils.subcloud_detail_format
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
'--bootstrap-address',
required=True,
help='IP address for initial subcloud controller.'
)
parser.add_argument(
'--bootstrap-values',
required=True,
help='YAML file containing subcloud configuration settings. '
'Can be either a local file path or a URL.'
)
parser.add_argument(
'--deploy-config',
required=False,
help='YAML file containing subcloud variables to be passed to the '
'deploy playbook.'
)
parser.add_argument(
'--install-values',
required=False,
help='YAML file containing subcloud variables required for remote '
'install playbook.'
)
parser.add_argument(
'--bmc-password',
required=False,
help='bmc password of the subcloud to be configured, '
'if not provided you will be prompted. This parameter is only'
' valid if the --install-values are specified.'
)
parser.add_argument(
'--group',
required=False,
help='Name or ID of subcloud group.'
)
parser.add_argument(
'--release',
required=False,
help='software release used to install, bootstrap and/or deploy '
'the subcloud with. If not specified, the current software '
'release of the system controller will be used.'
)
return parser
def _get_resources(self, parsed_args):
dcmanager_client = self.app.client_manager.\
phased_subcloud_deploy_manager.phased_subcloud_deploy_manager
files = dict()
data = dict()
data['bootstrap-address'] = parsed_args.bootstrap_address
# Get the bootstrap values yaml file
if not os.path.isfile(parsed_args.bootstrap_values):
error_msg = "bootstrap-values does not exist: %s" % \
parsed_args.bootstrap_values
raise exceptions.DCManagerClientException(error_msg)
files['bootstrap_values'] = parsed_args.bootstrap_values
# Get the deploy config yaml file
if parsed_args.deploy_config:
if not os.path.isfile(parsed_args.deploy_config):
error_msg = "deploy-config does not exist: %s" % \
parsed_args.deploy_config
raise exceptions.DCManagerClientException(error_msg)
files['deploy_config'] = parsed_args.deploy_config
# Get the install values yaml file
if parsed_args.install_values:
if not os.path.isfile(parsed_args.install_values):
error_msg = "install-values does not exist: %s" % \
parsed_args.install_values
raise exceptions.DCManagerClientException(error_msg)
files['install_values'] = parsed_args.install_values
if parsed_args.bmc_password:
data['bmc_password'] = base64.b64encode(
parsed_args.bmc_password.encode("utf-8"))
else:
password = utils.prompt_for_password('bmc')
data["bmc_password"] = base64.b64encode(
password.encode("utf-8"))
if parsed_args.group:
data['group_id'] = parsed_args.group
if parsed_args.release:
data['release'] = parsed_args.release
return dcmanager_client.subcloud_deploy_create(
files=files, data=data)

View File

@ -1,5 +1,5 @@
# Copyright 2015 - Ericsson AB.
# Copyright (c) 2017-2022 Wind River Systems, Inc.
# Copyright (c) 2017-2023 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -36,6 +36,7 @@ from dcmanagerclient.commands.v1 import alarm_manager as am
from dcmanagerclient.commands.v1 import fw_update_manager as fum
from dcmanagerclient.commands.v1 import kube_rootca_update_manager as krum
from dcmanagerclient.commands.v1 import kube_upgrade_manager as kupm
from dcmanagerclient.commands.v1 import phased_subcloud_deploy_manager as psdm
from dcmanagerclient.commands.v1 import subcloud_backup_manager as sbm
from dcmanagerclient.commands.v1 import subcloud_deploy_manager as sdm
from dcmanagerclient.commands.v1 import subcloud_group_manager as gm
@ -495,7 +496,8 @@ class DCManagerShell(app.App):
sw_upgrade_manager=self.client,
kube_upgrade_manager=self.client,
kube_rootca_update_manager=self.client,
sw_prestage_manager=self.client)
sw_prestage_manager=self.client,
phased_subcloud_deploy_manager=self.client)
)
self.client_manager = ClientManager()
@ -542,6 +544,7 @@ class DCManagerShell(app.App):
'subcloud-group list-subclouds': gm.ListSubcloudGroupSubclouds,
'subcloud-group show': gm.ShowSubcloudGroup,
'subcloud-group update': gm.UpdateSubcloudGroup,
'subcloud deploy create': psdm.CreatePhasedSubcloudDeploy,
'subcloud-deploy upload': sdm.SubcloudDeployUpload,
'subcloud-deploy show': sdm.SubcloudDeployShow,
'alarm summary': am.ListAlarmSummary,

View File

@ -16,9 +16,123 @@
#
import json
import mock
from oslo_utils import timeutils
import testtools
from dcmanagerclient.api import base as api_base
# Subcloud sample data
BOOTSTRAP_ADDRESS = '10.10.10.12'
TIME_NOW = timeutils.utcnow().isoformat()
ID = '1'
ID_1 = '2'
NAME = 'subcloud1'
SYSTEM_MODE = "duplex"
DESCRIPTION = 'subcloud1 description'
LOCATION = 'subcloud1 location'
SOFTWARE_VERSION = '12.34'
MANAGEMENT_STATE = 'unmanaged'
AVAILABILITY_STATUS = 'offline'
DEPLOY_STATUS = 'not-deployed'
SYNC_STATUS = 'unknown'
ERROR_DESCRIPTION = 'No errors present'
DEPLOY_STATE_PRE_DEPLOY = 'pre-deploy'
DEPLOY_STATE_PRE_RESTORE = 'pre-restore'
MANAGEMENT_SUBNET = '192.168.101.0/24'
MANAGEMENT_START_IP = '192.168.101.2'
MANAGEMENT_END_IP = '192.168.101.50'
MANAGEMENT_GATEWAY_IP = '192.168.101.1'
SYSTEMCONTROLLER_GATEWAY_IP = '192.168.204.101'
EXTERNAL_OAM_SUBNET = "10.10.10.0/24"
EXTERNAL_OAM_GATEWAY_ADDRESS = "10.10.10.1"
EXTERNAL_OAM_FLOATING_ADDRESS = "10.10.10.12"
DEFAULT_SUBCLOUD_GROUP_ID = '1'
DEPLOY_CONFIG_SYNC_STATUS = 'Deployment: configurations up-to-date'
BACKUP_STATUS = 'None'
BACKUP_DATETIME = 'None'
# Subcloud CLI resource object
SUBCLOUD_RESOURCE = api_base.Subcloud(
mock,
subcloud_id=ID,
name=NAME,
description=DESCRIPTION,
location=LOCATION,
software_version=SOFTWARE_VERSION,
management_state=MANAGEMENT_STATE,
availability_status=AVAILABILITY_STATUS,
deploy_status=DEPLOY_STATUS,
error_description=ERROR_DESCRIPTION,
management_subnet=MANAGEMENT_SUBNET,
management_start_ip=MANAGEMENT_START_IP,
management_end_ip=MANAGEMENT_END_IP,
management_gateway_ip=MANAGEMENT_GATEWAY_IP,
systemcontroller_gateway_ip=SYSTEMCONTROLLER_GATEWAY_IP,
created_at=TIME_NOW,
updated_at=TIME_NOW,
group_id=DEFAULT_SUBCLOUD_GROUP_ID,
backup_status=BACKUP_STATUS,
backup_datetime=BACKUP_DATETIME)
# Subcloud result values returned from various API calls (e.g. subcloud show)
SUBCLOUD_FIELD_RESULT_LIST = (
ID,
NAME,
DESCRIPTION,
LOCATION,
SOFTWARE_VERSION,
MANAGEMENT_STATE,
AVAILABILITY_STATUS,
DEPLOY_STATUS,
MANAGEMENT_SUBNET,
MANAGEMENT_START_IP,
MANAGEMENT_END_IP,
MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW,
TIME_NOW,
BACKUP_STATUS,
BACKUP_DATETIME
)
EMPTY_SUBCLOUD_FIELD_RESULT = (('<none>',) * len(SUBCLOUD_FIELD_RESULT_LIST),)
# Subcloud result values returned from subcloud list command
SUBCLOUD_LIST_RESULT = (
ID,
NAME,
MANAGEMENT_STATE,
AVAILABILITY_STATUS,
DEPLOY_STATUS,
SYNC_STATUS,
BACKUP_STATUS,
BACKUP_DATETIME
)
EMPTY_SUBCLOUD_LIST_RESULT = (('<none>',) * len(SUBCLOUD_LIST_RESULT),)
FAKE_BOOTSTRAP_VALUES = {
"system_mode": SYSTEM_MODE,
"name": NAME,
"description": DESCRIPTION,
"location": LOCATION,
"management_subnet": MANAGEMENT_SUBNET,
"management_start_address": MANAGEMENT_START_IP,
"management_end_address": MANAGEMENT_END_IP,
"management_gateway_address": MANAGEMENT_GATEWAY_IP,
"external_oam_subnet": EXTERNAL_OAM_SUBNET,
"external_oam_gateway_address": EXTERNAL_OAM_GATEWAY_ADDRESS,
"external_oam_floating_address": EXTERNAL_OAM_FLOATING_ADDRESS,
'backup_status': BACKUP_STATUS,
'backup_datetime': BACKUP_DATETIME,
'backup_status': BACKUP_STATUS,
'backup_datetime': BACKUP_DATETIME
}
class FakeResponse(object):
"""Fake response for testing DC Manager Client."""

View File

@ -0,0 +1,44 @@
#
# Copyright (c) 2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import mock
import os
import tempfile
from dcmanagerclient.commands.v1 import phased_subcloud_deploy_manager as cmd
from dcmanagerclient.tests import base
class TestCLIPhasedSubcloudDeployManagerV1(base.BaseCommandTest):
def setUp(self):
super().setUp()
# The client is the subcloud_deploy_manager
self.client = self.app.client_manager.phased_subcloud_deploy_manager.\
phased_subcloud_deploy_manager
@mock.patch('getpass.getpass', return_value='testpassword')
def test_subcloud_deploy_create(self, getpass):
self.client.subcloud_deploy_create.return_value = [
base.SUBCLOUD_RESOURCE]
with tempfile.NamedTemporaryFile(mode='w') as bootstrap_file,\
tempfile.NamedTemporaryFile(mode='w') as config_file,\
tempfile.NamedTemporaryFile(mode='w') as install_file:
bootstrap_file_path = os.path.abspath(bootstrap_file.name)
config_file_path = os.path.abspath(config_file.name)
install_file_path = os.path.abspath(install_file.name)
actual_call = self.call(
cmd.CreatePhasedSubcloudDeploy, app_args=[
'--bootstrap-address', base.BOOTSTRAP_ADDRESS,
'--install-values', install_file_path,
'--bootstrap-values', bootstrap_file_path,
'--deploy-config', config_file_path,
'--release', base.SOFTWARE_VERSION,
])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])

View File

@ -8,106 +8,18 @@ import base64
import mock
import os
from oslo_utils import timeutils
from dcmanagerclient.api import base as api_base
from dcmanagerclient.commands.v1 \
import subcloud_backup_manager as subcloud_backup_cmd
from dcmanagerclient.exceptions import DCManagerClientException
from dcmanagerclient.tests import base
BOOTSTRAP_ADDRESS = '10.10.10.12'
TIME_NOW = timeutils.utcnow().isoformat()
ID = '1'
ID_1 = '2'
NAME = 'subcloud1'
SYSTEM_MODE = "duplex"
DESCRIPTION = 'subcloud1 description'
LOCATION = 'subcloud1 location'
SOFTWARE_VERSION = '12.34'
MANAGEMENT_STATE = 'unmanaged'
AVAILABILITY_STATUS = 'offline'
DEPLOY_STATUS = 'not-deployed'
ERROR_DESCRIPTION = 'test error description'
DEPLOY_STATE_PRE_DEPLOY = 'pre-deploy'
DEPLOY_STATE_PRE_RESTORE = 'pre-restore'
MANAGEMENT_SUBNET = '192.168.101.0/24'
MANAGEMENT_START_IP = '192.168.101.2'
MANAGEMENT_END_IP = '192.168.101.50'
MANAGEMENT_GATEWAY_IP = '192.168.101.1'
SYSTEMCONTROLLER_GATEWAY_IP = '192.168.204.101'
EXTERNAL_OAM_SUBNET = "10.10.10.0/24"
EXTERNAL_OAM_GATEWAY_ADDRESS = "10.10.10.1"
EXTERNAL_OAM_FLOATING_ADDRESS = "10.10.10.12"
DEFAULT_SUBCLOUD_GROUP_ID = '1'
OVERRIDE_VALUES = """---
platform_backup_filename_prefix: test
openstack_app_name: test
backup_dir: test
"""
SUBCLOUD_DICT = {
'SUBCLOUD_ID': ID,
'NAME': NAME,
'DESCRIPTION': DESCRIPTION,
'LOCATION': LOCATION,
'SOFTWARE_VERSION': SOFTWARE_VERSION,
'MANAGEMENT_STATE': MANAGEMENT_STATE,
'AVAILABILITY_STATUS': AVAILABILITY_STATUS,
'DEPLOY_STATUS': DEPLOY_STATUS,
'ERROR_DESCRIPTION': ERROR_DESCRIPTION,
'MANAGEMENT_SUBNET': MANAGEMENT_SUBNET,
'MANAGEMENT_START_IP': MANAGEMENT_START_IP,
'MANAGEMENT_END_IP': MANAGEMENT_END_IP,
'MANAGEMENT_GATEWAY_IP': MANAGEMENT_GATEWAY_IP,
'SYSTEMCONTROLLER_GATEWAY_IP': SYSTEMCONTROLLER_GATEWAY_IP,
'CREATED_AT': TIME_NOW,
'UPDATED_AT': TIME_NOW,
'GROUP_ID': DEFAULT_SUBCLOUD_GROUP_ID,
'OAM_FLOATING_IP': EXTERNAL_OAM_FLOATING_ADDRESS
}
SUBCLOUD = api_base.Subcloud(
mock,
subcloud_id=SUBCLOUD_DICT['SUBCLOUD_ID'],
name=SUBCLOUD_DICT['NAME'],
description=SUBCLOUD_DICT['DESCRIPTION'],
location=SUBCLOUD_DICT['LOCATION'],
software_version=SUBCLOUD_DICT['SOFTWARE_VERSION'],
management_state=SUBCLOUD_DICT['MANAGEMENT_STATE'],
availability_status=SUBCLOUD_DICT['AVAILABILITY_STATUS'],
deploy_status=SUBCLOUD_DICT['DEPLOY_STATUS'],
error_description=SUBCLOUD_DICT['ERROR_DESCRIPTION'],
management_subnet=SUBCLOUD_DICT['MANAGEMENT_SUBNET'],
management_start_ip=SUBCLOUD_DICT['MANAGEMENT_START_IP'],
management_end_ip=SUBCLOUD_DICT['MANAGEMENT_END_IP'],
management_gateway_ip=SUBCLOUD_DICT['MANAGEMENT_GATEWAY_IP'],
systemcontroller_gateway_ip=SUBCLOUD_DICT['SYSTEMCONTROLLER_GATEWAY_IP'],
created_at=SUBCLOUD_DICT['CREATED_AT'],
updated_at=SUBCLOUD_DICT['UPDATED_AT'],
group_id=SUBCLOUD_DICT['GROUP_ID'])
DEFAULT_SUBCLOUD_FIELD_RESULT = (
ID,
NAME,
DESCRIPTION,
LOCATION,
SOFTWARE_VERSION,
MANAGEMENT_STATE,
AVAILABILITY_STATUS,
DEPLOY_STATUS,
MANAGEMENT_SUBNET,
MANAGEMENT_START_IP,
MANAGEMENT_END_IP,
MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW,
TIME_NOW,
None,
None)
class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
@ -118,7 +30,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
def test_backup_create_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_create.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -131,12 +43,12 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
'--registry-images',
'--backup-values', backupPath,
'--sysadmin-password', 'testpassword'])
self.assertEqual(DEFAULT_SUBCLOUD_FIELD_RESULT, actual_call[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_create_group(self):
self.client.subcloud_backup_manager.backup_subcloud_create.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -147,7 +59,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
app_args=['--group', 'test',
'--backup-values', backupPath,
'--sysadmin-password', 'testpassword'])
self.assertEqual([DEFAULT_SUBCLOUD_FIELD_RESULT], actual_call[1])
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_create_group_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_create.\
@ -204,7 +116,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
def test_backup_create_prompt_ask_for_password(self, getpass):
self.client.subcloud_backup_manager.backup_subcloud_create.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -215,7 +127,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
app_args=['--group', 'test',
'--local-only',
'--backup-values', backupPath])
self.assertEqual([DEFAULT_SUBCLOUD_FIELD_RESULT], actual_call[1])
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_create_local_only_registry_images(self):
@ -370,7 +282,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
def test_backup_restore(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -384,12 +296,12 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
'--restore-values', backupPath,
'--sysadmin-password', 'testpassword'])
self.assertEqual(DEFAULT_SUBCLOUD_FIELD_RESULT, actual_call[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_no_restore_values(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
actual_call = self.call(
subcloud_backup_cmd.RestoreSubcloudBackup,
@ -397,12 +309,12 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
'--local-only',
'--registry-images',
'--sysadmin-password', 'testpassword'])
self.assertEqual(DEFAULT_SUBCLOUD_FIELD_RESULT, actual_call[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_with_group(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -414,7 +326,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
'--with-install',
'--restore-values', backupPath,
'--sysadmin-password', 'testpassword'])
self.assertEqual([DEFAULT_SUBCLOUD_FIELD_RESULT], actual_call[1])
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_restore_group_and_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
@ -473,7 +385,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
def test_backup_restore_prompt_ask_for_password(self, getpass):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
@ -485,7 +397,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
app_args=['--group', 'test',
'--local-only',
'--restore-values', backupPath])
self.assertEqual([DEFAULT_SUBCLOUD_FIELD_RESULT], actual_call[1])
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_restore_local_only_registry_images(self):
@ -502,7 +414,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
def test_backup_restore_with_install_no_release(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -517,11 +429,11 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
'--restore-values', backupPath,
'--sysadmin-password', 'testpassword'])
self.assertEqual(DEFAULT_SUBCLOUD_FIELD_RESULT, actual_call[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_with_install_with_release(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -531,17 +443,17 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=['--subcloud', 'subcloud1',
'--with-install',
'--release', SOFTWARE_VERSION,
'--release', base.SOFTWARE_VERSION,
'--local-only',
'--registry-images',
'--restore-values', backupPath,
'--sysadmin-password', 'testpassword'])
self.assertEqual(DEFAULT_SUBCLOUD_FIELD_RESULT, actual_call[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_no_install_with_release(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
backupPath = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backupPath, mode='w') as f:
@ -551,7 +463,7 @@ class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
self.call,
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=['--subcloud', 'subcloud1',
'--release', SOFTWARE_VERSION,
'--release', base.SOFTWARE_VERSION,
'--local-only',
'--registry-images',
'--restore-values', backupPath,

View File

@ -26,14 +26,13 @@ DEPLOY_PLAYBOOK = 'deployment-manager-playbook.yaml'
DEPLOY_OVERRIDES = 'deployment-manager-overrides-subcloud.yaml'
DEPLOY_CHART = 'deployment-manager.tgz'
DEPLOY_PRESTAGE_IMAGES = 'prebuilt-images.lst'
SOFTWARE_VERSION = '21.12'
SUBCLOUD_DEPLOY_DICT = {
'DEPLOY_PLAYBOOK': DEPLOY_PLAYBOOK,
'DEPLOY_OVERRIDES': DEPLOY_OVERRIDES,
'DEPLOY_CHART': DEPLOY_CHART,
'DEPLOY_PRESTAGE_IMAGES': DEPLOY_PRESTAGE_IMAGES,
'SOFTWARE_VERSION': SOFTWARE_VERSION
'SOFTWARE_VERSION': base.SOFTWARE_VERSION
}
SUBCLOUD_DEPLOY_ALL = sdm.SubcloudDeploy(
@ -94,19 +93,19 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest):
DEPLOY_OVERRIDES,
DEPLOY_CHART,
DEPLOY_PRESTAGE_IMAGES,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call1[1])
# With "--release" parameter
actual_call2 = self.call(
subcloud_deploy_cmd.SubcloudDeployShow,
app_args=['--release', SOFTWARE_VERSION])
app_args=['--release', base.SOFTWARE_VERSION])
self.assertEqual((DEPLOY_PLAYBOOK,
DEPLOY_OVERRIDES,
DEPLOY_CHART,
DEPLOY_PRESTAGE_IMAGES,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call2[1])
def test_subcloud_deploy_upload_all(self):
@ -133,7 +132,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest):
DEPLOY_OVERRIDES,
DEPLOY_CHART,
DEPLOY_PRESTAGE_IMAGES,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call[1])
def test_subcloud_deploy_upload_no_prestage(self):
@ -157,7 +156,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest):
DEPLOY_OVERRIDES,
DEPLOY_CHART,
None,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call[1])
def test_subcloud_deploy_upload_prestage(self):
@ -174,7 +173,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest):
None,
None,
DEPLOY_PRESTAGE_IMAGES,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call[1])
def test_subcloud_deploy_upload_no_playbook(self):
@ -197,7 +196,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest):
DEPLOY_OVERRIDES,
DEPLOY_CHART,
DEPLOY_PRESTAGE_IMAGES,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call[1])
def test_subcloud_deploy_upload_no_playbook_overrides(self):
@ -217,7 +216,7 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest):
None,
DEPLOY_CHART,
DEPLOY_PRESTAGE_IMAGES,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call[1])
def test_subcloud_deploy_upload_no_overrides_chart(self):
@ -233,12 +232,12 @@ class TestCLISubcloudDeployManagerV1(base.BaseCommandTest):
app_args=[
'--deploy-playbook', file_path_1,
'--prestage-images', file_path_2,
'--release', SOFTWARE_VERSION])
'--release', base.SOFTWARE_VERSION])
self.assertEqual((DEPLOY_PLAYBOOK,
None,
None,
DEPLOY_PRESTAGE_IMAGES,
SOFTWARE_VERSION),
base.SOFTWARE_VERSION),
actual_call[1])
@mock.patch('builtins.print')

View File

@ -1,5 +1,5 @@
# Copyright (c) 2017 Ericsson AB.
# Copyright (c) 2017-2021 Wind River Systems, Inc.
# Copyright (c) 2017-2023 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -23,7 +23,6 @@ from dcmanagerclient.api.v1 import subcloud_group_manager as zm
from dcmanagerclient.commands.v1 \
import subcloud_group_manager as subcloud_group_cmd
from dcmanagerclient.tests import base
from dcmanagerclient.tests.v1 import test_subcloud_manager as tsm
ID = '2'
@ -79,12 +78,13 @@ class TestCLISubcloudGroupManagerV1(base.BaseCommandTest):
def test_list_subcloud_group_subclouds(self):
self.client.subcloud_group_manager.\
subcloud_group_list_subclouds.return_value = [tsm.SUBCLOUD]
subcloud_group_list_subclouds.return_value = [
base.SUBCLOUD_RESOURCE]
actual_call = self.call(subcloud_group_cmd.ListSubcloudGroupSubclouds,
app_args=[ID])
self.client.subcloud_group_manager.subcloud_group_list_subclouds.\
assert_called_once_with(ID)
self.assertEqual([tsm.DEFAULT_SUBCLOUD_FIELD_RESULT_LIST],
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST],
actual_call[1])
def test_delete_subcloud_group_by_id(self):

View File

@ -20,147 +20,30 @@ import os
import tempfile
import yaml
from oslo_utils import timeutils
from dcmanagerclient.api import base as api_base
from dcmanagerclient.commands.v1 import subcloud_manager as subcloud_cmd
from dcmanagerclient.exceptions import DCManagerClientException
from dcmanagerclient.tests import base
BOOTSTRAP_ADDRESS = '10.10.10.12'
TIME_NOW = timeutils.utcnow().isoformat()
ID = '1'
ID_1 = '2'
NAME = 'subcloud1'
SYSTEM_MODE = "duplex"
DESCRIPTION = 'subcloud1 description'
LOCATION = 'subcloud1 location'
SOFTWARE_VERSION = '12.34'
MANAGEMENT_STATE = 'unmanaged'
AVAILABILITY_STATUS = 'offline'
DEPLOY_STATUS = 'not-deployed'
ERROR_DESCRIPTION = 'No errors present'
DEPLOY_STATE_PRE_DEPLOY = 'pre-deploy'
DEPLOY_STATE_PRE_RESTORE = 'pre-restore'
MANAGEMENT_SUBNET = '192.168.101.0/24'
MANAGEMENT_START_IP = '192.168.101.2'
MANAGEMENT_END_IP = '192.168.101.50'
MANAGEMENT_GATEWAY_IP = '192.168.101.1'
SYSTEMCONTROLLER_GATEWAY_IP = '192.168.204.101'
EXTERNAL_OAM_SUBNET = "10.10.10.0/24"
EXTERNAL_OAM_GATEWAY_ADDRESS = "10.10.10.1"
EXTERNAL_OAM_FLOATING_ADDRESS = "10.10.10.12"
DEPLOY_CONFIG_SYNC_STATUS = 'Deployment: configurations up-to-date'
DEFAULT_SUBCLOUD_GROUP_ID = '1'
BACKUP_STATUS = 'complete'
BACKUP_DATETIME = '2022-09-07'
SUBCLOUD_DICT = {
'SUBCLOUD_ID': ID,
'NAME': NAME,
'DESCRIPTION': DESCRIPTION,
'LOCATION': LOCATION,
'SOFTWARE_VERSION': SOFTWARE_VERSION,
'MANAGEMENT_STATE': MANAGEMENT_STATE,
'AVAILABILITY_STATUS': AVAILABILITY_STATUS,
'DEPLOY_STATUS': DEPLOY_STATUS,
'ERROR_DESCRIPTION': ERROR_DESCRIPTION,
'MANAGEMENT_SUBNET': MANAGEMENT_SUBNET,
'MANAGEMENT_START_IP': MANAGEMENT_START_IP,
'MANAGEMENT_END_IP': MANAGEMENT_END_IP,
'MANAGEMENT_GATEWAY_IP': MANAGEMENT_GATEWAY_IP,
'SYSTEMCONTROLLER_GATEWAY_IP': SYSTEMCONTROLLER_GATEWAY_IP,
'CREATED_AT': TIME_NOW,
'UPDATED_AT': TIME_NOW,
'GROUP_ID': DEFAULT_SUBCLOUD_GROUP_ID,
'OAM_FLOATING_IP': EXTERNAL_OAM_FLOATING_ADDRESS,
'DEPLOY_CONFIG_SYNC_STATUS': DEPLOY_CONFIG_SYNC_STATUS,
'BACKUP_STATUS': BACKUP_STATUS,
'BACKUP_DATETIME': BACKUP_DATETIME
}
SUBCLOUD = api_base.Subcloud(
mock,
subcloud_id=SUBCLOUD_DICT['SUBCLOUD_ID'],
name=SUBCLOUD_DICT['NAME'],
description=SUBCLOUD_DICT['DESCRIPTION'],
location=SUBCLOUD_DICT['LOCATION'],
software_version=SUBCLOUD_DICT['SOFTWARE_VERSION'],
management_state=SUBCLOUD_DICT['MANAGEMENT_STATE'],
availability_status=SUBCLOUD_DICT['AVAILABILITY_STATUS'],
deploy_status=SUBCLOUD_DICT['DEPLOY_STATUS'],
error_description=SUBCLOUD_DICT['ERROR_DESCRIPTION'],
management_subnet=SUBCLOUD_DICT['MANAGEMENT_SUBNET'],
management_start_ip=SUBCLOUD_DICT['MANAGEMENT_START_IP'],
management_end_ip=SUBCLOUD_DICT['MANAGEMENT_END_IP'],
management_gateway_ip=SUBCLOUD_DICT['MANAGEMENT_GATEWAY_IP'],
systemcontroller_gateway_ip=SUBCLOUD_DICT['SYSTEMCONTROLLER_GATEWAY_IP'],
created_at=SUBCLOUD_DICT['CREATED_AT'],
updated_at=SUBCLOUD_DICT['UPDATED_AT'],
group_id=SUBCLOUD_DICT['GROUP_ID'],
backup_status=SUBCLOUD_DICT['BACKUP_STATUS'],
backup_datetime=SUBCLOUD_DICT['BACKUP_DATETIME'])
DEFAULT_SUBCLOUD_FIELD_RESULT_LIST = (
ID,
NAME,
DESCRIPTION,
LOCATION,
SOFTWARE_VERSION,
MANAGEMENT_STATE,
AVAILABILITY_STATUS,
DEPLOY_STATUS,
MANAGEMENT_SUBNET,
MANAGEMENT_START_IP,
MANAGEMENT_END_IP,
MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW,
TIME_NOW,
BACKUP_STATUS,
BACKUP_DATETIME)
FAKE_BOOTSTRAP_VALUES = {
"system_mode": SYSTEM_MODE,
"name": NAME,
"description": DESCRIPTION,
"location": LOCATION,
"management_subnet": MANAGEMENT_SUBNET,
"management_start_address": MANAGEMENT_START_IP,
"management_end_address": MANAGEMENT_END_IP,
"management_gateway_address": MANAGEMENT_GATEWAY_IP,
"external_oam_subnet": EXTERNAL_OAM_SUBNET,
"external_oam_gateway_address": EXTERNAL_OAM_GATEWAY_ADDRESS,
"external_oam_floating_address": EXTERNAL_OAM_FLOATING_ADDRESS,
'backup_status': BACKUP_STATUS,
'backup_datetime': BACKUP_DATETIME,
'backup_status': BACKUP_STATUS,
'backup_datetime': BACKUP_DATETIME
}
class TestCLISubcloudManagerV1(base.BaseCommandTest):
def test_list_subclouds(self):
self.client.subcloud_manager.list_subclouds.return_value = [SUBCLOUD]
self.client.subcloud_manager.list_subclouds.return_value = \
[base.SUBCLOUD_RESOURCE]
actual_call = self.call(subcloud_cmd.ListSubcloud)
self.assertEqual([(ID, NAME, MANAGEMENT_STATE, AVAILABILITY_STATUS,
DEPLOY_STATUS, "unknown",
BACKUP_STATUS, BACKUP_DATETIME)],
self.assertEqual([base.SUBCLOUD_LIST_RESULT],
actual_call[1])
def test_negative_list_subclouds(self):
self.client.subcloud_manager.list_subclouds.return_value = []
actual_call = self.call(subcloud_cmd.ListSubcloud)
self.assertEqual((('<none>', '<none>', '<none>', '<none>', '<none>',
'<none>', '<none>', '<none>'),),
self.assertEqual(base.EMPTY_SUBCLOUD_LIST_RESULT,
actual_call[1])
def test_delete_subcloud_with_subcloud_id(self):
self.call(subcloud_cmd.DeleteSubcloud, app_args=[ID])
self.call(subcloud_cmd.DeleteSubcloud, app_args=[base.ID])
self.client.subcloud_manager.delete_subcloud.\
assert_called_once_with(ID)
assert_called_once_with(base.ID)
def test_delete_subcloud_without_subcloud_id(self):
self.assertRaises(SystemExit, self.call,
@ -168,133 +51,76 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
def test_show_subcloud_with_subcloud_id(self):
self.client.subcloud_manager.subcloud_detail.\
return_value = [SUBCLOUD]
actual_call = self.call(subcloud_cmd.ShowSubcloud, app_args=[ID])
self.assertEqual(DEFAULT_SUBCLOUD_FIELD_RESULT_LIST,
return_value = [base.SUBCLOUD_RESOURCE]
actual_call = self.call(subcloud_cmd.ShowSubcloud, app_args=[base.ID])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST,
actual_call[1])
def test_show_subcloud_with_additional_detail(self):
SUBCLOUD_WITH_ADDITIONAL_DETAIL = copy.copy(SUBCLOUD)
SUBCLOUD_WITH_ADDITIONAL_DETAIL.oam_floating_ip = \
SUBCLOUD_DICT['OAM_FLOATING_IP']
SUBCLOUD_WITH_ADDITIONAL_DETAIL = copy.copy(base.SUBCLOUD_RESOURCE)
SUBCLOUD_WITH_ADDITIONAL_DETAIL.oam_floating_ip = \
base.EXTERNAL_OAM_FLOATING_ADDRESS
SUBCLOUD_WITH_ADDITIONAL_DETAIL.deploy_config_sync_status = \
SUBCLOUD_DICT['DEPLOY_CONFIG_SYNC_STATUS']
base.DEPLOY_CONFIG_SYNC_STATUS
self.client.subcloud_manager.subcloud_additional_details.\
return_value = [SUBCLOUD_WITH_ADDITIONAL_DETAIL]
actual_call = self.call(subcloud_cmd.ShowSubcloud,
app_args=[ID, '--detail'])
self.assertEqual((ID, NAME,
DESCRIPTION,
LOCATION,
SOFTWARE_VERSION,
MANAGEMENT_STATE,
AVAILABILITY_STATUS,
DEPLOY_STATUS,
MANAGEMENT_SUBNET,
MANAGEMENT_START_IP,
MANAGEMENT_END_IP,
MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW,
BACKUP_STATUS, BACKUP_DATETIME,
EXTERNAL_OAM_FLOATING_ADDRESS,
DEPLOY_CONFIG_SYNC_STATUS),
app_args=[base.ID, '--detail'])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST +
(base.EXTERNAL_OAM_FLOATING_ADDRESS,
base.DEPLOY_CONFIG_SYNC_STATUS),
actual_call[1])
def test_show_subcloud_negative(self):
self.client.subcloud_manager.subcloud_detail.return_value = []
actual_call = self.call(subcloud_cmd.ShowSubcloud, app_args=[ID])
self.assertEqual((('<none>', '<none>', '<none>', '<none>',
'<none>', '<none>', '<none>', '<none>',
'<none>', '<none>', '<none>', '<none>',
'<none>', '<none>', '<none>', '<none>',
'<none>', '<none>'),),
actual_call = self.call(subcloud_cmd.ShowSubcloud, app_args=[base.ID])
self.assertEqual(base.EMPTY_SUBCLOUD_FIELD_RESULT,
actual_call[1])
@mock.patch('getpass.getpass', return_value='testpassword')
def test_add_subcloud(self, getpass):
self.client.subcloud_manager.add_subcloud.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
with tempfile.NamedTemporaryFile(mode='w') as f:
yaml.dump(FAKE_BOOTSTRAP_VALUES, f)
yaml.dump(base.FAKE_BOOTSTRAP_VALUES, f)
file_path = os.path.abspath(f.name)
# Without "--release" parameter
actual_call1 = self.call(
subcloud_cmd.AddSubcloud, app_args=[
'--bootstrap-address', BOOTSTRAP_ADDRESS,
'--bootstrap-address', base.BOOTSTRAP_ADDRESS,
'--bootstrap-values', file_path,
])
# With "--release" parameter
actual_call2 = self.call(
subcloud_cmd.AddSubcloud, app_args=[
'--bootstrap-address', BOOTSTRAP_ADDRESS,
'--bootstrap-address', base.BOOTSTRAP_ADDRESS,
'--bootstrap-values', file_path,
'--release', SOFTWARE_VERSION,
'--release', base.SOFTWARE_VERSION,
])
self.assertEqual((ID, NAME, DESCRIPTION, LOCATION, SOFTWARE_VERSION,
MANAGEMENT_STATE, AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call1[1])
self.assertEqual((ID, NAME, DESCRIPTION, LOCATION, SOFTWARE_VERSION,
MANAGEMENT_STATE, AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call2[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call1[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call2[1])
@mock.patch('getpass.getpass', return_value='testpassword')
def test_add_migrate_subcloud(self, getpass):
self.client.subcloud_manager.add_subcloud.\
return_value = [SUBCLOUD]
values = {
"system_mode": SYSTEM_MODE,
"name": NAME,
"description": DESCRIPTION,
"location": LOCATION,
"management_subnet": MANAGEMENT_SUBNET,
"management_start_address": MANAGEMENT_START_IP,
"management_end_address": MANAGEMENT_END_IP,
"management_gateway_address": MANAGEMENT_GATEWAY_IP,
"external_oam_subnet": EXTERNAL_OAM_SUBNET,
"external_oam_gateway_address": EXTERNAL_OAM_GATEWAY_ADDRESS,
"external_oam_floating_address": EXTERNAL_OAM_FLOATING_ADDRESS,
'backup_status': BACKUP_STATUS,
'backup_datetime': BACKUP_DATETIME,
'backup_status': BACKUP_STATUS,
'backup_datetime': BACKUP_DATETIME
}
return_value = [base.SUBCLOUD_RESOURCE]
with tempfile.NamedTemporaryFile(mode='w') as f:
yaml.dump(values, f)
yaml.dump(base.FAKE_BOOTSTRAP_VALUES, f)
file_path = os.path.abspath(f.name)
actual_call = self.call(
subcloud_cmd.AddSubcloud, app_args=[
'--bootstrap-address', BOOTSTRAP_ADDRESS,
'--bootstrap-address', base.BOOTSTRAP_ADDRESS,
'--bootstrap-values', file_path,
'--migrate',
])
self.assertEqual((ID, NAME, DESCRIPTION, LOCATION, SOFTWARE_VERSION,
MANAGEMENT_STATE, AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
@mock.patch('getpass.getpass', return_value='testpassword')
def test_add_migrate_subcloud_with_deploy_config(self, getpass):
self.client.subcloud_manager.add_subcloud.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
with tempfile.NamedTemporaryFile(mode='w') as f_bootstrap:
bootstrap_file_path = os.path.abspath(f_bootstrap.name)
@ -305,7 +131,7 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
self.assertRaises(
DCManagerClientException, self.call,
subcloud_cmd.AddSubcloud, app_args=[
'--bootstrap-address', BOOTSTRAP_ADDRESS,
'--bootstrap-address', base.BOOTSTRAP_ADDRESS,
'--bootstrap-values', bootstrap_file_path,
'--deploy-config', config_file_path,
'--migrate',
@ -313,19 +139,10 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
def test_unmanage_subcloud(self):
self.client.subcloud_manager.update_subcloud.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
actual_call = self.call(
subcloud_cmd.UnmanageSubcloud, app_args=[ID])
self.assertEqual((ID, NAME,
DESCRIPTION, LOCATION,
SOFTWARE_VERSION, MANAGEMENT_STATE,
AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call[1])
subcloud_cmd.UnmanageSubcloud, app_args=[base.ID])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_unmanage_subcloud_without_subcloud_id(self):
self.assertRaises(SystemExit, self.call,
@ -333,19 +150,10 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
def test_manage_subcloud(self):
self.client.subcloud_manager.update_subcloud.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
actual_call = self.call(
subcloud_cmd.ManageSubcloud, app_args=[ID])
self.assertEqual((ID, NAME,
DESCRIPTION, LOCATION,
SOFTWARE_VERSION, MANAGEMENT_STATE,
AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call[1])
subcloud_cmd.ManageSubcloud, app_args=[base.ID])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_manage_subcloud_without_subcloud_id(self):
self.assertRaises(SystemExit, self.call,
@ -353,10 +161,10 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
def test_update_subcloud(self):
self.client.subcloud_manager.update_subcloud.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
actual_call = self.call(
subcloud_cmd.UpdateSubcloud,
app_args=[ID,
app_args=[base.ID,
'--description', 'subcloud description',
'--location', 'subcloud location',
'--sysadmin-password', 'testpassword',
@ -365,21 +173,12 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
'--management-start-ip', 'sc network start ip',
'--management-end-ip', 'subcloud network end ip',
'--bootstrap-address', 'subcloud bootstrap address'])
self.assertEqual((ID, NAME,
DESCRIPTION, LOCATION,
SOFTWARE_VERSION, MANAGEMENT_STATE,
AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS,
BACKUP_DATETIME), actual_call[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
@mock.patch('getpass.getpass', return_value='testpassword')
def test_success_reconfigure_subcloud(self, getpass):
SUBCLOUD_BEING_DEPLOYED = copy.copy(SUBCLOUD)
SUBCLOUD_BEING_DEPLOYED.deploy_status = DEPLOY_STATE_PRE_DEPLOY
SUBCLOUD_BEING_DEPLOYED = copy.copy(base.SUBCLOUD_RESOURCE)
SUBCLOUD_BEING_DEPLOYED.deploy_status = base.DEPLOY_STATE_PRE_DEPLOY
self.client.subcloud_manager.reconfigure_subcloud.\
return_value = [SUBCLOUD_BEING_DEPLOYED]
@ -387,23 +186,18 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
file_path = os.path.abspath(f.name)
actual_call = self.call(
subcloud_cmd.ReconfigSubcloud,
app_args=[ID,
app_args=[base.ID,
'--deploy-config', file_path])
self.assertEqual((ID, NAME,
DESCRIPTION, LOCATION,
SOFTWARE_VERSION, MANAGEMENT_STATE,
AVAILABILITY_STATUS, DEPLOY_STATE_PRE_DEPLOY,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call[1])
expected_result = list(base.SUBCLOUD_FIELD_RESULT_LIST)
expected_result[7] = base.DEPLOY_STATE_PRE_DEPLOY
self.assertEqual(tuple(expected_result), actual_call[1])
@mock.patch('getpass.getpass', return_value='testpassword')
def test_reconfigure_file_does_not_exist(self, getpass):
SUBCLOUD_BEING_DEPLOYED = copy.copy(SUBCLOUD)
SUBCLOUD_BEING_DEPLOYED.deploy_status = DEPLOY_STATE_PRE_DEPLOY
SUBCLOUD_BEING_DEPLOYED = copy.copy(base.SUBCLOUD_RESOURCE)
SUBCLOUD_BEING_DEPLOYED.deploy_status = base.DEPLOY_STATE_PRE_DEPLOY
self.client.subcloud_manager.reconfigure_subcloud.\
return_value = [SUBCLOUD_BEING_DEPLOYED]
@ -413,7 +207,7 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
e = self.assertRaises(DCManagerClientException,
self.call,
subcloud_cmd.ReconfigSubcloud,
app_args=[ID, '--deploy-config', file_path])
app_args=[base.ID, '--deploy-config', file_path])
self.assertTrue('deploy-config file does not exist'
in str(e))
@ -421,75 +215,54 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
@mock.patch('six.moves.input', return_value='reinstall')
def test_success_reinstall_subcloud(self, mock_input, getpass):
self.client.subcloud_manager.reinstall_subcloud.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
with tempfile.NamedTemporaryFile() as f:
file_path = os.path.abspath(f.name)
actual_call = self.call(
subcloud_cmd.ReinstallSubcloud,
app_args=[ID, '--bootstrap-values', file_path])
self.assertEqual((ID, NAME,
DESCRIPTION, LOCATION,
SOFTWARE_VERSION, MANAGEMENT_STATE,
AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call[1])
app_args=[base.ID, '--bootstrap-values', file_path])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
@mock.patch('getpass.getpass', return_value='testpassword')
@mock.patch('six.moves.input', return_value='reinstall')
def test_reinstall_subcloud(self, mock_input, getpass):
self.client.subcloud_manager.reinstall_subcloud. \
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
with tempfile.NamedTemporaryFile(mode='w') as f:
yaml.dump(FAKE_BOOTSTRAP_VALUES, f)
yaml.dump(base.FAKE_BOOTSTRAP_VALUES, f)
file_path = os.path.abspath(f.name)
# Without "--release" parameter
actual_call1 = self.call(
subcloud_cmd.ReinstallSubcloud, app_args=[
ID,
base.ID,
'--bootstrap-values', file_path,
])
# With "--release" parameter
actual_call2 = self.call(
subcloud_cmd.ReinstallSubcloud, app_args=[
ID,
base.ID,
'--bootstrap-values', file_path,
'--release', SOFTWARE_VERSION,
'--release', base.SOFTWARE_VERSION,
])
self.assertEqual((ID, NAME, DESCRIPTION, LOCATION, SOFTWARE_VERSION,
MANAGEMENT_STATE, AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call1[1])
self.assertEqual((ID, NAME, DESCRIPTION, LOCATION, SOFTWARE_VERSION,
MANAGEMENT_STATE, AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS, BACKUP_DATETIME
), actual_call2[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call1[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call2[1])
@mock.patch('getpass.getpass', return_value='testpassword')
@mock.patch('six.moves.input', return_value='reinstall')
def test_reinstall_bootstrap_file_does_not_exist(
self, mock_input, getpass):
self.client.subcloud_manager.reinstall_subcloud.\
return_value = [SUBCLOUD]
return_value = [base.SUBCLOUD_RESOURCE]
with tempfile.NamedTemporaryFile() as f:
file_path = os.path.abspath(f.name)
e = self.assertRaises(DCManagerClientException,
self.call,
subcloud_cmd.ReinstallSubcloud,
app_args=[ID, '--bootstrap-values', file_path])
app_args=[base.ID,
'--bootstrap-values',
file_path])
self.assertTrue('bootstrap-values does not exist'
in str(e))
@ -501,51 +274,41 @@ class TestCLISubcloudManagerV1(base.BaseCommandTest):
e = self.assertRaises(DCManagerClientException,
self.call,
subcloud_cmd.RestoreSubcloud,
app_args=[ID, '--restore-values', file_path])
app_args=[base.ID,
'--restore-values',
file_path])
deprecation_msg = ('This command has been deprecated. Please use '
'subcloud-backup restore instead.')
self.assertTrue(deprecation_msg in str(e))
def test_prestage_with_subcloudID(self):
self.client.subcloud_manager.prestage_subcloud.return_value = \
[SUBCLOUD]
self.client.subcloud_manager.prestage_subcloud.\
return_value = [base.SUBCLOUD_RESOURCE]
actual_call_without_release = self.call(
subcloud_cmd.PrestageSubcloud,
app_args=[ID,
app_args=[base.ID,
'--sysadmin-password', 'testpassword',
'--force'])
self.assertEqual((ID, NAME, DESCRIPTION, LOCATION, SOFTWARE_VERSION,
MANAGEMENT_STATE, AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS,
BACKUP_DATETIME), actual_call_without_release[1])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST,
actual_call_without_release[1])
def test_prestage_without_subcloudID(self):
self.assertRaises(SystemExit, self.call,
subcloud_cmd.PrestageSubcloud, app_args=[])
def test_prestage_with_release(self):
SUBCLOUD_WITH_ADDITIONAL_DETAIL = copy.copy(SUBCLOUD)
SUBCLOUD_WITH_ADDITIONAL_DETAIL = copy.copy(base.SUBCLOUD_RESOURCE)
SUBCLOUD_WITH_ADDITIONAL_DETAIL.prestage_software_version = \
SOFTWARE_VERSION
base.SOFTWARE_VERSION
self.client.subcloud_manager.prestage_subcloud.return_value = \
[SUBCLOUD_WITH_ADDITIONAL_DETAIL]
actual_call_with_release = self.call(
subcloud_cmd.PrestageSubcloud,
app_args=[ID,
app_args=[base.ID,
'--sysadmin-password', 'testpassword',
'--force',
'--release', SOFTWARE_VERSION])
self.assertEqual((ID, NAME, DESCRIPTION, LOCATION, SOFTWARE_VERSION,
MANAGEMENT_STATE, AVAILABILITY_STATUS, DEPLOY_STATUS,
MANAGEMENT_SUBNET, MANAGEMENT_START_IP,
MANAGEMENT_END_IP, MANAGEMENT_GATEWAY_IP,
SYSTEMCONTROLLER_GATEWAY_IP,
DEFAULT_SUBCLOUD_GROUP_ID,
TIME_NOW, TIME_NOW, BACKUP_STATUS,
BACKUP_DATETIME, SOFTWARE_VERSION),
'--release', base.SOFTWARE_VERSION])
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST +
(base.SOFTWARE_VERSION,),
actual_call_with_release[1])

View File

@ -1,7 +1,7 @@
# Copyright 2016 - Ericsson AB
# Copyright 2015 - Huawei Technologies Co. Ltd
# Copyright 2015 - StackStorm, Inc.
# Copyright (c) 2017-2022 Wind River Systems, Inc.
# Copyright (c) 2017-2023 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -102,3 +102,63 @@ def prompt_for_password(password_type='sysadmin'):
"\nPassword prompt interrupted."
)
return password
def subcloud_detail_format(subcloud=None):
columns = (
'id',
'name',
'description',
'location',
'software_version',
'management',
'availability',
'deploy_status',
'management_subnet',
'management_start_ip',
'management_end_ip',
'management_gateway_ip',
'systemcontroller_gateway_ip',
'group_id',
'created_at',
'updated_at',
'backup_status',
'backup_datetime'
)
if subcloud:
data = (
subcloud.subcloud_id,
subcloud.name,
subcloud.description,
subcloud.location,
subcloud.software_version,
subcloud.management_state,
subcloud.availability_status,
subcloud.deploy_status,
subcloud.management_subnet,
subcloud.management_start_ip,
subcloud.management_end_ip,
subcloud.management_gateway_ip,
subcloud.systemcontroller_gateway_ip,
subcloud.group_id,
subcloud.created_at,
subcloud.updated_at,
subcloud.backup_status,
subcloud.backup_datetime
)
for _listitem, sync_status in enumerate(subcloud.endpoint_sync_status):
added_field = (sync_status['endpoint_type'] +
"_sync_status",)
added_value = (sync_status['sync_status'],)
columns += tuple(added_field)
data += tuple(added_value)
if subcloud.oam_floating_ip != "unavailable":
columns += ('oam_floating_ip',)
data += (subcloud.oam_floating_ip,)
else:
data = (('<none>',) * len(columns),)
return columns, data