Merge "Update load import and delete APIs"

This commit is contained in:
Zuul 2020-06-02 18:57:02 +00:00 committed by Gerrit Code Review
commit 45c31a7973
12 changed files with 242 additions and 50 deletions

View File

@ -297,7 +297,13 @@ class HTTPClient(httplib2.Http):
self.authenticate_and_fetch_endpoint_url()
connection_url = self._get_connection_url(url)
fields = kwargs.get('data')
fields['file'] = (kwargs['body'], open(kwargs['body'], 'rb'))
files = kwargs['body']
if fields is None:
fields = dict()
for k, v in files.items():
fields[k] = (v, open(v, 'rb'),)
enc = MultipartEncoder(fields)
headers = {'Content-Type': enc.content_type,
"X-Auth-Token": self.auth_token}

View File

@ -43,7 +43,7 @@ class DeviceImageManager(base.Manager):
data[key] = value
else:
raise exc.InvalidAttribute('%s' % key)
return self._upload_multipart(self._path(), file, data=data)
return self._upload_multipart(self._path(), dict(file=file), data=data)
def apply(self, device_image_id, labels=None):
return self._update(self._path(device_image_id) + '?action=apply',

View File

@ -51,8 +51,8 @@ class LoadManager(base.Manager):
new[key] = value
else:
raise exc.InvalidAttribute(key)
res, body = self.api.json_request('POST', path, body=new)
return body
return self._upload_multipart(path, body=new)
def delete(self, load_id):
path = '/v1/loads/%s' % load_id

View File

@ -11,6 +11,12 @@
from cgtsclient.common import utils
from cgtsclient import exc
import os.path
from oslo_utils._i18n import _
import sys
import threading
import time
IMPORTED_LOAD_MAX_COUNT = 1
def _print_load_show(load):
@ -71,23 +77,64 @@ def do_load_import(cc, args):
if not os.path.isabs(args.sigpath):
args.sigpath = os.path.abspath(args.sigpath)
# Here we pass the path_to_iso to the API
# The API will perform any required actions to import the provided iso
if not os.path.isfile(args.isopath):
raise exc.CommandError(_("File %s does not exist." % args.isopath))
if not os.path.isfile(args.sigpath):
raise exc.CommandError(_("File %s does not exist." % args.sigpath))
# The following logic is taken from sysinv api as it takes a while for
# this large POST request to reach the server.
#
# Ensure the request does not exceed load import limit before sending.
loads = cc.load.list()
if len(loads) > IMPORTED_LOAD_MAX_COUNT:
raise exc.CommandError(_(
"Max number of loads (2) reached. Please remove the "
"old or unused load before importing a new one."))
patch = {'path_to_iso': args.isopath, 'path_to_sig': args.sigpath}
try:
new_load = cc.load.import_load(**patch)
except exc.HTTPNotFound:
raise exc.CommandError('Load import failed')
if new_load:
uuid = new_load["uuid"]
print("This operation will take a while. Please wait.")
wait_task = WaitThread()
wait_task.start()
resp = cc.load.import_load(**patch)
wait_task.join()
error = resp.get('error')
if error:
raise exc.CommandError("%s" % error)
except Exception as e:
wait_task.join()
raise exc.CommandError(_("Load import failed. Reason: %s" % e))
else:
raise exc.CommandError('load was not created')
new_load = resp.get('new_load')
if new_load:
uuid = new_load["uuid"]
else:
raise exc.CommandError(_("Load was not created."))
try:
load = cc.load.get(uuid)
except exc.HTTPNotFound:
raise exc.CommandError('load UUID not found: %s' % uuid)
try:
load = cc.load.get(uuid)
except exc.HTTPNotFound:
raise exc.CommandError(_("Load UUID not found: %s" % uuid))
_print_load_show(load)
_print_load_show(load)
class WaitThread(threading.Thread):
def __init__(self):
super(WaitThread, self).__init__()
self.stop = threading.Event()
def run(self):
while not self.stop.is_set():
sys.stdout.write(".")
sys.stdout.flush()
time.sleep(10)
def join(self, timeout=None): # pylint: disable=arguments-differ
self.stop.set()
super(WaitThread, self).join(timeout)
sys.stdout.write("\n")
sys.stdout.flush()

View File

@ -1750,7 +1750,7 @@ class AgentManager(service.PeriodicService):
else:
rpcapi = conductor_rpcapi.ConductorAPI(
topic=conductor_rpcapi.MANAGER_TOPIC)
rpcapi.finalize_delete_load(context)
rpcapi.finalize_delete_load(context, software_version)
else:
LOG.error("Cleanup script %s does not exist." % cleanup_script)

View File

@ -17,12 +17,14 @@
# under the License.
from oslo_config import cfg
import os
import pecan
from sysinv.api import acl
from sysinv.api import config
from sysinv.api import hooks
from sysinv.api import middleware
from sysinv.common import constants
from sysinv.common import policy
auth_opts = [
@ -41,6 +43,14 @@ def get_pecan_config():
return pecan.configuration.conf_from_file(filename)
def make_tempdir():
# Set up sysinv-api tempdir for large POST request
tempdir = constants.SYSINV_TMPDIR
if not os.path.isdir(tempdir):
os.makedirs(tempdir)
os.environ['TMPDIR'] = tempdir
def setup_app(pecan_config=None, extra_hooks=None):
policy.init()
@ -62,6 +72,8 @@ def setup_app(pecan_config=None, extra_hooks=None):
pecan.configuration.set_config(dict(pecan_config), overwrite=True)
make_tempdir()
app = pecan.make_app(
pecan_config.app.root,
static_root=pecan_config.app.static_root,

View File

@ -19,6 +19,7 @@
#
import jsonpatch
import os
import pecan
from pecan import rest
import six
@ -28,13 +29,14 @@ from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from oslo_log import log
from pecan import expose
from pecan import request
from sysinv._i18n import _
from sysinv.api.controllers.v1 import base
from sysinv.api.controllers.v1 import collection
from sysinv.api.controllers.v1 import link
from sysinv.api.controllers.v1 import types
from sysinv.api.controllers.v1 import utils
from sysinv.common.constants import ACTIVE_LOAD_STATE
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import utils as cutils
@ -222,7 +224,7 @@ class LoadController(rest.RestController):
patch = load.as_dict()
self._new_load_semantic_checks(patch)
patch['state'] = ACTIVE_LOAD_STATE
patch['state'] = constants.ACTIVE_LOAD_STATE
try:
new_load = pecan.request.dbapi.load_create(patch)
@ -246,41 +248,120 @@ class LoadController(rest.RestController):
return load.convert_with_links(new_load)
@wsme_pecan.wsexpose(Load, body=LoadImportType)
def import_load(self, body):
@staticmethod
def _upload_file(file_item):
dst = None
try:
staging_dir = constants.LOAD_FILES_STAGING_DIR
if not os.path.isdir(staging_dir):
os.makedirs(staging_dir)
fn = os.path.join(staging_dir,
os.path.basename(file_item.filename))
if hasattr(file_item.file, 'fileno'):
# Large iso file
dst = os.open(fn, os.O_WRONLY | os.O_CREAT)
src = file_item.file.fileno()
size = 64 * 1024
n = size
while n >= size:
s = os.read(src, size)
n = os.write(dst, s)
os.close(dst)
else:
# Small signature file
with open(fn, 'wb') as sigfile:
sigfile.write(file_item.file.read())
except Exception:
if dst:
os.close(dst)
LOG.exception("Failed to upload load file %s" % file_item.filename)
return None
return fn
@expose('json')
@cutils.synchronized(LOCK_NAME)
def import_load(self):
"""Create a new Load."""
err_msg = None
# Only import loads on controller-0. This is required because the load
# is only installed locally and we will be booting controller-1 from
# this load during the upgrade.
if socket.gethostname() != constants.CONTROLLER_0_HOSTNAME:
raise wsme.exc.ClientSideError(_(
"load-import rejected: A load can only be imported "
"when %s is active." % constants.CONTROLLER_0_HOSTNAME))
err_msg = _("A load can only be imported when %s is "
"active. ") % constants.CONTROLLER_0_HOSTNAME
else:
loads = pecan.request.dbapi.load_get_list()
import_data = body.as_dict()
path_to_iso = import_data['path_to_iso']
path_to_sig = import_data['path_to_sig']
# Only 2 loads are allowed at one time: the active load
# and an imported load regardless of its current state
# (e.g. importing, error, deleting).
if len(loads) > constants.IMPORTED_LOAD_MAX_COUNT:
for load in loads:
if load.state == constants.ACTIVE_LOAD_STATE:
pass
elif load.state == constants.ERROR_LOAD_STATE:
err_msg = _("Please remove the load in error state "
"before importing a new one.")
elif load.state == constants.DELETING_LOAD_STATE:
err_msg = _("Please wait for the current load delete "
"to complete before importing a new one.")
else:
# Already imported or being imported
err_msg = _("Max number of loads (2) reached. Please "
"remove the old or unused load before "
"importing a new one.")
if err_msg:
return dict(error=err_msg)
load_files = dict()
for f in constants.IMPORT_LOAD_FILES:
if f not in request.POST:
err_msg = _("Missing required file for %s") % f
return dict(error=err_msg)
file_item = request.POST[f]
if not file_item.filename:
err_msg = _("No %s file uploaded") % f
return dict(error=err_msg)
fn = self._upload_file(file_item)
if fn:
load_files.update({f: fn})
else:
err_msg = _("Failed to save file %s to disk. Please check "
"sysinv logs for details." % file_item.filename)
return dict(error=err_msg)
LOG.info("Load files: %s saved to disk." % load_files)
try:
new_load = pecan.request.rpcapi.start_import_load(
pecan.request.context, path_to_iso, path_to_sig)
pecan.request.context,
load_files[constants.LOAD_ISO],
load_files[constants.LOAD_SIGNATURE])
except common.RemoteError as e:
# Keep only the message raised originally by sysinv conductor.
raise wsme.exc.ClientSideError(str(e.value))
return dict(error=str(e.value))
if new_load is None:
raise wsme.exc.ClientSideError(
_("Error importing load. Load not found"))
return dict(error=_("Error importing load. Load not found"))
# Signature and upgrade path checks have passed, make rpc call
# to the conductor to run import script in the background.
try:
pecan.request.rpcapi.import_load(
pecan.request.context, path_to_iso, new_load)
pecan.request.context,
load_files[constants.LOAD_ISO],
new_load)
except common.RemoteError as e:
# Keep only the message raised originally by sysinv conductor.
raise wsme.exc.ClientSideError(str(e.value))
return dict(error=str(e.value))
return Load.convert_with_links(new_load)
new_load_dict = new_load.as_dict()
return dict(new_load=new_load_dict)
@cutils.synchronized(LOCK_NAME)
@wsme.validate(six.text_type, [LoadPatchType])
@ -315,7 +396,7 @@ class LoadController(rest.RestController):
return Load.convert_with_links(rpc_load)
@cutils.synchronized(LOCK_NAME)
@wsme_pecan.wsexpose(None, six.text_type, status_code=204)
@wsme_pecan.wsexpose(Load, six.text_type, status_code=200)
def delete(self, load_id):
"""Delete a load."""
@ -342,3 +423,5 @@ class LoadController(rest.RestController):
cutils.validate_load_for_delete(load)
pecan.request.rpcapi.delete_load(pecan.request.context, load_id)
return Load.convert_with_links(load)

View File

@ -27,6 +27,7 @@ import webob
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_log import log
from oslo_utils import uuidutils
from pecan import hooks
from sysinv._i18n import _
@ -43,6 +44,10 @@ audit_log_name = "{}.{}".format(__name__, "auditor")
auditLOG = log.getLogger(audit_log_name)
def generate_request_id():
return 'req-%s' % uuidutils.generate_uuid()
class ConfigHook(hooks.PecanHook):
"""Attach the config object to the request so controllers can get to it."""
@ -198,7 +203,11 @@ class AuditLogging(hooks.PecanHook):
return
now = time.time()
elapsed = now - state.request.start_time
try:
elapsed = now - state.request.start_time
except AttributeError:
LOG.info("Start time is not in request, setting it to 0.")
elapsed = 0
environ = state.request.environ
server_protocol = environ["SERVER_PROTOCOL"]
@ -210,7 +219,12 @@ class AuditLogging(hooks.PecanHook):
tenant_id = state.request.headers.get('X-Tenant-Id')
tenant = state.request.headers.get('X-Tenant', tenant_id)
domain_name = state.request.headers.get('X-User-Domain-Name')
request_id = state.request.context.request_id
try:
request_id = state.request.context.request_id
except AttributeError:
LOG.info("Request id is not in request, setting it to an "
"auto generated id.")
request_id = generate_request_id()
url_path = urlparse(state.request.path_qs).path
@ -242,6 +256,19 @@ class AuditLogging(hooks.PecanHook):
tenant,
domain_name)
def cleanup(environ):
post_vars, body_file = environ['webob._parsed_post_vars']
# for large post request, the body is also copied to a tempfile by webob
if not isinstance(body_file, bytes):
body_file.close()
for f in post_vars.keys():
item = post_vars[f]
if hasattr(item, 'file'):
item.file.close()
if 'webob._parsed_post_vars' in state.request.environ:
cleanup(state.request.environ)
# The following ctx object will be output in the logger as
# something like this:
# [req-088ed3b6-a2c9-483e-b2ad-f1b2d03e06e6 3d76d3c1376744e8ad9916a6c3be3e5f ca53e70c76d847fd860693f8eb301546]

View File

@ -752,6 +752,11 @@ ERROR_LOAD_STATE = 'error'
DELETING_LOAD_STATE = 'deleting'
DELETE_LOAD_SCRIPT = '/etc/sysinv/upgrades/delete_load.sh'
IMPORTED_LOAD_MAX_COUNT = 1
LOAD_ISO = 'path_to_iso'
LOAD_SIGNATURE = 'path_to_sig'
IMPORT_LOAD_FILES = [LOAD_ISO, LOAD_SIGNATURE]
LOAD_FILES_STAGING_DIR = '/scratch/tmp_load'
# Ceph
CEPH_HEALTH_OK = 'HEALTH_OK'
@ -1619,3 +1624,6 @@ KUBE_INTEL_GPU_DEVICE_PLUGIN_LABEL = "intelgpu=enabled"
# Port on which ceph manager and ceph-mgr listens
CEPH_MGR_PORT = 7999
# Tempdir for temporary storage of large post data
SYSINV_TMPDIR = '/scratch/sysinv-tmpdir'

View File

@ -9096,6 +9096,10 @@ class ConductorManager(service.PeriodicService):
raise exception.SysinvException(_(
"Failure during sw-patch init-release"))
# Remove load files in staging dir
shutil.rmtree(constants.LOAD_FILES_STAGING_DIR)
LOG.info("Load import completed.")
return True
def delete_load(self, context, load_id):
@ -9124,7 +9128,9 @@ class ConductorManager(service.PeriodicService):
except exception.NodeNotFound:
# The mate controller has not been configured so complete the
# deletion of the load now.
self.finalize_delete_load(context)
self.finalize_delete_load(context, load.software_version)
LOG.info("Load (%s) deleted." % load.software_version)
def _cleanup_load(self, load):
# Run the sw-patch del-release commands
@ -9138,11 +9144,6 @@ class ConductorManager(service.PeriodicService):
raise exception.SysinvException(_(
"Failure during sw-patch del-release"))
# delete the central patch vault if it exists
dc_vault = '/opt/dc-vault/' + load.software_version
if os.path.exists(dc_vault):
shutil.rmtree(dc_vault)
cleanup_script = constants.DELETE_LOAD_SCRIPT
if os.path.isfile(cleanup_script):
with open(os.devnull, "w") as fnull:
@ -9157,10 +9158,15 @@ class ConductorManager(service.PeriodicService):
raise exception.SysinvException(_(
"Cleanup script %s does not exist.") % cleanup_script)
def finalize_delete_load(self, context):
def finalize_delete_load(self, context, sw_version):
# Clean up the staging directory in case an error occur during the
# import and this directoy did not get cleaned up.
if os.path.exists(constants.LOAD_FILES_STAGING_DIR):
shutil.rmtree(constants.LOAD_FILES_STAGING_DIR)
loads = self.dbapi.load_get_list()
for load in loads:
if load.state == constants.DELETING_LOAD_STATE:
if load.software_version == sw_version:
self.dbapi.load_destroy(load.id)
def upgrade_ihost_pxe_config(self, context, host, load):

View File

@ -1220,14 +1220,16 @@ class ConductorAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
self.make_msg('delete_load',
load_id=load_id))
def finalize_delete_load(self, context):
def finalize_delete_load(self, context, sw_version):
"""Asynchronously, delete the load from the database
:param context: request context.
:param sw_version: software version of load to be deleted
:returns: none.
"""
return self.cast(context,
self.make_msg('finalize_delete_load'))
self.make_msg('finalize_delete_load',
sw_version=sw_version))
def load_update_by_host(self, context, ihost_id, version):
"""Update the host_upgrade table with the running SW_VERSION

View File

@ -18,6 +18,7 @@
"""Base classes for API tests."""
from oslo_config import cfg
import os.path
import mock
import pecan
import pecan.testing
@ -72,7 +73,7 @@ class FunctionalTest(base.TestCase):
'acl_public_routes': ['/', '/v1'],
},
}
os.path.isdir = mock.Mock(return_value=True)
return pecan.testing.load_test_app(self.config)
def tearDown(self):