def __init__(self, vendor_id, product_id):
self.vendor_id = vendor_id
self.product_id = product_id
- self.ep_in = 0x82
+ self.ep_in = 0x81
self.ep_out = 0x02
self.max_tries = 20
self.timeout_ms = 100
raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
for config in self.dev:
- for intf in config:
- if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)):
- print("Found serial device with 0x02 | 0xFF")
- if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
- print("Found kernel driver, detaching ... ")
- self.dev.detach_kernel_driver(intf.bInterfaceNumber)
- for ep in intf:
- if (ep.bmAttributes == 0x02):
- if ep.bEndpointAddress == self.ep_in:
- self.rx_ep = ep
- print(ep)
- elif ep.bEndpointAddress == self.ep_out:
- self.tx_ep = ep
- print(ep)
- return True
+ #print(config)
+ intf = config[(1,0)]
+ #for intf in config:
+
+ #if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0xa or d.bInterfaceClass == 0xBB)):
+ print("Found serial device with 0x0a | 0xFF")
+ if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
+ print("Found kernel driver, detaching ... ")
+ self.dev.detach_kernel_driver(intf.bInterfaceNumber)
+ for ep in intf:
+ #print("=========")
+ #print(ep)
+ if (ep.bmAttributes == 0x02):
+ if ep.bEndpointAddress == self.ep_in:
+ self.rx_ep = ep
+ print(ep)
+ elif ep.bEndpointAddress == self.ep_out:
+ self.tx_ep = ep
+ print(ep)
+ return True
raise ValueError("Device does not have a serial interface")
self.tx_ep.write(bytes([0x00]), self.timeout_ms)
ret = self.rx_ep.read(1, self.timeout_ms)
if ret[0] == 0x00:
- print("ACK received")
+ print("Reply ACK received (0x00)")
return True
except usb.core.USBError as e:
print(f"Timeout: retry #{i}", e)
self.tx_ep.write(bytes([0x55]), self.timeout_ms)
ret = self.rx_ep.read(1, self.timeout_ms)
if ret[0] == 0xC3:
- print("ACK received")
+ print("Reply received (0xC3)")
return True
except usb.core.USBError as e:
print(f"Timeout: retry #{i}", e)
# packets are length 7, except for a read package
def recv_data(self, exp_len):
+ msg = bytearray(b'')
if (exp_len > MAX_TRANSFER_SIZE):
raise ValueError(f"length package {exp_len} over max transfer size")
if (self.rx_ep == None):
return False
try:
- msg = self.rx_ep.read(exp_len, self.timeout_ms)
+ received = 0
+ while received != exp_len:
+ buf = self.rx_ep.read(exp_len, self.timeout_ms)
+ msg += buf
+ print(buf, len(buf))
+ received += len(buf)
+ if received == exp_len:
+ return msg
except usb.core.USBError as e:
print(f"Timeout: error", e)
- return False
+ #return False
return msg
packed = pack_pkt(SIG_CMD, "")
dev.send_data(packed)
- #info_ret = dev.recv_data(17)
- info = b'\x81\x00\x0D\x3A\x01\x31\x2d\x00\x00\x1e\x84\x80\x04\x02\x0a\x08' # test
- fmt = '>IIIBBH'
- _HEADER, SCI, RMB, NOA, TYP, BFV = struct.unpack(fmt, info)
+ print(packed)
+ info = dev.recv_data(18)
+ print(info, len(info))
+ #info = b'\x81\x00\x0D\x3A\x01\x31\x2d\x00\x00\x1e\x84\x80\x04\x02\x0a\x08' # test
+ fmt = '>IIIBBHH'
+ _HEADER, SCI, RMB, NOA, TYP, BFV, _FOOTER = struct.unpack(fmt, info)
print(f'Ver{BFV >> 8}.{BFV & 0xFF}')
def verify_img(dev, img, start_addr, end_addr):
args = parser.parse_args()
if args.command == "write":
- dev = RAConnect(vendor_id=0x1a86, product_id=0x7523)
+ dev = RAConnect(vendor_id=0x045B, product_id=0x0261)
print(args)
write_img(dev, args.file_name, args.start_address, args.end_address, args.verify)
elif args.command == "read":
print('read command')
elif args.command == "info":
- dev = RAConnect(vendor_id=0x1a86, product_id=0x7523)
+ dev = RAConnect(vendor_id=0x045B, product_id=0x0261)
+ dev.establish_connection()
+ dev.confirm_connection()
get_dev_info(dev)
else:
parser.print_help()