# # Copyright (c) 2018 Wind River Systems, Inc. # # SPDX-License-Identifier: Apache-2.0 # import contextlib import jsonpatch import netaddr import os import pecan import re import socket import sys import traceback import tsconfig.tsconfig as tsc import wsme from inventory.api.controllers.v1.sysinv import cgtsclient from inventory.common import constants from inventory.common import exception from inventory.common.i18n import _ from inventory.common import k_host from inventory.common.utils import memoized from inventory import objects from oslo_config import cfg from oslo_log import log CONF = cfg.CONF LOG = log.getLogger(__name__) KEY_VALUE_SEP = '=' JSONPATCH_EXCEPTIONS = (jsonpatch.JsonPatchException, jsonpatch.JsonPointerException, KeyError) def ip_version_to_string(ip_version): return str(constants.IP_FAMILIES[ip_version]) def validate_limit(limit): if limit and limit < 0: raise wsme.exc.ClientSideError(_("Limit must be positive")) return min(CONF.api.limit_max, limit) or CONF.api.limit_max def validate_sort_dir(sort_dir): if sort_dir not in ['asc', 'desc']: raise wsme.exc.ClientSideError(_("Invalid sort direction: %s. " "Acceptable values are " "'asc' or 'desc'") % sort_dir) return sort_dir def validate_patch(patch): """Performs a basic validation on patch.""" if not isinstance(patch, list): patch = [patch] for p in patch: path_pattern = re.compile("^/[a-zA-Z0-9-_]+(/[a-zA-Z0-9-_]+)*$") if not isinstance(p, dict) or \ any(key for key in ["path", "op"] if key not in p): raise wsme.exc.ClientSideError( _("Invalid patch format: %s") % str(p)) path = p["path"] op = p["op"] if op not in ["add", "replace", "remove"]: raise wsme.exc.ClientSideError( _("Operation not supported: %s") % op) if not path_pattern.match(path): raise wsme.exc.ClientSideError(_("Invalid path: %s") % path) if op == "add": if path.count('/') == 1: raise wsme.exc.ClientSideError( _("Adding an additional attribute (%s) to the " "resource is not allowed") % path) def validate_mtu(mtu): """Check if MTU is valid""" if mtu < 576 or mtu > 9216: raise wsme.exc.ClientSideError(_( "MTU must be between 576 and 9216 bytes.")) def validate_address_within_address_pool(ip, pool): """Determine whether an IP address is within the specified IP address pool. :param ip netaddr.IPAddress object :param pool objects.AddressPool object """ ipset = netaddr.IPSet() for start, end in pool.ranges: ipset.update(netaddr.IPRange(start, end)) if netaddr.IPAddress(ip) not in ipset: raise wsme.exc.ClientSideError(_( "IP address %s is not within address pool ranges") % str(ip)) def validate_address_within_nework(ip, network): """Determine whether an IP address is within the specified IP network. :param ip netaddr.IPAddress object :param network objects.Network object """ LOG.info("TODO(sc) validate_address_within_address_pool " "ip=%s, network=%s" % (ip, network)) class ValidTypes(wsme.types.UserType): """User type for validate that value has one of a few types.""" def __init__(self, *types): self.types = types def validate(self, value): for t in self.types: if t is wsme.types.text and isinstance(value, wsme.types.bytes): value = value.decode() if isinstance(value, t): return value else: raise ValueError("Wrong type. Expected '%s', got '%s'" % ( self.types, type(value))) def is_valid_hostname(hostname): """Determine whether an address is valid as per RFC 1123. """ # Maximum length of 255 rc = True length = len(hostname) if length > 255: raise wsme.exc.ClientSideError(_( "Hostname {} is too long. Length {} is greater than 255." "Please configure valid hostname.").format(hostname, length)) # Allow a single dot on the right hand side if hostname[-1] == ".": hostname = hostname[:-1] # Create a regex to ensure: # - hostname does not begin or end with a dash # - each segment is 1 to 63 characters long # - valid characters are A-Z (any case) and 0-9 valid_re = re.compile("(?!-)[A-Z\d-]{1,63}(?