Merge "Remove Ceph Cache Tiering"
This commit is contained in:
commit
3fb3dabc92
|
@ -1,705 +0,0 @@
|
||||||
#
|
|
||||||
# Copyright (c) 2016 Wind River Systems, Inc.
|
|
||||||
#
|
|
||||||
# SPDX-License-Identifier: Apache-2.0
|
|
||||||
#
|
|
||||||
|
|
||||||
import copy
|
|
||||||
import contextlib
|
|
||||||
import functools
|
|
||||||
import math
|
|
||||||
import subprocess
|
|
||||||
import time
|
|
||||||
import traceback
|
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
import eventlet
|
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
from eventlet.semaphore import Semaphore
|
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
from oslo_log import log as logging
|
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
from sysinv.conductor.cache_tiering_service_config import ServiceConfig
|
|
||||||
|
|
||||||
from i18n import _LI, _LW, _LE
|
|
||||||
|
|
||||||
import constants
|
|
||||||
import exception
|
|
||||||
import ceph
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
CEPH_POOLS = copy.deepcopy(constants.CEPH_POOLS)
|
|
||||||
|
|
||||||
MAX_WAIT = constants.CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC
|
|
||||||
MIN_WAIT = constants.CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC
|
|
||||||
|
|
||||||
|
|
||||||
class LockOwnership(object):
|
|
||||||
def __init__(self, sem):
|
|
||||||
self.sem = sem
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def __call__(self):
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
finally:
|
|
||||||
if self.sem:
|
|
||||||
self.sem.release()
|
|
||||||
|
|
||||||
def transfer(self):
|
|
||||||
new_lo = LockOwnership(self.sem)
|
|
||||||
self.sem = None
|
|
||||||
return new_lo
|
|
||||||
|
|
||||||
|
|
||||||
class Lock(object):
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.sem = Semaphore(value=1)
|
|
||||||
|
|
||||||
def try_lock(self):
|
|
||||||
result = self.sem.acquire(blocking=False)
|
|
||||||
if result:
|
|
||||||
return LockOwnership(self.sem)
|
|
||||||
|
|
||||||
|
|
||||||
class CacheTiering(object):
|
|
||||||
|
|
||||||
def __init__(self, service):
|
|
||||||
self.service = service
|
|
||||||
self.lock = Lock()
|
|
||||||
# will be unlocked by set_initial_config()
|
|
||||||
self._init_config_lock = self.lock.try_lock()
|
|
||||||
self.config = None
|
|
||||||
self.config_desired = None
|
|
||||||
self.config_applied = None
|
|
||||||
self.target_max_bytes = {}
|
|
||||||
|
|
||||||
def set_initial_config(self, config):
|
|
||||||
with self._init_config_lock():
|
|
||||||
LOG.info("Setting Ceph cache tiering initial configuration")
|
|
||||||
self.config = ServiceConfig.from_dict(
|
|
||||||
config.get(constants.CACHE_TIERING, {})) or \
|
|
||||||
ServiceConfig()
|
|
||||||
self.config_desired = ServiceConfig.from_dict(
|
|
||||||
config.get(constants.CACHE_TIERING_DESIRED, {})) or \
|
|
||||||
ServiceConfig()
|
|
||||||
self.config_applied = ServiceConfig.from_dict(
|
|
||||||
config.get(constants.CACHE_TIERING_APPLIED, {})) or \
|
|
||||||
ServiceConfig()
|
|
||||||
if self.config_desired:
|
|
||||||
LOG.debug("set_initial_config config_desired %s " %
|
|
||||||
self.config_desired.to_dict())
|
|
||||||
if self.config_applied:
|
|
||||||
LOG.debug("set_initial_config config_applied %s " %
|
|
||||||
self.config_applied.to_dict())
|
|
||||||
|
|
||||||
# Check that previous caching tier operation completed
|
|
||||||
# successfully or perform recovery
|
|
||||||
if (self.config_desired and
|
|
||||||
self.config_applied and
|
|
||||||
(self.config_desired.cache_enabled !=
|
|
||||||
self.config_applied.cache_enabled)):
|
|
||||||
if self.config_desired.cache_enabled:
|
|
||||||
self.enable_cache(self.config_desired.to_dict(),
|
|
||||||
self.config_applied.to_dict(),
|
|
||||||
self._init_config_lock.transfer())
|
|
||||||
else:
|
|
||||||
self.disable_cache(self.config_desired.to_dict(),
|
|
||||||
self.config_applied.to_dict(),
|
|
||||||
self._init_config_lock.transfer())
|
|
||||||
|
|
||||||
def is_locked(self):
|
|
||||||
lock_ownership = self.lock.try_lock()
|
|
||||||
if not lock_ownership:
|
|
||||||
return True
|
|
||||||
with lock_ownership():
|
|
||||||
return False
|
|
||||||
|
|
||||||
def update_pools_info(self):
|
|
||||||
global CEPH_POOLS
|
|
||||||
cfg = self.service.sysinv_conductor.call(
|
|
||||||
{}, 'get_ceph_pools_config')
|
|
||||||
CEPH_POOLS = copy.deepcopy(cfg)
|
|
||||||
LOG.info(_LI("update_pools_info: pools: {}").format(CEPH_POOLS))
|
|
||||||
|
|
||||||
def enable_cache(self, new_config, applied_config, lock_ownership=None):
|
|
||||||
new_config = ServiceConfig.from_dict(new_config)
|
|
||||||
applied_config = ServiceConfig.from_dict(applied_config)
|
|
||||||
if not lock_ownership:
|
|
||||||
lock_ownership = self.lock.try_lock()
|
|
||||||
if not lock_ownership:
|
|
||||||
raise exception.CephCacheEnableFailure()
|
|
||||||
with lock_ownership():
|
|
||||||
eventlet.spawn(self.do_enable_cache,
|
|
||||||
new_config, applied_config,
|
|
||||||
lock_ownership.transfer())
|
|
||||||
|
|
||||||
def do_enable_cache(self, new_config, applied_config, lock_ownership):
|
|
||||||
LOG.info(_LI("cache_tiering_enable_cache: "
|
|
||||||
"new_config={}, applied_config={}").format(
|
|
||||||
new_config.to_dict(), applied_config.to_dict()))
|
|
||||||
_unwind_actions = []
|
|
||||||
with lock_ownership():
|
|
||||||
success = False
|
|
||||||
_exception = None
|
|
||||||
try:
|
|
||||||
self.config_desired.cache_enabled = True
|
|
||||||
self.update_pools_info()
|
|
||||||
for pool in CEPH_POOLS:
|
|
||||||
if (pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
|
|
||||||
pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
|
|
||||||
object_pool_name = \
|
|
||||||
self.service.monitor._get_object_pool_name()
|
|
||||||
pool['pool_name'] = object_pool_name
|
|
||||||
|
|
||||||
self.cache_pool_create(pool)
|
|
||||||
_unwind_actions.append(
|
|
||||||
functools.partial(self.cache_pool_delete, pool))
|
|
||||||
for pool in CEPH_POOLS:
|
|
||||||
if (pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
|
|
||||||
pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
|
|
||||||
object_pool_name = \
|
|
||||||
self.service.monitor._get_object_pool_name()
|
|
||||||
pool['pool_name'] = object_pool_name
|
|
||||||
|
|
||||||
self.cache_tier_add(pool)
|
|
||||||
_unwind_actions.append(
|
|
||||||
functools.partial(self.cache_tier_remove, pool))
|
|
||||||
for pool in CEPH_POOLS:
|
|
||||||
if (pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
|
|
||||||
pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
|
|
||||||
object_pool_name = \
|
|
||||||
self.service.monitor._get_object_pool_name()
|
|
||||||
pool['pool_name'] = object_pool_name
|
|
||||||
|
|
||||||
self.cache_mode_set(pool, 'writeback')
|
|
||||||
self.cache_pool_set_config(pool, new_config)
|
|
||||||
self.cache_overlay_create(pool)
|
|
||||||
success = True
|
|
||||||
except Exception as e:
|
|
||||||
LOG.error(_LE('Failed to enable cache: reason=%s') %
|
|
||||||
traceback.format_exc())
|
|
||||||
for action in reversed(_unwind_actions):
|
|
||||||
try:
|
|
||||||
action()
|
|
||||||
except Exception:
|
|
||||||
LOG.warn(_LW('Failed cache enable '
|
|
||||||
'unwind action: reason=%s') %
|
|
||||||
traceback.format_exc())
|
|
||||||
success = False
|
|
||||||
_exception = str(e)
|
|
||||||
finally:
|
|
||||||
self.service.monitor.monitor_check_cache_tier(success)
|
|
||||||
if success:
|
|
||||||
self.config_applied.cache_enabled = True
|
|
||||||
self.service.sysinv_conductor.call(
|
|
||||||
{}, 'cache_tiering_enable_cache_complete',
|
|
||||||
success=success, exception=_exception,
|
|
||||||
new_config=new_config.to_dict(),
|
|
||||||
applied_config=applied_config.to_dict())
|
|
||||||
# Run first update of periodic target_max_bytes
|
|
||||||
self.update_cache_target_max_bytes()
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def ignore_ceph_failure(self):
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
except exception.CephManagerException:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def disable_cache(self, new_config, applied_config, lock_ownership=None):
|
|
||||||
new_config = ServiceConfig.from_dict(new_config)
|
|
||||||
applied_config = ServiceConfig.from_dict(applied_config)
|
|
||||||
if not lock_ownership:
|
|
||||||
lock_ownership = self.lock.try_lock()
|
|
||||||
if not lock_ownership:
|
|
||||||
raise exception.CephCacheDisableFailure()
|
|
||||||
with lock_ownership():
|
|
||||||
eventlet.spawn(self.do_disable_cache,
|
|
||||||
new_config, applied_config,
|
|
||||||
lock_ownership.transfer())
|
|
||||||
|
|
||||||
def do_disable_cache(self, new_config, applied_config, lock_ownership):
|
|
||||||
LOG.info(_LI("cache_tiering_disable_cache: "
|
|
||||||
"new_config={}, applied_config={}").format(
|
|
||||||
new_config, applied_config))
|
|
||||||
with lock_ownership():
|
|
||||||
success = False
|
|
||||||
_exception = None
|
|
||||||
try:
|
|
||||||
self.config_desired.cache_enabled = False
|
|
||||||
for pool in CEPH_POOLS:
|
|
||||||
if (pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
|
|
||||||
pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
|
|
||||||
object_pool_name = \
|
|
||||||
self.service.monitor._get_object_pool_name()
|
|
||||||
pool['pool_name'] = object_pool_name
|
|
||||||
|
|
||||||
with self.ignore_ceph_failure():
|
|
||||||
self.cache_mode_set(
|
|
||||||
pool, 'forward')
|
|
||||||
|
|
||||||
for pool in CEPH_POOLS:
|
|
||||||
if (pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
|
|
||||||
pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
|
|
||||||
object_pool_name = \
|
|
||||||
self.service.monitor._get_object_pool_name()
|
|
||||||
pool['pool_name'] = object_pool_name
|
|
||||||
|
|
||||||
retries_left = 3
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
self.cache_flush(pool)
|
|
||||||
break
|
|
||||||
except exception.CephCacheFlushFailure:
|
|
||||||
retries_left -= 1
|
|
||||||
if not retries_left:
|
|
||||||
# give up
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
time.sleep(1)
|
|
||||||
for pool in CEPH_POOLS:
|
|
||||||
if (pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
|
|
||||||
pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
|
|
||||||
object_pool_name = \
|
|
||||||
self.service.monitor._get_object_pool_name()
|
|
||||||
pool['pool_name'] = object_pool_name
|
|
||||||
|
|
||||||
with self.ignore_ceph_failure():
|
|
||||||
self.cache_overlay_delete(pool)
|
|
||||||
self.cache_tier_remove(pool)
|
|
||||||
for pool in CEPH_POOLS:
|
|
||||||
if (pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL or
|
|
||||||
pool['pool_name'] ==
|
|
||||||
constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER):
|
|
||||||
object_pool_name = \
|
|
||||||
self.service.monitor._get_object_pool_name()
|
|
||||||
pool['pool_name'] = object_pool_name
|
|
||||||
|
|
||||||
with self.ignore_ceph_failure():
|
|
||||||
self.cache_pool_delete(pool)
|
|
||||||
success = True
|
|
||||||
except Exception as e:
|
|
||||||
LOG.warn(_LE('Failed to disable cache: reason=%s') %
|
|
||||||
traceback.format_exc())
|
|
||||||
_exception = str(e)
|
|
||||||
finally:
|
|
||||||
self.service.monitor.monitor_check_cache_tier(False)
|
|
||||||
if success:
|
|
||||||
self.config_desired.cache_enabled = False
|
|
||||||
self.config_applied.cache_enabled = False
|
|
||||||
self.service.sysinv_conductor.call(
|
|
||||||
{}, 'cache_tiering_disable_cache_complete',
|
|
||||||
success=success, exception=_exception,
|
|
||||||
new_config=new_config.to_dict(),
|
|
||||||
applied_config=applied_config.to_dict())
|
|
||||||
|
|
||||||
def get_pool_pg_num(self, pool_name):
|
|
||||||
return self.service.sysinv_conductor.call(
|
|
||||||
{}, 'get_pool_pg_num',
|
|
||||||
pool_name=pool_name)
|
|
||||||
|
|
||||||
def cache_pool_create(self, pool):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
pg_num = self.get_pool_pg_num(cache_pool)
|
|
||||||
if not ceph.osd_pool_exists(self.service.ceph_api, cache_pool):
|
|
||||||
ceph.osd_pool_create(
|
|
||||||
self.service.ceph_api, cache_pool,
|
|
||||||
pg_num, pg_num)
|
|
||||||
|
|
||||||
def cache_pool_delete(self, pool):
|
|
||||||
cache_pool = pool['pool_name'] + '-cache'
|
|
||||||
ceph.osd_pool_delete(
|
|
||||||
self.service.ceph_api, cache_pool)
|
|
||||||
|
|
||||||
def cache_tier_add(self, pool):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
response, body = self.service.ceph_api.osd_tier_add(
|
|
||||||
backing_pool, cache_pool,
|
|
||||||
force_nonempty="--force-nonempty",
|
|
||||||
body='json')
|
|
||||||
if response.ok:
|
|
||||||
LOG.info(_LI("Added OSD tier: "
|
|
||||||
"backing_pool={}, cache_pool={}").format(
|
|
||||||
backing_pool, cache_pool))
|
|
||||||
else:
|
|
||||||
e = exception.CephPoolAddTierFailure(
|
|
||||||
backing_pool=backing_pool,
|
|
||||||
cache_pool=cache_pool,
|
|
||||||
response_status_code=response.status_code,
|
|
||||||
response_reason=response.reason,
|
|
||||||
status=body.get('status'),
|
|
||||||
output=body.get('output'))
|
|
||||||
LOG.warn(e)
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def cache_tier_remove(self, pool):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
response, body = self.service.ceph_api.osd_tier_remove(
|
|
||||||
backing_pool, cache_pool, body='json')
|
|
||||||
if response.ok:
|
|
||||||
LOG.info(_LI("Removed OSD tier: "
|
|
||||||
"backing_pool={}, cache_pool={}").format(
|
|
||||||
backing_pool, cache_pool))
|
|
||||||
else:
|
|
||||||
e = exception.CephPoolRemoveTierFailure(
|
|
||||||
backing_pool=backing_pool,
|
|
||||||
cache_pool=cache_pool,
|
|
||||||
response_status_code=response.status_code,
|
|
||||||
response_reason=response.reason,
|
|
||||||
status=body.get('status'),
|
|
||||||
output=body.get('output'))
|
|
||||||
LOG.warn(e)
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def cache_mode_set(self, pool, mode):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
response, body = self.service.ceph_api.osd_tier_cachemode(
|
|
||||||
cache_pool, mode, body='json')
|
|
||||||
if response.ok:
|
|
||||||
LOG.info(_LI("Set OSD tier cache mode: "
|
|
||||||
"cache_pool={}, mode={}").format(cache_pool, mode))
|
|
||||||
else:
|
|
||||||
e = exception.CephCacheSetModeFailure(
|
|
||||||
cache_pool=cache_pool,
|
|
||||||
mode=mode,
|
|
||||||
response_status_code=response.status_code,
|
|
||||||
response_reason=response.reason,
|
|
||||||
status=body.get('status'),
|
|
||||||
output=body.get('output'))
|
|
||||||
LOG.warn(e)
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def cache_pool_set_config(self, pool, config):
|
|
||||||
for name, value in config.params.iteritems():
|
|
||||||
self.cache_pool_set_param(pool, name, value)
|
|
||||||
|
|
||||||
def cache_pool_set_param(self, pool, name, value):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
ceph.osd_set_pool_param(
|
|
||||||
self.service.ceph_api, cache_pool, name, value)
|
|
||||||
|
|
||||||
def cache_overlay_create(self, pool):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
response, body = self.service.ceph_api.osd_tier_set_overlay(
|
|
||||||
backing_pool, cache_pool, body='json')
|
|
||||||
if response.ok:
|
|
||||||
LOG.info(_LI("Set OSD tier overlay: "
|
|
||||||
"backing_pool={}, cache_pool={}").format(
|
|
||||||
backing_pool, cache_pool))
|
|
||||||
else:
|
|
||||||
e = exception.CephCacheCreateOverlayFailure(
|
|
||||||
backing_pool=backing_pool,
|
|
||||||
cache_pool=cache_pool,
|
|
||||||
response_status_code=response.status_code,
|
|
||||||
response_reason=response.reason,
|
|
||||||
status=body.get('status'),
|
|
||||||
output=body.get('output'))
|
|
||||||
LOG.warn(e)
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def cache_overlay_delete(self, pool):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = pool['pool_name']
|
|
||||||
response, body = self.service.ceph_api.osd_tier_remove_overlay(
|
|
||||||
backing_pool, body='json')
|
|
||||||
if response.ok:
|
|
||||||
LOG.info(_LI("Removed OSD tier overlay: "
|
|
||||||
"backing_pool={}").format(backing_pool))
|
|
||||||
else:
|
|
||||||
e = exception.CephCacheDeleteOverlayFailure(
|
|
||||||
backing_pool=backing_pool,
|
|
||||||
cache_pool=cache_pool,
|
|
||||||
response_status_code=response.status_code,
|
|
||||||
response_reason=response.reason,
|
|
||||||
status=body.get('status'),
|
|
||||||
output=body.get('output'))
|
|
||||||
LOG.warn(e)
|
|
||||||
raise e
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def rados_cache_flush_evict_all(pool):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
try:
|
|
||||||
subprocess.check_call(
|
|
||||||
['/usr/bin/rados', '-p', cache_pool, 'cache-flush-evict-all'])
|
|
||||||
LOG.info(_LI("Flushed OSD cache pool:"
|
|
||||||
"cache_pool={}").format(cache_pool))
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
_e = exception.CephCacheFlushFailure(
|
|
||||||
cache_pool=cache_pool,
|
|
||||||
return_code=str(e.returncode),
|
|
||||||
cmd=" ".join(e.cmd),
|
|
||||||
output=e.output)
|
|
||||||
LOG.warn(_e)
|
|
||||||
raise _e
|
|
||||||
|
|
||||||
def cache_flush(self, pool):
|
|
||||||
backing_pool = pool['pool_name']
|
|
||||||
cache_pool = backing_pool + '-cache'
|
|
||||||
try:
|
|
||||||
# set target_max_objects to a small value to force evacuation of
|
|
||||||
# objects from cache before we use rados cache-flush-evict-all
|
|
||||||
# WARNING: assuming cache_pool will be deleted after flush so
|
|
||||||
# we don't have to save/restore the value of target_max_objects
|
|
||||||
#
|
|
||||||
self.cache_pool_set_param(pool, 'target_max_objects', 1)
|
|
||||||
prev_object_count = None
|
|
||||||
wait_interval = MIN_WAIT
|
|
||||||
while True:
|
|
||||||
response, body = self.service.ceph_api.df(body='json')
|
|
||||||
if not response.ok:
|
|
||||||
LOG.warn(_LW(
|
|
||||||
"Failed to retrieve cluster free space stats: "
|
|
||||||
"status_code=%d, reason=%s") % (
|
|
||||||
response.status_code, response.reason))
|
|
||||||
break
|
|
||||||
stats = None
|
|
||||||
for s in body['output']['pools']:
|
|
||||||
if s['name'] == cache_pool:
|
|
||||||
stats = s['stats']
|
|
||||||
break
|
|
||||||
if not stats:
|
|
||||||
LOG.warn(_LW("Missing pool free space stats: "
|
|
||||||
"cache_pool=%s") % cache_pool)
|
|
||||||
break
|
|
||||||
object_count = stats['objects']
|
|
||||||
if object_count < constants.CACHE_FLUSH_OBJECTS_THRESHOLD:
|
|
||||||
break
|
|
||||||
if prev_object_count is not None:
|
|
||||||
delta_objects = object_count - prev_object_count
|
|
||||||
if delta_objects > 0:
|
|
||||||
LOG.warn(_LW("Unexpected increase in number "
|
|
||||||
"of objects in cache pool: "
|
|
||||||
"cache_pool=%s, prev_object_count=%d, "
|
|
||||||
"object_count=%d") % (
|
|
||||||
cache_pool, prev_object_count,
|
|
||||||
object_count))
|
|
||||||
break
|
|
||||||
if delta_objects == 0:
|
|
||||||
wait_interval *= 2
|
|
||||||
if wait_interval > MAX_WAIT:
|
|
||||||
LOG.warn(_LW(
|
|
||||||
"Cache pool number of objects did not "
|
|
||||||
"decrease: cache_pool=%s, object_count=%d, "
|
|
||||||
"wait_interval=%d") % (
|
|
||||||
cache_pool, object_count, wait_interval))
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
wait_interval = MIN_WAIT
|
|
||||||
time.sleep(wait_interval)
|
|
||||||
prev_object_count = object_count
|
|
||||||
except exception.CephPoolSetParamFailure as e:
|
|
||||||
LOG.warn(e)
|
|
||||||
finally:
|
|
||||||
self.rados_cache_flush_evict_all(pool)
|
|
||||||
|
|
||||||
def update_cache_target_max_bytes(self):
|
|
||||||
"Dynamically compute target_max_bytes of caching pools"
|
|
||||||
|
|
||||||
# Only compute if cache tiering is enabled
|
|
||||||
if self.config_applied and self.config_desired:
|
|
||||||
if (not self.config_desired.cache_enabled or
|
|
||||||
not self.config_applied.cache_enabled):
|
|
||||||
LOG.debug("Cache tiering disabled, no need to update "
|
|
||||||
"target_max_bytes.")
|
|
||||||
return
|
|
||||||
LOG.debug("Updating target_max_bytes")
|
|
||||||
|
|
||||||
# Get available space
|
|
||||||
response, body = self.service.ceph_api.osd_df(body='json',
|
|
||||||
output_method='tree')
|
|
||||||
if not response.ok:
|
|
||||||
LOG.warn(_LW(
|
|
||||||
"Failed to retrieve cluster free space stats: "
|
|
||||||
"status_code=%d, reason=%s") % (
|
|
||||||
response.status_code, response.reason))
|
|
||||||
return
|
|
||||||
|
|
||||||
storage_tier_size = 0
|
|
||||||
cache_tier_size = 0
|
|
||||||
|
|
||||||
replication = constants.CEPH_REPLICATION_FACTOR
|
|
||||||
for node in body['output']['nodes']:
|
|
||||||
if node['name'] == 'storage-tier':
|
|
||||||
storage_tier_size = node['kb']*1024/replication
|
|
||||||
elif node['name'] == 'cache-tier':
|
|
||||||
cache_tier_size = node['kb']*1024/replication
|
|
||||||
|
|
||||||
if storage_tier_size == 0 or cache_tier_size == 0:
|
|
||||||
LOG.info("Failed to get cluster size "
|
|
||||||
"(storage_tier_size=%s, cache_tier_size=%s),"
|
|
||||||
"retrying on next cycle" %
|
|
||||||
(storage_tier_size, cache_tier_size))
|
|
||||||
return
|
|
||||||
|
|
||||||
# Get available pools
|
|
||||||
response, body = self.service.ceph_api.osd_lspools(body='json')
|
|
||||||
if not response.ok:
|
|
||||||
LOG.warn(_LW(
|
|
||||||
"Failed to retrieve available pools: "
|
|
||||||
"status_code=%d, reason=%s") % (
|
|
||||||
response.status_code, response.reason))
|
|
||||||
return
|
|
||||||
pools = [p['poolname'] for p in body['output']]
|
|
||||||
|
|
||||||
# Separate backing from caching for easy iteration
|
|
||||||
backing_pools = []
|
|
||||||
caching_pools = []
|
|
||||||
for p in pools:
|
|
||||||
if p.endswith('-cache'):
|
|
||||||
caching_pools.append(p)
|
|
||||||
else:
|
|
||||||
backing_pools.append(p)
|
|
||||||
LOG.debug("Pools: caching: %s, backing: %s" % (caching_pools,
|
|
||||||
backing_pools))
|
|
||||||
|
|
||||||
if not len(caching_pools):
|
|
||||||
# We do not have caching pools created yet
|
|
||||||
return
|
|
||||||
|
|
||||||
# Get quota from backing pools that are cached
|
|
||||||
stats = {}
|
|
||||||
for p in caching_pools:
|
|
||||||
backing_name = p.replace('-cache', '')
|
|
||||||
stats[backing_name] = {}
|
|
||||||
try:
|
|
||||||
quota = ceph.osd_pool_get_quota(self.service.ceph_api,
|
|
||||||
backing_name)
|
|
||||||
except exception.CephPoolGetQuotaFailure as e:
|
|
||||||
LOG.warn(_LW(
|
|
||||||
"Failed to retrieve quota: "
|
|
||||||
"exception: %s") % str(e))
|
|
||||||
return
|
|
||||||
stats[backing_name]['quota'] = quota['max_bytes']
|
|
||||||
stats[backing_name]['quota_pt'] = (quota['max_bytes']*100.0 /
|
|
||||||
storage_tier_size)
|
|
||||||
LOG.debug("Quota for pool: %s "
|
|
||||||
"is: %s B representing %s pt" %
|
|
||||||
(backing_name,
|
|
||||||
quota['max_bytes'],
|
|
||||||
stats[backing_name]['quota_pt']))
|
|
||||||
|
|
||||||
# target_max_bytes logic:
|
|
||||||
# - For computing target_max_bytes cache_tier_size must be equal than
|
|
||||||
# the sum of target_max_bytes of each caching pool
|
|
||||||
# - target_max_bytes for each caching pool is computed as the
|
|
||||||
# percentage of quota in corresponding backing pool
|
|
||||||
# - the caching tiers has to work at full capacity, so if the sum of
|
|
||||||
# all quotas in the backing tier is different than 100% we need to
|
|
||||||
# normalize
|
|
||||||
# - if the quota is zero for any pool we add CACHE_TIERING_MIN_QUOTA
|
|
||||||
# by default *after* normalization so that we have real minimum
|
|
||||||
|
|
||||||
# We compute the real percentage that need to be normalized after
|
|
||||||
# ensuring that we have CACHE_TIERING_MIN_QUOTA for each pool with
|
|
||||||
# a quota of 0
|
|
||||||
real_100pt = 90.0 # we start from max and decrease it for each 0 pool
|
|
||||||
# Note: We must avoid reaching 100% at all costs! and
|
|
||||||
# cache_target_full_ratio, the Ceph parameter that is supposed to
|
|
||||||
# protect the cluster against this does not work in Ceph v0.94.6!
|
|
||||||
# Therefore a value of 90% is better suited for this
|
|
||||||
for p in caching_pools:
|
|
||||||
backing_name = p.replace('-cache', '')
|
|
||||||
if stats[backing_name]['quota_pt'] == 0:
|
|
||||||
real_100pt -= constants.CACHE_TIERING_MIN_QUOTA
|
|
||||||
LOG.debug("Quota before normalization for %s is: %s pt" %
|
|
||||||
(p, stats[backing_name]['quota_pt']))
|
|
||||||
|
|
||||||
# Compute total percentage of quotas for all backing pools.
|
|
||||||
# Should be 100% if correctly configured
|
|
||||||
total_quota_pt = 0
|
|
||||||
for p in caching_pools:
|
|
||||||
backing_name = p.replace('-cache', '')
|
|
||||||
total_quota_pt += stats[backing_name]['quota_pt']
|
|
||||||
LOG.debug("Total quota pt is: %s" % total_quota_pt)
|
|
||||||
|
|
||||||
# Normalize quota pt to 100% (or real_100pt)
|
|
||||||
if total_quota_pt != 0: # to avoid divide by zero
|
|
||||||
for p in caching_pools:
|
|
||||||
backing_name = p.replace('-cache', '')
|
|
||||||
stats[backing_name]['quota_pt'] = \
|
|
||||||
(stats[backing_name]['quota_pt'] *
|
|
||||||
(real_100pt / total_quota_pt))
|
|
||||||
|
|
||||||
# Do not allow quota to be 0 for any pool
|
|
||||||
total = 0
|
|
||||||
for p in caching_pools:
|
|
||||||
backing_name = p.replace('-cache', '')
|
|
||||||
if stats[backing_name]['quota_pt'] == 0:
|
|
||||||
stats[backing_name]['quota_pt'] = \
|
|
||||||
constants.CACHE_TIERING_MIN_QUOTA
|
|
||||||
total += stats[backing_name]['quota_pt']
|
|
||||||
LOG.debug("Quota after normalization for %s is: %s:" %
|
|
||||||
(p, stats[backing_name]['quota_pt']))
|
|
||||||
|
|
||||||
if total > 100:
|
|
||||||
# Supplementary protection, we really have to avoid going above
|
|
||||||
# 100%. Note that real_100pt is less than 100% but we still got
|
|
||||||
# more than 100!
|
|
||||||
LOG.warn("Total sum of quotas should not go above 100% "
|
|
||||||
"but is: %s, recalculating in next cycle" % total)
|
|
||||||
return
|
|
||||||
LOG.debug("Total sum of quotas is %s pt" % total)
|
|
||||||
|
|
||||||
# Get current target_max_bytes. We cache it to reduce requests
|
|
||||||
# to ceph-rest-api. We are the ones changing it, so not an issue.
|
|
||||||
for p in caching_pools:
|
|
||||||
if p not in self.target_max_bytes:
|
|
||||||
try:
|
|
||||||
value = ceph.osd_get_pool_param(self.service.ceph_api, p,
|
|
||||||
constants.TARGET_MAX_BYTES)
|
|
||||||
except exception.CephPoolGetParamFailure as e:
|
|
||||||
LOG.warn(e)
|
|
||||||
return
|
|
||||||
self.target_max_bytes[p] = value
|
|
||||||
LOG.debug("Existing target_max_bytes got from "
|
|
||||||
"Ceph: %s" % self.target_max_bytes)
|
|
||||||
|
|
||||||
# Set TARGET_MAX_BYTES
|
|
||||||
LOG.debug("storage_tier_size: %s "
|
|
||||||
"cache_tier_size: %s" % (storage_tier_size,
|
|
||||||
cache_tier_size))
|
|
||||||
for p in caching_pools:
|
|
||||||
backing_name = p.replace('-cache', '')
|
|
||||||
s = stats[backing_name]
|
|
||||||
target_max_bytes = math.floor(s['quota_pt'] * cache_tier_size /
|
|
||||||
100.0)
|
|
||||||
target_max_bytes = int(target_max_bytes)
|
|
||||||
LOG.debug("New Target max bytes of pool: %s is: %s B" % (
|
|
||||||
p, target_max_bytes))
|
|
||||||
|
|
||||||
# Set the new target_max_bytes only if it changed
|
|
||||||
if self.target_max_bytes.get(p) == target_max_bytes:
|
|
||||||
LOG.debug("Target max bytes of pool: %s "
|
|
||||||
"is already updated" % p)
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
ceph.osd_set_pool_param(self.service.ceph_api, p,
|
|
||||||
constants.TARGET_MAX_BYTES,
|
|
||||||
target_max_bytes)
|
|
||||||
self.target_max_bytes[p] = target_max_bytes
|
|
||||||
except exception.CephPoolSetParamFailure as e:
|
|
||||||
LOG.warn(e)
|
|
||||||
continue
|
|
||||||
return
|
|
|
@ -73,15 +73,10 @@ def osd_pool_exists(ceph_api, pool_name):
|
||||||
|
|
||||||
|
|
||||||
def osd_pool_create(ceph_api, pool_name, pg_num, pgp_num):
|
def osd_pool_create(ceph_api, pool_name, pg_num, pgp_num):
|
||||||
if pool_name.endswith("-cache"):
|
# ruleset 0: is the default ruleset if no crushmap is loaded or
|
||||||
# ruleset 1: is the ruleset for the cache tier
|
# the ruleset for the backing tier if loaded:
|
||||||
# Name: cache_tier_ruleset
|
# Name: storage_tier_ruleset
|
||||||
ruleset = 1
|
ruleset = 0
|
||||||
else:
|
|
||||||
# ruleset 0: is the default ruleset if no crushmap is loaded or
|
|
||||||
# the ruleset for the backing tier if loaded:
|
|
||||||
# Name: storage_tier_ruleset
|
|
||||||
ruleset = 0
|
|
||||||
response, body = ceph_api.osd_pool_create(
|
response, body = ceph_api.osd_pool_create(
|
||||||
pool_name, pg_num, pgp_num, pool_type="replicated",
|
pool_name, pg_num, pgp_num, pool_type="replicated",
|
||||||
ruleset=ruleset, body='json')
|
ruleset=ruleset, body='json')
|
||||||
|
|
|
@ -12,31 +12,14 @@ CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL = \
|
||||||
sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL
|
sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_JEWEL
|
||||||
CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER = \
|
CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER = \
|
||||||
sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER
|
sysinv_constants.CEPH_POOL_OBJECT_GATEWAY_NAME_HAMMER
|
||||||
CEPH_POOLS = sysinv_constants.BACKING_POOLS
|
CEPH_POOLS = sysinv_constants.CEPH_POOLS
|
||||||
CEPH_REPLICATION_FACTOR = sysinv_constants.CEPH_REPLICATION_FACTOR_DEFAULT
|
CEPH_REPLICATION_FACTOR = sysinv_constants.CEPH_REPLICATION_FACTOR_DEFAULT
|
||||||
SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM = \
|
|
||||||
sysinv_constants.SERVICE_PARAM_CEPH_CACHE_HIT_SET_TYPE_BLOOM
|
|
||||||
CACHE_TIERING_DEFAULTS = sysinv_constants.CACHE_TIERING_DEFAULTS
|
|
||||||
TARGET_MAX_BYTES = \
|
|
||||||
sysinv_constants.SERVICE_PARAM_CEPH_CACHE_TIER_TARGET_MAX_BYTES
|
|
||||||
|
|
||||||
# Cache tiering section shortener
|
|
||||||
CACHE_TIERING = \
|
|
||||||
sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER
|
|
||||||
CACHE_TIERING_DESIRED = \
|
|
||||||
sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_DESIRED
|
|
||||||
CACHE_TIERING_APPLIED = \
|
|
||||||
sysinv_constants.SERVICE_PARAM_SECTION_CEPH_CACHE_TIER_APPLIED
|
|
||||||
CACHE_TIERING_SECTIONS = \
|
|
||||||
[CACHE_TIERING, CACHE_TIERING_DESIRED, CACHE_TIERING_APPLIED]
|
|
||||||
|
|
||||||
# Cache flush parameters
|
# Cache flush parameters
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD = 1000
|
CACHE_FLUSH_OBJECTS_THRESHOLD = 1000
|
||||||
CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC = 1
|
CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC = 1
|
||||||
CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC = 128
|
CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC = 128
|
||||||
|
|
||||||
CACHE_TIERING_MIN_QUOTA = 5
|
|
||||||
|
|
||||||
FM_ALARM_REASON_MAX_SIZE = 256
|
FM_ALARM_REASON_MAX_SIZE = 256
|
||||||
|
|
||||||
# TODO this will later change based on parsed health
|
# TODO this will later change based on parsed health
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
#
|
#
|
||||||
# Copyright (c) 2016-2017 Wind River Systems, Inc.
|
# Copyright (c) 2016-2018 Wind River Systems, Inc.
|
||||||
#
|
#
|
||||||
# SPDX-License-Identifier: Apache-2.0
|
# SPDX-License-Identifier: Apache-2.0
|
||||||
#
|
#
|
||||||
|
@ -54,27 +54,6 @@ class CephPoolRulesetFailure(CephManagerException):
|
||||||
"pool %(name)s failed: %(reason)s")
|
"pool %(name)s failed: %(reason)s")
|
||||||
|
|
||||||
|
|
||||||
class CephPoolAddTierFailure(CephManagerException):
|
|
||||||
message = _("Failed to add OSD tier: "
|
|
||||||
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
|
|
||||||
"response=%(response_status_code)s:%(response_reason)s, "
|
|
||||||
"status=%(status)s, output=%(output)s")
|
|
||||||
|
|
||||||
|
|
||||||
class CephPoolRemoveTierFailure(CephManagerException):
|
|
||||||
message = _("Failed to remove tier: "
|
|
||||||
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
|
|
||||||
"response=%(response_status_code)s:%(response_reason)s, "
|
|
||||||
"status=%(status)s, output=%(output)s")
|
|
||||||
|
|
||||||
|
|
||||||
class CephCacheSetModeFailure(CephManagerException):
|
|
||||||
message = _("Failed to set OSD tier cache mode: "
|
|
||||||
"cache_pool=%(cache_pool)s, mode=%(mode)s, "
|
|
||||||
"response=%(response_status_code)s:%(response_reason)s, "
|
|
||||||
"status=%(status)s, output=%(output)s")
|
|
||||||
|
|
||||||
|
|
||||||
class CephPoolSetParamFailure(CephManagerException):
|
class CephPoolSetParamFailure(CephManagerException):
|
||||||
message = _("Cannot set Ceph OSD pool parameter: "
|
message = _("Cannot set Ceph OSD pool parameter: "
|
||||||
"pool_name=%(pool_name)s, param=%(param)s, value=%(value)s. "
|
"pool_name=%(pool_name)s, param=%(param)s, value=%(value)s. "
|
||||||
|
@ -87,37 +66,6 @@ class CephPoolGetParamFailure(CephManagerException):
|
||||||
"Reason: %(reason)s")
|
"Reason: %(reason)s")
|
||||||
|
|
||||||
|
|
||||||
class CephCacheCreateOverlayFailure(CephManagerException):
|
|
||||||
message = _("Failed to create overlay: "
|
|
||||||
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
|
|
||||||
"response=%(response_status_code)s:%(response_reason)s, "
|
|
||||||
"status=%(status)s, output=%(output)s")
|
|
||||||
|
|
||||||
|
|
||||||
class CephCacheDeleteOverlayFailure(CephManagerException):
|
|
||||||
message = _("Failed to delete overlay: "
|
|
||||||
"backing_pool=%(backing_pool)s, cache_pool=%(cache_pool)s, "
|
|
||||||
"response=%(response_status_code)s:%(response_reason)s, "
|
|
||||||
"status=%(status)s, output=%(output)s")
|
|
||||||
|
|
||||||
|
|
||||||
class CephCacheFlushFailure(CephManagerException):
|
|
||||||
message = _("Failed to flush cache pool: "
|
|
||||||
"cache_pool=%(cache_pool)s, "
|
|
||||||
"return_code=%(return_code)s, "
|
|
||||||
"cmd=%(cmd)s, output=%(output)s")
|
|
||||||
|
|
||||||
|
|
||||||
class CephCacheEnableFailure(CephManagerException):
|
|
||||||
message = _("Cannot enable Ceph cache tier. "
|
|
||||||
"Reason: cache tiering operation in progress.")
|
|
||||||
|
|
||||||
|
|
||||||
class CephCacheDisableFailure(CephManagerException):
|
|
||||||
message = _("Cannot disable Ceph cache tier. "
|
|
||||||
"Reason: cache tiering operation in progress.")
|
|
||||||
|
|
||||||
|
|
||||||
class CephSetKeyFailure(CephManagerException):
|
class CephSetKeyFailure(CephManagerException):
|
||||||
message = _("Error setting the Ceph flag "
|
message = _("Error setting the Ceph flag "
|
||||||
"'%(flag)s' %(extra)s: "
|
"'%(flag)s' %(extra)s: "
|
||||||
|
|
|
@ -13,8 +13,6 @@ from fm_api import constants as fm_constants
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
|
|
||||||
from sysinv.conductor.cache_tiering_service_config import ServiceConfig
|
|
||||||
|
|
||||||
# noinspection PyProtectedMember
|
# noinspection PyProtectedMember
|
||||||
from i18n import _, _LI, _LW, _LE
|
from i18n import _, _LI, _LW, _LE
|
||||||
|
|
||||||
|
@ -155,7 +153,6 @@ class Monitor(HandleUpgradesMixin):
|
||||||
def __init__(self, service):
|
def __init__(self, service):
|
||||||
self.service = service
|
self.service = service
|
||||||
self.current_ceph_health = ""
|
self.current_ceph_health = ""
|
||||||
self.cache_enabled = False
|
|
||||||
self.tiers_size = {}
|
self.tiers_size = {}
|
||||||
self.known_object_pool_name = None
|
self.known_object_pool_name = None
|
||||||
self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[
|
self.primary_tier_name = constants.SB_TIER_DEFAULT_NAMES[
|
||||||
|
@ -164,20 +161,8 @@ class Monitor(HandleUpgradesMixin):
|
||||||
super(Monitor, self).__init__(service)
|
super(Monitor, self).__init__(service)
|
||||||
|
|
||||||
def setup(self, config):
|
def setup(self, config):
|
||||||
self.set_caching_tier_config(config)
|
|
||||||
super(Monitor, self).setup(config)
|
super(Monitor, self).setup(config)
|
||||||
|
|
||||||
def set_caching_tier_config(self, config):
|
|
||||||
conf = ServiceConfig().from_dict(
|
|
||||||
config.get(constants.CACHE_TIERING_APPLIED))
|
|
||||||
if conf:
|
|
||||||
self.cache_enabled = conf.cache_enabled
|
|
||||||
|
|
||||||
def monitor_check_cache_tier(self, enable_flag):
|
|
||||||
LOG.info(_LI("monitor_check_cache_tier: "
|
|
||||||
"enable_flag={}".format(enable_flag)))
|
|
||||||
self.cache_enabled = enable_flag
|
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
# Wait until Ceph cluster is up and we can get the fsid
|
# Wait until Ceph cluster is up and we can get the fsid
|
||||||
while True:
|
while True:
|
||||||
|
@ -262,11 +247,6 @@ class Monitor(HandleUpgradesMixin):
|
||||||
|
|
||||||
# Check the quotas on each tier
|
# Check the quotas on each tier
|
||||||
for tier in self.tiers_size:
|
for tier in self.tiers_size:
|
||||||
# TODO(rchurch): For R6 remove the tier from the default crushmap
|
|
||||||
# and remove this check. No longer supporting this tier in R5
|
|
||||||
if tier == 'cache-tier':
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Extract the tier name from the crush equivalent
|
# Extract the tier name from the crush equivalent
|
||||||
tier_name = tier[:-len(constants.CEPH_CRUSH_TIER_SUFFIX)]
|
tier_name = tier[:-len(constants.CEPH_CRUSH_TIER_SUFFIX)]
|
||||||
|
|
||||||
|
@ -601,9 +581,6 @@ class Monitor(HandleUpgradesMixin):
|
||||||
|
|
||||||
self._check_storage_tier(osd_tree, "storage-tier",
|
self._check_storage_tier(osd_tree, "storage-tier",
|
||||||
lambda *args: alarms.append(args))
|
lambda *args: alarms.append(args))
|
||||||
if self.cache_enabled:
|
|
||||||
self._check_storage_tier(osd_tree, "cache-tier",
|
|
||||||
lambda *args: alarms.append(args))
|
|
||||||
|
|
||||||
old_alarms = {}
|
old_alarms = {}
|
||||||
for alarm_id in [
|
for alarm_id in [
|
||||||
|
|
|
@ -27,13 +27,10 @@ from oslo_service.periodic_task import PeriodicTasks
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
from oslo_service import loopingcall
|
from oslo_service import loopingcall
|
||||||
|
|
||||||
from sysinv.conductor.cache_tiering_service_config import ServiceConfig
|
|
||||||
|
|
||||||
# noinspection PyUnresolvedReferences
|
# noinspection PyUnresolvedReferences
|
||||||
from cephclient import wrapper
|
from cephclient import wrapper
|
||||||
|
|
||||||
from monitor import Monitor
|
from monitor import Monitor
|
||||||
from cache_tiering import CacheTiering
|
|
||||||
import exception
|
import exception
|
||||||
import constants
|
import constants
|
||||||
|
|
||||||
|
@ -61,34 +58,6 @@ class RpcEndpoint(PeriodicTasks):
|
||||||
def __init__(self, service=None):
|
def __init__(self, service=None):
|
||||||
self.service = service
|
self.service = service
|
||||||
|
|
||||||
def cache_tiering_enable_cache(self, _, new_config, applied_config):
|
|
||||||
LOG.info(_LI("Enabling cache"))
|
|
||||||
try:
|
|
||||||
self.service.cache_tiering.enable_cache(
|
|
||||||
new_config, applied_config)
|
|
||||||
except exception.CephManagerException as e:
|
|
||||||
self.service.sysinv_conductor.call(
|
|
||||||
{}, 'cache_tiering_enable_cache_complete',
|
|
||||||
success=False, exception=str(e.message),
|
|
||||||
new_config=new_config, applied_config=applied_config)
|
|
||||||
|
|
||||||
def cache_tiering_disable_cache(self, _, new_config, applied_config):
|
|
||||||
LOG.info(_LI("Disabling cache"))
|
|
||||||
try:
|
|
||||||
self.service.cache_tiering.disable_cache(
|
|
||||||
new_config, applied_config)
|
|
||||||
except exception.CephManagerException as e:
|
|
||||||
self.service.sysinv_conductor.call(
|
|
||||||
{}, 'cache_tiering_disable_cache_complete',
|
|
||||||
success=False, exception=str(e.message),
|
|
||||||
new_config=new_config, applied_config=applied_config)
|
|
||||||
|
|
||||||
def cache_tiering_operation_in_progress(self, _):
|
|
||||||
is_locked = self.service.cache_tiering.is_locked()
|
|
||||||
LOG.info(_LI("Cache tiering operation "
|
|
||||||
"is in progress: %s") % str(is_locked).lower())
|
|
||||||
return is_locked
|
|
||||||
|
|
||||||
def get_primary_tier_size(self, _):
|
def get_primary_tier_size(self, _):
|
||||||
"""Get the ceph size for the primary tier.
|
"""Get the ceph size for the primary tier.
|
||||||
|
|
||||||
|
@ -163,7 +132,6 @@ class Service(SysinvConductorUpgradeApi, service.Service):
|
||||||
self.entity_instance_id = ''
|
self.entity_instance_id = ''
|
||||||
self.fm_api = fm_api.FaultAPIs()
|
self.fm_api = fm_api.FaultAPIs()
|
||||||
self.monitor = Monitor(self)
|
self.monitor = Monitor(self)
|
||||||
self.cache_tiering = CacheTiering(self)
|
|
||||||
self.config = None
|
self.config = None
|
||||||
self.config_desired = None
|
self.config_desired = None
|
||||||
self.config_applied = None
|
self.config_applied = None
|
||||||
|
@ -181,8 +149,6 @@ class Service(SysinvConductorUpgradeApi, service.Service):
|
||||||
|
|
||||||
# Get initial config from sysinv and send it to
|
# Get initial config from sysinv and send it to
|
||||||
# services that need it before starting them
|
# services that need it before starting them
|
||||||
config = self.get_caching_tier_config()
|
|
||||||
self.monitor.setup(config)
|
|
||||||
self.rpc_server = messaging.get_rpc_server(
|
self.rpc_server = messaging.get_rpc_server(
|
||||||
transport,
|
transport,
|
||||||
messaging.Target(topic=constants.CEPH_MANAGER_TOPIC,
|
messaging.Target(topic=constants.CEPH_MANAGER_TOPIC,
|
||||||
|
@ -190,37 +156,7 @@ class Service(SysinvConductorUpgradeApi, service.Service):
|
||||||
[RpcEndpoint(self)],
|
[RpcEndpoint(self)],
|
||||||
executor='eventlet')
|
executor='eventlet')
|
||||||
self.rpc_server.start()
|
self.rpc_server.start()
|
||||||
self.cache_tiering.set_initial_config(config)
|
|
||||||
eventlet.spawn_n(self.monitor.run)
|
eventlet.spawn_n(self.monitor.run)
|
||||||
periodic = loopingcall.FixedIntervalLoopingCall(
|
|
||||||
self.update_ceph_target_max_bytes)
|
|
||||||
periodic.start(interval=300)
|
|
||||||
|
|
||||||
def get_caching_tier_config(self):
|
|
||||||
LOG.info("Getting cache tiering configuration from sysinv")
|
|
||||||
while True:
|
|
||||||
# Get initial configuration from sysinv,
|
|
||||||
# retry until sysinv starts
|
|
||||||
try:
|
|
||||||
cctxt = self.sysinv_conductor.prepare(timeout=2)
|
|
||||||
config = cctxt.call({}, 'cache_tiering_get_config')
|
|
||||||
for section in config:
|
|
||||||
if section == constants.CACHE_TIERING:
|
|
||||||
self.config = ServiceConfig().from_dict(
|
|
||||||
config[section])
|
|
||||||
elif section == constants.CACHE_TIERING_DESIRED:
|
|
||||||
self.config_desired = ServiceConfig().from_dict(
|
|
||||||
config[section])
|
|
||||||
elif section == constants.CACHE_TIERING_APPLIED:
|
|
||||||
self.config_applied = ServiceConfig().from_dict(
|
|
||||||
config[section])
|
|
||||||
LOG.info("Cache tiering configs: {}".format(config))
|
|
||||||
return config
|
|
||||||
except Exception as ex:
|
|
||||||
# In production we should retry on every error until connection
|
|
||||||
# is reestablished.
|
|
||||||
LOG.warn("Getting cache tiering configuration failed "
|
|
||||||
"with: {}. Retrying... ".format(str(ex)))
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
try:
|
try:
|
||||||
|
@ -230,13 +166,6 @@ class Service(SysinvConductorUpgradeApi, service.Service):
|
||||||
pass
|
pass
|
||||||
super(Service, self).stop()
|
super(Service, self).stop()
|
||||||
|
|
||||||
def update_ceph_target_max_bytes(self):
|
|
||||||
try:
|
|
||||||
self.cache_tiering.update_cache_target_max_bytes()
|
|
||||||
except Exception as ex:
|
|
||||||
LOG.exception("Updating Ceph target max bytes failed "
|
|
||||||
"with: {} retrying on next cycle.".format(str(ex)))
|
|
||||||
|
|
||||||
|
|
||||||
def run_service():
|
def run_service():
|
||||||
CONF(sys.argv[1:])
|
CONF(sys.argv[1:])
|
||||||
|
|
|
@ -1,309 +0,0 @@
|
||||||
#
|
|
||||||
# Copyright (c) 2016 Wind River Systems, Inc.
|
|
||||||
#
|
|
||||||
# SPDX-License-Identifier: Apache-2.0
|
|
||||||
#
|
|
||||||
|
|
||||||
import unittest
|
|
||||||
import mock
|
|
||||||
|
|
||||||
import subprocess
|
|
||||||
import math
|
|
||||||
|
|
||||||
from ..cache_tiering import CacheTiering
|
|
||||||
from ..cache_tiering import LOG as CT_LOG
|
|
||||||
from ..constants import CACHE_FLUSH_OBJECTS_THRESHOLD
|
|
||||||
from ..constants import CACHE_FLUSH_MIN_WAIT_OBJ_COUNT_DECREASE_SEC as MIN_WAIT
|
|
||||||
from ..constants import CACHE_FLUSH_MAX_WAIT_OBJ_COUNT_DECREASE_SEC as MAX_WAIT
|
|
||||||
from ..exception import CephCacheFlushFailure
|
|
||||||
|
|
||||||
|
|
||||||
class TestCacheFlush(unittest.TestCase):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
self.service = mock.Mock()
|
|
||||||
self.ceph_api = mock.Mock()
|
|
||||||
self.service.ceph_api = self.ceph_api
|
|
||||||
self.cache_tiering = CacheTiering(self.service)
|
|
||||||
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_set_param_fail(self, mock_proc_call):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=False, status_code=500, reason='denied'),
|
|
||||||
{})
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_df_fail(self, mock_proc_call):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{})
|
|
||||||
self.ceph_api.df = mock.Mock()
|
|
||||||
self.ceph_api.df.return_value = (
|
|
||||||
mock.Mock(ok=False, status_code=500, reason='denied'),
|
|
||||||
{})
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
self.ceph_api.osd_set_pool_param.assert_called_once_with(
|
|
||||||
'test-cache', 'target_max_objects', 1, force=None, body='json')
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_rados_evict_fail_raises(self, mock_proc_call):
|
|
||||||
mock_proc_call.side_effect = subprocess.CalledProcessError(1, ['cmd'])
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=False, status_code=500, reason='denied'),
|
|
||||||
{})
|
|
||||||
self.assertRaises(CephCacheFlushFailure,
|
|
||||||
self.cache_tiering.cache_flush,
|
|
||||||
{'pool_name': 'test'})
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_df_missing_pool(self, mock_proc_call):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{})
|
|
||||||
self.ceph_api.df = mock.Mock()
|
|
||||||
self.ceph_api.df.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'rbd',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects': 0}}]},
|
|
||||||
'status': 'OK'})
|
|
||||||
with mock.patch.object(CT_LOG, 'warn') as mock_lw:
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
self.ceph_api.df.assert_called_once_with(body='json')
|
|
||||||
for c in mock_lw.call_args_list:
|
|
||||||
if 'Missing pool free space' in c[0][0]:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.fail('expected log warning')
|
|
||||||
self.ceph_api.osd_set_pool_param.assert_called_once_with(
|
|
||||||
'test-cache', 'target_max_objects', 1, force=None, body='json')
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_df_objects_empty(self, mock_proc_call):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{})
|
|
||||||
self.ceph_api.df = mock.Mock()
|
|
||||||
self.ceph_api.df.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects': 0}}]},
|
|
||||||
'status': 'OK'})
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
self.ceph_api.df.assert_called_once_with(body='json')
|
|
||||||
self.ceph_api.osd_set_pool_param.assert_called_once_with(
|
|
||||||
'test-cache', 'target_max_objects', 1, force=None, body='json')
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('time.sleep')
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_df_objects_above_threshold(self, mock_proc_call, mock_time_sleep):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{})
|
|
||||||
self.ceph_api.df = mock.Mock()
|
|
||||||
self.ceph_api.df.side_effect = [
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects': CACHE_FLUSH_OBJECTS_THRESHOLD}}]},
|
|
||||||
'status': 'OK'}),
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]},
|
|
||||||
'status': 'OK'})]
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
self.ceph_api.osd_set_pool_param.assert_called_once_with(
|
|
||||||
'test-cache', 'target_max_objects', 1, force=None, body='json')
|
|
||||||
self.ceph_api.df.assert_called_with(body='json')
|
|
||||||
mock_time_sleep.assert_called_once_with(MIN_WAIT)
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('time.sleep')
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_df_objects_interval_increase(self, mock_proc_call,
|
|
||||||
mock_time_sleep):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{})
|
|
||||||
self.ceph_api.df = mock.Mock()
|
|
||||||
self.ceph_api.df.side_effect = [
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
|
|
||||||
'status': 'OK'}),
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
|
|
||||||
'status': 'OK'}),
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
|
|
||||||
'status': 'OK'}),
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD - 1}}]},
|
|
||||||
'status': 'OK'})]
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
self.ceph_api.osd_set_pool_param.assert_called_once_with(
|
|
||||||
'test-cache', 'target_max_objects', 1, force=None, body='json')
|
|
||||||
self.ceph_api.df.assert_called_with(body='json')
|
|
||||||
self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list],
|
|
||||||
[MIN_WAIT,
|
|
||||||
MIN_WAIT * 2,
|
|
||||||
MIN_WAIT * 4])
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('time.sleep')
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_df_objects_allways_over_threshold(self, mock_proc_call,
|
|
||||||
mock_time_sleep):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{})
|
|
||||||
self.ceph_api.df = mock.Mock()
|
|
||||||
self.ceph_api.df.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
|
|
||||||
'status': 'OK'})
|
|
||||||
# noinspection PyTypeChecker
|
|
||||||
mock_time_sleep.side_effect = \
|
|
||||||
[None]*int(math.ceil(math.log(float(MAX_WAIT)/MIN_WAIT, 2)) + 1) \
|
|
||||||
+ [Exception('too many sleeps')]
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
self.ceph_api.osd_set_pool_param.assert_called_once_with(
|
|
||||||
'test-cache', 'target_max_objects', 1, force=None, body='json')
|
|
||||||
self.ceph_api.df.assert_called_with(body='json')
|
|
||||||
expected_sleep = []
|
|
||||||
interval = MIN_WAIT
|
|
||||||
while interval <= MAX_WAIT:
|
|
||||||
expected_sleep.append(interval)
|
|
||||||
interval *= 2
|
|
||||||
self.assertEqual([c[0][0] for c in mock_time_sleep.call_args_list],
|
|
||||||
expected_sleep)
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
||||||
|
|
||||||
@mock.patch('time.sleep')
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
def test_df_objects_increase(self, mock_proc_call, mock_time_sleep):
|
|
||||||
self.ceph_api.osd_set_pool_param = mock.Mock()
|
|
||||||
self.ceph_api.osd_set_pool_param.return_value = (
|
|
||||||
mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{})
|
|
||||||
self.ceph_api.df = mock.Mock()
|
|
||||||
self.ceph_api.df.side_effect = [
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD + 1}}]},
|
|
||||||
'status': 'OK'}),
|
|
||||||
(mock.Mock(ok=True, status_code=200, reason='OK'),
|
|
||||||
{'output': {
|
|
||||||
'pools': [
|
|
||||||
{'id': 0,
|
|
||||||
'name': 'test-cache',
|
|
||||||
'stats': {'bytes_used': 0,
|
|
||||||
'kb_used': 0,
|
|
||||||
'max_avail': 9588428800,
|
|
||||||
'objects':
|
|
||||||
CACHE_FLUSH_OBJECTS_THRESHOLD + 2}}]},
|
|
||||||
'status': 'OK'})]
|
|
||||||
with mock.patch.object(CT_LOG, 'warn') as mock_lw:
|
|
||||||
self.cache_tiering.cache_flush({'pool_name': 'test'})
|
|
||||||
for c in mock_lw.call_args_list:
|
|
||||||
if 'Unexpected increase' in c[0][0]:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
self.fail('expected log warning')
|
|
||||||
self.ceph_api.df.assert_called_with(body='json')
|
|
||||||
mock_time_sleep.assert_called_once_with(MIN_WAIT)
|
|
||||||
self.ceph_api.osd_set_pool_param.assert_called_once_with(
|
|
||||||
'test-cache', 'target_max_objects', 1, force=None, body='json')
|
|
||||||
mock_proc_call.assert_called_with(
|
|
||||||
['/usr/bin/rados', '-p', 'test-cache', 'cache-flush-evict-all'])
|
|
Loading…
Reference in New Issue