Merge "Enable pylint checks for unsubscriptable-object"
This commit is contained in:
commit
6123f0fa9c
|
@ -77,14 +77,12 @@ load-plugins=
|
|||
# E1120: no-value-for-parameter
|
||||
# E1121: too-many-function-args
|
||||
# E1124: redundant-keyword-arg
|
||||
# E1136: unsubscriptable-object
|
||||
# E1205: logging-too-many-args
|
||||
disable=C, R, fixme, W0101, W0105, W0106, W0107, W0108, W0110, W0123, W0150,
|
||||
W0201, W0211, W0212, W0221, W0223, W0231, W0235, W0311, W0402, W0403, W0404,
|
||||
W0603, W0612, W0613, W0621, W0622, W0631, W0632, W0701, W0703,
|
||||
W1113, W1201, W1401, W1505,
|
||||
E0202, E0203, E0213, E0401, E0604, E0611, E0633, E0701,
|
||||
E1101, E1102, E1120, E1121, E1124, E1136, E1205
|
||||
E0213, E0401, E0604, E0611, E0633, E0701,
|
||||
E1101, E1102, E1120, E1121, E1124
|
||||
|
||||
[REPORTS]
|
||||
# Set the output format. Available formats are text, parseable, colorized, msvs
|
||||
|
|
|
@ -181,6 +181,7 @@ class PartitionController(rest.RestController):
|
|||
pecan.request.context,
|
||||
marker)
|
||||
|
||||
partitions = []
|
||||
if self._from_ihosts and self._from_idisk:
|
||||
partitions = pecan.request.dbapi.partition_get_by_idisk(
|
||||
disk_uuid,
|
||||
|
@ -301,7 +302,7 @@ class PartitionController(rest.RestController):
|
|||
return Partition.convert_with_links(rpc_partition)
|
||||
except exception.HTTPNotFound:
|
||||
msg = _("Partition update failed: host %s partition %s : patch %s"
|
||||
% (ihost['hostname'], partition['device_path'], patch))
|
||||
% (ihost['hostname'], partition.device_path, patch))
|
||||
raise wsme.exc.ClientSideError(msg)
|
||||
|
||||
@cutils.synchronized(LOCK_NAME)
|
||||
|
@ -379,15 +380,15 @@ def _partition_pre_patch_checks(partition_obj, patch_obj, host_obj):
|
|||
if not cutils.is_int_like(p['value']):
|
||||
raise wsme.exc.ClientSideError(
|
||||
_("Requested partition size must be an integer "
|
||||
"greater than 0: %s GiB") % p['value'] / 1024)
|
||||
"greater than 0: %s ") % p['value'])
|
||||
if int(p['value']) <= 0:
|
||||
raise wsme.exc.ClientSideError(
|
||||
_("Requested partition size must be an integer "
|
||||
"greater than 0: %s GiB") % p['value'] / 1024)
|
||||
"greater than 0: %s GiB") % (int(p['value']) / 1024))
|
||||
if int(p['value']) <= partition_obj.size_mib:
|
||||
raise wsme.exc.ClientSideError(
|
||||
_("Requested partition size must be larger than current "
|
||||
"size: %s GiB <= %s GiB") % (p['value'] / 1024,
|
||||
"size: %s GiB <= %s GiB") % (int(p['value']) / 1024,
|
||||
math.floor(float(partition_obj.size_mib) / 1024 * 1000) / 1000.0))
|
||||
|
||||
|
||||
|
|
|
@ -334,7 +334,7 @@ class PVController(rest.RestController):
|
|||
raise exception.PatchError(patch=patch, reason=e)
|
||||
|
||||
# Semantic Checks
|
||||
_check("modify", pv)
|
||||
_check("modify", pv.as_dict())
|
||||
try:
|
||||
# Update only the fields that have changed
|
||||
for field in objects.pv.fields:
|
||||
|
@ -346,7 +346,7 @@ class PVController(rest.RestController):
|
|||
return PV.convert_with_links(rpc_pv)
|
||||
except exception.HTTPNotFound:
|
||||
msg = _("PV update failed: host %s pv %s : patch %s"
|
||||
% (ihost['hostname'], pv['lvm_pv_name'], patch))
|
||||
% (ihost['hostname'], pv.lvm_pv_name, patch))
|
||||
raise wsme.exc.ClientSideError(msg)
|
||||
|
||||
@cutils.synchronized(LOCK_NAME)
|
||||
|
|
|
@ -0,0 +1,461 @@
|
|||
#
|
||||
# Copyright (c) 2013-2019 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
"""
|
||||
Tests for the API /ipartitions/ methods.
|
||||
|
||||
Future Work Items:
|
||||
Add API links tests
|
||||
Add partition list scoped by disk tests
|
||||
(Some of these will fail unless code changes are made)
|
||||
"""
|
||||
|
||||
import mock
|
||||
import webtest.app
|
||||
from six.moves import http_client
|
||||
|
||||
from sysinv.common import constants
|
||||
from sysinv.common.exception import HTTPNotFound
|
||||
from sysinv.openstack.common import uuidutils
|
||||
|
||||
from sysinv.tests.api import base
|
||||
from sysinv.tests.db import utils as dbutils
|
||||
|
||||
|
||||
class FakeConductorAPI(object):
|
||||
|
||||
def __init__(self, dbapi):
|
||||
self.dbapi = dbapi
|
||||
self.create_controller_filesystems = mock.MagicMock()
|
||||
|
||||
# By configuring the host as provisioned, the following must be mocked
|
||||
self.update_partition_config = mock.MagicMock()
|
||||
|
||||
|
||||
class TestPartition(base.FunctionalTest):
|
||||
|
||||
# API_PREFIX is the prefix for the URL
|
||||
API_PREFIX = '/partitions'
|
||||
|
||||
# can perform API operations on partitions at a sublevel of host
|
||||
HOST_PREFIX = '/ihosts'
|
||||
|
||||
# RESULT_KEY is the python table key for the list of results
|
||||
RESULT_KEY = 'partitions'
|
||||
|
||||
# API_HEADERS are a generic header passed to most API calls
|
||||
API_HEADERS = {'User-Agent': 'sysinv-test'}
|
||||
|
||||
disk_device_path = '/dev/disk/by-path/pci-0000:00:0d.0-ata-1.0'
|
||||
partition_device_path = '/dev/disk/by-path/pci-0000:00:0d.0-ata-1.1'
|
||||
|
||||
def setUp(self):
|
||||
super(TestPartition, self).setUp()
|
||||
|
||||
# Mock the conductor API
|
||||
self.fake_conductor_api = FakeConductorAPI(self.dbapi)
|
||||
p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI')
|
||||
self.mock_conductor_api = p.start()
|
||||
self.mock_conductor_api.return_value = self.fake_conductor_api
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
# Behave as if the API is running on controller-0
|
||||
p = mock.patch('socket.gethostname')
|
||||
self.mock_socket_gethostname = p.start()
|
||||
self.mock_socket_gethostname.return_value = 'controller-0'
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
# Behave as if running on a virtual system
|
||||
p = mock.patch('sysinv.common.utils.is_virtual')
|
||||
self.mock_utils_is_virtual = p.start()
|
||||
self.mock_utils_is_virtual.return_value = True
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
# Create an isystem and load
|
||||
self.system = dbutils.create_test_isystem(
|
||||
capabilities={"cinder_backend": constants.CINDER_BACKEND_CEPH,
|
||||
"vswitch_type": constants.VSWITCH_TYPE_NONE,
|
||||
"region_config": False,
|
||||
"sdn_enabled": False,
|
||||
"shared_services": "[]"}
|
||||
)
|
||||
self.load = dbutils.create_test_load()
|
||||
# Create controller-0
|
||||
self.ihost = self._create_controller_0()
|
||||
self.disk = self._create_disk(self.ihost.id)
|
||||
|
||||
def get_single_url(self, uuid):
|
||||
return '%s/%s' % (self.API_PREFIX, uuid)
|
||||
|
||||
def get_host_scoped_url(self, host_uuid):
|
||||
return '%s/%s%s' % (self.HOST_PREFIX, host_uuid, self.API_PREFIX)
|
||||
|
||||
def _create_controller_0(self, **kw):
|
||||
# The host must be provisioned in order to perform PATCH operations
|
||||
ihost = dbutils.create_test_ihost(
|
||||
hostname='controller-0',
|
||||
mgmt_mac='01:34:67:9A:CD:F0',
|
||||
mgmt_ip='192.168.204.3',
|
||||
serialid='serial1',
|
||||
bm_ip='128.224.150.193',
|
||||
invprovision=constants.PROVISIONED,
|
||||
config_target='e4ec5ee2-967d-4b2d-8de8-f0a390fcbd35',
|
||||
config_applied='e4ec5ee2-967d-4b2d-8de8-f0a390fcbd35',
|
||||
**kw)
|
||||
return ihost
|
||||
|
||||
def _create_disk(self, ihost_id):
|
||||
return dbutils.create_test_idisk(
|
||||
device_node='/dev/sda',
|
||||
device_path=self.disk_device_path,
|
||||
available_mib=256,
|
||||
forihostid=ihost_id)
|
||||
|
||||
|
||||
class TestPostPartition(TestPartition):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPostPartition, self).setUp()
|
||||
|
||||
def test_create_partition(self):
|
||||
# Test creation of partition
|
||||
ndict = dbutils.post_get_test_partition(forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
size_mib=128)
|
||||
response = self.post_json(self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
# Verify that no filesystems were created
|
||||
self.fake_conductor_api.create_controller_filesystems.\
|
||||
assert_not_called()
|
||||
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertEqual(response.status_code, http_client.OK)
|
||||
self.assertEqual(response.json['size_mib'], ndict['size_mib'])
|
||||
|
||||
uuid = response.json['uuid']
|
||||
# Verify that the partition was created and some basic attributes match
|
||||
response = self.get_json(self.get_single_url(uuid))
|
||||
self.assertEqual(response['size_mib'], ndict['size_mib'])
|
||||
|
||||
def test_create_partition_invalid_host(self):
|
||||
# Test creation of partition with an invalid host
|
||||
ndict = dbutils.post_get_test_partition(forihostid=1234567,
|
||||
idisk_id=self.disk.id,
|
||||
size_mib=128)
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.post_json,
|
||||
self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
def test_create_partition_invalid_disk(self):
|
||||
# Test creation of partition with an invalid disk
|
||||
ndict = dbutils.post_get_test_partition(forihostid=self.ihost.id,
|
||||
idisk_id=1234567,
|
||||
size_mib=128)
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.post_json,
|
||||
self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
def test_create_partition_invalid_size(self):
|
||||
# Test creation of partition with an invalid disk
|
||||
|
||||
# Available size is 256. Therefore a 256 partition is considered invalid.
|
||||
invalid_sizes = [None, 0, -100, 256, 257, 'xyz']
|
||||
for bad_size in invalid_sizes:
|
||||
ndict = dbutils.post_get_test_partition(forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
size_mib=bad_size)
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.post_json,
|
||||
self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
def test_create_partition_invalid_additional_attributes(self):
|
||||
# Test creation of partition with an invalid attribute called 'foo'
|
||||
ndict = dbutils.post_get_test_partition(forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
foo='some value')
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.post_json,
|
||||
self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
|
||||
class TestDeletePartition(TestPartition):
|
||||
""" Tests deletion of partitions.
|
||||
Typically delete APIs return NO CONTENT.
|
||||
python2 and python3 libraries may return different
|
||||
content_type (None, or empty json) when NO_CONTENT returned.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeletePartition, self).setUp()
|
||||
# create a partition
|
||||
self.partition = dbutils.create_test_partition(
|
||||
forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
size_mib=128)
|
||||
|
||||
def test_delete_partition(self):
|
||||
# Delete the partition
|
||||
uuid = self.partition.uuid
|
||||
response = self.delete(self.get_single_url(uuid),
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
# Verify the expected API response for the delete
|
||||
self.assertEqual(response.status_code, http_client.NO_CONTENT)
|
||||
|
||||
# Verify that the partition was deleted
|
||||
response = self.get_json(self.get_single_url(uuid),
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.status_code, http_client.NOT_FOUND)
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
def test_double_delete_partition(self):
|
||||
# Delete the partition
|
||||
uuid = self.partition.uuid
|
||||
response = self.delete(self.get_single_url(uuid),
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
# Verify the expected API response for the delete
|
||||
self.assertEqual(response.status_code, http_client.NO_CONTENT)
|
||||
|
||||
# Verify that the partition was deleted
|
||||
response = self.get_json(self.get_single_url(uuid),
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.status_code, http_client.NOT_FOUND)
|
||||
|
||||
# Attempt to delete the partition again. This should fail.
|
||||
response = self.delete(self.get_single_url(uuid),
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.status_code, http_client.NOT_FOUND)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
|
||||
class TestListPartitions(TestPartition):
|
||||
""" Partition list operations can only performed on
|
||||
a host or disk.
|
||||
Only user generated partitions are queryable.
|
||||
ie: type_guid=constants.USER_PARTITION_PHYSICAL_VOLUME
|
||||
"""
|
||||
expected_api_fields = ['uuid',
|
||||
'capabilities',
|
||||
'created_at',
|
||||
'device_node',
|
||||
'device_path',
|
||||
'end_mib',
|
||||
'idisk_uuid',
|
||||
'ihost_uuid',
|
||||
'ipv_uuid',
|
||||
'links',
|
||||
'size_mib',
|
||||
'start_mib',
|
||||
'status',
|
||||
'type_guid',
|
||||
'type_name',
|
||||
'updated_at']
|
||||
|
||||
hidden_api_fields = ['forihostid',
|
||||
'idisk_id',
|
||||
'foripvid']
|
||||
|
||||
def setUp(self):
|
||||
super(TestListPartitions, self).setUp()
|
||||
|
||||
def test_empty_list(self):
|
||||
response = self.get_json(self.API_PREFIX)
|
||||
self.assertEqual([], response[self.RESULT_KEY])
|
||||
|
||||
def test_unscoped_list_returns_empty(self):
|
||||
# create a partition
|
||||
self.partition = dbutils.create_test_partition(
|
||||
forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
type_guid=constants.USER_PARTITION_PHYSICAL_VOLUME,
|
||||
size_mib=128)
|
||||
|
||||
# Querying the base URL (unscoped)
|
||||
response = self.get_json(self.API_PREFIX)
|
||||
self.assertEqual(0, len(response[self.RESULT_KEY]))
|
||||
|
||||
def assert_fields(self, api_object):
|
||||
# check the uuid is a uuid
|
||||
assert(uuidutils.is_uuid_like(api_object['uuid']))
|
||||
|
||||
# Verify that expected attributes are returned
|
||||
for field in self.expected_api_fields:
|
||||
self.assertIn(field, api_object)
|
||||
|
||||
# Verify that hidden attributes are not returned
|
||||
for field in self.hidden_api_fields:
|
||||
self.assertNotIn(field, api_object)
|
||||
|
||||
def test_single_entry_by_host_list(self):
|
||||
expected_size = 32
|
||||
# create a partition
|
||||
self.partition = dbutils.create_test_partition(
|
||||
forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
type_guid=constants.USER_PARTITION_PHYSICAL_VOLUME,
|
||||
size_mib=expected_size)
|
||||
|
||||
# Querying the URL scoped by host
|
||||
response = self.get_json(self.get_host_scoped_url(self.ihost.uuid))
|
||||
|
||||
self.assertEqual(1, len(response[self.RESULT_KEY]))
|
||||
# Check the single result
|
||||
for result in response[self.RESULT_KEY]:
|
||||
# check fields are appropriate
|
||||
self.assert_fields(result)
|
||||
# check that the partition was created with the input size
|
||||
self.assertEqual(expected_size, result['size_mib'])
|
||||
|
||||
def test_many_entries_in_list(self):
|
||||
result_list = []
|
||||
for obj_id in range(100):
|
||||
partition = dbutils.create_test_partition(
|
||||
id=obj_id,
|
||||
forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
type_guid=constants.USER_PARTITION_PHYSICAL_VOLUME,
|
||||
size_mib=1)
|
||||
result_list.append(partition['uuid'])
|
||||
|
||||
response = self.get_json(self.get_host_scoped_url(self.ihost.uuid))
|
||||
self.assertEqual(len(result_list), len(response[self.RESULT_KEY]))
|
||||
|
||||
# Verify that the sorted list of uuids is the same
|
||||
uuids = [n['uuid'] for n in response[self.RESULT_KEY]]
|
||||
self.assertEqual(result_list.sort(), uuids.sort())
|
||||
|
||||
|
||||
class TestPatchPartition(TestPartition):
|
||||
""""Patch operations can only be applied to a partition in ready state
|
||||
"""
|
||||
patch_path_size = '/size_mib'
|
||||
patch_field = 'size_mib'
|
||||
patch_value = 64
|
||||
|
||||
def setUp(self):
|
||||
super(TestPatchPartition, self).setUp()
|
||||
# Only partition Add/Delete operations are allowed on an unprovisioned host
|
||||
# create a partition in ready state
|
||||
# device_path is required. Only the last partition can be modified.
|
||||
# setting the size small, since patching typically increases it.
|
||||
self.partition = dbutils.create_test_partition(
|
||||
forihostid=self.ihost.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
type_guid=constants.USER_PARTITION_PHYSICAL_VOLUME,
|
||||
status=constants.PARTITION_READY_STATUS,
|
||||
device_path=self.partition_device_path,
|
||||
size_mib=32)
|
||||
|
||||
def test_patch_invalid_field(self):
|
||||
# Pass a non existant field to be patched by the API
|
||||
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': '/junk_field',
|
||||
'value': self.patch_value,
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
def test_patch_size_valid(self):
|
||||
# Update value of size field
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': self.patch_path_size,
|
||||
'value': self.patch_value,
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.OK)
|
||||
|
||||
# Verify that the attribute was updated
|
||||
response = self.get_json(self.get_single_url(self.partition.uuid))
|
||||
self.assertEqual(response[self.patch_field], self.patch_value)
|
||||
|
||||
def test_patch_invalid_size_reduction(self):
|
||||
# Pass an invalid size (making it smaller) to be patched by the API
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': self.patch_path_size,
|
||||
'value': 32,
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
# Repeat the test, but passing the value as a 'string' instead of an int.
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': self.patch_path_size,
|
||||
'value': '32',
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
def test_patch_invalid_size_negative(self):
|
||||
# Pass an invalid size (making it negative) to be patched by the API
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': self.patch_path_size,
|
||||
'value': -1,
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
# Repeat the test, but passing the value as a 'string' instead of an int.
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': self.patch_path_size,
|
||||
'value': '-1',
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
def test_patch_invalid_size_string(self):
|
||||
# Pass an invalid size (passing a junk string) to be patched by the API
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': self.patch_path_size,
|
||||
'value': 'xyz',
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
def test_update_partition_config_fails(self):
|
||||
# This is the same code that succeeds in a normal patch API call.
|
||||
# Testing when that update_partition_config fails
|
||||
self.fake_conductor_api.update_partition_config.side_effect = HTTPNotFound()
|
||||
response = self.patch_json(self.get_single_url(self.partition.uuid),
|
||||
[{'path': self.patch_path_size,
|
||||
'value': self.patch_value,
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
|
@ -0,0 +1,349 @@
|
|||
#
|
||||
# Copyright (c) 2019 Wind River Systems, Inc.
|
||||
#
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
#
|
||||
|
||||
"""
|
||||
Tests for the API / pv / methods.
|
||||
"""
|
||||
|
||||
import mock
|
||||
import webtest.app
|
||||
from six.moves import http_client
|
||||
|
||||
from sysinv.common import constants
|
||||
from sysinv.openstack.common import uuidutils
|
||||
|
||||
from sysinv.tests.api import base
|
||||
from sysinv.tests.db import utils as dbutils
|
||||
|
||||
|
||||
class FakeConductorAPI(object):
|
||||
|
||||
def __init__(self, dbapi):
|
||||
self.dbapi = dbapi
|
||||
self.create_controller_filesystems = mock.MagicMock()
|
||||
|
||||
|
||||
class TestPV(base.FunctionalTest):
|
||||
|
||||
# can perform API operations on this object at a sublevel of host
|
||||
HOST_PREFIX = '/ihosts'
|
||||
|
||||
# API_HEADERS are a generic header passed to most API calls
|
||||
API_HEADERS = {'User-Agent': 'sysinv-test'}
|
||||
|
||||
# Generic path used when constructing disk objects
|
||||
disk_device_path = '/dev/disk/by-path/pci-0000:00:0d.0-ata-1.0'
|
||||
# The volume group name must be a member of LVG_ALLOWED_VGS
|
||||
# selecting nova local as our default for these tests
|
||||
lvm_vg_name = constants.LVG_NOVA_LOCAL
|
||||
|
||||
# API_PREFIX is the prefix for the URL
|
||||
API_PREFIX = '/ipvs'
|
||||
|
||||
# RESULT_KEY is the python table key for the list of results
|
||||
RESULT_KEY = 'ipvs'
|
||||
|
||||
# COMMON_FIELD is a field that is known to exist for inputs and outputs
|
||||
COMMON_FIELD = 'lvm_vg_name'
|
||||
|
||||
# expected_api_fields are attributes that should be populated by
|
||||
# an API query
|
||||
expected_api_fields = ['uuid',
|
||||
'pv_state',
|
||||
'pv_type',
|
||||
'disk_or_part_uuid',
|
||||
'disk_or_part_device_node',
|
||||
'disk_or_part_device_path',
|
||||
'lvm_pv_name',
|
||||
'lvm_vg_name',
|
||||
'lvm_pv_uuid',
|
||||
'lvm_pv_size',
|
||||
'lvm_pe_total',
|
||||
'lvm_pe_alloced',
|
||||
'ihost_uuid',
|
||||
'forilvgid',
|
||||
'ilvg_uuid',
|
||||
'capabilities']
|
||||
|
||||
# hidden_api_fields are attributes that should not be populated by
|
||||
# an API query
|
||||
hidden_api_fields = ['forihostid']
|
||||
|
||||
def setUp(self):
|
||||
super(TestPV, self).setUp()
|
||||
|
||||
# Mock the conductor API
|
||||
self.fake_conductor_api = FakeConductorAPI(self.dbapi)
|
||||
p = mock.patch('sysinv.conductor.rpcapi.ConductorAPI')
|
||||
self.mock_conductor_api = p.start()
|
||||
self.mock_conductor_api.return_value = self.fake_conductor_api
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
# Behave as if the API is running on controller-0
|
||||
p = mock.patch('socket.gethostname')
|
||||
self.mock_socket_gethostname = p.start()
|
||||
self.mock_socket_gethostname.return_value = 'controller-0'
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
# Behave as if running on a virtual system
|
||||
p = mock.patch('sysinv.common.utils.is_virtual')
|
||||
self.mock_utils_is_virtual = p.start()
|
||||
self.mock_utils_is_virtual.return_value = True
|
||||
self.addCleanup(p.stop)
|
||||
|
||||
# Create an isystem and load
|
||||
self.system = dbutils.create_test_isystem(
|
||||
capabilities={"cinder_backend": constants.CINDER_BACKEND_CEPH,
|
||||
"vswitch_type": constants.VSWITCH_TYPE_NONE,
|
||||
"region_config": False,
|
||||
"sdn_enabled": False,
|
||||
"shared_services": "[]"}
|
||||
)
|
||||
self.load = dbutils.create_test_load()
|
||||
# Create controller-0
|
||||
self.ihost = self._create_controller_0()
|
||||
# Create disk on the controller
|
||||
self.disk = self._create_disk(self.ihost.id)
|
||||
# Create logical volume group
|
||||
self.lvg = self._create_lvg(self.ihost.id,
|
||||
self.lvm_vg_name)
|
||||
|
||||
def get_single_url(self, uuid):
|
||||
return '%s/%s' % (self.API_PREFIX, uuid)
|
||||
|
||||
def get_host_scoped_url(self, host_uuid):
|
||||
return '%s/%s%s' % (self.HOST_PREFIX, host_uuid, self.API_PREFIX)
|
||||
|
||||
def _create_controller_0(self, **kw):
|
||||
# The host must be provisioned in order to perform PATCH operations
|
||||
ihost = dbutils.create_test_ihost(
|
||||
hostname='controller-0',
|
||||
mgmt_mac='01:34:67:9A:CD:F0',
|
||||
mgmt_ip='192.168.204.3',
|
||||
serialid='serial1',
|
||||
bm_ip='128.224.150.193',
|
||||
invprovision=constants.PROVISIONED,
|
||||
config_target='e4ec5ee2-967d-4b2d-8de8-f0a390fcbd35',
|
||||
config_applied='e4ec5ee2-967d-4b2d-8de8-f0a390fcbd35',
|
||||
**kw)
|
||||
return ihost
|
||||
|
||||
def _create_disk(self, ihost_id):
|
||||
return dbutils.create_test_idisk(
|
||||
device_node='/dev/sda',
|
||||
device_path=self.disk_device_path,
|
||||
available_mib=256,
|
||||
forihostid=ihost_id)
|
||||
|
||||
def _create_lvg(self, ihost_id, lvm_vg_name):
|
||||
return dbutils.create_test_lvg(forihostid=ihost_id,
|
||||
lvm_vg_name=lvm_vg_name)
|
||||
|
||||
# These methods have generic names and are overridden here
|
||||
# Future activity: Redo the subclasses to use mixins
|
||||
def assert_fields(self, api_object):
|
||||
# check the uuid is a uuid
|
||||
assert(uuidutils.is_uuid_like(api_object['uuid']))
|
||||
|
||||
# Verify that expected attributes are returned
|
||||
for field in self.expected_api_fields:
|
||||
self.assertIn(field, api_object)
|
||||
|
||||
# Verify that hidden attributes are not returned
|
||||
for field in self.hidden_api_fields:
|
||||
self.assertNotIn(field, api_object)
|
||||
|
||||
def get_post_object(self):
|
||||
return dbutils.post_get_test_pv(forihostid=self.ihost.id,
|
||||
forilvgid=self.lvg.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
lvm_vg_name=self.lvm_vg_name,
|
||||
disk_or_part_uuid=self.disk.uuid)
|
||||
|
||||
def _create_db_object(self, obj_id=None):
|
||||
return dbutils.create_test_pv(id=obj_id,
|
||||
forihostid=self.ihost.id,
|
||||
forilvgid=self.lvg.id,
|
||||
idisk_id=self.disk.id,
|
||||
idisk_uuid=self.disk.uuid,
|
||||
lvm_vg_name=self.lvm_vg_name,
|
||||
disk_or_part_uuid=self.disk.uuid)
|
||||
|
||||
|
||||
class TestPostPV(TestPV):
|
||||
|
||||
def setUp(self):
|
||||
super(TestPostPV, self).setUp()
|
||||
|
||||
def test_create_success(self):
|
||||
# Test creation of object
|
||||
ndict = self.get_post_object()
|
||||
response = self.post_json(self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
# Check HTTP response is successful
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertEqual(response.status_code, http_client.OK)
|
||||
|
||||
# Check that an expected field matches.
|
||||
self.assertEqual(response.json[self.COMMON_FIELD],
|
||||
ndict[self.COMMON_FIELD])
|
||||
|
||||
uuid = response.json['uuid']
|
||||
# Verify that the object was created and some basic attribute matches
|
||||
response = self.get_json(self.get_single_url(uuid))
|
||||
self.assertEqual(response[self.COMMON_FIELD],
|
||||
ndict[self.COMMON_FIELD])
|
||||
|
||||
def test_create_with_invalid_host(self):
|
||||
# Test creation with an invalid host
|
||||
ndict = self.get_post_object()
|
||||
ndict['forihostid'] = 1234567
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.post_json,
|
||||
self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
def test_create_with_invalid_disk(self):
|
||||
# Test creation with an invalid disk
|
||||
ndict = self.get_post_object()
|
||||
ndict['idisk_id'] = 1234567
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.post_json,
|
||||
self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
def test_create_with_invalid_additional_attributes(self):
|
||||
# Test creation with an invalid attribute called 'foo'
|
||||
ndict = self.get_post_object()
|
||||
ndict['foo'] = 'some value'
|
||||
self.assertRaises(webtest.app.AppError,
|
||||
self.post_json,
|
||||
self.API_PREFIX,
|
||||
ndict,
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
|
||||
class TestDeletePV(TestPV):
|
||||
""" Tests deletion.
|
||||
Typically delete APIs return NO CONTENT.
|
||||
python2 and python3 libraries may return different
|
||||
content_type (None, or empty json) when NO_CONTENT returned.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeletePV, self).setUp()
|
||||
# create a partition
|
||||
self.delete_object = self._create_db_object()
|
||||
|
||||
# The PV delete is not a blocking operation.
|
||||
# Cannot determine if the delete is completed, or has simply set the
|
||||
# pv_state to "removing"
|
||||
def test_delete(self):
|
||||
# Delete the API object
|
||||
uuid = self.delete_object.uuid
|
||||
response = self.delete(self.get_single_url(uuid),
|
||||
headers=self.API_HEADERS)
|
||||
|
||||
# Verify the expected API response for the delete
|
||||
self.assertEqual(response.status_code, http_client.NO_CONTENT)
|
||||
|
||||
|
||||
class TestListPVs(TestPV):
|
||||
""" PV list operations
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(TestListPVs, self).setUp()
|
||||
|
||||
def test_empty_list(self):
|
||||
response = self.get_json(self.API_PREFIX)
|
||||
self.assertEqual([], response[self.RESULT_KEY])
|
||||
|
||||
def test_single_entry_unscoped(self):
|
||||
# create a single object
|
||||
self.single_object = self._create_db_object()
|
||||
response = self.get_json(self.API_PREFIX)
|
||||
self.assertEqual(1, len(response[self.RESULT_KEY]))
|
||||
|
||||
def test_single_entry_by_host_list(self):
|
||||
# create a single object
|
||||
self.single_object = self._create_db_object()
|
||||
|
||||
# Querying the URL scoped by host
|
||||
response = self.get_json(self.get_host_scoped_url(self.ihost.uuid))
|
||||
|
||||
self.assertEqual(1, len(response[self.RESULT_KEY]))
|
||||
# Check the single result
|
||||
for result in response[self.RESULT_KEY]:
|
||||
# check fields are appropriate
|
||||
self.assert_fields(result)
|
||||
|
||||
def test_many_entries_in_list(self):
|
||||
result_list = []
|
||||
for obj_id in range(100):
|
||||
loop_object = self._create_db_object(obj_id=obj_id)
|
||||
result_list.append(loop_object['uuid'])
|
||||
|
||||
response = self.get_json(self.get_host_scoped_url(self.ihost.uuid))
|
||||
self.assertEqual(len(result_list), len(response[self.RESULT_KEY]))
|
||||
|
||||
# Verify that the sorted list of uuids is the same
|
||||
uuids = [n['uuid'] for n in response[self.RESULT_KEY]]
|
||||
self.assertEqual(result_list.sort(), uuids.sort())
|
||||
|
||||
|
||||
class TestPatchPV(TestPV):
|
||||
patch_path = '/lvm_pe_alloced'
|
||||
patch_field = 'lvm_pe_alloced'
|
||||
patch_value = 2
|
||||
|
||||
def setUp(self):
|
||||
super(TestPatchPV, self).setUp()
|
||||
self.patch_object = self._create_db_object()
|
||||
|
||||
def test_patch_invalid_field(self):
|
||||
# Pass a non existant field to be patched by the API
|
||||
|
||||
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
|
||||
[{'path': '/junk_field',
|
||||
'value': self.patch_value,
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
||||
|
||||
def test_patch_valid(self):
|
||||
# Update value of patchable field
|
||||
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
|
||||
[{'path': self.patch_path,
|
||||
'value': self.patch_value,
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.OK)
|
||||
|
||||
# Verify that the attribute was updated
|
||||
response = self.get_json(self.get_single_url(self.patch_object.uuid))
|
||||
self.assertEqual(response[self.patch_field], self.patch_value)
|
||||
|
||||
def test_patch_invalid_value(self):
|
||||
# Pass a value that fails a semantic check when patched by the API
|
||||
# lvm_vg_name is restricted to a value in constants.LVG_ALLOWED_VGS
|
||||
|
||||
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
|
||||
[{'path': 'lvm_vg_name',
|
||||
'value': 'invalid_lvm_vg_name',
|
||||
'op': 'replace'}],
|
||||
headers=self.API_HEADERS,
|
||||
expect_errors=True)
|
||||
self.assertEqual(response.content_type, 'application/json')
|
||||
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
|
|
@ -19,6 +19,7 @@
|
|||
#
|
||||
|
||||
"""Sysinv test utilities."""
|
||||
import uuid
|
||||
|
||||
from sysinv.common import constants
|
||||
from sysinv.openstack.common import jsonutils as json
|
||||
|
@ -539,6 +540,7 @@ def get_test_idisk(**kw):
|
|||
'forihostid': kw.get('forihostid', 2),
|
||||
'foristorid': kw.get('foristorid', 2),
|
||||
'foripvid': kw.get('foripvid', 2),
|
||||
'available_mib': kw.get('available_mib', 100),
|
||||
'updated_at': None,
|
||||
'created_at': None,
|
||||
}
|
||||
|
@ -601,12 +603,27 @@ def get_test_lvg(**kw):
|
|||
return lvg
|
||||
|
||||
|
||||
def create_test_lvg(**kw):
|
||||
"""Create test lvg entry in DB and return LogicalVolumeGroup DB object.
|
||||
Function to be used to create test objects in the database.
|
||||
:param kw: kwargs with overriding values for attributes.
|
||||
kw requires: lvm_vg_name
|
||||
:returns: Test LogicalVolumeGroup DB object.
|
||||
"""
|
||||
lvg = get_test_lvg(**kw)
|
||||
if 'uuid' not in kw:
|
||||
del lvg['uuid']
|
||||
dbapi = db_api.get_instance()
|
||||
forihostid = lvg['forihostid']
|
||||
return dbapi.ilvg_create(forihostid, lvg)
|
||||
|
||||
|
||||
def get_test_pv(**kw):
|
||||
pv = {
|
||||
'id': kw.get('id', 2),
|
||||
'uuid': kw.get('uuid'),
|
||||
'lvm_vg_name': kw.get('lvm_vg_name'),
|
||||
'disk_or_part_uuid': kw.get('disk_or_part_uuid', 2),
|
||||
'disk_or_part_uuid': kw.get('disk_or_part_uuid', str(uuid.uuid4())),
|
||||
'disk_or_part_device_path': kw.get('disk_or_part_device_path',
|
||||
'/dev/disk/by-path/pci-0000:00:0d.0-ata-3.0'),
|
||||
'forihostid': kw.get('forihostid', 2),
|
||||
|
@ -615,6 +632,31 @@ def get_test_pv(**kw):
|
|||
return pv
|
||||
|
||||
|
||||
def create_test_pv(**kw):
|
||||
"""Create test pv entry in DB and return PV DB object.
|
||||
Function to be used to create test PV objects in the database.
|
||||
:param kw: kwargs with overriding values for pv's attributes.
|
||||
kw typically requires forihostid, forilvgid
|
||||
:returns: Test PV DB object.
|
||||
"""
|
||||
pv = get_test_pv(**kw)
|
||||
if 'uuid' not in kw:
|
||||
del pv['uuid']
|
||||
dbapi = db_api.get_instance()
|
||||
forihostid = pv['forihostid']
|
||||
return dbapi.ipv_create(forihostid, pv)
|
||||
|
||||
|
||||
def post_get_test_pv(**kw):
|
||||
pv = get_test_pv(**kw)
|
||||
|
||||
# When invoking a POST the following fields should not be populated:
|
||||
del pv['uuid']
|
||||
del pv['id']
|
||||
|
||||
return pv
|
||||
|
||||
|
||||
def get_test_storage_backend(**kw):
|
||||
inv = {
|
||||
'id': kw.get('id'),
|
||||
|
@ -921,6 +963,53 @@ def post_get_test_interface_network(**kw):
|
|||
return inv
|
||||
|
||||
|
||||
def get_test_partition(**kw):
|
||||
"""get_test_partition will fail unless
|
||||
forihostid is provided
|
||||
disk_id is provided
|
||||
size_mib must be a valid number
|
||||
"""
|
||||
partition = {
|
||||
'uuid': kw.get('uuid'),
|
||||
'start_mib': kw.get('start_mib'),
|
||||
'end_mib': kw.get('end_mib'),
|
||||
'size_mib': kw.get('size_mib'),
|
||||
'device_path': kw.get('device_path'),
|
||||
'device_node': kw.get('device_node'),
|
||||
'forihostid': kw.get('forihostid'),
|
||||
'idisk_id': kw.get('idisk_id'),
|
||||
'idisk_uuid': kw.get('idisk_uuid'),
|
||||
'type_guid': kw.get('type_guid'),
|
||||
'status': kw.get('status',
|
||||
constants.PARTITION_CREATE_ON_UNLOCK_STATUS),
|
||||
}
|
||||
return partition
|
||||
|
||||
|
||||
def create_test_partition(**kw):
|
||||
"""Create test partition entry in DB and return Partition DB
|
||||
object. Function to be used to create test Partition objects in the database.
|
||||
:param kw: kwargs with overriding values for partition's attributes.
|
||||
:returns: Test Partition DB object.
|
||||
"""
|
||||
partition = get_test_partition(**kw)
|
||||
if 'uuid' not in kw:
|
||||
del partition['uuid']
|
||||
dbapi = db_api.get_instance()
|
||||
forihostid = partition['forihostid']
|
||||
return dbapi.partition_create(forihostid, partition)
|
||||
|
||||
|
||||
def post_get_test_partition(**kw):
|
||||
partition = get_test_partition(**kw)
|
||||
|
||||
# When invoking a POST the following fields should not be populated:
|
||||
del partition['uuid']
|
||||
del partition['status']
|
||||
|
||||
return partition
|
||||
|
||||
|
||||
def get_test_interface_datanetwork(**kw):
|
||||
inv = {
|
||||
'id': kw.get('id'),
|
||||
|
|
Loading…
Reference in New Issue