diff --git a/controllerconfig/controllerconfig/upgrade-scripts/65-k8s-app-upgrade.sh b/controllerconfig/controllerconfig/upgrade-scripts/65-k8s-app-upgrade.sh index 968c42bc8d..c3f28b6929 100644 --- a/controllerconfig/controllerconfig/upgrade-scripts/65-k8s-app-upgrade.sh +++ b/controllerconfig/controllerconfig/upgrade-scripts/65-k8s-app-upgrade.sh @@ -137,8 +137,12 @@ if [ "$ACTION" == "activate" ]; then sleep $RECOVER_RESULT_SLEEP done + # Sort applications by version. Lower versions are attempted first. + APPS_SORTED_BY_VERSION=$(find $PLATFORM_APPLICATION_PATH/* | sort -V) + + LAST_APP_CHECKED="" # Get the list of applications installed in the new release - for fqpn_app in $PLATFORM_APPLICATION_PATH/*; do + for fqpn_app in $APPS_SORTED_BY_VERSION; do # Extract the app name and version from the tarball name: app_name-version.tgz re='^(.*)-([0-9]+\.[0-9]+-[0-9]+).tgz' [[ "$(basename $fqpn_app)" =~ $re ]] @@ -146,6 +150,18 @@ if [ "$ACTION" == "activate" ]; then UPGRADE_APP_VERSION=${BASH_REMATCH[2]} log "$NAME: Found application ${UPGRADE_APP_NAME}, version ${UPGRADE_APP_VERSION} at $fqpn_app" + # Confirm application is loaded. + EXISTING_APP_NAME=$(system application-show $UPGRADE_APP_NAME --column name --format value) + if [ -z "${EXISTING_APP_NAME}" ]; then + log "$NAME: ${UPGRADE_APP_NAME} is currently not uploaded in the system. skipping..." + continue + fi + + # If the last iteration for the same app was sucessful no further updates are necessary + if [ "${LAST_APP_CHECKED}" == "${UPGRADE_APP_NAME}" ] && [[ "${EXISTING_APP_STATUS}" =~ ^(uploaded|applied)$ ]]; then + continue + fi + # Confirm application is upgradable # TODO: move nginx back to the supported platform applications list when # fluxcd application upgrade is supported @@ -156,13 +172,6 @@ if [ "$ACTION" == "activate" ]; then continue fi - # Confirm application is loaded. 
- EXISTING_APP_NAME=$(system application-show $UPGRADE_APP_NAME --column name --format value) - if [ -z "${EXISTING_APP_NAME}" ]; then - log "$NAME: ${UPGRADE_APP_NAME} is currently not uploaded in the system. skipping..." - continue - fi - # Get the existing application details EXISTING_APP_INFO=$(system application-show $EXISTING_APP_NAME --column app_version --column status --format yaml) EXISTING_APP_VERSION=$(echo ${EXISTING_APP_INFO} | sed 's/.*app_version:[[:space:]]\(\S*\).*/\1/') @@ -174,7 +183,7 @@ if [ "$ACTION" == "activate" ]; then # If the app is in uploaded or applied state, then we continue with next iteration. # Else, the code execution proceeds and the script would exit with an unexpected state. if [[ "${EXISTING_APP_STATUS}" =~ ^(uploaded|applied)$ ]]; then - log "$NAME: ${UPGRADE_APP_NAME}, version ${EXISTING_APP_VERSION}, is already present. skipping..." + log "$NAME: ${UPGRADE_APP_NAME}, version ${EXISTING_APP_VERSION}, is already present. Skipping..." continue fi fi @@ -242,6 +251,8 @@ if [ "$ACTION" == "activate" ]; then if ! 
grep -q "${EXISTING_APP_NAME},${EXISTING_APP_VERSION},${UPGRADE_APP_VERSION}" $UPGRADE_IN_PROGRESS_APPS_FILE; then echo "${EXISTING_APP_NAME},${EXISTING_APP_VERSION},${UPGRADE_APP_VERSION}" >> $UPGRADE_IN_PROGRESS_APPS_FILE fi + + LAST_APP_CHECKED=${UPGRADE_APP_NAME} done log "$NAME: Completed Kubernetes application updates for release $FROM_RELEASE to $TO_RELEASE with action $ACTION" diff --git a/sysinv/sysinv/sysinv/sysinv/common/app_metadata.py b/sysinv/sysinv/sysinv/sysinv/common/app_metadata.py index 45e6209d40..e0aab54a11 100644 --- a/sysinv/sysinv/sysinv/sysinv/common/app_metadata.py +++ b/sysinv/sysinv/sysinv/sysinv/common/app_metadata.py @@ -10,16 +10,18 @@ import io import glob import os +import ruamel.yaml import shutil import six +import tarfile import tempfile import yaml from oslo_log import log as logging - from sysinv._i18n import _ from sysinv.common import constants from sysinv.common import exception +from sysinv.common import kubernetes from sysinv.common import utils LOG = logging.getLogger(__name__) @@ -562,3 +564,78 @@ def verify_application(path: str) -> bool: "metadata verification failed. {}".format(e))) return is_verified + + +def extract_bundle_metadata(file_path): + """Extract metadata from a given tarball + + :param file_path: Application bundle file path + """ + + try: + tarball = tarfile.open(file_path) + metadata_yaml_path = "./{}".format(constants.APP_METADATA_FILE) + tarball.getmember(metadata_yaml_path) + + with tarball.extractfile(metadata_yaml_path) as metadata_file: + metadata = ruamel.yaml.load(metadata_file, + Loader=ruamel.yaml.RoundTripLoader, + preserve_quotes=True) + + minimum_supported_k8s_version = metadata.get( + constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get( + constants.APP_METADATA_MINIMUM, None) + + if minimum_supported_k8s_version is None: + # TODO(ipiresso): Turn this into an error message rather than + # a warning when the k8s app upgrade implementation is in place + # and remove the hardcoded value. 
Also, do not add the bundle to + # the database in this scenario. + LOG.warning("Minimum supported Kubernetes version missing from {}" + .format(file_path)) + minimum_supported_k8s_version = kubernetes.get_kube_versions()[0]['version'] + + minimum_supported_k8s_version = minimum_supported_k8s_version.strip().lstrip('v') + + maximum_supported_k8s_version = metadata.get( + constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get( + constants.APP_METADATA_MAXIMUM, None) + + if maximum_supported_k8s_version is not None: + maximum_supported_k8s_version = maximum_supported_k8s_version.strip().lstrip('v') + + k8s_upgrades = metadata.get(constants.APP_METADATA_K8S_UPGRADES, None) + if k8s_upgrades is None: + k8s_auto_update = constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE + k8s_update_timing = constants.APP_METADATA_TIMING_DEFAULT_VALUE + LOG.warning("k8s_upgrades section missing from {} metadata" + .format(file_path)) + else: + k8s_auto_update = tarball.metadata.get( + constants.APP_METADATA_K8S_UPGRADES).get( + constants.APP_METADATA_AUTO_UPDATE, + constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE) + k8s_update_timing = tarball.metadata.get( + constants.APP_METADATA_K8S_UPGRADES).get( + constants.APP_METADATA_TIMING, + constants.APP_METADATA_TIMING_DEFAULT_VALUE) + + bundle_data = { + 'name': metadata.get(constants.APP_METADATA_NAME), + 'version': metadata.get(constants.APP_METADATA_VERSION), + 'file_path': file_path, + 'auto_update': + metadata.get(constants.APP_METADATA_UPGRADES, {}).get( + constants.APP_METADATA_AUTO_UPDATE, + constants.APP_METADATA_AUTO_UPDATE_DEFAULT_VALUE), + 'k8s_auto_update': k8s_auto_update, + 'k8s_timing': k8s_update_timing, + 'k8s_minimum_version': minimum_supported_k8s_version, + 'k8s_maximum_version': maximum_supported_k8s_version + } + + return bundle_data + except KeyError: + LOG.warning("Application bundle {} does not contain a metadata file.".format(file_path)) + except Exception as e: + LOG.exception(e) diff --git 
a/sysinv/sysinv/sysinv/sysinv/common/constants.py b/sysinv/sysinv/sysinv/sysinv/common/constants.py index e0b3fbb3ce..be838f4a9b 100644 --- a/sysinv/sysinv/sysinv/sysinv/common/constants.py +++ b/sysinv/sysinv/sysinv/sysinv/common/constants.py @@ -1813,7 +1813,8 @@ APP_PENDING_REAPPLY_FLAG = os.path.join( # FluxCD APP_FLUXCD_MANIFEST_DIR = 'fluxcd-manifests' -APP_FLUXCD_DATA_PATH = os.path.join(tsc.PLATFORM_PATH, 'fluxcd', tsc.SW_VERSION) +APP_FLUXCD_BASE_PATH = os.path.join(tsc.PLATFORM_PATH, 'fluxcd') +APP_FLUXCD_DATA_PATH = os.path.join(APP_FLUXCD_BASE_PATH, tsc.SW_VERSION) APP_ROOT_KUSTOMIZE_FILE = 'kustomization.yaml' APP_HELMREPOSITORY_FILE = "helmrepository.yaml" APP_BASE_HELMREPOSITORY_FILE = os.path.join("base", APP_HELMREPOSITORY_FILE) @@ -2399,6 +2400,13 @@ OS_DEBIAN = 'debian' SUPPORTED_OS_TYPES = [OS_CENTOS, OS_DEBIAN] OS_UPGRADE_FEED_FOLDER = '/var/www/pages/feed/' +# OSTree +OSTREE_ROOT_FOLDER = '/sysroot/ostree/' +OSTREE_LOCK_FILE = 'lock' + +# INotify +INOTIFY_DELETE_EVENT = 'DELETE' + # Configuration support placeholders CONFIGURABLE = 'configurable' NOT_CONFIGURABLE = 'not-configurable' diff --git a/sysinv/sysinv/sysinv/sysinv/common/inotify.py b/sysinv/sysinv/sysinv/sysinv/common/inotify.py new file mode 100644 index 0000000000..968e63ae8e --- /dev/null +++ b/sysinv/sysinv/sysinv/sysinv/common/inotify.py @@ -0,0 +1,274 @@ +# Based on inotify_simple: https://github.com/chrisjbillington/inotify_simple +# Licensed under the BSD 2-Clause "Simplified" License +# +# Copyright (c) 2016, Chris Billington +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# 2. 
Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided wi6h the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Copyright (c) 2024 Wind River Systems, Inc. +# + +import errno +import os + +from collections import namedtuple +from ctypes import CDLL +from ctypes import get_errno +from ctypes import c_int +from ctypes.util import find_library +from enum import IntEnum +from errno import EINTR +from fcntl import ioctl +from io import FileIO +from os import fsencode +from os import fsdecode +from select import select +from struct import calcsize +from struct import unpack_from +from termios import FIONREAD +from time import sleep + +__version__ = '1.3.5' + +__all__ = ['Event', 'INotify', 'flags', 'masks', 'parse_events'] + +_libc = None + + +def _libc_call(function, *args): + """Wrapper which raises errors and retries on EINTR.""" + while True: + rc = function(*args) + if rc != -1: + return rc + err = get_errno() + if err != EINTR: + raise OSError(errno, os.strerror(errno)) + + +#: A ``namedtuple`` (wd, mask, cookie, name) for an inotify event. 
The +#: :attr:`~inotify_simple.Event.name` field is a ``str`` decoded with +#: ``os.fsdecode()``. +Event = namedtuple('Event', ['wd', 'mask', 'cookie', 'name']) + +_EVENT_FMT = 'iIII' +_EVENT_SIZE = calcsize(_EVENT_FMT) + + +class INotify(FileIO): + + #: The inotify file descriptor returned by ``inotify_init()``. You are + #: free to use it directly with ``os.read`` if you'd prefer not to call + #: :func:`~inotify_simple.INotify.read` for some reason. Also available as + #: :func:`~inotify_simple.INotify.fileno` + fd = property(FileIO.fileno) + + def __init__(self, inheritable=False, nonblocking=False): + """File-like object wrapping ``inotify_init1()``. Raises ``OSError`` on failure. + :func:`~inotify_simple.INotify.close` should be called when no longer needed. + Can be used as a context manager to ensure it is closed, and can be used + directly by functions expecting a file-like object, such as ``select``, or with + functions expecting a file descriptor via + :func:`~inotify_simple.INotify.fileno`. + + Args: + inheritable (bool): whether the inotify file descriptor will be inherited by + child processes. The default,``False``, corresponds to passing the + ``IN_CLOEXEC`` flag to ``inotify_init1()``. Setting this flag when + opening filedescriptors is the default behaviour of Python standard + library functions since PEP 446. On Python < 3.3, the file descriptor + will be inheritable and this argument has no effect, one must instead + use fcntl to set FD_CLOEXEC to make it non-inheritable. + + nonblocking (bool): whether to open the inotify file descriptor in + nonblocking mode, corresponding to passing the ``IN_NONBLOCK`` flag to + ``inotify_init1()``. 
This does not affect the normal behaviour of + :func:`~inotify_simple.INotify.read`, which uses ``poll()`` to control + blocking behaviour according to the given timeout, but will cause other + reads of the file descriptor (for example if the application reads data + manually with ``os.read(fd)``) to raise ``BlockingIOError`` if no data + is available.""" + try: + libc_so = find_library('c') + except RuntimeError: # Python on Synology NASs raises a RuntimeError + libc_so = None + global _libc + _libc = _libc or CDLL(libc_so or 'libc.so.6', use_errno=True) + O_CLOEXEC = getattr(os, 'O_CLOEXEC', 0) # Only defined in Python 3.3+ + flags = (not inheritable) * O_CLOEXEC | bool(nonblocking) * os.O_NONBLOCK + FileIO.__init__(self, _libc_call(_libc.inotify_init1, flags), mode='rb') + + def add_watch(self, path, mask): + """Wrapper around ``inotify_add_watch()``. Returns the watch + descriptor or raises an ``OSError`` on failure. + + Args: + path (str, bytes, or PathLike): The path to watch. Will be encoded with + ``os.fsencode()`` before being passed to ``inotify_add_watch()``. + + mask (int): The mask of events to watch for. Can be constructed by + bitwise-ORing :class:`~inotify_simple.flags` together. + + Returns: + int: watch descriptor""" + # Explicit conversion of Path to str required on Python < 3.6 + + path = str(path) if hasattr(path, 'parts') else path + return _libc_call(_libc.inotify_add_watch, self.fileno(), fsencode(path), mask) + + def rm_watch(self, wd): + """Wrapper around ``inotify_rm_watch()``. Raises ``OSError`` on failure. 
+ + Args: + wd (int): The watch descriptor to remove""" + + _libc_call(_libc.inotify_rm_watch, self.fileno(), wd) + + def poll(self): + """Wait for I/O completion""" + + while True: + try: + read_fs, _, _ = select([self.fileno()], [], []) + break + except select.error as err: + if err.errno == errno.EINTR: + break + else: + raise + + return bool(read_fs) + + def read(self, timeout=None, read_delay=None): + """Read the inotify file descriptor and return the resulting + :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). + + Args: + timeout (int): The time in milliseconds to wait for events if there are + none. If negative or ``None``, block until there are events. If zero, + return immediately if there are no events to be read. + + read_delay (int): If there are no events immediately available for reading, + then this is the time in milliseconds to wait after the first event + arrives before reading the file descriptor. This allows further events + to accumulate before reading, which allows the kernel to coalesce like + events and can decrease the number of events the application needs to + process. However, this also increases the risk that the event queue will + overflow due to not being emptied fast enough. + + Returns: + generator: generator producing :attr:`~inotify_simple.Event` namedtuples + + .. warning:: + If the same inotify file descriptor is being read by multiple threads + simultaneously, this method may attempt to read the file descriptor when no + data is available. It may return zero events, or block until more events + arrive (regardless of the requested timeout), or in the case that the + :func:`~inotify_simple.INotify` object was instantiated with + ``nonblocking=True``, raise ``BlockingIOError``. 
+ """ + + data = self._readall() + if not data and timeout != 0 and self.poll(): + if read_delay is not None: + sleep(read_delay / 1000.0) + data = self._readall() + return parse_events(data) + + def _readall(self): + bytes_avail = c_int() + ioctl(self, FIONREAD, bytes_avail) + if not bytes_avail.value: + return b'' + return os.read(self.fileno(), bytes_avail.value) + + +def parse_events(data): + """Unpack data read from an inotify file descriptor into + :attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). This function + can be used if the application has read raw data from the inotify file + descriptor rather than calling :func:`~inotify_simple.INotify.read`. + + Args: + data (bytes): A bytestring as read from an inotify file descriptor. + + Returns: + list: list of :attr:`~inotify_simple.Event` namedtuples""" + + pos = 0 + events = [] + while pos < len(data): + wd, mask, cookie, namesize = unpack_from(_EVENT_FMT, data, pos) + pos += _EVENT_SIZE + namesize + name = data[pos - namesize: pos].split(b'\x00', 1)[0] + events.append(Event(wd, mask, cookie, fsdecode(name))) + return events + + +class flags(IntEnum): + """Inotify flags as defined in ``inotify.h`` but with ``IN_`` prefix omitted. 
+ Includes a convenience method :func:`~inotify_simple.flags.from_mask` for extracting + flags from a mask.""" + + ACCESS = 0x00000001 #: File was accessed + MODIFY = 0x00000002 #: File was modified + ATTRIB = 0x00000004 #: Metadata changed + CLOSE_WRITE = 0x00000008 #: Writable file was closed + CLOSE_NOWRITE = 0x00000010 #: Unwritable file closed + OPEN = 0x00000020 #: File was opened + MOVED_FROM = 0x00000040 #: File was moved from X + MOVED_TO = 0x00000080 #: File was moved to Y + CREATE = 0x00000100 #: Subfile was created + DELETE = 0x00000200 #: Subfile was deleted + DELETE_SELF = 0x00000400 #: Self was deleted + MOVE_SELF = 0x00000800 #: Self was moved + + UNMOUNT = 0x00002000 #: Backing fs was unmounted + Q_OVERFLOW = 0x00004000 #: Event queue overflowed + IGNORED = 0x00008000 #: File was ignored + + ONLYDIR = 0x01000000 #: only watch the path if it is a directory + DONT_FOLLOW = 0x02000000 #: don't follow a sym link + EXCL_UNLINK = 0x04000000 #: exclude events on unlinked objects + MASK_ADD = 0x20000000 #: add to the mask of an already existing watch + ISDIR = 0x40000000 #: event occurred against dir + ONESHOT = 0x80000000 #: only send event once + + @classmethod + def from_mask(cls, mask): + """Convenience method that returns a list of every flag in a mask.""" + return [flag for flag in cls.__members__.values() if flag & mask] + + +class masks(IntEnum): + """Convenience masks as defined in ``inotify.h`` but with ``IN_`` prefix omitted.""" + + #: helper event mask equal to ``flags.CLOSE_WRITE | flags.CLOSE_NOWRITE`` + CLOSE = flags.CLOSE_WRITE | flags.CLOSE_NOWRITE + #: helper event mask equal to ``flags.MOVED_FROM | flags.MOVED_TO`` + MOVE = flags.MOVED_FROM | flags.MOVED_TO + + #: bitwise-OR of all the events that can be passed to + #: :func:`~inotify_simple.INotify.add_watch` + ALL_EVENTS = (flags.ACCESS | flags.MODIFY | flags.ATTRIB | flags.CLOSE_WRITE | + flags.CLOSE_NOWRITE | flags.OPEN | flags.MOVED_FROM | flags.MOVED_TO | + flags.CREATE | 
flags.DELETE | flags.DELETE_SELF | flags.MOVE_SELF) diff --git a/sysinv/sysinv/sysinv/sysinv/common/utils.py b/sysinv/sysinv/sysinv/sysinv/common/utils.py index 98078e6905..b71754f798 100644 --- a/sysinv/sysinv/sysinv/sysinv/common/utils.py +++ b/sysinv/sysinv/sysinv/sysinv/common/utils.py @@ -3612,3 +3612,15 @@ def checkout_ostree(ostree_repo, commit, target_dir, subpath): raise exception.SysinvException( "Error checkout ostree commit: %s" % (error), ) + + +def is_bundle_extension_valid(filename): + """Check if application bundles have the correct extension + + :param filename: Bundle filename + :return: Returns True if the extension is correct. + Otherwise returns False. + """ + + file_extension = pathlib.Path(filename).suffix + return file_extension.lower() == ".tgz" diff --git a/sysinv/sysinv/sysinv/sysinv/conductor/manager.py b/sysinv/sysinv/sysinv/sysinv/conductor/manager.py index 6ba1feb09c..a7a45ff90a 100644 --- a/sysinv/sysinv/sysinv/sysinv/conductor/manager.py +++ b/sysinv/sysinv/sysinv/sysinv/conductor/manager.py @@ -29,6 +29,7 @@ collection of inventory data for each host. 
""" +from enum import Enum import errno import filecmp import glob @@ -52,7 +53,6 @@ import xml.etree.ElementTree as ElementTree from contextlib import contextmanager from datetime import datetime from datetime import timedelta -from distutils.util import strtobool from distutils.version import LooseVersion from copy import deepcopy @@ -93,6 +93,7 @@ from sysinv.api.controllers.v1 import kube_app as kube_api from sysinv.api.controllers.v1 import mtce_api from sysinv.api.controllers.v1 import utils from sysinv.api.controllers.v1 import vim_api +from sysinv.common import app_metadata from sysinv.common import fpga_constants from sysinv.common import constants from sysinv.common import ceph as cceph @@ -108,6 +109,8 @@ from sysinv.common import kubernetes from sysinv.common import retrying from sysinv.common import service from sysinv.common import utils as cutils +from sysinv.common.inotify import flags +from sysinv.common.inotify import INotify from sysinv.common.retrying import retry from sysinv.common.storage_backend_conf import StorageBackendConfig from cephclient import wrapper as ceph @@ -232,6 +235,63 @@ AppTarBall = namedtuple( "tarball_name app_name app_version manifest_name manifest_file metadata") +class KubeAppBundleStorageType(Enum): + DATABASE = 1 + + +class KubeAppBundleStorageFactory(object): + """Factory class that aims to abstract calls to storage operations when + handling application bundle metadata. + + This allows supporting a database implementation going forward and an + in-memory implementation for patchback scenarios if needed. 
+ """ + + @staticmethod + def createKubeAppBundleStorage(storage_type=KubeAppBundleStorageType.DATABASE): + """Factory Method + + :param storage_type: Storage type used to house the metadata + """ + if storage_type == KubeAppBundleStorageType.DATABASE: + return KubeAppBundleDatabase() + + +class KubeAppBundleDatabase(KubeAppBundleStorageFactory): + """Database implementation to store application bundle metadata.""" + + def __init__(self): + self.dbapi = dbapi.get_instance() + + def create(self, bundle_data): + """Add a bundle to the database.""" + self.dbapi.kube_app_bundle_create(bundle_data) + + def create_all(self, bundle_bulk_data): + """Insert a list of bundles to the database.""" + self.dbapi.kube_app_bundle_create_all(bundle_bulk_data) + + def is_empty(self): + """Check if the table is empty.""" + return self.dbapi.kube_app_bundle_is_empty() + + def get_all(self): + """Get a list containing all bundles.""" + return self.dbapi.kube_app_bundle_get_all() + + def get_by_name(self, app_name): + """Get a list of bundles by their name.""" + return self.dbapi.kube_app_bundle_get_by_name(app_name) + + def destroy_all(self): + """Prune all bundle metadata.""" + self.dbapi.kube_app_bundle_destroy_all() + + def destroy_by_file_path(self, file_path): + """Delete bundle with a given file path.""" + self.dbapi.kube_app_bundle_destroy_by_file_path(file_path) + + class ConductorManager(service.PeriodicService): """Sysinv Conductor service main class.""" @@ -272,12 +332,17 @@ class ConductorManager(service.PeriodicService): endpoint='http://localhost:{}'.format(constants.CEPH_MGR_PORT)) self._kube = None self._fernet = None + self._inotify = None self._openstack = None self._api_token = None self._mtc_address = constants.LOCALHOST_HOSTNAME self._mtc_port = 2112 + # Store and track available application bundles + self._kube_app_bundle_storage = None + self._cached_app_bundle_set = set() + # Timeouts for adding & removing operations self._pv_op_timeouts = {} 
self._stor_bck_op_timeouts = {} @@ -358,6 +423,7 @@ class ConductorManager(service.PeriodicService): self.fm_log = fm.FmCustomerLog() self.host_uuid = self._get_active_controller_uuid() + self._kube_app_bundle_storage = KubeAppBundleStorageFactory.createKubeAppBundleStorage() self._openstack = openstack.OpenStackOperator(self.dbapi) self._puppet = puppet.PuppetOperator(self.dbapi) @@ -397,12 +463,36 @@ class ConductorManager(service.PeriodicService): # Runtime config tasks self._prune_runtime_config_table() + # Populate/update app bundle table as needed + if self._kube_app_bundle_storage.is_empty(): + self._populate_app_bundle_metadata() + else: + self._update_cached_app_bundles_set() + self._update_app_bundles_storage() + + # Initialize inotify and launch thread to monitor + # changes to the ostree root folder + self._initialize_ostree_inotify() + greenthread.spawn(self._monitor_ostree_root_folder) + LOG.info("sysinv-conductor start committed system=%s" % system.as_dict()) # Save our start time for time limited init actions self._start_time = timeutils.utcnow() + def _initialize_ostree_inotify(self): + """ Initialize inotify to watch for changes under the ostree root + folder. + + Created or removed files under that folder suggest that a patch + was applied and a new ostree commit was deployed. + """ + + self._inotify = INotify() + watch_flags = flags.CREATE | flags.DELETE + self._inotify.add_watch(constants.OSTREE_ROOT_FOLDER, watch_flags) + def _get_active_controller_uuid(self): ahost = utils.HostHelper.get_active_controller(self.dbapi) if ahost: @@ -7211,67 +7301,53 @@ class ConductorManager(service.PeriodicService): self._inner_sync_auto_apply(context, app_name) - @staticmethod - def check_app_k8s_auto_update(app_name, tarball): - """ Check whether an application should be automatically updated - based on its Kubernetes upgrade metadata fields. 
+ def _get_app_bundle_for_update(self, app): + """ Retrieve metadata from the most updated application bundle + that can be used to update the given app. - :param tarball: tarball object of the application to be checked + :param app: The application to be updated + :return The bundle metadata from the new version of the app """ - minimum_supported_k8s_version = tarball.metadata.get( - constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get( - constants.APP_METADATA_MINIMUM, None) + bundle_metadata_list = self._kube_app_bundle_storage.get_by_name(app.name) - if minimum_supported_k8s_version is None: - # TODO: Turn this into an error message rather than a warning - # when the k8s app upgrade implementation is in place. Also, - # return False in this scenario. - LOG.warning("Minimum supported Kubernetes version missing from " - "{} metadata".format(app_name)) - else: - LOG.debug("minimum_supported_k8s_version for {}: {}" - .format(app_name, minimum_supported_k8s_version)) + latest_version_bundle = None + k8s_version = self._kube.kube_get_kubernetes_version().strip().lstrip('v') + for bundle_metadata in bundle_metadata_list: + if LooseVersion(bundle_metadata.version) <= LooseVersion(app.app_version): + LOG.debug("Bundle {} version {} lower than installed app version ({})" + .format(bundle_metadata.file_path, + bundle_metadata.version, + app.app_version)) + elif not bundle_metadata.auto_update: + LOG.debug("Application auto update disabled for bundle {}" + .format(bundle_metadata.file_path)) + elif not bundle_metadata.k8s_auto_update: + LOG.debug("Kubernetes application auto update disabled for bundle {}" + .format(bundle_metadata.file_path)) + elif LooseVersion(k8s_version) < LooseVersion(bundle_metadata.k8s_minimum_version): + LOG.debug("Kubernetes version {} is lower than {} which is " + "the minimum required for bundle {}" + .format(k8s_version, + bundle_metadata.k8s_minimum_version, + bundle_metadata.file_path)) + elif ((bundle_metadata.k8s_maximum_version is not 
None) and (LooseVersion(k8s_version) > + LooseVersion(bundle_metadata.k8s_maximum_version))): + LOG.debug("Kubernetes version {} is higher than {} which is " + "the maximum allowed for bundle {}" + .format(k8s_version, + bundle_metadata.k8s_maximum_version, + bundle_metadata.file_path)) + elif ((latest_version_bundle is None) or + (LooseVersion(bundle_metadata.version) > + LooseVersion(latest_version_bundle.version))): + # Only set the chosen bundle if it was not set before or if the version + # of the current one is higher than the one previously set. + latest_version_bundle = bundle_metadata - maximum_supported_k8s_version = tarball.metadata.get( - constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get( - constants.APP_METADATA_MAXIMUM, None) + return latest_version_bundle - if maximum_supported_k8s_version: - LOG.debug("maximum_supported_k8s_version for {}: {}" - .format(app_name, maximum_supported_k8s_version)) - - k8s_upgrades = tarball.metadata.get( - constants.APP_METADATA_K8S_UPGRADES, None) - - if k8s_upgrades is None: - k8s_auto_update = constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE - k8s_update_timing = constants.APP_METADATA_TIMING_DEFAULT_VALUE - LOG.warning("k8s_upgrades section missing from {} metadata" - .format(app_name)) - else: - k8s_auto_update = tarball.metadata.get( - constants.APP_METADATA_K8S_UPGRADES).get( - constants.APP_METADATA_AUTO_UPDATE, - constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE) - k8s_update_timing = tarball.metadata.get( - constants.APP_METADATA_K8S_UPGRADES).get( - constants.APP_METADATA_TIMING, - constants.APP_METADATA_TIMING_DEFAULT_VALUE) - - # TODO: check if the application meets the criteria to be updated - # according to the 'supported_k8s_version' and 'k8s_upgrades' - # metadata sections. This initial implementation is only intended to - # set the default values for each entry. 
- - LOG.debug("k8s_auto_update value for {}: {}" - .format(app_name, k8s_auto_update)) - LOG.debug("k8s_update_timing value for {}: {}" - .format(app_name, k8s_update_timing)) - - return True - - def _auto_update_app(self, context, app_name, managed_app): + def _auto_update_app(self, context, app_name): """Auto update applications""" try: app = kubeapp_obj.get_by_name(context, app_name) @@ -7314,19 +7390,15 @@ class ConductorManager(service.PeriodicService): LOG.debug("Application %s: Checking " "for update ..." % app_name) - tarfile = self._search_tarfile(app_name, managed_app=managed_app) - if tarfile is None: - # Skip if no tarball or multiple tarballs found - return - - applied_app = '{}-{}'.format(app.name, app.app_version) - if applied_app in tarfile: - # Skip if the tarfile version is already applied + app_bundle = self._get_app_bundle_for_update(app) + if app_bundle is None: + # Skip if no bundles are found + LOG.debug("No bundle found for updating %s" % app_name) return LOG.info("Found new tarfile version for %s: %s" - % (app.name, tarfile)) - tarball = self._check_tarfile(app_name, tarfile, + % (app.name, app_bundle.file_path)) + tarball = self._check_tarfile(app_name, app_bundle.file_path, preserve_metadata=True) if ((tarball.app_name is None) or (tarball.app_version is None) or @@ -7335,18 +7407,7 @@ class ConductorManager(service.PeriodicService): # Skip if tarball check fails return - if not tarball.metadata: - # Skip if app doesn't have metadata - return - - auto_update = tarball.metadata.get( - constants.APP_METADATA_UPGRADES, {}).get( - constants.APP_METADATA_AUTO_UPDATE, False) - if not bool(strtobool(str(auto_update))): - # Skip if app is not set to auto_update - return - - if tarball.app_version in \ + if app_bundle.version in \ app.app_metadata.get( constants.APP_METADATA_UPGRADES, {}).get( constants.APP_METADATA_FAILED_VERSIONS, []): @@ -7357,11 +7418,6 @@ class ConductorManager(service.PeriodicService): % (app.name, tarball.app_version, 
app.app_version)) return - # Check if the update should proceed based on the application's - # Kubernetes metadata - if not ConductorManager.check_app_k8s_auto_update(app_name, tarball): - return - self._inner_sync_auto_update(context, app, tarball) @cutils.synchronized(LOCK_APP_AUTO_MANAGE) @@ -7541,14 +7597,14 @@ class ConductorManager(service.PeriodicService): """ Load metadata of apps from the directory containing apps bundled with the iso. """ - for tarfile in os.listdir(constants.HELM_APP_ISO_INSTALL_PATH): + for app_bundle in os.listdir(constants.HELM_APP_ISO_INSTALL_PATH): # Get the app name from the tarball name # If the app has the metadata loaded already, by conductor restart, # then skip the tarball extraction app_name = None pattern = re.compile("^(.*)-([0-9]+\.[0-9]+-[0-9]+)") - match = pattern.search(tarfile) + match = pattern.search(app_bundle) if match: app_name = match.group(1) @@ -7560,7 +7616,7 @@ class ConductorManager(service.PeriodicService): # Proceed with extracting the tarball tarball_name = '{}/{}'.format( - constants.HELM_APP_ISO_INSTALL_PATH, tarfile) + constants.HELM_APP_ISO_INSTALL_PATH, app_bundle) with cutils.TempDirectory() as app_path: if not cutils.extract_tarfile(app_path, tarball_name): @@ -7733,6 +7789,90 @@ class ConductorManager(service.PeriodicService): # No need to detect again until conductor restart self._do_detect_swact = False + def _populate_app_bundle_metadata(self): + """Read metadata of all application bundles and store in the database""" + + bundle_list = [] + for file_path in glob.glob("{}/*.tgz".format(constants.HELM_APP_ISO_INSTALL_PATH)): + bundle_data = app_metadata.extract_bundle_metadata(file_path) + if bundle_data: + bundle_list.append(bundle_data) + + self._kube_app_bundle_storage.create_all(bundle_list) + self._update_cached_app_bundles_set() + + def _add_app_bundle(self, full_bundle_path): + """Add a new application bundle record""" + + bundle_data = 
 app_metadata.extract_bundle_metadata(full_bundle_path) + if bundle_data: + LOG.info("New application bundle available: {}".format(full_bundle_path)) + try: + self._kube_app_bundle_storage.create(bundle_data) + except exception.KubeAppBundleAlreadyExists as e: + LOG.exception(e) + except Exception as e: + LOG.exception("Error while storing bundle data for {}: {}" + .format(full_bundle_path, e)) + + def _remove_app_bundle(self, full_bundle_path): + """Remove application bundle record""" + + LOG.info("Application bundle deleted: {}".format(full_bundle_path)) + try: + self._kube_app_bundle_storage.destroy_by_file_path(full_bundle_path) + except Exception as e: + LOG.error("Error while removing bundle data for {}: {}" + .format(full_bundle_path, e)) + + def _update_cached_app_bundles_set(self): + """Update internal cache of application bundles""" + + self._cached_app_bundle_set = set(bundle.file_path for bundle in + self._kube_app_bundle_storage.get_all()) + + def _update_app_bundles_storage(self): + """Update application bundle storage to account for new and removed files""" + + filesystem_app_bundle_set = set(glob.glob("{}/*.tgz" + .format(constants.HELM_APP_ISO_INSTALL_PATH))) + if filesystem_app_bundle_set != self._cached_app_bundle_set: + new_files = set(file_path for file_path in filesystem_app_bundle_set + if file_path not in self._cached_app_bundle_set) + + # Add new files to the database + for file_path in new_files: + self._add_app_bundle(file_path) + + # Delete removed files from the database + for file_path in self._cached_app_bundle_set: + if file_path not in filesystem_app_bundle_set: + self._remove_app_bundle(file_path) + + # Update internal bundle set to reflect the storage + self._update_cached_app_bundles_set() + + def _monitor_ostree_root_folder(self): + """Monitor the ostree root folder for new and removed application bundles""" + + if self._inotify is None: + LOG.error("Inotify has not been initialized.") + return + + while True: + for event in 
self._inotify.read(timeout=0): + event_types = [f.name for f in flags.from_mask(event.mask)] + LOG.debug("Event {}. Event types: {}".format(event, event_types)) + + # If the "lock" file was deleted inside the ostree root it means + # that a new ostree has finished being deployed. Therefore we may + # need to update the list of available application bundles. + if constants.INOTIFY_DELETE_EVENT in event_types and \ + event.name == constants.OSTREE_LOCK_FILE: + self._update_app_bundles_storage() + + time.sleep(1) + @periodic_task.periodic_task(spacing=CONF.conductor_periodic_task_intervals.k8s_application, run_immediately=True) def _k8s_application_audit(self, context): @@ -7839,7 +7979,7 @@ class ConductorManager(service.PeriodicService): self._auto_recover_managed_app(context, app_name) elif status == constants.APP_APPLY_SUCCESS: self.check_pending_app_reapply(context) - self._auto_update_app(context, app_name, managed_app=True) + self._auto_update_app(context, app_name) # Special case, we want to apply some logic to non-managed applications for app_name in self.apps_metadata[constants.APP_METADATA_APPS].keys(): @@ -7859,7 +7999,7 @@ class ConductorManager(service.PeriodicService): # Automatically update non-managed applications if status == constants.APP_APPLY_SUCCESS: self.check_pending_app_reapply(context) - self._auto_update_app(context, app_name, managed_app=False) + self._auto_update_app(context, app_name) LOG.debug("Periodic Task: _k8s_application_audit: Finished") diff --git a/sysinv/sysinv/sysinv/sysinv/tests/api/test_storage_tier.py b/sysinv/sysinv/sysinv/sysinv/tests/api/test_storage_tier.py index 56f5da6ec5..cd3dab3b2c 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/api/test_storage_tier.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/api/test_storage_tier.py @@ -558,6 +558,8 @@ class StorageTierDependentTCs(base.FunctionalTest): self.set_is_initial_config_patcher.return_value = True self.service = manager.ConductorManager('test-host', 'test-topic') 
self.service.dbapi = dbapi.get_instance() + self.service._populate_app_bundle_metadata = mock.Mock() + self.service._initialize_ostree_inotify = mock.Mock() self.context = context.get_admin_context() self.dbapi = dbapi.get_instance() self.system = dbutils.create_test_isystem() diff --git a/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_ceph.py b/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_ceph.py index 476479977f..8cc9a0e396 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_ceph.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_ceph.py @@ -62,6 +62,8 @@ class UpdateCephCluster(base.DbTestCase): self.mock_fix_crushmap.return_value = True self.service._sx_to_dx_post_migration_actions = mock.Mock() + self.service._populate_app_bundle_metadata = mock.Mock() + self.service._initialize_ostree_inotify = mock.Mock() def tearDown(self): super(UpdateCephCluster, self).tearDown() diff --git a/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_manager.py b/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_manager.py index 3881a95f5b..3e2230d455 100644 --- a/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_manager.py +++ b/sysinv/sysinv/sysinv/sysinv/tests/conductor/test_manager.py @@ -482,6 +482,8 @@ class ManagerTestCase(base.DbTestCase): self.service._update_pxe_config = mock.Mock() self.service._ceph_mon_create = mock.Mock() self.service._sx_to_dx_post_migration_actions = mock.Mock() + self.service._populate_app_bundle_metadata = mock.Mock() + self.service._initialize_ostree_inotify = mock.Mock() self.alarm_raised = False self.kernel_alarms = {}