import time
import usb.core
import usb.util
+from RAPacker import *
MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
self.dev = None
self.rx_ep = None
self.tx_ep = None
+
self.find_device()
+ status_conn = self.inquire_connection()
+ if not status_conn:
+ self.confirm_connection()
def find_device(self):
self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
for config in self.dev:
intf = config[(1,0)]
- print(f'Found usb device {self.vendor_id}:{self.product_id}')
+ product_name = usb.util.get_string(self.dev, self.dev.iProduct)
+ print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
print("Found kernel driver, detaching ... ")
self.dev.detach_kernel_driver(intf.bInterfaceNumber)
self.tx_ep = ep
return True
- raise ValueError("Device does not have a serial interface")
+ raise ValueError("Device does not have a CDC interface")
- def establish_connection(self):
- for i in range(self.max_tries):
- try:
- self.tx_ep.write(bytes([0x00]), self.timeout_ms)
- ret = self.rx_ep.read(1, self.timeout_ms)
- if ret[0] == 0x00:
- print("Reply ACK received (0x00)")
- return True
- except usb.core.USBError as e:
- print(f"Timeout: retry #{i}", e)
- return False
+
+ def inquire_connection(self):
+ packed = pack_pkt(INQ_CMD, "")
+ self.send_data(packed)
+ info = self.recv_data(7)
+ if info == bytearray(b'\x00') or info == bytearray(b''):
+ return False
+ msg = unpack_pkt(info)
+ #print("Connection already established")
+ return True
def confirm_connection(self):
for i in range(self.max_tries):
return False
return True
- def recv_data(self, exp_len):
+ def recv_data(self, exp_len, timeout=100):
msg = bytearray(b'')
if (exp_len > MAX_TRANSFER_SIZE):
raise ValueError(f"length package {exp_len} over max transfer size")
try:
received = 0
while received != exp_len:
- buf = self.rx_ep.read(exp_len, self.timeout_ms)
+ buf = self.rx_ep.read(exp_len, timeout)
msg += buf
received += len(buf)
if received == exp_len: