CDC из DynamoDB в ClickHouse
На этой странице описано, как настроить CDC из DynamoDB в ClickHouse с использованием ClickPipes. Эта интеграция состоит из двух компонентов:
- Начальный снимок данных через S3 ClickPipes
- Обновления в реальном времени через Kinesis ClickPipes
Данные будут приниматься в таблицу ReplacingMergeTree. Этот движок таблицы обычно используется в сценариях CDC, чтобы можно было применять операции обновления. Подробнее об этом подходе можно узнать в следующих статьях блога:
- Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 1
- Change Data Capture (CDC) with PostgreSQL and ClickHouse - Part 2
1. Настройте поток Kinesis
Сначала необходимо включить поток Kinesis для таблицы DynamoDB, чтобы фиксировать изменения в режиме реального времени. Делайте это до создания снимка, чтобы не пропустить ни одних данных. Подробности см. в руководстве AWS здесь.

2. Создайте снимок
Далее мы создадим снимок таблицы DynamoDB. Это можно сделать, выполнив экспорт AWS в S3. Руководство AWS находится здесь. Необходимо выполнить полную выгрузку («Full export») в формате DynamoDB JSON.

3. Загрузите снимок в ClickHouse
Создайте необходимые таблицы
Данные снимка из DynamoDB будут выглядеть примерно так:
Обратите внимание, что данные имеют вложенный формат. Нам нужно будет развернуть (flatten) эти данные перед загрузкой в ClickHouse. Это можно сделать с помощью функции JSONExtract в ClickHouse в materialized view.
Нам нужно создать три таблицы:
- Таблица для хранения исходных данных из DynamoDB
- Таблица для хранения итоговых развернутых данных (таблица назначения)
- materialized view для разворачивания данных
Для приведённого выше примера данных из DynamoDB таблицы в ClickHouse будут выглядеть следующим образом:
Для целевой таблицы есть несколько требований:
- Эта таблица должна быть таблицей
ReplacingMergeTree - В таблице должен быть столбец
version- На последующих шагах мы будем сопоставлять поле
ApproximateCreationDateTimeиз потока Kinesis со столбцомversion.
- На последующих шагах мы будем сопоставлять поле
- Таблица должна использовать ключ партиционирования в качестве ключа сортировки (задается через
ORDER BY)- Строки с одним и тем же ключом сортировки будут дедуплироваться на основе столбца
version.
- Строки с одним и тем же ключом сортировки будут дедуплироваться на основе столбца
Создайте ClickPipe для снимка
Теперь вы можете создать ClickPipe для загрузки данных снимка из S3 в ClickHouse. Следуйте руководству по S3 ClickPipe здесь, но используйте следующие настройки:
- Ingest path: вам нужно будет найти путь к экспортированным json‑файлам в S3. Путь будет выглядеть примерно так:
- Format: JSONEachRow
- Table: Ваша таблица snapshot (например,
default.snapshotв примере выше)
После создания данные начнут поступать в таблицу snapshot и целевую таблицу. Вам не нужно дожидаться завершения загрузки snapshot, чтобы переходить к следующему шагу.
4. Создание Kinesis ClickPipe
Теперь мы можем настроить Kinesis ClickPipe для фиксации изменений в реальном времени из потока Kinesis. Следуйте руководству по Kinesis ClickPipe здесь, при этом используйте следующие настройки:
- Stream: поток Kinesis, использованный на шаге 1
- Table: ваша целевая таблица (например,
default.destinationв примере выше) - Flatten object: true
- Column mappings:
ApproximateCreationDateTime:version- Сопоставьте остальные поля с соответствующими целевыми столбцами, как показано ниже

5. Очистка (необязательно)
После завершения snapshot ClickPipe вы можете удалить snapshot-таблицу и materialized view.