# integ/ceph/ceph/files/mgr-restful-plugin.py
#
# 1148 lines
# 43 KiB
# Python

#!/usr/bin/python
#
# Copyright (c) 2019-2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
### BEGIN INIT INFO
# Provides: ceph/mgr RESTful API plugin
# Required-Start: $ceph
# Required-Stop: $ceph
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Ceph MGR RESTful API plugin
# Description: Ceph MGR RESTful API plugin
### END INIT INFO
import argparse
import contextlib
import errno
import fcntl
import inspect
import json
import logging
import multiprocessing
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import time
from datetime import datetime
import daemon
import psutil
import requests
# 'timeout' command returns exit status 124
# if command times out (see man page)
# ServiceMonitor.run_with_timeout() turns this exit status into
# CommandTimeout; any other non-zero exit status becomes CommandFailed.
GNU_TIMEOUT_EXPIRED_RETCODE = 124
def psutil_terminate_kill(target, timeout):
    """Stop one or more processes, escalating from SIGTERM to SIGKILL.

    Every process that is still running is sent SIGTERM (psutil
    ``terminate()``); after waiting up to ``timeout`` seconds the ones still
    alive are sent SIGKILL (psutil ``kill()``) and waited for again.

    :param target: a psutil.Process (or psutil.Popen) or a list of them
    :param timeout: seconds to wait after each signal
    """
    if not isinstance(target, list):
        target = [target]
    _, alive = psutil.wait_procs(target, timeout=0)
    for action in (lambda p: p.terminate(), lambda p: p.kill()):
        for proc in alive:
            try:
                action(proc)
            except psutil.NoSuchProcess:
                # exited between wait_procs() and the signal: nothing to do,
                # and it must not prevent the remaining processes from
                # being stopped
                pass
        _, alive = psutil.wait_procs(alive, timeout=timeout)
class Config(object):
    """Settings used by the ceph-mgr service wrapper.

    Every value is a hard-coded default for now. A later version could
    read them from a configuration file instead
    (for example /etc/ceph/mgr-restful-plugin.conf ).
    """

    def __init__(self):
        # -- logging -------------------------------------------------------
        self.log_level = logging.INFO
        self.log_dir = '/var/log'

        # -- ceph-mgr daemon -----------------------------------------------
        rundir = '/var/run/ceph/mgr'
        self.ceph_mgr_service = '/usr/bin/ceph-mgr'
        self.ceph_mgr_config = '/etc/ceph/ceph.conf'
        self.ceph_mgr_cluster = 'ceph'
        self.ceph_mgr_rundir = rundir
        self.ceph_mgr_confdir = '/var/lib/ceph/mgr'
        self.ceph_mgr_identity = socket.gethostname()

        # -- service monitor control files -----------------------------------
        # socket and lock live in the ceph-mgr run directory; the pid file
        # lives one level up
        name = 'mgr-restful-plugin'
        self.service_name = name
        self.service_socket = os.path.join(rundir, '{}.socket'.format(name))
        self.service_lock = os.path.join(rundir, '{}.lock'.format(name))
        self.service_pid_file = os.path.join(
            '/var/run/ceph', '{}.pid'.format(name))
        # upper bound for one message on the control socket (both ways)
        self.service_socket_bufsize = 1024

        # -- restful plugin ------------------------------------------------
        self.restful_plugin_port = 7999
        # where the self-signed certificate fetched from ceph is written
        self.restful_plugin_cert_path = os.path.join(rundir, 'restful.crt')
        # pause after enabling the restful plugin
        self.restful_plugin_grace_period_sec = 3
        # pause between two REST API pings: lower values ping ceph-mgr
        # more often, higher values slow down recovery
        self.restful_plugin_ping_delay_sec = 3
        # maximum time ceph-mgr may take to answer one REST API request
        self.rest_api_timeout_sec = 15

        # -- timers --------------------------------------------------------
        # maximum run time allowed for a ceph cli command
        self.ceph_cli_timeout_sec = 30
        # pause after a cluster-level failure before issuing more commands
        self.cluster_grace_period_sec = 30
        # ceph-mgr needs time to initialize after start; pause this long
        # before querying it
        self.ceph_mgr_grace_period_sec = 15
        # delay between SIGTERM and SIGKILL when stopping ceph-mgr
        # (time allowed for ceph-mgr cleanup)
        self.ceph_mgr_kill_delay_sec = 5

        # -- failure thresholds ----------------------------------------------
        # consecutive ceph-mgr failures tolerated (status still OK while a
        # recovery is in progress) before reporting an error
        self.ceph_mgr_fail_count_report_error = 3
        # consecutive ceph-mgr failures after which the service stops
        self.ceph_mgr_fail_count_exit = 5
        # REST API ping failures between two ceph-mgr restarts
        self.ping_fail_count_restart_mgr = 3
        # REST API ping failures tolerated (status still OK in case the
        # plugin recovers) before reporting an error
        self.ping_fail_count_report_error = 5

        # -- lifecycle -----------------------------------------------------
        # restart ceph-mgr after this many days to keep its memory growth
        # in check (-1 disables the periodic restart)
        self.ceph_mgr_lifecycle_days = 7

    @staticmethod
    def load():
        return Config()
def setup_logging(name=None, cleanup_handlers=False):
    """Return logger ``name`` (default: the service name) logging to a file.

    The log file is always ``<CONFIG.log_dir>/<CONFIG.service_name>.log``,
    whatever the logger name. A logger that already has handlers is returned
    unchanged unless ``cleanup_handlers`` is set, in which case its existing
    handlers are flushed/closed and replaced by a fresh file handler.
    """
    log = logging.getLogger(name or CONFIG.service_name)
    log.setLevel(CONFIG.log_level)

    if log.handlers and not cleanup_handlers:
        # already configured: reuse as-is
        return log

    if cleanup_handlers:
        try:
            for old_handler in log.handlers:
                if isinstance(old_handler, logging.StreamHandler):
                    old_handler.flush()
                if isinstance(old_handler, logging.FileHandler):
                    old_handler.close()
            log.handlers = []
        except Exception:
            # best effort: a failing handler must not prevent logging setup
            pass

    log_path = os.path.join(CONFIG.log_dir,
                            '{}.log'.format(CONFIG.service_name))
    file_handler = logging.FileHandler(log_path)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(process)s %(levelname)s %(name)s %(message)s'))
    log.addHandler(file_handler)
    return log
# Module-wide configuration. It must be created before setup_logging() is
# called, because setup_logging() reads CONFIG.
CONFIG = Config.load()
# Logger used by the init script wrapper process (logger name
# 'init-wrapper'; the file name comes from CONFIG.service_name).
LOG = setup_logging(name='init-wrapper')
class ServiceException(Exception):
    """Base exception for the mgr-restful-plugin service.

    Subclasses set ``message`` to a str.format() template. The exception
    text is that template formatted with the constructor's positional and
    keyword arguments. If formatting fails (for example a missing
    placeholder value), the raw template plus the arguments is used
    instead. A ``message=`` keyword argument overrides the template
    entirely.
    """

    message = ""

    def __init__(self, *args, **kwargs):
        if "message" in kwargs:
            text = kwargs["message"]
        else:
            template = self.message
            try:
                text = template.format(*args, **kwargs)
            except Exception:  # noqa
                text = '{}, args:{}, kwargs: {}'.format(
                    template, args, kwargs)
        super(ServiceException, self).__init__(text)
class ServiceAlreadyStarted(ServiceException):
    """Another instance already holds the service lock."""
    message = 'Service monitor already started'


class ServiceLockFailed(ServiceException):
    """Taking the service lock failed for another reason."""
    message = 'Unable to lock service monitor: reason={reason}'


class ServiceNoSocket(ServiceException):
    """The control socket could not be created."""
    message = 'Unable to create service monitor socket: reason={reason}'


class ServiceSocketBindFailed(ServiceException):
    """The control socket could not be bound to its path."""
    message = ('Failed to bind service monitor socket: '
               'path={path}, reason={reason}')


class ServiceNoPidFile(ServiceException):
    """The service pid file could not be written."""
    message = 'Failed to update pid file: path={path}, reason={reason}'


class CommandFailed(ServiceException):
    """An external command failed or produced unusable output."""
    message = 'Command failed: command={command}, reason={reason}, out={out}'


class CommandTimeout(ServiceException):
    """An external command ran past its time limit."""
    message = 'Command timeout: command={command}, timeout={timeout}'


class CephMgrStartFailed(ServiceException):
    """The ceph-mgr daemon could not be launched."""
    message = 'Failed to start ceph_mgr: reason={reason}'


class CephRestfulPluginFailed(ServiceException):
    """The restful plugin could not be brought up."""
    message = 'Failed to start restful plugin: reason={reason}'


class RestApiPingFailed(ServiceException):
    """The REST API did not answer a ping successfully."""
    message = 'REST API ping failed: reason={reason}'
class ServiceMonitor(object):
    """Configure and monitor ceph-mgr and restful plugin (Ceph REST API)

    1. process init script service requests: status, stop. Requests are
       received via a control socket. Stop has priority over whatever
       the monitor is doing currently. Any ceph command that may be running
       is terminated/killed. Note that while ceph-mgr and restful plugin
       configuration is in progress ServiceMonitor reports status OK to
       avoid being restarted by SM.

    2. configure ceph-mgr and mgr restful plugin: authentication, REST API
       service port, self signed certificate. This runs as a separate
       process so it can be stopped when init script requests it.

    3. periodically check (ping) REST API responds to HTTPS requests.
       Recovery actions are taken if REST API fails to respond: restart
       ceph-mgr, wait for cluster to become available again.
    """

    def __init__(self):
        # process running configuration & REST API ping loop
        self.monitor = None
        # command socket used by init script
        self.command = None
        # ceph-mgr process
        self.ceph_mgr = None
        # date the ceph-mgr process was started
        self.ceph_mgr_start_date = None
        # consecutive ceph-mgr/restful-plugin start failures. Service monitor
        # reports failure once CONFIG.ceph_mgr_fail_count_report_error is
        # reached and stops at CONFIG.ceph_mgr_fail_count_exit
        self.ceph_mgr_failure_count = 0
        # consecutive REST API ping failures. ceph-mgr service is restarted
        # every CONFIG.ping_fail_count_restart_mgr consecutive failures
        self.ping_failure_count = 0
        # REST API url reported by ceph-mgr after enabling restful plugin
        self.restful_plugin_url = ''
        # REST API self signed certificate generated by restful plugin
        self.certificate = ''

    def run(self):
        """Run the service monitor until a stop request is served.

        Takes the service lock, creates the control socket and the pid
        file, starts the monitor process, then serves control socket
        requests. Lock, socket and pid file are released on exit.
        """
        self.disable_certificate_check()
        with self.service_lock(), self.service_socket(), \
                self.service_pid_file():
            self.start_monitor()
            self.server_loop()

    def disable_certificate_check(self):
        """Silence urllib3 warnings about unverified HTTPS requests.

        ceph-mgr restful plugin is configured with a self-signed certificate
        that does not match the current host name, so REST API pings are
        sent with verify=False (see restful_plugin_ping). This only mutes
        the warnings that urllib3 emits for every such request.
        """
        LOG.warning('Disable urllib3 certifcates check')
        requests.packages.urllib3.disable_warnings()

    def server_loop(self):
        """Serve control socket requests until a stop is requested.

        Each accepted connection carries one request. The connection is
        closed once the request has been handled.
        """
        self.command.listen(2)
        while True:
            try:
                client, _ = self.command.accept()
                with contextlib.closing(client):
                    if not self._handle_command(client):
                        break
            except Exception as err:
                LOG.exception(err)

    def _handle_command(self, client):
        """Read and process one control socket request from ``client``.

        Returns False when the service monitor must stop, True otherwise.
        """
        request = client.recv(CONFIG.service_socket_bufsize)
        LOG.debug('Monitor command socket: request=%s', str(request))
        cmd = request.split(b' ')
        cmd, args = cmd[0], cmd[1:]
        if cmd == b'status':
            self.send_response(client, request, self.status())
        elif cmd == b'stop':
            self.stop()
            self.send_response(client, request, 'OK')
            return False
        elif cmd == b'restful-url':
            try:
                self.restful_plugin_url = args[0].decode('utf-8')
                self.send_response(client, request, 'OK')
            except (IndexError, UnicodeDecodeError):
                LOG.warning('Failed to update restful plugin url: '
                            'args=%s', str(args))
                self.send_response(client, request, 'ERR')
        elif cmd == b'certificate':
            try:
                self.certificate = args[0].decode('utf-8') if args else ''
                self.send_response(client, request, 'OK')
            except UnicodeDecodeError:
                LOG.warning('Failed to update certificate path: '
                            'args=%s', str(args))
                self.send_response(client, request, 'ERR')
        elif cmd == b'ceph-mgr-failures':
            try:
                self.ceph_mgr_failure_count = int(args[0])
            except (IndexError, ValueError):
                LOG.warning('Failed to update ceph-mgr failures: '
                            'args=%s', str(args))
                self.send_response(client, request, 'ERR')
                return True
            self.send_response(client, request, 'OK')
            if self.ceph_mgr_failure_count >= CONFIG.ceph_mgr_fail_count_exit:
                self.stop()
                return False
        elif cmd == b'ping-failures':
            try:
                self.ping_failure_count = int(args[0])
                self.send_response(client, request, 'OK')
            except (IndexError, ValueError):
                LOG.warning('Failed to update ping failures: '
                            'args=%s', str(args))
                self.send_response(client, request, 'ERR')
        else:
            # answer anyway so the client does not block until its timeout
            LOG.warning('Unknown monitor command: request=%s', str(request))
            self.send_response(client, request, 'ERR')
        return True

    @staticmethod
    def send_response(client, request, response):
        """Send ``response`` (str) back to ``client``; log socket errors."""
        try:
            client.send(response.encode('utf-8'))
        except socket.error as err:
            LOG.warning('Failed to send response back. '
                        'request=%s, response=%s, reason=%s',
                        request, response, err)

    def status(self):
        """Return the service status string for the init script.

        'OK' while services are being (re)configured or the REST API
        answers, as long as the failure counters stay below the CONFIG
        report thresholds. Otherwise returns 'ERR.down' (no REST API url
        yet) or 'ERR.ping_failed'.
        """
        within_limits = (
            self.ceph_mgr_failure_count
            < CONFIG.ceph_mgr_fail_count_report_error
            and self.ping_failure_count
            < CONFIG.ping_fail_count_report_error)
        if not self.restful_plugin_url:
            if within_limits:
                LOG.debug('Monitor is starting services. Report status OK')
                return 'OK'
            LOG.debug('Too many failures: '
                      'ceph_mgr=%d (limit %d), ping=%d (limit %d). '
                      'Report status ERR',
                      self.ceph_mgr_failure_count,
                      CONFIG.ceph_mgr_fail_count_report_error,
                      self.ping_failure_count,
                      CONFIG.ping_fail_count_report_error)
            return 'ERR.down'
        try:
            self.restful_plugin_ping()
            LOG.debug('Restful plugin ping successful. Report status OK')
            return 'OK'
        except (CommandFailed, RestApiPingFailed):
            if within_limits:
                LOG.info('Restful plugin does not respond but failure '
                         'count is within acceptable limits: '
                         ' ceph_mgr=%d < %d, ping=%d < %d. '
                         'Report status OK',
                         self.ceph_mgr_failure_count,
                         CONFIG.ceph_mgr_fail_count_report_error,
                         self.ping_failure_count,
                         CONFIG.ping_fail_count_report_error)
                return 'OK'
            LOG.debug('Restful does not respond (ping failure count %d). '
                      'Report status ERR', self.ping_failure_count)
            return 'ERR.ping_failed'

    def stop(self):
        """Stop the monitor process group.

        Sends SIGTERM to the group, waits CONFIG.ceph_mgr_kill_delay_sec,
        then sends SIGKILL and reaps the monitor process.
        """
        if not self.monitor:
            return
        LOG.info('Stop monitor with SIGTERM to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGTERM)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        time.sleep(CONFIG.ceph_mgr_kill_delay_sec)
        LOG.info('Stop monitor with SIGKILL to process group %d',
                 self.monitor.pid)
        try:
            os.killpg(self.monitor.pid, signal.SIGKILL)
            os.waitpid(self.monitor.pid, 0)
        except OSError as err:
            LOG.info('Stop monitor failed: reason=%s', str(err))
            return
        LOG.info('Monitor stopped: pid=%d', self.monitor.pid)

    @contextlib.contextmanager
    def service_lock(self):
        """Hold an exclusive, non-blocking flock on CONFIG.service_lock.

        Raises ServiceAlreadyStarted if another process holds the lock,
        and ServiceLockFailed for other locking errors.
        """
        LOG.info('Take service lock: path=%s', CONFIG.service_lock)
        try:
            os.makedirs(os.path.dirname(CONFIG.service_lock))
        except OSError:
            # best effort: the directory usually exists already
            pass
        lock_file = open(CONFIG.service_lock, 'w')
        try:
            fcntl.flock(lock_file.fileno(),
                        fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError) as err:
            # do not leak the descriptor when the lock is not acquired
            lock_file.close()
            if err.errno == errno.EAGAIN:
                raise ServiceAlreadyStarted()
            raise ServiceLockFailed(reason=str(err))
        try:
            # even if we have the lock here there might be another service
            # manager running whose CONFIG.ceph_mgr_rundir was removed before
            # starting this instance. Make sure there is only one service
            # manager running
            self.stop_other_service_managers()
            yield
        finally:
            try:
                os.unlink(CONFIG.service_lock)
            except OSError:
                # already removed (e.g. run directory wiped)
                pass
            lock_file.close()
            LOG.info('Release service lock: path=%s', CONFIG.service_lock)

    def stop_other_service_managers(self):
        """Kill every other process running this init script.

        Matches processes whose command line starts with the init script
        path, either executed directly or through /usr/bin/python.
        """
        service = os.path.join('/etc/init.d', CONFIG.service_name)
        own_pid = os.getpid()
        for proc in psutil.process_iter():
            if proc.pid == own_pid:
                continue
            try:
                cmdline = proc.cmdline()
                if cmdline[:1] != [service] \
                        and cmdline[:2] != ['/usr/bin/python', service]:
                    continue
                proc.kill()
            except psutil.Error:
                # process exited meanwhile or is not accessible
                continue

    @contextlib.contextmanager
    def service_socket(self):
        """Create and bind the AF_UNIX control socket (self.command)."""
        LOG.info('Create service socket')
        try:
            self.command = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
        except socket.error as err:
            raise ServiceNoSocket(reason=str(err))
        LOG.info('Remove existing socket files')
        try:
            os.unlink(CONFIG.service_socket)
        except OSError:
            pass
        LOG.info('Bind service socket: path=%s', CONFIG.service_socket)
        try:
            self.command.bind(CONFIG.service_socket)
        except socket.error as err:
            self.command.close()
            raise ServiceSocketBindFailed(
                path=CONFIG.service_socket, reason=str(err))
        try:
            yield
        finally:
            LOG.info('Close service socket and remove file: path=%s',
                     CONFIG.service_socket)
            self.command.close()
            try:
                os.unlink(CONFIG.service_socket)
            except OSError:
                # already removed (e.g. run directory wiped)
                pass

    @contextlib.contextmanager
    def service_pid_file(self):
        """Write the current pid to CONFIG.service_pid_file; remove it on exit."""
        LOG.info('Update service pid file: path=%s', CONFIG.service_pid_file)
        try:
            with open(CONFIG.service_pid_file, 'w') as pid_file:
                pid_file.write(str(os.getpid()))
        except (IOError, OSError) as err:
            raise ServiceNoPidFile(
                path=CONFIG.service_pid_file, reason=str(err))
        try:
            yield
        finally:
            LOG.info('Remove service pid file: path=%s',
                     CONFIG.service_pid_file)
            try:
                os.unlink(CONFIG.service_pid_file)
            except OSError:
                pass

    def start_monitor(self):
        """Run monitor_loop() in a separate multiprocessing.Process."""
        LOG.info('Start monitor loop')
        self.monitor = multiprocessing.Process(target=self.monitor_loop)
        self.monitor.start()

    def stop_unmanaged_ceph_mgr(self):
        """Stop every ceph-mgr process except the one started by us."""
        LOG.info('Stop unmanaged running ceph-mgr processes')
        service_name = os.path.basename(CONFIG.ceph_mgr_service)
        managed_pid = self.ceph_mgr.pid if self.ceph_mgr else None
        unmanaged = []
        for proc in psutil.process_iter():
            try:
                if proc.name() == service_name and proc.pid != managed_pid:
                    unmanaged.append(proc)
            except psutil.Error:
                # process exited meanwhile or is not accessible
                continue
        psutil_terminate_kill(unmanaged, CONFIG.ceph_mgr_kill_delay_sec)

    def monitor_loop(self):
        """Bring up and monitor ceph-mgr restful plugin.

        Steps:
        - wait for Ceph cluster to become available
        - configure and start ceph-mgr
        - configure and enable restful plugin
        - send periodic requests to REST API
        - recover from failures

        Note: because this runs as a separate process it
        must send status updates to service monitor
        via control socket for: ping_failure_count,
        ceph_mgr_failure_count, restful_plugin_url and certificate.
        """
        # Promote to process group leader so parent (service monitor)
        # can kill the monitor plus processes spawned by it. Otherwise
        # children of monitor_loop() will keep running in background and
        # will be reaped by init when they finish but by then they might
        # interfere with any new service instance.
        os.setpgrp()

        # Ignoring SIGTERM here ensures process group is not reused by
        # the time parent (service monitor) issues the final SIGKILL.
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

        while True:
            try:
                # steps to configure/start ceph-mgr and restful plugin
                self.ceph_fsid_get()
                self.ceph_mgr_auth_create()
                self.restful_plugin_set_server_port()
                self.restful_plugin_create_certificate()
                self.ceph_mgr_start()
                self.restful_plugin_enable()
                self.restful_plugin_create_admin_key()
                self.restful_plugin_get_url()
                self.restful_plugin_get_certificate()

                # REST API should be available now
                # start making periodic requests (ping)
                while True:
                    # periodic ceph-mgr restart; the setting lives in CONFIG
                    # (-1 disables it)
                    if CONFIG.ceph_mgr_lifecycle_days != -1 \
                            and self.ceph_mgr_uptime() >= \
                            CONFIG.ceph_mgr_lifecycle_days:
                        self.ceph_mgr_start_date = None
                        LOG.info("Restarting ceph-mgr to control RSS memory growth")
                        self.ceph_mgr_restart()
                    try:
                        self.restful_plugin_ping()
                        self.ping_failure_count = 0
                        self.request_update_ping_failures(
                            self.ping_failure_count)
                        self.ceph_mgr_failure_count = 0
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        time.sleep(CONFIG.restful_plugin_ping_delay_sec)
                        continue
                    except RestApiPingFailed as err:
                        LOG.warning(str(err))

                    LOG.info('REST API ping failure count=%d',
                             self.ping_failure_count)
                    self.ping_failure_count += 1
                    self.request_update_ping_failures(
                        self.ping_failure_count)

                    # maybe request failed because ceph-mgr is not running
                    if not self.ceph_mgr_is_running():
                        self.ceph_mgr_failure_count += 1
                        self.request_update_ceph_mgr_failures(
                            self.ceph_mgr_failure_count)
                        self.ceph_mgr_start()
                        time.sleep(CONFIG.ceph_mgr_grace_period_sec)
                        continue

                    # maybe request failed because cluster health is not ok
                    if not self.ceph_fsid_get():
                        LOG.info('Unable to get cluster fsid. '
                                 'Sleep for a while')
                        time.sleep(CONFIG.cluster_grace_period_sec)
                        break

                    # too many failures? Restart ceph-mgr and go again
                    # through configuration steps
                    if (self.ping_failure_count
                            % CONFIG.ping_fail_count_restart_mgr == 0):
                        LOG.info('Too many consecutive REST API failures. '
                                 'Restart ceph-mgr. Update service '
                                 'url and certificate')
                        self.ceph_mgr_stop()
                        self.restful_plugin_url = ''
                        self.request_update_plugin_url(self.restful_plugin_url)
                        self.certificate = ''
                        self.request_update_certificate(self.certificate)
                        break

                    time.sleep(CONFIG.restful_plugin_ping_delay_sec)

            except CommandFailed as err:
                LOG.warning(str(err))
                time.sleep(CONFIG.cluster_grace_period_sec)
            except CommandTimeout as err:
                LOG.warning(str(err))
            except (CephMgrStartFailed, CephRestfulPluginFailed) as err:
                LOG.warning(str(err))
                self.ceph_mgr_failure_count += 1
                self.request_update_ceph_mgr_failures(
                    self.ceph_mgr_failure_count)
                time.sleep(CONFIG.ceph_mgr_grace_period_sec)
            except Exception as err:
                LOG.exception(err)
                time.sleep(CONFIG.cluster_grace_period_sec)

    @staticmethod
    def run_with_timeout(command, timeout, stderr=subprocess.STDOUT):
        """Run ``command`` under /usr/bin/timeout and return its stripped stdout.

        Raises CommandTimeout when timeout reports expiry and CommandFailed
        for any other non-zero exit status.
        """
        try:
            LOG.info('Run command: %s', ' '.join(command))
            return subprocess.check_output(
                ['/usr/bin/timeout', str(timeout)] + command,
                stdin=subprocess.PIPE,
                stderr=stderr, shell=False,
                universal_newlines=True).strip()
        except subprocess.CalledProcessError as err:
            if err.returncode == GNU_TIMEOUT_EXPIRED_RETCODE:
                raise CommandTimeout(command=err.cmd, timeout=timeout)
            raise CommandFailed(command=err.cmd, reason=str(err),
                                out=err.output)

    def ceph_fsid_get(self):
        """Return the cluster fsid (`ceph fsid`)."""
        return self.run_with_timeout(['/usr/bin/ceph', 'fsid'],
                                     CONFIG.ceph_cli_timeout_sec)

    def ceph_mgr_has_auth(self):
        """Fetch the mgr keyring into its data dir; False if it does not exist."""
        path = '{}/ceph-{}'.format(
            CONFIG.ceph_mgr_confdir, CONFIG.ceph_mgr_identity)
        try:
            os.makedirs(path)
        except OSError:
            # best effort: the directory usually exists already
            pass
        try:
            self.run_with_timeout(
                ['/usr/bin/ceph', 'auth', 'get',
                 'mgr.{}'.format(CONFIG.ceph_mgr_identity),
                 '-o', '{}/keyring'.format(path)],
                CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed as err:
            if 'ENOENT' in str(err):
                return False
            raise

    def ceph_mgr_auth_create(self):
        """Create the mgr.<identity> auth entry unless it already exists."""
        if self.ceph_mgr_has_auth():
            return
        LOG.info('Create ceph-mgr authentication')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'auth', 'get-or-create',
             'mgr.{}'.format(CONFIG.ceph_mgr_identity),
             'mon', 'allow *', 'osd', 'allow *'],
            CONFIG.ceph_cli_timeout_sec)

    def ceph_mgr_is_running(self):
        """True/False for the managed ceph-mgr; None if never started."""
        if not self.ceph_mgr:
            return None
        try:
            self.ceph_mgr.wait(timeout=0)
        except psutil.TimeoutExpired:
            return True
        return False

    def ceph_mgr_start(self):
        """Start ceph-mgr (after stopping unmanaged ones) unless running."""
        if self.ceph_mgr_is_running():
            return
        self.stop_unmanaged_ceph_mgr()
        LOG.info('Start ceph-mgr daemon')
        try:
            with open(os.devnull, 'wb') as null:
                self.ceph_mgr = psutil.Popen(
                    [CONFIG.ceph_mgr_service,
                     '--cluster', CONFIG.ceph_mgr_cluster,
                     '--conf', CONFIG.ceph_mgr_config,
                     '--id', CONFIG.ceph_mgr_identity,
                     '-f'],
                    close_fds=True,
                    stdout=null,
                    stderr=null,
                    shell=False)
                self.ceph_mgr_start_date = datetime.now()
        except (OSError, ValueError) as err:
            raise CephMgrStartFailed(reason=str(err))
        time.sleep(CONFIG.ceph_mgr_grace_period_sec)

    def ceph_mgr_stop(self):
        """Terminate (then kill) the managed ceph-mgr process, if any."""
        if not self.ceph_mgr:
            return
        LOG.info('Stop ceph-mgr')
        psutil_terminate_kill(self.ceph_mgr, CONFIG.ceph_mgr_kill_delay_sec)

    def ceph_mgr_restart(self):
        self.ceph_mgr_stop()
        self.ceph_mgr_start()

    def ceph_mgr_uptime(self):
        """Whole days since ceph-mgr was started by us (0 if unknown)."""
        if not self.ceph_mgr_start_date:
            return 0
        return (datetime.now() - self.ceph_mgr_start_date).days

    def restful_plugin_has_server_port(self):
        """True if the configured restful plugin port is the expected one."""
        try:
            with open(os.devnull, 'wb') as null:
                out = self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'get',
                     'config/mgr/mgr/restful/server_port'],
                    CONFIG.ceph_cli_timeout_sec, stderr=null)
            if out == str(CONFIG.restful_plugin_port):
                return True
            # out is a string: log it with %s
            LOG.warning('Restful plugin port mismatch: '
                        'current=%s, expected=%d', out,
                        CONFIG.restful_plugin_port)
        except CommandFailed as err:
            LOG.warning('Failed to get restful plugin port: '
                        'reason=%s', str(err))
        return False

    def restful_plugin_set_server_port(self):
        if self.restful_plugin_has_server_port():
            return
        LOG.info('Set restful plugin port=%d', CONFIG.restful_plugin_port)
        self.run_with_timeout(
            ['/usr/bin/ceph', 'config-key', 'set',
             'config/mgr/mgr/restful/server_port', str(CONFIG.restful_plugin_port)],
            CONFIG.ceph_cli_timeout_sec)

    def restful_plugin_has_admin_key(self):
        try:
            self.run_with_timeout(
                ['/usr/bin/ceph', 'config-key', 'get',
                 'mgr/restful/keys/admin'],
                CONFIG.ceph_cli_timeout_sec)
            return True
        except CommandFailed:
            pass
        return False

    def restful_plugin_create_admin_key(self):
        if self.restful_plugin_has_admin_key():
            return
        LOG.info('Create restful plugin admin key')
        self.run_with_timeout(
            ['/usr/bin/ceph', 'restful',
             'create-key', 'admin'],
            CONFIG.ceph_cli_timeout_sec)

    @staticmethod
    def _restful_plugin_cert_config_keys():
        """Return (config-key, file name) pairs for the restful certificate.

        The certificate ('crt') and private key ('key') are stored under
        both the 'config/mgr/mgr/restful/...' and 'mgr/restful/...'
        prefixes. Both the existence check and the creation step use this
        single list so they always agree on the key names.
        """
        identity = CONFIG.ceph_mgr_identity
        return [
            ('config/mgr/mgr/restful/{}/crt'.format(identity), 'crt'),
            ('mgr/restful/{}/crt'.format(identity), 'crt'),
            ('config/mgr/mgr/restful/{}/key'.format(identity), 'key'),
            ('mgr/restful/{}/key'.format(identity), 'key'),
        ]

    def restful_plugin_has_certificate(self):
        """True if every certificate/key config-key entry exists."""
        try:
            for config_key, _ in self._restful_plugin_cert_config_keys():
                self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'get', config_key],
                    CONFIG.ceph_cli_timeout_sec)
        except CommandFailed:
            return False
        return True

    def restful_plugin_create_certificate(self):
        """Generate a self-signed certificate and store it in config-key."""
        if self.restful_plugin_has_certificate():
            return
        LOG.info('Create restful plugin self signed certificate')
        path = tempfile.mkdtemp()
        try:
            try:
                with tempfile.NamedTemporaryFile() as restful_cnf:
                    restful_cnf.write((
                        '[req]\n'
                        'req_extensions = v3_ca\n'
                        'distinguished_name = req_distinguished_name\n'
                        '[v3_ca]\n'
                        'subjectAltName=DNS:{}\n'
                        'basicConstraints = CA:true\n'
                        '[ req_distinguished_name ]\n'
                        '0.organizationName = IT\n'
                        'commonName = ceph-restful\n').format(
                            CONFIG.ceph_mgr_identity).encode('utf-8'))
                    restful_cnf.flush()
                    subprocess.check_call([
                        '/usr/bin/openssl', 'req', '-new', '-nodes', '-x509',
                        '-subj', '/O=IT/CN=' + CONFIG.ceph_mgr_identity,
                        '-days', '3650',
                        '-config', restful_cnf.name,
                        '-out', os.path.join(path, 'crt'),
                        '-keyout', os.path.join(path, 'key'),
                        '-extensions', 'v3_ca'])
            except subprocess.CalledProcessError as err:
                raise CommandFailed(
                    command=' '.join(err.cmd),
                    reason='failed to generate self-signed certificate: {}'.format(str(err)),
                    out=err.output)
            for config_key, file_name in self._restful_plugin_cert_config_keys():
                self.run_with_timeout(
                    ['/usr/bin/ceph', 'config-key', 'set', config_key,
                     '-i', os.path.join(path, file_name)],
                    CONFIG.ceph_cli_timeout_sec)
        finally:
            shutil.rmtree(path)

    def restful_plugin_is_enabled(self):
        """True if 'restful' is listed in the enabled mgr modules."""
        command = ['/usr/bin/ceph', 'mgr', 'module', 'ls',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            if 'restful' in json.loads(out)['enabled_modules']:
                return True
        except ValueError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='unable to decode json: {}'.format(err), out=out)
        except KeyError as err:
            raise CommandFailed(
                command=' '.join(command),
                reason='missing expected key: {}'.format(err), out=out)
        return False

    def restful_plugin_enable(self):
        if not self.restful_plugin_is_enabled():
            LOG.info('Enable restful plugin')
            self.run_with_timeout(
                ['/usr/bin/ceph', 'mgr',
                 'module', 'enable', 'restful'],
                CONFIG.ceph_cli_timeout_sec)
            time.sleep(CONFIG.restful_plugin_grace_period_sec)

    def restful_plugin_get_url(self):
        """Read the restful service url and report it to the service monitor."""
        command = ['/usr/bin/ceph', 'mgr', 'services',
                   '--format', 'json']
        with open(os.devnull, 'wb') as null:
            out = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
        try:
            self.restful_plugin_url = json.loads(out)['restful']
        except ValueError as err:
            raise CephRestfulPluginFailed(
                reason='unable to decode json: {} output={}'.format(err, out))
        except KeyError as err:
            raise CephRestfulPluginFailed(
                reason='missing expected key: {} in ouput={}'.format(err, out))
        self.request_update_plugin_url(self.restful_plugin_url)

    def restful_plugin_get_certificate(self):
        """Save the restful certificate to disk and report its path."""
        command = ['/usr/bin/ceph', 'config-key', 'get',
                   'config/mgr/mgr/restful/{}/crt'.format(CONFIG.ceph_mgr_identity)]
        with open(os.devnull, 'wb') as null:
            certificate = self.run_with_timeout(
                command, CONFIG.ceph_cli_timeout_sec, stderr=null)
            with open(CONFIG.restful_plugin_cert_path, 'w') as cert_file:
                cert_file.write(certificate)
        self.certificate = CONFIG.restful_plugin_cert_path
        self.request_update_certificate(
            self.certificate)

    def restful_plugin_ping(self):
        """Send a GET to the REST API; raise RestApiPingFailed on any failure."""
        if not self.restful_plugin_url:
            raise RestApiPingFailed(reason='missing service url')
        if not self.certificate:
            raise RestApiPingFailed(reason='missing certificate')
        LOG.debug('Ping restful plugin: url=%s', self.restful_plugin_url)
        try:
            # self-signed certificate does not match the host name: do
            # not verify it
            response = requests.request(
                'GET', self.restful_plugin_url, verify=False,
                timeout=CONFIG.rest_api_timeout_sec)
        except requests.RequestException as err:
            # covers connection errors, timeouts and invalid urls
            raise RestApiPingFailed(reason=str(err))
        if not response.ok:
            raise RestApiPingFailed(
                reason='response not ok ({})'.format(response))
        LOG.debug('Ping restful plugin OK')

    @staticmethod
    def _make_client_socket():
        """Return a client socket connected to the service control socket."""
        sock = socket.socket(
            socket.AF_UNIX, socket.SOCK_SEQPACKET)
        try:
            sock.settimeout(2 * CONFIG.rest_api_timeout_sec)
            sock.connect(CONFIG.service_socket)
        except Exception:
            # do not leak the descriptor when the monitor is not reachable
            sock.close()
            raise
        return sock

    @staticmethod
    def _send_request(request, error_label):
        """Send ``request`` (str) on the control socket and return the reply.

        Returns the reply bytes, or None if a socket error occurred. The
        error is logged as '<error_label>: reason=<error>'.
        """
        try:
            with contextlib.closing(
                    ServiceMonitor._make_client_socket()) as sock:
                sock.send(request.encode('utf-8'))
                return sock.recv(CONFIG.service_socket_bufsize)
        except socket.error as err:
            LOG.error('%s: reason=%s', error_label, err)
            return None

    @staticmethod
    def request_status():
        """True if the service monitor reports a status starting with OK."""
        status = ServiceMonitor._send_request('status', 'Status error')
        if status is None:
            return False
        LOG.debug('Status %s', status)
        return status.startswith(b'OK')

    @staticmethod
    def request_stop():
        """Ask the service monitor to stop; True if the request was delivered."""
        response = ServiceMonitor._send_request('stop', 'Stop error')
        if response is None:
            return False
        LOG.debug('Stop response: %s', response)
        return True

    @staticmethod
    def request_update_ceph_mgr_failures(count):
        return ServiceMonitor._send_request(
            'ceph-mgr-failures {}'.format(count),
            'Update ceph-mgr failures error') is not None

    @staticmethod
    def request_update_ping_failures(count):
        return ServiceMonitor._send_request(
            'ping-failures {}'.format(count),
            'Update ping failures error') is not None

    @staticmethod
    def request_update_plugin_url(url):
        return ServiceMonitor._send_request(
            'restful-url {}'.format(url),
            'Update restful plugin url error') is not None

    @staticmethod
    def request_update_certificate(path):
        return ServiceMonitor._send_request(
            'certificate {}'.format(path),
            'Update certificate error') is not None
class InitWrapper(object):
    """Handle System V init script actions: start, stop, restart, etc. """

    def __init__(self):
        """Dispatch command line action to the corresponding method.

        Candidate actions are all public methods of this class (names that
        do not start with an underscore).
        """
        parser = argparse.ArgumentParser()
        actions = [name
                   for name, member in inspect.getmembers(self)
                   if inspect.ismethod(member) and not name.startswith('_')]
        parser.add_argument(
            'action',
            choices=actions)
        self.args = parser.parse_args()
        getattr(self, self.args.action)()

    def start(self):
        """Start ServiceMonitor as a daemon unless one is already running.

        Use a pipe to report monitor status back to this process.
        """
        global LOG
        pipe_read, pipe_write = os.pipe()
        child = os.fork()
        if child == 0:
            os.close(pipe_read)
            with daemon.DaemonContext(files_preserve=[pipe_write]):
                # prevent duplication of messages in log
                LOG = setup_logging(cleanup_handlers=True)
                try:
                    monitor = ServiceMonitor()
                    status = b'OK'
                except ServiceAlreadyStarted:
                    os.write(pipe_write, b'OK')
                    os.close(pipe_write)
                    return
                except Exception as err:
                    status = str(err)
                    # os.write() needs bytes on python 3
                    if not isinstance(status, bytes):
                        status = status.encode('utf-8')
                os.write(pipe_write, status)
                os.close(pipe_write)
                if status == b'OK':
                    try:
                        monitor.run()
                    except ServiceException as err:
                        LOG.warning(str(err))
                    except Exception as err:
                        LOG.exception('Service monitor error: reason=%s', err)
        else:
            os.close(pipe_write)
            try:
                status = os.read(pipe_read, CONFIG.service_socket_bufsize)
                if status == b'OK':
                    sys.exit(0)
                else:
                    LOG.warning('Service monitor failed to start: '
                                'status=%s', status)
            except (IOError, OSError) as err:
                LOG.warning('Failed to read monitor status: reason=%s', err)
            os.close(pipe_read)
            os.waitpid(child, 0)
            sys.exit(1)

    def stop(self):
        """Tell ServiceMonitor daemon to stop running.

        In case request fails stop ServiceMonitor and ceph_mgr processes
        using SIGTERM followed by SIGKILL.
        """
        if ServiceMonitor.request_stop():
            return
        target_names = (CONFIG.service_name,
                        os.path.basename(CONFIG.ceph_mgr_service))
        procs = []
        for proc in psutil.process_iter():
            try:
                name = proc.name()
            except psutil.Error:
                # process exited meanwhile or is not accessible
                continue
            if name in target_names:
                procs.append(proc)
        psutil_terminate_kill(procs, CONFIG.ceph_mgr_kill_delay_sec)

    def restart(self):
        self.stop()
        self.start()

    def force_reload(self):
        self.stop()
        self.start()

    def reload(self):
        self.stop()
        self.start()

    def status(self):
        """Report status from ServiceMonitor.

        We don't just try to access REST API here because ServiceMonitor may
        be in the process of starting/configuring ceph-mgr and restful
        plugin in which case we report OK to avoid being restarted by SM.
        """
        status = ServiceMonitor.request_status()
        sys.exit(0 if status is True else 1)
if __name__ == '__main__':
    # InitWrapper parses the init action from the command line and runs it
    # from its constructor.
    InitWrapper()