#!/usr/bin/python3
# -*- encoding: utf-8 -*-
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2023-2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#

"""
Run platform upgrade deploy precheck as a standalone executable
"""

import argparse
import os
import re
import subprocess
import sys
import tempfile

from lxml import etree as ElementTree
import requests

import upgrade_utils


# TODO(heitormatsui) keep updated for every release
SUPPORTED_K8S_VERSIONS = [
    "v1.24.4",
    "v1.25.3",
    "v1.26.1",
    "v1.27.5",
    "v1.28.4",
]


class HealthCheck(object):
    """This class represents a general health check object
    that uses sysinv-client to run system health checks"""

    # Markers used in the textual report; FAIL_MSG is also searched for in
    # the sysinv health output to decide whether the general check failed.
    SUCCESS_MSG = 'OK'
    FAIL_MSG = 'Fail'

    def __init__(self, config):
        """Store the configuration and acquire tokens/endpoints/clients.

        :param config: dict-like configuration (as returned by parse_config)
        """
        self._config = config

        # get sysinv token, endpoint and client
        self._sysinv_token, self._sysinv_endpoint = \
            upgrade_utils.get_token_endpoint(config, service_type="platform")
        self._sysinv_client = upgrade_utils.get_sysinv_client(self._sysinv_token,
                                                              self._sysinv_endpoint)
        # get usm token and endpoint
        self._software_token, self._software_endpoint = \
            upgrade_utils.get_token_endpoint(config, service_type="usm")

    def run_health_check(self):
        """Run general health check using sysinv client

        :returns: tuple (health_ok, output) where health_ok is False when
                  FAIL_MSG appears in the output returned by sysinv
        """
        force = self._config.get("force", False)
        output = self._sysinv_client.health.get_kube_upgrade(args={}, relaxed=force)
        if HealthCheck.FAIL_MSG in output:
            return False, output
        return True, output


class UpgradeHealthCheck(HealthCheck):
    """This class represents a upgrade-specific health check object
    that verifies if system is in a valid state for upgrade"""

    @staticmethod
    def _get_upgrade_release():
        """Extract the target release from the script directory location

        The script path is made absolute first so that the lookup does not
        depend on how the script was invoked.

        :returns: release string taken from the "rel-<release>" path component
        :raises ValueError: if the script path has no "rel-<release>" component
        """
        script_path = os.path.abspath(__file__)
        # raw string: "\d" in a normal string literal is an invalid escape;
        # the dots are escaped so that only literal dots are accepted
        match = re.match(r"^.*/rel-(\d\d\.\d\d\.\d*)/", script_path)
        if match is None:
            raise ValueError("Could not determine target release from script "
                             "location: %s" % script_path)
        return match.group(1)

    def _check_valid_upgrade_path(self):
        """Checks if active release to specified release is a valid upgrade path

        :returns: tuple (success, active_release, required_patch) where
                  required_patch is the patch listed in the metadata for the
                  active release, or None if there is none
        """
        # Get active release
        isystem = self._sysinv_client.isystem.list()[0]
        active_release = isystem.software_version

        # supported_releases is a dict with {release: required_patch}
        supported_releases = dict()

        # Parse upgrade metadata file for supported upgrade paths; the path is
        # built from the absolute script location so that it stays correct
        # even when __file__ carries no directory component
        metadata_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                     "..", "metadata.xml")
        root = ElementTree.parse(metadata_path)
        supported_upgrades = root.find("supported_upgrades")
        # a metadata file without the section means no supported upgrade path
        upgrades = (supported_upgrades.findall("upgrade")
                    if supported_upgrades is not None else [])
        for upgrade in upgrades:
            version = upgrade.find("version")
            if version is None:
                # entry without a version cannot describe an upgrade path
                continue
            required_patch = upgrade.find("required_patch")
            supported_releases[version.text] = (
                required_patch.text if required_patch is not None else None)
        success = active_release in supported_releases
        return success, active_release, supported_releases.get(active_release, None)

    # TODO(heitormatsui): implement patch precheck targeted against USM
    #  and implement patch precheck for subcloud
    def _check_required_patch(self, required_patch):
        """Checks if required patch for the supported release is installed

        :param required_patch: name of the required patch, or None/empty
        :returns: tuple (success, patches); patches holds the missing patches,
                  or the required ones when the query could not be performed
        """
        url = self._software_endpoint + '/query?show=deployed'
        headers = {"X-Auth-Token": self._software_token}
        required_patch = [required_patch] if required_patch else []

        try:
            response = requests.get(url, headers=headers, timeout=10)
        except requests.exceptions.RequestException:
            # connection errors/timeouts are reported the same way as a
            # non-200 reply instead of aborting the precheck with a traceback
            print("Could not check required patches...")
            return False, required_patch

        if response.status_code != 200:
            print("Could not check required patches...")
            return False, required_patch

        applied_patches = set(response.json()["sd"].keys())
        missing_patch = list(set(required_patch) - applied_patches)
        return not missing_patch, missing_patch

    # TODO(heitormatsui) do we need this check on USM? Remove if we don't
    def _check_active_is_controller_0(self):
        """Checks that active controller is controller-0"""
        controllers = self._sysinv_client.ihost.list()
        for controller in controllers:
            if controller.hostname == "controller-0" and \
                    "Controller-Active" in controller.capabilities["Personality"]:
                return True
        return False

    def _check_license(self, version):
        """Validates the current license is valid for the specified version

        :param version: release the license is verified against
        :returns: True if verify-license succeeds, False otherwise
        """
        license_dict = self._sysinv_client.license.show()
        if license_dict["error"]:
            return False

        # create temp file with license content to run verify-license binary against it
        with tempfile.NamedTemporaryFile(mode="w", delete=True) as license_file:
            try:
                license_file.write(license_dict["content"])
                # the file is buffered: flush so that the external binary,
                # which opens the file by name, sees the complete content
                license_file.flush()
                subprocess.check_call(["/usr/bin/verify-license",  # pylint: disable=not-callable
                                       license_file.name,
                                       version])
            except subprocess.CalledProcessError:
                return False
        return True

    def _check_kube_version(self, supported_versions):
        """Check if active k8s version is in a list of supported versions

        :param supported_versions: list of supported k8s versions
        :returns: tuple (success, active_version); active_version is None
                  when no version is reported in "active" state
        """
        kube_versions = self._sysinv_client.kube_version.list()
        active_version = None
        for kv in kube_versions:
            if kv.state == "active":
                active_version = kv.version
                break
        success = active_version in supported_versions
        return success, active_version

    def run_health_check(self):
        """Run specific upgrade health checks

        :returns: tuple (health_ok, output) with the combined result of all
                  upgrade checks and the textual report
        :raises ValueError: if the target release cannot be determined from
                            the script location
        """
        health_ok = True
        output = ""

        # get target release from script directory location
        upgrade_release = self._get_upgrade_release()

        # check installed license
        success = self._check_license(upgrade_release)
        output += 'License valid for upgrade: [%s]\n' \
                  % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
        health_ok = health_ok and success

        # check if it is a valid upgrade path
        success, active_release, required_patch = self._check_valid_upgrade_path()
        output += 'Valid upgrade path from release %s to %s: [%s]\n' \
                  % (active_release, upgrade_release,
                     HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
        health_ok = health_ok and success

        # check if required patches are applied/committed if is a valid upgrade path
        if success:
            success, missing_patches = self._check_required_patch(required_patch)
            output += 'Required patches are applied: [%s]\n' \
                      % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
            if not success:
                output += 'Patches not applied: %s\n' \
                          % ', '.join(missing_patches)
            health_ok = health_ok and success
        else:
            # trailing newline keeps the next check on its own report line
            output += 'Invalid upgrade path, skipping required patches check...\n'

        # check if k8s version is valid
        success, active_version = self._check_kube_version(SUPPORTED_K8S_VERSIONS)
        output += 'Active kubernetes version [%s] is a valid supported version: [%s]\n' \
                  % (active_version,
                     HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
        if not active_version:
            output += ('-> Failed to get version info. Upgrade kubernetes to one of the '
                       'supported versions [%s] and ensure that the kubernetes version '
                       'information is available in the kubeadm configmap.\n'
                       'See "system kube-version-list"\n'
                       % ", ".join(SUPPORTED_K8S_VERSIONS))
        elif not success:
            output += ('-> Upgrade active kubernetes version [%s] to one of the '
                       'supported versions [%s]. See "system kube-version-list"\n'
                       % (active_version, ", ".join(SUPPORTED_K8S_VERSIONS)))
        health_ok = health_ok and success

        # TODO(heitormatsui) Do we need the following check on USM?
        # The load is only imported to controller-0. An upgrade can only
        # be started when controller-0 is active.
        is_controller_0 = self._check_active_is_controller_0()
        success = is_controller_0
        output += \
            'Active controller is controller-0: [%s]\n' \
            % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG)
        health_ok = health_ok and success

        return health_ok, output


def parse_config(args=None):
    """Parse the parameters passed to the script

    :param args: list of arguments to parse; sys.argv is used when None
    :returns: dict with the parsed arguments
    """
    parser = argparse.ArgumentParser(description="Run health checks to verify if the system "
                                                 "meets the requirements to deploy a specific "
                                                 "release.")
    parser.add_argument("--auth_url",
                        help="Authentication URL",
                        required=True)
    parser.add_argument("--username",
                        help="Username",
                        required=True)
    parser.add_argument("--password",
                        help="Password",
                        required=True)
    parser.add_argument("--project_name",
                        help="Project Name",
                        required=True)
    parser.add_argument("--user_domain_name",
                        help="User Domain Name",
                        required=True)
    parser.add_argument("--project_domain_name",
                        help="Project Domain Name",
                        required=True)
    parser.add_argument("--region_name",
                        help="Region Name",
                        default="RegionOne")
    parser.add_argument("--force",
                        help="Ignore non-critical health checks",
                        action="store_true")

    # if args was not passed will use sys.argv by default
    parsed_args = parser.parse_args(args)
    return vars(parsed_args)


def main(argv=None):
    """Run the general and upgrade-specific health checks and print the report

    :param argv: list of command line arguments; sys.argv is used when None
    :returns: 0 if all health checks passed, 1 otherwise
    """
    config = parse_config(argv)
    general_health_check = HealthCheck(config)
    upgrade_health_check = UpgradeHealthCheck(config)

    # execute general health check
    general_health_ok, general_output = general_health_check.run_health_check()

    # execute upgrade-specific health check
    upgrade_health_ok, upgrade_output = upgrade_health_check.run_health_check()

    # combine health check results removing extra line breaks/blank spaces from the output
    health_ok = general_health_ok and upgrade_health_ok
    output = general_output.strip() + "\n" + upgrade_output.strip()

    # print health check output and exit
    print(output)
    if health_ok:
        return 0
    return 1


if __name__ == "__main__":
    sys.exit(main())