mongo-demo
mongo-demo
Клонирование репозитория
- git clone https://gitverse.ru/sc/dtirskikh/mongo-demo.git
- cd mongo-demo
Практика состоит из 2-х частей
- Тренировка написания запросов
- Пример ознакомительного CDC-пайплайна Mongo -> Debezium -> Kafka -> PostgreSQL
Часть 1. Запросы к данным Mongo
1. Подготовка среды
- Скачать Docker-образ:
docker pull mongo
-
Установка клиента Compass к Mongo https://www.mongodb.com/products/tools/compass
-
Запустить контейнер
docker run --rm -d --name "practicum-de-mongodb" -p 27017:27017 mongo
- Зайти в MongoDB Compass и подключиться к контейнеру
В URI по умолчанию то что необходимо:mongodb://localhost:27017
2. Загрузка данных
-
Создать базу данных
myDb -
Создать две коллекции:
иstudentscountries -
Загрузить данные из директории
репозитория в соответствующие коллекцииdata
иstudentscountries
3. Запросы. Фильтрация
Шпаргалки:
https://cheatography.com/isaeus/cheat-sheets/mongodb
https://www.mongodb.com/docs/manual/reference/sql-comparison
Для работы через оболочку
необходимо сначала выбрать базу, а потом направлять запросы
use myDb
db.countries.find({region: "Africa"})
- Найти документ, где столица = Москва:
{capital: "Moscow"}
Также есть возможность использовать функционал
, но это может снизить производительность
поиска, так как выражение не работает с индексами. Есть и другие ограничение, подробнее- https://www.mongodb.com/docs/manual/reference/operator/query/where/
{$where: "this.capital === 'Moscow'"}
- Найти все страны, площадь которых больше 5 млн кв.км.
{area: {$gt : 5000000}}
- Найти страны, которые граничат только с двумя странами.
{borders: {$size: 2}}
Для поиска, например, больше 2-х границ
{'borders.2': {$exists: true}}
Или
{$where: "this.borders.length > 2"}
- Найти страны Азии и Африки, в которых говорят на английском языке.
{$and: [{"languages.eng": {$exists : true}}, {$or : [{region: "Asia"}, {region:"Africa"}]}]}
4. Запросы. Агрегация
Таблица-шпаргалка https://www.mongodb.com/docs/manual/reference/sql-aggregation-comparison
Compass для создания пайпланов позволяет пользоваться конструктором на вкладке Aggregations.
- Найти среднюю оценку по заданиям по каждому студенту. Поле назвать avgScore
# Compass aggregate
{
"$aggregate": [
{
"$addFields": {
"avgScore": {
"$avg": "$scores.score"
}
}
}
]
}
# Mongo shell
db.students.aggregate([
{
"$addFields": {
"avgScore": {
"$avg": "$scores.score"
}
}
}
]
)
- Найти среднюю оценку по каждому типу задания (quizz, homework, exam ) по всем студентам. Поле назвать avgScore
unwind - создаёт новый документ для каждого элемента массива
# Compass aggregate
{
"$aggregate": [
{
"$unwind": {
"path": "$scores"
}
},
{
"$group": {
"_id": "$scores.type",
"avgScore": {
"$avg": "$scores.score"
}
}
}
]
}
# Mongo shell
db.students.aggregate([
{
"$unwind": {
"path": "$scores"
}
},
{
"$group": {
"_id": "$scores.type",
"avgScore": {
"$avg": "$scores.score"
}
}
}
]
)
- По каждому типу задания создать рейтинг из студентов и оценок(отсортировать от бóльшей оценки к меньшей).
# Compass aggregate
{
"$aggregate": [
{
"$unwind": {
"path": "$scores"
}
},
{
"$sort": {
"scores.type": 1,
"scores.score": -1
}
},
{
"$group": {
"_id": "$scores.type",
"scores": {
"$push": {
"student": "$name",
"score": "$scores.score"
}
}
}
}
]
}
# Mongo shell
db.students.aggregate([
{
"$unwind": {
"path": "$scores"
}
},
{
"$sort": {
"scores.type": 1,
"scores.score": -1
}
},
{
"$group": {
"_id": "$scores.type",
"scores": {
"$push": {
"student": "$name",
"score": "$scores.score"
}
}
}
}
]
)
Часть 2. Ознакомительный CDC-пайплайн
1. Запуск контейнеров
В склонированном репозитории выполнить
# Чтобы освободить порт 27017 от Mongo из части 1docker stop practicum-de-mongodb
# Запуск контейнеров для практикиdocker compose up --build -d
С помощью интерфейса AKHQ есть возможность наблюдать за состоянием Kafka.
На текущем этапе уже созданы служебные топики для Debezium.
Так же есть возможность подключиться к Postgres
localhost:5432/inventorydb
login=postgresuser
password=postgrespw
2. Инициализация MongoDB replica set и загрузка сэмпла данных
docker compose exec mongodb bash -c '/usr/local/bin/init-inventory.sh'
На этом этапе также создался пользователь
, который будет подключаться со стороны Debezium.
Появилась возможность подключиться к Mongo через Compass.
mongodb://debezium:dbz@localhost:27017/?directConnection=true&authMechanism=DEFAULT
3. Активация source-коннектора
curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" \ http://127.0.0.1:8083/connectors/ -d @./connectors/mongodb-source.json
После этого согласно настройкам
появятся новые топики для коллекций,
которые есть в базе
Mongo
Запросить информацию о коннекторе
curl -i -X GET localhost:8083/connectors/inventory-connector
При необходимости коннектор можно удалить
curl -i -X DELETE localhost:8083/connectors/inventory-connector
4. Активация sink-коннектора
curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" \ http://127.0.0.1:8083/connectors/ -d @./connectors/jdbc-sink.json
После этого согласно настройкам
в схеме
базы
Postgres
появится таблица
с данными, которые соответствуют коллекции
Mongo
5. Примеры CDC
Для выполнения операций над коллекциями монго возможно использовать:
- командную строку из контейнера mongodb
docker compose exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'
- mongosh из Compass
use inventory
5.1 Вставка нового документа в коллекцию customers
db.customers.insert([ { _id: NumberLong("1005"), first_name: 'Bob', last_name: 'Hopper', email: 'bob@example.com' }]);
- В коллекции появился новый документ
docker compose exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory --eval "db.customers.find()"'
- В логах коннектора появились новые операции
docker compose logs connect | grep INSERT # INSERT INTO "customers" ("id","_id","first_name","last_name","email") VALUES (1005,1005,'Bob','Hopper','bob@example.com') # ON CONFLICT ("id") DO UPDATE SET "_id"=EXCLUDED."_id","first_name"=EXCLUDED."first_name","last_name"=EXCLUDED."last_name","email"=EXCLUDED."email" # deletePreparedStatement: DELETE FROM "customers" WHERE "id" = ? [io.confluent.connect.jdbc.sink.BufferedRecords]
- В топике Kafka
появилось новое сообщениеcustomers
..."payload": { "before": null, "after": "{\"_id\": {\"$numberLong\": \"1005\"},\"first_name\": \"Bob\",\"last_name\": \"Hopper\",\"email\": \"bob@example.com\"}", "patch": null, "filter": null, "updateDescription": null, "source": { "version": "2.1.4.Final", "connector": "mongodb", "name": "dbserver1", "ts_ms": 1711863105000, "snapshot": "false", "db": "inventory", "sequence": null, "rs": "rs0", "collection": "customers", "ord": 1, "lsid": null, "txnNumber": null },...
- В Postgres появилась новая запись
docker compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
5.2 Обновление документа в коллекции customers
db.customers.update( { _id: NumberLong("1005") }, { $set: { first_name: "Billy-Bob" } });
5.3 Добавление документа с новым атрибутом age
db.customers.insert([ { _id: NumberLong("1006"), first_name: 'Sam', last_name: 'Watson', email: 'sam@example.com', age: 27 }]);
В логах есть DDL-операция
docker compose logs connect | grep ALTER# ALTER TABLE "customers" ADD "age" INT NULL
В Postgres появился новый атрибут.
5.4 Удаление документа
db.customers.remove( {_id: NumberLong("1005")});
В Postgres удалилась запись с id=1005
5.5 Перезапуск таски
Если таска падает, то её можно рестартануть.
curl -s -X POST "http://localhost:8083/connectors/jdbc-sink/tasks/0/restart"
Подробный Kafka Connect’s REST API: https://developer.confluent.io/courses/kafka-connect/rest-api
6. Очистка
Приостановить контейнеры с возможностью последующего запуска из того же состояния
# Остановитьdocker compose stop
# Снова запуститьdocker compose start
Удалить контейнеры
# Удалитьdocker compose down
# Создать новые контейнерыdocker compose up