
558 lines
21 KiB

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2017 UnitedStack Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Copyright (c) 2013-2018 Wind River Systems, Inc.
import copy
import jsonpatch
import os
import six
import pecan
from pecan import rest
import wsme
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from sysinv.api.controllers.v1 import base
from sysinv.api.controllers.v1 import collection
from sysinv.api.controllers.v1 import link
from sysinv.api.controllers.v1 import types
from sysinv.api.controllers.v1 import utils
from sysinv.api.controllers.v1 import storage as storage_api
from sysinv.common import ceph
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import utils as cutils
from sysinv import objects
from sysinv.openstack.common.gettextutils import _
from sysinv.openstack.common import log
from sysinv.openstack.common import uuidutils
LOG = log.getLogger(__name__)
class StorageTierPatchType(types.JsonPatchType):
def mandatory_attrs():
return ['/cluster_uuid']
class StorageTier(base.APIBase):
"""API representation of a Storage Tier.
This class enforces type checking and value constraints, and converts
between the internal object model and the API representation of
a storage tier.
uuid = types.uuid
"Unique UUID for this storage tier"
name = wtypes.text
"Storage tier name"
type = wtypes.text
"Storage tier type"
status = wtypes.text
"Storage tier status"
capabilities = {wtypes.text: utils.ValidTypes(wtypes.text,
"Storage tier meta data"
forbackendid = int
"The storage backend that is using this storage tier"
backend_uuid = types.uuid
"The UUID of the storage backend that is using this storage tier"
forclusterid = int
"The storage cluster that this storage tier belongs to"
cluster_uuid = types.uuid
"The UUID of the storage cluster this storage tier belongs to"
stors = types.MultiType([list])
"List of OSD ids associated with this tier"
links = [link.Link]
"A list containing a self link and associated storage tier links"
istors = [link.Link]
"Links to the collection of OSDs on this storage tier"
def __init__(self, **kwargs):
self.fields = objects.storage_tier.fields.keys()
for k in self.fields:
setattr(self, k, kwargs.get(k))
if not self.uuid:
self.uuid = uuidutils.generate_uuid()
def convert_with_links(cls, rpc_tier, expand=True):
tier = StorageTier(**rpc_tier.as_dict())
if not expand:
'uuid', 'name', 'type', 'status', 'capabilities',
'backend_uuid', 'cluster_uuid', 'stors', 'created_at',
# Don't expose ID attributes.
tier.forbackendid = wtypes.Unset
tier.forclusterid = wtypes.Unset
tier.links = [link.Link.make_link('self', pecan.request.host_url,
'storage_tiers', tier.uuid),
'storage_tiers', tier.uuid,
if expand:
tier.istors = [link.Link.make_link('self',
tier.uuid + "/istors"),
tier.uuid + "/istors",
return tier
class StorageTierCollection(collection.Collection):
"""API representation of a collection of StorageTier."""
storage_tiers = [StorageTier]
"A list containing StorageTier objects"
def __init__(self, **kwargs):
self._type = 'storage_tiers'
def convert_with_links(cls, rpc_tiers, limit, url=None,
expand=False, **kwargs):
collection = StorageTierCollection()
collection.storage_tiers = [StorageTier.convert_with_links(p, expand)
for p in rpc_tiers] = collection.get_next(limit, url=url, **kwargs)
return collection
LOCK_NAME = 'StorageTierController'
class StorageTierController(rest.RestController):
"""REST controller for storage tiers."""
istors = storage_api.StorageController(from_tier=True)
"Expose istors as a sub-element of storage_tier"
_custom_actions = {
'detail': ['GET'],
def __init__(self, from_cluster=False, **kwargs):
self._from_cluster = from_cluster
self._ceph = ceph.CephApiOperator()
def _get_tiers_collection(self, uuid, marker, limit, sort_key,
sort_dir, expand=False, resource_url=None):
if self._from_cluster and not uuid:
raise exception.InvalidParameterValue(_(
"Cluster id not specified."))
limit = utils.validate_limit(limit)
sort_dir = utils.validate_sort_dir(sort_dir)
marker_obj = None
if marker:
marker_obj = objects.storage_tier.get_by_uuid(pecan.request.context,
if self._from_cluster:
storage_tiers = pecan.request.dbapi.storage_tier_get_by_cluster(
uuid, limit=limit, marker=marker_obj,
sort_key=sort_key, sort_dir=sort_dir)
storage_tiers = pecan.request.dbapi.storage_tier_get_list(limit, marker_obj,
return StorageTierCollection.convert_with_links(
storage_tiers, limit, url=resource_url, expand=expand,
sort_key=sort_key, sort_dir=sort_dir)
@wsme_pecan.wsexpose(StorageTierCollection, types.uuid, types.uuid, int,
wtypes.text, wtypes.text)
def get_all(self, uuid=None, marker=None, limit=None, sort_key='id', sort_dir='asc'):
"""Retrieve a list of storage tiers."""
return self._get_tiers_collection(uuid, marker, limit, sort_key,
@wsme_pecan.wsexpose(StorageTierCollection, types.uuid, types.uuid, int,
wtypes.text, wtypes.text)
def detail(self, tier_uuid=None, marker=None, limit=None,
sort_key='id', sort_dir='asc'):
"""Retrieve a list of storage tiers with detail."""
parent = pecan.request.path.split('/')[:-1][-1]
if parent != 'storage_tiers':
raise exception.HTTPNotFound
expand = True
resource_url = '/'.join(['storage_tiers', 'detail'])
return self._get_tiers_collection(tier_uuid, marker, limit,
sort_key, sort_dir, expand,
@wsme_pecan.wsexpose(StorageTier, types.uuid)
def get_one(self, tier_uuid):
"""Retrieve information about the given storage tier."""
if self._from_cluster:
raise exception.OperationNotPermitted
rpc_tier = objects.storage_tier.get_by_uuid(pecan.request.context,
return StorageTier.convert_with_links(rpc_tier)
@wsme_pecan.wsexpose(StorageTier, body=StorageTier)
def post(self, tier):
"""Create a new storage tier."""
if self._from_cluster:
raise exception.OperationNotPermitted
tier = tier.as_dict()
LOG.debug("storage tier post dict= %s" % tier)
new_tier = _create(self, tier)
except exception.SysinvException as e:
raise wsme.exc.ClientSideError(_("Invalid data: failed to create "
"a storage tier object"))
return StorageTier.convert_with_links(new_tier)
@wsme.validate(types.uuid, [StorageTierPatchType])
@wsme_pecan.wsexpose(StorageTier, types.uuid,
def patch(self, tier_uuid, patch):
"""Update an existing storage tier."""
if self._from_cluster:
raise exception.OperationNotPermitted
LOG.debug("patch_data: %s" % patch)
rpc_tier = objects.storage_tier.get_by_uuid(pecan.request.context,
patch_obj = jsonpatch.JsonPatch(patch)
for p in patch_obj:
if p['path'] == '/backend_uuid':
p['path'] = '/forbackendid'
backend = objects.storage_backend.get_by_uuid(pecan.request.context,
p['value'] =
elif p['path'] == '/cluster_uuid':
p['path'] = '/forclusterid'
cluster = objects.cluster.get_by_uuid(pecan.request.context,
p['value'] =
otier = copy.deepcopy(rpc_tier)
# Validate provided patch data meets validity checks
_pre_patch_checks(rpc_tier, patch_obj)
tier = StorageTier(**jsonpatch.apply_patch(rpc_tier.as_dict(),
except utils.JSONPATCH_EXCEPTIONS as e:
raise exception.PatchError(patch=patch, reason=e)
# Semantic Checks
_check(self, "modify", tier.as_dict())
# Update only the fields that have changed
for field in objects.storage_tier.fields:
if rpc_tier[field] != getattr(tier, field):
rpc_tier[field] = getattr(tier, field)
# Obtain the fields that have changed.
delta = rpc_tier.obj_what_changed()
if len(delta) == 0:
raise wsme.exc.ClientSideError(
_("No changes to the existing tier settings were detected."))
allowed_attributes = ['name']
for d in delta:
if d not in allowed_attributes:
raise wsme.exc.ClientSideError(
_("Cannot modify '%s' with this operation." % d))"SYS_I orig storage_tier: %s " % otier.as_dict())"SYS_I new storage_tier: %s " % rpc_tier.as_dict())
# Save and return
return StorageTier.convert_with_links(rpc_tier)
except exception.HTTPNotFound:
msg = _("Storage Tier update failed: backend %s storage tier %s : patch %s"
% (backend['name'], tier['name'], patch))
raise wsme.exc.ClientSideError(msg)
@wsme_pecan.wsexpose(None, types.uuid, status_code=204)
def delete(self, tier_uuid):
"""Delete a storage tier."""
if self._from_cluster:
raise exception.OperationNotPermitted
_delete(self, tier_uuid)
def _check_parameters(tier):
# check and fill in the cluster information
clusterId = tier.get('forclusterid') or tier.get('cluster_uuid')
if not clusterId:
raise wsme.exc.ClientSideError(_("No cluster information was provided "
"for tier creation."))
cluster = pecan.request.dbapi.cluster_get(clusterId)
if uuidutils.is_uuid_like(clusterId):
forclusterid = cluster['id']
forclusterid = clusterId
tier.update({'forclusterid': forclusterid})
# Make sure that the default system tier is present
default_tier_name = constants.SB_TIER_DEFAULT_NAMES[constants.SB_TIER_TYPE_CEPH]
if 'name' not in tier or tier['name'] != default_tier_name:
tiers = pecan.request.dbapi.storage_tier_get_all(name=default_tier_name)
if len(tiers) == 0:
raise wsme.exc.ClientSideError(
_("Default system storage tier (%s) must be present before "
"adding additional tiers." % default_tier_name))
def _pre_patch_checks(tier_obj, patch_obj):
for p in patch_obj:
if p['path'] == '/name':
if == constants.SB_TIER_DEFAULT_NAMES[
raise wsme.exc.ClientSideError(
_("Storage Tier %s cannot be renamed.") %
if tier_obj.status == constants.SB_TIER_STATUS_IN_USE:
raise wsme.exc.ClientSideError(
_("Storage Tier %s cannot be renamed. It is %s") %
(, constants.SB_TIER_STATUS_IN_USE))
elif p['path'] == '/capabilities':
raise wsme.exc.ClientSideError(
_("The capabilities of storage tier %s cannot be "
"changed.") %
elif p['path'] == '/backend_uuid':
raise wsme.exc.ClientSideError(
_("The storage_backend associated with storage tier %s "
"cannot be changed.") %
elif p['path'] == '/cluster_uuid':
raise wsme.exc.ClientSideError(
_("The storage_backend associated with storage tier %s "
"cannot be changed.") %
def _check(self, op, tier):
# Semantic checks
LOG.debug("storage_tier: Semantic check for %s operation" % op)
# Check storage tier parameters
if op == "add":
# See if this storage tier already exists
tiers = pecan.request.dbapi.storage_tier_get_all(name=tier['name'])
if len(tiers) != 0:
raise wsme.exc.ClientSideError(_("Storage tier (%s) "
"already present." %
# Deny adding secondary tier if initial configuration is not done.
if not cutils.is_initial_config_complete():
msg = _("Operation denied. Adding secondary tiers to a cluster requires "
"initial configuration to be complete and controller node unlocked.")
raise wsme.exc.ClientSideError(msg)
if cutils.is_aio_system(pecan.request.dbapi):
# Deny adding secondary tiers if primary tier backend is not configured
# for cluster.
clusterId = tier.get('forclusterid') or tier.get('cluster_uuid')
cluster_tiers = pecan.request.dbapi.storage_tier_get_by_cluster(clusterId)
configured = False if cluster_tiers else True
for t in cluster_tiers:
if t.forbackendid:
bk = pecan.request.dbapi.storage_backend_get(t.forbackendid)
if bk.state != constants.SB_STATE_CONFIGURED:
msg = _("Operation denied. Storage backend '%s' "
"of tier '%s' must be in '%s' state."
% (, t['name'], constants.SB_STATE_CONFIGURED))
raise wsme.exc.ClientSideError(msg)
configured = True
if not configured:
msg = _("Operation denied. Adding secondary tiers to a cluster requires "
"primary tier storage backend of this cluster to be configured.")
raise wsme.exc.ClientSideError(msg)
# Deny adding secondary tier if ceph is down on standard
num_monitors, required_monitors, __ = \
if num_monitors < required_monitors:
raise wsme.exc.ClientSideError(_(
"Operation denied. Ceph is not operational. "
"Only %d storage monitor available. "
"At least %s unlocked and enabled hosts with "
"monitors are required. Please ensure hosts "
"with monitors are unlocked and enabled.") %
(num_monitors, required_monitors))
elif op == "delete":
if tier['name'] == constants.SB_TIER_DEFAULT_NAMES[
raise wsme.exc.ClientSideError(_("Storage Tier %s cannot be "
"deleted.") % tier['name'])
if tier['status'] != constants.SB_TIER_STATUS_DEFINED:
raise wsme.exc.ClientSideError(_("Storage Tier %s cannot be "
"deleted. It is %s") % (
elif op == "modify":
raise wsme.exc.ClientSideError(
_("Internal Error: Invalid storage tier operation: %s" % op))
return tier
def _set_defaults(tier):
defaults = {
'name': constants.SB_TIER_DEFAULT_NAMES[constants.SB_TIER_TYPE_CEPH],
'type': constants.SB_TIER_TYPE_CEPH,
'status': constants.SB_TIER_STATUS_DEFINED,
'capabilities': {},
'stors': [],
tier_merged = tier.copy()
for key in tier_merged:
if tier_merged[key] is None and key in defaults:
tier_merged[key] = defaults[key]
return tier_merged
# This method allows creating a storage tier through a non-HTTP
# request e.g. through while still passing
# through physical volume semantic checks and osd configuration
# Hence, not declared inside a class
# Param:
# tier - dictionary of storage tier values
# iprofile - True when created by a storage profile
def _create(self, tier, iprofile=None):"storage_tier._create with initial params: %s" % tier)
# Set defaults - before checks to allow for optional attributes
tier = _set_defaults(tier)
# Semantic checks
tier = _check(self, "add", tier)"storage_tier._create with validated params: %s" % tier)
ret_tier = pecan.request.dbapi.storage_tier_create(tier)"storage_tier._create final, created, tier: %s" %
# update the crushmap with the new tier
# If we are adding a tier where the crushmap file has yet to be applied,
# then set the crushmap first. This will also add this new tier to the
# crushmap, otherwise just add the new tier.
crushmap_flag_file = os.path.join(constants.SYSINV_CONFIG_PATH,
if not os.path.isfile(crushmap_flag_file):
except exception.CephCrushMapNotApplied as e:
LOG.warning("Crushmap not applied, seems like ceph cluster is not configured. "
"Operation will be retried with first occasion. "
"Reason: %s" % str(e))
except (exception.CephCrushMaxRecursion,
exception.CephCrushInvalidTierUse) as e:
raise wsme.exc.ClientSideError(_("Failed to update the crushmap for "
"tier: %s - %s") % (, e))
return ret_tier
def _delete(self, tier_uuid):
"""Delete a storage tier"""
tier = objects.storage_tier.get_by_uuid(pecan.request.context, tier_uuid)
# Semantic checks
_check(self, "delete", tier.as_dict())
# update the crushmap by removing the tier
except exception.CephCrushMapNotApplied:
# If crushmap has not been applied then there is no rule to update.
except exception.HTTPNotFound:
msg = _("Failed to delete storage tier %s." %
raise wsme.exc.ClientSideError(msg)