--- /dev/null
+# Copyright (C) Robin Krens - 2024
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+
+import sys
+import time
+import usb.core
+import usb.util
+from raflash.RAPacker import *
+
+MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
+
+
+class RAConnect:
+ def __init__(self, vendor_id, product_id):
+ self.vendor_id = vendor_id
+ self.product_id = product_id
+ self.ep_in = 0x81
+ self.ep_out = 0x02
+ self.max_tries = 20
+ self.timeout_ms = 100
+ self.dev = None
+ self.rx_ep = None
+ self.tx_ep = None
+ self.chip_layout = []
+ self.sel_area = 0 # default to Area 0
+
+ self.find_device()
+ status_conn = self.inquire_connection()
+ if not status_conn:
+ self.confirm_connection()
+
+ def find_device(self):
+ self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
+ if self.dev is None:
+ raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
+
+ for config in self.dev:
+ intf = config[(1, 0)]
+ product_name = usb.util.get_string(self.dev, self.dev.iProduct)
+ print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
+ if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
+ print("Found kernel driver, detaching ... ")
+ self.dev.detach_kernel_driver(intf.bInterfaceNumber)
+ for ep in intf:
+ if (ep.bmAttributes == 0x02):
+ if ep.bEndpointAddress == self.ep_in:
+ self.rx_ep = ep
+ elif ep.bEndpointAddress == self.ep_out:
+ self.tx_ep = ep
+ return True
+
+ raise ValueError("Device does not have a CDC interface")
+
+ def inquire_connection(self):
+ packed = pack_pkt(INQ_CMD, "")
+ self.send_data(packed)
+ info = self.recv_data(7)
+ if info == bytearray(b'\x00') or info == bytearray(b''):
+ return False
+ msg = unpack_pkt(info)
+ # print("Connection already established")
+ return True
+
+ def confirm_connection(self):
+ for i in range(self.max_tries):
+ try:
+ self.tx_ep.write(bytes([0x55]), self.timeout_ms)
+ ret = self.rx_ep.read(1, self.timeout_ms)
+ if ret[0] == 0xC3:
+ print("Reply received (0xC3)")
+ return True
+ except usb.core.USBError as e:
+ print(f"Timeout: retry #{i}", e)
+ return False
+
+ def authenticate_connection(self):
+ raise Exception("Not implemented")
+
+ def set_chip_layout(self, cfg):
+ if cfg is None:
+ raise ValueError("Could net get chip layout")
+ self.chip_layout = cfg
+
+ def send_data(self, packed_data):
+ if self.tx_ep is None:
+ return False
+ try:
+ self.tx_ep.write(packed_data, self.timeout_ms)
+ except usb.core.USBError as e:
+ print("Timeout: error", e)
+ return False
+ return True
+
+ def recv_data(self, exp_len, timeout=100):
+ msg = bytearray(b'')
+ if exp_len > MAX_TRANSFER_SIZE:
+ raise ValueError(f"length package {exp_len} over max transfer size")
+ if self.rx_ep is None:
+ return False
+ try:
+ received = 0
+ while received != exp_len:
+ buf = self.rx_ep.read(exp_len, timeout)
+ msg += buf
+ received += len(buf)
+ if received == exp_len:
+ return msg
+ except usb.core.USBError as e:
+ print("Timeout: error", e)
+ return msg
--- /dev/null
+# Copyright (C) Robin Krens - 2024
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+
+import os
+import math
+import time
+import argparse
+import tempfile
+from tqdm import tqdm
+from raflash.RAConnect import *
+from raflash.RAPacker import *
+
+VENDOR_ID = 0x045B
+PRODUCT_ID = 0x0261
+
+commands = {
+ "write": lambda dev, args: write_img(dev, args.file_name, args.start_address, args.size, args.verify),
+ "read": lambda dev, args: read_img(dev, args.file_name, args.start_address, args.size),
+ "erase": lambda dev, args: erase_chip(dev, args.start_address, args.size),
+ "info": lambda dev, args: (get_dev_info(dev), dev.set_chip_layout(get_area_info(dev, output=True)))
+}
+
+def int_to_hex_list(num):
+ hex_string = hex(num)[2:].upper() # convert to hex string
+ hex_string = hex_string.zfill(8) # pad for 8 char's long
+ hex_list = [f'0x{hex_string[c:c+2]}' for c in range(0, 8, 2)]
+ return hex_list
+
+def hex_type(string):
+ try:
+ value = int(string, 16)
+ return value
+ except ValueError:
+ raise argparse.ArgumentTypeError(f"'{string}' is not a valid hexadecimal value.")
+
+def set_size_boundaries(dev, start_addr, size):
+
+ sector_size = dev.chip_layout[dev.sel_area]['ALIGN'] # currently only area 0 supported
+
+ if start_addr % sector_size:
+ raise ValueError(f"start addr not aligned on sector size {sector_size}")
+
+ if size < sector_size:
+ print("Warning: you are trying to write something that is less than one sector size: padding with zeroes")
+
+ blocks = (size + sector_size - 1) // sector_size
+ end_addr = blocks * sector_size + start_addr - 1
+
+ if (end_addr <= start_addr):
+ raise ValueError("End address smaller or equal than start_address")
+
+ if (end_addr > dev.chip_layout[dev.sel_area]['EAD']):
+ raise ValueError("Binary file is bigger than available ROM space")
+
+ return (start_addr, end_addr)
+
+def get_area_info(dev, output=False):
+ cfg = {}
+ for i in [0, 1, 2]:
+ packed = pack_pkt(ARE_CMD, [str(i)])
+ dev.send_data(packed)
+ info = dev.recv_data(23)
+ msg = unpack_pkt(info)
+ fmt = '>BIIII'
+ KOA, SAD, EAD, EAU, WAU = struct.unpack(fmt, bytes(int(x, 16) for x in msg))
+ cfg[i] = {"SAD": SAD, "EAD": EAD, "ALIGN": EAU}
+ if output:
+ print(f'Area {KOA}: {hex(SAD)}:{hex(EAD)} (erase {hex(EAU)} - write {hex(WAU)})')
+ return cfg
+
+def get_dev_info(dev):
+ packed = pack_pkt(SIG_CMD, "")
+ dev.send_data(packed)
+ info = dev.recv_data(18)
+ fmt = '>IIIBBHH'
+ _HEADER, SCI, RMB, NOA, TYP, BFV, _FOOTER = struct.unpack(fmt, info)
+ print('====================')
+ if TYP == 0x02:
+ print('Chip: RA MCU + RA2/RA4 Series')
+ elif TYP == 0x03:
+ print('Chip: RA MCU + RA6 Series')
+ else:
+ print('Unknown MCU type')
+ print(f'Serial interface speed: {SCI} Hz')
+ print(f'Recommend max UART baud rate: {RMB} bps')
+ print(f'User area in Code flash [{NOA & 0x1}|{NOA & 0x02 >> 1}]')
+ print(f'User area in Data flash [{NOA & 0x03 >> 2}]')
+ print(f'Config area [{NOA & 0x04 >> 3}]')
+ print(f'Boot firmware: version {BFV >> 8}.{BFV & 0xFF}')
+
+def erase_chip(dev, start_addr, size):
+ if size is None:
+ size = dev.chip_layout[dev.sel_area]['EAD'] - start_addr # erase all
+
+ (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size)
+ print(f'Erasing {hex(start_addr)}:{hex(end_addr)}')
+
+ # setup initial communication
+ SAD = int_to_hex_list(start_addr)
+ EAD = int_to_hex_list(end_addr)
+ packed = pack_pkt(ERA_CMD, SAD + EAD)
+ dev.send_data(packed)
+
+ ret = dev.recv_data(7, timeout=1000) # erase takes usually a bit longer
+ unpack_pkt(ret)
+ print("Erase complete")
+
+def read_img(dev, img, start_addr, size):
+
+ if size is None:
+ size = 0x3FFFF - start_addr # read maximum possible
+
+ (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size)
+
+ # setup initial communication
+ SAD = int_to_hex_list(start_addr)
+ EAD = int_to_hex_list(end_addr)
+ packed = pack_pkt(REA_CMD, SAD + EAD)
+ dev.send_data(packed)
+
+ # calculate how many packets are have to be received
+ nr_packet = (end_addr - start_addr) // 1024 # TODO: set other than just 1024
+
+ with open(img, 'wb') as f:
+ for i in tqdm(range(0, nr_packet + 1), desc="Reading progress"):
+ ret = dev.recv_data(1024 + 6)
+ chunk = unpack_pkt(ret)
+ chunky = bytes(int(x, 16) for x in chunk)
+ f.write(chunky)
+ packed = pack_pkt(REA_CMD, ['0x00'], ack=True)
+ dev.send_data(packed)
+
+
+def write_img(dev, img, start_addr, size, verify=False):
+
+ if os.path.exists(img):
+ file_size = os.path.getsize(img)
+ else:
+ raise Exception(f'file {img} does not exist')
+
+ if size is None:
+ size = file_size
+
+ if size > file_size:
+ raise ValueError("Write size > file size")
+
+ (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size)
+
+ chunk_size = 1024 # max is 1024 according to protocol
+
+ # setup initial communication
+ SAD = int_to_hex_list(start_addr)
+ EAD = int_to_hex_list(end_addr)
+ packed = pack_pkt(WRI_CMD, SAD + EAD)
+ dev.send_data(packed)
+ ret = dev.recv_data(7)
+ unpack_pkt(ret)
+
+ totalread = 0
+ with open(img, 'rb') as f:
+ with tqdm(total=size, desc="Writing progress") as pbar:
+ chunk = f.read(chunk_size)
+ pbar.update(len(chunk))
+ while chunk and totalread < size:
+ if len(chunk) != chunk_size:
+ padding_length = chunk_size - len(chunk)
+ chunk += b'\0' * padding_length
+ packed = pack_pkt(WRI_CMD, chunk, ack=True)
+ dev.send_data(packed)
+ reply_len = 7
+ reply = dev.recv_data(reply_len)
+ msg = unpack_pkt(reply)
+ chunk = f.read(chunk_size)
+ totalread += chunk_size
+ pbar.update(len(chunk))
+
+ if verify:
+ with tempfile.NamedTemporaryFile(prefix='.hidden_', delete=False) as tmp_file, open(img, 'rb') as cmp_file:
+ read_img(dev, tmp_file.name, start_addr, size)
+ c1 = tmp_file.read(file_size) # due to byte alignment read file is longer
+ c2 = cmp_file.read()
+ if c1 == c2:
+ print("Verify complete")
+ else:
+ print("Verify failed")
+
+def main():
+ parser = argparse.ArgumentParser(description="RA Flasher Tool")
+
+ subparsers = parser.add_subparsers(dest="command", title="Commands")
+
+ write_parser = subparsers.add_parser("write", help="Write data to flash")
+ write_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address")
+ write_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes")
+ write_parser.add_argument("--verify", action="store_true", help="Verify after writing")
+ write_parser.add_argument("file_name", type=str, help="File name")
+
+ read_parser = subparsers.add_parser("read", help="Read data from flash")
+ read_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address")
+ read_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes")
+ read_parser.add_argument("file_name", type=str, help="File name")
+
+ erase_parser = subparsers.add_parser("erase", help="Erase sectors")
+ erase_parser.add_argument("--start_address", default='0x0000', type=hex_type, help="Start address")
+ erase_parser.add_argument("--size", type=hex_type, help="Size")
+
+ subparsers.add_parser("info", help="Show flasher information")
+
+ args = parser.parse_args()
+ if args.command in commands:
+ dev = RAConnect(VENDOR_ID, PRODUCT_ID)
+ area_cfg = get_area_info(dev)
+ dev.set_chip_layout(area_cfg)
+ commands[args.command](dev, args)
+ else:
+ parser.print_help()
+
+if __name__ == "__main__":
+ main()
--- /dev/null
+# Copyright (C) Robin Krens - 2024
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+#
+
+import struct
+
+# Commands send to boot firmware
+INQ_CMD = 0x00
+ERA_CMD = 0x12
+WRI_CMD = 0x13
+REA_CMD = 0x15
+IDA_CMD = 0x30
+BAU_CMD = 0x34
+SIG_CMD = 0x3A
+ARE_CMD = 0x3B
+
+STATUS_OK = 0x00
+STATUS_ERR = 0x80
+
+# Error codes
+error_codes = {
+ 0xC: "ERR_UNSU",
+ 0xC1: "ERR_PCKT",
+ 0xC2: "ERR_CHKS",
+ 0xC3: "ERR_FLOW",
+ 0xD0: "ERR_ADDR",
+ 0xD4: "ERR_BAUD",
+ 0xDA: "ERR_PROT",
+ 0xDB: "ERR_ID",
+ 0xDC: "ERR_SERI",
+ 0xE1: "ERR_ERA",
+ 0xE2: "ERR_WRI",
+ 0xE7: "ERR_SEQ"
+}
+
+# used for init sequence
+LOW_PULSE = 0x00
+GENERIC_CODE = 0x55
+BOOT_CODE = 0xC3
+
+TESTID = [
+ "0xF0", "0xF1", "0xF2", "0xF3",
+ "0xE4", "0xE5", "0xE6", "0xE7",
+ "0xD8", "0xD9", "0xDA", "0xDB",
+ "0xCC", "0xCD", "0xCE", "0xCF"
+]
+
+def calc_sum(cmd, data):
+ data_len = len(data)
+ lnh = (data_len + 1 & 0xFF00) >> 8
+ lnl = data_len + 1 & 0x00FF
+ res = lnh + lnl + cmd
+ for i in range(data_len):
+ if isinstance(data[i], str):
+ res += int(data[i], 16)
+ elif isinstance(data[i], int):
+ res += data[i]
+ else:
+ res += ord(data[i])
+ res = ~(res - 1) & 0xFF # two's complement
+ return (lnh, lnl, res)
+
+
+# format of data packet is [SOD|LNH|LNL|COM|byte_data|SUM|ETX]
+def pack_command(cmd, data):
+ SOD = 0x01
+ COM = cmd
+
+ if isinstance(data, str):
+ byte_data = bytes(data.encode('utf-8'))
+ else:
+ byte_data = bytes([int(x, 16) for x in data])
+
+ LNH, LNL, SUM = calc_sum(int(cmd), data)
+ ETX = 0x03
+ fmt_header = '<BBBB'
+ fmt_footer = 'BB'
+ fmt = fmt_header + str(len(data)) + 's' + fmt_footer
+ pack = struct.pack(fmt, SOD, LNH, LNL, COM, byte_data, SUM, ETX)
+ print(fmt, pack, len(pack))
+ return fmt
+
+# format of data packet is [SOD|LNH|LNL|RES|DAT|SUM|ETX]
+def pack_pkt(res, data, ack=False):
+ SOD = 0x01 # TODO: check if 0x81 header needed
+ if ack:
+ SOD = 0x81
+ if (len(data) > 1024):
+ raise Exception(f'Data packet too large, data length is {len(data)} (>1024)')
+ LNH, LNL, SUM = calc_sum(int(res), data)
+ if not isinstance(data, bytes):
+ DAT = bytes([int(x, 16) for x in data])
+ else:
+ DAT = data
+ RES = res
+ ETX = 0x03
+ fmt_header = '<BBBB'
+ fmt_footer = 'BB'
+ fmt = fmt_header + str(len(data)) + 's' + fmt_footer
+ pack = struct.pack(fmt, SOD, LNH, LNL, RES, DAT, SUM, ETX)
+ return pack
+
+# packet received from mcu
+def unpack_pkt(data):
+ header = data[0:4]
+ fmt_header = '<BBBB'
+ SOD, LNH, LNL, RES = struct.unpack(fmt_header, header)
+ if (SOD != 0x81):
+ raise Exception('Wrong start of packet data received')
+ pkt_len = (LNH << 0x8 | LNL) - 1
+ fmt_message = '<' + str(pkt_len) + 's'
+ raw = struct.unpack_from(fmt_message, data, 4)[0]
+ message = ['0x{:02X}'.format(byte) for byte in raw]
+ if (RES & 0x80):
+ raise ValueError(f'MCU encountered error {message[0]}')
+ fmt_footer = '<BB'
+ SUM, ETX = struct.unpack_from(fmt_footer, data, 4 + pkt_len)
+ lnh, lnl, local_sum = calc_sum(RES, message)
+ if (SUM != local_sum):
+ raise Exception(f'Sum calculation mismatch, read {SUM} instead of {local_sum}')
+ if (ETX != 0x03):
+ raise Exception('Packet ETX error')
+ return message
description=("Flasher for the built in ROM bootloader for Renesas RA microcontrollers"),
license="GNU",
keywords="Renesas RA chipset flasher",
- packages=['src', 'tests'],
+ packages=['raflash', 'tests'],
install_requires=[
'exceptiongroup>=1.2.0',
'future>=0.18.3',
],
entry_points={
'console_scripts': [
- 'raflash = src.RAFlasher:main',
+ 'raflash = raflash.RAFlasher:main',
],
},
long_description=read('README.md'),
+++ /dev/null
-# Copyright (C) Robin Krens - 2024
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-#
-
-import sys
-import time
-import usb.core
-import usb.util
-from src.RAPacker import *
-
-MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
-
-
-class RAConnect:
- def __init__(self, vendor_id, product_id):
- self.vendor_id = vendor_id
- self.product_id = product_id
- self.ep_in = 0x81
- self.ep_out = 0x02
- self.max_tries = 20
- self.timeout_ms = 100
- self.dev = None
- self.rx_ep = None
- self.tx_ep = None
- self.chip_layout = []
- self.sel_area = 0 # default to Area 0
-
- self.find_device()
- status_conn = self.inquire_connection()
- if not status_conn:
- self.confirm_connection()
-
- def find_device(self):
- self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
- if self.dev is None:
- raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
-
- for config in self.dev:
- intf = config[(1, 0)]
- product_name = usb.util.get_string(self.dev, self.dev.iProduct)
- print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
- if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
- print("Found kernel driver, detaching ... ")
- self.dev.detach_kernel_driver(intf.bInterfaceNumber)
- for ep in intf:
- if (ep.bmAttributes == 0x02):
- if ep.bEndpointAddress == self.ep_in:
- self.rx_ep = ep
- elif ep.bEndpointAddress == self.ep_out:
- self.tx_ep = ep
- return True
-
- raise ValueError("Device does not have a CDC interface")
-
- def inquire_connection(self):
- packed = pack_pkt(INQ_CMD, "")
- self.send_data(packed)
- info = self.recv_data(7)
- if info == bytearray(b'\x00') or info == bytearray(b''):
- return False
- msg = unpack_pkt(info)
- # print("Connection already established")
- return True
-
- def confirm_connection(self):
- for i in range(self.max_tries):
- try:
- self.tx_ep.write(bytes([0x55]), self.timeout_ms)
- ret = self.rx_ep.read(1, self.timeout_ms)
- if ret[0] == 0xC3:
- print("Reply received (0xC3)")
- return True
- except usb.core.USBError as e:
- print(f"Timeout: retry #{i}", e)
- return False
-
- def authenticate_connection(self):
- raise Exception("Not implemented")
-
- def set_chip_layout(self, cfg):
- if cfg is None:
- raise ValueError("Could net get chip layout")
- self.chip_layout = cfg
-
- def send_data(self, packed_data):
- if self.tx_ep is None:
- return False
- try:
- self.tx_ep.write(packed_data, self.timeout_ms)
- except usb.core.USBError as e:
- print("Timeout: error", e)
- return False
- return True
-
- def recv_data(self, exp_len, timeout=100):
- msg = bytearray(b'')
- if exp_len > MAX_TRANSFER_SIZE:
- raise ValueError(f"length package {exp_len} over max transfer size")
- if self.rx_ep is None:
- return False
- try:
- received = 0
- while received != exp_len:
- buf = self.rx_ep.read(exp_len, timeout)
- msg += buf
- received += len(buf)
- if received == exp_len:
- return msg
- except usb.core.USBError as e:
- print("Timeout: error", e)
- return msg
+++ /dev/null
-# Copyright (C) Robin Krens - 2024
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-#
-
-import os
-import math
-import time
-import argparse
-import tempfile
-from tqdm import tqdm
-from src.RAConnect import *
-from src.RAPacker import *
-
-VENDOR_ID = 0x045B
-PRODUCT_ID = 0x0261
-
-commands = {
- "write": lambda dev, args: write_img(dev, args.file_name, args.start_address, args.size, args.verify),
- "read": lambda dev, args: read_img(dev, args.file_name, args.start_address, args.size),
- "erase": lambda dev, args: erase_chip(dev, args.start_address, args.size),
- "info": lambda dev, args: (get_dev_info(dev), dev.set_chip_layout(get_area_info(dev, output=True)))
-}
-
-def int_to_hex_list(num):
- hex_string = hex(num)[2:].upper() # convert to hex string
- hex_string = hex_string.zfill(8) # pad for 8 char's long
- hex_list = [f'0x{hex_string[c:c+2]}' for c in range(0, 8, 2)]
- return hex_list
-
-def hex_type(string):
- try:
- value = int(string, 16)
- return value
- except ValueError:
- raise argparse.ArgumentTypeError(f"'{string}' is not a valid hexadecimal value.")
-
-def set_size_boundaries(dev, start_addr, size):
-
- sector_size = dev.chip_layout[dev.sel_area]['ALIGN'] # currently only area 0 supported
-
- if start_addr % sector_size:
- raise ValueError(f"start addr not aligned on sector size {sector_size}")
-
- if size < sector_size:
- print("Warning: you are trying to write something that is less than one sector size: padding with zeroes")
-
- blocks = (size + sector_size - 1) // sector_size
- end_addr = blocks * sector_size + start_addr - 1
-
- if (end_addr <= start_addr):
- raise ValueError("End address smaller or equal than start_address")
-
- if (end_addr > dev.chip_layout[dev.sel_area]['EAD']):
- raise ValueError("Binary file is bigger than available ROM space")
-
- return (start_addr, end_addr)
-
-def get_area_info(dev, output=False):
- cfg = {}
- for i in [0, 1, 2]:
- packed = pack_pkt(ARE_CMD, [str(i)])
- dev.send_data(packed)
- info = dev.recv_data(23)
- msg = unpack_pkt(info)
- fmt = '>BIIII'
- KOA, SAD, EAD, EAU, WAU = struct.unpack(fmt, bytes(int(x, 16) for x in msg))
- cfg[i] = {"SAD": SAD, "EAD": EAD, "ALIGN": EAU}
- if output:
- print(f'Area {KOA}: {hex(SAD)}:{hex(EAD)} (erase {hex(EAU)} - write {hex(WAU)})')
- return cfg
-
-def get_dev_info(dev):
- packed = pack_pkt(SIG_CMD, "")
- dev.send_data(packed)
- info = dev.recv_data(18)
- fmt = '>IIIBBHH'
- _HEADER, SCI, RMB, NOA, TYP, BFV, _FOOTER = struct.unpack(fmt, info)
- print('====================')
- if TYP == 0x02:
- print('Chip: RA MCU + RA2/RA4 Series')
- elif TYP == 0x03:
- print('Chip: RA MCU + RA6 Series')
- else:
- print('Unknown MCU type')
- print(f'Serial interface speed: {SCI} Hz')
- print(f'Recommend max UART baud rate: {RMB} bps')
- print(f'User area in Code flash [{NOA & 0x1}|{NOA & 0x02 >> 1}]')
- print(f'User area in Data flash [{NOA & 0x03 >> 2}]')
- print(f'Config area [{NOA & 0x04 >> 3}]')
- print(f'Boot firmware: version {BFV >> 8}.{BFV & 0xFF}')
-
-def erase_chip(dev, start_addr, size):
- if size is None:
- size = dev.chip_layout[dev.sel_area]['EAD'] - start_addr # erase all
-
- (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size)
- print(f'Erasing {hex(start_addr)}:{hex(end_addr)}')
-
- # setup initial communication
- SAD = int_to_hex_list(start_addr)
- EAD = int_to_hex_list(end_addr)
- packed = pack_pkt(ERA_CMD, SAD + EAD)
- dev.send_data(packed)
-
- ret = dev.recv_data(7, timeout=1000) # erase takes usually a bit longer
- unpack_pkt(ret)
- print("Erase complete")
-
-def read_img(dev, img, start_addr, size):
-
- if size is None:
- size = 0x3FFFF - start_addr # read maximum possible
-
- (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size)
-
- # setup initial communication
- SAD = int_to_hex_list(start_addr)
- EAD = int_to_hex_list(end_addr)
- packed = pack_pkt(REA_CMD, SAD + EAD)
- dev.send_data(packed)
-
- # calculate how many packets are have to be received
- nr_packet = (end_addr - start_addr) // 1024 # TODO: set other than just 1024
-
- with open(img, 'wb') as f:
- for i in tqdm(range(0, nr_packet + 1), desc="Reading progress"):
- ret = dev.recv_data(1024 + 6)
- chunk = unpack_pkt(ret)
- chunky = bytes(int(x, 16) for x in chunk)
- f.write(chunky)
- packed = pack_pkt(REA_CMD, ['0x00'], ack=True)
- dev.send_data(packed)
-
-
-def write_img(dev, img, start_addr, size, verify=False):
-
- if os.path.exists(img):
- file_size = os.path.getsize(img)
- else:
- raise Exception(f'file {img} does not exist')
-
- if size is None:
- size = file_size
-
- if size > file_size:
- raise ValueError("Write size > file size")
-
- (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size)
-
- chunk_size = 1024 # max is 1024 according to protocol
-
- # setup initial communication
- SAD = int_to_hex_list(start_addr)
- EAD = int_to_hex_list(end_addr)
- packed = pack_pkt(WRI_CMD, SAD + EAD)
- dev.send_data(packed)
- ret = dev.recv_data(7)
- unpack_pkt(ret)
-
- totalread = 0
- with open(img, 'rb') as f:
- with tqdm(total=size, desc="Writing progress") as pbar:
- chunk = f.read(chunk_size)
- pbar.update(len(chunk))
- while chunk and totalread < size:
- if len(chunk) != chunk_size:
- padding_length = chunk_size - len(chunk)
- chunk += b'\0' * padding_length
- packed = pack_pkt(WRI_CMD, chunk, ack=True)
- dev.send_data(packed)
- reply_len = 7
- reply = dev.recv_data(reply_len)
- msg = unpack_pkt(reply)
- chunk = f.read(chunk_size)
- totalread += chunk_size
- pbar.update(len(chunk))
-
- if verify:
- with tempfile.NamedTemporaryFile(prefix='.hidden_', delete=False) as tmp_file, open(img, 'rb') as cmp_file:
- read_img(dev, tmp_file.name, start_addr, size)
- c1 = tmp_file.read(file_size) # due to byte alignment read file is longer
- c2 = cmp_file.read()
- if c1 == c2:
- print("Verify complete")
- else:
- print("Verify failed")
-
-def main():
- parser = argparse.ArgumentParser(description="RA Flasher Tool")
-
- subparsers = parser.add_subparsers(dest="command", title="Commands")
-
- write_parser = subparsers.add_parser("write", help="Write data to flash")
- write_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address")
- write_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes")
- write_parser.add_argument("--verify", action="store_true", help="Verify after writing")
- write_parser.add_argument("file_name", type=str, help="File name")
-
- read_parser = subparsers.add_parser("read", help="Read data from flash")
- read_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address")
- read_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes")
- read_parser.add_argument("file_name", type=str, help="File name")
-
- erase_parser = subparsers.add_parser("erase", help="Erase sectors")
- erase_parser.add_argument("--start_address", default='0x0000', type=hex_type, help="Start address")
- erase_parser.add_argument("--size", type=hex_type, help="Size")
-
- subparsers.add_parser("info", help="Show flasher information")
-
- args = parser.parse_args()
- if args.command in commands:
- dev = RAConnect(VENDOR_ID, PRODUCT_ID)
- area_cfg = get_area_info(dev)
- dev.set_chip_layout(area_cfg)
- commands[args.command](dev, args)
- else:
- parser.print_help()
-
-if __name__ == "__main__":
- main()
+++ /dev/null
-# Copyright (C) Robin Krens - 2024
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
-#
-
-import struct
-
-# Commands send to boot firmware
-INQ_CMD = 0x00
-ERA_CMD = 0x12
-WRI_CMD = 0x13
-REA_CMD = 0x15
-IDA_CMD = 0x30
-BAU_CMD = 0x34
-SIG_CMD = 0x3A
-ARE_CMD = 0x3B
-
-STATUS_OK = 0x00
-STATUS_ERR = 0x80
-
-# Error codes
-error_codes = {
- 0xC: "ERR_UNSU",
- 0xC1: "ERR_PCKT",
- 0xC2: "ERR_CHKS",
- 0xC3: "ERR_FLOW",
- 0xD0: "ERR_ADDR",
- 0xD4: "ERR_BAUD",
- 0xDA: "ERR_PROT",
- 0xDB: "ERR_ID",
- 0xDC: "ERR_SERI",
- 0xE1: "ERR_ERA",
- 0xE2: "ERR_WRI",
- 0xE7: "ERR_SEQ"
-}
-
-# used for init sequence
-LOW_PULSE = 0x00
-GENERIC_CODE = 0x55
-BOOT_CODE = 0xC3
-
-TESTID = [
- "0xF0", "0xF1", "0xF2", "0xF3",
- "0xE4", "0xE5", "0xE6", "0xE7",
- "0xD8", "0xD9", "0xDA", "0xDB",
- "0xCC", "0xCD", "0xCE", "0xCF"
-]
-
-def calc_sum(cmd, data):
- data_len = len(data)
- lnh = (data_len + 1 & 0xFF00) >> 8
- lnl = data_len + 1 & 0x00FF
- res = lnh + lnl + cmd
- for i in range(data_len):
- if isinstance(data[i], str):
- res += int(data[i], 16)
- elif isinstance(data[i], int):
- res += data[i]
- else:
- res += ord(data[i])
- res = ~(res - 1) & 0xFF # two's complement
- return (lnh, lnl, res)
-
-
-# format of data packet is [SOD|LNH|LNL|COM|byte_data|SUM|ETX]
-def pack_command(cmd, data):
- SOD = 0x01
- COM = cmd
-
- if isinstance(data, str):
- byte_data = bytes(data.encode('utf-8'))
- else:
- byte_data = bytes([int(x, 16) for x in data])
-
- LNH, LNL, SUM = calc_sum(int(cmd), data)
- ETX = 0x03
- fmt_header = '<BBBB'
- fmt_footer = 'BB'
- fmt = fmt_header + str(len(data)) + 's' + fmt_footer
- pack = struct.pack(fmt, SOD, LNH, LNL, COM, byte_data, SUM, ETX)
- print(fmt, pack, len(pack))
- return fmt
-
-# format of data packet is [SOD|LNH|LNL|RES|DAT|SUM|ETX]
-def pack_pkt(res, data, ack=False):
- SOD = 0x01 # TODO: check if 0x81 header needed
- if ack:
- SOD = 0x81
- if (len(data) > 1024):
- raise Exception(f'Data packet too large, data length is {len(data)} (>1024)')
- LNH, LNL, SUM = calc_sum(int(res), data)
- if not isinstance(data, bytes):
- DAT = bytes([int(x, 16) for x in data])
- else:
- DAT = data
- RES = res
- ETX = 0x03
- fmt_header = '<BBBB'
- fmt_footer = 'BB'
- fmt = fmt_header + str(len(data)) + 's' + fmt_footer
- pack = struct.pack(fmt, SOD, LNH, LNL, RES, DAT, SUM, ETX)
- return pack
-
-# packet received from mcu
-def unpack_pkt(data):
- header = data[0:4]
- fmt_header = '<BBBB'
- SOD, LNH, LNL, RES = struct.unpack(fmt_header, header)
- if (SOD != 0x81):
- raise Exception('Wrong start of packet data received')
- pkt_len = (LNH << 0x8 | LNL) - 1
- fmt_message = '<' + str(pkt_len) + 's'
- raw = struct.unpack_from(fmt_message, data, 4)[0]
- message = ['0x{:02X}'.format(byte) for byte in raw]
- if (RES & 0x80):
- raise ValueError(f'MCU encountered error {message[0]}')
- fmt_footer = '<BB'
- SUM, ETX = struct.unpack_from(fmt_footer, data, 4 + pkt_len)
- lnh, lnl, local_sum = calc_sum(RES, message)
- if (SUM != local_sum):
- raise Exception(f'Sum calculation mismatch, read {SUM} instead of {local_sum}')
- if (ETX != 0x03):
- raise Exception('Packet ETX error')
- return message