Включите исполнение JavaScript в браузере, чтобы запустить приложение.
5 ноя 2024

Что такое Debezium и для чего используется

Узнайте больше о Debezium — инструменте, который помогает отслеживать изменения в базе данных и отправлять их на обработку в другие системы. Изучаем архитектуру ПО и разбираемся с настройкой среды. Подробнее читайте на сайте.

Что такое Debezium и CDC

Debezium — это платформа с открытым исходным кодом (распространяется под лицензией Apache 2.0) для CDC (change data capture, захват изменений данных), которая позволяет отслеживать различные изменения — вставку, обновление или удаление записей в базе данных и записывать эти изменения в режиме реального времени. Платформа поддерживает коннекторы для нескольких популярных СУБД: PostgreSQL, MySQL, MongoDB и других. Коннекторы для некоторых других баз данных находятся в стадии разработки — с ними уже можно пробовать работать, но полноценный релиз еще не состоялся.

CDC — это процесс определения и фиксации изменений (включая добавление и удаление), внесенных в данные в базе данных и дальнейшая передача этих изменений в режиме реального времени в последующий процесс, систему или приложение. Так как в основе многих CDC-систем лежит использование журналов транзакций для определения изменений (вместо выполнения запросов к базе данных), значительно сокращаются задержки. Но также стоит сказать, что для определения изменений в данных могут применяться временные метки, триггеры и так далее. За счет работы CDC-систем в режиме реального времени данные остаются синхронизированными.

Непосредственно Debezium как раз реализует концепцию CDC — платформа осуществляет подключение к журналам транзакций СУБД, затем определяет и извлекает изменения, которые там происходят, после чего преобразует их в события и передает брокеру сообщений (например, Apache Kafka).

Где и для чего применяется Debezium

Платформа может иметь самые разные назначения, поэтому здесь будут приведены только некоторые примеры использования:

  • интеграция систем — платформа используется для объединения данных;
  • синхронизация данных между различными источниками, например, базами данных;
  • использование в микросервисах для организации обмена данными между ними.

Соответственно, целью применения платформы можно назвать отслеживание изменений в базе данных в режиме реального времени, обеспечение синхронизации и целостности данных, публикация изменений в системе потоковой обработки данных.

Архитектура Debezium

Есть несколько способов развернуть платформу, но, как правило, она развертывается с помощью Apache Kafka Connect. Kafka Connect — это фреймворк и среда выполнения, предназначенные для реализации и эксплуатации коннекторов: коннекторов-исходников (source connectors — отправляют записи в Kafka) и коннекторов приемников (sink connectors — распространяют записи из тем Kafka в другие системы).

Архитектура конвейера захвата данных в «Дебезиум» основана на взаимодействии нескольких компонентов:

  • СУБД, выступающие источником данных;
  • коннекторы, которые отслеживают изменения в соответствующих им базах данных, например, коннектор для MySQL использует клиентскую библиотеку для доступа к журналу бинарных логов (binlog);
  • Kafka Connect обрабатывает данные, поступающие от коннекторов, и передает их брокеру Kafka;
  • брокер записывает данные в тему Kafka, при необходимости можно настроить преобразование маршрутизации тем. После этого записанные данные могут передаваться в другие системы.

Еще один способ развернуть платформу — это прибегнуть к использованию сервера Debezium. Это готовое к использованию приложение, которое передает события изменений из исходной базы данных в различные системы обмена сообщениями, например, Amazon Kinesis, Google Cloud Pub/Sub или Apache Pulsar. Сервер Debezium позволяет гибко настраивать обработку событий и выбирать формат сериализации данных, например, JSON или Apache Avro.

Еще один вариант — применение движка Debezium. В этом случае платформа интегрируется как библиотека в пользовательское Java-приложение, то есть работает не через Kafka Connect. Таким образом можно минимизировать стоимость инфраструктуры, так как события изменений потребляются непосредственно в приложении, и развертывать кластеры Kafka и Kafka Connect не нужно.

Настройка среды

Перед тем как начать настраивать коннекторы, необходимо подготовить окружение, а именно следующие компоненты:

  • СУБД. Debezium поддерживает множество популярных СУБД, среди которых MySQL (версия не ниже 5.7), PostgreSQL (версия не ниже 9.6), Cassandra (версия не ниже 3.11.4), MongoDB (версия не ниже 3.2) и другие. Полный перечень совместимых систем можно найти на официальном сайте платформы;
  • кластер Apache Kafka. Можно сказать, это центральный компонент, так как он используется для хранения и передачи событий, которые возникают при определении изменений в базе данных. Установку и запуск можно совершать как вручную, так и с помощью Docker-контейнеров. Это значительно сокращает временные затраты;
  • экземпляр Kafka Connect. Этот компонент нужен для интеграции источников данных с Kafka. Работать с ним можно также двумя способами: с помощью официального Docker-образа от Debezium — этот образ включает в себя многие необходимые плагины и коннекторы — и с помощью Kafka Connect от Confluent;
  • коннектор Debezium. Коннектор — это компонент, который отвечает за отслеживание изменений в базе данных и их отправку в Kafka. Его конфигурация зависит от конкретных требований данного проекта и особенностей инфраструктуры.

Конфигурация

В рамках этого пункта рассмотрим два способа настройки коннектора: для работы с Kafka Connect и с применением движка Debezium.

В первом случае рассмотрим конфигурацию коннектора для MySQL, при работе с Kafka Connect:

{
  "name": "customer-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "password",
    "database.server.name": "customer-mysql-db-server",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.customerdb",
    "database.include.list": "customerdb",
    "table.include.list": "customerdb.customers"
  }
}
sql

Описание опций:

  • name — имя коннектора;
  • connector.class — класс коннектора СУБД: в данном случае — MySQL;
  • tasks.max — максимальное количество задач для этого коннектора: в данном случае значение по умолчанию 1 и менять его бессмысленно, но в других случаях этот параметр может пригодиться;
  • database.hostname, database.port, database.user, database.password — параметры для подключения: адрес сервера, номер порта, имя пользователя MySQL и пароль;
  • database.server.name — имя, под которым Kafka будет отслеживать изменения из этой базы данных;
  • database.history.kafka.bootstrap.servers — адрес сервера Kafka, где будет храниться история схем базы данных;
  • database.history.kafka.topic — название темы Kafka, куда будут записываться изменения;
  • database.include.list и table.include.list — список баз данных и таблиц, которые нужно отслеживать.

Настройка коннектора с использованием движка:

@Bean
public Configuration myConnectorConfig() {
    return Configuration.create()
        .with("name", "my-mysql-connector")
        .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
        .with("database.hostname", "mydb-host")
        .with("database.port", "3306")
        .with("database.user", "admin")
        .with("database.password", "secret")
        .with("database.include.list", "mydb")
        .with("database.server.name", "my-mysql-server")


        // Необязательные опции
        .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename", "/var/lib/connect/offsets.dat")
        .with("offset.flush.interval.ms", "10000")
        .build();
}
sql

Для настройки создается конфигурационный компонент beam, который содержит необходимые параметры.

Описание некоторых опций:

  • offset.storage и offset.storage.file.filename — место и файл для хранения смещений;
  • offset.flush.interval.ms — частота сохранения смещений.

Работа с Debezium

В этом разделе рассмотрим применение конфигурации на основе примеров выше.

При работе с Kafka Connect нужно запустить коннектор, сделать это можно следующей командой:

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" \ localhost:8083/connectors/ \
-d @path_to_configuration.json
sql

После запуска коннектора изменения в базе данных начнут автоматически записываться в соответствующие темы Kafka.

В случае с использованием движка его нужно создать, запустить и настроить:

private final Executor changeEventExecutor = Executors.newSingleThreadExecutor();
private final DebeziumEngine<RecordChangeEvent<SourceRecord>> mysqlEngine;
private final DataReplicationService dataReplicationService;


// Конструктор
public MysqlChangeListener(Configuration myConnectorConfig, DataReplicationService dataReplicationService) {
    this.mysqlEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
        .using(myConnectorConfig.asProperties())
        .notifying(this::onDataChangeEvent)  
        .build();


    this.dataReplicationService = dataReplicationService;
}


// Метод для обработки изменений данных
private void onDataChangeEvent(RecordChangeEvent<SourceRecord> event) {
  SourceRecord sourceRecord = event.record();
  Struct recordValue = (Struct) sourceRecord.value();


  if (recordValue != null) {
      // Какая-то логика обработки события
  }
}


// Метод для асинхронного запуска Debezium Engine
@PostConstruct
private void startEngine() {
    this.changeEventExecutor.execute(mysqlEngine);
}


// Метод для остановки Debezium Engine
@PreDestroy
private void stopEngine() throws IOException {
    if (this.mysqlEngine != null) {
        this.mysqlEngine.close();
    }
sql

После этого Debezium Engine начинает работать внутри Java-приложения — изменения в базе данных начинают отслеживаться и соответствующим образом обрабатываться.

Преимущества и недостатки Debezium

Рассмотрим основные преимущества платформы:

  • поддержка широкого спектра различных СУБД, что делает платформу более универсальным решением;
  • гибкая конфигурация — наличие большого набора параметров и настроек, позволяющих адаптировать платформу под задачи конкретного проекта, например, платформа предоставляет возможности для оптимизации производительности;
  • гарантия того, что все изменения в базе данных будут зафиксированы: в том числе изменения, связанные с добавлением и удалением данных;
  • обработка событий в режиме реального времени: для некоторых систем, например, финансовых это преимущество довольно значимо.

Основные недостатки:

  • при высокой нагрузке могут наблюдаться проблемы с производительностью. При этом часто они решаемы — производительность можно оптимизировать настройкой параметров;
  • сложность конфигурации — на начальном этапе нужно выполнить настройку всех компонентов — СУБД, Apache Kafka, Kafka Connect, коннекторы, а также настроить взаимодействие между ними, что может быть достаточно сложной задачей.

Заключение

«Дебезиум» — это гибкий инструмент для захвата изменений данных (CDC), основной целью использования которого является отслеживание изменений в базе данных в режиме реального времени и последующая публикация их в системе потоковой обработки данных (Apache Kafka). Это может быть нужно, например, для синхронизации данных между двумя системами или обеспечения целостности данных.

Архитектура платформы включает в себя четыре компонента: СУБД (источник данных), Apache Kafka, Kafka Connect и коннекторы, которые непосредственно отслеживают изменения в базе данных. Также платформу можно развернуть с помощью сервера Debezium и Debezium Engine — тогда платформа интегрируется как библиотека в пользовательское Java-приложение.

Ключевыми преимуществами платформы являются поддержка многих СУБД, возможность гибкой настройки и обработки событий в режиме реального времени. К недостаткам можно отнести потенциальные проблемы с производительностью и сложность конфигурации.