ltr390uv

Форк
0
/
bus_service.py 
242 строки · 13.3 Кб
1
# micropython
2
# MIT license
3
# Copyright (c) 2022 Roman Shevchik   goctaprog@gmail.com
4
"""MicroPython модуль для работы с шинами ввода/вывода"""
5

6
import math
7
from machine import I2C, SPI, Pin
8

9

10
def mpy_bl(value: int) -> int:
11
    """Возвращает место, занимаемое значением value в битах.
12
    Аналог int.bit_length(), которая есть в Python, но отсутствует в MicroPython!"""
13
    if 0 == value:
14
        return 0
15
    return 1 + int(math.log2(abs(value)))
16

17

18
class BusAdapter:
19
    """Посредник между шиной ввода/вывода и классом ввода/вывода устройства"""
20
    def __init__(self, bus: [I2C, SPI]):
21
        self.bus = bus
22

23
    def get_bus_type(self) -> type:
24
        """Возвращает тип шины"""
25
        return type(self.bus)
26

27
    def read_register(self, device_addr: [int, Pin], reg_addr: int, bytes_count: int) -> bytes:
28
        """считывает из регистра датчика значение.
29
        device_addr - адрес датчика на шине. Для шины SPI это физический вывод MCU!
30
        reg_addr - адрес регистра в адресном пространстве датчика.
31
        bytes_count - размер значения в байтах."""
32
        raise NotImplementedError
33

34
    def write_register(self, device_addr: [int, Pin], reg_addr: int, value: [int, bytes, bytearray],
35
                       bytes_count: int, byte_order: str):
36
        """записывает данные value в датчик, по адресу reg_addr.
37
        bytes_count - кол-во записываемых байт из value.
38
        byte_order - порядок расположения байт в записываемом значении."""
39
        raise NotImplementedError
40

41
    def read(self, device_addr: [int, Pin], n_bytes: int) -> bytes:
42
        """Читает из устройства на шине с адресом device_addr, n_bytes байт.
43
        Возвращает экземпляр класса типа bytes"""
44
        raise NotImplementedError
45

46
    def read_to_buf(self, device_addr: [int, Pin], buf: bytearray) -> bytes:
47
        """Читает из устройства на шине, с адресом device_addr, кол-во байт, равное длине буфера buf.
48
        Возвращает ссылку на buf"""
49
        raise NotImplementedError
50

51
    def write(self, device_addr: [int, Pin], buf: bytes):
52
        """Записывает в устройство на шине все байты из буфера buf"""
53
        raise NotImplementedError
54

55
    def write_const(self, device_addr: [int, Pin], val: int, count: int):
56
        """Отправляет пакет байт со значение val количеством count на шину.
57
        Часто, при работе с дисплеями или памятью, требуется заполнение экрана/области
58
        постоянным значением. Для этого и предназначен этот метод!
59
        Вызов его для сравнительно медленных шин - плохая идея!"""
60
        if 0 == count:
61
            return  # нет ничего
62
        # bl = val.bit_length()     # bit_length() отсутствует в MicroPython
63
        bl = mpy_bl(val)
64
        if bl > 8:
65
            raise ValueError(f"The value must take no more than 8 bits! Current: {bl}")
66
        _max = 16
67
        if count < _max:
68
            _max = count
69
        # вычисляю кол-во повторений тела цикла
70
        repeats = count // _max  # количество итераций
71
        b = bytearray([val for _ in range(_max)])
72
        for _ in range(repeats):
73
            self.write(device_addr, b)
74
        # вычисляю остаток
75
        remainder = count - _max * repeats
76
        if remainder:
77
            b = bytearray([val for _ in range(remainder)])
78
            self.write(device_addr, b)
79

80
    def read_buf_from_memory(self, device_addr: [int, Pin], mem_addr, buf, address_size: int):
81
        """Читает из устройства с адресом device_addr в буфер buf, начиная с адреса в устройстве mem_addr.
82
        Количество считываемых байт определяется длинной буфера buf.
83
        address_size - определяет размер адреса в байтах. (в ESP8266 этот аргумент не
84
        распознается и размер адреса всегда равен 1 (8 бит))."""
85
        raise NotImplementedError
86

87
    def write_buf_to_memory(self, device_addr: [int, Pin], mem_addr, buf):
88
        raise NotImplementedError
89

90

91
class I2cAdapter(BusAdapter):
92
    """Адаптер шины I2C"""
93
    def __init__(self, bus: I2C):
94
        super().__init__(bus)
95

96
    def write_register(self, device_addr: int, reg_addr: int, value: [int, bytes, bytearray],
97
                       bytes_count: int, byte_order: str):
98
        """записывает данные value в датчик, по адресу reg_addr.
99
        bytes_count - кол-во записываемых данных
100
        value - должно быть типов int, bytes, bytearray"""
101
        buf = None
102
        if isinstance(value, int):
103
            buf = value.to_bytes(bytes_count, byte_order)
104
        if isinstance(value, (bytes, bytearray)):
105
            buf = value
106

107
        return self.bus.writeto_mem(device_addr, reg_addr, buf)
108

109
    def read_register(self, device_addr: int, reg_addr: int, bytes_count: int) -> bytes:
110
        """считывает из регистра датчика значение.
111
        bytes_count - размер значения в байтах"""
112
        return self.bus.readfrom_mem(device_addr, reg_addr, bytes_count)
113

114
    def read(self, device_addr: int, n_bytes: int) -> bytes:
115
        return self.bus.readfrom(device_addr, n_bytes)
116

117
    def read_to_buf(self, device_addr: int, buf: bytearray) -> bytes:
118
        """Читает из устройства на шине с адресом device_addr в буфер buf количество байт, равное длине(len) буфера!"""
119
        self.bus.readfrom_into(device_addr, buf)
120
        return buf
121
    
122
    def write(self, device_addr: int, buf: bytes):
123
        return self.bus.writeto(device_addr, buf)
124

125
    def read_buf_from_memory(self, device_addr: int, mem_addr, buf, address_size: int = 1):
126
        """Читает из устройства с адресом device_addr в буфер buf, начиная с адреса в устройстве mem_addr.
127
        Количество считываемых байт определяется длинной буфера buf.
128
        address_size - определяет размер адреса в байтах. (в ESP8266 этот аргумент не распознается и размер адреса
129
        всегда равен 1 (8 бит)).
130
        Расширение возможностей базового класса."""
131
        self.bus.readfrom_mem_into(device_addr, mem_addr, buf)
132
        return buf
133

134
    def write_buf_to_memory(self, device_addr: int, mem_addr, buf):
135
        """Записывает в устройство с адресом device_addr все байты из буфера buf.
136
        Запись начинается с адреса в устройстве: mem_addr.
137
        Расширение возможностей базового класса."""
138
        return self.bus.writeto_mem(device_addr, mem_addr, buf)
139

140

141
class SpiAdapter(BusAdapter):
142
    """Адаптер шины SPI"""
143
    def __init__(self, bus: SPI, data_mode: Pin = None):
144
        """Параметр data_mode представляет собой вывод MCU, который используется для установки флага,
145
        что посылка является данными (high) или командой (low). Например это необходимо при обмене с ILI9481."""
146
        super().__init__(bus)
147
        # вывод MCU для режима данных
148
        self.data_mode_pin = data_mode
149
        # использовать ли вывод MCU для режима данных (Истина) или команд (Ложь)
150
        self.use_data_mode_pin = False
151
        # флаг для методов write.. . Если Истина, то data_mode (Pin) будет установлена в Истина, иначе в Ложь!
152
        # flag for write.. methods. If True, then data_mode (Pin) will be set to True, otherwise to False!
153
        self.data_packet = False
154
        # индекс/номер байта в пересылаемом устройству по шину буферу, в котором находится адрес регистра устройства!
155
        self._address_index = 0
156
        # ссылка на функцию подготовки содержимого буфера перед его пересылкой в устройство!
157
        # вида prepare(buf:bytearray, address_index:int) -> bytes: ...
158
        # или None
159
        self._prepare_before_send_ref = None
160

161
    @property
162
    def prepare_func(self):
163
        """Возвращает ссылку на функцию обработки буфера перед отправкой его по шине"""
164
        return self._prepare_before_send_ref
165

166
    @prepare_func.setter
167
    def prepare_func(self, value):
168
        """Устанавливает ссылку на функцию обработки буфера перед отправкой его по шине"""
169
        self._prepare_before_send_ref = value
170

171
    def _call_prepare(self, buf: bytearray):
172
        ref = self._prepare_before_send_ref
173
        if ref is not None:
174
            ref(buf, self._address_index)
175

176
    def read(self, device_addr: Pin, n_bytes: int) -> bytes:
177
        """Read a number of bytes specified by n_bytes while continuously writing the single byte given by write.
178
        Returns a bytes object with the data that was read."""
179
        try:
180
            device_addr.low()
181
            return self.bus.read(n_bytes)
182
        finally:
183
            device_addr.high()
184

185
    def read_to_buf(self, device_addr: Pin, buf) -> bytes:
186
        """Читает из устройства на шине с адресом device_addr в буфер buf количество байт, равное длине(len) буфера!"""
187
        try:
188
            device_addr.low()
189
            self.bus.readinto(buf, 0x00)
190
            return buf
191
        finally:
192
            device_addr.high()
193

194
    def write(self, device_addr: Pin, buf: bytes):
195
        """Параметр data_packet представляет собой признак того, что посылка является данными (high) или командой (low).
196
        Например это необходимо при обмене ILI9481.
197
        Write the bytes contained in buf. Returns None.
198
        The data_packet parameter is an indication that the package is data (high) or command (low).
199
         For example, this is necessary when exchanging ILI9481."""
200
        try:
201
            device_addr.low()   # chip select
202
            if self.use_data_mode_pin and self.data_mode_pin:
203
                self.data_mode_pin.value(self.data_packet)
204
            return self.bus.write(buf)
205
        finally:
206
            device_addr.high()
207

208
    def write_and_read(self, device_addr: Pin, wr_buf: bytes, rd_buf: bytes):
209
        """Параметр data_packet представляет собой признак того, что посылка является данными (high) или командой (low).
210
        Например это необходимо при обмене ILI9481.
211
        Расширение возможностей базового класса.
212
        Write the bytes from write_buf while reading into read_buf. The buffers can be the same or different,
213
        but both buffers must have the same length. Returns None.
214
        The data_packet parameter is an indication that the package is data (high) or command (low).
215
         For example, this is necessary when exchanging ILI9481."""
216
        try:
217
            device_addr.low()   # chip select
218
            if self.use_data_mode_pin and self.data_mode_pin:
219
                self.data_mode_pin.value(self.data_packet)
220
            return self.bus.write_readinto(wr_buf, rd_buf)
221
        finally:
222
            device_addr.high()
223

224
    def read_buf_from_memory(self, device_addr: Pin, mem_addr, buf):
225
        """Читает из устройства с адресом device_addr в буфер buf, начиная с адреса в устройстве mem_addr.
226
        Количество считываемых байт определяется длинной буфера buf."""
227
        try:
228
            device_addr.low()  # chip select
229
            # пока нет реализации!!!
230
            raise NotImplementedError
231
        finally:
232
            device_addr.high()
233

234
    def write_buf_to_memory(self, device_addr: Pin, mem_addr, buf):
235
        try:
236
            device_addr.low()  # chip select
237
            # подготовка буфера к пересылке
238
            self._call_prepare(buf)
239
            # пока нет реализации!!!
240
            raise NotImplementedError
241
        finally:
242
            device_addr.high()
243

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.