X-Git-Url: https://robinkrens.nl/gitweb/?a=blobdiff_plain;f=src%2FRAConnect.py;h=8405dae5b1438924db1f58493f4c3e72fe423cc6;hb=5bc6b530d89045f25e246d471fbb7deb70e7f10c;hp=2241275615876e8431ef9ca2d6a28bb3f947813c;hpb=383ccd9f65c609dca06143236048c76595f22935;p=renesas-ra-flasher diff --git a/src/RAConnect.py b/src/RAConnect.py index 2241275..8405dae 100644 --- a/src/RAConnect.py +++ b/src/RAConnect.py @@ -2,8 +2,9 @@ import sys import time import usb.core import usb.util +from RAPacker import * -MAX_TRANSFER_SIZE = 64 +MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer class RAConnect: def __init__(self, vendor_id, product_id): @@ -16,7 +17,11 @@ class RAConnect: self.dev = None self.rx_ep = None self.tx_ep = None + self.find_device() + status_conn = self.inquire_connection() + if not status_conn: + self.confirm_connection() def find_device(self): self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id) @@ -24,28 +29,32 @@ class RAConnect: raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?") for config in self.dev: - #print(config) intf = config[(1,0)] - #for intf in config: - - #if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0xa or d.bInterfaceClass == 0xBB)): - print("Found serial device with 0x0a | 0xFF") + product_name = usb.util.get_string(self.dev, self.dev.iProduct) + print(f'Found {product_name} ({self.vendor_id}:{self.product_id})') if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): print("Found kernel driver, detaching ... ") self.dev.detach_kernel_driver(intf.bInterfaceNumber) for ep in intf: - #print("=========") - #print(ep) if (ep.bmAttributes == 0x02): if ep.bEndpointAddress == self.ep_in: self.rx_ep = ep - print(ep) elif ep.bEndpointAddress == self.ep_out: self.tx_ep = ep - print(ep) return True - raise ValueError("Device does not have a serial interface") + raise ValueError("Device does not have a CDC interface") + + + def inquire_connection(self): + packed = pack_pkt(INQ_CMD, "") + self.send_data(packed) + info = self.recv_data(7) + if info == bytearray(b'\x00') or info == bytearray(b''): + return False + msg = unpack_pkt(info) + #print("Connection already established") + return True def establish_connection(self): for i in range(self.max_tries): @@ -84,8 +93,7 @@ class RAConnect: return False return True - # packets are length 7, except for a read package - def recv_data(self, exp_len): + def recv_data(self, exp_len, timeout=100): msg = bytearray(b'') if (exp_len > MAX_TRANSFER_SIZE): raise ValueError(f"length package {exp_len} over max transfer size") @@ -94,28 +102,11 @@ class RAConnect: try: received = 0 while received != exp_len: - buf = self.rx_ep.read(exp_len, self.timeout_ms) + buf = self.rx_ep.read(exp_len, timeout) msg += buf - print(buf, len(buf)) received += len(buf) if received == exp_len: return msg except usb.core.USBError as e: print(f"Timeout: error", e) - #return False return msg - - -#communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523) - -#communicator.send_data(b'\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA') -#communicator.recv_data(7) - -#if not communicator.establish_connection(): -# print("Cannot connect") -# sys.exit(0) -# -#if not communicator.confirm_connection(): -# print("Failed to confirm boot code") -# sys.exit(0) -