#!/usr/bin/env python # -*- coding: utf-8 -*- # $Id: tdUsb1.py 67039 2017-05-23 11:33:27Z vboxsync $ """ VirtualBox Validation Kit - USB testcase and benchmark. """ __copyright__ = \ """ Copyright (C) 2014-2016 Oracle Corporation This file is part of VirtualBox Open Source Edition (OSE), as available from http://www.virtualbox.org. This file is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License (GPL) as published by the Free Software Foundation, in version 2 as it comes in the "COPYING" file of the VirtualBox OSE distribution. VirtualBox OSE is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY of any kind. The contents of this file may alternatively be used under the terms of the Common Development and Distribution License Version 1.0 (CDDL) only, as it comes in the "COPYING.CDDL" file of the VirtualBox OSE distribution, in which case the provisions of the CDDL are applicable instead of those of the GPL. You may elect to license modified versions of this file under the terms and conditions of either the GPL or the CDDL or both. """ __version__ = "$Revision: 67039 $" # Standard Python imports. import os; import sys; import socket; # Only the main script needs to modify the path. try: __file__ except: __file__ = sys.argv[0]; g_ksValidationKitDir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))); sys.path.append(g_ksValidationKitDir); # Validation Kit imports. from testdriver import reporter; from testdriver import base; from testdriver import vbox; from testdriver import vboxcon; # USB gadget control import import usbgadget; class tdUsbBenchmark(vbox.TestDriver): # pylint: disable=R0902 """ USB benchmark. """ # The available test devices # # The first key is the hostname of the host the test is running on. # It contains a new dictionary with the attached gadgets based on the # USB speed we want to test (Low, Full, High, Super). # The parameters consist of the hostname of the gadget in the network # and the hardware type. kdGadgetParams = { 'adaris': { 'Low': ('usbtest.de.oracle.com', None), 'Full': ('usbtest.de.oracle.com', None), 'High': ('usbtest.de.oracle.com', None), 'Super': ('usbtest.de.oracle.com', None) }, }; # Mappings of USB controllers to supported USB device speeds. kdUsbSpeedMappings = { 'OHCI': ['Low', 'Full'], 'EHCI': ['High'], 'XHCI': ['Low', 'Full', 'High', 'Super'] }; # Tests currently disabled because they fail, need investigation. kdUsbTestsDisabled = { 'Low': [24], 'Full': [24], 'High': [24], 'Super': [24] }; def __init__(self): vbox.TestDriver.__init__(self); self.asRsrcs = None; self.asTestVMsDef = ['tst-arch']; self.asTestVMs = self.asTestVMsDef; self.asSkipVMs = []; self.asVirtModesDef = ['hwvirt', 'hwvirt-np', 'raw']; self.asVirtModes = self.asVirtModesDef; self.acCpusDef = [1, 2,]; self.acCpus = self.acCpusDef; self.asUsbCtrlsDef = ['OHCI', 'EHCI', 'XHCI']; self.asUsbCtrls = self.asUsbCtrlsDef; self.asUsbSpeedDef = ['Low', 'Full', 'High', 'Super']; self.asUsbSpeed = self.asUsbSpeedDef; self.asUsbTestsDef = ['Compliance', 'Reattach']; self.asUsbTests = self.asUsbTestsDef; self.cUsbReattachCyclesDef = 100; self.cUsbReattachCycles = self.cUsbReattachCyclesDef; self.sHostname = socket.gethostname().lower(); self.sGadgetHostnameDef = 'usbtest.de.oracle.com'; self.uGadgetPortDef = None; self.sUsbCapturePathDef = self.sScratchPath; self.sUsbCapturePath = self.sUsbCapturePathDef; self.fUsbCapture = False; # # Overridden methods. # def showUsage(self): rc = vbox.TestDriver.showUsage(self); reporter.log(''); reporter.log('tdUsb1 Options:'); reporter.log(' --virt-modes '); reporter.log(' Test the specified VMs in the given order. Use this to change'); reporter.log(' the execution order or limit the choice of VMs'); reporter.log(' Default: %s (all)' % (':'.join(self.asTestVMsDef))); reporter.log(' --skip-vms '); reporter.log(' Skip the specified VMs when testing.'); reporter.log(' --usb-ctrls '); reporter.log(' Default: %s' % (self.cUsbReattachCyclesDef)); reporter.log(' --hostname: '); reporter.log(' Default: %s' % (self.sHostname)); reporter.log(' --default-gadget-host '); reporter.log(' Default: %s' % (self.sGadgetHostnameDef)); reporter.log(' --default-gadget-port '); reporter.log(' Default: %s' % (6042)); reporter.log(' --usb-capture-path '); reporter.log(' Default: %s' % (self.sUsbCapturePathDef)); reporter.log(' --usb-capture'); reporter.log(' Whether to capture the USB traffic for each test'); return rc; def parseOption(self, asArgs, iArg): # pylint: disable=R0912,R0915 if asArgs[iArg] == '--virt-modes': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--virt-modes" takes a colon separated list of modes'); self.asVirtModes = asArgs[iArg].split(':'); for s in self.asVirtModes: if s not in self.asVirtModesDef: raise base.InvalidOption('The "--virt-modes" value "%s" is not valid; valid values are: %s' \ % (s, ' '.join(self.asVirtModesDef))); elif asArgs[iArg] == '--cpu-counts': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--cpu-counts" takes a colon separated list of cpu counts'); self.acCpus = []; for s in asArgs[iArg].split(':'): try: c = int(s); except: raise base.InvalidOption('The "--cpu-counts" value "%s" is not an integer' % (s,)); if c <= 0: raise base.InvalidOption('The "--cpu-counts" value "%s" is zero or negative' % (s,)); self.acCpus.append(c); elif asArgs[iArg] == '--test-vms': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--test-vms" takes colon separated list'); self.asTestVMs = asArgs[iArg].split(':'); for s in self.asTestVMs: if s not in self.asTestVMsDef: raise base.InvalidOption('The "--test-vms" value "%s" is not valid; valid values are: %s' \ % (s, ' '.join(self.asTestVMsDef))); elif asArgs[iArg] == '--skip-vms': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--skip-vms" takes colon separated list'); self.asSkipVMs = asArgs[iArg].split(':'); for s in self.asSkipVMs: if s not in self.asTestVMsDef: reporter.log('warning: The "--test-vms" value "%s" does not specify any of our test VMs.' % (s)); elif asArgs[iArg] == '--usb-ctrls': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-ctrls" takes a colon separated list of USB controllers'); self.asUsbCtrls = asArgs[iArg].split(':'); for s in self.asUsbCtrls: if s not in self.asUsbCtrlsDef: reporter.log('warning: The "--usb-ctrls" value "%s" is not a valid USB controller.' % (s)); elif asArgs[iArg] == '--usb-speed': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-speed" takes a colon separated list of USB speeds'); self.asUsbSpeed = asArgs[iArg].split(':'); for s in self.asUsbSpeed: if s not in self.asUsbSpeedDef: reporter.log('warning: The "--usb-speed" value "%s" is not a valid USB speed.' % (s)); elif asArgs[iArg] == '--usb-tests': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-tests" takes a colon separated list of USB tests'); self.asUsbTests = asArgs[iArg].split(':'); for s in self.asUsbTests: if s not in self.asUsbTestsDef: reporter.log('warning: The "--usb-tests" value "%s" is not a valid USB test.' % (s)); elif asArgs[iArg] == '--usb-reattach-cycles': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-reattach-cycles" takes cycle count'); try: self.cUsbReattachCycles = int(asArgs[iArg]); except: raise base.InvalidOption('The "--usb-reattach-cycles" value "%s" is not an integer' \ % (asArgs[iArg],)); if self.cUsbReattachCycles <= 0: raise base.InvalidOption('The "--usb-reattach-cycles" value "%s" is zero or negative.' \ % (self.cUsbReattachCycles,)); elif asArgs[iArg] == '--hostname': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--hostname" takes a hostname'); self.sHostname = asArgs[iArg]; elif asArgs[iArg] == '--default-gadget-host': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--default-gadget-host" takes a hostname'); self.sGadgetHostnameDef = asArgs[iArg]; elif asArgs[iArg] == '--default-gadget-port': iArg += 1; if iArg >= len(asArgs): raise base.InvalidOption('The "--default-gadget-port" takes port number'); try: self.uGadgetPortDef = int(asArgs[iArg]); except: raise base.InvalidOption('The "--default-gadget-port" value "%s" is not an integer' \ % (asArgs[iArg],)); if self.uGadgetPortDef <= 0: raise base.InvalidOption('The "--default-gadget-port" value "%s" is zero or negative.' \ % (self.uGadgetPortDef,)); elif asArgs[iArg] == '--usb-capture-path': if iArg >= len(asArgs): raise base.InvalidOption('The "--usb-capture-path" takes a path argument'); self.sUsbCapturePath = asArgs[iArg]; elif asArgs[iArg] == '--usb-capture': self.fUsbCapture = True; else: return vbox.TestDriver.parseOption(self, asArgs, iArg); return iArg + 1; def completeOptions(self): # Remove skipped VMs from the test list. for sVM in self.asSkipVMs: try: self.asTestVMs.remove(sVM); except: pass; return vbox.TestDriver.completeOptions(self); def getResourceSet(self): # Construct the resource list the first time it's queried. if self.asRsrcs is None: self.asRsrcs = []; if 'tst-arch' in self.asTestVMs: self.asRsrcs.append('4.2/usb/tst-arch.vdi'); return self.asRsrcs; def actionConfig(self): # Some stupid trickery to guess the location of the iso. ## fixme - testsuite unzip ++ sVBoxValidationKit_iso = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxValidationKit.iso')); if not os.path.isfile(sVBoxValidationKit_iso): sVBoxValidationKit_iso = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../VBoxTestSuite.iso')); if not os.path.isfile(sVBoxValidationKit_iso): sVBoxValidationKit_iso = '/mnt/ramdisk/vbox/svn/trunk/validationkit/VBoxValidationKit.iso'; if not os.path.isfile(sVBoxValidationKit_iso): sVBoxValidationKit_iso = '/mnt/ramdisk/vbox/svn/trunk/testsuite/VBoxTestSuite.iso'; if not os.path.isfile(sVBoxValidationKit_iso): sCur = os.getcwd(); for i in range(0, 10): sVBoxValidationKit_iso = os.path.join(sCur, 'validationkit/VBoxValidationKit.iso'); if os.path.isfile(sVBoxValidationKit_iso): break; sVBoxValidationKit_iso = os.path.join(sCur, 'testsuite/VBoxTestSuite.iso'); if os.path.isfile(sVBoxValidationKit_iso): break; sCur = os.path.abspath(os.path.join(sCur, '..')); if i is None: pass; # shut up pychecker/pylint. if not os.path.isfile(sVBoxValidationKit_iso): sVBoxValidationKit_iso = '/home/bird/validationkit/VBoxValidationKit.iso'; if not os.path.isfile(sVBoxValidationKit_iso): sVBoxValidationKit_iso = '/home/bird/testsuite/VBoxTestSuite.iso'; # Make sure vboxapi has been imported so we can use the constants. if not self.importVBoxApi(): return False; # # Configure the VMs we're going to use. # # Linux VMs if 'tst-arch' in self.asTestVMs: oVM = self.createTestVM('tst-arch', 1, '4.2/usb/tst-arch.vdi', sKind = 'ArchLinux_64', fIoApic = True, \ eNic0AttachType = vboxcon.NetworkAttachmentType_NAT, \ sDvdImage = sVBoxValidationKit_iso); if oVM is None: return False; return True; def actionExecute(self): """ Execute the testcase. """ fRc = self.testUsb(); return fRc; def getGadgetParams(self, sHostname, sSpeed): """ Returns the gadget hostname and port from the given hostname the test is running on and device speed we want to test. """ kdGadgetsConfigured = self.kdGadgetParams.get(sHostname); if kdGadgetsConfigured is not None: return kdGadgetsConfigured.get(sSpeed); return (self.sGadgetHostnameDef, self.uGadgetPortDef); def getCaptureFilePath(self, sUsbCtrl, sSpeed): """ Returns capture filename from the given data. """ return '%s%s%s-%s.pcap' % (self.sUsbCapturePath, os.sep, sUsbCtrl, sSpeed); def attachUsbDeviceToVm(self, oSession, sVendorId, sProductId, iBusId, sCaptureFile = None): """ Attaches the given USB device to the VM either via a filter or directly if capturing the USB traffic is enabled. Returns True on success, False on failure. """ fRc = False; if sCaptureFile is None: fRc = oSession.addUsbDeviceFilter('Compliance device', sVendorId = sVendorId, sProductId = sProductId, \ sPort = format(iBusId, 'x')); else: # Search for the correct device in the USB device list waiting for some time # to let it appear. iVendorId = int(sVendorId, 16); iProductId = int(sProductId, 16); # Try a few times to give VBoxSVC a chance to detect the new device. for _ in xrange(5): fFound = False; aoUsbDevs = self.oVBoxMgr.getArray(self.oVBox.host, 'USBDevices'); for oUsbDev in aoUsbDevs: if oUsbDev.vendorId == iVendorId \ and oUsbDev.productId == iProductId \ and oUsbDev.port == iBusId: fFound = True; fRc = oSession.attachUsbDevice(oUsbDev.id, sCaptureFile); break; if fFound: break; # Wait a moment until the next try. self.sleep(1); if fRc: # Wait a moment to let the USB device appear self.sleep(9); return fRc; # # Test execution helpers. # def testUsbCompliance(self, oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile = None): """ Test VirtualBoxs USB stack in a VM. """ # Get configured USB test devices from hostname we are running on sGadgetHost, uGadgetPort = self.getGadgetParams(self.sHostname, sSpeed); oUsbGadget = usbgadget.UsbGadget(); reporter.log('Connecting to UTS: ' + sGadgetHost); fRc = oUsbGadget.connectTo(30 * 1000, sGadgetHost, uPort = uGadgetPort); if fRc is True: reporter.log('Connect succeeded'); self.oVBox.host.addUSBDeviceSource('USBIP', sGadgetHost, sGadgetHost + (':%s' % oUsbGadget.getUsbIpPort()), [], []); fSuperSpeed = False; if sSpeed == 'Super': fSuperSpeed = True; # Create test device gadget and a filter to attach the device automatically. fRc = oUsbGadget.impersonate(usbgadget.g_ksGadgetImpersonationTest, fSuperSpeed); if fRc is True: iBusId, _ = oUsbGadget.getGadgetBusAndDevId(); fRc = self.attachUsbDeviceToVm(oSession, '0525', 'a4a0', iBusId, sCaptureFile); if fRc is True: tupCmdLine = ('UsbTest', ); # Exclude a few tests which hang and cause a timeout, need investigation. lstTestsExclude = self.kdUsbTestsDisabled.get(sSpeed); for iTestExclude in lstTestsExclude: tupCmdLine = tupCmdLine + ('--exclude', str(iTestExclude)); fRc = self.txsRunTest(oTxsSession, 'UsbTest', 3600 * 1000, \ '${CDROM}/${OS/ARCH}/UsbTest${EXESUFF}', tupCmdLine); if not fRc: reporter.testFailure('Running USB test utility failed'); else: reporter.testFailure('Failed to attach USB device to VM'); oUsbGadget.disconnectFrom(); else: reporter.testFailure('Failed to impersonate test device'); self.oVBox.host.removeUSBDeviceSource(sGadgetHost); else: reporter.testFailure('Failed to connect to USB gadget'); _ = sUsbCtrl; return fRc; def testUsbReattach(self, oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile = None): # pylint: disable=W0613 """ Tests that rapid connect/disconnect cycles work. """ # Get configured USB test devices from hostname we are running on sGadgetHost, uGadgetPort = self.getGadgetParams(self.sHostname, sSpeed); oUsbGadget = usbgadget.UsbGadget(); reporter.log('Connecting to UTS: ' + sGadgetHost); fRc = oUsbGadget.connectTo(30 * 1000, sGadgetHost, uPort = uGadgetPort); if fRc is True: self.oVBox.host.addUSBDeviceSource('USBIP', sGadgetHost, sGadgetHost + (':%s' % oUsbGadget.getUsbIpPort()), [], []); fSuperSpeed = False; if sSpeed == 'Super': fSuperSpeed = True; # Create test device gadget and a filter to attach the device automatically. fRc = oUsbGadget.impersonate(usbgadget.g_ksGadgetImpersonationTest, fSuperSpeed); if fRc is True: iBusId, _ = oUsbGadget.getGadgetBusAndDevId(); fRc = self.attachUsbDeviceToVm(oSession, '0525', 'a4a0', iBusId, sCaptureFile); if fRc is True: # Wait a moment to let the USB device appear self.sleep(3); # Do a rapid disconnect reconnect cycle. Wait a second before disconnecting # again or it will happen so fast that the VM can't attach the new device. # @todo: Get rid of the constant wait and use an event to get notified when # the device was attached. for iCycle in xrange (0, self.cUsbReattachCycles): fRc = oUsbGadget.disconnectUsb(); fRc = fRc and oUsbGadget.connectUsb(); if not fRc: reporter.testFailure('Reattach cycle %s failed on the gadget device' % (iCycle)); break; self.sleep(1); else: reporter.testFailure('Failed to impersonate test device'); oUsbGadget.disconnectFrom(); else: reporter.testFailure('Failed to connect to USB gadget'); else: reporter.testFailure('Failed to create USB device filter'); return fRc; def testUsbOneCfg(self, sVmName, sUsbCtrl, sSpeed, sUsbTest): """ Runs the specified VM thru one specified test. Returns a success indicator on the general test execution. This is not the actual test result. """ oVM = self.getVmByName(sVmName); # Reconfigure the VM fRc = True; oSession = self.openSession(oVM); if oSession is not None: fRc = fRc and oSession.enableVirtEx(True); fRc = fRc and oSession.enableNestedPaging(True); # Make sure controllers are disabled initially. fRc = fRc and oSession.enableUsbOhci(False); fRc = fRc and oSession.enableUsbEhci(False); fRc = fRc and oSession.enableUsbXhci(False); if sUsbCtrl == 'OHCI': fRc = fRc and oSession.enableUsbOhci(True); elif sUsbCtrl == 'EHCI': fRc = fRc and oSession.enableUsbEhci(True); elif sUsbCtrl == 'XHCI': fRc = fRc and oSession.enableUsbXhci(True); fRc = fRc and oSession.saveSettings(); fRc = oSession.close() and fRc and True; # pychecker hack. oSession = None; else: fRc = False; # Start up. if fRc is True: self.logVmInfo(oVM); oSession, oTxsSession = self.startVmAndConnectToTxsViaTcp(sVmName, fCdWait = False, fNatForwardingForTxs = False); if oSession is not None: self.addTask(oTxsSession); # Fudge factor - Allow the guest to finish starting up. self.sleep(5); sCaptureFile = None; if self.fUsbCapture: sCaptureFile = self.getCaptureFilePath(sUsbCtrl, sSpeed); if sUsbTest == 'Compliance': fRc = self.testUsbCompliance(oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile); elif sUsbTest == 'Reattach': fRc = self.testUsbReattach(oSession, oTxsSession, sUsbCtrl, sSpeed, sCaptureFile); # cleanup. self.removeTask(oTxsSession); self.terminateVmBySession(oSession) # Add the traffic dump if it exists and the test failed if reporter.testErrorCount() > 0 \ and sCaptureFile is not None \ and os.path.exists(sCaptureFile): reporter.addLogFile(sCaptureFile, 'misc/other', 'USB traffic dump'); else: fRc = False; return fRc; def testUsbForOneVM(self, sVmName): """ Runs one VM thru the various configurations. """ fRc = False; reporter.testStart(sVmName); for sUsbCtrl in self.asUsbCtrls: reporter.testStart(sUsbCtrl) for sUsbSpeed in self.asUsbSpeed: asSupportedSpeeds = self.kdUsbSpeedMappings.get(sUsbCtrl); if sUsbSpeed in asSupportedSpeeds: reporter.testStart(sUsbSpeed) for sUsbTest in self.asUsbTests: reporter.testStart(sUsbTest) fRc = self.testUsbOneCfg(sVmName, sUsbCtrl, sUsbSpeed, sUsbTest); reporter.testDone(); reporter.testDone(); reporter.testDone(); reporter.testDone(); return fRc; def testUsb(self): """ Executes USB test. """ reporter.log("Running on host: " + self.sHostname); # Loop thru the test VMs. for sVM in self.asTestVMs: # run test on the VM. fRc = self.testUsbForOneVM(sVM); return fRc; if __name__ == '__main__': sys.exit(tdUsbBenchmark().main(sys.argv));