Application update support

This commit introduces a new command, "system application-update", to
support updating an applied application to a new version with a new
versioned app tarball.

The application update leverages the existing application upload
workflow to first validate and upload the new app tarball, then
invokes Armada apply or rollback to deploy the charts for the new
application version. If the version has been applied before, Armada
rollback is performed; otherwise, Armada apply is performed.
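The apply-versus-rollback decision can be sketched roughly as follows (a simplified, hypothetical rendering of the controller logic; the real code consults the sysinv DB for an inactive app record, which `inactive_versions` stands in for here):

```python
def choose_armada_operation(inactive_versions, name, version, is_aio_simplex):
    """Pick the Armada operation for an application update (sketch).

    inactive_versions stands in for the sysinv DB lookup: it contains a
    (name, version) pair only if that version was applied before.
    """
    previously_applied = (name, version) in inactive_versions
    if not previously_applied:
        return "apply"      # never deployed before: full Armada apply
    # On AIO-SX (replicas is 1), rollback is avoided because
    # "helm rollback --wait" returns before pods are ready.
    return "apply" if is_aio_simplex else "rollback"
```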

After the apply/rollback to the new version is done, the files for the
old application version are cleaned up, as are the releases that are
not part of the new application version. Once the update completes
successfully, the status is set to "applied" so that the user can
continue applying the app with user overrides.

If any failure occurs during the update, application recovery is
triggered to restore the app to the old version. If the recovery
fails, the application status is set to "apply-failed" so that the
user can re-apply the app.
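The resulting status transitions can be summarized in a small sketch (an illustration of the states named above, not the actual conductor code):

```python
def final_app_status(update_succeeded, recover_succeeded=None):
    """Final application status after an update attempt (illustrative).

    On success the app ends up "applied"; on failure a recovery to the
    old version is attempted, and only a failed recovery leaves the app
    in "apply-failed" so that the user can re-apply manually.
    """
    if update_succeeded:
        return "applied"
    if recover_succeeded:
        return "applied"        # recovered to the old version
    return "apply-failed"
```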

In order to use Armada rollback, a new sysinv table "kube_app_releases"
is created to record the versions of deployed Helm releases. After each
app apply, if any Helm release version has changed, the corresponding
release record is updated in the sysinv DB as well.
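The release-version bookkeeping can be illustrated as follows (a minimal sketch of what the new AppOperator._update_app_releases_version does in this commit; the data shapes are assumptions):

```python
def releases_needing_update(app_releases, deployed_releases):
    """Return (release, namespace, new_version) records whose version in
    the sysinv kube_app_releases table is stale (sketch).

    app_releases: iterable of (release, namespace, recorded_version)
    deployed_releases: {release: {namespace: deployed_version}} as
    retrieved from the deployed Helm releases
    """
    updates = []
    for release, namespace, recorded in app_releases:
        deployed = deployed_releases.get(release, {}).get(namespace)
        # Only touch the DB row when Helm reports a different version.
        if deployed is not None and deployed != recorded:
            updates.append((release, namespace, deployed))
    return updates
```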

The application overrides were tied to a specific application in
commit https://review.opendev.org/#/c/660498/. Therefore, user
overrides are preserved when updating.

Note: On AIO-SX, Armada apply is always used, even if the target
      version was applied before. Rollback cannot be leveraged on
      AIO-SX (replicas is 1) because Armada/Helm "rollback --wait"
      does not wait for pods to be ready before it returns.
      Related Helm issues:
      https://github.com/helm/helm/issues/4210
      https://github.com/helm/helm/issues/2006

Tests conducted (AIO-SX, DX, Standard):
  - functional tests (both stx-openstack and simple custom app)
    - upload stx-openstack-1.0-13-centos-stable-latest tarfile
      which uses latest docker images
    - apply stx-openstack
    - update to stx-openstack-1.0-13-centos-stable-versioned
      which uses versioned docker images
    - update back to stx-openstack-1.0-13-centos-stable-latest
    - update to a version that has fewer/more charts compared to
      the old version
    - remove stx-openstack
    - delete stx-openstack
  - failure tests
    - application-update rejected
      (app not found, update to the same version,
       operation not permitted, etc.)
    - application-update failures that trigger recovery
      - upload failure,
        e.g. invalid tarfile, manifest file validation failure ...
      - apply/rollback failure,
        e.g. image download failure, Armada apply/rollback failure

Change-Id: I4e094427e673639e2bdafd8c476b897b7b4327a3
Story: 2005350
Task: 33568
Signed-off-by: Angie Wang <angie.wang@windriver.com>
Angie Wang 2019-05-22 10:59:21 -04:00
parent 54cda51f68
commit 28d069bccc
23 changed files with 1227 additions and 234 deletions


@@ -278,6 +278,7 @@ data:
images:
tags:
prometheus_rabbitmq_exporter_helm_tests: docker.io/starlingx/stx-heat:master-centos-stable-latest
rabbitmq_init: docker.io/starlingx/stx-heat:master-centos-stable-latest
pod:
affinity:
anti:
@@ -787,36 +788,7 @@ data:
delete:
- type: job
labels:
application: nova
component: db-init
- type: job
labels:
application: nova
component: db-sync
- type: job
labels:
application: nova
component: ks-user
- type: job
labels:
application: nova
component: ks-service
- type: job
labels:
application: placement
component: ks-user
- type: job
labels:
application: placement
component: ks-service
- type: job
labels:
application: placement
component: ks-endpoints
- type: job
labels:
application: nova
component: cell-setup
release_group: osh-openstack-nova
values:
manifests:
job_ks_endpoints: false
@@ -1082,24 +1054,7 @@ data:
delete:
- type: job
labels:
application: neutron
component: db-init
- type: job
labels:
application: neutron
component: db-sync
- type: job
labels:
application: neutron
component: ks-user
- type: job
labels:
application: neutron
component: ks-service
- type: job
labels:
application: neutron
component: ks-endpoints
release_group: osh-openstack-neutron
values:
pod:
replicas:


@@ -1,2 +1,2 @@
SRC_DIR="cgts-client"
TIS_PATCH_VER=66
TIS_PATCH_VER=67


@@ -53,6 +53,14 @@ class AppManager(base.Manager):
return self._update(self._path(app_name) + '?directive=apply',
{'values': data})
def update(self, data):
"""Upgrade/rollback the deployed application to a different version.
:param data: location of tarfile, optional application name and version
"""
resp, body = self.api.json_request('POST', self._path() + "/update", body=data)
return self.resource_class(self, body)
def remove(self, app_name):
"""Uninstall the specified application


@@ -42,6 +42,28 @@ def _is_url(url_str):
return False
def _application_check(args):
tarfile = args.tarfile
if not _is_url(tarfile):
if not os.path.isabs(tarfile):
tarfile = os.path.join(os.getcwd(), tarfile)
if not os.path.isfile(tarfile):
raise exc.CommandError("Error: Tar file %s does not exist" % tarfile)
if not tarfile.endswith('.tgz') and not tarfile.endswith('.tar.gz'):
raise exc.CommandError("Error: File %s has unrecognizable tar file "
"extension. Supported extensions are: .tgz "
"and .tar.gz" % tarfile)
data = {'tarfile': tarfile}
if args.app_name:
data.update({'name': args.app_name})
if args.app_version:
data.update({'app_version': args.app_version})
return data
def do_application_list(cc, args):
"""List all containerized applications"""
apps = cc.app.list()
@@ -62,7 +84,7 @@ do_application_show(cc, args):
@utils.arg('tarfile', metavar='<tar file>',
help='Tarball containing application manifest, helm charts and'
help='Tarball containing application manifest, Helm charts and'
' config file')
@utils.arg('-n', '--app-name',
metavar='<app name>',
@@ -72,30 +94,29 @@ do_application_show(cc, args):
help='Version of the application')
def do_application_upload(cc, args):
"""Upload application Helm chart(s) and manifest"""
tarfile = args.tarfile
if not _is_url(tarfile):
if not os.path.isabs(tarfile):
tarfile = os.path.join(os.getcwd(), tarfile)
if not os.path.isfile(tarfile):
raise exc.CommandError("Error: Tar file %s does not exist" % tarfile)
if not tarfile.endswith('.tgz') and not tarfile.endswith('.tar.gz'):
raise exc.CommandError("Error: File %s has unrecognizable tar file "
"extension. Supported extensions are: .tgz "
"and .tar.gz" % tarfile)
data = {'tarfile': tarfile}
if args.app_name:
data.update({'name': args.app_name})
if args.app_version:
data.update({'app_version': args.app_version})
data = _application_check(args)
response = cc.app.upload(data)
_print_application_show(response)
_print_reminder_msg(response.name)
@utils.arg('tarfile', metavar='<tar file>',
help='Tarball containing application manifest, Helm charts and'
' config file')
@utils.arg('-n', '--app-name',
metavar='<app name>',
help='Name of the application')
@utils.arg('-v', '--app-version',
metavar='<app version>',
help='Version of the application')
def do_application_update(cc, args):
"""Update the deployed application to a different version"""
data = _application_check(args)
response = cc.app.update(data)
_print_application_show(response)
_print_reminder_msg(response.name)
@utils.arg('name', metavar='<app name>',
help='Name of the application')
@utils.arg('-m', '--mode',


@@ -119,6 +119,10 @@ LOCK_NAME = 'KubeAppController'
class KubeAppController(rest.RestController):
"""REST controller for Helm applications."""
_custom_actions = {
'update': ['POST'],
}
def __init__(self, parent=None, **kwargs):
self._parent = parent
@@ -126,12 +130,25 @@ class KubeAppController(rest.RestController):
if not utils.is_kubernetes_config():
raise exception.OperationNotPermitted
def _check_tarfile(self, app_tarfile, app_name, app_version):
def _check_tarfile(self, app_tarfile, app_name, app_version, operation):
def _handle_upload_failure(reason):
raise wsme.exc.ClientSideError(_(
"Application-upload rejected: " + reason))
"Application-{} rejected: ".format(operation) + reason))
if app_tarfile:
if cutils.is_url(app_tarfile):
# For tarfile that is downloaded remotely, defer the checksum, manifest
# and tarfile content validations to sysinv-conductor as download can
# take some time depending on network traffic, target server and file
# size.
if not app_name:
app_name = constants.APP_NAME_PLACEHOLDER
if not app_version:
app_version = constants.APP_VERSION_PLACEHOLDER
mname = constants.APP_MANIFEST_NAME_PLACEHOLDER
mfile = constants.APP_TARFILE_NAME_PLACEHOLDER
return app_name, app_version, mname, mfile
if not os.path.isfile(app_tarfile):
_handle_upload_failure(
"application tar file {} does not exist.".format(app_tarfile))
@@ -157,14 +174,12 @@ class KubeAppController(rest.RestController):
mname, mfile = app_helper._find_manifest_file(app_path)
app_helper._extract_helm_charts(app_path)
LOG.info("Tar file of application %s verified." % name)
return name, version, mname, mfile
except exception.SysinvException as e:
_handle_upload_failure(str(e))
return name, version, mname, mfile
else:
raise ValueError(_(
"Application-upload rejected: tar file must be specified."))
"Application-{} rejected: tar file must be specified.".format(operation)))
def _get_one(self, app_name):
# can result in KubeAppNotFound
@@ -194,20 +209,8 @@ class KubeAppController(rest.RestController):
tarfile = body.get('tarfile')
name = body.get('name', '')
version = body.get('app_version', '')
if not cutils.is_url(tarfile):
name, version, mname, mfile = self._check_tarfile(tarfile, name, version)
else:
# For tarfile that is downloaded remotely, defer the checksum, manifest
# and tarfile content validations to sysinv-conductor as download can
# take some time depending on network traffic, target server and file
# size.
mname = constants.APP_MANIFEST_NAME_PLACEHOLDER
mfile = constants.APP_TARFILE_NAME_PLACEHOLDER
if not name:
name = constants.APP_NAME_PLACEHOLDER
if not version:
version = constants.APP_VERSION_PLACEHOLDER
name, version, mname, mfile = self._check_tarfile(tarfile, name, version,
constants.APP_UPLOAD_OP)
try:
objects.kube_app.get_by_name(pecan.request.context, name)
@@ -302,6 +305,91 @@ class KubeAppController(rest.RestController):
db_app)
return KubeApp.convert_with_links(db_app)
@cutils.synchronized(LOCK_NAME)
@wsme_pecan.wsexpose(KubeApp, body=types.apidict)
def update(self, body):
"""Update the applied application to a different version"""
self._check_environment()
tarfile = body.get('tarfile')
name = body.get('name', '')
version = body.get('app_version', '')
name, version, mname, mfile = self._check_tarfile(tarfile, name, version,
constants.APP_UPDATE_OP)
try:
applied_app = objects.kube_app.get_by_name(pecan.request.context, name)
except exception.KubeAppNotFound:
LOG.error("Received a request to update app %s which does not exist." %
name)
raise wsme.exc.ClientSideError(_(
"Application-update rejected: application not found."))
if applied_app.status == constants.APP_UPDATE_IN_PROGRESS:
raise wsme.exc.ClientSideError(_(
"Application-update rejected: update is already "
"in progress."))
elif applied_app.status != constants.APP_APPLY_SUCCESS:
raise wsme.exc.ClientSideError(_(
"Application-update rejected: operation is not allowed "
"while the current status is {}.".format(applied_app.status)))
if applied_app.app_version == version:
raise wsme.exc.ClientSideError(_(
"Application-update rejected: the version %s is already "
"applied." % version))
# Set the status for the current applied app to inactive
applied_app.status = constants.APP_INACTIVE_STATE
applied_app.progress = None
applied_app.save()
# If the version has ever applied before(inactive app found),
# use armada rollback to apply application later, otherwise,
# use armada apply.
# On the AIO-SX, always use armada apply even it was applied
# before, issue on AIO-SX(replicas is 1) to leverage rollback,
# armada/helm rollback --wait does not wait for pods to be
# ready before it returns.
# related to helm issue,
# https://github.com/helm/helm/issues/4210
# https://github.com/helm/helm/issues/2006
try:
target_app = objects.kube_app.get_inactive_app_by_name_version(
pecan.request.context, name, version)
target_app.status = constants.APP_UPDATE_IN_PROGRESS
target_app.save()
if utils.is_aio_simplex_system(pecan.request.dbapi):
operation = constants.APP_APPLY_OP
else:
operation = constants.APP_ROLLBACK_OP
except exception.KubeAppInactiveNotFound:
target_app_data = {
'name': name,
'app_version': version,
'manifest_name': mname,
'manifest_file': os.path.basename(mfile),
'status': constants.APP_UPDATE_IN_PROGRESS,
'active': True
}
operation = constants.APP_APPLY_OP
try:
target_app = pecan.request.dbapi.kube_app_create(target_app_data)
except exception.KubeAppAlreadyExists as e:
applied_app.status = constants.APP_APPLY_SUCCESS
applied_app.progress = constants.APP_PROGRESS_COMPLETED
applied_app.save()
LOG.exception(e)
raise wsme.exc.ClientSideError(_(
"Application-update failed: Unable to start application update, "
"application info update failed."))
pecan.request.rpcapi.perform_app_update(pecan.request.context,
applied_app, target_app,
tarfile, operation)
return KubeApp.convert_with_links(target_app)
@cutils.synchronized(LOCK_NAME)
@wsme_pecan.wsexpose(None, wtypes.text, status_code=204)
def delete(self, name):
@@ -316,7 +404,14 @@ class KubeAppController(rest.RestController):
except exception.KubeAppNotFound:
LOG.error("Received a request to delete app %s which does not "
"exist." % name)
raise
raise wsme.exc.ClientSideError(_(
"Application-delete rejected: application not found."))
if db_app.status not in [constants.APP_UPLOAD_SUCCESS,
constants.APP_UPLOAD_FAILURE]:
raise wsme.exc.ClientSideError(_(
"Application-delete rejected: operation is not allowed "
"while the current status is {}.".format(db_app.status)))
response = pecan.request.rpcapi.perform_app_delete(
pecan.request.context, db_app)


@@ -1513,12 +1513,17 @@ APP_APPLY_SUCCESS = 'applied'
APP_APPLY_FAILURE = 'apply-failed'
APP_REMOVE_IN_PROGRESS = 'removing'
APP_REMOVE_FAILURE = 'remove-failed'
APP_INACTIVE_STATE = 'inactive'
APP_UPDATE_IN_PROGRESS = 'updating'
APP_RECOVER_IN_PROGRESS = 'recovering'
# Operation constants
APP_UPLOAD_OP = 'upload'
APP_APPLY_OP = 'apply'
APP_REMOVE_OP = 'remove'
APP_DELETE_OP = 'delete'
APP_UPDATE_OP = 'update'
APP_ROLLBACK_OP = 'rollback'
# Progress constants
APP_PROGRESS_ABORTED = 'operation aborted, check logs for detail'
@@ -1531,6 +1536,14 @@ APP_PROGRESS_GENERATE_OVERRIDES = 'generating application overrides'
APP_PROGRESS_TARFILE_DOWNLOAD = 'downloading tarfile'
APP_PROGRESS_VALIDATE_UPLOAD_CHARTS = 'validating and uploading charts'
APP_PROGRESS_DEPS_PLATFORM_APP = "%s is required and is not applied" % HELM_APP_PLATFORM
APP_PROGRESS_ROLLBACK_RELEASES = 'rolling back application releases'
APP_PROGRESS_UPDATE_ABORTED = 'application update from version {} to version {} aborted. '
APP_PROGRESS_UPDATE_COMPLETED = 'application update from version {} to version {} completed.'
APP_PROGRESS_RECOVER_ABORTED = 'application recover to version {} aborted. '
APP_PROGRESS_RECOVER_COMPLETED = 'application recover to version {} completed. '
APP_PROGRESS_CLEANUP_FAILED = 'application files/helm release cleanup for version {} failed.'
APP_PROGRESS_RECOVER_IN_PROGRESS = 'recovering version {} '
APP_PROGRESS_RECOVER_CHARTS = 'recovering helm charts'
# Node label operation constants
LABEL_ASSIGN_OP = 'assign'


@@ -244,6 +244,10 @@ class KubeAppDeleteFailure(SysinvException):
message = _("Delete of application %(name)s (%(version)s) failed: %(reason)s")
class HelmTillerFailure(SysinvException):
message = _("Helm operation failure: %(reason)s")
class InvalidCPUInfo(Invalid):
message = _("Unacceptable CPU info") + ": %(reason)s"
@@ -560,7 +564,12 @@ class HelmOverrideAlreadyExists(Conflict):
class KubeAppAlreadyExists(Conflict):
message = _("An application with name %(name)s already exists.")
message = _("An application with name %(name)s %(version)s already exists.")
class KubeAppChartReleaseAlreadyExists(Conflict):
message = _("A chart release with name %(name)s and namespace "
"%(namespace)s for application %(app_id)s already exists.")
class InstanceDeployFailure(Invalid):
@@ -895,6 +904,19 @@ class KubeAppNotFound(NotFound):
message = _("No application with name %(name)s.")
class KubeAppInactiveNotFound(NotFound):
message = _("No inactive application with name %(name)s and version %(version)s")
class KubeAppChartReleaseNotFound(NotFound):
message = _("No chart release with name %(name)s and "
"namespace %(namespace)s for application %(app_id)s")
class KubeAppReleasesNotFound(NotFound):
message = _("No releases found for application %(app_id)s")
class DockerRegistryCredentialNotFound(NotFound):
message = _("Credentials to access local docker registry "
"for user %(name)s could not be found.")


@@ -29,23 +29,32 @@ class KubeOperator(object):
def __init__(self, dbapi):
self._dbapi = dbapi
self._kube_client = None
self._kube_client_batch = None
self._kube_client_core = None
def _get_kubernetesclient(self):
if not self._kube_client:
config.load_kube_config('/etc/kubernetes/admin.conf')
def _load_kube_config(self):
config.load_kube_config('/etc/kubernetes/admin.conf')
# Workaround: Turn off SSL/TLS verification
c = Configuration()
c.verify_ssl = False
Configuration.set_default(c)
# Workaround: Turn off SSL/TLS verification
c = Configuration()
c.verify_ssl = False
Configuration.set_default(c)
self._kube_client = client.CoreV1Api()
return self._kube_client
def _get_kubernetesclient_batch(self):
if not self._kube_client_batch:
self._load_kube_config()
self._kube_client_batch = client.BatchV1Api()
return self._kube_client_batch
def _get_kubernetesclient_core(self):
if not self._kube_client_core:
self._load_kube_config()
self._kube_client_core = client.CoreV1Api()
return self._kube_client_core
def kube_patch_node(self, name, body):
try:
api_response = self._get_kubernetesclient().patch_node(name, body)
api_response = self._get_kubernetesclient_core().patch_node(name, body)
LOG.debug("Response: %s" % api_response)
except ApiException as e:
if e.status == httplib.UNPROCESSABLE_ENTITY:
@@ -61,7 +70,7 @@ class KubeOperator(object):
def kube_get_nodes(self):
try:
api_response = self._get_kubernetesclient().list_node()
api_response = self._get_kubernetesclient_core().list_node()
LOG.debug("Response: %s" % api_response)
return api_response.items
except Exception as e:
@@ -71,7 +80,7 @@ class KubeOperator(object):
def kube_create_namespace(self, namespace):
body = {'metadata': {'name': namespace}}
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.create_namespace(body)
except ApiException as e:
@@ -87,7 +96,7 @@ class KubeOperator(object):
raise
def kube_get_namespace(self, namespace):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.read_namespace(namespace)
return True
@@ -103,7 +112,7 @@ class KubeOperator(object):
raise
def kube_get_secret(self, name, namespace):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.read_namespaced_secret(name, namespace)
return True
@@ -119,7 +128,7 @@ class KubeOperator(object):
raise
def kube_create_secret(self, namespace, body):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.create_namespaced_secret(namespace, body)
except Exception as e:
@@ -128,7 +137,7 @@ class KubeOperator(object):
raise
def kube_copy_secret(self, name, src_namespace, dst_namespace):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
body = c.read_namespaced_secret(name, src_namespace, export=True)
body.metadata.namespace = dst_namespace
@@ -139,7 +148,7 @@ class KubeOperator(object):
raise
def kube_delete_persistent_volume_claim(self, namespace, **kwargs):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.delete_collection_namespaced_persistent_volume_claim(
namespace, **kwargs)
@@ -154,7 +163,7 @@ class KubeOperator(object):
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.delete_namespaced_secret(name, namespace, body)
except ApiException as e:
@@ -175,7 +184,7 @@ class KubeOperator(object):
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.delete_namespace(namespace, body)
except ApiException as e:
@@ -190,7 +199,7 @@ class KubeOperator(object):
raise
def kube_get_config_map(self, name, namespace):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.read_namespaced_config_map(name, namespace)
return True
@@ -206,7 +215,7 @@ class KubeOperator(object):
raise
def kube_create_config_map(self, namespace, body):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.create_namespaced_config_map(namespace, body)
except Exception as e:
@@ -215,7 +224,7 @@ class KubeOperator(object):
raise
def kube_copy_config_map(self, name, src_namespace, dst_namespace):
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
body = c.read_namespaced_config_map(name, src_namespace, export=True)
body.metadata.namespace = dst_namespace
@@ -231,7 +240,7 @@ class KubeOperator(object):
if kwargs:
body.update(kwargs)
c = self._get_kubernetesclient()
c = self._get_kubernetesclient_core()
try:
c.delete_namespaced_config_map(name, namespace, body)
except ApiException as e:
@@ -245,3 +254,13 @@ class KubeOperator(object):
except Exception as e:
LOG.error("Kubernetes exception in kube_delete_config_map: %s" % e)
raise
def kube_delete_collection_namespaced_job(self, namespace, label):
c = self._get_kubernetesclient_batch()
try:
c.delete_collection_namespaced_job(namespace, label_selector=label)
except Exception as e:
LOG.error("Failed to delete Jobs with label %s under "
"Namespace %s: %s" % (label, namespace, e))
raise


@@ -2024,27 +2024,6 @@ def is_default_huge_pages_required(host):
return True
def refresh_helm_repo_information():
"""Refresh the helm chart repository information.
Ensure that the local repository information maintained in key user home
directories are updated. Run this when the conductor is initialized and
after application uploads.
This handles scenarios where an upload occurs on the active controller
followed by a swact. The newly actvated controller needs to make sure that
the local repository cache reflect any changes.
"""
with open(os.devnull, "w") as fnull:
try:
subprocess.check_call(['sudo', '-u', 'wrsroot',
'helm', 'repo', 'update'],
stdout=fnull, stderr=fnull)
except subprocess.CalledProcessError:
# Just log an error. Don't stop any callers from further execution.
LOG.error("Failed to update helm repo data for user wrsroot.")
def is_inventory_config_complete(dbapi, forihostid):
"""Check if the initial inventory has completed


@@ -37,6 +37,7 @@ from sysinv.common import utils as cutils
from sysinv.common.storage_backend_conf import K8RbdProvisioner
from sysinv.helm import common
from sysinv.helm import helm
from sysinv.helm import utils as helm_utils
# Log and config
@@ -55,8 +56,10 @@ CONF.register_opts(kube_app_opts)
APPLY_SEARCH_PATTERN = 'Processing Chart,'
ARMADA_CONTAINER_NAME = 'armada_service'
ARMADA_MANIFEST_APPLY_SUCCESS_MSG = 'Done applying manifest'
ARMADA_RELEASE_ROLLBACK_FAILURE_MSG = 'Error while rolling back tiller release'
CONTAINER_ABNORMAL_EXIT_CODE = 137
DELETE_SEARCH_PATTERN = 'Deleting release'
ROLLBACK_SEARCH_PATTERN = 'Helm rollback of release'
INSTALLATION_TIMEOUT = 3600
MAX_DOWNLOAD_THREAD = 5
TARFILE_DOWNLOAD_CONNECTION_TIMEOUT = 60
@@ -125,7 +128,7 @@ def get_local_docker_registry_auth():
password=registry_password)
Chart = namedtuple('Chart', 'name namespace location')
Chart = namedtuple('Chart', 'name namespace location release labels sequenced')
class AppOperator(object):
@@ -141,20 +144,26 @@ class AppOperator(object):
self._app = kube_app.KubeAppHelper(self._dbapi)
self._lock = threading.Lock()
def _cleanup(self, app):
def _cleanup(self, app, app_dir=True):
"""" Remove application directories and override files """
try:
if os.path.exists(app.overrides_dir):
shutil.rmtree(os.path.dirname(
app.overrides_dir))
shutil.rmtree(app.overrides_dir)
if app_dir:
shutil.rmtree(os.path.dirname(
app.overrides_dir))
if os.path.exists(app.armada_mfile_dir):
shutil.rmtree(os.path.dirname(
app.armada_mfile_dir))
shutil.rmtree(app.armada_mfile_dir)
if app_dir:
shutil.rmtree(os.path.dirname(
app.armada_mfile_dir))
if os.path.exists(app.path):
shutil.rmtree(os.path.dirname(
app.path))
shutil.rmtree(app.path)
if app_dir:
shutil.rmtree(os.path.dirname(
app.path))
except OSError as e:
LOG.error(e)
raise
@@ -164,9 +173,6 @@ class AppOperator(object):
if new_status is None:
new_status = app.status
elif (new_status in [constants.APP_UPLOAD_SUCCESS,
constants.APP_APPLY_SUCCESS]):
new_progress = constants.APP_PROGRESS_COMPLETED
with self._lock:
app.update_status(new_status, new_progress)
@@ -455,9 +461,9 @@
# Extract the list of images from the charts and overrides where
# applicable. Save the list to the same location as the armada manifest
# so it can be sync'ed.
app.charts = self._get_list_of_charts(app.armada_mfile_abs)
if app.system_app:
LOG.info("Generating application overrides...")
app.charts = self._get_list_of_charts(app.armada_mfile_abs)
self._helm.generate_helm_application_overrides(
app.overrides_dir, app.name, mode=None, cnamespace=None,
armada_format=True, armada_chart_info=app.charts, combined=True)
@@ -593,8 +599,8 @@
except KeyError:
pass
LOG.info("Application %s will load charts to chart repo %s" % (
app.name, repo))
LOG.info("Application %s (%s) will load charts to chart repo %s" % (
app.name, app.version, repo))
return repo
def _upload_helm_charts(self, app):
@@ -617,7 +623,7 @@
LOG.info("Helm chart %s uploaded" % os.path.basename(chart))
# Make sure any helm repo changes are reflected for the users
cutils.refresh_helm_repo_information()
helm_utils.refresh_helm_repo_information()
except Exception as e:
raise exception.KubeAppUploadFailure(
@@ -853,24 +859,116 @@
raise
def _get_list_of_charts(self, manifest_file):
"""Get the charts information from the manifest file
The following chart data for each chart in the manifest file
are extracted and stored into a namedtuple Chart object:
- chart_name
- namespace
- location
- release
- pre-delete job labels
The method returns a list of namedtuple charts which following
the install order in the manifest chart_groups.
:param manifest_file: the manifest file of the application
:return: a list of namedtuple charts
"""
charts = []
release_prefix = ""
chart_group = {}
chart_groups = []
armada_charts = {}
with open(manifest_file, 'r') as f:
docs = yaml.safe_load_all(f)
for doc in docs:
# iterative docs in the manifest file to get required
# chart information
try:
if "armada/Chart/" in doc['schema']:
charts.append(Chart(
name=doc['data']['chart_name'],
namespace=doc['data']['namespace'],
location=doc['data']['source']['location']))
if "armada/Manifest/" in doc['schema']:
release_prefix = doc['data']['release_prefix']
chart_groups = doc['data']['chart_groups']
elif "armada/ChartGroup/" in doc['schema']:
chart_group.update(
{doc['metadata']['name']: {
'chart_group': doc['data']['chart_group'],
'sequenced': doc.get('data').get('sequenced', False)}})
elif "armada/Chart/" in doc['schema']:
labels = []
delete_resource = \
doc['data'].get('upgrade', {}).get('pre', {}).get('delete', [])
for resource in delete_resource:
if resource.get('type') == 'job':
label = ''
for k, v in resource['labels'].items():
label = k + '=' + v + ',' + label
labels.append(label[:-1])
armada_charts.update(
{doc['metadata']['name']: {
'chart_name': doc['data']['chart_name'],
'namespace': doc['data']['namespace'],
'location': doc['data']['source']['location'],
'release': doc['data']['release'],
'labels': labels}})
LOG.debug("Manifest: Chart: {} Namespace: {} "
"Location: {}".format(
"Location: {} Release: {}".format(
doc['data']['chart_name'],
doc['data']['namespace'],
doc['data']['source']['location']))
doc['data']['source']['location'],
doc['data']['release']))
except KeyError:
pass
# Push Chart to the list that following the order
# in the chart_groups(install list)
for c_group in chart_groups:
for chart in chart_group[c_group]['chart_group']:
charts.append(Chart(
name=armada_charts[chart]['chart_name'],
namespace=armada_charts[chart]['namespace'],
location=armada_charts[chart]['location'],
release=armada_charts[chart]['release'],
labels=armada_charts[chart]['labels'],
sequenced=chart_group[c_group]['sequenced']))
del armada_charts[chart]
del chart_group[c_group]
# Push Chart to the list that are not referenced
# in the chart_groups (install list)
if chart_group:
for c_group in chart_group:
for chart in chart_group[c_group]['chart_group']:
charts.append(Chart(
name=armada_charts[chart]['chart_name'],
namespace=armada_charts[chart]['namespace'],
location=armada_charts[chart]['location'],
release=armada_charts[chart]['release'],
labels=armada_charts[chart]['labels'],
sequenced=chart_group[c_group]['sequenced']))
del armada_charts[chart]
if armada_charts:
for chart in armada_charts:
charts.append(Chart(
name=armada_charts[chart]['chart_name'],
namespace=armada_charts[chart]['namespace'],
location=armada_charts[chart]['location'],
release=armada_charts[chart]['release'],
labels=armada_charts[chart]['labels'],
sequenced=False))
# Update each Chart in the list if there has release prefix
# for each release
if release_prefix:
for i, chart in enumerate(charts):
charts[i] = chart._replace(
release=release_prefix + "-" + chart.release)
return charts
def _get_overrides_files(self, overrides_dir, charts, app_name, mode):
@@ -919,6 +1017,57 @@
chart.name,
chart.namespace)
def _update_app_releases_version(self, app_name):
"""Update application helm releases records
This method retrieves the deployed helm releases and updates the
releases records in sysinv db if needed
:param app_name: the name of the application
"""
try:
deployed_releases = helm_utils.retrieve_helm_releases()
app = self._dbapi.kube_app_get(app_name)
app_releases = self._dbapi.kube_app_chart_release_get_all(app.id)
for r in app_releases:
if (r.release in deployed_releases and
r.namespace in deployed_releases[r.release] and
r.version != deployed_releases[r.release][r.namespace]):
self._dbapi.kube_app_chart_release_update(
app.id, r.release, r.namespace,
{'version': deployed_releases[r.release][r.namespace]})
except Exception as e:
LOG.exception(e)
raise exception.SysinvException(_(
"Failed to update/record application %s releases' versions." % str(e)))
def _create_app_releases_version(self, app_name, app_charts):
"""Create application helm releases records
This method creates/initializes the helm releases objects for the application.
:param app_name: the name of the application
:param app_charts: the charts of the application
"""
kube_app = self._dbapi.kube_app_get(app_name)
app_releases = self._dbapi.kube_app_chart_release_get_all(kube_app.id)
if app_releases:
return
for chart in app_charts:
values = {
'release': chart.release,
'version': 0,
'namespace': chart.namespace,
'app_id': kube_app.id
}
try:
self._dbapi.kube_app_chart_release_create(values)
except Exception as e:
LOG.exception(e)
def _make_armada_request_with_monitor(self, app, request, overrides_str=None):
"""Initiate armada request with monitoring
@@ -938,17 +1087,21 @@
inner method is to be replaced with an official API call when
it becomes available.
"""
if pattern == ROLLBACK_SEARCH_PATTERN:
print_chart = '{print $10}'
else:
print_chart = '{print $NF}'
p1 = subprocess.Popen(['docker', 'exec', ARMADA_CONTAINER_NAME,
'grep', pattern, logfile],
stdout=subprocess.PIPE)
p2 = subprocess.Popen(['awk', print_chart], stdin=p1.stdout,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p1.stdout.close()
result, err = p2.communicate()
if result:
# Strip out ANSI color code that might be in the text stream
r = re.compile("\x1b\[[0-9;]*m")
result = r.sub('', result).replace(',', '')
matches = result.split()
num_chart_processed = len(matches)
last_chart_processed = matches[num_chart_processed - 1]
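The docker-exec grep/awk pipeline above can be sketched in pure Python to show what the monitor extracts from the Armada log: the count of matching lines and the chart named on the last one, with column 10 used for rollback logs and the last column otherwise. This is an illustrative re-implementation, not the code sysinv runs:

```python
import re

# Strip ANSI color codes that may be embedded in the log stream
ANSI_RE = re.compile("\x1b\\[[0-9;]*m")

def last_chart_processed(log_text, pattern, rollback=False):
    """Mimic `grep pattern logfile | awk '{print $N}'` over an Armada log.

    For rollback logs the chart name sits in column 10 (awk '{print $10}');
    otherwise it is the last column ($NF). Commas are stripped as in the
    monitor above.
    """
    matches = []
    for line in log_text.splitlines():
        if pattern in line:
            fields = ANSI_RE.sub('', line).replace(',', '').split()
            matches.append(fields[9] if rollback else fields[-1])
    return len(matches), (matches[-1] if matches else None)
```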
@ -998,13 +1151,15 @@ class AppOperator(object):
logfile = ARMADA_CONTAINER_LOG_LOCATION + '/' + app.name + '-' + request + '.log'
if request == constants.APP_APPLY_OP:
pattern = APPLY_SEARCH_PATTERN
elif request == constants.APP_DELETE_OP:
pattern = DELETE_SEARCH_PATTERN
else:
pattern = ROLLBACK_SEARCH_PATTERN
monitor = greenthread.spawn_after(1, _check_progress, mqueue, app,
pattern, logfile)
rc = self._docker.make_armada_request(request, app.armada_mfile,
overrides_str, app.releases, logfile)
mqueue.put('done')
monitor.kill()
return rc
@ -1066,31 +1221,163 @@ class AppOperator(object):
self._delete_namespace(common.HELM_NS_OPENSTACK)
def _perform_app_recover(self, old_app, new_app, armada_process_required=True):
"""Perform application recovery
This recovery method is triggered when an application update fails. It
cleans up the files/data for the new application and recovers the helm
charts for the old application. If the armada process is required, armada
apply is invoked to restore the releases of the old version.
The app status will be set to "apply-failed" if the recovery fails so that
the user can re-apply the app.
:param old_app: the application object to recover to
:param new_app: the application object to recover from
:param armada_process_required: boolean, whether an armada operation is needed
"""
LOG.info("Starting recover Application %s from version: %s to version: %s" %
(old_app.name, new_app.version, old_app.version))
self._update_app_status(
old_app, constants.APP_RECOVER_IN_PROGRESS,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_IN_PROGRESS.format(old_app.version))
# Set the status for the new app to inactive
self._update_app_status(new_app, constants.APP_INACTIVE_STATE)
try:
self._cleanup(new_app, app_dir=False)
self._app._patch_report_app_dependencies(
new_app.name + '-' + new_app.version)
self._dbapi.kube_app_destroy(new_app.name,
version=new_app.version,
inactive=True)
LOG.info("Recovering helm charts for Application %s (%s)..."
% (old_app.name, old_app.version))
self._update_app_status(old_app,
new_progress=constants.APP_PROGRESS_RECOVER_CHARTS)
with self._lock:
self._upload_helm_charts(old_app)
rc = True
if armada_process_required:
overrides_str = ''
old_app.charts = self._get_list_of_charts(old_app.armada_mfile_abs)
if old_app.system_app:
overrides_files = self._get_overrides_files(old_app.overrides_dir,
old_app.charts,
old_app.name, mode=None)
overrides_str = \
self._generate_armada_overrides_str(old_app.name, old_app.version,
overrides_files)
if self._make_armada_request_with_monitor(old_app,
constants.APP_APPLY_OP,
overrides_str):
old_app_charts = [c.release for c in old_app.charts]
deployed_releases = helm_utils.retrieve_helm_releases()
for new_chart in new_app.charts:
if (new_chart.release not in old_app_charts and
new_chart.release in deployed_releases):
# Clean up the releases that are in the new application
# version but not in the old application version
helm_utils.delete_helm_release(new_chart.release)
else:
rc = False
except Exception as e:
# ie. patch report error, cleanup application files error
# helm release delete failure
self._update_app_status(
old_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_COMPLETED.format(old_app.version) +
constants.APP_PROGRESS_CLEANUP_FAILED.format(new_app.version) +
'please check logs for details.')
LOG.error(e)
return
if rc:
self._update_app_status(
old_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_COMPLETED.format(old_app.version) +
'please check logs for details.')
LOG.info("Application %s recover to version %s completed."
% (old_app.name, old_app.version))
else:
self._update_app_status(
old_app, constants.APP_APPLY_FAILURE,
constants.APP_PROGRESS_UPDATE_ABORTED.format(old_app.version, new_app.version) +
constants.APP_PROGRESS_RECOVER_ABORTED.format(old_app.version) +
'please check logs for details.')
LOG.error("Application %s recover to version %s aborted!"
% (old_app.name, old_app.version))
def _perform_app_rollback(self, from_app, to_app):
"""Perform application rollback request
This method invokes Armada to roll back the application releases to
the previously installed versions. The jobs for the currently installed
releases must be cleaned up before starting the armada rollback.
:param from_app: the application object to update from
:param to_app: the application object to update to
:return boolean: whether the application rollback was successful
"""
LOG.info("Application %s (%s) rollback started." % (to_app.name, to_app.version))
try:
to_db_app = self._dbapi.kube_app_get(to_app.name)
to_app_releases = \
self._dbapi.kube_app_chart_release_get_all(to_db_app.id)
from_db_app = self._dbapi.kube_app_get_inactive_by_name_version(
from_app.name, version=from_app.version)
from_app_releases = \
self._dbapi.kube_app_chart_release_get_all(from_db_app.id)
from_app_r_dict = {r.release: r.version for r in from_app_releases}
self._update_app_status(
to_app, new_progress=constants.APP_PROGRESS_ROLLBACK_RELEASES)
charts_sequence = {c.release: c.sequenced for c in to_app.charts}
charts_labels = {c.release: c.labels for c in to_app.charts}
for to_app_r in to_app_releases:
if to_app_r.version != 0:
if (to_app_r.release not in from_app_r_dict or
(to_app_r.release in from_app_r_dict and
to_app_r.version != from_app_r_dict[to_app_r.release])):
# Append the release which needs to be rolled back
to_app.releases.append(
{'release': to_app_r.release,
'version': to_app_r.version,
'sequenced': charts_sequence[to_app_r.release]})
# Cleanup the jobs for the current installed release
if to_app_r.release in charts_labels:
for label in charts_labels[to_app_r.release]:
self._kube.kube_delete_collection_namespaced_job(
to_app_r.namespace, label)
LOG.info("Jobs deleted for release %s" % to_app_r.release)
if self._make_armada_request_with_monitor(to_app,
constants.APP_ROLLBACK_OP):
self._update_app_status(to_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_COMPLETED)
LOG.info("Application %s (%s) rollback completed."
% (to_app.name, to_app.version))
return True
except Exception as e:
# unexpected KubeAppNotFound, KubeAppInactiveNotFound, KeyError
# k8s exception:fail to cleanup release jobs
LOG.exception(e)
LOG.error("Application rollback aborted!")
return False
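The version-diff rule that builds `to_app.releases` above can be sketched in isolation; the namedtuple stands in for the db release records, and all names here are illustrative:

```python
from collections import namedtuple

# Minimal stand-in for a kube_app_releases db record
Release = namedtuple('Release', 'release version')

def releases_to_rollback(to_app_releases, from_versions, charts_sequence):
    """Pick releases whose recorded version differs between app versions.

    to_app_releases: records for the version being rolled back to.
    from_versions: {release: version} for the currently applied version.
    charts_sequence: {release: sequenced_flag} from the armada manifest.
    """
    result = []
    for r in to_app_releases:
        if r.version == 0:
            continue  # release was never deployed under this app version
        if from_versions.get(r.release) != r.version:
            result.append({'release': r.release,
                           'version': r.version,
                           'sequenced': charts_sequence[r.release]})
    return result
```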
def perform_app_upload(self, rpc_app, tarfile):
"""Process application upload request
@ -1106,7 +1393,7 @@ class AppOperator(object):
app = AppOperator.Application(rpc_app,
rpc_app.get('name') in self._helm.get_helm_applications())
LOG.info("Application (%s) upload started." % app.name)
LOG.info("Application %s (%s) upload started." % (app.name, app.version))
try:
app.tarfile = tarfile
@ -1117,7 +1404,10 @@ class AppOperator(object):
downloaded_tarfile = self._download_tarfile(app)
if downloaded_tarfile is None:
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason="Failed to find the downloaded tarball.")
else:
app.tarfile = downloaded_tarfile
@ -1133,8 +1423,12 @@ class AppOperator(object):
self._extract_tarfile(app)
shutil.copy(app.mfile_abs, app.armada_mfile_abs)
if not self._docker.make_armada_request(
'validate', manifest_file=app.armada_mfile):
raise exception.KubeAppUploadFailure(
name=app.name,
version=app.version,
reason="Failed to validate application manifest.")
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_VALIDATE_UPLOAD_CHARTS)
@ -1146,15 +1440,21 @@ class AppOperator(object):
self._save_images_list(app)
if app.patch_dependencies:
self._app._patch_report_app_dependencies(
app.name + '-' + app.version, app.patch_dependencies)
self._create_app_releases_version(app.name, app.charts)
self._update_app_status(app, constants.APP_UPLOAD_SUCCESS,
constants.APP_PROGRESS_COMPLETED)
LOG.info("Application %s (%s) upload completed." % (app.name, app.version))
return app
except exception.KubeAppUploadFailure as e:
LOG.exception(e)
self._abort_operation(app, constants.APP_UPLOAD_OP, str(e))
raise
except Exception as e:
LOG.exception(e)
self._abort_operation(app, constants.APP_UPLOAD_OP)
raise exception.KubeAppUploadFailure(
name=app.name, version=app.version, reason=e)
def perform_app_apply(self, rpc_app, mode):
"""Process application install request
@ -1179,11 +1479,7 @@ class AppOperator(object):
app = AppOperator.Application(rpc_app,
rpc_app.get('name') in self._helm.get_helm_applications())
LOG.info("Application %s (%s) apply started." % (app.name, app.version))
overrides_str = ''
ready = True
@ -1204,9 +1500,6 @@ class AppOperator(object):
app.name, mode)
if overrides_files:
LOG.info("Application overrides generated.")
overrides_str =\
self._generate_armada_overrides_str(app.name, app.version,
overrides_files)
@ -1221,25 +1514,140 @@ class AppOperator(object):
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_DOWNLOAD_IMAGES)
self._download_images(app)
except exception.KubeAppApplyFailure as e:
# ex:Image download failure
LOG.exception(e)
self._abort_operation(app, constants.APP_APPLY_OP, str(e))
raise
except Exception as e:
# ex:K8s resource creation failure
LOG.exception(e)
self._abort_operation(app, constants.APP_APPLY_OP)
raise exception.KubeAppApplyFailure(
name=app.name, version=app.version, reason=e)
try:
if ready:
self._update_app_status(
app, new_progress=constants.APP_PROGRESS_APPLY_MANIFEST)
if self._make_armada_request_with_monitor(app,
constants.APP_APPLY_OP,
overrides_str):
self._update_app_releases_version(app.name)
self._update_app_status(app,
constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_COMPLETED)
app.update_active(True)
LOG.info("Application %s (%s) apply completed." % (app.name, app.version))
return True
except Exception as e:
# ex: update release version failure
LOG.exception(e)
# If it gets here, something went wrong
self._abort_operation(app, constants.APP_APPLY_OP)
return False
def perform_app_update(self, from_rpc_app, to_rpc_app, tarfile, operation):
"""Process application update request
This method leverages the existing application upload workflow to
validate/upload the new application tarfile, then invokes Armada
apply or rollback to update the application from the applied version
to the new version. If any failure occurs during the update, a
recovery is triggered to restore the application to the old version.
After the apply/rollback to the new version is done, the files for the
old application version are cleaned up, as are the releases which are
not present in the new application version.
The app status is set to "applied" once the update completes so that
the user can continue applying the app with user overrides.
Usage ex: the method can be used to update from v1 to v2 and also
to update back from v2 to v1
:param from_rpc_app: application object in the RPC request to
update from
:param to_rpc_app: application object in the RPC request to
update to
:param tarfile: location of the application tarfile
:param operation: apply or rollback
"""
from_app = AppOperator.Application(from_rpc_app,
from_rpc_app.get('name') in self._helm.get_helm_applications())
to_app = AppOperator.Application(to_rpc_app,
to_rpc_app.get('name') in self._helm.get_helm_applications())
LOG.info("Start updating Application %s from version %s to version %s ..."
% (to_app.name, from_app.version, to_app.version))
try:
# Upload new app tarball
to_app = self.perform_app_upload(to_rpc_app, tarfile)
self._update_app_status(to_app, constants.APP_UPDATE_IN_PROGRESS)
result = False
if operation == constants.APP_APPLY_OP:
result = self.perform_app_apply(to_rpc_app, mode=None)
elif operation == constants.APP_ROLLBACK_OP:
result = self._perform_app_rollback(from_app, to_app)
if not result:
LOG.error("Application %s update from version %s to version "
"%s aborted." % (to_app.name, from_app.version, to_app.version))
return self._perform_app_recover(from_app, to_app)
self._update_app_status(to_app, constants.APP_UPDATE_IN_PROGRESS,
"cleanup application version {}".format(from_app.version))
# App apply/rollback succeeded
# Starting cleanup old application
from_app.charts = self._get_list_of_charts(from_app.armada_mfile_abs)
to_app_charts = [c.release for c in to_app.charts]
deployed_releases = helm_utils.retrieve_helm_releases()
for from_chart in from_app.charts:
if (from_chart.release not in to_app_charts and
from_chart.release in deployed_releases):
# Clean up the releases that are in the old application
# version but not in the new application version
helm_utils.delete_helm_release(from_chart.release)
LOG.info("Helm release %s for Application %s (%s) deleted"
% (from_chart.release, from_app.name, from_app.version))
self._cleanup(from_app, app_dir=False)
self._app._patch_report_app_dependencies(
from_app.name + '-' + from_app.version)
self._update_app_status(
to_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_UPDATE_COMPLETED.format(from_app.version,
to_app.version))
LOG.info("Application %s update from version %s to version "
"%s completed." % (to_app.name, from_app.version, to_app.version))
except (exception.KubeAppUploadFailure,
exception.KubeAppApplyFailure):
# An error occurred during app upload or apply, before the
# armada apply process started
# ie. image download/k8s resource creation failure
# Start recovery without triggering the armada process
return self._perform_app_recover(from_app, to_app,
armada_process_required=False)
except Exception as e:
# Application updated successfully (armada apply/rollback),
# but an error occurred while cleaning up the old app
# ie. delete app files failure, patch controller failure,
# helm release delete failure
self._update_app_status(
to_app, constants.APP_APPLY_SUCCESS,
constants.APP_PROGRESS_UPDATE_COMPLETED.format(from_app.version, to_app.version) +
constants.APP_PROGRESS_CLEANUP_FAILED.format(from_app.version) +
'please check logs for details.')
LOG.exception(e)
return True
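The stale-release cleanup above boils down to a set difference filtered by what is actually deployed: releases the old app version installed that the new version no longer defines. A minimal sketch with illustrative names:

```python
def stale_releases(from_releases, to_releases, deployed_releases):
    """Releases deployed by the old app version but absent from the new one.

    from_releases/to_releases: iterables of release names per app version.
    deployed_releases: {release: {namespace: revision}} from tiller, as
    returned by helm_utils.retrieve_helm_releases().
    """
    to_set = set(to_releases)
    return [r for r in from_releases
            if r not in to_set and r in deployed_releases]
```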
def perform_app_remove(self, rpc_app):
"""Process application remove request
@ -1260,6 +1668,19 @@ class AppOperator(object):
app, new_progress=constants.APP_PROGRESS_DELETE_MANIFEST)
if self._make_armada_request_with_monitor(app, constants.APP_DELETE_OP):
# After armada delete, the data for the releases is purged from
# tiller/etcd. The release info for the active app stored in the
# sysinv db should be reset to 0, and any inactive apps must be
# destroyed as well.
db_app = self._dbapi.kube_app_get(app.name)
app_releases = self._dbapi.kube_app_chart_release_get_all(db_app.id)
for r in app_releases:
if r.version != 0:
self._dbapi.kube_app_chart_release_update(
db_app.id, r.release, r.namespace, {'version': 0})
if self._dbapi.kube_app_get_inactive(app.name):
self._dbapi.kube_app_destroy(app.name, inactive=True)
if app.system_app:
try:
@ -1271,7 +1692,8 @@ class AppOperator(object):
LOG.exception(e)
return False
self._update_app_status(app, constants.APP_UPLOAD_SUCCESS,
constants.APP_PROGRESS_COMPLETED)
LOG.info("Application (%s) remove completed." % app.name)
return True
else:
@ -1320,7 +1742,7 @@ class AppOperator(object):
try:
self._dbapi.kube_app_destroy(app.name)
self._cleanup(app)
self._app._patch_report_app_dependencies(app.name + '-' + app.version)
LOG.info("Application (%s) has been purged from the system." %
app.name)
msg = None
@ -1370,6 +1792,7 @@ class AppOperator(object):
self.patch_dependencies = []
self.charts = []
self.releases = []
@property
def name(self):
@ -1533,8 +1956,8 @@ class DockerHelper(object):
os.unlink(kube_config)
return None
def make_armada_request(self, request, manifest_file='', overrides_str='',
app_releases=[], logfile=None):
if logfile is None:
logfile = request + '.log'
@ -1587,6 +2010,44 @@ class DockerHelper(object):
else:
LOG.error("Failed to apply application manifest %s: "
"%s." % (manifest_file, exec_logs))
elif request == constants.APP_ROLLBACK_OP:
cmd_rm = "rm " + logfile
armada_svc.exec_run(cmd_rm)
for app_release in app_releases:
release = app_release.get('release')
version = app_release.get('version')
sequenced = app_release.get('sequenced')
if sequenced:
cmd = "/bin/bash -c 'armada rollback --debug --wait --timeout 1800 " +\
"--release " + release + " --version " + str(version) + tiller_host +\
" | tee -a " + logfile + "'"
else:
cmd = "/bin/bash -c 'armada rollback --debug --release " +\
release + " --version " + str(version) + tiller_host +\
" | tee -a " + logfile + "'"
(exit_code, exec_logs) = armada_svc.exec_run(cmd)
if exit_code == 0:
if ARMADA_RELEASE_ROLLBACK_FAILURE_MSG in exec_logs:
rc = False
LOG.error("Received a false positive response from "
"Docker/Armada. Failed to rollback release "
"(%s): %s" % (release, exec_logs))
break
else:
rc = False
if exit_code == CONTAINER_ABNORMAL_EXIT_CODE:
LOG.error("Failed to rollback release (%s). "
"Armada service has exited abnormally."
% release)
else:
LOG.error("Failed to rollback release (%s): %s"
% (release, exec_logs))
break
if rc:
LOG.info("Application releases %s were successfully "
"rolled back." % app_releases)
elif request == constants.APP_DELETE_OP:
cmd = "/bin/bash -c 'armada delete --debug --manifest " +\
manifest_file + tiller_host + " | tee " + logfile + "'"


@ -100,6 +100,7 @@ from sysinv.puppet import common as puppet_common
from sysinv.puppet import puppet
from sysinv.helm import helm
from sysinv.helm import common as helm_common
from sysinv.helm import utils as helm_utils
MANAGER_TOPIC = 'sysinv.conductor_manager'
@ -214,7 +215,7 @@ class ConductorManager(service.PeriodicService):
self._handle_restore_in_progress()
helm_utils.refresh_helm_repo_information()
LOG.info("sysinv-conductor start committed system=%s" %
system.as_dict())
@ -10833,6 +10834,20 @@ class ConductorManager(service.PeriodicService):
return app_applied
def perform_app_update(self, context, from_rpc_app, to_rpc_app, tarfile, operation):
"""Handling of application update request (via AppOperator)
:param context: request context.
:param from_rpc_app: data object provided in the rpc request that
application update from
:param to_rpc_app: data object provided in the rpc request that
application update to
:param tarfile: location of the application tarfile to be extracted
:param operation: apply or rollback
"""
self._app.perform_app_update(from_rpc_app, to_rpc_app, tarfile, operation)
def perform_app_remove(self, context, rpc_app):
"""Handling of application removal request (via AppOperator)


@ -1777,6 +1777,24 @@ class ConductorAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
rpc_app=rpc_app,
mode=mode))
def perform_app_update(self, context, from_rpc_app, to_rpc_app, tarfile, operation):
"""Handle application update request
:param context: request context.
:param from_rpc_app: data object provided in the rpc request that
application update from
:param to_rpc_app: data object provided in the rpc request that
application update to
:param tarfile: location of application tarfile to be extracted
:param operation: apply or rollback
"""
return self.cast(context,
self.make_msg('perform_app_update',
from_rpc_app=from_rpc_app,
to_rpc_app=to_rpc_app,
tarfile=tarfile,
operation=operation))
def perform_app_remove(self, context, rpc_app):
"""Handle application remove request


@ -7528,13 +7528,40 @@ class Connection(api.Connection):
def _kube_app_get(self, name):
query = model_query(models.KubeApp)
query = query.filter(
models.KubeApp.name == name,
models.KubeApp.status != constants.APP_INACTIVE_STATE)
try:
result = query.one()
except NoResultFound:
raise exception.KubeAppNotFound(name=name)
return result
@objects.objectify(objects.kube_app)
def kube_app_get_inactive(self, name, limit=None, marker=None,
sort_key=None, sort_dir=None):
query = model_query(models.KubeApp)
query = query.filter(
models.KubeApp.name == name,
models.KubeApp.status == constants.APP_INACTIVE_STATE)
return _paginate_query(models.KubeApp, limit, marker,
sort_key, sort_dir, query)
@objects.objectify(objects.kube_app)
def kube_app_get_inactive_by_name_version(self, name, version):
query = model_query(models.KubeApp)
query = query.filter(
models.KubeApp.name == name,
models.KubeApp.app_version == version,
models.KubeApp.status == constants.APP_INACTIVE_STATE)
try:
result = query.one()
except NoResultFound:
raise exception.KubeAppInactiveNotFound(name=name,
version=version)
return result
@objects.objectify(objects.kube_app)
def kube_app_create(self, values):
app = models.KubeApp()
@ -7545,15 +7572,18 @@ class Connection(api.Connection):
session.flush()
except db_exc.DBDuplicateEntry:
LOG.error("Failed to add application %s. "
"Already exists with this name" %
(values['name']))
"Already exists with this name"
"and version" % (values['name']))
raise exception.KubeAppAlreadyExists(
name=values['name'],
version=values['app_version'])
return self.kube_app_get(values['name'])
@objects.objectify(objects.kube_app)
def kube_app_get_all(self):
query = model_query(models.KubeApp)
query = query.filter(
models.KubeApp.status != constants.APP_INACTIVE_STATE)
return query.all()
@objects.objectify(objects.kube_app)
@ -7568,26 +7598,81 @@ class Connection(api.Connection):
count = query.update(values, synchronize_session='fetch')
if count == 0:
raise exception.KubeAppNotFound(values['name'])
return query.one()
def kube_app_destroy(self, name, version=None, inactive=False):
with _session_for_write() as session:
query = model_query(models.KubeApp, session=session)
query = query.filter_by(name=name)
if version:
query = query.filter_by(app_version=version)
if inactive:
query = query.filter_by(
status=constants.APP_INACTIVE_STATE)
if query.all():
query.delete()
@objects.objectify(objects.kube_app_releases)
def kube_app_chart_release_get(self, app_id, release, namespace):
query = model_query(models.KubeAppReleases)
query = query.filter(models.KubeAppReleases.app_id == app_id,
models.KubeAppReleases.release == release,
models.KubeAppReleases.namespace == namespace)
try:
result = query.one()
except NoResultFound:
raise exception.KubeAppChartReleaseNotFound(
name=release,
namespace=namespace,
app_id=app_id)
return result
@objects.objectify(objects.kube_app_releases)
def kube_app_chart_release_update(self, app_id, release, namespace, values):
with _session_for_write() as session:
query = model_query(models.KubeAppReleases, session=session)
query = query.filter(models.KubeAppReleases.app_id == app_id,
models.KubeAppReleases.release == release,
models.KubeAppReleases.namespace == namespace)
count = query.update(values, synchronize_session='fetch')
if count == 0:
raise exception.KubeAppChartReleaseNotFound(
name=release,
namespace=namespace,
app_id=app_id)
return query.one()
@objects.objectify(objects.kube_app_releases)
def kube_app_chart_release_create(self, values):
app_release = models.KubeAppReleases()
app_release.update(values)
with _session_for_write() as session:
try:
session.add(app_release)
session.flush()
except db_exc.DBDuplicateEntry:
LOG.error("Failed to add chart release %s for application %s. "
"Already exists with this name %s and namespace %s" %
(values['release'], values['app_id'],
values['release'], values['namespace']))
raise exception.KubeAppChartReleaseAlreadyExists(
name=values['release'], namespace=values['namespace'],
app_id=values['app_id'])
return self.kube_app_chart_release_get(
values['app_id'], values['release'], values['namespace'])
@objects.objectify(objects.kube_app_releases)
def kube_app_chart_release_get_all(self, app_id, limit=None, marker=None,
sort_key=None, sort_dir=None):
query = model_query(models.KubeAppReleases)
query = query.filter(
models.KubeAppReleases.app_id == app_id)
return _paginate_query(models.KubeAppReleases, limit, marker,
sort_key, sort_dir, query)
def _datanetwork_get(self, model_class, datanetwork_id, obj=None):
session = None


@ -0,0 +1,37 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from migrate.changeset import UniqueConstraint
from sqlalchemy import MetaData, Table
ENGINE = 'InnoDB'
CHARSET = 'utf8'
def upgrade(migrate_engine):
"""
This database upgrade drops the old unique constraint and creates a
new unique constraint for the kube_app table.
"""
meta = MetaData()
meta.bind = migrate_engine
kube_app = Table('kube_app', meta, autoload=True)
UniqueConstraint('name', table=kube_app).drop()
UniqueConstraint('name', 'app_version', table=kube_app,
name='u_app_name_version').create()
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# As per other openstack components, downgrade is
# unsupported in this release.
raise NotImplementedError('SysInv database downgrade is unsupported.')


@ -0,0 +1,54 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
from sqlalchemy import DateTime, String, Integer
from sqlalchemy import Column, MetaData, Table, ForeignKey, UniqueConstraint
ENGINE = 'InnoDB'
CHARSET = 'utf8'
def upgrade(migrate_engine):
"""
This database upgrade creates a new table for storing kubernetes
application releases info.
"""
meta = MetaData()
meta.bind = migrate_engine
Table('kube_app', meta, autoload=True)
# Define and create the kube application releases table.
kube_app_releases = Table(
'kube_app_releases',
meta,
Column('created_at', DateTime),
Column('updated_at', DateTime),
Column('id', Integer, primary_key=True),
Column('release', String(255), nullable=True),
Column('namespace', String(255), nullable=True),
Column('version', Integer),
Column('app_id', Integer,
ForeignKey('kube_app.id', ondelete='CASCADE')),
UniqueConstraint('release', 'namespace', 'app_id', name='u_app_release_namespace'),
mysql_engine=ENGINE,
mysql_charset=CHARSET,
)
kube_app_releases.create()
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
# As per other openstack components, downgrade is
# unsupported in this release.
raise NotImplementedError('SysInv database downgrade is unsupported.')


@ -1701,10 +1701,23 @@ class KubeApp(Base):
__tablename__ = 'kube_app'
id = Column(Integer, primary_key=True)
name = Column(String(255), nullable=False)
app_version = Column(String(255), nullable=False)
manifest_name = Column(String(255), nullable=False)
manifest_file = Column(String(255), nullable=False)
status = Column(String(255), nullable=False)
progress = Column(String(255), nullable=True)
active = Column(Boolean, nullable=False, default=False)
UniqueConstraint('name', 'app_version', name='u_app_name_version')
class KubeAppReleases(Base):
__tablename__ = 'kube_app_releases'
id = Column(Integer, primary_key=True)
release = Column(String(255), nullable=True)
namespace = Column(String(255), nullable=True)
version = Column(Integer)
app_id = Column(Integer, ForeignKey('kube_app.id', ondelete='CASCADE'))
kube_app = relationship("KubeApp", lazy="joined", join_depth=1)
UniqueConstraint('release', 'namespace', 'app_id', name='u_app_release_namespace')


@ -584,6 +584,8 @@ class HelmOperator(object):
yaml.dump(overrides, f, default_flow_style=False)
os.close(fd)
os.rename(tmppath, filepath)
# Change the permission to be readable by non-root users (ie. Armada)
os.chmod(filepath, 0o644)
except Exception:
LOG.exception("failed to write overrides file: %s" % filepath)
raise


@ -170,9 +170,21 @@ class OpenstackBaseHelm(base.BaseHelm):
if password:
return password.encode('utf8', 'strict')
# The password is not present; reuse the one from the inactive app
# if available, otherwise generate one and store it in the override
try:
inactive_apps = self.dbapi.kube_app_get_inactive(
constants.HELM_APP_OPENSTACK)
app_override = self.dbapi.helm_override_get(app_id=inactive_apps[0].id,
name=chart,
namespace=namespace)
password = app_override.system_overrides.get(field, None)
except (IndexError, exception.HelmOverrideNotFound):
# No inactive app or no overrides for the inactive app
pass
if not password:
password = self._generate_random_password()
values = {'system_overrides': override.system_overrides}
values['system_overrides'].update({
field: password,
@ -359,11 +371,27 @@ class OpenstackBaseHelm(base.BaseHelm):
if privatekey and publickey:
return str(privatekey), str(publickey)
# ssh keys are not set; reuse the ones from the inactive app
# if available, otherwise generate them and store in overrides
newprivatekey = None
newpublickey = None
try:
inactive_apps = self.dbapi.kube_app_get_inactive(
constants.HELM_APP_OPENSTACK)
app_override = self.dbapi.helm_override_get(app_id=inactive_apps[0].id,
name=chart,
namespace=namespace)
newprivatekey = app_override.system_overrides.get('privatekey', None)
newpublickey = app_override.system_overrides.get('publickey', None)
except (IndexError, exception.HelmOverrideNotFound):
# No inactive app or no overrides for the inactive app
pass
if not newprivatekey or not newpublickey:
key = RSA.generate(2048)
pubkey = key.publickey()
newprivatekey = key.exportKey('PEM')
newpublickey = pubkey.exportKey('OpenSSH')
values = {'system_overrides': override.system_overrides}
values['system_overrides'].update({'privatekey': newprivatekey,
'publickey': newpublickey})
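Both hunks above follow the same fallback pattern: try to reuse a secret from the inactive application's overrides, and generate a fresh one only when that lookup fails. A minimal standalone sketch of the pattern, using a stand-in `dbapi` and a generic `generate` callable rather than the real sysinv API:

```python
def get_or_generate_secret(dbapi, app_name, chart, namespace, field, generate):
    """Reuse a secret from the inactive app's overrides, else generate one.

    dbapi, app_name, chart and namespace are stand-ins for the sysinv
    equivalents; `generate` is any zero-argument callable.
    """
    secret = None
    try:
        inactive_apps = dbapi.kube_app_get_inactive(app_name)
        override = dbapi.helm_override_get(app_id=inactive_apps[0].id,
                                           name=chart,
                                           namespace=namespace)
        secret = override.system_overrides.get(field)
    except (IndexError, KeyError):
        # No inactive app, or no overrides recorded for it
        pass
    if not secret:
        secret = generate()
    return secret
```

This keeps the "preserve secrets across an update" behavior in one place, so the password and ssh-key paths only differ in which fields they pull and how they generate a replacement.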

View File

@ -0,0 +1,125 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# All Rights Reserved.
#
"""Helm utilities and helper functions."""
import os
import subprocess
import threading
import ruamel.yaml as yaml
from oslo_log import log as logging
from sysinv.common import exception
LOG = logging.getLogger(__name__)
def refresh_helm_repo_information():
"""Refresh the helm chart repository information.
Ensure that the local repository information maintained in key user home
directories is updated. Run this when the conductor is initialized and
after application uploads.
This handles scenarios where an upload occurs on the active controller
followed by a swact. The newly activated controller needs to make sure
that the local repository cache reflects any changes.
"""
with open(os.devnull, "w") as fnull:
try:
subprocess.check_call(['sudo', '-u', 'wrsroot',
'helm', 'repo', 'update'],
stdout=fnull, stderr=fnull)
except subprocess.CalledProcessError:
# Just log an error. Don't stop any callers from further execution.
LOG.error("Failed to update helm repo data for user wrsroot.")
def retrieve_helm_releases():
"""Retrieve the deployed helm releases from tiller
Get the name, namespace and version for the deployed releases
by querying helm tiller
:return: a dict of deployed helm releases
"""
helm_list = subprocess.Popen(
['helm', '--kubeconfig', '/etc/kubernetes/admin.conf',
'list', '--deployed', '--output', 'yaml'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
timer = threading.Timer(20, helm_list.kill)
try:
releases = []
deployed_releases = {}
timer.start()
out, err = helm_list.communicate()
if out and not err:
output = yaml.safe_load(out)
releases = output.get('Releases', [])
elif err and not out:
raise exception.HelmTillerFailure(
reason="Failed to retrieve releases: %s" % err)
elif not err and not out:
err_msg = "Failed to retrieve releases. " \
"Helm tiller response timeout."
raise exception.HelmTillerFailure(reason=err_msg)
for r in releases:
r_name = r.get('Name')
r_version = r.get('Revision')
r_namespace = r.get('Namespace')
deployed_releases.setdefault(r_name, {}).update(
{r_namespace: r_version})
except Exception as e:
raise exception.HelmTillerFailure(
reason="Failed to retrieve releases: %s" % e)
finally:
timer.cancel()
return deployed_releases
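The shape of the dict this helper builds can be illustrated with a pre-parsed payload in the format emitted by `helm list --deployed --output yaml` (release names and revisions here are illustrative, not from a real cluster):

```python
# Pre-parsed `helm list --deployed --output yaml` output (illustrative)
releases = [
    {'Name': 'osh-openstack-keystone', 'Namespace': 'openstack', 'Revision': 2},
    {'Name': 'osh-openstack-glance', 'Namespace': 'openstack', 'Revision': 1},
]

# Same aggregation as retrieve_helm_releases:
# map release name -> {namespace: revision}
deployed_releases = {}
for r in releases:
    deployed_releases.setdefault(r['Name'], {}).update(
        {r['Namespace']: r['Revision']})
```

The nested layout lets the update code compare per-namespace revisions against the `kube_app_releases` rows when deciding between Armada apply and rollback.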
def delete_helm_release(release):
"""Delete helm release
This method deletes a helm release without --purge, which removes
all associated resources from Kubernetes but not from the store (etcd).
When updating an application, this method is needed to clean up
releases that were deployed by the old application version but are
not present in the new one
:param release: the name of the helm release
"""
helm_cmd = subprocess.Popen(
['helm', '--kubeconfig', '/etc/kubernetes/admin.conf',
'delete', release],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
timer = threading.Timer(20, helm_cmd.kill)
try:
timer.start()
out, err = helm_cmd.communicate()
if err and not out:
if any(msg in err for msg in
("deletion completed", "not found", "is already deleted")):
LOG.debug("Release %s not found or deleted already" % release)
return True
raise exception.HelmTillerFailure(
reason="Failed to delete release: %s" % err)
elif not err and not out:
err_msg = "Failed to delete release. " \
"Helm tiller response timeout."
raise exception.HelmTillerFailure(reason=err_msg)
return True
except Exception as e:
LOG.error("Failed to delete release: %s" % e)
raise exception.HelmTillerFailure(
reason="Failed to delete release: %s" % e)
finally:
timer.cancel()
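Both helpers above use the same timeout idiom: a `threading.Timer` that kills the child process if `communicate()` has not returned in time. A standalone sketch of that idiom (with the 20-second limit made a parameter):

```python
import subprocess
import threading


def run_with_timeout(cmd, timeout):
    """Run cmd, killing it if it runs longer than timeout seconds.

    Returns (stdout, stderr, returncode); a killed process yields a
    nonzero returncode.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    timer = threading.Timer(timeout, proc.kill)
    try:
        timer.start()
        out, err = proc.communicate()
    finally:
        # Always cancel, so a process that finished in time
        # does not receive a stray kill() later
        timer.cancel()
    return out, err, proc.returncode
```

On Python 3, `subprocess.run(cmd, timeout=...)` achieves the same effect without a manual timer; the timer pattern here matches the Python 2 era code in these helpers.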

View File

@ -37,6 +37,7 @@ from sysinv.objects import helm_overrides
from sysinv.objects import host
from sysinv.objects import host_upgrade
from sysinv.objects import kube_app
from sysinv.objects import kube_app_releases
from sysinv.objects import interface
from sysinv.objects import interface_ae
from sysinv.objects import interface_ethernet
@ -182,6 +183,7 @@ storage_ceph_external = storage_ceph_external.StorageCephExternal
helm_overrides = helm_overrides.HelmOverrides
label = label.Label
kube_app = kube_app.KubeApp
kube_app_releases = kube_app_releases.KubeAppReleases
datanetwork = datanetwork.DataNetwork
__all__ = (system,
@ -249,6 +251,7 @@ __all__ = (system,
storage_ceph_external,
helm_overrides,
kube_app,
kube_app_releases,
datanetwork,
interface_network,
# alias objects for RPC compatibility

View File

@ -31,5 +31,9 @@ class KubeApp(base.SysinvObject):
def get_by_name(cls, context, name):
return cls.dbapi.kube_app_get(name)
@base.remotable_classmethod
def get_inactive_app_by_name_version(cls, context, name, version):
return cls.dbapi.kube_app_get_inactive_by_name_version(name, version)
def save_changes(self, context, updates):
self.dbapi.kube_app_update(self.id, updates)

View File

@ -0,0 +1,36 @@
#
# Copyright (c) 2019 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# coding=utf-8
#
from sysinv.db import api as db_api
from sysinv.objects import base
from sysinv.objects import utils
class KubeAppReleases(base.SysinvObject):
# VERSION 1.0: Initial version
VERSION = '1.0'
dbapi = db_api.get_instance()
fields = {'id': int,
'release': utils.str_or_none,
'namespace': utils.str_or_none,
'version': int,
'app_id': int,
}
@base.remotable_classmethod
def get_by_id(cls, context, app_id, release, namespace):
return cls.dbapi.kube_app_chart_release_get(app_id, release, namespace)
def save_changes(self, context, updates):
self.dbapi.kube_app_chart_release_update(self.app_id, self.release,
self.namespace, updates)

View File

@ -44,7 +44,7 @@ import testtools
import eventlet
eventlet.monkey_patch(os=False)
import sysinv.common.utils
import sysinv.helm.utils
CONF = cfg.CONF
_DB_CACHE = None
@ -122,7 +122,7 @@ class TestingException(Exception):
class TestCase(testtools.TestCase):
"""Test case base class for all unit tests."""
helm_refresh_patcher = mock.patch.object(sysinv.common.utils, 'refresh_helm_repo_information')
helm_refresh_patcher = mock.patch.object(sysinv.helm.utils, 'refresh_helm_repo_information')
def setUp(self):
"""Run before each test method to initialize test environment."""