Merge "Ceph: update crush map when storage tier is renamed"
This commit is contained in:
commit
8d39303a5b
|
@ -282,6 +282,7 @@ class StorageTierController(rest.RestController):
|
|||
tier_uuid)
|
||||
|
||||
patch_obj = jsonpatch.JsonPatch(patch)
|
||||
backend = dict(name='*unknown*')
|
||||
for p in patch_obj:
|
||||
if p['path'] == '/backend_uuid':
|
||||
p['path'] = '/forbackendid'
|
||||
|
@ -327,12 +328,20 @@ class StorageTierController(rest.RestController):
|
|||
LOG.info("SYS_I orig storage_tier: %s " % otier.as_dict())
|
||||
LOG.info("SYS_I new storage_tier: %s " % rpc_tier.as_dict())
|
||||
|
||||
if 'name' in delta:
|
||||
default_tier_name = constants.SB_TIER_DEFAULT_NAMES[constants.SB_TIER_TYPE_CEPH]
|
||||
if rpc_tier.name == default_tier_name:
|
||||
raise wsme.exc.ClientSideError(
|
||||
_("Cannot modify tier '%s'. Name '%s' is used "
|
||||
"by the default tier" % (otier.name, rpc_tier.name)))
|
||||
self._ceph.crushmap_tier_rename(otier.name, rpc_tier.name)
|
||||
|
||||
# Save and return
|
||||
rpc_tier.save()
|
||||
return StorageTier.convert_with_links(rpc_tier)
|
||||
except exception.HTTPNotFound:
|
||||
msg = _("Storage Tier update failed: backend %s storage tier %s : patch %s"
|
||||
% (backend['name'], tier['name'], patch))
|
||||
except (exception.HTTPNotFound, exception.CephFailure) as e:
|
||||
msg = _("Storage Tier update failed: backend %s storage tier %s : patch %s. "
|
||||
" Reason: %s") % (backend['name'], otier['name'], patch, str(e))
|
||||
raise wsme.exc.ClientSideError(msg)
|
||||
|
||||
@cutils.synchronized(LOCK_NAME)
|
||||
|
|
|
@ -17,9 +17,11 @@ import subprocess
|
|||
import os
|
||||
import pecan
|
||||
import requests
|
||||
import tempfile
|
||||
|
||||
from cephclient import wrapper as ceph
|
||||
from requests.exceptions import ReadTimeout
|
||||
from contextlib import contextmanager
|
||||
|
||||
from sysinv.common import constants
|
||||
from sysinv.common import exception
|
||||
|
@ -47,6 +49,12 @@ class CephApiOperator(object):
|
|||
return name
|
||||
return name + constants.CEPH_CRUSH_TIER_SUFFIX
|
||||
|
||||
@staticmethod
|
||||
def _format_rule_name(name):
|
||||
return "{0}{1}{2}".format(
|
||||
name, constants.CEPH_CRUSH_TIER_SUFFIX,
|
||||
"-ruleset").replace('-', '_')
|
||||
|
||||
def _crush_rule_status(self, tier_name):
|
||||
present = False
|
||||
|
||||
|
@ -457,6 +465,79 @@ class CephApiOperator(object):
|
|||
ancestor_type,
|
||||
ancestor_name)
|
||||
|
||||
def _crushmap_tier_rename(self, old_name, new_name):
|
||||
old_root_name = self._format_root_name(old_name)
|
||||
new_root_name = self._format_root_name(new_name)
|
||||
response, body = self._ceph_api.osd_crush_dump(body='json')
|
||||
if response.status_code != requests.codes.ok:
|
||||
raise exception.CephFailure(reason=response.reason)
|
||||
# build map of buckets to be renamed
|
||||
rename_map = {}
|
||||
for buck in body['output']['buckets']:
|
||||
name = buck['name']
|
||||
if buck['type_name'] == 'root':
|
||||
if name == old_root_name:
|
||||
rename_map[name] = new_root_name
|
||||
else:
|
||||
old_suffix = '-{}'.format(old_name)
|
||||
new_suffix = '-{}'.format(new_name)
|
||||
if name.endswith(old_suffix):
|
||||
rename_map[name] = name[:-len(old_suffix)] + new_suffix
|
||||
conflicts = set(b['name'] for b in body['output']['buckets']) \
|
||||
.intersection(set(rename_map.values()))
|
||||
if conflicts:
|
||||
raise exception.CephCrushTierRenameFailure(
|
||||
tier=old_name, reason=(
|
||||
"Target buckets already exist: %s"
|
||||
% ', '.join(conflicts)))
|
||||
old_rule_name = self._format_rule_name(old_name)
|
||||
new_rule_name = self._format_rule_name(new_name)
|
||||
response, body = self._ceph_api.osd_crush_rule_dump(new_rule_name)
|
||||
if response.status_code == requests.codes.ok:
|
||||
raise exception.CephCrushTierRenameFailure(
|
||||
tier=old_name, reason=(
|
||||
"Target ruleset already exists %s" % new_rule_name))
|
||||
for _from, _to in rename_map.items():
|
||||
LOG.info("Rename bucket from '%s' to '%s'", _from, _to)
|
||||
response, body = self._ceph_api.osd_crush_rename_bucket(_from, _to)
|
||||
if response.status_code != requests.codes.ok:
|
||||
raise exception.CephCrushTierRenameFailure(
|
||||
tier=old_name, reason=response.reason)
|
||||
LOG.info("Rename crush rule from '%s' to '%s'",
|
||||
old_rule_name, new_rule_name)
|
||||
response, body = self._ceph_api.osd_crush_rule_rename(
|
||||
old_rule_name, new_rule_name)
|
||||
if response.status_code != requests.codes.ok:
|
||||
raise exception.CephCrushTierRenameFailure(
|
||||
tier=old_name, reason=response.reason)
|
||||
|
||||
def crushmap_tier_rename(self, old_name, new_name):
|
||||
with self.safe_crushmap_update():
|
||||
self._crushmap_tier_rename(old_name, new_name)
|
||||
|
||||
@contextmanager
|
||||
def safe_crushmap_update(self):
|
||||
with open(os.devnull, 'w') as fnull, tempfile.TemporaryFile() as backup:
|
||||
LOG.info("Saving crushmap for safe update")
|
||||
try:
|
||||
subprocess.check_call(
|
||||
"ceph osd getcrushmap",
|
||||
stdin=fnull, stdout=backup, stderr=fnull,
|
||||
shell=True)
|
||||
except subprocess.CalledProcessError as exc:
|
||||
raise exception.CephFailure(
|
||||
"failed to backup crushmap: %s" % str(exc))
|
||||
try:
|
||||
yield
|
||||
except exception.CephFailure:
|
||||
backup.seek(0, os.SEEK_SET)
|
||||
LOG.warn("Crushmap update failed. Restoring from backup")
|
||||
subprocess.call(
|
||||
"ceph osd setcrushmap",
|
||||
stdin=backup, stdout=fnull, stderr=fnull,
|
||||
shell=True)
|
||||
raise
|
||||
|
||||
def ceph_status_ok(self, timeout=10):
|
||||
"""
|
||||
returns rc bool. True if ceph ok, False otherwise
|
||||
|
|
|
@ -156,7 +156,7 @@ class CephCrushMapNotApplied(CephFailure):
|
|||
|
||||
|
||||
class CephCrushMaxRecursion(CephFailure):
|
||||
message = _("Mirroring crushmap root failed after reaching unexpected recursion "
|
||||
message = _("Processing crushmap failed after reaching unexpected recursion "
|
||||
"level of %(depth)s.")
|
||||
|
||||
|
||||
|
@ -168,6 +168,10 @@ class CephCrushTierAlreadyExists(CephCrushInvalidTierUse):
|
|||
message = _("Tier '%(tier)s' already exists")
|
||||
|
||||
|
||||
class CephCrushTierRenameFailure(CephCrushInvalidTierUse):
|
||||
message = _("Tier '%(tier)s' cannot be renamed. %(reason)s")
|
||||
|
||||
|
||||
class CephCrushInvalidRuleOperation(CephFailure):
|
||||
message = _("Cannot perform operation on rule '%(rule)s'. %(reason)s")
|
||||
|
||||
|
|
|
@ -339,10 +339,12 @@ class StorageTierIndependentTCs(base.FunctionalTest):
|
|||
patch_response.json['error_message'])
|
||||
|
||||
# Other Defined: name
|
||||
patch_response = self.patch_dict_json('/storage_tiers/%s' % confirm['uuid'],
|
||||
headers={'User-Agent': 'sysinv'},
|
||||
name='newname',
|
||||
expect_errors=True)
|
||||
with mock.patch.object(ceph_utils.CephApiOperator, 'crushmap_tier_rename'):
|
||||
patch_response = self.patch_dict_json(
|
||||
'/storage_tiers/%s' % confirm['uuid'],
|
||||
headers={'User-Agent': 'sysinv'},
|
||||
name='newname',
|
||||
expect_errors=True)
|
||||
self.assertEqual(http_client.OK, patch_response.status_int)
|
||||
self.assertEqual('newname', # Expected
|
||||
self.get_json('/storage_tiers/%s/' % patch_response.json['uuid'])['name']) # Result
|
||||
|
|
Loading…
Reference in New Issue