RAConnect: use serial backend instead of pyusb
authorRobin Krens <robin@robinkrens.nl>
Sun, 18 Feb 2024 22:29:14 +0000 (23:29 +0100)
committerRobin Krens <robin@robinkrens.nl>
Sun, 18 Feb 2024 22:29:14 +0000 (23:29 +0100)
raflash/RAConnect.py

index 881e516..182442c 100644 (file)
@@ -17,8 +17,9 @@
 
 import sys
 import time
+import serial
 import usb.core
-import usb.util
+import serial.tools.list_ports
 from raflash.RAPacker import *
 
 MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
@@ -28,13 +29,9 @@ class RAConnect:
     def __init__(self, vendor_id, product_id):
         self.vendor_id = vendor_id
         self.product_id = product_id
-        self.ep_in = 0x81
-        self.ep_out = 0x02
         self.max_tries = 20
         self.timeout_ms = 100
         self.dev = None
-        self.rx_ep = None
-        self.tx_ep = None
         self.chip_layout = []
         self.sel_area = 0 # default to Area 0
 
@@ -42,35 +39,30 @@ class RAConnect:
         status_conn = self.inquire_connection()
         if not status_conn:
             self.confirm_connection()
+    
+    def find_port(self):
+        ports = serial.tools.list_ports.comports()
+        for p in ports:
+            if p.vid == self.vendor_id:
+                return p.device
 
-    def find_device(self):
-        self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
-        if self.dev is None:
-            raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
+        return None
 
-        for config in self.dev:
-            intf = config[(1, 0)]
-            product_name = usb.util.get_string(self.dev, self.dev.iProduct)
-            print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
-            if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
-                print("Found kernel driver, detaching ... ")
-                self.dev.detach_kernel_driver(intf.bInterfaceNumber)
-            for ep in intf:
-                if (ep.bmAttributes == 0x02):
-                    if ep.bEndpointAddress == self.ep_in:
-                        self.rx_ep = ep
-                    elif ep.bEndpointAddress == self.ep_out:
-                        self.tx_ep = ep
-            return True
-
-        raise ValueError("Device does not have a CDC interface")
+    def find_device(self):
+        port = self.find_port()
+        try:
+            self.dev = serial.Serial(port, 9600)
+        except Exception as err:
+            raise Exception(f'Failed to open port {err}')
 
     def inquire_connection(self):
         packed = pack_pkt(INQ_CMD, "")
         self.send_data(packed)
-        info = self.recv_data(7)
+        info = self.recv_data(1)
         if info == bytearray(b'\x00') or info == bytearray(b''):
             return False
+        else:
+            info += self.recv_data(6)
         msg = unpack_pkt(info)
         # print("Connection already established")
         return True
@@ -78,12 +70,12 @@ class RAConnect:
     def confirm_connection(self):
         for i in range(self.max_tries):
             try:
-                self.tx_ep.write(bytes([0x55]), self.timeout_ms)
-                ret = self.rx_ep.read(1, self.timeout_ms)
+                self.dev.write(bytes([0x55]))
+                ret = self.dev.read(1)
                 if ret[0] == 0xC3:
                     print("Reply received (0xC3)")
                     return True
-            except usb.core.USBError as e:
+            except Exception as e:
                 print(f"Timeout: retry #{i}", e)
         return False
 
@@ -96,12 +88,12 @@ class RAConnect:
         self.chip_layout = cfg
 
     def send_data(self, packed_data):
-        if self.tx_ep is None:
+        if self.dev is None:
             return False
         try:
-            self.tx_ep.write(packed_data, self.timeout_ms)
-        except usb.core.USBError as e:
-            print("Timeout: error", e)
+            self.dev.write(packed_data)
+        except Exception as err:
+            print("Timeout: error", err)
             return False
         return True
 
@@ -109,16 +101,16 @@ class RAConnect:
         msg = bytearray(b'')
         if exp_len > MAX_TRANSFER_SIZE:
             raise ValueError(f"length package {exp_len} over max transfer size")
-        if self.rx_ep is None:
+        if self.dev is None:
             return False
         try:
             received = 0
             while received != exp_len:
-                buf = self.rx_ep.read(exp_len, timeout)
+                buf = self.dev.read(exp_len)
                 msg += buf
                 received += len(buf)
                 if received == exp_len:
                     return msg
-        except usb.core.USBError as e:
-            print("Timeout: error", e)
+        except Exception as err:
+            print("Timeout: error", err)
         return msg