config/controllerconfig/controllerconfig/controllerconfig/common/keystone.py

246 lines
6.1 KiB
Python
Executable File

#
# Copyright (c) 2014-2016 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
OpenStack Keystone Interactions
"""
import datetime
import iso8601
from controllerconfig.common.exceptions import KeystoneFail
from oslo_log import log
LOG = log.getLogger(__name__)
class Token(object):
def __init__(self, token_data, token_id):
self._expired = False
self._data = token_data
self._token_id = token_id
def set_expired(self):
""" Indicate token is expired """
self._expired = True
def is_expired(self, within_seconds=300):
""" Check if token is expired """
if not self._expired:
end = iso8601.parse_date(self._data['token']['expires_at'])
now = iso8601.parse_date(datetime.datetime.utcnow().isoformat())
delta = abs(end - now).seconds
return delta <= within_seconds
return True
def get_id(self):
""" Get the identifier of the token """
return self._token_id
def get_service_admin_url(self, service_type, service_name, region_name):
""" Search the catalog of a service for the administrative url """
return self.get_service_url(region_name, service_name,
service_type, 'admin')
def get_service_url(self, region_name, service_name, service_type,
endpoint_type):
"""
Search the catalog of a service in a region for the url
"""
for catalog in self._data['token']['catalog']:
if catalog['type'] == service_type:
if catalog['name'] == service_name:
if 0 != len(catalog['endpoints']):
for endpoint in catalog['endpoints']:
if (endpoint['region'] == region_name and
endpoint['interface'] == endpoint_type):
return endpoint['url']
raise KeystoneFail((
"Keystone service type %s, name %s, region %s, endpoint type %s "
"not available" %
(service_type, service_name, region_name, endpoint_type)))
class Service(object):
"""
Keystone Service
"""
def __init__(self, service_data):
self._data = service_data
def get_id(self):
if 'id' in self._data['service']:
return self._data['service']['id']
return None
class ServiceList(object):
"""
Keystone Service List
"""
def __init__(self, service_data):
self._data = service_data
def get_service_id(self, name, type):
"""
Search the services for the id
"""
for service in self._data['services']:
if service['name'] == name:
if service['type'] == type:
return service['id']
raise KeystoneFail((
"Keystone service type %s, name %s not available" %
(type, name)))
class Project(object):
"""
Keystone Project
"""
def __init__(self, project_data):
self._data = project_data
def get_id(self):
if 'id' in self._data['project']:
return self._data['project']['id']
return None
class ProjectList(object):
"""
Keystone Project List
"""
def __init__(self, project_data):
self._data = project_data
def get_project_id(self, name):
"""
Search the projects for the id
"""
for project in self._data['projects']:
if project['name'] == name:
return project['id']
return None
class Endpoint(object):
"""
Keystone Endpoint
"""
def __init__(self, endpoint_data):
self._data = endpoint_data
def get_id(self):
if 'id' in self._data['endpoint']:
return self._data['endpoint']['id']
return None
class EndpointList(object):
"""
Keystone Endpoint List
"""
def __init__(self, endpoint_data):
self._data = endpoint_data
def get_service_url(self, region_name, service_id, endpoint_type):
"""
Search the endpoints for the url
"""
for endpoint in self._data['endpoints']:
if endpoint['service_id'] == service_id:
if (endpoint['region'] == region_name and
endpoint['interface'] == endpoint_type):
return endpoint['url']
raise KeystoneFail((
"Keystone service id %s, region %s, endpoint type %s not "
"available" % (service_id, region_name, endpoint_type)))
class User(object):
"""
Keystone User
"""
def __init__(self, user_data):
self._data = user_data
def get_user_id(self):
return self._data['user']['id']
class UserList(object):
"""
Keystone User List
"""
def __init__(self, user_data):
self._data = user_data
def get_user_id(self, name):
"""
Search the users for the id
"""
for user in self._data['users']:
if user['name'] == name:
return user['id']
return None
class Role(object):
"""
Keystone Role
"""
def __init__(self, role_data):
self._data = role_data
class RoleList(object):
"""
Keystone Role List
"""
def __init__(self, role_data):
self._data = role_data
def get_role_id(self, name):
"""
Search the roles for the id
"""
for role in self._data['roles']:
if role['name'] == name:
return role['id']
return None
class Domain(object):
"""
Keystone Domain
"""
def __init__(self, user_data):
self._data = user_data
def get_domain_id(self):
return self._data['domain']['id']
class DomainList(object):
"""
Keystone Domain List
"""
def __init__(self, user_data):
self._data = user_data
def get_domain_id(self, name):
"""
Search the domains for the id
"""
for domain in self._data['domains']:
if domain['name'] == name:
return domain['id']
return None