2699 lines
99 KiB
Python
2699 lines
99 KiB
Python
"""
|
|
Copyright (c) 2014-2019 Wind River Systems, Inc.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
|
|
"""
|
|
import shutil
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import socket
|
|
import json
|
|
import select
|
|
import subprocess
|
|
from six.moves import configparser
|
|
import rpm
|
|
import os
|
|
import gc
|
|
|
|
from cgcs_patch.patch_functions import parse_pkgver
|
|
|
|
from wsgiref import simple_server
|
|
from cgcs_patch.api import app
|
|
from cgcs_patch.authapi import app as auth_app
|
|
from cgcs_patch.patch_functions import configure_logging
|
|
from cgcs_patch.patch_functions import BasePackageData
|
|
from cgcs_patch.patch_functions import avail_dir
|
|
from cgcs_patch.patch_functions import applied_dir
|
|
from cgcs_patch.patch_functions import committed_dir
|
|
from cgcs_patch.patch_functions import PatchFile
|
|
from cgcs_patch.patch_functions import parse_rpm_filename
|
|
from cgcs_patch.patch_functions import package_dir
|
|
from cgcs_patch.patch_functions import repo_dir
|
|
from cgcs_patch.patch_functions import semantics_dir
|
|
from cgcs_patch.patch_functions import SW_VERSION
|
|
from cgcs_patch.patch_functions import root_package_dir
|
|
from cgcs_patch.exceptions import MetadataFail
|
|
from cgcs_patch.exceptions import RpmFail
|
|
from cgcs_patch.exceptions import SemanticFail
|
|
from cgcs_patch.exceptions import PatchError
|
|
from cgcs_patch.exceptions import PatchFail
|
|
from cgcs_patch.exceptions import PatchInvalidRequest
|
|
from cgcs_patch.exceptions import PatchValidationFailure
|
|
from cgcs_patch.exceptions import PatchMismatchFailure
|
|
from cgcs_patch.patch_functions import LOG
|
|
from cgcs_patch.patch_functions import audit_log_info
|
|
from cgcs_patch.patch_functions import patch_dir
|
|
from cgcs_patch.patch_functions import repo_root_dir
|
|
from cgcs_patch.patch_functions import PatchData
|
|
from cgcs_patch.base import PatchService
|
|
|
|
import cgcs_patch.config as cfg
|
|
import cgcs_patch.utils as utils
|
|
# noinspection PyUnresolvedReferences
|
|
from oslo_config import cfg as oslo_cfg
|
|
|
|
import cgcs_patch.messages as messages
|
|
import cgcs_patch.constants as constants
|
|
|
|
from tsconfig.tsconfig import INITIAL_CONFIG_COMPLETE_FLAG
|
|
|
|
CONF = oslo_cfg.CONF

pidfile_path = "/var/run/patch_controller.pid"

# Module-level PatchController singleton; populated elsewhere at service
# startup and referenced via "global pc" throughout this module.
pc = None

# Persisted runtime state (patch_op_counter) for the controller.
state_file = "%s/.controller.state" % constants.PATCH_STORAGE_DIR

app_dependency_basename = "app_dependencies.json"

app_dependency_filename = "%s/%s" % (constants.PATCH_STORAGE_DIR, app_dependency_basename)

# Flag file indicating the patch-controller itself should restart after an
# in-service patch.
insvc_patch_restart_controller = "/run/patching/.restart.patch-controller"

# Agent hosts whose detailed query data is stale / has an outstanding query.
# Mutated by AgentNeighbour instances as acks and responses arrive.
stale_hosts = []
pending_queries = []

thread_death = None
keep_running = True

# Limit socket blocking to 5 seconds to allow for thread to shutdown
api_socket_timeout = 5.0
|
|
|
|
|
|
class ControllerNeighbour(object):
    """Tracks liveness and sync status of a mate patch controller."""

    def __init__(self):
        # Timestamp of the most recent ack; 0 means never heard from.
        self.last_ack = 0
        # True once the neighbour has reported a completed sync.
        self.synced = False

    def rx_ack(self):
        """Record that an ack was just received from this neighbour."""
        self.last_ack = time.time()

    def get_age(self):
        """Return whole seconds elapsed since the last ack."""
        elapsed = time.time() - self.last_ack
        return int(elapsed)

    def rx_synced(self):
        """Mark the neighbour as having completed a sync."""
        self.synced = True

    def clear_synced(self):
        """Reset the neighbour's sync status."""
        self.synced = False

    def get_synced(self):
        """Return whether the neighbour has reported a completed sync."""
        return self.synced
|
|
|
|
|
|
class AgentNeighbour(object):
    """Tracks the patch status reported by a single patch-agent host."""

    def __init__(self, ip):
        # Address the agent's messages arrive from (also the pc.hosts key).
        self.ip = ip
        # Time of the last hello ack; 0 means never heard from.
        self.last_ack = 0
        # Last query_id seen from the agent; a change marks data as stale.
        self.last_query_id = 0
        self.out_of_date = False
        self.hostname = "n/a"
        self.requires_reboot = False
        self.patch_failed = False
        # stale: agent reported a new query_id; detailed data needs refresh.
        self.stale = False
        # pending_query: a detailed query is outstanding for this host.
        self.pending_query = False
        # Detailed package state, filled in by handle_query_detailed_resp().
        self.installed = {}
        self.to_remove = []
        self.missing_pkgs = []
        self.nodetype = None
        self.sw_version = "unknown"
        self.subfunctions = []
        self.state = None

    def rx_ack(self,
               hostname,
               out_of_date,
               requires_reboot,
               query_id,
               patch_failed,
               sw_version,
               state):
        """Record a hello ack from the agent and refresh its summary status.

        A changed query_id marks this host stale and queues its IP on the
        module-level stale_hosts list (unless a query is already pending).
        """
        self.last_ack = time.time()
        self.hostname = hostname
        self.patch_failed = patch_failed
        self.sw_version = sw_version
        self.state = state

        # Only log when the reported status actually changes.
        if out_of_date != self.out_of_date or requires_reboot != self.requires_reboot:
            self.out_of_date = out_of_date
            self.requires_reboot = requires_reboot
            LOG.info("Agent %s (%s) reporting out_of_date=%s, requires_reboot=%s",
                     self.hostname,
                     self.ip,
                     self.out_of_date,
                     self.requires_reboot)

        if self.last_query_id != query_id:
            self.last_query_id = query_id
            self.stale = True
            if self.ip not in stale_hosts and self.ip not in pending_queries:
                stale_hosts.append(self.ip)

    def get_age(self):
        """Return whole seconds elapsed since the last ack."""
        return int(time.time() - self.last_ack)

    def handle_query_detailed_resp(self,
                                   installed,
                                   to_remove,
                                   missing_pkgs,
                                   nodetype,
                                   sw_version,
                                   subfunctions,
                                   state):
        """Store the agent's detailed package state and clear stale/pending flags."""
        self.installed = installed
        self.to_remove = to_remove
        self.missing_pkgs = missing_pkgs
        self.nodetype = nodetype
        self.stale = False
        self.pending_query = False
        self.sw_version = sw_version
        self.subfunctions = subfunctions
        self.state = state

        # This host is now up to date; drop it from the module-level queues.
        if self.ip in pending_queries:
            pending_queries.remove(self.ip)

        if self.ip in stale_hosts:
            stale_hosts.remove(self.ip)

    def get_dict(self):
        """Return a JSON-friendly summary of this host's patch status."""
        d = {"ip": self.ip,
             "hostname": self.hostname,
             "patch_current": not self.out_of_date,
             "secs_since_ack": self.get_age(),
             "patch_failed": self.patch_failed,
             "stale_details": self.stale,
             "installed": self.installed,
             "to_remove": self.to_remove,
             "missing_pkgs": self.missing_pkgs,
             "nodetype": self.nodetype,
             "subfunctions": self.subfunctions,
             "sw_version": self.sw_version,
             "state": self.state}

        global pc
        # An out-of-date host is reported as requiring reboot whenever
        # in-service patching is not currently allowed.
        if self.out_of_date and not pc.allow_insvc_patching:
            d["requires_reboot"] = True
        else:
            d["requires_reboot"] = self.requires_reboot

        # Included for future enhancement, to allow per-node determination
        # of in-service patching
        d["allow_insvc_patching"] = pc.allow_insvc_patching

        return d
|
|
|
|
|
|
class PatchMessageHello(messages.PatchMessage):
    """Controller-to-controller hello, advertising the patch op counter."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO)
        self.patch_op_counter = 0

    def decode(self, data):
        """Extract the peer's patch_op_counter, if present."""
        messages.PatchMessage.decode(self, data)
        self.patch_op_counter = data.get('patch_op_counter', self.patch_op_counter)

    def encode(self):
        """Attach our current patch_op_counter to the outgoing message."""
        global pc
        messages.PatchMessage.encode(self)
        self.message['patch_op_counter'] = pc.patch_op_counter

    def handle(self, sock, addr):
        """Ack a peer hello, first syncing if the peer's counter is ahead."""
        global pc
        sender = addr[0]
        # Our own traffic loops back to us; drop it.
        if sender == cfg.get_mgmt_ip():
            return

        if self.patch_op_counter > 0:
            pc.handle_nbr_patch_op_counter(sender, self.patch_op_counter)

        PatchMessageHelloAck().send(sock)

    def send(self, sock):
        """Send this hello to the controller address."""
        global pc
        self.encode()
        payload = json.dumps(self.message)
        sock.sendto(payload, (pc.controller_address, cfg.controller_port))
|
|
|
|
|
|
class PatchMessageHelloAck(messages.PatchMessage):
    """Ack for a controller hello; carries no payload beyond the type."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_ACK)

    def encode(self):
        # No extra fields; the base encoding is sufficient.
        messages.PatchMessage.encode(self)

    def handle(self, sock, addr):
        """Record the ack time for the sending controller neighbour."""
        global pc

        sender = addr[0]
        pc.controller_neighbours_lock.acquire()
        if sender not in pc.controller_neighbours:
            pc.controller_neighbours[sender] = ControllerNeighbour()
        pc.controller_neighbours[sender].rx_ack()
        pc.controller_neighbours_lock.release()

    def send(self, sock):
        """Send the ack to the controller address."""
        global pc
        self.encode()
        payload = json.dumps(self.message)
        sock.sendto(payload, (pc.controller_address, cfg.controller_port))
|
|
|
|
|
|
class PatchMessageSyncReq(messages.PatchMessage):
    """Request that the mate controller pull patching data from this node."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_SYNC_REQ)

    def encode(self):
        # Nothing to add to the SYNC_REQ, so just call the super class
        messages.PatchMessage.encode(self)

    def handle(self, sock, addr):
        """Pull patch data from the requesting controller, then confirm."""
        global pc
        sender = addr[0]
        # Our own traffic loops back to us; drop it.
        if sender == cfg.get_mgmt_ip():
            return

        # We may need to do this in a separate thread, so that we continue to process hellos
        LOG.info("Handling sync req")

        pc.sync_from_nbr(sender)

        PatchMessageSyncComplete().send(sock)

    def send(self, sock):
        """Send the sync request to the controller address."""
        global pc
        LOG.info("sending sync req")
        self.encode()
        payload = json.dumps(self.message)
        sock.sendto(payload, (pc.controller_address, cfg.controller_port))
|
|
|
|
|
|
class PatchMessageSyncComplete(messages.PatchMessage):
    """Notification that the mate controller finished syncing from us."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_SYNC_COMPLETE)

    def encode(self):
        # Nothing to add to the SYNC_COMPLETE, so just call the super class
        messages.PatchMessage.encode(self)

    def handle(self, sock, addr):
        """Mark the sending controller neighbour as synced."""
        global pc
        LOG.info("Handling sync complete")

        sender = addr[0]
        pc.controller_neighbours_lock.acquire()
        if sender not in pc.controller_neighbours:
            pc.controller_neighbours[sender] = ControllerNeighbour()
        pc.controller_neighbours[sender].rx_synced()
        pc.controller_neighbours_lock.release()

    def send(self, sock):
        """Send the completion notice to the controller address."""
        global pc
        LOG.info("sending sync complete")
        self.encode()
        payload = json.dumps(self.message)
        sock.sendto(payload, (pc.controller_address, cfg.controller_port))
|
|
|
|
|
|
class PatchMessageHelloAgent(messages.PatchMessage):
    """Controller-to-agent hello, advertising the patch op counter."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT)

    def encode(self):
        """Attach our current patch_op_counter to the outgoing message."""
        global pc
        messages.PatchMessage.encode(self)
        self.message['patch_op_counter'] = pc.patch_op_counter

    def handle(self, sock, addr):
        # Agents, not controllers, consume this message.
        LOG.error("Should not get here")

    def send(self, sock):
        """Send the hello both to the agent address and to localhost."""
        global pc
        self.encode()
        payload = json.dumps(self.message)
        local_hostname = utils.ip_to_versioned_localhost(cfg.agent_mcast_group)
        sock.sendto(payload, (pc.agent_address, cfg.agent_port))
        sock.sendto(payload, (local_hostname, cfg.agent_port))
|
|
|
|
|
|
class PatchMessageHelloAgentAck(messages.PatchMessage):
    """Agent's reply to a hello, summarizing its patch status."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_HELLO_AGENT_ACK)
        self.query_id = 0
        self.agent_out_of_date = False
        self.agent_hostname = "n/a"
        self.agent_requires_reboot = False
        self.agent_patch_failed = False
        self.agent_sw_version = "unknown"
        self.agent_state = "unknown"

    def decode(self, data):
        """Pull the agent's status fields; absent keys keep their defaults."""
        messages.PatchMessage.decode(self, data)
        self.query_id = data.get('query_id', self.query_id)
        self.agent_out_of_date = data.get('out_of_date', self.agent_out_of_date)
        self.agent_hostname = data.get('hostname', self.agent_hostname)
        self.agent_requires_reboot = data.get('requires_reboot', self.agent_requires_reboot)
        self.agent_patch_failed = data.get('patch_failed', self.agent_patch_failed)
        self.agent_sw_version = data.get('sw_version', self.agent_sw_version)
        self.agent_state = data.get('state', self.agent_state)

    def encode(self):
        # Nothing to add, so just call the super class
        messages.PatchMessage.encode(self)

    def handle(self, sock, addr):
        """Record the agent's reported status on its neighbour entry."""
        global pc

        sender = addr[0]
        pc.hosts_lock.acquire()
        if sender not in pc.hosts:
            pc.hosts[sender] = AgentNeighbour(sender)

        pc.hosts[sender].rx_ack(self.agent_hostname,
                                self.agent_out_of_date,
                                self.agent_requires_reboot,
                                self.query_id,
                                self.agent_patch_failed,
                                self.agent_sw_version,
                                self.agent_state)
        pc.hosts_lock.release()

    def send(self, sock):  # pylint: disable=unused-argument
        # Only agents send this message.
        LOG.error("Should not get here")
|
|
|
|
|
|
class PatchMessageQueryDetailed(messages.PatchMessage):
    """Request a detailed package-state report from an agent."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED)

    def encode(self):
        # Nothing to add to the message, so just call the super class
        messages.PatchMessage.encode(self)

    def handle(self, sock, addr):
        # Only agents handle this request.
        LOG.error("Should not get here")

    def send(self, sock):
        """Write the query over a connected socket."""
        self.encode()
        payload = json.dumps(self.message)
        sock.sendall(payload)
|
|
|
|
|
|
class PatchMessageQueryDetailedResp(messages.PatchMessage):
    """Agent's response to a detailed query, carrying its full package state."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_QUERY_DETAILED_RESP)
        # Fix: agent_sw_version was previously assigned twice; once is enough.
        self.agent_sw_version = "unknown"
        self.installed = {}
        self.to_install = {}
        self.to_remove = []
        self.missing_pkgs = []
        self.subfunctions = []
        self.nodetype = "unknown"
        self.agent_state = "unknown"

    def decode(self, data):
        """Populate fields from the decoded message; absent keys keep defaults."""
        messages.PatchMessage.decode(self, data)
        if 'installed' in data:
            self.installed = data['installed']
        if 'to_remove' in data:
            self.to_remove = data['to_remove']
        if 'missing_pkgs' in data:
            self.missing_pkgs = data['missing_pkgs']
        if 'nodetype' in data:
            self.nodetype = data['nodetype']
        if 'sw_version' in data:
            self.agent_sw_version = data['sw_version']
        if 'subfunctions' in data:
            self.subfunctions = data['subfunctions']
        if 'state' in data:
            self.agent_state = data['state']

    def encode(self):
        # Only agents encode this message.
        LOG.error("Should not get here")

    def handle(self, sock, addr):
        """Store the agent's detailed state and clear its interim tracking.

        Once every host tracked for a patch has reported in, the patch's
        interim_state entry is removed and patch states are recomputed.
        """
        global pc

        ip = addr[0]
        pc.hosts_lock.acquire()
        if ip in pc.hosts:
            pc.hosts[ip].handle_query_detailed_resp(self.installed,
                                                    self.to_remove,
                                                    self.missing_pkgs,
                                                    self.nodetype,
                                                    self.agent_sw_version,
                                                    self.subfunctions,
                                                    self.agent_state)
            # Iterate over a snapshot of the keys: entries may be deleted as
            # hosts report in. Iterating the dict directly while deleting
            # raises RuntimeError on Python 3.
            for patch_id in list(pc.interim_state):
                if ip in pc.interim_state[patch_id]:
                    pc.interim_state[patch_id].remove(ip)
                    if len(pc.interim_state[patch_id]) == 0:
                        del pc.interim_state[patch_id]
            pc.hosts_lock.release()
            pc.check_patch_states()
        else:
            pc.hosts_lock.release()

    def send(self, sock):  # pylint: disable=unused-argument
        # Only agents send this message.
        LOG.error("Should not get here")
|
|
|
|
|
|
class PatchMessageAgentInstallReq(messages.PatchMessage):
    """Direct a specific agent to perform a patch installation."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_REQ)
        # Target agent address; must be set by the caller before send().
        self.ip = None
        self.force = False

    def encode(self):
        global pc
        messages.PatchMessage.encode(self)
        self.message['force'] = self.force

    def handle(self, sock, addr):
        # Only agents handle install requests.
        LOG.error("Should not get here")

    def send(self, sock):
        """Send the install request directly to the target agent."""
        LOG.info("sending install request to node: %s", self.ip)
        self.encode()
        payload = json.dumps(self.message)
        sock.sendto(payload, (self.ip, cfg.agent_port))
|
|
|
|
|
|
class PatchMessageAgentInstallResp(messages.PatchMessage):
    """Agent's response to an install request."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_AGENT_INSTALL_RESP)
        self.status = False
        self.reject_reason = None

    def decode(self, data):
        """Pull install status and any rejection reason from the response."""
        messages.PatchMessage.decode(self, data)
        self.status = data.get('status', self.status)
        self.reject_reason = data.get('reject_reason', self.reject_reason)

    def encode(self):
        # Nothing to add, so just call the super class
        messages.PatchMessage.encode(self)

    def handle(self, sock, addr):
        """Record the install outcome on the agent's neighbour entry."""
        LOG.info("Handling install resp from %s", addr[0])
        global pc

        sender = addr[0]
        pc.hosts_lock.acquire()
        if sender not in pc.hosts:
            pc.hosts[sender] = AgentNeighbour(sender)

        pc.hosts[sender].install_status = self.status
        pc.hosts[sender].install_pending = False
        pc.hosts[sender].install_reject_reason = self.reject_reason
        pc.hosts_lock.release()

    def send(self, sock):  # pylint: disable=unused-argument
        # Only agents send this message.
        LOG.error("Should not get here")
|
|
|
|
|
|
class PatchMessageDropHostReq(messages.PatchMessage):
    """Tell the mate controller to drop a host from its tracking."""

    def __init__(self):
        messages.PatchMessage.__init__(self, messages.PATCHMSG_DROP_HOST_REQ)
        self.ip = None

    def encode(self):
        messages.PatchMessage.encode(self)
        self.message['ip'] = self.ip

    def decode(self, data):
        """Pull the host address to drop, if present."""
        messages.PatchMessage.decode(self, data)
        self.ip = data.get('ip', self.ip)

    def handle(self, sock, addr):
        """Drop the named host locally, without re-notifying the mate."""
        global pc
        sender = addr[0]
        # Our own traffic loops back to us; drop it.
        if sender == cfg.get_mgmt_ip():
            return

        if self.ip is None:
            LOG.error("Received PATCHMSG_DROP_HOST_REQ with no ip: %s", json.dumps(self.data))
            return

        # sync_nbr=False: the mate initiated this, so do not echo it back.
        pc.drop_host(self.ip, sync_nbr=False)
        return

    def send(self, sock):
        """Send the drop request to the controller address."""
        global pc
        self.encode()
        payload = json.dumps(self.message)
        sock.sendto(payload, (pc.controller_address, cfg.controller_port))
|
|
|
|
|
|
class PatchController(PatchService):
|
|
def __init__(self):
|
|
PatchService.__init__(self)
|
|
|
|
# Locks
|
|
self.socket_lock = threading.RLock()
|
|
self.controller_neighbours_lock = threading.RLock()
|
|
self.hosts_lock = threading.RLock()
|
|
self.patch_data_lock = threading.RLock()
|
|
|
|
self.hosts = {}
|
|
self.controller_neighbours = {}
|
|
|
|
# interim_state is used to track hosts that have not responded
|
|
# with fresh queries since a patch was applied or removed, on
|
|
# a per-patch basis. This allows the patch controller to move
|
|
# patches immediately into a "Partial" state until all nodes
|
|
# have responded.
|
|
#
|
|
self.interim_state = {}
|
|
|
|
self.sock_out = None
|
|
self.sock_in = None
|
|
self.controller_address = None
|
|
self.agent_address = None
|
|
self.patch_op_counter = 1
|
|
self.patch_data = PatchData()
|
|
self.patch_data.load_all()
|
|
self.check_patch_states()
|
|
self.base_pkgdata = BasePackageData()
|
|
|
|
self.allow_insvc_patching = True
|
|
|
|
if os.path.exists(app_dependency_filename):
|
|
try:
|
|
with open(app_dependency_filename, 'r') as f:
|
|
self.app_dependencies = json.loads(f.read())
|
|
except Exception:
|
|
LOG.exception("Failed to read app dependencies: %s", app_dependency_filename)
|
|
else:
|
|
self.app_dependencies = {}
|
|
|
|
if os.path.isfile(state_file):
|
|
self.read_state_file()
|
|
else:
|
|
self.write_state_file()
|
|
|
|
def update_config(self):
|
|
cfg.read_config()
|
|
|
|
if self.port != cfg.controller_port:
|
|
self.port = cfg.controller_port
|
|
|
|
# Loopback interface does not support multicast messaging, therefore
|
|
# revert to using unicast messaging when configured against the
|
|
# loopback device
|
|
if cfg.get_mgmt_iface() == constants.LOOPBACK_INTERFACE_NAME:
|
|
mgmt_ip = cfg.get_mgmt_ip()
|
|
self.mcast_addr = None
|
|
self.controller_address = mgmt_ip
|
|
self.agent_address = mgmt_ip
|
|
else:
|
|
self.mcast_addr = cfg.controller_mcast_group
|
|
self.controller_address = cfg.controller_mcast_group
|
|
self.agent_address = cfg.agent_mcast_group
|
|
|
|
    def socket_lock_acquire(self):
        # Take the shared socket lock (an RLock created in __init__).
        self.socket_lock.acquire()
|
|
|
|
    def socket_lock_release(self):
        # Best-effort release: deliberately swallow errors (e.g. releasing a
        # lock this thread does not hold) rather than propagate them.
        try:
            self.socket_lock.release()
        except Exception:
            pass
|
|
|
|
def write_state_file(self):
|
|
config = configparser.ConfigParser()
|
|
|
|
cfgfile = open(state_file, 'w')
|
|
|
|
config.add_section('runtime')
|
|
config.set('runtime', 'patch_op_counter', str(self.patch_op_counter))
|
|
config.write(cfgfile)
|
|
cfgfile.close()
|
|
|
|
def read_state_file(self):
|
|
config = configparser.ConfigParser()
|
|
|
|
config.read(state_file)
|
|
|
|
try:
|
|
counter = config.getint('runtime', 'patch_op_counter')
|
|
self.patch_op_counter = counter
|
|
|
|
LOG.info("patch_op_counter is: %d", self.patch_op_counter)
|
|
except configparser.Error:
|
|
LOG.exception("Failed to read state info")
|
|
|
|
def handle_nbr_patch_op_counter(self, host, nbr_patch_op_counter):
|
|
if self.patch_op_counter >= nbr_patch_op_counter:
|
|
return
|
|
|
|
self.sync_from_nbr(host)
|
|
|
|
def sync_from_nbr(self, host):
|
|
# Sync the patching repo
|
|
host_url = utils.ip_to_url(host)
|
|
try:
|
|
output = subprocess.check_output(["rsync",
|
|
"-acv",
|
|
"--delete",
|
|
"--exclude", "tmp",
|
|
"rsync://%s/patching/" % host_url,
|
|
"%s/" % patch_dir],
|
|
stderr=subprocess.STDOUT)
|
|
LOG.info("Synced to mate patching via rsync: %s", output)
|
|
except subprocess.CalledProcessError as e:
|
|
LOG.error("Failed to rsync: %s", e.output)
|
|
return False
|
|
|
|
try:
|
|
output = subprocess.check_output(["rsync",
|
|
"-acv",
|
|
"--delete",
|
|
"rsync://%s/repo/" % host_url,
|
|
"%s/" % repo_root_dir],
|
|
stderr=subprocess.STDOUT)
|
|
LOG.info("Synced to mate repo via rsync: %s", output)
|
|
except subprocess.CalledProcessError:
|
|
LOG.error("Failed to rsync: %s", output)
|
|
return False
|
|
|
|
self.read_state_file()
|
|
|
|
self.patch_data_lock.acquire()
|
|
self.hosts_lock.acquire()
|
|
self.interim_state = {}
|
|
self.patch_data.load_all()
|
|
self.check_patch_states()
|
|
self.hosts_lock.release()
|
|
|
|
if os.path.exists(app_dependency_filename):
|
|
try:
|
|
with open(app_dependency_filename, 'r') as f:
|
|
self.app_dependencies = json.loads(f.read())
|
|
except Exception:
|
|
LOG.exception("Failed to read app dependencies: %s", app_dependency_filename)
|
|
else:
|
|
self.app_dependencies = {}
|
|
|
|
self.patch_data_lock.release()
|
|
|
|
return True
|
|
|
|
def inc_patch_op_counter(self):
|
|
self.patch_op_counter += 1
|
|
self.write_state_file()
|
|
|
|
    def check_patch_states(self):
        """Recompute each patch's patchstate from repo state and host reports.

        Also recomputes self.allow_insvc_patching: any patch found in a
        Partial-Apply/Partial-Remove state whose reboot_required metadata is
        not "N" disables in-service patching.
        """
        # If we have no hosts, we can't be sure of the current patch state
        if len(self.hosts) == 0:
            for patch_id in self.patch_data.metadata:
                self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
            return

        # Default to allowing in-service patching
        self.allow_insvc_patching = True

        # Take the detailed query results from the hosts and merge with the patch data

        self.hosts_lock.acquire()

        # Initialize patch state data based on repo state and interim_state presence
        for patch_id in self.patch_data.metadata:
            if patch_id in self.interim_state:
                if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
                    self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
                elif self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
                    self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
            else:
                self.patch_data.metadata[patch_id]["patchstate"] = \
                    self.patch_data.metadata[patch_id]["repostate"]

        for ip in self.hosts.keys():
            # Up-to-date hosts cannot put any patch into a Partial state.
            if not self.hosts[ip].out_of_date:
                continue

            # Compare each installed package against each patch's content.
            for pkg in self.hosts[ip].installed.keys():
                for patch_id in self.patch_data.content_versions.keys():
                    if pkg not in self.patch_data.content_versions[patch_id]:
                        continue

                    if patch_id not in self.patch_data.metadata:
                        LOG.error("Patch data missing for %s", patch_id)
                        continue

                    # If the patch is on a different release than the host, skip it.
                    if self.patch_data.metadata[patch_id]["sw_version"] != self.hosts[ip].sw_version:
                        continue

                    # Is the installed pkg higher or lower version?
                    # The rpm.labelCompare takes version broken into 3 components
                    installed_ver = self.hosts[ip].installed[pkg].split('@')[0]
                    if ":" in installed_ver:
                        # Ignore epoch
                        installed_ver = installed_ver.split(':')[1]

                    patch_ver = self.patch_data.content_versions[patch_id][pkg]
                    if ":" in patch_ver:
                        # Ignore epoch
                        patch_ver = patch_ver.split(':')[1]

                    rc = rpm.labelCompare(parse_pkgver(installed_ver),
                                          parse_pkgver(patch_ver))

                    if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
                        # The RPM is not expected to be installed.
                        # If the installed version is the same or higher,
                        # this patch is in a Partial-Remove state
                        if rc >= 0 or patch_id in self.interim_state:
                            self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
                            if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
                                self.allow_insvc_patching = False
                            continue
                    elif self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
                        # The RPM is expected to be installed.
                        # If the installed version is the lower,
                        # this patch is in a Partial-Apply state
                        if rc == -1 or patch_id in self.interim_state:
                            self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
                            if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
                                self.allow_insvc_patching = False
                            continue

            # Patch metadata groups packages under personality keys.
            if self.hosts[ip].sw_version == "14.10":
                # For Release 1
                personality = "personality-%s" % self.hosts[ip].nodetype
            else:
                personality = "personality-%s" % "-".join(self.hosts[ip].subfunctions)

            # Check the to_remove list
            for pkg in self.hosts[ip].to_remove:
                for patch_id in self.patch_data.content_versions.keys():
                    if pkg not in self.patch_data.content_versions[patch_id]:
                        continue

                    if patch_id not in self.patch_data.metadata:
                        LOG.error("Patch data missing for %s", patch_id)
                        continue

                    if personality not in self.patch_data.metadata[patch_id]:
                        continue

                    if pkg not in self.patch_data.metadata[patch_id][personality]:
                        continue

                    if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
                        # The RPM is not expected to be installed.
                        # This patch is in a Partial-Remove state
                        self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
                        if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
                            self.allow_insvc_patching = False
                        continue

            # Check the missing_pkgs list
            for pkg in self.hosts[ip].missing_pkgs:
                for patch_id in self.patch_data.content_versions.keys():
                    if pkg not in self.patch_data.content_versions[patch_id]:
                        continue

                    if patch_id not in self.patch_data.metadata:
                        LOG.error("Patch data missing for %s", patch_id)
                        continue

                    if personality not in self.patch_data.metadata[patch_id]:
                        continue

                    if pkg not in self.patch_data.metadata[patch_id][personality]:
                        continue

                    if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
                        # The RPM is expected to be installed.
                        # This patch is in a Partial-Apply state
                        self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
                        if self.patch_data.metadata[patch_id].get("reboot_required") != "N":
                            self.allow_insvc_patching = False
                        continue

        self.hosts_lock.release()
|
|
|
|
def get_store_filename(self, patch_sw_version, rpmname):
|
|
rpm_dir = package_dir[patch_sw_version]
|
|
rpmfile = "%s/%s" % (rpm_dir, rpmname)
|
|
return rpmfile
|
|
|
|
    def get_repo_filename(self, patch_sw_version, rpmname):
        # Return the rpm's path within the repo tree, or None if the rpm is
        # missing from the store or the rpm query fails.
        rpmfile = self.get_store_filename(patch_sw_version, rpmname)
        if not os.path.isfile(rpmfile):
            msg = "Could not find rpm: %s" % rpmfile
            LOG.error(msg)
            return None

        repo_filename = None

        try:
            # Get the architecture from the RPM
            # NOTE(review): on Python 3, check_output returns bytes, which
            # would format as b'...' in the path below — presumably this runs
            # under Python 2; confirm before porting.
            pkgarch = subprocess.check_output(["rpm",
                                               "-qp",
                                               "--queryformat",
                                               "%{ARCH}",
                                               "--nosignature",
                                               rpmfile])

            repo_filename = "%s/Packages/%s/%s" % (repo_dir[patch_sw_version], pkgarch, rpmname)
        except subprocess.CalledProcessError:
            msg = "RPM query failed for %s" % rpmfile
            LOG.exception(msg)
            return None

        return repo_filename
|
|
|
|
def run_semantic_check(self, action, patch_list):
|
|
if not os.path.exists(INITIAL_CONFIG_COMPLETE_FLAG):
|
|
# Skip semantic checks if initial configuration isn't complete
|
|
return
|
|
|
|
# Pass the current patch state to the semantic check as a series of args
|
|
patch_state_args = []
|
|
for patch_id in self.patch_data.metadata.keys():
|
|
patch_state = '%s=%s' % (patch_id, self.patch_data.metadata[patch_id]["patchstate"])
|
|
patch_state_args += ['-p', patch_state]
|
|
|
|
# Run semantic checks, if any
|
|
for patch_id in patch_list:
|
|
semchk = os.path.join(semantics_dir, action, patch_id)
|
|
|
|
if os.path.exists(semchk):
|
|
try:
|
|
LOG.info("Running semantic check: %s", semchk)
|
|
subprocess.check_output([semchk] + patch_state_args,
|
|
stderr=subprocess.STDOUT)
|
|
LOG.info("Semantic check %s passed", semchk)
|
|
except subprocess.CalledProcessError as e:
|
|
msg = "Semantic check failed for %s:\n%s" % (patch_id, e.output)
|
|
LOG.exception(msg)
|
|
raise PatchFail(msg)
|
|
|
|
    def patch_import_api(self, patches):
        """
        Import patches into the patching system.

        Already-imported patches have their metadata refreshed (unless
        committed); new patches are extracted into the available state.

        :param patches: iterable of patch file paths to import
        :return: dict with "info", "warning" and "error" message strings
        """
        msg_info = ""
        msg_warning = ""
        msg_error = ""

        # Refresh data, if needed
        self.base_pkgdata.loaddirs()

        # Protect against duplications
        patch_list = sorted(list(set(patches)))

        # First, make sure the specified files exist
        for patch in patch_list:
            if not os.path.isfile(patch):
                raise PatchFail("File does not exist: %s" % patch)

        try:
            if not os.path.exists(avail_dir):
                os.makedirs(avail_dir)
            if not os.path.exists(applied_dir):
                os.makedirs(applied_dir)
            if not os.path.exists(committed_dir):
                os.makedirs(committed_dir)
        except os.error:
            msg = "Failed to create directories"
            LOG.exception(msg)
            raise PatchFail(msg)

        msg = "Importing patches: %s" % ",".join(patch_list)
        LOG.info(msg)
        audit_log_info(msg)

        for patch in patch_list:
            msg = "Importing patch: %s" % patch
            LOG.info(msg)
            audit_log_info(msg)

            # Get the patch_id from the filename
            # and check to see if it's already imported
            (patch_id, ext) = os.path.splitext(os.path.basename(patch))
            if patch_id in self.patch_data.metadata:
                if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
                    mdir = applied_dir
                elif self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED:
                    # Committed patches are immutable; skip.
                    msg = "%s is committed. Metadata not updated" % patch_id
                    LOG.info(msg)
                    msg_info += msg + "\n"
                    continue
                else:
                    mdir = avail_dir

                try:
                    # Already imported: refresh metadata only, after verifying
                    # the file's contents match what was imported before.
                    thispatch = PatchFile.extract_patch(patch,
                                                        metadata_dir=mdir,
                                                        metadata_only=True,
                                                        existing_content=self.patch_data.contents[patch_id],
                                                        allpatches=self.patch_data,
                                                        base_pkgdata=self.base_pkgdata)
                    self.patch_data.update_patch(thispatch)
                    msg = "%s is already imported. Updated metadata only" % patch_id
                    LOG.info(msg)
                    msg_info += msg + "\n"
                except PatchMismatchFailure:
                    msg = "Contents of %s do not match re-imported patch" % patch_id
                    LOG.exception(msg)
                    msg_error += msg + "\n"
                    continue
                except PatchValidationFailure as e:
                    msg = "Patch validation failed for %s" % patch_id
                    if str(e) is not None and str(e) != '':
                        msg += ":\n%s" % str(e)
                    LOG.exception(msg)
                    msg_error += msg + "\n"
                    continue
                except PatchFail:
                    msg = "Failed to import patch %s" % patch_id
                    LOG.exception(msg)
                    msg_error += msg + "\n"

                continue

            if ext != ".patch":
                msg = "File must end in .patch extension: %s" \
                      % os.path.basename(patch)
                LOG.exception(msg)
                msg_error += msg + "\n"
                continue

            try:
                # New patch: extract fully into the available directory.
                thispatch = PatchFile.extract_patch(patch,
                                                    metadata_dir=avail_dir,
                                                    allpatches=self.patch_data,
                                                    base_pkgdata=self.base_pkgdata)

                msg_info += "%s is now available\n" % patch_id
                self.patch_data.add_patch(patch_id, thispatch)

                self.patch_data.metadata[patch_id]["repostate"] = constants.AVAILABLE
                # With no hosts reporting, the patch state cannot be known.
                if len(self.hosts) > 0:
                    self.patch_data.metadata[patch_id]["patchstate"] = constants.AVAILABLE
                else:
                    self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN
            except PatchValidationFailure as e:
                msg = "Patch validation failed for %s" % patch_id
                if str(e) is not None and str(e) != '':
                    msg += ":\n%s" % str(e)
                LOG.exception(msg)
                msg_error += msg + "\n"
                continue
            except PatchFail:
                msg = "Failed to import patch %s" % patch_id
                LOG.exception(msg)
                msg_error += msg + "\n"
                continue

        return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def patch_apply_api(self, patch_ids, **kwargs):
    """
    Apply patches, moving patches from available to applied and updating repo

    :param patch_ids: list of patch IDs, or ["--all"] to apply every
        available patch
    :param kwargs: "skip-semantic" == "yes" bypasses the pre-apply
        semantic check
    :return: dict with "info", "warning" and "error" message strings
    :raises RpmFail: an RPM is missing or cannot be copied into the repo
    :raises MetadataFail: patch metadata cannot be moved to the applied dir
    :raises PatchFail: createrepo failed to regenerate a repo
    """
    msg_info = ""
    msg_warning = ""
    msg_error = ""

    # Protect against duplications
    patch_list = sorted(list(set(patch_ids)))

    msg = "Applying patches: %s" % ",".join(patch_list)
    LOG.info(msg)
    audit_log_info(msg)

    if "--all" in patch_list:
        # Set patch_ids to list of all available patches
        # We're getting this list now, before we load the applied patches
        patch_list = []
        for patch_id in sorted(self.patch_data.metadata.keys()):
            if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
                patch_list.append(patch_id)

        if len(patch_list) == 0:
            msg_info += "There are no available patches to be applied.\n"
            return dict(info=msg_info, warning=msg_warning, error=msg_error)

    repo_changed = False

    # First, verify that all specified patches exist
    id_verification = True
    for patch_id in patch_list:
        if patch_id not in self.patch_data.metadata:
            msg = "Patch %s does not exist" % patch_id
            LOG.error(msg)
            msg_error += msg + "\n"
            id_verification = False

    if not id_verification:
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Check for patches that can't be applied during an upgrade
    upgrade_check = True
    for patch_id in patch_list:
        if self.patch_data.metadata[patch_id]["sw_version"] != SW_VERSION \
                and self.patch_data.metadata[patch_id].get("apply_active_release_only") == "Y":
            msg = "%s cannot be applied in an upgrade" % patch_id
            LOG.error(msg)
            msg_error += msg + "\n"
            upgrade_check = False

    if not upgrade_check:
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Next, check the patch dependencies
    # required_patches will map the required patch to the patches that need it
    required_patches = {}
    for patch_id in patch_list:
        for req_patch in self.patch_data.metadata[patch_id]["requires"]:
            # Ignore patches in the op set
            if req_patch in patch_list:
                continue

            if req_patch not in required_patches:
                required_patches[req_patch] = []

            required_patches[req_patch].append(patch_id)

    # Now verify the state of the required patches
    req_verification = True
    for req_patch, iter_patch_list in required_patches.items():
        if req_patch not in self.patch_data.metadata \
                or self.patch_data.metadata[req_patch]["repostate"] == constants.AVAILABLE:
            msg = "%s is required by: %s" % (req_patch, ", ".join(sorted(iter_patch_list)))
            msg_error += msg + "\n"
            LOG.info(msg)
            req_verification = False

    if not req_verification:
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    if kwargs.get("skip-semantic") != "yes":
        self.run_semantic_check(constants.SEMANTIC_PREAPPLY, patch_list)

    # Start applying the patches
    for patch_id in patch_list:
        msg = "Applying patch: %s" % patch_id
        LOG.info(msg)
        audit_log_info(msg)

        if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED \
                or self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED:
            msg = "%s is already in the repo" % patch_id
            LOG.info(msg)
            msg_info += msg + "\n"
            continue

        # To allow for easy cleanup, we're going to first iterate
        # through the rpm list to determine where to copy the file.
        # As a second step, we'll go through the list and copy each file.
        # If there are problems querying any RPMs, none will be copied.
        rpmlist = {}
        for rpmname in self.patch_data.contents[patch_id]:
            patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]

            rpmfile = self.get_store_filename(patch_sw_version, rpmname)
            if not os.path.isfile(rpmfile):
                msg = "Could not find rpm: %s" % rpmfile
                LOG.error(msg)
                raise RpmFail(msg)

            repo_filename = self.get_repo_filename(patch_sw_version, rpmname)
            if repo_filename is None:
                msg = "Failed to determine repo path for %s" % rpmfile
                LOG.exception(msg)
                raise RpmFail(msg)

            repo_pkg_dir = os.path.dirname(repo_filename)
            if not os.path.exists(repo_pkg_dir):
                os.makedirs(repo_pkg_dir)
            rpmlist[rpmfile] = repo_filename

        # Copy the RPMs. If a failure occurs, clean up copied files.
        copied = []
        for rpmfile in rpmlist:
            LOG.info("Copy %s to %s", rpmfile, rpmlist[rpmfile])
            try:
                shutil.copy(rpmfile, rpmlist[rpmfile])
                copied.append(rpmlist[rpmfile])
            except IOError:
                msg = "Failed to copy %s" % rpmfile
                LOG.exception(msg)
                # Clean up files
                for filename in copied:
                    LOG.info("Cleaning up %s", filename)
                    os.remove(filename)

                raise RpmFail(msg)

        try:
            # Move the metadata to the applied dir
            shutil.move("%s/%s-metadata.xml" % (avail_dir, patch_id),
                        "%s/%s-metadata.xml" % (applied_dir, patch_id))

            msg_info += "%s is now in the repo\n" % patch_id
        except shutil.Error:
            msg = "Failed to move the metadata for %s" % patch_id
            LOG.exception(msg)
            raise MetadataFail(msg)

        self.patch_data.metadata[patch_id]["repostate"] = constants.APPLIED
        if len(self.hosts) > 0:
            self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_APPLY
        else:
            self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN

        self.hosts_lock.acquire()
        # Snapshot the current hosts. Under Python 3, dict.keys() is a
        # live view that would keep changing as hosts come and go, so an
        # explicit list copy is required for a stable interim-state record.
        self.interim_state[patch_id] = list(self.hosts)
        self.hosts_lock.release()

        repo_changed = True

    if repo_changed:
        # Update the repo
        self.patch_data.gen_groups_xml()
        for ver, rdir in repo_dir.items():
            try:
                output = subprocess.check_output(["createrepo",
                                                  "--update",
                                                  "-g",
                                                  "comps.xml",
                                                  rdir],
                                                 stderr=subprocess.STDOUT)
                LOG.info("Repo[%s] updated:\n%s", ver, output)
            except subprocess.CalledProcessError:
                msg = "Failed to update the repo for %s" % ver
                LOG.exception(msg)
                raise PatchFail(msg)
    else:
        LOG.info("Repository is unchanged")

    return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def patch_remove_api(self, patch_ids, **kwargs):
    """
    Remove patches, moving patches from applied to available and updating repo

    :param patch_ids: list of patch IDs to remove
    :param kwargs: "removeunremovable" == "yes" overrides the unremovable
        flag; "skipappcheck" == "yes" skips the application-dependency
        check; "skip-semantic" == "yes" bypasses the pre-remove semantic
        check
    :return: dict with "info", "warning" and "error" message strings
    :raises RpmFail: an RPM is missing or cannot be removed from the repo
    :raises MetadataFail: patch metadata cannot be moved back to avail dir
    :raises PatchFail: createrepo failed to regenerate a repo
    """
    msg_info = ""
    msg_warning = ""
    msg_error = ""
    remove_unremovable = False

    repo_changed = False

    # Protect against duplications
    patch_list = sorted(list(set(patch_ids)))

    msg = "Removing patches: %s" % ",".join(patch_list)
    LOG.info(msg)
    audit_log_info(msg)

    if kwargs.get("removeunremovable") == "yes":
        remove_unremovable = True

    # First, verify that all specified patches exist
    id_verification = True
    for patch_id in patch_list:
        if patch_id not in self.patch_data.metadata:
            msg = "Patch %s does not exist" % patch_id
            LOG.error(msg)
            msg_error += msg + "\n"
            id_verification = False

    if not id_verification:
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # See if any of the patches are marked as unremovable
    unremovable_verification = True
    for patch_id in patch_list:
        if self.patch_data.metadata[patch_id].get("unremovable") == "Y":
            if remove_unremovable:
                msg = "Unremovable patch %s being removed" % patch_id
                LOG.warning(msg)
                msg_warning += msg + "\n"
            else:
                msg = "Patch %s is not removable" % patch_id
                LOG.error(msg)
                msg_error += msg + "\n"
                unremovable_verification = False
        elif self.patch_data.metadata[patch_id]['repostate'] == constants.COMMITTED:
            msg = "Patch %s is committed and cannot be removed" % patch_id
            LOG.error(msg)
            msg_error += msg + "\n"
            unremovable_verification = False

    if not unremovable_verification:
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Next, see if any of the patches are required by applied patches
    # required_patches will map the required patch to the patches that need it
    required_patches = {}
    for patch_iter in self.patch_data.metadata.keys():
        # Ignore patches in the op set
        if patch_iter in patch_list:
            continue

        # Only check applied patches
        if self.patch_data.metadata[patch_iter]["repostate"] == constants.AVAILABLE:
            continue

        for req_patch in self.patch_data.metadata[patch_iter]["requires"]:
            if req_patch not in patch_list:
                continue

            if req_patch not in required_patches:
                required_patches[req_patch] = []

            required_patches[req_patch].append(patch_iter)

    if len(required_patches) > 0:
        for req_patch, iter_patch_list in required_patches.items():
            msg = "%s is required by: %s" % (req_patch, ", ".join(sorted(iter_patch_list)))
            msg_error += msg + "\n"
            LOG.info(msg)

        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    if kwargs.get("skipappcheck") != "yes":
        # Check application dependencies before removing
        required_patches = {}
        for patch_id in patch_list:
            for appname, iter_patch_list in self.app_dependencies.items():
                if patch_id in iter_patch_list:
                    if patch_id not in required_patches:
                        required_patches[patch_id] = []
                    required_patches[patch_id].append(appname)

        if len(required_patches) > 0:
            for req_patch, app_list in required_patches.items():
                msg = "%s is required by application(s): %s" % (req_patch, ", ".join(sorted(app_list)))
                msg_error += msg + "\n"
                LOG.info(msg)

            return dict(info=msg_info, warning=msg_warning, error=msg_error)

    if kwargs.get("skip-semantic") != "yes":
        self.run_semantic_check(constants.SEMANTIC_PREREMOVE, patch_list)

    for patch_id in patch_list:
        msg = "Removing patch: %s" % patch_id
        LOG.info(msg)
        audit_log_info(msg)

        if self.patch_data.metadata[patch_id]["repostate"] == constants.AVAILABLE:
            msg = "%s is not in the repo" % patch_id
            LOG.info(msg)
            msg_info += msg + "\n"
            continue

        repo_changed = True

        for rpmname in self.patch_data.contents[patch_id]:
            patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]
            rpmfile = self.get_store_filename(patch_sw_version, rpmname)
            if not os.path.isfile(rpmfile):
                msg = "Could not find rpm: %s" % rpmfile
                LOG.error(msg)
                raise RpmFail(msg)

            repo_filename = self.get_repo_filename(patch_sw_version, rpmname)
            if repo_filename is None:
                msg = "Failed to determine repo path for %s" % rpmfile
                LOG.exception(msg)
                raise RpmFail(msg)

            try:
                os.remove(repo_filename)
            except OSError:
                msg = "Failed to remove RPM"
                LOG.exception(msg)
                raise RpmFail(msg)

        try:
            # Move the metadata to the available dir
            shutil.move("%s/%s-metadata.xml" % (applied_dir, patch_id),
                        "%s/%s-metadata.xml" % (avail_dir, patch_id))
            msg_info += "%s has been removed from the repo\n" % patch_id
        except shutil.Error:
            msg = "Failed to move the metadata for %s" % patch_id
            LOG.exception(msg)
            raise MetadataFail(msg)

        self.patch_data.metadata[patch_id]["repostate"] = constants.AVAILABLE
        if len(self.hosts) > 0:
            self.patch_data.metadata[patch_id]["patchstate"] = constants.PARTIAL_REMOVE
        else:
            self.patch_data.metadata[patch_id]["patchstate"] = constants.UNKNOWN

        self.hosts_lock.acquire()
        # Snapshot the current hosts. Under Python 3, dict.keys() is a
        # live view that would keep changing as hosts come and go, so an
        # explicit list copy is required for a stable interim-state record.
        self.interim_state[patch_id] = list(self.hosts)
        self.hosts_lock.release()

    if repo_changed:
        # Update the repo
        self.patch_data.gen_groups_xml()
        for ver, rdir in repo_dir.items():
            try:
                output = subprocess.check_output(["createrepo",
                                                  "--update",
                                                  "-g",
                                                  "comps.xml",
                                                  rdir],
                                                 stderr=subprocess.STDOUT)
                LOG.info("Repo[%s] updated:\n%s", ver, output)
            except subprocess.CalledProcessError:
                msg = "Failed to update the repo for %s" % ver
                LOG.exception(msg)
                raise PatchFail(msg)
    else:
        LOG.info("Repository is unchanged")

    return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def patch_delete_api(self, patch_ids):
    """
    Delete patches

    Deletes the stored RPMs, semantic-check scripts and metadata for each
    patch. Patches must be in the Available (or Unknown aggregate) state.

    :param patch_ids: list of patch IDs to delete
    :return: dict with "info", "warning" and "error" message strings
    :raises RpmFail: an RPM file could not be removed
    :raises SemanticFail: a semantic action file could not be removed
    :raises MetadataFail: the patch metadata file could not be removed
    """
    msg_info = ""
    msg_warning = ""
    msg_error = ""

    # Protect against duplications
    patch_list = sorted(list(set(patch_ids)))

    msg = "Deleting patches: %s" % ",".join(patch_list)
    LOG.info(msg)
    audit_log_info(msg)

    # Verify patches exist and are in proper state first
    id_verification = True
    for patch_id in patch_list:
        if patch_id not in self.patch_data.metadata:
            msg = "Patch %s does not exist" % patch_id
            LOG.error(msg)
            msg_error += msg + "\n"
            id_verification = False
            continue

        # Get the aggregated patch state, if possible
        patchstate = constants.UNKNOWN
        if patch_id in self.patch_data.metadata:
            patchstate = self.patch_data.metadata[patch_id]["patchstate"]

        # Deletion requires the repo state to be Available and the
        # aggregate state to be Available or Unknown (no host has it).
        if self.patch_data.metadata[patch_id]["repostate"] != constants.AVAILABLE or \
                (patchstate != constants.AVAILABLE and patchstate != constants.UNKNOWN):
            msg = "Patch %s not in Available state" % patch_id
            LOG.error(msg)
            msg_error += msg + "\n"
            id_verification = False
            continue

    if not id_verification:
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Handle operation
    for patch_id in patch_list:
        # Remove the stored RPM files for this patch
        for rpmname in self.patch_data.contents[patch_id]:
            patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]
            rpmfile = self.get_store_filename(patch_sw_version, rpmname)
            if not os.path.isfile(rpmfile):
                # We're deleting the patch anyway, so the missing file
                # doesn't really matter
                continue

            try:
                os.remove(rpmfile)
            except OSError:
                msg = "Failed to remove RPM %s" % rpmfile
                LOG.exception(msg)
                raise RpmFail(msg)

        # Remove any semantic-check scripts registered for this patch
        for action in constants.SEMANTIC_ACTIONS:
            action_file = os.path.join(semantics_dir, action, patch_id)
            if not os.path.isfile(action_file):
                continue

            try:
                os.remove(action_file)
            except OSError:
                msg = "Failed to remove semantic %s" % action_file
                LOG.exception(msg)
                raise SemanticFail(msg)

        try:
            # Delete the metadata
            os.remove("%s/%s-metadata.xml" % (avail_dir, patch_id))
        except OSError:
            msg = "Failed to remove metadata for %s" % patch_id
            LOG.exception(msg)
            raise MetadataFail(msg)

        # Drop the patch from the in-memory patch data
        self.patch_data.delete_patch(patch_id)
        msg = "%s has been deleted" % patch_id
        LOG.info(msg)
        msg_info += msg + "\n"

    return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def patch_init_release_api(self, release):
    """
    Create an empty repo for a new release

    Registers the release in the repo_dir map, generates the release
    groups xml and runs createrepo to build the (empty) repo. On
    createrepo failure, the partially-created repo is wiped and the
    registration is rolled back.

    :param release: software version string for the new release
    :return: dict with "info", "warning" and "error" message strings
    :raises PatchFail: createrepo failed to build the new repo
    """
    msg_info = ""
    msg_warning = ""
    msg_error = ""

    msg = "Initializing repo for: %s" % release
    LOG.info(msg)
    audit_log_info(msg)

    # Refuse to initialize a repo for the currently running release
    if release == SW_VERSION:
        msg = "Rejected: Requested release %s is running release" % release
        msg_error += msg + "\n"
        LOG.info(msg)
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Refresh data
    self.base_pkgdata.loaddirs()

    self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
    self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
    self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)

    # Register the repo path for this release in the module-level map
    repo_dir[release] = "%s/rel-%s" % (repo_root_dir, release)

    # Verify the release doesn't already exist
    if os.path.exists(repo_dir[release]):
        msg = "Patch repository for %s already exists" % release
        msg_info += msg + "\n"
        LOG.info(msg)
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Generate the groups xml
    self.patch_data.gen_release_groups_xml(release)

    # Create the repo
    try:
        output = subprocess.check_output(["createrepo",
                                          "--update",
                                          "-g",
                                          "comps.xml",
                                          repo_dir[release]],
                                         stderr=subprocess.STDOUT)
        LOG.info("Repo[%s] updated:\n%s", release, output)
    except subprocess.CalledProcessError:
        msg = "Failed to update the repo for %s" % release
        LOG.exception(msg)

        # Wipe out what was created
        shutil.rmtree(repo_dir[release])
        del repo_dir[release]

        raise PatchFail(msg)

    return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def patch_del_release_api(self, release):
    """
    Delete the repo and patches for second release

    Removes patch metadata and semantic files for the release, deletes
    the release's package dir and repo dir, drops the release from the
    base package data, and reloads all patch metadata.

    :param release: software version string of the release to delete
    :return: dict with "info", "warning" and "error" message strings
    :raises SemanticFail: a semantic action file could not be removed
    :raises MetadataFail: a patch metadata file could not be removed
    """
    msg_info = ""
    msg_warning = ""
    msg_error = ""

    msg = "Deleting repo and patches for: %s" % release
    LOG.info(msg)
    audit_log_info(msg)

    # Refuse to delete the currently running release
    if release == SW_VERSION:
        msg = "Rejected: Requested release %s is running release" % release
        msg_error += msg + "\n"
        LOG.info(msg)
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Delete patch XML files
    for patch_id in self.patch_data.metadata.keys():
        if self.patch_data.metadata[patch_id]["sw_version"] != release:
            continue

        # Pick the metadata dir matching the patch's repo state
        if self.patch_data.metadata[patch_id]["repostate"] == constants.APPLIED:
            mdir = applied_dir
        elif self.patch_data.metadata[patch_id]["repostate"] == constants.COMMITTED:
            mdir = committed_dir
        else:
            mdir = avail_dir

        # Remove any semantic-check scripts registered for this patch
        for action in constants.SEMANTIC_ACTIONS:
            action_file = os.path.join(semantics_dir, action, patch_id)
            if not os.path.isfile(action_file):
                continue

            try:
                os.remove(action_file)
            except OSError:
                msg = "Failed to remove semantic %s" % action_file
                LOG.exception(msg)
                raise SemanticFail(msg)

        try:
            # Delete the metadata
            os.remove("%s/%s-metadata.xml" % (mdir, patch_id))
        except OSError:
            msg = "Failed to remove metadata for %s" % patch_id
            LOG.exception(msg)

            # Refresh patch data so the in-memory state reflects what
            # was actually deleted before failing the operation
            self.patch_data = PatchData()
            self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
            self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
            self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)

            raise MetadataFail(msg)

    # Delete the packages dir
    package_dir[release] = "%s/%s" % (root_package_dir, release)
    if os.path.exists(package_dir[release]):
        try:
            shutil.rmtree(package_dir[release])
        except shutil.Error:
            # Best-effort: log and continue with the repo deletion
            msg = "Failed to delete package dir for %s" % release
            LOG.exception(msg)

    del package_dir[release]

    # Verify the release exists
    repo_dir[release] = "%s/rel-%s" % (repo_root_dir, release)
    if not os.path.exists(repo_dir[release]):
        # Nothing to do
        msg = "Patch repository for %s does not exist" % release
        msg_info += msg + "\n"
        LOG.info(msg)
        del repo_dir[release]

        # Refresh patch data
        self.patch_data = PatchData()
        self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
        self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
        self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)

        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Delete the repo
    try:
        shutil.rmtree(repo_dir[release])
    except shutil.Error:
        # Best-effort: log and continue with the cleanup
        msg = "Failed to delete repo for %s" % release
        LOG.exception(msg)

    del repo_dir[release]

    # NOTE(review): assumes the release key is present in base_pkgdata.pkgs;
    # a missing key would raise KeyError here — confirm with loaddirs()
    if self.base_pkgdata is not None:
        del self.base_pkgdata.pkgs[release]

    # Refresh patch data
    self.patch_data = PatchData()
    self.patch_data.load_all_metadata(avail_dir, repostate=constants.AVAILABLE)
    self.patch_data.load_all_metadata(applied_dir, repostate=constants.APPLIED)
    self.patch_data.load_all_metadata(committed_dir, repostate=constants.COMMITTED)

    return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def patch_query_what_requires(self, patch_ids):
    """
    Query the known patches to see which have dependencies on the specified patches

    :param patch_ids: list of patch IDs to look up
    :return: dict with "info", "warning" and "error" message strings
    """
    msg_info = ""
    msg_warning = ""
    msg_error = ""

    msg = "Querying what requires patches: %s" % ",".join(patch_ids)
    LOG.info(msg)
    audit_log_info(msg)

    # Reject the request if any of the specified patches is unknown
    unknown_ids = [pid for pid in patch_ids if pid not in self.patch_data.metadata]
    for pid in unknown_ids:
        msg = "Patch %s does not exist" % pid
        LOG.error(msg)
        msg_error += msg + "\n"

    if unknown_ids:
        return dict(info=msg_info, warning=msg_warning, error=msg_error)

    # Build a reverse-dependency map: requested patch -> patches that
    # list it in their "requires" set
    required_patches = {}
    for dependent in self.patch_data.metadata.keys():
        for req_patch in self.patch_data.metadata[dependent]["requires"]:
            if req_patch in patch_ids:
                required_patches.setdefault(req_patch, []).append(dependent)

    # Report the dependents (or lack thereof) for each requested patch
    for pid in patch_ids:
        if pid in required_patches:
            msg_info += "%s is required by: %s\n" % (pid, ", ".join(sorted(required_patches[pid])))
        else:
            msg_info += "%s is not required by any patches.\n" % pid

    return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def patch_sync(self):
    """
    Synchronize patch state with the neighbouring controller(s).

    Bumps the patch op counter, refreshes aggregate patch states, sends
    a sync request to controller neighbours and waits up to two minutes
    for all of them to report synced, then sends hellos to the agents
    so they re-query.

    :return: True if all neighbours synced in time (or there is no
        outbound socket), False on timeout
    """
    # Increment the patch_op_counter here
    self.inc_patch_op_counter()

    self.patch_data_lock.acquire()
    # self.patch_data.load_all()
    self.check_patch_states()
    self.patch_data_lock.release()

    # No outbound socket: nothing to sync with, treat as success
    if self.sock_out is None:
        return True

    # Send the sync requests

    # Mark all neighbours as un-synced before requesting a sync
    self.controller_neighbours_lock.acquire()
    for n in self.controller_neighbours:
        self.controller_neighbours[n].clear_synced()
    self.controller_neighbours_lock.release()

    msg = PatchMessageSyncReq()
    self.socket_lock.acquire()
    msg.send(self.sock_out)
    self.socket_lock.release()

    # Now we wait, up to two mins... TODO: Wait on a condition
    my_ip = cfg.get_mgmt_ip()
    sync_rc = False
    max_time = time.time() + 120
    while time.time() < max_time:
        # Poll the neighbours (excluding ourselves) for sync completion
        all_done = True
        self.controller_neighbours_lock.acquire()
        for n in self.controller_neighbours:
            if n != my_ip and not self.controller_neighbours[n].get_synced():
                all_done = False
        self.controller_neighbours_lock.release()

        if all_done:
            LOG.info("Sync complete")
            sync_rc = True
            break

        time.sleep(0.5)

    # Send hellos to the hosts now, to get queries performed
    hello_agent = PatchMessageHelloAgent()
    self.socket_lock.acquire()
    hello_agent.send(self.sock_out)
    self.socket_lock.release()

    if not sync_rc:
        LOG.info("Timed out waiting for sync completion")
    return sync_rc
|
|
|
|
def patch_query_cached(self, **kwargs):
    """
    Return cached patch metadata, optionally filtered.

    :param kwargs: "show" may be "available", "applied" or "committed"
        to filter by repo state; "release" filters by sw_version
    :return: dict mapping patch ID to its metadata dict
    """
    # Translate the optional "show" keyword into a repo state filter
    query_state = None
    if "show" in kwargs:
        show = kwargs["show"]
        if show == "available":
            query_state = constants.AVAILABLE
        elif show == "applied":
            query_state = constants.APPLIED
        elif show == "committed":
            query_state = constants.COMMITTED

    query_release = kwargs.get("release")

    results = {}
    self.patch_data_lock.acquire()
    if query_state is None and query_release is None:
        # No filters requested: hand back the full metadata dict
        results = self.patch_data.metadata
    else:
        # Keep only entries matching every requested filter
        for patch_id, data in self.patch_data.metadata.items():
            state_ok = query_state is None or data["repostate"] == query_state
            release_ok = query_release is None or data["sw_version"] == query_release
            if state_ok and release_ok:
                results[patch_id] = data
    self.patch_data_lock.release()

    return results
|
|
|
|
def patch_query_specific_cached(self, patch_ids):
    """
    Return cached metadata and contents for the requested patches.

    :param patch_ids: list of patch IDs to show
    :return: dict with "metadata", "contents" and "error" keys; unknown
        IDs are reported in the "error" string
    """
    audit_log_info("Patch show")

    results = {"metadata": {},
               "contents": {},
               "error": ""}

    requested = set(patch_ids)

    self.patch_data_lock.acquire()

    # Flag any requested IDs that are not in the cache
    for patch_id in patch_ids:
        if patch_id not in self.patch_data.metadata.keys():
            results["error"] += "%s is unrecognized\n" % patch_id

    # Copy out metadata and contents for the recognized patches
    results["metadata"] = {pid: data
                           for pid, data in self.patch_data.metadata.items()
                           if pid in requested}
    results["contents"] = {pid: data
                           for pid, data in self.patch_data.contents.items()
                           if pid in requested}

    self.patch_data_lock.release()

    return results
|
|
|
|
def get_dependencies(self, patch_ids, recursive):
    """
    Expand the given patch IDs with the patches they require.

    :param patch_ids: iterable of patch IDs to start from
    :param recursive: if True, follow requirements transitively until no
        new patches appear; if False, perform a single expansion pass
    :return: sorted list of the starting IDs plus their requirements
    """
    self.patch_data_lock.acquire()

    dependencies = set(patch_ids)
    # Expand at least once when we have a starting set; keep expanding
    # only while recursion is requested and new IDs keep appearing.
    expand = len(dependencies) > 0
    while expand:
        expand = False
        for patch_id in sorted(dependencies):
            for req in self.patch_data.metadata[patch_id]["requires"]:
                if req not in dependencies:
                    dependencies.add(req)
                    expand = recursive

    self.patch_data_lock.release()

    return sorted(dependencies)
|
|
|
|
def patch_query_dependencies(self, patch_ids, **kwargs):
    """
    Report the requirement set for the given patches.

    :param patch_ids: list of patch IDs to query
    :param kwargs: "recursive" == "yes" follows requirements transitively
    :return: dict with "patches" (list of IDs) and "error" keys
    """
    msg = "Patch query-dependencies %s" % patch_ids
    LOG.info(msg)
    audit_log_info(msg)

    results = {"patches": [],
               "error": ""}

    recursive = kwargs.get("recursive") == "yes"

    # Verify patch IDs before computing anything
    failure = False
    self.patch_data_lock.acquire()
    for patch_id in sorted(patch_ids):
        if patch_id not in self.patch_data.metadata.keys():
            errormsg = "%s is unrecognized\n" % patch_id
            LOG.info("patch_query_dependencies: %s", errormsg)
            results["error"] += errormsg
            failure = True
    self.patch_data_lock.release()

    if failure:
        LOG.info("patch_query_dependencies failed")
        return results

    results["patches"] = self.get_dependencies(patch_ids, recursive)
    return results
|
|
|
|
def patch_commit(self, patch_ids, dry_run=False):
    """
    Commit applied patches, reclaiming disk space.

    Computes the dependency closure of the requested patches, determines
    which RPM versions are superseded and can be deleted, moves metadata
    to the committed dir, deletes the superseded files and regenerates
    the repos.

    :param patch_ids: list of patch IDs to commit
    :param dry_run: if True, only report the disk space that would be freed
    :return: dict with "info" and "error" message strings
    :raises PatchFail: committed dir creation or createrepo failed
    :raises MetadataFail: metadata move or file removal failed
    """
    msg = "Patch commit %s" % patch_ids
    LOG.info(msg)
    audit_log_info(msg)

    try:
        if not os.path.exists(committed_dir):
            os.makedirs(committed_dir)
    except os.error:
        msg = "Failed to create %s" % committed_dir
        LOG.exception(msg)
        raise PatchFail(msg)

    failure = False
    # Commit always pulls in the full dependency closure
    recursive = True

    # keep: version -> pkgname -> arch -> newest pkgver to retain
    # cleanup: version -> pkgname -> arch -> list of superseded pkgvers
    keep = {}
    cleanup = {}
    # Absolute paths of RPM files to delete
    cleanup_files = set()

    results = {"info": "",
               "error": ""}

    # Ensure there are only REL patches
    non_rel_list = []
    self.patch_data_lock.acquire()
    for patch_id in self.patch_data.metadata:
        if self.patch_data.metadata[patch_id]['status'] != constants.STATUS_RELEASED:
            non_rel_list.append(patch_id)
    self.patch_data_lock.release()

    if len(non_rel_list) > 0:
        errormsg = "A commit cannot be performed with non-REL status patches in the system:\n"
        for patch_id in non_rel_list:
            errormsg += " %s\n" % patch_id
        LOG.info("patch_commit rejected: %s", errormsg)
        results["error"] += errormsg
        return results

    # Verify patch IDs
    self.patch_data_lock.acquire()
    for patch_id in sorted(patch_ids):
        if patch_id not in self.patch_data.metadata.keys():
            errormsg = "%s is unrecognized\n" % patch_id
            LOG.info("patch_commit: %s", errormsg)
            results["error"] += errormsg
            failure = True
    self.patch_data_lock.release()

    if failure:
        LOG.info("patch_commit: Failed patch ID check")
        return results

    commit_list = self.get_dependencies(patch_ids, recursive)

    # Check patch states
    avail_list = []
    self.patch_data_lock.acquire()
    for patch_id in commit_list:
        if self.patch_data.metadata[patch_id]['patchstate'] != constants.APPLIED \
                and self.patch_data.metadata[patch_id]['patchstate'] != constants.COMMITTED:
            avail_list.append(patch_id)
    self.patch_data_lock.release()

    if len(avail_list) > 0:
        errormsg = "The following patches are not applied and cannot be committed:\n"
        for patch_id in avail_list:
            errormsg += " %s\n" % patch_id
        LOG.info("patch_commit rejected: %s", errormsg)
        results["error"] += errormsg
        return results

    # Get list of packages
    self.patch_data_lock.acquire()
    for patch_id in commit_list:
        patch_sw_version = self.patch_data.metadata[patch_id]["sw_version"]

        if patch_sw_version not in keep:
            keep[patch_sw_version] = {}
        if patch_sw_version not in cleanup:
            cleanup[patch_sw_version] = {}

        for rpmname in self.patch_data.contents[patch_id]:
            try:
                pkgname, arch, pkgver = parse_rpm_filename(rpmname)
            except ValueError as e:
                # Release the lock before propagating the parse failure
                self.patch_data_lock.release()
                raise e

            # First occurrence of this package/arch: keep it outright
            if pkgname not in keep[patch_sw_version]:
                keep[patch_sw_version][pkgname] = {arch: pkgver}
                continue
            elif arch not in keep[patch_sw_version][pkgname]:
                keep[patch_sw_version][pkgname][arch] = pkgver
                continue

            # Compare versions
            keep_pkgver = keep[patch_sw_version][pkgname][arch]
            if pkgver > keep_pkgver:
                # The previously kept version is superseded: move it to
                # the cleanup list and keep the newer one instead
                if pkgname not in cleanup[patch_sw_version]:
                    cleanup[patch_sw_version][pkgname] = {arch: [keep_pkgver]}
                elif arch not in cleanup[patch_sw_version][pkgname]:
                    cleanup[patch_sw_version][pkgname][arch] = [keep_pkgver]
                else:
                    cleanup[patch_sw_version][pkgname][arch].append(keep_pkgver)

                # Find the rpmname
                keep_rpmname = keep_pkgver.generate_rpm_filename(pkgname, arch)

                store_filename = self.get_store_filename(patch_sw_version, keep_rpmname)
                if store_filename is not None and os.path.exists(store_filename):
                    cleanup_files.add(store_filename)

                repo_filename = self.get_repo_filename(patch_sw_version, keep_rpmname)
                if repo_filename is not None and os.path.exists(repo_filename):
                    cleanup_files.add(repo_filename)

                # Keep the new pkgver
                keep[patch_sw_version][pkgname][arch] = pkgver
            else:
                # Put this pkg in the cleanup list
                if pkgname not in cleanup[patch_sw_version]:
                    cleanup[patch_sw_version][pkgname] = {arch: [pkgver]}
                elif arch not in cleanup[patch_sw_version][pkgname]:
                    cleanup[patch_sw_version][pkgname][arch] = [pkgver]
                else:
                    cleanup[patch_sw_version][pkgname][arch].append(pkgver)

                store_filename = self.get_store_filename(patch_sw_version, rpmname)
                if store_filename is not None and os.path.exists(store_filename):
                    cleanup_files.add(store_filename)

                repo_filename = self.get_repo_filename(patch_sw_version, rpmname)
                if repo_filename is not None and os.path.exists(repo_filename):
                    cleanup_files.add(repo_filename)

    self.patch_data_lock.release()

    # Calculate disk space
    disk_space = 0
    for rpmfile in cleanup_files:
        statinfo = os.stat(rpmfile)
        disk_space += statinfo.st_size

    if dry_run:
        results["info"] = "This commit operation would free %0.2f MiB" % (disk_space / (1024.0 * 1024.0))
        return results

    # Do the commit

    # Move the metadata to the committed dir
    for patch_id in commit_list:
        metadata_fname = "%s-metadata.xml" % patch_id
        applied_fname = os.path.join(applied_dir, metadata_fname)
        committed_fname = os.path.join(committed_dir, metadata_fname)
        if os.path.exists(applied_fname):
            try:
                shutil.move(applied_fname, committed_fname)
            except shutil.Error:
                msg = "Failed to move the metadata for %s" % patch_id
                LOG.exception(msg)
                raise MetadataFail(msg)

    # Delete the files
    for rpmfile in cleanup_files:
        try:
            os.remove(rpmfile)
        except OSError:
            msg = "Failed to remove: %s" % rpmfile
            LOG.exception(msg)
            raise MetadataFail(msg)

    # Update the repo
    self.patch_data.gen_groups_xml()
    for ver, rdir in repo_dir.items():
        try:
            output = subprocess.check_output(["createrepo",
                                              "--update",
                                              "-g",
                                              "comps.xml",
                                              rdir],
                                             stderr=subprocess.STDOUT)
            LOG.info("Repo[%s] updated:\n%s", ver, output)
        except subprocess.CalledProcessError:
            msg = "Failed to update the repo for %s" % ver
            LOG.exception(msg)
            raise PatchFail(msg)

    # Reload all patch data to reflect the committed state
    self.patch_data.load_all()

    results["info"] = "The patches have been committed."
    return results
|
|
|
|
def query_host_cache(self):
|
|
output = []
|
|
|
|
self.hosts_lock.acquire()
|
|
for nbr in self.hosts.keys():
|
|
host = self.hosts[nbr].get_dict()
|
|
host["interim_state"] = False
|
|
for patch_id in pc.interim_state.keys():
|
|
if nbr in pc.interim_state[patch_id]:
|
|
host["interim_state"] = True
|
|
|
|
output.append(host)
|
|
|
|
self.hosts_lock.release()
|
|
|
|
return output
|
|
|
|
def any_patch_host_installing(self):
|
|
rc = False
|
|
|
|
self.hosts_lock.acquire()
|
|
for host in self.hosts.values():
|
|
if host.state == constants.PATCH_AGENT_STATE_INSTALLING:
|
|
rc = True
|
|
break
|
|
|
|
self.hosts_lock.release()
|
|
|
|
return rc
|
|
|
|
    def patch_host_install(self, host_ip, force, async_req=False):
        """
        Send a patch-install request to the agent on the given host.

        :param host_ip: IP address or hostname of the target host
        :param force: pass the force flag to the agent install request
        :param async_req: if True, return immediately after sending the
            request instead of polling for the agent's response
        :return: dict with "info", "warning" and "error" message strings
        """
        msg_info = ""
        msg_warning = ""
        msg_error = ""

        ip = host_ip

        self.hosts_lock.acquire()
        # If not in hosts table, maybe a hostname was used instead
        if host_ip not in self.hosts:
            try:
                ip = utils.gethostbyname(host_ip)
                if ip not in self.hosts:
                    # Translated successfully, but IP isn't in the table.
                    # Raise an exception to drop out to the failure handling
                    raise PatchError("Host IP (%s) not in table" % ip)
            except Exception:
                self.hosts_lock.release()
                msg = "Unknown host specified: %s" % host_ip
                msg_error += msg + "\n"
                LOG.error("Error in host-install: %s", msg)
                return dict(info=msg_info, warning=msg_warning, error=msg_error)

        msg = "Running host-install for %s (%s), force=%s, async_req=%s" % (host_ip, ip, force, async_req)
        LOG.info(msg)
        audit_log_info(msg)

        if self.allow_insvc_patching:
            LOG.info("Allowing in-service patching")
            force = True

        # Mark the install as pending before releasing the lock; the
        # response handler clears install_pending and fills in the status
        # fields when the agent replies.
        self.hosts[ip].install_pending = True
        self.hosts[ip].install_status = False
        self.hosts[ip].install_reject_reason = None
        self.hosts_lock.release()

        # Send the install request to the agent over the messaging socket.
        installreq = PatchMessageAgentInstallReq()
        installreq.ip = ip
        installreq.force = force
        installreq.encode()
        self.socket_lock.acquire()
        installreq.send(self.sock_out)
        self.socket_lock.release()

        if async_req:
            # async_req install requested, so return now
            msg = "Patch installation request sent to %s." % self.hosts[ip].hostname
            msg_info += msg + "\n"
            LOG.info("host-install async_req: %s", msg)
            return dict(info=msg_info, warning=msg_warning, error=msg_error)

        # Now we wait, up to ten mins... TODO: Wait on a condition
        resp_rx = False
        max_time = time.time() + 600
        while time.time() < max_time:
            self.hosts_lock.acquire()
            if ip not in self.hosts:
                # The host aged out while we were waiting
                self.hosts_lock.release()
                msg = "Agent expired while waiting: %s" % ip
                msg_error += msg + "\n"
                LOG.error("Error in host-install: %s", msg)
                break

            if not self.hosts[ip].install_pending:
                # We got a response
                resp_rx = True
                if self.hosts[ip].install_status:
                    msg = "Patch installation was successful on %s." % self.hosts[ip].hostname
                    msg_info += msg + "\n"
                    LOG.info("host-install: %s", msg)
                elif self.hosts[ip].install_reject_reason:
                    msg = "Patch installation rejected by %s. %s" % (
                        self.hosts[ip].hostname,
                        self.hosts[ip].install_reject_reason)
                    msg_error += msg + "\n"
                    LOG.error("Error in host-install: %s", msg)
                else:
                    msg = "Patch installation failed on %s." % self.hosts[ip].hostname
                    msg_error += msg + "\n"
                    LOG.error("Error in host-install: %s", msg)

                self.hosts_lock.release()
                break

            self.hosts_lock.release()

            # Poll for the agent response every half second.
            time.sleep(0.5)

        if not resp_rx:
            msg = "Timeout occurred while waiting response from %s." % ip
            msg_error += msg + "\n"
            LOG.error("Error in host-install: %s", msg)

        return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
    def drop_host(self, host_ip, sync_nbr=True):
        """
        Remove a host from the hosts table and from all patch interim state.

        :param host_ip: IP address or hostname of the host to drop
        :param sync_nbr: if True, notify the neighbour controller so it
            drops the host as well
        :return: dict with "info", "warning" and "error" message strings
        """
        msg_info = ""
        msg_warning = ""
        msg_error = ""

        ip = host_ip

        self.hosts_lock.acquire()
        # If not in hosts table, maybe a hostname was used instead
        if host_ip not in self.hosts:
            try:
                # Because the host may be getting dropped due to deletion,
                # we may be unable to do a hostname lookup. Instead, we'll
                # iterate through the table here.
                for host in self.hosts.keys():
                    if host_ip == self.hosts[host].hostname:
                        ip = host
                        break

                if ip not in self.hosts:
                    # Translated successfully, but IP isn't in the table.
                    # Raise an exception to drop out to the failure handling
                    raise PatchError("Host IP (%s) not in table" % ip)
            except Exception:
                self.hosts_lock.release()
                msg = "Unknown host specified: %s" % host_ip
                msg_error += msg + "\n"
                LOG.error("Error in drop-host: %s", msg)
                return dict(info=msg_info, warning=msg_warning, error=msg_error)

        msg = "Running drop-host for %s (%s)" % (host_ip, ip)
        LOG.info(msg)
        audit_log_info(msg)

        del self.hosts[ip]
        # Also clear the host out of every patch's interim state list.
        for patch_id in self.interim_state.keys():
            if ip in self.interim_state[patch_id]:
                self.interim_state[patch_id].remove(ip)

        self.hosts_lock.release()

        if sync_nbr:
            # Tell the neighbouring controller to drop this host too.
            sync_msg = PatchMessageDropHostReq()
            sync_msg.ip = ip
            self.socket_lock.acquire()
            sync_msg.send(self.sock_out)
            self.socket_lock.release()

        return dict(info=msg_info, warning=msg_warning, error=msg_error)
|
|
|
|
def is_applied(self, patch_ids):
|
|
all_applied = True
|
|
|
|
self.patch_data_lock.acquire()
|
|
|
|
for patch_id in patch_ids:
|
|
if patch_id not in self.patch_data.metadata:
|
|
all_applied = False
|
|
break
|
|
|
|
if self.patch_data.metadata[patch_id]["patchstate"] != constants.APPLIED:
|
|
all_applied = False
|
|
break
|
|
|
|
self.patch_data_lock.release()
|
|
|
|
return all_applied
|
|
|
|
def is_available(self, patch_ids):
|
|
all_available = True
|
|
|
|
self.patch_data_lock.acquire()
|
|
|
|
for patch_id in patch_ids:
|
|
if patch_id not in self.patch_data.metadata:
|
|
all_available = False
|
|
break
|
|
|
|
if self.patch_data.metadata[patch_id]["patchstate"] != \
|
|
constants.AVAILABLE:
|
|
all_available = False
|
|
break
|
|
|
|
self.patch_data_lock.release()
|
|
|
|
return all_available
|
|
|
|
def report_app_dependencies(self, patch_ids, **kwargs):
|
|
"""
|
|
Handle report of application dependencies
|
|
"""
|
|
if "app" not in kwargs:
|
|
raise PatchInvalidRequest
|
|
|
|
appname = kwargs.get("app")
|
|
|
|
LOG.info("Handling app dependencies report: app=%s, patch_ids=%s",
|
|
appname, ','.join(patch_ids))
|
|
|
|
self.patch_data_lock.acquire()
|
|
|
|
if len(patch_ids) == 0:
|
|
if appname in self.app_dependencies:
|
|
del self.app_dependencies[appname]
|
|
else:
|
|
self.app_dependencies[appname] = patch_ids
|
|
|
|
try:
|
|
tmpfile, tmpfname = tempfile.mkstemp(
|
|
prefix=app_dependency_basename,
|
|
dir=constants.PATCH_STORAGE_DIR)
|
|
|
|
os.write(tmpfile, json.dumps(self.app_dependencies))
|
|
os.close(tmpfile)
|
|
|
|
os.rename(tmpfname, app_dependency_filename)
|
|
except Exception:
|
|
LOG.exception("Failed in report_app_dependencies")
|
|
raise PatchFail("Internal failure")
|
|
finally:
|
|
self.patch_data_lock.release()
|
|
|
|
return True
|
|
|
|
def query_app_dependencies(self):
|
|
"""
|
|
Query application dependencies
|
|
"""
|
|
self.patch_data_lock.acquire()
|
|
|
|
data = self.app_dependencies
|
|
|
|
self.patch_data_lock.release()
|
|
|
|
return dict(data)
|
|
|
|
|
|
# The wsgiref.simple_server module has an error handler that catches
|
|
# and prints any exceptions that occur during the API handling to stderr.
|
|
# This means the patching sys.excepthook handler that logs uncaught
|
|
# exceptions is never called, and those exceptions are lost.
|
|
#
|
|
# To get around this, we subclass simple_server.ServerHandler, replacing
# the handle_error method with a custom one that logs the exception
# instead and sets a global flag to shut down the server and reset.
|
|
#
|
|
class MyServerHandler(simple_server.ServerHandler):
    """ServerHandler variant that logs uncaught exceptions.

    The stock handler prints errors to stderr; this one routes them
    through the patching log and flags the serving loop to shut down.
    """

    def handle_error(self):
        global keep_running

        # Record the traceback in the patching log rather than stderr.
        LOG.exception('An uncaught exception has occurred:')

        if not self.headers_sent:
            # We can still produce an error response for the client.
            self.result = self.error_output(self.environ, self.start_response)
            self.finish_response()

        # Signal the serving loop to stop so the server can reset.
        keep_running = False
|
|
|
|
|
|
def get_handler_cls():
    """Build and return a WSGIRequestHandler subclass wired to MyServerHandler."""
    base = simple_server.WSGIRequestHandler

    # old-style class doesn't support super, so mix object in explicitly
    class PatchRequestHandler(base, object):
        def address_string(self):
            # Return the raw client IP without a reverse DNS lookup.
            # (In the future, a config option could allow reverse DNS.)
            return self.client_address[0]

        # Overload the handle function to use our own MyServerHandler
        def handle(self):
            """Handle a single HTTP request"""

            self.raw_requestline = self.rfile.readline()
            if not self.parse_request():
                # An error code has already been sent; just exit.
                return

            server_handler = MyServerHandler(
                self.rfile, self.wfile, self.get_stderr(), self.get_environ())
            server_handler.request_handler = self  # pylint: disable=attribute-defined-outside-init
            server_handler.run(self.server.get_app())

    return PatchRequestHandler
|
|
|
|
|
|
class PatchControllerApiThread(threading.Thread):
    """Thread serving the unauthenticated (loopback-only) patching API."""

    def __init__(self):
        threading.Thread.__init__(self)
        # WSGI server instance; created in run(), used by kill().
        self.wsgi = None

    def run(self):
        """Serve API requests on 127.0.0.1 until keep_running is cleared."""
        host = "127.0.0.1"
        port = cfg.api_port

        try:
            # In order to support IPv6, server_class.address_family must be
            # set to the correct address family. Because the unauthenticated
            # API always uses IPv4 for the loopback address, the address_family
            # variable cannot be set directly in the WSGIServer class, so a
            # local subclass needs to be created for the call to make_server,
            # where the correct address_family can be specified.
            class server_class(simple_server.WSGIServer):
                pass

            server_class.address_family = socket.AF_INET
            self.wsgi = simple_server.make_server(
                host, port,
                app.VersionSelectorApplication(),
                server_class=server_class,
                handler_class=get_handler_cls())

            # Timeout so handle_request() returns periodically and the
            # keep_running flag is re-checked.
            self.wsgi.socket.settimeout(api_socket_timeout)
            global keep_running
            while keep_running:
                self.wsgi.handle_request()

                # Call garbage collect after wsgi request is handled,
                # to ensure any open file handles are closed in the case
                # of an upload.
                gc.collect()
        except Exception:
            # Log all exceptions
            LOG.exception("Error occurred during request processing")

        # Signal the other threads that this one is exiting.
        global thread_death
        thread_death.set()

    def kill(self):
        """Stop the WSGI server; must be called from another thread."""
        # Must run from other thread
        if self.wsgi is not None:
            self.wsgi.shutdown()
|
|
|
|
|
|
class PatchControllerAuthApiThread(threading.Thread):
    """Thread serving the authenticated patching API on the management network."""

    def __init__(self):
        threading.Thread.__init__(self)
        # LOG.info ("Initializing Authenticated API thread")
        # WSGI server instance; created in run(), used by kill().
        self.wsgi = None

    def run(self):
        """Serve authenticated API requests until keep_running is cleared.

        NOTE(review): unlike PatchControllerApiThread.run(), a failure here
        does not set thread_death before returning.
        """
        host = CONF.auth_api_bind_ip
        port = CONF.auth_api_port
        if host is None:
            host = utils.get_versioned_address_all()
        try:
            # Can only launch authenticated server post-config
            while not os.path.exists('/etc/platform/.initial_config_complete'):
                time.sleep(5)

            # In order to support IPv6, server_class.address_family must be
            # set to the correct address family. Because the unauthenticated
            # API always uses IPv4 for the loopback address, the address_family
            # variable cannot be set directly in the WSGIServer class, so a
            # local subclass needs to be created for the call to make_server,
            # where the correct address_family can be specified.
            class server_class(simple_server.WSGIServer):
                pass

            server_class.address_family = utils.get_management_family()
            self.wsgi = simple_server.make_server(
                host, port,
                auth_app.VersionSelectorApplication(),
                server_class=server_class,
                handler_class=get_handler_cls())

            # self.wsgi.serve_forever()
            # Timeout so handle_request() returns periodically and the
            # keep_running flag is re-checked.
            self.wsgi.socket.settimeout(api_socket_timeout)

            global keep_running
            while keep_running:
                self.wsgi.handle_request()

                # Call garbage collect after wsgi request is handled,
                # to ensure any open file handles are closed in the case
                # of an upload.
                gc.collect()
        except Exception:
            # Log all exceptions
            LOG.exception("Authorized API failure: Error occurred during request processing")

    def kill(self):
        """Stop the WSGI server; must be called from another thread."""
        # Must run from other thread
        if self.wsgi is not None:
            self.wsgi.shutdown()
|
|
|
|
|
|
class PatchControllerMainThread(threading.Thread):
    """Main messaging loop: UDP hello/sync traffic plus TCP agent queries."""

    def __init__(self):
        threading.Thread.__init__(self)
        # LOG.info ("Initializing Main thread")

    def run(self):
        """Run the controller messaging loop until shutdown or thread death."""
        global pc
        global thread_death

        # LOG.info ("In Main thread")

        try:
            sock_in = pc.setup_socket()

            while sock_in is None:
                # Check every thirty seconds?
                # Once we've got a conf file, tied into packstack,
                # we'll get restarted when the file is updated,
                # and this should be unnecessary.
                time.sleep(30)
                sock_in = pc.setup_socket()

            # Ok, now we've got our socket. Let's start with a hello!
            pc.socket_lock.acquire()

            hello = PatchMessageHello()
            hello.send(pc.sock_out)

            hello_agent = PatchMessageHelloAgent()
            hello_agent.send(pc.sock_out)

            pc.socket_lock.release()

            # Send hello every thirty seconds
            hello_timeout = time.time() + 30.0
            remaining = 30

            # Open TCP connections to agents being queried for details.
            agent_query_conns = []

            while True:
                # Check to see if any other thread has died
                if thread_death.is_set():
                    LOG.info("Detected thread death. Terminating")
                    return

                # Check for in-service patch restart flag
                if os.path.exists(insvc_patch_restart_controller):
                    LOG.info("In-service patch restart flag detected. Exiting.")
                    global keep_running
                    keep_running = False
                    os.remove(insvc_patch_restart_controller)
                    return

                inputs = [pc.sock_in] + agent_query_conns
                outputs = []

                # LOG.info("Running select, remaining=%d", remaining)
                rlist, wlist, xlist = select.select(inputs, outputs, inputs, remaining)

                if (len(rlist) == 0 and
                        len(wlist) == 0 and
                        len(xlist) == 0):
                    # Timeout hit
                    pc.audit_socket()

                # LOG.info("Checking sockets")
                for s in rlist:
                    data = ''
                    addr = None
                    msg = None

                    if s == pc.sock_in:
                        # Receive from UDP
                        pc.socket_lock.acquire()
                        data, addr = s.recvfrom(1024)
                        pc.socket_lock.release()
                    else:
                        # Receive from TCP
                        # Accumulate chunks until the buffer parses as a
                        # complete JSON document.
                        # NOTE(review): data is initialized to '' (str) while
                        # s.recv() returns bytes on Python 3 — this loop
                        # appears to assume Python 2 str semantics; confirm
                        # before running under Python 3.
                        while True:
                            try:
                                packet = s.recv(1024)
                            except socket.error:
                                LOG.exception("Socket error on recv")
                                data = ''
                                break

                            if packet:
                                data += packet

                                if data == '':
                                    break
                                try:
                                    json.loads(data)
                                    break
                                except ValueError:
                                    # Message is incomplete
                                    continue
                            else:
                                LOG.info('End of TCP message received')
                                break

                        if data == '':
                            # Connection dropped
                            agent_query_conns.remove(s)
                            s.close()
                            continue

                        # Get the TCP endpoint address
                        addr = s.getpeername()

                    msgdata = json.loads(data)

                    # For now, discard any messages that are not msgversion==1
                    if 'msgversion' in msgdata and msgdata['msgversion'] != 1:
                        continue

                    # Dispatch on message type; unknown types fall through to
                    # the base PatchMessage, whose handler ignores them.
                    if 'msgtype' in msgdata:
                        if msgdata['msgtype'] == messages.PATCHMSG_HELLO:
                            msg = PatchMessageHello()
                        elif msgdata['msgtype'] == messages.PATCHMSG_HELLO_ACK:
                            msg = PatchMessageHelloAck()
                        elif msgdata['msgtype'] == messages.PATCHMSG_SYNC_REQ:
                            msg = PatchMessageSyncReq()
                        elif msgdata['msgtype'] == messages.PATCHMSG_SYNC_COMPLETE:
                            msg = PatchMessageSyncComplete()
                        elif msgdata['msgtype'] == messages.PATCHMSG_HELLO_AGENT_ACK:
                            msg = PatchMessageHelloAgentAck()
                        elif msgdata['msgtype'] == messages.PATCHMSG_QUERY_DETAILED_RESP:
                            msg = PatchMessageQueryDetailedResp()
                        elif msgdata['msgtype'] == messages.PATCHMSG_AGENT_INSTALL_RESP:
                            msg = PatchMessageAgentInstallResp()
                        elif msgdata['msgtype'] == messages.PATCHMSG_DROP_HOST_REQ:
                            msg = PatchMessageDropHostReq()

                    if msg is None:
                        msg = messages.PatchMessage()

                    msg.decode(msgdata)
                    if s == pc.sock_in:
                        msg.handle(pc.sock_out, addr)
                    else:
                        msg.handle(s, addr)

                    # We can drop the connection after a query response
                    if msg.msgtype == messages.PATCHMSG_QUERY_DETAILED_RESP and s != pc.sock_in:
                        agent_query_conns.remove(s)
                        s.shutdown(socket.SHUT_RDWR)
                        s.close()

                # Open detailed-query connections to stale hosts, at most
                # a handful at a time; failures requeue the host.
                while len(stale_hosts) > 0 and len(agent_query_conns) <= 5:
                    ip = stale_hosts.pop()
                    try:
                        agent_sock = socket.create_connection((ip, cfg.agent_port))
                        query = PatchMessageQueryDetailed()
                        query.send(agent_sock)
                        agent_query_conns.append(agent_sock)
                    except Exception:
                        # Put it back on the list
                        stale_hosts.append(ip)

                remaining = int(hello_timeout - time.time())
                if remaining <= 0 or remaining > 30:
                    hello_timeout = time.time() + 30.0
                    remaining = 30

                    pc.socket_lock.acquire()

                    hello = PatchMessageHello()
                    hello.send(pc.sock_out)

                    hello_agent = PatchMessageHelloAgent()
                    hello_agent.send(pc.sock_out)

                    pc.socket_lock.release()

                    # Age out neighbours
                    # NOTE(review): iterating .keys() while deleting entries
                    # raises RuntimeError on Python 3 dict views — confirm
                    # this code only runs under Python 2, or snapshot the
                    # keys first.
                    pc.controller_neighbours_lock.acquire()
                    nbrs = pc.controller_neighbours.keys()
                    for n in nbrs:
                        # Age out controllers after 2 minutes
                        if pc.controller_neighbours[n].get_age() >= 120:
                            LOG.info("Aging out controller %s from table", n)
                            del pc.controller_neighbours[n]
                    pc.controller_neighbours_lock.release()

                    pc.hosts_lock.acquire()
                    nbrs = pc.hosts.keys()
                    for n in nbrs:
                        # Age out hosts after 1 hour
                        if pc.hosts[n].get_age() >= 3600:
                            LOG.info("Aging out host %s from table", n)
                            del pc.hosts[n]
                            for patch_id in pc.interim_state.keys():
                                if n in pc.interim_state[patch_id]:
                                    pc.interim_state[patch_id].remove(n)

                    pc.hosts_lock.release()
        except Exception:
            # Log all exceptions
            LOG.exception("Error occurred during request processing")
            thread_death.set()
|
|
|
|
|
|
def main():
    """Entry point: configure, spawn the worker threads, wait for shutdown."""
    global thread_death
    global pc
    global keep_running

    configure_logging()

    cfg.read_config()

    # daemon.pidlockfile.write_pid_to_pidfile(pidfile_path)

    thread_death = threading.Event()

    # Point TMPDIR at /scratch so that any modules creating directories
    # with tempfile stay out of /tmp.
    os.environ['TMPDIR'] = '/scratch'

    pc = PatchController()

    LOG.info("launching")
    workers = [PatchControllerApiThread(),
               PatchControllerAuthApiThread(),
               PatchControllerMainThread()]

    for worker in workers:
        worker.start()

    # Block until any thread signals death, then tell the rest to stop.
    thread_death.wait()
    keep_running = False

    for worker in workers:
        worker.join()
|