Merge "Improve unit test coverage for dcmanager's APIs (system_peers)"

This commit is contained in:
Zuul 2024-04-16 21:23:54 +00:00 committed by Gerrit Code Review
commit f30002f332
2 changed files with 698 additions and 316 deletions

View File

@ -1,316 +0,0 @@
# Copyright (c) 2023-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import uuid
import mock
from six.moves import http_client
from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.rpc import client as rpc_client
from dcmanager.tests.unit.api import test_root_controller as testroot
from dcmanager.tests.unit.api.v1.controllers.mixins import APIMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import DeleteMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import GetMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import PostJSONMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import UpdateMixin
from dcmanager.tests import utils
SAMPLE_SYSTEM_PEER_UUID = str(uuid.uuid4())
SAMPLE_SYSTEM_PEER_NAME = 'SystemPeer1'
SAMPLE_MANAGER_ENDPOINT = 'http://127.0.0.1:5000'
SAMPLE_MANAGER_USERNAME = 'admin'
SAMPLE_MANAGER_PASSWORD = 'password'
SAMPLE_ADMINISTRATIVE_STATE = 'enabled'
SAMPLE_HEARTBEAT_INTERVAL = 10
SAMPLE_HEARTBEAT_FAILURE_THRESHOLD = 3
SAMPLE_HEARTBEAT_FAILURES_POLICY = 'alarm'
SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT = 600
SAMPLE_PEER_CONTROLLER_GATEWAY_IP = '128.128.128.1'
class SystemPeerAPIMixin(APIMixin):
API_PREFIX = '/v1.0/system-peers'
RESULT_KEY = 'system_peers'
EXPECTED_FIELDS = ['id',
'peer-uuid',
'peer-name',
'manager-endpoint',
'manager-username',
'peer-controller-gateway-address',
'administrative-state',
'heartbeat-interval',
'heartbeat-failure-threshold',
'heartbeat-failure-policy',
'heartbeat-maintenance-timeout',
'created-at',
'updated-at']
def setUp(self):
super(SystemPeerAPIMixin, self).setUp()
self.fake_rpc_client.some_method = mock.MagicMock()
def _get_test_system_peer_dict(self, data_type, **kw):
# id should not be part of the structure
system_peer = {
'peer_uuid': kw.get('peer_uuid', SAMPLE_SYSTEM_PEER_UUID),
'peer_name': kw.get('peer_name', SAMPLE_SYSTEM_PEER_NAME),
'administrative_state': kw.get('administrative_state',
SAMPLE_ADMINISTRATIVE_STATE),
'heartbeat_interval': kw.get('heartbeat_interval',
SAMPLE_HEARTBEAT_INTERVAL),
'heartbeat_failure_threshold': kw.get(
'heartbeat_failure_threshold', SAMPLE_HEARTBEAT_FAILURE_THRESHOLD),
'heartbeat_failure_policy': kw.get(
'heartbeat_failure_policy', SAMPLE_HEARTBEAT_FAILURES_POLICY),
'heartbeat_maintenance_timeout': kw.get(
'heartbeat_maintenance_timeout',
SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT)
}
if data_type == 'db':
system_peer['endpoint'] = kw.get('manager_endpoint',
SAMPLE_MANAGER_ENDPOINT)
system_peer['username'] = kw.get('manager_username',
SAMPLE_MANAGER_USERNAME)
system_peer['password'] = kw.get('manager_password',
SAMPLE_MANAGER_PASSWORD)
system_peer['gateway_ip'] = kw.get(
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP)
else:
system_peer['manager_endpoint'] = kw.get('manager_endpoint',
SAMPLE_MANAGER_ENDPOINT)
system_peer['manager_username'] = kw.get('manager_username',
SAMPLE_MANAGER_USERNAME)
system_peer['manager_password'] = kw.get('manager_password',
SAMPLE_MANAGER_PASSWORD)
system_peer['peer_controller_gateway_address'] = kw.get(
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP)
return system_peer
def _post_get_test_system_peer(self, **kw):
post_body = self._get_test_system_peer_dict('dict', **kw)
return post_body
# The following methods are required for subclasses of APIMixin
def get_api_prefix(self):
return self.API_PREFIX
def get_result_key(self):
return self.RESULT_KEY
def get_expected_api_fields(self):
return self.EXPECTED_FIELDS
def get_omitted_api_fields(self):
return []
def _create_db_object(self, context, **kw):
creation_fields = self._get_test_system_peer_dict('db', **kw)
return db_api.system_peer_create(context, **creation_fields)
def get_post_object(self):
return self._post_get_test_system_peer()
def get_update_object(self):
update_object = {
'peer_controller_gateway_address': '192.168.205.1'
}
return update_object
# Combine System Peer API with mixins to test post, get, update and delete
class TestSystemPeerPost(testroot.DCManagerApiTest,
SystemPeerAPIMixin, PostJSONMixin):
def setUp(self):
super(TestSystemPeerPost, self).setUp()
def verify_post_failure(self, response):
# Failures will return text rather than JSON
self.assertEqual(response.content_type, 'text/plain')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_create_with_numerical_uuid_fails(self, mock_client):
# A numerical uuid is not permitted. otherwise the 'get' operations
# which support getting by either name or ID could become confused
# if a name for one peer was the same as an ID for another.
ndict = self.get_post_object()
ndict['peer_uuid'] = '123'
response = self.app.post_json(self.get_api_prefix(),
ndict,
headers=self.get_api_headers(),
expect_errors=True)
self.verify_post_failure(response)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_create_with_blank_uuid_fails(self, mock_client):
# An empty name is not permitted
ndict = self.get_post_object()
ndict['peer_uuid'] = ''
response = self.app.post_json(self.get_api_prefix(),
ndict,
headers=self.get_api_headers(),
expect_errors=True)
self.verify_post_failure(response)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_create_with_empty_manager_endpoint_fails(self, mock_client):
# An empty description is considered invalid
ndict = self.get_post_object()
ndict['manager_endpoint'] = ''
response = self.app.post_json(self.get_api_prefix(),
ndict,
headers=self.get_api_headers(),
expect_errors=True)
self.verify_post_failure(response)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_create_with_wrong_manager_endpoint_fails(self, mock_client):
# An empty description is considered invalid
ndict = self.get_post_object()
ndict['manager_endpoint'] = 'ftp://somepath'
response = self.app.post_json(self.get_api_prefix(),
ndict,
headers=self.get_api_headers(),
expect_errors=True)
self.verify_post_failure(response)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_create_with_wrong_peergw_ip_fails(self, mock_client):
# An empty description is considered invalid
ndict = self.get_post_object()
ndict['peer_controller_gateway_address'] = '123'
response = self.app.post_json(self.get_api_prefix(),
ndict,
headers=self.get_api_headers(),
expect_errors=True)
self.verify_post_failure(response)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_create_with_bad_administrative_state(self, mock_client):
# update_apply_type must be either 'enabled' or 'disabled'
ndict = self.get_post_object()
ndict['administrative_state'] = 'something_invalid'
response = self.app.post_json(self.get_api_prefix(),
ndict,
headers=self.get_api_headers(),
expect_errors=True)
self.verify_post_failure(response)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_create_with_bad_heartbeat_interval(self, mock_client):
# heartbeat_interval must be an integer between 1 and 600
ndict = self.get_post_object()
# All the entries in bad_values should be considered invalid
bad_values = [0, 601, -1, 'abc']
for bad_value in bad_values:
ndict['heartbeat_interval'] = bad_value
response = self.app.post_json(self.get_api_prefix(),
ndict,
headers=self.get_api_headers(),
expect_errors=True)
self.verify_post_failure(response)
class TestSystemPeerGet(testroot.DCManagerApiTest,
SystemPeerAPIMixin, GetMixin):
def setUp(self):
super(TestSystemPeerGet, self).setUp()
@mock.patch.object(rpc_client, 'ManagerClient')
def test_get_single_by_uuid(self, mock_client):
# create a system peer
context = utils.dummy_context()
peer_uuid = str(uuid.uuid4())
self._create_db_object(context, peer_uuid=peer_uuid)
# Test that a GET operation for a valid ID works
response = self.app.get(self.get_single_url(peer_uuid),
headers=self.get_api_headers())
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.OK)
self.validate_entry(response.json)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_get_single_by_name(self, mock_client):
# create a system peer
context = utils.dummy_context()
peer_name = 'TestPeer'
self._create_db_object(context, peer_name=peer_name)
# Test that a GET operation for a valid ID works
response = self.app.get(self.get_single_url(peer_name),
headers=self.get_api_headers())
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.OK)
self.validate_entry(response.json)
class TestSystemPeerUpdate(testroot.DCManagerApiTest,
SystemPeerAPIMixin, UpdateMixin):
def setUp(self):
super(TestSystemPeerUpdate, self).setUp()
def validate_updated_fields(self, sub_dict, full_obj):
for key, value in sub_dict.items():
key = key.replace('_', '-')
self.assertEqual(value, full_obj.get(key))
@mock.patch.object(rpc_client, 'ManagerClient')
def test_update_invalid_administrative_state(self, mock_client):
context = utils.dummy_context()
single_obj = self._create_db_object(context)
update_data = {
'administrative_state': 'something_bad'
}
response = self.app.patch_json(self.get_single_url(single_obj.id),
headers=self.get_api_headers(),
params=update_data,
expect_errors=True)
# Failures will return text rather than json
self.assertEqual(response.content_type, 'text/plain')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_update_invalid_heartbeat_interval(self, mock_client):
context = utils.dummy_context()
single_obj = self._create_db_object(context)
update_data = {
'heartbeat_interval': -1
}
response = self.app.patch_json(self.get_single_url(single_obj.id),
headers=self.get_api_headers(),
params=update_data,
expect_errors=True)
# Failures will return text rather than json
self.assertEqual(response.content_type, 'text/plain')
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
class TestSystemPeerDelete(testroot.DCManagerApiTest,
SystemPeerAPIMixin, DeleteMixin):
def setUp(self):
super(TestSystemPeerDelete, self).setUp()
@mock.patch.object(rpc_client, 'ManagerClient')
def test_delete_by_uuid(self, mock_client):
context = utils.dummy_context()
peer_uuid = str(uuid.uuid4())
self._create_db_object(context, peer_uuid=peer_uuid)
response = self.app.delete_json(self.get_single_url(peer_uuid),
headers=self.get_api_headers())
self.assertEqual(response.status_int, 200)
@mock.patch.object(rpc_client, 'ManagerClient')
def test_delete_by_name(self, mock_client):
context = utils.dummy_context()
peer_name = 'TestPeer'
self._create_db_object(context, peer_name=peer_name)
response = self.app.delete_json(self.get_single_url(peer_name),
headers=self.get_api_headers())
self.assertEqual(response.status_int, 200)

View File

@ -0,0 +1,698 @@
# Copyright (c) 2023-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import http.client
import json
import uuid
import mock
from oslo_messaging import RemoteError
from dcmanager.api.controllers.v1 import system_peers
from dcmanager.common import consts
from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.tests.unit.api.test_root_controller import DCManagerApiTest
from dcmanager.tests.unit.api.v1.controllers.mixins import APIMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import DeleteMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import GetMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import PostJSONMixin
from dcmanager.tests.unit.api.v1.controllers.mixins import UpdateMixin
from dcmanager.tests.unit.common import fake_subcloud
SAMPLE_SYSTEM_PEER_UUID = str(uuid.uuid4())
SAMPLE_SYSTEM_PEER_NAME = 'SystemPeer1'
SAMPLE_MANAGER_ENDPOINT = 'http://127.0.0.1:5000'
SAMPLE_MANAGER_USERNAME = 'admin'
SAMPLE_MANAGER_PASSWORD = 'password'
SAMPLE_ADMINISTRATIVE_STATE = 'enabled'
SAMPLE_HEARTBEAT_INTERVAL = 10
SAMPLE_HEARTBEAT_FAILURE_THRESHOLD = 3
SAMPLE_HEARTBEAT_FAILURES_POLICY = 'alarm'
SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT = 600
SAMPLE_PEER_CONTROLLER_GATEWAY_IP = '128.128.128.1'
class SystemPeersAPIMixin(APIMixin):
API_PREFIX = '/v1.0/system-peers'
RESULT_KEY = 'system_peers'
EXPECTED_FIELDS = [
'id', 'peer-uuid', 'peer-name', 'manager-endpoint', 'manager-username',
'peer-controller-gateway-address', 'administrative-state',
'heartbeat-interval', 'heartbeat-failure-threshold',
'heartbeat-failure-policy', 'heartbeat-maintenance-timeout', 'created-at',
'updated-at'
]
def _get_test_system_peer_dict(self, data_type, **kw):
# id should not be part of the structure
system_peer = {
'peer_uuid': kw.get('peer_uuid', SAMPLE_SYSTEM_PEER_UUID),
'peer_name': kw.get('peer_name', SAMPLE_SYSTEM_PEER_NAME),
'administrative_state': kw.get(
'administrative_state', SAMPLE_ADMINISTRATIVE_STATE
),
'heartbeat_interval': kw.get(
'heartbeat_interval', SAMPLE_HEARTBEAT_INTERVAL
),
'heartbeat_failure_threshold': kw.get(
'heartbeat_failure_threshold', SAMPLE_HEARTBEAT_FAILURE_THRESHOLD
),
'heartbeat_failure_policy': kw.get(
'heartbeat_failure_policy', SAMPLE_HEARTBEAT_FAILURES_POLICY
),
'heartbeat_maintenance_timeout': kw.get(
'heartbeat_maintenance_timeout', SAMPLE_HEARTBEAT_MAINTENANCE_TIMEOUT
)
}
if data_type == 'db':
system_peer['endpoint'] = \
kw.get('manager_endpoint', SAMPLE_MANAGER_ENDPOINT)
system_peer['username'] = \
kw.get('manager_username', SAMPLE_MANAGER_USERNAME)
system_peer['password'] = \
kw.get('manager_password', SAMPLE_MANAGER_PASSWORD)
system_peer['gateway_ip'] = kw.get(
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP
)
else:
system_peer['manager_endpoint'] = \
kw.get('manager_endpoint', SAMPLE_MANAGER_ENDPOINT)
system_peer['manager_username'] = \
kw.get('manager_username', SAMPLE_MANAGER_USERNAME)
system_peer['manager_password'] = \
kw.get('manager_password', SAMPLE_MANAGER_PASSWORD)
system_peer['peer_controller_gateway_address'] = kw.get(
'peer_controller_gateway_ip', SAMPLE_PEER_CONTROLLER_GATEWAY_IP
)
return system_peer
def _post_get_test_system_peer(self, **kw):
return self._get_test_system_peer_dict('dict', **kw)
# The following methods are required for subclasses of APIMixin
def get_api_prefix(self):
return self.API_PREFIX
def get_result_key(self):
return self.RESULT_KEY
def get_expected_api_fields(self):
return self.EXPECTED_FIELDS
def get_omitted_api_fields(self):
return []
def _create_db_object(self, context, **kw):
creation_fields = self._get_test_system_peer_dict('db', **kw)
return db_api.system_peer_create(context, **creation_fields)
def get_post_object(self):
return self._post_get_test_system_peer()
def get_update_object(self):
return {'peer_controller_gateway_address': '192.168.205.1'}
class SystemPeersPropertiesValidationMixin(object):
"""Specifies common test cases to validate payload properties in requests"""
def _remove_empty_string_in_patch_request(self, invalid_values):
"""Removes the empty string in patch requests
When the request method is patch, the properties can be sent as empty string
values, which does not happen in post requests. Because of that, it's
necessary to remove it from the validated values.
"""
if self.method == self.app.patch_json:
invalid_values.remove("")
def test_request_fails_without_payload(self):
"""Test request fails without payload"""
self.params = {}
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Body required"
)
@mock.patch.object(json, 'loads')
def test_request_fails_with_json_loads_exception(self, mock_json_loads):
"""Test request fails with json loads exception"""
mock_json_loads.side_effect = Exception()
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Request body is malformed."
)
mock_json_loads.assert_called_once()
def test_request_fails_with_invalid_payload(self):
"""Test request fails with invalid payload"""
self.params = "invalid"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid request body format"
)
def test_request_fails_with_invalid_uuid(self):
"""Test request fails with invalid uuid
A numerical uuid is not permitted. Otherwise, the 'get' operation which
supports getting a system peer by either name or ID could become confusing
if the name for a peer was the same as the ID for another.
"""
invalid_values = ["", "999"]
self._remove_empty_string_in_patch_request(invalid_values)
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["peer_uuid"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid peer uuid", index
)
def test_request_fails_with_invalid_name(self):
"""Test request fails with invalid name"""
invalid_values = ["", "999", "a" * 256, ".*+?|()[]{}^$"]
self._remove_empty_string_in_patch_request(invalid_values)
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["peer_name"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid peer name", index
)
def test_request_fails_with_invalid_manager_endpoint(self):
"""Test request fails with invalid manager endpoint"""
invalid_values = [
"", "ftp://somepath",
"a" * system_peers.MAX_SYSTEM_PEER_MANAGER_ENDPOINT_LEN
]
self._remove_empty_string_in_patch_request(invalid_values)
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["manager_endpoint"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid peer manager_endpoint",
call_count=index
)
def test_request_fails_with_invalid_manager_username(self):
"""Test request fails with invalid manager username"""
invalid_values = [
"", "a" * system_peers.MAX_SYSTEM_PEER_MANAGER_USERNAME_LEN
]
self._remove_empty_string_in_patch_request(invalid_values)
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["manager_username"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid peer manager_username",
call_count=index
)
def test_request_fails_with_invalid_manager_password(self):
"""Test request fails with invalid manager password"""
invalid_values = [
"", "a" * system_peers.MAX_SYSTEM_PEER_MANAGER_PASSWORD_LEN
]
self._remove_empty_string_in_patch_request(invalid_values)
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["manager_password"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid peer manager_password",
call_count=index
)
def test_request_fails_with_invalid_peer_controller_gateway_address(self):
"""Test request fails with invalid peer controller gateway address"""
invalid_values = [
"", "a" * system_peers.MAX_SYSTEM_PEER_STRING_DEFAULT_LEN,
"192.168.0.0.1"
]
self._remove_empty_string_in_patch_request(invalid_values)
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["peer_controller_gateway_address"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST,
"Invalid peer peer_controller_gateway_address", call_count=index
)
def test_request_fails_with_invalid_administrative_state(self):
"""Test request fails with invalid administrative state
The administrative state must be either enabled or disabled.
"""
self.params["administrative_state"] = "fake"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid peer administrative_state"
)
def test_request_fails_with_invalid_heartbeat_interval(self):
"""Test request fails with invalid heartbeat interval
The heartbeat interval must be between 1 and 600.
"""
invalid_values = [
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_INTERVAL - 1,
system_peers.MAX_SYSTEM_PEER_HEARTBEAT_INTERVAL + 1,
-1, "fake"
]
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["heartbeat_interval"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "Invalid peer heartbeat_interval",
call_count=index
)
def test_request_fails_with_invalid_heartbeat_failure_threshold(self):
"""Test request fails with invalid heartbeat failure threshold"""
invalid_values = [
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_FAILURE_THRESHOLD - 1,
system_peers.MAX_SYSTEM_PEER_HEARTBEAT_FAILURE_THRESHOLD + 1,
-1, "fake"
]
# When the request method is patch, the invalid_value 0 results in the if
# condition returning false as if a value was not sent. Because of that,
# it needs to be removed from the validation.
if self.method == self.app.patch_json:
invalid_values.remove(
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_FAILURE_THRESHOLD - 1
)
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["heartbeat_failure_threshold"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST,
"Invalid peer heartbeat_failure_threshold", call_count=index
)
def test_request_fails_with_invalid_heartbeat_failure_policy(self):
"""Test request fails with invalid heartbeat failure policy
The heartbeat failure policy must be either alarm, rehome or delegate.
"""
self.params["heartbeat_failure_policy"] = "fake"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST,
"Invalid peer heartbeat_failure_policy"
)
def test_request_fails_with_invalid_heartbeat_maintenance_timeout(self):
"""Test request fails with invalid heartbeat maintenance timeout"""
invalid_values = [
system_peers.MIN_SYSTEM_PEER_HEARTBEAT_MAINTENACE_TIMEOUT - 1,
system_peers.MAX_SYSTEM_PEER_HEARTBEAT_MAINTENACE_TIMEOUT + 1,
-1, "fake"
]
for index, invalid_value in enumerate(invalid_values, start=1):
self.params["heartbeat_maintenance_timeout"] = invalid_value
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST,
"Invalid peer heartbeat_maintenance_timeout", call_count=index
)
class BaseTestSystemPeersController(DCManagerApiTest, SystemPeersAPIMixin):
"""Base class for testing the SystemPeersController"""
def setUp(self):
super().setUp()
self.url = "/v1.0/system-peers"
self._mock_rpc_client()
class TestSystemPeersController(BaseTestSystemPeersController):
"""Test class for SystemPeersController"""
def setUp(self):
super().setUp()
def test_unmapped_method(self):
"""Test requesting an unmapped method results in success with null content"""
self.method = self.app.put
response = self._send_request()
self._assert_response(response)
self.assertEqual(response.text, "null")
class TestSystemPeersGet(BaseTestSystemPeersController, GetMixin):
"""Test class for get requests"""
def setUp(self):
super().setUp()
self.method = self.app.get
# TODO(rlima): update the GetMixin to create the object in setUp rather
# than in the test case and update this class. This should be done in
# all of the classes
def _assert_response_content(self, response):
"""Assert the response content from get requests
The database returned values use _ while the returned dict from the API
response return values with -.
"""
for key, value in self.system_peer.items():
key = key.replace("_", "-")
if key == "created-at" or key == "updated-at" or "deleted-at":
continue
self.assertEqual(response.json[key], value)
def test_get_succeeds_by_id(self):
"""Test get succeeds by id"""
self.system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{self.system_peer.id}"
response = self._send_request()
self._assert_response(response)
self._assert_response_content(response)
def test_get_succeeds_by_name(self):
"""Test get succeeds by name"""
self.system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{self.system_peer.peer_name}"
response = self._send_request()
self._assert_response(response)
self._assert_response_content(response)
def test_get_succeeds_with_subcloud_peer_groups(self):
"""Test get succeeds with subcloud peer groups"""
self.system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{self.system_peer.peer_name}/True"
response = self._send_request()
self._assert_response(response)
self.assertTrue("subcloud_peer_groups" in response.json)
self.assertEqual(response.json["subcloud_peer_groups"], [])
class TestSystemPeersPost(
BaseTestSystemPeersController, SystemPeersPropertiesValidationMixin,
PostJSONMixin
):
"""Test class for post requests"""
def setUp(self):
super().setUp()
self.method = self.app.post_json
self.params = self.get_post_object()
def test_post_fails_with_db_api_duplicate_entry_exception(self):
"""Test post fails with db api duplicate entry exception"""
response = self._send_request()
self._assert_response(response)
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.CONFLICT,
"A system peer with this UUID already exists"
)
@mock.patch.object(db_api, "system_peer_create")
def test_post_fails_with_db_api_remote_error(self, mock_db_api):
"""Test post fails with db api remote error"""
mock_db_api.side_effect = RemoteError("msg", "value")
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.UNPROCESSABLE_ENTITY, "value"
)
@mock.patch.object(db_api, "system_peer_create")
def test_post_fails_with_db_api_generic_exception(self, mock_db_api):
"""Test post fails with db api generic exception"""
mock_db_api.side_effect = Exception()
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.INTERNAL_SERVER_ERROR,
"Unable to create system peer"
)
class BaseTestSystemPeersPatch(BaseTestSystemPeersController):
"""Base test class for patch requests"""
def setUp(self):
super().setUp()
self.method = self.app.patch_json
self.params = self.get_post_object()
class TestSystemPeersPatchPropertiesValidation(
BaseTestSystemPeersPatch, SystemPeersPropertiesValidationMixin
):
"""Test class for validating the payload properties in patch requests"""
def setUp(self):
super().setUp()
self.system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{self.system_peer.peer_uuid}"
class TestSystemPeersPatch(BaseTestSystemPeersPatch, UpdateMixin):
"""Test class for patch requests"""
def setUp(self):
super().setUp()
# Overrides validate_updated_fields from UpdateMixin
def validate_updated_fields(self, sub_dict, full_obj):
for key, value in sub_dict.items():
key = key.replace('_', '-')
self.assertEqual(value, full_obj.get(key))
def test_patch_fails_with_inexistent_system_peer(self):
"""Test patch fails with inexistent system peer"""
self.url = f"{self.url}/9999"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.NOT_FOUND, "System Peer not found"
)
def test_patch_fails_without_properties_to_update(self):
"""Test patch fails without properties to update"""
system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{system_peer.peer_uuid}"
self.params = {"key": "value"}
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST, "nothing to update"
)
@mock.patch.object(db_api, "system_peer_update")
def test_patch_fails_with_db_api_remote_error(self, mock_db_api):
"""Test patch fails with db api remote error"""
system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{system_peer.peer_uuid}"
mock_db_api.side_effect = RemoteError("msg", "value")
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.UNPROCESSABLE_ENTITY, "value"
)
@mock.patch.object(db_api, "system_peer_update")
def test_patch_fails_with_db_api_generic_exception(self, mock_db_api):
"""Test patch fails with db api generic exception"""
system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{system_peer.peer_uuid}"
mock_db_api.side_effect = Exception()
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.INTERNAL_SERVER_ERROR,
"Unable to update system peer"
)
class TestSystemPeersDelete(BaseTestSystemPeersController, DeleteMixin):
"""Test class for delete requests"""
def setUp(self):
super().setUp()
self.method = self.app.delete
def test_delete_succeeds_by_id(self):
"""Test delete succeeds by id"""
system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{system_peer.peer_uuid}"
response = self._send_request()
self._assert_response(response)
self.assertEqual(len(db_api.system_peer_get_all(self.ctx)), 0)
def test_delete_succeeds_by_name(self):
"""Test delete succeeds by name"""
system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{system_peer.peer_name}"
response = self._send_request()
self._assert_response(response)
self.assertEqual(len(db_api.system_peer_get_all(self.ctx)), 0)
def test_delete_fails_with_existing_association(self):
"""Test delete fails with existing association"""
system_peer = self._create_db_object(self.ctx)
subcloud = fake_subcloud.create_fake_subcloud(self.ctx)
db_api.peer_group_association_create(
self.ctx, subcloud.peer_group_id, system_peer.id,
consts.PEER_GROUP_PRIMARY_PRIORITY, consts.ASSOCIATION_TYPE_PRIMARY,
consts.ASSOCIATION_SYNC_STATUS_IN_SYNC, "None"
)
self.url = f"{self.url}/{system_peer.peer_uuid}"
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.BAD_REQUEST,
"Cannot delete a system peer which is associated with peer group."
)
@mock.patch.object(db_api, "system_peer_destroy")
def test_delete_fails_with_db_api_remote_error(self, mock_db_api):
"""Test delete fails with db api remote error"""
system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{system_peer.peer_uuid}"
mock_db_api.side_effect = RemoteError("msg", "value")
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.UNPROCESSABLE_ENTITY, "value"
)
@mock.patch.object(db_api, "system_peer_destroy")
def test_delete_fails_with_db_api_generic_exception(self, mock_db_api):
"""Test delete fails with db api generic exception"""
system_peer = self._create_db_object(self.ctx)
self.url = f"{self.url}/{system_peer.peer_uuid}"
mock_db_api.side_effect = Exception()
response = self._send_request()
self._assert_pecan_and_response(
response, http.client.INTERNAL_SERVER_ERROR,
"Unable to delete system peer"
)