From 313bddb2feaf2cfd9cd952eee62ed18512ec35c3 Mon Sep 17 00:00:00 2001 From: Robin Krens Date: Sun, 18 Feb 2024 23:29:14 +0100 Subject: [PATCH] RAConnect: use serial backend instead of pyusb --- raflash/RAConnect.py | 66 +++++++++++++++++++++++----------------------------- 1 file changed, 29 insertions(+), 37 deletions(-) diff --git a/raflash/RAConnect.py b/raflash/RAConnect.py index 881e516..182442c 100644 --- a/raflash/RAConnect.py +++ b/raflash/RAConnect.py @@ -17,8 +17,9 @@ import sys import time +import serial import usb.core -import usb.util +import serial.tools.list_ports from raflash.RAPacker import * MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer @@ -28,13 +29,9 @@ class RAConnect: def __init__(self, vendor_id, product_id): self.vendor_id = vendor_id self.product_id = product_id - self.ep_in = 0x81 - self.ep_out = 0x02 self.max_tries = 20 self.timeout_ms = 100 self.dev = None - self.rx_ep = None - self.tx_ep = None self.chip_layout = [] self.sel_area = 0 # default to Area 0 @@ -42,35 +39,30 @@ class RAConnect: status_conn = self.inquire_connection() if not status_conn: self.confirm_connection() + + def find_port(self): + ports = serial.tools.list_ports.comports() + for p in ports: + if p.vid == self.vendor_id: + return p.device - def find_device(self): - self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id) - if self.dev is None: - raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?") + return None - for config in self.dev: - intf = config[(1, 0)] - product_name = usb.util.get_string(self.dev, self.dev.iProduct) - print(f'Found {product_name} ({self.vendor_id}:{self.product_id})') - if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): - print("Found kernel driver, detaching ... ") - self.dev.detach_kernel_driver(intf.bInterfaceNumber) - for ep in intf: - if (ep.bmAttributes == 0x02): - if ep.bEndpointAddress == self.ep_in: - self.rx_ep = ep - elif ep.bEndpointAddress == self.ep_out: - self.tx_ep = ep - return True - - raise ValueError("Device does not have a CDC interface") + def find_device(self): + port = self.find_port() + try: + self.dev = serial.Serial(port, 9600) + except Exception as err: + raise Exception(f'Failed to open port {err}') def inquire_connection(self): packed = pack_pkt(INQ_CMD, "") self.send_data(packed) - info = self.recv_data(7) + info = self.recv_data(1) if info == bytearray(b'\x00') or info == bytearray(b''): return False + else: + info += self.recv_data(6) msg = unpack_pkt(info) # print("Connection already established") return True @@ -78,12 +70,12 @@ class RAConnect: def confirm_connection(self): for i in range(self.max_tries): try: - self.tx_ep.write(bytes([0x55]), self.timeout_ms) - ret = self.rx_ep.read(1, self.timeout_ms) + self.dev.write(bytes([0x55])) + ret = self.dev.read(1) if ret[0] == 0xC3: print("Reply received (0xC3)") return True - except usb.core.USBError as e: + except Exception as e: print(f"Timeout: retry #{i}", e) return False @@ -96,12 +88,12 @@ class RAConnect: self.chip_layout = cfg def send_data(self, packed_data): - if self.tx_ep is None: + if self.dev is None: return False try: - self.tx_ep.write(packed_data, self.timeout_ms) - except usb.core.USBError as e: - print("Timeout: error", e) + self.dev.write(packed_data) + except Exception as err: + print("Timeout: error", err) return False return True @@ -109,16 +101,16 @@ class RAConnect: msg = bytearray(b'') if exp_len > MAX_TRANSFER_SIZE: raise ValueError(f"length package {exp_len} over max transfer size") - if self.rx_ep is None: + if self.dev is None: return False try: received = 0 while received != exp_len: - buf = self.rx_ep.read(exp_len, timeout) + buf = self.dev.read(exp_len) msg += buf received += len(buf) if received == exp_len: return msg - except usb.core.USBError as e: - print("Timeout: error", e) + except Exception as err: + print("Timeout: error", err) return msg -- 2.7.4