RAFlasher.py: write setup communication with start and end address conv
[renesas-ra-flasher] / src / RAConnect.py
1 import sys
2 import time
3 import usb.core
4 import usb.util
5
6 MAX_TRANSFER_SIZE = 64
7
8 class RAConnect:
9     def __init__(self, vendor_id, product_id):
10         self.vendor_id = vendor_id
11         self.product_id = product_id
12         self.ep_in = 0x82
13         self.ep_out = 0x02
14         self.max_tries = 20
15         self.timeout_ms = 100
16         self.dev = None
17         self.rx_ep = None
18         self.tx_ep = None
19         self.find_device()
20
21     def find_device(self):
22         self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
23         if self.dev is None:
24             raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
25
26         for config in self.dev:
27             for intf in config:
28                 if usb.util.find_descriptor(config, custom_match=lambda d: (d.bInterfaceClass == 0x02 or d.bInterfaceClass == 0xFF)):
29                     print("Found serial device with 0x02 | 0xFF")
30                     if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
31                         print("Found kernel driver, detaching ... ")
32                         self.dev.detach_kernel_driver(intf.bInterfaceNumber)
33                     for ep in intf:
34                         if (ep.bmAttributes == 0x02):
35                             if ep.bEndpointAddress == self.ep_in:
36                                 self.rx_ep = ep
37                                 print(ep)
38                             elif ep.bEndpointAddress == self.ep_out:
39                                 self.tx_ep = ep
40                                 print(ep)
41                     return True
42
43         raise ValueError("Device does not have a serial interface")
44
45     def establish_connection(self):
46         for i in range(self.max_tries):
47             try:
48                 self.tx_ep.write(bytes([0x00]), self.timeout_ms)
49                 ret = self.rx_ep.read(1, self.timeout_ms)
50                 if ret[0] == 0x00:
51                     print("ACK received")
52                     return True
53             except usb.core.USBError as e:
54                 print(f"Timeout: retry #{i}", e)
55         return False
56
57     def confirm_connection(self):
58         for i in range(self.max_tries):
59             try:
60                 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
61                 ret = self.rx_ep.read(1, self.timeout_ms)
62                 if ret[0] == 0xC3:
63                     print("ACK received")
64                     return True
65             except usb.core.USBError as e:
66                 print(f"Timeout: retry #{i}", e)
67         return False
68
69     def authenticate_connection(self):
70         raise Exception("Not implemented")
71
72     def send_data(self, packed_data):
73         if (self.tx_ep == None):
74             return False
75         try:
76             self.tx_ep.write(packed_data, self.timeout_ms)
77         except usb.core.USBError as e:
78             print(f"Timeout: error", e)
79             return False
80         return True
81
82     # packets are length 7, except for a read package
83     def recv_data(self, exp_len):
84         if (exp_len > MAX_TRANSFER_SIZE):
85             raise ValueError(f"length package {exp_len} over max transfer size")
86         if (self.rx_ep == None):
87             return False
88         try:
89             msg = self.rx_ep.read(exp_len, self.timeout_ms)
90         except usb.core.USBError as e:
91             print(f"Timeout: error", e)
92             return False
93         return msg
94
95
96 #communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
97
98 #communicator.send_data(b'\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA')
99 #communicator.recv_data(7)
100
101 #if not communicator.establish_connection():
102 #    print("Cannot connect")
103 #    sys.exit(0)
104 #
105 #if not communicator.confirm_connection():
106 #    print("Failed to confirm boot code")
107 #    sys.exit(0)
108