RAFlasher.py: initial setp with write_img
authorRobin Krens <robin@robinkrens.nl>
Sun, 11 Feb 2024 13:29:29 +0000 (14:29 +0100)
committerRobin Krens <robin@robinkrens.nl>
Sun, 11 Feb 2024 13:30:00 +0000 (14:30 +0100)
src/Packer.py
src/RAConnect.py
src/RADevice.py [new file with mode: 0644]
src/RAFlasher.py [new file with mode: 0644]

index ef98a96..8480267 100644 (file)
@@ -69,6 +69,8 @@ def calc_sum(cmd, data):
     for i in range(data_len):
             if isinstance(data[i], str):
                 res += int(data[i], 16)
+            elif isinstance(data[i], int):
+                res += data[i]
             else:
                 res += ord(data[i])
     res = ~(res - 1) & 0xFF # two's complement
@@ -100,7 +102,10 @@ def pack_pkt(res, data):
     if (len(data) >= 1024):
         raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)')
     LNH, LNL, SUM = calc_sum(int(res), data)
-    DAT = bytes([int(x, 16) for x in data])
+    if not isinstance(data, bytes):
+        DAT = bytes([int(x, 16) for x in data])
+    else:
+        DAT = data
     RES = res
     ETX = 0x03
     fmt_header = '<BBBB'
@@ -132,7 +137,7 @@ def unpack_pkt(data):
     return message
     
 
-cmd = pack_command(INQ_CMD, "")
-cmd = pack_command(BAU_CMD, ['0x00','0x1E'])
-cmd = pack_command(IDA_CMD, TESTID)
+#cmd = pack_command(INQ_CMD, "")
+#cmd = pack_command(BAU_CMD, ['0x00','0x1E'])
+#cmd = pack_command(IDA_CMD, TESTID)
 
index 15a54a4..fb2a334 100644 (file)
@@ -3,6 +3,8 @@ import time
 import usb.core
 import usb.util
 
+MAX_TRANSFER_SIZE = 64
+
 class RAConnect:
     def __init__(self, vendor_id, product_id):
         self.vendor_id = vendor_id
@@ -64,14 +66,43 @@ class RAConnect:
                 print(f"Timeout: retry #{i}", e)
         return False
 
-# Usage
-communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
+    def authenticate_connection(self):
+        raise Exception("Not implemented")
+
+    def send_data(self, packed_data):
+        if (self.tx_ep == None):
+            return False
+        try:
+            self.tx_ep.write(packed_data, self.timeout_ms)
+        except usb.core.USBError as e:
+            print(f"Timeout: error", e)
+            return False
+        return True
+
+    # packets are length 7, except for a read package
+    def recv_data(self, exp_len):
+        if (exp_len > MAX_TRANSFER_SIZE):
+            raise ValueError(f"length package {exp_len} over max transfer size")
+        if (self.rx_ep == None):
+            return False
+        try:
+            msg = self.rx_ep.read(exp_len, self.timeout_ms)
+        except usb.core.USBError as e:
+            print(f"Timeout: error", e)
+            return False
+        return msg
+
+
+#communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
 
-if not communicator.establish_connection():
-    print("Cannot connect")
-    sys.exit(0)
+#communicator.send_data(b'\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA')
+#communicator.recv_data(7)
 
-if not communicator.confirm_connection():
-    print("Failed to confirm boot code")
-    sys.exit(0)
+#if not communicator.establish_connection():
+#    print("Cannot connect")
+#    sys.exit(0)
+#
+#if not communicator.confirm_connection():
+#    print("Failed to confirm boot code")
+#    sys.exit(0)
 
diff --git a/src/RADevice.py b/src/RADevice.py
new file mode 100644 (file)
index 0000000..726878b
--- /dev/null
@@ -0,0 +1,21 @@
+from Packer import *
+
+class RADevice:
+    def __init__(self, comm, info):
+        self.SCI = 0
+        self.RMB = 0
+        self.NOA = 0x0
+        self.TYP = 0x0
+        self.BFV = 0x0000
+
+        if (comm == None):
+            return
+        self.get_info(info)
+
+    def get_info(self, info):
+        fmt = '>IIBBH'
+        self.SCI, self.RMB, self.NOA, self.TYP, self.BFV = struct.unpack(fmt, info)
+        print(f'Ver{self.BFV >> 8}.{self.BFV & 0xFF}')
+
+# test
+d = RADevice('a', b'\x01\x31\x2d\x00\x00\x1e\x84\x80\x04\x02\x0a\x08')
diff --git a/src/RAFlasher.py b/src/RAFlasher.py
new file mode 100644 (file)
index 0000000..70a6d98
--- /dev/null
@@ -0,0 +1,51 @@
+import os
+import math
+from RAConnect import *
+from Packer import *
+
+SECTOR_SIZE = 2048
+
+def verify_img(dev, img, start_addr, end_addr):
+    raise Exception("Not implemented")
+
+def write_img(dev, img, start_addr, end_addr, verify=False):
+
+    if os.path.exists(img):
+        file_size = os.path.getsize(img)
+    else:
+        raise Exception(f'file {img} does not exist')
+
+    # calculate / check start and end address 
+    if start_addr == None or end_addr == None:
+        if start_addr == None:
+            start_addr = 0
+        # align start addr
+        if start_addr % SECTOR_SIZE:
+            raise ValueError(f"start addr not aligned on sector size {SECTOR_SIZE}")
+        blocks = (file_size + SECTOR_SIZE - 1) // SECTOR_SIZE
+        end_addr = blocks * SECTOR_SIZE + start_addr
+        print(end_addr)
+    
+    chunk_size = 64 # max is 1024
+    if (start_addr > 0xFF800): # for RA4 series
+        raise ValueError("start address value error")
+    if (end_addr <= start_addr or end_addr > 0xFF800):
+        raise ValueError("end address value error")
+    with open(img, 'rb') as f:
+        chunk = f.read(chunk_size)
+        #while chunk:
+        #    chunk = f.read(chunk_size)
+        packed = pack_pkt(WRI_CMD, chunk)
+        print(f'Sending {len(chunk)} bytes')
+        dev.send_data(packed)
+        reply_len = 7
+        reply = dev.recv_data(reply_len)
+        reply = b'\x81\x00\x02\x00\x00\xFE\x03'
+        if not reply == False:
+            msg = unpack_pkt(reply)
+            print(msg)
+
+
+dev = RAConnect(vendor_id=0x1a86, product_id=0x7523)
+write_img(dev, 'src/sample.bin', 0x800, None, True)
+