Software deploy host implementation

This commit enables software deploy host in a Debian env for
all patches. It also includes the ostree_utils required to
checkout a sysroot ostree commit, install/uninstall packages
from a checked-out commit, and write commit to feed ostree -
all of which are not being invoked by software deploy host
as of today but are intended for future use.

Test Plan:
[PASS] software deploy host

Story: 2010676
Task: 47948
Signed-off-by: Jessica Castelino <jessica.castelino@windriver.com>
Change-Id: I2178010f3af5cd8e69b28a06866e567248088cba
This commit is contained in:
Jessica Castelino 2023-05-02 22:47:37 +00:00
parent 5015668dd7
commit c863d8e47d
10 changed files with 327 additions and 131 deletions

View File

@ -2,5 +2,9 @@
# see https://docs.openstack.org/infra/bindep/ for additional information.
# Do not install python2 rpms in a python3 only environment such as debian-bullseye
gir1.2-glib-2.0 [platform:dpkg]
gir1.2-ostree-1.0 [platform:dpkg]
libcairo2-dev [platform:dpkg]
libgirepository1.0-dev [platform:dpkg]
python3-rpm [platform:dpkg]
rpm-python [platform:rpm]

View File

@ -7,6 +7,7 @@ oslo.serialization
netaddr
pecan
pycryptodomex
PyGObject
requests_toolbelt
sh
WebOb

View File

@ -7,14 +7,14 @@ SPDX-License-Identifier: Apache-2.0
from pecan import expose
from software.exceptions import PatchError
from software.software_controller import pc
from software.software_controller import sc
class PatchAPIController(object):
class SoftwareAPIController(object):
@expose('json')
@expose('query.xml', content_type='application/xml')
def host_install_async(self, *args):
def deploy_host(self, *args):
if len(list(args)) == 0:
return dict(error="Host must be specified for install")
force = False
@ -22,7 +22,7 @@ class PatchAPIController(object):
force = True
try:
result = pc.patch_host_install(list(args)[0], force, async_req=True)
result = sc.software_deploy_host_api(list(args)[0], force, async_req=True)
except PatchError as e:
return dict(error="Error: %s" % str(e))
@ -30,11 +30,11 @@ class PatchAPIController(object):
@expose('json')
def is_applied(self, *args):
return pc.is_applied(list(args))
return sc.is_applied(list(args))
@expose('json')
def is_available(self, *args):
return pc.is_available(list(args))
return sc.is_available(list(args))
class RootController:
@ -46,5 +46,5 @@ class RootController:
"""index for the root"""
return "Unified Software Management API, Available versions: /v1"
patch = PatchAPIController()
v1 = PatchAPIController()
software = SoftwareAPIController()
v1 = SoftwareAPIController()

View File

@ -40,7 +40,7 @@ PATCH_AGENT_STATE_INSTALLING = "installing"
PATCH_AGENT_STATE_INSTALL_FAILED = "install-failed"
PATCH_AGENT_STATE_INSTALL_REJECTED = "install-rejected"
PATCH_STORAGE_DIR = "/opt/software"
SOFTWARE_STORAGE_DIR = "/opt/software"
OSTREE_REF = "starlingx"
OSTREE_REMOTE = "debian"
@ -54,3 +54,5 @@ LOOPBACK_INTERFACE_NAME = "lo"
SEMANTIC_PREAPPLY = 'pre-apply'
SEMANTIC_PREREMOVE = 'pre-remove'
SEMANTIC_ACTIONS = [SEMANTIC_PREAPPLY, SEMANTIC_PREREMOVE]
CHECKOUT_FOLDER = "checked_out_commit"

View File

@ -5,9 +5,17 @@ SPDX-License-Identifier: Apache-2.0
"""
import logging
import os
import sh
import shutil
import subprocess
import gi
gi.require_version('OSTree', '1.0')
from gi.repository import Gio
from gi.repository import GLib
from gi.repository import OSTree
from software import constants
from software.exceptions import OSTreeCommandFail
@ -322,3 +330,173 @@ def delete_older_deployments():
% (e.returncode, e.stderr.decode("utf-8"))
LOG.info(info_msg)
raise OSTreeCommandFail(msg)
def checkout_latest_ostree_commit(patch_sw_version):
"""
Checkout the latest feed ostree commit to a temporary folder.
"""
try:
repo_src = "%s/rel-%s/ostree_repo" % (constants.FEED_OSTREE_BASE_DIR,
patch_sw_version)
src_repo = OSTree.Repo.new(Gio.File.new_for_path(repo_src))
src_repo.open(None)
_, ref = OSTree.Repo.list_refs(src_repo, constants.OSTREE_REF, None)
dest_base = constants.SOFTWARE_STORAGE_DIR
dest_folder = constants.CHECKOUT_FOLDER
fd = os.open(dest_base, os.O_DIRECTORY)
is_checked_out = OSTree.Repo.checkout_at(src_repo, None, fd, dest_folder,
ref[constants.OSTREE_REF], None)
LOG.info("Feed OSTree latest commit checked out %s", is_checked_out)
os.close(fd)
except GLib.Error as e:
msg = "Failed to checkout latest commit to /opt/software/checked_out_commit directory."
info_msg = "OSTree Checkout Error: %s" \
% (vars(e))
LOG.info(info_msg)
raise OSTreeCommandFail(msg)
finally:
LOG.info("Checked out %s", is_checked_out)
os.close(fd)
def install_deb_package(package_list):
"""
Installs deb package to a checked out commit.
:param package_name: The list of packages to be installed.
"""
real_root = os.open("/", os.O_RDONLY)
try:
dest_base = constants.SOFTWARE_STORAGE_DIR
dest_folder = constants.CHECKOUT_FOLDER
dest_location = f"{dest_base}/{dest_folder}"
# Copy deb packages
tmp_location = f"{dest_location}/var/tmp"
package_location = f"{dest_base}/packages"
shutil.copy(package_location, tmp_location)
os.chroot(dest_location)
os.chdir('/')
try:
subprocess.check_output(["ln", "-sfn", "usr/etc", "etc"], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.info("Failed ln command: %s", e.output)
# change into the /var/tmp in the chroot
os.chdir("/var/tmp")
# install the debian package'
try:
for package in package_list:
subprocess.check_output(["dpkg", "-i", package], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.info("Failed dpkg install command: %s", e.output)
# still inside the chroot
os.chdir('/')
if os.path.isdir("/etc"):
LOG.info(os.path.isdir("etc"))
os.remove("etc")
finally:
os.fchdir(real_root)
os.chroot(".")
# now we can safely close this fd
os.close(real_root)
LOG.info("Exiting chroot")
os.chdir("/home/sysadmin")
LOG.info("Changed directory to /home/sysadmin")
def uninstall_deb_package(package_list):
"""
Uninstalls deb package from a checked out commit.
:param package_name: The list of packages to be uninstalled.
"""
real_root = os.open("/", os.O_RDONLY)
try:
dest_base = constants.SOFTWARE_STORAGE_DIR
dest_folder = constants.CHECKOUT_FOLDER
dest_location = f"{dest_base}/{dest_folder}"
# Copy deb packages
tmp_location = f"{dest_location}/var/tmp"
package_location = f"{dest_base}/packages"
shutil.copy(package_location, tmp_location)
os.chroot(dest_location)
os.chdir('/')
try:
subprocess.check_output(["ln", "-sfn", "usr/etc", "etc"], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.info("Failed ln command: %s", e.output)
# change into the /var/tmp in the chroot
os.chdir("/var/tmp")
# uninstall the debian package'
try:
# todo(jcasteli): Identify if we need to remove any
# /var/lib/dpkg/info/<package>.prerm files
for package in package_list:
subprocess.check_output(["dpkg", "--purge", package], stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
LOG.info("Failed dpkg purge command: %s", e.output)
# still inside the chroot
os.chdir('/')
if os.path.isdir("/etc"):
LOG.info(os.path.isdir("etc"))
os.remove("etc")
finally:
os.fchdir(real_root)
os.chroot(".")
# now we can safely close this fd
os.close(real_root)
LOG.info("Exiting chroot")
os.chdir("/home/sysadmin")
LOG.info("Changed directory to /home/sysadmin")
def write_to_feed_ostree(patch_name, patch_sw_version):
"""
Write a new commit to the feed ostree.
"""
try:
repo_src = "%s/rel-%s/ostree_repo" % (constants.FEED_OSTREE_BASE_DIR,
patch_sw_version)
src_repo = OSTree.Repo.new(Gio.File.new_for_path(repo_src))
src_repo.open(None)
_, ref = OSTree.Repo.list_refs(src_repo, constants.OSTREE_REF, None)
OSTree.Repo.prepare_transaction(src_repo, None)
OSTree.Repo.scan_hardlinks(src_repo, None)
dest_base = constants.SOFTWARE_STORAGE_DIR
dest_folder = constants.CHECKOUT_FOLDER
dest_location = f"{dest_base}/{dest_folder}"
build_dir = Gio.File.new_for_path(dest_location)
mtree = OSTree.MutableTree()
OSTree.Repo.write_directory_to_mtree(src_repo, build_dir, mtree, None, None)
write_success, root = OSTree.Repo.write_mtree(src_repo, mtree, None)
LOG.info("Writing to mutable tree: %s", write_success)
subject = "Patch %s - Deploy Host completed" % (patch_name)
commitSuccess, commit = OSTree.Repo.write_commit(src_repo,
ref[constants.OSTREE_REF],
subject,
None,
None,
root,
None)
LOG.info("Writing to sysroot ostree: %s", commitSuccess)
LOG.info("Setting transaction ref")
OSTree.Repo.transaction_set_ref(src_repo, None, constants.OSTREE_REF, commit)
LOG.info("Commiting ostree transaction")
OSTree.Repo.commit_transaction(src_repo, None)
LOG.info("Regenerating summary")
OSTree.Repo.regenerate_summary(src_repo, None, None)
except GLib.Error as e:
msg = "Failed to write commit to feed ostree repo."
info_msg = "OSTree Commit Write Error: %s" \
% (vars(e))
LOG.info(info_msg)
raise OSTreeCommandFail(msg)

View File

@ -15,7 +15,7 @@ import subprocess
import sys
import time
from software import ostree_utils
import software.ostree_utils as ostree_utils
from software.software_functions import configure_logging
from software.software_functions import LOG
import software.config as cfg

View File

@ -762,7 +762,10 @@ def wait_for_install_complete(agent_ip):
break
else:
m = re.search("(Error message:.*)", req.text, re.MULTILINE)
print(m.group(0))
if m:
print(m.group(0))
else:
print(vars(req))
rc = 1
break
@ -773,8 +776,8 @@ def host_install(args):
rc = 0
agent_ip = args.agent
# Issue host_install_async request and poll for results
url = "http://%s/software/host_install_async/%s" % (api_addr, agent_ip)
# Issue deploy_host request and poll for results
url = "http://%s/software/deploy_host/%s" % (api_addr, agent_ip)
if args.force:
url += "/force"
@ -849,8 +852,8 @@ def software_deploy_host_req(args):
rc = 0
agent_ip = args.agent
# Issue host_install_async request and poll for results
url = "http://%s/software/host_install_async/%s" % (api_addr, agent_ip)
# Issue deploy_host request and poll for results
url = "http://%s/software/deploy_host/%s" % (api_addr, agent_ip)
if args.force:
url += "/force"
@ -1174,6 +1177,14 @@ def register_deploy_commands(commands):
help='Deploy to a software host'
)
cmd.set_defaults(cmd='host')
cmd.set_defaults(func=software_deploy_host_req)
cmd.add_argument('agent',
help="Agent on which host deploy is triggered")
cmd.add_argument('-f',
'--force',
action='store_true',
required=False,
help="Force deploy host")
# --- software deploy list ---------------------------
cmd = sub_cmds.add_parser(

View File

@ -26,7 +26,7 @@ from wsgiref import simple_server
from oslo_config import cfg as oslo_cfg
from software import ostree_utils
import software.ostree_utils as ostree_utils
from software.api import app
from software.authapi import app as auth_app
from software.base import PatchService
@ -69,10 +69,10 @@ CONF = oslo_cfg.CONF
pidfile_path = "/var/run/patch_controller.pid"
pc = None
state_file = "%s/.controller.state" % constants.PATCH_STORAGE_DIR
sc = None
state_file = "%s/.controller.state" % constants.SOFTWARE_STORAGE_DIR
app_dependency_basename = "app_dependencies.json"
app_dependency_filename = "%s/%s" % (constants.PATCH_STORAGE_DIR, app_dependency_basename)
app_dependency_filename = "%s/%s" % (constants.SOFTWARE_STORAGE_DIR, app_dependency_basename)
insvc_patch_restart_controller = "/run/software/.restart.software-controller"
@ -189,15 +189,15 @@ class AgentNeighbour(object):
"sw_version": self.sw_version,
"state": self.state}
global pc
if self.out_of_date and not pc.allow_insvc_patching:
global sc
if self.out_of_date and not sc.allow_insvc_patching:
d["requires_reboot"] = True
else:
d["requires_reboot"] = self.requires_reboot
# Included for future enhancement, to allow per-node determination
# of in-service patching
d["allow_insvc_patching"] = pc.allow_insvc_patching
d["allow_insvc_patching"] = sc.allow_insvc_patching
return d
@ -213,12 +213,12 @@ class PatchMessageHello(messages.PatchMessage):
self.patch_op_counter = data['patch_op_counter']
def encode(self):
global pc
global sc
messages.PatchMessage.encode(self)
self.message['patch_op_counter'] = pc.patch_op_counter
self.message['patch_op_counter'] = sc.patch_op_counter
def handle(self, sock, addr):
global pc
global sc
host = addr[0]
if host == cfg.get_mgmt_ip():
# Ignore messages from self
@ -226,16 +226,16 @@ class PatchMessageHello(messages.PatchMessage):
# Send response
if self.patch_op_counter > 0:
pc.handle_nbr_patch_op_counter(host, self.patch_op_counter)
sc.handle_nbr_patch_op_counter(host, self.patch_op_counter)
resp = PatchMessageHelloAck()
resp.send(sock)
def send(self, sock):
global pc
global sc
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (sc.controller_address, cfg.controller_port))
class PatchMessageHelloAck(messages.PatchMessage):
@ -247,20 +247,20 @@ class PatchMessageHelloAck(messages.PatchMessage):
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
global sc
pc.controller_neighbours_lock.acquire()
if not addr[0] in pc.controller_neighbours:
pc.controller_neighbours[addr[0]] = ControllerNeighbour()
sc.controller_neighbours_lock.acquire()
if not addr[0] in sc.controller_neighbours:
sc.controller_neighbours[addr[0]] = ControllerNeighbour()
pc.controller_neighbours[addr[0]].rx_ack()
pc.controller_neighbours_lock.release()
sc.controller_neighbours[addr[0]].rx_ack()
sc.controller_neighbours_lock.release()
def send(self, sock):
global pc
global sc
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (sc.controller_address, cfg.controller_port))
class PatchMessageSyncReq(messages.PatchMessage):
@ -272,7 +272,7 @@ class PatchMessageSyncReq(messages.PatchMessage):
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
global sc
host = addr[0]
if host == cfg.get_mgmt_ip():
# Ignore messages from self
@ -281,17 +281,17 @@ class PatchMessageSyncReq(messages.PatchMessage):
# We may need to do this in a separate thread, so that we continue to process hellos
LOG.info("Handling sync req")
pc.sync_from_nbr(host)
sc.sync_from_nbr(host)
resp = PatchMessageSyncComplete()
resp.send(sock)
def send(self, sock):
global pc
global sc
LOG.info("sending sync req")
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (sc.controller_address, cfg.controller_port))
class PatchMessageSyncComplete(messages.PatchMessage):
@ -303,22 +303,22 @@ class PatchMessageSyncComplete(messages.PatchMessage):
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
global sc
LOG.info("Handling sync complete")
pc.controller_neighbours_lock.acquire()
if not addr[0] in pc.controller_neighbours:
pc.controller_neighbours[addr[0]] = ControllerNeighbour()
sc.controller_neighbours_lock.acquire()
if not addr[0] in sc.controller_neighbours:
sc.controller_neighbours[addr[0]] = ControllerNeighbour()
pc.controller_neighbours[addr[0]].rx_synced()
pc.controller_neighbours_lock.release()
sc.controller_neighbours[addr[0]].rx_synced()
sc.controller_neighbours_lock.release()
def send(self, sock):
global pc
global sc
LOG.info("sending sync complete")
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (sc.controller_address, cfg.controller_port))
class PatchMessageHelloAgent(messages.PatchMessage):
@ -326,19 +326,19 @@ class PatchMessageHelloAgent(messages.PatchMessage):
messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT)
def encode(self):
global pc
global sc
messages.PatchMessage.encode(self)
self.message['patch_op_counter'] = pc.patch_op_counter
self.message['patch_op_counter'] = sc.patch_op_counter
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
global pc
global sc
self.encode()
message = json.dumps(self.message)
local_hostname = utils.ip_to_versioned_localhost(cfg.agent_mcast_group)
sock.sendto(str.encode(message), (pc.agent_address, cfg.agent_port))
sock.sendto(str.encode(message), (sc.agent_address, cfg.agent_port))
sock.sendto(str.encode(message), (local_hostname, cfg.agent_port))
@ -347,19 +347,19 @@ class PatchMessageSendLatestFeedCommit(messages.PatchMessage):
messages.PatchMessage.__init__(self, messages.PATCHMSG_SEND_LATEST_FEED_COMMIT)
def encode(self):
global pc
global sc
messages.PatchMessage.encode(self)
self.message['latest_feed_commit'] = pc.latest_feed_commit
self.message['latest_feed_commit'] = sc.latest_feed_commit
def handle(self, sock, addr):
LOG.error("Should not get here")
def send(self, sock):
global pc
global sc
self.encode()
message = json.dumps(self.message)
local_hostname = utils.ip_to_versioned_localhost(cfg.agent_mcast_group)
sock.sendto(str.encode(message), (pc.agent_address, cfg.agent_port))
sock.sendto(str.encode(message), (sc.agent_address, cfg.agent_port))
sock.sendto(str.encode(message), (local_hostname, cfg.agent_port))
@ -396,20 +396,20 @@ class PatchMessageHelloAgentAck(messages.PatchMessage):
messages.PatchMessage.encode(self)
def handle(self, sock, addr):
global pc
global sc
pc.hosts_lock.acquire()
if not addr[0] in pc.hosts:
pc.hosts[addr[0]] = AgentNeighbour(addr[0])
sc.hosts_lock.acquire()
if not addr[0] in sc.hosts:
sc.hosts[addr[0]] = AgentNeighbour(addr[0])
pc.hosts[addr[0]].rx_ack(self.agent_hostname,
sc.hosts[addr[0]].rx_ack(self.agent_hostname,
self.agent_out_of_date,
self.agent_requires_reboot,
self.query_id,
self.agent_patch_failed,
self.agent_sw_version,
self.agent_state)
pc.hosts_lock.release()
sc.hosts_lock.release()
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
@ -459,25 +459,25 @@ class PatchMessageQueryDetailedResp(messages.PatchMessage):
LOG.error("Should not get here")
def handle(self, sock, addr):
global pc
global sc
ip = addr[0]
pc.hosts_lock.acquire()
if ip in pc.hosts:
pc.hosts[ip].handle_query_detailed_resp(self.latest_sysroot_commit,
sc.hosts_lock.acquire()
if ip in sc.hosts:
sc.hosts[ip].handle_query_detailed_resp(self.latest_sysroot_commit,
self.nodetype,
self.agent_sw_version,
self.subfunctions,
self.agent_state)
for patch_id in list(pc.interim_state):
if ip in pc.interim_state[patch_id]:
pc.interim_state[patch_id].remove(ip)
if len(pc.interim_state[patch_id]) == 0:
del pc.interim_state[patch_id]
pc.hosts_lock.release()
pc.check_patch_states()
for patch_id in list(sc.interim_state):
if ip in sc.interim_state[patch_id]:
sc.interim_state[patch_id].remove(ip)
if len(sc.interim_state[patch_id]) == 0:
del sc.interim_state[patch_id]
sc.hosts_lock.release()
sc.check_patch_states()
else:
pc.hosts_lock.release()
sc.hosts_lock.release()
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
@ -490,7 +490,7 @@ class PatchMessageAgentInstallReq(messages.PatchMessage):
self.force = False
def encode(self):
global pc
global sc
messages.PatchMessage.encode(self)
self.message['force'] = self.force
@ -523,17 +523,17 @@ class PatchMessageAgentInstallResp(messages.PatchMessage):
def handle(self, sock, addr):
LOG.info("Handling install resp from %s", addr[0])
global pc
global sc
# LOG.info("Handling hello ack")
pc.hosts_lock.acquire()
if not addr[0] in pc.hosts:
pc.hosts[addr[0]] = AgentNeighbour(addr[0])
sc.hosts_lock.acquire()
if not addr[0] in sc.hosts:
sc.hosts[addr[0]] = AgentNeighbour(addr[0])
pc.hosts[addr[0]].install_status = self.status
pc.hosts[addr[0]].install_pending = False
pc.hosts[addr[0]].install_reject_reason = self.reject_reason
pc.hosts_lock.release()
sc.hosts[addr[0]].install_status = self.status
sc.hosts[addr[0]].install_pending = False
sc.hosts[addr[0]].install_reject_reason = self.reject_reason
sc.hosts_lock.release()
def send(self, sock): # pylint: disable=unused-argument
LOG.error("Should not get here")
@ -554,7 +554,7 @@ class PatchMessageDropHostReq(messages.PatchMessage):
self.ip = data['ip']
def handle(self, sock, addr):
global pc
global sc
host = addr[0]
if host == cfg.get_mgmt_ip():
# Ignore messages from self
@ -564,14 +564,14 @@ class PatchMessageDropHostReq(messages.PatchMessage):
LOG.error("Received PATCHMSG_DROP_HOST_REQ with no ip: %s", json.dumps(self.data))
return
pc.drop_host(self.ip, sync_nbr=False)
sc.drop_host(self.ip, sync_nbr=False)
return
def send(self, sock):
global pc
global sc
self.encode()
message = json.dumps(self.message)
sock.sendto(str.encode(message), (pc.controller_address, cfg.controller_port))
sock.sendto(str.encode(message), (sc.controller_address, cfg.controller_port))
class PatchController(PatchService):
@ -2025,8 +2025,8 @@ class PatchController(PatchService):
for nbr in list(self.hosts):
host = self.hosts[nbr].get_dict()
host["interim_state"] = False
for patch_id in list(pc.interim_state):
if nbr in pc.interim_state[patch_id]:
for patch_id in list(sc.interim_state):
if nbr in sc.interim_state[patch_id]:
host["interim_state"] = True
output.append(host)
@ -2084,7 +2084,7 @@ class PatchController(PatchService):
msg = "Failed to delete the restart script for %s" % patch_id
LOG.exception(msg)
def patch_host_install(self, host_ip, force, async_req=False):
def software_deploy_host_api(self, host_ip, force, async_req=False):
msg_info = ""
msg_warning = ""
msg_error = ""
@ -2107,7 +2107,7 @@ class PatchController(PatchService):
LOG.error("Error in host-install: %s", msg)
return dict(info=msg_info, warning=msg_warning, error=msg_error)
msg = "Running host-install for %s (%s), force=%s, async_req=%s" % (host_ip, ip, force, async_req)
msg = "Running software deploy host for %s (%s), force=%s, async_req=%s" % (host_ip, ip, force, async_req)
LOG.info(msg)
audit_log_info(msg)
@ -2291,7 +2291,7 @@ class PatchController(PatchService):
try:
tmpfile, tmpfname = tempfile.mkstemp(
prefix=app_dependency_basename,
dir=constants.PATCH_STORAGE_DIR)
dir=constants.SOFTWARE_STORAGE_DIR)
os.write(tmpfile, json.dumps(self.app_dependencies).encode())
os.close(tmpfile)
@ -2471,13 +2471,13 @@ class PatchControllerMainThread(threading.Thread):
# LOG.info ("Initializing Main thread")
def run(self):
global pc
global sc
global thread_death
# LOG.info ("In Main thread")
try:
sock_in = pc.setup_socket()
sock_in = sc.setup_socket()
while sock_in is None:
# Check every thirty seconds?
@ -2485,18 +2485,18 @@ class PatchControllerMainThread(threading.Thread):
# we'll get restarted when the file is updated,
# and this should be unnecessary.
time.sleep(30)
sock_in = pc.setup_socket()
sock_in = sc.setup_socket()
# Ok, now we've got our socket. Let's start with a hello!
pc.socket_lock.acquire()
sc.socket_lock.acquire()
hello = PatchMessageHello()
hello.send(pc.sock_out)
hello.send(sc.sock_out)
hello_agent = PatchMessageHelloAgent()
hello_agent.send(pc.sock_out)
hello_agent.send(sc.sock_out)
pc.socket_lock.release()
sc.socket_lock.release()
# Send hello every thirty seconds
hello_timeout = time.time() + 30.0
@ -2518,7 +2518,7 @@ class PatchControllerMainThread(threading.Thread):
os.remove(insvc_patch_restart_controller)
return
inputs = [pc.sock_in] + agent_query_conns
inputs = [sc.sock_in] + agent_query_conns
outputs = []
# LOG.info("Running select, remaining=%d", remaining)
@ -2528,7 +2528,7 @@ class PatchControllerMainThread(threading.Thread):
len(wlist) == 0 and
len(xlist) == 0):
# Timeout hit
pc.audit_socket()
sc.audit_socket()
# LOG.info("Checking sockets")
for s in rlist:
@ -2536,11 +2536,11 @@ class PatchControllerMainThread(threading.Thread):
addr = None
msg = None
if s == pc.sock_in:
if s == sc.sock_in:
# Receive from UDP
pc.socket_lock.acquire()
sc.socket_lock.acquire()
data, addr = s.recvfrom(1024)
pc.socket_lock.release()
sc.socket_lock.release()
else:
# Receive from TCP
while True:
@ -2603,13 +2603,13 @@ class PatchControllerMainThread(threading.Thread):
msg = messages.PatchMessage()
msg.decode(msgdata)
if s == pc.sock_in:
msg.handle(pc.sock_out, addr)
if s == sc.sock_in:
msg.handle(sc.sock_out, addr)
else:
msg.handle(s, addr)
# We can drop the connection after a query response
if msg.msgtype == messages.PATCHMSG_QUERY_DETAILED_RESP and s != pc.sock_in:
if msg.msgtype == messages.PATCHMSG_QUERY_DETAILED_RESP and s != sc.sock_in:
agent_query_conns.remove(s)
s.shutdown(socket.SHUT_RDWR)
s.close()
@ -2630,38 +2630,38 @@ class PatchControllerMainThread(threading.Thread):
hello_timeout = time.time() + 30.0
remaining = 30
pc.socket_lock.acquire()
sc.socket_lock.acquire()
hello = PatchMessageHello()
hello.send(pc.sock_out)
hello.send(sc.sock_out)
hello_agent = PatchMessageHelloAgent()
hello_agent.send(pc.sock_out)
hello_agent.send(sc.sock_out)
pc.socket_lock.release()
sc.socket_lock.release()
# Age out neighbours
pc.controller_neighbours_lock.acquire()
nbrs = list(pc.controller_neighbours)
sc.controller_neighbours_lock.acquire()
nbrs = list(sc.controller_neighbours)
for n in nbrs:
# Age out controllers after 2 minutes
if pc.controller_neighbours[n].get_age() >= 120:
if sc.controller_neighbours[n].get_age() >= 120:
LOG.info("Aging out controller %s from table", n)
del pc.controller_neighbours[n]
pc.controller_neighbours_lock.release()
del sc.controller_neighbours[n]
sc.controller_neighbours_lock.release()
pc.hosts_lock.acquire()
nbrs = list(pc.hosts)
sc.hosts_lock.acquire()
nbrs = list(sc.hosts)
for n in nbrs:
# Age out hosts after 1 hour
if pc.hosts[n].get_age() >= 3600:
if sc.hosts[n].get_age() >= 3600:
LOG.info("Aging out host %s from table", n)
del pc.hosts[n]
for patch_id in list(pc.interim_state):
if n in pc.interim_state[patch_id]:
pc.interim_state[patch_id].remove(n)
del sc.hosts[n]
for patch_id in list(sc.interim_state):
if n in sc.interim_state[patch_id]:
sc.interim_state[patch_id].remove(n)
pc.hosts_lock.release()
sc.hosts_lock.release()
except Exception:
# Log all exceptions
LOG.exception("Error occurred during request processing")
@ -2690,8 +2690,8 @@ def main():
# that create directories with tempfile will not use /tmp
os.environ['TMPDIR'] = '/scratch'
global pc
pc = PatchController()
global sc
sc = PatchController()
LOG.info("launching")
api_thread = PatchControllerApiThread()

View File

@ -37,7 +37,7 @@ except Exception:
SW_VERSION = "unknown"
# Constants
patch_dir = constants.PATCH_STORAGE_DIR
patch_dir = constants.SOFTWARE_STORAGE_DIR
avail_dir = "%s/metadata/available" % patch_dir
applied_dir = "%s/metadata/applied" % patch_dir
committed_dir = "%s/metadata/committed" % patch_dir

View File

@ -85,10 +85,10 @@ class SoftwareControllerMessagesTestCase(testtools.TestCase):
self.assertIsNotNone(test_obj)
self.assertIsInstance(test_obj, PatchMessage)
@mock.patch('software.software_controller.pc', FakeSoftwareController())
@mock.patch('software.software_controller.sc', FakeSoftwareController())
def test_message_class_encode(self):
"""'encode' method populates self.message"""
# mock the global software_controller 'pc' variable used by encode
# mock the global software_controller 'sc' variable used by encode
# PatchMessageQueryDetailedResp does not support 'encode'
# so it can be executed, but it will not change the message
@ -108,7 +108,7 @@ class SoftwareControllerMessagesTestCase(testtools.TestCase):
test_obj2.decode(test_obj.message)
# decode does not populate 'message' so nothing to compare
@mock.patch('software.software_controller.pc', FakeSoftwareController())
@mock.patch('software.software_controller.sc', FakeSoftwareController())
@mock.patch('software.config.agent_mcast_group', FAKE_AGENT_MCAST_GROUP)
def test_message_class_send(self):
"""'send' writes to a socket"""
@ -142,7 +142,7 @@ class SoftwareControllerMessagesTestCase(testtools.TestCase):
if message_class in send_all:
mock_sock.sendall.assert_called()
@mock.patch('software.software_controller.pc', FakeSoftwareController())
@mock.patch('software.software_controller.sc', FakeSoftwareController())
def test_message_class_handle(self):
"""'handle' method tests"""
addr = [FAKE_CONTROLLER_ADDRESS, ] # addr is a list