253 lines
9.5 KiB
Python
253 lines
9.5 KiB
Python
# Copyright (c) 2024 Wind River Systems, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import mock
|
|
from oslo_service import threadgroup
|
|
from oslo_utils import uuidutils
|
|
|
|
from dcorch.common import consts
|
|
from dcorch.db.sqlalchemy import api as db_api
|
|
from dcorch.engine import initial_sync_worker_manager
|
|
from dcorch.tests import base
|
|
from dcorch.tests import utils
|
|
|
|
|
|
class FakeSyncObject(object):
|
|
def initial_sync(self):
|
|
pass
|
|
|
|
def enable(self):
|
|
pass
|
|
|
|
|
|
class FakeGSWM(object):
|
|
def __init__(self, ctx, engine_id):
|
|
self.ctx = ctx
|
|
self.engine_id = engine_id
|
|
|
|
def update_subcloud_state(self, ctx, subcloud_name, initial_sync_state):
|
|
db_api.subcloud_update(
|
|
ctx,
|
|
subcloud_name,
|
|
values={'initial_sync_state': initial_sync_state})
|
|
|
|
def create_sync_objects(self, subcloud_name, capabilities):
|
|
sync_objs = {}
|
|
endpoint_type_list = capabilities.get('endpoint_types', None)
|
|
if endpoint_type_list:
|
|
for endpoint_type in endpoint_type_list:
|
|
sync_obj = FakeSyncObject()
|
|
sync_objs.update({endpoint_type: sync_obj})
|
|
return sync_objs
|
|
|
|
def subcloud_state_matches(self, subcloud_name,
|
|
management_state=None,
|
|
availability_status=None,
|
|
initial_sync_state=None):
|
|
# compare subcloud states
|
|
match = True
|
|
sc = db_api.subcloud_get(self.ctx, subcloud_name)
|
|
if management_state is not None:
|
|
if sc.management_state != management_state:
|
|
match = False
|
|
if match and availability_status is not None:
|
|
if sc.availability_status != availability_status:
|
|
match = False
|
|
if match and initial_sync_state is not None:
|
|
if sc.initial_sync_state != initial_sync_state:
|
|
match = False
|
|
return match
|
|
|
|
|
|
class TestInitialSyncWorkerManager(base.OrchestratorTestCase):
|
|
def setUp(self):
|
|
super(TestInitialSyncWorkerManager, self).setUp()
|
|
self.engine_id = uuidutils.generate_uuid()
|
|
self.ctx = utils.dummy_context()
|
|
self.fake_gswm = FakeGSWM(self.ctx, self.engine_id)
|
|
|
|
# Mock eventlet
|
|
p = mock.patch('eventlet.greenthread.spawn_after')
|
|
self.mock_eventlet_spawn_after = p.start()
|
|
self.addCleanup(p.stop)
|
|
|
|
# Mock FernetKeyManager distribute_Keys
|
|
p = mock.patch(
|
|
'dcorch.engine.fernet_key_manager.FernetKeyManager.distribute_keys')
|
|
self.mock_distribute_keys = p.start()
|
|
self.addCleanup(p.stop)
|
|
|
|
# Mock db_api subcloud_sync_update
|
|
p = mock.patch('dcorch.db.api.subcloud_sync_update')
|
|
self.mock_subcloud_sync_update = p.start()
|
|
self.addCleanup(p.stop)
|
|
|
|
# Mock thread
|
|
p = mock.patch.object(threadgroup, 'Thread')
|
|
self.mock_thread = p.start()
|
|
self.addCleanup(p.stop)
|
|
|
|
# Mock ThreadGroupManager start
|
|
p = mock.patch('dcorch.engine.scheduler.ThreadGroupManager.start')
|
|
self.mock_thread_start = p.start()
|
|
self.mock_thread_start.return_value = self.mock_thread
|
|
self.addCleanup(p.stop)
|
|
|
|
def test_init(self):
|
|
iswm = initial_sync_worker_manager.InitialSyncWorkerManager(
|
|
self.fake_gswm, self.engine_id)
|
|
self.assertIsNotNone(iswm)
|
|
|
|
def test_initial_sync_subcloud(self):
|
|
subcloud = utils.create_subcloud_static(
|
|
self.ctx,
|
|
name='subcloud1',
|
|
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
|
|
self.assertIsNotNone(subcloud)
|
|
|
|
iswm = initial_sync_worker_manager.InitialSyncWorkerManager(
|
|
self.fake_gswm, self.engine_id)
|
|
|
|
# Initial sync the subcloud
|
|
iswm._initial_sync_subcloud(self.ctx,
|
|
subcloud.region_name,
|
|
base.CAPABILITES)
|
|
|
|
self.mock_distribute_keys.assert_called_once()
|
|
|
|
# Verify subcloud_sync_update called twice due to two endpoint types
|
|
self.assertEqual(2, self.mock_subcloud_sync_update.call_count)
|
|
|
|
# Verify the initial sync was completed
|
|
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
|
|
self.assertEqual(subcloud.initial_sync_state,
|
|
consts.INITIAL_SYNC_STATE_COMPLETED)
|
|
|
|
def test_initial_sync_subcloud_not_required(self):
|
|
subcloud = utils.create_subcloud_static(
|
|
self.ctx,
|
|
name='subcloud1',
|
|
initial_sync_state='')
|
|
self.assertIsNotNone(subcloud)
|
|
|
|
iswm = initial_sync_worker_manager.InitialSyncWorkerManager(
|
|
self.fake_gswm, self.engine_id)
|
|
|
|
iswm.initial_sync = mock.MagicMock()
|
|
|
|
# Initial sync the subcloud
|
|
iswm._initial_sync_subcloud(self.ctx,
|
|
subcloud.region_name,
|
|
base.CAPABILITES)
|
|
|
|
# Verify that the initial sync steps were not done
|
|
iswm.initial_sync.assert_not_called()
|
|
self.mock_distribute_keys.assert_not_called()
|
|
self.mock_subcloud_sync_update.assert_not_called()
|
|
|
|
# Verify the initial sync state was not changed
|
|
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
|
|
self.assertEqual(subcloud.initial_sync_state, '')
|
|
|
|
def test_initial_sync_subcloud_failed(self):
|
|
subcloud = utils.create_subcloud_static(
|
|
self.ctx,
|
|
name='subcloud1',
|
|
initial_sync_state=consts.INITIAL_SYNC_STATE_REQUESTED)
|
|
self.assertIsNotNone(subcloud)
|
|
|
|
iswm = initial_sync_worker_manager.InitialSyncWorkerManager(
|
|
self.fake_gswm, self.engine_id)
|
|
|
|
iswm.enable_subcloud = mock.MagicMock()
|
|
# Force a failure
|
|
self.mock_distribute_keys.side_effect = Exception('fake_exception')
|
|
|
|
# Initial sync the subcloud
|
|
iswm._initial_sync_subcloud(self.ctx,
|
|
subcloud.region_name,
|
|
base.CAPABILITES)
|
|
|
|
# Verify the initial sync was failed
|
|
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
|
|
self.assertEqual(subcloud.initial_sync_state,
|
|
consts.INITIAL_SYNC_STATE_FAILED)
|
|
|
|
# Verify that the subcloud was not enabled
|
|
iswm.enable_subcloud.assert_not_called()
|
|
|
|
# Verify the initial sync was retried
|
|
self.mock_eventlet_spawn_after.assert_called_with(
|
|
initial_sync_worker_manager.SYNC_FAIL_HOLD_OFF, mock.ANY, 'subcloud1')
|
|
|
|
def test_reattempt_sync(self):
|
|
utils.create_subcloud_static(
|
|
self.ctx,
|
|
name='subcloud1',
|
|
initial_sync_state=consts.INITIAL_SYNC_STATE_NONE)
|
|
utils.create_subcloud_static(
|
|
self.ctx,
|
|
name='subcloud2',
|
|
initial_sync_state=consts.INITIAL_SYNC_STATE_FAILED)
|
|
|
|
iswm = initial_sync_worker_manager.InitialSyncWorkerManager(
|
|
self.fake_gswm, self.engine_id)
|
|
|
|
# Reattempt sync success
|
|
iswm._reattempt_sync('subcloud2')
|
|
|
|
# Verify the subcloud is in the correct initial sync state
|
|
subcloud = db_api.subcloud_get(self.ctx, 'subcloud2')
|
|
self.assertEqual(subcloud.initial_sync_state,
|
|
consts.INITIAL_SYNC_STATE_REQUESTED)
|
|
|
|
# Reattempt sync when not needed
|
|
iswm._reattempt_sync('subcloud1')
|
|
|
|
# Verify the subcloud is in the correct initial sync state
|
|
subcloud = db_api.subcloud_get(self.ctx, 'subcloud1')
|
|
self.assertEqual(subcloud.initial_sync_state,
|
|
consts.INITIAL_SYNC_STATE_NONE)
|
|
|
|
def test_initial_sync_subclouds(self):
|
|
subcloud1 = utils.create_subcloud_static(
|
|
self.ctx,
|
|
name='subcloud1',
|
|
initial_sync_state='')
|
|
subcloud2 = utils.create_subcloud_static(
|
|
self.ctx,
|
|
name='subcloud2',
|
|
initial_sync_state='')
|
|
subcloud_capabilities = {subcloud1.region_name: base.CAPABILITES,
|
|
subcloud2.region_name: base.CAPABILITES}
|
|
|
|
iswm = initial_sync_worker_manager.InitialSyncWorkerManager(
|
|
self.fake_gswm, self.engine_id)
|
|
|
|
iswm.initial_sync_subclouds(self.ctx, subcloud_capabilities)
|
|
|
|
# Verify 2 threads started, one for each of the subcloud
|
|
self.mock_thread_start.assert_any_call(iswm._initial_sync_subcloud,
|
|
mock.ANY,
|
|
subcloud1.region_name,
|
|
subcloud_capabilities.get(
|
|
subcloud1.region_name))
|
|
self.mock_thread_start.assert_called_with(iswm._initial_sync_subcloud,
|
|
mock.ANY,
|
|
subcloud2.region_name,
|
|
subcloud_capabilities.get(
|
|
subcloud2.region_name))
|
|
self.assertEqual(2, self.mock_thread.wait.call_count)
|