1 # Copyright (C) Robin Krens - 2024
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 # GNU General Public License for more details.
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
22 from RAPacker import *
24 MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
27 def __init__(self, vendor_id, product_id):
28 self.vendor_id = vendor_id
29 self.product_id = product_id
38 self.sel_area = 0 # default to Area 0
41 status_conn = self.inquire_connection()
43 self.confirm_connection()
45 def find_device(self):
46 self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
48 raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
50 for config in self.dev:
52 product_name = usb.util.get_string(self.dev, self.dev.iProduct)
53 print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
54 if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
55 print("Found kernel driver, detaching ... ")
56 self.dev.detach_kernel_driver(intf.bInterfaceNumber)
58 if (ep.bmAttributes == 0x02):
59 if ep.bEndpointAddress == self.ep_in:
61 elif ep.bEndpointAddress == self.ep_out:
65 raise ValueError("Device does not have a CDC interface")
68 def inquire_connection(self):
69 packed = pack_pkt(INQ_CMD, "")
70 self.send_data(packed)
71 info = self.recv_data(7)
72 if info == bytearray(b'\x00') or info == bytearray(b''):
74 msg = unpack_pkt(info)
75 #print("Connection already established")
78 def confirm_connection(self):
79 for i in range(self.max_tries):
81 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
82 ret = self.rx_ep.read(1, self.timeout_ms)
84 print("Reply received (0xC3)")
86 except usb.core.USBError as e:
87 print(f"Timeout: retry #{i}", e)
90 def authenticate_connection(self):
91 raise Exception("Not implemented")
93 def set_chip_layout(self, cfg):
95 raise ValueError("Could net get chip layout")
96 self.chip_layout = cfg
98 def send_data(self, packed_data):
99 if (self.tx_ep == None):
102 self.tx_ep.write(packed_data, self.timeout_ms)
103 except usb.core.USBError as e:
104 print(f"Timeout: error", e)
108 def recv_data(self, exp_len, timeout=100):
110 if (exp_len > MAX_TRANSFER_SIZE):
111 raise ValueError(f"length package {exp_len} over max transfer size")
112 if (self.rx_ep == None):
116 while received != exp_len:
117 buf = self.rx_ep.read(exp_len, timeout)
120 if received == exp_len:
122 except usb.core.USBError as e:
123 print(f"Timeout: error", e)