#!/usr/bin/env python # -*- coding: utf-8 -*- # $Id: vboxshell.py 72919 2018-07-05 14:44:31Z vboxsync $ """ VirtualBox Python Shell. This program is a simple interactive shell for VirtualBox. You can query information and issue commands from a simple command line. It also provides you with examples on how to use VirtualBox's Python API. This shell is even somewhat documented, supports TAB-completion and history if you have Python readline installed. Finally, shell allows arbitrary custom extensions, just create .VirtualBox/shexts/ and drop your extensions there. Enjoy. P.S. Our apologies for the code quality. """ from __future__ import print_function __copyright__ = \ """ Copyright (C) 2009-2017 Oracle Corporation This file is part of VirtualBox Open Source Edition (OSE), as available from http://www.virtualbox.org. This file is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License (GPL) as published by the Free Software Foundation, in version 2 as it comes in the "COPYING" file of the VirtualBox OSE distribution. VirtualBox OSE is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. """ __version__ = "$Revision: 72919 $" import gc import os import sys import traceback import shlex import time import re import platform from optparse import OptionParser from pprint import pprint # # Global Variables # g_fBatchMode = False g_sScriptFile = None g_sCmd = None g_fHasReadline = True try: import readline import rlcompleter except ImportError: g_fHasReadline = False g_sPrompt = "vbox> " g_fHasColors = True g_dTermColors = { 'red': '\033[31m', 'blue': '\033[94m', 'green': '\033[92m', 'yellow': '\033[93m', 'magenta': '\033[35m', 'cyan': '\033[36m' } def colored(strg, color): """ Translates a string to one including coloring settings, if enabled. """ if not g_fHasColors: return strg col = g_dTermColors.get(color, None) if col: return col+str(strg)+'\033[0m' return strg if g_fHasReadline: class CompleterNG(rlcompleter.Completer): def __init__(self, dic, ctx): self.ctx = ctx rlcompleter.Completer.__init__(self, dic) def complete(self, text, state): """ taken from: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/496812 """ if False and text == "": return ['\t', None][state] else: return rlcompleter.Completer.complete(self, text, state) def canBePath(self, _phrase, word): return word.startswith('/') def canBeCommand(self, phrase, _word): spaceIdx = phrase.find(" ") begIdx = readline.get_begidx() firstWord = (spaceIdx == -1 or begIdx < spaceIdx) if firstWord: return True if phrase.startswith('help'): return True return False def canBeMachine(self, phrase, word): return not self.canBePath(phrase, word) and not self.canBeCommand(phrase, word) def global_matches(self, text): """ Compute matches when text is a simple name. Return a list of all names currently defined in self.namespace that match. """ matches = [] phrase = readline.get_line_buffer() try: if self.canBePath(phrase, text): (directory, rest) = os.path.split(text) c = len(rest) for word in os.listdir(directory): if c == 0 or word[:c] == rest: matches.append(os.path.join(directory, word)) if self.canBeCommand(phrase, text): c = len(text) for lst in [ self.namespace ]: for word in lst: if word[:c] == text: matches.append(word) if self.canBeMachine(phrase, text): c = len(text) for mach in getMachines(self.ctx, False, True): # although it has autoconversion, we need to cast # explicitly for subscripts to work word = re.sub("(?' self.id = mach.id def cacheMachines(_ctx, lst): result = [] for mach in lst: elem = CachedMach(mach) result.append(elem) return result def getMachines(ctx, invalidate = False, simple=False): if ctx['vb'] is not None: if ctx['_machlist'] is None or invalidate: ctx['_machlist'] = ctx['global'].getArray(ctx['vb'], 'machines') ctx['_machlistsimple'] = cacheMachines(ctx, ctx['_machlist']) if simple: return ctx['_machlistsimple'] else: return ctx['_machlist'] else: return [] def asState(var): if var: return colored('on', 'green') else: return colored('off', 'green') def asFlag(var): if var: return 'yes' else: return 'no' def getFacilityStatus(ctx, guest, facilityType): (status, _timestamp) = guest.getFacilityStatus(facilityType) return asEnumElem(ctx, 'AdditionsFacilityStatus', status) def perfStats(ctx, mach): if not ctx['perf']: return for metric in ctx['perf'].query(["*"], [mach]): print(metric['name'], metric['values_as_string']) def guestExec(ctx, machine, console, cmds): exec(cmds) def printMouseEvent(_ctx, mev): print("Mouse : mode=%d x=%d y=%d z=%d w=%d buttons=%x" % (mev.mode, mev.x, mev.y, mev.z, mev.w, mev.buttons)) def printKbdEvent(ctx, kev): print("Kbd: ", ctx['global'].getArray(kev, 'scancodes')) def printMultiTouchEvent(ctx, mtev): print("MultiTouch : contacts=%d time=%d" % (mtev.contactCount, mtev.scanTime)) xPositions = ctx['global'].getArray(mtev, 'xPositions') yPositions = ctx['global'].getArray(mtev, 'yPositions') contactIds = ctx['global'].getArray(mtev, 'contactIds') contactFlags = ctx['global'].getArray(mtev, 'contactFlags') for i in range(0, mtev.contactCount): print(" [%d] %d,%d %d %d" % (i, xPositions[i], yPositions[i], contactIds[i], contactFlags[i])) def monitorSource(ctx, eventSource, active, dur): def handleEventImpl(event): evtype = event.type print("got event: %s %s" % (str(evtype), asEnumElem(ctx, 'VBoxEventType', evtype))) if evtype == ctx['global'].constants.VBoxEventType_OnMachineStateChanged: scev = ctx['global'].queryInterface(event, 'IMachineStateChangedEvent') if scev: print("machine state event: mach=%s state=%s" % (scev.machineId, scev.state)) elif evtype == ctx['global'].constants.VBoxEventType_OnSnapshotTaken: stev = ctx['global'].queryInterface(event, 'ISnapshotTakenEvent') if stev: print("snapshot taken event: mach=%s snap=%s" % (stev.machineId, stev.snapshotId)) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestPropertyChanged: gpcev = ctx['global'].queryInterface(event, 'IGuestPropertyChangedEvent') if gpcev: print("guest property change: name=%s value=%s" % (gpcev.name, gpcev.value)) elif evtype == ctx['global'].constants.VBoxEventType_OnMousePointerShapeChanged: psev = ctx['global'].queryInterface(event, 'IMousePointerShapeChangedEvent') if psev: shape = ctx['global'].getArray(psev, 'shape') if shape is None: print("pointer shape event - empty shape") else: print("pointer shape event: w=%d h=%d shape len=%d" % (psev.width, psev.height, len(shape))) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestMouse: mev = ctx['global'].queryInterface(event, 'IGuestMouseEvent') if mev: printMouseEvent(ctx, mev) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestKeyboard: kev = ctx['global'].queryInterface(event, 'IGuestKeyboardEvent') if kev: printKbdEvent(ctx, kev) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestMultiTouch: mtev = ctx['global'].queryInterface(event, 'IGuestMultiTouchEvent') if mtev: printMultiTouchEvent(ctx, mtev) class EventListener(object): def __init__(self, arg): pass def handleEvent(self, event): try: # a bit convoluted QI to make it work with MS COM handleEventImpl(ctx['global'].queryInterface(event, 'IEvent')) except: traceback.print_exc() pass if active: listener = ctx['global'].createListener(EventListener) else: listener = eventSource.createListener() registered = False if dur == -1: # not infinity, but close enough dur = 100000 try: eventSource.registerListener(listener, [ctx['global'].constants.VBoxEventType_Any], active) registered = True end = time.time() + dur while time.time() < end: if active: ctx['global'].waitForEvents(500) else: event = eventSource.getEvent(listener, 500) if event: handleEventImpl(event) # otherwise waitable events will leak (active listeners ACK automatically) eventSource.eventProcessed(listener, event) # We need to catch all exceptions here, otherwise listener will never be unregistered except: traceback.print_exc() pass if listener and registered: eventSource.unregisterListener(listener) g_tsLast = 0 def recordDemo(ctx, console, filename, dur): demo = open(filename, 'w') header = "VM=" + console.machine.name + "\n" demo.write(header) global g_tsLast g_tsLast = time.time() def stamp(): global g_tsLast tsCur = time.time() timePassed = int((tsCur-g_tsLast)*1000) g_tsLast = tsCur return timePassed def handleEventImpl(event): evtype = event.type #print("got event: %s %s" % (str(evtype), asEnumElem(ctx, 'VBoxEventType', evtype))) if evtype == ctx['global'].constants.VBoxEventType_OnGuestMouse: mev = ctx['global'].queryInterface(event, 'IGuestMouseEvent') if mev: line = "%d: m %d %d %d %d %d %d\n" % (stamp(), mev.mode, mev.x, mev.y, mev.z, mev.w, mev.buttons) demo.write(line) elif evtype == ctx['global'].constants.VBoxEventType_OnGuestKeyboard: kev = ctx['global'].queryInterface(event, 'IGuestKeyboardEvent') if kev: line = "%d: k %s\n" % (stamp(), str(ctx['global'].getArray(kev, 'scancodes'))) demo.write(line) listener = console.eventSource.createListener() registered = False # we create an aggregated event source to listen for multiple event sources (keyboard and mouse in our case) agg = console.eventSource.createAggregator([console.keyboard.eventSource, console.mouse.eventSource]) demo = open(filename, 'w') header = "VM=" + console.machine.name + "\n" demo.write(header) if dur == -1: # not infinity, but close enough dur = 100000 try: agg.registerListener(listener, [ctx['global'].constants.VBoxEventType_Any], False) registered = True end = time.time() + dur while time.time() < end: event = agg.getEvent(listener, 1000) if event: handleEventImpl(event) # keyboard/mouse events aren't waitable, so no need for eventProcessed # We need to catch all exceptions here, otherwise listener will never be unregistered except: traceback.print_exc() pass demo.close() if listener and registered: agg.unregisterListener(listener) def playbackDemo(ctx, console, filename, dur): demo = open(filename, 'r') if dur == -1: # not infinity, but close enough dur = 100000 header = demo.readline() print("Header is", header) basere = re.compile(r'(?P\d+): (?P[km]) (?P

.*)') mre = re.compile(r'(?P\d+) (?P-*\d+) (?P-*\d+) (?P-*\d+) (?P-*\d+) (?P-*\d+)') kre = re.compile(r'\d+') kbd = console.keyboard mouse = console.mouse try: end = time.time() + dur for line in demo: if time.time() > end: break match = basere.search(line) if match is None: continue rdict = match.groupdict() stamp = rdict['s'] params = rdict['p'] rtype = rdict['t'] time.sleep(float(stamp)/1000) if rtype == 'k': codes = kre.findall(params) #print("KBD:", codes) kbd.putScancodes(codes) elif rtype == 'm': mm = mre.search(params) if mm is not None: mdict = mm.groupdict() if mdict['a'] == '1': # absolute #print("MA: ", mdict['x'], mdict['y'], mdict['z'], mdict['b']) mouse.putMouseEventAbsolute(int(mdict['x']), int(mdict['y']), int(mdict['z']), int(mdict['w']), int(mdict['b'])) else: #print("MR: ", mdict['x'], mdict['y'], mdict['b']) mouse.putMouseEvent(int(mdict['x']), int(mdict['y']), int(mdict['z']), int(mdict['w']), int(mdict['b'])) # We need to catch all exceptions here, to close file except KeyboardInterrupt: ctx['interrupt'] = True except: traceback.print_exc() pass demo.close() def takeScreenshotOld(_ctx, console, args): from PIL import Image display = console.display if len(args) > 0: f = args[0] else: f = "/tmp/screenshot.png" if len(args) > 3: screen = int(args[3]) else: screen = 0 (fbw, fbh, _fbbpp, fbx, fby, _) = display.getScreenResolution(screen) if len(args) > 1: w = int(args[1]) else: w = fbw if len(args) > 2: h = int(args[2]) else: h = fbh print("Saving screenshot (%d x %d) screen %d in %s..." % (w, h, screen, f)) data = display.takeScreenShotToArray(screen, w, h, ctx['const'].BitmapFormat_RGBA) size = (w, h) mode = "RGBA" im = Image.frombuffer(mode, size, str(data), "raw", mode, 0, 1) im.save(f, "PNG") def takeScreenshot(_ctx, console, args): display = console.display if len(args) > 0: f = args[0] else: f = "/tmp/screenshot.png" if len(args) > 3: screen = int(args[3]) else: screen = 0 (fbw, fbh, _fbbpp, fbx, fby, _) = display.getScreenResolution(screen) if len(args) > 1: w = int(args[1]) else: w = fbw if len(args) > 2: h = int(args[2]) else: h = fbh print("Saving screenshot (%d x %d) screen %d in %s..." % (w, h, screen, f)) data = display.takeScreenShotToArray(screen, w, h, ctx['const'].BitmapFormat_PNG) pngfile = open(f, 'wb') pngfile.write(data) pngfile.close() def teleport(ctx, _session, console, args): if args[0].find(":") == -1: print("Use host:port format for teleport target") return (host, port) = args[0].split(":") if len(args) > 1: passwd = args[1] else: passwd = "" if len(args) > 2: maxDowntime = int(args[2]) else: maxDowntime = 250 port = int(port) print("Teleporting to %s:%d..." % (host, port)) progress = console.teleport(host, port, passwd, maxDowntime) if progressBar(ctx, progress, 100) and int(progress.resultCode) == 0: print("Success!") else: reportError(ctx, progress) def guestStats(ctx, console, args): guest = console.guest # we need to set up guest statistics if len(args) > 0 : update = args[0] else: update = 1 if guest.statisticsUpdateInterval != update: guest.statisticsUpdateInterval = update try: time.sleep(float(update)+0.1) except: # to allow sleep interruption pass all_stats = ctx['const'].all_values('GuestStatisticType') cpu = 0 for s in list(all_stats.keys()): try: val = guest.getStatistic( cpu, all_stats[s]) print("%s: %d" % (s, val)) except: # likely not implemented pass def plugCpu(_ctx, machine, _session, args): cpu = int(args[0]) print("Adding CPU %d..." % (cpu)) machine.hotPlugCPU(cpu) def unplugCpu(_ctx, machine, _session, args): cpu = int(args[0]) print("Removing CPU %d..." % (cpu)) machine.hotUnplugCPU(cpu) def mountIso(_ctx, machine, _session, args): machine.mountMedium(args[0], args[1], args[2], args[3], args[4]) machine.saveSettings() def cond(c, v1, v2): if c: return v1 else: return v2 def printHostUsbDev(ctx, ud): print(" %s: %s (vendorId=%d productId=%d serial=%s) %s" % (ud.id, colored(ud.product, 'blue'), ud.vendorId, ud.productId, ud.serialNumber, asEnumElem(ctx, 'USBDeviceState', ud.state))) def printUsbDev(_ctx, ud): print(" %s: %s (vendorId=%d productId=%d serial=%s)" % (ud.id, colored(ud.product, 'blue'), ud.vendorId, ud.productId, ud.serialNumber)) def printSf(ctx, sf): print(" name=%s host=%s %s %s" % (sf.name, colPath(ctx, sf.hostPath), cond(sf.accessible, "accessible", "not accessible"), cond(sf.writable, "writable", "read-only"))) def ginfo(ctx, console, _args): guest = console.guest if guest.additionsRunLevel != ctx['const'].AdditionsRunLevelType_None: print("Additions active, version %s" % (guest.additionsVersion)) print("Support seamless: %s" % (getFacilityStatus(ctx, guest, ctx['const'].AdditionsFacilityType_Seamless))) print("Support graphics: %s" % (getFacilityStatus(ctx, guest, ctx['const'].AdditionsFacilityType_Graphics))) print("Balloon size: %d" % (guest.memoryBalloonSize)) print("Statistic update interval: %d" % (guest.statisticsUpdateInterval)) else: print("No additions") usbs = ctx['global'].getArray(console, 'USBDevices') print("Attached USB:") for ud in usbs: printUsbDev(ctx, ud) rusbs = ctx['global'].getArray(console, 'remoteUSBDevices') print("Remote USB:") for ud in rusbs: printHostUsbDev(ctx, ud) print("Transient shared folders:") sfs = rusbs = ctx['global'].getArray(console, 'sharedFolders') for sf in sfs: printSf(ctx, sf) def cmdExistingVm(ctx, mach, cmd, args): session = None try: vbox = ctx['vb'] session = ctx['global'].openMachineSession(mach, fPermitSharing=True) except Exception as e: printErr(ctx, "Session to '%s' not open: %s" % (mach.name, str(e))) if g_fVerbose: traceback.print_exc() return if session.state != ctx['const'].SessionState_Locked: print("Session to '%s' in wrong state: %s" % (mach.name, session.state)) session.unlockMachine() return # this could be an example how to handle local only (i.e. unavailable # in Webservices) functionality if ctx['remote'] and cmd == 'some_local_only_command': print('Trying to use local only functionality, ignored') session.unlockMachine() return console = session.console ops = {'pause': lambda: console.pause(), 'resume': lambda: console.resume(), 'powerdown': lambda: console.powerDown(), 'powerbutton': lambda: console.powerButton(), 'stats': lambda: perfStats(ctx, mach), 'guest': lambda: guestExec(ctx, mach, console, args), 'ginfo': lambda: ginfo(ctx, console, args), 'guestlambda': lambda: args[0](ctx, mach, console, args[1:]), 'save': lambda: progressBar(ctx, session.machine.saveState()), 'screenshot': lambda: takeScreenshot(ctx, console, args), 'teleport': lambda: teleport(ctx, session, console, args), 'gueststats': lambda: guestStats(ctx, console, args), 'plugcpu': lambda: plugCpu(ctx, session.machine, session, args), 'unplugcpu': lambda: unplugCpu(ctx, session.machine, session, args), 'mountiso': lambda: mountIso(ctx, session.machine, session, args), } try: ops[cmd]() except KeyboardInterrupt: ctx['interrupt'] = True except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() session.unlockMachine() def cmdClosedVm(ctx, mach, cmd, args=[], save=True): session = ctx['global'].openMachineSession(mach, fPermitSharing=True) mach = session.machine try: cmd(ctx, mach, args) except Exception as e: save = False printErr(ctx, e) if g_fVerbose: traceback.print_exc() if save: try: mach.saveSettings() except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() ctx['global'].closeMachineSession(session) def cmdAnyVm(ctx, mach, cmd, args=[], save=False): session = ctx['global'].openMachineSession(mach, fPermitSharing=True) mach = session.machine try: cmd(ctx, mach, session.console, args) except Exception as e: save = False printErr(ctx, e) if g_fVerbose: traceback.print_exc() if save: mach.saveSettings() ctx['global'].closeMachineSession(session) def machById(ctx, uuid): mach = ctx['vb'].findMachine(uuid) return mach class XPathNode: def __init__(self, parent, obj, ntype): self.parent = parent self.obj = obj self.ntype = ntype def lookup(self, subpath): children = self.enum() matches = [] for e in children: if e.matches(subpath): matches.append(e) return matches def enum(self): return [] def matches(self, subexp): if subexp == self.ntype: return True if not subexp.startswith(self.ntype): return False match = re.search(r"@(?P\w+)=(?P[^\'\[\]]+)", subexp) matches = False try: if match is not None: xdict = match.groupdict() attr = xdict['a'] val = xdict['v'] matches = (str(getattr(self.obj, attr)) == val) except: pass return matches def apply(self, cmd): exec(cmd, {'obj':self.obj, 'node':self, 'ctx':self.getCtx()}, {}) def getCtx(self): if hasattr(self, 'ctx'): return self.ctx return self.parent.getCtx() class XPathNodeHolder(XPathNode): def __init__(self, parent, obj, attr, heldClass, xpathname): XPathNode.__init__(self, parent, obj, 'hld '+xpathname) self.attr = attr self.heldClass = heldClass self.xpathname = xpathname def enum(self): children = [] for node in self.getCtx()['global'].getArray(self.obj, self.attr): nodexml = self.heldClass(self, node) children.append(nodexml) return children def matches(self, subexp): return subexp == self.xpathname class XPathNodeValue(XPathNode): def __init__(self, parent, obj, xpathname): XPathNode.__init__(self, parent, obj, 'val '+xpathname) self.xpathname = xpathname def matches(self, subexp): return subexp == self.xpathname class XPathNodeHolderVM(XPathNodeHolder): def __init__(self, parent, vbox): XPathNodeHolder.__init__(self, parent, vbox, 'machines', XPathNodeVM, 'vms') class XPathNodeVM(XPathNode): def __init__(self, parent, obj): XPathNode.__init__(self, parent, obj, 'vm') #def matches(self, subexp): # return subexp=='vm' def enum(self): return [XPathNodeHolderNIC(self, self.obj), XPathNodeValue(self, self.obj.BIOSSettings, 'bios'), ] class XPathNodeHolderNIC(XPathNodeHolder): def __init__(self, parent, mach): XPathNodeHolder.__init__(self, parent, mach, 'nics', XPathNodeVM, 'nics') self.maxNic = self.getCtx()['vb'].systemProperties.getMaxNetworkAdapters(self.obj.chipsetType) def enum(self): children = [] for i in range(0, self.maxNic): node = XPathNodeNIC(self, self.obj.getNetworkAdapter(i)) children.append(node) return children class XPathNodeNIC(XPathNode): def __init__(self, parent, obj): XPathNode.__init__(self, parent, obj, 'nic') def matches(self, subexp): return subexp == 'nic' class XPathNodeRoot(XPathNode): def __init__(self, ctx): XPathNode.__init__(self, None, None, 'root') self.ctx = ctx def enum(self): return [XPathNodeHolderVM(self, self.ctx['vb'])] def matches(self, subexp): return True def eval_xpath(ctx, scope): pathnames = scope.split("/")[2:] nodes = [XPathNodeRoot(ctx)] for path in pathnames: seen = [] while len(nodes) > 0: node = nodes.pop() seen.append(node) for s in seen: matches = s.lookup(path) for match in matches: nodes.append(match) if len(nodes) == 0: break return nodes def argsToMach(ctx, args): if len(args) < 2: print("usage: %s [vmname|uuid]" % (args[0])) return None uuid = args[1] mach = machById(ctx, uuid) if mach == None: print("Machine '%s' is unknown, use list command to find available machines" % (uuid)) return mach def helpSingleCmd(cmd, h, sp): if sp != 0: spec = " [ext from "+sp+"]" else: spec = "" print(" %s: %s%s" % (colored(cmd, 'blue'), h, spec)) def helpCmd(_ctx, args): if len(args) == 1: print("Help page:") names = list(commands.keys()) names.sort() for i in names: helpSingleCmd(i, commands[i][0], commands[i][2]) else: cmd = args[1] c = commands.get(cmd) if c == None: print("Command '%s' not known" % (cmd)) else: helpSingleCmd(cmd, c[0], c[2]) return 0 def asEnumElem(ctx, enum, elem): enumVals = ctx['const'].all_values(enum) for e in list(enumVals.keys()): if str(elem) == str(enumVals[e]): return colored(e, 'green') return colored("", 'green') def enumFromString(ctx, enum, strg): enumVals = ctx['const'].all_values(enum) return enumVals.get(strg, None) def listCmd(ctx, _args): for mach in getMachines(ctx, True): try: if mach.teleporterEnabled: tele = "[T] " else: tele = " " print("%sMachine '%s' [%s], machineState=%s, sessionState=%s" % (tele, colVm(ctx, mach.name), mach.id, asEnumElem(ctx, "MachineState", mach.state), asEnumElem(ctx, "SessionState", mach.sessionState))) except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() return 0 def infoCmd(ctx, args): if len(args) < 2: print("usage: info [vmname|uuid]") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 try: vmos = ctx['vb'].getGuestOSType(mach.OSTypeId) except: vmos = None print(" One can use setvar to change variable, using name in [].") print(" Name [name]: %s" % (colVm(ctx, mach.name))) print(" Description [description]: %s" % (mach.description)) print(" ID [n/a]: %s" % (mach.id)) print(" OS Type [via OSTypeId]: %s" % (vmos.description if vmos is not None else mach.OSTypeId)) print(" Firmware [firmwareType]: %s (%s)" % (asEnumElem(ctx, "FirmwareType", mach.firmwareType), mach.firmwareType)) print() print(" CPUs [CPUCount]: %d" % (mach.CPUCount)) print(" RAM [memorySize]: %dM" % (mach.memorySize)) print(" VRAM [VRAMSize]: %dM" % (mach.VRAMSize)) print(" Monitors [monitorCount]: %d" % (mach.monitorCount)) print(" Chipset [chipsetType]: %s (%s)" % (asEnumElem(ctx, "ChipsetType", mach.chipsetType), mach.chipsetType)) print() print(" Clipboard mode [clipboardMode]: %s (%s)" % (asEnumElem(ctx, "ClipboardMode", mach.clipboardMode), mach.clipboardMode)) print(" Machine status [n/a]: %s (%s)" % (asEnumElem(ctx, "SessionState", mach.sessionState), mach.sessionState)) print() if mach.teleporterEnabled: print(" Teleport target on port %d (%s)" % (mach.teleporterPort, mach.teleporterPassword)) print() bios = mach.BIOSSettings print(" ACPI [BIOSSettings.ACPIEnabled]: %s" % (asState(bios.ACPIEnabled))) print(" APIC [BIOSSettings.IOAPICEnabled]: %s" % (asState(bios.IOAPICEnabled))) hwVirtEnabled = mach.getHWVirtExProperty(ctx['global'].constants.HWVirtExPropertyType_Enabled) print(" Hardware virtualization [guest win machine.setHWVirtExProperty(ctx[\\'const\\'].HWVirtExPropertyType_Enabled, value)]: " + asState(hwVirtEnabled)) hwVirtVPID = mach.getHWVirtExProperty(ctx['const'].HWVirtExPropertyType_VPID) print(" VPID support [guest win machine.setHWVirtExProperty(ctx[\\'const\\'].HWVirtExPropertyType_VPID, value)]: " + asState(hwVirtVPID)) hwVirtNestedPaging = mach.getHWVirtExProperty(ctx['const'].HWVirtExPropertyType_NestedPaging) print(" Nested paging [guest win machine.setHWVirtExProperty(ctx[\\'const\\'].HWVirtExPropertyType_NestedPaging, value)]: " + asState(hwVirtNestedPaging)) print(" Hardware 3d acceleration [accelerate3DEnabled]: " + asState(mach.accelerate3DEnabled)) print(" Hardware 2d video acceleration [accelerate2DVideoEnabled]: " + asState(mach.accelerate2DVideoEnabled)) print(" Use universal time [RTCUseUTC]: %s" % (asState(mach.RTCUseUTC))) print(" HPET [HPETEnabled]: %s" % (asState(mach.HPETEnabled))) if mach.audioAdapter.enabled: print(" Audio [via audioAdapter]: chip %s; host driver %s" % (asEnumElem(ctx, "AudioControllerType", mach.audioAdapter.audioController), asEnumElem(ctx, "AudioDriverType", mach.audioAdapter.audioDriver))) print(" CPU hotplugging [CPUHotPlugEnabled]: %s" % (asState(mach.CPUHotPlugEnabled))) print(" Keyboard [keyboardHIDType]: %s (%s)" % (asEnumElem(ctx, "KeyboardHIDType", mach.keyboardHIDType), mach.keyboardHIDType)) print(" Pointing device [pointingHIDType]: %s (%s)" % (asEnumElem(ctx, "PointingHIDType", mach.pointingHIDType), mach.pointingHIDType)) print(" Last changed [n/a]: " + time.asctime(time.localtime(mach.lastStateChange/1000))) # OSE has no VRDE try: print(" VRDE server [VRDEServer.enabled]: %s" % (asState(mach.VRDEServer.enabled))) except: pass print() print(colCat(ctx, " USB Controllers:")) for oUsbCtrl in ctx['global'].getArray(mach, 'USBControllers'): print(" '%s': type %s standard: %#x" \ % (oUsbCtrl.name, asEnumElem(ctx, "USBControllerType", oUsbCtrl.type), oUsbCtrl.USBStandard)) print() print(colCat(ctx, " I/O subsystem info:")) print(" Cache enabled [IOCacheEnabled]: %s" % (asState(mach.IOCacheEnabled))) print(" Cache size [IOCacheSize]: %dM" % (mach.IOCacheSize)) controllers = ctx['global'].getArray(mach, 'storageControllers') if controllers: print() print(colCat(ctx, " Storage Controllers:")) for controller in controllers: print(" '%s': bus %s type %s" % (controller.name, asEnumElem(ctx, "StorageBus", controller.bus), asEnumElem(ctx, "StorageControllerType", controller.controllerType))) attaches = ctx['global'].getArray(mach, 'mediumAttachments') if attaches: print() print(colCat(ctx, " Media:")) for a in attaches: print(" Controller: '%s' port/device: %d:%d type: %s (%s):" % (a.controller, a.port, a.device, asEnumElem(ctx, "DeviceType", a.type), a.type)) medium = a.medium if a.type == ctx['global'].constants.DeviceType_HardDisk: print(" HDD:") print(" Id: %s" % (medium.id)) print(" Location: %s" % (colPath(ctx, medium.location))) print(" Name: %s" % (medium.name)) print(" Format: %s" % (medium.format)) if a.type == ctx['global'].constants.DeviceType_DVD: print(" DVD:") if medium: print(" Id: %s" % (medium.id)) print(" Name: %s" % (medium.name)) if medium.hostDrive: print(" Host DVD %s" % (colPath(ctx, medium.location))) if a.passthrough: print(" [passthrough mode]") else: print(" Virtual image at %s" % (colPath(ctx, medium.location))) print(" Size: %s" % (medium.size)) if a.type == ctx['global'].constants.DeviceType_Floppy: print(" Floppy:") if medium: print(" Id: %s" % (medium.id)) print(" Name: %s" % (medium.name)) if medium.hostDrive: print(" Host floppy %s" % (colPath(ctx, medium.location))) else: print(" Virtual image at %s" % (colPath(ctx, medium.location))) print(" Size: %s" % (medium.size)) print() print(colCat(ctx, " Shared folders:")) for sf in ctx['global'].getArray(mach, 'sharedFolders'): printSf(ctx, sf) return 0 def startCmd(ctx, args): if len(args) < 2: print("usage: start name ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 if len(args) > 2: vmtype = args[2] else: vmtype = "gui" startVm(ctx, mach, vmtype) return 0 def createVmCmd(ctx, args): if len(args) != 3: print("usage: createvm name ostype") return 0 name = args[1] oskind = args[2] try: ctx['vb'].getGuestOSType(oskind) except Exception: print('Unknown OS type:', oskind) return 0 createVm(ctx, name, oskind) return 0 def ginfoCmd(ctx, args): if len(args) < 2: print("usage: ginfo [vmname|uuid]") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'ginfo', '') return 0 def execInGuest(ctx, console, args, env, user, passwd, tmo, inputPipe=None, outputPipe=None): if len(args) < 1: print("exec in guest needs at least program name") return guest = console.guest guestSession = guest.createSession(user, passwd, "", "vboxshell guest exec") # shall contain program name as argv[0] gargs = args print("executing %s with args %s as %s" % (args[0], gargs, user)) flags = 0 if inputPipe is not None: flags = 1 # set WaitForProcessStartOnly print(args[0]) process = guestSession.processCreate(args[0], gargs, env, [], tmo) print("executed with pid %d" % (process.PID)) if pid != 0: try: while True: if inputPipe is not None: indata = inputPipe(ctx) if indata is not None: write = len(indata) off = 0 while write > 0: w = guest.setProcessInput(pid, 0, 10*1000, indata[off:]) off = off + w write = write - w else: # EOF try: guest.setProcessInput(pid, 1, 10*1000, " ") except: pass data = guest.getProcessOutput(pid, 0, 10000, 4096) if data and len(data) > 0: sys.stdout.write(data) continue progress.waitForCompletion(100) ctx['global'].waitForEvents(0) data = guest.getProcessOutput(pid, 0, 0, 4096) if data and len(data) > 0: if outputPipe is not None: outputPipe(ctx, data) else: sys.stdout.write(data) continue if progress.completed: break except KeyboardInterrupt: print("Interrupted.") ctx['interrupt'] = True if progress.cancelable: progress.cancel() (_reason, code, _flags) = guest.getProcessStatus(pid) print("Exit code: %d" % (code)) return 0 else: reportError(ctx, progress) def copyToGuest(ctx, console, args, user, passwd): src = args[0] dst = args[1] flags = 0 print("Copying host %s to guest %s" % (src, dst)) progress = console.guest.copyToGuest(src, dst, user, passwd, flags) progressBar(ctx, progress) def nh_raw_input(prompt=""): stream = sys.stdout prompt = str(prompt) if prompt: stream.write(prompt) line = sys.stdin.readline() if not line: raise EOFError if line[-1] == '\n': line = line[:-1] return line def getCred(_ctx): import getpass user = getpass.getuser() user_inp = nh_raw_input("User (%s): " % (user)) if len(user_inp) > 0: user = user_inp passwd = getpass.getpass() return (user, passwd) def gexecCmd(ctx, args): if len(args) < 2: print("usage: gexec [vmname|uuid] command args") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 gargs = args[2:] env = [] # ["DISPLAY=:0"] (user, passwd) = getCred(ctx) gargs.insert(0, lambda ctx, mach, console, args: execInGuest(ctx, console, args, env, user, passwd, 10000)) cmdExistingVm(ctx, mach, 'guestlambda', gargs) return 0 def gcopyCmd(ctx, args): if len(args) < 2: print("usage: gcopy [vmname|uuid] host_path guest_path") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 gargs = args[2:] (user, passwd) = getCred(ctx) gargs.insert(0, lambda ctx, mach, console, args: copyToGuest(ctx, console, args, user, passwd)) cmdExistingVm(ctx, mach, 'guestlambda', gargs) return 0 def readCmdPipe(ctx, _hcmd): try: return ctx['process'].communicate()[0] except: return None def gpipeCmd(ctx, args): if len(args) < 4: print("usage: gpipe [vmname|uuid] hostProgram guestProgram, such as gpipe linux '/bin/uname -a' '/bin/sh -c \"/usr/bin/tee; /bin/uname -a\"'") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 hcmd = args[2] gcmd = args[3] (user, passwd) = getCred(ctx) import subprocess ctx['process'] = subprocess.Popen(split_no_quotes(hcmd), stdout=subprocess.PIPE) gargs = split_no_quotes(gcmd) env = [] gargs.insert(0, lambda ctx, mach, console, args: execInGuest(ctx, console, args, env, user, passwd, 10000, lambda ctx:readCmdPipe(ctx, hcmd))) cmdExistingVm(ctx, mach, 'guestlambda', gargs) try: ctx['process'].terminate() except: pass ctx['process'] = None return 0 def removeVmCmd(ctx, args): mach = argsToMach(ctx, args) if mach == None: return 0 removeVm(ctx, mach) return 0 def pauseCmd(ctx, args): mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'pause', '') return 0 def powerdownCmd(ctx, args): mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'powerdown', '') return 0 def powerbuttonCmd(ctx, args): mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'powerbutton', '') return 0 def resumeCmd(ctx, args): mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'resume', '') return 0 def saveCmd(ctx, args): mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'save', '') return 0 def statsCmd(ctx, args): mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'stats', '') return 0 def guestCmd(ctx, args): if len(args) < 3: print("usage: guest name commands") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 if mach.state != ctx['const'].MachineState_Running: cmdClosedVm(ctx, mach, lambda ctx, mach, a: guestExec (ctx, mach, None, ' '.join(args[2:]))) else: cmdExistingVm(ctx, mach, 'guest', ' '.join(args[2:])) return 0 def screenshotCmd(ctx, args): if len(args) < 2: print("usage: screenshot vm ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'screenshot', args[2:]) return 0 def teleportCmd(ctx, args): if len(args) < 3: print("usage: teleport name host:port ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'teleport', args[2:]) return 0 def portalsettings(_ctx, mach, args): enabled = args[0] mach.teleporterEnabled = enabled if enabled: port = args[1] passwd = args[2] mach.teleporterPort = port mach.teleporterPassword = passwd def openportalCmd(ctx, args): if len(args) < 3: print("usage: openportal name port ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 port = int(args[2]) if len(args) > 3: passwd = args[3] else: passwd = "" if not mach.teleporterEnabled or mach.teleporterPort != port or passwd: cmdClosedVm(ctx, mach, portalsettings, [True, port, passwd]) startVm(ctx, mach, "gui") return 0 def closeportalCmd(ctx, args): if len(args) < 2: print("usage: closeportal name") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 if mach.teleporterEnabled: cmdClosedVm(ctx, mach, portalsettings, [False]) return 0 def gueststatsCmd(ctx, args): if len(args) < 2: print("usage: gueststats name ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 cmdExistingVm(ctx, mach, 'gueststats', args[2:]) return 0 def plugcpu(_ctx, mach, args): plug = args[0] cpu = args[1] if plug: print("Adding CPU %d..." % (cpu)) mach.hotPlugCPU(cpu) else: print("Removing CPU %d..." % (cpu)) mach.hotUnplugCPU(cpu) def plugcpuCmd(ctx, args): if len(args) < 2: print("usage: plugcpu name cpuid") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 if str(mach.sessionState) != str(ctx['const'].SessionState_Locked): if mach.CPUHotPlugEnabled: cmdClosedVm(ctx, mach, plugcpu, [True, int(args[2])]) else: cmdExistingVm(ctx, mach, 'plugcpu', args[2]) return 0 def unplugcpuCmd(ctx, args): if len(args) < 2: print("usage: unplugcpu name cpuid") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 if str(mach.sessionState) != str(ctx['const'].SessionState_Locked): if mach.CPUHotPlugEnabled: cmdClosedVm(ctx, mach, plugcpu, [False, int(args[2])]) else: cmdExistingVm(ctx, mach, 'unplugcpu', args[2]) return 0 def setvar(_ctx, _mach, args): expr = 'mach.'+args[0]+' = '+args[1] print("Executing", expr) exec(expr) def setvarCmd(ctx, args): if len(args) < 4: print("usage: setvar [vmname|uuid] expr value") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 cmdClosedVm(ctx, mach, setvar, args[2:]) return 0 def setvmextra(_ctx, mach, args): key = args[0] value = args[1] print("%s: setting %s to %s" % (mach.name, key, value if value else None)) mach.setExtraData(key, value) def setExtraDataCmd(ctx, args): if len(args) < 3: print("usage: setextra [vmname|uuid|global] key ") return 0 key = args[2] if len(args) == 4: value = args[3] else: value = '' if args[1] == 'global': ctx['vb'].setExtraData(key, value) return 0 mach = argsToMach(ctx, args) if mach == None: return 0 cmdClosedVm(ctx, mach, setvmextra, [key, value]) return 0 def printExtraKey(obj, key, value): print("%s: '%s' = '%s'" % (obj, key, value)) def getExtraDataCmd(ctx, args): if len(args) < 2: print("usage: getextra [vmname|uuid|global] ") return 0 if len(args) == 3: key = args[2] else: key = None if args[1] == 'global': obj = ctx['vb'] else: obj = argsToMach(ctx, args) if obj == None: return 0 if key == None: keys = obj.getExtraDataKeys() else: keys = [ key ] for k in keys: printExtraKey(args[1], k, obj.getExtraData(k)) return 0 def quitCmd(_ctx, _args): return 1 def aliasCmd(ctx, args): if len(args) == 3: aliases[args[1]] = args[2] return 0 for (key, value) in list(aliases.items()): print("'%s' is an alias for '%s'" % (key, value)) return 0 def verboseCmd(ctx, args): global g_fVerbose if len(args) > 1: g_fVerbose = (args[1]=='on') else: g_fVerbose = not g_fVerbose return 0 def colorsCmd(ctx, args): global g_fHasColors if len(args) > 1: g_fHasColors = (args[1] == 'on') else: g_fHasColors = not g_fHasColors return 0 def hostCmd(ctx, args): vbox = ctx['vb'] try: print("VirtualBox version %s" % (colored(vbox.version, 'blue'))) except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() props = vbox.systemProperties print("Machines: %s" % (colPath(ctx, props.defaultMachineFolder))) #print("Global shared folders:") #for ud in ctx['global'].getArray(vbox, 'sharedFolders'): # printSf(ctx, sf) host = vbox.host cnt = host.processorCount print(colCat(ctx, "Processors:")) print(" available/online: %d/%d " % (cnt, host.processorOnlineCount)) for i in range(0, cnt): print(" processor #%d speed: %dMHz %s" % (i, host.getProcessorSpeed(i), host.getProcessorDescription(i))) print(colCat(ctx, "RAM:")) print(" %dM (free %dM)" % (host.memorySize, host.memoryAvailable)) print(colCat(ctx, "OS:")) print(" %s (%s)" % (host.operatingSystem, host.OSVersion)) if host.acceleration3DAvailable: print(colCat(ctx, "3D acceleration available")) else: print(colCat(ctx, "3D acceleration NOT available")) print(colCat(ctx, "Network interfaces:")) for ni in ctx['global'].getArray(host, 'networkInterfaces'): print(" %s (%s)" % (ni.name, ni.IPAddress)) print(colCat(ctx, "DVD drives:")) for dd in ctx['global'].getArray(host, 'DVDDrives'): print(" %s - %s" % (dd.name, dd.description)) print(colCat(ctx, "Floppy drives:")) for dd in ctx['global'].getArray(host, 'floppyDrives'): print(" %s - %s" % (dd.name, dd.description)) print(colCat(ctx, "USB devices:")) for ud in ctx['global'].getArray(host, 'USBDevices'): printHostUsbDev(ctx, ud) if ctx['perf']: for metric in ctx['perf'].query(["*"], [host]): print(metric['name'], metric['values_as_string']) return 0 def monitorGuestCmd(ctx, args): if len(args) < 2: print("usage: monitorGuest name (duration)") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.eventSource, active, dur)]) return 0 def monitorGuestKbdCmd(ctx, args): if len(args) < 2: print("usage: monitorGuestKbd name (duration)") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.keyboard.eventSource, active, dur)]) return 0 def monitorGuestMouseCmd(ctx, args): if len(args) < 2: print("usage: monitorGuestMouse name (duration)") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.mouse.eventSource, active, dur)]) return 0 def monitorGuestMultiTouchCmd(ctx, args): if len(args) < 2: print("usage: monitorGuestMultiTouch name (duration)") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 dur = 5 if len(args) > 2: dur = float(args[2]) active = False cmdExistingVm(ctx, mach, 'guestlambda', [lambda ctx, mach, console, args: monitorSource(ctx, console.mouse.eventSource, active, dur)]) return 0 def monitorVBoxCmd(ctx, args): if len(args) > 2: print("usage: monitorVBox (duration)") return 0 dur = 5 if len(args) > 1: dur = float(args[1]) vbox = ctx['vb'] active = False monitorSource(ctx, vbox.eventSource, active, dur) return 0 def getAdapterType(ctx, natype): if (natype == ctx['global'].constants.NetworkAdapterType_Am79C970A or natype == ctx['global'].constants.NetworkAdapterType_Am79C973): return "pcnet" elif (natype == ctx['global'].constants.NetworkAdapterType_I82540EM or natype == ctx['global'].constants.NetworkAdapterType_I82545EM or natype == ctx['global'].constants.NetworkAdapterType_I82543GC): return "e1000" elif (natype == ctx['global'].constants.NetworkAdapterType_Virtio): return "virtio" elif (natype == ctx['global'].constants.NetworkAdapterType_Null): return None else: raise Exception("Unknown adapter type: "+natype) def portForwardCmd(ctx, args): if len(args) != 5: print("usage: portForward ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 adapterNum = int(args[2]) hostPort = int(args[3]) guestPort = int(args[4]) proto = "TCP" session = ctx['global'].openMachineSession(mach, fPermitSharing=True) mach = session.machine adapter = mach.getNetworkAdapter(adapterNum) adapterType = getAdapterType(ctx, adapter.adapterType) profile_name = proto+"_"+str(hostPort)+"_"+str(guestPort) config = "VBoxInternal/Devices/" + adapterType + "/" config = config + str(adapter.slot) +"/LUN#0/Config/" + profile_name mach.setExtraData(config + "/Protocol", proto) mach.setExtraData(config + "/HostPort", str(hostPort)) mach.setExtraData(config + "/GuestPort", str(guestPort)) mach.saveSettings() session.unlockMachine() return 0 def showLogCmd(ctx, args): if len(args) < 2: print("usage: showLog vm ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 log = 0 if len(args) > 2: log = args[2] uOffset = 0 while True: data = mach.readLog(log, uOffset, 4096) if len(data) == 0: break # print adds either NL or space to chunks not ending with a NL sys.stdout.write(str(data)) uOffset += len(data) return 0 def findLogCmd(ctx, args): if len(args) < 3: print("usage: findLog vm pattern ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 log = 0 if len(args) > 3: log = args[3] pattern = args[2] uOffset = 0 while True: # to reduce line splits on buffer boundary data = mach.readLog(log, uOffset, 512*1024) if len(data) == 0: break d = str(data).split("\n") for s in d: match = re.findall(pattern, s) if len(match) > 0: for mt in match: s = s.replace(mt, colored(mt, 'red')) print(s) uOffset += len(data) return 0 def findAssertCmd(ctx, args): if len(args) < 2: print("usage: findAssert vm ") return 0 mach = argsToMach(ctx, args) if mach == None: return 0 log = 0 if len(args) > 2: log = args[2] uOffset = 0 ere = re.compile(r'(Expression:|\!\!\!\!\!\!)') active = False context = 0 while True: # to reduce line splits on buffer boundary data = mach.readLog(log, uOffset, 512*1024) if len(data) == 0: break d = str(data).split("\n") for s in d: if active: print(s) if context == 0: active = False else: context = context - 1 continue match = ere.findall(s) if len(match) > 0: active = True context = 50 print(s) uOffset += len(data) return 0 def evalCmd(ctx, args): expr = ' '.join(args[1:]) try: exec(expr) except Exception as e: printErr(ctx, e) if g_fVerbose: traceback.print_exc() return 0 def reloadExtCmd(ctx, args): # maybe will want more args smartness checkUserExtensions(ctx, commands, getHomeFolder(ctx)) autoCompletion(commands, ctx) return 0 def runScriptCmd(ctx, args): if len(args) != 2: print("usage: runScript