Deploy state periodical sync
This commit is to allow active controller periodically sending deploy state message to the software agent on its peer controller. The interval is set to 30 seconds. Test Plan: PASS: build and deploy the iso PASS: start new deployment, file is synced in both controllers Task: 49655 Story: 2010676 Change-Id: Ie95c5a7d45b3d88331569ca52d64d40a4f39d6c3 Signed-off-by: junfeng-li <junfeng.li@windriver.com>
This commit is contained in:
parent
306ea5f631
commit
f2a4f93908
|
@ -1219,7 +1219,7 @@ class PatchController(PatchService):
|
||||||
load_import_return = subprocess.run(load_import_cmd,
|
load_import_return = subprocess.run(load_import_cmd,
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
check=False,
|
check=True,
|
||||||
text=True)
|
text=True)
|
||||||
if load_import_return.returncode != 0:
|
if load_import_return.returncode != 0:
|
||||||
local_error += load_import_return.stdout
|
local_error += load_import_return.stdout
|
||||||
|
@ -3058,7 +3058,9 @@ class PatchControllerMainThread(threading.Thread):
|
||||||
global sc
|
global sc
|
||||||
global thread_death
|
global thread_death
|
||||||
|
|
||||||
# LOG.info ("In Main thread")
|
# Send periodic messages to the agents
|
||||||
|
# We only can use one inverval
|
||||||
|
SEND_MSG_INTERVAL_IN_SECONDS = 30.0
|
||||||
|
|
||||||
try:
|
try:
|
||||||
sock_in = sc.setup_socket()
|
sock_in = sc.setup_socket()
|
||||||
|
@ -3083,8 +3085,10 @@ class PatchControllerMainThread(threading.Thread):
|
||||||
sc.socket_lock.release()
|
sc.socket_lock.release()
|
||||||
|
|
||||||
# Send hello every thirty seconds
|
# Send hello every thirty seconds
|
||||||
hello_timeout = time.time() + 30.0
|
hello_timeout = time.time() + SEND_MSG_INTERVAL_IN_SECONDS
|
||||||
remaining = 30
|
# Send deploy state update every thirty seconds
|
||||||
|
deploy_state_update_timeout = time.time() + SEND_MSG_INTERVAL_IN_SECONDS
|
||||||
|
remaining = int(SEND_MSG_INTERVAL_IN_SECONDS)
|
||||||
|
|
||||||
agent_query_conns = []
|
agent_query_conns = []
|
||||||
|
|
||||||
|
@ -3105,8 +3109,8 @@ class PatchControllerMainThread(threading.Thread):
|
||||||
inputs = [sc.sock_in] + agent_query_conns
|
inputs = [sc.sock_in] + agent_query_conns
|
||||||
outputs = []
|
outputs = []
|
||||||
|
|
||||||
# LOG.info("Running select, remaining=%d", remaining)
|
rlist, wlist, xlist = select.select(
|
||||||
rlist, wlist, xlist = select.select(inputs, outputs, inputs, remaining)
|
inputs, outputs, inputs, SEND_MSG_INTERVAL_IN_SECONDS)
|
||||||
|
|
||||||
if (len(rlist) == 0 and
|
if (len(rlist) == 0 and
|
||||||
len(wlist) == 0 and
|
len(wlist) == 0 and
|
||||||
|
@ -3114,7 +3118,6 @@ class PatchControllerMainThread(threading.Thread):
|
||||||
# Timeout hit
|
# Timeout hit
|
||||||
sc.audit_socket()
|
sc.audit_socket()
|
||||||
|
|
||||||
# LOG.info("Checking sockets")
|
|
||||||
for s in rlist:
|
for s in rlist:
|
||||||
data = ''
|
data = ''
|
||||||
addr = None
|
addr = None
|
||||||
|
@ -3214,9 +3217,9 @@ class PatchControllerMainThread(threading.Thread):
|
||||||
stale_hosts.append(ip)
|
stale_hosts.append(ip)
|
||||||
|
|
||||||
remaining = int(hello_timeout - time.time())
|
remaining = int(hello_timeout - time.time())
|
||||||
if remaining <= 0 or remaining > 30:
|
if remaining <= 0 or remaining > int(SEND_MSG_INTERVAL_IN_SECONDS):
|
||||||
hello_timeout = time.time() + 30.0
|
hello_timeout = time.time() + SEND_MSG_INTERVAL_IN_SECONDS
|
||||||
remaining = 30
|
remaining = int(SEND_MSG_INTERVAL_IN_SECONDS)
|
||||||
|
|
||||||
sc.socket_lock.acquire()
|
sc.socket_lock.acquire()
|
||||||
|
|
||||||
|
@ -3250,6 +3253,25 @@ class PatchControllerMainThread(threading.Thread):
|
||||||
sc.interim_state[patch_id].remove(n)
|
sc.interim_state[patch_id].remove(n)
|
||||||
|
|
||||||
sc.hosts_lock.release()
|
sc.hosts_lock.release()
|
||||||
|
|
||||||
|
deploy_state_update_remaining = int(deploy_state_update_timeout - time.time())
|
||||||
|
# Only send the deploy state update from the active controller
|
||||||
|
if deploy_state_update_remaining <= 0 or deploy_state_update_remaining > int(
|
||||||
|
SEND_MSG_INTERVAL_IN_SECONDS):
|
||||||
|
deploy_state_update_timeout = time.time() + SEND_MSG_INTERVAL_IN_SECONDS
|
||||||
|
deploy_state_update_remaining = int(
|
||||||
|
SEND_MSG_INTERVAL_IN_SECONDS)
|
||||||
|
|
||||||
|
# Only send the deploy state update from the active controller
|
||||||
|
if utils.is_active_controller():
|
||||||
|
try:
|
||||||
|
sc.socket_lock.acquire()
|
||||||
|
deploy_state_update = SoftwareMessageDeployStateUpdate()
|
||||||
|
deploy_state_update.send(sc.sock_out)
|
||||||
|
except Exception as e:
|
||||||
|
LOG.exception("Failed to send deploy state update. Error: %s", str(e))
|
||||||
|
finally:
|
||||||
|
sc.socket_lock.release()
|
||||||
except Exception:
|
except Exception:
|
||||||
# Log all exceptions
|
# Log all exceptions
|
||||||
LOG.exception("Error occurred during request processing")
|
LOG.exception("Error occurred during request processing")
|
||||||
|
|
|
@ -424,3 +424,14 @@ def validate_versions(versions):
|
||||||
msg = "Invalid version: %s" % ver
|
msg = "Invalid version: %s" % ver
|
||||||
LOG.exception(msg)
|
LOG.exception(msg)
|
||||||
raise ValueError(msg)
|
raise ValueError(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def is_active_controller():
|
||||||
|
"""
|
||||||
|
Check if a controller is active
|
||||||
|
|
||||||
|
:return: True if the controller is active, False otherwise
|
||||||
|
"""
|
||||||
|
|
||||||
|
keyring_file = f"/opt/platform/.keyring/{constants.SW_VERSION}/.CREDENTIAL"
|
||||||
|
return os.path.exists(keyring_file)
|
||||||
|
|
Loading…
Reference in New Issue