virtual-deployment/virtualbox/pybox/helper/vboxmanage.py

904 lines
26 KiB
Python

#!/usr/bin/python3
#
# SPDX-License-Identifier: Apache-2.0
#
"""
This module provides functions for managing virtual machines using VirtualBox.
"""
import os
import subprocess
import re
import getpass
import time
from sys import platform
from utils.install_log import LOG
def vboxmanage_version():
"""
Return version of vbox.
"""
cmd = ["vboxmanage", "--version"]
version = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
return version
def vboxmanage_extpack():
"""
This allows you to install, uninstall the vbox extensions"
"""
output = vboxmanage_version()
version = re.match(b"(.*)r", output)
version_path = version.group(1).decode("utf-8")
LOG.info("Downloading extension pack")
filename = f"Oracle_VM_VirtualBox_Extension_Pack-{version_path}.vbox-extpack"
cmd = [
"wget",
f"http://download.virtualbox.org/virtualbox/{version_path}/{filename}",
"-P",
"/tmp"
]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOG.info(result)
LOG.info("Installing extension pack")
cmd = ["vboxmanage", "extpack", "install", "/tmp/" + filename, "--replace"]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOG.info(result)
def get_all_vms(labname, option="vms"):
"""
Return a list of virtual machines (VMs) belonging to a specified lab.
Args:
labname (str): The name of the lab to which the VMs belong.
option (str, optional): The vboxmanage command option to use when listing VMs.
Defaults to "vms".
Returns:
list: A list of strings representing the names of the VMs that belong to the specified lab.
"""
initial_node_list = []
vm_list = vboxmanage_list(option)
labname.encode("utf-8")
# Reduce the number of VMs we query
for item in vm_list:
if labname.encode("utf-8") in item and (
b"controller-" in item or b"compute-" in item or b"storage-" in item
):
initial_node_list.append(item.decode("utf-8"))
# Filter by group
node_list = []
group = bytearray(f'"/{labname}"', "utf-8")
for item in initial_node_list:
info = vboxmanage_showinfo(item).splitlines()
for line in info:
try:
k_value, v_value = line.split(b"=")
except ValueError:
continue
if k_value == b"groups" and v_value == group:
node_list.append(item)
return node_list
def take_snapshot(labname, snapshot_name):
"""
Take a snapshot of all VMs belonging to a specified lab.
Args:
labname (str): The name of the lab whose VMs will be snapshotted.
snapshot_name (str): The name of the snapshot to be taken.
Returns:
None
"""
vms = get_all_vms(labname, option="vms")
runningvms = get_all_vms(labname, option="runningvms")
LOG.info("#### Taking snapshot %s of lab %s", snapshot_name, labname)
LOG.info("VMs in lab %s: %s", labname, vms)
LOG.info("VMs running in lab %s: %s", labname, runningvms)
_pause_running_vms(runningvms, vms)
if len(vms) != 0:
vboxmanage_takesnapshot(vms, snapshot_name)
_resume_running_vms(runningvms)
LOG.info("Waiting 10s before running VMs")
time.sleep(10)
if runningvms:
_wait_for_vms_to_run(labname, runningvms, vms)
def _pause_running_vms(runningvms, vms):
"""Pause running virtual machines.
Args:
runningvms (list): A list of strings representing the names of running virtual machines.
vms (list): A list of strings representing the names of all virtual machines.
Returns:
None
"""
if len(runningvms) > 1:
for node in runningvms:
newpid = os.fork()
if newpid == 0:
vboxmanage_controlvms([node], "pause")
os._exit(0) # pylint: disable=protected-access
for node in vms:
os.waitpid(0, 0)
time.sleep(2)
def _resume_running_vms(runningvms):
"""Resume paused virtual machines.
Args:
runningvms (list): A list of strings representing the names of running virtual machines.
Returns:
None
"""
if len(runningvms) > 1:
for node in runningvms:
newpid = os.fork()
if newpid == 0:
vboxmanage_controlvms([node], "resume")
os._exit(0) # pylint: disable=protected-access
for node in runningvms:
os.waitpid(0, 0)
def _wait_for_vms_to_run(labname, runningvms, vms):
"""Wait for virtual machines to finish running.
Args:
labname (str): The name of the lab whose virtual machines are being waited for.
runningvms (list): A list of strings representing the names of running virtual machines.
vms (list): A list of strings representing the names of all virtual machines.
Returns:
None
"""
new_vms = get_all_vms(labname, option="runningvms")
retry = 0
while retry < 20:
LOG.info(
"Waiting for VMs to come up running after taking snapshot..."
"Up VMs are %s ",
new_vms,
)
if len(runningvms) < len(new_vms):
time.sleep(1)
new_vms = get_all_vms(labname, option="runningvms")
retry += 1
else:
LOG.info("All VMs %s are up running after taking snapshot...", vms)
break
def restore_snapshot(node_list, name):
"""
Restore a snapshot of a list of virtual machines.
Args:
node_list (list): A list of strings representing the names of the virtual machines
whose snapshot will be restored.
name (str): The name of the snapshot to restore.
Returns:
None
"""
LOG.info("Restore snapshot of %s for hosts %s", name, node_list)
if len(node_list) != 0:
vboxmanage_controlvms(node_list, "poweroff")
LOG.info("Waiting 5s")
time.sleep(5)
if len(node_list) != 0:
for host in node_list:
vboxmanage_restoresnapshot(host, name)
LOG.info("Waiting 5s")
time.sleep(5)
for host in node_list:
if "controller-0" not in host:
vboxmanage_startvm(host)
LOG.info("Waiting 10s")
time.sleep(10)
for host in node_list:
if "controller-0" in host:
vboxmanage_startvm(host)
LOG.info("Waiting 10s")
time.sleep(10)
def vboxmanage_list(option="vms"):
"""
This returns a list of vm names.
"""
cmd = ["vboxmanage", "list", option]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
vms_list = []
for item in result.splitlines():
vm_name = re.match(b'"(.*?)"', item)
vms_list.append(vm_name.group(1))
return vms_list
def vboxmanage_showinfo(host):
"""
This returns info about the host
"""
if not isinstance(host, str):
host.decode("utf-8")
cmd = ["vboxmanage", "showvminfo", host, "--machinereadable"]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
return result
def vboxmanage_createvm(hostname, labname):
"""
This creates a VM with the specified name.
"""
assert hostname, "Hostname is required"
assert labname, "Labname is required"
group = "/" + labname
LOG.info("Creating VM %s", hostname)
cmd = [
"vboxmanage",
"createvm",
"--name",
hostname,
"--register",
"--ostype",
"Linux_64",
"--groups",
group,
]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
def vboxmanage_deletevms(hosts=None):
"""
Deletes a list of VMs
"""
assert hosts, "A list of hostname(s) is required"
if len(hosts) != 0:
for hostname in hosts:
LOG.info("Deleting VM %s", hostname)
cmd = ["vboxmanage", "unregistervm", hostname, "--delete"]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOG.info("Waiting 10s")
time.sleep(10)
# in case medium is still present after delete
vboxmanage_deletemedium(hostname)
vms_list = vboxmanage_list("vms")
for items in hosts:
assert (
items not in vms_list
), f"The following vms are unexpectedly present {vms_list}"
def vboxmanage_hostonlyifcreate(name="vboxnet0", oam_ip=None, netmask=None):
"""
This creates a hostonly network for systems to communicate.
"""
assert name, "Must provide network name"
assert oam_ip, "Must provide an OAM IP"
assert netmask, "Must provide an OAM Netmask"
LOG.info("Creating Host-only Network")
cmd = ["vboxmanage", "hostonlyif", "create"]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOG.info("Provisioning %s with IP %s and Netmask %s", name, oam_ip, netmask)
cmd = [
"vboxmanage",
"hostonlyif",
"ipconfig",
name,
"--ip",
oam_ip,
"--netmask",
netmask,
]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
def vboxmanage_hostonlyifdelete(name="vboxnet0"):
"""
Deletes hostonly network. This is used as a work around for creating too many hostonlyifs.
"""
assert name, "Must provide network name"
LOG.info("Removing Host-only Network")
cmd = ["vboxmanage", "hostonlyif", "remove", name]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
def vboxmanage_modifyvm(hostname, vm_config=None):
"""
Modify a virtual machine according to a specified configuration.
Args:
hostname(str): Name of host to modify
vm_config (dict): A dictionary representing the configuration options
for the virtual machine. Possible key values: cpus, memory, nic, nictype,
nicpromisc, nicnum, intnet, hostonlyadapter, natnetwork, uartbase,
uartport, uartmode, uartpath, nicbootprio2=1, prefix=""
Returns:
None
"""
#put default values in nicbootprio2 and prefix if they not exist
vm_config["nicbootprio2"] = vm_config.get("nicbootprio2", 1)
vm_config["prefix"] = vm_config.get("prefix", "")
cmd = ["vboxmanage", "modifyvm", hostname]
nic_cmd = []
if _contains_value("cpus", vm_config):
cmd.extend(["--cpus", vm_config["cpus"]])
if _contains_value("memory", vm_config):
cmd.extend(["--memory", vm_config["memory"]])
if _is_network_configured(vm_config):
nic_cmd = _get_network_configuration(vm_config)
cmd.extend(nic_cmd)
elif _is_nat_network_configured(vm_config):
cmd.extend([f'--nic{vm_config["nicnum"]}', "nat"])
if _is_uart_configured(vm_config):
uart_config = _add_uart(vm_config)
cmd.extend(uart_config)
if _contains_value("nicbootprio2", vm_config):
cmd.extend(["--nicbootprio2"])
cmd.extend([f'{vm_config["nicbootprio2"]}'])
cmd.extend(["--boot4"])
cmd.extend(["net"])
LOG.info("#### Updating VM %s configuration", hostname)
LOG.info("#### Executing command on the host machine:\n$ %s\n", ' '.join(str(i) for i in cmd))
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
def _is_network_configured(vm_config):
"""
Checks whether a network interface is configured in the given VM configuration.
Args:
vm_config (dict): A dictionary representing the configuration options for the VM.
Returns:
bool: True if a network interface is configured, False otherwise.
"""
return (_contains_value("nic", vm_config)
and _contains_value("nictype", vm_config)
and _contains_value("nicpromisc", vm_config)
and _contains_value("nicnum", vm_config)
)
def _get_network_configuration(vm_config):
"""
Constructs a list of options for the network interface based on the values in vm_config.
Args:
vm_config (dict): A dictionary representing the configuration options for the VM.
Returns:
list: A list of command-line options for the network interface.
"""
nic_cmd = [f'--nic{vm_config["nicnum"]}', vm_config["nic"]]
nic_cmd.extend([f'--nictype{vm_config["nicnum"]}', vm_config["nictype"]])
nic_cmd.extend([f'--nicpromisc{vm_config["nicnum"]}', vm_config["nicpromisc"]])
if _contains_value("intnet", vm_config):
intnet = vm_config["intnet"]
if _contains_value("prefix", vm_config):
intnet = f"{vm_config['prefix']}-{intnet}"
else:
intnet = f"{intnet}"
nic_cmd.extend([f'--intnet{vm_config["nicnum"]}', intnet])
if _contains_value("hostonlyadapter", vm_config):
nic_cmd.extend(
[
f'--hostonlyadapter{vm_config["nicnum"]}',
vm_config["hostonlyadapter"],
]
)
if _contains_value("natnetwork", vm_config):
nic_cmd.extend(
[f'--nat-network{vm_config["nicnum"]}', vm_config["natnetwork"]]
)
return nic_cmd
def _is_nat_network_configured(vm_config):
"""
Checks whether the NAT network is configured in the given VM configuration.
Args:
vm_config (dict): A dictionary representing the configuration options for the VM.
Returns:
bool: True if the NAT network is configured, False otherwise.
"""
return _contains_value("nicnum", vm_config) and vm_config.get("nictype") == "nat"
def _is_uart_configured(vm_config):
"""
Checks whether the UART device is configured in the given VM configuration.
Args:
vm_config (dict): A dictionary representing the configuration options for the VM.
Returns:
bool: True if the UART device is configured, False otherwise.
"""
return (
_contains_value("uartbase", vm_config)
and _contains_value("uartport", vm_config)
and _contains_value("uartmode", vm_config)
and _contains_value("uartpath", vm_config)
)
def _add_uart(vm_config):
"""
Constructs a list of options for the UART device based on the values in vm_config.
Args:
vm_config (dict): A dictionary representing the configuration options for the VM.
Returns:
list: A list of command-line options for the UART device.
"""
uart_config = ["--uart1"]
uart_config.extend([f'{vm_config["uartbase"]}'])
uart_config.extend([f'{vm_config["uartport"]}'])
uart_config.extend(["--uartmode1"])
uart_config.extend([f'{vm_config["uartmode"]}'])
uart_config.extend([f'{vm_config["uartpath"]}'])
return uart_config
def _contains_value(key, dictionary):
return key in dictionary and dictionary[key]
def vboxmanage_storagectl(hostname=None, storectl="sata", hostiocache="off"):
"""
This creates a storage controller on the host.
"""
assert hostname, "Hostname is required"
assert storectl, "Type of storage controller is required"
LOG.info("Creating %s storage controller on VM %s", storectl, hostname)
cmd = [
"vboxmanage",
"storagectl",
hostname,
"--name",
storectl,
"--add",
storectl,
"--hostiocache",
hostiocache,
]
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
def vboxmanage_storageattach(hostname, storage_config):
"""
Attaches a disk to a storage controller.
Args:
hostname (str): Name of the virtual machine.
storage_config (dict): A dictionary containing the config options for the storage device.
Possible key values: storectl, storetype, disk, port_num, device_num.
Returns:
str: The output of the vboxmanage command.
"""
assert hostname, "Hostname is required"
assert storage_config and isinstance(storage_config, dict), "Storage configuration is required"
storectl = storage_config.get("storectl", "sata")
storetype = storage_config.get("storetype", "hdd")
disk = storage_config.get("disk")
port_num = storage_config.get("port_num", "0")
device_num = storage_config.get("device_num", "0")
assert disk, "Disk name is required"
assert storectl, "Name of storage controller is required"
assert storetype, "Type of storage controller is required"
LOG.info(
"Attaching %s storage to storage controller %s on VM %s",
storetype,
storectl,
hostname,
)
cmd = [
"vboxmanage",
"storageattach",
hostname,
"--storagectl",
storectl,
"--medium",
disk,
"--type",
storetype,
"--port",
port_num,
"--device",
device_num,
]
return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
def vboxmanage_deletemedium(hostname, vbox_home_dir="/home"):
"""
Deletes the disk medium associated with a virtual machine.
Args:
hostname (str): The name of the virtual machine to which the disk medium is attached.
vbox_home_dir (str): The directory in which the disk medium files are stored.
Defaults to "/home".
Returns:
None
"""
assert hostname, "Hostname is required"
if platform in ("win32", "win64"):
return
username = getpass.getuser()
vbox_home_dir = f"{vbox_home_dir}/{username}/vbox_disks/"
disk_list = [
f
for f in os.listdir(vbox_home_dir)
if os.path.isfile(os.path.join(vbox_home_dir, f)) and hostname in f
]
LOG.info("Disk mediums to delete: %s", disk_list)
for disk in disk_list:
LOG.info("Disconnecting disk %s from vbox.", disk)
try:
cmd = [
"vboxmanage",
"closemedium",
"disk",
f"{vbox_home_dir}{disk}",
"--delete",
]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOG.info(result)
except subprocess.CalledProcessError as exception:
# Continue if failures, disk may not be present
LOG.warning(
"Error disconnecting disk, continuing. "
"Details: stdout: %s stderr: %s",
exception.stdout,
exception.stderr,
)
LOG.info("Removing backing file %s", disk)
try:
os.remove(f"{vbox_home_dir}{disk}")
except Exception as exc:
LOG.debug("Failure at removing backing file\nError: %s\n", repr(exc))
def vboxmanage_createmedium(hostname=None, disk_list=None, vbox_home_dir="/home"):
"""
This creates the required disks.
"""
assert hostname, "Hostname is required"
assert disk_list, "A list of disk sizes is required"
username = getpass.getuser()
device_num = 0
port_num = 0
disk_count = 1
for disk in disk_list:
if platform in ("win32", "win64"):
file_name = (
"C:\\Users\\"
+ username
+ "\\vbox_disks\\"
+ hostname
+ f"_disk_{disk_count}"
)
else:
file_name = (
vbox_home_dir
+ "/"
+ username
+ "/vbox_disks/"
+ hostname
+ f"_disk_{disk_count}"
)
LOG.info(
"Creating disk %s of size %s on VM %s on device %s port %s",
file_name,
disk,
hostname,
device_num,
port_num,
)
try:
cmd = [
"vboxmanage",
"createmedium",
"disk",
"--size",
str(disk),
"--filename",
file_name,
"--format",
"vdi",
"--variant",
"standard",
]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOG.info(result)
except subprocess.CalledProcessError as exception:
LOG.error("Error stdout: %s stderr: %s", exception.stdout, exception.stderr)
raise
vboxmanage_storageattach(
hostname,
{
"storectl": "sata",
"storetype": "hdd",
"disk": file_name + ".vdi",
"port_num": str(port_num),
"device_num": str(device_num),
},
)
disk_count += 1
port_num += 1
LOG.info("Waiting 5s")
time.sleep(5)
def vboxmanage_startvm(hostname=None, headless=False, force=False):
"""
This allows you to power on a VM.
"""
assert hostname, "Hostname is required"
if not force:
LOG.info("Check if VM is running")
running_vms = vboxmanage_list(option="runningvms")
else:
running_vms = []
interface_type = "gui"
if headless:
interface_type = "headless"
if hostname.encode("utf-8") in running_vms:
LOG.info("Host %s is already started", hostname)
else:
LOG.info("Powering on VM %s", hostname)
cmd = ["vboxmanage", "startvm", hostname, "--type", interface_type]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
LOG.info(result)
# Wait for VM to start
tmout = 20
while tmout:
tmout -= 1
running_vms = vboxmanage_list(option="runningvms")
if hostname.encode("utf-8") in running_vms:
break
time.sleep(1)
else:
raise f"Failed to start VM: {hostname}"
LOG.info("VM '%s' started.", hostname)
def vboxmanage_controlvms(hosts=None, action=None):
"""
This allows you to control a VM, e.g. pause, resume, etc.
"""
assert hosts, "Hostname is required"
assert action, "Need to provide an action to execute"
for host in hosts:
LOG.info("Executing %s action on VM %s", action, host)
subprocess.call(
["vboxmanage", "controlvm", host, action], stderr=subprocess.STDOUT
)
time.sleep(1)
def vboxmanage_takesnapshot(hosts=None, name=None):
"""
This allows you to take snapshot of VMs.
"""
assert hosts, "Hostname is required"
assert name, "Need to provide a name for the snapshot"
for host in hosts:
LOG.info("Taking snapshot %s on VM %s", name, host)
subprocess.call(
["vboxmanage", "snapshot", host, "take", name], stderr=subprocess.STDOUT
)
def vboxmanage_restoresnapshot(host=None, name=None):
"""
This allows you to restore snapshot of a VM.
"""
assert host, "Hostname is required"
assert name, "Need to provide the snapshot to restore"
LOG.info("Restoring snapshot %s on VM %s", name, host)
subprocess.call(
["vboxmanage", "snapshot", host, "restore", name], stderr=subprocess.STDOUT
)
LOG.info("Waiting 10s")
time.sleep(10)
def vboxmanage_getrulename(network, local_port):
"""
Get port-forwarding rule for given NAT network and local port in VirtualBox.
Args:
network (str): Name of the NAT network.
local_port (str): The local port number.
Returns:
(str): Name of rule or empty
"""
# List information about all nat networks in VirtualBox
cmd = ["vboxmanage", "list", "natnets"]
result = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
natpattern = r"NetworkName:(.*?)loopback mappings \(ipv4\)"
natnetworks = re.findall(natpattern,result.decode(),re.DOTALL)
# Get the rule name of the given local port in the given natnetwork
for natnetwork in natnetworks:
natinfo = natnetwork.strip().split('\n')
if natinfo[0] == network:
try:
startindex = natinfo.index("Port-forwarding (ipv4)")
except ValueError:
# If no index is found the function return an empty string
return ""
for index in range (startindex+1,len(natinfo)):
rule = natinfo[index].strip()
parsed_rule = rule.split(':')
if int(parsed_rule[3]) == int(local_port):
return parsed_rule[0]
return ""
def vboxmanage_addportforward(rule_name, local_port, guest_ip, guest_port, network):
"""
Add port-forwarding rule for a NAT network in VirtualBox.
Args:
rule_name (str): Name of the port-forward rule to be added.
local_port (str): The local port number to forward.
guest_ip (str): The IP address of the guest to forward to.
guest_port (str): The port number on the guest to forward to.
network (str): Name of the NAT network.
Returns:
True if the port was added
False if an error occurred when trying to add the port-forward rule.
"""
rule = f"{rule_name}:tcp:[]:{local_port}:[{guest_ip}]:{guest_port}"
LOG.info("Creating port-forwarding rule to: %s", rule)
cmd = [
"vboxmanage",
"natnetwork",
"modify",
"--netname",
network,
"--port-forward-4",
rule,
]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
LOG.info("Error while trying to create port-forwarding rule. Continuing installation!")
return False
return True
def vboxmanage_deleteportforward(rule_name, network):
"""
Delete port-forwarding rule for a NAT network in VirtualBox.
Args:
rule_name (str): Name of the port-forward rule to be deleted.
network (str): Name of the NAT network.
Returns:
None
"""
LOG.info(
"Removing previous forwarding rule '%s' from NAT network '%s'",
rule_name,
network,
)
cmd = [
"vboxmanage",
"natnetwork",
"modify",
"--netname",
network,
"--port-forward-4",
"delete",
rule_name,
]
try:
subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError:
LOG.info("Error while trying to delete port-forwarding rule. Continuing installation!")