From 26795ba3625767c104f69ec43d2ac8fb7d142cbc Mon Sep 17 00:00:00 2001 From: Robin Krens Date: Thu, 15 Feb 2024 12:04:31 +0100 Subject: [PATCH] .flake8: add config + minor style fixes --- .flake8 | 10 ++++++++++ src/RAConnect.py | 18 +++++++++--------- src/RAFlasher.py | 40 ++++++++++++++++++---------------------- src/RAPacker.py | 18 +++++++++--------- 4 files changed, 46 insertions(+), 40 deletions(-) create mode 100644 .flake8 diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..1a5dedb --- /dev/null +++ b/.flake8 @@ -0,0 +1,10 @@ +[flake8] +max-line-length = 120 +extend-ignore = E203, W503 +exclude = .git,__pycache__,.venv +ignore = E261, E302, E305, E501, W291, W293, W503, W504, F401, F403, F841 +inline-quotes = ' +select = C,E,F,W,B,B950 +per-file-ignores = + # Example per-file ignores + # __init__.py: F401 diff --git a/src/RAConnect.py b/src/RAConnect.py index eb00868..44e43fb 100644 --- a/src/RAConnect.py +++ b/src/RAConnect.py @@ -23,6 +23,7 @@ from src.RAPacker import * MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer + class RAConnect: def __init__(self, vendor_id, product_id): self.vendor_id = vendor_id @@ -48,7 +49,7 @@ class RAConnect: raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?") for config in self.dev: - intf = config[(1,0)] + intf = config[(1, 0)] product_name = usb.util.get_string(self.dev, self.dev.iProduct) print(f'Found {product_name} ({self.vendor_id}:{self.product_id})') if self.dev.is_kernel_driver_active(intf.bInterfaceNumber): @@ -64,7 +65,6 @@ class RAConnect: raise ValueError("Device does not have a CDC interface") - def inquire_connection(self): packed = pack_pkt(INQ_CMD, "") self.send_data(packed) @@ -72,7 +72,7 @@ class RAConnect: if info == bytearray(b'\x00') or info == bytearray(b''): return False msg = unpack_pkt(info) - #print("Connection already established") + # print("Connection already established") return True def confirm_connection(self): @@ -91,25 +91,25 @@ class RAConnect: raise Exception("Not implemented") def set_chip_layout(self, cfg): - if cfg == None: + if cfg is None: raise ValueError("Could net get chip layout") self.chip_layout = cfg def send_data(self, packed_data): - if (self.tx_ep == None): + if self.tx_ep is None: return False try: self.tx_ep.write(packed_data, self.timeout_ms) except usb.core.USBError as e: - print(f"Timeout: error", e) + print("Timeout: error", e) return False return True def recv_data(self, exp_len, timeout=100): msg = bytearray(b'') - if (exp_len > MAX_TRANSFER_SIZE): + if exp_len > MAX_TRANSFER_SIZE: raise ValueError(f"length package {exp_len} over max transfer size") - if (self.rx_ep == None): + if self.rx_ep is None: return False try: received = 0 @@ -120,5 +120,5 @@ class RAConnect: if received == exp_len: return msg except usb.core.USBError as e: - print(f"Timeout: error", e) + print("Timeout: error", e) return msg diff --git a/src/RAFlasher.py b/src/RAFlasher.py index 823e9e6..b457e4f 100644 --- a/src/RAFlasher.py +++ b/src/RAFlasher.py @@ -35,7 +35,7 @@ commands = { } def int_to_hex_list(num): - hex_string = hex(num)[2:].upper() # convert to hex string + hex_string = hex(num)[2:].upper() # convert to hex string hex_string = hex_string.zfill(8) # pad for 8 char's long hex_list = [f'0x{hex_string[c:c+2]}' for c in range(0, 8, 2)] return hex_list @@ -49,7 +49,7 @@ def hex_type(string): def set_size_boundaries(dev, start_addr, size): - sector_size = dev.chip_layout[dev.sel_area]['ALIGN'] # currently only area 0 supported + sector_size = dev.chip_layout[dev.sel_area]['ALIGN'] # currently only area 0 supported if start_addr % sector_size: raise ValueError(f"start addr not aligned on sector size {sector_size}") @@ -61,16 +61,16 @@ def set_size_boundaries(dev, start_addr, size): end_addr = blocks * sector_size + start_addr - 1 if (end_addr <= start_addr): - raise ValueError(f"End address smaller or equal than start_address") + raise ValueError("End address smaller or equal than start_address") if (end_addr > dev.chip_layout[dev.sel_area]['EAD']): - raise ValueError(f"Binary file is bigger than available ROM space") + raise ValueError("Binary file is bigger than available ROM space") return (start_addr, end_addr) def get_area_info(dev, output=False): cfg = {} - for i in [0,1,2]: + for i in [0, 1, 2]: packed = pack_pkt(ARE_CMD, [str(i)]) dev.send_data(packed) info = dev.recv_data(23) @@ -79,8 +79,7 @@ def get_area_info(dev, output=False): KOA, SAD, EAD, EAU, WAU = struct.unpack(fmt, bytes(int(x, 16) for x in msg)) cfg[i] = {"SAD": SAD, "EAD": EAD, "ALIGN": EAU} if output: - print(f'Area {KOA}: {hex(SAD)}:{hex(EAD)} (erase {hex(EAU)} - write {hex(WAU)})') - + print(f'Area {KOA}: {hex(SAD)}:{hex(EAD)} (erase {hex(EAU)} - write {hex(WAU)})') return cfg def get_dev_info(dev): @@ -104,9 +103,8 @@ def get_dev_info(dev): print(f'Boot firmware: version {BFV >> 8}.{BFV & 0xFF}') def erase_chip(dev, start_addr, size): - - if size == None: - size = dev.chip_layout[dev.sel_area]['EAD'] - start_addr # erase all + if size is None: + size = dev.chip_layout[dev.sel_area]['EAD'] - start_addr # erase all (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) print(f'Erasing {hex(start_addr)}:{hex(end_addr)}') @@ -117,14 +115,14 @@ def erase_chip(dev, start_addr, size): packed = pack_pkt(ERA_CMD, SAD + EAD) dev.send_data(packed) - ret = dev.recv_data(7, timeout=1000) # erase takes usually a bit longer + ret = dev.recv_data(7, timeout=1000) # erase takes usually a bit longer unpack_pkt(ret) print("Erase complete") def read_img(dev, img, start_addr, size): - if size == None: - size = 0x3FFFF - start_addr # read maximum possible + if size is None: + size = 0x3FFFF - start_addr # read maximum possible (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) @@ -135,10 +133,10 @@ def read_img(dev, img, start_addr, size): dev.send_data(packed) # calculate how many packets are have to be received - nr_packet = (end_addr - start_addr) // 1024 # TODO: set other than just 1024 + nr_packet = (end_addr - start_addr) // 1024 # TODO: set other than just 1024 with open(img, 'wb') as f: - for i in tqdm(range(0, nr_packet+1), desc="Reading progress"): + for i in tqdm(range(0, nr_packet + 1), desc="Reading progress"): ret = dev.recv_data(1024 + 6) chunk = unpack_pkt(ret) chunky = bytes(int(x, 16) for x in chunk) @@ -154,15 +152,15 @@ def write_img(dev, img, start_addr, size, verify=False): else: raise Exception(f'file {img} does not exist') - if size == None: + if size is None: size = file_size if size > file_size: raise ValueError("Write size > file size") (start_addr, end_addr) = set_size_boundaries(dev, start_addr, size) - - chunk_size = 1024 # max is 1024 according to protocol + + chunk_size = 1024 # max is 1024 according to protocol # setup initial communication SAD = int_to_hex_list(start_addr) @@ -189,8 +187,7 @@ def write_img(dev, img, start_addr, size, verify=False): chunk = f.read(chunk_size) totalread += chunk_size pbar.update(len(chunk)) - - + if verify: with tempfile.NamedTemporaryFile(prefix='.hidden_', delete=False) as tmp_file, open(img, 'rb') as cmp_file: read_img(dev, tmp_file.name, start_addr, size) @@ -223,8 +220,7 @@ def main(): subparsers.add_parser("info", help="Show flasher information") - args = parser.parse_args() - + args = parser.parse_args() if args.command in commands: dev = RAConnect(VENDOR_ID, PRODUCT_ID) area_cfg = get_area_info(dev) diff --git a/src/RAPacker.py b/src/RAPacker.py index 7cc8d46..e40d2ce 100644 --- a/src/RAPacker.py +++ b/src/RAPacker.py @@ -64,12 +64,12 @@ def calc_sum(cmd, data): lnl = data_len + 1 & 0x00FF res = lnh + lnl + cmd for i in range(data_len): - if isinstance(data[i], str): - res += int(data[i], 16) - elif isinstance(data[i], int): - res += data[i] - else: - res += ord(data[i]) + if isinstance(data[i], str): + res += int(data[i], 16) + elif isinstance(data[i], int): + res += data[i] + else: + res += ord(data[i]) res = ~(res - 1) & 0xFF # two's complement return (lnh, lnl, res) @@ -99,7 +99,7 @@ def pack_pkt(res, data, ack=False): if ack: SOD = 0x81 if (len(data) > 1024): - raise Exception(f'Data packet too large, data length is {DATA_LEN} (>1024)') + raise Exception(f'Data packet too large, data length is {len(data)} (>1024)') LNH, LNL, SUM = calc_sum(int(res), data) if not isinstance(data, bytes): DAT = bytes([int(x, 16) for x in data]) @@ -119,7 +119,7 @@ def unpack_pkt(data): fmt_header = '