New RESTful API and DB schema for network to address-pools.

This change introduces a new API that leverages the network-addrpool
table to establish relationships between network and address pool
resources. This functionality enhances network management by enabling
efficient address pool allocation and association with specific
networks.

The new dual-stack support introduces a prioritization mechanism for
address pools associated with a network. The primary pool will be the
default choice for address selection unless a preferred pool is
explicitly specified. For optimal flexibility, each network supports
a maximum of two pools – one per address family (IPv4 and IPv6).
Networks configured for PXE boot will currently only offer IPv4 address pools due to limitations in PXE over IPv6 support.

To enhance consistency during address pool assignments, the network
table now includes the primary_pool_family field. Networks initially
assigned to IPv4 pools cannot have IPv6 pools designated as primary
later. A separate task will make the network field pool_uuid optional.

To streamline access to primary pool addresses, a wrapper function
named get_primary_address_by_name() was implemented. This function
ensures retrieval of the correct address by name, avoiding potential
confusion caused by duplicate entries due to the presence of both
IPv4 and IPv6 pools.

Upgrade will be handled in a separate task within this story.

Test Plan
[PASS] new set of unit tests
[PASS] Install AIO-SX and AIO-DX and verify:
       - no alarms or failed services
       - direct access to the database tables to verify correct values
       - Lock/Unlock and swact
[PASS] Install DC (1 subcloud AIO-SX)

Story: 2011027
Task: 49627

Change-Id: I52d66804560b6f10bcad62b20485ac8d17b6b85f
Signed-off-by: Andre Kantek <andrefernandozanella.kantek@windriver.com>
This commit is contained in:
Andre Kantek 2024-02-13 10:49:59 -03:00
parent 7964871f9b
commit 634d491647
33 changed files with 2143 additions and 325 deletions

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2015 Wind River Systems, Inc.
# Copyright (c) 2015-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -23,8 +23,8 @@ def _print_network_show(obj):
help="UUID of IP network")
def do_network_show(cc, args):
"""Show IP network details."""
labels = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid']
fields = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid']
labels = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid', 'primary_pool_family']
fields = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid', 'primary_pool_family']
network = cc.network.get(args.network_uuid)
data = [(f, getattr(network, f, '')) for f in fields]
utils.print_tuple_list(data, tuple_labels=labels)
@ -32,8 +32,8 @@ def do_network_show(cc, args):
def do_network_list(cc, args):
"""List IP networks on host."""
labels = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid']
fields = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid']
labels = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid', 'primary_pool_family']
fields = ['id', 'uuid', 'name', 'type', 'dynamic', 'pool_uuid', 'primary_pool_family']
networks = cc.network.list()
utils.print_list(networks, fields, labels, sortby=1)

View File

@ -59,6 +59,7 @@ from sysinv.api.controllers.v1 import lvg
from sysinv.api.controllers.v1 import license
from sysinv.api.controllers.v1 import memory
from sysinv.api.controllers.v1 import network
from sysinv.api.controllers.v1 import network_addrpool
from sysinv.api.controllers.v1 import network_oam
from sysinv.api.controllers.v1 import node
from sysinv.api.controllers.v1 import ntp
@ -207,6 +208,9 @@ class V1(base.APIBase):
networks = [link.Link]
"Links to the network resource"
network_addrpools = [link.Link]
"Links to the network address-pool resource"
datanetworks = [link.Link]
"Links to the datanetwork resource"
@ -671,6 +675,14 @@ class V1(base.APIBase):
bookmark=True)
]
v1.network_addrpools = [link.Link.make_link('self', pecan.request.host_url,
'network_addrpools', ''),
link.Link.make_link('bookmark',
pecan.request.host_url,
'network_addrpools', '',
bookmark=True)
]
v1.interface_networks = [link.Link.make_link('self', pecan.request.host_url,
'interface_networks', ''),
link.Link.make_link('bookmark',
@ -991,6 +1003,7 @@ class Controller(rest.RestController):
device_image_state = device_image_state.DeviceImageStateController()
device_labels = device_label.DeviceLabelController()
restore = restore.RestoreController()
network_addresspools = network_addrpool.NetworkAddresspoolController()
@wsme_pecan.wsexpose(V1)
def get(self):

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2015-2022 Wind River Systems, Inc.
# Copyright (c) 2015-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -383,12 +383,13 @@ class AddressController(rest.RestController):
def _check_name_conflict(self, address):
name = address.get('name', None)
family = address.get('family', 0)
if name is None:
return
try:
pecan.request.dbapi.address_get_by_name(name)
pecan.request.dbapi.address_get_by_name_and_family(name, family)
raise exception.AddressNameExists(name=name)
except exception.AddressNotFoundByName:
except exception.AddressNotFoundByNameAndFamily:
pass
def _check_subnet_valid(self, pool, address):

View File

@ -1335,9 +1335,10 @@ class HostController(rest.RestController):
return 0
# get the active controller's floating ip address
floating_address = pecan.request.dbapi.address_get_by_name(
floating_address = utils.get_primary_address_by_name(
cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_MGMT)).address
constants.NETWORK_TYPE_MGMT),
constants.NETWORK_TYPE_MGMT).address
try:
# get the management IP for the host that matches this mgmt mac
ihost_obj = pecan.request.dbapi.ihost_get_by_mgmt_mac(mac)
@ -1346,8 +1347,9 @@ class HostController(rest.RestController):
mgmt_addr = ihost_obj['mgmt_ip']
if mgmt_addr is None:
address_name = cutils.format_address_name(hostname,
constants.NETWORK_TYPE_MGMT)
address = pecan.request.dbapi.address_get_by_name(address_name)
constants.NETWORK_TYPE_MGMT)
address = utils.get_primary_address_by_name(address_name,
constants.NETWORK_TYPE_MGMT, True)
mgmt_addr = address.address
if mgmt_addr is not None:
@ -1654,7 +1656,8 @@ class HostController(rest.RestController):
mgmt_address_name = cutils.format_address_name(
ihost_dict['hostname'], constants.NETWORK_TYPE_MGMT)
self._validate_address_not_allocated(mgmt_address_name,
ihost_dict.get('mgmt_ip'))
ihost_dict.get('mgmt_ip'),
constants.NETWORK_TYPE_MGMT)
if ihost_dict.get('mgmt_ip'):
self._validate_ip_in_mgmt_network(ihost_dict['mgmt_ip'])
@ -1697,7 +1700,8 @@ class HostController(rest.RestController):
# Notify maintenance about updated mgmt_ip
address_name = cutils.format_address_name(ihost_obj.hostname,
constants.NETWORK_TYPE_MGMT)
address = pecan.request.dbapi.address_get_by_name(address_name)
address = utils.get_primary_address_by_name(address_name,
constants.NETWORK_TYPE_MGMT, True)
ihost_obj['mgmt_ip'] = address.address
# Add ihost to mtc
@ -2295,7 +2299,8 @@ class HostController(rest.RestController):
else:
address_name = cutils.format_address_name(ihost_obj.hostname,
constants.NETWORK_TYPE_MGMT)
address = pecan.request.dbapi.address_get_by_name(address_name)
address = utils.get_primary_address_by_name(address_name,
constants.NETWORK_TYPE_MGMT, True)
ihost_obj['mgmt_ip'] = address.address
hostupdate.notify_mtce = True
@ -3170,7 +3175,7 @@ class HostController(rest.RestController):
utils.validate_address_within_nework(ip, network)
@staticmethod
def _validate_address_not_allocated(name, ip_address):
def _validate_address_not_allocated(name, ip_address, net_type):
"""Validate that address isn't allocated
:param name: Address name to check isn't allocated.
@ -3185,7 +3190,7 @@ class HostController(rest.RestController):
except exception.AddressNotFoundByAddress:
pass
try:
address = pecan.request.dbapi.address_get_by_name(name)
address = utils.get_primary_address_by_name(name, net_type, True)
if address.address != ip_address:
raise exception.AddressAlreadyAllocated(address=name)
except exception.AddressNotFoundByName:
@ -3574,7 +3579,7 @@ class HostController(rest.RestController):
and host['config_status'] == constants.CONFIG_STATUS_OUT_OF_DATE
and host['administrative'] == constants.ADMIN_LOCKED):
flip = utils.config_flip_reboot_required(host['config_target'])
if(utils.config_is_reboot_required(host['config_target'])
if (utils.config_is_reboot_required(host['config_target'])
and flip == host['config_applied']):
check_sriov_port_data = False

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2023 Wind River Systems, Inc.
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -526,9 +526,10 @@ def _update_host_admin_address(host, interface):
address_name = cutils.format_address_name(host.hostname,
constants.NETWORK_TYPE_ADMIN)
try:
address = pecan.request.dbapi.address_get_by_name(address_name)
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(address.uuid, updates)
addresses = pecan.request.dbapi.address_get_by_name(address_name)
for addr in addresses:
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(addr.uuid, updates)
except exception.AddressNotFoundByName:
# For non-controller hosts, allocate address from pool if dynamic
admin_network = pecan.request.dbapi.network_get_by_type(
@ -546,17 +547,19 @@ def _update_host_oam_address(host, interface):
else:
address_name = cutils.format_address_name(host.hostname,
constants.NETWORK_TYPE_OAM)
address = pecan.request.dbapi.address_get_by_name(address_name)
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(address.uuid, updates)
addresses = pecan.request.dbapi.address_get_by_name(address_name)
for addr in addresses:
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(addr.uuid, updates)
def _update_host_pxeboot_address(host, interface):
address_name = cutils.format_address_name(host.hostname,
constants.NETWORK_TYPE_PXEBOOT)
address = pecan.request.dbapi.address_get_by_name(address_name)
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(address.uuid, updates)
addresses = pecan.request.dbapi.address_get_by_name(address_name)
for addr in addresses:
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(addr.uuid, updates)
def _update_host_cluster_address(host, interface):
@ -568,9 +571,10 @@ def _update_host_cluster_address(host, interface):
address_name = cutils.format_address_name(
host.hostname, constants.NETWORK_TYPE_CLUSTER_HOST)
try:
address = pecan.request.dbapi.address_get_by_name(address_name)
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(address.uuid, updates)
addresses = pecan.request.dbapi.address_get_by_name(address_name)
for addr in addresses:
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(addr.uuid, updates)
except exception.AddressNotFoundByName:
cluster_host_network = pecan.request.dbapi.network_get_by_type(
constants.NETWORK_TYPE_CLUSTER_HOST)
@ -583,18 +587,20 @@ def _update_host_cluster_address(host, interface):
def _update_host_ironic_address(host, interface):
address_name = cutils.format_address_name(host.hostname,
constants.NETWORK_TYPE_IRONIC)
address = pecan.request.dbapi.address_get_by_name(address_name)
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(address.uuid, updates)
addresses = pecan.request.dbapi.address_get_by_name(address_name)
for addr in addresses:
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(addr.uuid, updates)
def _update_host_storage_address(host, interface):
address_name = cutils.format_address_name(host.hostname,
constants.NETWORK_TYPE_STORAGE)
try:
address = pecan.request.dbapi.address_get_by_name(address_name)
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(address.uuid, updates)
addresses = pecan.request.dbapi.address_get_by_name(address_name)
for addr in addresses:
updates = {'interface_id': interface['id']}
pecan.request.dbapi.address_update(addr.uuid, updates)
except exception.AddressNotFoundByName:
# For non-controller hosts, allocate address from pool if dynamic
storage_network = pecan.request.dbapi.network_get_by_type(

View File

@ -1,10 +1,9 @@
#
# Copyright (c) 2021 Wind River Systems, Inc.
# Copyright (c) 2021-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import pecan
from pecan import rest
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
@ -12,6 +11,7 @@ from six.moves.urllib.parse import urlparse
from sysinv.api.controllers.v1 import base
from sysinv.api.controllers.v1 import collection
from sysinv.api.controllers.v1 import utils as apiutils
from sysinv.common import constants
from sysinv.common import kubernetes
from sysinv.common import utils
@ -127,5 +127,6 @@ class KubeClusterController(rest.RestController):
def _get_oam_address(self):
address_name = utils.format_address_name(
constants.CONTROLLER_HOSTNAME, constants.NETWORK_TYPE_OAM)
address = pecan.request.dbapi.address_get_by_name(address_name)
address = apiutils.get_primary_address_by_name(address_name,
constants.NETWORK_TYPE_OAM, True)
return address.address

View File

@ -15,12 +15,11 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2015-2023 Wind River Systems, Inc.
# Copyright (c) 2015-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import collections
import pecan
from pecan import rest
import uuid
@ -83,6 +82,9 @@ class Network(base.APIBase):
pool_uuid = wtypes.text
"The UUID of the address pool associated with the network"
primary_pool_family = wtypes.text
"The primary pool address family (IPv4 or IPv6)"
def __init__(self, **kwargs):
self.fields = list(objects.network.fields.keys())
for k in self.fields:
@ -95,7 +97,8 @@ class Network(base.APIBase):
network = Network(**rpc_network.as_dict())
if not expand:
network.unset_fields_except(['id', 'uuid', 'type', 'name',
'dynamic', 'pool_uuid'])
'dynamic', 'pool_uuid',
'primary_pool_family'])
return network
def _validate_network_type(self):
@ -189,154 +192,6 @@ class NetworkController(rest.RestController):
if addresses:
raise exception.NetworkAddressPoolInUse()
def _create_network_addresses(self, pool, network):
if network['type'] == constants.NETWORK_TYPE_MGMT:
addresses = self._create_mgmt_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_ADMIN:
addresses = self._create_admin_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_PXEBOOT:
addresses = self._create_pxeboot_network_address()
elif network['type'] == constants.NETWORK_TYPE_CLUSTER_HOST:
addresses = self._create_cluster_host_network_address()
elif network['type'] == constants.NETWORK_TYPE_OAM:
addresses = self._create_oam_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_MULTICAST:
addresses = self._create_multicast_network_address()
elif network['type'] == constants.NETWORK_TYPE_IRONIC:
addresses = self._create_ironic_network_address()
elif network['type'] == constants.NETWORK_TYPE_SYSTEM_CONTROLLER:
addresses = self._create_system_controller_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_STORAGE:
addresses = self._create_storage_network_address()
else:
return
self._populate_network_addresses(pool, network, addresses)
def _create_mgmt_network_address(self, pool):
addresses = {}
if pool.floating_address:
addresses.update(
{constants.CONTROLLER_HOSTNAME: pool.floating_address})
else:
addresses.update({constants.CONTROLLER_HOSTNAME: None})
if pool.controller0_address:
addresses.update(
{constants.CONTROLLER_0_HOSTNAME: pool.controller0_address})
else:
addresses.update({constants.CONTROLLER_0_HOSTNAME: None})
if pool.controller1_address:
addresses.update(
{constants.CONTROLLER_1_HOSTNAME: pool.controller1_address})
else:
addresses.update({constants.CONTROLLER_1_HOSTNAME: None})
if pool.gateway_address is not None:
if utils.get_distributed_cloud_role() == \
constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
# In subcloud configurations, the management gateway is used
# to communicate with the central cloud.
addresses[constants.SYSTEM_CONTROLLER_GATEWAY_IP_NAME] =\
pool.gateway_address
else:
addresses[constants.CONTROLLER_GATEWAY] =\
pool.gateway_address
return addresses
def _create_admin_network_address(self, pool):
addresses = {}
if pool.floating_address:
addresses.update(
{constants.CONTROLLER_HOSTNAME: pool.floating_address})
else:
addresses.update({constants.CONTROLLER_HOSTNAME: None})
if pool.controller0_address:
addresses.update(
{constants.CONTROLLER_0_HOSTNAME: pool.controller0_address})
else:
addresses.update({constants.CONTROLLER_0_HOSTNAME: None})
if pool.controller1_address:
addresses.update(
{constants.CONTROLLER_1_HOSTNAME: pool.controller1_address})
else:
addresses.update({constants.CONTROLLER_1_HOSTNAME: None})
if pool.gateway_address is not None:
if utils.get_distributed_cloud_role() == \
constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
# In subcloud configurations, the admin gateway is used
# to communicate with the central cloud.
addresses[constants.SYSTEM_CONTROLLER_GATEWAY_IP_NAME] =\
pool.gateway_address
else:
addresses[constants.CONTROLLER_GATEWAY] =\
pool.gateway_address
return addresses
def _create_pxeboot_network_address(self):
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
def _create_cluster_host_network_address(self):
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
def _create_oam_network_address(self, pool):
addresses = {}
if pool.floating_address:
addresses.update(
{constants.CONTROLLER_HOSTNAME: pool.floating_address})
if utils.get_system_mode() != constants.SYSTEM_MODE_SIMPLEX:
if pool.controller0_address:
addresses.update(
{constants.CONTROLLER_0_HOSTNAME: pool.controller0_address})
if pool.controller1_address:
addresses.update(
{constants.CONTROLLER_1_HOSTNAME: pool.controller1_address})
if pool.gateway_address:
addresses.update(
{constants.CONTROLLER_GATEWAY: pool.gateway_address})
return addresses
def _create_multicast_network_address(self):
addresses = collections.OrderedDict()
addresses[constants.SM_MULTICAST_MGMT_IP_NAME] = None
addresses[constants.MTCE_MULTICAST_MGMT_IP_NAME] = None
addresses[constants.PATCH_CONTROLLER_MULTICAST_MGMT_IP_NAME] = None
addresses[constants.PATCH_AGENT_MULTICAST_MGMT_IP_NAME] = None
return addresses
def _create_system_controller_network_address(self, pool):
addresses = {}
return addresses
def _create_ironic_network_address(self):
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
def _create_storage_network_address(self):
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
def _populate_network_addresses(self, pool, network, addresses):
opt_fields = {}
for name, address in addresses.items():
@ -344,6 +199,7 @@ class NetworkController(rest.RestController):
if not address:
address = address_pool.AddressPoolController.allocate_address(
pool, order=address_pool.SEQUENTIAL_ALLOCATION)
LOG.debug("address_name={} address={}".format(address_name, address))
values = {
'address_pool_id': pool.id,
@ -395,11 +251,23 @@ class NetworkController(rest.RestController):
if pool_uuid:
pool = pecan.request.dbapi.address_pool_get(pool_uuid)
network.update({'address_pool_id': pool.id})
if self._use_legacy_api():
# this value is filled here based on the provided pool, if not provided
# it will be executed via network-addrpool API
network.update({'primary_pool_family': constants.IP_FAMILIES[pool.family]})
# Attempt to create the new network record
result = pecan.request.dbapi.network_create(network)
self._create_network_addresses(pool, network)
if pool_uuid and self._use_legacy_api():
# create here the network-addrpool object
net_pool = pecan.request.dbapi.network_addrpool_create({"address_pool_id": pool.id,
"network_id": result.id})
LOG.info(f"added network-addrpool {net_pool.uuid}")
# the new network-addrpool API will take care of addresses
addresses = utils.PopulateAddresses.create_network_addresses(pool, network)
self._populate_network_addresses(pool, network, addresses)
# If the host has already been created, make an RPC request
# reconfigure the service endpoints. As oam network is processed
@ -421,6 +289,7 @@ class NetworkController(rest.RestController):
network['type'] in [constants.NETWORK_TYPE_SYSTEM_CONTROLLER,
constants.NETWORK_TYPE_SYSTEM_CONTROLLER_OAM]:
self._update_system_controller_network_config(network['type'])
return Network.convert_with_links(result)
def _update_system_controller_network_config(self, type):
@ -433,6 +302,12 @@ class NetworkController(rest.RestController):
pecan.request.rpcapi.update_dnsmasq_config(
pecan.request.context)
def _use_legacy_api(self):
""" Using this function to mark the usage of this API in legacy mode
when the pool's UUID is provided, otherwise it is using the new
network-addrpool API"""
return True
@wsme_pecan.wsexpose(NetworkCollection,
types.uuid, int, wtypes.text, wtypes.text)
def get_all(self, marker=None, limit=None, sort_key='id', sort_dir='asc'):
@ -484,4 +359,15 @@ class NetworkController(rest.RestController):
.format(network['type'], network_uuid, host['hostname']))
raise wsme.exc.ClientSideError(msg)
if network.pool_uuid and self._use_legacy_api():
try:
# remove the network-addrpool object that creates the relationship for both tables
pool = pecan.request.dbapi.address_pool_get(network.pool_uuid)
values = {'address_pool_id': pool.id, 'network_id': network.id}
net_pool = pecan.request.dbapi.network_addrpool_query(values)
pecan.request.dbapi.network_addrpool_destroy(net_pool.uuid)
LOG.info(f"removed network-addrpool {net_pool.uuid}")
except Exception as ex:
LOG.exception(f"Exception when removing the network-addrpool[{net_pool.uuid}]: {ex}")
pecan.request.dbapi.network_destroy(network_uuid)

View File

@ -0,0 +1,328 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from oslo_log import log
import wsme
from wsme import types as wtypes
import pecan
from pecan import rest
import uuid
import wsmeext.pecan as wsme_pecan
from sysinv.api.controllers.v1 import address_pool
from sysinv.api.controllers.v1 import base
from sysinv.api.controllers.v1 import types
from sysinv.api.controllers.v1 import collection
from sysinv.api.controllers.v1 import utils
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import utils as cutils
from sysinv import objects
LOG = log.getLogger(__name__)
class NetworkAddresspool(base.APIBase):
"""API representation of an IP network.
This class enforces type checking and value constraints, and converts
between the internal object model and the API representation of an IP
network.
"""
id = int
"Unique ID for this network"
uuid = types.uuid
"Unique UUID for this network"
address_pool_id = int
"Unique ID of the associated network"
address_pool_uuid = types.uuid
"Unique UUID of the associated network"
address_pool_name = wtypes.text
"User defined name of the associated network"
network_id = int
"Unique ID of the associated network"
network_uuid = types.uuid
"Unique UUID of the associated network"
network_name = wtypes.text
"User defined name of the associated network"
def __init__(self, **kwargs):
self.fields = list(objects.network_addrpool.fields.keys())
for k in self.fields:
if not hasattr(self, k):
continue
setattr(self, k, kwargs.get(k, wtypes.Unset))
@classmethod
def convert_with_links(cls, rpc_network_addrpool, expand=True):
network_addrpool = NetworkAddresspool(**rpc_network_addrpool.as_dict())
if not expand:
network_addrpool.unset_fields_except(['id', 'uuid',
'address_pool_id',
'address_pool_uuid',
'address_pool_name',
'network_id',
'network_uuid',
'network_name'])
return network_addrpool
def validate_syntax(self):
"""
Validates the syntax of each field.
"""
pass
class NetworkAddresspoolCollection(collection.Collection):
"""API representation of a collection of IP networks."""
network_addresspools = [NetworkAddresspool]
"A list containing Network Addresspool objects"
def __init__(self, **kwargs):
self._type = 'network_addresspools'
@classmethod
def convert_with_links(cls, rpc_network_addresspolls, limit, url=None,
expand=False, **kwargs):
collection = NetworkAddresspoolCollection()
collection.network_addresspools = [NetworkAddresspool.convert_with_links(n, expand)
for n in rpc_network_addresspolls]
collection.next = collection.get_next(limit, url=url, **kwargs)
return collection
LOCK_NAME = 'NetworkController'
class NetworkAddresspoolController(rest.RestController):
"""REST controller for NetworkAddresspool."""
def __init__(self, parent=None, **kwargs):
self._parent = parent
def _get_one(self, network_addrpool_uuid):
rpc_network_addrpool = objects.network_addrpool.get_by_uuid(
pecan.request.context, network_addrpool_uuid)
return NetworkAddresspool.convert_with_links(rpc_network_addrpool)
def _get_network_addrpool_collection(self, marker=None, limit=None, sort_key=None,
sort_dir=None, expand=False,
resource_url=None):
limit = utils.validate_limit(limit)
sort_dir = utils.validate_sort_dir(sort_dir)
marker_obj = None
if marker:
marker_obj = objects.network_addrpool.get_by_uuid(
pecan.request.context, marker)
networks = pecan.request.dbapi.network_addrpool_get_all(
limit, marker_obj, sort_key=sort_key, sort_dir=sort_dir)
return NetworkAddresspoolCollection.convert_with_links(
networks, limit, url=resource_url, expand=expand,
sort_key=sort_key, sort_dir=sort_dir)
def _populate_network_addresses(self, pool, network, addresses, net_pool):
if_net_list = pecan.request.dbapi.interface_network_get_by_network_id(net_pool.network_id)
hostname_dict = dict()
for if_net in if_net_list:
host = pecan.request.dbapi.ihost_get(if_net.forihostid)
hostname_dict.update({host.hostname: if_net})
opt_fields = {}
for name, address in addresses.items():
address_name = cutils.format_address_name(name, network['type'])
if not address:
address = address_pool.AddressPoolController.allocate_address(
pool, order=address_pool.SEQUENTIAL_ALLOCATION)
LOG.debug("address_name={} address={}".format(address_name, address))
values = {
'address_pool_id': pool.id,
'address': str(address),
'prefix': pool['prefix'],
'family': pool['family'],
'enable_dad': constants.IP_DAD_STATES[pool['family']],
'name': address_name,
}
addr_intf = dict()
for hostname in hostname_dict:
if address_name == f"{hostname}-{net_pool.network_type}":
addr_intf.update({'interface_id':
hostname_dict[hostname].interface_id})
break
if utils.get_system_mode() == constants.SYSTEM_MODE_SIMPLEX \
and net_pool.network_type == constants.NETWORK_TYPE_OAM:
if address_name == f"{constants.CONTROLLER}-{net_pool.network_type}":
addr_intf.update({'interface_id':
hostname_dict[hostname].interface_id})
# Check for address existent before creation
try:
address_obj = pecan.request.dbapi.address_get_by_address(
str(address))
upd_values = {'name': address_name}
upd_values.update(addr_intf)
pecan.request.dbapi.address_update(address_obj.uuid, upd_values)
except exception.AddressNotFoundByAddress:
values.update(addr_intf)
address_obj = pecan.request.dbapi.address_create(values)
# Update address pool with associated address
if name == constants.CONTROLLER_0_HOSTNAME:
opt_fields.update({
address_pool.ADDRPOOL_CONTROLLER0_ADDRESS_ID:
address_obj.id})
elif name == constants.CONTROLLER_1_HOSTNAME:
opt_fields.update({
address_pool.ADDRPOOL_CONTROLLER1_ADDRESS_ID:
address_obj.id})
elif name == constants.CONTROLLER_HOSTNAME:
opt_fields.update({
address_pool.ADDRPOOL_FLOATING_ADDRESS_ID: address_obj.id})
elif name == constants.CONTROLLER_GATEWAY:
opt_fields.update({
address_pool.ADDRPOOL_GATEWAY_ADDRESS_ID: address_obj.id})
# update pool with addresses IDs
if opt_fields:
pecan.request.dbapi.address_pool_update(pool.uuid, opt_fields)
def _create_network_addrpool(self, network_addrpool):
# Perform syntactic validation
network_addrpool.validate_syntax()
network_addrpool = network_addrpool.as_dict()
network_addrpool['uuid'] = str(uuid.uuid4())
network = pecan.request.dbapi.network_get(network_addrpool['network_uuid'])
pool = pecan.request.dbapi.address_pool_get(network_addrpool['address_pool_uuid'])
pool_family = constants.IP_FAMILIES[pool.family]
net_pool_list = pecan.request.dbapi.network_addrpool_get_by_network_id(network.id)
if len(net_pool_list) == 2:
# each network can have a max of 2 address-pools
attached_pools = list()
for netpool in net_pool_list:
this_pool = pecan.request.dbapi.address_pool_get(netpool.address_pool_uuid)
attached_pools.append(this_pool.name)
msg = (f"Network of type {network.type} already have "
f"maximum of 2 pools attached: {attached_pools}")
raise wsme.exc.ClientSideError(msg)
elif len(net_pool_list) == 1:
# each network can have only 1 address-pool per protocol
this_pool = pecan.request.dbapi.address_pool_get(net_pool_list[0].address_pool_uuid)
if this_pool.family == pool.family:
msg = (f"Network of type {network.type} already have "
f"pool {this_pool.name} for family {pool_family}")
raise wsme.exc.ClientSideError(msg)
elif len(net_pool_list) == 0:
# if the network have primary_pool_family set, check address family
if not network.pool_uuid and network.primary_pool_family:
if pool_family != network.primary_pool_family:
msg = (f"Network of type {network.type} requires "
f"primary pool of family {network.primary_pool_family}")
raise wsme.exc.ClientSideError(msg)
if network.type == constants.NETWORK_TYPE_PXEBOOT \
and pool.family != constants.IPV4_FAMILY:
msg = (f"Network of type {network.type} only supports "
f"pool of family {network.primary_pool_family}")
raise wsme.exc.ClientSideError(msg)
net_pool_list = pecan.request.dbapi.network_addrpool_get_by_pool_id(pool.id)
if len(net_pool_list):
msg = ("Address pool already in use by another network")
raise wsme.exc.ClientSideError(msg)
result = pecan.request.dbapi.network_addrpool_create({'address_pool_id': pool.id,
'network_id': network.id})
values = {}
if not network.pool_uuid and not network.primary_pool_family:
values = {'address_pool_id': pool.id, 'primary_pool_family': pool_family}
elif not network.pool_uuid and network.primary_pool_family:
values = {'address_pool_id': pool.id}
if values:
pecan.request.dbapi.network_update(network.uuid, values)
# add the addresses
addresses = utils.PopulateAddresses.create_network_addresses(pool, network)
self._populate_network_addresses(pool, network, addresses, result)
return NetworkAddresspool.convert_with_links(result)
@wsme_pecan.wsexpose(NetworkAddresspool, types.uuid)
def get_one(self, network_addrpool_uuid):
"""Retrieve a single Network-Addresspool object."""
return self._get_one(network_addrpool_uuid)
@wsme_pecan.wsexpose(NetworkAddresspoolCollection,
types.uuid, int, wtypes.text, wtypes.text)
def get_all(self, marker=None, limit=None, sort_key='id', sort_dir='asc'):
"""Retrieve a list of Network-Addresspool objects."""
return self._get_network_addrpool_collection(marker, limit,
sort_key=sort_key,
sort_dir=sort_dir)
@cutils.synchronized(LOCK_NAME)
@wsme_pecan.wsexpose(NetworkAddresspool, body=NetworkAddresspool)
def post(self, network_addrpool):
"""Create a new Network-Addresspool object."""
new_net_pool = self._create_network_addrpool(network_addrpool)
return new_net_pool
@cutils.synchronized(LOCK_NAME)
@wsme_pecan.wsexpose(None, types.uuid, status_code=204)
def delete(self, network_addrpool_uuid):
"""Delete a Network-Addresspool object."""
to_delete = pecan.request.dbapi.network_addrpool_get(network_addrpool_uuid)
network = pecan.request.dbapi.network_get(to_delete['network_uuid'])
pool = pecan.request.dbapi.address_pool_get(to_delete['address_pool_uuid'])
if cutils.is_initial_config_complete():
if (network['type'] in [constants.NETWORK_TYPE_OAM,
constants.NETWORK_TYPE_CLUSTER_HOST,
constants.NETWORK_TYPE_PXEBOOT,
constants.NETWORK_TYPE_CLUSTER_POD,
constants.NETWORK_TYPE_CLUSTER_SERVICE,
constants.NETWORK_TYPE_STORAGE]):
msg = (f"Cannot delete relation for network {network.uuid}"
f" with type {network['type']} after initial configuration completion")
raise wsme.exc.ClientSideError(msg)
elif (network['type'] in [constants.NETWORK_TYPE_MGMT] and
utils.get_system_mode() != constants.SYSTEM_MODE_SIMPLEX):
msg = (f"Cannot delete relation for network {network.uuid}"
f" with type {network['type']} after initial configuration completion")
raise wsme.exc.ClientSideError(msg)
if network.pool_uuid == pool.uuid:
# this operation is blocked for now
msg = (f"Cannot remove primary pool '{pool.name}' from '{network.name}'"
f" with type {network['type']}")
raise wsme.exc.ClientSideError(msg)
# since the association with the pool is removed, remove the interface id from the address
pool_addresses = pecan.request.dbapi.addresses_get_by_pool(pool.id)
for addr in pool_addresses:
if addr.interface_id:
pecan.request.dbapi.address_update(addr.uuid, {'interface_id': None})
pecan.request.dbapi.network_addrpool_destroy(to_delete.uuid)

View File

@ -16,10 +16,11 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2022 Wind River Systems, Inc.
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
from eventlet.green import subprocess
import collections
import jsonpatch
import netaddr
import os
@ -41,6 +42,7 @@ from sysinv.common import exception
from sysinv.common import health
from sysinv.helm import common as helm_common
LOG = log.getLogger(__name__)
CONF = cfg.CONF
@ -349,7 +351,7 @@ def lookup_static_ip_address(name, networktype):
# address names are refined by network type to ensure they are
# unique across different address pools
name = '%s-%s' % (name, networktype)
address = pecan.request.dbapi.address_get_by_name(name)
address = get_primary_address_by_name(name, networktype, True)
return address.address
except exception.AddressNotFoundByName:
return None
@ -890,3 +892,217 @@ def is_host_lvg_updated(host_fs_list, host_lvg_list):
if last_resize < lvg['updated_at']:
return True
return False
class PopulateAddresses(object):
@staticmethod
def create_network_addresses(pool, network):
addresses = dict()
if network['type'] == constants.NETWORK_TYPE_MGMT:
addresses = PopulateAddresses._create_mgmt_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_ADMIN:
addresses = PopulateAddresses._create_admin_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_PXEBOOT:
addresses = PopulateAddresses._create_pxeboot_network_address()
elif network['type'] == constants.NETWORK_TYPE_CLUSTER_HOST:
addresses = PopulateAddresses._create_cluster_host_network_address()
elif network['type'] == constants.NETWORK_TYPE_OAM:
addresses = PopulateAddresses._create_oam_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_MULTICAST:
addresses = PopulateAddresses._create_multicast_network_address()
elif network['type'] == constants.NETWORK_TYPE_IRONIC:
addresses = PopulateAddresses._create_ironic_network_address()
elif network['type'] == constants.NETWORK_TYPE_SYSTEM_CONTROLLER:
addresses = PopulateAddresses._create_system_controller_network_address(pool)
elif network['type'] == constants.NETWORK_TYPE_STORAGE:
addresses = PopulateAddresses._create_storage_network_address()
return addresses
@staticmethod
def _create_mgmt_network_address(pool):
addresses = {}
if pool.floating_address:
addresses.update(
{constants.CONTROLLER_HOSTNAME: pool.floating_address})
else:
addresses.update({constants.CONTROLLER_HOSTNAME: None})
if pool.controller0_address:
addresses.update(
{constants.CONTROLLER_0_HOSTNAME: pool.controller0_address})
else:
addresses.update({constants.CONTROLLER_0_HOSTNAME: None})
if pool.controller1_address:
addresses.update(
{constants.CONTROLLER_1_HOSTNAME: pool.controller1_address})
else:
addresses.update({constants.CONTROLLER_1_HOSTNAME: None})
if pool.gateway_address is not None:
if get_distributed_cloud_role() == \
constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
# In subcloud configurations, the management gateway is used
# to communicate with the central cloud.
addresses[constants.SYSTEM_CONTROLLER_GATEWAY_IP_NAME] =\
pool.gateway_address
else:
addresses[constants.CONTROLLER_GATEWAY] =\
pool.gateway_address
return addresses
@staticmethod
def _create_admin_network_address(pool):
addresses = {}
if pool.floating_address:
addresses.update(
{constants.CONTROLLER_HOSTNAME: pool.floating_address})
else:
addresses.update({constants.CONTROLLER_HOSTNAME: None})
if pool.controller0_address:
addresses.update(
{constants.CONTROLLER_0_HOSTNAME: pool.controller0_address})
else:
addresses.update({constants.CONTROLLER_0_HOSTNAME: None})
if pool.controller1_address:
addresses.update(
{constants.CONTROLLER_1_HOSTNAME: pool.controller1_address})
else:
addresses.update({constants.CONTROLLER_1_HOSTNAME: None})
if pool.gateway_address is not None:
if get_distributed_cloud_role() == \
constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
# In subcloud configurations, the admin gateway is used
# to communicate with the central cloud.
addresses[constants.SYSTEM_CONTROLLER_GATEWAY_IP_NAME] =\
pool.gateway_address
else:
addresses[constants.CONTROLLER_GATEWAY] =\
pool.gateway_address
return addresses
@staticmethod
def _create_pxeboot_network_address():
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
@staticmethod
def _create_cluster_host_network_address():
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
@staticmethod
def _create_oam_network_address(pool):
addresses = {}
if pool.floating_address:
addresses.update(
{constants.CONTROLLER_HOSTNAME: pool.floating_address})
if get_system_mode() != constants.SYSTEM_MODE_SIMPLEX:
if pool.controller0_address:
addresses.update(
{constants.CONTROLLER_0_HOSTNAME: pool.controller0_address})
if pool.controller1_address:
addresses.update(
{constants.CONTROLLER_1_HOSTNAME: pool.controller1_address})
if pool.gateway_address:
addresses.update(
{constants.CONTROLLER_GATEWAY: pool.gateway_address})
return addresses
@staticmethod
def _create_multicast_network_address():
addresses = collections.OrderedDict()
addresses[constants.SM_MULTICAST_MGMT_IP_NAME] = None
addresses[constants.MTCE_MULTICAST_MGMT_IP_NAME] = None
addresses[constants.PATCH_CONTROLLER_MULTICAST_MGMT_IP_NAME] = None
addresses[constants.PATCH_AGENT_MULTICAST_MGMT_IP_NAME] = None
return addresses
@staticmethod
def _create_system_controller_network_address(pool):
addresses = {}
return addresses
@staticmethod
def _create_ironic_network_address():
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
@staticmethod
def _create_storage_network_address():
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
return addresses
def get_primary_address_by_name(db_address_name, networktype, raise_exc=False):
"""Search address by database name to retrieve the relevant addres from
the primary pool, if multipĺe entries for the same name are found, the
query will use the network's pool_uuid to get the address family (IPv4 or
IPv6) related to the primary.
:param db_address_name: the address name in the database
:param networktype: the network type
:param raise_exc: raise AddressNotFoundByName instead of returning None
:return: the address object if found, None otherwise
"""
address = None
# first search directly by name
try:
address = pecan.request.dbapi.address_get_by_name(db_address_name)
except exception.AddressNotFoundByName():
# if there is no match by name return here
LOG.info(f"address {db_address_name} not found, returning")
if raise_exc:
raise exception.AddressNotFoundByName(name=db_address_name)
return address
# if there is a single entry, return here
if len(address) == 1:
return address[0]
# if there are more than one entry, it is dual-stack, search the primary pool
# to return the desired IP based on primary pool address family.
if len(address) > 1:
address = None
try:
# there only one network per type
networks = pecan.request.dbapi.networks_get_by_type(networktype)
if networks:
if networks[0].pool_uuid:
pool = pecan.request.dbapi.address_pool_get(networks[0].pool_uuid)
address = pecan.request.dbapi.address_get_by_name_and_family(db_address_name,
pool.family)
else:
LOG.info(f"cannot find network for type {networktype}")
except exception.AddressNotFoundByNameAndFamily:
LOG.info(f"cannot find address for name={db_address_name} with"
f" network type={networktype}")
pass
except Exception as ex:
LOG.info(f"get_primary_address_by_name general exception: {ex}")
if not address and raise_exc:
raise exception.AddressNotFoundByName(name=db_address_name)
return address

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013-2023 Wind River Systems, Inc.
# Copyright (c) 2013-2024 Wind River Systems, Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
@ -682,6 +682,20 @@ class NetworkAlreadyExists(Conflict):
message = _("Network of type %(type)s already exists.")
class NetworkAddrpoolNotFound(NotFound):
message = _("Network addrpool %(network_addrpool_uuid)s could not be found.")
class NetworkAddrpoolAlreadyExists(Conflict):
message = _("Network addrpool with address pool ID %(address_pool_id)s "
"and network ID %(network_id)s already exists.")
class NetworkAddrpoolNetIdAndPoolIdNotFound(NotFound):
message = _("Network addrpool with address pool ID %(address_pool_id)s and "
"network ID %(network_id)s could not be found.")
class InterfaceNetworkNotFound(NotFound):
message = _("Interface network %(uuid)s could not be found.")
@ -726,6 +740,10 @@ class AddressNotFoundByName(NotFound):
message = _("Address could not be found for %(name)s")
class AddressNotFoundByNameAndFamily(NotFound):
message = _("Address could not be found for %(name)s and family %(family)s")
class AddressNotFoundByInterfacePool(NotFound):
message = _("Address could not be found for interface %(interface)s "
"pool %(pool)s")

View File

@ -1009,7 +1009,7 @@ def is_virtual():
def is_virtual_worker(ihost):
if not(os.path.isdir("/etc/sysinv/.virtual_worker_nodes")):
if not (os.path.isdir("/etc/sysinv/.virtual_worker_nodes")):
return False
try:
ip = ihost['mgmt_ip']
@ -1803,6 +1803,7 @@ def perform_distributed_cloud_config(dbapi, mgmt_iface_id):
# for local & reachable gateway etc, as config_subcloud
# will have already done these checks before allowing
# the system controller gateway into the database.
nettype = None
try:
# Prefer admin network
sc_network = dbapi.network_get_by_type(
@ -1810,18 +1811,18 @@ def perform_distributed_cloud_config(dbapi, mgmt_iface_id):
cc_gtwy_addr_name = '%s-%s' % (
constants.SYSTEM_CONTROLLER_GATEWAY_IP_NAME,
constants.NETWORK_TYPE_ADMIN)
nettype = constants.NETWORK_TYPE_ADMIN
except exception.NetworkTypeNotFound:
sc_network = dbapi.network_get_by_type(
constants.NETWORK_TYPE_MGMT)
cc_gtwy_addr_name = '%s-%s' % (
constants.SYSTEM_CONTROLLER_GATEWAY_IP_NAME,
constants.NETWORK_TYPE_MGMT)
nettype = constants.NETWORK_TYPE_MGMT
pass
try:
cc_gtwy_addr = dbapi.address_get_by_name(
cc_gtwy_addr_name)
except exception.AddressNotFoundByName:
cc_gtwy_addr = get_primary_address_by_name(dbapi, cc_gtwy_addr_name, nettype)
if not cc_gtwy_addr:
LOG.warning("DC Config: Failed to retrieve central "
"cloud gateway ip address")
return
@ -3699,6 +3700,61 @@ def is_bundle_extension_valid(filename):
return file_extension.lower() == ".tgz"
def get_primary_address_by_name(dbapi, db_address_name, networktype, raise_exc=False):
"""Search address by database name to retrieve the relevant addres from
the primary pool, if multipĺe entries for the same name are found, the
query will use the network's pool_uuid to get the address family (IPv4 or
IPv6) related to the primary.
:param dbapi: the database api reference
:param db_address_name: the address name in the database
:param networktype: the network type
:param raise_exc: raise AddressNotFoundByName instead of returning None
:return: the address object if found, None otherwise
"""
address = None
# first search directly by name
try:
address = dbapi.address_get_by_name(db_address_name)
except exception.AddressNotFoundByName():
# if there is no match by name return here
LOG.info(f"address {db_address_name} not found, returning")
if raise_exc:
raise exception.AddressNotFoundByName(name=db_address_name)
return address
# if there is a single entry, return here
if len(address) == 1:
return address[0]
# if there are more than one entry, it is dual-stack, search the primary pool
# to return the desired IP based on address family
if len(address) > 1:
address = None
try:
# there only one network per type
networks = dbapi.networks_get_by_type(networktype)
if networks:
if networks[0].pool_uuid:
pool = dbapi.address_pool_get(networks[0].pool_uuid)
address = dbapi.address_get_by_name_and_family(db_address_name,
pool.family)
else:
LOG.info(f"cannot find network for type {networktype}")
except exception.AddressNotFoundByNameAndFamily:
LOG.info(f"cannot find address for name={db_address_name} with"
f" network type={networktype}")
pass
except Exception as ex:
LOG.info(f"get_primary_address_by_name general exception: {str(ex)}")
if not address and raise_exc:
raise exception.AddressNotFoundByName(name=db_address_name)
return address
def update_config_file(config_filepath: str, values_to_update: list):
"""Update a config file with the desired information

View File

@ -2,7 +2,7 @@
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2019 Intel Corporation
# Copyright (c) 2021-2022 Wind River Systems, Inc.
# Copyright (c) 2021-2024 Wind River Systems, Inc.
#
"""
Sysinv Keystone notification listener.
@ -69,10 +69,10 @@ class NotificationEndpoint(object):
def get_transport_url():
try:
db_api = dbapi.get_instance()
network_object = db_api.address_get_by_name(
utils.format_address_name(constants.CONTROLLER_HOSTNAME,
addr_name = utils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_MGMT)
)
network_object = utils.get_primary_address_by_name(db_api, addr_name,
constants.NETWORK_TYPE_MGMT, True)
except Exception as e:
LOG.error("Failed to get management IP address: %s" % str(e))

View File

@ -663,11 +663,12 @@ class ConductorManager(service.PeriodicService):
return True
# get the controller-0 and floating management IP address
controller_0_address = self.dbapi.address_get_by_name(
constants.CONTROLLER_0_MGMT).address
floating_address = self.dbapi.address_get_by_name(
cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_MGMT)).address
controller_0_address = cutils.get_primary_address_by_name(self.dbapi,
constants.CONTROLLER_0_MGMT,
constants.NETWORK_TYPE_MGMT, True).address
floating_address = cutils.get_primary_address_by_name(self.dbapi,
constants.CONTROLLER_FLOATING_MGMT,
constants.NETWORK_TYPE_MGMT, True).address
try:
cmd = ["/usr/bin/ceph_k8s_update_monitors.sh",
controller_0_address,
@ -1026,7 +1027,9 @@ class ConductorManager(service.PeriodicService):
# address names are refined by network type to ensure they are
# unique across different address pools
name = cutils.format_address_name(name, networktype)
address = self.dbapi.address_get_by_name(name)
address = cutils.get_primary_address_by_name(self.dbapi,
name, networktype,
True)
return address.address
except exception.AddressNotFoundByName:
return None
@ -1300,15 +1303,15 @@ class ConductorManager(service.PeriodicService):
self.dbapi.network_get_by_type(
constants.NETWORK_TYPE_PXEBOOT
)
address = self.dbapi.address_get_by_name(
address = cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_PXEBOOT)
)
constants.NETWORK_TYPE_PXEBOOT),
constants.NETWORK_TYPE_PXEBOOT, True)
except exception.NetworkTypeNotFound:
address = self.dbapi.address_get_by_name(
address = cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_MGMT)
)
constants.NETWORK_TYPE_MGMT),
constants.NETWORK_TYPE_MGMT, True)
addn_line = self._dnsmasq_addn_host_entry_to_string(
address.address, constants.PXECONTROLLER_HOSTNAME
)
@ -1356,10 +1359,10 @@ class ConductorManager(service.PeriodicService):
# Add pxecontroller to dnsmasq.hosts file
pxeboot_network = self.dbapi.network_get_by_type(
constants.NETWORK_TYPE_PXEBOOT)
address = self.dbapi.address_get_by_name(
address = cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_PXEBOOT)
)
constants.NETWORK_TYPE_PXEBOOT),
constants.NETWORK_TYPE_PXEBOOT, True)
# This is the gateway address 169.254.202.1
LOG.info("%s: pxeboot gateway address: %s" % (
func, address.address))
@ -1972,9 +1975,12 @@ class ConductorManager(service.PeriodicService):
address = self.dbapi.address_get_by_address(ip_address)
address_uuid = address['uuid']
# If name is already set, return
if (self.dbapi.address_get_by_name(address_name) ==
address_uuid and iface_id is None):
return
search_addr = cutils.get_primary_address_by_name(self.dbapi,
address_name,
iface_type, True)
if search_addr:
if (search_addr.uuid == address_uuid and iface_id is None):
return
except exception.AddressNotFoundByAddress:
address_uuid = None
except exception.AddressNotFoundByName:
@ -2177,7 +2183,9 @@ class ConductorManager(service.PeriodicService):
if not interface_name:
return
address = self.dbapi.address_get_by_name(address_name)
address = cutils.get_primary_address_by_name(self.dbapi,
address_name,
network_type, True)
interface_id = address.interface_id
ip_address = address.address
@ -2202,7 +2210,9 @@ class ConductorManager(service.PeriodicService):
if network_type == constants.NETWORK_TYPE_MGMT:
self._remove_lease_for_address(hostname, network_type)
try:
address_uuid = self.dbapi.address_get_by_name(address_name).uuid
address_uuid = cutils.get_primary_address_by_name(self.dbapi,
address_name,
network_type, True).uuid
self.dbapi.address_remove_interface(address_uuid)
except exception.AddressNotFoundByName:
pass
@ -2213,7 +2223,9 @@ class ConductorManager(service.PeriodicService):
if network_type == constants.NETWORK_TYPE_MGMT:
self._remove_lease_for_address(hostname, network_type)
try:
address_uuid = self.dbapi.address_get_by_name(address_name).uuid
address_uuid = cutils.get_primary_address_by_name(self.dbapi,
address_name,
network_type, True).uuid
self.dbapi.address_destroy(address_uuid)
except exception.AddressNotFoundByName:
pass
@ -3506,27 +3518,23 @@ class ConductorManager(service.PeriodicService):
if set_address_interface:
if new_interface and 'id' in new_interface:
values = {'interface_id': new_interface['id']}
try:
addr_name = cutils.format_address_name(
ihost.hostname, new_interface_networktype)
address = self.dbapi.address_get_by_name(addr_name)
address = cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(ihost.hostname, new_interface_networktype),
new_interface_networktype)
if address:
self.dbapi.address_update(address['uuid'], values)
except exception.AddressNotFoundByName:
pass
# Do any potential distributed cloud config
# We do this here where the interface is created.
cutils.perform_distributed_cloud_config(self.dbapi,
new_interface['id'])
if port:
values = {'interface_id': port.interface_id}
try:
addr_name = cutils.format_address_name(ihost.hostname,
networktype)
address = self.dbapi.address_get_by_name(addr_name)
address = cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(ihost.hostname, networktype),
networktype)
if address:
if address['interface_id'] is None:
self.dbapi.address_update(address['uuid'], values)
except exception.AddressNotFoundByName:
pass
if ihost.invprovision not in [constants.PROVISIONED, constants.PROVISIONING, constants.UPGRADING]:
LOG.info("Updating %s host invprovision from %s to %s" %
@ -6532,12 +6540,11 @@ class ConductorManager(service.PeriodicService):
:returns: ihost object, including all fields.
"""
try:
name = cutils.format_address_name(name, networktype)
address = self.dbapi.address_get_by_name(name)
address = cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(name, networktype),
networktype)
if address:
return address.address
except exception.AddressNotFoundByName:
pass
LOG.info("RPC get_address_by_host_networktype called but found no address.")
@staticmethod
@ -8780,8 +8787,10 @@ class ConductorManager(service.PeriodicService):
try:
# remove IP address from DB
address_uuid = self.dbapi.address_get_by_name(address_name).uuid
self.dbapi.address_destroy(address_uuid)
address = cutils.get_primary_address_by_name(self.dbapi,
address_name,
constants.NETWORK_TYPE_MGMT, True)
self.dbapi.address_destroy(address.uuid)
LOG.info("{} removed from addresses DB".format(address_name))
except exception.AddressNotFoundByName:
LOG.info("exception: AddressNotFoundByName: {}".format(address_name))
@ -14071,10 +14080,10 @@ class ConductorManager(service.PeriodicService):
subprocess.check_call(['/usr/sbin/upgrade-start-pkg-extract', # pylint: disable=not-callable
'-r', to_version])
# get the floating management IP
mgmt_address = self.dbapi.address_get_by_name(
cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_MGMT)
)
mgmt_address = cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_MGMT),
constants.NETWORK_TYPE_MGMT, True)
i_system = self.dbapi.isystem_get_one()
upgrades_management.prepare_upgrade(
from_version, to_version, i_system, mgmt_address.address)
@ -14149,7 +14158,9 @@ class ConductorManager(service.PeriodicService):
plat_nfs_address_name = cutils.format_address_name("controller-platform-nfs",
constants.NETWORK_TYPE_MGMT)
try:
self.dbapi.address_get_by_name(plat_nfs_address_name)
cutils.get_primary_address_by_name(self.dbapi,
plat_nfs_address_name,
constants.NETWORK_TYPE_MGMT, True)
LOG.info("platform-nfs-ip exists in the DB, updating all references")
self.update_platform_nfs_ip_references(context)
@ -14468,7 +14479,9 @@ class ConductorManager(service.PeriodicService):
hostname, constants.NETWORK_TYPE_MGMT)
try:
self.dbapi.address_get_by_name(address_name)
cutils.get_primary_address_by_name(self.dbapi,
cutils.format_address_name(hostname, constants.NETWORK_TYPE_MGMT),
constants.NETWORK_TYPE_MGMT, True)
LOG.debug("Address %s already reserved, continuing." % address_name)
except exception.AddressNotFoundByName:
LOG.debug("Reserving address for %s." % address_name)
@ -14494,11 +14507,10 @@ class ConductorManager(service.PeriodicService):
network_type = constants.NETWORK_TYPE_MGMT
# Reserve new ip address, if not present
try:
self.dbapi.address_get_by_name(
self._get_cinder_address_name(network_type)
)
except exception.NotFound:
address = cutils.get_primary_address_by_name(self.dbapi,
self._get_cinder_address_name(network_type),
network_type)
if not address:
self._allocate_pool_address(None, network.pool_uuid,
self._get_cinder_address_name(network_type))

View File

@ -14,7 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2023 Wind River Systems, Inc.
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
"""SQLAlchemy storage backend."""
@ -5165,6 +5165,94 @@ class Connection(api.Connection):
raise exception.NetworkNotFound(network_uuid=network_uuid)
query.delete()
def _network_addrpool_get(self, network_addrpool_uuid):
query = model_query(models.NetworkAddressPools)
query = add_identity_filter(query, network_addrpool_uuid)
try:
result = query.one()
except NoResultFound:
raise exception.NetworkAddrpoolNotFound(network_addrpool_uuid=network_addrpool_uuid)
return result
def _network_addrpoool_get_by_network_id(self, network_id, limit=None,
marker=None, sort_key=None,
sort_dir=None):
query = model_query(models.NetworkAddressPools)
query = query.filter_by(network_id=network_id)
return _paginate_query(models.NetworkAddressPools, limit, marker,
sort_key, sort_dir, query)
def _network_addrpoool_get_by_pool_id(self, pool_id, limit=None,
marker=None, sort_key=None,
sort_dir=None):
query = model_query(models.NetworkAddressPools)
query = query.filter_by(address_pool_id=pool_id)
return _paginate_query(models.NetworkAddressPools, limit, marker,
sort_key, sort_dir, query)
@db_objects.objectify(objects.network_addrpool)
def network_addrpool_create(self, values):
if not values.get('uuid'):
values['uuid'] = uuidutils.generate_uuid()
network_addrpool = models.NetworkAddressPools(**values)
with _session_for_write() as session:
try:
session.add(network_addrpool)
session.flush()
except db_exc.DBDuplicateEntry:
raise exception.NetworkAddrpoolAlreadyExists(
address_pool_id=values['address_pool_id'],
network_id=values['network_id'])
return self._network_addrpool_get(values['uuid'])
@db_objects.objectify(objects.network_addrpool)
def network_addrpool_get_all(self, limit=None, marker=None,
sort_key=None, sort_dir=None):
query = model_query(models.NetworkAddressPools)
return _paginate_query(models.NetworkAddressPools, limit, marker,
sort_key, sort_dir, query)
@db_objects.objectify(objects.network_addrpool)
def network_addrpool_get_by_network_id(self, network_id, limit=None,
marker=None, sort_key=None,
sort_dir=None):
return self._network_addrpoool_get_by_network_id(network_id, limit, marker,
sort_key, sort_dir)
@db_objects.objectify(objects.network_addrpool)
def network_addrpool_get_by_pool_id(self, pool_id, limit=None,
marker=None, sort_key=None,
sort_dir=None):
return self._network_addrpoool_get_by_pool_id(pool_id, limit, marker,
sort_key, sort_dir)
@db_objects.objectify(objects.network_addrpool)
def network_addrpool_query(self, values):
query = model_query(models.NetworkAddressPools)
query = (query.
filter(models.NetworkAddressPools.address_pool_id == values['address_pool_id']).
filter(models.NetworkAddressPools.network_id == values['network_id']))
try:
result = query.one()
except NoResultFound:
raise exception.NetworkAddrpoolNetIdAndPoolIdNotFound(
address_pool_id=values['address_pool_id'],
network_id=values['network_id'])
return result
@db_objects.objectify(objects.network_addrpool)
def network_addrpool_get(self, network_addrpool_uuid):
return self._network_addrpool_get(network_addrpool_uuid)
def network_addrpool_destroy(self, network_addrpool_uuid):
query = model_query(models.NetworkAddressPools)
query = add_identity_filter(query, network_addrpool_uuid)
try:
query.one()
except NoResultFound:
raise exception.NetworkAddrpoolNotFound(network_addrpool_uuid=network_addrpool_uuid)
query.delete()
def _interface_network_get(self, uuid):
query = model_query(models.InterfaceNetworks)
query = add_identity_filter(query, uuid)
@ -5219,6 +5307,14 @@ class Connection(api.Connection):
network_id=values['network_id'])
return result
def _interface_network_get_by_network_id(self, network_id, limit=None,
marker=None, sort_key=None,
sort_dir=None):
query = model_query(models.InterfaceNetworks)
query = query.filter_by(network_id=network_id)
return _paginate_query(models.InterfaceNetworks, limit, marker,
sort_key, sort_dir, query)
@db_objects.objectify(objects.interface_network)
def interface_network_create(self, values):
if not values.get('uuid'):
@ -5259,6 +5355,13 @@ class Connection(api.Connection):
return self._interface_network_get_by_interface(
interface_id, limit, marker, sort_key, sort_dir)
@db_objects.objectify(objects.interface_network)
def interface_network_get_by_network_id(self, network_id, limit=None,
marker=None, sort_key=None,
sort_dir=None):
return self._interface_network_get_by_network_id(network_id, limit, marker,
sort_key, sort_dir)
def interface_network_destroy(self, uuid):
query = model_query(models.InterfaceNetworks)
query = add_identity_filter(query, uuid)
@ -5310,13 +5413,23 @@ class Connection(api.Connection):
return self._address_get(address_uuid)
@db_objects.objectify(objects.address)
def address_get_by_name(self, name):
def address_get_by_name(self, name, limit=None, marker=None,
sort_key=None, sort_dir=None):
# this method can return a list of IPv4 and IPv6 addresses
query = model_query(models.Addresses)
query = query.filter_by(name=name)
return _paginate_query(models.Addresses, limit, marker,
sort_key, sort_dir, query)
@db_objects.objectify(objects.address)
def address_get_by_name_and_family(self, name, family):
query = model_query(models.Addresses)
query = query.filter_by(name=name, family=family)
try:
result = query.one()
except NoResultFound:
raise exception.AddressNotFoundByName(name=name)
raise exception.AddressNotFoundByNameAndFamily(name=name,
family=family)
return result
@db_objects.objectify(objects.address)

View File

@ -0,0 +1,42 @@
#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from sqlalchemy import Column, MetaData, Table
from sqlalchemy import DateTime, Integer, String
from sqlalchemy import ForeignKey, UniqueConstraint
ENGINE = 'InnoDB'
CHARSET = 'utf8'
def upgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
Table('address_pools', meta, autoload=True)
networks = Table('networks', meta, autoload=True)
networks.create_column(Column('primary_pool_family', String(4)))
network_addrpool = Table(
'network_addresspools',
meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('deleted_at', DateTime),
Column('id', Integer, primary_key=True, nullable=False),
Column('uuid', String(36), unique=True),
Column('address_pool_id', Integer, ForeignKey('address_pools.id', ondelete='CASCADE')),
Column('network_id', Integer, ForeignKey('networks.id', ondelete='CASCADE')),
UniqueConstraint('network_id', 'address_pool_id', name='u_network_id@address_pool_id'),
mysql_engine=ENGINE,
mysql_charset=CHARSET,
)
network_addrpool.create()
def downgrade(migrate_engine):
raise NotImplementedError('SysInv databse downgrade is unsupported.')

View File

@ -1329,11 +1329,27 @@ class Networks(Base):
address_pool_id = Column(Integer,
ForeignKey('address_pools.id',
ondelete='CASCADE'),
nullable=False)
nullable=True)
primary_pool_family = Column(String(4))
address_pool = relationship("AddressPools", lazy="joined")
class NetworkAddressPools(Base):
__tablename__ = 'network_addresspools'
id = Column(Integer, primary_key=True, nullable=False)
uuid = Column(String(36), unique=True)
address_pool_id = Column(Integer, ForeignKey('address_pools.id', ondelete='CASCADE'))
network_id = Column(Integer, ForeignKey('networks.id', ondelete='CASCADE'))
address_pool = relationship("AddressPools", lazy="joined", backref="network_addresspools")
network = relationship("Networks", lazy="joined", backref="network_addresspools")
UniqueConstraint('network_id', 'address_pool_id', name='u_network_id@address_pool_id')
class InterfaceNetworks(Base):
__tablename__ = 'interface_networks'

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2018-2022 Wind River Systems, Inc.
# Copyright (c) 2018-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -263,7 +263,9 @@ class BaseHelm(object):
address_name = utils.format_address_name(name, networktype)
address = addresses.get(address_name)
if address is None:
address = self.dbapi.address_get_by_name(address_name)
address = utils.get_primary_address_by_name(self.dbapi,
address_name,
networktype)
addresses[address_name] = address
return address

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2023 Wind River Systems, Inc.
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
from sysinv.objects import address
@ -62,6 +62,7 @@ from sysinv.objects import lvg
from sysinv.objects import memory
from sysinv.objects import network
from sysinv.objects import network_oam
from sysinv.objects import network_addrpool
from sysinv.objects import node
from sysinv.objects import ntp
from sysinv.objects import pci_device
@ -203,6 +204,7 @@ restore = restore.Restore
kube_rootca_host_update = kube_rootca_host_update.KubeRootCAHostUpdate
kube_rootca_update = kube_rootca_update.KubeRootCAUpdate
runtime_config = runtime_config.RuntimeConfig
network_addrpool = network_addrpool.NetworkAddrpool
__all__ = ("system",
"cluster",
@ -290,6 +292,7 @@ __all__ = ("system",
"kube_rootca_host_update",
"kube_rootca_update",
"runtime_config",
"network_addrpool",
# alias objects for RPC compatibility
"ihost",
"ilvg")

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2015 Wind River Systems, Inc.
# Copyright (c) 2015-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -25,6 +25,7 @@ class Network(base.SysinvObject):
'type': utils.str_or_none,
'dynamic': utils.bool_or_none,
'pool_uuid': utils.uuid_or_none,
'primary_pool_family': utils.str_or_none,
}
_foreign_fields = {'pool_uuid': 'address_pool:uuid'}

View File

@ -0,0 +1,45 @@
#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# coding=utf-8
#
from sysinv.db import api as db_api
from sysinv.objects import base
from sysinv.objects import utils
class NetworkAddrpool(base.SysinvObject):
VERSION = '1.0'
dbapi = db_api.get_instance()
fields = {
'id': int,
'uuid': utils.uuid_or_none,
'address_pool_id': utils.int_or_none,
'address_pool_uuid': utils.uuid_or_none,
'address_pool_name': utils.str_or_none,
'network_id': utils.int_or_none,
'network_uuid': utils.uuid_or_none,
'network_name': utils.str_or_none,
'network_type': utils.str_or_none,
}
_foreign_fields = {
'address_pool_id': 'address_pool:id',
'address_pool_uuid': 'address_pool:uuid',
'address_pool_name': 'address_pool:name',
'network_uuid': 'network:uuid',
'network_id': 'network:id',
'network_name': 'network:name',
'network_type': 'network:type'
}
@base.remotable_classmethod
def get_by_uuid(cls, context, uuid):
return cls.dbapi.network_addrpool_get(uuid)

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2017-2023 Wind River Systems, Inc.
# Copyright (c) 2017-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -176,7 +176,9 @@ class BasePuppet(object):
address_name = utils.format_address_name(name, networktype)
address = addresses.get(address_name)
if address is None:
address = self.dbapi.address_get_by_name(address_name)
address = utils.get_primary_address_by_name(self.dbapi,
address_name,
networktype, True)
addresses[address_name] = address
return address

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2017-2023 Wind River Systems, Inc.
# Copyright (c) 2017-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -1810,11 +1810,8 @@ def generate_unassigned_pxeboot_intf_config(context, config, db_api,
constants.NETWORK_TYPE_PXEBOOT)
if address_name:
address = None
try:
address = db_api.address_get_by_name(address_name)
except exception.AddressNotFoundByName:
LOG.info(f"cannot find address:{address_name} ")
address = utils.get_primary_address_by_name(db_api, address_name,
constants.NETWORK_TYPE_PXEBOOT)
if (address and net_config['method'] == 'static'):
addr_data = _set_address_netmask({'address': address.address,
'prefix': address.prefix})

View File

@ -1,5 +1,5 @@
# Copyright (c) 2017-2023 Wind River Systems, Inc.
# Copyright (c) 2017-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -768,17 +768,14 @@ class PlatformFirewallPuppet(base.BasePuppet):
elif host.hostname == constants.CONTROLLER_1_HOSTNAME:
address_name = cutils.format_address_name(constants.CONTROLLER_1_HOSTNAME, net_type)
address = None
try:
address = self.dbapi.address_get_by_name(address_name)
except exception.AddressNotFoundByName:
LOG.info(f"cannot find address:{address_name} for net_type:{net_type} expectedIPs")
address = cutils.get_primary_address_by_name(self.dbapi, address_name, net_type)
if (address):
if ("expectedIPs" in host_endpoints["spec"].keys()):
host_endpoints["spec"]["expectedIPs"].append(str(address.address))
else:
host_endpoints["spec"].update({"expectedIPs": [str(address.address)]})
else:
LOG.info(f"cannot find address:{address_name} for net_type:{net_type} expectedIPs")
def _get_dc_role(dbapi):

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2017-2023 Wind River Systems, Inc.
# Copyright (c) 2017-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -230,8 +230,9 @@ class PuppetOperator(object):
"""
Retrieve an address entry by name and scoped by network type
"""
address_name = utils.format_address_name(name, networktype)
address = self.dbapi.address_get_by_name(address_name)
address = utils.get_primary_address_by_name(self.dbapi,
utils.format_address_name(name, networktype),
networktype, True)
return address
def _merge_host_config(self, host, target_load, config):

View File

@ -2,7 +2,7 @@
# -*- encoding: utf-8 -*-
#
#
# Copyright (c) 2013-2018 Wind River Systems, Inc.
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -66,6 +66,20 @@ class InterfaceNetworkTestCase(base.FunctionalTest):
link_capacity=1000,
vlan_id=2,
address_pool_id=self.address_pool_mgmt.id)
self.mgmt_c0_address = dbutils.create_test_address(
family=constants.IPV4_FAMILY,
address='192.168.204.2',
prefix=24,
name='controller-0-mgmt',
address_pool_id=self.address_pool_mgmt.id)
self.controller['mgmt_ip'] = self.mgmt_c0_address.address
self.mgmt_w0_address = dbutils.create_test_address(
family=constants.IPV4_FAMILY,
address='192.168.204.3',
prefix=24,
name='worker-0-mgmt',
address_pool_id=self.address_pool_mgmt.id)
self.worker['mgmt_ip'] = self.mgmt_w0_address.address
self.address_pool_cluster_host = dbutils.create_test_address_pool(
id=2,
network='192.168.206.0',
@ -91,7 +105,7 @@ class InterfaceNetworkTestCase(base.FunctionalTest):
type=constants.NETWORK_TYPE_OAM,
address_pool_id=self.address_pool_oam.id)
self.oam_address = dbutils.create_test_address(
family=2,
family=constants.IPV4_FAMILY,
address='10.10.10.3',
prefix=24,
name='controller-0-oam',
@ -107,7 +121,7 @@ class InterfaceNetworkTestCase(base.FunctionalTest):
type=constants.NETWORK_TYPE_PXEBOOT,
address_pool_id=self.address_pool_pxeboot.id)
self.pxeboot_address = dbutils.create_test_address(
family=2,
family=constants.IPV4_FAMILY,
address='192.168.202.3',
prefix=24,
name='controller-0-pxeboot',

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2020-2023 Wind River Systems, Inc.
# Copyright (c) 2020-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -9,6 +9,7 @@ Tests for the API / network / methods.
"""
import mock
from six.moves import http_client
from oslo_utils import uuidutils
@ -41,6 +42,7 @@ class NetworkTestCase(base.FunctionalTest, dbbase.BaseHostTestCase):
'type',
'dynamic',
'pool_uuid',
'primary_pool_family'
]
# hidden_api_fields are attributes that should not be populated by
@ -55,7 +57,7 @@ class NetworkTestCase(base.FunctionalTest, dbbase.BaseHostTestCase):
def assert_fields(self, api_object):
# check the uuid is a uuid
assert(uuidutils.is_uuid_like(api_object['uuid']))
assert (uuidutils.is_uuid_like(api_object['uuid']))
# Verify that expected attributes are returned
for field in self.expected_api_fields:

View File

@ -0,0 +1,734 @@
#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Tests for the API / network_addresspools / methods.
"""
import mock
import netaddr
from six.moves import http_client
from oslo_utils import uuidutils
from sysinv.common import constants
from sysinv.tests.api import base
from sysinv.tests.db import base as dbbase
from sysinv.tests.db import utils as dbutils
class NetworkAddrpoolTestCase(base.FunctionalTest, dbbase.BaseHostTestCase):
# API_HEADERS are a generic header passed to most API calls
API_HEADERS = {'User-Agent': 'sysinv-test'}
# API_PREFIX is the prefix for the URL
API_PREFIX = '/network_addresspools'
# RESULT_KEY is the python table key for the list of results
RESULT_KEY = 'network_addresspools'
# COMMON_FIELD is a field that is known to exist for inputs and outputs
COMMON_FIELD = 'network_uuid'
# expected_api_fields are attributes that should be populated by
# an API query
expected_api_fields = ['id',
'uuid',
'network_uuid',
'network_name',
'addresspool_uuid',
'addresspool_name',
]
# hidden_api_fields are attributes that should not be populated by
# an API query
hidden_api_fields = ['forihostid']
def setUp(self):
super(NetworkAddrpoolTestCase, self).setUp()
self.networks = dict()
network_table = dbutils.get_network_table()
for net in network_table:
self.networks.update({net.type: net})
self.create_ipv6_pools()
self.address_pools = dict()
address_pool_table = dbutils.get_address_pool_table()
for pool in address_pool_table:
self.address_pools.update({pool.name: pool})
def get_single_url(self, uuid):
return '%s/%s' % (self.API_PREFIX, uuid)
def get_single_network_url(self, uuid):
return '%s/%s' % ("/networks", uuid)
def assert_fields(self, api_object):
# check the uuid is a uuid
assert (uuidutils.is_uuid_like(api_object['uuid']))
# Verify that expected attributes are returned
for field in self.expected_api_fields:
self.assertIn(field, api_object)
# Verify that hidden attributes are not returned
for field in self.hidden_api_fields:
self.assertNotIn(field, api_object)
def get_post_object(self, network_uuid, address_pool_uuid):
net_pool_db = dbutils.get_post_network_addrpool(
address_pool_uuid=address_pool_uuid,
network_uuid=network_uuid
)
return net_pool_db
def create_ipv6_pools(self):
mgmt_subnet6 = netaddr.IPNetwork('fd01::/64')
oam_subnet6 = netaddr.IPNetwork('fd00::/64')
cluster_host_subnet6 = netaddr.IPNetwork('fd02::/64')
cluster_pod_subnet6 = netaddr.IPNetwork('fd03::/64')
cluster_service_subnet6 = netaddr.IPNetwork('fd04::/112')
multicast_subnet6 = netaddr.IPNetwork('ff08::1:1:0/124')
storage_subnet6 = netaddr.IPNetwork('fd05::/64')
admin_subnet6 = netaddr.IPNetwork('fd09::/64')
self._create_test_address_pool(name="management-ipv6", subnet=mgmt_subnet6)
self._create_test_address_pool(name="oam-ipv6", subnet=oam_subnet6)
self._create_test_address_pool(name="cluster-host-ipv6", subnet=cluster_host_subnet6)
self._create_test_address_pool(name="cluster-pod-ipv6", subnet=cluster_pod_subnet6)
self._create_test_address_pool(name="cluster-service-ipv6", subnet=cluster_service_subnet6)
self._create_test_address_pool(name="multicast-ipv6", subnet=multicast_subnet6)
self._create_test_address_pool(name="storage-ipv6", subnet=storage_subnet6)
self._create_test_address_pool(name="admin-ipv6", subnet=admin_subnet6)
def create_network(self, **kw):
network = dbutils.create_test_network(**kw)
return network
def _setup_context(self):
print("_setup_context")
self.host0 = self._create_test_host(personality=constants.CONTROLLER, unit=0,
id=1, mgmt_ip="1.1.1.1")
self.c0_oam_if = dbutils.create_test_interface(ifname='enp0s3', forihostid=self.host0.id)
dbutils.create_test_interface_network_type_assign(self.c0_oam_if.id,
constants.NETWORK_TYPE_OAM)
self.c0_mgmt_if = dbutils.create_test_interface(ifname='enp0s8', forihostid=self.host0.id)
dbutils.create_test_interface_network_type_assign(self.c0_mgmt_if.id,
constants.NETWORK_TYPE_MGMT)
dbutils.create_test_interface_network_type_assign(self.c0_mgmt_if.id,
constants.NETWORK_TYPE_CLUSTER_HOST)
self.host1 = self._create_test_host(personality=constants.CONTROLLER, unit=1,
id=2, mgmt_ip="1.1.1.2")
self.c1_oam_if = dbutils.create_test_interface(ifname='enp0s3', forihostid=self.host1.id)
dbutils.create_test_interface_network_type_assign(self.c1_oam_if.id,
constants.NETWORK_TYPE_OAM)
self.c1_mgmt_if = dbutils.create_test_interface(ifname='enp0s8',
forihostid=self.host1.id)
dbutils.create_test_interface_network_type_assign(self.c1_mgmt_if.id,
constants.NETWORK_TYPE_MGMT)
dbutils.create_test_interface_network_type_assign(self.c1_mgmt_if.id,
constants.NETWORK_TYPE_CLUSTER_HOST)
class TestPostMixin(NetworkAddrpoolTestCase):
def setUp(self):
super(TestPostMixin, self).setUp()
dbutils.cleanup_network_addrpool_table()
dbutils.cleanup_address_table()
def test_success_create_network_addrpool_primary(self):
self._setup_context()
# Test creation of object
net_type = constants.NETWORK_TYPE_MGMT
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['management'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# Check that an expected field matches.
self.assertEqual(response.json['address_pool_name'], self.address_pools['management'].name)
self.assertEqual(response.json['address_pool_id'], self.address_pools['management'].id)
self.assertEqual(response.json['address_pool_uuid'], self.address_pools['management'].uuid)
self.assertEqual(response.json['network_name'], self.networks[net_type].name)
self.assertEqual(response.json['network_id'], self.networks[net_type].id)
self.assertEqual(response.json['network_uuid'], self.networks[net_type].uuid)
uuid = response.json['uuid']
# Verify that the object was created and some basic attribute matches
response = self.get_json(self.get_single_url(uuid))
self.assertEqual(response['address_pool_name'], self.address_pools['management'].name)
self.assertEqual(response['address_pool_id'], self.address_pools['management'].id)
self.assertEqual(response['address_pool_uuid'], self.address_pools['management'].uuid)
self.assertEqual(response['network_name'], self.networks[net_type].name)
self.assertEqual(response['network_id'], self.networks[net_type].id)
self.assertEqual(response['network_uuid'], self.networks[net_type].uuid)
addr_list = dbutils.get_address_table()
self.assertEqual(3, len(addr_list))
for addr in addr_list:
self.assertIn(addr.name,
[f"{constants.CONTROLLER_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}",
f"{constants.CONTROLLER_0_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}",
f"{constants.CONTROLLER_1_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}"])
if addr.name == f"{constants.CONTROLLER_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, None)
elif addr.name == f"{constants.CONTROLLER_0_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, self.c0_mgmt_if.id)
elif addr.name == f"{constants.CONTROLLER_1_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, self.c1_mgmt_if.id)
def test_success_create_network_addrpool_secondary(self):
self._setup_context()
# add primary
net_type = constants.NETWORK_TYPE_MGMT
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['management'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# add secondary
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['management-ipv6'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
uuid = response.json['uuid']
# Verify that the object was created and some basic attribute matches
response = self.get_json(self.get_single_url(uuid))
self.assertEqual(response['address_pool_name'], self.address_pools['management-ipv6'].name)
self.assertEqual(response['address_pool_id'], self.address_pools['management-ipv6'].id)
self.assertEqual(response['address_pool_uuid'], self.address_pools['management-ipv6'].uuid)
self.assertEqual(response['network_name'], self.networks[net_type].name)
self.assertEqual(response['network_id'], self.networks[net_type].id)
self.assertEqual(response['network_uuid'], self.networks[net_type].uuid)
addr_list = dbutils.get_address_table()
self.assertEqual(6, len(addr_list))
ip4_list = list()
ip6_list = list()
for addr in addr_list:
self.assertIn(addr.name,
[f"{constants.CONTROLLER_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}",
f"{constants.CONTROLLER_0_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}",
f"{constants.CONTROLLER_1_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}"])
if addr.name == f"{constants.CONTROLLER_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, None)
elif addr.name == f"{constants.CONTROLLER_0_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, self.c0_mgmt_if.id)
elif addr.name == f"{constants.CONTROLLER_1_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, self.c1_mgmt_if.id)
if addr.family == constants.IPV6_FAMILY:
ip6_list.append(addr)
elif addr.family == constants.IPV4_FAMILY:
ip4_list.append(addr)
self.assertEqual(3, len(ip4_list))
self.assertEqual(3, len(ip6_list))
def test_success_create_network_addrpool_secondary_oam(self):
self._setup_context()
# add primary
net_type = constants.NETWORK_TYPE_OAM
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['oam'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# add secondary
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['oam-ipv6'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
uuid = response.json['uuid']
# Verify that the object was created and some basic attribute matches
response = self.get_json(self.get_single_url(uuid))
self.assertEqual(response['address_pool_name'], self.address_pools['oam-ipv6'].name)
self.assertEqual(response['address_pool_id'], self.address_pools['oam-ipv6'].id)
self.assertEqual(response['address_pool_uuid'], self.address_pools['oam-ipv6'].uuid)
self.assertEqual(response['network_name'], self.networks[net_type].name)
self.assertEqual(response['network_id'], self.networks[net_type].id)
self.assertEqual(response['network_uuid'], self.networks[net_type].uuid)
def test_success_create_network_addrpool_primary_subcloud(self):
self._setup_context()
net_type = constants.NETWORK_TYPE_MGMT
p = mock.patch('sysinv.api.controllers.v1.utils.get_distributed_cloud_role')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD
self.addCleanup(p.stop)
mgmt_subnet = netaddr.IPNetwork('3001::/64')
ranges = [(str(mgmt_subnet[2]), str(mgmt_subnet[-2]))]
c0_address = dbutils.create_test_address(
name=f"{constants.CONTROLLER_0_HOSTNAME}-{net_type}",
family=mgmt_subnet.version, prefix=mgmt_subnet.prefixlen,
address="3001::3")
c1_address = dbutils.create_test_address(
name=f"{constants.CONTROLLER_1_HOSTNAME}-{net_type}",
family=mgmt_subnet.version, prefix=mgmt_subnet.prefixlen,
address="3001::4")
float_address = dbutils.create_test_address(
name=f"{constants.CONTROLLER_HOSTNAME}-{net_type}",
family=mgmt_subnet.version, prefix=mgmt_subnet.prefixlen,
address="3001::2")
gw_address = dbutils.create_test_address(
name=f"{constants.SYSTEM_CONTROLLER_GATEWAY_IP_NAME}-{net_type}",
family=mgmt_subnet.version, prefix=mgmt_subnet.prefixlen,
address="3001::1")
test_pool = dbutils.create_test_address_pool(
name="subcloud-mgmt-ipv6", network=str(mgmt_subnet.network),
family=mgmt_subnet.version, prefix=mgmt_subnet.prefixlen,
ranges=ranges,
floating_address=float_address.address,
controller0_address=c0_address.address,
controller1_address=c1_address.address,
gateway_address=gw_address.address)
# Test creation of object
ndict = self.get_post_object(self.networks[net_type].uuid,
test_pool.uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# Check that an expected field matches.
self.assertEqual(response.json['address_pool_name'], test_pool.name)
self.assertEqual(response.json['address_pool_id'], test_pool.id)
self.assertEqual(response.json['address_pool_uuid'], test_pool.uuid)
self.assertEqual(response.json['network_name'], self.networks[net_type].name)
self.assertEqual(response.json['network_id'], self.networks[net_type].id)
self.assertEqual(response.json['network_uuid'], self.networks[net_type].uuid)
uuid = response.json['uuid']
# Verify that the object was created and some basic attribute matches
response = self.get_json(self.get_single_url(uuid))
self.assertEqual(response['address_pool_name'], test_pool.name)
self.assertEqual(response['address_pool_id'], test_pool.id)
self.assertEqual(response['address_pool_uuid'], test_pool.uuid)
self.assertEqual(response['network_name'], self.networks[net_type].name)
self.assertEqual(response['network_id'], self.networks[net_type].id)
self.assertEqual(response['network_uuid'], self.networks[net_type].uuid)
addr_list = dbutils.get_address_table()
self.assertEqual(4, len(addr_list))
for addr in addr_list:
self.assertIn(addr.name,
[c0_address.name, c1_address.name, gw_address.name, float_address.name])
if (addr.name == gw_address.name):
self.assertEqual(gw_address.address, addr.address)
if addr.name == f"{constants.CONTROLLER_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, None)
elif addr.name == f"{constants.CONTROLLER_0_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, self.c0_mgmt_if.id)
elif addr.name == f"{constants.CONTROLLER_1_HOSTNAME}-{constants.NETWORK_TYPE_MGMT}":
self.assertEqual(addr.interface_id, self.c1_mgmt_if.id)
def test_error_create_network_addrpool_secondary_same_family(self):
# add primary
net_type = constants.NETWORK_TYPE_MGMT
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['management'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# add secondary
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['oam'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_create_pxeboot_network_addrpool_secondary_ipv6(self):
# add primary
net_type = constants.NETWORK_TYPE_PXEBOOT
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['pxeboot'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# add secondary
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['oam-ipv6'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_create_network_addrpool_tertiary(self):
# add primary
net_type = constants.NETWORK_TYPE_MGMT
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['management'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# add secondary
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['management-ipv6'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# add tertiary
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['oam-ipv6'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_create_network_addrpool_primary_duplicate(self):
net_type = constants.NETWORK_TYPE_MGMT
ndict = self.get_post_object(self.networks[net_type].uuid,
self.address_pools['management'].uuid)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
class TestDelete(NetworkAddrpoolTestCase):
""" Tests deletion.
Typically delete APIs return NO CONTENT.
python2 and python3 libraries may return different
content_type (None, or empty json) when NO_CONTENT returned.
"""
def setUp(self):
super(TestDelete, self).setUp()
dbutils.cleanup_network_addrpool_table()
def test_error_delete_mgmt_network_addrpool_primary_aio_sx_config_complete(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_MGMT
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['management'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_delete_mgmt_network_addrpool_primary_aio_dx_config_complete(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_DUPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_MGMT
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['management'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_success_delete_mgmt_network_addrpool_secondary(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_MGMT
net_pool_1 = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['management'].id,
network_id=self.networks[net_type].id)
net_pool_2 = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['management-ipv6'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool_2.uuid),
headers=self.API_HEADERS)
self.assertEqual(response.status_code, http_client.NO_CONTENT)
# Test deletion of net_pool_2
response = self.get_json(self.get_single_url(net_pool_2.uuid),
expect_errors=True)
self.assertEqual(response.status_code, http_client.NOT_FOUND)
# Test presence of net_pool_1
response = self.get_json(self.get_single_url(net_pool_1.uuid),
expect_errors=True)
self.assertEqual(response.status_code, http_client.OK)
# check that pool_uuid is filled since it was the secondary pool
response = self.get_json(self.get_single_network_url(self.networks[net_type].uuid))
self.assertEqual(response['pool_uuid'], self.address_pools['management'].uuid)
self.assertEqual(response['type'], self.networks[net_type].type)
self.assertEqual(response['primary_pool_family'],
self.networks[net_type].primary_pool_family)
def test_error_delete_oam_network_addrpool_primary(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_OAM
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['oam'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_delete_pxeboot_network_addrpool_primary(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_PXEBOOT
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['pxeboot'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_delete_cluster_host_network_addrpool_primary(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_CLUSTER_HOST
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['cluster-host'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_delete_cluster_pod_network_addrpool_primary(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_CLUSTER_HOST
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['cluster-pod'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_delete_cluster_service_network_addrpool_primary(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_CLUSTER_SERVICE
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['cluster-service'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
def test_error_delete_storage_network_addrpool_primary(self):
p = mock.patch('sysinv.api.controllers.v1.utils.get_system_mode')
self.mock_utils_get_system_mode = p.start()
self.mock_utils_get_system_mode.return_value = constants.SYSTEM_MODE_SIMPLEX
self.addCleanup(p.stop)
p = mock.patch('sysinv.common.utils.is_initial_config_complete')
self.mock_utils_is_initial_config_complete = p.start()
self.mock_utils_is_initial_config_complete.return_value = True
self.addCleanup(p.stop)
net_type = constants.NETWORK_TYPE_STORAGE
net_pool = dbutils.create_test_network_addrpool(
address_pool_id=self.address_pools['storage'].id,
network_id=self.networks[net_type].id)
response = self.delete(self.get_single_url(net_pool.uuid),
headers=self.API_HEADERS,
expect_errors=True)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
class TestList(NetworkAddrpoolTestCase):
""" Network Addrpool list operations
"""
def setUp(self):
super(TestList, self).setUp()
def test_get_all(self):
response = self.get_json(self.API_PREFIX)
self.assertEqual(10, len(response[self.RESULT_KEY]))
def test_empty_list(self):
dbutils.cleanup_network_addrpool_table()
response = self.get_json(self.API_PREFIX)
self.assertEqual([], response[self.RESULT_KEY])
def test_get_list_with_one(self):
dbutils.cleanup_network_addrpool_table()
dbutils.create_test_network_addrpool(address_pool_id=1,
network_id=1)
response = self.get_json(self.API_PREFIX)
self.assertEqual(1, len(response[self.RESULT_KEY]))
class TestPatch(NetworkAddrpoolTestCase):
patch_path = '/dynamic'
patch_field = 'dynamic'
patch_value = False
def setUp(self):
super(TestPatch, self).setUp()
dbutils.cleanup_network_addrpool_table()
self.patch_object = dbutils.create_test_network_addrpool(address_pool_id=1,
network_id=1)
def test_patch_not_allowed(self):
# Try and patch an unmodifiable value
response = self.patch_json(self.get_single_url(self.patch_object.uuid),
[{'path': '/junk_field',
'value': self.patch_value,
'op': 'replace'}],
headers=self.API_HEADERS,
expect_errors=True)
# Verify the expected API response
self.assertEqual(response.content_type, 'application/json')
self.assertEqual(response.status_code, http_client.METHOD_NOT_ALLOWED)
self.assertIn("The method PATCH is not allowed for this resource.",
response.json['error_message'])

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2022 Wind River Systems, Inc.
# Copyright (c) 2022-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -11,33 +11,29 @@ Test class for Sysinv Keystone notification listener.
import mock
from sysinv.conductor import keystone_listener
from sysinv.common import utils
from sysinv.common import constants
from sysinv.tests.db import base
from sysinv.db import api as dbapi
class KeystoneListenerTestCase(base.DbTestCase):
class KeystoneListenerTestCase(base.BaseSystemTestCase):
def test_get_transport_url(self):
class db_api_test(object):
@staticmethod
def get_instance():
return get_db()
class get_db(object):
def address_get_by_name(self, param1):
return get_network_ob()
class get_network_ob(object):
address = '192.168.101.1'
db_api = dbapi.get_instance()
network_object = utils.get_primary_address_by_name(db_api,
utils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_MGMT),
constants.NETWORK_TYPE_MGMT).address
class keyring_obj(object):
@staticmethod
def get_password(param1, param2):
return 'passwrd'
with mock.patch("sysinv.conductor.keystone_listener.dbapi", db_api_test):
with mock.patch("sysinv.conductor.keystone_listener.keyring", keyring_obj):
self.assertEqual(
keystone_listener.get_transport_url(),
"rabbit://guest:passwrd@192.168.101.1:5672"
)
with mock.patch("sysinv.conductor.keystone_listener.keyring", keyring_obj):
self.assertEqual(
keystone_listener.get_transport_url(),
f"rabbit://guest:passwrd@{network_object}:5672"
)

View File

@ -15,7 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2023 Wind River Systems, Inc.
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -136,6 +136,7 @@ class BaseSystemTestCase(BaseIPv4Mixin, DbTestCase):
self.hosts = []
self.address_pools = []
self.networks = []
self.network_addrpools = []
self.datanetworks = []
self._create_test_common()
@ -152,6 +153,7 @@ class BaseSystemTestCase(BaseIPv4Mixin, DbTestCase):
self.hosts = []
self.address_pools = []
self.networks = []
self.network_addrpools = []
self.datanetworks = []
self.oam = None
@ -209,11 +211,22 @@ class BaseSystemTestCase(BaseIPv4Mixin, DbTestCase):
def _create_test_network(self, name, network_type, subnet, ranges=None):
address_pool_id = self._create_test_address_pool(name, subnet, ranges).id
primary_pool_family = ""
if not isinstance(subnet, netaddr.IPNetwork):
subnet = netaddr.IPNetwork(subnet)
primary_pool_family = constants.IP_FAMILIES[subnet.version]
network = dbutils.create_test_network(
type=network_type,
address_pool_id=address_pool_id)
address_pool_id=address_pool_id,
primary_pool_family=primary_pool_family)
self.networks.append(network)
network_addrpool = dbutils.create_test_network_addrpool(address_pool_id=address_pool_id,
network_id=network.id)
self.network_addrpools.append(network_addrpool)
return network
def _create_test_route(self, interface, gateway, family=4, network='10.10.10.0', prefix=24):
@ -232,7 +245,7 @@ class BaseSystemTestCase(BaseIPv4Mixin, DbTestCase):
self.datanetworks.append(datanetwork)
return datanetwork
def _create_test_address_pool(self, name, subnet, ranges=None):
def _create_test_address_pool(self, name, subnet, ranges=None, append=True):
if not ranges:
ranges = [(str(subnet[2]), str(subnet[-2]))]
floating_address = None
@ -252,7 +265,8 @@ class BaseSystemTestCase(BaseIPv4Mixin, DbTestCase):
floating_address=str(floating_address),
controller0_address=str(controller0_address),
controller1_address=str(controller1_address))
self.address_pools.append(pool)
if append:
self.address_pools.append(pool)
return pool
def _create_test_networks(self):
@ -399,8 +413,13 @@ class BaseHostTestCase(BaseSystemTestCase):
if personality == constants.CONTROLLER:
hostname = '%s-%s' % (personality, unit)
name = utils.format_address_name(hostname, constants.NETWORK_TYPE_MGMT)
address = self.dbapi.address_get_by_name(name)
mgmt_ipaddr = address.address
address = dbutils.get_primary_address_by_name(name, constants.NETWORK_TYPE_MGMT)
if address:
mgmt_ipaddr = address.address
else:
mgmt_ipaddr = kw.get("mgmt_ip", "0.0.0.0")
if 'mgmt_ip' in kw:
del kw['mgmt_ip']
host = dbutils.create_test_ihost(
uuid=uuidutils.generate_uuid(),

View File

@ -0,0 +1,186 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2013-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""Tests for manipulating Network objects via the DB API"""
# from oslo_utils import uuidutils
from sysinv.common import constants
from sysinv.common import exception
from sysinv.db import api as dbapi
from sysinv.tests.db import base
# from sysinv.tests.db import utils
import netaddr
class DbNetworkTestCaseIPv4(base.BaseHostTestCase):
def setUp(self):
super(DbNetworkTestCaseIPv4, self).setUp()
self.dbapi = dbapi.get_instance()
mgmt_subnet6 = netaddr.IPNetwork('fd01::/64')
oam_subnet6 = netaddr.IPNetwork('fd00::/64')
cluster_host_subnet6 = netaddr.IPNetwork('fd02::/64')
cluster_pod_subnet6 = netaddr.IPNetwork('fd03::/64')
cluster_service_subnet6 = netaddr.IPNetwork('fd04::/112')
multicast_subnet6 = netaddr.IPNetwork('ff08::1:1:0/124')
storage_subnet6 = netaddr.IPNetwork('fd05::/64')
admin_subnet6 = netaddr.IPNetwork('fd09::/64')
self._create_test_address_pool(name="management-ipv6", subnet=mgmt_subnet6)
self._create_test_address_pool(name="oam-ipv6", subnet=oam_subnet6)
self._create_test_address_pool(name="cluster-host-ipv6", subnet=cluster_host_subnet6)
self._create_test_address_pool(name="cluster-pod-ipv6", subnet=cluster_pod_subnet6)
self._create_test_address_pool(name="cluster-service-ipv6", subnet=cluster_service_subnet6)
self._create_test_address_pool(name="multicast-ipv6", subnet=multicast_subnet6)
self._create_test_address_pool(name="storage-ipv6", subnet=storage_subnet6)
self._create_test_address_pool(name="admin-ipv6", subnet=admin_subnet6)
def _db_dump(self):
print("==============================================================")
address_pools = self.dbapi.address_pools_get_all()
for pool in address_pools:
print(type(pool), vars(pool))
print("==============================================================")
networks = self.dbapi.networks_get_all()
for net in networks:
print(type(net), vars(net))
print("==============================================================")
addresses = self.dbapi.addresses_get_all()
for addr in addresses:
print(type(addr), vars(addr))
print("==============================================================")
net_pools = self.dbapi.network_addrpool_get_all()
for net_pool in net_pools:
print(type(net_pool), vars(net_pool))
print("==============================================================")
def test_network_addrpool_db(self):
to_add = [
(constants.NETWORK_TYPE_MGMT, ('management', 'management-ipv6')),
(constants.NETWORK_TYPE_OAM, ('oam', 'oam-ipv6')),
(constants.NETWORK_TYPE_ADMIN, ('admin', 'admin-ipv6')),
(constants.NETWORK_TYPE_CLUSTER_HOST, ('cluster-host', 'cluster-host-ipv6')),
(constants.NETWORK_TYPE_CLUSTER_POD, ('cluster-pod', 'cluster-pod-ipv6')),
(constants.NETWORK_TYPE_CLUSTER_SERVICE, ('cluster-service', 'cluster-service-ipv6')),
(constants.NETWORK_TYPE_STORAGE, ('storage', 'storage-ipv6'))
]
# test network_addrpool_create()
try:
for net_pool in to_add:
net = self.dbapi.network_get_by_type(net_pool[0])
pool6 = self.dbapi.address_pool_query({'name': net_pool[1][1]})
net_pool_obj = self.dbapi.network_addrpool_create({"address_pool_id": pool6.id,
"network_id": net.id})
self.assertEqual(net_pool_obj.address_pool_id, pool6.id)
self.assertEqual(net_pool_obj.network_id, net.id)
# network-addrpool objects already created in the parent test class
pool4 = self.dbapi.address_pool_query({'name': net_pool[1][0]})
net_pool_obj = self.dbapi.network_addrpool_query({"address_pool_id": pool4.id,
"network_id": net.id})
self.assertEqual(net_pool_obj.address_pool_id, pool4.id)
self.assertEqual(net_pool_obj.network_id, net.id)
except Exception as e:
print(e)
# test network_addrpool_get_all()
net_pools = self.dbapi.network_addrpool_get_all()
self.assertEqual(len(net_pools), 17)
# test network_addrpool_get_by_network_id()
net = self.dbapi.network_get_by_type(constants.NETWORK_TYPE_MGMT)
net_pools = self.dbapi.network_addrpool_get_by_network_id(net.id)
self.assertEqual(len(net_pools), 2)
self.assertEqual(net_pools[0].network_type, constants.NETWORK_TYPE_MGMT)
self.assertEqual(net_pools[0].address_pool_name, "management")
self.assertEqual(net_pools[1].network_type, constants.NETWORK_TYPE_MGMT)
self.assertEqual(net_pools[1].address_pool_name, "management-ipv6")
# test network_addrpool_get_by_pool_id()
pool4 = self.dbapi.address_pool_query({'name': 'management'})
net_pools = self.dbapi.network_addrpool_get_by_pool_id(pool4.id)
self.assertEqual(len(net_pools), 1)
self.assertEqual(net_pools[0].address_pool_id, pool4.id)
self.assertEqual(net_pools[0].address_pool_name, 'management')
# test network_addrpool_query()
net_pool_q = self.dbapi.network_addrpool_query({'address_pool_id': pool4.id,
'network_id': net.id})
self.assertEqual(net_pool_q.address_pool_id, pool4.id)
self.assertEqual(net_pool_q.network_id, net.id)
# test network_addrpool_get()
net_pool_q2 = self.dbapi.network_addrpool_get(net_pool_q.uuid)
self.assertEqual(net_pool_q.address_pool_id, net_pool_q2.address_pool_id)
self.assertEqual(net_pool_q.network_id, net_pool_q2.network_id)
# test network_addrpool_destroy()
pool6 = self.dbapi.address_pool_query({'name': 'management-ipv6'})
net_pool_d = self.dbapi.network_addrpool_query({'address_pool_id': pool6.id,
'network_id': net.id})
self.dbapi.network_addrpool_destroy(net_pool_d.uuid)
self.assertRaises(exception.NetworkAddrpoolNotFound,
self.dbapi.network_addrpool_get, net_pool_d.uuid)
# test invalid network id
self.assertEqual(self.dbapi.network_addrpool_get_by_network_id(1000), [])
# test invalid pool id
self.assertEqual(self.dbapi.network_addrpool_get_by_pool_id(1000), [])
# test duplicate network-addrpool entry
self.assertRaises(exception.NetworkAddrpoolAlreadyExists,
self.dbapi.network_addrpool_create, {"address_pool_id": pool4.id,
"network_id": net.id})
addr = self.dbapi.address_get_by_name_and_family('controller-mgmt',
constants.IPV4_FAMILY)
self.assertEqual(addr.name, 'controller-mgmt')
self.assertEqual(addr.family, constants.IPV4_FAMILY)
self.assertRaises(exception.AddressNotFoundByNameAndFamily,
self.dbapi.address_get_by_name_and_family,
'controller-mgmt', constants.IPV6_FAMILY)
self._create_test_addresses(hostnames=[constants.CONTROLLER_HOSTNAME],
subnet=netaddr.IPNetwork('fd01::/64'),
network_type=constants.NETWORK_TYPE_MGMT)
addr = self.dbapi.address_get_by_name_and_family('controller-mgmt',
constants.IPV6_FAMILY)
self.assertEqual(addr.name, 'controller-mgmt')
self.assertEqual(addr.family, constants.IPV6_FAMILY)
addresses = self.dbapi.address_get_by_name('controller-mgmt')
self.assertEqual(len(addresses), 2)
# check parameter delete cascading
new_net_pool6 = self.dbapi.network_addrpool_create({'address_pool_id': pool6.id,
'network_id': net.id})
self.dbapi.address_pool_destroy(pool6.uuid)
net_pool_query = self.dbapi.network_addrpool_get(new_net_pool6.uuid)
self.assertEqual(net_pool_query.address_pool_uuid, None)
self.assertEqual(net_pool_query.address_pool_name, None)
self.assertEqual(net_pool_query.address_pool_id, None)
self.assertEqual(net_pool_query.network_id, net.id)
self.assertEqual(net_pool_query.network_uuid, net.uuid)
self.assertEqual(net_pool_query.network_type, net.type)
self.dbapi.network_destroy(net.uuid)
net_pool_query = self.dbapi.network_addrpool_get(new_net_pool6.uuid)
self.assertEqual(net_pool_query.address_pool_uuid, None)
self.assertEqual(net_pool_query.address_pool_name, None)
self.assertEqual(net_pool_query.address_pool_id, None)
self.assertEqual(net_pool_query.network_id, None)
self.assertEqual(net_pool_query.network_uuid, None)
self.assertEqual(net_pool_query.network_type, None)

View File

@ -24,6 +24,7 @@ import uuid
from oslo_serialization import jsonutils as json
from oslo_utils import uuidutils
from sysinv.common import constants
from sysinv.common import exception
from sysinv.db import api as db_api
@ -785,6 +786,19 @@ def create_test_address(**kw):
return dbapi.address_create(address)
def cleanup_address_table():
dbapi = db_api.get_instance()
address_list = dbapi.addresses_get_all()
for addr in address_list:
dbapi.address_destroy(addr.uuid)
def get_address_table():
dbapi = db_api.get_instance()
address_list = dbapi.addresses_get_all()
return address_list
def get_test_route(**kw):
inv = {
'id': kw.get('id'),
@ -834,11 +848,66 @@ def get_test_network(**kw):
'uuid': kw.get('uuid'),
'type': kw.get('type'),
'dynamic': kw.get('dynamic', True),
'address_pool_id': kw.get('address_pool_id', None)
'address_pool_id': kw.get('address_pool_id', None),
'primary_pool_family': kw.get('primary_pool_family', None),
'name': kw.get('name', None)
}
return inv
def get_network_table():
dbapi = db_api.get_instance()
return dbapi.networks_get_all()
def get_test_network_addrpool(**kw):
inv = {
'id': kw.get('id'),
'uuid': kw.get('uuid'),
'address_pool_id': kw.get('address_pool_id'),
'network_id': kw.get('network_id'),
}
return inv
def get_post_network_addrpool(**kw):
inv = {
'address_pool_uuid': kw.get('address_pool_uuid'),
'network_uuid': kw.get('network_uuid'),
}
return inv
def create_test_network_addrpool(**kw):
"""Create test network-addrpool entry in DB and return NetworkAddresspool DB object.
Function to be used to create test NetworkAddresspool objects in the database.
:param kw: kwargs with overriding values for network-addrpool's attributes.
:returns: Test Network DB object.
"""
network_addrpool = get_test_network_addrpool(**kw)
# Let DB generate ID if it isn't specified explicitly
if 'id' not in kw:
del network_addrpool['id']
dbapi = db_api.get_instance()
return dbapi.network_addrpool_create(network_addrpool)
def cleanup_network_addrpool_table():
""" Clean up all existing elements in the network_addrpools table
"""
dbapi = db_api.get_instance()
network_addrpool_list = dbapi.network_addrpool_get_all()
for net_pool in network_addrpool_list:
network_addrpool_list = dbapi.network_addrpool_destroy(net_pool.uuid)
def get_address_pool_table():
""" Clean up all existing elements in the network_addrpools table
"""
dbapi = db_api.get_instance()
return dbapi.address_pools_get_all()
def get_test_icpu(**kw):
inv = {
'id': kw.get('id'),
@ -1342,6 +1411,20 @@ def create_test_interface_network_assign(interface_id, network_id):
return dbapi.interface_network_create(values)
def create_test_interface_network_type_assign(interface_id, network_type):
"""Create test interface-network entry in DB and return InterfaceNetwork
object. Function to be used to create test InterfaceNetwork objects in the database.
:param interface_id: interface object id.
:param network_type: network type.
:returns: Test Network DB object.
"""
dbapi = db_api.get_instance()
net = dbapi.network_get_by_type(network_type)
values = {'interface_id': interface_id,
'network_id': net.id}
return dbapi.interface_network_create(values)
def get_test_interface_network(**kw):
inv = {
'id': kw.get('id'),
@ -1729,3 +1812,18 @@ def create_test_kube_app(**kw):
kube_app = get_test_kube_app(**kw)
dbapi = db_api.get_instance()
return dbapi.kube_app_create(kube_app)
def get_primary_address_by_name(address_name, networktype):
dbapi = db_api.get_instance()
address = None
try:
networks = dbapi.networks_get_by_type(networktype)
if networks and networks[0].pool_uuid:
pool = dbapi.address_pool_get(networks[0].pool_uuid)
address = dbapi.address_get_by_name_and_family(address_name,
pool.family)
except exception.AddressNotFoundByNameAndFamily:
pass
return address

View File

@ -1,4 +1,4 @@
# Copyright (c) 2017-2023 Wind River Systems, Inc.
# Copyright (c) 2017-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -462,12 +462,18 @@ class PlatformFirewallTestCaseMixin(base.PuppetTestCaseMixin):
nodename = self.host.hostname
hep_name = f'{nodename}-oam-if-hep'
if cutils.is_aio_simplex_system(db_api):
address = db_api.address_get_by_name("controller-oam")
addr_name = cutils.format_address_name(constants.CONTROLLER_HOSTNAME,
constants.NETWORK_TYPE_OAM)
address = dbutils.get_primary_address_by_name(addr_name,
constants.NETWORK_TYPE_OAM)
else:
if nodename == "controller-0":
address = db_api.address_get_by_name("controller-0-oam")
if nodename == "controller-1":
address = db_api.address_get_by_name("controller-1-oam")
addr_name = cutils.format_address_name(nodename, constants.NETWORK_TYPE_OAM)
if nodename == constants.CONTROLLER_0_HOSTNAME:
address = dbutils.get_primary_address_by_name(addr_name,
constants.NETWORK_TYPE_OAM)
if nodename == constants.CONTROLLER_1_HOSTNAME:
address = dbutils.get_primary_address_by_name(addr_name,
constants.NETWORK_TYPE_OAM)
self.assertTrue(address)
self.assertEqual(hep[hep_name]["spec"]["expectedIPs"], [str(address.address)])

View File

@ -1,4 +1,4 @@
# Copyright (c) 2022 Wind River Systems, Inc.
# Copyright (c) 2022-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
@ -172,9 +172,10 @@ class ZmqRpcClient(object):
host_fqdn = constants.CONTROLLER_1_FQDN
endpoint = get_tcp_endpoint(host_fqdn, self.port)
else:
address_name = utils.format_address_name(host.hostname,
constants.NETWORK_TYPE_MGMT)
address = dbapi.address_get_by_name(address_name)
address = utils.get_primary_address_by_name(dbapi,
utils.format_address_name(host.hostname,
constants.NETWORK_TYPE_MGMT),
constants.NETWORK_TYPE_MGMT, True)
endpoint = get_tcp_endpoint(address.address, self.port)
client = client_provider.get_client_for_endpoint(endpoint)
@ -238,9 +239,10 @@ class ZmqRpcClient(object):
host_fqdn = constants.CONTROLLER_1_FQDN
endpoint = get_tcp_endpoint(host_fqdn, self.port)
else:
address_name = utils.format_address_name(host.hostname,
constants.NETWORK_TYPE_MGMT)
address = dbapi.address_get_by_name(address_name)
address = utils.get_primary_address_by_name(dbapi,
utils.format_address_name(host.hostname,
constants.NETWORK_TYPE_MGMT),
constants.NETWORK_TYPE_MGMT, True)
endpoint = get_tcp_endpoint(address.address, self.port)
endpoints.append(endpoint)