From 316adda1b54cd2e9071f0b81ebdbcfde5798b70c Mon Sep 17 00:00:00 2001 From: Robin Krens Date: Thu, 15 Feb 2024 13:52:11 +0100 Subject: [PATCH] setup.py: minor restructering dirs --- raflash/RAConnect.py | 124 +++++++++++++++++++++++++++ raflash/RAFlasher.py | 233 +++++++++++++++++++++++++++++++++++++++++++++++++++ raflash/RAPacker.py | 136 ++++++++++++++++++++++++++++++ setup.py | 4 +- src/RAConnect.py | 124 --------------------------- src/RAFlasher.py | 233 --------------------------------------------------- src/RAPacker.py | 136 ------------------------------ 7 files changed, 495 insertions(+), 495 deletions(-) create mode 100644 raflash/RAConnect.py create mode 100644 raflash/RAFlasher.py create mode 100644 raflash/RAPacker.py delete mode 100644 src/RAConnect.py delete mode 100644 src/RAFlasher.py delete mode 100644 src/RAPacker.py diff --git a/raflash/RAConnect.py b/raflash/RAConnect.py new file mode 100644 index 0000000..881e516 --- /dev/null +++ b/raflash/RAConnect.py @@ -0,0 +1,124 @@ +# Copyright (C) Robin Krens - 2024 +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import sys +import time +import usb.core +import usb.util +from raflash.RAPacker import * + +MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer + + +class RAConnect: + def __init__(self, vendor_id, product_id): + self.vendor_id = vendor_id + self.product_id = product_id + self.ep_in = 0x81 + self.ep_out = 0x02 + self.max_tries = 20 + self.timeout_ms = 100 + self.dev = None + self.rx_ep = None + self.tx_ep = None + self.chip_layout = [] + self.sel_area = 0 # default to Area 0 + + self.find_device() + status_conn = self.inquire_connection() + if not status_conn: + self.confirm_connection() + + def find_device(self): + self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id) + if self.dev is None: + raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?") + + for config in self.dev: + intf = config[(1, 0)] + product_name = usb.util.get_string(self.dev, self.dev.iProduct) + print(f'Found {product_name} ({self.vendor_id}:{self.product_id})') + if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): + print("Found kernel driver, detaching ... ") + self.dev.detach_kernel_driver(intf.bInterfaceNumber) + for ep in intf: + if (ep.bmAttributes == 0x02): + if ep.bEndpointAddress == self.ep_in: + self.rx_ep = ep + elif ep.bEndpointAddress == self.ep_out: + self.tx_ep = ep + return True + + raise ValueError("Device does not have a CDC interface") + + def inquire_connection(self): + packed = pack_pkt(INQ_CMD, "") + self.send_data(packed) + info = self.recv_data(7) + if info == bytearray(b'\x00') or info == bytearray(b''): + return False + msg = unpack_pkt(info) + # print("Connection already established") + return True + + def confirm_connection(self): + for i in range(self.max_tries): + try: + self.tx_ep.write(bytes([0x55]), self.timeout_ms) + ret = self.rx_ep.read(1, self.timeout_ms) + if ret[0] == 0xC3: + print("Reply received (0xC3)") + return True + except usb.core.USBError as e: + print(f"Timeout: retry #{i}", e) + return False + + def authenticate_connection(self): + raise Exception("Not implemented") + + def set_chip_layout(self, cfg): + if cfg is None: + raise ValueError("Could net get chip layout") + self.chip_layout = cfg + + def send_data(self, packed_data): + if self.tx_ep is None: + return False + try: + self.tx_ep.write(packed_data, self.timeout_ms) + except usb.core.USBError as e: + print("Timeout: error", e) + return False + return True + + def recv_data(self, exp_len, timeout=100): + msg = bytearray(b'') + if exp_len > MAX_TRANSFER_SIZE: + raise ValueError(f"length package {exp_len} over max transfer size") + if self.rx_ep is None: + return False + try: + received = 0 + while received != exp_len: + buf = self.rx_ep.read(exp_len, timeout) + msg += buf + received += len(buf) + if received == exp_len: + return msg + except usb.core.USBError as e: + print("Timeout: error", e) + return msg diff --git a/raflash/RAFlasher.py b/raflash/RAFlasher.py new file mode 100644 index 0000000..077b976 --- /dev/null +++ b/raflash/RAFlasher.py @@ -0,0 +1,233 @@ +# Copyright (C) Robin Krens - 2024 +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import os +import math +import time +import argparse +import tempfile +from tqdm import tqdm +from raflash.RAConnect import * +from raflash.RAPacker import * + +VENDOR_ID = 0x045B +PRODUCT_ID = 0x0261 + +commands = { + "write": lambda dev, args: write_img(dev, args.file_name, args.start_address, args.size, args.verify), + "read": lambda dev, args: read_img(dev, args.file_name, args.start_address, args.size), + "erase": lambda dev, args: erase_chip(dev, args.start_address, args.size), + "info": lambda dev, args: (get_dev_info(dev), dev.set_chip_layout(get_area_info(dev, output=True))) +} + +def int_to_hex_list(num): + hex_string = hex(num)[2:].upper() # convert to hex string + hex_string = hex_string.zfill(8) # pad for 8 char's long + hex_list = [f'0x{hex_string[c:c+2]}' for c in range(0, 8, 2)] + return hex_list + +def hex_type(string): + try: + value = int(string, 16) + return value + except ValueError: + raise argparse.ArgumentTypeError(f"'{string}' is not a valid hexadecimal value.") + +def set_size_boundaries(dev, start_addr, size): + + sector_size = dev.chip_layout[dev.sel_area]['ALIGN'] # currently only area 0 supported + + if start_addr % sector_size: + raise ValueError(f"start addr not aligned on sector size {sector_size}") + + if size < sector_size: + print("Warning: you are trying to write something that is less than one sector size: padding with zeroes") + + blocks = (size + sector_size - 1) // sector_size + end_addr = blocks * sector_size + start_addr - 1 + + if (end_addr <= start_addr): + raise ValueError("End address smaller or equal than start_address") + + if (end_addr > dev.chip_layout[dev.sel_area]['EAD']): + raise ValueError("Binary file is bigger than available ROM space") + + return (start_addr, end_addr) + +def get_area_info(dev, output=False): + cfg = {} + for i in [0, 1, 2]: + packed = pack_pkt(ARE_CMD, [str(i)]) + dev.send_data(packed) + info = dev.recv_data(23) + msg = unpack_pkt(info) + fmt = '>BIIII' + KOA, SAD, EAD, EAU, WAU = struct.unpack(fmt, bytes(int(x, 16) for x in msg)) + cfg[i] = {"SAD": SAD, "EAD": EAD, "ALIGN": EAU} + if output: + print(f'Area {KOA}: {hex(SAD)}:{hex(EAD)} (erase {hex(EAU)} - write {hex(WAU)})') + return cfg + +def get_dev_info(dev): + packed = pack_pkt(SIG_CMD, "") + dev.send_data(packed) + info = dev.recv_data(18) + fmt = '>IIIBBHH' + _HEADER, SCI, RMB, NOA, TYP, BFV, _FOOTER = struct.unpack(fmt, info) + print('====================') + if TYP == 0x02: + print('Chip: RA MCU + RA2/RA4 Series') + elif TYP == 0x03: + print('Chip: RA MCU + RA6 Series') + else: + print('Unknown MCU type') + print(f'Serial interface speed: {SCI} Hz') + print(f'Recommend max UART baud rate: {RMB} bps') + print(f'User area in Code flash [{NOA & 0x1}|{NOA & 0x02 >> 1}]') + print(f'User area in Data flash [{NOA & 0x03 >> 2}]') + print(f'Config area [{NOA & 0x04 >> 3}]') + print(f'Boot firmware: version {BFV >> 8}.{BFV & 0xFF}') + +def erase_chip(dev, start_addr, size): + if size is None: + size = dev.chip_layout[dev.sel_area]['EAD'] - start_addr # erase all + + (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) + print(f'Erasing {hex(start_addr)}:{hex(end_addr)}') + + # setup initial communication + SAD = int_to_hex_list(start_addr) + EAD = int_to_hex_list(end_addr) + packed = pack_pkt(ERA_CMD, SAD + EAD) + dev.send_data(packed) + + ret = dev.recv_data(7, timeout=1000) # erase takes usually a bit longer + unpack_pkt(ret) + print("Erase complete") + +def read_img(dev, img, start_addr, size): + + if size is None: + size = 0x3FFFF - start_addr # read maximum possible + + (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) + + # setup initial communication + SAD = int_to_hex_list(start_addr) + EAD = int_to_hex_list(end_addr) + packed = pack_pkt(REA_CMD, SAD + EAD) + dev.send_data(packed) + + # calculate how many packets are have to be received + nr_packet = (end_addr - start_addr) // 1024 # TODO: set other than just 1024 + + with open(img, 'wb') as f: + for i in tqdm(range(0, nr_packet + 1), desc="Reading progress"): + ret = dev.recv_data(1024 + 6) + chunk = unpack_pkt(ret) + chunky = bytes(int(x, 16) for x in chunk) + f.write(chunky) + packed = pack_pkt(REA_CMD, ['0x00'], ack=True) + dev.send_data(packed) + + +def write_img(dev, img, start_addr, size, verify=False): + + if os.path.exists(img): + file_size = os.path.getsize(img) + else: + raise Exception(f'file {img} does not exist') + + if size is None: + size = file_size + + if size > file_size: + raise ValueError("Write size > file size") + + (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) + + chunk_size = 1024 # max is 1024 according to protocol + + # setup initial communication + SAD = int_to_hex_list(start_addr) + EAD = int_to_hex_list(end_addr) + packed = pack_pkt(WRI_CMD, SAD + EAD) + dev.send_data(packed) + ret = dev.recv_data(7) + unpack_pkt(ret) + + totalread = 0 + with open(img, 'rb') as f: + with tqdm(total=size, desc="Writing progress") as pbar: + chunk = f.read(chunk_size) + pbar.update(len(chunk)) + while chunk and totalread < size: + if len(chunk) != chunk_size: + padding_length = chunk_size - len(chunk) + chunk += b'\0' * padding_length + packed = pack_pkt(WRI_CMD, chunk, ack=True) + dev.send_data(packed) + reply_len = 7 + reply = dev.recv_data(reply_len) + msg = unpack_pkt(reply) + chunk = f.read(chunk_size) + totalread += chunk_size + pbar.update(len(chunk)) + + if verify: + with tempfile.NamedTemporaryFile(prefix='.hidden_', delete=False) as tmp_file, open(img, 'rb') as cmp_file: + read_img(dev, tmp_file.name, start_addr, size) + c1 = tmp_file.read(file_size) # due to byte alignment read file is longer + c2 = cmp_file.read() + if c1 == c2: + print("Verify complete") + else: + print("Verify failed") + +def main(): + parser = argparse.ArgumentParser(description="RA Flasher Tool") + + subparsers = parser.add_subparsers(dest="command", title="Commands") + + write_parser = subparsers.add_parser("write", help="Write data to flash") + write_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address") + write_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes") + write_parser.add_argument("--verify", action="store_true", help="Verify after writing") + write_parser.add_argument("file_name", type=str, help="File name") + + read_parser = subparsers.add_parser("read", help="Read data from flash") + read_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address") + read_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes") + read_parser.add_argument("file_name", type=str, help="File name") + + erase_parser = subparsers.add_parser("erase", help="Erase sectors") + erase_parser.add_argument("--start_address", default='0x0000', type=hex_type, help="Start address") + erase_parser.add_argument("--size", type=hex_type, help="Size") + + subparsers.add_parser("info", help="Show flasher information") + + args = parser.parse_args() + if args.command in commands: + dev = RAConnect(VENDOR_ID, PRODUCT_ID) + area_cfg = get_area_info(dev) + dev.set_chip_layout(area_cfg) + commands[args.command](dev, args) + else: + parser.print_help() + +if __name__ == "__main__": + main() diff --git a/raflash/RAPacker.py b/raflash/RAPacker.py new file mode 100644 index 0000000..e40d2ce --- /dev/null +++ b/raflash/RAPacker.py @@ -0,0 +1,136 @@ +# Copyright (C) Robin Krens - 2024 +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# + +import struct + +# Commands send to boot firmware +INQ_CMD = 0x00 +ERA_CMD = 0x12 +WRI_CMD = 0x13 +REA_CMD = 0x15 +IDA_CMD = 0x30 +BAU_CMD = 0x34 +SIG_CMD = 0x3A +ARE_CMD = 0x3B + +STATUS_OK = 0x00 +STATUS_ERR = 0x80 + +# Error codes +error_codes = { + 0xC: "ERR_UNSU", + 0xC1: "ERR_PCKT", + 0xC2: "ERR_CHKS", + 0xC3: "ERR_FLOW", + 0xD0: "ERR_ADDR", + 0xD4: "ERR_BAUD", + 0xDA: "ERR_PROT", + 0xDB: "ERR_ID", + 0xDC: "ERR_SERI", + 0xE1: "ERR_ERA", + 0xE2: "ERR_WRI", + 0xE7: "ERR_SEQ" +} + +# used for init sequence +LOW_PULSE = 0x00 +GENERIC_CODE = 0x55 +BOOT_CODE = 0xC3 + +TESTID = [ + "0xF0", "0xF1", "0xF2", "0xF3", + "0xE4", "0xE5", "0xE6", "0xE7", + "0xD8", "0xD9", "0xDA", "0xDB", + "0xCC", "0xCD", "0xCE", "0xCF" +] + +def calc_sum(cmd, data): + data_len = len(data) + lnh = (data_len + 1 & 0xFF00) >> 8 + lnl = data_len + 1 & 0x00FF + res = lnh + lnl + cmd + for i in range(data_len): + if isinstance(data[i], str): + res += int(data[i], 16) + elif isinstance(data[i], int): + res += data[i] + else: + res += ord(data[i]) + res = ~(res - 1) & 0xFF # two's complement + return (lnh, lnl, res) + + +# format of data packet is [SOD|LNH|LNL|COM|byte_data|SUM|ETX] +def pack_command(cmd, data): + SOD = 0x01 + COM = cmd + + if isinstance(data, str): + byte_data = bytes(data.encode('utf-8')) + else: + byte_data = bytes([int(x, 16) for x in data]) + + LNH, LNL, SUM = calc_sum(int(cmd), data) + ETX = 0x03 + fmt_header = ' 1024): + raise Exception(f'Data packet too large, data length is {len(data)} (>1024)') + LNH, LNL, SUM = calc_sum(int(res), data) + if not isinstance(data, bytes): + DAT = bytes([int(x, 16) for x in data]) + else: + DAT = data + RES = res + ETX = 0x03 + fmt_header = '=1.2.0', 'future>=0.18.3', @@ -28,7 +28,7 @@ setup( ], entry_points={ 'console_scripts': [ - 'raflash = src.RAFlasher:main', + 'raflash = raflash.RAFlasher:main', ], }, long_description=read('README.md'), diff --git a/src/RAConnect.py b/src/RAConnect.py deleted file mode 100644 index 44e43fb..0000000 --- a/src/RAConnect.py +++ /dev/null @@ -1,124 +0,0 @@ -# Copyright (C) Robin Krens - 2024 -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# - -import sys -import time -import usb.core -import usb.util -from src.RAPacker import * - -MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer - - -class RAConnect: - def __init__(self, vendor_id, product_id): - self.vendor_id = vendor_id - self.product_id = product_id - self.ep_in = 0x81 - self.ep_out = 0x02 - self.max_tries = 20 - self.timeout_ms = 100 - self.dev = None - self.rx_ep = None - self.tx_ep = None - self.chip_layout = [] - self.sel_area = 0 # default to Area 0 - - self.find_device() - status_conn = self.inquire_connection() - if not status_conn: - self.confirm_connection() - - def find_device(self): - self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id) - if self.dev is None: - raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?") - - for config in self.dev: - intf = config[(1, 0)] - product_name = usb.util.get_string(self.dev, self.dev.iProduct) - print(f'Found {product_name} ({self.vendor_id}:{self.product_id})') - if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): - print("Found kernel driver, detaching ... ") - self.dev.detach_kernel_driver(intf.bInterfaceNumber) - for ep in intf: - if (ep.bmAttributes == 0x02): - if ep.bEndpointAddress == self.ep_in: - self.rx_ep = ep - elif ep.bEndpointAddress == self.ep_out: - self.tx_ep = ep - return True - - raise ValueError("Device does not have a CDC interface") - - def inquire_connection(self): - packed = pack_pkt(INQ_CMD, "") - self.send_data(packed) - info = self.recv_data(7) - if info == bytearray(b'\x00') or info == bytearray(b''): - return False - msg = unpack_pkt(info) - # print("Connection already established") - return True - - def confirm_connection(self): - for i in range(self.max_tries): - try: - self.tx_ep.write(bytes([0x55]), self.timeout_ms) - ret = self.rx_ep.read(1, self.timeout_ms) - if ret[0] == 0xC3: - print("Reply received (0xC3)") - return True - except usb.core.USBError as e: - print(f"Timeout: retry #{i}", e) - return False - - def authenticate_connection(self): - raise Exception("Not implemented") - - def set_chip_layout(self, cfg): - if cfg is None: - raise ValueError("Could net get chip layout") - self.chip_layout = cfg - - def send_data(self, packed_data): - if self.tx_ep is None: - return False - try: - self.tx_ep.write(packed_data, self.timeout_ms) - except usb.core.USBError as e: - print("Timeout: error", e) - return False - return True - - def recv_data(self, exp_len, timeout=100): - msg = bytearray(b'') - if exp_len > MAX_TRANSFER_SIZE: - raise ValueError(f"length package {exp_len} over max transfer size") - if self.rx_ep is None: - return False - try: - received = 0 - while received != exp_len: - buf = self.rx_ep.read(exp_len, timeout) - msg += buf - received += len(buf) - if received == exp_len: - return msg - except usb.core.USBError as e: - print("Timeout: error", e) - return msg diff --git a/src/RAFlasher.py b/src/RAFlasher.py deleted file mode 100644 index b457e4f..0000000 --- a/src/RAFlasher.py +++ /dev/null @@ -1,233 +0,0 @@ -# Copyright (C) Robin Krens - 2024 -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# - -import os -import math -import time -import argparse -import tempfile -from tqdm import tqdm -from src.RAConnect import * -from src.RAPacker import * - -VENDOR_ID = 0x045B -PRODUCT_ID = 0x0261 - -commands = { - "write": lambda dev, args: write_img(dev, args.file_name, args.start_address, args.size, args.verify), - "read": lambda dev, args: read_img(dev, args.file_name, args.start_address, args.size), - "erase": lambda dev, args: erase_chip(dev, args.start_address, args.size), - "info": lambda dev, args: (get_dev_info(dev), dev.set_chip_layout(get_area_info(dev, output=True))) -} - -def int_to_hex_list(num): - hex_string = hex(num)[2:].upper() # convert to hex string - hex_string = hex_string.zfill(8) # pad for 8 char's long - hex_list = [f'0x{hex_string[c:c+2]}' for c in range(0, 8, 2)] - return hex_list - -def hex_type(string): - try: - value = int(string, 16) - return value - except ValueError: - raise argparse.ArgumentTypeError(f"'{string}' is not a valid hexadecimal value.") - -def set_size_boundaries(dev, start_addr, size): - - sector_size = dev.chip_layout[dev.sel_area]['ALIGN'] # currently only area 0 supported - - if start_addr % sector_size: - raise ValueError(f"start addr not aligned on sector size {sector_size}") - - if size < sector_size: - print("Warning: you are trying to write something that is less than one sector size: padding with zeroes") - - blocks = (size + sector_size - 1) // sector_size - end_addr = blocks * sector_size + start_addr - 1 - - if (end_addr <= start_addr): - raise ValueError("End address smaller or equal than start_address") - - if (end_addr > dev.chip_layout[dev.sel_area]['EAD']): - raise ValueError("Binary file is bigger than available ROM space") - - return (start_addr, end_addr) - -def get_area_info(dev, output=False): - cfg = {} - for i in [0, 1, 2]: - packed = pack_pkt(ARE_CMD, [str(i)]) - dev.send_data(packed) - info = dev.recv_data(23) - msg = unpack_pkt(info) - fmt = '>BIIII' - KOA, SAD, EAD, EAU, WAU = struct.unpack(fmt, bytes(int(x, 16) for x in msg)) - cfg[i] = {"SAD": SAD, "EAD": EAD, "ALIGN": EAU} - if output: - print(f'Area {KOA}: {hex(SAD)}:{hex(EAD)} (erase {hex(EAU)} - write {hex(WAU)})') - return cfg - -def get_dev_info(dev): - packed = pack_pkt(SIG_CMD, "") - dev.send_data(packed) - info = dev.recv_data(18) - fmt = '>IIIBBHH' - _HEADER, SCI, RMB, NOA, TYP, BFV, _FOOTER = struct.unpack(fmt, info) - print('====================') - if TYP == 0x02: - print('Chip: RA MCU + RA2/RA4 Series') - elif TYP == 0x03: - print('Chip: RA MCU + RA6 Series') - else: - print('Unknown MCU type') - print(f'Serial interface speed: {SCI} Hz') - print(f'Recommend max UART baud rate: {RMB} bps') - print(f'User area in Code flash [{NOA & 0x1}|{NOA & 0x02 >> 1}]') - print(f'User area in Data flash [{NOA & 0x03 >> 2}]') - print(f'Config area [{NOA & 0x04 >> 3}]') - print(f'Boot firmware: version {BFV >> 8}.{BFV & 0xFF}') - -def erase_chip(dev, start_addr, size): - if size is None: - size = dev.chip_layout[dev.sel_area]['EAD'] - start_addr # erase all - - (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) - print(f'Erasing {hex(start_addr)}:{hex(end_addr)}') - - # setup initial communication - SAD = int_to_hex_list(start_addr) - EAD = int_to_hex_list(end_addr) - packed = pack_pkt(ERA_CMD, SAD + EAD) - dev.send_data(packed) - - ret = dev.recv_data(7, timeout=1000) # erase takes usually a bit longer - unpack_pkt(ret) - print("Erase complete") - -def read_img(dev, img, start_addr, size): - - if size is None: - size = 0x3FFFF - start_addr # read maximum possible - - (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) - - # setup initial communication - SAD = int_to_hex_list(start_addr) - EAD = int_to_hex_list(end_addr) - packed = pack_pkt(REA_CMD, SAD + EAD) - dev.send_data(packed) - - # calculate how many packets are have to be received - nr_packet = (end_addr - start_addr) // 1024 # TODO: set other than just 1024 - - with open(img, 'wb') as f: - for i in tqdm(range(0, nr_packet + 1), desc="Reading progress"): - ret = dev.recv_data(1024 + 6) - chunk = unpack_pkt(ret) - chunky = bytes(int(x, 16) for x in chunk) - f.write(chunky) - packed = pack_pkt(REA_CMD, ['0x00'], ack=True) - dev.send_data(packed) - - -def write_img(dev, img, start_addr, size, verify=False): - - if os.path.exists(img): - file_size = os.path.getsize(img) - else: - raise Exception(f'file {img} does not exist') - - if size is None: - size = file_size - - if size > file_size: - raise ValueError("Write size > file size") - - (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) - - chunk_size = 1024 # max is 1024 according to protocol - - # setup initial communication - SAD = int_to_hex_list(start_addr) - EAD = int_to_hex_list(end_addr) - packed = pack_pkt(WRI_CMD, SAD + EAD) - dev.send_data(packed) - ret = dev.recv_data(7) - unpack_pkt(ret) - - totalread = 0 - with open(img, 'rb') as f: - with tqdm(total=size, desc="Writing progress") as pbar: - chunk = f.read(chunk_size) - pbar.update(len(chunk)) - while chunk and totalread < size: - if len(chunk) != chunk_size: - padding_length = chunk_size - len(chunk) - chunk += b'\0' * padding_length - packed = pack_pkt(WRI_CMD, chunk, ack=True) - dev.send_data(packed) - reply_len = 7 - reply = dev.recv_data(reply_len) - msg = unpack_pkt(reply) - chunk = f.read(chunk_size) - totalread += chunk_size - pbar.update(len(chunk)) - - if verify: - with tempfile.NamedTemporaryFile(prefix='.hidden_', delete=False) as tmp_file, open(img, 'rb') as cmp_file: - read_img(dev, tmp_file.name, start_addr, size) - c1 = tmp_file.read(file_size) # due to byte alignment read file is longer - c2 = cmp_file.read() - if c1 == c2: - print("Verify complete") - else: - print("Verify failed") - -def main(): - parser = argparse.ArgumentParser(description="RA Flasher Tool") - - subparsers = parser.add_subparsers(dest="command", title="Commands") - - write_parser = subparsers.add_parser("write", help="Write data to flash") - write_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address") - write_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes") - write_parser.add_argument("--verify", action="store_true", help="Verify after writing") - write_parser.add_argument("file_name", type=str, help="File name") - - read_parser = subparsers.add_parser("read", help="Read data from flash") - read_parser.add_argument("--start_address", type=hex_type, default='0x0000', help="Start address") - read_parser.add_argument("--size", type=hex_type, default=None, help="Size in bytes") - read_parser.add_argument("file_name", type=str, help="File name") - - erase_parser = subparsers.add_parser("erase", help="Erase sectors") - erase_parser.add_argument("--start_address", default='0x0000', type=hex_type, help="Start address") - erase_parser.add_argument("--size", type=hex_type, help="Size") - - subparsers.add_parser("info", help="Show flasher information") - - args = parser.parse_args() - if args.command in commands: - dev = RAConnect(VENDOR_ID, PRODUCT_ID) - area_cfg = get_area_info(dev) - dev.set_chip_layout(area_cfg) - commands[args.command](dev, args) - else: - parser.print_help() - -if __name__ == "__main__": - main() diff --git a/src/RAPacker.py b/src/RAPacker.py deleted file mode 100644 index e40d2ce..0000000 --- a/src/RAPacker.py +++ /dev/null @@ -1,136 +0,0 @@ -# Copyright (C) Robin Krens - 2024 -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# - -import struct - -# Commands send to boot firmware -INQ_CMD = 0x00 -ERA_CMD = 0x12 -WRI_CMD = 0x13 -REA_CMD = 0x15 -IDA_CMD = 0x30 -BAU_CMD = 0x34 -SIG_CMD = 0x3A -ARE_CMD = 0x3B - -STATUS_OK = 0x00 -STATUS_ERR = 0x80 - -# Error codes -error_codes = { - 0xC: "ERR_UNSU", - 0xC1: "ERR_PCKT", - 0xC2: "ERR_CHKS", - 0xC3: "ERR_FLOW", - 0xD0: "ERR_ADDR", - 0xD4: "ERR_BAUD", - 0xDA: "ERR_PROT", - 0xDB: "ERR_ID", - 0xDC: "ERR_SERI", - 0xE1: "ERR_ERA", - 0xE2: "ERR_WRI", - 0xE7: "ERR_SEQ" -} - -# used for init sequence -LOW_PULSE = 0x00 -GENERIC_CODE = 0x55 -BOOT_CODE = 0xC3 - -TESTID = [ - "0xF0", "0xF1", "0xF2", "0xF3", - "0xE4", "0xE5", "0xE6", "0xE7", - "0xD8", "0xD9", "0xDA", "0xDB", - "0xCC", "0xCD", "0xCE", "0xCF" -] - -def calc_sum(cmd, data): - data_len = len(data) - lnh = (data_len + 1 & 0xFF00) >> 8 - lnl = data_len + 1 & 0x00FF - res = lnh + lnl + cmd - for i in range(data_len): - if isinstance(data[i], str): - res += int(data[i], 16) - elif isinstance(data[i], int): - res += data[i] - else: - res += ord(data[i]) - res = ~(res - 1) & 0xFF # two's complement - return (lnh, lnl, res) - - -# format of data packet is [SOD|LNH|LNL|COM|byte_data|SUM|ETX] -def pack_command(cmd, data): - SOD = 0x01 - COM = cmd - - if isinstance(data, str): - byte_data = bytes(data.encode('utf-8')) - else: - byte_data = bytes([int(x, 16) for x in data]) - - LNH, LNL, SUM = calc_sum(int(cmd), data) - ETX = 0x03 - fmt_header = ' 1024): - raise Exception(f'Data packet too large, data length is {len(data)} (>1024)') - LNH, LNL, SUM = calc_sum(int(res), data) - if not isinstance(data, bytes): - DAT = bytes([int(x, 16) for x in data]) - else: - DAT = data - RES = res - ETX = 0x03 - fmt_header = '