def get_transform_patch_custom_resource(self, group, version, namespace,
                                        plural, name, transform, raise_error=True):
    """ Fetch a custom resource, transform it and apply the result

    The resource is read with self.get_custom_resource(), handed to
    ``transform`` and written back with self.apply_custom_resource().
    The three steps run in that order; the first one that raises stops
    the sequence.

    :param group: Used by k8s API to determine resource
    :param version: Used by k8s API to determine resource
    :param namespace: Used by k8s API to determine resource
    :param plural: Used by k8s API to determine resource
    :param name: Used by k8s API to determine resource
    :param transform: A callable that receives the fetched resource.
                      It may change the resource in place (for example
                      access the dictionary and change some fields) and
                      return None, or it may return a replacement
                      object, which is then applied instead.
    :param raise_error: Control the exception handling here.
                        If True, log an error and raise errors further.
                        If False, log a warning and return from function.

    :return: True if everything finished successfully.
             False otherwise.
    """
    # Only used in log messages; this is the "<group>/<version>" pair.
    kind = group + '/' + version

    def _log_failure(message):
        # Severity follows raise_error: an error when the exception is
        # propagated to the caller, a warning when it is swallowed.
        if raise_error:
            LOG.error(message)
        else:
            LOG.warning(message)

    try:
        custom_resource = self.get_custom_resource(
            group, version, namespace, plural, name)
    except Exception as err:
        _log_failure("Failed to get resource kind {}, name {}: {}"
                     "".format(kind, name, err))
        if raise_error:
            raise
        return False

    # NOTE(review): if get_custom_resource() can return None for a missing
    # resource, that case surfaces below as a transform failure - confirm
    # against its implementation.
    try:
        transformed = transform(custom_resource)
        # Honor transforms that build a new object instead of mutating
        # their argument; in-place transforms return None and keep the
        # fetched (now modified) resource.
        if transformed is not None:
            custom_resource = transformed
    except Exception as err:
        _log_failure("Failed to transform resource {} using {}: {}"
                     "".format(custom_resource, transform, err))
        if raise_error:
            raise
        return False

    try:
        self.apply_custom_resource(
            group, version, namespace, plural, name, custom_resource)
    except Exception as err:
        _log_failure("Failed to patch kind {}, name {}: {}"
                     "".format(kind, name, err))
        if raise_error:
            raise
        return False

    return True
_recover_from_failed_helm_chart_on_app_apply(metadata_name, namespace): """ Recovery logic for FluxCD on apply @@ -1728,99 +1738,140 @@ class AppOperator(object): "".format(helm_chart_resource['metadata']['name'], err)) return attempt, True - # Need to get the resource again + # Flip to spec.suspended to False from HelmChart try: - helm_chart_resource = self._kube.get_custom_resource( + self._kube.get_transform_patch_custom_resource( constants.FLUXCD_CRD_HELM_CHART_GROUP, constants.FLUXCD_CRD_HELM_CHART_VERSION, namespace, constants.FLUXCD_CRD_HELM_CHART_PLURAL, - helm_chart_name) - except Exception as err: - LOG.warning("Failed to get HelmChart resource {}: {}" - "".format(helm_chart_name, err)) - return attempt, True - - # Flip to spec.suspended to False from HelmChart - try: - helm_chart_resource['spec']['suspend'] = False - group, version = helm_chart_resource['apiVersion'].split('/') - self._kube.apply_custom_resource( - group, - version, - helm_chart_resource['metadata']['namespace'], - constants.FLUXCD_CRD_HELM_CHART_PLURAL, - helm_chart_resource['metadata']['name'], - helm_chart_resource + helm_chart_name, + _patch_flux_suspend_false ) - except Exception as err: - LOG.info("Failed to patch HelmChart resource {}: {}" - "".format(helm_chart_resource['metadata']['name'], err)) + except Exception: return attempt, True # Force HelmRelease reconciliation now, saves up to reconciliation # timeout for the specific resource. Same trigger as with HelmChart. 
- - # Flip to spec.suspended to True from HelmRelease try: - helm_release_resource = self._kube.get_custom_resource( + # Flip to spec.suspended to True from HelmRelease + self._kube.get_transform_patch_custom_resource( constants.FLUXCD_CRD_HELM_REL_GROUP, constants.FLUXCD_CRD_HELM_REL_VERSION, namespace, constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_name) - except Exception as err: - LOG.warning("Failed to get HelmRelease resource {}: {}" - "".format(helm_release_name, err)) - return attempt, True - - try: - helm_release_resource['spec']['suspend'] = True - group, version = helm_release_resource['apiVersion'].split('/') - self._kube.apply_custom_resource( - group, - version, - helm_release_resource['metadata']['namespace'], - constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_resource['metadata']['name'], - helm_release_resource + helm_release_name, + _patch_flux_suspend_true ) - except Exception as err: - LOG.warning("Failed to patch HelmRelease resource {}: {}" - "".format(helm_release_resource['metadata']['name'], err)) - return attempt, True - # Flip to spec.suspended to False from HelmRelease - try: - helm_release_resource = self._kube.get_custom_resource( + # Flip to spec.suspended to False from HelmRelease + self._kube.get_transform_patch_custom_resource( constants.FLUXCD_CRD_HELM_REL_GROUP, constants.FLUXCD_CRD_HELM_REL_VERSION, namespace, constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_name) - except Exception as err: - LOG.warning("Failed to get HelmRelease resource {}: {}" - "".format(helm_release_name, err)) + helm_release_name, + _patch_flux_suspend_false + ) + except Exception: return attempt, True + return attempt, False + + def _recover_from_helm_operation_in_progress_on_app_apply(metadata_name, namespace, + flux_error_message): + """ Recovery logic for FluxCD on apply + + In case a helm operation is already in progress, FluxCD will raise + an error. Recover by patching the helm release secret, forcing + the status to be 'failed'. 
+ + :param metadata_name: metadata name from helmrelease.yaml + :param namespace: namespace from kustomization.yaml + :param flux_error_message: Error message FluxCD encountered + + :return: tuple(attempt, error). + attempt is True if recovery is triggered + error is True if an error was encountered + """ + helm_release_name = metadata_name + attempt = False + + for error_string in constants.FLUXCD_RECOVERY_HELM_RELEASE_STATUS_ERRORS: + if flux_error_message.startswith(error_string): + LOG.info("For helm release {} found a matching error string " + "we can attempt to recover from: {}" + "".format(helm_release_name, error_string)) + attempt = True + break + + if not attempt: + return attempt, False + try: - helm_release_resource['spec']['suspend'] = False - group, version = helm_release_resource['apiVersion'].split('/') - self._kube.apply_custom_resource( - group, - version, - helm_release_resource['metadata']['namespace'], - constants.FLUXCD_CRD_HELM_REL_PLURAL, - helm_release_resource['metadata']['name'], - helm_release_resource - ) + secret_list = self._kube.kube_list_secret(namespace) except Exception as err: - LOG.warning("Failed to patch HelmRelease resource {}: {}" - "".format(helm_release_resource['metadata']['name'], err)) + LOG.warning("Failed to get secrets in namespace {}: {}" + "".format(namespace, err)) + return attempt, True + + recover_list = [] + for secret in secret_list: + label = secret.metadata.labels + if not label: + continue + if 'owner' not in label: + continue + if 'status' not in label: + continue + if label['owner'] == 'helm' and \ + label['status'] in constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS: + LOG.info("Found helm release {} in state {}" + "".format(secret.metadata.name, label['status'])) + recover_list.append(secret) + + # Force 'failed' status for helm releases + for secret in recover_list: + release_data = helm_utils.decompress_helm_release_data(secret.data['release']) + + for status in 
constants.FLUXCD_RECOVERABLE_HELM_RELEASE_STATUS: + release_data = release_data.replace('"status":"{}"'.format(status), '"status":"failed"') + + release_data = helm_utils.compress_helm_release_data(release_data) + + secret.data['release'] = release_data + try: + self._kube.kube_patch_secret(secret.metadata.name, + secret.metadata.namespace, secret) + except Exception as err: + LOG.warning("Failed to patch secret {} in namespace {}: {}" + "".format(secret.metadata.name, + secret.metadata.namespace, err)) + return attempt, True + + # Force HelmRelease reconciliation now, saves up to reconciliation + # timeout for the specific resource. Flip suspend True, then False. + try: + self._kube.get_transform_patch_custom_resource( + constants.FLUXCD_CRD_HELM_REL_GROUP, + constants.FLUXCD_CRD_HELM_REL_VERSION, + namespace, + constants.FLUXCD_CRD_HELM_REL_PLURAL, + helm_release_name, + _patch_flux_suspend_true + ) + + self._kube.get_transform_patch_custom_resource( + constants.FLUXCD_CRD_HELM_REL_GROUP, + constants.FLUXCD_CRD_HELM_REL_VERSION, + namespace, + constants.FLUXCD_CRD_HELM_REL_PLURAL, + helm_release_name, + _patch_flux_suspend_false + ) + except Exception: return attempt, True - # TODO(dvoicule): What if we extract repeated get&patch operation to a generic - # get_patch(, lambda_func_transformation) return attempt, False def _check_progress(): @@ -1884,10 +1935,16 @@ class AppOperator(object): if release_status == "False": # If the helm release failed the app must also be in a # failed state - err_msg = ":{}".format(msg) if msg else "" - LOG.exception("Application {}: release {}: Failed during {} {}" - .format(app.name, release_name, request, err_msg)) - return False + err_msg = "{}".format(msg) if msg else "" + attempt, _ = _recover_from_helm_operation_in_progress_on_app_apply( + metadata_name=release_name, + namespace=chart_obj['namespace'], + flux_error_message=err_msg) + + if not attempt: + LOG.exception("Application {}: release {}: Failed during {} :{}" + 
def decompress_helm_release_data(release_data):
    """ Convert release data to format for applying transformations

    Inverse of compress_helm_release_data().

    :param release_data: Helm release secret data
                         Format is gzip double base64 encoded
    :return: string
             The decompressed payload decoded as UTF-8
    """
    gzipped = base64.b64decode(base64.b64decode(release_data))
    # wbits = 16 + window size: 16 selects the gzip header/trailer, and
    # the window must be at least as large as the one used to compress.
    # MAX_WBITS is the largest window, so any gzip stream is accepted.
    return zlib.decompress(gzipped, wbits=16 + zlib.MAX_WBITS).decode('utf-8')


def compress_helm_release_data(release_data):
    """ Convert release data to format for storing in cluster

    Inverse of decompress_helm_release_data().

    :param release_data: Helm release secret data (string)
    :return: string
             Format is gzip double base64 encoded
    """
    # 16 selects the gzip header/trailer. Use the largest window and the
    # highest compression level: a small window (e.g. wbits=25, i.e. 512
    # bytes) cannot reference repeats further back than the window and can
    # make the re-encoded release much larger than the payload it replaces,
    # which matters because the result is stored in a size-limited Secret.
    # NOTE(review): Helm is understood to write releases as regular gzip at
    # best compression - confirm against the Helm version in use.
    compressor = zlib.compressobj(level=zlib.Z_BEST_COMPRESSION,
                                  wbits=16 + zlib.MAX_WBITS)
    gzipped = compressor.compress(release_data.encode('utf-8'))
    gzipped += compressor.flush()

    return base64.b64encode(base64.b64encode(gzipped)).decode('utf-8')