# distcloud/distributedcloud/dcmanager/tests/unit/api/v1/controllers/test_subcloud_backup.py
# (1630 lines, 55 KiB, Python)

#
# Copyright (c) 2022-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import base64
import copy
import mock
from oslo_utils import timeutils
import six
import webtest
from dccommon import consts as dccommon_consts
from dcmanager.common import consts
from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.rpc import client as rpc_client
from dcmanager.tests import base
from dcmanager.tests.unit.api import test_root_controller as testroot
from dcmanager.tests.unit.common import fake_subcloud
from dcmanager.tests import utils
FAKE_TENANT = utils.UUID1

# Endpoints exercised by the create, delete and restore test classes.
FAKE_URL_CREATE = "/v1.0/subcloud-backup"
FAKE_URL_DELETE = "/v1.0/subcloud-backup/delete/"
FAKE_URL_RESTORE = "/v1.0/subcloud-backup/restore"

# Headers sent with every request issued by the tests in this module.
FAKE_HEADERS = {
    "X-Tenant-Id": FAKE_TENANT,
    "X_ROLE": "admin,member,reader",
    "X-Identity-Status": "Confirmed",
    "X-Project-Name": "admin",
    "Content-Type": "json",
}
# Fake "system health" reports returned by the mocked sysinv client.
#
# The two GOOD reports are used by tests that expect a backup request to be
# accepted; the three remaining reports are used by the test that expects the
# request to be rejected. Each rejected report isolates one failing check so
# that the three scenarios are actually different from each other.

# Only the alarm check fails and none of the alarms is management affecting.
FAKE_GOOD_SYSTEM_HEALTH = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "No alarms: [Fail]\n"
    "[1] alarms found, [0] of which are management affecting\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)

# Every check passes. Lines are newline-terminated like the other reports.
FAKE_GOOD_SYSTEM_HEALTH_NO_ALARMS = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "No alarms: [OK]\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)

# Only the Ceph storage check fails.
FAKE_SYSTEM_HEALTH_CEPH_FAIL = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "Ceph Storage Healthy: [Fail]\n"
    "No alarms: [OK]\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)

# Only the alarm check fails, and the alarms are management affecting.
FAKE_SYSTEM_HEALTH_MGMT_ALARM = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "Ceph Storage Healthy: [OK]\n"
    "No alarms: [Fail]\n"
    "[2] alarms found, [2] of which are management affecting\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)

# Only the kubernetes node readiness check fails.
FAKE_SYSTEM_HEALTH_K8S_FAIL = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "Ceph Storage Healthy: [OK]\n"
    "No alarms: [OK]\n"
    "All kubernetes nodes are ready: [Fail]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)
# "restore_values" payloads for the restore tests. The first address has five
# dot-separated groups and is used by the test that expects a rejection; the
# second has four and is used by the test that expects success.
FAKE_RESTORE_VALUES_INVALID_IP = {
    "bootstrap_address": {"subcloud1": "10.10.20.12.22"},
}
FAKE_RESTORE_VALUES_VALID_IP = {
    "bootstrap_address": {"subcloud1": "10.10.20.12"},
}
class TestSubcloudCreate(testroot.DCManagerApiTest):
    """Tests for the subcloud-backup create API (POST to FAKE_URL_CREATE).

    Each test creates the fake subcloud, puts it in the state under test and
    checks the HTTP status produced for a given request payload.
    """

    def setUp(self):
        super(TestSubcloudCreate, self).setUp()
        self.ctx = utils.dummy_context()
        self.fake_password = base64.b64encode("testpass".encode("utf-8")).decode(
            "ascii"
        )
        # Keep SubcloudStateClient patched for the duration of each test.
        p = mock.patch.object(rpc_client, "SubcloudStateClient")
        self.mock_rpc_state_client = p.start()
        self.addCleanup(p.stop)

    # ------------------------------------------------------------------
    # Helpers (underscore-prefixed so the test runner does not collect them)
    # ------------------------------------------------------------------
    def _update_subcloud(self, subcloud_id, **overrides):
        """Set the subcloud online/managed with no backup, plus overrides."""
        values = {
            "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
            "management_state": dccommon_consts.MANAGEMENT_MANAGED,
            "backup_datetime": None,
            "backup_status": consts.BACKUP_STATE_UNKNOWN,
        }
        values.update(overrides)
        db_api.subcloud_update(self.ctx, subcloud_id, **values)

    def _create_subcloud(self, **overrides):
        """Create the fake subcloud and apply the state for the test."""
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        self._update_subcloud(subcloud.id, **overrides)
        return subcloud

    def _payload(self, **fields):
        """Return a request body holding the password and the given fields."""
        data = {"sysadmin_password": self.fake_password}
        data.update(fields)
        return data

    def _assert_create_succeeds(self, data):
        """POST the payload and require a 200 response."""
        response = self.app.post_json(
            FAKE_URL_CREATE, headers=FAKE_HEADERS, params=data
        )
        self.assertEqual(response.status_int, 200)

    def _assert_create_fails(self, status_regex, data):
        """POST the payload and require an AppError matching status_regex."""
        six.assertRaisesRegex(
            self,
            webtest.app.AppError,
            status_regex,
            self.app.post_json,
            FAKE_URL_CREATE,
            headers=FAKE_HEADERS,
            params=data,
        )

    # ------------------------------------------------------------------
    # Single subcloud
    # ------------------------------------------------------------------
    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """A backup is accepted for every report in the good-health list."""
        mock_rpc_client().backup_subclouds.return_value = True
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        data = self._payload(subcloud="1")

        good_health_states = [
            FAKE_GOOD_SYSTEM_HEALTH,
            FAKE_GOOD_SYSTEM_HEALTH_NO_ALARMS,
        ]
        for system_health in good_health_states:
            mock_sysinv().get_system_health.return_value = system_health
            # Reset the subcloud before each request so runs are independent.
            self._update_subcloud(subcloud.id)
            self._assert_create_succeeds(data)

    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_with_bad_system_health(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """A backup is rejected for every report in the bad-health list."""
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        mock_rpc_client().backup_subclouds.return_value = True
        data = self._payload(subcloud="1")

        bad_health_states = [
            FAKE_SYSTEM_HEALTH_MGMT_ALARM,
            FAKE_SYSTEM_HEALTH_CEPH_FAIL,
            FAKE_SYSTEM_HEALTH_K8S_FAIL,
        ]
        for system_health in bad_health_states:
            mock_sysinv().get_system_health.return_value = system_health
            self._update_subcloud(subcloud.id)
            # The subcloud exists, so this is a validation failure (400)
            # like the other rejected-state tests, not a missing resource.
            self._assert_create_fails("400 *", data)

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_unknown_subcloud(self, mock_rpc_client):
        """Referencing a subcloud that was never created yields 404."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("404 *", self._payload(subcloud="123"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_offline_subcloud(self, mock_rpc_client):
        """An offline subcloud is rejected with 400."""
        self._create_subcloud(
            availability_status=dccommon_consts.AVAILABILITY_OFFLINE
        )
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", self._payload(subcloud="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_unmanaged_subcloud(self, mock_rpc_client):
        """An unmanaged subcloud is rejected with 400."""
        self._create_subcloud(
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED
        )
        mock_rpc_client().backup_subclouds.return_value = True
        # Same class of failure as the offline / invalid-state / group
        # variants, which all expect 400.
        self._assert_create_fails("400 *", self._payload(subcloud="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_invalid_state(self, mock_rpc_client):
        """A subcloud that is still bootstrapping is rejected with 400."""
        self._create_subcloud(deploy_status=consts.DEPLOY_STATE_BOOTSTRAPPING)
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", self._payload(subcloud="1"))

    # ------------------------------------------------------------------
    # Subcloud group
    # ------------------------------------------------------------------
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_group(self, mock_rpc_client):
        """A backup of group "1" with an online, managed member returns 200."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_succeeds(self._payload(group="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_unknown_group(self, mock_rpc_client):
        """Referencing the group "Fake" yields 404."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("404 *", self._payload(group="Fake"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_group_not_online(self, mock_rpc_client):
        """A group whose only subcloud is offline is rejected with 400."""
        self._create_subcloud(
            availability_status=dccommon_consts.AVAILABILITY_OFFLINE
        )
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", self._payload(group="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_group_not_managed(self, mock_rpc_client):
        """A group whose only subcloud is unmanaged is rejected with 400."""
        self._create_subcloud(
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED
        )
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", self._payload(group="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_group_no_valid_state(self, mock_rpc_client):
        """A group whose only subcloud is bootstrapping is rejected (400)."""
        self._create_subcloud(deploy_status=consts.DEPLOY_STATE_BOOTSTRAPPING)
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", self._payload(group="1"))

    # ------------------------------------------------------------------
    # Payload validation
    # ------------------------------------------------------------------
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_and_group(self, mock_rpc_client):
        """Giving both "subcloud" and "group" is rejected with 400."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails(
            "400 *", self._payload(subcloud="1", group="1")
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_no_subcloud_no_group(self, mock_rpc_client):
        """Giving neither "subcloud" nor "group" is rejected with 400."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", self._payload())

    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_backup_values(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """A request carrying "backup_values" is accepted."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = FAKE_GOOD_SYSTEM_HEALTH
        self._assert_create_succeeds(
            self._payload(subcloud="1", backup_values="TestFileDirectory")
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_no_password(self, mock_rpc_client):
        """A request without "sysadmin_password" is rejected with 400."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", {"subcloud": "1"})

    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_local_only(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """A request with local_only set is accepted."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = FAKE_GOOD_SYSTEM_HEALTH
        self._assert_create_succeeds(
            self._payload(subcloud="1", local_only="True")
        )

    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_local_only_registry_images(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """registry_images combined with local_only is accepted."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = FAKE_GOOD_SYSTEM_HEALTH
        self._assert_create_succeeds(
            self._payload(
                subcloud="1", local_only="True", registry_images="True"
            )
        )

    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_no_local_only_registry_images(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """registry_images without local_only is rejected with 400."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = FAKE_GOOD_SYSTEM_HEALTH
        self._assert_create_fails(
            "400 *", self._payload(subcloud="1", registry_images="True")
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_unknown_parameter(self, mock_rpc_client):
        """A payload key the API does not know is rejected with 400."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails(
            "400 *", self._payload(subcloud="1", unkown_variable="FakeValue")
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_invalid_payload_format(self, mock_rpc_client):
        """A bare string instead of a JSON object is rejected with 400."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        self._assert_create_fails("400 *", "WrongFormat")

    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_create_subcloud_json_file(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """A minimal JSON payload for an existing subcloud is accepted."""
        self._create_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = FAKE_GOOD_SYSTEM_HEALTH
        self._assert_create_succeeds(self._payload(subcloud="1"))

    # ------------------------------------------------------------------
    # Concurrency
    # ------------------------------------------------------------------
    @mock.patch("dcmanager.common.utils.OpenStackDriver")
    @mock.patch("dcmanager.common.utils.SysinvClient")
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_create_concurrent_backup(
        self, mock_rpc_client, mock_sysinv, mock_openstack
    ):
        """A new backup is refused while one is ongoing, allowed otherwise."""
        mock_sysinv().get_system_health.return_value = FAKE_GOOD_SYSTEM_HEALTH
        mock_rpc_client().backup_subclouds.return_value = True
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        data = self._payload(subcloud="1")

        db_api.subcloud_update(
            self.ctx,
            subcloud.id,
            deploy_status=consts.DEPLOY_STATE_DONE,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            management_state=dccommon_consts.MANAGEMENT_MANAGED,
        )

        ongoing_backup_states = [
            consts.BACKUP_STATE_INITIAL,
            consts.BACKUP_STATE_VALIDATING,
            consts.BACKUP_STATE_PRE_BACKUP,
            consts.BACKUP_STATE_IN_PROGRESS,
        ]
        # BACKUP_STATE_IN_PROGRESS is an ongoing state and is deliberately
        # not repeated here.
        final_backup_states = [
            consts.BACKUP_STATE_VALIDATE_FAILED,
            consts.BACKUP_STATE_PREP_FAILED,
            consts.BACKUP_STATE_FAILED,
            consts.BACKUP_STATE_UNKNOWN,
            consts.BACKUP_STATE_COMPLETE_CENTRAL,
            consts.BACKUP_STATE_COMPLETE_LOCAL,
        ]

        for backup_state in ongoing_backup_states:
            db_api.subcloud_update(
                self.ctx, subcloud.id, backup_status=backup_state
            )
            # Expect the operation to fail
            self._assert_create_fails("400 *", data)

        for backup_state in final_backup_states:
            db_api.subcloud_update(
                self.ctx, subcloud.id, backup_status=backup_state
            )
            # Expect the operation to succeed
            self._assert_create_succeeds(data)
class TestSubcloudDelete(testroot.DCManagerApiTest):
    """Tests for the subcloud-backup delete API (PATCH under FAKE_URL_DELETE).

    Unless a test overrides it, the fake subcloud is online and managed with
    a centrally stored backup, and requests target release "22.12".
    """

    def setUp(self):
        super(TestSubcloudDelete, self).setUp()
        self.ctx = utils.dummy_context()
        encoded_password = base64.b64encode("testpass".encode("utf-8"))
        self.fake_password = encoded_password.decode("ascii")
        state_client_patcher = mock.patch.object(
            rpc_client, "SubcloudStateClient"
        )
        self.mock_rpc_state_client = state_client_patcher.start()
        self.addCleanup(state_client_patcher.stop)

    # ------------------------------------------------------------------
    # Helpers (underscore-prefixed so the test runner does not collect them)
    # ------------------------------------------------------------------
    def _prepare_subcloud(self, **overrides):
        """Create the fake subcloud and store the state used by the test."""
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        state = dict(
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            management_state=dccommon_consts.MANAGEMENT_MANAGED,
            backup_datetime=timeutils.utcnow(),
            backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL,
        )
        state.update(overrides)
        db_api.subcloud_update(self.ctx, subcloud.id, **state)
        return subcloud

    def _body(self, **fields):
        """Return a request body holding the password and the given fields."""
        body = {"sysadmin_password": self.fake_password}
        body.update(fields)
        return body

    def _check_status(self, expected_status, body):
        """PATCH the versioned delete URL and compare the response status."""
        release_version = "22.12"
        response = self.app.patch_json(
            FAKE_URL_DELETE + release_version, headers=FAKE_HEADERS, params=body
        )
        self.assertEqual(response.status_int, expected_status)

    def _check_rejected(self, status_regex, body, url=None, headers=None):
        """PATCH and require an AppError whose text matches status_regex.

        url defaults to the delete URL for release "22.12" and headers to
        FAKE_HEADERS.
        """
        if url is None:
            release_version = "22.12"
            url = FAKE_URL_DELETE + release_version
        if headers is None:
            headers = FAKE_HEADERS
        six.assertRaisesRegex(
            self,
            webtest.app.AppError,
            status_regex,
            self.app.patch_json,
            url,
            headers=headers,
            params=body,
        )

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------
    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_subcloud(self, mock_rpc_client):
        """Deleting one subcloud's backup answers 207 when the RPC is truthy."""
        self._prepare_subcloud()
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_status(207, self._body(subcloud="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_unknown_group(self, mock_rpc_client):
        """Referencing group "999" yields 404."""
        self._prepare_subcloud(
            availability_status=dccommon_consts.AVAILABILITY_OFFLINE
        )
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected("404 *", self._body(group="999"))

    # NOTE(review): the leading underscore keeps this method from being
    # collected as a test; confirm whether it is disabled on purpose.
    @mock.patch.object(rpc_client, "ManagerClient")
    def _test_backup_delete_subcloud_unmanaged(self, mock_rpc_client):
        """Expect 400 when the only subcloud of group "1" is unmanaged."""
        self._prepare_subcloud(
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED
        )
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected("400 *", self._body(group="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_group(self, mock_rpc_client):
        """Deleting the backups of group "1" answers 207."""
        self._prepare_subcloud()
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_status(207, self._body(group="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_subcloud_and_group(self, mock_rpc_client):
        """Giving both "subcloud" and "group" is rejected with 400."""
        self._prepare_subcloud()
        # This test sends a shallow copy of the shared headers.
        HEADER = copy.copy(FAKE_HEADERS)
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected(
            "400 *", self._body(subcloud="1", group="1"), headers=HEADER
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_no_subcloud_no_group(self, mock_rpc_client):
        """Giving neither "subcloud" nor "group" is rejected with 400."""
        self._prepare_subcloud()
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected("400 *", self._body())

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_invalid_url(self, mock_rpc_client):
        """A PATCH on an unknown sub-path yields 404."""
        self._prepare_subcloud()
        invalid_url = "/v1.0/subcloud-backup/fake/"
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected(
            "404 *", self._body(subcloud="1"), url=invalid_url
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_no_release_version(self, mock_rpc_client):
        """Omitting the release version from the URL is rejected with 400."""
        self._prepare_subcloud()
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected(
            "400 *", self._body(subcloud="1"), url=FAKE_URL_DELETE
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_no_content(self, mock_rpc_client):
        """The API answers 204 when the RPC call returns None."""
        self._prepare_subcloud()
        mock_rpc_client().delete_subcloud_backups.return_value = None
        self._check_status(204, self._body(subcloud="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_exception(self, mock_rpc_client):
        """An exception raised by the RPC call surfaces as a 500."""
        self._prepare_subcloud()
        mock_rpc_client().delete_subcloud_backups.side_effect = Exception()
        self._check_rejected("500 *", self._body(subcloud="1"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_local_only(self, mock_rpc_client):
        """Deleting a locally stored backup with local_only answers 207."""
        self._prepare_subcloud(backup_status=consts.BACKUP_STATE_COMPLETE_LOCAL)
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_status(207, self._body(subcloud="1", local_only="True"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_central(self, mock_rpc_client):
        """Deleting a central backup with local_only "False" answers 207."""
        self._prepare_subcloud()
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_status(207, self._body(subcloud="1", local_only="False"))

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_invalid_local_only(self, mock_rpc_client):
        """The local_only value "Fake" is rejected with 400."""
        self._prepare_subcloud(backup_status=consts.BACKUP_STATE_COMPLETE_LOCAL)
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected(
            "400 *", self._body(subcloud="1", local_only="Fake")
        )

    @mock.patch.object(rpc_client, "ManagerClient")
    def test_backup_delete_local_only_unknown_subcloud(self, mock_rpc_client):
        """A local_only delete for subcloud "123" yields 404."""
        self._prepare_subcloud(backup_status=consts.BACKUP_STATE_COMPLETE_LOCAL)
        mock_rpc_client().delete_subcloud_backups.return_value = True
        self._check_rejected(
            "404 *", self._body(subcloud="123", local_only="True")
        )
class TestSubcloudRestore(testroot.DCManagerApiTest):
def setUp(self):
    """Prepare the context, the encoded password and the state-client mock."""
    super(TestSubcloudRestore, self).setUp()
    self.ctx = utils.dummy_context()
    encoded_password = base64.b64encode("testpass".encode("utf-8"))
    self.fake_password = encoded_password.decode("ascii")
    # Patch SubcloudStateClient for the test and undo the patch at cleanup.
    state_client_patcher = mock.patch.object(rpc_client, "SubcloudStateClient")
    self.mock_rpc_state_client = state_client_patcher.start()
    self.addCleanup(state_client_patcher.stop)
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_subcloud(self, mock_rpc_client):
    """Restoring an online, unmanaged subcloud answers 200."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    created = fake_subcloud.create_fake_subcloud(self.ctx)
    subcloud_state = {
        "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
        "management_state": dccommon_consts.MANAGEMENT_UNMANAGED,
        "backup_datetime": None,
        "backup_status": consts.BACKUP_STATE_UNKNOWN,
    }
    db_api.subcloud_update(self.ctx, created.id, **subcloud_state)

    request_body = {"sysadmin_password": self.fake_password, "subcloud": "1"}
    result = self.app.patch_json(
        FAKE_URL_RESTORE, headers=FAKE_HEADERS, params=request_body
    )
    self.assertEqual(result.status_int, 200)
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_subcloud_valid_restore_values(self, mock_rpc_client):
    """Restore with FAKE_RESTORE_VALUES_VALID_IP as restore_values returns 200.

    The subcloud is online and unmanaged, as in the plain restore test; only
    the extra "restore_values" entry in the payload differs.
    """
    subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        subcloud.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN,
    )

    # Reuse the encoded password prepared in setUp rather than rebuilding it,
    # in line with the other restore tests.
    data = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "restore_values": FAKE_RESTORE_VALUES_VALID_IP,
    }

    mock_rpc_client().restore_subcloud_backups.return_value = True
    response = self.app.patch_json(
        FAKE_URL_RESTORE, headers=FAKE_HEADERS, params=data
    )

    self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_subcloud_invalid_restore_values(self, mock_rpc_client):
    """Restore with FAKE_RESTORE_VALUES_INVALID_IP as restore_values gives 400.

    The subcloud itself is online and unmanaged, so the payload's
    "restore_values" entry is the only thing that differs from the
    successful restore case.
    """
    subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        subcloud.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN,
    )

    # Reuse the encoded password prepared in setUp rather than rebuilding it,
    # in line with the other restore tests.
    data = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "restore_values": FAKE_RESTORE_VALUES_INVALID_IP,
    }

    mock_rpc_client().restore_subcloud_backups.return_value = True
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        headers=FAKE_HEADERS,
        params=data,
    )
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_unknown_subcloud(self, mock_rpc_client):
    """Restore for a subcloud reference that does not exist fails with 404.

    No subcloud is created, so reference "999" cannot resolve. A missing
    resource is reported as 404, the same status the unknown-subcloud delete
    test and the unknown-group restore test in this module expect.
    """
    data = {"sysadmin_password": self.fake_password, "subcloud": "999"}

    mock_rpc_client().restore_subcloud_backups.return_value = True
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "404 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        headers=FAKE_HEADERS,
        params=data,
    )
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_subcloud_group(self, mock_rpc_client):
    """Restore addressed to a group holding one unmanaged subcloud returns 200."""
    group_id = 1
    member = fake_subcloud.create_fake_subcloud(self.ctx, group_id=group_id)

    member_state = {
        "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
        "management_state": dccommon_consts.MANAGEMENT_UNMANAGED,
        "backup_datetime": None,
        "backup_status": consts.BACKUP_STATE_UNKNOWN,
    }
    db_api.subcloud_update(self.ctx, member.id, **member_state)

    # The group is referenced by its id rendered as a string.
    payload = {"sysadmin_password": self.fake_password, "group": str(group_id)}

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    response = self.app.patch_json(
        FAKE_URL_RESTORE, headers=FAKE_HEADERS, params=payload
    )

    self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_group_single_valid_subcloud(self, mock_rpc_client):
    """Group restore returns 200 when only one of two members is unmanaged."""
    group_id = 1
    unmanaged_member = fake_subcloud.create_fake_subcloud(
        self.ctx, group_id=group_id
    )
    managed_member = fake_subcloud.create_fake_subcloud(
        self.ctx,
        group_id=group_id,
        name=base.SUBCLOUD_2["name"],
        region_name=base.SUBCLOUD_2["region_name"],
    )

    # Both members share everything except the management state.
    members = (
        # Valid subcloud, management state is 'unmanaged'
        (unmanaged_member, dccommon_consts.MANAGEMENT_UNMANAGED),
        # Invalid subcloud, management state is 'managed'
        (managed_member, dccommon_consts.MANAGEMENT_MANAGED),
    )
    for member, management_state in members:
        db_api.subcloud_update(
            self.ctx,
            member.id,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            management_state=management_state,
            backup_datetime=None,
            backup_status=consts.BACKUP_STATE_UNKNOWN,
        )

    payload = {"sysadmin_password": self.fake_password, "group": str(group_id)}

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    response = self.app.patch_json(
        FAKE_URL_RESTORE, headers=FAKE_HEADERS, params=payload
    )

    self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_unknown_subcloud_group(self, mock_rpc_client):
    """Restore addressed to group reference "123" fails with 404."""
    payload = {"sysadmin_password": self.fake_password, "group": "123"}

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    request_kwargs = dict(headers=FAKE_HEADERS, params=payload)
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "404 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        **request_kwargs
    )
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_no_payload(self, mock_rpc_client):
    """Restore request sent without any params fails with 400."""
    group_id = 1
    member = fake_subcloud.create_fake_subcloud(self.ctx, group_id=group_id)

    member_state = {
        "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
        "management_state": dccommon_consts.MANAGEMENT_UNMANAGED,
        "backup_datetime": None,
        "backup_status": consts.BACKUP_STATE_UNKNOWN,
    }
    db_api.subcloud_update(self.ctx, member.id, **member_state)

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    # Only headers are supplied; no "params" argument is passed at all.
    request_kwargs = dict(headers=FAKE_HEADERS)
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        **request_kwargs
    )
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_no_local_only_registry_images(self, mock_rpc_client):
    """Restore with registry_images "true" and local_only "false" gives 400."""
    target = fake_subcloud.create_fake_subcloud(self.ctx)
    subcloud_state = {
        "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
        "management_state": dccommon_consts.MANAGEMENT_UNMANAGED,
        "backup_datetime": None,
        "backup_status": consts.BACKUP_STATE_UNKNOWN,
    }
    db_api.subcloud_update(self.ctx, target.id, **subcloud_state)

    payload = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "local_only": "false",
        "registry_images": "true",
    }

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    request_kwargs = dict(headers=FAKE_HEADERS, params=payload)
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        **request_kwargs
    )
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_subcloud_managed(self, mock_rpc_client):
    """Restore request for a subcloud in the managed state fails with 400."""
    target = fake_subcloud.create_fake_subcloud(self.ctx)
    subcloud_state = {
        "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
        # The managed state is the only difference from the passing case.
        "management_state": dccommon_consts.MANAGEMENT_MANAGED,
        "backup_datetime": None,
        "backup_status": consts.BACKUP_STATE_UNKNOWN,
    }
    db_api.subcloud_update(self.ctx, target.id, **subcloud_state)

    payload = {"sysadmin_password": self.fake_password, "subcloud": "1"}

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    request_kwargs = dict(headers=FAKE_HEADERS, params=payload)
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        **request_kwargs
    )
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_subcloud_invalid_deploy_states(self, mock_rpc_client):
    """Restore fails with 400 for every deploy status invalid for restore.

    Each status in consts.INVALID_DEPLOY_STATES_FOR_RESTORE is applied in
    turn to the same subcloud and the restore request is re-issued.
    """
    subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
    data = {"sysadmin_password": self.fake_password, "subcloud": "1"}

    mock_rpc_client().restore_subcloud_backups.return_value = True

    for status in consts.INVALID_DEPLOY_STATES_FOR_RESTORE:
        db_api.subcloud_update(
            self.ctx,
            subcloud.id,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            # Keep the subcloud unmanaged: a managed subcloud is already
            # rejected with 400 (see test_backup_restore_subcloud_managed),
            # which would hide whether the deploy status is what fails.
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
            deploy_status=status,
            backup_datetime=None,
            backup_status=consts.BACKUP_STATE_UNKNOWN,
        )

        six.assertRaisesRegex(
            self,
            webtest.app.AppError,
            "400 *",
            self.app.patch_json,
            FAKE_URL_RESTORE,
            headers=FAKE_HEADERS,
            params=data,
        )
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_subcloud_and_group(self, mock_rpc_client):
    """Restore payload naming both a subcloud and a group fails with 400."""
    target = fake_subcloud.create_fake_subcloud(self.ctx)
    subcloud_state = {
        "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
        "management_state": dccommon_consts.MANAGEMENT_UNMANAGED,
        "backup_datetime": None,
        "backup_status": consts.BACKUP_STATE_UNKNOWN,
    }
    db_api.subcloud_update(self.ctx, target.id, **subcloud_state)

    # Both selectors are present in the same request.
    payload = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "group": "1",
    }

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    request_kwargs = dict(headers=FAKE_HEADERS, params=payload)
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        **request_kwargs
    )
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_no_subcloud_no_group(self, mock_rpc_client):
    """Restore payload with neither "subcloud" nor "group" fails with 400."""
    target = fake_subcloud.create_fake_subcloud(self.ctx)
    subcloud_state = {
        "availability_status": dccommon_consts.AVAILABILITY_ONLINE,
        "management_state": dccommon_consts.MANAGEMENT_UNMANAGED,
        "backup_datetime": None,
        "backup_status": consts.BACKUP_STATE_UNKNOWN,
    }
    db_api.subcloud_update(self.ctx, target.id, **subcloud_state)

    # Only the password is sent; no target selector at all.
    payload = {"sysadmin_password": self.fake_password}

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    request_kwargs = dict(headers=FAKE_HEADERS, params=payload)
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        **request_kwargs
    )
@mock.patch.object(rpc_client, "ManagerClient")
@mock.patch("os.path.isdir")
@mock.patch("os.listdir")
def test_backup_restore_subcloud_with_install_no_release(
    self, mock_listdir, mock_isdir, mock_rpc_client
):
    """Restore with with_install "True" and no release returns 200.

    The subcloud carries install values, and os.path.isdir / os.listdir are
    patched to report a directory containing "test.iso" and "test.sig".
    """
    target = fake_subcloud.create_fake_subcloud(self.ctx)

    # Render the install values dict as text with double-quoted strings.
    install_values_text = str(fake_subcloud.FAKE_SUBCLOUD_INSTALL_VALUES)
    install_values_text = install_values_text.replace("'", '"')

    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        data_install=install_values_text,
    )

    payload = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "with_install": "True",
    }

    mock_isdir.return_value = True
    mock_listdir.return_value = ["test.iso", "test.sig"]
    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    response = self.app.patch_json(
        FAKE_URL_RESTORE, headers=FAKE_HEADERS, params=payload
    )

    self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, "ManagerClient")
@mock.patch("os.path.isdir")
@mock.patch("os.listdir")
def test_backup_restore_subcloud_with_install_with_release(
    self, mock_listdir, mock_isdir, mock_rpc_client
):
    """Restore with with_install "True" and release "22.12" returns 200.

    In addition to the patched os.path.isdir / os.listdir, builtins.open is
    replaced for the duration of the request so that reads return
    fake_subcloud.FAKE_UPGRADES_METADATA.
    """
    target = fake_subcloud.create_fake_subcloud(self.ctx)

    # Render the install values dict as text with double-quoted strings.
    install_values_text = str(fake_subcloud.FAKE_SUBCLOUD_INSTALL_VALUES)
    install_values_text = install_values_text.replace("'", '"')

    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        data_install=install_values_text,
    )

    payload = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "with_install": "True",
        "release": "22.12",
    }

    mock_isdir.return_value = True
    mock_listdir.return_value = ["test.iso", "test.sig"]
    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    fake_open = mock.mock_open(read_data=fake_subcloud.FAKE_UPGRADES_METADATA)
    with mock.patch('builtins.open', fake_open):
        response = self.app.patch_json(
            FAKE_URL_RESTORE, headers=FAKE_HEADERS, params=payload
        )

    self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, "ManagerClient")
def test_backup_restore_subcloud_no_install_with_release(self, mock_rpc_client):
    """Restore naming a release but omitting with_install fails with 400."""
    payload = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "release": "22.12",
    }

    manager = mock_rpc_client()
    manager.restore_subcloud_backups.return_value = True

    request_kwargs = dict(headers=FAKE_HEADERS, params=payload)
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        **request_kwargs
    )
@mock.patch.object(rpc_client, "ManagerClient")
@mock.patch("dcmanager.common.utils.get_matching_iso")
def test_backup_restore_subcloud_invalid_release(
    self, mock_matching_iso, mock_rpc_client
):
    """Restore naming release "00.00" fails with 400.

    Stacked ``mock.patch`` decorators inject their mocks bottom-up: the
    ``get_matching_iso`` mock (innermost decorator) is the first argument
    and the ``ManagerClient`` mock (outermost) is the second. The parameter
    names follow that order so each return value lands on the intended mock.
    """
    data = {
        "sysadmin_password": self.fake_password,
        "subcloud": "1",
        "release": "00.00",
    }

    mock_rpc_client().restore_subcloud_backups.return_value = True
    # NOTE(review): presumably an (iso, error) pair from the lookup helper;
    # confirm against dcmanager.common.utils.get_matching_iso.
    mock_matching_iso.return_value = [None, True]

    # NOTE(review): the payload has no "with_install", so the 400 may come
    # from that validation rather than from the release lookup - confirm.
    six.assertRaisesRegex(
        self,
        webtest.app.AppError,
        "400 *",
        self.app.patch_json,
        FAKE_URL_RESTORE,
        headers=FAKE_HEADERS,
        params=data,
    )