AS5600

Форк
0
/
bus_service.py 
241 строка · 13.3 Кб
1
# micropython
2
# MIT license
3
# Copyright (c) 2022 Roman Shevchik   goctaprog@gmail.com
4
"""MicroPython модуль для работы с шинами ввода/вывода"""
5

6
import math
7
from machine import I2C, SPI, Pin
8

9

10
def _mpy_bl(value: int) -> int:
11
    """Возвращает место, занимаемое значением value в битах.
12
    Аналог int.bit_length(), которая есть в Python, но отсутствует в MicroPython!"""
13
    if 0 == value:
14
        return 0
15
    return 1 + int(math.log2(abs(value)))
16

17

18
class BusAdapter:
19
    """Посредник между шиной ввода/вывода и классом ввода/вывода устройства"""
20
    def __init__(self, bus: [I2C, SPI]):
21
        self.bus = bus
22

23
    def get_bus_type(self) -> type:
24
        """Возвращает тип шины"""
25
        return type(self.bus)
26

27
    def read_register(self, device_addr: [int, Pin], reg_addr: int, bytes_count: int) -> bytes:
28
        """считывает из регистра датчика значение.
29
        device_addr - адрес датчика на шине. Для шины SPI это физический вывод MCU!
30
        reg_addr - адрес регистра в адресном пространстве датчика.
31
        bytes_count - размер значения в байтах."""
32
        raise NotImplementedError
33

34
    def write_register(self, device_addr: [int, Pin], reg_addr: int, value: [int, bytes, bytearray],
35
                       bytes_count: int, byte_order: str):
36
        """записывает данные value в датчик, по адресу reg_addr.
37
        bytes_count - кол-во записываемых байт из value.
38
        byte_order - порядок расположения байт в записываемом значении."""
39
        raise NotImplementedError
40

41
    def read(self, device_addr: [int, Pin], n_bytes: int) -> bytes:
42
        """Читает из устройства на шине с адресом device_addr, n_bytes байт.
43
        Возвращает экземпляр класса типа bytes"""
44
        raise NotImplementedError
45

46
    def read_to_buf(self, device_addr: [int, Pin], buf: bytearray) -> bytes:
47
        """Читает из устройства на шине, с адресом device_addr, кол-во байт, равное длине буфера buf.
48
        Возвращает ссылку на buf"""
49
        raise NotImplementedError
50

51
    def write(self, device_addr: [int, Pin], buf: bytes):
52
        """Записывает в устройство на шине все байты из буфера buf"""
53
        raise NotImplementedError
54

55
    def write_const(self, device_addr: [int, Pin], val: int, count: int):
56
        """Отправляет пакет байт со значение val количеством count на шину.
57
        Часто, при работе с дисплеями или памятью, требуется заполнение экрана/области
58
        постоянным значением. Для этого и предназначен этот метод!
59
        Вызов его для сравнительно медленных шин - плохая идея!"""
60
        if 0 == count:
61
            return  # нет ничего
62
        # bl = val.bit_length()     # bit_length() отсутствует в MicroPython
63
        bl = _mpy_bl(val)
64
        if bl > 8:
65
            raise ValueError(f"The value must take no more than 8 bits! Current: {bl}")
66
        _max = 16
67
        if count < _max:
68
            _max = count
69
        # вычисляю кол-во повторений тела цикла
70
        repeats = count // _max  # количество итераций
71
        b = bytearray([val for _ in range(_max)])
72
        for _ in range(repeats):
73
            self.write(device_addr, b)
74
        # вычисляю остаток
75
        remainder = count - _max * repeats
76
        if remainder:
77
            b = bytearray([val for _ in range(remainder)])
78
            self.write(device_addr, b)
79

80
    def read_buf_from_memory(self, device_addr: [int, Pin], mem_addr, buf, address_size: int):
81
        """Читает из устройства с адресом device_addr в буфер buf, начиная с адреса в устройстве mem_addr.
82
        Количество считываемых байт определяется длинной буфера buf.
83
        address_size - определяет размер адреса в байтах. (в ESP8266 этот аргумент не
84
        распознается и размер адреса всегда равен 1 (8 бит))."""
85
        raise NotImplementedError
86

87
    def write_buf_to_memory(self, device_addr: [int, Pin], mem_addr, buf):
88
        raise NotImplementedError
89

90

91
class I2cAdapter(BusAdapter):
92
    """Адаптер шины I2C"""
93
    def __init__(self, bus: I2C):
94
        super().__init__(bus)
95

96
    def write_register(self, device_addr: int, reg_addr: int, value: [int, bytes, bytearray],
97
                       bytes_count: int, byte_order: str):
98
        """записывает данные value в датчик, по адресу reg_addr.
99
        bytes_count - кол-во записываемых данных
100
        value - должно быть типов int, bytes, bytearray"""
101
        buf = None
102
        if isinstance(value, int):
103
            buf = value.to_bytes(bytes_count, byte_order)
104
        if isinstance(value, (bytes, bytearray)):
105
            buf = value
106

107
        return self.bus.writeto_mem(device_addr, reg_addr, buf)
108

109
    def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes:
110
        """считывает из регистра датчика значение.
111
        bytes_count - размер значения в байтах"""
112
        return self.bus.readfrom_mem(device_addr, reg_addr, bytes_count)
113

114
    def read(self, device_addr: int, n_bytes: int) -> bytes:
115
        return self.bus.readfrom(device_addr, n_bytes)
116

117
    def read_to_buf(self, device_addr: int, buf: bytearray) -> bytes:
118
        """Читает из устройства на шине с адресом device_addr в буфер buf количество байт, равное длине(len) буфера!"""
119
        self.bus.readfrom_into(device_addr, buf)
120
        return buf
121
    
122
    def write(self, device_addr: int, buf: bytes):
123
        return self.bus.writeto(device_addr, buf)
124

125
    def read_buf_from_memory(self, device_addr: int, mem_addr, buf, address_size: int = 1):
126
        """Читает из устройства с адресом device_addr в буфер buf, начиная с адреса в устройстве mem_addr.
127
        Количество считываемых байт определяется длинной буфера buf.
128
        address_size - определяет размер адреса в байтах. (в ESP8266 этот аргумент не распознается и размер адреса
129
        всегда равен 1 (8 бит)).
130
        Расширение возможностей базового класса."""
131
        return self.bus.readfrom_mem_into(device_addr, mem_addr, buf)
132

133
    def write_buf_to_memory(self, device_addr: int, mem_addr, buf):
134
        """Записывает в устройство с адресом device_addr все байты из буфера buf.
135
        Запись начинается с адреса в устройстве: mem_addr.
136
        Расширение возможностей базового класса."""
137
        return self.bus.writeto_mem(device_addr, mem_addr, buf)
138

139

140
class SpiAdapter(BusAdapter):
141
    """Адаптер шины SPI"""
142
    def __init__(self, bus: SPI, data_mode: Pin = None):
143
        """Параметр data_mode представляет собой вывод MCU, который используется для установки флага,
144
        что посылка является данными (high) или командой (low). Например это необходимо при обмене с ILI9481."""
145
        super().__init__(bus)
146
        # вывод MCU для режима данных
147
        self.data_mode_pin = data_mode
148
        # использовать ли вывод MCU для режима данных (Истина) или команд (Ложь)
149
        self.use_data_mode_pin = False
150
        # флаг для методов write.. . Если Истина, то data_mode (Pin) будет установлена в Истина, иначе в Ложь!
151
        # flag for write.. methods. If True, then data_mode (Pin) will be set to True, otherwise to False!
152
        self.data_packet = False
153
        # индекс/номер байта в пересылаемом устройству по шину буферу, в котором находится адрес регистра устройства!
154
        self._address_index = 0
155
        # ссылка на функцию подготовки содержимого буфера перед его пересылкой в устройство!
156
        # вида prepare(buf:bytearray, address_index:int) -> bytes: ...
157
        # или None
158
        self._prepare_before_send_ref = None
159

160
    @property
161
    def prepare_func(self):
162
        """Возвращает ссылку на функцию обработки буфера перед отправкой его по шине"""
163
        return self._prepare_before_send_ref
164

165
    @prepare_func.setter
166
    def prepare_func(self, value):
167
        """Устанавливает ссылку на функцию обработки буфера перед отправкой его по шине"""
168
        self._prepare_before_send_ref = value
169

170
    def _call_prepare(self, buf: bytearray):
171
        ref = self._prepare_before_send_ref
172
        if ref is not None:
173
            ref(buf, self._address_index)
174

175
    def read(self, device_addr: Pin, n_bytes: int) -> bytes:
176
        """Read a number of bytes specified by n_bytes while continuously writing the single byte given by write.
177
        Returns a bytes object with the data that was read."""
178
        try:
179
            device_addr.low()
180
            return self.bus.read(n_bytes)
181
        finally:
182
            device_addr.high()
183

184
    def read_to_buf(self, device_addr: Pin, buf) -> bytes:
185
        """Читает из устройства на шине с адресом device_addr в буфер buf количество байт, равное длине(len) буфера!"""
186
        try:
187
            device_addr.low()
188
            self.bus.readinto(buf, 0x00)
189
            return buf
190
        finally:
191
            device_addr.high()
192

193
    def write(self, device_addr: Pin, buf: bytes):
194
        """Параметр data_packet представляет собой признак того, что посылка является данными (high) или командой (low).
195
        Например это необходимо при обмене ILI9481.
196
        Write the bytes contained in buf. Returns None.
197
        The data_packet parameter is an indication that the package is data (high) or command (low).
198
         For example, this is necessary when exchanging ILI9481."""
199
        try:
200
            device_addr.low()   # chip select
201
            if self.use_data_mode_pin and self.data_mode_pin:
202
                self.data_mode_pin.value(self.data_packet)
203
            return self.bus.write(buf)
204
        finally:
205
            device_addr.high()
206

207
    def write_and_read(self, device_addr: Pin, wr_buf: bytes, rd_buf: bytes):
208
        """Параметр data_packet представляет собой признак того, что посылка является данными (high) или командой (low).
209
        Например это необходимо при обмене ILI9481.
210
        Расширение возможностей базового класса.
211
        Write the bytes from write_buf while reading into read_buf. The buffers can be the same or different,
212
        but both buffers must have the same length. Returns None.
213
        The data_packet parameter is an indication that the package is data (high) or command (low).
214
         For example, this is necessary when exchanging ILI9481."""
215
        try:
216
            device_addr.low()   # chip select
217
            if self.use_data_mode_pin and self.data_mode_pin:
218
                self.data_mode_pin.value(self.data_packet)
219
            return self.bus.write_readinto(wr_buf, rd_buf)
220
        finally:
221
            device_addr.high()
222

223
    def read_buf_from_memory(self, device_addr: Pin, mem_addr, buf):
224
        """Читает из устройства с адресом device_addr в буфер buf, начиная с адреса в устройстве mem_addr.
225
        Количество считываемых байт определяется длинной буфера buf."""
226
        try:
227
            device_addr.low()  # chip select
228
            # пока нет реализации!!!
229
            raise NotImplementedError
230
        finally:
231
            device_addr.high()
232

233
    def write_buf_to_memory(self, device_addr: Pin, mem_addr, buf):
234
        try:
235
            device_addr.low()  # chip select
236
            # подготовка буфера к пересылке
237
            self._call_prepare(buf)
238
            # пока нет реализации!!!
239
            raise NotImplementedError
240
        finally:
241
            device_addr.high()
242

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.