nfv/nfv/nfv-plugins/nfv_plugins/nfvi_plugins/openstack/usm.py

139 lines
3.3 KiB
Python
Executable File

#
# Copyright (c) 2024 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import json
import os
from nfv_common import debug
from nfv_plugins.nfvi_plugins.openstack.objects import PLATFORM_SERVICE
from nfv_plugins.nfvi_plugins.openstack.rest_api import rest_api_request
REST_API_REQUEST_TIMEOUT = 60
DLOG = debug.debug_get_logger('nfv_plugins.nfvi_plugins.openstack.usm')
def _usm_api_cmd(token, endpoint):
base_url = token.get_service_url(PLATFORM_SERVICE.USM)
if base_url is None:
raise ValueError("PlatformService USM URL is invalid")
url = os.path.join(base_url, "v1/software", endpoint)
return url
def _api_cmd_headers():
api_cmd_headers = dict()
api_cmd_headers['Content-Type'] = "application/json"
api_cmd_headers['User-Agent'] = "vim/1.0"
return api_cmd_headers
def _api_get(token, url):
"""
Perform a generic GET for a particular API endpoint
"""
response = rest_api_request(token,
"GET",
url,
timeout_in_secs=REST_API_REQUEST_TIMEOUT)
return response
def _api_post(token, url, payload, headers=None):
"""
Generic POST to an endpoint with a payload
"""
if headers is None:
headers = _api_cmd_headers()
response = rest_api_request(token,
"POST",
url,
headers,
json.dumps(payload),
timeout_in_secs=REST_API_REQUEST_TIMEOUT)
return response
def sw_deploy_get_release(token, release):
"""
Query USM for information about a specific upgrade
"""
uri = f"show/{release}"
url = _usm_api_cmd(token, uri)
response = _api_get(token, url)
return response
def sw_deploy_host_list(token):
"""
Query USM for information about a hosts during a deployment
"""
# TODO(jkraitbe): This API will change in the future
uri = "host_list"
url = _usm_api_cmd(token, uri)
response = _api_get(token, url)
return response
def sw_deploy_precheck(token, release, force=False):
"""
Ask USM to precheck before a deployment
"""
uri = f"deploy_precheck/{release}/force" if force else f"deploy_precheck/{release}"
url = _usm_api_cmd(token, uri)
response = _api_post(token, url, {})
return response
def sw_deploy_start(token, release, force=False):
"""
Ask USM to start a deployment
"""
uri = f"deploy_start/{release}/force" if force else f"deploy_start/{release}"
url = _usm_api_cmd(token, uri)
response = _api_post(token, url, {})
return response
def sw_deploy_execute(token, host_name):
"""
Ask USM to execute a deployment on a host
"""
uri = f"deploy_host/{host_name}"
url = _usm_api_cmd(token, uri)
response = _api_post(token, url, {})
return response
def sw_deploy_activate(token, release):
"""
Ask USM activate a deployment
"""
uri = f"deploy_activate/{release}"
url = _usm_api_cmd(token, uri)
response = _api_post(token, url, {})
return response
def sw_deploy_complete(token, release):
"""
Ask USM complete a deployment
"""
uri = f"deploy_complete/{release}"
url = _usm_api_cmd(token, uri)
response = _api_post(token, url, {})
return response