distcloud-client/distributedcloud-client/dcmanagerclient/tests/v1/test_subcloud_backup_manage...

666 lines
20 KiB
Python

#
# Copyright (c) 2022-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import base64
import os
import mock
from dcmanagerclient.commands.v1 import (
subcloud_backup_manager as subcloud_backup_cmd,
)
from dcmanagerclient.exceptions import DCManagerClientException
from dcmanagerclient.tests import base
OVERRIDE_VALUES = """---
platform_backup_filename_prefix: test
openstack_app_name: test
backup_dir: test
"""
class TestCLISubcloudBackUpManagerV1(base.BaseCommandTest):
def setUp(self):
super().setUp()
self.client = self.app.client_manager.subcloud_backup_manager
def test_backup_create_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_create.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.CreateSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--local-only",
"--registry-images",
"--backup-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_create_group(self):
self.client.subcloud_backup_manager.backup_subcloud_create.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.CreateSubcloudBackup,
app_args=[
"--group",
"test",
"--backup-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_create_group_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_create.return_value = []
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.CreateSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--group",
"test",
"--local-only",
"--backup-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
(
"The command only applies to a single subcloud or a"
" subcloud group, not both."
)
in str(e)
)
def test_backup_create_no_group_no_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_create.return_value = []
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.CreateSubcloudBackup,
app_args=[
"--local-only",
"--backup-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
("Please provide the subcloud or subcloud group name" " or id.")
in str(e)
)
def test_backup_create_backup_value_not_a_file(self):
self.client.subcloud_backup_manager.backup_subcloud_create.return_value = []
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.CreateSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--local-only",
"--backup-values",
"notADirectory",
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue("Backup-values file does not exist" in str(e))
@mock.patch("getpass.getpass", return_value="testpassword")
def test_backup_create_prompt_ask_for_password(self, _mock_getpass):
self.client.subcloud_backup_manager.backup_subcloud_create.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.CreateSubcloudBackup,
app_args=[
"--group",
"test",
"--local-only",
"--backup-values",
backup_path,
],
)
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_create_local_only_registry_images(self):
self.client.subcloud_backup_manager.backup_subcloud_create.return_value = []
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.CreateSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--registry-images",
"--backup-values",
"notADirectory",
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
(
"Option --registry-images can not be used without "
"--local-only option."
)
in str(e)
)
def test_backup_delete_no_group_no_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_delete.return_value = []
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.DeleteSubcloudBackup,
app_args=[
"release",
"--local-only",
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
("Please provide the subcloud or subcloud group" " name or id.")
in str(e)
)
def test_backup_delete_group_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_delete.return_value = []
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.DeleteSubcloudBackup,
app_args=[
"release",
"--subcloud",
"subcloud1",
"--group",
"group1",
"--local-only",
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
(
"This command only applies to a single subcloud "
"or a subcloud group, not both."
)
in str(e)
)
def test_backup_delete_group(self):
group_name = "test_group_1"
release_version = "release_version_2"
password = "testpassword"
encoded_password = base64.b64encode(password.encode("utf-8")).decode("utf-8")
payload = {
"release": release_version,
"group": group_name,
"local_only": "true",
"sysadmin_password": encoded_password,
}
app_args = [
release_version,
"--group",
group_name,
"--local-only",
"--sysadmin-password",
password,
]
self.call(subcloud_backup_cmd.DeleteSubcloudBackup, app_args=app_args)
subcloud_delete = self.client.subcloud_backup_manager.backup_subcloud_delete
subcloud_delete.assert_called_once_with(
data=payload, release_version=release_version, subcloud_ref=None
)
def test_backup_delete_subcloud(self):
subcloud_name = "subcloud1"
release_version = "release_version_2"
password = "testpassword"
encoded_password = base64.b64encode(password.encode("utf-8")).decode("utf-8")
payload = {
"release": release_version,
"subcloud": subcloud_name,
"local_only": "true",
"sysadmin_password": encoded_password,
}
app_args = [
release_version,
"--subcloud",
subcloud_name,
"--local-only",
"--sysadmin-password",
password,
]
self.call(subcloud_backup_cmd.DeleteSubcloudBackup, app_args=app_args)
subcloud_delete = self.client.subcloud_backup_manager.backup_subcloud_delete
subcloud_delete.assert_called_once_with(
data=payload, release_version=release_version, subcloud_ref=subcloud_name
)
def test_backup_delete_no_local_only(self):
group_name = "test_group_1"
release_version = "release_version_2"
password = "testpassword"
encoded_password = base64.b64encode(password.encode("utf-8")).decode("utf-8")
payload = {
"release": release_version,
"group": group_name,
"local_only": "false",
"sysadmin_password": encoded_password,
}
app_args = [
release_version,
"--group",
group_name,
"--sysadmin-password",
password,
]
self.call(subcloud_backup_cmd.DeleteSubcloudBackup, app_args=app_args)
subcloud_delete = self.client.subcloud_backup_manager.backup_subcloud_delete
subcloud_delete.assert_called_once_with(
data=payload, release_version=release_version, subcloud_ref=None
)
@mock.patch("getpass.getpass", return_value="testpassword")
def test_backup_delete_prompt_ask_for_password(self, _mock_getpass):
group_name = "test_group_1"
release_version = "release_version_2"
password = "testpassword"
encoded_password = base64.b64encode(password.encode("utf-8")).decode("utf-8")
payload = {
"release": release_version,
"group": group_name,
"local_only": "true",
"sysadmin_password": encoded_password,
}
app_args = [release_version, "--group", group_name, "--local-only"]
self.call(subcloud_backup_cmd.DeleteSubcloudBackup, app_args=app_args)
subcloud_delete = self.client.subcloud_backup_manager.backup_subcloud_delete
subcloud_delete.assert_called_once_with(
data=payload, release_version=release_version, subcloud_ref=None
)
def test_backup_delete_subcloud_no_release_version(self):
subcloud_name = "subcloud1"
password = "testpassword"
app_args = [
"--subcloud",
subcloud_name,
"--local-only",
"--sysadmin-password",
password,
]
self.assertRaises(
SystemExit,
self.call,
subcloud_backup_cmd.DeleteSubcloudBackup,
app_args=app_args,
)
def test_backup_restore(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--local-only",
"--registry-images",
"--restore-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_no_restore_values(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = [
base.SUBCLOUD_RESOURCE
]
actual_call = self.call(
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--local-only",
"--registry-images",
"--sysadmin-password",
"testpassword",
],
)
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_with_group(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--group",
"test",
"--with-install",
"--restore-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_restore_group_and_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = []
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--group",
"test",
"--local-only",
"--restore-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
(
"The command only applies to a single subcloud or a"
" subcloud group, not both."
)
in str(e)
)
def test_backup_restore_no_group_and_no_subcloud(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = []
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--local-only",
"--restore-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
("Please provide the subcloud or subcloud group name" " or id.")
in str(e)
)
def test_backup_restore_backup_value_not_a_file(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = []
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--local-only",
"--restore-values",
"notADirectory",
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue("restore_values file does not exist" in str(e))
@mock.patch("getpass.getpass", return_value="testpassword")
def test_backup_restore_prompt_ask_for_password(self, _mock_getpass):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--group",
"test",
"--local-only",
"--restore-values",
backup_path,
],
)
self.assertEqual([base.SUBCLOUD_FIELD_RESULT_LIST], actual_call[1])
def test_backup_restore_local_only_registry_images(self):
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--registry-images",
"--restore-values",
"notADirectory",
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
(
"Option --registry-images cannot be used without "
"--local-only option."
)
in str(e)
)
def test_backup_restore_with_install_no_release(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--with-install",
"--local-only",
"--registry-images",
"--restore-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_with_install_with_release(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
actual_call = self.call(
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--with-install",
"--release",
base.SOFTWARE_VERSION,
"--local-only",
"--registry-images",
"--restore-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertEqual(base.SUBCLOUD_FIELD_RESULT_LIST, actual_call[1])
def test_backup_restore_no_install_with_release(self):
self.client.subcloud_backup_manager.backup_subcloud_restore.return_value = [
base.SUBCLOUD_RESOURCE
]
backup_path = os.path.normpath(os.path.join(os.getcwd(), "test.yaml"))
with open(backup_path, mode="w", encoding="UTF-8") as f:
f.write(OVERRIDE_VALUES)
e = self.assertRaises(
DCManagerClientException,
self.call,
subcloud_backup_cmd.RestoreSubcloudBackup,
app_args=[
"--subcloud",
"subcloud1",
"--release",
base.SOFTWARE_VERSION,
"--local-only",
"--registry-images",
"--restore-values",
backup_path,
"--sysadmin-password",
"testpassword",
],
)
self.assertTrue(
("Option --release cannot be used without " "--with-install option.")
in str(e)
)