license: add header to various source files
[renesas-ra-flasher] / src / RAConnect.py
1 # Copyright (C) Robin Krens - 2024
2
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
7
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
16
17
18 import sys
19 import time
20 import usb.core
21 import usb.util
22 from RAPacker import *
23
24 MAX_TRANSFER_SIZE = 2048 + 6 # include header and footer
25
26 class RAConnect:
27     def __init__(self, vendor_id, product_id):
28         self.vendor_id = vendor_id
29         self.product_id = product_id
30         self.ep_in = 0x81
31         self.ep_out = 0x02
32         self.max_tries = 20
33         self.timeout_ms = 100
34         self.dev = None
35         self.rx_ep = None
36         self.tx_ep = None
37         self.chip_layout = []
38         self.sel_area = 0 # default to Area 0
39
40         self.find_device()
41         status_conn = self.inquire_connection()
42         if not status_conn:
43             self.confirm_connection()
44
45     def find_device(self):
46         self.dev = usb.core.find(idVendor=self.vendor_id, idProduct=self.product_id)
47         if self.dev is None:
48             raise ValueError(f"Device {self.vendor_id}:{self.product_id} not found\nAre you sure it is connected?")
49
50         for config in self.dev:
51             intf = config[(1,0)]
52             product_name = usb.util.get_string(self.dev, self.dev.iProduct)
53             print(f'Found {product_name} ({self.vendor_id}:{self.product_id})')
54             if self.dev.is_kernel_driver_active(intf.bInterfaceNumber):
55                 print("Found kernel driver, detaching ... ")
56                 self.dev.detach_kernel_driver(intf.bInterfaceNumber)
57             for ep in intf:
58                 if (ep.bmAttributes == 0x02):
59                     if ep.bEndpointAddress == self.ep_in:
60                         self.rx_ep = ep
61                     elif ep.bEndpointAddress == self.ep_out:
62                         self.tx_ep = ep
63             return True
64
65         raise ValueError("Device does not have a CDC interface")
66
67
68     def inquire_connection(self):
69         packed = pack_pkt(INQ_CMD, "")
70         self.send_data(packed)
71         info = self.recv_data(7)
72         if info == bytearray(b'\x00') or info == bytearray(b''):
73             return False
74         msg = unpack_pkt(info)
75         #print("Connection already established")
76         return True
77
78     def confirm_connection(self):
79         for i in range(self.max_tries):
80             try:
81                 self.tx_ep.write(bytes([0x55]), self.timeout_ms)
82                 ret = self.rx_ep.read(1, self.timeout_ms)
83                 if ret[0] == 0xC3:
84                     print("Reply received (0xC3)")
85                     return True
86             except usb.core.USBError as e:
87                 print(f"Timeout: retry #{i}", e)
88         return False
89
90     def authenticate_connection(self):
91         raise Exception("Not implemented")
92
93     def set_chip_layout(self, cfg):
94         if cfg == None:
95             raise ValueError("Could net get chip layout")
96         self.chip_layout = cfg
97
98     def send_data(self, packed_data):
99         if (self.tx_ep == None):
100             return False
101         try:
102             self.tx_ep.write(packed_data, self.timeout_ms)
103         except usb.core.USBError as e:
104             print(f"Timeout: error", e)
105             return False
106         return True
107
108     def recv_data(self, exp_len, timeout=100):
109         msg = bytearray(b'')
110         if (exp_len > MAX_TRANSFER_SIZE):
111             raise ValueError(f"length package {exp_len} over max transfer size")
112         if (self.rx_ep == None):
113             return False
114         try:
115             received = 0
116             while received != exp_len:
117                 buf = self.rx_ep.read(exp_len, timeout)
118                 msg += buf
119                 received += len(buf)
120                 if received == exp_len:
121                     return msg
122         except usb.core.USBError as e:
123             print(f"Timeout: error", e)
124         return msg