distcloud/distributedcloud/dcmanager/tests/unit/api/v1/controllers/test_subcloud_backup.py

1311 lines
60 KiB
Python

#
# Copyright (c) 2022-2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from oslo_utils import timeutils
import base64
import copy
import mock
import six
import webtest
from dccommon import consts as dccommon_consts
from dcmanager.common import consts
from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.rpc import client as rpc_client
from dcmanager.tests.unit.api import test_root_controller as testroot
from dcmanager.tests.unit.common import fake_subcloud
from dcmanager.tests import utils
# Tenant id placed in the headers of every fake request.
FAKE_TENANT = utils.UUID1

# Endpoints of the subcloud-backup API exercised by the tests in this module.
# The delete tests append a release version to FAKE_URL_DELETE.
FAKE_URL_CREATE = '/v1.0/subcloud-backup'
FAKE_URL_DELETE = '/v1.0/subcloud-backup/delete/'
FAKE_URL_RESTORE = '/v1.0/subcloud-backup/restore'

# Headers sent with every test request.
FAKE_HEADERS = {
    'X-Tenant-Id': FAKE_TENANT,
    'X_ROLE': 'admin,member,reader',
    'X-Identity-Status': 'Confirmed',
    'X-Project-Name': 'admin',
    'Content-Type': 'json',
}
# Health report used as an acceptable state by the create tests: the only
# failed check is the alarm check, and none of the alarms found are
# management affecting.
FAKE_GOOD_SYSTEM_HEALTH = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "No alarms: [Fail]\n"
    "[1] alarms found, [0] of which are management affecting\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)
# Health report in which every check passes, including the alarm check.
# Each check sits on its own line, like the other health fixtures in this
# module; without the newline terminators the report collapsed into a
# single run-on line.
FAKE_GOOD_SYSTEM_HEALTH_NO_ALARMS = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "No alarms: [OK]\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)
# Unhealthy reports used by the bad-system-health create test.
#
# These three fixtures used to be byte-identical, so iterating over them
# exercised a single scenario three times. Each one now carries the failure
# its name describes. All of them keep the management-affecting alarm
# failure so that every fixture remains an unhealthy report.
# NOTE(review): assumes the health check rejects any report with
# management-affecting alarms - confirm against the controller code.

# Ceph storage check failed (plus management-affecting alarms).
FAKE_SYSTEM_HEALTH_CEPH_FAIL = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "Ceph Storage Healthy: [Fail]\n"
    "No alarms: [Fail]\n"
    "[2] alarms found, [2] of which are management affecting\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)

# Only the alarm check failed, and the alarms are management affecting.
FAKE_SYSTEM_HEALTH_MGMT_ALARM = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "No alarms: [Fail]\n"
    "[2] alarms found, [2] of which are management affecting\n"
    "All kubernetes nodes are ready: [OK]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)

# Kubernetes node readiness check failed (plus management-affecting alarms).
FAKE_SYSTEM_HEALTH_K8S_FAIL = (
    "System Health:\n"
    "All hosts are provisioned: [OK]\n"
    "All hosts are unlocked/enabled: [OK]\n"
    "All hosts have current configurations: [OK]\n"
    "All hosts are patch current: [OK]\n"
    "No alarms: [Fail]\n"
    "[2] alarms found, [2] of which are management affecting\n"
    "All kubernetes nodes are ready: [Fail]\n"
    "All kubernetes control plane pods are ready: [OK]\n"
)
class TestSubcloudCreate(testroot.DCManagerApiTest):
    """API tests for creating subcloud backups (POST on FAKE_URL_CREATE).

    Every test creates a fake subcloud, puts it in the state under test,
    posts a JSON payload and checks the HTTP status of the response. The
    private helpers below hold the fixture and request boilerplate that
    the tests share.
    """

    def setUp(self):
        """Create a request context and stub the subcloud state client."""
        super(TestSubcloudCreate, self).setUp()
        self.ctx = utils.dummy_context()

        p = mock.patch.object(rpc_client, 'SubcloudStateClient')
        self.mock_rpc_state_client = p.start()
        self.addCleanup(p.stop)

    @staticmethod
    def _backup_payload(**fields):
        """Return a request body with the encoded password plus ``fields``.

        The sysadmin password is sent base64 encoded.
        """
        fake_password = \
            base64.b64encode('testpass'.encode("utf-8")).decode('ascii')
        data = {'sysadmin_password': fake_password}
        data.update(fields)
        return data

    def _apply_subcloud_state(self, subcloud, **overrides):
        """Write the baseline state of ``subcloud`` to the database.

        The baseline is an online, managed subcloud with no backup taken
        yet; ``overrides`` replaces or extends individual columns.
        """
        values = {
            'availability_status': dccommon_consts.AVAILABILITY_ONLINE,
            'management_state': dccommon_consts.MANAGEMENT_MANAGED,
            'backup_datetime': None,
            'backup_status': consts.BACKUP_STATE_UNKNOWN,
        }
        values.update(overrides)
        db_api.subcloud_update(self.ctx, subcloud.id, **values)

    def _new_subcloud(self, **overrides):
        """Create a fake subcloud and put it in the baseline state."""
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        self._apply_subcloud_state(subcloud, **overrides)
        return subcloud

    def _post_backup_create(self, data):
        """POST ``data`` to the backup create endpoint."""
        return self.app.post_json(FAKE_URL_CREATE,
                                  headers=FAKE_HEADERS,
                                  params=data)

    def _assert_backup_create_rejected(self, status_regex, data):
        """Assert that posting ``data`` raises AppError matching the regex."""
        six.assertRaisesRegex(self, webtest.app.AppError, status_regex,
                              self.app.post_json, FAKE_URL_CREATE,
                              headers=FAKE_HEADERS, params=data)

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud(self, mock_rpc_client, mock_sysinv,
                                    mock_openstack):
        """Each acceptable health report yields a 200 response."""
        mock_rpc_client().backup_subclouds.return_value = True
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        data = self._backup_payload(subcloud='1')

        good_health_states = [FAKE_GOOD_SYSTEM_HEALTH,
                              FAKE_GOOD_SYSTEM_HEALTH_NO_ALARMS]

        for system_health in good_health_states:
            mock_sysinv().get_system_health.return_value = system_health
            # Reset the subcloud before every request.
            self._apply_subcloud_state(subcloud)
            response = self._post_backup_create(data)
            self.assertEqual(response.status_int, 200)

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_with_bad_system_health(
            self, mock_rpc_client, mock_sysinv, mock_openstack):
        """Each unhealthy report makes the request fail with a 404."""
        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        mock_rpc_client().backup_subclouds.return_value = True
        data = self._backup_payload(subcloud='1')

        bad_health_states = [FAKE_SYSTEM_HEALTH_MGMT_ALARM,
                             FAKE_SYSTEM_HEALTH_CEPH_FAIL,
                             FAKE_SYSTEM_HEALTH_K8S_FAIL]

        for system_health in bad_health_states:
            mock_sysinv().get_system_health.return_value = system_health
            self._apply_subcloud_state(subcloud)
            self._assert_backup_create_rejected("404 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_unknown_subcloud(self, mock_rpc_client):
        """A subcloud reference that does not exist gives a 404."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(subcloud='123')
        self._assert_backup_create_rejected("404 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_offline_subcloud(self, mock_rpc_client):
        """An offline subcloud gives a 400."""
        self._new_subcloud(
            availability_status=dccommon_consts.AVAILABILITY_OFFLINE)
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(subcloud='1')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_unmanaged_subcloud(self, mock_rpc_client):
        """An unmanaged subcloud makes the request fail with a 404."""
        self._new_subcloud(
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED)
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(subcloud='1')
        self._assert_backup_create_rejected("404 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_invalid_state(self, mock_rpc_client):
        """A subcloud that is still bootstrapping gives a 400."""
        self._new_subcloud(deploy_status=consts.DEPLOY_STATE_BOOTSTRAPPING)
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(subcloud='1')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_group(self, mock_rpc_client):
        """Backing up an existing group succeeds with a 200."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        response = self._post_backup_create(self._backup_payload(group='1'))
        self.assertEqual(response.status_int, 200)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_unknown_group(self, mock_rpc_client):
        """A group reference that does not exist gives a 404."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(group='Fake')
        self._assert_backup_create_rejected("404 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_group_not_online(self, mock_rpc_client):
        """A group whose only subcloud is offline gives a 400."""
        self._new_subcloud(
            availability_status=dccommon_consts.AVAILABILITY_OFFLINE)
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(group='1')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_group_not_managed(self, mock_rpc_client):
        """A group whose only subcloud is unmanaged gives a 400."""
        self._new_subcloud(
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED)
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(group='1')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_group_no_valid_state(self, mock_rpc_client):
        """A group whose only subcloud is bootstrapping gives a 400."""
        self._new_subcloud(deploy_status=consts.DEPLOY_STATE_BOOTSTRAPPING)
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(group='1')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_and_group(self, mock_rpc_client):
        """Passing both a subcloud and a group gives a 400."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(subcloud='1', group='1')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_no_subcloud_no_group(self, mock_rpc_client):
        """Passing neither a subcloud nor a group gives a 400."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        self._assert_backup_create_rejected("400 *", self._backup_payload())

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_backup_values(
            self, mock_rpc_client, mock_sysinv, mock_openstack):
        """A request carrying ``backup_values`` succeeds with a 200."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = \
            FAKE_GOOD_SYSTEM_HEALTH

        data = self._backup_payload(subcloud='1',
                                    backup_values='TestFileDirectory')
        response = self._post_backup_create(data)
        self.assertEqual(response.status_int, 200)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_no_password(self, mock_rpc_client):
        """A payload without ``sysadmin_password`` gives a 400."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        data = {'subcloud': '1'}
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_local_only(self, mock_rpc_client,
                                               mock_sysinv, mock_openstack):
        """A request with ``local_only`` set succeeds with a 200."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = \
            FAKE_GOOD_SYSTEM_HEALTH

        data = self._backup_payload(subcloud='1', local_only='True')
        response = self._post_backup_create(data)
        self.assertEqual(response.status_int, 200)

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_local_only_registry_images(
            self, mock_rpc_client, mock_sysinv, mock_openstack):
        """``registry_images`` together with ``local_only`` gives a 200."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = \
            FAKE_GOOD_SYSTEM_HEALTH

        data = self._backup_payload(subcloud='1',
                                    local_only='True',
                                    registry_images='True')
        response = self._post_backup_create(data)
        self.assertEqual(response.status_int, 200)

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_no_local_only_registry_images(
            self, mock_rpc_client, mock_sysinv, mock_openstack):
        """``registry_images`` without ``local_only`` gives a 400."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = \
            FAKE_GOOD_SYSTEM_HEALTH

        data = self._backup_payload(subcloud='1', registry_images='True')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_unknown_parameter(self, mock_rpc_client):
        """A payload with an unexpected key gives a 400."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        data = self._backup_payload(subcloud='1',
                                    unkown_variable='FakeValue')
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_invalid_payload_format(
            self, mock_rpc_client):
        """A payload that is a bare string instead of an object gives a 400."""
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True

        data = 'WrongFormat'
        self._assert_backup_create_rejected("400 *", data)

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_backup_create_subcloud_json_file(self, mock_rpc_client,
                                              mock_sysinv, mock_openstack):
        """A plain JSON request for one subcloud succeeds with a 200."""
        # NOTE(review): despite the name, no file is involved here; the
        # request is the same as in the basic single-subcloud test.
        self._new_subcloud()
        mock_rpc_client().backup_subclouds.return_value = True
        mock_sysinv().get_system_health.return_value = \
            FAKE_GOOD_SYSTEM_HEALTH

        response = self._post_backup_create(
            self._backup_payload(subcloud='1'))
        self.assertEqual(response.status_int, 200)

    @mock.patch('dcmanager.common.utils.OpenStackDriver')
    @mock.patch('dcmanager.common.utils.SysinvClient')
    @mock.patch.object(rpc_client, 'ManagerClient')
    def test_create_concurrent_backup(self, mock_rpc_client, mock_sysinv,
                                      mock_openstack):
        """A new backup is refused only while another one is ongoing."""
        mock_sysinv().get_system_health.return_value = \
            FAKE_GOOD_SYSTEM_HEALTH
        mock_rpc_client().backup_subclouds.return_value = True

        subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
        data = self._backup_payload(subcloud='1')

        db_api.subcloud_update(
            self.ctx,
            subcloud.id,
            deploy_status=consts.DEPLOY_STATE_DONE,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            management_state=dccommon_consts.MANAGEMENT_MANAGED)

        ongoing_backup_states = [consts.BACKUP_STATE_INITIAL,
                                 consts.BACKUP_STATE_VALIDATING,
                                 consts.BACKUP_STATE_PRE_BACKUP,
                                 consts.BACKUP_STATE_IN_PROGRESS]

        # BACKUP_STATE_IN_PROGRESS used to be listed here as well. It is an
        # ongoing state, so the entry only repeated the rejection check
        # above and has been removed.
        final_backup_states = [consts.BACKUP_STATE_VALIDATE_FAILED,
                               consts.BACKUP_STATE_PREP_FAILED,
                               consts.BACKUP_STATE_FAILED,
                               consts.BACKUP_STATE_UNKNOWN,
                               consts.BACKUP_STATE_COMPLETE_CENTRAL,
                               consts.BACKUP_STATE_COMPLETE_LOCAL]

        for backup_state in ongoing_backup_states:
            db_api.subcloud_update(self.ctx, subcloud.id,
                                   backup_status=backup_state)
            # Expect the operation to fail
            self._assert_backup_create_rejected("400 *", data)

        for backup_state in final_backup_states:
            db_api.subcloud_update(self.ctx, subcloud.id,
                                   backup_status=backup_state)
            # Expect the operation to succeed
            response = self._post_backup_create(data)
            self.assertEqual(response.status_int, 200)
class TestSubcloudDelete(testroot.DCManagerApiTest):
def setUp(self):
super(TestSubcloudDelete, self).setUp()
self.ctx = utils.dummy_context()
p = mock.patch.object(rpc_client, 'SubcloudStateClient')
self.mock_rpc_state_client = p.start()
self.addCleanup(p.stop)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_subcloud(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1'}
mock_rpc_client().delete_subcloud_backups.return_value = True
response = self.app.patch_json(FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS,
params=data)
self.assertEqual(response.status_int, 207)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_unknown_group(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_OFFLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'group': '999'}
mock_rpc_client().delete_subcloud_backups.return_value = True
six.assertRaisesRegex(self, webtest.app.AppError, "404 *",
self.app.patch_json, FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def _test_backup_delete_subcloud_unmanaged(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'group': '1'}
mock_rpc_client().delete_subcloud_backups.return_value = True
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.patch_json, FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_group(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'group': '1'}
mock_rpc_client().delete_subcloud_backups.return_value = True
response = self.app.patch_json(FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS,
params=data)
self.assertEqual(response.status_int, 207)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_subcloud_and_group(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
HEADER = copy.copy(FAKE_HEADERS)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1',
'group': '1'}
mock_rpc_client().delete_subcloud_backups.return_value = True
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.patch_json, FAKE_URL_DELETE + release_version,
headers=HEADER, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_no_subcloud_no_group(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password}
mock_rpc_client().delete_subcloud_backups.return_value = True
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.patch_json, FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_invalid_url(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
invalid_url = '/v1.0/subcloud-backup/fake/'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1'}
mock_rpc_client().delete_subcloud_backups.return_value = True
six.assertRaisesRegex(self, webtest.app.AppError, "404 *",
self.app.patch_json, invalid_url,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_no_release_version(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1'}
mock_rpc_client().delete_subcloud_backups.return_value = True
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.patch_json, FAKE_URL_DELETE,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_no_content(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1'}
mock_rpc_client().delete_subcloud_backups.return_value = None
response = self.app.patch_json(FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS,
params=data)
self.assertEqual(response.status_int, 204)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_exception(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1'}
mock_rpc_client().delete_subcloud_backups.side_effect = Exception()
six.assertRaisesRegex(self, webtest.app.AppError, "500 *",
self.app.patch_json, FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_local_only(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_LOCAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1',
'local_only': 'True'}
mock_rpc_client().delete_subcloud_backups.return_value = True
response = self.app.patch_json(FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS,
params=data)
self.assertEqual(response.status_int, 207)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_central(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_CENTRAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1',
'local_only': 'False'}
mock_rpc_client().delete_subcloud_backups.return_value = True
response = self.app.patch_json(FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS,
params=data)
self.assertEqual(response.status_int, 207)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_invalid_local_only(self, mock_rpc_client):
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.subcloud_update(self.ctx,
subcloud.id,
availability_status=dccommon_consts.AVAILABILITY_ONLINE,
management_state=dccommon_consts.MANAGEMENT_MANAGED,
backup_datetime=timeutils.utcnow(),
backup_status=consts.BACKUP_STATE_COMPLETE_LOCAL)
release_version = '22.12'
fake_password = (base64.b64encode('testpass'.encode("utf-8"))).decode('ascii')
data = {'sysadmin_password': fake_password,
'subcloud': '1',
'local_only': 'Fake'}
mock_rpc_client().delete_subcloud_backups.return_value = True
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.patch_json, FAKE_URL_DELETE + release_version,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_delete_local_only_unknown_subcloud(self, mock_rpc_client):
    """A local-only delete that names subcloud '123' must yield a 404.

    Only one fake subcloud is created by this test; the request refers to
    '123' instead of '1'.
    """
    mock_rpc_client().delete_subcloud_backups.return_value = True

    existing = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        existing.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_MANAGED,
        backup_datetime=timeutils.utcnow(),
        backup_status=consts.BACKUP_STATE_COMPLETE_LOCAL)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '123',
        'local_only': 'True',
    }
    delete_url = FAKE_URL_DELETE + '22.12'

    with six.assertRaisesRegex(self, webtest.app.AppError, "404 *"):
        self.app.patch_json(delete_url,
                            headers=FAKE_HEADERS,
                            params=payload)
class TestSubcloudRestore(testroot.DCManagerApiTest):
def setUp(self):
    """Create a dummy context and patch rpc_client.SubcloudStateClient."""
    super(TestSubcloudRestore, self).setUp()
    self.ctx = utils.dummy_context()

    # Keep the patcher around so it can be stopped when the test finishes.
    state_client_patcher = mock.patch.object(rpc_client,
                                             'SubcloudStateClient')
    self.mock_rpc_state_client = state_client_patcher.start()
    self.addCleanup(state_client_patcher.stop)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_subcloud(self, mock_rpc_client):
    """Restore a single online, unmanaged subcloud and expect HTTP 200."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    target = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '1',
    }

    resp = self.app.patch_json(FAKE_URL_RESTORE,
                               headers=FAKE_HEADERS,
                               params=payload)
    self.assertEqual(resp.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_unknown_subcloud(self, mock_rpc_client):
    """Restoring subcloud '999' when none was created must yield a 400."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '999',
    }

    with six.assertRaisesRegex(self, webtest.app.AppError, "400 *"):
        self.app.patch_json(FAKE_URL_RESTORE,
                            headers=FAKE_HEADERS,
                            params=payload)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_subcloud_group(self, mock_rpc_client):
    """Restore by group id (one unmanaged member) and expect HTTP 200."""
    group_id = 1
    mock_rpc_client().restore_subcloud_backups.return_value = True

    member = fake_subcloud.create_fake_subcloud(self.ctx, group_id=group_id)
    db_api.subcloud_update(
        self.ctx,
        member.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'group': str(group_id),
    }

    resp = self.app.patch_json(FAKE_URL_RESTORE,
                               headers=FAKE_HEADERS,
                               params=payload)
    self.assertEqual(resp.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_group_single_valid_subcloud(self, mock_rpc_client):
    """Group restore with one unmanaged and one managed member returns 200."""
    group_id = 1
    mock_rpc_client().restore_subcloud_backups.return_value = True

    unmanaged_member = fake_subcloud.create_fake_subcloud(
        self.ctx, group_id=group_id)
    managed_member = fake_subcloud.create_fake_subcloud(
        self.ctx, group_id=group_id, name='subcloud2')

    # The first member is the valid one ('unmanaged'); the second is the
    # invalid one ('managed').
    members = (
        (unmanaged_member, dccommon_consts.MANAGEMENT_UNMANAGED),
        (managed_member, dccommon_consts.MANAGEMENT_MANAGED),
    )
    for member, mgmt_state in members:
        db_api.subcloud_update(
            self.ctx,
            member.id,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            management_state=mgmt_state,
            backup_datetime=None,
            backup_status=consts.BACKUP_STATE_UNKNOWN)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'group': str(group_id),
    }

    resp = self.app.patch_json(FAKE_URL_RESTORE,
                               headers=FAKE_HEADERS,
                               params=payload)
    self.assertEqual(resp.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_unknown_subcloud_group(self, mock_rpc_client):
    """Restoring group '123' without creating it must yield a 404."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'group': '123',
    }

    with six.assertRaisesRegex(self, webtest.app.AppError, "404 *"):
        self.app.patch_json(FAKE_URL_RESTORE,
                            headers=FAKE_HEADERS,
                            params=payload)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_no_payload(self, mock_rpc_client):
    """A restore request sent without any params must yield a 400."""
    group_id = 1
    mock_rpc_client().restore_subcloud_backups.return_value = True

    member = fake_subcloud.create_fake_subcloud(self.ctx, group_id=group_id)
    db_api.subcloud_update(
        self.ctx,
        member.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN)

    # Only headers are sent; no 'params' argument is passed on purpose.
    with six.assertRaisesRegex(self, webtest.app.AppError, "400 *"):
        self.app.patch_json(FAKE_URL_RESTORE, headers=FAKE_HEADERS)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_no_local_only_registry_images(self, mock_rpc_client):
    """'registry_images' = 'true' with 'local_only' = 'false' yields a 400."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    target = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '1',
        'local_only': 'false',
        'registry_images': 'true',
    }

    with six.assertRaisesRegex(self, webtest.app.AppError, "400 *"):
        self.app.patch_json(FAKE_URL_RESTORE,
                            headers=FAKE_HEADERS,
                            params=payload)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_subcloud_managed(self, mock_rpc_client):
    """Restoring a subcloud that is in the managed state must yield a 400."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    target = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_MANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '1',
    }

    with six.assertRaisesRegex(self, webtest.app.AppError, "400 *"):
        self.app.patch_json(FAKE_URL_RESTORE,
                            headers=FAKE_HEADERS,
                            params=payload)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_subcloud_invalid_deploy_states(self, mock_rpc_client):
    """Restore must be answered with a 400 for each invalid deploy state.

    Every status in consts.INVALID_DEPLOY_STATES_FOR_RESTORE is applied to
    the subcloud in turn and a restore request is issued for each of them.
    """
    subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
    fake_password = base64.b64encode(b'testpass').decode('ascii')
    data = {'sysadmin_password': fake_password, 'subcloud': '1'}
    mock_rpc_client().restore_subcloud_backups.return_value = True

    for status in consts.INVALID_DEPLOY_STATES_FOR_RESTORE:
        # The subcloud is kept unmanaged: a managed subcloud is already
        # answered with a 400 by this API (the managed-subcloud restore test
        # relies on that), which would hide whether the deploy status is
        # checked at all.
        db_api.subcloud_update(
            self.ctx,
            subcloud.id,
            availability_status=dccommon_consts.AVAILABILITY_ONLINE,
            management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
            deploy_status=status,
            backup_datetime=None,
            backup_status=consts.BACKUP_STATE_UNKNOWN)

        # Assert inside the loop so that every status is exercised, not
        # only the last one written to the database.
        six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
                              self.app.patch_json, FAKE_URL_RESTORE,
                              headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_subcloud_and_group(self, mock_rpc_client):
    """Sending both 'subcloud' and 'group' in one request yields a 400."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    target = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '1',
        'group': '1',
    }

    with six.assertRaisesRegex(self, webtest.app.AppError, "400 *"):
        self.app.patch_json(FAKE_URL_RESTORE,
                            headers=FAKE_HEADERS,
                            params=payload)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_no_subcloud_no_group(self, mock_rpc_client):
    """A payload with neither 'subcloud' nor 'group' must yield a 400."""
    mock_rpc_client().restore_subcloud_backups.return_value = True

    target = fake_subcloud.create_fake_subcloud(self.ctx)
    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        backup_datetime=None,
        backup_status=consts.BACKUP_STATE_UNKNOWN)

    # Only the password is supplied.
    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
    }

    with six.assertRaisesRegex(self, webtest.app.AppError, "400 *"):
        self.app.patch_json(FAKE_URL_RESTORE,
                            headers=FAKE_HEADERS,
                            params=payload)
@mock.patch.object(rpc_client, 'ManagerClient')
@mock.patch('os.path.isdir')
@mock.patch('os.listdir')
def test_backup_restore_subcloud_with_install_no_release(self,
                                                         mock_listdir,
                                                         mock_isdir,
                                                         mock_rpc_client):
    """Restore with 'with_install' and no 'release'; expect HTTP 200.

    os.path.isdir and os.listdir are patched so that a directory holding
    'test.iso' and 'test.sig' appears to exist.
    """
    mock_rpc_client().restore_subcloud_backups.return_value = True
    mock_listdir.return_value = ['test.iso', 'test.sig']
    mock_isdir.return_value = True

    target = fake_subcloud.create_fake_subcloud(self.ctx)
    # Store the install values as text with double quotes instead of the
    # single quotes produced by str().
    install_values = str(fake_subcloud.FAKE_SUBCLOUD_INSTALL_VALUES)
    install_values = install_values.replace("'", '"')
    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        data_install=install_values)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '1',
        'with_install': 'True',
    }

    resp = self.app.patch_json(FAKE_URL_RESTORE,
                               headers=FAKE_HEADERS,
                               params=payload)
    self.assertEqual(resp.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerClient')
@mock.patch('os.path.isdir')
@mock.patch('os.listdir')
def test_backup_restore_subcloud_with_install_with_release(self,
                                                           mock_listdir,
                                                           mock_isdir,
                                                           mock_rpc_client):
    """Restore with 'with_install' and 'release' 22.12; expect HTTP 200.

    os.path.isdir and os.listdir are patched so that a directory holding
    'test.iso' and 'test.sig' appears to exist.
    """
    mock_rpc_client().restore_subcloud_backups.return_value = True
    mock_listdir.return_value = ['test.iso', 'test.sig']
    mock_isdir.return_value = True

    target = fake_subcloud.create_fake_subcloud(self.ctx)
    # Store the install values as text with double quotes instead of the
    # single quotes produced by str().
    install_values = str(fake_subcloud.FAKE_SUBCLOUD_INSTALL_VALUES)
    install_values = install_values.replace("'", '"')
    db_api.subcloud_update(
        self.ctx,
        target.id,
        availability_status=dccommon_consts.AVAILABILITY_ONLINE,
        management_state=dccommon_consts.MANAGEMENT_UNMANAGED,
        data_install=install_values)

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '1',
        'with_install': 'True',
        'release': '22.12',
    }

    resp = self.app.patch_json(FAKE_URL_RESTORE,
                               headers=FAKE_HEADERS,
                               params=payload)
    self.assertEqual(resp.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_backup_restore_subcloud_no_install_with_release(self, mock_rpc_client):
    """A 'release' sent without 'with_install' must be answered with a 400.

    NOTE(review): no subcloud is created here, so the 400 may also be
    produced by the lookup of subcloud '1' - confirm which check fires.
    """
    mock_rpc_client().restore_subcloud_backups.return_value = True

    payload = {
        'sysadmin_password': base64.b64encode(b'testpass').decode('ascii'),
        'subcloud': '1',
        'release': '22.12',
    }

    with six.assertRaisesRegex(self, webtest.app.AppError, "400 *"):
        self.app.patch_json(FAKE_URL_RESTORE,
                            headers=FAKE_HEADERS,
                            params=payload)
@mock.patch.object(rpc_client, 'ManagerClient')
@mock.patch('dcmanager.common.utils.get_matching_iso')
def test_backup_restore_subcloud_invalid_release(self,
                                                 mock_matching_iso,
                                                 mock_rpc_client):
    """A restore request for release '00.00' must be answered with a 400.

    Stacked mock.patch decorators inject their mocks bottom-up: the
    decorator closest to the function (get_matching_iso) supplies the first
    mock argument and ManagerClient supplies the second. The parameter
    order therefore has to be (mock_matching_iso, mock_rpc_client);
    otherwise each return value below is configured on the wrong mock.

    NOTE(review): the payload carries no 'with_install' and no subcloud is
    created, so the 400 may be raised before get_matching_iso is consulted
    - confirm against the controller.
    """
    fake_password = base64.b64encode(b'testpass').decode('ascii')
    data = {'sysadmin_password': fake_password,
            'subcloud': '1',
            'release': '00.00'
            }
    mock_rpc_client().restore_subcloud_backups.return_value = True
    mock_matching_iso.return_value = [None, True]
    six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
                          self.app.patch_json, FAKE_URL_RESTORE,
                          headers=FAKE_HEADERS, params=data)