diff --git a/ceph-manager/ceph-manager/ceph_manager/cache_tiering.py b/ceph-manager/ceph-manager/ceph_manager/cache_tiering.py deleted file mode 100644 index 4e814c3b..00000000 --- a/ceph-manager/ceph-manager/ceph_manager/cache_tiering.py +++ /dev/null @@ -1,705 +0,0 @@ -# -# Copyright (c) 2016 Wind River Systems, Inc. -# -# SPDX-License-Identifier: Apache-2.0 -# - -import copy -import contextlib -import functools -import math -import subprocess -import time -import traceback -# noinspection PyUnresolvedReferences -import eventlet -# noinspection PyUnresolvedReferences -from eventlet.semaphore import Semaphore -# noinspection PyUnresolvedReferences -from oslo_log import log as logging -# noinspection PyUnresolvedReferences -from sysinv.conductor.cache_tiering_service_config import ServiceConfig - -from i18n import _LI, _LW, _LE - -import constants -import exception -import ceph - -LOG = logging.getLogger(__name__) -CEPH_POOLS = copy.deepcopy(constants.CEPH_POOLS) - -MAX_WAIT = constants.CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC -MIN_WAIT = constants.CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC - - -class LockOwnership(object): - def __init__(self, sem): - self.sem = sem - - @contextlib.contextmanager - def __call__(self): - try: - yield - finally: - if self.sem: - self.sem.release() - - def transfer(self): - new_lo = LockOwnership(self.sem) - self.sem = None - return new_lo - - -class Lock(object): - - def __init__(self): - self.sem = Semaphore(value=1) - - def try_lock(self): - result = self.sem.acquire(blocking=False) - if result: - return LockOwnership(self.sem) - - -class CacheTiering(object): - - def __init__(self, service): - self.service = service - self.lock = Lock() - # will be unlocked by set_initial_config() - self._init_config_lock = self.lock.try_lock() - self.config = None - self.config_desired = None - self.config_applied = None - self.target_max_bytes = {} - - def set_initial_config(self, config): - with self._init_config_lock(): - LOG.info("Setting Ceph cache tiering initial configuration") - self.config = ServiceConfig.from_dict( - config.get(constants.CACHE_TIERING, {})) or \ - ServiceConfig() - self.config_desired = ServiceConfig.from_dict( - config.get(constants.CACHE_TIERING_DESIRED, {})) or \ - ServiceConfig() - self.config_applied = ServiceConfig.from_dict( - config.get(constants.CACHE_TIERING_APPLIED, {})) or \ - ServiceConfig() - if self.config_desired: - LOG.debug("set_initial_config config_desired %s " % - self.config_desired.to_dict()) - if self.config_applied: - LOG.debug("set_initial_config config_applied %s " % - self.config_applied.to_dict()) - - # Check that previous caching tier operation completed - # successfully or perform recovery - if (self.config_desired and - self.config_applied and - (self.config_desired.cache_enabled != - self.config_applied.cache_enabled)): - if self.config_desired.cache_enabled: - self.enable_cache(self.config_desired.to_dict(), - self.config_applied.to_dict(), - self._init_config_lock.transfer()) - else: - self.disable_cache(self.config_desired.to_dict(), - self.config_applied.to_dict(), - self._init_config_lock.transfer()) - - def is_locked(self): - lock_ownership = self.lock.try_lock() - if not lock_ownership: - return True - with lock_ownership(): - return False - - def update_pools_info(self): - global CEPH_POOLS - cfg = self.service.sysinv_conductor.call( - {}, 'get_ceph_pools_config') - CEPH_POOLS = copy.deepcopy(cfg) - LOG.info(_LI("update_pools_info: pools: {}").format(CEPH_POOLS)) - - def enable_cache(self, new_config, applied_config, lock_ownership=None): - new_config = ServiceConfig.from_dict(new_config) - applied_config = ServiceConfig.from_dict(applied_config) - if not lock_ownership: - lock_ownership = self.lock.try_lock() - if not lock_ownership: - raise exception.CephCacheEnableFailure() - with lock_ownership(): - eventlet.spawn(self.do_enable_cache, - new_config, applied_config, - lock_ownership.transfer()) - - def do_enable_cache(self, new_config, applied_config, lock_ownership): - LOG.info(_LI("cache_tiering_enable_cache: " - "new_config={}, applied_config={}").format( - new_config.to_dict(), applied_config.to_dict())) - _unwind_actions = [] - with lock_ownership(): - success = False - _exception = None - try: - self.config_desired.cache_enabled = True - self.update_pools_info() - for pool in CEPH_POOLS: - if (pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or - pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): - object_pool_name = \ - self.service.monitor._get_object_pool_name() - pool['pool_name'] = object_pool_name - - self.cache_pool_create(pool) - _unwind_actions.append( - functools.partial(self.cache_pool_delete, pool)) - for pool in CEPH_POOLS: - if (pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or - pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): - object_pool_name = \ - self.service.monitor._get_object_pool_name() - pool['pool_name'] = object_pool_name - - self.cache_tier_add(pool) - _unwind_actions.append( - functools.partial(self.cache_tier_remove, pool)) - for pool in CEPH_POOLS: - if (pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or - pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): - object_pool_name = \ - self.service.monitor._get_object_pool_name() - pool['pool_name'] = object_pool_name - - self.cache_mode_set(pool, 'writeback') - self.cache_pool_set_config(pool, new_config) - self.cache_overlay_create(pool) - success = True - except Exception as e: - LOG.error(_LE('Failed to enable cache: reason=%s') % - traceback.format_exc()) - for action in reversed(_unwind_actions): - try: - action() - except Exception: - LOG.warn(_LW('Failed cache enable ' - 'unwind action: reason=%s') % - traceback.format_exc()) - success = False - _exception = str(e) - finally: - self.service.monitor.monitor_check_cache_tier(success) - if success: - self.config_applied.cache_enabled = True - self.service.sysinv_conductor.call( - {}, 'cache_tiering_enable_cache_complete', - success=success, exception=_exception, - new_config=new_config.to_dict(), - applied_config=applied_config.to_dict()) - # Run first update of periodic target_max_bytes - self.update_cache_target_max_bytes() - - @contextlib.contextmanager - def ignore_ceph_failure(self): - try: - yield - except exception.CephManagerException: - pass - - def disable_cache(self, new_config, applied_config, lock_ownership=None): - new_config = ServiceConfig.from_dict(new_config) - applied_config = ServiceConfig.from_dict(applied_config) - if not lock_ownership: - lock_ownership = self.lock.try_lock() - if not lock_ownership: - raise exception.CephCacheDisableFailure() - with lock_ownership(): - eventlet.spawn(self.do_disable_cache, - new_config, applied_config, - lock_ownership.transfer()) - - def do_disable_cache(self, new_config, applied_config, lock_ownership): - LOG.info(_LI("cache_tiering_disable_cache: " - "new_config={}, applied_config={}").format( - new_config, applied_config)) - with lock_ownership(): - success = False - _exception = None - try: - self.config_desired.cache_enabled = False - for pool in CEPH_POOLS: - if (pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or - pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): - object_pool_name = \ - self.service.monitor._get_object_pool_name() - pool['pool_name'] = object_pool_name - - with self.ignore_ceph_failure(): - self.cache_mode_set( - pool, 'forward') - - for pool in CEPH_POOLS: - if (pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or - pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): - object_pool_name = \ - self.service.monitor._get_object_pool_name() - pool['pool_name'] = object_pool_name - - retries_left = 3 - while True: - try: - self.cache_flush(pool) - break - except exception.CephCacheFlushFailure: - retries_left -= 1 - if not retries_left: - # give up - break - else: - time.sleep(1) - for pool in CEPH_POOLS: - if (pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or - pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): - object_pool_name = \ - self.service.monitor._get_object_pool_name() - pool['pool_name'] = object_pool_name - - with self.ignore_ceph_failure(): - self.cache_overlay_delete(pool) - self.cache_tier_remove(pool) - for pool in CEPH_POOLS: - if (pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or - pool['pool_name'] == - constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER): - object_pool_name = \ - self.service.monitor._get_object_pool_name() - pool['pool_name'] = object_pool_name - - with self.ignore_ceph_failure(): - self.cache_pool_delete(pool) - success = True - except Exception as e: - LOG.warn(_LE('Failed to disable cache: reason=%s') % - traceback.format_exc()) - _exception = str(e) - finally: - self.service.monitor.monitor_check_cache_tier(False) - if success: - self.config_desired.cache_enabled = False - self.config_applied.cache_enabled = False - self.service.sysinv_conductor.call( - {}, 'cache_tiering_disable_cache_complete', - success=success, exception=_exception, - new_config=new_config.to_dict(), - applied_config=applied_config.to_dict()) - - def get_pool_pg_num(self, pool_name): - return self.service.sysinv_conductor.call( - {}, 'get_pool_pg_num', - pool_name=pool_name) - - def cache_pool_create(self, pool): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - pg_num = self.get_pool_pg_num(cache_pool) - if not ceph.osd_pool_exists(self.service.ceph_api, cache_pool): - ceph.osd_pool_create( - self.service.ceph_api, cache_pool, - pg_num, pg_num) - - def cache_pool_delete(self, pool): - cache_pool = pool['pool_name'] + '-cache' - ceph.osd_pool_delete( - self.service.ceph_api, cache_pool) - - def cache_tier_add(self, pool): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - response, body = self.service.ceph_api.osd_tier_add( - backing_pool, cache_pool, - force_nonempty="--force-nonempty", - body='json') - if response.ok: - LOG.info(_LI("Added OSD tier: " - "backing_pool={}, cache_pool={}").format( - backing_pool, cache_pool)) - else: - e = exception.CephPoolAddTierFailure( - backing_pool=backing_pool, - cache_pool=cache_pool, - response_status_code=response.status_code, - response_reason=response.reason, - status=body.get('status'), - output=body.get('output')) - LOG.warn(e) - raise e - - def cache_tier_remove(self, pool): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - response, body = self.service.ceph_api.osd_tier_remove( - backing_pool, cache_pool, body='json') - if response.ok: - LOG.info(_LI("Removed OSD tier: " - "backing_pool={}, cache_pool={}").format( - backing_pool, cache_pool)) - else: - e = exception.CephPoolRemoveTierFailure( - backing_pool=backing_pool, - cache_pool=cache_pool, - response_status_code=response.status_code, - response_reason=response.reason, - status=body.get('status'), - output=body.get('output')) - LOG.warn(e) - raise e - - def cache_mode_set(self, pool, mode): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - response, body = self.service.ceph_api.osd_tier_cachemode( - cache_pool, mode, body='json') - if response.ok: - LOG.info(_LI("Set OSD tier cache mode: " - "cache_pool={}, mode={}").format(cache_pool, mode)) - else: - e = exception.CephCacheSetModeFailure( - cache_pool=cache_pool, - mode=mode, - response_status_code=response.status_code, - response_reason=response.reason, - status=body.get('status'), - output=body.get('output')) - LOG.warn(e) - raise e - - def cache_pool_set_config(self, pool, config): - for name, value in config.params.iteritems(): - self.cache_pool_set_param(pool, name, value) - - def cache_pool_set_param(self, pool, name, value): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - ceph.osd_set_pool_param( - self.service.ceph_api, cache_pool, name, value) - - def cache_overlay_create(self, pool): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - response, body = self.service.ceph_api.osd_tier_set_overlay( - backing_pool, cache_pool, body='json') - if response.ok: - LOG.info(_LI("Set OSD tier overlay: " - "backing_pool={}, cache_pool={}").format( - backing_pool, cache_pool)) - else: - e = exception.CephCacheCreateOverlayFailure( - backing_pool=backing_pool, - cache_pool=cache_pool, - response_status_code=response.status_code, - response_reason=response.reason, - status=body.get('status'), - output=body.get('output')) - LOG.warn(e) - raise e - - def cache_overlay_delete(self, pool): - backing_pool = pool['pool_name'] - cache_pool = pool['pool_name'] - response, body = self.service.ceph_api.osd_tier_remove_overlay( - backing_pool, body='json') - if response.ok: - LOG.info(_LI("Removed OSD tier overlay: " - "backing_pool={}").format(backing_pool)) - else: - e = exception.CephCacheDeleteOverlayFailure( - backing_pool=backing_pool, - cache_pool=cache_pool, - response_status_code=response.status_code, - response_reason=response.reason, - status=body.get('status'), - output=body.get('output')) - LOG.warn(e) - raise e - - @staticmethod - def rados_cache_flush_evict_all(pool): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - try: - subprocess.check_call( - ['/usr/bin/rados', '-p', cache_pool, 'cache-flush-evict-all']) - LOG.info(_LI("Flushed OSD cache pool:" - "cache_pool={}").format(cache_pool)) - except subprocess.CalledProcessError as e: - _e = exception.CephCacheFlushFailure( - cache_pool=cache_pool, - return_code=str(e.returncode), - cmd=" ".join(e.cmd), - output=e.output) - LOG.warn(_e) - raise _e - - def cache_flush(self, pool): - backing_pool = pool['pool_name'] - cache_pool = backing_pool + '-cache' - try: - # set target_max_objects to a small value to force evacuation of - # objects from cache before we use rados cache-flush-evict-all - # WARNING: assuming cache_pool will be deleted after flush so - # we don't have to save/restore the value of target_max_objects - # - self.cache_pool_set_param(pool, 'target_max_objects', 1) - prev_object_count = None - wait_interval = MIN_WAIT - while True: - response, body = self.service.ceph_api.df(body='json') - if not response.ok: - LOG.warn(_LW( - "Failed to retrieve cluster free space stats: " - "status_code=%d, reason=%s") % ( - response.status_code, response.reason)) - break - stats = None - for s in body['output']['pools']: - if s['name'] == cache_pool: - stats = s['stats'] - break - if not stats: - LOG.warn(_LW("Missing pool free space stats: " - "cache_pool=%s") % cache_pool) - break - object_count = stats['objects'] - if object_count < constants.CACHE_FLUSH_OBJECTS_THRESHOLD: - break - if prev_object_count is not None: - delta_objects = object_count - prev_object_count - if delta_objects > 0: - LOG.warn(_LW("Unexpected increase in number " - "of objects in cache pool: " - "cache_pool=%s, prev_object_count=%d, " - "object_count=%d") % ( - cache_pool, prev_object_count, - object_count)) - break - if delta_objects == 0: - wait_interval *= 2 - if wait_interval > MAX_WAIT: - LOG.warn(_LW( - "Cache pool number of objects did not " - "decrease: cache_pool=%s, object_count=%d, " - "wait_interval=%d") % ( - cache_pool, object_count, wait_interval)) - break - else: - wait_interval = MIN_WAIT - time.sleep(wait_interval) - prev_object_count = object_count - except exception.CephPoolSetParamFailure as e: - LOG.warn(e) - finally: - self.rados_cache_flush_evict_all(pool) - - def update_cache_target_max_bytes(self): - "Dynamically compute target_max_bytes of caching pools" - - # Only compute if cache tiering is enabled - if self.config_applied and self.config_desired: - if (not self.config_desired.cache_enabled or - not self.config_applied.cache_enabled): - LOG.debug("Cache tiering disabled, no need to update " - "target_max_bytes.") - return - LOG.debug("Updating target_max_bytes") - - # Get available space - response, body = self.service.ceph_api.osd_df(body='json', - output_method='tree') - if not response.ok: - LOG.warn(_LW( - "Failed to retrieve cluster free space stats: " - "status_code=%d, reason=%s") % ( - response.status_code, response.reason)) - return - - storage_tier_size = 0 - cache_tier_size = 0 - - replication = constants.CEPH_REPLICATION_FACTOR - for node in body['output']['nodes']: - if node['name'] == 'storage-tier': - storage_tier_size = node['kb']*1024/replication - elif node['name'] == 'cache-tier': - cache_tier_size = node['kb']*1024/replication - - if storage_tier_size == 0 or cache_tier_size == 0: - LOG.info("Failed to get cluster size " - "(storage_tier_size=%s, cache_tier_size=%s)," - "retrying on next cycle" % - (storage_tier_size, cache_tier_size)) - return - - # Get available pools - response, body = self.service.ceph_api.osd_lspools(body='json') - if not response.ok: - LOG.warn(_LW( - "Failed to retrieve available pools: " - "status_code=%d, reason=%s") % ( - response.status_code, response.reason)) - return - pools = [p['poolname'] for p in body['output']] - - # Separate backing from caching for easy iteration - backing_pools = [] - caching_pools = [] - for p in pools: - if p.endswith('-cache'): - caching_pools.append(p) - else: - backing_pools.append(p) - LOG.debug("Pools: caching: %s, backing: %s" % (caching_pools, - backing_pools)) - - if not len(caching_pools): - # We do not have caching pools created yet - return - - # Get quota from backing pools that are cached - stats = {} - for p in caching_pools: - backing_name = p.replace('-cache', '') - stats[backing_name] = {} - try: - quota = ceph.osd_pool_get_quota(self.service.ceph_api, - backing_name) - except exception.CephPoolGetQuotaFailure as e: - LOG.warn(_LW( - "Failed to retrieve quota: " - "exception: %s") % str(e)) - return - stats[backing_name]['quota'] = quota['max_bytes'] - stats[backing_name]['quota_pt'] = (quota['max_bytes']*100.0 / - storage_tier_size) - LOG.debug("Quota for pool: %s " - "is: %s B representing %s pt" % - (backing_name, - quota['max_bytes'], - stats[backing_name]['quota_pt'])) - - # target_max_bytes logic: - # - For computing target_max_bytes cache_tier_size must be equal than - # the sum of target_max_bytes of each caching pool - # - target_max_bytes for each caching pool is computed as the - # percentage of quota in corresponding backing pool - # - the caching tiers has to work at full capacity, so if the sum of - # all quotas in the backing tier is different than 100% we need to - # normalize - # - if the quota is zero for any pool we add CACHE_TIERING_MIN_QUOTA - # by default *after* normalization so that we have real minimum - - # We compute the real percentage that need to be normalized after - # ensuring that we have CACHE_TIERING_MIN_QUOTA for each pool with - # a quota of 0 - real_100pt = 90.0 # we start from max and decrease it for each 0 pool - # Note: We must avoid reaching 100% at all costs! and - # cache_target_full_ratio, the Ceph parameter that is supposed to - # protect the cluster against this does not work in Ceph v0.94.6! - # Therefore a value of 90% is better suited for this - for p in caching_pools: - backing_name = p.replace('-cache', '') - if stats[backing_name]['quota_pt'] == 0: - real_100pt -= constants.CACHE_TIERING_MIN_QUOTA - LOG.debug("Quota before normalization for %s is: %s pt" % - (p, stats[backing_name]['quota_pt'])) - - # Compute total percentage of quotas for all backing pools. - # Should be 100% if correctly configured - total_quota_pt = 0 - for p in caching_pools: - backing_name = p.replace('-cache', '') - total_quota_pt += stats[backing_name]['quota_pt'] - LOG.debug("Total quota pt is: %s" % total_quota_pt) - - # Normalize quota pt to 100% (or real_100pt) - if total_quota_pt != 0: # to avoid divide by zero - for p in caching_pools: - backing_name = p.replace('-cache', '') - stats[backing_name]['quota_pt'] = \ - (stats[backing_name]['quota_pt'] * - (real_100pt / total_quota_pt)) - - # Do not allow quota to be 0 for any pool - total = 0 - for p in caching_pools: - backing_name = p.replace('-cache', '') - if stats[backing_name]['quota_pt'] == 0: - stats[backing_name]['quota_pt'] = \ - constants.CACHE_TIERING_MIN_QUOTA - total += stats[backing_name]['quota_pt'] - LOG.debug("Quota after normalization for %s is: %s:" % - (p, stats[backing_name]['quota_pt'])) - - if total > 100: - # Supplementary protection, we really have to avoid going above - # 100%. Note that real_100pt is less than 100% but we still got - # more than 100! - LOG.warn("Total sum of quotas should not go above 100% " - "but is: %s, recalculating in next cycle" % total) - return - LOG.debug("Total sum of quotas is %s pt" % total) - - # Get current target_max_bytes. We cache it to reduce requests - # to ceph-rest-api. We are the ones changing it, so not an issue. - for p in caching_pools: - if p not in self.target_max_bytes: - try: - value = ceph.osd_get_pool_param(self.service.ceph_api, p, - constants.TARGET_MAX_BYTES) - except exception.CephPoolGetParamFailure as e: - LOG.warn(e) - return - self.target_max_bytes[p] = value - LOG.debug("Existing target_max_bytes got from " - "Ceph: %s" % self.target_max_bytes) - - # Set TARGET_MAX_BYTES - LOG.debug("storage_tier_size: %s " - "cache_tier_size: %s" % (storage_tier_size, - cache_tier_size)) - for p in caching_pools: - backing_name = p.replace('-cache', '') - s = stats[backing_name] - target_max_bytes = math.floor(s['quota_pt'] * cache_tier_size / - 100.0) - target_max_bytes = int(target_max_bytes) - LOG.debug("New Target max bytes of pool: %s is: %s B" % ( - p, target_max_bytes)) - - # Set the new target_max_bytes only if it changed - if self.target_max_bytes.get(p) == target_max_bytes: - LOG.debug("Target max bytes of pool: %s " - "is already updated" % p) - continue - try: - ceph.osd_set_pool_param(self.service.ceph_api, p, - constants.TARGET_MAX_BYTES, - target_max_bytes) - self.target_max_bytes[p] = target_max_bytes - except exception.CephPoolSetParamFailure as e: - LOG.warn(e) - continue - return diff --git a/ceph-manager/ceph-manager/ceph_manager/ceph.py b/ceph-manager/ceph-manager/ceph_manager/ceph.py index dff3c8ab..a143b577 100644 --- a/ceph-manager/ceph-manager/ceph_manager/ceph.py +++ b/ceph-manager/ceph-manager/ceph_manager/ceph.py @@ -73,15 +73,10 @@ def osd_pool_exists(ceph_api, pool_name): def osd_pool_create(ceph_api, pool_name, pg_num, pgp_num): - if pool_name.endswith("-cache"): - # ruleset 1: is the ruleset for the cache tier - # Name: cache_tier_ruleset - ruleset = 1 - else: - # ruleset 0: is the default ruleset if no crushmap is loaded or - # the ruleset for the backing tier if loaded: - # Name: storage_tier_ruleset - ruleset = 0 + # ruleset 0: is the default ruleset if no crushmap is loaded or + # the ruleset for the backing tier if loaded: + # Name: storage_tier_ruleset + ruleset = 0 response, body = ceph_api.osd_pool_create( pool_name, pg_num, pgp_num, pool_type="replicated", ruleset=ruleset, body='json') diff --git a/ceph-manager/ceph-manager/ceph_manager/constants.py b/ceph-manager/ceph-manager/ceph_manager/constants.py index 5b297743..ede99b2a 100644 --- a/ceph-manager/ceph-manager/ceph_manager/constants.py +++ b/ceph-manager/ceph-manager/ceph_manager/constants.py @@ -12,31 +12,14 @@ CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL = \ sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER = \ sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER -CEPH_POOLS = sysinv_constants.BACKING_POOLS +CEPH_POOLS = sysinv_constants.CEPH_POOLS CEPH_REPLICATION_FACTOR = sysinv_constants.CEPH_REPLICATION_FACTOR_DEFAULT -SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM = \ - sysinv_constants.SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM -CACHE_TIERING_DEFAULTS = sysinv_constants.CACHE_TIERING_DEFAULTS -TARGET_MAX_BYTES = \ - sysinv_constants.SERVICE_PARAM_CEPH_CACHE_TIER_TARGET_MAX_BYTES - -# Cache tiering section shortener -CACHE_TIERING = \ - sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER -CACHE_TIERING_DESIRED = \ - sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED -CACHE_TIERING_APPLIED = \ - sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED -CACHE_TIERING_SECTIONS = \ - [CACHE_TIERING, CACHE_TIERING_DESIRED, CACHE_TIERING_APPLIED] # Cache flush parameters CACHE_FLUSH_OBJECTS_THRESHOLD = 1000 CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC = 1 CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC = 128 -CACHE_TIERING_MIN_QUOTA = 5 - FM_ALARM_REASON_MAX_SIZE = 256 # TODO this will later change based on parsed health diff --git a/ceph-manager/ceph-manager/ceph_manager/exception.py b/ceph-manager/ceph-manager/ceph_manager/exception.py index c2d81b8b..3ef07825 100644 --- a/ceph-manager/ceph-manager/ceph_manager/exception.py +++ b/ceph-manager/ceph-manager/ceph_manager/exception.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2016-2017 Wind River Systems, Inc. +# Copyright (c) 2016-2018 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # @@ -54,27 +54,6 @@ class CephPoolRulesetFailure(CephManagerException): "pool %(name)s failed: %(reason)s") -class CephPoolAddTierFailure(CephManagerException): - message = _("Failed to add OSD tier: " - "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " - "response=%(response_status_code)s:%(response_reason)s, " - "status=%(status)s, output=%(output)s") - - -class CephPoolRemoveTierFailure(CephManagerException): - message = _("Failed to remove tier: " - "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " - "response=%(response_status_code)s:%(response_reason)s, " - "status=%(status)s, output=%(output)s") - - -class CephCacheSetModeFailure(CephManagerException): - message = _("Failed to set OSD tier cache mode: " - "cache_pool=%(cache_pool)s, mode=%(mode)s, " - "response=%(response_status_code)s:%(response_reason)s, " - "status=%(status)s, output=%(output)s") - - class CephPoolSetParamFailure(CephManagerException): message = _("Cannot set Ceph OSD pool parameter: " "pool_name=%(pool_name)s, param=%(param)s, value=%(value)s. " @@ -87,37 +66,6 @@ class CephPoolGetParamFailure(CephManagerException): "Reason: %(reason)s") -class CephCacheCreateOverlayFailure(CephManagerException): - message = _("Failed to create overlay: " - "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " - "response=%(response_status_code)s:%(response_reason)s, " - "status=%(status)s, output=%(output)s") - - -class CephCacheDeleteOverlayFailure(CephManagerException): - message = _("Failed to delete overlay: " - "backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, " - "response=%(response_status_code)s:%(response_reason)s, " - "status=%(status)s, output=%(output)s") - - -class CephCacheFlushFailure(CephManagerException): - message = _("Failed to flush cache pool: " - "cache_pool=%(cache_pool)s, " - "return_code=%(return_code)s, " - "cmd=%(cmd)s, output=%(output)s") - - -class CephCacheEnableFailure(CephManagerException): - message = _("Cannot enable Ceph cache tier. " - "Reason: cache tiering operation in progress.") - - -class CephCacheDisableFailure(CephManagerException): - message = _("Cannot disable Ceph cache tier. " - "Reason: cache tiering operation in progress.") - - class CephSetKeyFailure(CephManagerException): message = _("Error setting the Ceph flag " "'%(flag)s' %(extra)s: " diff --git a/ceph-manager/ceph-manager/ceph_manager/monitor.py b/ceph-manager/ceph-manager/ceph_manager/monitor.py index 51308240..c0960fbd 100644 --- a/ceph-manager/ceph-manager/ceph_manager/monitor.py +++ b/ceph-manager/ceph-manager/ceph_manager/monitor.py @@ -13,8 +13,6 @@ from fm_api import constants as fm_constants # noinspection PyUnresolvedReferences from oslo_log import log as logging -from sysinv.conductor.cache_tiering_service_config import ServiceConfig - # noinspection PyProtectedMember from i18n import _, _LI, _LW, _LE @@ -155,7 +153,6 @@ class Monitor(HandleUpgradesMixin): def __init__(self, service): self.service = service self.current_ceph_health = "" - self.cache_enabled = False self.tiers_size = {} self.known_object_pool_name = None self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[ @@ -164,20 +161,8 @@ class Monitor(HandleUpgradesMixin): super(Monitor, self).__init__(service) def setup(self, config): - self.set_caching_tier_config(config) super(Monitor, self).setup(config) - def set_caching_tier_config(self, config): - conf = ServiceConfig().from_dict( - config.get(constants.CACHE_TIERING_APPLIED)) - if conf: - self.cache_enabled = conf.cache_enabled - - def monitor_check_cache_tier(self, enable_flag): - LOG.info(_LI("monitor_check_cache_tier: " - "enable_flag={}".format(enable_flag))) - self.cache_enabled = enable_flag - def run(self): # Wait until Ceph cluster is up and we can get the fsid while True: @@ -262,11 +247,6 @@ class Monitor(HandleUpgradesMixin): # Check the quotas on each tier for tier in self.tiers_size: - # TODO(rchurch): For R6 remove the tier from the default crushmap - # and remove this check. No longer supporting this tier in R5 - if tier == 'cache-tier': - continue - # Extract the tier name from the crush equivalent tier_name = tier[:-len(constants.CEPH_CRUSH_TIER_SUFFIX)] @@ -601,9 +581,6 @@ class Monitor(HandleUpgradesMixin): self._check_storage_tier(osd_tree, "storage-tier", lambda *args: alarms.append(args)) - if self.cache_enabled: - self._check_storage_tier(osd_tree, "cache-tier", - lambda *args: alarms.append(args)) old_alarms = {} for alarm_id in [ diff --git a/ceph-manager/ceph-manager/ceph_manager/server.py b/ceph-manager/ceph-manager/ceph_manager/server.py index 9403a7c2..c8b96a72 100644 --- a/ceph-manager/ceph-manager/ceph_manager/server.py +++ b/ceph-manager/ceph-manager/ceph_manager/server.py @@ -27,13 +27,10 @@ from oslo_service.periodic_task import PeriodicTasks # noinspection PyUnresolvedReferences from oslo_service import loopingcall -from sysinv.conductor.cache_tiering_service_config import ServiceConfig - # noinspection PyUnresolvedReferences from cephclient import wrapper from monitor import Monitor -from cache_tiering import CacheTiering import exception import constants @@ -61,34 +58,6 @@ class RpcEndpoint(PeriodicTasks): def __init__(self, service=None): self.service = service - def cache_tiering_enable_cache(self, _, new_config, applied_config): - LOG.info(_LI("Enabling cache")) - try: - self.service.cache_tiering.enable_cache( - new_config, applied_config) - except exception.CephManagerException as e: - self.service.sysinv_conductor.call( - {}, 'cache_tiering_enable_cache_complete', - success=False, exception=str(e.message), - new_config=new_config, applied_config=applied_config) - - def cache_tiering_disable_cache(self, _, new_config, applied_config): - LOG.info(_LI("Disabling cache")) - try: - self.service.cache_tiering.disable_cache( - new_config, applied_config) - except exception.CephManagerException as e: - self.service.sysinv_conductor.call( - {}, 'cache_tiering_disable_cache_complete', - success=False, exception=str(e.message), - new_config=new_config, applied_config=applied_config) - - def cache_tiering_operation_in_progress(self, _): - is_locked = self.service.cache_tiering.is_locked() - LOG.info(_LI("Cache tiering operation " - "is in progress: %s") % str(is_locked).lower()) - return is_locked - def get_primary_tier_size(self, _): """Get the ceph size for the primary tier. @@ -163,7 +132,6 @@ class Service(SysinvConductorUpgradeApi, service.Service): self.entity_instance_id = '' self.fm_api = fm_api.FaultAPIs() self.monitor = Monitor(self) - self.cache_tiering = CacheTiering(self) self.config = None self.config_desired = None self.config_applied = None @@ -181,8 +149,6 @@ class Service(SysinvConductorUpgradeApi, service.Service): # Get initial config from sysinv and send it to # services that need it before starting them - config = self.get_caching_tier_config() - self.monitor.setup(config) self.rpc_server = messaging.get_rpc_server( transport, messaging.Target(topic=constants.CEPH_MANAGER_TOPIC, @@ -190,37 +156,7 @@ class Service(SysinvConductorUpgradeApi, service.Service): [RpcEndpoint(self)], executor='eventlet') self.rpc_server.start() - self.cache_tiering.set_initial_config(config) eventlet.spawn_n(self.monitor.run) - periodic = loopingcall.FixedIntervalLoopingCall( - self.update_ceph_target_max_bytes) - periodic.start(interval=300) - - def get_caching_tier_config(self): - LOG.info("Getting cache tiering configuration from sysinv") - while True: - # Get initial configuration from sysinv, - # retry until sysinv starts - try: - cctxt = self.sysinv_conductor.prepare(timeout=2) - config = cctxt.call({}, 'cache_tiering_get_config') - for section in config: - if section == constants.CACHE_TIERING: - self.config = ServiceConfig().from_dict( - config[section]) - elif section == constants.CACHE_TIERING_DESIRED: - self.config_desired = ServiceConfig().from_dict( - config[section]) - elif section == constants.CACHE_TIERING_APPLIED: - self.config_applied = ServiceConfig().from_dict( - config[section]) - LOG.info("Cache tiering configs: {}".format(config)) - return config - except Exception as ex: - # In production we should retry on every error until connection - # is reestablished. - LOG.warn("Getting cache tiering configuration failed " - "with: {}. Retrying... ".format(str(ex))) def stop(self): try: @@ -230,13 +166,6 @@ class Service(SysinvConductorUpgradeApi, service.Service): pass super(Service, self).stop() - def update_ceph_target_max_bytes(self): - try: - self.cache_tiering.update_cache_target_max_bytes() - except Exception as ex: - LOG.exception("Updating Ceph target max bytes failed " - "with: {} retrying on next cycle.".format(str(ex))) - def run_service(): CONF(sys.argv[1:]) diff --git a/ceph-manager/ceph-manager/ceph_manager/tests/test_cache_flush.py b/ceph-manager/ceph-manager/ceph_manager/tests/test_cache_flush.py deleted file mode 100644 index 2fd26519..00000000 --- a/ceph-manager/ceph-manager/ceph_manager/tests/test_cache_flush.py +++ /dev/null @@ -1,309 +0,0 @@ -# -# Copyright (c) 2016 Wind River Systems, Inc. -# -# SPDX-License-Identifier: Apache-2.0 -# - -import unittest -import mock - -import subprocess -import math - -from ..cache_tiering import CacheTiering -from ..cache_tiering import LOG as CT_LOG -from ..constants import CACHE_FLUSH_OBJECTS_THRESHOLD -from ..constants import CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC as MIN_WAIT -from ..constants import CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC as MAX_WAIT -from ..exception import CephCacheFlushFailure - - -class TestCacheFlush(unittest.TestCase): - - def setUp(self): - self.service = mock.Mock() - self.ceph_api = mock.Mock() - self.service.ceph_api = self.ceph_api - self.cache_tiering = CacheTiering(self.service) - - @mock.patch('subprocess.check_call') - def test_set_param_fail(self, mock_proc_call): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=False, status_code=500, reason='denied'), - {}) - self.cache_tiering.cache_flush({'pool_name': 'test'}) - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('subprocess.check_call') - def test_df_fail(self, mock_proc_call): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {}) - self.ceph_api.df = mock.Mock() - self.ceph_api.df.return_value = ( - mock.Mock(ok=False, status_code=500, reason='denied'), - {}) - self.cache_tiering.cache_flush({'pool_name': 'test'}) - self.ceph_api.osd_set_pool_param.assert_called_once_with( - 'test-cache', 'target_max_objects', 1, force=None, body='json') - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('subprocess.check_call') - def test_rados_evict_fail_raises(self, mock_proc_call): - mock_proc_call.side_effect = subprocess.CalledProcessError(1, ['cmd']) - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=False, status_code=500, reason='denied'), - {}) - self.assertRaises(CephCacheFlushFailure, - self.cache_tiering.cache_flush, - {'pool_name': 'test'}) - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('subprocess.check_call') - def test_df_missing_pool(self, mock_proc_call): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {}) - self.ceph_api.df = mock.Mock() - self.ceph_api.df.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'rbd', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': 0}}]}, - 'status': 'OK'}) - with mock.patch.object(CT_LOG, 'warn') as mock_lw: - self.cache_tiering.cache_flush({'pool_name': 'test'}) - self.ceph_api.df.assert_called_once_with(body='json') - for c in mock_lw.call_args_list: - if 'Missing pool free space' in c[0][0]: - break - else: - self.fail('expected log warning') - self.ceph_api.osd_set_pool_param.assert_called_once_with( - 'test-cache', 'target_max_objects', 1, force=None, body='json') - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('subprocess.check_call') - def test_df_objects_empty(self, mock_proc_call): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {}) - self.ceph_api.df = mock.Mock() - self.ceph_api.df.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': 0}}]}, - 'status': 'OK'}) - self.cache_tiering.cache_flush({'pool_name': 'test'}) - self.ceph_api.df.assert_called_once_with(body='json') - self.ceph_api.osd_set_pool_param.assert_called_once_with( - 'test-cache', 'target_max_objects', 1, force=None, body='json') - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('time.sleep') - @mock.patch('subprocess.check_call') - def test_df_objects_above_threshold(self, mock_proc_call, mock_time_sleep): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {}) - self.ceph_api.df = mock.Mock() - self.ceph_api.df.side_effect = [ - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': CACHE_FLUSH_OBJECTS_THRESHOLD}}]}, - 'status': 'OK'}), - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]}, - 'status': 'OK'})] - self.cache_tiering.cache_flush({'pool_name': 'test'}) - self.ceph_api.osd_set_pool_param.assert_called_once_with( - 'test-cache', 'target_max_objects', 1, force=None, body='json') - self.ceph_api.df.assert_called_with(body='json') - mock_time_sleep.assert_called_once_with(MIN_WAIT) - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('time.sleep') - @mock.patch('subprocess.check_call') - def test_df_objects_interval_increase(self, mock_proc_call, - mock_time_sleep): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {}) - self.ceph_api.df = mock.Mock() - self.ceph_api.df.side_effect = [ - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, - 'status': 'OK'}), - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, - 'status': 'OK'}), - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, - 'status': 'OK'}), - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]}, - 'status': 'OK'})] - self.cache_tiering.cache_flush({'pool_name': 'test'}) - self.ceph_api.osd_set_pool_param.assert_called_once_with( - 'test-cache', 'target_max_objects', 1, force=None, body='json') - self.ceph_api.df.assert_called_with(body='json') - self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list], - [MIN_WAIT, - MIN_WAIT * 2, - MIN_WAIT * 4]) - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('time.sleep') - @mock.patch('subprocess.check_call') - def test_df_objects_allways_over_threshold(self, mock_proc_call, - mock_time_sleep): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {}) - self.ceph_api.df = mock.Mock() - self.ceph_api.df.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, - 'status': 'OK'}) - # noinspection PyTypeChecker - mock_time_sleep.side_effect = \ - [None]*int(math.ceil(math.log(float(MAX_WAIT)/MIN_WAIT, 2)) + 1) \ - + [Exception('too many sleeps')] - self.cache_tiering.cache_flush({'pool_name': 'test'}) - self.ceph_api.osd_set_pool_param.assert_called_once_with( - 'test-cache', 'target_max_objects', 1, force=None, body='json') - self.ceph_api.df.assert_called_with(body='json') - expected_sleep = [] - interval = MIN_WAIT - while interval <= MAX_WAIT: - expected_sleep.append(interval) - interval *= 2 - self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list], - expected_sleep) - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all']) - - @mock.patch('time.sleep') - @mock.patch('subprocess.check_call') - def test_df_objects_increase(self, mock_proc_call, mock_time_sleep): - self.ceph_api.osd_set_pool_param = mock.Mock() - self.ceph_api.osd_set_pool_param.return_value = ( - mock.Mock(ok=True, status_code=200, reason='OK'), - {}) - self.ceph_api.df = mock.Mock() - self.ceph_api.df.side_effect = [ - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]}, - 'status': 'OK'}), - (mock.Mock(ok=True, status_code=200, reason='OK'), - {'output': { - 'pools': [ - {'id': 0, - 'name': 'test-cache', - 'stats': {'bytes_used': 0, - 'kb_used': 0, - 'max_avail': 9588428800, - 'objects': - CACHE_FLUSH_OBJECTS_THRESHOLD + 2}}]}, - 'status': 'OK'})] - with mock.patch.object(CT_LOG, 'warn') as mock_lw: - self.cache_tiering.cache_flush({'pool_name': 'test'}) - for c in mock_lw.call_args_list: - if 'Unexpected increase' in c[0][0]: - break - else: - self.fail('expected log warning') - self.ceph_api.df.assert_called_with(body='json') - mock_time_sleep.assert_called_once_with(MIN_WAIT) - self.ceph_api.osd_set_pool_param.assert_called_once_with( - 'test-cache', 'target_max_objects', 1, force=None, body='json') - mock_proc_call.assert_called_with( - ['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])