Commit 8ca070bf authored by Dhinak G's avatar Dhinak G
Browse files

Initial commit

parents
Showing with 1667 additions and 0 deletions
+1667 -0
.flake8 0 → 100644
[flake8]
ignore = E501, E203
\ No newline at end of file
name: Build
on:
push:
pull_request:
workflow_dispatch:
release:
types: [published]
jobs:
macOS:
name: Build macOS
runs-on: self-hosted
steps:
- uses: actions/checkout@v2
# - name: Set up Python 3
# run: brew install python3 python-tk
- name: Install Python Dependencies
run: pip3 install -r requirements.txt
- name: Install Debug Dependencies
run: pip3 install pyinstaller
- name: Build for macOS
run: pyinstaller spec/macOS.spec
- name: Zip
run: cd dist; zip macOS.zip macOS
- name: Upload to Artifacts
uses: actions/upload-artifact@v2
with:
name: Artifacts macOS
path: dist/macOS.zip
- name: Upload to Release
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@e74ff71f7d8a4c4745b560a485cc5fdb9b5b999d
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: dist/macOS.zip
tag: ${{ github.ref }}
file_glob: true
windows:
name: Build Windows
runs-on: windows-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python 3
uses: actions/setup-python@v2
with:
python-version: "3.x"
- name: Install Python Dependencies
run: pip3 install -r requirements.txt
- name: Install Debug Dependencies
run: pip3 install pyinstaller
- name: Build for Windows
run: pyinstaller spec\Windows.spec && pyinstaller spec\Windows_dir.spec && pyinstaller spec\debug_dump.spec
- uses: papeloto/action-zip@v1
with:
files: dist/Windows/
recursive: false
dest: dist/Windows.zip
- name: Upload to Artifacts
uses: actions/upload-artifact@v2
with:
name: Artifacts Windows
path: |
dist/*.exe
dist/Windows.zip
- name: Upload to Release
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@e74ff71f7d8a4c4745b560a485cc5fdb9b5b999d
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: dist/*.exe
tag: ${{ github.ref }}
file_glob: true
- name: Upload to Release 2
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@e74ff71f7d8a4c4745b560a485cc5fdb9b5b999d
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: dist/Windows.zip
tag: ${{ github.ref }}
file_glob: true
.gitignore 0 → 100644
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# static files generated from Django application using `collectstatic`
media
static
# General
.DS_Store
.AppleDouble
.LSOverride
# Icon must end with two \r
Icon
# Thumbnails
._*
# Files that might appear in the root of a volume
.DocumentRevisions-V100
.fseventsd
.Spotlight-V100
.TemporaryItems
.Trashes
.VolumeIcon.icns
.com.apple.timemachine.donotpresent
# Directories potentially created on remote AFP share
.AppleDB
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
usb.json
USBMap.kext
UTBMap.kext
*USBMap.json
settings.json
\ No newline at end of file
.pylintrc 0 → 100644
[MASTER]
init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))"
[MESSAGES CONTROL]
disable=unused-import,
subprocess-run-check,
line-too-long,
too-few-public-methods,
missing-module-docstring,
missing-class-docstring,
missing-function-docstring
\ No newline at end of file
README.md 0 → 100644
# USBToolBoxᵇᵉᵗᵃ
*Making USB mapping simple(r)*
The USBToolBox tool is a USB mapping tool supporting Windows and macOS. It allows for building a custom injector kext from Windows and macOS.
## Features
* Supports mapping from Windows and macOS
* Can build a map using either the USBToolBox kext or native Apple kexts (AppleUSBHostMergeProperties)
* Supports multiple ways of matching
* Supports companion ports (on Windows)
* Make educated guesses for port types (on Windows)
## Supported Methods
### From Windows
Windows 10 is recommended for the full feature set (companion port binding, port type guessing.). Windows 8 may work, Windows 7 and below will very likely crash.
Simply download the latest `Windows.exe` from releases. If Windows Defender/other antivirus complains, you can either whitelist the download or use `Windows.zip`, which doesn't have a self extractor (which is what most antiviruses seem to complain about).
### From Windows PE
Yes this works lol. Some device names may not be as descriptive but if you really don't want to install Windows, you can create a Windows PE USB and hit Shift + F10 to open `cmd,` then run the program.
### From macOS
macOS is *not* recommended for several reasons. You won't have features like guessing port types (as there simply isn't enough info for this) as well as binding companion ports (again, no info). However, there's also port limits to deal with, and in macOS 11.3, `XhciPortLimit` is broken, resulting in a lot more hoops to go through.
If you still want to use macOS, download `macOS.zip` from releases.
## Usage
This is gonna be a very basic guide for now. A fully-fleshed guide will be released in the future.
1. Download the appropriate download for your OS.
2. Open and adjust settings if necessary.
3. Select Discover Ports and wait for the listing to populate.
4. Plug in a USB device into each port. Wait for the listing to show your USB device before unplugging it and plugging it into another port.
* If on Windows, you only need to plug in 1 device to USB 3 ports (as companion detection should be working). If on macOS, you will have to plug in a USB 2 device and a USB 3 device into each USB 3 port.
* For old computers with OHCI/UHCI and EHCI controllers, you will need to plug in a mouse/keyboard to map the USB 1.1 personalities, as most USB 2 devices will end on the USB 2 personality.
5. Once mapping is done, go to the Select Ports screen.
6. Select your ports and adjust port types as neccesary.
7. Press K to build the kext!
8. Add the resulting USB map to your `EFI/OC/Kexts` folder, and make sure to update your `config.plist`.
* If building a map that uses the USBToolBox kext, make sure to grab the [latest release](https://github.com/USBToolBox/kext/releases) of the kext too.
* Make sure to remove `UTBDefault.kext` <!-- i need a better name for this lol -->, if you have it.
9. Reboot and you should have your USB map working!
## Known Issues
See the [issues tab](https://github.com/USBToolBox/tool/issues)
## Credits
@CorpNewt for [USBMap](https://github.com/corpnewt/USBMap). This project was heavily inspired by USBMap (and some functions are from USBMap).
My testing team (you know who you are) for testing
from os.path import dirname, basename, isfile
import glob
modules = glob.glob(dirname(__file__)+"/*.py")
__all__ = [ basename(f)[:-3] for f in modules if isfile(f) and not f.endswith('__init__.py')]
\ No newline at end of file
from typing import NewType, Union
import objc
from CoreFoundation import CFRelease # type: ignore # pylint: disable=no-name-in-module
from Foundation import NSBundle # type: ignore # pylint: disable=no-name-in-module
from PyObjCTools import Conversion
IOKit_bundle = NSBundle.bundleWithIdentifier_("com.apple.framework.IOKit")
io_name_t_ref = b"[128c]" # pylint: disable=invalid-name
CFStringRef = b"^{__CFString=}"
CFDictionaryRef = b"^{__CFDictionary=}"
# https://developer.apple.com/library/archive/documentation/Cocoa/Conceptual/ObjCRuntimeGuide/Articles/ocrtTypeEncodings.html
functions = [
("IORegistryEntryCreateCFProperties", b"IIo^@II"),
("IOServiceMatching", CFDictionaryRef + b"r*"),
("IOServiceGetMatchingServices", b"II" + CFDictionaryRef + b"o^I"),
("IOIteratorNext", b"II"),
("IORegistryEntryGetParentEntry", b"IIr*o^I"),
("IOObjectRelease", b"II"),
# io_name_t is char[128]
("IORegistryEntryGetName", b"IIo" + io_name_t_ref),
("IOObjectGetClass", b"IIo" + io_name_t_ref),
("IOObjectCopyClass", CFStringRef + b"I"),
("IOObjectCopySuperclassForClass", CFStringRef + CFStringRef),
("IORegistryEntryGetChildIterator", b"IIr*o^I"),
("IORegistryCreateIterator", b"IIr*Io^I"),
("IORegistryEntryCreateIterator", b"IIr*Io^I"),
("IORegistryIteratorEnterEntry", b"II"),
("IORegistryIteratorExitEntry", b"II"),
("IORegistryEntryCreateCFProperty", b"@I" + CFStringRef + b"II"),
("IORegistryEntryGetPath", b"IIr*oI"),
("IORegistryEntryCopyPath", CFStringRef + b"Ir*"),
]
# TODO: Proper typing
# pylint: disable=invalid-name
pointer = type(None)
kern_return_t = NewType("kern_return_t", int)
io_object_t = NewType("io_object_t", object)
io_name_t = bytes
io_string_t = bytes
# io_registry_entry_t = NewType("io_registry_entry_t", io_object_t)
io_registry_entry_t = io_object_t
io_iterator_t = NewType("io_iterator_t", io_object_t)
CFTypeRef = Union[int, float, bytes, dict, list]
IOOptionBits = int
mach_port_t = int
CFAllocatorRef = int
NULL = 0
kIOMasterPortDefault: mach_port_t = NULL
kCFAllocatorDefault: CFAllocatorRef = NULL
kNilOptions: IOOptionBits = NULL
kIORegistryIterateRecursively = 1
kIORegistryIterateParents = 2
# pylint: enable=invalid-name
# kern_return_t IORegistryEntryCreateCFProperties(io_registry_entry_t entry, CFMutableDictionaryRef * properties, CFAllocatorRef allocator, IOOptionBits options);
def IORegistryEntryCreateCFProperties(entry: io_registry_entry_t, properties: pointer, allocator: CFAllocatorRef, options: IOOptionBits) -> tuple[kern_return_t, dict]: # pylint: disable=invalid-name
raise NotImplementedError
# CFMutableDictionaryRef IOServiceMatching(const char * name);
def IOServiceMatching(name: bytes) -> dict: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IOServiceGetMatchingServices(mach_port_t masterPort, CFDictionaryRef matching CF_RELEASES_ARGUMENT, io_iterator_t * existing);
def IOServiceGetMatchingServices(masterPort: mach_port_t, matching: dict, existing: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# io_object_t IOIteratorNext(io_iterator_t iterator);
def IOIteratorNext(iterator: io_iterator_t) -> io_object_t: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetParentEntry(io_registry_entry_t entry, const io_name_t plane, io_registry_entry_t * parent);
def IORegistryEntryGetParentEntry(entry: io_registry_entry_t, plane: io_name_t, parent: pointer) -> tuple[kern_return_t, io_registry_entry_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IOObjectRelease(io_object_t object);
def IOObjectRelease(object: io_object_t) -> kern_return_t: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetName(io_registry_entry_t entry, io_name_t name);
def IORegistryEntryGetName(entry: io_registry_entry_t, name: pointer) -> tuple[kern_return_t, str]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IOObjectGetClass(io_object_t object, io_name_t className);
def IOObjectGetClass(object: io_object_t, className: pointer) -> tuple[kern_return_t, str]: # pylint: disable=invalid-name
raise NotImplementedError
# CFStringRef IOObjectCopyClass(io_object_t object);
def IOObjectCopyClass(object: io_object_t) -> str: # pylint: disable=invalid-name
raise NotImplementedError
# CFStringRef IOObjectCopySuperclassForClass(CFStringRef classname)
def IOObjectCopySuperclassForClass(classname: str) -> str: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetChildIterator(io_registry_entry_t entry, const io_name_t plane, io_iterator_t * iterator);
def IORegistryEntryGetChildIterator(entry: io_registry_entry_t, plane: io_name_t, iterator: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryCreateIterator(mach_port_t masterPort, const io_name_t plane, IOOptionBits options, io_iterator_t * iterator)
def IORegistryCreateIterator(masterPort: mach_port_t, plane: io_name_t, options: IOOptionBits, iterator: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryCreateIterator(io_registry_entry_t entry, const io_name_t plane, IOOptionBits options, io_iterator_t * iterator)
def IORegistryEntryCreateIterator(entry: io_registry_entry_t, plane: io_name_t, options: IOOptionBits, iterator: pointer) -> tuple[kern_return_t, io_iterator_t]: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryIteratorEnterEntry(io_iterator_t iterator)
def IORegistryIteratorEnterEntry(iterator: io_iterator_t) -> kern_return_t: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryIteratorExitEntry(io_iterator_t iterator)
def IORegistryIteratorExitEntry(iterator: io_iterator_t) -> kern_return_t: # pylint: disable=invalid-name
raise NotImplementedError
# CFTypeRef IORegistryEntryCreateCFProperty(io_registry_entry_t entry, CFStringRef key, CFAllocatorRef allocator, IOOptionBits options);
def IORegistryEntryCreateCFProperty(entry: io_registry_entry_t, key: str, allocator: CFAllocatorRef, options: IOOptionBits) -> CFTypeRef: # pylint: disable=invalid-name
raise NotImplementedError
# kern_return_t IORegistryEntryGetPath(io_registry_entry_t entry, const io_name_t plane, io_string_t path);
def IORegistryEntryGetPath(entry: io_registry_entry_t, plane: io_name_t, path: pointer) -> tuple[kern_return_t, io_string_t]: # pylint: disable=invalid-name
raise NotImplementedError
# CFStringRef IORegistryEntryCopyPath(io_registry_entry_t entry, const io_name_t plane)
def IORegistryEntryCopyPath(entry: io_registry_entry_t, plane: bytes) -> str: # pylint: disable=invalid-name
raise NotImplementedError
objc.loadBundleFunctions(IOKit_bundle, globals(), functions) # type: ignore # pylint: disable=no-member
def ioiterator_to_list(iterator: io_iterator_t):
# items = []
item = IOIteratorNext(iterator) # noqa: F821
while item:
# items.append(next)
yield item
item = IOIteratorNext(iterator) # noqa: F821
IOObjectRelease(iterator) # noqa: F821
# return items
def corefoundation_to_native(collection):
native = Conversion.pythonCollectionFromPropertyList(collection)
CFRelease(collection)
return native
def native_to_corefoundation(native):
return Conversion.propertyListFromPythonCollection(native)
def io_name_t_to_str(name):
return name.partition(b"\0")[0].decode()
def get_class_inheritance(io_object):
classes = []
cls = IOObjectCopyClass(io_object)
while cls:
# yield cls
classes.append(cls)
CFRelease(cls)
cls = IOObjectCopySuperclassForClass(cls)
return classes
# pylint: disable=invalid-name
import enum
from time import time
from typing import Callable
class USBDeviceSpeeds(enum.IntEnum):
LowSpeed = 0
FullSpeed = 1
HighSpeed = 2
SuperSpeed = 3
# The integer value of this only applies for macOS
SuperSpeedPlus = 4
# This is not an actual value
Unknown = 9999
def __str__(self) -> str:
return _usb_protocol_names[self]
def __bool__(self) -> bool:
return True
class USBPhysicalPortTypes(enum.IntEnum):
USBTypeA = 0
USBTypeMiniAB = 1
ExpressCard = 2
USB3TypeA = 3
USB3TypeB = 4
USB3TypeMicroB = 5
USB3TypeMicroAB = 6
USB3TypePowerB = 7
USB3TypeC_USB2Only = 8
USB3TypeC_WithSwitch = 9
USB3TypeC_WithoutSwitch = 10
Internal = 255
def __str__(self) -> str:
return _usb_physical_port_types[self]
def __bool__(self) -> bool:
return True
class USBControllerTypes(enum.IntEnum):
UHCI = int("0x00", 16)
OHCI = int("0x10", 16)
EHCI = int("0x20", 16)
XHCI = int("0x30", 16)
Unknown = 9999
def __str__(self) -> str:
return _usb_controller_types[self]
def __bool__(self) -> bool:
return True
_usb_controller_types = {
USBControllerTypes.UHCI: "USB 1.1 (UHCI)",
USBControllerTypes.OHCI: "USB 1.1 (OHCI)",
USBControllerTypes.EHCI: "USB 2.0 (EHCI)",
USBControllerTypes.XHCI: "USB 3.0 (XHCI)",
USBControllerTypes.Unknown: "Unknown",
}
_usb_physical_port_types = {
USBPhysicalPortTypes.USBTypeA: "Type A",
USBPhysicalPortTypes.USBTypeMiniAB: "Type Mini-AB",
USBPhysicalPortTypes.ExpressCard: "ExpressCard",
USBPhysicalPortTypes.USB3TypeA: "USB 3 Type A",
USBPhysicalPortTypes.USB3TypeB: "USB 3 Type B",
USBPhysicalPortTypes.USB3TypeMicroB: "USB 3 Type Micro-B",
USBPhysicalPortTypes.USB3TypeMicroAB: "USB 3 Type Micro-AB",
USBPhysicalPortTypes.USB3TypePowerB: "USB 3 Type Power-B",
USBPhysicalPortTypes.USB3TypeC_USB2Only: "Type C - USB 2 only",
USBPhysicalPortTypes.USB3TypeC_WithSwitch: "Type C - with switch",
USBPhysicalPortTypes.USB3TypeC_WithoutSwitch: "Type C - without switch",
USBPhysicalPortTypes.Internal: "Internal",
}
_short_names = True
_usb_protocol_names_full = {
USBDeviceSpeeds.LowSpeed: "USB 1.1 (Low Speed)",
USBDeviceSpeeds.FullSpeed: "USB 1.1 (Full Speed)",
USBDeviceSpeeds.HighSpeed: "USB 2.0 (High Speed)",
USBDeviceSpeeds.SuperSpeed: "USB 3.0/USB 3.1 Gen 1/USB 3.2 Gen 1x1 (SuperSpeed)",
USBDeviceSpeeds.SuperSpeedPlus: "USB 3.1 Gen 2/USB 3.2 Gen 2×1 (SuperSpeed+)",
USBDeviceSpeeds.Unknown: "Unknown",
}
_usb_protocol_names_short = {
USBDeviceSpeeds.LowSpeed: "USB 1.1",
USBDeviceSpeeds.FullSpeed: "USB 1.1",
USBDeviceSpeeds.HighSpeed: "USB 2.0",
USBDeviceSpeeds.SuperSpeed: "USB 3.0",
USBDeviceSpeeds.SuperSpeedPlus: "USB 3.1 Gen 2",
USBDeviceSpeeds.Unknown: "Unknown",
}
_usb_protocol_names = _usb_protocol_names_short if _short_names else _usb_protocol_names_full
def time_it(func: Callable, text: str, *args, **kwargs):
start = time()
result = func(*args, **kwargs)
end = time()
input(f"{text} took {end - start}, press enter to continue".strip())
return result
def debug(str):
if True:
input(f"DEBUG: {str}\nPress enter to continue")
# def speed_to_name(speed: USBDeviceSpeeds):
# return _usb_protocol_names[speed]
# USBDump Conversion Interface
import itertools
import json
import subprocess
import sys
from operator import itemgetter
from pathlib import Path
from Scripts import shared
# input_path = input("File path: ")
# if input_path:
# file_path = Path("samples/" + input_path)
# else:
# file_path = Path("samples/tablet.json")
# info = json.load(file_path.open())
hub_map = {}
def get_port_type(port):
supported_usb_protocols = port["ConnectionInfoV2"]["SupportedUsbProtocols"]
if supported_usb_protocols["Usb300"]:
return shared.USBDeviceSpeeds.SuperSpeed
elif supported_usb_protocols["Usb200"] and supported_usb_protocols["Usb110"]:
return shared.USBDeviceSpeeds.HighSpeed
elif supported_usb_protocols["Usb110"]:
return shared.USBDeviceSpeeds.FullSpeed
else:
return shared.USBDeviceSpeeds.Unknown
def get_device_speed(port):
speed = port["ConnectionInfo"]["Speed"]
if speed == shared.USBDeviceSpeeds.LowSpeed:
return (shared.USBDeviceSpeeds.LowSpeed, None)
elif speed == shared.USBDeviceSpeeds.FullSpeed:
speed = shared.USBDeviceSpeeds.FullSpeed
elif speed == shared.USBDeviceSpeeds.HighSpeed:
speed = shared.USBDeviceSpeeds.HighSpeed
elif speed == shared.USBDeviceSpeeds.SuperSpeed and port["ConnectionInfoV2"]["Flags"]["DeviceIsOperatingAtSuperSpeedPlusOrHigher"]:
return (shared.USBDeviceSpeeds.SuperSpeedPlus, None)
elif speed == shared.USBDeviceSpeeds.SuperSpeed:
speed = shared.USBDeviceSpeeds.SuperSpeed
else:
return (shared.USBDeviceSpeeds.Unknown, speed)
if port["ConnectionInfoV2"]["Flags"]["DeviceIsSuperSpeedPlusCapableOrHigher"]:
return (speed, shared.USBDeviceSpeeds.SuperSpeedPlus)
elif speed < shared.USBDeviceSpeeds.SuperSpeed and port["ConnectionInfoV2"]["Flags"]["DeviceIsSuperSpeedCapableOrHigher"]:
return (speed, shared.USBDeviceSpeeds.SuperSpeed)
else:
return (speed, None)
def get_device_speed_string(port, hub_port_count=None):
speed = get_device_speed(port)
# return f"{speed[0]}{(', ' + speed[1] + ' capable') if speed[1] else ''}{(', ' + str(hub_port_count) + ' ports') if hub_port_count else ''}"
return speed[0]
def get_device_name(port):
if not port["UsbDeviceProperties"]:
port["UsbDeviceProperties"] = {}
if not port["DeviceInfoNode"]:
port["DeviceInfoNode"] = {}
if (
not port["ConnectionInfo"]["DeviceDescriptor"]["iProduct"]
or (port["UsbDeviceProperties"].get("DeviceDesc") or port["DeviceInfoNode"].get("DeviceDescName"))
and (port["UsbDeviceProperties"].get("DeviceDesc") or port["DeviceInfoNode"].get("DeviceDescName")) != "USB Composite Device"
):
return port["UsbDeviceProperties"].get("DeviceDesc") or port["DeviceInfoNode"].get("DeviceDescName")
for string_desc in port["StringDescs"] or []:
if string_desc["DescriptorIndex"] == port["ConnectionInfo"]["DeviceDescriptor"]["iProduct"]:
return string_desc["StringDescriptor"][0]["bString"]
return port["UsbDeviceProperties"].get("DeviceDesc") or port["DeviceInfoNode"].get("DeviceDescName")
def get_hub_type(port):
return shared.USBDeviceSpeeds(port["HubInfoEx"]["HubType"])
# def merge_companions(controllers):
# controllers = copy.deepcopy(controllers)
# for controller in controllers:
# for port in controller["ports"]:
# if port["companion_info"]["port"]:
# companion_hub = [i for i in controllers if i["hub_name"] == port["companion_info"]["hub"]][0]
# companion_port = [i for i in companion_hub["ports"] if i["index"] == port["companion_info"]["port"]][0]
# companion_port["companion_info"]["port"] = 0
# port["companion_info"]["port"] = companion_port
# for controller in controllers:
# for port in list(controller["ports"]):
# if port["companion_info"]["port"]:
# companion_hub = [i for i in controllers if i["hub_name"] == port["companion_info"]["hub"]][0]
# companion_port = [i for i in companion_hub["ports"] if i["index"] == port["companion_info"]["port"]["index"]][0]
# companion_hub["ports"].remove(companion_port)
# return controllers
def get_hub_by_name(name):
return hub_map.get(name)
def get_companion_port(port):
return [i for i in hub_map[port["companion_info"]["hub"]]["ports"] if i["index"] == port["companion_info"]["port"]][0]
def guess_ports():
for hub in hub_map:
for port in hub_map[hub]["ports"]:
if not port["status"].endswith("DeviceConnected"):
# we don't have info. anything else is going to error
port["guessed"] = None
elif port["type_c"] or port["companion_info"]["port"] and get_companion_port(port)["type_c"]:
port["guessed"] = shared.USBPhysicalPortTypes.USB3TypeC_WithSwitch
elif not port["user_connectable"]:
port["guessed"] = shared.USBPhysicalPortTypes.Internal
elif (
port["class"] == shared.USBDeviceSpeeds.SuperSpeed
and port["companion_info"]["port"]
and get_companion_port(port)["class"] == shared.USBDeviceSpeeds.HighSpeed
or port["class"] == shared.USBDeviceSpeeds.HighSpeed
and port["companion_info"]["port"]
and get_companion_port(port)["class"] == shared.USBDeviceSpeeds.SuperSpeed
):
port["guessed"] = shared.USBPhysicalPortTypes.USB3TypeA
elif port["class"] == shared.USBDeviceSpeeds.SuperSpeed and not port["companion_info"]["port"]:
port["guessed"] = shared.USBPhysicalPortTypes.Internal
else:
port["guessed"] = shared.USBPhysicalPortTypes.USBTypeA
def serialize_hub(hub):
hub_info = {
"hub_name": hub["HubName"],
# "class": get_hub_type(hub),
"port_count": hub["HubInfo"]["HubInformation"]["HubDescriptor"]["bNumberOfPorts"],
# "highest_port_number": hub["HubInfoEx"]["HighestPortNumber"],
"ports": [],
}
# HubPorts
hub_ports = hub["HubPorts"]
for i, port in enumerate(hub_ports):
if not port:
continue
port_info = {
"index": (port.get("PortConnectorProps") or {}).get("ConnectionIndex")
or (port.get("ConnectionInfo") or {}).get("ConnectionIndex")
or (port.get("ConnectionInfoV2") or {}).get("ConnectionIndex")
or i + 1,
"comment": None,
"class": shared.USBDeviceSpeeds.Unknown,
"status": port["ConnectionInfo"]["ConnectionStatus"],
"type": None,
"guessed": None,
"devices": [],
}
port_info["name"] = f"Port {port_info['index']}"
if not port_info["status"].endswith("DeviceConnected"):
# shared.debug(f"Device connected to port {port_info['index']} errored. Please unplug or connect a different device.")
port_info["devices"] = [{"error": True}]
hub_info["ports"].append(port_info)
continue
port_info["class"] = get_port_type(port)
port_info["companion_info"] = {
"port": port["PortConnectorProps"]["CompanionPortNumber"],
"hub": port["PortConnectorProps"]["CompanionHubSymbolicLinkName"],
"multiple_companions": bool(port["PortConnectorProps"]["UsbPortProperties"]["PortHasMultipleCompanions"]),
}
port_info["type_c"] = bool(port["PortConnectorProps"]["UsbPortProperties"]["PortConnectorIsTypeC"])
port_info["user_connectable"] = bool(port["PortConnectorProps"]["UsbPortProperties"]["PortIsUserConnectable"])
# Guess port type
if port["ConnectionInfo"]["ConnectionStatus"] == "DeviceConnected":
device_info = {"name": get_device_name(port), "instance_id": port["UsbDeviceProperties"]["DeviceId"], "devices": []}
if port["DeviceInfoType"] == "ExternalHubInfo":
external_hub = serialize_hub(port)
device_info["speed"] = get_device_speed_string(port, external_hub["port_count"])
device_info["devices"] = [i for i in itertools.chain.from_iterable([hub_port["devices"] for hub_port in external_hub["ports"]]) if i]
# device_info["hub_type"] = get_hub_type(port)
# device_info["hub"] = serialize_hub(port)
else:
device_info["speed"] = get_device_speed_string(port)
port_info["devices"].append(device_info)
hub_info["ports"].append(port_info)
hub_info["ports"].sort(key=itemgetter("index"))
hub_map[hub_info["hub_name"]] = hub_info
return hub_info
def get_controllers():
new_info = []
usbdump_path = Path("resources/usbdump.exe")
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
usbdump_path = Path(sys._MEIPASS) / usbdump_path
info = json.loads(subprocess.run(usbdump_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode())
# info = json.load(Path("/Users/dhinak/Downloads/obummer.txt").open())
for controller in info:
# root
controller_info = {
"name": controller["UsbDeviceProperties"]["DeviceDesc"],
"identifiers": {
"instance_id": controller["UsbDeviceProperties"]["DeviceId"],
# "revision": controller["Revision"],
},
# "port_count_no3": controller["ControllerInfo"]["NumberOfRootPorts"],
"class": "",
} | serialize_hub(controller["RootHub"])
if all(controller[i] not in [0, int("0xFFFF", 16)] for i in ["VendorID", "DeviceID"]):
controller_info["identifiers"]["pci_id"] = [hex(controller[i])[2:] for i in ["VendorID", "DeviceID"]]
if controller["SubSysID"] not in [0, int("0xFFFFFFFF", 16)]:
controller_info["identifiers"]["pci_id"] += [hex(controller["SubSysID"])[2:6], hex(controller["SubSysID"])[6:]]
if controller["ControllerInfo"]["PciRevision"] not in [0, int("0xFF", 16)]:
controller_info["identifiers"]["pci_revision"] = int(controller["ControllerInfo"]["PciRevision"])
if controller["BusDeviceFunctionValid"]:
controller_info["identifiers"]["bdf"] = [controller["BusNumber"], controller["BusDevice"], controller["BusFunction"]]
new_info.append(controller_info)
guess_ports()
if False:
for hub in hub_map:
for port in hub_map[hub]["ports"]:
if port["companion_info"]["hub"]:
port["companion_info"]["hub"] = hub_map[port["companion_info"]["hub"]]
return new_info
import datetime
import json
import os
import sys
import time
from sys import exit
from typing import Callable, Optional, Union
import ansiescapes
if os.name == "nt":
# Windows
import msvcrt
else:
# Not Windows \o/
import select
class Utils:
def __init__(self, name="Python Script"):
self.name = name
# Init our colors before we need to print anything
cwd = os.getcwd()
os.chdir(os.path.dirname(os.path.realpath(__file__)))
if os.path.exists("colors.json"):
self.colors_dict = json.load(open("colors.json"))
else:
self.colors_dict = {}
os.chdir(cwd)
def grab(self, prompt, **kwargs):
# Takes a prompt, a default, and a timeout and shows it with that timeout
# returning the result
timeout = kwargs.get("timeout", 0)
default = kwargs.get("default", None)
# If we don't have a timeout - then skip the timed sections
if timeout <= 0:
return input(prompt)
# Write our prompt
sys.stdout.write(prompt)
sys.stdout.flush()
if os.name == "nt":
start_time = time.time()
i = ""
while True:
if msvcrt.kbhit():
c = msvcrt.getche()
if ord(c) == 13: # enter_key
break
elif ord(c) >= 32: # space_char
i += c.decode("utf-8")
if len(i) == 0 and (time.time() - start_time) > timeout:
break
else:
i, o, e = select.select([sys.stdin], [], [], timeout)
if i:
i = sys.stdin.readline().strip()
print("") # needed to move to next line
if len(i) > 0:
return i
else:
return default
def cls(self):
os.system("cls" if os.name == "nt" else "clear")
# Header drawing method
def head(self, text=None, width=55):
if text == None:
text = self.name
self.cls()
print(" {}".format("#" * width))
mid_len = int(round(width / 2 - len(text) / 2) - 2)
middle = " #{}{}{}#".format(" " * mid_len, text, " " * ((width - mid_len - len(text)) - 2))
if len(middle) > width + 1:
# Get the difference
di = len(middle) - width
# Add the padding for the ...#
di += 3
# Trim the string
middle = middle[:-di] + "...#"
print(middle)
print("#" * width)
def custom_quit(self):
self.head()
print("by DhinakG")
print("with code from Corp's USBMap\n")
print("Thanks for testing it out!\n")
# Get the time and wish them a good morning, afternoon, evening, and night
hr = datetime.datetime.now().time().hour
if hr > 3 and hr < 12:
print("Have a nice morning!\n\n")
elif hr >= 12 and hr < 17:
print("Have a nice afternoon!\n\n")
elif hr >= 17 and hr < 21:
print("Have a nice evening!\n\n")
else:
print("Have a nice night!\n\n")
exit(0)
def cls():
os.system("cls" if os.name == "nt" else "clear")
def header(text, width=55):
cls()
print(" {}".format("#" * width))
mid_len = int(round(width / 2 - len(text) / 2) - 2)
middle = " #{}{}{}#".format(" " * mid_len, text, " " * ((width - mid_len - len(text)) - 2))
if len(middle) > width + 1:
# Get the difference
di = len(middle) - width
# Add the padding for the ...#
di += 3
# Trim the string
middle = middle[:-di] + "...#"
print(middle)
print("#" * width)
class TUIMenu:
EXIT_MENU = object()
def __init__(
self,
title: str,
prompt: str,
return_number: bool = False,
add_quit: bool = True,
auto_number: bool = False,
in_between: Optional[Union[list, Callable]] = None,
top_level: bool = False,
loop: bool = False,
):
self.title = title
self.prompt = prompt
self.in_between = in_between or []
self.options = []
self.return_number = return_number
self.auto_number = auto_number
self.add_quit = add_quit
self.top_level = top_level
self.loop = loop
self.add_quit = add_quit
self.return_option = (["Q", "Quit"] if self.top_level else ["B", "Back"]) if self.add_quit else None
def add_menu_option(self, name: Union[str, Callable], description: Optional[list[str]] = None, function: Optional[Callable] = None, key: Optional[str] = None):
if not key and not self.auto_number:
raise TypeError("Key must be specified if auto_number is false")
self.options.append([key, name, description or [], function])
def head(self):
cls()
header(self.title)
print()
def print_options(self):
for index, option in enumerate(self.options):
if self.auto_number:
option[0] = str((index + 1))
print(option[0] + ". " + (option[1]() if callable(option[1]) else option[1]))
for i in option[2]:
print(" " + i)
if self.add_quit:
print(f"{self.return_option[0]}. {self.return_option[1]}")
print()
def select(self):
print(ansiescapes.cursorSavePosition, end="")
print(ansiescapes.eraseDown, end="")
selected = input(self.prompt)
keys = [option[0].upper() for option in self.options]
if self.add_quit:
keys += [self.return_option[0]]
while not selected or selected.upper() not in keys:
nl_count = self.prompt.count("\n") + selected.count("\n") + 1
selected = input(f"{ansiescapes.cursorRestorePosition}{ansiescapes.eraseDown}{self.prompt}")
if self.add_quit and selected.upper() == self.return_option[0]:
return self.EXIT_MENU
elif self.return_number:
return self.options[keys.index(selected.upper())][0]
else:
return self.options[keys.index(selected.upper())][3]() if self.options[keys.index(selected.upper())][3] else None
def start(self):
while True:
self.head()
if callable(self.in_between):
self.in_between()
print()
elif self.in_between:
for i in self.in_between:
print(i)
print()
self.print_options()
result = self.select()
if result is self.EXIT_MENU:
return self.EXIT_MENU
elif not self.loop:
return result
class TUIOnlyPrint:
def __init__(self, title, prompt, in_between=None):
self.title = title
self.prompt = prompt
self.in_between = in_between or []
def start(self):
cls()
header(self.title)
print()
if callable(self.in_between):
self.in_between()
else:
for i in self.in_between:
print(i)
if self.in_between:
print()
return input(self.prompt)
Windows.py 0 → 100644
import copy
import time
from enum import Enum
from pathlib import Path
import wmi # pylint: disable=import-error
from base import BaseUSBMap
from Scripts import shared, usbdump
class PnpDeviceProperties(Enum):
ACPI_PATH = "DEVPKEY_Device_BiosDeviceName"
DRIVER_KEY = "DEVPKEY_Device_Driver"
LOCATION_PATHS = "DEVPKEY_Device_LocationPaths"
FRIENDLY_NAME = "DEVPKEY_Device_FriendlyName"
BUS_DEVICE_FUNCTION = "DEVPKEY_Device_LocationInfo"
BUS_REPORTED_NAME = "DEVPKEY_Device_BusReportedDeviceDesc"
BUS_RELATIONS = "DEVPKEY_Device_BusRelations"
INTERFACE = "DEVPKEY_PciDevice_ProgIf"
SERVICE = "DEVPKEY_Device_Service"
class WindowsUSBMap(BaseUSBMap):
def __init__(self):
self.usbdump = None
self.wmi = wmi.WMI()
# self.wmi_path = Path(input("WMI json path: "))
# self.wmi: dict = json.load(self.wmi_path.open())
self.wmi_cache = {}
self.wmi_retries = {}
super().__init__()
def update_usbdump(self):
self.usbdump = usbdump.get_controllers()
def get_property_from_wmi(self, instance_id, prop: PnpDeviceProperties):
MAX_TRIES = 2
result = None
if self.wmi_cache.get(instance_id, {}).get(prop):
return self.wmi_cache[instance_id][prop]
elif self.wmi_retries.get(instance_id, {}).get(prop, 0) >= MAX_TRIES:
return None
try:
result = self.wmi.query(f"SELECT * FROM Win32_PnPEntity WHERE PNPDeviceID = '{instance_id}'")[0].GetDeviceProperties([prop.value])[0][0].Data
except IndexError:
# Race condition between unplug detected in usbdump and WMI
return None
except AttributeError:
if not self.wmi_retries.get(instance_id):
self.wmi_retries[instance_id] = {prop: 1}
elif not self.wmi_retries[instance_id].get(prop):
self.wmi_retries[instance_id][prop] = 1
else:
self.wmi_retries[instance_id][prop] += 1
return None
if not self.wmi_cache.get(instance_id):
self.wmi_cache[instance_id] = {prop: result}
else:
self.wmi_cache[instance_id][prop] = result
return result
""" def get_property_from_wmi(self, instance_id, prop: PnpDeviceProperties):
value = self.wmi.setdefault(instance_id, {}).get(prop.value)
if value:
return value
else:
value = input(f"Enter value for {instance_id} {prop}: ")
self.wmi[instance_id][prop.value] = value
json.dump(self.wmi, self.wmi_path.open("w"), indent=4, sort_keys=True)
return value """
def get_name_from_wmi(self, device):
if not isinstance(device, dict):
return
if device.get("error"):
return
device["name"] = self.get_property_from_wmi(device["instance_id"], PnpDeviceProperties.BUS_REPORTED_NAME) or device["name"]
for i in device["devices"]:
self.get_name_from_wmi(i)
def get_controller_class(self, controller):
interface = self.get_property_from_wmi(controller["identifiers"]["instance_id"], PnpDeviceProperties.INTERFACE)
if interface:
return shared.USBControllerTypes(interface)
service = self.get_property_from_wmi(controller["identifiers"]["instance_id"], PnpDeviceProperties.SERVICE)
if service.lower() == "usbxhci":
return shared.USBControllerTypes.XHCI
elif service.lower() == "usbehci":
return shared.USBControllerTypes.EHCI
elif service.lower() == "usbohci":
return shared.USBControllerTypes.OHCI
elif service.lower() == "usbuhci":
return shared.USBControllerTypes.UHCI
else:
shared.debug(f"Unknown controller type for interface {interface} and service {service}!")
return shared.USBControllerTypes.Unknown
def get_controllers(self):
# self.update_usbdump()
for i in range(10):
try:
# time_it(self.update_usbdump, "USBdump time")
self.update_usbdump()
break
except Exception as e:
if i == 10:
raise
else:
shared.debug(e)
time.sleep(2)
controllers = self.usbdump
for controller in controllers:
controller["name"] = self.get_property_from_wmi(controller["identifiers"]["instance_id"], PnpDeviceProperties.FRIENDLY_NAME) or controller["name"]
controller["class"] = self.get_controller_class(controller)
acpi_path = self.get_property_from_wmi(controller["identifiers"]["instance_id"], PnpDeviceProperties.ACPI_PATH)
if acpi_path:
controller["identifiers"]["acpi_path"] = acpi_path
driver_key = self.get_property_from_wmi(controller["identifiers"]["instance_id"], PnpDeviceProperties.DRIVER_KEY)
if driver_key:
controller["identifiers"]["driver_key"] = driver_key
location_paths = self.get_property_from_wmi(controller["identifiers"]["instance_id"], PnpDeviceProperties.LOCATION_PATHS)
if location_paths:
controller["identifiers"]["location_paths"] = location_paths
# controller["identifiers"]["bdf"] = self.get_property_from_wmi(controller["identifiers"]["instance_id"], PnpDeviceProperties.BUS_DEVICE_FUNCTION)
for port in controller["ports"]:
for device in port["devices"]:
self.get_name_from_wmi(device)
self.controllers = controllers
if not self.controllers_historical:
self.controllers_historical = copy.deepcopy(self.controllers)
else:
self.merge_controllers(self.controllers_historical, self.controllers)
def update_devices(self):
self.get_controllers()
e = WindowsUSBMap()
base.py 0 → 100644
This diff is collapsed.
import json
import subprocess
import sys
import time
from enum import Enum
from pathlib import Path
import win32com.client
import wmi
c = wmi.WMI()
class PnpDeviceProperties(Enum):
ACPI_PATH = "DEVPKEY_Device_BiosDeviceName"
DRIVER_KEY = "DEVPKEY_Device_Driver"
LOCATION_PATHS = "DEVPKEY_Device_LocationPaths"
FRIENDLY_NAME = "DEVPKEY_Device_FriendlyName"
BUS_DEVICE_FUNCTION = "DEVPKEY_Device_LocationInfo"
BUS_REPORTED_NAME = "DEVPKEY_Device_BusReportedDeviceDesc"
BUS_RELATIONS = "DEVPKEY_Device_BusRelations"
def get_property_from_wmi(instance_id, prop: PnpDeviceProperties):
try:
return c.query(f"SELECT * FROM Win32_PnPEntity WHERE PNPDeviceID = '{instance_id}'")[0].GetDeviceProperties([prop.value])[0][0].Data
except Exception:
return None
def build_dict(instance):
pnp = c.query(f"SELECT * FROM Win32_PnPEntity WHERE PNPDeviceID = '{instance}'")[0].GetDeviceProperties()[0]
d = {}
for i in pnp:
if isinstance(i.Data, win32com.client.CDispatch):
# Probably useless
d[i.KeyName] = "Removed for garbage"
else:
d[i.KeyName] = i.Data
return d
controllers = []
all_devices = {}
start = time.time()
for e in c.Win32_USBController():
controller = build_dict(e.PNPDeviceID)
all_devices[e.PNPDeviceID] = dict(controller)
controller["root_hub"] = build_dict(controller[PnpDeviceProperties.BUS_RELATIONS.value][0])
all_devices[controller[PnpDeviceProperties.BUS_RELATIONS.value][0]] = dict(controller["root_hub"])
controller["root_hub"]["devices"] = {}
for f in controller["root_hub"].get(PnpDeviceProperties.BUS_RELATIONS.value, []):
controller["root_hub"]["devices"][f] = build_dict(f)
all_devices[f] = dict(controller["root_hub"]["devices"][f])
controllers.append(controller)
end = time.time()
controllers.append({"duration": end - start})
usbdump_path = Path("resources/usbdump.exe")
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
usbdump_path = Path(sys._MEIPASS) / usbdump_path
usbdump = json.loads(subprocess.run(usbdump_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode())
print(json.dumps({"wmitest": all_devices, "usbdump": usbdump}, sort_keys=True))
macOS.py 0 → 100644
import binascii
import copy
from enum import Enum
from operator import itemgetter
from Scripts import iokit, shared
from base import BaseUSBMap
# from gui import *
class IOEntryProperties(Enum):
NAME = "IORegistryEntryName"
CHILDREN = "IORegistryEntryChildren"
CLASS = "IOClass"
OBJECT_CLASS = "IOObjectClass"
ADDRESS = "IOChildIndex"
def hexswap(input_hex: str) -> str:
hex_pairs = [input_hex[i : i + 2] for i in range(0, len(input_hex), 2)]
hex_rev = hex_pairs[::-1]
hex_str = "".join(["".join(x) for x in hex_rev])
return hex_str.upper()
def read_property(input_value: bytes, places: int) -> str:
return binascii.hexlify(input_value).decode()[:-places]
class macOSUSBMap(BaseUSBMap):
@staticmethod
def port_class_to_type(speed):
if "AppleUSB30XHCIPort" in speed:
return shared.USBDeviceSpeeds.SuperSpeed
elif set(["AppleUSB20XHCIPort", "AppleUSBEHCIPort"]) & set(speed):
return shared.USBDeviceSpeeds.HighSpeed
elif set(["AppleUSBOHCIPort", "AppleUSBUHCIPort"]) & set(speed):
return shared.USBDeviceSpeeds.FullSpeed
else:
shared.debug(f"Unknown port type for {speed}!")
return shared.USBDeviceSpeeds.Unknown
@staticmethod
def controller_class_to_type(parent_props, controller_props, inheritance):
# Check class code
if "class-code" in parent_props:
return shared.USBControllerTypes(parent_props["class-code"][0])
# Check class type
elif "AppleUSBXHCI" in inheritance:
return shared.USBControllerTypes.XHCI
elif "AppleUSBEHCI" in inheritance:
return shared.USBControllerTypes.EHCI
elif "AppleUSBOHCI" in inheritance:
return shared.USBControllerTypes.OHCI
elif "AppleUSBUHCI" in inheritance:
return shared.USBControllerTypes.UHCI
else:
shared.debug(f"Unknown controller type for class code {read_property(parent_props['class-code'], 2)}, inheritance {inheritance}!")
return shared.USBControllerTypes.Unknown
def get_controllers(self):
controllers = []
err, controller_iterator = iokit.IOServiceGetMatchingServices(iokit.kIOMasterPortDefault, iokit.IOServiceMatching("AppleUSBHostController".encode()), None)
for controller_instance in iokit.ioiterator_to_list(controller_iterator):
controller_properties: dict = iokit.corefoundation_to_native(iokit.IORegistryEntryCreateCFProperties(controller_instance, None, iokit.kCFAllocatorDefault, iokit.kNilOptions)[1]) # type: ignore
err, parent_device = iokit.IORegistryEntryGetParentEntry(controller_instance, "IOService".encode(), None)
parent_properties: dict = iokit.corefoundation_to_native(iokit.IORegistryEntryCreateCFProperties(parent_device, None, iokit.kCFAllocatorDefault, iokit.kNilOptions)[1]) # type: ignore
controller = {
"name": iokit.io_name_t_to_str(iokit.IORegistryEntryGetName(parent_device, None)[1]),
# "class": macOSUSBMap.port_class_to_type(iokit.get_class_inheritance(controller_instance)),
"identifiers": {"location_id": controller_properties["locationID"], "path": iokit.IORegistryEntryCopyPath(controller_instance, "IOService".encode())},
"ports": [],
}
if set(["vendor-id", "device-id"]) & set(parent_properties.keys()):
controller["identifiers"]["pci_id"] = [hexswap(read_property(parent_properties[i], 4)).lower() for i in ["vendor-id", "device-id"]]
if set(["subsystem-vendor-id", "subsystem-id"]) & set(parent_properties.keys()):
controller["identifiers"]["pci_id"] += [hexswap(read_property(parent_properties[i], 4)).lower() for i in ["subsystem-vendor-id", "subsystem-id"]]
if "revision-id" in parent_properties:
controller["identifiers"]["pci_revision"] = int(hexswap(read_property(parent_properties.get("revision-id", b""), 6)), 16)
if "acpi-path" in parent_properties:
controller["identifiers"]["acpi_path"] = "\\" + ".".join([i.split("@")[0] for i in parent_properties["acpi-path"].split("/")[1:]])
if "pcidebug" in parent_properties:
controller["identifiers"]["bdf"] = [int(i) for i in parent_properties["pcidebug"].split(":", 3)[:3]]
if "bus-number" in parent_properties:
# TODO: Properly figure out max value
controller["identifiers"]["bus_number"] = int(hexswap(read_property(parent_properties["bus-number"], 6)), 16)
controller["class"] = self.controller_class_to_type(parent_properties, controller_properties, iokit.get_class_inheritance(controller_instance))
err, port_iterator = iokit.IORegistryEntryGetChildIterator(controller_instance, "IOService".encode(), None)
for port in iokit.ioiterator_to_list(port_iterator):
port_properties: dict = iokit.corefoundation_to_native(iokit.IORegistryEntryCreateCFProperties(port, None, iokit.kCFAllocatorDefault, iokit.kNilOptions)[1]) # type: ignore
controller["ports"].append(
{
"name": iokit.io_name_t_to_str(iokit.IORegistryEntryGetName(port, None)[1]),
"comment": None,
"index": int(read_property(port_properties["port"], 6), 16),
"class": macOSUSBMap.port_class_to_type(iokit.get_class_inheritance(port)),
"type": None,
"guessed": shared.USBPhysicalPortTypes.USB3TypeC_WithSwitch
if set(iokit.get_class_inheritance(port)) & set(["AppleUSB20XHCITypeCPort", "AppleUSB30XHCITypeCPort"])
else port_properties.get("UsbConnector"),
"location_id": port_properties["locationID"],
"devices": [],
}
)
iokit.IOObjectRelease(port)
controllers.append(controller)
iokit.IOObjectRelease(controller_instance)
iokit.IOObjectRelease(parent_device)
self.controllers = controllers
if not self.controllers_historical:
self.controllers_historical = copy.deepcopy(self.controllers)
else:
self.merge_controllers(self.controllers_historical, self.controllers)
self.update_devices()
def recurse_devices(self, iterator):
props = []
iokit.IORegistryIteratorEnterEntry(iterator)
device = iokit.IOIteratorNext(iterator)
while device:
props.append(
{
"name": iokit.io_name_t_to_str(iokit.IORegistryEntryGetName(device, None)[1]),
"port": iokit.IORegistryEntryCreateCFProperty(device, "PortNum", iokit.kCFAllocatorDefault, iokit.kNilOptions),
"location_id": iokit.IORegistryEntryCreateCFProperty(device, "locationID", iokit.kCFAllocatorDefault, iokit.kNilOptions),
"speed": shared.USBDeviceSpeeds(iokit.IORegistryEntryCreateCFProperty(device, "Device Speed", iokit.kCFAllocatorDefault, iokit.kNilOptions)), # type: ignore
"devices": self.recurse_devices(iterator),
}
)
iokit.IOObjectRelease(device)
device = iokit.IOIteratorNext(iterator)
iokit.IORegistryIteratorExitEntry(iterator)
props.sort(key=itemgetter("name"))
return props
def update_devices(self):
# Reset devices
for controller in self.controllers:
for port in controller["ports"]:
port["devices"] = []
err, usb_plane_iterator = iokit.IORegistryCreateIterator(iokit.kIOMasterPortDefault, "IOUSB".encode(), 0, None)
controller_instance = iokit.IOIteratorNext(usb_plane_iterator)
while controller_instance:
location_id = iokit.corefoundation_to_native(iokit.IORegistryEntryCreateCFProperty(controller_instance, "locationID", iokit.kCFAllocatorDefault, iokit.kNilOptions))
controller = [i for i in self.controllers if i["identifiers"]["location_id"] == location_id][0]
# This is gonna be a controller
devices = self.recurse_devices(usb_plane_iterator)
for port in controller["ports"]:
port["devices"] = [i for i in devices if i["port"] == port["index"] or i["location_id"] == port["location_id"]]
iokit.IOObjectRelease(controller_instance)
controller_instance = iokit.IOIteratorNext(usb_plane_iterator)
iokit.IOObjectRelease(usb_plane_iterator)
self.merge_devices(self.controllers_historical, self.controllers)
e = macOSUSBMap()
# Base requirements
ansiescapes @ git+https://github.com/embedded-dev/ansiescapes.git
termcolor2
# Windows requirements
wmi; platform_system == "Windows"
# macOS requirements
pyobjc; platform_system == "Darwin"
# Debugging
# debugpy
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>CFBundleDevelopmentRegion</key>
<string>English</string>
<key>CFBundleGetInfoString</key>
<string>v1.1</string>
<key>CFBundleIdentifier</key>
<string>com.dhinakg.USBToolBox.map</string>
<key>CFBundleInfoDictionaryVersion</key>
<string>6.0</string>
<key>CFBundleName</key>
<string>UTBMap</string>
<key>CFBundlePackageType</key>
<string>KEXT</string>
<key>CFBundleShortVersionString</key>
<string>1.1</string>
<key>CFBundleSignature</key>
<string>????</string>
<key>CFBundleVersion</key>
<string>1.1</string>
<key>IOKitPersonalities</key>
<dict>
</dict>
<key>OSBundleRequired</key>
<string>Root</string>
</dict>
</plist>
File added
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(['../Windows.py'],
pathex=['Scripts'],
binaries=[],
datas=[('../Scripts', 'Scripts'), ('../resources', 'resources')],
hiddenimports=['msvcrt', 'win32com', 'win32api', 'wmi'],
hookspath=[],
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='Windows',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True)
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(['../Windows.py'],
pathex=['Scripts'],
binaries=[],
datas=[('../Scripts', 'Scripts'), ('../resources', 'resources')],
hiddenimports=['msvcrt', 'win32com', 'win32api', 'wmi'],
hookspath=[],
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
[],
exclude_binaries=True,
name='Windows',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
console=True )
coll = COLLECT(exe,
a.binaries,
a.zipfiles,
a.datas,
strip=False,
upx=True,
upx_exclude=[],
name='Windows')
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(['../debug_dump.py'],
pathex=[],
binaries=[],
datas=[('../resources', 'resources')],
hiddenimports=[],
hookspath=[],
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='debug_dump',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True )
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment