Merge "Implement new certificate APIs"

This commit is contained in:
Zuul 2024-04-17 21:55:09 +00:00 committed by Gerrit Code Review
commit b6682707ae
9 changed files with 472 additions and 24 deletions

View File

@ -116,3 +116,19 @@ SB_SUPPORTED_NETWORKS = {
UPGRADE_NOTIFICATION = 'System platform upgrade is in progress.\n' \
'The command may display the target configuration ' \
'that has not yet been applied to the host.'
EXPIRED = "--expired"
SOON_TO_EXPIRY = "--soon_to_expiry"
VALIDITY = "Validity"
NOT_BEFORE = "Not Before"
NOT_AFTER = "Not After"
RESIDUAL_TIME = "Residual Time"
NAMESPACE = "Namespace"
SECRET = "Secret"
RENEWAL = "Renewal"
SECRET_TYPE = "Secret Type"
FILEPATH = "File Path"
AUTOMATIC = "Automatic"
MANUAL = "Manual"
ISSUER = "Issuer"
SUBJECT = "Subject"

View File

@ -41,3 +41,23 @@ class CertificateManager(base.Manager):
path = self._path(uuid)
_, body = self.api.json_request('DELETE', path)
return body
def get_all_certs(self, expired=False, soon_to_expiry=None):
if expired:
path = f'{self._path("get_all_certs")}?expired=True'
elif soon_to_expiry:
path = f'{self._path("get_all_certs")}?soon_to_expiry={soon_to_expiry}'
else:
path = self._path("get_all_certs")
_, body = self.api.json_request('GET', path)
return body
def get_all_k8s_certs(self, expired=False, soon_to_expiry=None):
if expired:
path = f'{self._path("get_all_k8s_certs")}?expired=True'
elif soon_to_expiry:
path = f'{self._path("get_all_k8s_certs")}?soon_to_expiry={soon_to_expiry}'
else:
path = self._path("get_all_k8s_certs")
_, body = self.api.json_request('GET', path)
return body

View File

@ -9,6 +9,7 @@
#
import os
from cgtsclient.common import constants
from cgtsclient.common import utils
from cgtsclient import exc
@ -81,28 +82,6 @@ def _install_cert(cc, certificate_file, data):
'certificate %s' % certificate_file)
@utils.arg('certificate_uuid', metavar='<certificate_uuid>',
help="UUID of certificate")
def do_certificate_show(cc, args):
"""Show Certificate details."""
certificate = cc.certificate.get(args.certificate_uuid)
if certificate:
_print_certificate_show(certificate)
else:
print("No Certificates installed")
def do_certificate_list(cc, args):
"""List certificates."""
certificates = cc.certificate.list()
fields = ['uuid', 'certtype', 'expiry_date', 'subject']
field_labels = fields
for certificate in certificates:
if certificate.subject and len(certificate.subject) > 20:
certificate.subject = certificate.subject[:20] + "..."
utils.print_list(certificates, fields, field_labels, sortby=0)
@utils.arg('certificate_file',
metavar='<certificate_file>',
help='Path to Certificate file (PEM format) to install. '
@ -225,3 +204,80 @@ def do_ca_certificate_show(cc, args):
else:
print('No certificate of type "ssl_ca" is installed with '
'this uuid: %s' % (args.certificate_uuid))
def _print_certificate_list(certs_dict):
keys = [constants.RESIDUAL_TIME, constants.VALIDITY, constants.ISSUER,
constants.SUBJECT, constants.NAMESPACE, constants.SECRET,
constants.RENEWAL, constants.SECRET_TYPE, constants.FILEPATH]
for cert in sorted(certs_dict):
print("+------------------------------------------------------------+")
print(cert)
print("+------------------------------------------------------------+")
for key in keys:
val = certs_dict[cert].get(key)
if val:
if key == constants.VALIDITY:
issue_date = certs_dict[cert][constants.VALIDITY][constants.NOT_BEFORE]
expiry_date = certs_dict[cert][constants.VALIDITY][constants.NOT_AFTER]
print(f' Issue Date\t: {issue_date}')
print(f' Expiry Date\t: {expiry_date}')
continue
print(f" {key}\t: {val}")
print("+------------------------------------------------------------+")
@utils.arg(constants.EXPIRED, action='store_true',
help="to show the expired certificates")
@utils.arg(constants.SOON_TO_EXPIRY, metavar='<no of days to expiry>',
help="to show the certificates expiring in n days")
def do_certificate_list(cc, args):
"""List system certificates."""
certs = cc.certificate.get_all_certs(expired=args.expired,
soon_to_expiry=args.soon_to_expiry)
_print_certificate_list(certs)
@utils.arg(constants.EXPIRED, action='store_true',
help="to show the expired certificates")
@utils.arg(constants.SOON_TO_EXPIRY, metavar='<no of days to expiry>',
help="to show the certificates expiring in n days")
def do_k8s_certificate_list(cc, args):
"""List k8s certificates."""
certs = cc.certificate.get_all_k8s_certs(expired=args.expired,
soon_to_expiry=args.soon_to_expiry)
_print_certificate_list(certs)
def _print_certificate_details(cert_info, i=1):
s = " " * i
for key, val in cert_info.items():
if isinstance(val, dict):
print(f"{s}{key}:")
_print_certificate_details(val, i=i + 1)
continue
print(f"{s}{key}: {val}")
def _print_certificate(certificate, args):
if certificate:
print("Certificate:")
_print_certificate_details(certificate)
else:
print(f"No Certificate exist with name {args.certificate_name}")
@utils.arg('certificate_name', metavar='<certificate_name>',
help="name of certificate")
def do_certificate_show(cc, args):
"""Show certificate details."""
certificate = cc.certificate.get_all_certs().get(args.certificate_name, None)
_print_certificate(certificate, args)
@utils.arg('certificate_name', metavar='<certificate_name>',
help="name of certificate")
def do_k8s_certificate_show(cc, args):
"""Show certificate details."""
certificate = cc.certificate.get_all_k8s_certs().get(args.certificate_name, None)
_print_certificate(certificate, args)

View File

@ -185,7 +185,9 @@ class CertificateController(rest.RestController):
"""REST controller for certificates."""
_custom_actions = {'certificate_install': ['POST'],
'certificate_renew': ['POST']}
'certificate_renew': ['POST'],
'get_all_certs': ['GET'],
'get_all_k8s_certs': ['GET']}
def __init__(self):
self._api_token = None
@ -601,6 +603,36 @@ class CertificateController(rest.RestController):
return Certificate.convert_with_links(certificate)
@expose('json')
@cutils.synchronized(LOCK_NAME)
def get_all_certs(self, expired=False, soon_to_expiry=None):
cert_data = pecan.request.rpcapi.get_all_certs(pecan.request.context)
return self._get_cert_data(cert_data, expired, soon_to_expiry)
@expose('json')
@cutils.synchronized(LOCK_NAME)
def get_all_k8s_certs(self, expired=False, soon_to_expiry=None):
cert_data = pecan.request.rpcapi.get_all_k8s_certs(pecan.request.context)
return self._get_cert_data(cert_data, expired, soon_to_expiry)
@staticmethod
def _get_cert_data(cert_data, expired, soon_to_expiry):
expired_certs = {}
if expired:
for key, val in cert_data.items():
no_of_days = int(val[constants.RESIDUAL_TIME].split('d')[0])
if no_of_days < 0:
expired_certs[key] = val
return expired_certs
soon_to_expiry_certs = {}
if soon_to_expiry:
for key, val in cert_data.items():
no_of_days = int(val[constants.RESIDUAL_TIME].split('d')[0])
if 0 <= no_of_days <= int(soon_to_expiry):
soon_to_expiry_certs[key] = val
return soon_to_expiry_certs
return cert_data
def _check_endpoint_domain_exists():
# Check that public endpoint FQDN is configured

View File

@ -2512,3 +2512,17 @@ MGMT_IPSEC_DISABLED = 'disabled'
# If True, makes outputs compatible with single stack versions of ansible-playbooks and stx-puppet.
# Shall be removed when the other projects are updated.
DUAL_STACK_COMPATIBILITY_MODE = True
# certificate constants
VALIDITY = "Validity"
NOT_BEFORE = "Not Before"
NOT_AFTER = "Not After"
RESIDUAL_TIME = "Residual Time"
NAMESPACE = "Namespace"
SECRET = "Secret"
RENEWAL = "Renewal"
SECRET_TYPE = "Secret Type"
FILEPATH = "File Path"
AUTOMATIC = "Automatic"
MANUAL = "Manual"
ISSUER = "Issuer"

View File

@ -1497,7 +1497,7 @@ class KubeOperator(object):
def get_cert_secret(self, name, namespace, max_retries=60):
for _ in range(max_retries):
secret = self.kube_get_secret(name, NAMESPACE_DEPLOYMENT)
secret = self.kube_get_secret(name, namespace)
if secret is not None and secret.data.get("tls.crt"):
LOG.debug("secret = %s" % secret)
return secret

View File

@ -3837,3 +3837,147 @@ def update_config_file(config_filepath: str, values_to_update: list):
lines.append(key_value)
with open(config_filepath, 'w') as file:
file.writelines(lines)
def get_cert_values(cert_obj):
data = {}
x509v3_extn = "X509v3 extensions"
critical = "critical"
data[constants.RESIDUAL_TIME] = "{}d".format(
(cert_obj.not_valid_after - datetime.datetime.now()).days)
data["Version"] = cert_obj.version.name
data["Serial Number"] = hex(cert_obj.serial_number)
data["Issuer"] = cert_obj.issuer.rfc4514_string()
data[constants.VALIDITY] = {}
data[constants.VALIDITY][constants.NOT_BEFORE] = cert_obj.not_valid_before.strftime(
'%B %d %H:%M:%S %Y')
data[constants.VALIDITY][constants.NOT_AFTER] = cert_obj.not_valid_after.strftime(
'%B %d %H:%M:%S %Y')
data["Subject"] = cert_obj.subject.rfc4514_string()
if hasattr(cert_obj.public_key(), 'key_size'):
pub_key_info = {}
key_size = cert_obj.public_key().key_size
pub_key_info['key_size'] = f"({key_size} bit)"
data["Subject Public Key Info"] = pub_key_info
data[x509v3_extn] = {}
for ext in cert_obj.extensions:
ext_value = ext.value
if isinstance(ext_value, x509.extensions.KeyUsage):
ext_name = "X509v3 Key Usage"
data[x509v3_extn][ext_name] = {}
value = ""
if ext_value.digital_signature:
value = f"{value}Digital Signature"
if ext_value.key_encipherment:
value = f"{value}, Key Encipherment"
if ext_value.content_commitment:
value = f"{value}, Content Commitment"
if ext_value.data_encipherment:
value = f"{value}, Data Encipherment"
if ext_value.key_agreement:
value = f"{value}, Key Agreement"
if ext_value.crl_sign:
value = f"{value}, CRL Sign" if value else f"{value}CRL Sign"
if value:
data[x509v3_extn][ext_name]["values"] = value
if ext.critical:
data[x509v3_extn][ext_name][critical] = ext.critical
elif isinstance(ext_value, x509.extensions.BasicConstraints):
ext_name = "X509v3 Basic Constraints"
data[x509v3_extn][ext_name] = {}
data[x509v3_extn][ext_name]["CA"] = ext_value.ca
if ext.critical:
data[x509v3_extn][ext_name][critical] = ext.critical
elif isinstance(ext_value, x509.extensions.AuthorityKeyIdentifier):
identifier = {}
if hasattr(ext_value, 'key_identifier'):
identifier["keyid"] = ext_value.key_identifier.hex()
if ext.critical:
identifier[critical] = ext.critical
if identifier:
data[x509v3_extn]["X509v3 Authority Key Identifier"] = identifier
elif isinstance(ext_value, x509.extensions.SubjectKeyIdentifier):
identifier = {}
if hasattr(ext_value, 'key_identifier'):
identifier["keyid"] = ext_value.key_identifier.hex()
if ext.critical:
identifier[critical] = ext.critical
if identifier:
data[x509v3_extn]["X509v3 Subject Key Identifier"] = identifier
elif isinstance(ext_value, x509.extensions.SubjectAlternativeName):
ext_name = "X509v3 Subject Alternative Name"
data[x509v3_extn][ext_name] = {}
dns_names = get_cert_DNSNames(cert_obj)
ip_addresses = get_cert_IPAddresses(cert_obj)
if dns_names:
data[x509v3_extn][ext_name]["DNS"] = get_cert_DNSNames(cert_obj)
if ip_addresses:
data[x509v3_extn][ext_name]["IP Address"] = get_cert_IPAddresses(cert_obj)
data["Signature Algorithm"] = getattr(cert_obj.signature_algorithm_oid, '_name')
data["Signature"] = cert_obj.signature.hex()
return data
def get_secrets_info(secrets_list=None):
kube_operator = kubernetes.KubeOperator()
certificates = kube_operator.list_custom_resources("cert-manager.io", "v1", "certificates")
certs_secrets_list = [cert["spec"]["secretName"] for cert in certificates]
k8s_secrets = []
if secrets_list:
if not isinstance(secrets_list, list):
secrets_list = [secrets_list, ]
for secret, ns in secrets_list:
secret_obj = kube_operator.kube_get_secret(secret, ns)
if secret_obj:
k8s_secrets.append(secret_obj)
else:
opaque_secrets = kube_operator.kube_list_secret_for_all_namespaces(selector='type=Opaque')
tls_secrets = kube_operator.kube_list_secret_for_all_namespaces(
selector='type=kubernetes.io/tls')
k8s_secrets = opaque_secrets + tls_secrets
cert_suf = ("cert", "crt", "ca", "pem", "cer")
certs_info = {}
for secret in k8s_secrets:
secret_name = secret.metadata.name
if secret_name == "kubeadm-certs":
continue
ns = secret.metadata.namespace
secret_type = secret.type
renewal = "Manual"
if secret_name in certs_secrets_list:
renewal = "Automatic"
if secret_type == "Opaque":
for key, val in secret.data.items():
# exception for cm-cert-manager-webhook-ca opaque secret
if secret_name == "cm-cert-manager-webhook-ca":
renewal = "Automatic"
# list elastic-services,kibana cert from "mon-elastic-services-secrets" secret
if secret_name == "mon-elastic-services-secrets":
if key not in ["ext-elastic-services.crt", "kibana.crt", "ca.crt", "ext-ca.crt"]:
continue
if key.endswith(cert_suf) and val:
cert_name = f"{secret_name}/{key}"
crt = base64.decode_as_bytes(val)
cert_obj = extract_certs_from_pem(crt)[0]
certs_info[cert_name] = get_cert_values(cert_obj)
certs_info[cert_name][constants.NAMESPACE] = ns
certs_info[cert_name][constants.SECRET] = secret_name
certs_info[cert_name][constants.RENEWAL] = renewal
certs_info[cert_name][constants.SECRET_TYPE] = secret_type
elif secret_type == "kubernetes.io/tls":
# exception for sc-adminep-ca-certificate tls secret as there is no
# corresponding certificate exist.
if secret_name == "sc-adminep-ca-certificate":
renewal = "Automatic"
cert_name = secret_name
crt = base64.decode_as_bytes(secret.data.get('tls.crt'))
cert_obj = extract_certs_from_pem(crt)[0]
certs_info[cert_name] = get_cert_values(cert_obj)
certs_info[cert_name][constants.NAMESPACE] = ns
certs_info[cert_name][constants.SECRET] = secret_name
certs_info[cert_name][constants.RENEWAL] = renewal
certs_info[cert_name][constants.SECRET_TYPE] = secret_type
LOG.debug(certs_info)
return certs_info

View File

@ -19182,6 +19182,158 @@ class ConductorManager(service.PeriodicService):
def _audit_prune_stale_backup_alarms(self, context):
self._prune_stale_backup_alarms(context)
def get_all_certs(self, context):
"""
list all the platform certificates with the all the certificate values
residual time, issue date, expiry date, issuer, subject, namespace,
secret, renewal and secret type
"""
certs = [("ssl", constants.MANUAL, constants.SSL_PEM_FILE),
("docker_registry", constants.MANUAL, constants.DOCKER_REGISTRY_CERT_FILE),
(constants.OPENLDAP_CERT_SECRET_NAME, constants.MANUAL,
"/etc/ldap/certs/openldap-cert.crt"),
("dc-adminep-root-ca", constants.AUTOMATIC, constants.DC_ROOT_CA_CERT_PATH),
("dc-adminep-server", constants.AUTOMATIC, constants.ADMIN_EP_CERT_FILENAME),
("openstack", constants.MANUAL, constants.OPENSTACK_CERT_FILE),
("openstack_ca", constants.MANUAL, constants.OPENSTACK_CERT_CA_FILE),
("etcd-ca", constants.MANUAL, constants.ETCD_ROOTCA_FILE),
("etcd-client", constants.AUTOMATIC, "/etc/etcd/etcd-client.crt"),
("etcd-server", constants.AUTOMATIC, "/etc/etcd/etcd-server.crt"),
("apiserver-etcd-client", constants.AUTOMATIC,
"/etc/kubernetes/pki/apiserver-etcd-client.crt"),
("kubelet-client", constants.AUTOMATIC, "/var/lib/kubelet/pki/kubelet-client-current.pem"),
("kubernetes-root-ca", constants.MANUAL, constants.KUBERNETES_ROOTCA_FILE),
("apiserver", constants.AUTOMATIC, "/etc/kubernetes/pki/apiserver.crt"),
("apiserver-kubelet-client", constants.AUTOMATIC,
"/etc/kubernetes/pki/apiserver-kubelet-client.crt"),
("front-proxy-client", constants.AUTOMATIC, "/etc/kubernetes/pki/front-proxy-client.crt"),
("front-proxy-ca", constants.AUTOMATIC, "/etc/kubernetes/pki/front-proxy-ca.crt")]
kube_operator = kubernetes.KubeOperator()
certificates = kube_operator.list_custom_resources("cert-manager.io", "v1", "certificates")
k8s_secrets_list = [cert["spec"]["secretName"] for cert in certificates]
certs_info = {}
ssl_ca_path = constants.SSL_CERT_CA_LIST_SHARED_DIR
for cert in os.listdir(ssl_ca_path):
certs.append((cert, constants.MANUAL, os.path.join(ssl_ca_path, cert)))
for cert_name, renewal, cert_path in certs:
if not os.path.exists(cert_path):
continue
cert_obj = cutils.get_certificate_from_file(cert_path)
certs_info[cert_name] = cutils.get_cert_values(cert_obj)
certs_info[cert_name][constants.FILEPATH] = cert_path
certs_info[cert_name][constants.RENEWAL] = renewal
for secret in [constants.RESTAPI_CERT_SECRET_NAME,
constants.REGISTRY_CERT_SECRET_NAME,
constants.OPENLDAP_CERT_SECRET_NAME]:
ns = constants.CERT_NAMESPACE_PLATFORM_CERTS
if kube_operator.kube_get_secret(secret, ns):
if secret == constants.RESTAPI_CERT_SECRET_NAME:
certs_info[secret] = certs_info["ssl"]
del certs_info["ssl"]
elif secret == constants.REGISTRY_CERT_SECRET_NAME:
certs_info[secret] = certs_info["docker_registry"]
del certs_info["docker_registry"]
certs_info[secret][constants.NAMESPACE] = ns
certs_info[secret][constants.SECRET] = secret
if secret in k8s_secrets_list:
certs_info[secret][constants.RENEWAL] = constants.AUTOMATIC
secrets = []
# oidc app certs
oidc_ns = "kube-system"
app_name = "oidc-auth-apps"
try:
app = kubeapp_obj.get_by_name(context, app_name)
oidc_client_db_chart = objects.helm_overrides.get_by_appid_name(context, app.id,
"oidc-client", oidc_ns)
dex_db_chart = objects.helm_overrides.get_by_appid_name(context, app.id, "dex", oidc_ns)
if oidc_client_db_chart.user_overrides and dex_db_chart.user_overrides:
client_user_overrides = yaml.load(oidc_client_db_chart.user_overrides)
dex_user_overrides = yaml.load(dex_db_chart.user_overrides)
oidc_ca_issuer = None
if "issuer_root_ca_secret" in client_user_overrides["config"]:
oidc_ca_issuer = client_user_overrides["config"]["issuer_root_ca_secret"]
secrets.append((oidc_ca_issuer, oidc_ns))
if "volumes" in dex_user_overrides:
for entry in dex_user_overrides["volumes"]:
secrets.append((entry["secret"]["secretName"], oidc_ns))
except exception.KubeAppNotFound:
LOG.info("%s app not present" % app_name)
# system-local-ca secret
secrets.append(("system-local-ca", "cert-manager"))
# WRA secrets
wra_ca_secrets = ["mon-elastic-services-ca-crt", "mon-elastic-services-extca-crt"]
wra_ns = "monitor"
wra_elastic_svc_secret = "mon-elastic-services-secrets"
secrets.append((wra_elastic_svc_secret, wra_ns))
wra_secrets = cutils.get_secrets_info(secrets)
for ca_secret in wra_ca_secrets:
if ca_secret in k8s_secrets_list:
if ca_secret == "mon-elastic-services-ca-crt":
key = f"{wra_elastic_svc_secret}/ca.crt"
elif ca_secret == "mon-elastic-services-extca-crt":
key = f"{wra_elastic_svc_secret}/ext-ca.crt"
if key in wra_secrets:
wra_secrets[key][constants.RENEWAL] = constants.AUTOMATIC
certs_info.update(wra_secrets)
# dc endpoint certificates
system = self.dbapi.isystem_get_one()
system_dc_role = system.get('distributed_cloud_role', None)
if system_dc_role:
if system_dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
ca_cert = "dc-adminep-root-ca-certificate"
server_cert = "dc-adminep-certificate"
ns = "dc-cert"
elif system_dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
ca_cert = "sc-adminep-root-ca-certificate"
server_cert = "sc-adminep-certificate"
ns = "sc-cert"
certs_info[ca_cert] = certs_info["dc-adminep-root-ca"]
certs_info[server_cert] = certs_info["dc-adminep-server"]
# ns,secret only applies to systemcontroller for "dc-adminep-root-ca-certificate" as there is
# a corresponding secret, on subcloud there is no "sc-adminep-root-ca-certificate" secret, it
# is derived from file path
if system_dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
certs_info[ca_cert][constants.NAMESPACE] = ns
certs_info[ca_cert][constants.SECRET] = ca_cert
certs_info[server_cert][constants.NAMESPACE] = ns
certs_info[server_cert][constants.SECRET] = server_cert
del certs_info["dc-adminep-root-ca"]
del certs_info["dc-adminep-server"]
# user account certificates
user_account_certs = [("admin_conf_client", "/etc/kubernetes/admin.conf"),
("scheduler_conf_client", "/etc/kubernetes/scheduler.conf"),
("controller_manager_client", "/etc/kubernetes/controller-manager.conf")]
for cert_name, cert_path in user_account_certs:
with open(cert_path, 'r') as f:
data = yaml.safe_load(f)
client_cert = base64.decode_as_bytes(
data["users"][0]["user"]["client-certificate-data"])
cert_obj = cutils.extract_certs_from_pem(client_cert)[0]
certs_info[cert_name] = cutils.get_cert_values(cert_obj)
certs_info[cert_name][constants.FILEPATH] = cert_path
certs_info[cert_name][constants.RENEWAL] = constants.AUTOMATIC
LOG.debug(certs_info)
return certs_info
def get_all_k8s_certs(self, context):
"""
list all the k8s tls/opaque certificates with the all the certificate values
residual time, issue date, expiry date, issuer, subject, namespace,
secret, renewal and secret type
"""
return cutils.get_secrets_info()
def device_image_state_sort_key(dev_img_state):
if dev_img_state.bitstream_type == dconstants.BITSTREAM_TYPE_ROOT_KEY:

View File

@ -2324,3 +2324,17 @@ class ConductorAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
"""
return self.call(context, self.make_msg('request_firewall_runtime_update',
host_uuid=host_uuid))
def get_all_certs(self, context):
"""Synchronously, have the conductor retrieve the cert information.
:param context: request context.
"""
return self.call(context, self.make_msg('get_all_certs'))
def get_all_k8s_certs(self, context):
"""Synchronously, have the conductor retrieve the k8s certs information.
:param context: request context.
"""
return self.call(context, self.make_msg('get_all_k8s_certs'))