Merge "Allow VIM to start before OpenStack pods"

This commit is contained in:
Zuul 2018-12-19 14:35:41 +00:00 committed by Gerrit Code Review
commit b27c6b86f6
8 changed files with 144 additions and 11 deletions

View File

@ -3405,6 +3405,22 @@ class NFVIComputeAPI(nfvi.api.v1.NFVIComputeAPI):
"""
self._instance_delete_callbacks.append(callback)
def ready_to_initialize(self, config_file):
"""
Check if the plugin is ready to initialize
"""
config.load(config_file)
# In order for the compute plugin to initialize successfully, the
# rabbitmq server must be running. If it is not running, the plugin
# initialization cannot register with rabbitmq and will throw an
# exception. It is essentially impossible to clean up the plugin in
# that case, so we must avoid it.
return rpc_listener.test_connection(
config.CONF['amqp']['host'], config.CONF['amqp']['port'],
config.CONF['amqp']['user_id'], config.CONF['amqp']['password'],
config.CONF['amqp']['virt_host'], "nova")
def initialize(self, config_file):
"""
Initialize the plugin

View File

@ -145,3 +145,33 @@ class RPCListener(threading.Thread):
Stop RPC Listener
"""
self._exit.set()
def test_connection(host, port, user_id, password, virt_host, exchange_name):
"""
Test a connection to an exchange on a virtual host
"""
connection = None
connected = False
success = False
try:
# Connect to the virtual host - will raise exception if it fails.
connection = Connection(host, user_id, password, virt_host, port)
connection.connect()
connected = connection.connected
if connected:
# Check whether exchange exists - will raise exception if it fails.
exchange = Exchange(exchange_name, channel=connection,
type='topic', durable=False, passive=True)
exchange.declare()
success = True
except Exception as e:
DLOG.info("Unable to connect to virt_host %s, exchange %s, error: %s" %
(virt_host, exchange_name, e))
finally:
if connected:
connection.close()
return success

View File

@ -134,4 +134,5 @@ from nfv_vim.nfvi._nfvi_sw_mgmt_module import nfvi_sw_mgmt_query_hosts # noqa:
from nfv_vim.nfvi._nfvi_sw_mgmt_module import nfvi_sw_mgmt_update_host # noqa: F401
from nfv_vim.nfvi._nfvi_sw_mgmt_module import nfvi_sw_mgmt_update_hosts # noqa: F401
from nfv_vim.nfvi._nfvi_module import nfvi_initialize # noqa: F401
from nfv_vim.nfvi._nfvi_module import nfvi_reinitialize # noqa: F401
from nfv_vim.nfvi._nfvi_module import nfvi_finalize # noqa: F401

View File

@ -470,8 +470,13 @@ def nfvi_compute_initialize(config, pool):
"""
global _compute_plugin
_compute_plugin = NFVIComputePlugin(config['namespace'], pool)
_compute_plugin.initialize(config['config_file'])
if _compute_plugin is None:
_compute_plugin = NFVIComputePlugin(config['namespace'], pool)
if _compute_plugin.ready_to_initialize(config['config_file']):
_compute_plugin.initialize(config['config_file'])
return True
else:
return False
def nfvi_compute_finalize():

View File

@ -27,6 +27,8 @@ DLOG = debug.debug_get_logger('nfv_vim.nfvi.nfvi_module')
_task_worker_pools = dict()
DISABLED_LIST = ['Yes', 'yes', 'Y', 'y', 'True', 'true', 'T', 't', '1']
def nfvi_initialize(config):
"""
@ -34,18 +36,18 @@ def nfvi_initialize(config):
"""
global _task_worker_pools
disabled_list = ['Yes', 'yes', 'Y', 'y', 'True', 'true', 'T', 't', '1']
init_complete = True
image_plugin_disabled = (config.get('image_plugin_disabled',
'False') in disabled_list)
'False') in DISABLED_LIST)
block_storage_plugin_disabled = (config.get(
'block_storage_plugin_disabled', 'False') in disabled_list)
'block_storage_plugin_disabled', 'False') in DISABLED_LIST)
compute_plugin_disabled = (config.get('compute_plugin_disabled',
'False') in disabled_list)
'False') in DISABLED_LIST)
network_plugin_disabled = (config.get('network_plugin_disabled',
'False') in disabled_list)
'False') in DISABLED_LIST)
guest_plugin_disabled = (config.get('guest_plugin_disabled',
'False') in disabled_list)
'False') in DISABLED_LIST)
_task_worker_pools['identity'] = \
tasks.TaskWorkerPool('Identity', num_workers=1)
@ -66,7 +68,8 @@ def nfvi_initialize(config):
# two requests to the nova-api at a time.
_task_worker_pools['compute'] = \
tasks.TaskWorkerPool('Compute', num_workers=2)
nfvi_compute_initialize(config, _task_worker_pools['compute'])
init_complete = nfvi_compute_initialize(config,
_task_worker_pools['compute'])
if not network_plugin_disabled:
_task_worker_pools['network'] = \
@ -86,6 +89,24 @@ def nfvi_initialize(config):
tasks.TaskWorkerPool('Sw-Mgmt', num_workers=1)
nfvi_sw_mgmt_initialize(config, _task_worker_pools['sw_mgmt'])
return init_complete
def nfvi_reinitialize(config):
"""
Re-initialize the NFVI package
"""
global _task_worker_pools
init_complete = True
compute_plugin_disabled = (config.get('compute_plugin_disabled',
'False') in DISABLED_LIST)
if not compute_plugin_disabled:
init_complete = nfvi_compute_initialize(config,
_task_worker_pools['compute'])
return init_complete
def nfvi_finalize():
"""

View File

@ -77,6 +77,15 @@ class NFVIPlugin(object):
tasks.TASK_PRIORITY.MED, command, *command_args, **command_kwargs)
return command_id
def ready_to_initialize(self, config_file):
"""
Check if we are ready to initialize plugin
"""
if self._plugin is not None:
return self._plugin.obj.ready_to_initialize(config_file)
else:
return False
def initialize(self, config_file):
"""
Initialize plugin

View File

@ -332,6 +332,13 @@ class NFVIComputeAPI(object):
"""
pass
@abc.abstractmethod
def ready_to_initialize(self, config_file):
"""
Check if the plugin is ready to initialize
"""
pass
@abc.abstractmethod
def initialize(self, config_file):
"""

View File

@ -3,6 +3,7 @@
#
# SPDX-License-Identifier: Apache-2.0
#
import os
import sys
import signal
import argparse
@ -63,6 +64,8 @@ def process_initialize():
"""
Virtual Infrastructure Manager - Initialize
"""
init_complete = True
debug.debug_initialize(config.CONF['debug'], 'VIM')
profiler.profiler_initialize()
selobj.selobj_initialize()
@ -72,7 +75,9 @@ def process_initialize():
schedule.schedule_initialize()
event_log.event_log_initialize(config.CONF['event-log'])
alarm.alarm_initialize(config.CONF['alarm'])
nfvi.nfvi_initialize(config.CONF['nfvi'])
if not nfvi.nfvi_initialize(config.CONF['nfvi']):
DLOG.info("nfvi_initialize failed")
init_complete = False
database.database_initialize(config.CONF['database'])
database.database_migrate_data()
tables.tables_initialize()
@ -80,6 +85,21 @@ def process_initialize():
events.events_initialize()
audits.audits_initialize()
dor.dor_initialize()
return init_complete
def process_reinitialize():
"""
Virtual Infrastructure Manager - Reinitialize
"""
init_complete = True
if not nfvi.nfvi_reinitialize(config.CONF['nfvi']):
DLOG.info("nfvi_reinitialize failed")
init_complete = False
else:
DLOG.info("nfvi_reinitialize succeeded")
return init_complete
def process_finalize():
@ -106,8 +126,13 @@ def process_main():
"""
Virtual Infrastructure Manager - Main
"""
def _force_exit():
os._exit(-1)
global do_reload, dump_data_captured, reset_data_captured
process_start_time = timers.get_monotonic_timestamp_in_ms()
try:
# signal.signal(signal.SIGTERM, process_signal_handler)
signal.signal(signal.SIGINT, process_signal_handler)
@ -128,7 +153,8 @@ def process_main():
debug_ini = sys.prefix + '/' + config.CONF['debug']['config_file']
config.CONF['debug']['config_file'] = debug_ini
process_initialize()
init_complete = process_initialize()
last_init_time = timers.get_monotonic_timestamp_in_ms()
DLOG.info("Started")
@ -164,6 +190,20 @@ def process_main():
DLOG.info("Reset captured data complete.")
reset_data_captured = False
if not init_complete:
# Retry initialization for up to 3 minutes.
now_ms = timers.get_monotonic_timestamp_in_ms()
secs_expired = (now_ms - process_start_time) / 1000
if secs_expired < 180:
time_since_init = (now_ms - last_init_time) / 1000
# Reattempt initialization every 10 seconds.
if time_since_init > 10:
init_complete = process_reinitialize()
last_init_time = timers.get_monotonic_timestamp_in_ms()
else:
DLOG.warn("Initialization failed - exiting.")
sys.exit(200)
except KeyboardInterrupt:
print("Keyboard Interrupt received.")
@ -173,4 +213,8 @@ def process_main():
finally:
open(PROCESS_NOT_RUNNING_FILE, 'w').close()
# Allow up to 10 seconds for the process to shut down. If the
# process_finalize hangs, we will do a hard exit.
signal.signal(signal.SIGALRM, _force_exit)
signal.alarm(10)
process_finalize()