RAFlasher.py: move inquire connection to RAConnect class
[renesas-ra-flasher] / src / RAConnect.py
1 import sys
2 import time
3 import usb.core
4 import usb.util
5 from RAPacker import *
6
7 MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
8
9 class RAConnect:
10     def __init__(self, vendor_id, product_id):
11         self.vendor_id = vendor_id
12         self.product_id = product_id
13         self.ep_in = 0x81
14         self.ep_out = 0x02
15         self.max_tries = 20
16         self.timeout_ms = 100
17         self.dev = None
18         self.rx_ep = None
19         self.tx_ep = None
20
21         self.find_device()
22         status_conn = self.inquire_connection()
23         if not status_conn:
24             self.confirm_connection()
25
26     def find_device(self):
27         self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
28         if self.dev is None:
29             raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
30
31         for config in self.dev:
32             intf = config[(1,0)]
33             product_name = usb.util.get_string(self.dev, self.dev.iProduct)
34             print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
35             if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
36                 print("Found kernel driver, detaching ... ")
37                 self.dev.detach_kernel_driver(intf.bInterfaceNumber)
38             for ep in intf:
39                 if (ep.bmAttributes == 0x02):
40                     if ep.bEndpointAddress == self.ep_in:
41                         self.rx_ep = ep
42                     elif ep.bEndpointAddress == self.ep_out:
43                         self.tx_ep = ep
44             return True
45
46         raise ValueError("Device does not have a CDC interface")
47
48
49     def inquire_connection(self):
50         packed = pack_pkt(INQ_CMD, "")
51         self.send_data(packed)
52         info = self.recv_data(7)
53         if info == bytearray(b'\x00') or info == bytearray(b''):
54             return False
55         msg = unpack_pkt(info)
56         #print("Connection already established")
57         return True
58
59     def establish_connection(self):
60         for i in range(self.max_tries):
61             try:
62                 self.tx_ep.write(bytes([0x00]), self.timeout_ms)
63                 ret = self.rx_ep.read(1, self.timeout_ms)
64                 if ret[0] == 0x00:
65                     print("Reply ACK received (0x00)")
66                     return True
67             except usb.core.USBError as e:
68                 print(f"Timeout: retry #{i}", e)
69         return False
70
71     def confirm_connection(self):
72         for i in range(self.max_tries):
73             try:
74                 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
75                 ret = self.rx_ep.read(1, self.timeout_ms)
76                 if ret[0] == 0xC3:
77                     print("Reply received (0xC3)")
78                     return True
79             except usb.core.USBError as e:
80                 print(f"Timeout: retry #{i}", e)
81         return False
82
83     def authenticate_connection(self):
84         raise Exception("Not implemented")
85
86     def send_data(self, packed_data):
87         if (self.tx_ep == None):
88             return False
89         try:
90             self.tx_ep.write(packed_data, self.timeout_ms)
91         except usb.core.USBError as e:
92             print(f"Timeout: error", e)
93             return False
94         return True
95
96     def recv_data(self, exp_len, timeout=100):
97         msg = bytearray(b'')
98         if (exp_len > MAX_TRANSFER_SIZE):
99             raise ValueError(f"length package {exp_len} over max transfer size")
100         if (self.rx_ep == None):
101             return False
102         try:
103             received = 0
104             while received != exp_len:
105                 buf = self.rx_ep.read(exp_len, timeout)
106                 msg += buf
107                 received += len(buf)
108                 if received == exp_len:
109                     return msg
110         except usb.core.USBError as e:
111             print(f"Timeout: error", e)
112         return msg