RAFlasher.py: initial hardware test
authorRobin Krens <robin@robinkrens.nl>
Tue, 13 Feb 2024 13:57:15 +0000 (14:57 +0100)
committerRobin Krens <robin@robinkrens.nl>
Tue, 13 Feb 2024 13:57:15 +0000 (14:57 +0100)
- set data header to 0x01 (fix)
- select second interface (first interface is not used) for endpoints
- repeatly read EP of usb device

src/RAConnect.py
src/RAFlasher.py
src/RAPacker.py

index fb2a334..2241275 100644 (file)
@@ -9,7 +9,7 @@ class RAConnect:
     def __init__(self, vendor_id, product_id):
         self.vendor_id = vendor_id
         self.product_id = product_id
-        self.ep_in = 0x82
+        self.ep_in = 0x81
         self.ep_out = 0x02
         self.max_tries = 20
         self.timeout_ms = 100
@@ -24,21 +24,26 @@ class RAConnect:
             raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
 
         for config in self.dev:
-            for intf in config:
-                if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)):
-                    print("Found serial device with 0x02 | 0xFF")
-                    if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
-                        print("Found kernel driver, detaching ... ")
-                        self.dev.detach_kernel_driver(intf.bInterfaceNumber)
-                    for ep in intf:
-                        if (ep.bmAttributes == 0x02):
-                            if ep.bEndpointAddress == self.ep_in:
-                                self.rx_ep = ep
-                                print(ep)
-                            elif ep.bEndpointAddress == self.ep_out:
-                                self.tx_ep = ep
-                                print(ep)
-                    return True
+            #print(config)
+            intf = config[(1,0)]
+            #for intf in config:
+                
+                #if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0xa or d.bInterfaceClass == 0xBB)):
+            print("Found serial device with 0x0a | 0xFF")
+            if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
+                print("Found kernel driver, detaching ... ")
+                self.dev.detach_kernel_driver(intf.bInterfaceNumber)
+            for ep in intf:
+                #print("=========")
+                #print(ep)
+                if (ep.bmAttributes == 0x02):
+                    if ep.bEndpointAddress == self.ep_in:
+                        self.rx_ep = ep
+                        print(ep)
+                    elif ep.bEndpointAddress == self.ep_out:
+                        self.tx_ep = ep
+                        print(ep)
+            return True
 
         raise ValueError("Device does not have a serial interface")
 
@@ -48,7 +53,7 @@ class RAConnect:
                 self.tx_ep.write(bytes([0x00]), self.timeout_ms)
                 ret = self.rx_ep.read(1, self.timeout_ms)
                 if ret[0] == 0x00:
-                    print("ACK received")
+                    print("Reply ACK received (0x00)")
                     return True
             except usb.core.USBError as e:
                 print(f"Timeout: retry #{i}", e)
@@ -60,7 +65,7 @@ class RAConnect:
                 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
                 ret = self.rx_ep.read(1, self.timeout_ms)
                 if ret[0] == 0xC3:
-                    print("ACK received")
+                    print("Reply received (0xC3)")
                     return True
             except usb.core.USBError as e:
                 print(f"Timeout: retry #{i}", e)
@@ -81,15 +86,23 @@ class RAConnect:
 
     # packets are length 7, except for a read package
     def recv_data(self, exp_len):
+        msg = bytearray(b'')
         if (exp_len > MAX_TRANSFER_SIZE):
             raise ValueError(f"length package {exp_len} over max transfer size")
         if (self.rx_ep == None):
             return False
         try:
-            msg = self.rx_ep.read(exp_len, self.timeout_ms)
+            received = 0
+            while received != exp_len:
+                buf = self.rx_ep.read(exp_len, self.timeout_ms)
+                msg += buf
+                print(buf, len(buf))
+                received += len(buf)
+                if received == exp_len:
+                    return msg
         except usb.core.USBError as e:
             print(f"Timeout: error", e)
-            return False
+            #return False
         return msg
 
 
index c981b22..75a4561 100644 (file)
@@ -16,10 +16,12 @@ def get_dev_info(dev):
 
     packed = pack_pkt(SIG_CMD, "")
     dev.send_data(packed)
-    #info_ret = dev.recv_data(17)
-    info = b'\x81\x00\x0D\x3A\x01\x31\x2d\x00\x00\x1e\x84\x80\x04\x02\x0a\x08' # test
-    fmt = '>IIIBBH'
-    _HEADER, SCI, RMB, NOA, TYP, BFV = struct.unpack(fmt, info)
+    print(packed)
+    info = dev.recv_data(18)
+    print(info, len(info))
+    #info = b'\x81\x00\x0D\x3A\x01\x31\x2d\x00\x00\x1e\x84\x80\x04\x02\x0a\x08' # test
+    fmt = '>IIIBBHH'
+    _HEADER, SCI, RMB, NOA, TYP, BFV, _FOOTER = struct.unpack(fmt, info)
     print(f'Ver{BFV >> 8}.{BFV & 0xFF}')
 
 def verify_img(dev, img, start_addr, end_addr):
@@ -94,13 +96,15 @@ def main():
     args = parser.parse_args()
 
     if args.command == "write":
-        dev = RAConnect(vendor_id=0x1a86, product_id=0x7523)
+        dev = RAConnect(vendor_id=0x045B, product_id=0x0261)
         print(args)
         write_img(dev, args.file_name, args.start_address, args.end_address, args.verify)
     elif args.command == "read":
         print('read command')
     elif args.command == "info":
-        dev = RAConnect(vendor_id=0x1a86, product_id=0x7523)
+        dev = RAConnect(vendor_id=0x045B, product_id=0x0261)
+        dev.establish_connection()
+        dev.confirm_connection()
         get_dev_info(dev)
     else:
         parser.print_help()
index a72970d..676a296 100644 (file)
@@ -98,7 +98,7 @@ def pack_command(cmd, data):
 
 # format of data packet is [SOD|LNH|LNL|RES|DAT|SUM|ETX]
 def pack_pkt(res, data):
-    SOD = 0x81
+    SOD = 0x01 # TODO: check if 0x81 header needed
     if (len(data) > 1024):
         raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)')
     LNH, LNL, SUM = calc_sum(int(res), data)