Что такое Debezium и CDC
Debezium — это платформа с открытым исходным кодом (распространяется под лицензией Apache 2.0) для CDC (change data capture, захват изменений данных), которая позволяет отслеживать различные изменения — вставку, обновление или удаление записей в базе данных и записывать эти изменения в режиме реального времени. Платформа поддерживает коннекторы для нескольких популярных СУБД: PostgreSQL, MySQL, MongoDB и других. Коннекторы для некоторых других баз данных находятся в стадии разработки — с ними уже можно пробовать работать, но полноценный релиз еще не состоялся.
CDC — это процесс определения и фиксации изменений (включая добавление и удаление), внесенных в данные в базе данных и дальнейшая передача этих изменений в режиме реального времени в последующий процесс, систему или приложение. Так как в основе многих CDC-систем лежит использование журналов транзакций для определения изменений (вместо выполнения запросов к базе данных), значительно сокращаются задержки. Но также стоит сказать, что для определения изменений в данных могут применяться временные метки, триггеры и так далее. За счет работы CDC-систем в режиме реального времени данные остаются синхронизированными.
Непосредственно Debezium как раз реализует концепцию CDC — платформа осуществляет подключение к журналам транзакций СУБД, затем определяет и извлекает изменения, которые там происходят, после чего преобразует их в события и передает брокеру сообщений (например, Apache Kafka).
Где и для чего применяется Debezium
Платформа может иметь самые разные назначения, поэтому здесь будут приведены только некоторые примеры использования:
- интеграция систем — платформа используется для объединения данных;
- синхронизация данных между различными источниками, например, базами данных;
- использование в микросервисах для организации обмена данными между ними.
Соответственно, целью применения платформы можно назвать отслеживание изменений в базе данных в режиме реального времени, обеспечение синхронизации и целостности данных, публикация изменений в системе потоковой обработки данных.
Архитектура Debezium
Есть несколько способов развернуть платформу, но, как правило, она развертывается с помощью Apache Kafka Connect. Kafka Connect — это фреймворк и среда выполнения, предназначенные для реализации и эксплуатации коннекторов: коннекторов-исходников (source connectors — отправляют записи в Kafka) и коннекторов приемников (sink connectors — распространяют записи из тем Kafka в другие системы).
Архитектура конвейера захвата данных в «Дебезиум» основана на взаимодействии нескольких компонентов:
- СУБД, выступающие источником данных;
- коннекторы, которые отслеживают изменения в соответствующих им базах данных, например, коннектор для MySQL использует клиентскую библиотеку для доступа к журналу бинарных логов (binlog);
- Kafka Connect обрабатывает данные, поступающие от коннекторов, и передает их брокеру Kafka;
- брокер записывает данные в тему Kafka, при необходимости можно настроить преобразование маршрутизации тем. После этого записанные данные могут передаваться в другие системы.
Еще один способ развернуть платформу — это прибегнуть к использованию сервера Debezium. Это готовое к использованию приложение, которое передает события изменений из исходной базы данных в различные системы обмена сообщениями, например, Amazon Kinesis, Google Cloud Pub/Sub или Apache Pulsar. Сервер Debezium позволяет гибко настраивать обработку событий и выбирать формат сериализации данных, например, JSON или Apache Avro.
Еще один вариант — применение движка Debezium. В этом случае платформа интегрируется как библиотека в пользовательское Java-приложение, то есть работает не через Kafka Connect. Таким образом можно минимизировать стоимость инфраструктуры, так как события изменений потребляются непосредственно в приложении, и развертывать кластеры Kafka и Kafka Connect не нужно.
Настройка среды
Перед тем как начать настраивать коннекторы, необходимо подготовить окружение, а именно следующие компоненты:
- СУБД. Debezium поддерживает множество популярных СУБД, среди которых MySQL (версия не ниже 5.7), PostgreSQL (версия не ниже 9.6), Cassandra (версия не ниже 3.11.4), MongoDB (версия не ниже 3.2) и другие. Полный перечень совместимых систем можно найти на официальном сайте платформы;
- кластер Apache Kafka. Можно сказать, это центральный компонент, так как он используется для хранения и передачи событий, которые возникают при определении изменений в базе данных. Установку и запуск можно совершать как вручную, так и с помощью Docker-контейнеров. Это значительно сокращает временные затраты;
- экземпляр Kafka Connect. Этот компонент нужен для интеграции источников данных с Kafka. Работать с ним можно также двумя способами: с помощью официального Docker-образа от Debezium — этот образ включает в себя многие необходимые плагины и коннекторы — и с помощью Kafka Connect от Confluent;
- коннектор Debezium. Коннектор — это компонент, который отвечает за отслеживание изменений в базе данных и их отправку в Kafka. Его конфигурация зависит от конкретных требований данного проекта и особенностей инфраструктуры.
Конфигурация
В рамках этого пункта рассмотрим два способа настройки коннектора: для работы с Kafka Connect и с применением движка Debezium.
В первом случае рассмотрим конфигурацию коннектора для MySQL, при работе с Kafka Connect:
{
"name": "customer-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "password",
"database.server.name": "customer-mysql-db-server",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.customerdb",
"database.include.list": "customerdb",
"table.include.list": "customerdb.customers"
}
}
Описание опций:
- name — имя коннектора;
- connector.class — класс коннектора СУБД: в данном случае — MySQL;
- tasks.max — максимальное количество задач для этого коннектора: в данном случае значение по умолчанию 1 и менять его бессмысленно, но в других случаях этот параметр может пригодиться;
- database.hostname, database.port, database.user, database.password — параметры для подключения: адрес сервера, номер порта, имя пользователя MySQL и пароль;
- database.server.name — имя, под которым Kafka будет отслеживать изменения из этой базы данных;
- database.history.kafka.bootstrap.servers — адрес сервера Kafka, где будет храниться история схем базы данных;
- database.history.kafka.topic — название темы Kafka, куда будут записываться изменения;
- database.include.list и table.include.list — список баз данных и таблиц, которые нужно отслеживать.
Настройка коннектора с использованием движка:
@Bean
public Configuration myConnectorConfig() {
return Configuration.create()
.with("name", "my-mysql-connector")
.with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
.with("database.hostname", "mydb-host")
.with("database.port", "3306")
.with("database.user", "admin")
.with("database.password", "secret")
.with("database.include.list", "mydb")
.with("database.server.name", "my-mysql-server")
// Необязательные опции
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/var/lib/connect/offsets.dat")
.with("offset.flush.interval.ms", "10000")
.build();
}
Для настройки создается конфигурационный компонент beam, который содержит необходимые параметры.
Описание некоторых опций:
- offset.storage и offset.storage.file.filename — место и файл для хранения смещений;
- offset.flush.interval.ms — частота сохранения смещений.
Работа с Debezium
В этом разделе рассмотрим применение конфигурации на основе примеров выше.
При работе с Kafka Connect нужно запустить коннектор, сделать это можно следующей командой:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" \ localhost:8083/connectors/ \
-d @path_to_configuration.json
После запуска коннектора изменения в базе данных начнут автоматически записываться в соответствующие темы Kafka.
В случае с использованием движка его нужно создать, запустить и настроить:
private final Executor changeEventExecutor = Executors.newSingleThreadExecutor();
private final DebeziumEngine<RecordChangeEvent<SourceRecord>> mysqlEngine;
private final DataReplicationService dataReplicationService;
// Конструктор
public MysqlChangeListener(Configuration myConnectorConfig, DataReplicationService dataReplicationService) {
this.mysqlEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
.using(myConnectorConfig.asProperties())
.notifying(this::onDataChangeEvent)
.build();
this.dataReplicationService = dataReplicationService;
}
// Метод для обработки изменений данных
private void onDataChangeEvent(RecordChangeEvent<SourceRecord> event) {
SourceRecord sourceRecord = event.record();
Struct recordValue = (Struct) sourceRecord.value();
if (recordValue != null) {
// Какая-то логика обработки события
}
}
// Метод для асинхронного запуска Debezium Engine
@PostConstruct
private void startEngine() {
this.changeEventExecutor.execute(mysqlEngine);
}
// Метод для остановки Debezium Engine
@PreDestroy
private void stopEngine() throws IOException {
if (this.mysqlEngine != null) {
this.mysqlEngine.close();
}
После этого Debezium Engine начинает работать внутри Java-приложения — изменения в базе данных начинают отслеживаться и соответствующим образом обрабатываться.
Преимущества и недостатки Debezium
Рассмотрим основные преимущества платформы:
- поддержка широкого спектра различных СУБД, что делает платформу более универсальным решением;
- гибкая конфигурация — наличие большого набора параметров и настроек, позволяющих адаптировать платформу под задачи конкретного проекта, например, платформа предоставляет возможности для оптимизации производительности;
- гарантия того, что все изменения в базе данных будут зафиксированы: в том числе изменения, связанные с добавлением и удалением данных;
- обработка событий в режиме реального времени: для некоторых систем, например, финансовых это преимущество довольно значимо.
Основные недостатки:
- при высокой нагрузке могут наблюдаться проблемы с производительностью. При этом часто они решаемы — производительность можно оптимизировать настройкой параметров;
- сложность конфигурации — на начальном этапе нужно выполнить настройку всех компонентов — СУБД, Apache Kafka, Kafka Connect, коннекторы, а также настроить взаимодействие между ними, что может быть достаточно сложной задачей.
Заключение
«Дебезиум» — это гибкий инструмент для захвата изменений данных (CDC), основной целью использования которого является отслеживание изменений в базе данных в режиме реального времени и последующая публикация их в системе потоковой обработки данных (Apache Kafka). Это может быть нужно, например, для синхронизации данных между двумя системами или обеспечения целостности данных.
Архитектура платформы включает в себя четыре компонента: СУБД (источник данных), Apache Kafka, Kafka Connect и коннекторы, которые непосредственно отслеживают изменения в базе данных. Также платформу можно развернуть с помощью сервера Debezium и Debezium Engine — тогда платформа интегрируется как библиотека в пользовательское Java-приложение.
Ключевыми преимуществами платформы являются поддержка многих СУБД, возможность гибкой настройки и обработки событий в режиме реального времени. К недостаткам можно отнести потенциальные проблемы с производительностью и сложность конфигурации.