RADevice.py: deleted, moved logic to RAConnect class
[renesas-ra-flasher] / src / RAConnect.py
index fb2a334..d6d454a 100644 (file)
@@ -2,21 +2,28 @@ import sys
 import time
 import usb.core
 import usb.util
+from RAPacker import *
 
-MAX_TRANSFER_SIZE = 64
+MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
 
 class RAConnect:
     def __init__(self, vendor_id, product_id):
         self.vendor_id = vendor_id
         self.product_id = product_id
-        self.ep_in = 0x82
+        self.ep_in = 0x81
         self.ep_out = 0x02
         self.max_tries = 20
         self.timeout_ms = 100
         self.dev = None
         self.rx_ep = None
         self.tx_ep = None
+        self.chip_layout = []
+        self.sel_area = 0 # default to Area 0
+
         self.find_device()
+        status_conn = self.inquire_connection()
+        if not status_conn:
+            self.confirm_connection()
 
     def find_device(self):
         self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
@@ -24,35 +31,32 @@ class RAConnect:
             raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
 
         for config in self.dev:
-            for intf in config:
-                if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)):
-                    print("Found serial device with 0x02 | 0xFF")
-                    if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
-                        print("Found kernel driver, detaching ... ")
-                        self.dev.detach_kernel_driver(intf.bInterfaceNumber)
-                    for ep in intf:
-                        if (ep.bmAttributes == 0x02):
-                            if ep.bEndpointAddress == self.ep_in:
-                                self.rx_ep = ep
-                                print(ep)
-                            elif ep.bEndpointAddress == self.ep_out:
-                                self.tx_ep = ep
-                                print(ep)
-                    return True
+            intf = config[(1,0)]
+            product_name = usb.util.get_string(self.dev, self.dev.iProduct)
+            print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
+            if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
+                print("Found kernel driver, detaching ... ")
+                self.dev.detach_kernel_driver(intf.bInterfaceNumber)
+            for ep in intf:
+                if (ep.bmAttributes == 0x02):
+                    if ep.bEndpointAddress == self.ep_in:
+                        self.rx_ep = ep
+                    elif ep.bEndpointAddress == self.ep_out:
+                        self.tx_ep = ep
+            return True
 
-        raise ValueError("Device does not have a serial interface")
+        raise ValueError("Device does not have a CDC interface")
 
-    def establish_connection(self):
-        for i in range(self.max_tries):
-            try:
-                self.tx_ep.write(bytes([0x00]), self.timeout_ms)
-                ret = self.rx_ep.read(1, self.timeout_ms)
-                if ret[0] == 0x00:
-                    print("ACK received")
-                    return True
-            except usb.core.USBError as e:
-                print(f"Timeout: retry #{i}", e)
-        return False
+
+    def inquire_connection(self):
+        packed = pack_pkt(INQ_CMD, "")
+        self.send_data(packed)
+        info = self.recv_data(7)
+        if info == bytearray(b'\x00') or info == bytearray(b''):
+            return False
+        msg = unpack_pkt(info)
+        #print("Connection already established")
+        return True
 
     def confirm_connection(self):
         for i in range(self.max_tries):
@@ -60,7 +64,7 @@ class RAConnect:
                 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
                 ret = self.rx_ep.read(1, self.timeout_ms)
                 if ret[0] == 0xC3:
-                    print("ACK received")
+                    print("Reply received (0xC3)")
                     return True
             except usb.core.USBError as e:
                 print(f"Timeout: retry #{i}", e)
@@ -69,6 +73,11 @@ class RAConnect:
     def authenticate_connection(self):
         raise Exception("Not implemented")
 
+    def set_chip_layout(self, cfg):
+        if cfg == None:
+            raise ValueError("Could net get chip layout")
+        self.chip_layout = cfg
+
     def send_data(self, packed_data):
         if (self.tx_ep == None):
             return False
@@ -79,30 +88,20 @@ class RAConnect:
             return False
         return True
 
-    # packets are length 7, except for a read package
-    def recv_data(self, exp_len):
+    def recv_data(self, exp_len, timeout=100):
+        msg = bytearray(b'')
         if (exp_len > MAX_TRANSFER_SIZE):
             raise ValueError(f"length package {exp_len} over max transfer size")
         if (self.rx_ep == None):
             return False
         try:
-            msg = self.rx_ep.read(exp_len, self.timeout_ms)
+            received = 0
+            while received != exp_len:
+                buf = self.rx_ep.read(exp_len, timeout)
+                msg += buf
+                received += len(buf)
+                if received == exp_len:
+                    return msg
         except usb.core.USBError as e:
             print(f"Timeout: error", e)
-            return False
         return msg
-
-
-#communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
-
-#communicator.send_data(b'\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA')
-#communicator.recv_data(7)
-
-#if not communicator.establish_connection():
-#    print("Cannot connect")
-#    sys.exit(0)
-#
-#if not communicator.confirm_connection():
-#    print("Failed to confirm boot code")
-#    sys.exit(0)
-