distcloud/distributedcloud/dcmanager/tests/unit/audit/test_rpc_client.py

82 lines
3.1 KiB
Python

# Copyright (c) 2023 Wind River Systems, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
import oslo_messaging
from dcmanager.audit import rpcapi as rpc_client
from dcmanager.common import config
from dcmanager.common import consts
from dcmanager.common import messaging
from dcmanager.tests import base
from dcmanager.tests import utils
config.register_options()
class ManagerRpcAuditAPITestCase(base.DCManagerTestCase):
def setUp(self):
messaging.setup("fake://", optional=True)
self.addCleanup(messaging.cleanup)
self.context = utils.dummy_context()
super(ManagerRpcAuditAPITestCase, self).setUp()
def test_cast(self):
rpcapi = rpc_client.ManagerAuditWorkerClient()
transport = messaging.get_transport()
transport._send = mock.Mock()
fake_endpoints = {'service': 'fake_ip', 'service2': 'other_fake_ip'}
rpcapi.update_subcloud_endpoints(
self.context, 'subcloud', fake_endpoints)
exp_msg = {'method': 'update_subcloud_endpoints',
'args': {'subcloud_name': 'subcloud',
'endpoints': fake_endpoints},
'version': '1.0'}
# With fanout a new target is created
new_target = oslo_messaging.Target(
fanout=True, version=rpcapi.BASE_RPC_API_VERSION,
topic=consts.TOPIC_DC_MANAGER_AUDIT_WORKER)
transport._send.assert_called_with(new_target,
mock.ANY,
exp_msg,
retry=None,
transport_options=None)
# Without fanout the target is the same
rpcapi.audit_subclouds(
self.context, ['subcloud1', 'subcloud2'],
True, False, True, True, False, False)
exp_msg2 = {'method': 'audit_subclouds',
'args': {'subcloud_ids': ['subcloud1', 'subcloud2'],
'patch_audit_data': True,
'firmware_audit_data': False,
'kubernetes_audit_data': True,
'do_openstack_audit': True,
'kube_rootca_update_audit_data': False,
'software_audit_data': False},
'version': '1.0'}
transport._send.assert_called_with(rpcapi._client.target,
mock.ANY,
exp_msg2,
retry=None,
transport_options=None)