gitverse new year логотип

mongo-demo

Форк
0

9 месяцев назад
9 месяцев назад
9 месяцев назад
8 месяцев назад
9 месяцев назад
6 месяцев назад
9 месяцев назад
README.md

mongo-demo

Клонирование репозитория

  1. git clone https://gitverse.ru/sc/dtirskikh/mongo-demo.git
  2. cd mongo-demo

Практика состоит из 2-х частей

  1. Тренировка написания запросов
  2. Пример ознакомительного CDC-пайплайна
    Mongo -> Debezium -> Kafka -> PostgreSQL

Часть 1. Запросы к данным Mongo

1. Подготовка среды

  1. Скачать Docker-образ:
docker pull mongo
  1. Установка клиента Compass к Mongo https://www.mongodb.com/products/tools/compass

  2. Запустить контейнер

docker run --rm -d --name "practicum-de-mongodb" -p 27017:27017 mongo
  1. Зайти в MongoDB Compass и подключиться к контейнеру
    В URI по умолчанию то что необходимо:
    mongodb://localhost:27017

2. Загрузка данных

  1. Создать базу данных

    myDb

  2. Создать две коллекции:

    students
    и
    countries

  3. Загрузить данные из директории

    data
    репозитория в соответствующие коллекции
    students
    и
    countries

3. Запросы. Фильтрация

Шпаргалки:
https://cheatography.com/isaeus/cheat-sheets/mongodb
https://www.mongodb.com/docs/manual/reference/sql-comparison

Для работы через оболочку

mongosh
необходимо сначала выбрать базу, а потом направлять запросы

use myDb db.countries.find({region: "Africa"})
  1. Найти документ, где столица = Москва:
{capital: "Moscow"}

Также есть возможность использовать функционал

$where
, но это может снизить производительность поиска, так как выражение не работает с индексами. Есть и другие ограничение, подробнее- https://www.mongodb.com/docs/manual/reference/operator/query/where/

{$where: "this.capital === 'Moscow'"}
  1. Найти все страны, площадь которых больше 5 млн кв.км.
{area: {$gt : 5000000}}
  1. Найти страны, которые граничат только с двумя странами.
{borders: {$size: 2}}

Для поиска, например, больше 2-х границ

{'borders.2': {$exists: true}}

Или

{$where: "this.borders.length > 2"}
  1. Найти страны Азии и Африки, в которых говорят на английском языке.
{$and: [{"languages.eng": {$exists : true}}, {$or : [{region: "Asia"}, {region:"Africa"}]}]}

4. Запросы. Агрегация

Таблица-шпаргалка https://www.mongodb.com/docs/manual/reference/sql-aggregation-comparison

Compass для создания пайпланов позволяет пользоваться конструктором на вкладке Aggregations.

  1. Найти среднюю оценку по заданиям по каждому студенту. Поле назвать
    avgScore
# Compass aggregate { "$aggregate": [ { "$addFields": { "avgScore": { "$avg": "$scores.score" } } } ] } # Mongo shell db.students.aggregate([ { "$addFields": { "avgScore": { "$avg": "$scores.score" } } } ] )
  1. Найти среднюю оценку по каждому типу задания (quizz, homework, exam ) по всем студентам. Поле назвать
    avgScore
unwind - создаёт новый документ для каждого элемента массива # Compass aggregate { "$aggregate": [ { "$unwind": { "path": "$scores" } }, { "$group": { "_id": "$scores.type", "avgScore": { "$avg": "$scores.score" } } } ] } # Mongo shell db.students.aggregate([ { "$unwind": { "path": "$scores" } }, { "$group": { "_id": "$scores.type", "avgScore": { "$avg": "$scores.score" } } } ] )
  1. По каждому типу задания создать рейтинг из студентов и оценок(отсортировать от бóльшей оценки к меньшей).
# Compass aggregate { "$aggregate": [ { "$unwind": { "path": "$scores" } }, { "$sort": { "scores.type": 1, "scores.score": -1 } }, { "$group": { "_id": "$scores.type", "scores": { "$push": { "student": "$name", "score": "$scores.score" } } } } ] } # Mongo shell db.students.aggregate([ { "$unwind": { "path": "$scores" } }, { "$sort": { "scores.type": 1, "scores.score": -1 } }, { "$group": { "_id": "$scores.type", "scores": { "$push": { "student": "$name", "score": "$scores.score" } } } } ] )

Часть 2. Ознакомительный CDC-пайплайн

1. Запуск контейнеров

В склонированном репозитории выполнить

# Чтобы освободить порт 27017 от Mongo из части 1
docker stop practicum-de-mongodb
# Запуск контейнеров для практики
docker compose up --build -d

img.png

С помощью интерфейса AKHQ есть возможность наблюдать за состоянием Kafka.
На текущем этапе уже созданы служебные топики для Debezium.

Так же есть возможность подключиться к Postgres

localhost:5432/inventorydb login=postgresuser password=postgrespw

2. Инициализация MongoDB replica set и загрузка сэмпла данных

docker compose exec mongodb bash -c '/usr/local/bin/init-inventory.sh'

На этом этапе также создался пользователь

debezium
, который будет подключаться со стороны Debezium.
Появилась возможность подключиться к Mongo через Compass.

mongodb://debezium:dbz@localhost:27017/?directConnection=true&authMechanism=DEFAULT

3. Активация source-коннектора

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" \
http://127.0.0.1:8083/connectors/ -d @./connectors/mongodb-source.json

После этого согласно настройкам

connectors/mongodb-source.json
появятся новые топики для коллекций, которые есть в базе
inventory
Mongo

Запросить информацию о коннекторе

curl -i -X GET localhost:8083/connectors/inventory-connector

При необходимости коннектор можно удалить

curl -i -X DELETE localhost:8083/connectors/inventory-connector

4. Активация sink-коннектора

curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" \
http://127.0.0.1:8083/connectors/ -d @./connectors/jdbc-sink.json

После этого согласно настройкам

connectors/jdbc-sink.json
в схеме
public
базы
inventorydb
Postgres появится таблица
customers
с данными, которые соответствуют коллекции
customers
Mongo

5. Примеры CDC

Для выполнения операций над коллекциями монго возможно использовать:

  • командную строку из контейнера mongodb
docker compose exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory'
  • mongosh из Compass
use inventory

5.1 Вставка нового документа в коллекцию
customers

db.customers.insert([
{ _id: NumberLong("1005"), first_name: 'Bob', last_name: 'Hopper', email: 'bob@example.com' }
]);
  1. В коллекции появился новый документ
docker compose exec mongodb bash -c 'mongo -u $MONGODB_USER -p $MONGODB_PASSWORD --authenticationDatabase admin inventory --eval "db.customers.find()"'
  1. В логах коннектора появились новые операции
docker compose logs connect | grep INSERT
# INSERT INTO "customers" ("id","_id","first_name","last_name","email") VALUES (1005,1005,'Bob','Hopper','bob@example.com')
# ON CONFLICT ("id") DO UPDATE SET "_id"=EXCLUDED."_id","first_name"=EXCLUDED."first_name","last_name"=EXCLUDED."last_name","email"=EXCLUDED."email"
# deletePreparedStatement: DELETE FROM "customers" WHERE "id" = ? [io.confluent.connect.jdbc.sink.BufferedRecords]
  1. В топике Kafka
    customers
    появилось новое сообщение
...
"payload": {
"before": null,
"after": "{\"_id\": {\"$numberLong\": \"1005\"},\"first_name\": \"Bob\",\"last_name\": \"Hopper\",\"email\": \"bob@example.com\"}",
"patch": null,
"filter": null,
"updateDescription": null,
"source": {
"version": "2.1.4.Final",
"connector": "mongodb",
"name": "dbserver1",
"ts_ms": 1711863105000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"rs": "rs0",
"collection": "customers",
"ord": 1,
"lsid": null,
"txnNumber": null
},
...
  1. В Postgres появилась новая запись
docker compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'

5.2 Обновление документа в коллекции
customers

db.customers.update(
{
_id: NumberLong("1005")
},
{
$set: {
first_name: "Billy-Bob"
}
}
);

5.3 Добавление документа с новым атрибутом age

db.customers.insert([
{ _id: NumberLong("1006"), first_name: 'Sam', last_name: 'Watson', email: 'sam@example.com', age: 27 }
]);

В логах есть DDL-операция

docker compose logs connect | grep ALTER
# ALTER TABLE "customers" ADD "age" INT NULL

В Postgres появился новый атрибут.

5.4 Удаление документа

db.customers.remove(
{_id: NumberLong("1005")}
);

В Postgres удалилась запись с id=1005

5.5 Перезапуск таски

Если таска падает, то её можно рестартануть.

curl -s -X POST "http://localhost:8083/connectors/jdbc-sink/tasks/0/restart"

Подробный Kafka Connect’s REST API: https://developer.confluent.io/courses/kafka-connect/rest-api

6. Очистка

Приостановить контейнеры с возможностью последующего запуска из того же состояния

# Остановить
docker compose stop
# Снова запустить
docker compose start

Удалить контейнеры

# Удалить
docker compose down
# Создать новые контейнеры
docker compose up

Использование cookies

Мы используем файлы cookie в соответствии с Политикой конфиденциальности и Политикой использования cookies.

Нажимая кнопку «Принимаю», Вы даете АО «СберТех» согласие на обработку Ваших персональных данных в целях совершенствования нашего веб-сайта и Сервиса GitVerse, а также повышения удобства их использования.

Запретить использование cookies Вы можете самостоятельно в настройках Вашего браузера.