#!/usr/bin/python
#
# Copyright (c) 2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#

### BEGIN INIT INFO
# Provides: ceph/mgr RESTful API plugin
# Required-Start: $ceph
# Required-Stop: $ceph
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Ceph MGR RESTful API plugin
# Description: Ceph MGR RESTful API plugin
### END INIT INFO

import argparse
import contextlib
import errno
import fcntl
import inspect
import json
import logging
import multiprocessing
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time

import daemon
import psutil
import requests

# 'timeout' command returns exit status 124
# if command times out (see man page)
GNU_TIMEOUT_EXPIRED_RETCODE = 124


def psutil_terminate_kill(target, timeout):
    """Extend psutil functionality to stop a process.

    SIGTERM is sent to each target then after a grace period SIGKILL
    is sent to the ones that are still running.

    :param target: a psutil.Process (or psutil.Popen) or a list of them
    :param timeout: seconds to wait after each signal for targets to exit
    """
    if not isinstance(target, list):
        target = [target]
    # drop processes that already exited
    _, target = psutil.wait_procs(target, timeout=0)
    for action in [lambda p: p.terminate(), lambda p: p.kill()]:
        for proc in target:
            try:
                action(proc)
            except psutil.NoSuchProcess:
                # process exited after wait_procs() looked at it
                pass
        _, target = psutil.wait_procs(
            target, timeout=timeout)


def _process_name(proc):
    """Return proc.name(), or None if the process is gone or inaccessible.

    Processes yielded by psutil.process_iter() may exit (or deny access)
    before their attributes are read; treat those as non-matching.
    """
    try:
        return proc.name()
    except psutil.Error:
        return None


class Config(object):
    """ceph-mgr service wrapper configuration options.

    In the future we may want to load them from a configuration file
    (for example /etc/ceph/mgr-restful-plugin.conf )
    """

    def __init__(self):
        self.log_level = logging.INFO
        self.log_dir = '/var/log'

        self.ceph_mgr_service = '/usr/bin/ceph-mgr'
        self.ceph_mgr_config = '/etc/ceph/ceph.conf'
        self.ceph_mgr_cluster = 'ceph'
        self.ceph_mgr_rundir = '/var/run/ceph/mgr'
        self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
        self.ceph_mgr_identity = socket.gethostname()

        self.service_name = 'mgr-restful-plugin'
        self.service_socket = os.path.join(
            self.ceph_mgr_rundir, '{}.socket'.format(self.service_name))
        self.service_lock = os.path.join(
            self.ceph_mgr_rundir, '{}.lock'.format(self.service_name))
        self.service_pid_file = os.path.join(
            '/var/run/ceph', '{}.pid'.format(self.service_name))

        self.restful_plugin_port = 7999

        # maximum size of a message received/sent via
        # service monitor control socket
        self.service_socket_bufsize = 1024

        # maximum time to wait for ceph cli to exit
        self.ceph_cli_timeout_sec = 30

        # how much time to wait after ceph cli commands fail with timeout
        # before running any other commands
        self.cluster_grace_period_sec = 30

        # after ceph-mgr is started it goes through an internal
        # initialization phase; how much time to wait before querying
        # ceph-mgr
        self.ceph_mgr_grace_period_sec = 15

        # after sending SIGTERM to ceph-mgr how much time to wait before
        # sending SIGKILL (maximum time allowed for ceph-mgr cleanup)
        self.ceph_mgr_kill_delay_sec = 5

        # if service monitor is running a recovery procedure it reports
        # status OK even if ceph-mgr is currently down. This sets the
        # maximum number of consecutive ceph-mgr failures before reporting
        # status error
        self.ceph_mgr_fail_count_report_error = 3

        # maximum number of consecutive ceph-mgr failures before
        # stopping mgr-restful-plugin service
        self.ceph_mgr_fail_count_exit = 5

        # maximum time allowed for ceph-mgr to respond to a REST API request
        self.rest_api_timeout_sec = 15

        # interval between consecutive REST API requests (ping's). A smaller
        # value here triggers more requests to ceph-mgr restful plugin. A
        # higher value makes recovery slower when services become unavailable
        self.restful_plugin_ping_delay_sec = 3

        # where to save the self-signed certificate generated by ceph-mgr
        self.restful_plugin_cert_path = os.path.join(
            self.ceph_mgr_rundir, 'restful.crt')

        # time to wait after enabling restful plugin
        self.restful_plugin_grace_period_sec = 3

        # after how many REST API ping failures to restart ceph-mgr
        self.ping_fail_count_restart_mgr = 3

        # after how many REST API ping failures to report status error.
        # Until then service monitor reports status OK just in case
        # restful plugin recovers
        self.ping_fail_count_report_error = 5

    @staticmethod
    def load():
        return Config()


def setup_logging(name=None, cleanup_handlers=False):
    """Return a logger writing to <log_dir>/<service_name>.log.

    :param name: logger name; defaults to CONFIG.service_name
    :param cleanup_handlers: flush/close and drop existing handlers before
        attaching a fresh file handler (used after daemonizing). When False
        and the logger already has handlers it is returned unchanged.
    """
    if not name:
        name = CONFIG.service_name
    log = logging.getLogger(name)
    log.setLevel(CONFIG.log_level)
    if cleanup_handlers:
        try:
            for handler in log.handlers:
                if isinstance(handler, logging.StreamHandler):
                    handler.flush()
                if isinstance(handler, logging.FileHandler):
                    handler.close()
            log.handlers = []
        except Exception:
            pass
    elif log.handlers:
        return log
    handler = logging.FileHandler(
        os.path.join(CONFIG.log_dir,
                     '{}.log'.format(CONFIG.service_name)))
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(process)s %(levelname)s %(name)s %(message)s'))
    log.addHandler(handler)
    return log


CONFIG = Config.load()
LOG = setup_logging(name='init-wrapper')


class ServiceException(Exception):
    """Generic mgr-restful-plugin service exception.

    Build exception string based on static (per exception class)
    string plus args, keyword args passed to exception constructor.
    """

    message = ""

    def __init__(self, *args, **kwargs):
        if "message" not in kwargs:
            try:
                message = self.message.format(*args, **kwargs)
            except Exception:   # noqa
                message = '{}, args:{}, kwargs: {}'.format(
                    self.message, args, kwargs)
        else:
            message = kwargs["message"]
        super(ServiceException, self).__init__(message)


class ServiceAlreadyStarted(ServiceException):
    message = ('Service monitor already started')


class ServiceLockFailed(ServiceException):
    message = ('Unable to lock service monitor: '
               'reason={reason}')


class ServiceNoSocket(ServiceException):
    message = ('Unable to create service monitor socket: '
               'reason={reason}')


class ServiceSocketBindFailed(ServiceException):
    message = ('Failed to bind service monitor socket: '
               'path={path}, reason={reason}')


class ServiceNoPidFile(ServiceException):
    message = ('Failed to update pid file: '
               'path={path}, reason={reason}')


class CommandFailed(ServiceException):
    message = ('Command failed: command={command}, '
               'reason={reason}, out={out}')


class CommandTimeout(ServiceException):
    message = ('Command timeout: command={command}, '
               'timeout={timeout}')


class CephMgrStartFailed(ServiceException):
    message = ('Failed to start ceph_mgr: '
               'reason={reason}')


class CephRestfulPluginFailed(ServiceException):
    message = ('Failed to start restful plugin: '
               'reason={reason}')


class RestApiPingFailed(ServiceException):
    message = ('REST API ping failed: '
               'reason={reason}')


class ServiceMonitor(object):
    """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)

    1. process init script service requests: status, stop. Requests are
       received via a control socket. Stop has priority over whatever
       the monitor is doing currently. Any ceph command that may be running
       is terminated/killed. Note that while ceph-mgr and restful plugin
       configuration is in progress ServiceMonitor reports status OK to
       avoid being restarted by SM.

    2. configure ceph-mgr and mgr restful plugin: authentication, REST API
       service port, self signed certificate. This runs as a separate
       process so it can be stopped when init script requests it.

    3. periodically check (ping) REST API responds to HTTPS requests.
       Recovery actions are taken if REST API fails to respond: restart
       ceph-mgr, wait for cluster to become available again.
    """

    def __init__(self):
        # process running configuration & REST API ping loop
        self.monitor = None

        # command socket used by init script
        self.command = None

        # ceph-mgr process
        self.ceph_mgr = None

        # consecutive ceph-mgr/restful-plugin start failures. Service monitor
        # reports status error once this reaches
        # CONFIG.ceph_mgr_fail_count_report_error and stops once it
        # reaches CONFIG.ceph_mgr_fail_count_exit
        self.ceph_mgr_failure_count = 0

        # consecutive REST API ping failures. ceph-mgr service is restarted
        # after CONFIG.ping_fail_count_restart_mgr threshold is exceeded
        self.ping_failure_count = 0

        # REST API url reported by ceph-mgr after enabling restful plugin
        self.restful_plugin_url = ''

        # REST API self signed certificate generated by restful plugin
        self.certificate = ''

    def run(self):
        """Take the service lock, open the control socket, write the pid
        file, start the monitor process and serve control requests."""
        self.disable_certificate_check()
        with self.service_lock(), self.service_socket(), \
                self.service_pid_file():
            self.start_monitor()
            self.server_loop()

    def disable_certificate_check(self):
        # ceph-mgr restful plugin is configured with a self-signed
        # certificate. Certificate host is hard-coded to "ceph-restful"
        # which causes HTTPS requests to fail because they don't
        # match current host name ("controller-...").
        # Note: this call only silences urllib3 warnings; certificate
        # verification itself is turned off by verify=False in
        # restful_plugin_ping().
        LOG.warning('Disable urllib3 certifcates check')
        requests.packages.urllib3.disable_warnings()

    def server_loop(self):
        """Serve requests received on the control socket.

        Each request is one SEQPACKET message: a command name followed by
        space separated arguments. Loops until a 'stop' request arrives or
        the reported ceph-mgr failure count reaches
        CONFIG.ceph_mgr_fail_count_exit.
        """
        self.command.listen(2)
        while True:
            client = None
            try:
                client, _ = self.command.accept()
                request = client.recv(CONFIG.service_socket_bufsize)
                LOG.debug('Monitor command socket: request=%s', str(request))
                cmd = request.split(b' ')
                cmd, args = cmd[0], cmd[1:]
                if cmd == b'status':
                    self.send_response(client, request, self.status())
                elif cmd == b'stop':
                    self.stop()
                    self.send_response(client, request, 'OK')
                    break
                elif cmd == b'restful-url':
                    try:
                        # store as text, same type the monitor process uses
                        self.restful_plugin_url = args[0].decode('utf-8')
                        self.send_response(client, request, 'OK')
                    except (IndexError, ValueError):
                        LOG.warning('Failed to update restful plugin url: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
                elif cmd == b'certificate':
                    try:
                        self.certificate = (
                            args[0].decode('utf-8') if args else '')
                        self.send_response(client, request, 'OK')
                    except ValueError:
                        LOG.warning('Failed to update certificate path: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
                elif cmd == b'ceph-mgr-failures':
                    try:
                        self.ceph_mgr_failure_count = int(args[0])
                        self.send_response(client, request, 'OK')
                        if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
                            self.stop()
                            break
                    except (IndexError, ValueError):
                        LOG.warning('Failed to update ceph-mgr failures: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
                elif cmd == b'ping-failures':
                    try:
                        self.ping_failure_count = int(args[0])
                        self.send_response(client, request, 'OK')
                    except (IndexError, ValueError):
                        LOG.warning('Failed to update ping failures: '
                                    'args=%s', str(args))
                        self.send_response(client, request, 'ERR')
            except Exception as err:
                LOG.exception(err)
            finally:
                # every accept() returns a new connection; release it
                # (also runs on 'break')
                if client is not None:
                    client.close()

    @staticmethod
    def send_response(client, request, response):
        """Send a text response to a control socket client; log failures."""
        try:
            client.send(response.encode('utf-8'))
        except socket.error as err:
            LOG.warning('Failed to send response back. '
                        'request=%s, response=%s, reason=%s',
                        request, response, err)

    def status(self):
        """Return 'OK', 'ERR.down' or 'ERR.ping_failed'.

        While the REST API url is unknown (services starting/recovering)
        or the REST API does not respond, 'OK' is reported as long as the
        failure counters stay below their *_report_error thresholds.
        """
        if not self.restful_plugin_url:
            if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
                    and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
                LOG.debug('Monitor is starting services. Report status OK')
                return 'OK'
            LOG.debug('Too many failures: '
                      'ceph_mgr=%d < %d, ping=%d < %d. '
                      'Report status ERR',
                      self.ceph_mgr_failure_count,
                      CONFIG.ceph_mgr_fail_count_report_error,
                      self.ping_failure_count,
                      CONFIG.ping_fail_count_report_error)
            return 'ERR.down'
        try:
            self.restful_plugin_ping()
            LOG.debug('Restful plugin ping successful. Report status OK')
            return 'OK'
        except (CommandFailed, RestApiPingFailed):
            if self.ceph_mgr_failure_count < CONFIG.ceph_mgr_fail_count_report_error \
                    and self.ping_failure_count < CONFIG.ping_fail_count_report_error:
                LOG.info('Restful plugin does not respond but failure '
                         'count is within acceptable limits: '
                         ' ceph_mgr=%d < %d, ping=%d < %d. '
                         'Report status OK',
                         self.ceph_mgr_failure_count,
                         CONFIG.ceph_mgr_fail_count_report_error,
                         self.ping_failure_count,
                         CONFIG.ping_fail_count_report_error)
                return 'OK'
            LOG.debug('Restful does not respond (ping failure count %d). '
                      'Report status ERR', self.ping_failure_count)
            return 'ERR.ping_failed'

    def stop(self):
        """Stop the monitor process group: SIGTERM, wait, then SIGKILL.

        The monitor ignores SIGTERM itself (see monitor_loop) but its
        children in the same process group do not.
        """
        if not self.monitor:
            return
        LOG.info('Stop monitor with SIGTERM to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGTERM)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
        LOG.info('Stop monitor with SIGKILL to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGKILL)
            os.waitpid(self.monitor.pid, 0)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        LOG.info('Monitor stopped: pid=%d', self.monitor.pid)

    @contextlib.contextmanager
    def service_lock(self):
        """Hold an exclusive, non-blocking flock on CONFIG.service_lock.

        :raises ServiceAlreadyStarted: another instance holds the lock
        :raises ServiceLockFailed: flock failed for any other reason
        """
        LOG.info('Take service lock: path=%s', CONFIG.service_lock)
        try:
            os.makedirs(os.path.dirname(CONFIG.service_lock))
        except OSError:
            pass
        lock_file = open(CONFIG.service_lock, 'w')
        try:
            fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError) as err:
            # don't leak the descriptor when the lock is not ours
            lock_file.close()
            if err.errno == errno.EAGAIN:
                raise ServiceAlreadyStarted()
            raise ServiceLockFailed(reason=str(err))
        try:
            # even if we have the lock here there might be another service
            # manager running whose CONFIG.ceph_mgr_rundir was removed before
            # starting this instance. Make sure there is only one service
            # manager running
            self.stop_other_service_managers()
            yield
        finally:
            os.unlink(CONFIG.service_lock)
            lock_file.close()
            LOG.info('Release service lock: path=%s', CONFIG.service_lock)

    def stop_other_service_managers(self):
        """SIGKILL other processes running this init script.

        NOTE(review): matching is by command line only, so this may also
        kill a concurrent '<script> status' / '<script> start' invocation;
        confirm this is intended.
        """
        service = os.path.join('/etc/init.d', CONFIG.service_name)
        own_pid = os.getpid()
        for p in psutil.process_iter():
            try:
                if p.cmdline()[:2] not in [[service], ['/usr/bin/python', service]]:
                    continue
                if p.pid == own_pid:
                    continue
                p.kill()
            except psutil.Error:
                # process exited or is not accessible: nothing to stop
                continue

    @contextlib.contextmanager
    def service_socket(self):
        """Create and bind the AF_UNIX SEQPACKET control socket.

        :raises ServiceNoSocket: socket creation failed
        :raises ServiceSocketBindFailed: bind failed
        """
        LOG.info('Create service socket')
        try:
            self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        except socket.error as err:
            raise ServiceNoSocket(reason=str(err))
        LOG.info('Remove existing socket files')
        try:
            os.unlink(CONFIG.service_socket)
        except OSError:
            pass
        LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
        try:
            self.command.bind(CONFIG.service_socket)
        except socket.error as err:
            self.command.close()
            raise ServiceSocketBindFailed(
                path=CONFIG.service_socket, reason=str(err))
        try:
            yield
        finally:
            LOG.info('Close service socket and remove file: path=%s',
                     CONFIG.service_socket)
            self.command.close()
            os.unlink(CONFIG.service_socket)

    @contextlib.contextmanager
    def service_pid_file(self):
        """Write the current pid to CONFIG.service_pid_file; remove on exit.

        :raises ServiceNoPidFile: the pid file could not be written
        """
        LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
        try:
            # closing the file also flushes it
            with open(CONFIG.service_pid_file, 'w') as pid_file:
                pid_file.write(str(os.getpid()))
        except (IOError, OSError) as err:
            raise ServiceNoPidFile(
                path=CONFIG.service_pid_file, reason=str(err))
        try:
            yield
        finally:
            LOG.info('Remove service pid file: path=%s',
                     CONFIG.service_pid_file)
            try:
                os.unlink(CONFIG.service_pid_file)
            except OSError:
                pass

    def start_monitor(self):
        """Run monitor_loop() in a separate multiprocessing.Process."""
        LOG.info('Start monitor loop')
        self.monitor = multiprocessing.Process(target=self.monitor_loop)
        self.monitor.start()

    def stop_unmanaged_ceph_mgr(self):
        """Terminate/kill ceph-mgr processes other than self.ceph_mgr."""
        LOG.info('Stop unmanaged running ceph-mgr processes')
        service_name = os.path.basename(CONFIG.ceph_mgr_service)
        managed_pid = self.ceph_mgr.pid if self.ceph_mgr else None
        psutil_terminate_kill(
            [proc for proc in psutil.process_iter()
             if (_process_name(proc) == service_name
                 and proc.pid != managed_pid)],
            CONFIG.ceph_mgr_kill_delay_sec)

    def monitor_loop(self):
        """Bring up and monitor ceph-mgr restful plugin.

        Steps:
        - wait for Ceph cluster to become available
        - configure and start ceph-mgr
        - configure and enable restful plugin
        - send periodic requests to REST API
        - recover from failures

        Note: because this runs as a separate process it
            must send status updates to service monitor
            via control socket for: ping_failure_count,
            restful_plugin_url and certificate.
        """
        # Promote to process group leader so parent (service monitor)
        # can kill the monitor plus processes spawned by it. Otherwise
        # children of monitor_loop() will keep running in background and
        # will be reaped by init when they finish but by then they might
        # interfere with any new service instance.
        os.setpgrp()

        # Ignoring SIGTERM here ensures process group is not reused by
        # the time parent (service monitor) issues the final SIGKILL.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        while True:
            try:
                # steps to configure/start ceph-mgr and restful plugin
                self.ceph_fsid_get()
                self.ceph_mgr_auth_create()
                self.restful_plugin_set_server_port()
                self.restful_plugin_create_certificate()
                self.ceph_mgr_start()
                self.restful_plugin_enable()
                self.restful_plugin_create_admin_key()
                self.restful_plugin_get_url()
                self.restful_plugin_get_certificate()

                # REST API should be available now
                # start making periodic requests (ping)
                while True:
                    try:
                        self.restful_plugin_ping()
                        self.ping_failure_count = 0
                        self.request_update_ping_failures(
                            self.ping_failure_count)
                        self.ceph_mgr_failure_count = 0
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        time.sleep(CONFIG.restful_plugin_ping_delay_sec)
                        continue
                    except RestApiPingFailed as err:
                        LOG.warning(str(err))

                    self.ping_failure_count += 1
                    LOG.info('REST API ping failure count=%d',
                             self.ping_failure_count)
                    self.request_update_ping_failures(
                        self.ping_failure_count)

                    # maybe request failed because ceph-mgr is not running
                    if not self.ceph_mgr_is_running():
                        self.ceph_mgr_failure_count += 1
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        self.ceph_mgr_start()
                        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
                        continue

                    # maybe request failed because cluster health is not ok
                    if not self.ceph_fsid_get():
                        LOG.info('Unable to get cluster fsid. '
                                 'Sleep for a while')
                        time.sleep(CONFIG.cluster_grace_period_sec)
                        break

                    # too many failures? Restart ceph-mgr and go again
                    # through configuration steps
                    if (self.ping_failure_count
                            % CONFIG.ping_fail_count_restart_mgr == 0):
                        LOG.info('Too many consecutive REST API failures. '
                                 'Restart ceph-mgr. Update service '
                                 'url and certificate')
                        self.ceph_mgr_stop()
                        self.restful_plugin_url = ''
                        self.request_update_plugin_url(self.restful_plugin_url)
                        self.certificate = ''
                        self.request_update_certificate(self.certificate)
                        break

                    time.sleep(CONFIG.restful_plugin_ping_delay_sec)

            except CommandFailed as err:
                LOG.warning(str(err))
                time.sleep(CONFIG.cluster_grace_period_sec)
            except CommandTimeout as err:
                LOG.warning(str(err))
            except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
                LOG.warning(str(err))
                self.ceph_mgr_failure_count += 1
                self.request_update_ceph_mgr_failures(
                    self.ceph_mgr_failure_count)
                time.sleep(CONFIG.ceph_mgr_grace_period_sec)
            except Exception as err:
                LOG.exception(err)
                time.sleep(CONFIG.cluster_grace_period_sec)

    @staticmethod
    def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
        """Run command under GNU timeout and return its stripped output.

        :raises CommandTimeout: timeout exited with status 124
        :raises CommandFailed: any other non-zero exit status
        """
        try:
            LOG.info('Run command: %s', ' '.join(command))
            return subprocess.check_output(
                ['/usr/bin/timeout', str(timeout)] + command,
                stdin=subprocess.PIPE, stderr=stderr, shell=False,
                universal_newlines=True).strip()
        except subprocess.CalledProcessError as err:
            if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
                raise CommandTimeout(command=err.cmd, timeout=timeout)
            raise CommandFailed(command=err.cmd, reason=str(err),
                                out=err.output)

    def ceph_fsid_get(self):
        """Return the output of 'ceph fsid'."""
        return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
                                     CONFIG.ceph_cli_timeout_sec)

    def ceph_mgr_has_auth(self):
        """Export mgr.<identity> keyring; return False if it doesn't exist."""
        path = '{}/ceph-{}'.format(
            CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
        try:
            os.makedirs(path)
        except OSError:
            pass
        try:
            self.run_with_timeout(
                ['/usr/bin/ceph', 'auth', 'get',
                 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
                 '-o', '{}/keyring'.format(path)],
                CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed as err:
            if 'ENOENT' in str(err):
                return False
            raise

    def ceph_mgr_auth_create(self):
        """Create mgr.<identity> auth entity unless it already exists."""
        if self.ceph_mgr_has_auth():
            return
        LOG.info('Create ceph-mgr authentication')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'auth', 'get-or-create',
             'mgr.{}'.format(CONFIG.ceph_mgr_identity),
             'mon', 'allow *', 'osd', 'allow *'],
            CONFIG.ceph_cli_timeout_sec)

    def ceph_mgr_is_running(self):
        """Return None if ceph-mgr was never started, else True/False."""
        if not self.ceph_mgr:
            return None
        try:
            self.ceph_mgr.wait(timeout=0)
        except psutil.TimeoutExpired:
            return True
        return False

    def ceph_mgr_start(self):
        """Start a managed ceph-mgr in foreground mode (-f) if not running.

        :raises CephMgrStartFailed: the process could not be spawned
        """
        if self.ceph_mgr_is_running():
            return
        self.stop_unmanaged_ceph_mgr()
        LOG.info('Start ceph-mgr daemon')
        try:
            with open(os.devnull, 'wb') as null:
                self.ceph_mgr = psutil.Popen(
                    [CONFIG.ceph_mgr_service,
                     '--cluster', CONFIG.ceph_mgr_cluster,
                     '--conf', CONFIG.ceph_mgr_config,
                     '--id', CONFIG.ceph_mgr_identity,
                     '-f'],
                    close_fds=True,
                    stdout=null,
                    stderr=null,
                    shell=False)
        except (OSError, ValueError) as err:
            raise CephMgrStartFailed(reason=str(err))
        time.sleep(CONFIG.ceph_mgr_grace_period_sec)

    def ceph_mgr_stop(self):
        """Terminate (then kill) the managed ceph-mgr process, if any."""
        if not self.ceph_mgr:
            return
        LOG.info('Stop ceph-mgr')
        psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)

    def restful_plugin_has_server_port(self):
        """Return True if mgr/restful/server_port equals the configured port."""
        try:
            with open(os.devnull, 'wb') as null:
                out = self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'get',
                     'mgr/restful/server_port'],
                    CONFIG.ceph_cli_timeout_sec, stderr=null)
            if out == str(CONFIG.restful_plugin_port):
                return True
            # 'out' is the raw command output (a string)
            LOG.warning('Restful plugin port mismatch: '
                        'current=%s, expected=%d', out,
                        CONFIG.restful_plugin_port)
        except CommandFailed as err:
            LOG.warning('Failed to get restful plugin port: '
                        'reason=%s', str(err))
        return False

    def restful_plugin_set_server_port(self):
        """Set mgr/restful/server_port unless it already matches."""
        if self.restful_plugin_has_server_port():
            return
        LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
        self.run_with_timeout(
            ['/usr/bin/ceph', 'config-key', 'set',
             'mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
            CONFIG.ceph_cli_timeout_sec)

    def restful_plugin_has_admin_key(self):
        """Return True if config-key mgr/restful/keys/admin exists."""
        try:
            self.run_with_timeout(
                ['/usr/bin/ceph', 'config-key', 'get',
                 'mgr/restful/keys/admin'],
                CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed:
            pass
        return False

    def restful_plugin_create_admin_key(self):
        """Create the restful plugin 'admin' key unless it exists."""
        if self.restful_plugin_has_admin_key():
            return
        LOG.info('Create restful plugin admin key')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'restful',
             'create-key', 'admin'],
            CONFIG.ceph_cli_timeout_sec)

    def restful_plugin_has_certificate(self):
        """Return True if all four certificate/key config-keys exist.

        The key names must match the ones written by
        restful_plugin_create_certificate().
        """
        identity = CONFIG.ceph_mgr_identity
        try:
            for key in ['config/mgr/restful/{}/crt',
                        'mgr/restful/{}/crt',
                        'config/mgr/restful/{}/key',
                        'mgr/restful/{}/key']:
                self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'get',
                     key.format(identity)],
                    CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed:
            pass
        return False

    def restful_plugin_create_certificate(self):
        """Generate a self-signed certificate and store it in config-keys.

        :raises CommandFailed: openssl or a ceph config-key command failed
        """
        if self.restful_plugin_has_certificate():
            return
        LOG.info('Create restful plugin self signed certificate')
        path = tempfile.mkdtemp()
        try:
            try:
                with tempfile.NamedTemporaryFile() as restful_cnf:
                    restful_cnf.write((
                        '[req]\n'
                        'req_extensions = v3_ca\n'
                        'distinguished_name = req_distinguished_name\n'
                        '[v3_ca]\n'
                        'subjectAltName=DNS:{}\n'
                        'basicConstraints = CA:true\n'
                        '[ req_distinguished_name ]\n'
                        '0.organizationName = IT\n'
                        'commonName = ceph-restful\n').format(
                            CONFIG.ceph_mgr_identity).encode('utf-8'))
                    restful_cnf.flush()
                    subprocess.check_call([
                        '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
                        '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
                        '-days', '3650',
                        '-config', restful_cnf.name,
                        '-out', os.path.join(path, 'crt'),
                        '-keyout', os.path.join(path, 'key'),
                        '-extensions', 'v3_ca'])
            except subprocess.CalledProcessError as err:
                raise CommandFailed(
                    command=' '.join(err.cmd),
                    reason='failed to generate self-signed certificate: {}'.format(str(err)),
                    out=err.output)
            identity = CONFIG.ceph_mgr_identity
            for key, name in [('config/mgr/restful/{}/crt', 'crt'),
                              ('mgr/restful/{}/crt', 'crt'),
                              ('config/mgr/restful/{}/key', 'key'),
                              ('mgr/restful/{}/key', 'key')]:
                self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'set',
                     key.format(identity),
                     '-i', os.path.join(path, name)],
                    CONFIG.ceph_cli_timeout_sec)
        finally:
            shutil.rmtree(path)

    def restful_plugin_is_enabled(self):
        """Return True if 'restful' is listed in enabled mgr modules.

        :raises CommandFailed: command failed or output is not the
            expected JSON
        """
        command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            if 'restful' in json.loads(out)['enabled_modules']:
                return True
        except ValueError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='unable to decode json: {}'.format(err), out=out)
        except KeyError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='missing expected key: {}'.format(err), out=out)
        return False

    def restful_plugin_enable(self):
        """Enable the restful mgr module if needed, then wait briefly."""
        if not self.restful_plugin_is_enabled():
            LOG.info('Enable restful plugin')
            self.run_with_timeout(
                ['/usr/bin/ceph', 'mgr',
                 'module', 'enable', 'restful'],
                CONFIG.ceph_cli_timeout_sec)
            time.sleep(CONFIG.restful_plugin_grace_period_sec)

    def restful_plugin_get_url(self):
        """Read the restful service url and report it to service monitor.

        :raises CephRestfulPluginFailed: output is not the expected JSON
        """
        command = ['/usr/bin/ceph', 'mgr', 'services',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            self.restful_plugin_url = json.loads(out)['restful']
        except ValueError as err:
            raise CephRestfulPluginFailed(
                reason='unable to decode json: {} output={}'.format(err, out))
        except KeyError as err:
            raise CephRestfulPluginFailed(
                reason='missing expected key: {} in ouput={}'.format(err, out))
        self.request_update_plugin_url(self.restful_plugin_url)

    def restful_plugin_get_certificate(self):
        """Save the stored certificate to CONFIG.restful_plugin_cert_path
        and report the path to service monitor."""
        command = ['/usr/bin/ceph', 'config-key', 'get',
                   'config/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
        with open(os.devnull, 'wb') as null:
            certificate = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
            with open(CONFIG.restful_plugin_cert_path, 'w') as cert_file:
                cert_file.write(certificate)
            self.certificate = CONFIG.restful_plugin_cert_path
            self.request_update_certificate(
                self.certificate)

    def restful_plugin_ping(self):
        """Send a GET request to the REST API.

        :raises RestApiPingFailed: url/certificate unknown, request error
            or non-ok HTTP response
        """
        if not self.restful_plugin_url:
            raise RestApiPingFailed(reason='missing service url')
        if not self.certificate:
            raise RestApiPingFailed(reason='missing certificate')
        LOG.debug('Ping restful plugin: url=%s', self.restful_plugin_url)
        try:
            response = requests.request(
                'GET', self.restful_plugin_url, verify=False,
                timeout=CONFIG.rest_api_timeout_sec)
            if not response.ok:
                raise RestApiPingFailed(
                    reason='response not ok ({})'.format(response))
            LOG.debug('Ping restful plugin OK')
        except requests.RequestException as err:
            # base class of ConnectionError, Timeout, HTTPError and of
            # URL errors (InvalidURL, MissingSchema, ...)
            raise RestApiPingFailed(reason=str(err))

    @staticmethod
    def _make_client_socket():
        """Return a socket connected to the service monitor control socket."""
        sock = socket.socket(
            socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
            sock.connect(CONFIG.service_socket)
        except socket.error:
            sock.close()
            raise
        return sock

    @staticmethod
    def _send_request(request, description):
        """Send a text request to service monitor and wait for a reply.

        :param request: command line, e.g. 'ping-failures 3'
        :param description: used in the error log message
        :returns: True if a reply was received, False on socket error
        """
        try:
            with contextlib.closing(
                    ServiceMonitor._make_client_socket()) as sock:
                sock.send(request.encode('utf-8'))
                sock.recv(CONFIG.service_socket_bufsize)
                return True
        except socket.error as err:
            LOG.error('%s error: reason=%s', description, err)
            return False

    @staticmethod
    def request_status():
        """Return True if service monitor reports a status starting with OK."""
        try:
            with contextlib.closing(
                    ServiceMonitor._make_client_socket()) as sock:
                sock.send(b'status')
                status = sock.recv(CONFIG.service_socket_bufsize)
                LOG.debug('Status %s', status)
                return status.startswith(b'OK')
        except socket.error as err:
            LOG.error('Status error: reason=%s', err)
            return False

    @staticmethod
    def request_stop():
        """Ask service monitor to stop; return False on socket error."""
        try:
            with contextlib.closing(
                    ServiceMonitor._make_client_socket()) as sock:
                sock.send(b'stop')
                response = sock.recv(CONFIG.service_socket_bufsize)
                LOG.debug('Stop response: %s', response)
                return True
        except socket.error as err:
            LOG.error('Stop error: reason=%s', err)
            return False

    @staticmethod
    def request_update_ceph_mgr_failures(count):
        return ServiceMonitor._send_request(
            'ceph-mgr-failures {}'.format(count),
            'Update ceph-mgr failures')

    @staticmethod
    def request_update_ping_failures(count):
        return ServiceMonitor._send_request(
            'ping-failures {}'.format(count),
            'Update ping failures')

    @staticmethod
    def request_update_plugin_url(url):
        return ServiceMonitor._send_request(
            'restful-url {}'.format(url),
            'Update restful plugin url')

    @staticmethod
    def request_update_certificate(path):
        return ServiceMonitor._send_request(
            'certificate {}'.format(path),
            'Update certificate')


class InitWrapper(object):
    """Handle System V init script actions: start, stop, restart, etc.
    """

    def __init__(self):
        """Dispatch command line action to the corresponding function.

        Candidate action functions are all class methods except ones
        that start with an underscore. Hyphenated names used by LSB init
        conventions (e.g. 'force-reload') are accepted as aliases.
        """
        parser = argparse.ArgumentParser()
        actions = [m[0]
                   for m in inspect.getmembers(self)
                   if (inspect.ismethod(m[1])
                       and not m[0].startswith('_'))]
        aliases = [a.replace('_', '-') for a in actions if '_' in a]
        parser.add_argument(
            'action',
            choices=actions + aliases)
        self.args = parser.parse_args()
        getattr(self, self.args.action.replace('-', '_'))()

    def start(self):
        """Start ServiceMonitor as a daemon unless one is already running.

        Use a pipe to report monitor status back to this process.
        """
        # the daemon child rebinds the module-level logger
        global LOG
        pipe = os.pipe()
        child = os.fork()
        if child == 0:
            os.close(pipe[0])
            with daemon.DaemonContext(files_preserve=[pipe[1]]):
                # prevent duplication of messages in log
                LOG = setup_logging(cleanup_handlers=True)
                try:
                    monitor = ServiceMonitor()
                    status = b'OK'
                except ServiceAlreadyStarted:
                    os.write(pipe[1], b'OK')
                    os.close(pipe[1])
                    return
                except Exception as err:
                    status = str(err)
                    if not isinstance(status, bytes):
                        # os.write() requires bytes on Python 3
                        status = status.encode('utf-8')
                os.write(pipe[1], status)
                os.close(pipe[1])
                if status == b'OK':
                    try:
                        monitor.run()
                    except ServiceException as err:
                        LOG.warning(str(err))
                    except Exception as err:
                        LOG.exception('Service monitor error: reason=%s', err)
        else:
            os.close(pipe[1])
            try:
                status = os.read(pipe[0], CONFIG.service_socket_bufsize)
                if status == b'OK':
                    sys.exit(0)
                else:
                    LOG.warning('Service monitor failed to start: '
                                'status=%s', status)
            except IOError as err:
                LOG.warning('Failed to read monitor status: reason=%s', err)
            os.close(pipe[0])
            os.waitpid(child, 0)
            sys.exit(1)

    def stop(self):
        """Tell ServiceMonitor daemon to stop running.

        In case request fails stop ServiceMonitor and ceph_mgr processes
        using SIGTERM followed by SIGKILL.
        """
        result = ServiceMonitor.request_stop()
        if not result:
            ceph_mgr = os.path.basename(CONFIG.ceph_mgr_service)
            targets = (CONFIG.service_name, ceph_mgr)
            procs = [proc for proc in psutil.process_iter()
                     if _process_name(proc) in targets]
            psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)

    def restart(self):
        self.stop()
        self.start()

    def force_reload(self):
        self.stop()
        self.start()

    def reload(self):
        self.stop()
        self.start()

    def status(self):
        """Report status from ServiceMonitor.

        We don't just try to access REST API here because ServiceMonitor may
        be in the process of starting/configuring ceph-mgr and restful
        plugin in which case we report OK to avoid being restarted by SM.
        """
        status = ServiceMonitor.request_status()
        sys.exit(0 if status is True else 1)


if __name__ == '__main__':
    InitWrapper()