From 383ccd9f65c609dca06143236048c76595f22935 Mon Sep 17 00:00:00 2001 From: Robin Krens Date: Tue, 13 Feb 2024 14:57:15 +0100 Subject: [PATCH] RAFlasher.py: initial hardware test - set data header to 0x01 (fix) - select second interface (first interface is not used) for endpoints - repeatly read EP of usb device --- src/RAConnect.py | 53 +++++++++++++++++++++++++++++++++-------------------- src/RAFlasher.py | 16 ++++++++++------ src/RAPacker.py | 2 +- 3 files changed, 44 insertions(+), 27 deletions(-) diff --git a/src/RAConnect.py b/src/RAConnect.py index fb2a334..2241275 100644 --- a/src/RAConnect.py +++ b/src/RAConnect.py @@ -9,7 +9,7 @@ class RAConnect: def __init__(self, vendor_id, product_id): self.vendor_id = vendor_id self.product_id = product_id - self.ep_in = 0x82 + self.ep_in = 0x81 self.ep_out = 0x02 self.max_tries = 20 self.timeout_ms = 100 @@ -24,21 +24,26 @@ class RAConnect: raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?") for config in self.dev: - for intf in config: - if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)): - print("Found serial device with 0x02 | 0xFF") - if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): - print("Found kernel driver, detaching ... ") - self.dev.detach_kernel_driver(intf.bInterfaceNumber) - for ep in intf: - if (ep.bmAttributes == 0x02): - if ep.bEndpointAddress == self.ep_in: - self.rx_ep = ep - print(ep) - elif ep.bEndpointAddress == self.ep_out: - self.tx_ep = ep - print(ep) - return True + #print(config) + intf = config[(1,0)] + #for intf in config: + + #if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0xa or d.bInterfaceClass == 0xBB)): + print("Found serial device with 0x0a | 0xFF") + if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): + print("Found kernel driver, detaching ... ") + self.dev.detach_kernel_driver(intf.bInterfaceNumber) + for ep in intf: + #print("=========") + #print(ep) + if (ep.bmAttributes == 0x02): + if ep.bEndpointAddress == self.ep_in: + self.rx_ep = ep + print(ep) + elif ep.bEndpointAddress == self.ep_out: + self.tx_ep = ep + print(ep) + return True raise ValueError("Device does not have a serial interface") @@ -48,7 +53,7 @@ class RAConnect: self.tx_ep.write(bytes([0x00]), self.timeout_ms) ret = self.rx_ep.read(1, self.timeout_ms) if ret[0] == 0x00: - print("ACK received") + print("Reply ACK received (0x00)") return True except usb.core.USBError as e: print(f"Timeout: retry #{i}", e) @@ -60,7 +65,7 @@ class RAConnect: self.tx_ep.write(bytes([0x55]), self.timeout_ms) ret = self.rx_ep.read(1, self.timeout_ms) if ret[0] == 0xC3: - print("ACK received") + print("Reply received (0xC3)") return True except usb.core.USBError as e: print(f"Timeout: retry #{i}", e) @@ -81,15 +86,23 @@ class RAConnect: # packets are length 7, except for a read package def recv_data(self, exp_len): + msg = bytearray(b'') if (exp_len > MAX_TRANSFER_SIZE): raise ValueError(f"length package {exp_len} over max transfer size") if (self.rx_ep == None): return False try: - msg = self.rx_ep.read(exp_len, self.timeout_ms) + received = 0 + while received != exp_len: + buf = self.rx_ep.read(exp_len, self.timeout_ms) + msg += buf + print(buf, len(buf)) + received += len(buf) + if received == exp_len: + return msg except usb.core.USBError as e: print(f"Timeout: error", e) - return False + #return False return msg diff --git a/src/RAFlasher.py b/src/RAFlasher.py index c981b22..75a4561 100644 --- a/src/RAFlasher.py +++ b/src/RAFlasher.py @@ -16,10 +16,12 @@ def get_dev_info(dev): packed = pack_pkt(SIG_CMD, "") dev.send_data(packed) - #info_ret = dev.recv_data(17) - info = b'\x81\x00\x0D\x3A\x01\x31\x2d\x00\x00\x1e\x84\x80\x04\x02\x0a\x08' # test - fmt = '>IIIBBH' - _HEADER, SCI, RMB, NOA, TYP, BFV = struct.unpack(fmt, info) + print(packed) + info = dev.recv_data(18) + print(info, len(info)) + #info = b'\x81\x00\x0D\x3A\x01\x31\x2d\x00\x00\x1e\x84\x80\x04\x02\x0a\x08' # test + fmt = '>IIIBBHH' + _HEADER, SCI, RMB, NOA, TYP, BFV, _FOOTER = struct.unpack(fmt, info) print(f'Ver{BFV >> 8}.{BFV & 0xFF}') def verify_img(dev, img, start_addr, end_addr): @@ -94,13 +96,15 @@ def main(): args = parser.parse_args() if args.command == "write": - dev = RAConnect(vendor_id=0x1a86, product_id=0x7523) + dev = RAConnect(vendor_id=0x045B, product_id=0x0261) print(args) write_img(dev, args.file_name, args.start_address, args.end_address, args.verify) elif args.command == "read": print('read command') elif args.command == "info": - dev = RAConnect(vendor_id=0x1a86, product_id=0x7523) + dev = RAConnect(vendor_id=0x045B, product_id=0x0261) + dev.establish_connection() + dev.confirm_connection() get_dev_info(dev) else: parser.print_help() diff --git a/src/RAPacker.py b/src/RAPacker.py index a72970d..676a296 100644 --- a/src/RAPacker.py +++ b/src/RAPacker.py @@ -98,7 +98,7 @@ def pack_command(cmd, data): # format of data packet is [SOD|LNH|LNL|RES|DAT|SUM|ETX] def pack_pkt(res, data): - SOD = 0x81 + SOD = 0x01 # TODO: check if 0x81 header needed if (len(data) > 1024): raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)') LNH, LNL, SUM = calc_sum(int(res), data) -- 2.7.4