7 MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
10 def __init__(self, vendor_id, product_id):
11 self.vendor_id = vendor_id
12 self.product_id = product_id
22 status_conn = self.inquire_connection()
24 self.confirm_connection()
26 def find_device(self):
27 self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
29 raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
31 for config in self.dev:
33 product_name = usb.util.get_string(self.dev, self.dev.iProduct)
34 print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
35 if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
36 print("Found kernel driver, detaching ... ")
37 self.dev.detach_kernel_driver(intf.bInterfaceNumber)
39 if (ep.bmAttributes == 0x02):
40 if ep.bEndpointAddress == self.ep_in:
42 elif ep.bEndpointAddress == self.ep_out:
46 raise ValueError("Device does not have a CDC interface")
49 def inquire_connection(self):
50 packed = pack_pkt(INQ_CMD, "")
51 self.send_data(packed)
52 info = self.recv_data(7)
53 if info == bytearray(b'\x00') or info == bytearray(b''):
55 msg = unpack_pkt(info)
56 #print("Connection already established")
59 def establish_connection(self):
60 for i in range(self.max_tries):
62 self.tx_ep.write(bytes([0x00]), self.timeout_ms)
63 ret = self.rx_ep.read(1, self.timeout_ms)
65 print("Reply ACK received (0x00)")
67 except usb.core.USBError as e:
68 print(f"Timeout: retry #{i}", e)
71 def confirm_connection(self):
72 for i in range(self.max_tries):
74 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
75 ret = self.rx_ep.read(1, self.timeout_ms)
77 print("Reply received (0xC3)")
79 except usb.core.USBError as e:
80 print(f"Timeout: retry #{i}", e)
83 def authenticate_connection(self):
84 raise Exception("Not implemented")
86 def send_data(self, packed_data):
87 if (self.tx_ep == None):
90 self.tx_ep.write(packed_data, self.timeout_ms)
91 except usb.core.USBError as e:
92 print(f"Timeout: error", e)
96 def recv_data(self, exp_len, timeout=100):
98 if (exp_len > MAX_TRANSFER_SIZE):
99 raise ValueError(f"length package {exp_len} over max transfer size")
100 if (self.rx_ep == None):
104 while received != exp_len:
105 buf = self.rx_ep.read(exp_len, timeout)
108 if received == exp_len:
110 except usb.core.USBError as e:
111 print(f"Timeout: error", e)