Merge "Minor dcmanager unit tests cleanup"

This commit is contained in:
Zuul 2021-10-27 21:40:08 +00:00 committed by Gerrit Code Review
commit 73c2c164f4
6 changed files with 47 additions and 147 deletions

View File

@ -102,6 +102,7 @@ class DCManagerTestCase(base.BaseTestCase):
def setUp(self): def setUp(self):
super(DCManagerTestCase, self).setUp() super(DCManagerTestCase, self).setUp()
self.setup_dummy_db() # register cleanup of DB before setup, in case setup fails
self.addCleanup(self.reset_dummy_db) self.addCleanup(self.reset_dummy_db)
self.setup_dummy_db()
self.ctx = utils.dummy_context() self.ctx = utils.dummy_context()

View File

@ -16,42 +16,18 @@
# of this software may be licensed only pursuant to the terms # of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement. # of an applicable Wind River license agreement.
# #
import sqlalchemy
from oslo_config import cfg
from oslo_db import exception as db_exception from oslo_db import exception as db_exception
from oslo_db import options
from dcmanager.common import consts from dcmanager.common import consts
from dcmanager.common import exceptions as exception from dcmanager.common import exceptions as exception
from dcmanager.db import api as api from dcmanager.db import api as api
from dcmanager.db.sqlalchemy import api as db_api from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.tests import base from dcmanager.tests import base
from dcmanager.tests import utils
get_engine = api.get_engine get_engine = api.get_engine
class DBAPISubcloudAlarm(base.DCManagerTestCase): class DBAPISubcloudAlarm(base.DCManagerTestCase):
def setup_dummy_db(self):
options.cfg.set_defaults(options.database_opts,
sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://")
engine = get_engine()
db_api.db_sync(engine)
engine.connect()
@staticmethod
def reset_dummy_db():
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
@staticmethod @staticmethod
def create_subcloud_alarms(ctxt, name): def create_subcloud_alarms(ctxt, name):
@ -64,74 +40,71 @@ class DBAPISubcloudAlarm(base.DCManagerTestCase):
def setUp(self): def setUp(self):
super(DBAPISubcloudAlarm, self).setUp() super(DBAPISubcloudAlarm, self).setUp()
# calling setUp for the superclass sets up the DB and context
self.setup_dummy_db()
self.addCleanup(self.reset_dummy_db)
self.ctxt = utils.dummy_context()
def test_subcloud_alarms_create(self): def test_subcloud_alarms_create(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEqual(result['name'], 'subcloud1') self.assertEqual(result['name'], 'subcloud1')
self.assertEqual(result['cloud_status'], 'disabled') self.assertEqual(result['cloud_status'], 'disabled')
def test_subcloud_alarms_create_duplicate(self): def test_subcloud_alarms_create_duplicate(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertRaises(db_exception.DBDuplicateEntry, self.assertRaises(db_exception.DBDuplicateEntry,
self.create_subcloud_alarms, self.create_subcloud_alarms,
self.ctx, 'subcloud1') self.ctx, 'subcloud1')
def test_subcloud_alarms_get(self): def test_subcloud_alarms_get(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
subcloud = db_api.subcloud_alarms_get(self.ctxt, 'subcloud1') subcloud = db_api.subcloud_alarms_get(self.ctx, 'subcloud1')
self.assertIsNotNone(subcloud) self.assertIsNotNone(subcloud)
self.assertEqual(subcloud['name'], 'subcloud1') self.assertEqual(subcloud['name'], 'subcloud1')
def test_subcloud_alarms_get_not_found(self): def test_subcloud_alarms_get_not_found(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertRaises(exception.SubcloudNameNotFound, self.assertRaises(exception.SubcloudNameNotFound,
db_api.subcloud_alarms_get, db_api.subcloud_alarms_get,
self.ctx, 'subcloud2') self.ctx, 'subcloud2')
def test_subcloud_alarms_get_all(self): def test_subcloud_alarms_get_all(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
result = self.create_subcloud_alarms(self.ctxt, 'subcloud2') result = self.create_subcloud_alarms(self.ctx, 'subcloud2')
self.assertIsNotNone(result) self.assertIsNotNone(result)
subclouds = db_api.subcloud_alarms_get_all(self.ctxt) subclouds = db_api.subcloud_alarms_get_all(self.ctx)
self.assertEqual(len(subclouds), 2) self.assertEqual(len(subclouds), 2)
self.assertEqual(subclouds[0]['name'], 'subcloud2') self.assertEqual(subclouds[0]['name'], 'subcloud2')
self.assertEqual(subclouds[1]['name'], 'subcloud1') self.assertEqual(subclouds[1]['name'], 'subcloud1')
def test_subcloud_alarms_get_one(self): def test_subcloud_alarms_get_one(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
result = self.create_subcloud_alarms(self.ctxt, 'subcloud2') result = self.create_subcloud_alarms(self.ctx, 'subcloud2')
self.assertIsNotNone(result) self.assertIsNotNone(result)
subclouds = db_api.subcloud_alarms_get_all(self.ctxt, 'subcloud1') subclouds = db_api.subcloud_alarms_get_all(self.ctx, 'subcloud1')
self.assertEqual(subclouds[0]['name'], 'subcloud1') self.assertEqual(subclouds[0]['name'], 'subcloud1')
def test_subcloud_alarms_update(self): def test_subcloud_alarms_update(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
values = {'critical_alarms': 0, values = {'critical_alarms': 0,
'major_alarms': 1, 'major_alarms': 1,
'minor_alarms': 2, 'minor_alarms': 2,
'warnings': 3, 'warnings': 3,
'cloud_status': consts.ALARM_DEGRADED_STATUS} 'cloud_status': consts.ALARM_DEGRADED_STATUS}
result = db_api.subcloud_alarms_update(self.ctxt, 'subcloud1', values) result = db_api.subcloud_alarms_update(self.ctx, 'subcloud1', values)
self.assertIsNotNone(result) self.assertIsNotNone(result)
self.assertEqual(result['major_alarms'], 1) self.assertEqual(result['major_alarms'], 1)
subcloud = db_api.subcloud_alarms_get(self.ctxt, 'subcloud1') subcloud = db_api.subcloud_alarms_get(self.ctx, 'subcloud1')
self.assertIsNotNone(subcloud) self.assertIsNotNone(subcloud)
self.assertEqual(subcloud['major_alarms'], 1) self.assertEqual(subcloud['major_alarms'], 1)
def test_subcloud_alarms_delete(self): def test_subcloud_alarms_delete(self):
result = self.create_subcloud_alarms(self.ctxt, 'subcloud1') result = self.create_subcloud_alarms(self.ctx, 'subcloud1')
self.assertIsNotNone(result) self.assertIsNotNone(result)
db_api.subcloud_alarms_delete(self.ctxt, 'subcloud1') db_api.subcloud_alarms_delete(self.ctx, 'subcloud1')
subclouds = db_api.subcloud_alarms_get_all(self.ctxt) subclouds = db_api.subcloud_alarms_get_all(self.ctx)
self.assertEqual(len(subclouds), 0) self.assertEqual(len(subclouds), 0)

View File

@ -20,27 +20,17 @@
import datetime import datetime
import sqlalchemy import sqlalchemy
from oslo_config import cfg
from oslo_db import exception as db_exception from oslo_db import exception as db_exception
from oslo_db import options
from dcmanager.common import exceptions as exception from dcmanager.common import exceptions as exception
from dcmanager.db import api as api from dcmanager.db import api as api
from dcmanager.db.sqlalchemy import api as db_api from dcmanager.db.sqlalchemy import api as db_api
from dcmanager.tests import base from dcmanager.tests import base
from dcmanager.tests import utils
get_engine = api.get_engine get_engine = api.get_engine
class DBAPISubcloudAuditsTest(base.DCManagerTestCase): class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
def setup_dummy_db(self):
options.cfg.set_defaults(options.database_opts,
sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://")
engine = get_engine()
db_api.db_sync(engine)
engine.connect()
@staticmethod @staticmethod
def create_subcloud(ctxt, name, **kwargs): def create_subcloud(ctxt, name, **kwargs):
@ -61,27 +51,13 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
values.update(kwargs) values.update(kwargs)
return db_api.subcloud_create(ctxt, **values) return db_api.subcloud_create(ctxt, **values)
@staticmethod
def reset_dummy_db():
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
def setUp(self): def setUp(self):
super(DBAPISubcloudAuditsTest, self).setUp() super(DBAPISubcloudAuditsTest, self).setUp()
# calling setUp for the superclass sets up the DB and context
self.setup_dummy_db() self.create_subcloud(self.ctx, "subcloud1")
self.addCleanup(self.reset_dummy_db) self.create_subcloud(self.ctx, "subcloud2")
self.ctxt = utils.dummy_context() self.create_subcloud(self.ctx, "subcloud3")
# Create some subclouds
self.create_subcloud(self.ctxt, "subcloud1")
self.create_subcloud(self.ctxt, "subcloud2")
self.create_subcloud(self.ctxt, "subcloud3")
def test_subcloud_audits_get(self): def test_subcloud_audits_get(self):
# Test the SubcloudAudits created when we created subcloud2 in setup. # Test the SubcloudAudits created when we created subcloud2 in setup.
@ -110,7 +86,7 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
self.ctx, 2) self.ctx, 2)
def test_subcloud_audits_get_all(self): def test_subcloud_audits_get_all(self):
subcloud_audits = db_api.subcloud_audits_get_all(self.ctxt) subcloud_audits = db_api.subcloud_audits_get_all(self.ctx)
self.assertEqual(len(subcloud_audits), 3) self.assertEqual(len(subcloud_audits), 3)
self.assertEqual(subcloud_audits[0]['subcloud_id'], 1) self.assertEqual(subcloud_audits[0]['subcloud_id'], 1)
self.assertEqual(subcloud_audits[1]['subcloud_id'], 2) self.assertEqual(subcloud_audits[1]['subcloud_id'], 2)
@ -118,7 +94,7 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
def test_subcloud_alarms_delete(self): def test_subcloud_alarms_delete(self):
result = db_api.subcloud_audits_get(self.ctx, 2) result = db_api.subcloud_audits_get(self.ctx, 2)
db_api.subcloud_destroy(self.ctxt, result['subcloud_id']) db_api.subcloud_destroy(self.ctx, result['subcloud_id'])
self.assertRaises(exception.SubcloudNotFound, self.assertRaises(exception.SubcloudNotFound,
db_api.subcloud_audits_get, db_api.subcloud_audits_get,
self.ctx, result['subcloud_id']) self.ctx, result['subcloud_id'])
@ -129,7 +105,7 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
result = db_api.subcloud_audits_get(self.ctx, 2) result = db_api.subcloud_audits_get(self.ctx, 2)
self.assertEqual(result['patch_audit_requested'], False) self.assertEqual(result['patch_audit_requested'], False)
values = {'patch_audit_requested': True} values = {'patch_audit_requested': True}
result = db_api.subcloud_audits_update(self.ctxt, 2, values) result = db_api.subcloud_audits_update(self.ctx, 2, values)
self.assertEqual(result['patch_audit_requested'], True) self.assertEqual(result['patch_audit_requested'], True)
result = db_api.subcloud_audits_get(self.ctx, 1) result = db_api.subcloud_audits_get(self.ctx, 1)
self.assertEqual(result['patch_audit_requested'], False) self.assertEqual(result['patch_audit_requested'], False)
@ -137,15 +113,15 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
self.assertEqual(result['patch_audit_requested'], True) self.assertEqual(result['patch_audit_requested'], True)
def test_subcloud_audits_update_all(self): def test_subcloud_audits_update_all(self):
subcloud_audits = db_api.subcloud_audits_get_all(self.ctxt) subcloud_audits = db_api.subcloud_audits_get_all(self.ctx)
for audit in subcloud_audits: for audit in subcloud_audits:
self.assertEqual(audit['patch_audit_requested'], False) self.assertEqual(audit['patch_audit_requested'], False)
self.assertEqual(audit['load_audit_requested'], False) self.assertEqual(audit['load_audit_requested'], False)
values = {'patch_audit_requested': True, values = {'patch_audit_requested': True,
'load_audit_requested': True} 'load_audit_requested': True}
result = db_api.subcloud_audits_update_all(self.ctxt, values) result = db_api.subcloud_audits_update_all(self.ctx, values)
self.assertEqual(result, 3) self.assertEqual(result, 3)
subcloud_audits = db_api.subcloud_audits_get_all(self.ctxt) subcloud_audits = db_api.subcloud_audits_get_all(self.ctx)
for audit in subcloud_audits: for audit in subcloud_audits:
self.assertEqual(audit['patch_audit_requested'], True) self.assertEqual(audit['patch_audit_requested'], True)
self.assertEqual(audit['load_audit_requested'], True) self.assertEqual(audit['load_audit_requested'], True)
@ -155,22 +131,22 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
last_audit_threshold = current_time - datetime.timedelta( last_audit_threshold = current_time - datetime.timedelta(
seconds=1000) seconds=1000)
audits = db_api.subcloud_audits_get_all_need_audit( audits = db_api.subcloud_audits_get_all_need_audit(
self.ctxt, last_audit_threshold) self.ctx, last_audit_threshold)
# They should all need audits. # They should all need audits.
self.assertEqual(len(audits), 3) self.assertEqual(len(audits), 3)
# Update subcloud1 to show it's been audited recently and # Update subcloud1 to show it's been audited recently and
# check it doesn't come back as needing an audit. # check it doesn't come back as needing an audit.
db_api.subcloud_audits_end_audit(self.ctxt, 1, []) db_api.subcloud_audits_end_audit(self.ctx, 1, [])
audits = db_api.subcloud_audits_get_all_need_audit( audits = db_api.subcloud_audits_get_all_need_audit(
self.ctxt, last_audit_threshold) self.ctx, last_audit_threshold)
subcloud_ids = [audit.subcloud_id for audit in audits] subcloud_ids = [audit.subcloud_id for audit in audits]
self.assertEqual(len(subcloud_ids), 2) self.assertEqual(len(subcloud_ids), 2)
self.assertNotIn(1, subcloud_ids) self.assertNotIn(1, subcloud_ids)
# Set one of the special audits to make sure it overrides. # Set one of the special audits to make sure it overrides.
values = {'patch_audit_requested': True} values = {'patch_audit_requested': True}
db_api.subcloud_audits_update(self.ctxt, 1, values) db_api.subcloud_audits_update(self.ctx, 1, values)
audits = db_api.subcloud_audits_get_all_need_audit( audits = db_api.subcloud_audits_get_all_need_audit(
self.ctxt, last_audit_threshold) self.ctx, last_audit_threshold)
self.assertEqual(len(audits), 3) self.assertEqual(len(audits), 3)
def test_db_migration(self): def test_db_migration(self):
@ -200,38 +176,38 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
db_api.db_sync(get_engine()) db_api.db_sync(get_engine())
# Add another subcloud after the DB migration to test auto-creation # Add another subcloud after the DB migration to test auto-creation
# of subcloud-audit entries during subcloud creation. # of subcloud-audit entries during subcloud creation.
self.create_subcloud(self.ctxt, "subcloud4") self.create_subcloud(self.ctx, "subcloud4")
# Now make sure all four get detected as needing audits. # Now make sure all four get detected as needing audits.
last_audit_threshold = (datetime.datetime.utcnow() - last_audit_threshold = (datetime.datetime.utcnow() -
datetime.timedelta(seconds=1000)) datetime.timedelta(seconds=1000))
audits = db_api.subcloud_audits_get_all_need_audit( audits = db_api.subcloud_audits_get_all_need_audit(
self.ctxt, last_audit_threshold) self.ctx, last_audit_threshold)
# They should all need audits. # They should all need audits.
self.assertEqual(len(audits), 4) self.assertEqual(len(audits), 4)
def test_subcloud_audits_start_and_end(self): def test_subcloud_audits_start_and_end(self):
audit = db_api.subcloud_audits_get_and_start_audit(self.ctxt, 3) audit = db_api.subcloud_audits_get_and_start_audit(self.ctx, 3)
self.assertTrue((datetime.datetime.utcnow() - audit.audit_started_at) < self.assertTrue((datetime.datetime.utcnow() - audit.audit_started_at) <
datetime.timedelta(seconds=1)) datetime.timedelta(seconds=1))
audit = db_api.subcloud_audits_end_audit(self.ctxt, 3, []) audit = db_api.subcloud_audits_end_audit(self.ctx, 3, [])
self.assertTrue((datetime.datetime.utcnow() - audit.audit_finished_at) < self.assertTrue((datetime.datetime.utcnow() - audit.audit_finished_at) <
datetime.timedelta(seconds=1)) datetime.timedelta(seconds=1))
self.assertFalse(audit.state_update_requested) self.assertFalse(audit.state_update_requested)
def test_subcloud_audits_fix_expired(self): def test_subcloud_audits_fix_expired(self):
# Set the 'finished' timestamp later than the 'start' timestamp. # Set the 'finished' timestamp later than the 'start' timestamp.
db_api.subcloud_audits_end_audit(self.ctxt, 3, []) db_api.subcloud_audits_end_audit(self.ctx, 3, [])
# Set the 'start' timestamp later than the 'finished' timestamp # Set the 'start' timestamp later than the 'finished' timestamp
# but with the 'finished' timestamp long ago. # but with the 'finished' timestamp long ago.
db_api.subcloud_audits_get_and_start_audit(self.ctxt, 1) db_api.subcloud_audits_get_and_start_audit(self.ctx, 1)
# Set the 'start' timestamp later than the 'finished' timestamp # Set the 'start' timestamp later than the 'finished' timestamp
# but with the 'finished' timestamp recent. # but with the 'finished' timestamp recent.
db_api.subcloud_audits_end_audit(self.ctxt, 2, []) db_api.subcloud_audits_end_audit(self.ctx, 2, [])
db_api.subcloud_audits_get_and_start_audit(self.ctxt, 2) db_api.subcloud_audits_get_and_start_audit(self.ctx, 2)
last_audit_threshold = (datetime.datetime.utcnow() - last_audit_threshold = (datetime.datetime.utcnow() -
datetime.timedelta(seconds=100)) datetime.timedelta(seconds=100))
count = db_api.subcloud_audits_fix_expired_audits( count = db_api.subcloud_audits_fix_expired_audits(
self.ctxt, last_audit_threshold) self.ctx, last_audit_threshold)
self.assertEqual(count, 1) self.assertEqual(count, 1)
# Check that for the one that was updated we didn't trigger sub-audits. # Check that for the one that was updated we didn't trigger sub-audits.
result = db_api.subcloud_audits_get(self.ctx, 1) result = db_api.subcloud_audits_get(self.ctx, 1)
@ -240,12 +216,12 @@ class DBAPISubcloudAuditsTest(base.DCManagerTestCase):
def test_subcloud_audits_fix_expired_trigger_audits(self): def test_subcloud_audits_fix_expired_trigger_audits(self):
# Set the 'start' timestamp later than the 'finished' timestamp # Set the 'start' timestamp later than the 'finished' timestamp
# but with the 'finished' timestamp long ago. # but with the 'finished' timestamp long ago.
db_api.subcloud_audits_get_and_start_audit(self.ctxt, 1) db_api.subcloud_audits_get_and_start_audit(self.ctx, 1)
last_audit_threshold = (datetime.datetime.utcnow() - last_audit_threshold = (datetime.datetime.utcnow() -
datetime.timedelta(seconds=100)) datetime.timedelta(seconds=100))
# Fix up expired audits and trigger subaudits. # Fix up expired audits and trigger subaudits.
count = db_api.subcloud_audits_fix_expired_audits( count = db_api.subcloud_audits_fix_expired_audits(
self.ctxt, last_audit_threshold, trigger_audits=True) self.ctx, last_audit_threshold, trigger_audits=True)
self.assertEqual(count, 1) self.assertEqual(count, 1)
# For the fixed-up audits, subaudits should be requested. # For the fixed-up audits, subaudits should be requested.
result = db_api.subcloud_audits_get(self.ctx, 1) result = db_api.subcloud_audits_get(self.ctx, 1)

View File

@ -19,12 +19,7 @@
# of this software may be licensed only pursuant to the terms # of this software may be licensed only pursuant to the terms
# of an applicable Wind River license agreement. # of an applicable Wind River license agreement.
# #
import sqlalchemy
from oslo_config import cfg
from oslo_db import exception as db_exception from oslo_db import exception as db_exception
from oslo_db import options
from dcmanager.common import config from dcmanager.common import config
from dcmanager.common import consts from dcmanager.common import consts
@ -51,23 +46,6 @@ def set_sqlite_pragma(dbapi_connection, connection_record):
class DBAPISubcloudTest(base.DCManagerTestCase): class DBAPISubcloudTest(base.DCManagerTestCase):
def setup_dummy_db(self):
options.cfg.set_defaults(options.database_opts,
sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://")
engine = get_engine()
db_api.db_sync(engine)
@staticmethod
def reset_dummy_db():
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
@staticmethod @staticmethod
def create_subcloud_static(ctxt, **kwargs): def create_subcloud_static(ctxt, **kwargs):
@ -141,10 +119,7 @@ class DBAPISubcloudTest(base.DCManagerTestCase):
def setUp(self): def setUp(self):
super(DBAPISubcloudTest, self).setUp() super(DBAPISubcloudTest, self).setUp()
# calling setUp for the superclass sets up the DB and context
self.setup_dummy_db()
self.addCleanup(self.reset_dummy_db)
self.ctx = utils.dummy_context()
def test_create_subcloud(self): def test_create_subcloud(self):
fake_subcloud = utils.create_subcloud_dict(base.SUBCLOUD_SAMPLE_DATA_0) fake_subcloud = utils.create_subcloud_dict(base.SUBCLOUD_SAMPLE_DATA_0)

View File

@ -22,13 +22,9 @@
import eventlet import eventlet
import random import random
import sqlalchemy
import string import string
import uuid import uuid
from oslo_config import cfg
from oslo_db import options
from dcmanager.common import context from dcmanager.common import context
from dcmanager.db import api as db_api from dcmanager.db import api as db_api
@ -58,25 +54,6 @@ def random_name():
for x in range(10)) for x in range(10))
def setup_dummy_db():
options.cfg.set_defaults(options.database_opts, sqlite_synchronous=False)
options.set_defaults(cfg.CONF, connection="sqlite://")
engine = get_engine()
db_api.db_sync(engine)
engine.connect()
def reset_dummy_db():
engine = get_engine()
meta = sqlalchemy.MetaData()
meta.reflect(bind=engine)
for table in reversed(meta.sorted_tables):
if table.name == 'migrate_version':
continue
engine.execute(table.delete())
def dummy_context(user='test_username', tenant='test_project_id', def dummy_context(user='test_username', tenant='test_project_id',
region_name=None): region_name=None):
return context.RequestContext.from_dict({ return context.RequestContext.from_dict({

View File

@ -17,8 +17,6 @@ testresources>=0.2.4 # Apache-2.0/BSD
testscenarios>=0.4 # Apache-2.0/BSD testscenarios>=0.4 # Apache-2.0/BSD
WebTest>=2.0 # MIT WebTest>=2.0 # MIT
oslotest>=1.10.0 # Apache-2.0 oslotest>=1.10.0 # Apache-2.0
os-testr>=0.8.0 # Apache-2.0
tempest-lib>=0.14.0 # Apache-2.0
pylint==1.9.2;python_version<"3.0" # GPLv2 pylint==1.9.2;python_version<"3.0" # GPLv2
pylint==2.3.1;python_version>="3.0" # GPLv2 pylint==2.3.1;python_version>="3.0" # GPLv2
PyYAML>=3.1.0 PyYAML>=3.1.0