I would like to add a keyboard detection for Linux to my existing Keyboard Detector for Windows. So I used pyudev to create a LinuxKeyboardDetector.
The script can be started and the graphical user interface appears, but unfortunately the keyboard detection does not recognize anything and does not report any error.
I suspect that there is a problem with multithreading using QRunnable.
Code
import sys
from datetime import datetime
import platform
from PyQt5. QtCore import QObject, QRunnable, QThreadPool, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow, QTableWidget, QTableWidgetItem, QHeaderView
current_platform = platform.system()
if current_platform == "Windows":
import pythoncom
import wmi
elif current_platform == "Linux":
import pyudev
from pyudev.pyqt5 import MonitorObserver
def create_keyboard_detector():
keyboard_detector = None
if current_platform == "Windows":
keyboard_detector = WindowsKeyboardDetector()
elif current_platform == "Linux":
keyboard_detector = LinuxKeyboardDetector()
return keyboard_detector
class KeyboardDetectorSignals(QObject):
keyboard_changed = pyqtSignal(str)
class WindowsKeyboardDetector(QRunnable):
def __init__(self):
super().__init__()
self.signals = KeyboardDetectorSignals()
def run(self):
pythoncom.CoInitialize()
device_connected_wql = "SELECT * FROM __InstanceCreationEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"
device_disconnected_wql = "SELECT * FROM __InstanceDeletionEvent WITHIN 2 WHERE TargetInstance ISA \'Win32_Keyboard\'"
c = wmi.WMI()
connected_watcher = c.watch_for(raw_wql=device_connected_wql)
disconnected_watcher = c.watch_for(raw_wql=device_disconnected_wql)
while True:
try:
connected = connected_watcher(timeout_ms=10)
except wmi.x_wmi_timed_out:
pass
else:
if connected:
self.signals.keyboard_changed.emit("Keyboard connected.")
try:
disconnected = disconnected_watcher(timeout_ms=10)
except wmi.x_wmi_timed_out:
pass
else:
if disconnected:
self.signals.keyboard_changed.emit("Keyboard disconnected.")
class LinuxKeyboardDetector(QRunnable):
def __init__(self):
super().__init__()
self.signals = KeyboardDetectorSignals()
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
self.observer = MonitorObserver(self.monitor)
def run(self):
self.monitor.filter_by(subsystem="usb", device_type="usb_device")
self.observer.deviceEvent.connect(self.process_device_event)
self.monitor.start()
def process_device_event(self, device):
if device['ID_INPUT_KEYBOARD'] == '1':
if device.action == "add":
self.signals.keyboard_changed.emit("Keyboard connected.")
if device.action == "remove":
self.signals.keyboard_changed.emit("Keyboard disconnected.")
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.setGeometry(100, 100, 500, 500)
self.setWindowTitle("Keyboard Logger")
self.log_table = QTableWidget()
self.log_table.setColumnCount(2)
self.log_table.setShowGrid(True)
self.log_table.setHorizontalHeaderLabels(["Time", "Event"])
self.log_table.horizontalHeader().setStretchLastSection(True)
self.log_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeToContents)
self.setCentralWidget(self.log_table)
self.show()
self.threadpool = QThreadPool()
keyboard_detector = create_keyboard_detector()
keyboard_detector.signals.keyboard_changed.connect(self.add_row)
self.threadpool.start(keyboard_detector)
def add_row(self, event: str):
now = datetime.now()
datetime_string = now.strftime("%Y-%m-%d %H:%M:%S")
row_count = self.log_table.rowCount()
self.log_table.insertRow(row_count)
self.log_table.setItem(row_count, 0, QTableWidgetItem(datetime_string))
self.log_table.setItem(row_count, 1, QTableWidgetItem(event))
def main():
app = QApplication(sys.argv)
window = MainWindow()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Edit 1: Update LinuxKeyboardDetector class to use basic pyudev.MonitorObserver, instead of the dedicated pyqt version.
class LinuxKeyboardDetector(QRunnable):
def __init__(self):
super().__init__()
self.signals = KeyboardDetectorSignals()
self.context = pyudev.Context()
self.monitor = pyudev.Monitor.from_netlink(self.context)
# self.observer = MonitorObserver(self.monitor)
self.observer = pyudev.MonitorObserver(self.monitor, self.process_device_event)
def run(self):
self.monitor.filter_by(subsystem="usb", device_type="usb_device")
# self.observer.deviceEvent.connect(self.process_device_event)
# self.monitor.start()
self.observer.start()
def process_device_event(self, device):
if device['ID_INPUT_KEYBOARD'] == '1':
if device.action == "add":
self.signals.keyboard_changed.emit("Keyboard connected.")
if device.action == "remove":
self.signals.keyboard_changed.emit("Keyboard disconnected.")
Result 1: The following error message appears when a USB keyboard is plugged in or off.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 532, in run
self._callback(device)
File "/home/ata/source/venv/lib/python3.6/site-packages/pyudev/monitor.py", line 508, in <lambda>
callback = lambda d: event_handler(d.action, d)
TypeError: process_device_event() takes 2 positional arguments but 3 were given