Merge "Kubernetes periodic audit for cluster health"

Zuul 2024-02-27 06:33:01 +00:00 committed by Gerrit Code Review
commit e6610a898a
3 changed files with 113 additions and 1 deletion


@ -1248,6 +1248,15 @@ DEFAULT_REGISTRIES_INFO = {
SERVICE_PARAM_SECTION_KUBERNETES_CONFIG = 'config'
SERVICE_PARAM_NAME_KUBERNETES_POD_MAX_PIDS = 'pod_max_pids'
SERVICE_PARAM_NAME_KUBERNETES_AUTOMATIC_RECOVERY = 'automatic_recovery'
# Kubernetes component endpoints for cluster audit
APISERVER_READYZ_ENDPOINT = "https://localhost:6443/readyz"
SCHEDULER_HEALTHZ_ENDPOINT = "https://127.0.0.1:10259/healthz"
CONTROLLER_MANAGER_HEALTHZ_ENDPOINT = "https://127.0.0.1:10257/healthz"
KUBELET_HEALTHZ_ENDPOINT = "http://localhost:10248/healthz"
healthz_endpoints = [APISERVER_READYZ_ENDPOINT, CONTROLLER_MANAGER_HEALTHZ_ENDPOINT,
SCHEDULER_HEALTHZ_ENDPOINT, KUBELET_HEALTHZ_ENDPOINT]
# Platform pods use under 20 in steady state, but allow extra room.
SERVICE_PARAM_KUBERNETES_POD_MAX_PIDS_MIN = 100
# Account for uncontrolled changes in applications (e.g. stx-openstack) by
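For reference, the four endpoints added above can be probed directly outside of sysinv. A minimal standalone sketch (assuming the requests package is available; certificate verification is disabled because the HTTPS control-plane endpoints serve cluster-internal certificates):

import requests
import urllib3

# Suppress the warning that verify=False would otherwise emit.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

ENDPOINTS = [
    "https://localhost:6443/readyz",     # kube-apiserver
    "https://127.0.0.1:10257/healthz",   # kube-controller-manager
    "https://127.0.0.1:10259/healthz",   # kube-scheduler
    "http://localhost:10248/healthz",    # kubelet
]

def probe(endpoint, timeout=5):
    # A component is considered healthy only on HTTP 200.
    try:
        resp = requests.get(endpoint, timeout=timeout, verify=False)
        return resp.status_code == 200
    except requests.RequestException:
        return False

for ep in ENDPOINTS:
    print(ep, "healthy" if probe(ep) else "unhealthy")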


@ -166,7 +166,7 @@ KUBE_CONTROL_PLANE_ETCD_BACKUP_PATH = os.path.join(
def k8s_health_check(tries=20, try_sleep=5, timeout=5,
healthz_endpoint='https://localhost:6443/readyz'):
healthz_endpoint=constants.APISERVER_READYZ_ENDPOINT):
"""This checks k8s control-plane component health for a specified
endpoint, and waits for that endpoint to be up and running.
This checks the endpoint 'tries' times using an API connection
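The docstring describes a poll-and-retry helper; a plausible shape of such a function is sketched below (an illustrative sketch under the same signature, not the actual sysinv implementation):

import time
import requests

def k8s_health_check(tries=20, try_sleep=5, timeout=5,
                     healthz_endpoint="https://localhost:6443/readyz"):
    # Poll the endpoint up to 'tries' times, sleeping 'try_sleep'
    # seconds between attempts; return True on the first HTTP 200.
    for attempt in range(1, tries + 1):
        try:
            resp = requests.get(healthz_endpoint, timeout=timeout,
                                verify=False)
            if resp.status_code == 200:
                return True
        except requests.RequestException:
            pass
        if attempt < tries:
            time.sleep(try_sleep)
    return False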


@ -182,6 +182,7 @@ audit_intervals_opts = [
cfg.IntOpt('device_image_update', default=300),
cfg.IntOpt('kube_upgrade_states', default=1800),
cfg.IntOpt('prune_runtime_config', default=43200),
cfg.IntOpt('k8s_cluster_health', default=180),
]
CONF = cfg.CONF
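The new k8s_cluster_health option makes the audit period (180 seconds by default) tunable through oslo.config like the other intervals. A self-contained sketch of the registration/lookup pattern (a hypothetical standalone snippet mirroring the group name used by the decorator below):

from oslo_config import cfg

CONF = cfg.CONF
audit_intervals_opts = [cfg.IntOpt('k8s_cluster_health', default=180)]
CONF.register_opts(audit_intervals_opts,
                   group='conductor_periodic_task_intervals')
CONF([])  # parse an empty command line so defaults become readable
print(CONF.conductor_periodic_task_intervals.k8s_cluster_health)  # -> 180

An operator would then override the value with a k8s_cluster_health entry under the [conductor_periodic_task_intervals] section of the service configuration file.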
@ -466,6 +467,9 @@ class ConductorManager(service.PeriodicService):
self._update_cached_app_bundles_set()
self._update_app_bundles_storage()
# Initialize alarms raised
        self._initialize_alarms_raised()
# Initialize inotify and launch thread to monitor
# changes to the ostree root folder
self._initialize_ostree_inotify()
@ -7236,6 +7240,53 @@ class ConductorManager(service.PeriodicService):
if self._app:
self._app.audit_local_registry_secrets(context)
@periodic_task.periodic_task(spacing=CONF.conductor_periodic_task_intervals.k8s_cluster_health)
def _audit_kubernetes_cluster_health(self, context):
"""Audit kubernetes cluster health"""
if not cutils.is_initial_config_complete():
LOG.debug("_audit_kubernetes_cluster_health skip")
return
        # Skip the kubernetes cluster health audit when a K8S upgrade is in
        # progress. The kube-apiserver will not be available during a
        # kube-upgrade-abort operation.
try:
self.verify_k8s_upgrade_not_in_progress()
except Exception:
LOG.info("k8s Upgrade in progress - _audit_kubernetes_cluster_health skip "
"activity")
return
if self._verify_restore_in_progress():
LOG.info("Restore in progress - _audit_kubernetes_cluster_health skip "
"activity")
return
LOG.debug("Starting kubernetes cluster audit")
        try:
            is_k8s_cluster_healthy = True
            for endpoint in constants.healthz_endpoints:
                # Track the per-endpoint result so the logs name the
                # component that actually failed.
                endpoint_healthy = kubernetes.k8s_health_check(
                    tries=1, healthz_endpoint=endpoint)
                LOG.debug("k8s_health_check for endpoint: %s=%s"
                          % (endpoint, endpoint_healthy))
                if not endpoint_healthy:
                    LOG.error("k8s_health_check failed for endpoint: %s"
                              % endpoint)
                    is_k8s_cluster_healthy = False
                    break
        except Exception:
            LOG.warning("Unable to check Kubernetes component health - "
                        "_audit_kubernetes_cluster_health")
            # A failed probe leaves the cluster state unknown; treat it as
            # unhealthy so the alarm is raised rather than cleared.
            is_k8s_cluster_healthy = False
if is_k8s_cluster_healthy:
if self._is_tracked_alarm(fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN):
self._update_k8s_cluster_alarm(fm_constants.FM_ALARM_STATE_CLEAR)
self._clear_tracked_alarm(fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN)
else:
LOG.debug("Kubernetes health check failed")
reason_text = "Kubernetes health check failed"
if not self._is_tracked_alarm(fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN):
self._update_k8s_cluster_alarm(fm_constants.FM_ALARM_STATE_SET, reason_text)
self._set_tracked_alarm(fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN)
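The tracked-alarm bookkeeping above makes the alarm edge-triggered: it is raised once on the healthy-to-unhealthy transition and cleared once on the way back, rather than being re-set on every audit pass. A condensed, self-contained illustration of the pattern (a hypothetical class, not sysinv code):

class AlarmTracker:
    """Raise/clear callbacks fire only on health-state transitions."""

    def __init__(self):
        self._raised = {}

    def update(self, alarm_id, healthy, set_fn, clear_fn):
        raised = self._raised.get(alarm_id, False)
        if healthy and raised:
            clear_fn(alarm_id)          # unhealthy -> healthy edge
            self._raised[alarm_id] = False
        elif not healthy and not raised:
            set_fn(alarm_id)            # healthy -> unhealthy edge
            self._raised[alarm_id] = True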
@periodic_task.periodic_task(spacing=CONF.conductor_periodic_task_intervals.kubernetes_labels)
def _audit_kubernetes_labels(self, context):
if not cutils.is_initial_config_complete():
@ -9905,6 +9956,41 @@ class ConductorManager(service.PeriodicService):
'task': str(tasks)}
self.dbapi.storage_ceph_rook_update(sb_uuid, values)
    def _is_tracked_alarm(self, alarm_id):
        """Check the tracked _alarms_raised state of the given alarm_id"""
        return self._alarms_raised[alarm_id]

    def _set_tracked_alarm(self, alarm_id):
        """Set _alarms_raised for the given alarm_id to True"""
        self._alarms_raised[alarm_id] = True

    def _clear_tracked_alarm(self, alarm_id):
        """Set _alarms_raised for the given alarm_id to False"""
        self._alarms_raised[alarm_id] = False
def _update_k8s_cluster_alarm(self, alarm_state, reason_text=None):
""" Raise/clear k8s cluster health alarm"""
entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_K8S,
"k8s-health-check-failed")
if alarm_state == fm_constants.FM_ALARM_STATE_SET:
fault = fm_api.Fault(
alarm_id=fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN,
alarm_state=alarm_state,
entity_type_id=fm_constants.FM_ENTITY_TYPE_K8S,
entity_instance_id=entity_instance_id,
severity=fm_constants.FM_ALARM_SEVERITY_CRITICAL,
reason_text=reason_text,
alarm_type=fm_constants.FM_ALARM_TYPE_1,
probable_cause=fm_constants.ALARM_PROBABLE_CAUSE_63,
proposed_repair_action=_("If problem persists, "
"contact next level of support."),
service_affecting=True)
self.fm_api.set_fault(fault)
else:
self.fm_api.clear_fault(fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN,
entity_instance_id)
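For context, the same FaultAPIs client used here exposes set_fault, clear_fault and get_fault. A minimal sketch of querying the alarm by hand (assuming the fm_api package is importable and the FM service is running):

from fm_api import fm_api
from fm_api import constants as fm_constants

fm = fm_api.FaultAPIs()
entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_K8S,
                                "k8s-health-check-failed")

# get_fault returns the active Fault for (alarm_id, entity) or None.
alarm = fm.get_fault(fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN,
                     entity_instance_id)
if alarm is None:
    print("alarm not currently raised")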
def _update_image_conversion_alarm(self, alarm_state, fs_name, reason_text=None):
""" Raise conversion configuration alarm"""
entity_instance_id = "%s=%s" % (fm_constants.FM_ENTITY_TYPE_IMAGE_CONVERSION,
@ -15990,6 +16076,23 @@ class ConductorManager(service.PeriodicService):
actions_list = list(self._backup_action_map.keys())
self._backup_actions_log = dict(zip(actions_list, [OrderedDict()] * len(actions_list)))
    def _initialize_alarms_raised(self):
        """
        Initialize a dictionary of booleans keyed by alarm_id
        ({alarm_id: alarm_raised}). The current state of each alarm_id
        is fetched from FM at program start.
        """
self._alarms_raised = {}
        # Dictionary of alarms to initialize, keyed by alarm_id with the
        # entity instance id as value, e.g.:
        #   alarms = {alarm1_id: alarm1_entity_instance_id,
        #             alarm2_id: alarm2_entity_instance_id}
alarms = {fm_constants.FM_ALARM_ID_K8S_CLUSTER_DOWN:
'{}={}'.format(fm_constants.FM_ENTITY_TYPE_K8S, "k8s-health-check-failed")
}
for alarm_id, entity_instance_id in alarms.items():
alarm = self.fm_api.get_fault(alarm_id, entity_instance_id)
            self._alarms_raised[alarm_id] = bool(alarm)
def _revert_backup_operation(self, operation):
if operation not in self._backup_actions_log:
raise exception.BackupRestoreInvalidRevertOperation(operation=operation)