dir: some restructering and refactoring
[renesas-ra-flasher] / src / RAConnect.py
1 import sys
2 import time
3 import usb.core
4 import usb.util
5
6 class RAConnect:
7     def __init__(self, vendor_id, product_id):
8         self.vendor_id = vendor_id
9         self.product_id = product_id
10         self.ep_in = 0x82
11         self.ep_out = 0x02
12         self.max_tries = 20
13         self.timeout_ms = 100
14         self.dev = None
15         self.rx_ep = None
16         self.tx_ep = None
17         self.find_device()
18
19     def find_device(self):
20         self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
21         if self.dev is None:
22             raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
23
24         for config in self.dev:
25             for intf in config:
26                 if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)):
27                     print("Found serial device with 0x02 | 0xFF")
28                     if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
29                         print("Found kernel driver, detaching ... ")
30                         self.dev.detach_kernel_driver(intf.bInterfaceNumber)
31                     for ep in intf:
32                         if (ep.bmAttributes == 0x02):
33                             if ep.bEndpointAddress == self.ep_in:
34                                 self.rx_ep = ep
35                                 print(ep)
36                             elif ep.bEndpointAddress == self.ep_out:
37                                 self.tx_ep = ep
38                                 print(ep)
39                     return True
40
41         raise ValueError("Device does not have a serial interface")
42
43     def establish_connection(self):
44         for i in range(self.max_tries):
45             try:
46                 self.tx_ep.write(bytes([0x00]), self.timeout_ms)
47                 ret = self.rx_ep.read(1, self.timeout_ms)
48                 if ret[0] == 0x00:
49                     print("ACK received")
50                     return True
51             except usb.core.USBError as e:
52                 print(f"Timeout: retry #{i}", e)
53         return False
54
55     def confirm_connection(self):
56         for i in range(self.max_tries):
57             try:
58                 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
59                 ret = self.rx_ep.read(1, self.timeout_ms)
60                 if ret[0] == 0xC3:
61                     print("ACK received")
62                     return True
63             except usb.core.USBError as e:
64                 print(f"Timeout: retry #{i}", e)
65         return False
66
67 # Usage
68 communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
69
70 if not communicator.establish_connection():
71     print("Cannot connect")
72     sys.exit(0)
73
74 if not communicator.confirm_connection():
75     print("Failed to confirm boot code")
76     sys.exit(0)
77