#!/usr/bin/env python3 import time import enum import os import warnings import sys import binascii from contextlib import contextmanager, suppress, wraps import hashlib import secrets import struct import serial from cobs import cobs import uinput from noise.connection import NoiseConnection, Keypair from noise.exceptions import NoiseInvalidMessage import keymap from hexdump import hexdump BINDING_INCANTATION_LENGTH = 4 class PacketType(enum.Enum): _RESERVED = 0 INITIATE_HANDSHAKE = 1 HANDSHAKE = 2 DATA = 3 COMM_ERROR = 4 CRYPTO_ERROR = 5 TOO_MANY_FAILS = 6 PASSTHROUGH_REPORT = 7 class ReportType(enum.Enum): _RESERVED = 0 KEYBOARD = 1 MOUSE = 2 PAIRING_INPUT = 3 PAIRING_SUCCESS = 4 PAIRING_ERROR = 5 PAIRING_START = 6 class ProtocolError(Exception): pass class Packetizer: def __init__(self, serial, debug=False, width=16): self.ser, self.debug, self.width = serial, debug, width self.ser.write(b'\0') # COBS synchronization def send_packet(self, pkt_type, data): if self.debug: print(f'\033[93mSending {len(data)} bytes, packet type {pkt_type.name} ({pkt_type.value})\033[0m') hexdump(print, data, self.width) data = bytes([pkt_type.value]) + data encoded = cobs.encode(data) + b'\0' self.ser.write(encoded) self.ser.flushOutput() def receive_packet(self): packet = self.ser.read_until(b'\0') data = cobs.decode(packet[:-1]) pkt_type, data = PacketType(data[0]), data[1:] if self.debug: print(f'\033[93mReceived {len(data)} bytes, packet type {pkt_type.name} ({pkt_type.value})\033[0m') hexdump(print, data, self.width) if pkt_type is PacketType.COMM_ERROR: raise ProtocolError('Device-side serial communication error') elif pkt_type is PacketType.CRYPTO_ERROR: raise ProtocolError('Device-side cryptographic error') elif pkt_type is PacketType.TOO_MANY_FAILS: raise ProtocolError('Device reports too many failed handshake attempts') else: return pkt_type, data class KeyMapper: Keycode = enum.Enum('Keycode', start=0, names=''' KEY_NONE _RESERVED_0x01 _RESERVED_0x02 _RESERVED_0x03 KEY_A KEY_B KEY_C KEY_D KEY_E KEY_F KEY_G KEY_H KEY_I KEY_J KEY_K KEY_L KEY_M KEY_N KEY_O KEY_P KEY_Q KEY_R KEY_S KEY_T KEY_U KEY_V KEY_W KEY_X KEY_Y KEY_Z KEY_1 KEY_2 KEY_3 KEY_4 KEY_5 KEY_6 KEY_7 KEY_8 KEY_9 KEY_0 KEY_ENTER KEY_ESC KEY_BACKSPACE KEY_TAB KEY_SPACE KEY_MINUS KEY_EQUAL KEY_LEFTBRACE KEY_RIGHTBRACE KEY_BACKSLASH KEY_HASH KEY_SEMICOLON KEY_APOSTROPHE KEY_GRAVE KEY_COMMA KEY_DOT KEY_SLASH KEY_CAPSLOCK KEY_F1 KEY_F2 KEY_F3 KEY_F4 KEY_F5 KEY_F6 KEY_F7 KEY_F8 KEY_F9 KEY_F10 KEY_F11 KEY_F12 KEY_SYSRQ KEY_SCROLLLOCK KEY_PAUSE KEY_INSERT KEY_HOME KEY_PAGEUP KEY_DELETE KEY_END KEY_PAGEDOWN KEY_RIGHT KEY_LEFT KEY_DOWN KEY_UP KEY_NUMLOCK KEY_KPSLASH KEY_KPASTERISK KEY_KPMINUS KEY_KPPLUS KEY_KPENTER KEY_KP1 KEY_KP2 KEY_KP3 KEY_KP4 KEY_KP5 KEY_KP6 KEY_KP7 KEY_KP8 KEY_KP9 KEY_KP0 KEY_KPDOT KEY_102ND KEY_COMPOSE KEY_POWER KEY_KPEQUAL KEY_F13 KEY_F14 KEY_F15 KEY_F16 KEY_F17 KEY_F18 KEY_F19 KEY_F20 KEY_F21 KEY_F22 KEY_F23 KEY_F24 KEY_OPEN KEY_HELP KEY_PROPS KEY_FRONT KEY_STOP KEY_AGAIN KEY_UNDO KEY_CUT KEY_COPY KEY_PASTE KEY_FIND KEY_MUTE KEY_VOLUMEUP KEY_VOLUMEDOWN _RESERVED_0x82 _RESERVED_0x83 _RESERVED_0x84 KEY_KPCOMMA _RESERVED_0x86 KEY_RO KEY_KATAKANAHIRAGANA KEY_YEN KEY_HENKAN KEY_MUHENKAN KEY_KPJPCOMMA _RESERVED_0x8D _RESERVED_0x8E _RESERVED_0x8F KEY_HANGEUL KEY_HANJA KEY_KATAKANA KEY_HIRAGANA KEY_ZENKAKUHANKAKU _RESERVED_0x95 _RESERVED_0x96 _RESERVED_0x97 _RESERVED_0x98 _RESERVED_0x99 _RESERVED_0x9A _RESERVED_0x9B _RESERVED_0x9C _RESERVED_0x9D _RESERVED_0x9E _RESERVED_0x9F _RESERVED_0xA0 _RESERVED_0xA1 _RESERVED_0xA2 _RESERVED_0xA3 _RESERVED_0xA4 _RESERVED_0xA5 _RESERVED_0xA6 _RESERVED_0xA7 _RESERVED_0xA8 _RESERVED_0xA9 _RESERVED_0xAA _RESERVED_0xAB _RESERVED_0xAC _RESERVED_0xAD _RESERVED_0xAE _RESERVED_0xAF _RESERVED_0xB0 _RESERVED_0xB1 _RESERVED_0xB2 _RESERVED_0xB3 _RESERVED_0xB4 _RESERVED_0xB5 KEY_KPLEFTPAREN KEY_KPRIGHTPAREN _RESERVED_0xB8 _RESERVED_0xB9 _RESERVED_0xBA _RESERVED_0xBB _RESERVED_0xBC _RESERVED_0xBD _RESERVED_0xBE _RESERVED_0xBF _RESERVED_0xC0 _RESERVED_0xC1 _RESERVED_0xC2 _RESERVED_0xC3 _RESERVED_0xC4 _RESERVED_0xC5 _RESERVED_0xC6 _RESERVED_0xC7 _RESERVED_0xC8 _RESERVED_0xC9 _RESERVED_0xCA _RESERVED_0xCB _RESERVED_0xCC _RESERVED_0xCD _RESERVED_0xCE _RESERVED_0xCF _RESERVED_0xD0 _RESERVED_0xD1 _RESERVED_0xD2 _RESERVED_0xD3 _RESERVED_0xD4 _RESERVED_0xD5 _RESERVED_0xD6 _RESERVED_0xD7 _RESERVED_0xD8 _RESERVED_0xD9 _RESERVED_0xDA _RESERVED_0xDB _RESERVED_0xDC _RESERVED_0xDD _RESERVED_0xDE _RESERVED_0xDF _RESERVED_0xE0 _RESERVED_0xE1 _RESERVED_0xE2 _RESERVED_0xE3 _RESERVED_0xE4 _RESERVED_0xE5 _RESERVED_0xE6 _RESERVED_0xE7 _RESERVED_0xE8 _RESERVED_0xE9 _RESERVED_0xEA _RESERVED_0xEB _RESERVED_0xEC _RESERVED_0xED _RESERVED_0xEE _RESERVED_0xEF _RESERVED_0xF0 _RESERVED_0xF1 _RESERVED_0xF2 _RESERVED_0xF3 _RESERVED_0xF4 _RESERVED_0xF5 _RESERVED_0xF6 _RESERVED_0xF7 _RESERVED_0xF8 _RESERVED_0xF9 _RESERVED_0xFA _RESERVED_0xFB _RESERVED_0xFC _RESERVED_0xFD _RESERVED_0xFE _RESERVED_0xFF ''') MODIFIERS = [ uinput.ev.KEY_LEFTCTRL, uinput.ev.KEY_LEFTSHIFT, uinput.ev.KEY_LEFTALT, uinput.ev.KEY_LEFTMETA, uinput.ev.KEY_RIGHTCTRL, uinput.ev.KEY_RIGHTSHIFT, uinput.ev.KEY_RIGHTALT, uinput.ev.KEY_RIGHTMETA ] ALL_KEYS = [ v for k, v in uinput.ev.__dict__.items() if k.startswith('KEY_') ] REGULAR_MAP = { kc.value: getattr(uinput.ev, kc.name) for kc in Keycode if hasattr(uinput.ev, kc.name) } @classmethod def map_modifiers(kls, val): return [ mod for i, mod in enumerate(kls.MODIFIERS) if val & (1< 8: raise ValueError('Unsupported report length', report_len) _report_type, buttons, a, b, c, w, _2 = report x = ((b&0x0f)<<8) | a y = ((b&0xf0) >> 4) | (c<<4) if x >= 2048: x -= 4096 if y >= 2048: y -= 4096 if w >= 128: w -= 256 #print('got mouse report', binascii.hexlify(report), buttons, x, y, w) ui.emit(uinput.BTN_LEFT, bool(buttons&1), syn=False) ui.emit(uinput.BTN_MIDDLE, bool(buttons&4), syn=False) ui.emit(uinput.BTN_RIGHT, bool(buttons&2), syn=False) ui.emit(uinput.REL_X, x, syn=False) ui.emit(uinput.REL_Y, y, syn=False) ui.emit(uinput.REL_WHEEL, w, syn=False) ui.syn() if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument('serial') parser.add_argument('baudrate') parser.add_argument('-w', '--width', type=int, default=16, help='Number of bytes to display in one line') parser.add_argument('-d', '--debug', action='store_true') parser.add_argument('-r', '--random-key', action='store_true', help='Use random key instead of persisted key') args = parser.parse_args() if args.random_key: host_key_private = NoiseEngine.generate_private_key_x25519() else: XDG_CONFIG_HOME = os.environ.get('XDG_CONFIG_HOME') or os.path.join(os.path.expandvars('$HOME'), '.config', 'secure_hid') if not os.path.isdir(XDG_CONFIG_HOME): os.mkdir(XDG_CONFIG_HOME) private_key_file = os.path.join(XDG_CONFIG_HOME, 'host_key.pem') if not os.path.isfile(private_key_file): with open(private_key_file, 'w') as f: f.write(binascii.hexlify(hexnoise.NoiseEngine.generate_private_key_x25519()).decode()) with open(private_key_file) as f: host_key_private = binascii.unhexlify(f.read()) ser = serial.Serial(args.serial, args.baudrate) packetizer = Packetizer(ser, debug=args.debug, width=args.width) noise = NoiseEngine(host_key_private, packetizer, debug=args.debug) noise.perform_handshake() if noise.connected and not noise.paired: print('Handshake channel binding incantation:') channel_binding_incantation = noise.channel_binding_incantation() print(channel_binding_incantation) def term_match(user_input, template): out = '' words_u = user_input.split(' ') words_t = [ w for w in template.split(' ') if w != 'and' ] for u_word in words_u: if 'and'.startswith(u_word) or not u_word: out += ' ' + u_word continue if not words_t: out += '\033[91m' continue t_word, *words_t = words_t if not t_word.startswith(u_word.strip(',')): out += ' \033[91m' + u_word else: out += ' ' + u_word return '\033[92m' + out[1:] + '\033[0m' for user_input in noise.pairing_messages(): if not args.debug: print('\033[2K\r', end='') matched = term_match(user_input, channel_binding_incantation) print('Pairing input:', matched, end='' if not args.debug else '\n', flush=True) print() print('Pairing success') elif noise.connected and noise.paired: print('Successfully re-connected') else: print('Could not connect') sys.exit(1) noise.uinput_passthrough()