From f969a6fb10665bcaf1432b56234e60197c26a54a Mon Sep 17 00:00:00 2001 From: Robin Krens Date: Sat, 10 Feb 2024 18:37:45 +0100 Subject: [PATCH] dir: some restructering and refactoring - naming: connect.py > RAConnect.py flasher.py > Packer.py - .gitignore to mostly ignore cache - setup.py project info --- .gitignore | 6 +++ flasher/connect.py | 97 ------------------------------------ flasher/flasher.py | 138 ---------------------------------------------------- setup.py | 6 +-- src/Packer.py | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/RAConnect.py | 77 +++++++++++++++++++++++++++++ tests/test_parse.py | 2 +- 7 files changed, 225 insertions(+), 239 deletions(-) create mode 100644 .gitignore delete mode 100644 flasher/connect.py delete mode 100644 flasher/flasher.py create mode 100644 src/Packer.py create mode 100644 src/RAConnect.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2925ae6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +flasher.egg-info/ +ra_flasher.egg-info/ +src/__pycache__/ +tests/__pycache__/ +.venv/ + diff --git a/flasher/connect.py b/flasher/connect.py deleted file mode 100644 index 91f21d5..0000000 --- a/flasher/connect.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright (C) - Robin Krens - 2024 -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# - -import sys -import serial -import serial.threaded -import time -import usb.core -import usb.util - -LOW_PULSE = 0x00 -ACK = 0x00 -GENERIC_CODE = 0x55 -BOOT_CODE = 0xC3 - -VENDOR_ID = 0x045B -PRODUCT_ID = 0x0261 - -EP_IN = 0x82 -EP_OUT = 0x02 - -MAX_TRIES = 20 - -def find_device(vendor_id, product_id): - dev = usb.core.find(idVendor=vendor_id, idProduct=product_id) - if dev is None: - raise ValueError(f"Device {vendor_id}:{product_id} not found\n Are you sure it is connected?") - - for config in dev: - for intf in config: - if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)): - print(f"Found serial device with 0x02 | 0xFF") - TxD, RxD = None, None - if dev.is_kernel_driver_active(intf.bInterfaceNumber): - print(f"Found kernel driver, detaching ... ") - dev.detach_kernel_driver(intf.bInterfaceNumber) - for ep in intf: - if (ep.bmAttributes == 0x02): - if ep.bEndpointAddress == EP_IN: - RxD = ep - print(ep) - elif ep.bEndpointAddress == EP_OUT: - TxD = ep - print(ep) - return (dev, RxD, TxD) - - raise ValueError("Device does not have a serial interface") - -def establish_connection(): - dev, rx_ep, tx_ep = find_device(0x1a86, 0x7523) - ret = [] - for i in range(MAX_TRIES): - try: - tx_ep.write(b'\x00', 100) - ret = rx_ep.read(1, 100) - if a[0] == ACK: - print("ACK received") - return True - except: - print(f"Timeout: retry #", i) - return False - -def confirm_connection(): - dev, rx_ep, tx_ep = find_device(0x1a86, 0x7523) - ret = [] - for i in range(MAX_TRIES): - try: - tx_ep.write(b'\x55', 100) - ret = rx_ep.read(1, 100) - if a[0] == BOOT_CODE: - print("ACK received") - return True - except: - print(f"Timeout: retry #", i) - return False - -if not establish_connection(): - print("Cannot connect") - sys.exit(0) - -if not confirm_connection(): - print("Failed to confirm boot code") - sys.exit(0) diff --git a/flasher/flasher.py b/flasher/flasher.py deleted file mode 100644 index ef98a96..0000000 --- a/flasher/flasher.py +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright (C) Robin Krens - 2024 -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. -# - -import struct - -# Commands send to boot firmware -INQ_CMD = 0x00 -ERA_CMD = 0x12 -WRI_CMD = 0x13 -REA_CMD = 0x15 -IDA_CMD = 0x30 -BAU_CMD = 0x34 -SIG_CMD = 0x3A -ARE_CMD = 0x3B - -# These are combined with send command, for example -# STATUS_OK | ERA_CMD == 0x12 -# STATUS_ERR | ERA_CMD = 0x92 -STATUS_OK = 0x00 -STATUS_ERR = 0x80 - -# Error codes -error_codes = { - 0xC: "ERR_UNSU", - 0xC1: "ERR_PCKT", - 0xC2: "ERR_CHKS", - 0xC3: "ERR_FLOW", - 0xD0: "ERR_ADDR", - 0xD4: "ERR_BAUD", - 0xDA: "ERR_PROT", - 0xDB: "ERR_ID", - 0xDC: "ERR_SERI", - 0xE1: "ERR_ERA", - 0xE2: "ERR_WRI", - 0xE7: "ERR_SEQ" -} - -# used for init sequence -LOW_PULSE = 0x00 -GENERIC_CODE = 0x55 -BOOT_CODE = 0xC3 - -TESTID = [ - "0xF0", "0xF1", "0xF2", "0xF3", - "0xE4", "0xE5", "0xE6", "0xE7", - "0xD8", "0xD9", "0xDA", "0xDB", - "0xCC", "0xCD", "0xCE", "0xCF" -] - -def calc_sum(cmd, data): - data_len = len(data) - lnh = data_len + 1 & 0xFF00 - lnl = data_len + 1 & 0x00FF - res = lnh + lnl + cmd - for i in range(data_len): - if isinstance(data[i], str): - res += int(data[i], 16) - else: - res += ord(data[i]) - res = ~(res - 1) & 0xFF # two's complement - return (lnh, lnl, res) - - -# format of data packet is [SOD|LNH|LNL|COM|byte_data|SUM|ETX] -def pack_command(cmd, data): - SOD = 0x01 - COM = cmd - - if isinstance(data, str): - byte_data = bytes(data.encode('utf-8')) - else: - byte_data = bytes([int(x, 16) for x in data]) - - LNH, LNL, SUM = calc_sum(int(cmd), data) - ETX = 0x03 - fmt_header = '= 1024): - raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)') - LNH, LNL, SUM = calc_sum(int(res), data) - DAT = bytes([int(x, 16) for x in data]) - RES = res - ETX = 0x03 - fmt_header = '= 1024): + raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)') + LNH, LNL, SUM = calc_sum(int(res), data) + DAT = bytes([int(x, 16) for x in data]) + RES = res + ETX = 0x03 + fmt_header = '