update/software/software/software_agent.py

822 lines
30 KiB
Python

"""
Copyright (c) 2024 Wind River Systems, Inc.
SPDX-License-Identifier: Apache-2.0
"""
import json
import os
import random
import requests
import select
import shutil
import socket
import subprocess
import sys
import time
import software.ostree_utils as ostree_utils
from software.software_functions import configure_logging
from software.software_functions import LOG
import software.config as cfg
from software.base import PatchService
from software.exceptions import OSTreeCommandFail
import software.utils as utils
import software.messages as messages
import software.constants as constants
from tsconfig.tsconfig import http_port
from tsconfig.tsconfig import install_uuid
from tsconfig.tsconfig import subfunctions
from tsconfig.tsconfig import SW_VERSION
pidfile_path = "/var/run/software_agent.pid"
agent_running_after_reboot_flag = \
"/var/run/software_agent_running_after_reboot"
node_is_patched_file = "/var/run/node_is_patched"
node_is_software_updated_rr_file = "/var/run/node_is_software_updated_rr"
patch_installing_file = "/var/run/patch_installing"
patch_failed_file = "/var/run/software_install_failed"
node_is_locked_file = "/var/run/.node_locked"
ostree_pull_completed_deployment_pending_file = \
"/var/run/ostree_pull_completed_deployment_pending"
mount_pending_file = "/var/run/mount_pending"
insvc_software_scripts = "/run/software/software-scripts"
insvc_software_flags = "/run/software/software-flags"
insvc_software_restart_agent = "/run/software/.restart.software-agent"
run_insvc_software_scripts_cmd = "/usr/sbin/run-software-scripts"
pa = None
http_port_real = http_port
def setflag(fname):
try:
with open(fname, "w") as f:
f.write("%d\n" % os.getpid())
except Exception:
LOG.exception("Failed to update %s flag", fname)
def clearflag(fname):
if os.path.exists(fname):
try:
os.remove(fname)
except Exception:
LOG.exception("Failed to clear %s flag", fname)
def pull_restart_scripts_from_controller():
# If the rsync fails, it raises an exception to
# the caller "handle_install()" and fails the
# host-install request for this host.
# The restart_scripts are optional, so if the files
# are not present, it should not raise any exception
try:
output = subprocess.check_output(["rsync",
"-acv",
"--delete",
"--exclude", "tmp",
"rsync://controller/repo/software-scripts/",
"%s/" % insvc_software_scripts],
stderr=subprocess.STDOUT)
LOG.info("Synced restart scripts from controller: %s", output)
except subprocess.CalledProcessError as e:
if "No such file or directory" in e.output.decode("utf-8"):
LOG.info("No restart scripts contained in the release")
else:
LOG.exception("Failed to sync restart scripts from controller")
raise
def check_install_uuid():
controller_install_uuid_url = "http://controller:%s/feed/rel-%s/install_uuid" % (http_port_real, SW_VERSION)
try:
req = requests.get(controller_install_uuid_url)
if req.status_code != 200:
# If we're on controller-1, controller-0 may not have the install_uuid
# matching this release, if we're in an upgrade. If the file doesn't exist,
# bypass this check
if socket.gethostname() == "controller-1":
return True
LOG.error("Failed to get install_uuid from controller")
return False
except requests.ConnectionError:
LOG.error("Failed to connect to controller")
return False
controller_install_uuid = str(req.text).rstrip()
if install_uuid != controller_install_uuid:
LOG.error("Local install_uuid=%s doesn't match controller=%s", install_uuid, controller_install_uuid)
return False
return True
class PatchMessageSendLatestFeedCommit(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_SEND_LATEST_FEED_COMMIT)
def decode(self, data):
global pa
messages.PatchMessage.decode(self, data)
if 'latest_feed_commit' in data:
pa.latest_feed_commit = data['latest_feed_commit']
def encode(self):
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pa
# Check if the node is patch current
pa.query()
class PatchMessageHelloAgent(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT)
self.patch_op_counter = 0
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'patch_op_counter' in data:
self.patch_op_counter = data['patch_op_counter']
def encode(self):
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
# Send response
#
# If a user tries to do a host-install on an unlocked node,
# without bypassing the lock check (either via in-service
# patch or --force option), the agent will set its state
# to Install-Rejected in order to report back the rejection.
# However, since this should just be a transient state,
# we don't want the client reporting the Install-Rejected
# state indefinitely, so reset it to Idle after a minute or so.
#
if pa.state == constants.PATCH_AGENT_STATE_INSTALL_REJECTED:
if os.path.exists(node_is_locked_file):
# Node has been locked since rejected attempt. Reset the state
pa.state = constants.PATCH_AGENT_STATE_IDLE
elif (time.time() - pa.rejection_timestamp) > 60:
# Rejected state for more than a minute. Reset it.
pa.state = constants.PATCH_AGENT_STATE_IDLE
if self.patch_op_counter > 0:
pa.handle_patch_op_counter(self.patch_op_counter)
resp = PatchMessageHelloAgentAck()
resp.send(sock)
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
class PatchMessageHelloAgentAck(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT_ACK)
def encode(self):
global pa
messages.PatchMessage.encode(self)
self.message['query_id'] = pa.query_id
self.message['out_of_date'] = pa.changes
self.message['hostname'] = socket.gethostname()
self.message['requires_reboot'] = pa.node_is_patched
self.message['patch_failed'] = pa.patch_failed
self.message['sw_version'] = SW_VERSION
self.message['state'] = pa.state
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
global pa
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pa.controller_address, cfg.controller_port))
class SoftwareMessageDeployStateUpdate(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_DEPLOY_STATE_UPDATE)
self.data = {}
def decode(self, data):
messages.PatchMessage.decode(self, data)
self.data = data
def encode(self):
# Nothing to add, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pa
filesystem_data = utils.get_software_filesystem_data()
synced_filesystem_data = utils.get_synced_software_filesystem_data()
actual_state = {"deploy_host": filesystem_data.get("deploy_host", {}),
"deploy": filesystem_data.get("deploy", {})}
synced_state = {"deploy_host": synced_filesystem_data.get("deploy_host", {}),
"deploy": synced_filesystem_data.get("deploy", {})}
peer_state = {"deploy_host": self.data.get("deploy_state").get("deploy_host", {}),
"deploy": self.data.get("deploy_state").get("deploy", {})}
result = "diverged"
if actual_state == peer_state:
result = messages.MSG_ACK_SUCCESS
elif actual_state == synced_state:
result = messages.MSG_ACK_SUCCESS
if result == messages.MSG_ACK_SUCCESS:
utils.save_to_json_file(constants.SOFTWARE_JSON_FILE, peer_state)
utils.save_to_json_file(constants.SYNCED_SOFTWARE_JSON_FILE, peer_state)
resp = SoftwareMessageDeployStateUpdateAck()
resp.send(sock, result)
def send(self, sock): # pylint: disable=unused-argument
LOG.info("Should not get here")
class SoftwareMessageDeployStateUpdateAck(messages.PatchMessage):
def __init__(self, peer_state_data=None):
self.peer_state_data = peer_state_data
messages.PatchMessage.__init__(self, messages.PATCHMSG_DEPLOY_STATE_UPDATE_ACK)
def encode(self, result): # pylint: disable=arguments-differ
global pa
messages.PatchMessage.encode(self)
synced_data = utils.get_synced_software_filesystem_data()
self.message["result"] = result
self.message["deploy_state"] = synced_data
def handle(self, sock, addr): # pylint: disable=unused-argument
LOG.error("Should not get here")
def send(self, sock, result):
global pa
self.encode(result)
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pa.controller_address, cfg.controller_port))
class PatchMessageQueryDetailed(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED)
def decode(self, data):
messages.PatchMessage.decode(self, data)
def encode(self):
# Nothing to add to the HELLO_AGENT, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
# Send response
LOG.info("Handling detailed query")
resp = PatchMessageQueryDetailedResp()
resp.send(sock)
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
class PatchMessageQueryDetailedResp(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED_RESP)
def encode(self):
global pa
messages.PatchMessage.encode(self)
self.message['latest_sysroot_commit'] = pa.latest_sysroot_commit
self.message['nodetype'] = cfg.nodetype
self.message['sw_version'] = SW_VERSION
self.message['subfunctions'] = subfunctions
self.message['state'] = pa.state
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
self.encode()
message = json.dumps(self.message)
sock.sendall(str.encode(message))
class PatchMessageAgentInstallReq(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_REQ)
self.force = False
def decode(self, data):
messages.PatchMessage.decode(self, data)
if 'force' in data:
self.force = data['force']
def encode(self):
# Nothing to add to the HELLO_AGENT, so just call the super class
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
LOG.info("Handling host install request, force=%s", self.force)
global pa
resp = PatchMessageAgentInstallResp()
if not self.force:
setflag(node_is_software_updated_rr_file)
if not os.path.exists(node_is_locked_file):
if self.force:
LOG.info("Installing on unlocked node, with force option")
else:
LOG.info("Rejecting install request on unlocked node")
pa.state = constants.PATCH_AGENT_STATE_INSTALL_REJECTED
pa.rejection_timestamp = time.time()
resp.status = False
resp.reject_reason = 'Node must be locked.'
resp.send(sock, addr)
return
resp.status = pa.handle_install()
resp.send(sock, addr)
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
class PatchMessageAgentInstallResp(messages.PatchMessage):
def __init__(self):
messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_RESP)
self.status = False
self.reject_reason = None
def encode(self):
global pa
messages.PatchMessage.encode(self)
self.message['status'] = self.status
if self.reject_reason is not None:
self.message['reject_reason'] = self.reject_reason
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock, addr):
address = (addr[0], cfg.controller_port)
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), address)
# Send a hello ack to follow it
resp = PatchMessageHelloAgentAck()
resp.send(sock)
class PatchAgent(PatchService):
def __init__(self):
PatchService.__init__(self)
self.sock_out = None
self.sock_in = None
self.controller_address = None
self.listener = None
self.changes = False
self.latest_feed_commit = None
self.latest_sysroot_commit = None
self.patch_op_counter = 0
self.node_is_patched = os.path.exists(node_is_patched_file)
self.node_is_patched_timestamp = 0
self.query_id = 0
self.state = constants.PATCH_AGENT_STATE_IDLE
self.last_config_audit = 0
self.rejection_timestamp = 0
self.last_repo_revision = None
# Check state flags
if os.path.exists(patch_installing_file):
# We restarted while installing. Change to failed
setflag(patch_failed_file)
os.remove(patch_installing_file)
if os.path.exists(patch_failed_file):
self.state = constants.PATCH_AGENT_STATE_INSTALL_FAILED
self.patch_failed = os.path.exists(patch_failed_file)
def update_config(self):
cfg.read_config()
if self.port != cfg.agent_port:
self.port = cfg.agent_port
# Loopback interface does not support multicast messaging, therefore
# revert to using unicast messaging when configured against the
# loopback device
if cfg.get_mgmt_iface() == constants.LOOPBACK_INTERFACE_NAME:
self.mcast_addr = None
self.controller_address = cfg.get_mgmt_ip()
else:
self.mcast_addr = cfg.agent_mcast_group
self.controller_address = cfg.controller_mcast_group
def setup_tcp_socket(self):
address_family = utils.get_management_family()
self.listener = socket.socket(address_family, socket.SOCK_STREAM)
self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.listener.bind(('', self.port))
self.listener.listen(2) # Allow two connections, for two controllers
def query(self):
"""Check current patch state """
if not check_install_uuid():
LOG.info("Failed install_uuid check. Skipping query")
return False
# Generate a unique query id
self.query_id = random.random()
# determine OSTREE state of the system and the patches
self.changes = False
active_sysroot_commit = ostree_utils.get_sysroot_latest_commit()
self.latest_sysroot_commit = active_sysroot_commit
self.last_repo_revision = active_sysroot_commit
# latest_feed_commit is sent from patch controller
# if unprovisioned (no mgmt ip) attempt to query it
if self.latest_feed_commit is None:
if self.sock_out is None:
try:
self.latest_feed_commit = ostree_utils.get_feed_latest_commit(SW_VERSION)
except OSTreeCommandFail:
LOG.warning("Unable to query latest feed commit")
# latest_feed_commit will remain as None
if self.latest_feed_commit:
if active_sysroot_commit != self.latest_feed_commit:
LOG.info("Active Sysroot Commit:%s does not match "
"active controller's Feed Repo Commit: %s",
active_sysroot_commit, self.latest_feed_commit)
self.changes = True
return True
def handle_install(self,
verbose_to_stdout=False,
disallow_insvc_patch=False,
delete_older_deployments=False):
#
# The disallow_insvc_patch parameter is set when we're installing
# the patch during init. At that time, we don't want to deal with
# in-service patch scripts, so instead we'll treat any patch as
# a reboot-required when this parameter is set. Rather than running
# any scripts, the RR flag will be set, which will result in the node
# being rebooted immediately upon completion of the installation.
#
# The delete_older_deployments is set when the system has
# been rebooted.
#
LOG.info("Handling install")
# Check the INSTALL_UUID first. If it doesn't match the active
# controller, we don't want to install patches.
if not check_install_uuid():
LOG.error("Failed install_uuid check. Skipping install")
self.patch_failed = True
setflag(patch_failed_file)
self.state = constants.PATCH_AGENT_STATE_INSTALL_FAILED
# Send a hello to provide a state update
if self.sock_out is not None:
hello_ack = PatchMessageHelloAgentAck()
hello_ack.send(self.sock_out)
return False
self.state = constants.PATCH_AGENT_STATE_INSTALLING
setflag(patch_installing_file)
if delete_older_deployments:
ostree_utils.delete_older_deployments()
try:
# Create insvc patch directories
if not os.path.exists(insvc_software_scripts):
os.makedirs(insvc_software_scripts, 0o700)
if not os.path.exists(insvc_software_flags):
os.makedirs(insvc_software_flags, 0o700)
except Exception:
LOG.exception("Failed to create in-service patch directories")
# Send a hello to provide a state update
if self.sock_out is not None:
hello_ack = PatchMessageHelloAgentAck()
hello_ack.send(self.sock_out)
# Build up the install set
if verbose_to_stdout:
print("Checking for software updates...")
self.query() # sets self.changes
changed = False
success = True
if self.changes or \
os.path.exists(ostree_pull_completed_deployment_pending_file) or \
os.path.exists(mount_pending_file):
try:
# Pull changes from remote to the sysroot ostree
# The remote value is configured inside
# "/sysroot/ostree/repo/config" file
ostree_utils.pull_ostree_from_remote()
setflag(ostree_pull_completed_deployment_pending_file)
except OSTreeCommandFail:
LOG.exception("Failed to pull changes and create deployment"
"during host-install.")
success = False
try:
# Create a new deployment once the changes are pulled
ostree_utils.create_deployment()
changed = True
clearflag(ostree_pull_completed_deployment_pending_file)
except OSTreeCommandFail:
LOG.exception("Failed to pull changes and create deployment"
"during host-install.")
success = False
if changed:
# Update the node_is_patched flag
setflag(node_is_patched_file)
self.node_is_patched = True
if verbose_to_stdout:
print("This node has been patched.")
if os.path.exists(node_is_software_updated_rr_file):
LOG.info("Reboot is required. Skipping patch-scripts")
elif disallow_insvc_patch:
LOG.info("Disallowing patch-scripts. Treating as reboot-required")
setflag(node_is_software_updated_rr_file)
else:
LOG.info("Mounting the new deployment")
try:
pending_deployment = ostree_utils.fetch_pending_deployment()
deployment_dir = constants.OSTREE_BASE_DEPLOYMENT_DIR + pending_deployment
setflag(mount_pending_file)
ostree_utils.mount_new_deployment(deployment_dir)
clearflag(mount_pending_file)
LOG.info("Running in-service patch-scripts")
pull_restart_scripts_from_controller()
subprocess.check_output(run_insvc_software_scripts_cmd, stderr=subprocess.STDOUT)
# Clear the node_is_patched flag, since we've handled it in-service
clearflag(node_is_patched_file)
self.node_is_patched = False
except subprocess.CalledProcessError as e:
LOG.exception("In-Service patch installation failed")
LOG.error("Command output: %s", e.output)
success = False
# Clear the in-service patch dirs
if os.path.exists(insvc_software_scripts):
shutil.rmtree(insvc_software_scripts, ignore_errors=True)
if os.path.exists(insvc_software_flags):
shutil.rmtree(insvc_software_flags, ignore_errors=True)
if success:
self.patch_failed = False
clearflag(patch_failed_file)
self.state = constants.PATCH_AGENT_STATE_IDLE
else:
# Update the patch_failed flag
self.patch_failed = True
setflag(patch_failed_file)
self.state = constants.PATCH_AGENT_STATE_INSTALL_FAILED
clearflag(patch_installing_file)
self.query()
if self.changes:
LOG.warning("Installing the patch did not change the patch current status")
# Send a hello to provide a state update
if self.sock_out is not None:
hello_ack = PatchMessageHelloAgentAck()
hello_ack.send(self.sock_out)
# Indicate if the method was successful
# success means no change needed, or a change worked.
return success
def handle_patch_op_counter(self, counter):
changed = False
if os.path.exists(node_is_patched_file):
# The node has been patched. Run a query if:
# - node_is_patched didn't exist previously
# - node_is_patched timestamp changed
timestamp = os.path.getmtime(node_is_patched_file)
if not self.node_is_patched:
self.node_is_patched = True
self.node_is_patched_timestamp = timestamp
changed = True
elif self.node_is_patched_timestamp != timestamp:
self.node_is_patched_timestamp = timestamp
changed = True
elif self.node_is_patched:
self.node_is_patched = False
self.node_is_patched_timestamp = 0
changed = True
if self.patch_op_counter < counter:
self.patch_op_counter = counter
changed = True
if changed:
rc = self.query()
if not rc:
# Query failed. Reset the op counter
self.patch_op_counter = 0
def run(self):
self.setup_socket()
while self.sock_out is None:
# Check every thirty seconds?
# Once we've got a conf file, tied into packstack,
# we'll get restarted when the file is updated,
# and this should be unnecessary.
time.sleep(30)
self.setup_socket()
self.setup_tcp_socket()
# Ok, now we've got our socket.
# Let's let the controllers know we're here
hello_ack = PatchMessageHelloAgentAck()
hello_ack.send(self.sock_out)
first_hello = True
connections = []
timeout = time.time() + 30.0
remaining = 30
while True:
inputs = [self.sock_in, self.listener] + connections
outputs = []
rlist, wlist, xlist = select.select(inputs, outputs, inputs, remaining)
remaining = int(timeout - time.time())
if remaining <= 0 or remaining > 30:
timeout = time.time() + 30.0
remaining = 30
if (len(rlist) == 0 and
len(wlist) == 0 and
len(xlist) == 0):
# Timeout hit
self.audit_socket()
continue
for s in rlist:
if s == self.listener:
conn, addr = s.accept()
connections.append(conn)
continue
data = ''
addr = None
msg = None
if s == self.sock_in:
# Receive from UDP
data, addr = s.recvfrom(1024)
else:
# Receive from TCP
while True:
try:
packet = s.recv(1024)
except socket.error:
LOG.exception("Socket error on recv")
data = ''
break
if packet:
data += packet.decode()
if data == '':
break
try:
json.loads(data)
break
except ValueError:
# Message is incomplete
continue
else:
# End of TCP message received
break
if data == '':
# Connection dropped
connections.remove(s)
s.close()
continue
msgdata = json.loads(data)
# For now, discard any messages that are not msgversion==1
if 'msgversion' in msgdata and msgdata['msgversion'] != 1:
continue
if 'msgtype' in msgdata:
if msgdata['msgtype'] == messages.PATCHMSG_HELLO_AGENT:
if first_hello:
self.query()
first_hello = False
msg = PatchMessageHelloAgent()
elif msgdata['msgtype'] == messages.PATCHMSG_QUERY_DETAILED:
msg = PatchMessageQueryDetailed()
elif msgdata['msgtype'] == messages.PATCHMSG_SEND_LATEST_FEED_COMMIT:
msg = PatchMessageSendLatestFeedCommit()
elif msgdata['msgtype'] == messages.PATCHMSG_AGENT_INSTALL_REQ:
msg = PatchMessageAgentInstallReq()
elif msgdata['msgtype'] == messages.PATCHMSG_DEPLOY_STATE_UPDATE:
msg = SoftwareMessageDeployStateUpdate()
if msg is None:
msg = messages.PatchMessage()
msg.decode(msgdata)
if s == self.sock_in:
msg.handle(self.sock_out, addr)
else:
msg.handle(s, addr)
for s in xlist:
if s in connections:
connections.remove(s)
s.close()
# Check for in-service patch restart flag
if os.path.exists(insvc_software_restart_agent):
# Make sure it's safe to restart, ie. no reqs queued
rlist, wlist, xlist = select.select(inputs, outputs, inputs, 0)
if (len(rlist) == 0 and
len(wlist) == 0 and
len(xlist) == 0):
# Restart
LOG.info("In-service software restart flag detected. Exiting.")
os.remove(insvc_software_restart_agent)
exit(0)
def main():
global pa
configure_logging()
cfg.read_config()
pa = PatchAgent()
pa.query()
if os.path.exists(agent_running_after_reboot_flag):
delete_older_deployments_flag = False
else:
setflag(agent_running_after_reboot_flag)
delete_older_deployments_flag = True
if len(sys.argv) <= 1:
pa.run()
elif sys.argv[1] == "--install":
if not check_install_uuid():
# In certain cases, the lighttpd server could still be running using
# its default port 80, as opposed to the port configured in platform.conf
global http_port_real
LOG.info("Failed install_uuid check via http_port=%s. Trying with default port 80", http_port_real)
http_port_real = 80
pa.handle_install(verbose_to_stdout=True,
disallow_insvc_patch=True,
delete_older_deployments=delete_older_deployments_flag)
elif sys.argv[1] == "--status":
rc = 0
if pa.changes:
rc = 1
exit(rc)