# # Copyright (c) 2023 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import copy import mock from oslo_utils import timeutils from dcmanagerclient.api.v1.peer_group_association_manager \ import PeerGroupAssociation as PeerAssociation from dcmanagerclient.commands.v1 import peer_group_association_manager \ as peer_group_association_cmd from dcmanagerclient.tests import base PEER_GROUP_ASSOCIATION_ID = "1" PEER_GROUP_ID = "2" SYSTEM_PEER_ID = "3" PG_GROUP_PRIORITY = "99" SYNC_STATUS = "synced" SYNC_MESSAGE = "None" PG_GROUP_PRIORITY_UPDATED = "1" TIME_NOW = timeutils.utcnow().isoformat() CREATED_AT = TIME_NOW UPDATED_AT = TIME_NOW PEER_GROUP_ASSOCIATION = PeerAssociation( mock, PEER_GROUP_ASSOCIATION_ID, PEER_GROUP_ID, SYSTEM_PEER_ID, PG_GROUP_PRIORITY, SYNC_STATUS, SYNC_MESSAGE, CREATED_AT, UPDATED_AT ) PEER_GROUP_ASSOCIATION_TUPLE = (PEER_GROUP_ASSOCIATION_ID, PEER_GROUP_ID, SYSTEM_PEER_ID, PG_GROUP_PRIORITY, SYNC_STATUS) PEER_GROUP_ASSOCIATION_DETAIL_TUPLE = \ PEER_GROUP_ASSOCIATION_TUPLE + (SYNC_MESSAGE, CREATED_AT, UPDATED_AT) PEER_GROUP_ASSOCIATION_TUPLE_UPDATED = (PEER_GROUP_ASSOCIATION_ID, PEER_GROUP_ID, SYSTEM_PEER_ID, PG_GROUP_PRIORITY_UPDATED, SYNC_STATUS, SYNC_MESSAGE, CREATED_AT, UPDATED_AT) class TestCLIPeerGroupAssociationV1(base.BaseCommandTest): def setUp(self): super(TestCLIPeerGroupAssociationV1, self).setUp() # The client is the peer_group_association_manager self.client = self.app.client_manager.peer_group_association_manager def test_list_peer_group_association(self): self.client.peer_group_association_manager.\ list_peer_group_associations.return_value =\ [PEER_GROUP_ASSOCIATION] actual_call = self.call(peer_group_association_cmd. ListPeerGroupAssociation) self.assertEqual([PEER_GROUP_ASSOCIATION_TUPLE], actual_call[1]) def test_list_peer_group_association_empty(self): self.client.peer_group_association_manager.\ list_peer_group_associations.return_value = [] actual_call = self.call(peer_group_association_cmd. ListPeerGroupAssociation) self.assertEqual((tuple('' for _ in range( len(PEER_GROUP_ASSOCIATION_TUPLE))),), actual_call[1]) def test_add_peer_group_association(self): self.client.peer_group_association_manager.add_peer_group_association.\ return_value = [PEER_GROUP_ASSOCIATION] actual_call = self.call( peer_group_association_cmd.AddPeerGroupAssociation, app_args=[ '--peer-group-id', PEER_GROUP_ID, '--system-peer-id', SYSTEM_PEER_ID, '--peer-group-priority', PG_GROUP_PRIORITY ]) self.assertEqual( PEER_GROUP_ASSOCIATION_DETAIL_TUPLE, actual_call[1]) def test_show_peer_group_association(self): self.client.peer_group_association_manager.\ peer_group_association_detail.return_value =\ [PEER_GROUP_ASSOCIATION] actual_call = self.call(peer_group_association_cmd. ShowPeerGroupAssociation, app_args=[PEER_GROUP_ASSOCIATION_ID]) self.assertEqual((PEER_GROUP_ASSOCIATION_ID, PEER_GROUP_ID, SYSTEM_PEER_ID, PG_GROUP_PRIORITY, SYNC_STATUS, SYNC_MESSAGE, CREATED_AT, UPDATED_AT), actual_call[1]) def test_show_peer_group_association_without_id(self): self.client.peer_group_association_manager.\ peer_group_association_detail.return_value = [] self.assertRaises(SystemExit, self.call, peer_group_association_cmd.ShowPeerGroupAssociation, app_args=[]) def test_delete_peer_group_association(self): self.call(peer_group_association_cmd.DeletePeerGroupAssociation, app_args=[PEER_GROUP_ASSOCIATION_ID]) self.client.peer_group_association_manager. \ delete_peer_group_association.\ assert_called_once_with(PEER_GROUP_ASSOCIATION_ID) def test_delete_peer_group_association_without_id(self): self.assertRaises(SystemExit, self.call, peer_group_association_cmd. DeletePeerGroupAssociation, app_args=[]) def test_update_peer_group_association(self): UPDATED_PEER_GROUP_ASSOCIATION = copy.copy(PEER_GROUP_ASSOCIATION) UPDATED_PEER_GROUP_ASSOCIATION.peer_group_priority =\ PG_GROUP_PRIORITY_UPDATED self.client.peer_group_association_manager.\ update_peer_group_association.\ return_value = [UPDATED_PEER_GROUP_ASSOCIATION] actual_call = self.call( peer_group_association_cmd.UpdatePeerGroupAssociation, app_args=[PEER_GROUP_ASSOCIATION_ID, '--peer-group-priority', PG_GROUP_PRIORITY_UPDATED]) self.assertEqual( (PEER_GROUP_ASSOCIATION_TUPLE_UPDATED), actual_call[1]) def test_update_peer_group_association_without_priority(self): self.client.peer_group_association_manager.\ update_peer_group_association.\ return_value = [PEER_GROUP_ASSOCIATION] self.assertRaises(SystemExit, self.call, peer_group_association_cmd. UpdatePeerGroupAssociation, app_args=[PEER_GROUP_ID]) def test_sync_peer_group_association(self): self.client.peer_group_association_manager.\ sync_peer_group_association.\ return_value = [PEER_GROUP_ASSOCIATION] actual_call = self.call( peer_group_association_cmd.SyncPeerGroupAssociation, app_args=[PEER_GROUP_ASSOCIATION_ID]) self.assertEqual( (PEER_GROUP_ASSOCIATION_DETAIL_TUPLE), actual_call[1])