# Upper bound that recv_data checks the requested length against before reading.
# The "+ 6" term is the allowance for the header and footer noted below.
6 MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
# Constructor: stores the USB vendor and product IDs on the instance;
# find_device passes them to usb.core.find as idVendor / idProduct.
# NOTE(review): the listing numbers jump from 11 to 21, so any further setup in
# this method is not visible here. Other methods read self.ep_in, self.ep_out,
# self.max_tries, self.timeout_ms, self.tx_ep and self.rx_ep -- confirm where
# those attributes are initialised.
9 def __init__(self, vendor_id, product_id):
10 self.vendor_id = vendor_id
11 self.product_id = product_id
# Look up the USB device by the stored vendor/product IDs, then walk it to
# detach any active kernel driver and match endpoints against self.ep_in and
# self.ep_out.
# Raises ValueError("Device ... not found") and
# ValueError("Device does not have a serial interface"); the conditions guarding
# both raises are on listing lines that are not visible in this excerpt.
# NOTE(review): listing lines 23, 25, 27, 32, 35, 37-39 and 41 are missing from
# this view and indentation has been lost, so the nesting described in the
# comments below is inferred -- confirm against the full file.
21 def find_device(self):
# The lookup result is kept on self.dev for the rest of the method.
22 self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
# NOTE(review): presumably guarded by a "self.dev is None" check on the hidden
# line 23. The IDs are interpolated in decimal here; USB IDs are conventionally
# shown as 4-digit hex -- consider whether that would be clearer to the user.
24 raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
# Each item produced by iterating self.dev is bound to `config`.
26 for config in self.dev:
28 print(f'Found usb device {self.vendor_id}:{self.product_id}')
# `intf` is bound on a line not shown here (presumably a loop over `config`).
# When a kernel driver is reported active for that interface number, a message
# is printed and the driver is detached so this process can use the interface.
29 if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
30 print("Found kernel driver, detaching ... ")
31 self.dev.detach_kernel_driver(intf.bInterfaceNumber)
# `ep` is likewise bound on a hidden line. NOTE(review): 0x02 is presumably the
# bulk transfer type; the whole bmAttributes byte is compared rather than only
# its transfer-type bits -- confirm that is intended.
33 if (ep.bmAttributes == 0x02):
# The endpoint address is compared with the configured IN and OUT addresses;
# what is stored on a match is on hidden lines (presumably rx_ep / tx_ep).
34 if ep.bEndpointAddress == self.ep_in:
36 elif ep.bEndpointAddress == self.ep_out:
# NOTE(review): the condition for this failure is not visible -- presumably
# that one or both endpoints were never matched.
40 raise ValueError("Device does not have a serial interface")
# Handshake attempt: for up to self.max_tries iterations, write the single byte
# 0x00 to the TX endpoint and read one byte back from the RX endpoint, both with
# self.timeout_ms as the timeout argument.
# NOTE(review): listing lines 44, 47, 49 and 52-53 are missing from this view,
# so the `try:` opener, the check applied to `ret`, and the method's return
# value(s) cannot be seen -- confirm against the full file.
42 def establish_connection(self):
43 for i in range(self.max_tries):
45 self.tx_ep.write(bytes([0x00]), self.timeout_ms)
46 ret = self.rx_ep.read(1, self.timeout_ms)
# Presumably printed only when the reply byte equals 0x00 (check not visible).
48 print("Reply ACK received (0x00)")
# NOTE(review): every usb.core.USBError is reported as a timeout here, although
# the exception class also covers other USB failures; the error itself is
# printed alongside the retry index.
50 except usb.core.USBError as e:
51 print(f"Timeout: retry #{i}", e)
# Confirmation exchange: for up to self.max_tries iterations, write the single
# byte 0x55 to the TX endpoint and read one byte back from the RX endpoint, both
# with self.timeout_ms as the timeout argument.
# NOTE(review): listing lines 56, 59, 61 and 64-65 are missing from this view,
# so the `try:` opener, the check applied to `ret`, and the method's return
# value(s) cannot be seen -- confirm against the full file.
54 def confirm_connection(self):
55 for i in range(self.max_tries):
57 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
58 ret = self.rx_ep.read(1, self.timeout_ms)
# Presumably printed only when the reply byte equals 0xC3 (check not visible).
60 print("Reply received (0xC3)")
# NOTE(review): every usb.core.USBError is reported as a timeout here, although
# the exception class also covers other USB failures.
62 except usb.core.USBError as e:
63 print(f"Timeout: retry #{i}", e)
def authenticate_connection(self):
    """Placeholder for connection authentication; not implemented yet.

    Raises:
        NotImplementedError: Always. This is the built-in exception meant
            for unimplemented methods; it derives from Exception, so callers
            that catch Exception keep working, and the message text is
            unchanged.
    """
    raise NotImplementedError("Not implemented")
# Write packed_data to the TX endpoint using self.timeout_ms as the timeout
# argument; a usb.core.USBError from the write is caught and printed.
# NOTE(review): listing lines 71-72 and 76-78 are missing from this view, so
# what happens when self.tx_ep is None, the `try:` opener, and the return
# value(s) cannot be seen -- confirm against the full file.
69 def send_data(self, packed_data):
# NOTE(review): `is None` is the idiomatic identity test for None.
70 if (self.tx_ep == None):
73 self.tx_ep.write(packed_data, self.timeout_ms)
74 except usb.core.USBError as e:
# NOTE(review): this f-string has no placeholders, so the f prefix is
# redundant; any USBError (not only a timeout) is labelled "Timeout" here.
75 print(f"Timeout: error", e)
# Read data from the RX endpoint until the running count `received` equals
# exp_len, using self.timeout_ms as the timeout argument of each read.
# Raises ValueError when exp_len is greater than MAX_TRANSFER_SIZE.
# NOTE(review): listing lines 80, 84-86, 89-90, 92 and 95+ are missing from this
# view, so the initialisation and update of `received`, the handling of a None
# rx_ep, the `try:` opener, and the return value(s) cannot be seen -- confirm
# against the full file.
79 def recv_data(self, exp_len):
# Reject requests larger than the module-level transfer limit up front.
81 if (exp_len > MAX_TRANSFER_SIZE):
82 raise ValueError(f"length package {exp_len} over max transfer size")
# NOTE(review): `is None` is the idiomatic identity test for None.
83 if (self.rx_ep == None):
# NOTE(review): each pass asks the endpoint for the full exp_len rather than the
# outstanding remainder, and the loop condition is `!=` rather than `<`. If
# `received` is accumulated across passes and ever exceeds exp_len, this loop
# would not terminate -- verify how `received` is updated on the hidden lines.
87 while received != exp_len:
88 buf = self.rx_ep.read(exp_len, self.timeout_ms)
91 if received == exp_len:
93 except usb.core.USBError as e:
# NOTE(review): this f-string has no placeholders, so the f prefix is
# redundant; any USBError (not only a timeout) is labelled "Timeout" here.
94 print(f"Timeout: error", e)