# Constructor: stores the USB vendor and product IDs on the instance; find_device
# later passes them to usb.core.find to locate the device.
# NOTE(review): this excerpt skips the lines that follow the two assignments.
# Attributes read by the other methods (ep_in, ep_out, max_tries, timeout_ms,
# tx_ep, rx_ep) are presumably initialised there — confirm against the full file.
9 def __init__(self, vendor_id, product_id):
10 self.vendor_id = vendor_id
11 self.product_id = product_id
# Look up the USB device identified by self.vendor_id / self.product_id and
# prepare it for use: walk its configurations, detach an active kernel driver
# from the interface, and inspect the endpoints.
# Raises ValueError when the device is not found and when no serial interface
# is found.
# NOTE(review): several lines of this method are not visible in this excerpt
# (the guards in front of both raises, the loops that bind `intf` and `ep`, and
# the bodies of the endpoint-address branches), so the notes below cover only
# what is shown.
21 def find_device(self):
# Stores the result of the lookup in self.dev.
22 self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
# The condition guarding this raise is not visible here; presumably it fires
# when the lookup above returned nothing — confirm.
24 raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
# Iterating the device object yields its configurations.
26 for config in self.dev:
# Disabled interface-class filter, kept for reference. Note it tests 0xa / 0xBB,
# while the message printed below mentions 0x0a | 0xFF.
31 #if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0xa or d.bInterfaceClass == 0xBB)):
32 print("Found serial device with 0x0a | 0xFF")
# If a kernel driver is bound to the interface, detach it first — presumably
# so this process can talk to the interface directly; confirm.
33 if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
34 print("Found kernel driver, detaching ... ")
35 self.dev.detach_kernel_driver(intf.bInterfaceNumber)
# Only endpoints whose bmAttributes equals 0x02 are considered
# (0x02 is presumably the bulk transfer type — TODO confirm against the USB spec).
39 if (ep.bmAttributes == 0x02):
# Compare the endpoint address with the configured IN / OUT addresses; the
# branch bodies are not visible (presumably they set self.rx_ep / self.tx_ep).
40 if ep.bEndpointAddress == self.ep_in:
43 elif ep.bEndpointAddress == self.ep_out:
# The condition guarding this raise is not visible in this excerpt.
48 raise ValueError("Device does not have a serial interface")
# Handshake step: up to self.max_tries times, write the single byte 0x00 to
# the TX endpoint and read one byte back from the RX endpoint.
# A usb.core.USBError during an attempt is caught and reported as a retry.
# NOTE(review): the `try:` line, the check on `ret` and the return statements
# are not visible in this excerpt. The commented-out usage at the end of the
# file tests the result with `if not ...`, so a truthy/falsy return is
# presumably produced — confirm.
50 def establish_connection(self):
51 for i in range(self.max_tries):
# Both transfers use self.timeout_ms as their timeout argument.
53 self.tx_ep.write(bytes([0x00]), self.timeout_ms)
54 ret = self.rx_ep.read(1, self.timeout_ms)
# The log text says the expected reply is 0x00; the comparison itself is not
# visible here.
56 print("Reply ACK received (0x00)")
# Any USBError is reported with a "Timeout" prefix, then the next attempt runs.
58 except usb.core.USBError as e:
59 print(f"Timeout: retry #{i}", e)
# Confirmation step: up to self.max_tries times, write the single byte 0x55 to
# the TX endpoint and read one byte back from the RX endpoint.
# A usb.core.USBError during an attempt is caught and reported as a retry.
# NOTE(review): the `try:` line, the check on `ret` and the return statements
# are not visible in this excerpt. The commented-out usage at the end of the
# file tests the result with `if not ...`, so a truthy/falsy return is
# presumably produced — confirm.
62 def confirm_connection(self):
63 for i in range(self.max_tries):
# Both transfers use self.timeout_ms as their timeout argument.
65 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
66 ret = self.rx_ep.read(1, self.timeout_ms)
# The log text says the expected reply is 0xC3; the comparison itself is not
# visible here.
68 print("Reply received (0xC3)")
# Any USBError is reported with a "Timeout" prefix, then the next attempt runs.
70 except usb.core.USBError as e:
71 print(f"Timeout: retry #{i}", e)
def authenticate_connection(self):
    """Authenticate the connection with the device (not implemented yet).

    Raises:
        NotImplementedError: always. NotImplementedError is a subclass of
            Exception, so callers that catch Exception keep working.
    """
    # Use the dedicated built-in for placeholders instead of a bare Exception.
    raise NotImplementedError("Not implemented")
# Write `packed_data` to the TX endpoint, using self.timeout_ms as the timeout
# argument. A usb.core.USBError is caught and printed rather than propagated.
# NOTE(review): the body of the None guard, the `try:` line and any return
# statement are not visible in this excerpt.
77 def send_data(self, packed_data):
# Guard for a missing TX endpoint.
# NOTE(review): `is None` is the idiomatic identity test here.
78 if (self.tx_ep == None):
81 self.tx_ep.write(packed_data, self.timeout_ms)
82 except usb.core.USBError as e:
# NOTE(review): every USBError is labelled "Timeout", whatever its cause; the
# f-string has no placeholders.
83 print(f"Timeout: error", e)
# Read `exp_len` bytes from the RX endpoint, using self.timeout_ms as the
# timeout argument of each read.
# Raises ValueError when exp_len exceeds MAX_TRANSFER_SIZE.
# A usb.core.USBError is caught and printed rather than propagated.
# NOTE(review): the initialisation and update of `received`, the body of the
# None guard, the `try:` line and the return path are not visible in this
# excerpt.
87 # packets are length 7, except for a read package
88 def recv_data(self, exp_len):
# Reject requests larger than a single transfer is allowed to be.
90 if (exp_len > MAX_TRANSFER_SIZE):
91 raise ValueError(f"length package {exp_len} over max transfer size")
# Guard for a missing RX endpoint.
# NOTE(review): `is None` is the idiomatic identity test here.
92 if (self.rx_ep == None):
# Keep reading until the running count equals exp_len.
# NOTE(review): each read asks for exp_len bytes rather than the remaining
# count, and the loop only stops on exact equality — verify that `received`
# cannot overshoot exp_len.
96 while received != exp_len:
97 buf = self.rx_ep.read(exp_len, self.timeout_ms)
101 if received == exp_len:
103 except usb.core.USBError as e:
# NOTE(review): every USBError is labelled "Timeout", whatever its cause; the
# f-string has no placeholders.
104 print(f"Timeout: error", e)
109 #communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
111 #communicator.send_data(b'\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA')
112 #communicator.recv_data(7)
114 #if not communicator.establish_connection():
115 # print("Cannot connect")
118 #if not communicator.confirm_connection():
119 # print("Failed to confirm boot code")