diff --git a/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/nfvi_compute_api.py b/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/nfvi_compute_api.py
index b2a93eca..b952f261 100755
--- a/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/nfvi_compute_api.py
+++ b/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/nfvi_compute_api.py
@@ -3405,6 +3405,22 @@ class NFVIComputeAPI(nfvi.api.v1.NFVIComputeAPI):
         """
         self._instance_delete_callbacks.append(callback)
 
+    def ready_to_initialize(self, config_file):
+        """
+        Check if the plugin is ready to initialize
+        """
+        config.load(config_file)
+
+        # In order for the compute plugin to initialize successfully, the
+        # rabbitmq server must be running. If it is not running, the plugin
+        # initialization cannot register with rabbitmq and will throw an
+        # exception. It is essentially impossible to clean up the plugin in
+        # that case, so we must avoid it.
+        return rpc_listener.test_connection(
+            config.CONF['amqp']['host'], config.CONF['amqp']['port'],
+            config.CONF['amqp']['user_id'], config.CONF['amqp']['password'],
+            config.CONF['amqp']['virt_host'], "nova")
+
     def initialize(self, config_file):
         """
         Initialize the plugin
diff --git a/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/openstack/rpc_listener.py b/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/openstack/rpc_listener.py
index 5f37e674..8e3f6829 100755
--- a/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/openstack/rpc_listener.py
+++ b/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/openstack/rpc_listener.py
@@ -145,3 +145,33 @@ class RPCListener(threading.Thread):
         """
         Stop RPC Listener
         """
         self._exit.set()
+
+
+def test_connection(host, port, user_id, password, virt_host, exchange_name):
+    """
+    Test a connection to an exchange on a virtual host
+    """
+    connection = None
+    connected = False
+    success = False
+
+    try:
+        # Connect to the virtual host - will raise exception if it fails.
+        connection = Connection(host, user_id, password, virt_host, port)
+        connection.connect()
+        connected = connection.connected
+        if connected:
+            # Check whether exchange exists - will raise exception if it fails.
+            exchange = Exchange(exchange_name, channel=connection,
+                                type='topic', durable=False, passive=True)
+            exchange.declare()
+            success = True
+    except Exception as e:
+        DLOG.info("Unable to connect to virt_host %s, exchange %s, error: %s" %
+                  (virt_host, exchange_name, e))
+
+    finally:
+        if connected:
+            connection.close()
+
+    return success
diff --git a/nfv/nfv-vim/nfv_vim/nfvi/__init__.py b/nfv/nfv-vim/nfv_vim/nfvi/__init__.py
index 4b13f897..9e2569d4 100755
--- a/nfv/nfv-vim/nfv_vim/nfvi/__init__.py
+++ b/nfv/nfv-vim/nfv_vim/nfvi/__init__.py
@@ -134,4 +134,5 @@ from nfv_vim.nfvi._nfvi_sw_mgmt_module import nfvi_sw_mgmt_query_hosts # noqa:
 from nfv_vim.nfvi._nfvi_sw_mgmt_module import nfvi_sw_mgmt_update_host  # noqa: F401
 from nfv_vim.nfvi._nfvi_sw_mgmt_module import nfvi_sw_mgmt_update_hosts  # noqa: F401
 from nfv_vim.nfvi._nfvi_module import nfvi_initialize  # noqa: F401
+from nfv_vim.nfvi._nfvi_module import nfvi_reinitialize  # noqa: F401
 from nfv_vim.nfvi._nfvi_module import nfvi_finalize  # noqa: F401
diff --git a/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_compute_module.py b/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_compute_module.py
index ddf8a022..b1c521ad 100755
--- a/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_compute_module.py
+++ b/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_compute_module.py
@@ -470,8 +470,13 @@ def nfvi_compute_initialize(config, pool):
     """
     global _compute_plugin
 
-    _compute_plugin = NFVIComputePlugin(config['namespace'], pool)
-    _compute_plugin.initialize(config['config_file'])
+    if _compute_plugin is None:
+        _compute_plugin = NFVIComputePlugin(config['namespace'], pool)
+    if _compute_plugin.ready_to_initialize(config['config_file']):
+        _compute_plugin.initialize(config['config_file'])
+        return True
+    else:
+        return False
 
 
 def nfvi_compute_finalize():
diff --git a/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_module.py b/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_module.py
index 8d07a47d..9844dcf6 100755
--- a/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_module.py
+++ b/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_module.py
@@ -27,6 +27,8 @@ DLOG = debug.debug_get_logger('nfv_vim.nfvi.nfvi_module')
 
 _task_worker_pools = dict()
 
+DISABLED_LIST = ['Yes', 'yes', 'Y', 'y', 'True', 'true', 'T', 't', '1']
+
 
 def nfvi_initialize(config):
     """
@@ -34,18 +36,18 @@ def nfvi_initialize(config):
     """
     global _task_worker_pools
 
-    disabled_list = ['Yes', 'yes', 'Y', 'y', 'True', 'true', 'T', 't', '1']
+    init_complete = True
 
     image_plugin_disabled = (config.get('image_plugin_disabled',
-                             'False') in disabled_list)
+                             'False') in DISABLED_LIST)
     block_storage_plugin_disabled = (config.get(
-        'block_storage_plugin_disabled', 'False') in disabled_list)
+        'block_storage_plugin_disabled', 'False') in DISABLED_LIST)
     compute_plugin_disabled = (config.get('compute_plugin_disabled',
-                               'False') in disabled_list)
+                               'False') in DISABLED_LIST)
     network_plugin_disabled = (config.get('network_plugin_disabled',
-                               'False') in disabled_list)
+                               'False') in DISABLED_LIST)
     guest_plugin_disabled = (config.get('guest_plugin_disabled',
-                             'False') in disabled_list)
+                             'False') in DISABLED_LIST)
 
     _task_worker_pools['identity'] = \
         tasks.TaskWorkerPool('Identity', num_workers=1)
@@ -66,7 +68,8 @@ def nfvi_initialize(config):
         # two requests to the nova-api at a time.
         _task_worker_pools['compute'] = \
             tasks.TaskWorkerPool('Compute', num_workers=2)
-        nfvi_compute_initialize(config, _task_worker_pools['compute'])
+        init_complete = nfvi_compute_initialize(config,
+                                                _task_worker_pools['compute'])
 
     if not network_plugin_disabled:
         _task_worker_pools['network'] = \
@@ -86,6 +89,24 @@ def nfvi_initialize(config):
         tasks.TaskWorkerPool('Sw-Mgmt', num_workers=1)
     nfvi_sw_mgmt_initialize(config, _task_worker_pools['sw_mgmt'])
 
+    return init_complete
+
+
+def nfvi_reinitialize(config):
+    """
+    Re-initialize the NFVI package
+    """
+    global _task_worker_pools
+
+    init_complete = True
+    compute_plugin_disabled = (config.get('compute_plugin_disabled',
+                               'False') in DISABLED_LIST)
+    if not compute_plugin_disabled:
+        init_complete = nfvi_compute_initialize(config,
+                                                _task_worker_pools['compute'])
+
+    return init_complete
+
 
 def nfvi_finalize():
     """
diff --git a/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_plugin.py b/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_plugin.py
index 049eee66..9458001c 100755
--- a/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_plugin.py
+++ b/nfv/nfv-vim/nfv_vim/nfvi/_nfvi_plugin.py
@@ -77,6 +77,15 @@ class NFVIPlugin(object):
             tasks.TASK_PRIORITY.MED, command, *command_args, **command_kwargs)
         return command_id
 
+    def ready_to_initialize(self, config_file):
+        """
+        Check if we are ready to initialize plugin
+        """
+        if self._plugin is not None:
+            return self._plugin.obj.ready_to_initialize(config_file)
+        else:
+            return False
+
     def initialize(self, config_file):
         """
         Initialize plugin
diff --git a/nfv/nfv-vim/nfv_vim/nfvi/api/v1/_nfvi_compute_api.py b/nfv/nfv-vim/nfv_vim/nfvi/api/v1/_nfvi_compute_api.py
index e0e1fbf3..ff02c565 100755
--- a/nfv/nfv-vim/nfv_vim/nfvi/api/v1/_nfvi_compute_api.py
+++ b/nfv/nfv-vim/nfv_vim/nfvi/api/v1/_nfvi_compute_api.py
@@ -332,6 +332,13 @@ class NFVIComputeAPI(object):
         """
         pass
 
+    @abc.abstractmethod
+    def ready_to_initialize(self, config_file):
+        """
+        Check if the plugin is ready to initialize
+        """
+        pass
+
     @abc.abstractmethod
     def initialize(self, config_file):
         """
diff --git a/nfv/nfv-vim/nfv_vim/vim.py b/nfv/nfv-vim/nfv_vim/vim.py
index 65946db8..1f9a605d 100755
--- a/nfv/nfv-vim/nfv_vim/vim.py
+++ b/nfv/nfv-vim/nfv_vim/vim.py
@@ -3,6 +3,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 #
+import os
 import sys
 import signal
 import argparse
@@ -63,6 +64,8 @@ def process_initialize():
     """
     Virtual Infrastructure Manager - Initialize
     """
+    init_complete = True
+
     debug.debug_initialize(config.CONF['debug'], 'VIM')
     profiler.profiler_initialize()
     selobj.selobj_initialize()
@@ -72,7 +75,9 @@ def process_initialize():
     schedule.schedule_initialize()
     event_log.event_log_initialize(config.CONF['event-log'])
     alarm.alarm_initialize(config.CONF['alarm'])
-    nfvi.nfvi_initialize(config.CONF['nfvi'])
+    if not nfvi.nfvi_initialize(config.CONF['nfvi']):
+        DLOG.info("nfvi_initialize failed")
+        init_complete = False
     database.database_initialize(config.CONF['database'])
     database.database_migrate_data()
     tables.tables_initialize()
@@ -80,6 +85,21 @@ def process_initialize():
     events.events_initialize()
     audits.audits_initialize()
     dor.dor_initialize()
+    return init_complete
+
+
+def process_reinitialize():
+    """
+    Virtual Infrastructure Manager - Reinitialize
+    """
+    init_complete = True
+
+    if not nfvi.nfvi_reinitialize(config.CONF['nfvi']):
+        DLOG.info("nfvi_reinitialize failed")
+        init_complete = False
+    else:
+        DLOG.info("nfvi_reinitialize succeeded")
+    return init_complete
 
 
 def process_finalize():
@@ -106,8 +126,13 @@ def process_main():
     """
     Virtual Infrastructure Manager - Main
     """
+    def _force_exit(signum, frame):  # signal handlers get (signum, frame)
+        os._exit(-1)
+
     global do_reload, dump_data_captured, reset_data_captured
 
+    process_start_time = timers.get_monotonic_timestamp_in_ms()
+
     try:
         # signal.signal(signal.SIGTERM, process_signal_handler)
         signal.signal(signal.SIGINT, process_signal_handler)
@@ -128,7 +153,8 @@ def process_main():
         debug_ini = sys.prefix + '/' + config.CONF['debug']['config_file']
         config.CONF['debug']['config_file'] = debug_ini
 
-        process_initialize()
+        init_complete = process_initialize()
+        last_init_time = timers.get_monotonic_timestamp_in_ms()
 
         DLOG.info("Started")
 
@@ -164,6 +190,20 @@ def process_main():
                 DLOG.info("Reset captured data complete.")
                 reset_data_captured = False
 
+            if not init_complete:
+                # Retry initialization for up to 3 minutes.
+                now_ms = timers.get_monotonic_timestamp_in_ms()
+                secs_expired = (now_ms - process_start_time) / 1000
+                if secs_expired < 180:
+                    time_since_init = (now_ms - last_init_time) / 1000
+                    # Reattempt initialization every 10 seconds.
+                    if time_since_init > 10:
+                        init_complete = process_reinitialize()
+                        last_init_time = timers.get_monotonic_timestamp_in_ms()
+                else:
+                    DLOG.warn("Initialization failed - exiting.")
+                    sys.exit(200)
+
     except KeyboardInterrupt:
         print("Keyboard Interrupt received.")
 
@@ -173,4 +213,8 @@ def process_main():
 
     finally:
         open(PROCESS_NOT_RUNNING_FILE, 'w').close()
+        # Allow up to 10 seconds for the process to shut down. If the
+        # process_finalize hangs, we will do a hard exit.
+        signal.signal(signal.SIGALRM, _force_exit)
+        signal.alarm(10)
         process_finalize()