X-Git-Url: https://robinkrens.nl/gitweb/?a=blobdiff_plain;f=src%2FRAConnect.py;h=d6d454a73f763daeaf0bd2c94951660c502ec660;hb=11b2c60dee7b715d8ee69e5076547d6a7a499c17;hp=15a54a475620c76691e89d3f237286ed943c4770;hpb=f969a6fb10665bcaf1432b56234e60197c26a54a;p=renesas-ra-flasher diff --git a/src/RAConnect.py b/src/RAConnect.py index 15a54a4..d6d454a 100644 --- a/src/RAConnect.py +++ b/src/RAConnect.py @@ -2,19 +2,28 @@ import sys import time import usb.core import usb.util +from RAPacker import * + +MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer class RAConnect: def __init__(self, vendor_id, product_id): self.vendor_id = vendor_id self.product_id = product_id - self.ep_in = 0x82 + self.ep_in = 0x81 self.ep_out = 0x02 self.max_tries = 20 self.timeout_ms = 100 self.dev = None self.rx_ep = None self.tx_ep = None + self.chip_layout = [] + self.sel_area = 0 # default to Area 0 + self.find_device() + status_conn = self.inquire_connection() + if not status_conn: + self.confirm_connection() def find_device(self): self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id) @@ -22,35 +31,32 @@ class RAConnect: raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?") for config in self.dev: - for intf in config: - if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)): - print("Found serial device with 0x02 | 0xFF") - if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): - print("Found kernel driver, detaching ... ") - self.dev.detach_kernel_driver(intf.bInterfaceNumber) - for ep in intf: - if (ep.bmAttributes == 0x02): - if ep.bEndpointAddress == self.ep_in: - self.rx_ep = ep - print(ep) - elif ep.bEndpointAddress == self.ep_out: - self.tx_ep = ep - print(ep) - return True + intf = config[(1,0)] + product_name = usb.util.get_string(self.dev, self.dev.iProduct) + print(f'Found {product_name} ({self.vendor_id}:{self.product_id})') + if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): + print("Found kernel driver, detaching ... ") + self.dev.detach_kernel_driver(intf.bInterfaceNumber) + for ep in intf: + if (ep.bmAttributes == 0x02): + if ep.bEndpointAddress == self.ep_in: + self.rx_ep = ep + elif ep.bEndpointAddress == self.ep_out: + self.tx_ep = ep + return True - raise ValueError("Device does not have a serial interface") + raise ValueError("Device does not have a CDC interface") - def establish_connection(self): - for i in range(self.max_tries): - try: - self.tx_ep.write(bytes([0x00]), self.timeout_ms) - ret = self.rx_ep.read(1, self.timeout_ms) - if ret[0] == 0x00: - print("ACK received") - return True - except usb.core.USBError as e: - print(f"Timeout: retry #{i}", e) - return False + + def inquire_connection(self): + packed = pack_pkt(INQ_CMD, "") + self.send_data(packed) + info = self.recv_data(7) + if info == bytearray(b'\x00') or info == bytearray(b''): + return False + msg = unpack_pkt(info) + #print("Connection already established") + return True def confirm_connection(self): for i in range(self.max_tries): @@ -58,20 +64,44 @@ class RAConnect: self.tx_ep.write(bytes([0x55]), self.timeout_ms) ret = self.rx_ep.read(1, self.timeout_ms) if ret[0] == 0xC3: - print("ACK received") + print("Reply received (0xC3)") return True except usb.core.USBError as e: print(f"Timeout: retry #{i}", e) return False -# Usage -communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523) + def authenticate_connection(self): + raise Exception("Not implemented") -if not communicator.establish_connection(): - print("Cannot connect") - sys.exit(0) + def set_chip_layout(self, cfg): + if cfg == None: + raise ValueError("Could net get chip layout") + self.chip_layout = cfg -if not communicator.confirm_connection(): - print("Failed to confirm boot code") - sys.exit(0) + def send_data(self, packed_data): + if (self.tx_ep == None): + return False + try: + self.tx_ep.write(packed_data, self.timeout_ms) + except usb.core.USBError as e: + print(f"Timeout: error", e) + return False + return True + def recv_data(self, exp_len, timeout=100): + msg = bytearray(b'') + if (exp_len > MAX_TRANSFER_SIZE): + raise ValueError(f"length package {exp_len} over max transfer size") + if (self.rx_ep == None): + return False + try: + received = 0 + while received != exp_len: + buf = self.rx_ep.read(exp_len, timeout) + msg += buf + received += len(buf) + if received == exp_len: + return msg + except usb.core.USBError as e: + print(f"Timeout: error", e) + return msg