Create message class to sync deploy state between controllers

This commit creates classes to handle the communication and sync
of deploy state between controllers and also add a operation
counter of deploy states.

This commit also synced with peer controller when deploy starts.

Test Plan:

PASS: Software.json of peer controller updated on deploy start.
PASS: Software.json of synced folder on peer controller matches
active controller on deploy start.
PASS: Software.json of peer controller updated with deploy state
change.
PASS: deploy state synced to peer in DX system when software deploy
start succeed.

Depends-on: https://review.opendev.org/c/starlingx/update/+/904362

Story: 2010676
Task: 49325

Change-Id: Id69b15e38402b5314657de963f5b69f164e2c351
Signed-off-by: Luis Eduardo Bonatti <LuizEduardo.Bonatti@windriver.com>
This commit is contained in:
Luis Eduardo Bonatti 2023-12-27 00:05:20 -03:00 committed by Bin Qian
parent ce3dc63c3e
commit 9c990c4d5e
7 changed files with 144 additions and 5 deletions

View File

@ -28,6 +28,7 @@ REPO_DIR=${REPO_ROOT}/debian/rel-${SW_VERSION}
GROUPS_FILE=$REPO_DIR/comps.xml
PATCHING_DIR=/opt/software
RELEASE=bullseye
SYNCED_SOFTWARE_FILESYSTEM_DIR=${PATCHING_DIR}/synced
logfile=/var/log/software.log
@ -62,6 +63,11 @@ function do_setup {
mkdir -p $PATCHING_DIR
fi
if [ ! -d $SYNCED_SOFTWARE_FILESYSTEM_DIR ]; then
LOG "Creating $SYNCED_SOFTWARE_FILESYSTEM_DIR"
mkdir -p $SYNCED_SOFTWARE_FILESYSTEM_DIR
fi
# If we can ping the active controller, sync the repos
LOG_TO_FILE "ping -c 1 -w 1 controller"
ping -c 1 -w 1 controller >> $logfile 2>&1 || ping6 -c 1 -w 1 controller >> $logfile 2>&1

View File

@ -54,10 +54,10 @@ class Root(base.APIBase):
def convert(self):
root = Root()
root.name = "StarlingX USM API"
root.description = ("Unified Software Management API allows for a "
"single REST API / CLI and single procedure for updating "
"the StarlingX software on a Standalone Cloud or Distributed Cloud."
)
root.description = (
"Unified Software Management API allows for a "
"single REST API / CLI and single procedure for updating "
"the StarlingX software on a Standalone Cloud or Distributed Cloud.")
root.versions = [Version.convert('v1')]
root.default_version = Version.convert('v1')
return root

View File

@ -144,7 +144,8 @@ LOCAL_LOAD_IMPORT_FILE = "/etc/software/usm_load_import"
LICENSE_FILE = "/etc/platform/.license"
VERIFY_LICENSE_BINARY = "/usr/bin/verify-license"
SOFTWARE_JSON_FILE = "/opt/software/software.json"
SOFTWARE_JSON_FILE = "%s/software.json" % SOFTWARE_STORAGE_DIR
SYNCED_SOFTWARE_JSON_FILE = "%s/synced/software.json" % SOFTWARE_STORAGE_DIR
# The value "software-deploy" is also used in rule file
SOFTWARE_DEPLOY_FOLDER = "software-deploy"

View File

@ -41,6 +41,8 @@ PATCHMSG_STR = {
PATCHMSG_DEPLOY_STATE_UPDATE_ACK: "deploy-state-update-ack"
}
MSG_ACK_SUCCESS = 'success'
class PatchMessage(object):
def __init__(self, msgtype=PATCHMSG_UNKNOWN):

View File

@ -195,6 +195,72 @@ class PatchMessageHelloAgentAck(messages.PatchMessage):
sock.sendto(str.encode(message), (pa.controller_address, cfg.controller_port))
class SoftwareMessageDeployStateUpdate(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_DEPLOY_STATE_UPDATE)
self.data = {}
def decode(self, data):
messages.PatchMessage.decode(self, data)
self.data = data
def encode(self):
# Nothing to add, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pa
filesystem_data = utils.get_software_filesystem_data()
synced_filesystem_data = utils.get_synced_software_filesystem_data()
actual_state = {"deploy_host": filesystem_data.get("deploy_host", {}),
"deploy": filesystem_data.get("deploy", {})}
synced_state = {"deploy_host": synced_filesystem_data.get("deploy_host", {}),
"deploy": synced_filesystem_data.get("deploy", {})}
peer_state = {"deploy_host": self.data.get("deploy_state").get("deploy_host", {}),
"deploy": self.data.get("deploy_state").get("deploy", {})}
result = "diverged"
if actual_state == peer_state:
result = messages.MSG_ACK_SUCCESS
elif actual_state == synced_state:
result = messages.MSG_ACK_SUCCESS
if result == messages.MSG_ACK_SUCCESS:
utils.save_to_json_file(constants.SOFTWARE_JSON_FILE, peer_state)
utils.save_to_json_file(constants.SYNCED_SOFTWARE_JSON_FILE, peer_state)
resp = SoftwareMessageDeployStateUpdateAck()
resp.send(sock, result)
def send(self, sock): # pylint: disable=unused-argument
LOG.info("Should not get here")
class SoftwareMessageDeployStateUpdateAck(messages.PatchMessage):
def __init__(self, peer_state_data=None):
self.peer_state_data = peer_state_data
messages.PatchMessage.__init__(self, messages.PATCHMSG_DEPLOY_STATE_UPDATE_ACK)
def encode(self, result): # pylint: disable=arguments-differ
global pa
messages.PatchMessage.encode(self)
synced_data = utils.get_synced_software_filesystem_data()
self.message["result"] = result
self.message["deploy_state"] = synced_data
def handle(self, sock, addr): # pylint: disable=unused-argument
LOG.error("Should not get here")
def send(self, sock, result):
global pa
self.encode(result)
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pa.controller_address, cfg.controller_port))
class PatchMessageQueryDetailed(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED)
@ -681,6 +747,8 @@ class PatchAgent(PatchService):
msg = PatchMessageSendLatestFeedCommit()
elif msgdata['msgtype'] == messages.PATCHMSG_AGENT_INSTALL_REQ:
msg = PatchMessageAgentInstallReq()
elif msgdata['msgtype'] == messages.PATCHMSG_DEPLOY_STATE_UPDATE:
msg = SoftwareMessageDeployStateUpdate()
if msg is None:
msg = messages.PatchMessage()

View File

@ -599,6 +599,50 @@ class PatchMessageDropHostReq(messages.PatchMessage):
sock.sendto(str.encode(message), (sc.controller_address, cfg.controller_port))
class SoftwareMessageDeployStateUpdate(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_DEPLOY_STATE_UPDATE)
def encode(self):
global sc
messages.PatchMessage.encode(self)
filesystem_data = utils.get_software_filesystem_data()
deploys_state = {"deploy_host": filesystem_data.get("deploy_host", {}),
"deploy": filesystem_data.get("deploy", {})}
self.message["deploy_state"] = deploys_state
def handle(self, sock, addr): # pylint: disable=unused-argument
LOG.error("Should not get here")
def send(self, sock):
global sc
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (sc.agent_address, cfg.agent_port))
class SoftwareMessageDeployStateUpdateAck(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_DEPLOY_STATE_UPDATE_ACK)
self.peer_state_data = {}
def decode(self, data):
messages.PatchMessage.decode(self, data)
self.peer_state_data = data
def encode(self):
# Nothing to add, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global sc
if self.peer_state_data["result"] == messages.MSG_ACK_SUCCESS:
LOG.debug("Peer controller is synced with value: %s",
self.peer_state_data["deploy_state"])
else:
LOG.error("Peer controller deploy state has diverged.")
class PatchController(PatchService):
def __init__(self):
PatchService.__init__(self)
@ -1869,6 +1913,14 @@ class PatchController(PatchService):
msg = "Failed to delete the restart script for %s" % patch_id
LOG.exception(msg)
def _update_state_to_peer(self):
state_update_msg = SoftwareMessageDeployStateUpdate()
self.socket_lock.acquire()
try:
state_update_msg.send(self.sock_out)
finally:
self.socket_lock.release()
def _release_basic_checks(self, deployment):
"""
Does basic sanity checks on the release data
@ -2053,6 +2105,7 @@ class PatchController(PatchService):
if sw_rel is None:
raise InternalError("%s cannot be found" % to_release)
sw_rel.update_state(constants.DEPLOYING)
self._update_state_to_peer()
msg_info = "Deployment for %s started" % deployment
else:
msg_error = "Deployment for %s failed to start" % deployment
@ -2942,6 +2995,8 @@ class PatchControllerMainThread(threading.Thread):
msg = PatchMessageAgentInstallResp()
elif msgdata['msgtype'] == messages.PATCHMSG_DROP_HOST_REQ:
msg = PatchMessageDropHostReq()
elif msgdata['msgtype'] == messages.PATCHMSG_DEPLOY_STATE_UPDATE_ACK:
msg = SoftwareMessageDeployStateUpdateAck()
if msg is None:
msg = messages.PatchMessage()

View File

@ -384,6 +384,13 @@ def get_software_filesystem_data():
return {}
def get_synced_software_filesystem_data():
if os.path.exists(constants.SYNCED_SOFTWARE_JSON_FILE):
return load_from_json_file(constants.SYNCED_SOFTWARE_JSON_FILE)
else:
return {}
def validate_versions(versions):
"""
Validate a list of versions