software upload return uploaded file info

This commit is to add uploaded file info into 'software upload'
respond, in addition to existing respond messages.

This newly added uploaded file info is a list of dictionaries. Each
dictionary is taking the file name as the key and its value is a
dictionary in which id and sw_version are the keys.

    Uploaded File             Id        SW Version
=====================  ===============  ==========
23.09_ALL_NODES.patch  23.09_ALL_NODES    23.09

Test Plan:
PASS: uploaded iso, sig and patch file and got response

Story: 2010676
Task: 49241

Change-Id: I5332268d238aa9e787dea68f8a23c6be42feb4cd
Signed-off-by: junfeng-li <junfeng.li@windriver.com>
This commit is contained in:
junfeng-li 2023-12-12 19:57:34 +00:00
parent 8db54bdced
commit 8aed4785a3
4 changed files with 104 additions and 25 deletions

View File

@ -400,6 +400,25 @@ def release_upload_req(args):
print_result_debug(req)
else:
print_software_op_result(req)
data = json.loads(req.text)
data_list = [(k, v["id"], v["sw_version"])
for d in data["upload_info"] for k, v in d.items()]
header_data_list = ["Uploaded File", "Id", "SW Version"]
# Find the longest header string in each column
header_lengths = [len(str(x)) for x in header_data_list]
# Find the longest content string in each column
content_lengths = [max(len(str(x[i])) for x in data_list)
for i in range(len(header_data_list))]
# Find the max of the two for each column
col_lengths = [(x if x > y else y) for x, y in zip(header_lengths, content_lengths)]
print(' '.join(f"{x.center(col_lengths[i])}" for i, x in enumerate(header_data_list)))
print(' '.join('=' * length for length in col_lengths))
for item in data_list:
print(' '.join(f"{str(x).center(col_lengths[i])}" for i, x in enumerate(item)))
print("\n")
if check_rc(req) != 0:
# We hit a failure. Update rc but keep looping

View File

@ -41,10 +41,11 @@ from software.exceptions import ReleaseValidationFailure
from software.exceptions import ReleaseMismatchFailure
from software.exceptions import ReleaseIsoDeleteFailure
from software.software_functions import collect_current_load_for_hosts
from software.software_functions import parse_release_metadata
from software.software_functions import configure_logging
from software.software_functions import mount_iso_load
from software.software_functions import unmount_iso_load
from software.software_functions import read_upgrade_metadata
from software.software_functions import read_upgrade_support_versions
from software.software_functions import BasePackageData
from software.software_functions import PatchFile
from software.software_functions import package_dir
@ -988,6 +989,7 @@ class PatchController(PatchService):
local_info = ""
local_warning = ""
local_error = ""
release_meta_info = {}
iso_mount_dir = None
try:
@ -1007,7 +1009,7 @@ class PatchController(PatchService):
LOG.info("Mounted iso file %s to %s", iso_file, iso_mount_dir)
# Read the metadata from the iso file
to_release, supported_from_releases = read_upgrade_metadata(iso_mount_dir)
to_release, supported_from_releases = read_upgrade_support_versions(iso_mount_dir)
LOG.info("Reading metadata from iso file %s completed", iso_file)
# Validate that the current release is supported to upgrade to the new release
supported_versions = [v.get("version") for v in supported_from_releases]
@ -1049,6 +1051,19 @@ class PatchController(PatchService):
release_data.parse_metadata(abs_stx_release_metadata_file, state=constants.AVAILABLE)
LOG.info("Updated release metadata for %s", to_release)
# Get release metadata
all_release_meta_info = parse_release_metadata(abs_stx_release_metadata_file)
release_meta_info = {
os.path.basename(upgrade_files[constants.ISO_EXTENSION]): {
"id": all_release_meta_info.get("id"),
"sw_version": all_release_meta_info.get("sw_version"),
},
os.path.basename(upgrade_files[constants.SIG_EXTENSION]): {
"id": None,
"sw_version": None,
}
}
except ReleaseValidationFailure:
msg = "Upgrade file signature verification failed"
LOG.exception(msg)
@ -1063,7 +1078,7 @@ class PatchController(PatchService):
unmount_iso_load(iso_mount_dir)
LOG.info("Unmounted iso file %s", iso_file)
return local_info, local_warning, local_error
return local_info, local_warning, local_error, release_meta_info
def _process_upload_patch_files(self, patch_files):
'''
@ -1075,6 +1090,7 @@ class PatchController(PatchService):
local_info = ""
local_warning = ""
local_error = ""
upload_patch_info = []
try:
# Create the directories
for state_dir in constants.DEPLOY_STATE_METADATA_DIR:
@ -1086,10 +1102,13 @@ class PatchController(PatchService):
for patch_file in patch_files:
base_patch_filename = os.path.basename(patch_file)
# Get the release_id from the filename
# and check to see if it's already uploaded
# todo(abailey) We should not require the ID as part of the file
(release_id, _) = os.path.splitext(os.path.basename(patch_file))
(release_id, _) = os.path.splitext(base_patch_filename)
patch_metadata = self.release_data.metadata.get(release_id, None)
if patch_metadata:
@ -1155,7 +1174,14 @@ class PatchController(PatchService):
local_error += msg + "\n"
continue
return local_info, local_warning, local_error
upload_patch_info.append({
base_patch_filename: {
"id": release_id,
"sw_version": self.release_data.metadata[release_id].get("sw_version", None),
}
})
return local_info, local_warning, local_error, upload_patch_info
def software_release_upload(self, release_files):
"""
@ -1166,6 +1192,8 @@ class PatchController(PatchService):
msg_warning = ""
msg_error = ""
upload_info = []
# Refresh data, if needed
self.base_pkgdata.loaddirs()
@ -1185,26 +1213,30 @@ class PatchController(PatchService):
elif ext == constants.SIG_EXTENSION:
upgrade_files[constants.SIG_EXTENSION] = uploaded_file
else:
LOG.exception("The file extension is not supported. Supported extensions include .patch, .iso and .sig")
LOG.exception(
"The file extension is not supported. Supported extensions include .patch, .iso and .sig")
if len(upgrade_files) == 1: # Only one upgrade file uploaded
msg = "Missing upgrade file or signature file"
LOG.error(msg)
msg_error += msg + "\n"
elif len(upgrade_files) == 2: # Two upgrade files uploaded
tmp_info, tmp_warning, tmp_error = self._process_upload_upgrade_files(upgrade_files,
self.release_data)
tmp_info, tmp_warning, tmp_error, tmp_release_meta_info = self._process_upload_upgrade_files(
upgrade_files, self.release_data)
msg_info += tmp_info
msg_warning += tmp_warning
msg_error += tmp_error
upload_info.append(tmp_release_meta_info)
if len(patch_files) > 0:
tmp_info, tmp_warning, tmp_error = self._process_upload_patch_files(patch_files)
tmp_info, tmp_warning, tmp_error, tmp_patch_meta_info = self._process_upload_patch_files(
patch_files)
msg_info += tmp_info
msg_warning += tmp_warning
msg_error += tmp_error
upload_info += tmp_patch_meta_info
return dict(info=msg_info, warning=msg_warning, error=msg_error)
return dict(info=msg_info, warning=msg_warning, error=msg_error, upload_info=upload_info)
def release_apply_remove_order(self, release, running_sw_version, reverse=False):

View File

@ -1023,9 +1023,12 @@ def unmount_iso_load(iso_path):
pass
def read_upgrade_metadata(mounted_dir):
def read_upgrade_support_versions(mounted_dir):
"""
Read upgrade metadata file
Read upgrade metadata file to get supported upgrades
versions
:param mounted_dir: Mounted iso directory
:return: to_release, supported_from_releases
"""
try:
root = ElementTree.parse(mounted_dir + "/upgrades/metadata.xml").getroot()
@ -1107,3 +1110,17 @@ def collect_current_load_for_hosts():
LOG.info("Collect current load for hosts successfully.")
except Exception as err:
LOG.error("Error in collect current load for hosts: %s", err)
def parse_release_metadata(filename):
"""
Parse release metadata from xml file
:param filename: XML file
:return: dict of release metadata
"""
tree = ElementTree.parse(filename)
root = tree.getroot()
data = {}
for child in root:
data[child.tag] = child.text
return data

View File

@ -27,15 +27,17 @@ class TestSoftwareController(unittest.TestCase):
@patch('software.software_controller.mount_iso_load')
@patch('software.software_controller.shutil.copyfile')
@patch('software.software_controller.os.chmod')
@patch('software.software_controller.read_upgrade_metadata')
@patch('software.software_controller.read_upgrade_support_versions')
@patch('software.software_controller.subprocess.run')
@patch('software.software_controller.shutil.copytree')
@patch('software.software_controller.parse_release_metadata')
@patch('software.software_controller.unmount_iso_load')
def test_process_upload_upgrade_files(self,
mock_unmount_iso_load,
mock_parse_release_metadata,
mock_copytree, # pylint: disable=unused-argument
mock_run,
mock_read_upgrade_metadata,
mock_read_upgrade_support_versions,
mock_chmod, # pylint: disable=unused-argument
mock_copyfile, # pylint: disable=unused-argument
mock_mount_iso_load,
@ -47,20 +49,23 @@ class TestSoftwareController(unittest.TestCase):
# Mock the return values of the mocked functions
mock_verify_files.return_value = True
mock_mount_iso_load.return_value = '/test/iso'
mock_read_upgrade_metadata.return_value = ('2.0', [{'version': '1.0'}, {'version': '2.0'}])
mock_read_upgrade_support_versions.return_value = (
'2.0', [{'version': '1.0'}, {'version': '2.0'}])
mock_run.return_value.returncode = 0
mock_run.return_value.stdout = 'Load import successful'
mock_parse_release_metadata.return_value = {"id": 1, "sw_version": "2.0"}
# Call the function being tested
with patch('software.software_controller.SW_VERSION', '1.0'):
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
controller.release_data)
info, warning, error, release_meta_info = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
controller.release_data)
# Verify that the expected functions were called with the expected arguments
mock_verify_files.assert_called_once_with([self.upgrade_files[constants.ISO_EXTENSION]],
self.upgrade_files[constants.SIG_EXTENSION])
mock_mount_iso_load.assert_called_once_with(self.upgrade_files[constants.ISO_EXTENSION], constants.TMP_DIR)
mock_read_upgrade_metadata.assert_called_once_with('/test/iso')
mock_mount_iso_load.assert_called_once_with(
self.upgrade_files[constants.ISO_EXTENSION], constants.TMP_DIR)
mock_read_upgrade_support_versions.assert_called_once_with('/test/iso')
self.assertEqual(mock_run.call_args[0][0], [constants.LOCAL_LOAD_IMPORT_FILE,
"--from-release=1.0", "--to-release=2.0", "--iso-dir=/test/iso"])
@ -68,9 +73,14 @@ class TestSoftwareController(unittest.TestCase):
# Verify that the expected messages were returned
self.assertEqual(
info, 'iso and signature files uploaded completed\nImporting iso is in progress\nLoad import successful')
info,
'iso and signature files uploaded completed\nImporting iso is in progress\nLoad import successful')
self.assertEqual(warning, '')
self.assertEqual(error, '')
self.assertEqual(
release_meta_info,
{"test.iso": {"id": 1, "sw_version": "2.0"},
"test.sig": {"id": None, "sw_version": None}})
@patch('software.software_controller.PatchController.__init__', return_value=None)
@patch('software.software_controller.verify_files')
@ -90,8 +100,8 @@ class TestSoftwareController(unittest.TestCase):
# Call the function being tested
with patch('software.software_controller.SW_VERSION', '1.0'):
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
controller.release_data)
info, warning, error, _ = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
controller.release_data)
# Verify that the expected messages were returned
self.assertEqual(info, '')
@ -99,7 +109,8 @@ class TestSoftwareController(unittest.TestCase):
self.assertEqual(error, 'Upgrade file signature verification failed\n')
@patch('software.software_controller.PatchController.__init__', return_value=None)
@patch('software.software_controller.verify_files', side_effect=ReleaseValidationFailure('Invalid signature file'))
@patch('software.software_controller.verify_files',
side_effect=ReleaseValidationFailure('Invalid signature file'))
def test_process_upload_upgrade_files_validation_error(self,
mock_verify_files,
mock_init): # pylint: disable=unused-argument
@ -109,8 +120,8 @@ class TestSoftwareController(unittest.TestCase):
mock_verify_files.return_value = False
# Call the function being tested
info, warning, error = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
controller.release_data)
info, warning, error, _ = controller._process_upload_upgrade_files(self.upgrade_files, # pylint: disable=protected-access
controller.release_data)
# Verify that the expected messages were returned
self.assertEqual(info, '')