RAFlasher.py: initial working write
[renesas-ra-flasher] / src / RAConnect.py
1 import sys
2 import time
3 import usb.core
4 import usb.util
5
MAX_TRANSFER_SIZE = 2048 + 6  # include header and footer


class RAConnect:
    """USB transport helper that talks to a device over two fixed endpoints.

    The constructor looks the device up by vendor/product ID and resolves the
    IN (0x81) and OUT (0x02) endpoints on interface 1, alternate setting 0.
    """

    def __init__(self, vendor_id, product_id):
        """Store the connection parameters and locate the device.

        Args:
            vendor_id: USB vendor ID passed to ``usb.core.find``.
            product_id: USB product ID passed to ``usb.core.find``.

        Raises:
            ValueError: propagated from ``find_device`` when the device
                cannot be found.
        """
        self.vendor_id = vendor_id
        self.product_id = product_id
        self.ep_in = 0x81
        self.ep_out = 0x02
        self.max_tries = 20
        self.timeout_ms = 100
        self.dev = None
        self.rx_ep = None
        self.tx_ep = None
        self.find_device()

    def reset(self):
        """Reset the USB device, or report that none is connected."""
        if self.dev is None:
            print('Device not connected')
            # Nothing to reset; calling .reset() on None would raise.
            return
        self.dev.reset()

    def find_device(self):
        """Find the device and resolve its receive/transmit endpoints.

        Looks at interface 1, alternate setting 0 of the first configuration,
        detaches an active kernel driver from it, and stores the endpoints
        whose address matches ``self.ep_in`` / ``self.ep_out`` in
        ``self.rx_ep`` / ``self.tx_ep``.

        Returns:
            True once the first configuration has been processed.

        Raises:
            ValueError: if the device is not found, or if it exposes no
                configuration to iterate over.
        """
        self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
        if self.dev is None:
            raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")

        for config in self.dev:
            # Interface 1, alternate setting 0.
            intf = config[(1, 0)]
            print("Found serial device with 0x0a | 0xFF")
            if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
                print("Found kernel driver, detaching ... ")
                self.dev.detach_kernel_driver(intf.bInterfaceNumber)
            for ep in intf:
                # Only endpoints whose bmAttributes is exactly 0x02 are used
                # (0x02 is the bulk transfer type in USB endpoint descriptors).
                if ep.bmAttributes != 0x02:
                    continue
                if ep.bEndpointAddress == self.ep_in:
                    self.rx_ep = ep
                elif ep.bEndpointAddress == self.ep_out:
                    self.tx_ep = ep
            # NOTE(review): returns after the first configuration even when no
            # endpoint matched; rx_ep/tx_ep then stay None and the transfer
            # methods report failure instead of raising.
            return True

        raise ValueError("Device does not have a serial interface")

    def _handshake(self, request, expected, success_message):
        """Send one byte and wait for a one-byte reply, retrying on USB errors.

        Args:
            request: byte value (0-255) written to the transmit endpoint.
            expected: byte value that counts as a successful reply.
            success_message: text printed when the expected reply arrives.

        Returns:
            True if the expected reply was read within ``self.max_tries``
            attempts, False otherwise (including when an endpoint is missing).
        """
        if self.tx_ep is None or self.rx_ep is None:
            return False
        for attempt in range(self.max_tries):
            try:
                self.tx_ep.write(bytes([request]), self.timeout_ms)
                reply = self.rx_ep.read(1, self.timeout_ms)
            except usb.core.USBError as e:
                print(f"Timeout: retry #{attempt}", e)
                continue
            # An empty reply is treated like a wrong one: try again.
            if len(reply) > 0 and reply[0] == expected:
                print(success_message)
                return True
        return False

    def establish_connection(self):
        """Send 0x00 until the device answers 0x00; return True on success."""
        return self._handshake(0x00, 0x00, "Reply ACK received (0x00)")

    def confirm_connection(self):
        """Send 0x55 until the device answers 0xC3; return True on success."""
        return self._handshake(0x55, 0xC3, "Reply received (0xC3)")

    def authenticate_connection(self):
        """Placeholder; always raises ``Exception("Not implemented")``."""
        raise Exception("Not implemented")

    def send_data(self, packed_data):
        """Write ``packed_data`` to the transmit endpoint.

        Returns:
            True if the write completed, False if there is no transmit
            endpoint or the write raised ``usb.core.USBError``.
        """
        if self.tx_ep is None:
            return False
        try:
            self.tx_ep.write(packed_data, self.timeout_ms)
        except usb.core.USBError as e:
            print("Timeout: error", e)
            return False
        return True

    # packets are length 7, except for a read package
    def recv_data(self, exp_len):
        """Read up to ``exp_len`` bytes from the receive endpoint.

        Args:
            exp_len: number of bytes expected; must not exceed
                ``MAX_TRANSFER_SIZE``.

        Returns:
            A ``bytearray`` with the received bytes. It is shorter than
            ``exp_len`` when a ``usb.core.USBError`` interrupted the transfer.
            Returns False when there is no receive endpoint.

        Raises:
            ValueError: if ``exp_len`` is larger than ``MAX_TRANSFER_SIZE``.
        """
        if exp_len > MAX_TRANSFER_SIZE:
            raise ValueError(f"length package {exp_len} over max transfer size")
        if self.rx_ep is None:
            return False
        msg = bytearray()
        try:
            # Ask only for the bytes still missing so that a partial read can
            # never push the total past exp_len.
            while len(msg) < exp_len:
                msg += self.rx_ep.read(exp_len - len(msg), self.timeout_ms)
        except usb.core.USBError as e:
            print("Timeout: error", e)
        return msg
112
113
114 #communicator = RAConnect(vendor_id=0x1a86, product_id=0x7523)
115
116 #communicator.send_data(b'\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA\xAA')
117 #communicator.recv_data(7)
118
119 #if not communicator.establish_connection():
120 #    print("Cannot connect")
121 #    sys.exit(0)
122 #
123 #if not communicator.confirm_connection():
124 #    print("Failed to confirm boot code")
125 #    sys.exit(0)
126