178 lines
5.5 KiB
Python
178 lines
5.5 KiB
Python
#
|
|
# Copyright (c) 2023-2024 Wind River Systems, Inc.
|
|
#
|
|
# SPDX-License-Identifier: Apache-2.0
|
|
#
|
|
|
|
import mock
|
|
|
|
from dcmanager.common import consts
|
|
from dcmanager.orchestrator.states.base import BaseState
|
|
from dcmanager.orchestrator.states.software.finish_strategy import \
|
|
FinishStrategyState
|
|
from dcmanager.tests.unit.orchestrator.states.software.test_base import \
|
|
TestSoftwareOrchestrator
|
|
|
|
|
|
REGION_ONE_RELEASES = {
|
|
"DC.1": {
|
|
"sw_version": "20.12",
|
|
"state": "committed"
|
|
},
|
|
"DC.2": {
|
|
"sw_version": "20.12",
|
|
"state": "committed"
|
|
},
|
|
"DC.3": {
|
|
"sw_version": "20.12",
|
|
"state": "committed"
|
|
},
|
|
"DC.8": {
|
|
"sw_version": "20.12",
|
|
"state": "committed"
|
|
}
|
|
}
|
|
|
|
SUBCLOUD_RELEASES = {
|
|
"DC.1": {
|
|
"sw_version": "20.12",
|
|
"state": "committed"
|
|
},
|
|
"DC.2": {
|
|
"sw_version": "20.12",
|
|
"state": "committed"
|
|
},
|
|
"DC.3": {
|
|
"sw_version": "20.12",
|
|
"state": "deployed"
|
|
},
|
|
"DC.9": {
|
|
"sw_version": "20.12",
|
|
"state": "available"
|
|
},
|
|
"DC.10": {
|
|
"sw_version": "20.12",
|
|
"state": "deployed"
|
|
}
|
|
}
|
|
|
|
|
|
class TestFinishStrategyState(TestSoftwareOrchestrator):
|
|
def setUp(self):
|
|
super().setUp()
|
|
|
|
self._mock_read_from_cache(FinishStrategyState)
|
|
|
|
self.on_success_state = consts.STRATEGY_STATE_COMPLETE
|
|
|
|
# Add the subcloud being processed by this unit test
|
|
self.subcloud = self.setup_subcloud()
|
|
|
|
# Add the strategy_step state being processed by this unit test
|
|
self.strategy_step = self.setup_strategy_step(
|
|
self.subcloud.id, consts.STRATEGY_STATE_SW_FINISH_STRATEGY)
|
|
|
|
# Add mock API endpoints for software client calls
|
|
# invoked by this state
|
|
self.software_client.query = mock.MagicMock()
|
|
self.software_client.delete = mock.MagicMock()
|
|
self.software_client.commit_patch = mock.MagicMock()
|
|
self._read_from_cache = mock.MagicMock()
|
|
|
|
def test_finish_strategy_success(self):
|
|
"""Test software finish strategy when the API call succeeds."""
|
|
|
|
self.mock_read_from_cache.return_value = REGION_ONE_RELEASES
|
|
|
|
self.software_client.query.side_effect = [SUBCLOUD_RELEASES]
|
|
|
|
# invoke the strategy state operation on the orch thread
|
|
self.worker.perform_state_action(self.strategy_step)
|
|
|
|
call_args, _ = self.software_client.delete.call_args_list[0]
|
|
self.assertItemsEqual(['DC.9'], call_args[0])
|
|
|
|
call_args, _ = self.software_client.commit_patch.call_args_list[0]
|
|
self.assertItemsEqual(['DC.3'], call_args[0])
|
|
|
|
# On success, the state should transition to the next state
|
|
self.assert_step_updated(self.strategy_step.subcloud_id,
|
|
self.on_success_state)
|
|
|
|
def test_finish_strategy_no_operation_required(self):
|
|
"""Test software finish strategy when no operation is required."""
|
|
|
|
self.mock_read_from_cache.return_value = REGION_ONE_RELEASES
|
|
|
|
self.software_client.query.side_effect = [REGION_ONE_RELEASES]
|
|
|
|
# invoke the strategy state operation on the orch thread
|
|
self.worker.perform_state_action(self.strategy_step)
|
|
|
|
self.software_client.delete.assert_not_called()
|
|
|
|
self.software_client.commit_patch.assert_not_called()
|
|
|
|
# On success, the state should transition to the next state
|
|
self.assert_step_updated(self.strategy_step.subcloud_id,
|
|
self.on_success_state)
|
|
|
|
def test_finish_strategy_fails_when_query_exception(self):
|
|
"""Test finish strategy fails when software client query raises exception"""
|
|
|
|
self.mock_read_from_cache.return_value = REGION_ONE_RELEASES
|
|
|
|
self.software_client.query.side_effect = Exception()
|
|
|
|
self.worker.perform_state_action(self.strategy_step)
|
|
|
|
self.assert_step_updated(
|
|
self.strategy_step.subcloud_id, consts.STRATEGY_STATE_FAILED
|
|
)
|
|
|
|
def test_finish_strategy_fails_when_delete_exception(self):
|
|
"""Test finish strategy fails when software client delete raises exception"""
|
|
|
|
self.mock_read_from_cache.return_value = REGION_ONE_RELEASES
|
|
|
|
self.software_client.query.side_effect = [SUBCLOUD_RELEASES]
|
|
self.software_client.delete.side_effect = Exception()
|
|
|
|
self.worker.perform_state_action(self.strategy_step)
|
|
|
|
self.assert_step_updated(
|
|
self.strategy_step.subcloud_id, consts.STRATEGY_STATE_FAILED
|
|
)
|
|
|
|
@mock.patch.object(BaseState, 'stopped')
|
|
def test_finish_strategy_fails_when_stopped(self, mock_base_stopped):
|
|
"""Test finish strategy fails when stopped"""
|
|
self.mock_read_from_cache.return_value = REGION_ONE_RELEASES
|
|
|
|
self.software_client.query.side_effect = [SUBCLOUD_RELEASES]
|
|
|
|
mock_base_stopped.return_value = True
|
|
|
|
self.worker.perform_state_action(self.strategy_step)
|
|
|
|
self.assert_step_updated(
|
|
self.strategy_step.subcloud_id, consts.STRATEGY_STATE_FAILED
|
|
)
|
|
|
|
def test_finish_strategy_fails_when_commit_patch_exception(self):
|
|
"""Test finish strategy fails when software client commit_patch
|
|
|
|
raises exception
|
|
"""
|
|
|
|
self.mock_read_from_cache.return_value = REGION_ONE_RELEASES
|
|
|
|
self.software_client.query.side_effect = [SUBCLOUD_RELEASES]
|
|
self.software_client.commit_patch.side_effect = Exception()
|
|
|
|
self.worker.perform_state_action(self.strategy_step)
|
|
|
|
self.assert_step_updated(
|
|
self.strategy_step.subcloud_id, consts.STRATEGY_STATE_FAILED
|
|
)
|