Introduce support for multiple application bundles

Parse the metadata of all application bundles under the helm application
folder and save it to the kube_app_bundle table. This is done during
sysinv startup and when a new ostree commit is deployed.

The auto update logic was changed to retrieve metadata from the
database for all available bundles of a given app and to compute which
bundle should be used to carry out the upgrade.

The bundle choice is based on the minimum and maximum Kubernetes
versions supported by the application. If multiple bundles fit those
criteria, the one with the highest version number is chosen.
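That selection rule can be sketched as follows. This is an illustrative
standalone sketch, not the sysinv implementation: the real code reads the
kube_app_bundle table and compares versions with LooseVersion, while the
`vkey` helper and dictionary field names here are made up for the example.

```python
import re


def vkey(version):
    # Crude numeric sort key for versions like "1.0-8" or "1.26.1"
    return tuple(int(n) for n in re.findall(r"\d+", version))


def pick_bundle(bundles, k8s_version):
    """Pick the highest-version bundle whose k8s window fits k8s_version."""
    best = None
    for b in bundles:
        if vkey(k8s_version) < vkey(b["k8s_min"]):
            continue  # running Kubernetes is older than the bundle's minimum
        if b["k8s_max"] is not None and vkey(k8s_version) > vkey(b["k8s_max"]):
            continue  # running Kubernetes is newer than the bundle's maximum
        if best is None or vkey(b["version"]) > vkey(best["version"]):
            best = b  # keep the highest app version among the candidates
    return best


bundles = [
    {"version": "1.0-8", "k8s_min": "1.24.4", "k8s_max": "1.26.1"},
    {"version": "1.1-3", "k8s_min": "1.26.1", "k8s_max": None},
]
print(pick_bundle(bundles, "1.26.1")["version"])  # 1.1-3: both fit, highest wins
```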

The 65-k8s-app-upgrade.sh script also takes multiple bundles into
account during the platform upgrade activation step, prioritizing the
lowest available versions to ensure compatibility with the Kubernetes
version carried over from the N release. A follow-up change will
improve this mechanism to discover specific app versions.
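The ascending order produced by the script's `sort -V` can be approximated
per app as follows (a simplified sketch assuming the `app_name-M.m-p.tgz`
naming convention that the script's regex matches; paths are illustrative):

```python
import re


def bundle_version_key(path):
    # Parse "<app>-<major>.<minor>-<patch>.tgz", e.g. metrics-server-1.0-8.tgz
    match = re.search(r"-(\d+)\.(\d+)-(\d+)\.tgz$", path)
    return tuple(int(n) for n in match.groups()) if match else (0, 0, 0)


tarballs = [
    "/usr/local/share/applications/helm/metrics-server-1.1-3.tgz",
    "/usr/local/share/applications/helm/metrics-server-1.0-8.tgz",
]
# Lowest version first, mirroring APPS_SORTED_BY_VERSION in the script
print(sorted(tarballs, key=bundle_version_key))
```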

When platform patches are applied and the ostree commit changes, the
content of the helm application folder is reevaluated and the database
is updated accordingly if bundles have been added or removed.
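That re-evaluation amounts to a set difference between the tarballs on
disk and the paths cached from the kube_app_bundle table. A minimal
sketch, with an illustrative function name rather than the actual sysinv
helper:

```python
import glob
import os


def diff_bundles(app_folder, cached_paths):
    """Return (added, removed) bundle paths relative to the cached set."""
    on_disk = set(glob.glob(os.path.join(app_folder, "*.tgz")))
    return on_disk - cached_paths, cached_paths - on_disk
```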

Test plan:
PASS: build-pkgs -a & build-image
PASS: Fresh AIO-SX install.
PASS: Fresh AIO-DX install.
PASS: Manually place multiple tarballs of one application with
      different versions under /usr/local/share/applications/helm/
      and check if the app is updated correctly.
PASS: Build a reboot required patch that removes the istio
      bundle and adds a new metrics-server version.
      Apply the reboot required patch.
      Check if istio was removed from the kube_app_bundle table.
      Check if the metrics-server previous version was removed from the
      kube_app_bundle table.
      Check if the metrics-server new version was added to the
      kube_app_bundle table.
      Check if metrics-server was updated to the new version added
      to the database.
PASS: Build a no reboot required patch that does not restart
      sysinv, removes the istio bundle and adds a new metrics-server
      version.
      Apply the no reboot required patch.
      Check if istio was removed from the kube_app_bundle table.
      Check if the metrics-server previous version was removed from the
      kube_app_bundle table.
      Check if the metrics-server new version was added to the
      kube_app_bundle table.
      Check if metrics-server was updated to the new version added
      to the database.
PASS: Build a no reboot required patch that restarts sysinv,
      removes the istio bundle and adds a new metrics-server version.
      Apply the no reboot required patch.
      Check if istio was removed from the kube_app_bundle table.
      Check if the metrics-server previous version was removed from the
      kube_app_bundle table.
      Check if the metrics-server new version was added to the
      kube_app_bundle table and was updated.
      Check if metrics-server was updated to the new version added
      to the database.
PASS: Install power-metrics on stx-8.
      Run platform upgrade from stx-8 placing two different versions of
      metrics-server under /usr/local/share/applications/helm/.
      Check if default apps and metrics-server were properly updated
      during upgrade-activate step.
      Check if power-metrics was auto updated after upgrade-complete
      step.

Story: 2010929
Task: 49097

Change-Id: I46f7cb6ebc59ad49157e9044a4937a406313671e
Signed-off-by: Igor Soares <Igor.PiresSoares@windriver.com>
Igor Soares 2023-11-15 16:24:13 -03:00
parent ab469de093
commit ea00765271
9 changed files with 627 additions and 99 deletions

View File

@@ -137,8 +137,12 @@ if [ "$ACTION" == "activate" ]; then
sleep $RECOVER_RESULT_SLEEP
done
# Sort applications by version. Lower versions are attempted first.
APPS_SORTED_BY_VERSION=$(find $PLATFORM_APPLICATION_PATH/* | sort -V)
LAST_APP_CHECKED=""
# Get the list of applications installed in the new release
for fqpn_app in $PLATFORM_APPLICATION_PATH/*; do
for fqpn_app in $APPS_SORTED_BY_VERSION; do
# Extract the app name and version from the tarball name: app_name-version.tgz
re='^(.*)-([0-9]+\.[0-9]+-[0-9]+).tgz'
[[ "$(basename $fqpn_app)" =~ $re ]]
@@ -146,6 +150,18 @@ if [ "$ACTION" == "activate" ]; then
UPGRADE_APP_VERSION=${BASH_REMATCH[2]}
log "$NAME: Found application ${UPGRADE_APP_NAME}, version ${UPGRADE_APP_VERSION} at $fqpn_app"
# Confirm application is loaded.
EXISTING_APP_NAME=$(system application-show $UPGRADE_APP_NAME --column name --format value)
if [ -z "${EXISTING_APP_NAME}" ]; then
log "$NAME: ${UPGRADE_APP_NAME} is currently not uploaded in the system. skipping..."
continue
fi
# If the last iteration for the same app was successful no further updates are necessary
if [ "${LAST_APP_CHECKED}" == "${UPGRADE_APP_NAME}" ] && [[ "${EXISTING_APP_STATUS}" =~ ^(uploaded|applied)$ ]]; then
continue
fi
# Confirm application is upgradable
# TODO: move nginx back to the supported platform applications list when
# fluxcd application upgrade is supported
@@ -156,13 +172,6 @@ if [ "$ACTION" == "activate" ]; then
continue
fi
# Confirm application is loaded.
EXISTING_APP_NAME=$(system application-show $UPGRADE_APP_NAME --column name --format value)
if [ -z "${EXISTING_APP_NAME}" ]; then
log "$NAME: ${UPGRADE_APP_NAME} is currently not uploaded in the system. skipping..."
continue
fi
# Get the existing application details
EXISTING_APP_INFO=$(system application-show $EXISTING_APP_NAME --column app_version --column status --format yaml)
EXISTING_APP_VERSION=$(echo ${EXISTING_APP_INFO} | sed 's/.*app_version:[[:space:]]\(\S*\).*/\1/')
@@ -174,7 +183,7 @@ if [ "$ACTION" == "activate" ]; then
# If the app is in uploaded or applied state, then we continue with next iteration.
# Else, the code execution proceeds and the script would exit with an unexpected state.
if [[ "${EXISTING_APP_STATUS}" =~ ^(uploaded|applied)$ ]]; then
log "$NAME: ${UPGRADE_APP_NAME}, version ${EXISTING_APP_VERSION}, is already present. skipping..."
log "$NAME: ${UPGRADE_APP_NAME}, version ${EXISTING_APP_VERSION}, is already present. Skipping..."
continue
fi
fi
@@ -242,6 +251,8 @@ if [ "$ACTION" == "activate" ]; then
if ! grep -q "${EXISTING_APP_NAME},${EXISTING_APP_VERSION},${UPGRADE_APP_VERSION}" $UPGRADE_IN_PROGRESS_APPS_FILE; then
echo "${EXISTING_APP_NAME},${EXISTING_APP_VERSION},${UPGRADE_APP_VERSION}" >> $UPGRADE_IN_PROGRESS_APPS_FILE
fi
LAST_APP_CHECKED=${UPGRADE_APP_NAME}
done
log "$NAME: Completed Kubernetes application updates for release $FROM_RELEASE to $TO_RELEASE with action $ACTION"

View File

@@ -10,16 +10,18 @@
import io
import glob
import os
import ruamel.yaml
import shutil
import six
import tarfile
import tempfile
import yaml
from oslo_log import log as logging
from sysinv._i18n import _
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import kubernetes
from sysinv.common import utils
LOG = logging.getLogger(__name__)
@@ -562,3 +564,78 @@ def verify_application(path: str) -> bool:
"metadata verification failed. {}".format(e)))
return is_verified
def extract_bundle_metadata(file_path):
"""Extract metadata from a given tarball
:param file_path: Application bundle file path
"""
try:
tarball = tarfile.open(file_path)
metadata_yaml_path = "./{}".format(constants.APP_METADATA_FILE)
tarball.getmember(metadata_yaml_path)
with tarball.extractfile(metadata_yaml_path) as metadata_file:
metadata = ruamel.yaml.load(metadata_file,
Loader=ruamel.yaml.RoundTripLoader,
preserve_quotes=True)
minimum_supported_k8s_version = metadata.get(
constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get(
constants.APP_METADATA_MINIMUM, None)
if minimum_supported_k8s_version is None:
# TODO(ipiresso): Turn this into an error message rather than
# a warning when the k8s app upgrade implementation is in place
# and remove the hardcoded value. Also, do not add the bundle to
# the database in this scenario.
LOG.warning("Minimum supported Kubernetes version missing from {}"
.format(file_path))
minimum_supported_k8s_version = kubernetes.get_kube_versions()[0]['version']
minimum_supported_k8s_version = minimum_supported_k8s_version.strip().lstrip('v')
maximum_supported_k8s_version = metadata.get(
constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get(
constants.APP_METADATA_MAXIMUM, None)
if maximum_supported_k8s_version is not None:
maximum_supported_k8s_version = maximum_supported_k8s_version.strip().lstrip('v')
k8s_upgrades = metadata.get(constants.APP_METADATA_K8S_UPGRADES, None)
if k8s_upgrades is None:
k8s_auto_update = constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE
k8s_update_timing = constants.APP_METADATA_TIMING_DEFAULT_VALUE
LOG.warning("k8s_upgrades section missing from {} metadata"
.format(file_path))
else:
k8s_auto_update = metadata.get(
constants.APP_METADATA_K8S_UPGRADES).get(
constants.APP_METADATA_AUTO_UPDATE,
constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE)
k8s_update_timing = metadata.get(
constants.APP_METADATA_K8S_UPGRADES).get(
constants.APP_METADATA_TIMING,
constants.APP_METADATA_TIMING_DEFAULT_VALUE)
bundle_data = {
'name': metadata.get(constants.APP_METADATA_NAME),
'version': metadata.get(constants.APP_METADATA_VERSION),
'file_path': file_path,
'auto_update':
metadata.get(constants.APP_METADATA_UPGRADES, {}).get(
constants.APP_METADATA_AUTO_UPDATE,
constants.APP_METADATA_AUTO_UPDATE_DEFAULT_VALUE),
'k8s_auto_update': k8s_auto_update,
'k8s_timing': k8s_update_timing,
'k8s_minimum_version': minimum_supported_k8s_version,
'k8s_maximum_version': maximum_supported_k8s_version
}
return bundle_data
except KeyError:
LOG.warning("Application bundle {} does not contain a metadata file.".format(file_path))
except Exception as e:
LOG.exception(e)

View File

@@ -1813,7 +1813,8 @@ APP_PENDING_REAPPLY_FLAG = os.path.join(
# FluxCD
APP_FLUXCD_MANIFEST_DIR = 'fluxcd-manifests'
APP_FLUXCD_DATA_PATH = os.path.join(tsc.PLATFORM_PATH, 'fluxcd', tsc.SW_VERSION)
APP_FLUXCD_BASE_PATH = os.path.join(tsc.PLATFORM_PATH, 'fluxcd')
APP_FLUXCD_DATA_PATH = os.path.join(APP_FLUXCD_BASE_PATH, tsc.SW_VERSION)
APP_ROOT_KUSTOMIZE_FILE = 'kustomization.yaml'
APP_HELMREPOSITORY_FILE = "helmrepository.yaml"
APP_BASE_HELMREPOSITORY_FILE = os.path.join("base", APP_HELMREPOSITORY_FILE)
@@ -2397,6 +2398,13 @@ OS_DEBIAN = 'debian'
SUPPORTED_OS_TYPES = [OS_CENTOS, OS_DEBIAN]
OS_UPGRADE_FEED_FOLDER = '/var/www/pages/feed/'
# OSTree
OSTREE_ROOT_FOLDER = '/sysroot/ostree/'
OSTREE_LOCK_FILE = 'lock'
# INotify
INOTIFY_DELETE_EVENT = 'DELETE'
# Configuration support placeholders
CONFIGURABLE = 'configurable'
NOT_CONFIGURABLE = 'not-configurable'

View File

@@ -0,0 +1,274 @@
# Based on inotify_simple: https://github.com/chrisjbillington/inotify_simple
# Licensed under the BSD 2-Clause "Simplified" License
#
# Copyright (c) 2016, Chris Billington
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Copyright (c) 2024 Wind River Systems, Inc.
#
import errno
import os
from collections import namedtuple
from ctypes import CDLL
from ctypes import get_errno
from ctypes import c_int
from ctypes.util import find_library
from enum import IntEnum
from errno import EINTR
from fcntl import ioctl
from io import FileIO
from os import fsencode
from os import fsdecode
from select import select
from struct import calcsize
from struct import unpack_from
from termios import FIONREAD
from time import sleep
__version__ = '1.3.5'
__all__ = ['Event', 'INotify', 'flags', 'masks', 'parse_events']
_libc = None
def _libc_call(function, *args):
"""Wrapper which raises errors and retries on EINTR."""
while True:
rc = function(*args)
if rc != -1:
return rc
err = get_errno()
if err != EINTR:
raise OSError(err, os.strerror(err))
#: A ``namedtuple`` (wd, mask, cookie, name) for an inotify event. The
#: :attr:`~inotify_simple.Event.name` field is a ``str`` decoded with
#: ``os.fsdecode()``.
Event = namedtuple('Event', ['wd', 'mask', 'cookie', 'name'])
_EVENT_FMT = 'iIII'
_EVENT_SIZE = calcsize(_EVENT_FMT)
class INotify(FileIO):
#: The inotify file descriptor returned by ``inotify_init()``. You are
#: free to use it directly with ``os.read`` if you'd prefer not to call
#: :func:`~inotify_simple.INotify.read` for some reason. Also available as
#: :func:`~inotify_simple.INotify.fileno`
fd = property(FileIO.fileno)
def __init__(self, inheritable=False, nonblocking=False):
"""File-like object wrapping ``inotify_init1()``. Raises ``OSError`` on failure.
:func:`~inotify_simple.INotify.close` should be called when no longer needed.
Can be used as a context manager to ensure it is closed, and can be used
directly by functions expecting a file-like object, such as ``select``, or with
functions expecting a file descriptor via
:func:`~inotify_simple.INotify.fileno`.
Args:
inheritable (bool): whether the inotify file descriptor will be inherited by
child processes. The default, ``False``, corresponds to passing the
``IN_CLOEXEC`` flag to ``inotify_init1()``. Setting this flag when
opening file descriptors is the default behaviour of Python standard
library functions since PEP 446. On Python < 3.3, the file descriptor
will be inheritable and this argument has no effect, one must instead
use fcntl to set FD_CLOEXEC to make it non-inheritable.
nonblocking (bool): whether to open the inotify file descriptor in
nonblocking mode, corresponding to passing the ``IN_NONBLOCK`` flag to
``inotify_init1()``. This does not affect the normal behaviour of
:func:`~inotify_simple.INotify.read`, which uses ``poll()`` to control
blocking behaviour according to the given timeout, but will cause other
reads of the file descriptor (for example if the application reads data
manually with ``os.read(fd)``) to raise ``BlockingIOError`` if no data
is available."""
try:
libc_so = find_library('c')
except RuntimeError: # Python on Synology NASs raises a RuntimeError
libc_so = None
global _libc
_libc = _libc or CDLL(libc_so or 'libc.so.6', use_errno=True)
O_CLOEXEC = getattr(os, 'O_CLOEXEC', 0) # Only defined in Python 3.3+
flags = (not inheritable) * O_CLOEXEC | bool(nonblocking) * os.O_NONBLOCK
FileIO.__init__(self, _libc_call(_libc.inotify_init1, flags), mode='rb')
def add_watch(self, path, mask):
"""Wrapper around ``inotify_add_watch()``. Returns the watch
descriptor or raises an ``OSError`` on failure.
Args:
path (str, bytes, or PathLike): The path to watch. Will be encoded with
``os.fsencode()`` before being passed to ``inotify_add_watch()``.
mask (int): The mask of events to watch for. Can be constructed by
bitwise-ORing :class:`~inotify_simple.flags` together.
Returns:
int: watch descriptor"""
# Explicit conversion of Path to str required on Python < 3.6
path = str(path) if hasattr(path, 'parts') else path
return _libc_call(_libc.inotify_add_watch, self.fileno(), fsencode(path), mask)
def rm_watch(self, wd):
"""Wrapper around ``inotify_rm_watch()``. Raises ``OSError`` on failure.
Args:
wd (int): The watch descriptor to remove"""
_libc_call(_libc.inotify_rm_watch, self.fileno(), wd)
def poll(self):
"""Wait for I/O completion"""
while True:
try:
read_fs, _, _ = select([self.fileno()], [], [])
break
except OSError as err:
# Retry if select() was interrupted by a signal
if err.errno == errno.EINTR:
continue
raise
return bool(read_fs)
def read(self, timeout=None, read_delay=None):
"""Read the inotify file descriptor and return the resulting
:attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name).
Args:
timeout (int): The time in milliseconds to wait for events if there are
none. If negative or ``None``, block until there are events. If zero,
return immediately if there are no events to be read.
read_delay (int): If there are no events immediately available for reading,
then this is the time in milliseconds to wait after the first event
arrives before reading the file descriptor. This allows further events
to accumulate before reading, which allows the kernel to coalesce like
events and can decrease the number of events the application needs to
process. However, this also increases the risk that the event queue will
overflow due to not being emptied fast enough.
Returns:
generator: generator producing :attr:`~inotify_simple.Event` namedtuples
.. warning::
If the same inotify file descriptor is being read by multiple threads
simultaneously, this method may attempt to read the file descriptor when no
data is available. It may return zero events, or block until more events
arrive (regardless of the requested timeout), or in the case that the
:func:`~inotify_simple.INotify` object was instantiated with
``nonblocking=True``, raise ``BlockingIOError``.
"""
data = self._readall()
if not data and timeout != 0 and self.poll():
if read_delay is not None:
sleep(read_delay / 1000.0)
data = self._readall()
return parse_events(data)
def _readall(self):
bytes_avail = c_int()
ioctl(self, FIONREAD, bytes_avail)
if not bytes_avail.value:
return b''
return os.read(self.fileno(), bytes_avail.value)
def parse_events(data):
"""Unpack data read from an inotify file descriptor into
:attr:`~inotify_simple.Event` namedtuples (wd, mask, cookie, name). This function
can be used if the application has read raw data from the inotify file
descriptor rather than calling :func:`~inotify_simple.INotify.read`.
Args:
data (bytes): A bytestring as read from an inotify file descriptor.
Returns:
list: list of :attr:`~inotify_simple.Event` namedtuples"""
pos = 0
events = []
while pos < len(data):
wd, mask, cookie, namesize = unpack_from(_EVENT_FMT, data, pos)
pos += _EVENT_SIZE + namesize
name = data[pos - namesize: pos].split(b'\x00', 1)[0]
events.append(Event(wd, mask, cookie, fsdecode(name)))
return events
class flags(IntEnum):
"""Inotify flags as defined in ``inotify.h`` but with ``IN_`` prefix omitted.
Includes a convenience method :func:`~inotify_simple.flags.from_mask` for extracting
flags from a mask."""
ACCESS = 0x00000001 #: File was accessed
MODIFY = 0x00000002 #: File was modified
ATTRIB = 0x00000004 #: Metadata changed
CLOSE_WRITE = 0x00000008 #: Writable file was closed
CLOSE_NOWRITE = 0x00000010 #: Unwritable file closed
OPEN = 0x00000020 #: File was opened
MOVED_FROM = 0x00000040 #: File was moved from X
MOVED_TO = 0x00000080 #: File was moved to Y
CREATE = 0x00000100 #: Subfile was created
DELETE = 0x00000200 #: Subfile was deleted
DELETE_SELF = 0x00000400 #: Self was deleted
MOVE_SELF = 0x00000800 #: Self was moved
UNMOUNT = 0x00002000 #: Backing fs was unmounted
Q_OVERFLOW = 0x00004000 #: Event queue overflowed
IGNORED = 0x00008000 #: File was ignored
ONLYDIR = 0x01000000 #: only watch the path if it is a directory
DONT_FOLLOW = 0x02000000 #: don't follow a sym link
EXCL_UNLINK = 0x04000000 #: exclude events on unlinked objects
MASK_ADD = 0x20000000 #: add to the mask of an already existing watch
ISDIR = 0x40000000 #: event occurred against dir
ONESHOT = 0x80000000 #: only send event once
@classmethod
def from_mask(cls, mask):
"""Convenience method that returns a list of every flag in a mask."""
return [flag for flag in cls.__members__.values() if flag & mask]
class masks(IntEnum):
"""Convenience masks as defined in ``inotify.h`` but with ``IN_`` prefix omitted."""
#: helper event mask equal to ``flags.CLOSE_WRITE | flags.CLOSE_NOWRITE``
CLOSE = flags.CLOSE_WRITE | flags.CLOSE_NOWRITE
#: helper event mask equal to ``flags.MOVED_FROM | flags.MOVED_TO``
MOVE = flags.MOVED_FROM | flags.MOVED_TO
#: bitwise-OR of all the events that can be passed to
#: :func:`~inotify_simple.INotify.add_watch`
ALL_EVENTS = (flags.ACCESS | flags.MODIFY | flags.ATTRIB | flags.CLOSE_WRITE |
flags.CLOSE_NOWRITE | flags.OPEN | flags.MOVED_FROM | flags.MOVED_TO |
flags.CREATE | flags.DELETE | flags.DELETE_SELF | flags.MOVE_SELF)
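For context, a minimal Linux-only sketch of the raw inotify mechanism the
wrapper above builds on, using ctypes calls directly rather than the
`INotify` class (the event struct layout matches `_EVENT_FMT`; the watched
directory and filename are illustrative):

```python
import ctypes
import ctypes.util
import os
import struct
import tempfile

# Linux-only: load libc and issue the same syscalls the wrapper uses.
libc = ctypes.CDLL(ctypes.util.find_library("c") or "libc.so.6", use_errno=True)
IN_CREATE = 0x00000100  # matches flags.CREATE above
EVENT_FMT = "iIII"      # wd, mask, cookie, name length

fd = libc.inotify_init1(0)
with tempfile.TemporaryDirectory() as watched:
    libc.inotify_add_watch(fd, watched.encode(), IN_CREATE)
    # Creating a file inside the watched directory queues an IN_CREATE event.
    open(os.path.join(watched, "demo.tgz"), "w").close()
    data = os.read(fd, 4096)
os.close(fd)

wd, mask, cookie, namelen = struct.unpack_from(EVENT_FMT, data, 0)
offset = struct.calcsize(EVENT_FMT)
# The name field is NUL-padded to namelen bytes.
name = data[offset:offset + namelen].split(b"\x00", 1)[0].decode()
print(name)
```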

View File

@@ -3612,3 +3612,15 @@ def checkout_ostree(ostree_repo, commit, target_dir, subpath):
raise exception.SysinvException(
"Error checkout ostree commit: %s" % (error),
)
def is_bundle_extension_valid(filename):
"""Check if application bundles have the correct extension
:param filename: Bundle filename
:return: Returns True if the extension is correct.
Otherwise returns False.
"""
file_extension = pathlib.Path(filename).suffix
return file_extension.lower() == ".tgz"

View File

@@ -29,6 +29,7 @@ collection of inventory data for each host.
"""
from enum import Enum
import errno
import filecmp
import glob
@@ -52,7 +53,6 @@ import xml.etree.ElementTree as ElementTree
from contextlib import contextmanager
from datetime import datetime
from datetime import timedelta
from distutils.util import strtobool
from distutils.version import LooseVersion
from copy import deepcopy
@@ -93,6 +93,7 @@ from sysinv.api.controllers.v1 import kube_app as kube_api
from sysinv.api.controllers.v1 import mtce_api
from sysinv.api.controllers.v1 import utils
from sysinv.api.controllers.v1 import vim_api
from sysinv.common import app_metadata
from sysinv.common import fpga_constants
from sysinv.common import constants
from sysinv.common import ceph as cceph
@@ -108,6 +109,8 @@ from sysinv.common import kubernetes
from sysinv.common import retrying
from sysinv.common import service
from sysinv.common import utils as cutils
from sysinv.common.inotify import flags
from sysinv.common.inotify import INotify
from sysinv.common.retrying import retry
from sysinv.common.storage_backend_conf import StorageBackendConfig
from cephclient import wrapper as ceph
@@ -232,6 +235,63 @@ AppTarBall = namedtuple(
"tarball_name app_name app_version manifest_name manifest_file metadata")
class KubeAppBundleStorageType(Enum):
DATABASE = 1
class KubeAppBundleStorageFactory(object):
"""Factory class that aims to abstract calls to storage operations when
handling application bundle metadata.
This allows supporting a database implementation going forward and an
in-memory implementation for patchback scenarios if needed.
"""
@staticmethod
def createKubeAppBundleStorage(storage_type=KubeAppBundleStorageType.DATABASE):
"""Factory Method
:param storage_type: Storage type used to house the metadata
"""
if storage_type == KubeAppBundleStorageType.DATABASE:
return KubeAppBundleDatabase()
class KubeAppBundleDatabase(KubeAppBundleStorageFactory):
"""Database implementation to store application bundle metadata."""
def __init__(self):
self.dbapi = dbapi.get_instance()
def create(self, bundle_data):
"""Add a bundle to the database."""
self.dbapi.kube_app_bundle_create(bundle_data)
def create_all(self, bundle_bulk_data):
"""Insert a list of bundles to the database."""
self.dbapi.kube_app_bundle_create_all(bundle_bulk_data)
def is_empty(self):
"""Check if the table is empty."""
return self.dbapi.kube_app_bundle_is_empty()
def get_all(self):
"""Get a list containing all bundles."""
return self.dbapi.kube_app_bundle_get_all()
def get_by_name(self, app_name):
"""Get a list of bundles by their name."""
return self.dbapi.kube_app_bundle_get_by_name(app_name)
def destroy_all(self):
"""Prune all bundle metadata."""
self.dbapi.kube_app_bundle_destroy_all()
def destroy_by_file_path(self, file_path):
"""Delete bundle with a given file path."""
self.dbapi.kube_app_bundle_destroy_by_file_path(file_path)
class ConductorManager(service.PeriodicService):
"""Sysinv Conductor service main class."""
@@ -272,12 +332,17 @@ class ConductorManager(service.PeriodicService):
endpoint='http://localhost:{}'.format(constants.CEPH_MGR_PORT))
self._kube = None
self._fernet = None
self._inotify = None
self._openstack = None
self._api_token = None
self._mtc_address = constants.LOCALHOST_HOSTNAME
self._mtc_port = 2112
# Store and track available application bundles
self._kube_app_bundle_storage = None
self._cached_app_bundle_set = set()
# Timeouts for adding & removing operations
self._pv_op_timeouts = {}
self._stor_bck_op_timeouts = {}
@@ -358,6 +423,7 @@ class ConductorManager(service.PeriodicService):
self.fm_log = fm.FmCustomerLog()
self.host_uuid = self._get_active_controller_uuid()
self._kube_app_bundle_storage = KubeAppBundleStorageFactory.createKubeAppBundleStorage()
self._openstack = openstack.OpenStackOperator(self.dbapi)
self._puppet = puppet.PuppetOperator(self.dbapi)
@@ -397,12 +463,36 @@ class ConductorManager(service.PeriodicService):
# Runtime config tasks
self._prune_runtime_config_table()
# Populate/update app bundle table as needed
if self._kube_app_bundle_storage.is_empty():
self._populate_app_bundle_metadata()
else:
self._update_cached_app_bundles_set()
self._update_app_bundles_storage()
# Initialize inotify and launch thread to monitor
# changes to the ostree root folder
self._initialize_ostree_inotify()
greenthread.spawn(self._monitor_ostree_root_folder)
LOG.info("sysinv-conductor start committed system=%s" %
system.as_dict())
# Save our start time for time limited init actions
self._start_time = timeutils.utcnow()
def _initialize_ostree_inotify(self):
""" Initialize inotify to watch for changes under the ostree root
folder.
Created or removed files under that folder suggest that a patch
was applied and a new ostree commit was deployed.
"""
self._inotify = INotify()
watch_flags = flags.CREATE | flags.DELETE
self._inotify.add_watch(constants.OSTREE_ROOT_FOLDER, watch_flags)
def _get_active_controller_uuid(self):
ahost = utils.HostHelper.get_active_controller(self.dbapi)
if ahost:
@@ -7211,67 +7301,53 @@ class ConductorManager(service.PeriodicService):
self._inner_sync_auto_apply(context, app_name)
@staticmethod
def check_app_k8s_auto_update(app_name, tarball):
""" Check whether an application should be automatically updated
based on its Kubernetes upgrade metadata fields.
def _get_app_bundle_for_update(self, app):
""" Retrieve metadata from the most updated application bundle
that can be used to update the given app.
:param tarball: tarball object of the application to be checked
:param app: The application to be updated
:return: The bundle metadata from the new version of the app
"""
minimum_supported_k8s_version = tarball.metadata.get(
constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get(
constants.APP_METADATA_MINIMUM, None)
bundle_metadata_list = self._kube_app_bundle_storage.get_by_name(app.name)
if minimum_supported_k8s_version is None:
# TODO: Turn this into an error message rather than a warning
# when the k8s app upgrade implementation is in place. Also,
# return False in this scenario.
LOG.warning("Minimum supported Kubernetes version missing from "
"{} metadata".format(app_name))
else:
LOG.debug("minimum_supported_k8s_version for {}: {}"
.format(app_name, minimum_supported_k8s_version))
latest_version_bundle = None
k8s_version = self._kube.kube_get_kubernetes_version().strip().lstrip('v')
for bundle_metadata in bundle_metadata_list:
if LooseVersion(bundle_metadata.version) <= LooseVersion(app.app_version):
LOG.debug("Bundle {} version {} lower than installed app version ({})"
.format(bundle_metadata.file_path,
bundle_metadata.version,
app.app_version))
elif not bundle_metadata.auto_update:
LOG.debug("Application auto update disabled for bundle {}"
.format(bundle_metadata.file_path))
elif not bundle_metadata.k8s_auto_update:
LOG.debug("Kubernetes application auto update disabled for bundle {}"
.format(bundle_metadata.file_path))
elif LooseVersion(k8s_version) < LooseVersion(bundle_metadata.k8s_minimum_version):
LOG.debug("Kubernetes version {} is lower than {} which is "
"the minimum required for bundle {}"
.format(k8s_version,
bundle_metadata.k8s_minimum_version,
bundle_metadata.file_path))
elif ((bundle_metadata.k8s_maximum_version is not None) and (LooseVersion(k8s_version) >
LooseVersion(bundle_metadata.k8s_maximum_version))):
LOG.debug("Kubernetes version {} is higher than {} which is "
"the maximum allowed for bundle {}"
.format(k8s_version,
bundle_metadata.k8s_maximum_version,
bundle_metadata.file_path))
elif ((latest_version_bundle is None) or
(LooseVersion(bundle_metadata.version) >
LooseVersion(latest_version_bundle.version))):
# Only set the chosen bundle if it was not set before or if the version
# of the current one is higher than the one previously set.
latest_version_bundle = bundle_metadata
maximum_supported_k8s_version = tarball.metadata.get(
constants.APP_METADATA_SUPPORTED_K8S_VERSION, {}).get(
constants.APP_METADATA_MAXIMUM, None)
return latest_version_bundle
if maximum_supported_k8s_version:
LOG.debug("maximum_supported_k8s_version for {}: {}"
.format(app_name, maximum_supported_k8s_version))
k8s_upgrades = tarball.metadata.get(
constants.APP_METADATA_K8S_UPGRADES, None)
if k8s_upgrades is None:
k8s_auto_update = constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE
k8s_update_timing = constants.APP_METADATA_TIMING_DEFAULT_VALUE
LOG.warning("k8s_upgrades section missing from {} metadata"
.format(app_name))
else:
k8s_auto_update = tarball.metadata.get(
constants.APP_METADATA_K8S_UPGRADES).get(
constants.APP_METADATA_AUTO_UPDATE,
constants.APP_METADATA_K8S_AUTO_UPDATE_DEFAULT_VALUE)
k8s_update_timing = tarball.metadata.get(
constants.APP_METADATA_K8S_UPGRADES).get(
constants.APP_METADATA_TIMING,
constants.APP_METADATA_TIMING_DEFAULT_VALUE)
# TODO: check if the application meets the criteria to be updated
# according to the 'supported_k8s_version' and 'k8s_upgrades'
# metadata sections. This initial implementation is only intended to
# set the default values for each entry.
LOG.debug("k8s_auto_update value for {}: {}"
.format(app_name, k8s_auto_update))
LOG.debug("k8s_update_timing value for {}: {}"
.format(app_name, k8s_update_timing))
return True
def _auto_update_app(self, context, app_name, managed_app):
def _auto_update_app(self, context, app_name):
"""Auto update applications"""
try:
app = kubeapp_obj.get_by_name(context, app_name)
@@ -7314,19 +7390,15 @@ class ConductorManager(service.PeriodicService):
LOG.debug("Application %s: Checking "
"for update ..." % app_name)
tarfile = self._search_tarfile(app_name, managed_app=managed_app)
if tarfile is None:
# Skip if no tarball or multiple tarballs found
return
applied_app = '{}-{}'.format(app.name, app.app_version)
if applied_app in tarfile:
# Skip if the tarfile version is already applied
app_bundle = self._get_app_bundle_for_update(app)
if app_bundle is None:
# Skip if no bundles are found
LOG.debug("No bundle found for updating %s" % app_name)
return
LOG.info("Found new tarfile version for %s: %s"
% (app.name, tarfile))
tarball = self._check_tarfile(app_name, tarfile,
% (app.name, app_bundle.file_path))
tarball = self._check_tarfile(app_name, app_bundle.file_path,
preserve_metadata=True)
if ((tarball.app_name is None) or
(tarball.app_version is None) or
@@ -7335,18 +7407,7 @@ class ConductorManager(service.PeriodicService):
             # Skip if tarball check fails
             return

-        if not tarball.metadata:
-            # Skip if app doesn't have metadata
-            return
-
-        auto_update = tarball.metadata.get(
-            constants.APP_METADATA_UPGRADES, {}).get(
-                constants.APP_METADATA_AUTO_UPDATE, False)
-        if not bool(strtobool(str(auto_update))):
-            # Skip if app is not set to auto_update
-            return
-
-        if tarball.app_version in \
+        if app_bundle.version in \
                 app.app_metadata.get(
                     constants.APP_METADATA_UPGRADES, {}).get(
                         constants.APP_METADATA_FAILED_VERSIONS, []):
@@ -7357,11 +7418,6 @@ class ConductorManager(service.PeriodicService):
                  % (app.name, tarball.app_version, app.app_version))
             return
-        # Check if the update should proceed based on the application's
-        # Kubernetes metadata
-        if not ConductorManager.check_app_k8s_auto_update(app_name, tarball):
-            return
-
         self._inner_sync_auto_update(context, app, tarball)
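For review context, the selection policy behind `_get_app_bundle_for_update` (keep bundles whose supported Kubernetes range includes the running version, then prefer the highest application version, per the commit message) can be sketched stand-alone. The `Bundle` fields and helper names here are assumptions for illustration, not the actual database model:

```python
# Hedged sketch of the bundle-selection policy described in the commit
# message. Bundle objects and field names are illustrative assumptions.
import re
from collections import namedtuple

Bundle = namedtuple("Bundle", "version k8s_minimum_version k8s_maximum_version")

def parse_version(version):
    # Normalize "v1.24.4" or "1.0-1" into a comparable integer tuple
    return tuple(int(part) for part in re.split(r"[.-]", version.lstrip("v")))

def select_bundle(bundles, running_k8s_version):
    """Pick the bundle to use for an auto update, or None if none fit."""
    current = parse_version(running_k8s_version)
    candidates = [
        b for b in bundles
        if parse_version(b.k8s_minimum_version) <= current
        <= parse_version(b.k8s_maximum_version)
    ]
    if not candidates:
        return None
    # When several bundles fit, the highest application version wins
    return max(candidates, key=lambda b: parse_version(b.version))
```

The same ordering, inverted to prefer the lowest compatible version, would match the prioritization 65-k8s-app-upgrade.sh applies during platform upgrade activation.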
     @cutils.synchronized(LOCK_APP_AUTO_MANAGE)
@@ -7541,14 +7597,14 @@ class ConductorManager(service.PeriodicService):
         """ Load metadata of apps from the directory containing
        apps bundled with the iso.
        """
-        for tarfile in os.listdir(constants.HELM_APP_ISO_INSTALL_PATH):
+        for app_bundle in os.listdir(constants.HELM_APP_ISO_INSTALL_PATH):
             # Get the app name from the tarball name
             # If the app has the metadata loaded already, by conductor restart,
             # then skip the tarball extraction
             app_name = None
             pattern = re.compile("^(.*)-([0-9]+\.[0-9]+-[0-9]+)")
-            match = pattern.search(tarfile)
+            match = pattern.search(app_bundle)
             if match:
                 app_name = match.group(1)
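A quick, self-contained illustration of the filename pattern used above: group 1 captures the application name and group 2 the `<major>.<minor>-<patch>` version (the sample filename is made up):

```python
# Demonstrates how the tarball-name regex splits name from version.
import re

pattern = re.compile(r"^(.*)-([0-9]+\.[0-9]+-[0-9]+)")
# Greedy (.*) backtracks until the remainder matches the version shape,
# so dashes inside the app name are handled correctly.
match = pattern.search("metrics-server-1.0-5.tgz")
name, version = match.group(1), match.group(2)
```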
@@ -7560,7 +7616,7 @@ class ConductorManager(service.PeriodicService):
             # Proceed with extracting the tarball
             tarball_name = '{}/{}'.format(
-                constants.HELM_APP_ISO_INSTALL_PATH, tarfile)
+                constants.HELM_APP_ISO_INSTALL_PATH, app_bundle)
             with cutils.TempDirectory() as app_path:
                 if not cutils.extract_tarfile(app_path, tarball_name):
@@ -7733,6 +7789,90 @@ class ConductorManager(service.PeriodicService):
         # No need to detect again until conductor restart
         self._do_detect_swact = False
+    def _populate_app_bundle_metadata(self):
+        """Read metadata of all application bundles and store in the database"""
+        bundle_list = []
+        for file_path in glob.glob("{}/*.tgz".format(constants.HELM_APP_ISO_INSTALL_PATH)):
+            bundle_data = app_metadata.extract_bundle_metadata(file_path)
+            if bundle_data:
+                bundle_list.append(bundle_data)
+
+        self._kube_app_bundle_storage.create_all(bundle_list)
+        self._update_cached_app_bundles_set()
+    def _add_app_bundle(self, full_bundle_path):
+        """Add a new application bundle record"""
+        bundle_data = app_metadata.extract_bundle_metadata(full_bundle_path)
+        if bundle_data:
+            LOG.info("New application bundle available: {}".format(full_bundle_path))
+            try:
+                self._kube_app_bundle_storage.create(bundle_data)
+            except exception.KubeAppBundleAlreadyExists as e:
+                LOG.exception(e)
+            except Exception as e:
+                LOG.exception("Error while storing bundle data for {}: {}"
+                              .format(full_bundle_path, e))
+    def _remove_app_bundle(self, full_bundle_path):
+        """Remove application bundle record"""
+        LOG.info("Application bundle deleted: {}".format(full_bundle_path))
+        try:
+            self._kube_app_bundle_storage.destroy_by_file_path(full_bundle_path)
+        except Exception as e:
+            LOG.error("Error while removing bundle data for {}: {}"
+                      .format(full_bundle_path, e))
+    def _update_cached_app_bundles_set(self):
+        """Update internal cache of application bundles"""
+        self._cached_app_bundle_set = set(bundle.file_path for bundle in
+                                          self._kube_app_bundle_storage.get_all())
+    def _update_app_bundles_storage(self):
+        """Update application bundle storage to account for new and removed files"""
+        filesystem_app_bundle_set = set(glob.glob("{}/*.tgz"
+            .format(constants.HELM_APP_ISO_INSTALL_PATH)))
+
+        if filesystem_app_bundle_set != self._cached_app_bundle_set:
+            new_files = set(file_path for file_path in filesystem_app_bundle_set
+                            if file_path not in self._cached_app_bundle_set)
+            # Add new files to the database
+            for file_path in new_files:
+                self._add_app_bundle(file_path)
+
+            # Delete removed files from the database
+            for file_path in self._cached_app_bundle_set:
+                if file_path not in filesystem_app_bundle_set:
+                    self._remove_app_bundle(file_path)
+
+            # Update internal bundle set to reflect the storage
+            self._update_cached_app_bundles_set()
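The reconciliation in `_update_app_bundles_storage` reduces to two set differences between the on-disk bundle paths and the cached ones. A stand-alone sketch, with a hypothetical helper name:

```python
# Minimal model of the reconciliation step: compare the on-disk set
# against the cached set and derive which records to add and remove.

def diff_bundle_sets(on_disk, cached):
    """Return (to_add, to_remove) as sets of bundle file paths."""
    to_add = on_disk - cached       # new tarballs to insert into the DB
    to_remove = cached - on_disk    # stale DB records whose files are gone
    return to_add, to_remove
```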
+    def _monitor_ostree_root_folder(self):
+        """Watch the ostree root folder and react to newly deployed commits"""
+        if self._inotify is None:
+            LOG.error("Inotify has not been initialized.")
+            return
+
+        while True:
+            for event in self._inotify.read(timeout=0):
+                event_types = [f.name for f in flags.from_mask(event.mask)]
+                LOG.debug("Event {}. Event types: {}".format(event, event_types))
+                # If the "lock" file was deleted inside the ostree root it means
+                # that a new ostree has finished being deployed. Therefore we may
+                # need to update the list of available application bundles.
+                if constants.INOTIFY_DELETE_EVENT in event_types and \
+                        event.name == constants.OSTREE_LOCK_FILE:
+                    self._update_app_bundles_storage()
+            time.sleep(1)
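The trigger condition in the loop above can be modeled in isolation: refresh only when a DELETE event names the ostree lock file. The event shape and constant values below are stand-ins for the real inotify event objects and sysinv constants:

```python
# Stand-alone model of the event filter in _monitor_ostree_root_folder.
from collections import namedtuple

Event = namedtuple("Event", "name types")

OSTREE_LOCK_FILE = "lock"          # assumed lock file name
INOTIFY_DELETE_EVENT = "DELETE"    # assumed event-type label

def should_refresh_bundles(event):
    """A new ostree deployment finished when the lock file is deleted."""
    return INOTIFY_DELETE_EVENT in event.types and \
        event.name == OSTREE_LOCK_FILE
```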
     @periodic_task.periodic_task(spacing=CONF.conductor_periodic_task_intervals.k8s_application,
                                  run_immediately=True)
     def _k8s_application_audit(self, context):
@@ -7839,7 +7979,7 @@ class ConductorManager(service.PeriodicService):
                 self._auto_recover_managed_app(context, app_name)
             elif status == constants.APP_APPLY_SUCCESS:
                 self.check_pending_app_reapply(context)
-                self._auto_update_app(context, app_name, managed_app=True)
+                self._auto_update_app(context, app_name)
         # Special case, we want to apply some logic to non-managed applications
         for app_name in self.apps_metadata[constants.APP_METADATA_APPS].keys():
@@ -7859,7 +7999,7 @@ class ConductorManager(service.PeriodicService):
             # Automatically update non-managed applications
             if status == constants.APP_APPLY_SUCCESS:
                 self.check_pending_app_reapply(context)
-                self._auto_update_app(context, app_name, managed_app=False)
+                self._auto_update_app(context, app_name)

         LOG.debug("Periodic Task: _k8s_application_audit: Finished")


@@ -558,6 +558,8 @@ class StorageTierDependentTCs(base.FunctionalTest):
         self.set_is_initial_config_patcher.return_value = True
         self.service = manager.ConductorManager('test-host', 'test-topic')
         self.service.dbapi = dbapi.get_instance()
+        self.service._populate_app_bundle_metadata = mock.Mock()
+        self.service._initialize_ostree_inotify = mock.Mock()
         self.context = context.get_admin_context()
         self.dbapi = dbapi.get_instance()
         self.system = dbutils.create_test_isystem()


@@ -62,6 +62,8 @@ class UpdateCephCluster(base.DbTestCase):
         self.mock_fix_crushmap.return_value = True
         self.service._sx_to_dx_post_migration_actions = mock.Mock()
+        self.service._populate_app_bundle_metadata = mock.Mock()
+        self.service._initialize_ostree_inotify = mock.Mock()

     def tearDown(self):
         super(UpdateCephCluster, self).tearDown()


@@ -482,6 +482,8 @@ class ManagerTestCase(base.DbTestCase):
         self.service._update_pxe_config = mock.Mock()
         self.service._ceph_mon_create = mock.Mock()
         self.service._sx_to_dx_post_migration_actions = mock.Mock()
+        self.service._populate_app_bundle_metadata = mock.Mock()
+        self.service._initialize_ostree_inotify = mock.Mock()

         self.alarm_raised = False
         self.kernel_alarms = {}