Create wrapper to run commands and log structured output

This commit creates a class to run commands and functions
for the USM endpoints and capture the rc, stdout and
stderr from them, so that this information is logged into
a json file. The purpose of this file is to enable each
deployment history to be recovered in an easier way on
the future.

The json files are stored inside directories named under
the corresponding release, and have a file named under the
deployment stage in which the object was instantiated, e.g.:

/opt/software/summary/starlingx-24.03.0/deploy-precheck.json

These files can grow incrementally in case multiple commands
are executed under the same deployment stage.

The parts of the code, currently, that can benefit from the
implementation added by this commit will be changed in
follow-up commits.

Test Plan
PASS: manually replace subprocess functions on USM code, run
      the respective commands and verify:
      - Command is executed successfully
      - Output and behavior is maintained
      - The json file is created within the expected directory,
        with correct filename and content
PASS: execute the previous step multiple times with different
      commands and verify the json files are appended with the
      new operations

Story: 2010676
Task: 48955

Change-Id: Iccf1aef1b0cc064399163eeb58c23fa065a6dab5
Signed-off-by: Heitor Matsui <heitorvieira.matsui@windriver.com>
This commit is contained in:
Heitor Matsui 2023-10-11 09:10:49 -03:00
parent a77914160d
commit d6bb66a033
2 changed files with 248 additions and 0 deletions

View File

@ -111,3 +111,6 @@ LICENSE_FILE = "/etc/platform/.license"
VERIFY_LICENSE_BINARY = "/usr/bin/verify-license"
SOFTWARE_JSON_FILE = "/opt/software/software.json"
WORKER_SUMMARY_DIR = "%s/summary" % SOFTWARE_STORAGE_DIR
WORKER_DATETIME_FORMAT = "%Y%m%dT%H%M%S%f"

View File

@ -0,0 +1,245 @@
"""
Copyright (c) 2023 Wind River Systems, Inc.
SPDX-License-Identifier: Apache-2.0
"""
import asyncio
import json
import os
import re
import subprocess
from datetime import datetime
import software.constants as constants
class SoftwareWorker(object):
"""This class wraps the subprocess commands used by USM
modules to run a command with parameters and write its
return code, stdout, stderr and other useful information into
a structured json file that can be later recovered to create
a deployment summary report.
"""
def __init__(self, release, stage):
"""SoftwareWorker constructor
:param release: target release name, used to define
the directory in which json files will be created
:param stage: deployment stage which the commands
are being executed, used to define the json filename
"""
self._release = release
self._stage = stage
self._directory = os.path.join(constants.WORKER_SUMMARY_DIR, self._release)
os.makedirs(self._directory, exist_ok=True)
self._filename = os.path.join(self._directory, self._stage) + ".json"
operations = self._read_file()
self._run = str(SoftwareWorker._get_key(operations))
def _read_file(self):
"""Reads the file and returns its content in a dictionary.
:returns: dictionary loaded with content from json file
"""
try:
with open(self._filename, "r") as f:
return json.loads(f.read())
except (FileNotFoundError, json.decoder.JSONDecodeError):
return {}
def _write_file(self, operation, cmd, rc, output):
"""Writes the command in a structured format in the file.
:param cmd: command that was run via subprocess
:param rc: command return code
:param output: output (stdout + stderr) returned by the command
"""
operations = self._read_file()
command = SoftwareWorker._suppress_text(cmd)
if not isinstance(cmd, list):
command = [command]
with open(self._filename, "w") as f:
if self._run not in operations:
operations[self._run] = {}
operations[self._run][operation] = {
"timestamp": datetime.strftime(datetime.utcnow(),
constants.WORKER_DATETIME_FORMAT),
"command": " ".join(command),
"rc": rc,
"output": output,
}
f.write(json.dumps(operations))
async def _run_async(self, operation, cmd, *args, **kwargs):
"""Run a command with asyncio lib, which allows returning
a line-by-line output for stdout and stderr that is then
written to a json file.
:param operation: operation name written to json file
:param cmd: command to be executed in string format
:param args: list of arguments passed along with the command
:param kwargs: extra arguments to change the behavior of the output
:returns: instance of CompletedProcess object
"""
if "env" in kwargs:
env = kwargs["env"]
else:
env = {}
# concatenate params for shell command format
cmd_str = " ".join([cmd] + list(args))
# create process, capture output and wait it to end
if "shell" in kwargs and kwargs["shell"]:
process = await asyncio.create_subprocess_shell(
cmd_str,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env)
else:
process = await asyncio.create_subprocess_exec(
cmd,
*args,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env)
stdout, stderr = await asyncio.gather(
SoftwareWorker._read_pipe(process.stdout, "stdout"),
SoftwareWorker._read_pipe(process.stderr, "stderr")
)
await process.wait()
# sort pipes by timestamp to write to json
rc = process.returncode
output = stdout + stderr
sorted_output = sorted(output, key=lambda item: list(item.values())[0])
self._write_file(operation, cmd_str, process.returncode, sorted_output)
# join stdout and stderr to return
stdout_str, stderr_str = SoftwareWorker._join_stdout_stderr(sorted_output)
# do some validation to simulate subprocess behavior
if "check" in kwargs and kwargs["check"]:
if rc != 0:
raise subprocess.CalledProcessError(cmd=cmd, returncode=rc,
output=stdout_str, stderr=stderr_str)
cp = subprocess.CompletedProcess(args=args, returncode=rc,
stdout=stdout_str, stderr=stderr_str)
return cp
def run(self, operation, cmd, *args, **kwargs):
"""Run the _run_async() method with asyncio.run()
to hide asyncio complexity details from the user.
:param operation: operation name written to json file
:param cmd: command to be run
:param args: extra arguments passes to the command
:param kwargs: extra keyword arguments
:returns: command output
"""
return asyncio.run(self._run_async(operation, cmd, *args, **kwargs))
def run_func(self, operation, function, *args, **kwargs):
"""Runs a function, capture its output and writes
to a json file.
:param operation: operation name written to json file
:param function: function to be executed
:param args: args to pass to function
:param kwargs: kwargs to pass to function
:returns: executed function return
"""
str_args = [str(arg) for arg in args]
str_kwargs = [str(arg) + "=" + str(kwargs[arg]) for arg in kwargs]
cmd = function.__name__ + "(" + ", ".join(str_args + str_kwargs) + ")"
msg = "'%s' executed " % function.__name__
ret, rc = None, 0
try:
ret = function(*args, **kwargs)
msg = msg + "with success: %s" % str(ret)
except Exception as e:
rc = 1
msg = msg + "with failure: %s" % str(e)
raise e
finally:
msg_type = "stdout" if rc == 0 else "stderr"
self._write_file(operation, cmd, rc, [{
"timestamp": datetime.strftime(datetime.utcnow(),
constants.WORKER_DATETIME_FORMAT),
"type": msg_type,
"output": msg
}])
return ret
@staticmethod
async def _read_pipe(stream, pipe):
"""Read an IO stream created by asyncio line-by-line
:param stream: stream of data to be read
:param pipe: type of the output (e.g. stdio)
:returns: list of dictionaries containing
each line of the stream marked
with date and type
"""
output_list = []
while True:
chunk = await stream.readline()
if len(chunk) == 0:
break
line = str(chunk.decode('utf-8'))
output_list.append({
"timestamp": datetime.strftime(datetime.utcnow(),
constants.WORKER_DATETIME_FORMAT),
"type": pipe,
"output": line
})
return output_list
@staticmethod
def _join_stdout_stderr(output_list):
"""Join a list of lines with two different types
:param output_list: list of lines to be merged
:returns: two strings, one with all stdio output
and one with all stderr output
"""
stderr, stdout = "", ""
for output in output_list:
if output["type"] == "stdout":
stdout += output["output"]
else:
stderr += output["output"]
return stdout, stderr
@staticmethod
def _get_key(d):
"""Receive a dictionary with integer keys
and return the next valid integer
:param d: dictionary with integer keys
:returns: next valid integer key
"""
if not d:
return 1
keys = sorted(list(d.keys()))
last = keys[-1]
return int(last) + 1
@staticmethod
def _suppress_text(_str):
"""Suppress a set of patterns from a string
:param _str: source string
:returns: suppressed string
"""
search_patterns = [
r".*(?:password|pass|pw)[= ]+(\S+)\s",
]
suppressed = _str
for sp in search_patterns:
match = re.match(sp, _str)
if match:
suppressed = suppressed.replace(match.group(1), "xxxxxxx")
return suppressed