tests: add/modify ack=True test
[renesas-ra-flasher] / src / RAConnect.py
1 import sys
2 import time
3 import usb.core
4 import usb.util
5 from RAPacker import *
6
7 MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
8
9 class RAConnect:
10     def __init__(self, vendor_id, product_id):
11         self.vendor_id = vendor_id
12         self.product_id = product_id
13         self.ep_in = 0x81
14         self.ep_out = 0x02
15         self.max_tries = 20
16         self.timeout_ms = 100
17         self.dev = None
18         self.rx_ep = None
19         self.tx_ep = None
20         self.chip_layout = []
21         self.sel_area = 0 # default to Area 0
22
23         self.find_device()
24         status_conn = self.inquire_connection()
25         if not status_conn:
26             self.confirm_connection()
27
28     def find_device(self):
29         self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
30         if self.dev is None:
31             raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
32
33         for config in self.dev:
34             intf = config[(1,0)]
35             product_name = usb.util.get_string(self.dev, self.dev.iProduct)
36             print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
37             if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
38                 print("Found kernel driver, detaching ... ")
39                 self.dev.detach_kernel_driver(intf.bInterfaceNumber)
40             for ep in intf:
41                 if (ep.bmAttributes == 0x02):
42                     if ep.bEndpointAddress == self.ep_in:
43                         self.rx_ep = ep
44                     elif ep.bEndpointAddress == self.ep_out:
45                         self.tx_ep = ep
46             return True
47
48         raise ValueError("Device does not have a CDC interface")
49
50
51     def inquire_connection(self):
52         packed = pack_pkt(INQ_CMD, "")
53         self.send_data(packed)
54         info = self.recv_data(7)
55         if info == bytearray(b'\x00') or info == bytearray(b''):
56             return False
57         msg = unpack_pkt(info)
58         #print("Connection already established")
59         return True
60
61     def confirm_connection(self):
62         for i in range(self.max_tries):
63             try:
64                 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
65                 ret = self.rx_ep.read(1, self.timeout_ms)
66                 if ret[0] == 0xC3:
67                     print("Reply received (0xC3)")
68                     return True
69             except usb.core.USBError as e:
70                 print(f"Timeout: retry #{i}", e)
71         return False
72
73     def authenticate_connection(self):
74         raise Exception("Not implemented")
75
76     def set_chip_layout(self, cfg):
77         if cfg == None:
78             raise ValueError("Could net get chip layout")
79         self.chip_layout = cfg
80
81     def send_data(self, packed_data):
82         if (self.tx_ep == None):
83             return False
84         try:
85             self.tx_ep.write(packed_data, self.timeout_ms)
86         except usb.core.USBError as e:
87             print(f"Timeout: error", e)
88             return False
89         return True
90
91     def recv_data(self, exp_len, timeout=100):
92         msg = bytearray(b'')
93         if (exp_len > MAX_TRANSFER_SIZE):
94             raise ValueError(f"length package {exp_len} over max transfer size")
95         if (self.rx_ep == None):
96             return False
97         try:
98             received = 0
99             while received != exp_len:
100                 buf = self.rx_ep.read(exp_len, timeout)
101                 msg += buf
102                 received += len(buf)
103                 if received == exp_len:
104                     return msg
105         except usb.core.USBError as e:
106             print(f"Timeout: error", e)
107         return msg