#!/usr/bin/python3 # -*- encoding: utf-8 -*- # # vim: tabstop=4 shiftwidth=4 softtabstop=4 # # Copyright (c) 2023-2024 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # """ Run platform upgrade deploy precheck as a standalone executable """ import argparse import os import re import requests import subprocess import sys import tempfile from lxml import etree as ElementTree import upgrade_utils # TODO(heitormatsui) keep updated for every release SUPPORTED_K8S_VERSIONS = [ "v1.24.4", "v1.25.3", "v1.26.1", "v1.27.5", "v1.28.4", ] class HealthCheck(object): """This class represents a general health check object that uses sysinv-client to run system health checks""" SUCCESS_MSG = 'OK' FAIL_MSG = 'Fail' def __init__(self, config): self._config = config # get target release from script directory location self._target_release = re.match("^.*/rel-(\d\d.\d\d.\d+)/", __file__).group(1) self._major_release = self._target_release.rsplit(".", 1)[0] # get sysinv token, endpoint and client self._sysinv_token, self._sysinv_endpoint = \ upgrade_utils.get_token_endpoint(config, service_type="platform") self._sysinv_client = upgrade_utils.get_sysinv_client(self._sysinv_token, self._sysinv_endpoint) # get usm token and endpoint self._software_token, self._software_endpoint = \ upgrade_utils.get_token_endpoint(config, service_type="usm") def _check_license(self, version): """ Validates the current license is valid for the specified version :param version: version to be checked against installed license :return: True is license is valid for version, False otherwise """ license_dict = self._sysinv_client.license.show() if license_dict["error"]: return False # create temp file with license content to run verify-license binary against it with tempfile.NamedTemporaryFile(mode="w", delete=True) as license_file: try: license_file.write(license_dict["content"]) subprocess.check_call(["/usr/bin/verify-license", # pylint: disable=not-callable license_file.name, version]) except subprocess.CalledProcessError: return False return True # TODO(heitormatsui): implement patch precheck targeted against USM # and implement patch precheck for subcloud def _check_deployed_state(self, required_patches): """ Checks if every patch in a list is in 'deployed' state :param required_patches: list of patches to be checked :return: boolean indicating success/failure and list of patches that are not in the 'deployed' state """ url = self._software_endpoint + '/query?show=deployed' headers = {"X-Auth-Token": self._software_token} response = requests.get(url, headers=headers, timeout=10) success = True if response.status_code != 200: print("Could not check required patches...") return False, required_patches applied_patches = list(response.json()["sd"].keys()) missing_patch = list(set(required_patches) - set(applied_patches)) if missing_patch: success = False return success, missing_patch def run_health_check(self): """Run general health check using sysinv client""" force = self._config.get("force", False) health_ok = success = True output = self._sysinv_client.health.get_kube_upgrade(args={}, relaxed=force) if HealthCheck.FAIL_MSG in output: success = False health_ok = health_ok and success # check installed license success = self._check_license(self._major_release) output += 'Installed license is valid: [%s]\n' \ % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG) health_ok = health_ok and success return health_ok, output class UpgradeHealthCheck(HealthCheck): """This class represents a upgrade-specific health check object that verifies if system is in a valid state for upgrade""" # TODO(heitormatsui): switch from using upgrade metadata xml to # the new USM metadata format def _check_valid_upgrade_path(self): """Checks if active release to specified release is a valid upgrade path""" # Get active release isystem = self._sysinv_client.isystem.list()[0] active_release = isystem.software_version # supported_release is a dict with {release: required_patch} supported_releases = dict() # Parse upgrade metadata file for supported upgrade paths root = ElementTree.parse("/var/www/pages/feed/rel-%s/upgrades/metadata.xml" % self._major_release) upgrade_root = root.find("supported_upgrades").findall("upgrade") for upgrade in upgrade_root: version = upgrade.find("version") required_patch = upgrade.find("required_patch") supported_releases.update({version.text: [required_patch.text] if required_patch is not None else []}) success = active_release in supported_releases return success, active_release, supported_releases.get(active_release, []) # TODO(heitormatsui) do we need this check on USM? Remove if we don't def _check_active_is_controller_0(self): """Checks that active controller is controller-0""" controllers = self._sysinv_client.ihost.list() for controller in controllers: if controller.hostname == "controller-0" and \ "Controller-Active" in controller.capabilities["Personality"]: return True return False def _check_kube_version(self, supported_versions): """ Check if active k8s version is in a list of supported versions :param supported_versions: list of supported k8s versions :return: boolean indicating success/failure and active k8s version """ kube_versions = self._sysinv_client.kube_version.list() active_version = None for kv in kube_versions: if kv.state == "active": active_version = kv.version break success = active_version in supported_versions return success, active_version def run_health_check(self): """Run specific upgrade health checks""" health_ok = True output = "" # check if it is a valid upgrade path success, active_release, required_patches = self._check_valid_upgrade_path() output += 'Valid upgrade path from release %s to %s: [%s]\n' \ % (active_release, self._major_release, HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG) health_ok = health_ok and success # check if required patches are deployed success, missing_patches = self._check_deployed_state(required_patches) output += 'Required patches are applied: [%s]\n' \ % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG) if not success: output += '-> Patches not applied: [%s]\n' \ % ', '.join(missing_patches) health_ok = health_ok and success # check if k8s version is valid success, active_version = self._check_kube_version(SUPPORTED_K8S_VERSIONS) output += 'Active kubernetes version [%s] is a valid supported version: [%s]\n' \ % (active_version, HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG) if not active_version: output += ('-> Failed to get version info. Upgrade kubernetes to one of the ' 'supported versions [%s] and ensure that the kubernetes version ' 'information is available in the kubeadm configmap.\n' 'See "system kube-version-list"\n' % ", ".join(SUPPORTED_K8S_VERSIONS)) elif not success: output += ('-> Upgrade active kubernetes version [%s] to one of the ' 'supported versions [%s]. See "system kube-version-list"\n' % (active_version, ", ".join(SUPPORTED_K8S_VERSIONS))) health_ok = health_ok and success # TODO(heitormatsui) Do we need the following check on USM? # The load is only imported to controller-0. An upgrade can only # be started when controller-0 is active. is_controller_0 = self._check_active_is_controller_0() success = is_controller_0 output += \ 'Active controller is controller-0: [%s]\n' \ % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG) health_ok = health_ok and success return health_ok, output class PatchHealthCheck(HealthCheck): """This class represents a patch-specific health check object that verifies if system is in valid state to apply a patch""" def _get_required_patches(self): """Get required patches for a target release""" url = self._software_endpoint + '/query' headers = {"X-Auth-Token": self._software_token} response = requests.get(url, headers=headers, timeout=10) if response.status_code != 200: print("Could not get required patches...") return [] required_patches = [] for release, values in response.json()["sd"].items(): if values["sw_version"] == self._target_release: required_patches.extend(values["requires"]) break return required_patches def run_health_check(self): """Run specific patch health checks""" health_ok = True output = "" # check required patches for target release required_patches = self._get_required_patches() success, missing_patches = self._check_deployed_state(required_patches) output += 'Required patches are applied: [%s]\n' \ % (HealthCheck.SUCCESS_MSG if success else HealthCheck.FAIL_MSG) if not success: output += '-> Patches not applied: [%s]\n' \ % ', '.join(missing_patches) health_ok = health_ok and success return health_ok, output def parse_config(args=None): """Parse the parameters passed to the script""" parser = argparse.ArgumentParser(description="Run health checks to verify if the system " "meets the requirements to deploy a specific " "release.") parser.add_argument("--auth_url", help="Authentication URL", required=True) parser.add_argument("--username", help="Username", required=True) parser.add_argument("--password", help="Password", required=True) parser.add_argument("--project_name", help="Project Name", required=True) parser.add_argument("--user_domain_name", help="User Domain Name", required=True) parser.add_argument("--project_domain_name", help="Project Domain Name", required=True) parser.add_argument("--region_name", help="Region Name", default="RegionOne") parser.add_argument("--force", help="Ignore non-critical health checks", action="store_true") parser.add_argument("--patch", help="Set precheck to run against a patch release", action="store_true") # if args was not passed will use sys.argv by default parsed_args = parser.parse_args(args) return vars(parsed_args) def main(argv=None): config = parse_config(argv) patch_release = config.get("patch", False) health_ok = True output = "" # execute general health check general_health_check = HealthCheck(config) general_health_ok, general_output = general_health_check.run_health_check() # execute release-specific health check if patch_release: specific_health_check = PatchHealthCheck(config) else: specific_health_check = UpgradeHealthCheck(config) specific_health_ok, specific_output = specific_health_check.run_health_check() # combine health check results removing extra line breaks/blank spaces from the output health_ok = general_health_ok and specific_health_ok output = general_output.strip() + "\n" + specific_output.strip() # print health check output and exit print(output) if health_ok: return 0 return 1 if __name__ == "__main__": sys.exit(main())