RAFlasher.py: minor cleanup
[renesas-ra-flasher] / src / RAConnect.py
1 import sys
2 import time
3 import usb.core
4 import usb.util
5
# Upper bound on a single transfer: the data bytes plus the bytes taken
# by the header and footer that surround them.
_MAX_DATA_BYTES = 2048
_HEADER_FOOTER_BYTES = 6
MAX_TRANSFER_SIZE = _MAX_DATA_BYTES + _HEADER_FOOTER_BYTES
7
class RAConnect:
    """USB connection to a device selected by vendor and product ID.

    Creating an instance immediately looks the device up (see
    ``find_device``) and binds its bulk IN/OUT endpoints to ``rx_ep`` and
    ``tx_ep``. The remaining methods exchange raw bytes over those
    endpoints.
    """

    def __init__(self, vendor_id, product_id):
        """Store the connection settings and locate the device.

        Args:
            vendor_id: USB vendor ID handed to ``usb.core.find``.
            product_id: USB product ID handed to ``usb.core.find``.

        Raises:
            ValueError: Propagated from ``find_device``.
        """
        self.vendor_id = vendor_id
        self.product_id = product_id
        self.ep_in = 0x81  # endpoint address accepted as the receive endpoint
        self.ep_out = 0x02  # endpoint address accepted as the transmit endpoint
        self.max_tries = 20  # attempts made by each handshake step
        self.timeout_ms = 100  # timeout used for writes and handshake reads
        self.dev = None
        self.rx_ep = None
        self.tx_ep = None
        self.find_device()

    def find_device(self):
        """Locate the device and bind its bulk endpoints.

        Every configuration of the device is examined in turn. The entry
        ``config[(1, 0)]`` (interface index 1, alternate setting index 0) is
        taken, an active kernel driver on it is detached, and its bulk
        endpoints whose addresses equal ``ep_in`` / ``ep_out`` are stored in
        ``rx_ep`` / ``tx_ep``.

        Returns:
            True once a configuration has provided both endpoints.

        Raises:
            ValueError: If no device matches the vendor/product ID, or if
                no configuration yields both endpoints.
        """
        self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
        if self.dev is None:
            raise ValueError(
                f"Device {self.vendor_id}:{self.product_id} not found\n"
                "Are you sure it is connected?"
            )

        for config in self.dev:
            intf = config[(1, 0)]
            product_name = usb.util.get_string(self.dev, self.dev.iProduct)
            print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
            if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
                print("Found kernel driver, detaching ... ")
                self.dev.detach_kernel_driver(intf.bInterfaceNumber)
            for ep in intf:
                if ep.bmAttributes != 0x02:  # 0x02 = bulk transfer type
                    continue
                if ep.bEndpointAddress == self.ep_in:
                    self.rx_ep = ep
                elif ep.bEndpointAddress == self.ep_out:
                    self.tx_ep = ep
            # Only report success when both directions are usable; otherwise
            # later reads/writes would fail on a missing endpoint.
            if self.rx_ep is not None and self.tx_ep is not None:
                return True

        raise ValueError("Device does not have a serial interface")

    def _handshake(self, request, expected_reply, success_message):
        """Send one byte repeatedly until the expected reply byte arrives.

        Up to ``max_tries`` attempts are made. Each attempt writes
        ``request`` and reads a single byte; a ``usb.core.USBError`` or a
        different/empty reply counts as a failed attempt.

        Args:
            request: Byte value (int) to transmit.
            expected_reply: Byte value (int) that signals success.
            success_message: Text printed when the reply matches.

        Returns:
            True if the expected reply was received, False otherwise.
        """
        for attempt in range(self.max_tries):
            try:
                self.tx_ep.write(bytes([request]), self.timeout_ms)
                reply = self.rx_ep.read(1, self.timeout_ms)
            except usb.core.USBError as e:
                print(f"Timeout: retry #{attempt}", e)
                continue
            # An empty read is treated like a wrong reply instead of
            # raising IndexError.
            if len(reply) > 0 and reply[0] == expected_reply:
                print(success_message)
                return True
        return False

    def establish_connection(self):
        """Send 0x00 until the device replies 0x00.

        Returns:
            True if the reply was received within ``max_tries`` attempts,
            False otherwise.
        """
        return self._handshake(0x00, 0x00, "Reply ACK received (0x00)")

    def confirm_connection(self):
        """Send 0x55 until the device replies 0xC3.

        Returns:
            True if the reply was received within ``max_tries`` attempts,
            False otherwise.
        """
        return self._handshake(0x55, 0xC3, "Reply received (0xC3)")

    def authenticate_connection(self):
        """Placeholder; always raises.

        Raises:
            NotImplementedError: Always. (A subclass of ``Exception``, so
                callers catching ``Exception`` are unaffected.)
        """
        raise NotImplementedError("Not implemented")

    def send_data(self, packed_data):
        """Write ``packed_data`` to the transmit endpoint.

        Args:
            packed_data: Bytes-like payload passed unchanged to the
                endpoint's ``write``.

        Returns:
            True if the write completed; False if there is no transmit
            endpoint or the write raised ``usb.core.USBError``.
        """
        if self.tx_ep is None:
            return False
        try:
            self.tx_ep.write(packed_data, self.timeout_ms)
        except usb.core.USBError as e:
            print("Timeout: error", e)
            return False
        return True

    def recv_data(self, exp_len, timeout=100):
        """Read ``exp_len`` bytes from the receive endpoint.

        Reads are repeated, each asking only for the bytes still missing,
        until ``exp_len`` bytes have been collected or a read raises
        ``usb.core.USBError``.

        Args:
            exp_len: Number of bytes expected; must not exceed
                ``MAX_TRANSFER_SIZE``.
            timeout: Timeout passed to each endpoint read.

        Returns:
            A ``bytearray`` with the received bytes (shorter than
            ``exp_len`` if a read failed), or False if there is no receive
            endpoint.

        Raises:
            ValueError: If ``exp_len`` is larger than ``MAX_TRANSFER_SIZE``.
        """
        msg = bytearray()
        if exp_len > MAX_TRANSFER_SIZE:
            raise ValueError(f"length package {exp_len} over max transfer size")
        if self.rx_ep is None:
            return False
        try:
            received = 0
            # "<" rather than "!=" so an overshoot can never keep the loop
            # reading; request only what is still outstanding.
            while received < exp_len:
                buf = self.rx_ep.read(exp_len - received, timeout)
                msg += buf
                received += len(buf)
        except usb.core.USBError as e:
            print("Timeout: error", e)
        return msg