Improve unit test coverage for dcmanager's APIs (sw_update_strategy)

Improves unit test coverage for dcmanager's sw_update_strategy
API from 76% to 98%.

Test plan:
  All of the tests were created taking into account the
output of 'tox -c tox.ini -e cover' command

Story: 2007082
Task: 49652

Change-Id: I1d403abcaf503cccfed2b8242703f29fbb26844f
Signed-off-by: rlima <Raphael.Lima@windriver.com>
This commit is contained in:
rlima 2024-03-01 09:35:39 -03:00 committed by Raphael Lima
parent 2fe10ca74d
commit b82d5886cd
2 changed files with 612 additions and 276 deletions

View File

@ -27,7 +27,7 @@ from pecan.testing import load_test_app
from dcmanager.api import api_config
from dcmanager.common import config
from dcmanager.tests import base
from dcmanager.tests.base import DCManagerTestCase
from dcmanager.tests.unit.common import consts as test_consts
from dcmanager.tests import utils
@ -36,7 +36,7 @@ OPT_GROUP_NAME = 'keystone_authtoken'
cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token")
class DCManagerApiTest(base.DCManagerTestCase):
class DCManagerApiTest(DCManagerTestCase):
def setUp(self):
super().setUp()

View File

@ -1,5 +1,5 @@
# Copyright (c) 2017 Ericsson AB
# Copyright (c) 2017-2022 Wind River Systems, Inc.
# Copyright (c) 2017-2022, 2024 Wind River Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -15,317 +15,653 @@
# under the License.
#
import copy
import http.client
import mock
import six
import webtest
from oslo_messaging import RemoteError
from dccommon import consts as dccommon_consts
from dcmanager.common import consts
from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.orchestrator import rpcapi as rpc_client
from dcmanager.tests.unit.api import test_root_controller as testroot
from dcmanager.tests.unit.api.test_root_controller import DCManagerApiTest
from dcmanager.tests.unit.common import fake_strategy
from dcmanager.tests.unit.common import fake_subcloud
from dcmanager.tests import utils
FAKE_TENANT = utils.UUID1
FAKE_ID = '1'
FAKE_URL = '/v1.0/sw-update-strategy'
FAKE_HEADERS = {'X-Tenant-Id': FAKE_TENANT, 'X_ROLE': 'admin,member,reader',
'X-Identity-Status': 'Confirmed', 'X-Project-Name': 'admin'}
FAKE_SW_UPDATE_DATA = {
"type": consts.SW_UPDATE_TYPE_PATCH,
"subcloud-apply-type": consts.SUBCLOUD_APPLY_TYPE_PARALLEL,
"max-parallel-subclouds": "10",
"stop-on-failure": "true"
}
FAKE_SW_UPDATE_APPLY_DATA = {
"action": consts.SW_UPDATE_ACTION_APPLY
}
FAKE_SW_UPDATE_ABORT_DATA = {
"action": consts.SW_UPDATE_ACTION_ABORT
}
class TestSwUpdateStrategy(testroot.DCManagerApiTest):
class BaseTestSwUpdateStrategyController(DCManagerApiTest):
"""Base class for testing the SwUpdateStrategyController"""
def setUp(self):
super(TestSwUpdateStrategy, self).setUp()
self.ctx = utils.dummy_context()
super().setUp()
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update(self, mock_rpc_client):
data = FAKE_SW_UPDATE_DATA
mock_rpc_client().create_sw_update_strategy.return_value = True
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data)
mock_rpc_client().create_sw_update_strategy.assert_called_once_with(
mock.ANY,
data)
self.assertEqual(response.status_int, 200)
self.url = "/v1.0/sw-update-strategy"
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_with_force_option(self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["force"] = "true"
data["cloud_name"] = "subcloud1"
mock_rpc_client().create_sw_update_strategy.return_value = True
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data)
mock_rpc_client().create_sw_update_strategy.assert_called_once_with(
mock.ANY,
data)
self.assertEqual(response.status_int, 200)
self._mock_rpc_orchestrator_client()
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_bad_type(self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["type"] = "bad type"
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
def _mock_rpc_orchestrator_client(self):
"""Mock rpc's manager orchestrator client"""
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_bad_apply_type(self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["subcloud-apply-type"] = "bad type"
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
mock_patch = mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
self.mock_rpc_orchestrator_client = mock_patch.start()
self.addCleanup(mock_patch.stop)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_bad_max_parallel(
self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["max-parallel-subclouds"] = "not an integer"
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_invalid_stop_on_failure_type(
self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["stop-on-failure"] = "not an boolean"
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
class TestSwUpdateStrategyController(BaseTestSwUpdateStrategyController):
"""Test class for SwUpdateStrategyController"""
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_invalid_force_type(
self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["force"] = "not an boolean"
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
def setUp(self):
super().setUp()
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_valid_force_type_missing_cloud_name(
self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["force"] = "true"
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
def test_unmapped_method(self):
"""Test requesting an unmapped method results in success with null content"""
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_group_name_or_id_not_exists(
self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
del data["subcloud-apply-type"]
del data["max-parallel-subclouds"]
data["subcloud_group"] = "fake_group"
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data,
expect_errors=True)
mock_rpc_client().create_sw_update_strategy.assert_not_called()
self.assertEqual(response.status_int, 400)
self.method = self.app.put
data["subcloud_group"] = "100"
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data,
expect_errors=True)
mock_rpc_client().create_sw_update_strategy.assert_not_called()
self.assertEqual(response.status_int, 400)
response = self._send_request()
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_with_cloud_name_and_group_id(
self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
del data["subcloud-apply-type"]
del data["max-parallel-subclouds"]
self._assert_response(response)
self.assertEqual(response.text, "null")
data["cloud_name"] = "subcloud1"
data["subcloud_group"] = "group1"
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data,
expect_errors=True)
mock_rpc_client().create_sw_update_strategy.assert_not_called()
self.assertEqual(response.status_int, 400)
data["subcloud_group"] = "2"
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data,
expect_errors=True)
mock_rpc_client().create_sw_update_strategy.assert_not_called()
self.assertEqual(response.status_int, 400)
class BaseTestSwUpdateStrategyGet(BaseTestSwUpdateStrategyController):
"""Base test class for get requests"""
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_with_group_id_and_other_group_values(
self, mock_rpc_client):
# fake data contains subcloud-apply-type and max-parallel-subclouds
data = copy.copy(FAKE_SW_UPDATE_DATA)
data["subcloud_group"] = "group1"
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data,
expect_errors=True)
mock_rpc_client().create_sw_update_strategy.assert_not_called()
self.assertEqual(response.status_int, 400)
def setUp(self):
super().setUp()
data["subcloud_group"] = "2"
response = self.app.post_json(FAKE_URL,
headers=FAKE_HEADERS,
params=data,
expect_errors=True)
mock_rpc_client().create_sw_update_strategy.assert_not_called()
self.assertEqual(response.status_int, 400)
self.method = self.app.get
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_no_body(self, mock_rpc_client):
data = {}
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_no_type(self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_DATA)
del data['type']
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
class TestSwUpdateStrategyGet(BaseTestSwUpdateStrategyGet):
"""Test class for get requests"""
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_apply(self, mock_rpc_client):
data = FAKE_SW_UPDATE_APPLY_DATA
mock_rpc_client().apply_sw_update_strategy.return_value = True
response = self.app.post_json(FAKE_URL + '/actions',
headers=FAKE_HEADERS,
params=data)
mock_rpc_client().apply_sw_update_strategy.assert_called_once()
self.assertEqual(response.status_int, 200)
def setUp(self):
super().setUp()
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_scoped_post_sw_update_apply(self, mock_rpc_client):
data = FAKE_SW_UPDATE_APPLY_DATA
mock_rpc_client().apply_sw_update_strategy.return_value = True
response = self.app.post_json(
FAKE_URL + '/actions?type=' + consts.SW_UPDATE_TYPE_PATCH,
headers=FAKE_HEADERS,
params=data)
mock_rpc_client().apply_sw_update_strategy.assert_called_once()
self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_abort(self, mock_rpc_client):
mock_rpc_client().abort_sw_update_strategy.return_value = True
data = FAKE_SW_UPDATE_ABORT_DATA
response = self.app.post_json(FAKE_URL + '/actions',
headers=FAKE_HEADERS,
params=data)
mock_rpc_client().abort_sw_update_strategy.assert_called_once()
self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_scoped_post_sw_update_abort(self, mock_rpc_client):
mock_rpc_client().abort_sw_update_strategy.return_value = True
data = FAKE_SW_UPDATE_ABORT_DATA
response = self.app.post_json(
FAKE_URL + '/actions?type=' + consts.SW_UPDATE_TYPE_PATCH,
headers=FAKE_HEADERS,
params=data)
mock_rpc_client().abort_sw_update_strategy.assert_called_once()
self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_post_sw_update_bad_action(self, mock_rpc_client):
data = copy.copy(FAKE_SW_UPDATE_APPLY_DATA)
data["action"] = "bad action"
six.assertRaisesRegex(self, webtest.app.AppError, "400 *",
self.app.post_json, FAKE_URL,
headers=FAKE_HEADERS, params=data)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_delete_sw_update_strategy(self, mock_rpc_client):
delete_url = FAKE_URL
mock_rpc_client().delete_sw_update_strategy.return_value = True
response = self.app.delete_json(delete_url, headers=FAKE_HEADERS)
mock_rpc_client().delete_sw_update_strategy.assert_called_once_with(
mock.ANY, update_type=None)
self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_scoped_delete_sw_update_strategy(self,
mock_rpc_client):
delete_url = FAKE_URL + "?type=" + consts.SW_UPDATE_TYPE_PATCH
mock_rpc_client().delete_sw_update_strategy.return_value = True
response = self.app.delete_json(delete_url, headers=FAKE_HEADERS)
mock_rpc_client().delete_sw_update_strategy.assert_called_once_with(
mock.ANY, update_type=consts.SW_UPDATE_TYPE_PATCH)
self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_get_sw_update_strategy(self, mock_rpc_client):
fake_strategy.create_fake_strategy(self.ctx,
consts.SW_UPDATE_TYPE_PATCH)
get_url = FAKE_URL
response = self.app.get(get_url, headers=FAKE_HEADERS)
self.strategy = fake_strategy.create_fake_strategy(
self.ctx, consts.SW_UPDATE_TYPE_PATCH
)
def _assert_response_payload(self, response):
self.assertEqual(response.json['type'], consts.SW_UPDATE_TYPE_PATCH)
self.assertEqual(response.json['state'], consts.SW_UPDATE_STATE_INITIAL)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_scoped_get_sw_update_strategy(self, mock_rpc_client):
fake_strategy.create_fake_strategy(self.ctx,
consts.SW_UPDATE_TYPE_PATCH)
def test_get_succeeds(self):
"""Test get succeeds"""
get_url = FAKE_URL + '?type=' + consts.SW_UPDATE_TYPE_PATCH
response = self.app.get(get_url, headers=FAKE_HEADERS)
response = self._send_request()
self.assertEqual(response.json['type'], consts.SW_UPDATE_TYPE_PATCH)
self.assertEqual(response.json['state'], consts.SW_UPDATE_STATE_INITIAL)
self._assert_response(response)
self._assert_response_payload(response)
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_get_sw_update_strategy_steps(self, mock_rpc_client):
fake_subcloud.create_fake_subcloud(self.ctx)
fake_strategy.create_fake_strategy_step(self.ctx,
consts.STRATEGY_STATE_INITIAL)
def test_get_succeeds_with_type(self):
"""Test get succeeds with type"""
get_url = FAKE_URL + '/steps'
response = self.app.get(get_url, headers=FAKE_HEADERS)
self.url = f"{self.url}?type={consts.SW_UPDATE_TYPE_PATCH}"
self.assertEqual(response.json['strategy-steps'][0]['state'],
consts.STRATEGY_STATE_INITIAL)
response = self._send_request()
@mock.patch.object(rpc_client, 'ManagerOrchestratorClient')
def test_get_sw_update_strategy_single_step(self, mock_rpc_client):
fake_subcloud.create_fake_subcloud(self.ctx)
fake_strategy.create_fake_strategy_step(self.ctx,
consts.STRATEGY_STATE_INITIAL)
self._assert_response(response)
self._assert_response_payload(response)
get_url = FAKE_URL + '/steps/subcloud1'
response = self.app.get(get_url, headers=FAKE_HEADERS)
def test_get_succeeds_with_invalid_verb(self):
"""Test get succeeds with invalid verb"""
self.assertEqual(response.json['state'],
consts.STRATEGY_STATE_INITIAL)
# TODO(rlima): when a get request is made with an invalid verb, the steps
# variable from the controller is not mapped to a correct execution and,
# therefore, it results in a successful response while it should've been
# a bad request.
self.url = f"{self.url}/fake"
response = self._send_request()
self._assert_response(response)
def test_get_fails_with_db_api_not_found_exception(self):
"""Test get fails with db api not found exception"""
db_api.sw_update_strategy_destroy(self.ctx)
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.NOT_FOUND, "Strategy not found"
)
def test_get_fails_with_type_and_db_api_not_found_exception(self):
"""Test get fails with db api not found exception"""
self.url = f"{self.url}?type={consts.SW_UPDATE_TYPE_PATCH}"
db_api.sw_update_strategy_destroy(self.ctx)
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.NOT_FOUND,
f"Strategy of type '{consts.SW_UPDATE_TYPE_PATCH}' not found"
)
class TestSwUpdateStrategyGetSteps(BaseTestSwUpdateStrategyGet):
"""Test class for get requests with steps verb"""
def setUp(self):
super().setUp()
self.url = f"{self.url}/steps"
self.subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
self.strategy = fake_strategy.create_fake_strategy_step(
self.ctx, consts.STRATEGY_STATE_INITIAL
)
def test_get_steps_succeeds(self):
"""Test get steps succeeds"""
response = self._send_request()
self._assert_response(response)
self.assertEqual(
response.json['strategy-steps'][0]['state'],
consts.STRATEGY_STATE_INITIAL
)
def test_get_steps_succeeds_with_subcloud_name(self):
"""Test get steps succeeds with subcloud name"""
self.url = f"{self.url}/{self.subcloud.name}"
response = self._send_request()
self._assert_response(response)
self.assertEqual(response.json['cloud'], self.subcloud.name)
def test_get_steps_fails_with_inexistent_subcloud(self):
"""Test get steps fails with inexistent subcloud"""
self.url = f"{self.url}/fake_subcloud"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.NOT_FOUND, "Strategy step not found"
)
def test_get_steps_fails_with_inexistent_strategy_for_system_controller(self):
"""Test get steps fails with inexistent strategy for system controller"""
self.url = f"{self.url}/{dccommon_consts.SYSTEM_CONTROLLER_NAME}"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.NOT_FOUND, "Strategy step not found"
)
class BaseTestSwUpdateStrategyPost(BaseTestSwUpdateStrategyController):
"""Base test class for post requests"""
def setUp(self):
super().setUp()
self.method = self.app.post_json
class TestSwUpdateStrategyPost(BaseTestSwUpdateStrategyPost):
"""Test class for post requests"""
def setUp(self):
super().setUp()
self.params = {
"type": consts.SW_UPDATE_TYPE_PATCH,
"subcloud-apply-type": consts.SUBCLOUD_APPLY_TYPE_PARALLEL,
"max-parallel-subclouds": "10",
"stop-on-failure": "true"
}
self.mock_rpc_orchestrator_client().\
create_sw_update_strategy.return_value = (
"create_sw_update_strategy", {"payload": self.params}
)
def test_post_succeeds(self):
"""Test post succeeds"""
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_called_once()
def test_post_succeeds_with_force_option(self):
"""Test post succeeds with force option"""
self.params["force"] = "true"
self.params["cloud_name"] = "subcloud1"
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_called_once()
def test_post_fails_with_invalid_type(self):
"""Test post fails with invalid type"""
self.params["type"] = "fake"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "type invalid"
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_invalid_subcloud_apply_type(self):
"""Test post fails with invalid subcloud apply type"""
self.params["subcloud-apply-type"] = "fake"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "subcloud-apply-type invalid"
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_invalid_max_parallel_subclouds(self):
"""Test post fails with invalid max parallel subclouds"""
invalid_values = ["fake", 0, 501, -2]
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["max-parallel-subclouds"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST,
"max-parallel-subclouds invalid", call_count=index
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_invalid_stop_on_failure(self):
"""Test post fails with invalid stop on failure"""
invalid_values = ["fake", ]
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["stop-on-failure"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST,
"stop-on-failure invalid", call_count=index
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_invalid_force(self):
"""Test post fails with invalid force"""
self.params["force"] = "fake"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "force invalid"
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_force_without_cloud_name(self):
"""Test post fails with force without cloud name"""
self.params["force"] = "true"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "The --force option can only be "
"applied for a single subcloud. Please specify the subcloud name."
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_succeeds_with_force_all_types(self):
"""Test post succeeds with force all types
Some strategy types defined in FORCE_ALL_TYPES allow the use of
the force parameter for all subclouds (without specifying the cloud_name)
"""
self.params["type"] = consts.SW_UPDATE_TYPE_KUBERNETES
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_called_once()
def test_post_fails_with_inexistent_subcloud_group_name(self):
"""Test post fails with inexistent subcloud group name"""
del self.params["subcloud-apply-type"]
del self.params["max-parallel-subclouds"]
invalid_values = ["fake", "999"]
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["subcloud_group"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid group_id",
call_count=index
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_cloud_name_and_subcloud_group(self):
"""Test post fails with cloud name and subcloud group"""
del self.params["subcloud-apply-type"]
del self.params["max-parallel-subclouds"]
self.params["cloud_name"] = "subcloud1"
invalid_values = ["group1", "999"]
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["subcloud_group"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "cloud_name and subcloud_group "
"are mutually exclusive", call_count=index
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_subcloud_group_and_other_values(self):
"""Test post fails with subcloud group and other values
The subcloud-apply-type and max-parallel-subclouds should not be used
when subcloud_group is sent
"""
invalid_values = ["group1", "999"]
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["subcloud_group"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "subcloud-apply-type and "
"max-parallel-subclouds are not supported when subcloud_group is "
"applied", call_count=index
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_without_params(self):
"""Test post fails without params"""
self.params = {}
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Body required"
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_without_type(self):
"""Test post fails without type"""
del self.params["type"]
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "type required"
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_not_called()
def test_post_fails_with_rpc_remote_error(self):
"""Test post fails with rpc remote error"""
self.mock_rpc_orchestrator_client().create_sw_update_strategy.side_effect = \
RemoteError("msg", "value")
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.UNPROCESSABLE_ENTITY, "Unable to create strategy "
f"of type '{consts.SW_UPDATE_TYPE_PATCH}': value"
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_called_once()
def test_post_fails_with_rpc_generic_exception(self):
"""Test post fails with rpc generic exception"""
self.mock_rpc_orchestrator_client().create_sw_update_strategy.side_effect = \
Exception()
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.INTERNAL_SERVER_ERROR, "Unable to create strategy"
)
self.mock_rpc_orchestrator_client().create_sw_update_strategy.\
assert_called_once()
class TestSwUpdateStrategyPostActions(BaseTestSwUpdateStrategyPost):
"""Test class for post requests with actions verb"""
def setUp(self):
super().setUp()
self.url = f"{self.url}/actions"
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.return_value = (
"apply_sw_update_strategy", {"update_type": None}
)
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.return_value = (
"abort_sw_update_strategy", {"update_type": None}
)
def test_post_actions_succeeds(self):
"""Test post actions succeeds"""
actions = [consts.SW_UPDATE_ACTION_APPLY, consts.SW_UPDATE_ACTION_ABORT]
for action in actions:
self.params = {"action": action}
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.\
assert_called_once()
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.\
assert_called_once()
def test_post_actions_succeeds_with_type(self):
"""Test post actions succeeds with type"""
self.url = f"{self.url}?type={consts.SW_UPDATE_TYPE_PATCH}"
actions = [consts.SW_UPDATE_ACTION_APPLY, consts.SW_UPDATE_ACTION_ABORT]
for action in actions:
self.params = {"action": action}
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.\
assert_called_once()
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.\
assert_called_once()
def test_post_actions_succeeds_with_inexistent_action(self):
"""Test post actions succeeds with inexistent action
A post request with an inexistent action results in not executing it
"""
self.params = {"action": "fake"}
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.\
assert_not_called()
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.\
assert_not_called()
def test_post_actions_fails_without_action(self):
"""Test post actions fails without action"""
self.params = {"key": "value"}
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "action required"
)
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.\
assert_not_called()
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.\
assert_not_called()
def test_post_actions_fails_with_rpc_remote_error(self):
"""Test post actions fails with rpc remote error"""
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.side_effect = \
RemoteError("msg", "value")
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.side_effect = \
RemoteError("msg", "value")
actions = [consts.SW_UPDATE_ACTION_APPLY, consts.SW_UPDATE_ACTION_ABORT]
for index, action in enumerate(actions, start=1):
self.params = {"action": action}
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.UNPROCESSABLE_ENTITY, f"Unable to {action} "
f"strategy of type 'None': value", call_count=index
)
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.\
assert_called_once()
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.\
assert_called_once()
def test_post_actions_fails_with_rpc_generic_exception(self):
"""Test post actions fails with rpc generic exception"""
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.side_effect = \
Exception()
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.side_effect = \
Exception()
actions = [consts.SW_UPDATE_ACTION_APPLY, consts.SW_UPDATE_ACTION_ABORT]
for index, action in enumerate(actions, start=1):
self.params = {"action": action}
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.INTERNAL_SERVER_ERROR,
f"Unable to {action} strategy", call_count=index
)
self.mock_rpc_orchestrator_client().apply_sw_update_strategy.\
assert_called_once()
self.mock_rpc_orchestrator_client().abort_sw_update_strategy.\
assert_called_once()
class TestSwUpdateStrategyDelete(BaseTestSwUpdateStrategyController):
"""Test class for delete requests"""
def setUp(self):
super().setUp()
self.method = self.app.delete
self.mock_rpc_orchestrator_client().delete_sw_update_strategy.\
return_value = ("delete_sw_update_strategy", {"update_type": None})
def test_delete_succeeds(self):
"""Test delete succeeds"""
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().delete_sw_update_strategy.\
assert_called_once()
def test_delete_succeeds_with_type(self):
"""Test delete succeeds with type"""
self.url = f"{self.url}?type={consts.SW_UPDATE_TYPE_PATCH}"
response = self._send_request()
self._assert_response(response)
self.mock_rpc_orchestrator_client().delete_sw_update_strategy.\
assert_called_once()
def test_delete_fails_with_rpc_remote_error(self):
"""Test delete fails with rpc remote error"""
self.mock_rpc_orchestrator_client().delete_sw_update_strategy.side_effect = \
RemoteError("msg", "value")
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.UNPROCESSABLE_ENTITY,
"Unable to delete strategy of type 'None': value"
)
self.mock_rpc_orchestrator_client().delete_sw_update_strategy.\
assert_called_once()
def test_delete_fails_with_rpc_generic_exception(self):
"""Test delete fails with rpc generic exception"""
self.mock_rpc_orchestrator_client().delete_sw_update_strategy.side_effect = \
Exception()
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.INTERNAL_SERVER_ERROR, "Unable to delete strategy"
)
self.mock_rpc_orchestrator_client().delete_sw_update_strategy.\
assert_called_once()