Merge "Avoid checking image uuid for firmware audit"

This commit is contained in:
Zuul 2022-01-18 18:13:20 +00:00 committed by Gerrit Code Review
commit d50ad87fe9
4 changed files with 108 additions and 18 deletions

View File

@ -699,6 +699,10 @@ class SysinvClient(base.DriverBase):
"""Get a list of device images."""
return self.sysinv_client.device_image.list()
def get_device_image(self, image_uuid):
"""Get device image from uuid."""
return self.sysinv_client.device_image.get(image_uuid)
def get_device_image_states(self):
"""Get a list of device image states."""
return self.sysinv_client.device_image_state.list()

View File

@ -30,9 +30,17 @@ LOG = logging.getLogger(__name__)
class FirmwareAuditData(object):
def __init__(self, uuid, applied, pci_vendor,
def __init__(self, bitstream_type, bitstream_id,
bmc, retimer_included,
key_signature, revoke_key_id,
applied, pci_vendor,
pci_device, applied_labels):
self.uuid = uuid
self.bitstream_type = bitstream_type
self.bitstream_id = bitstream_id
self.bmc = bmc
self.retimer_included = retimer_included
self.key_signature = key_signature
self.revoke_key_id = revoke_key_id
self.applied = applied
self.pci_vendor = pci_vendor
self.pci_device = pci_device
@ -40,7 +48,12 @@ class FirmwareAuditData(object):
def to_dict(self):
return {
'uuid': self.uuid,
'bitstream_type': self.bitstream_type,
'bitstream_id': self.bitstream_id,
'bmc': self.bmc,
'retimer_included': self.retimer_included,
'key_signature': self.key_signature,
'revoke_key_id': self.revoke_key_id,
'applied': self.applied,
'pci_vendor': self.pci_vendor,
'pci_device': self.pci_device,
@ -99,7 +112,12 @@ class FirmwareAudit(object):
# Filter images which have been applied on RegionOne
for image in local_device_images:
if image.applied:
filtered_images.append(FirmwareAuditData(image.uuid,
filtered_images.append(FirmwareAuditData(image.bitstream_type,
image.bitstream_id,
image.bmc,
image.retimer_included,
image.key_signature,
image.revoke_key_id,
image.applied,
image.pci_vendor,
image.pci_device,
@ -121,7 +139,23 @@ class FirmwareAudit(object):
return True
return False
def _check_image_match(self,
subcloud_image,
system_controller_image):
if ((system_controller_image.bitstream_type == consts.BITSTREAM_TYPE_ROOT_KEY and
system_controller_image.root_key == subcloud_image.root_key) or
(system_controller_image.bitstream_type == consts.BITSTREAM_TYPE_FUNCTIONAL and
system_controller_image.bitstream_id == subcloud_image.bitstream_id and
system_controller_image.bmc == subcloud_image.bmc and
system_controller_image.retimer_included == subcloud_image.retimer_included) or
(system_controller_image.bitstream_type == consts.BITSTREAM_TYPE_KEY_REVOCATION and
system_controller_image.revoked_key_ids == subcloud_image.revoked_key_ids)):
return True
return False
def _check_subcloud_device_has_image(self,
subcloud_name,
subcloud_sysinv_client,
image,
enabled_host_device_list,
subcloud_device_image_states,
@ -148,9 +182,11 @@ class FirmwareAudit(object):
label_key = list(image_label.keys())[0]
label_value = image_label.get(label_key)
is_device_eligible = \
self._check_for_label_match(subcloud_device_label_list,
device.uuid,
label_key, label_value)
self._check_for_label_match(
subcloud_device_label_list,
device.uuid,
label_key,
label_value)
# If device label matches any image label stop checking
# for any other label matches
if is_device_eligible:
@ -163,11 +199,21 @@ class FirmwareAudit(object):
if image.pci_vendor == device.pvendor_id and \
image.pci_device == device.pdevice_id:
device_image_state = None
subcloud_image = None
for device_image_state_obj in subcloud_device_image_states:
if device_image_state_obj.pcidevice_uuid == device.uuid \
and device_image_state_obj.image_uuid == image.uuid:
device_image_state = device_image_state_obj
break
if device_image_state_obj.pcidevice_uuid == device.uuid:
try:
subcloud_image = subcloud_sysinv_client.\
get_device_image(device_image_state_obj.image_uuid)
except Exception:
LOG.exception('Cannot retrieve device image for '
'subcloud: %s, skip firmware '
'audit' % subcloud_name)
return False
if self._check_image_match(subcloud_image, image):
device_image_state = device_image_state_obj
break
else:
# If no device image state is present in the list that
# means the image hasn't been applied yet
@ -255,7 +301,9 @@ class FirmwareAudit(object):
for image in audit_data:
# audit_data will be a dict from passing through RPC, so objectify
image = FirmwareAuditData.from_dict(image)
proceed = self._check_subcloud_device_has_image(image,
proceed = self._check_subcloud_device_has_image(subcloud_name,
sysinv_client,
image,
enabled_host_device_list,
subcloud_device_image_states,
subcloud_device_label_list)

View File

@ -254,3 +254,8 @@ EXTRA_ARGS_TO_VERSION = 'to-version'
EXTRA_ARGS_CERT_FILE = 'cert-file'
EXTRA_ARGS_EXPIRY_DATE = 'expiry-date'
EXTRA_ARGS_SUBJECT = 'subject'
# Device Image Bitstream Types
BITSTREAM_TYPE_ROOT_KEY = 'root-key'
BITSTREAM_TYPE_FUNCTIONAL = 'functional'
BITSTREAM_TYPE_KEY_REVOCATION = 'key-revocation'

View File

@ -58,9 +58,19 @@ class PCIDevice(object):
class DeviceImage(object):
def __init__(self, uuid, applied, pci_vendor,
pci_device, applied_labels):
self.uuid = uuid
def __init__(self, bitstream_type,
bitstream_id, bmc,
retimer_included,
key_signature,
revoke_key_id, applied,
pci_vendor, pci_device,
applied_labels):
self.bitstream_type = bitstream_type
self.bitstream_id = bitstream_id
self.bmc = bmc
self.retimer_included = retimer_included
self.key_signature = key_signature
self.revoke_key_id = revoke_key_id
self.applied = applied
self.pci_vendor = pci_vendor
self.pci_device = pci_device
@ -125,21 +135,36 @@ PCI_DEVICE4 = PCIDevice('06789e01-13b6-2347',
True)
# Device image has been applied
DEVICE_IMAGE1 = DeviceImage('04ae0e01-13b6-4105',
DEVICE_IMAGE1 = DeviceImage('functional',
'0x2383a62a010504',
True,
True,
'',
'',
True,
'1111',
'2222',
[{}])
# Device image has not been applied
DEVICE_IMAGE2 = DeviceImage('04ae0e01-13b6-4106',
DEVICE_IMAGE2 = DeviceImage('functional',
'0x2383a62a010504',
True,
True,
'',
'',
False,
'1111',
'2222',
[{}])
# Device image has been applied
DEVICE_IMAGE3 = DeviceImage('04ae0e01-13b6-4105',
DEVICE_IMAGE3 = DeviceImage('functional',
'0x2383a62a010504',
True,
True,
'',
'',
True,
'1111',
'2222',
@ -208,6 +233,7 @@ class FakeSysinvClientImageWithoutLabels(object):
self.session = session
self.endpoint = endpoint
self.device_images = [DEVICE_IMAGE1]
self.device_image = DEVICE_IMAGE1
self.pci_devices = [PCI_DEVICE2, PCI_DEVICE3]
self.hosts = [HOST1]
self.device_image_states = [DEVICE_IMAGE_STATE1]
@ -219,6 +245,9 @@ class FakeSysinvClientImageWithoutLabels(object):
def get_host_device_list(self, host_name):
return self.pci_devices
def get_device_image(self, device_image_uuid):
return self.device_image
def get_device_images(self):
return self.device_images
@ -290,6 +319,7 @@ class FakeSysinvClientImageWithLabels(object):
self.region = region
self.session = session
self.endpoint = endpoint
self.device_image = DEVICE_IMAGE3
self.device_images = [DEVICE_IMAGE3]
self.pci_devices = [PCI_DEVICE2, PCI_DEVICE3]
self.hosts = [HOST1]
@ -302,6 +332,9 @@ class FakeSysinvClientImageWithLabels(object):
def get_host_device_list(self, host_name):
return self.pci_devices
def get_device_image(self, device_image_uuid):
return self.device_image
def get_device_images(self):
return self.device_images