dir: some restructering and refactoring
authorRobin Krens <robin@robinkrens.nl>
Sat, 10 Feb 2024 17:37:45 +0000 (18:37 +0100)
committerRobin Krens <robin@robinkrens.nl>
Sat, 10 Feb 2024 17:37:56 +0000 (18:37 +0100)
- naming:
connect.py > RAConnect.py
flasher.py > Packer.py
- .gitignore to mostly ignore cache
- setup.py project info

.gitignore [new file with mode: 0644]
flasher/connect.py [deleted file]
flasher/flasher.py [deleted file]
setup.py
src/Packer.py [new file with mode: 0644]
src/RAConnect.py [new file with mode: 0644]
tests/test_parse.py

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..2925ae6
--- /dev/null
@@ -0,0 +1,6 @@
+flasher.egg-info/
+ra_flasher.egg-info/
+src/__pycache__/
+tests/__pycache__/
+.venv/
+
diff --git a/flasher/connect.py b/flasher/connect.py
deleted file mode 100644 (file)
index 91f21d5..0000000
+++ /dev/null
@@ -1,97 +0,0 @@
-# Copyright (C) - Robin Krens - 2024
-# 
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-# 
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-# 
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
-# 
-
-import sys
-import serial
-import serial.threaded
-import time
-import usb.core
-import usb.util
-
-LOW_PULSE = 0x00
-ACK = 0x00
-GENERIC_CODE = 0x55
-BOOT_CODE = 0xC3
-
-VENDOR_ID = 0x045B
-PRODUCT_ID = 0x0261
-
-EP_IN = 0x82
-EP_OUT = 0x02
-
-MAX_TRIES = 20
-
-def find_device(vendor_id, product_id):
-    dev = usb.core.find(idVendor=vendor_id, idProduct=product_id)
-    if dev is None:
-        raise ValueError(f"Device {vendor_id}:{product_id} not found\n Are you sure it is connected?")
-
-    for config in dev:
-        for intf in config:
-            if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)):
-                print(f"Found serial device with 0x02 | 0xFF")
-                TxD, RxD = None, None
-                if dev.is_kernel_driver_active(intf.bInterfaceNumber):
-                    print(f"Found kernel driver, detaching ... ")
-                    dev.detach_kernel_driver(intf.bInterfaceNumber)
-                for ep in intf:
-                    if (ep.bmAttributes == 0x02):
-                        if ep.bEndpointAddress == EP_IN:
-                            RxD = ep
-                            print(ep)
-                        elif ep.bEndpointAddress == EP_OUT:
-                            TxD = ep
-                            print(ep)
-                return (dev, RxD, TxD)
-
-    raise ValueError("Device does not have a serial interface")
-
-def establish_connection():
-    dev, rx_ep, tx_ep = find_device(0x1a86, 0x7523)
-    ret = []
-    for i in range(MAX_TRIES):
-        try:
-            tx_ep.write(b'\x00', 100)
-            ret = rx_ep.read(1, 100)
-            if a[0] == ACK:
-                print("ACK received")
-                return True
-        except:
-            print(f"Timeout: retry #", i)
-    return False
-
-def confirm_connection():
-    dev, rx_ep, tx_ep = find_device(0x1a86, 0x7523)
-    ret = []
-    for i in range(MAX_TRIES):
-        try:
-            tx_ep.write(b'\x55', 100)
-            ret = rx_ep.read(1, 100)
-            if a[0] == BOOT_CODE:
-                print("ACK received")
-                return True
-        except:
-            print(f"Timeout: retry #", i)
-    return False
-
-if not establish_connection():
-    print("Cannot connect")
-    sys.exit(0)
-
-if not confirm_connection():
-    print("Failed to confirm boot code")
-    sys.exit(0)
diff --git a/flasher/flasher.py b/flasher/flasher.py
deleted file mode 100644 (file)
index ef98a96..0000000
+++ /dev/null
@@ -1,138 +0,0 @@
-# Copyright (C) Robin Krens - 2024
-# 
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-# 
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-# 
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
-#
-
-import struct
-
-# Commands send to boot firmware
-INQ_CMD = 0x00
-ERA_CMD = 0x12
-WRI_CMD = 0x13
-REA_CMD = 0x15
-IDA_CMD = 0x30
-BAU_CMD = 0x34
-SIG_CMD = 0x3A
-ARE_CMD = 0x3B
-
-# These are combined with send command, for example
-# STATUS_OK | ERA_CMD == 0x12
-# STATUS_ERR | ERA_CMD = 0x92
-STATUS_OK = 0x00
-STATUS_ERR = 0x80
-
-# Error codes
-error_codes = {
-    0xC: "ERR_UNSU",
-    0xC1: "ERR_PCKT",
-    0xC2: "ERR_CHKS",
-    0xC3: "ERR_FLOW",
-    0xD0: "ERR_ADDR",
-    0xD4: "ERR_BAUD",
-    0xDA: "ERR_PROT",
-    0xDB: "ERR_ID",
-    0xDC: "ERR_SERI",
-    0xE1: "ERR_ERA",
-    0xE2: "ERR_WRI",
-    0xE7: "ERR_SEQ"
-}
-
-# used for init sequence
-LOW_PULSE = 0x00
-GENERIC_CODE = 0x55
-BOOT_CODE = 0xC3
-
-TESTID = [
-    "0xF0", "0xF1", "0xF2", "0xF3",
-    "0xE4", "0xE5", "0xE6", "0xE7",
-    "0xD8", "0xD9", "0xDA", "0xDB",
-    "0xCC", "0xCD", "0xCE", "0xCF"
-]
-
-def calc_sum(cmd, data):
-    data_len = len(data)
-    lnh = data_len + 1 & 0xFF00
-    lnl = data_len + 1 & 0x00FF
-    res = lnh + lnl + cmd
-    for i in range(data_len):
-            if isinstance(data[i], str):
-                res += int(data[i], 16)
-            else:
-                res += ord(data[i])
-    res = ~(res - 1) & 0xFF # two's complement
-    return (lnh, lnl, res)
-
-
-# format of data packet is [SOD|LNH|LNL|COM|byte_data|SUM|ETX]
-def pack_command(cmd, data):
-    SOD = 0x01
-    COM = cmd
-
-    if isinstance(data, str):
-        byte_data = bytes(data.encode('utf-8'))
-    else:
-        byte_data = bytes([int(x, 16) for x in data])
-    
-    LNH, LNL, SUM = calc_sum(int(cmd), data)
-    ETX = 0x03
-    fmt_header = '<BBBB'
-    fmt_footer = 'BB'
-    fmt = fmt_header + str(len(data)) + 's' + fmt_footer
-    pack = struct.pack(fmt, SOD, LNH, LNL, COM, byte_data, SUM, ETX)
-    print(fmt, pack, len(pack))
-    return fmt
-
-# format of data packet is [SOD|LNH|LNL|RES|DAT|SUM|ETX]
-def pack_pkt(res, data):
-    SOD = 0x81
-    if (len(data) >= 1024):
-        raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)')
-    LNH, LNL, SUM = calc_sum(int(res), data)
-    DAT = bytes([int(x, 16) for x in data])
-    RES = res
-    ETX = 0x03
-    fmt_header = '<BBBB'
-    fmt_footer = 'BB'
-    fmt = fmt_header + str(len(data)) + 's' + fmt_footer
-    pack = struct.pack(fmt, SOD, LNH, LNL, RES, DAT, SUM, ETX)
-    return pack
-
-# packet received from mcu 
-def unpack_pkt(data):
-    header = data[0:4]
-    fmt_header = '<BBBB'
-    SOD, LNH, LNL, RES = struct.unpack(fmt_header, header)
-    if (SOD != 0x81):
-        raise Exception(f'Wrong start of packet data received')
-    pkt_len = (LNH << 0x8 | LNL) - 1
-    fmt_message = '<' + str(pkt_len) + 's'
-    raw = struct.unpack_from(fmt_message, data, 4)[0]
-    message = ['0x{:02X}'.format(byte) for byte in raw]
-    if (RES & 0x80):
-        raise ValueError(f'MCU encountered error {message[0]}')
-    fmt_footer = '<BB'
-    SUM, ETX = struct.unpack_from(fmt_footer, data, 4 + pkt_len)
-    lnh, lnl, local_sum = calc_sum(RES, message)
-    if (SUM != local_sum):
-        raise Exception(f'Sum calculation mismatch, read {SUM} instead of {local_sum}')
-    if (ETX != 0x03):
-        raise Exception(f'Packet ETX error')
-    return message
-    
-
-cmd = pack_command(INQ_CMD, "")
-cmd = pack_command(BAU_CMD, ['0x00','0x1E'])
-cmd = pack_command(IDA_CMD, TESTID)
-
index 2b2880f..7ef5070 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -10,13 +10,13 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read()
 
 setup(
-    name = "flasher",
+    name = "ra-flasher",
     version = "0.0.1",
     author = "Robin Krens",
     description = ("An example of how to set up pytest"),
     license = "GNU",
-    keywords = "example pytest",
-    packages=['flasher', 'tests'],
+    keywords = "Renesas RA chipset flasher",
+    packages=['src', 'tests'],
     #long_description=read('README.md'),
     classifiers=[
         "Development Status :: 1",
diff --git a/src/Packer.py b/src/Packer.py
new file mode 100644 (file)
index 0000000..ef98a96
--- /dev/null
@@ -0,0 +1,138 @@
+# Copyright (C) Robin Krens - 2024
+# 
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+# 
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# 
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
+#
+
+import struct
+
+# Commands send to boot firmware
+INQ_CMD = 0x00
+ERA_CMD = 0x12
+WRI_CMD = 0x13
+REA_CMD = 0x15
+IDA_CMD = 0x30
+BAU_CMD = 0x34
+SIG_CMD = 0x3A
+ARE_CMD = 0x3B
+
+# These are combined with send command, for example
+# STATUS_OK | ERA_CMD == 0x12
+# STATUS_ERR | ERA_CMD = 0x92
+STATUS_OK = 0x00
+STATUS_ERR = 0x80
+
+# Error codes
+error_codes = {
+    0xC: "ERR_UNSU",
+    0xC1: "ERR_PCKT",
+    0xC2: "ERR_CHKS",
+    0xC3: "ERR_FLOW",
+    0xD0: "ERR_ADDR",
+    0xD4: "ERR_BAUD",
+    0xDA: "ERR_PROT",
+    0xDB: "ERR_ID",
+    0xDC: "ERR_SERI",
+    0xE1: "ERR_ERA",
+    0xE2: "ERR_WRI",
+    0xE7: "ERR_SEQ"
+}
+
+# used for init sequence
+LOW_PULSE = 0x00
+GENERIC_CODE = 0x55
+BOOT_CODE = 0xC3
+
+TESTID = [
+    "0xF0", "0xF1", "0xF2", "0xF3",
+    "0xE4", "0xE5", "0xE6", "0xE7",
+    "0xD8", "0xD9", "0xDA", "0xDB",
+    "0xCC", "0xCD", "0xCE", "0xCF"
+]
+
+def calc_sum(cmd, data):
+    data_len = len(data)
+    lnh = data_len + 1 & 0xFF00
+    lnl = data_len + 1 & 0x00FF
+    res = lnh + lnl + cmd
+    for i in range(data_len):
+            if isinstance(data[i], str):
+                res += int(data[i], 16)
+            else:
+                res += ord(data[i])
+    res = ~(res - 1) & 0xFF # two's complement
+    return (lnh, lnl, res)
+
+
+# format of data packet is [SOD|LNH|LNL|COM|byte_data|SUM|ETX]
+def pack_command(cmd, data):
+    SOD = 0x01
+    COM = cmd
+
+    if isinstance(data, str):
+        byte_data = bytes(data.encode('utf-8'))
+    else:
+        byte_data = bytes([int(x, 16) for x in data])
+    
+    LNH, LNL, SUM = calc_sum(int(cmd), data)
+    ETX = 0x03
+    fmt_header = '<BBBB'
+    fmt_footer = 'BB'
+    fmt = fmt_header + str(len(data)) + 's' + fmt_footer
+    pack = struct.pack(fmt, SOD, LNH, LNL, COM, byte_data, SUM, ETX)
+    print(fmt, pack, len(pack))
+    return fmt
+
+# format of data packet is [SOD|LNH|LNL|RES|DAT|SUM|ETX]
+def pack_pkt(res, data):
+    SOD = 0x81
+    if (len(data) >= 1024):
+        raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)')
+    LNH, LNL, SUM = calc_sum(int(res), data)
+    DAT = bytes([int(x, 16) for x in data])
+    RES = res
+    ETX = 0x03
+    fmt_header = '<BBBB'
+    fmt_footer = 'BB'
+    fmt = fmt_header + str(len(data)) + 's' + fmt_footer
+    pack = struct.pack(fmt, SOD, LNH, LNL, RES, DAT, SUM, ETX)
+    return pack
+
+# packet received from mcu 
+def unpack_pkt(data):
+    header = data[0:4]
+    fmt_header = '<BBBB'
+    SOD, LNH, LNL, RES = struct.unpack(fmt_header, header)
+    if (SOD != 0x81):
+        raise Exception(f'Wrong start of packet data received')
+    pkt_len = (LNH << 0x8 | LNL) - 1
+    fmt_message = '<' + str(pkt_len) + 's'
+    raw = struct.unpack_from(fmt_message, data, 4)[0]
+    message = ['0x{:02X}'.format(byte) for byte in raw]
+    if (RES & 0x80):
+        raise ValueError(f'MCU encountered error {message[0]}')
+    fmt_footer = '<BB'
+    SUM, ETX = struct.unpack_from(fmt_footer, data, 4 + pkt_len)
+    lnh, lnl, local_sum = calc_sum(RES, message)
+    if (SUM != local_sum):
+        raise Exception(f'Sum calculation mismatch, read {SUM} instead of {local_sum}')
+    if (ETX != 0x03):
+        raise Exception(f'Packet ETX error')
+    return message
+    
+
+cmd = pack_command(INQ_CMD, "")
+cmd = pack_command(BAU_CMD, ['0x00','0x1E'])
+cmd = pack_command(IDA_CMD, TESTID)
+
diff --git a/src/RAConnect.py b/src/RAConnect.py
new file mode 100644 (file)
index 0000000..15a54a4
--- /dev/null
@@ -0,0 +1,77 @@
+import sys
+import time
+import usb.core
+import usb.util
+
+class RAConnect:
+    def __init__(self, vendor_id, product_id):
+        self.vendor_id = vendor_id
+        self.product_id = product_id
+        self.ep_in = 0x82
+        self.ep_out = 0x02
+        self.max_tries = 20
+        self.timeout_ms = 100
+        self.dev = None
+        self.rx_ep = None
+        self.tx_ep = None
+        self.find_device()
+
+    def find_device(self):
+        self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
+        if self.dev is None:
+            raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
+
+        for config in self.dev:
+            for intf in config:
+                if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)):
+                    print("Found serial device with 0x02 | 0xFF")
+                    if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
+                        print("Found kernel driver, detaching ... ")
+                        self.dev.detach_kernel_driver(intf.bInterfaceNumber)
+                    for ep in intf:
+                        if (ep.bmAttributes == 0x02):
+                            if ep.bEndpointAddress == self.ep_in:
+                                self.rx_ep = ep
+                                print(ep)
+                            elif ep.bEndpointAddress == self.ep_out:
+                                self.tx_ep = ep
+                                print(ep)
+                    return True
+
+        raise ValueError("Device does not have a serial interface")
+
+    def establish_connection(self):
+        for i in range(self.max_tries):
+            try:
+                self.tx_ep.write(bytes([0x00]), self.timeout_ms)
+                ret = self.rx_ep.read(1, self.timeout_ms)
+                if ret[0] == 0x00:
+                    print("ACK received")
+                    return True
+            except usb.core.USBError as e:
+                print(f"Timeout: retry #{i}", e)
+        return False
+
+    def confirm_connection(self):
+        for i in range(self.max_tries):
+            try:
+                self.tx_ep.write(bytes([0x55]), self.timeout_ms)
+                ret = self.rx_ep.read(1, self.timeout_ms)
+                if ret[0] == 0xC3:
+                    print("ACK received")
+                    return True
+            except usb.core.USBError as e:
+                print(f"Timeout: retry #{i}", e)
+        return False
+
+# Usage
+communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
+
+if not communicator.establish_connection():
+    print("Cannot connect")
+    sys.exit(0)
+
+if not communicator.confirm_connection():
+    print("Failed to confirm boot code")
+    sys.exit(0)
+
index 81697d9..c498c28 100644 (file)
@@ -1,4 +1,4 @@
-from flasher.flasher import calc_sum, unpack_pkt, pack_pkt
+from src.Packer import calc_sum, unpack_pkt, pack_pkt
 import pytest
 
 def test_calc_sum():