В этой статье описана технология репликации и шардирования данных в Clickhouse.
В первом примере будет рассмотрен вариант с одним шардом и тремя репликами
Во втором примере будет рассмотрен вариант с тремя шардами и лвумя репликами на каждый шард.
Везде будет использоваться одна и та же схема из 4-х хостов
Пример №1: Clickhouse + 1 шард х 3 реплики
В данной конкретной схеме используются 4 хоста: на трех хостах устанавливается Clickhouse,
на 4-м устанавливается Zookeeper.
Я использовал Debian 11.3. На текущий момент в ее репозитариях
лежит Clickhouse версии 18.16.1 и ZooKeeper версии 3.4.13.
ZooKeeper позволяет использовать реплицируемые таблицы в кластере с несколькими хостами в шарде,
и эта репликация будет работать в автоматическом режиме - достаточно настроить конфигурацию для ZooKeeper в конфигах самого Clickhouse, после чего данные будут "размазываться" по всем репликам сами.
Установку данной схемы можно разделить на 5 этапов:
1. Установка Clickhouse на каждой из 3-х нод
2. Установка ZooKeeper на 4-м хосте
3. Настройка конфигов Clickhouse
4. Проверка
5. Тестирование кластера
1. Установка Clickhouse
Яндекс рекомендует использовать официальные скомпилированные deb пакеты для Debian или Ubuntu. Для установки пакетов выполните:
Зайдем на первую ноду и создадим распределенную таблицу.
clickhouse-client
Выполняем команду:
CREATE TABLE test ON CLUSTER '{cluster}'
(
timestamp DateTime,
contractid UInt32,
userid UInt32
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{cluster}/{shard}/default/test', '{replica}')
PARTITION BY toYYYYMM(timestamp)
ORDER BY (contractid, toDate(timestamp), userid)
SAMPLE BY userid;
Выполним селект к zookeeper:
select * from system.zookeeper WHERE path = '/clickhouse/tables/demo/1/default/test'
Теперь зайдем на вторую ноду и выполним селект:
select hostName(), database, name from cluster('demo', system.tables) where database='default' and name='test';
Зайдем на третью ноду и вставим данные
INSERT INTO test(timestamp, contractid, userid) VALUES (NOW(),1,1);
После этого заходим на любую ноду и делаем селект:
select * from test;
И видим, что только что добавленные данные везде реплицированы.
Для создания реплики можно использовать другой алгоритм, который отличается от типа ReplicatedMergeTree,
рассмотренного выше.
Этот другой алгоритм основан на том, что создаются две таблицы - одна типа MergeTree, а вторая - Distributed,
которая является ссылкой на первую:
CREATE TABLE default.t_cluster ON CLUSTER demo ( id Int16, name String, birth Date )
ENGINE = MergeTree()
PARTITION BY toYYYYMM(birth)
ORDER BY id;
CREATE TABLE default.dist_t_cluster ON CLUSTER demo as t_cluster engine = Distributed(demo, default, t_cluster,rand());
insert into dist_t_cluster values(3, 'ccc', '2021-02-01'), (4, 'ddd', '2021-02-02');
select * from default.dist_t_cluster;
Пример №2: Clickhouse + 3 шарда х 3 реплики
Установку Clickhouse и установку ZooKeeper на каждой из 3-х нод мы уже сделали в предыдущем примере.
Поэтому переходим к настройке.
Мы рассмотрим схему, в которой мало того, что данные будут реплицированы, они при этом еще будут шардированы.
Т.е. в предыдущем рассмотренном случае мы имели аж тройную репликацию с полной избыточностью,
когда данные дублируются одновременно в трех местах - это конечно супер-надежно, но не супер-эффективно.
В этом же примере данные будут распределены по трем разным хостам, и у каждой уникальной порции данных будет своя реплика. Этот вариант тоже имеет право на существование.
Будут созданы 4 базы - три для хранения реальных данных и четвертая для хранения симлинка.
Физически все данные будут храниться в базах dwh01, dwh02, dwh03.
Четвертая база dwh выступает в роли "заглушки" и будет физически пуста.
В этом примере мы будем использовать все те же 4 хоста - 3 для данных и один для zookeeper,
Последний мы трогать не будем и оставим как есть.
Везде рекомендуется использовать для кластеров минимум от 3-х серверов для zookeeper.
1. Создаем новый конфиг - это нужно проделать на каждой из 3-х нод
/etc/clickhouse-server/config.d/cluster.xml
Корневой конфиг cluster.xml на этот раз оставляем нетронутым, поскольку всегда существует вероятность, что при очередном апдэйте самого кликхауса он может быть перезаписан.
4. После настройки конфигов на каждой ноде выполнить команды:
systemctl restart clickhouse-server
systemctl status clickhouse-server
И выполнить селекты на каждой ноде:
SELECT * FROM system.macros m ;
SELECT * FROM system.clusters c WHERE cluster = 'test';
5. Из командной строки выполнить:
На первой ноде:
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh01"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh03"
На второй ноде:
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh02"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh01"
На третьей ноде:
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh03"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS dwh02"
Данные и реплики будут храниться в разных базах, здесь идет перекрестная схема репликации.
На первом хосте будут хранится данные и реплика с третьего хоста.
На втором хосте будут хранится данные и реплика с первого хоста.
На третьем хосте будут хранится данные и реплика со второго хоста.
Как видим, если один хост падает, то не все потеряно, как говорится.
6. На первой ноде из клиента создаем таблицу hits_shard типа ReplicatedMergeTree:
На второй ноде аналогично - те же поля я пропускаю:
CREATE TABLE dwh02.hits_shard
...
ENGINE=ReplicatedMergeTree('/clickhouse/{cluster01}/{shard01}/tables/hits', '{replica01}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);
CREATE TABLE dwh01.hits_shard
...
ENGINE=ReplicatedMergeTree('/clickhouse/{cluster01}/{shard02}/tables/hits', '{replica02}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);
На третьей ноде :
CREATE TABLE dwh03.hits_shard
...
ENGINE=ReplicatedMergeTree('/clickhouse/{cluster01}/{shard01}/tables/hits', '{replica01}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);
CREATE TABLE dwh02.hits_shard
...
ENGINE=ReplicatedMergeTree('/clickhouse/{cluster01}/{shard02}/tables/hits', '{replica02}')
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SAMPLE BY intHash32(UserID);
В команде мы задаем название кластера - test - и имя таблицы - hits_shard.
Здесь мы задали параметр - rand() - при этом данные - примерно - равномерно - и случайным образом - "расползутся"" по трем базам. Никто не запрещает вместо rand() указать какой-то столбец или набор столбцов,
при этом данные лягут в порядке сортировки по указанному полю.
8. Загружаем данные из csv файла - всего в нем 100000 записей:
Данные можно взять тут:
clickhouse-client --query " INSERT INTO dwh.hits_distributed FORMAT TSV" --max_insert_block_size=100000 < out.csv
9. Выполняем на каждой ноде запрос.
Он должен дать везде 100000 записей - ровно столько лежит в файле csv.
select count(*) from dwh.hits_distributed;
10. Выполняем запросы:
На первой ноде
select count(*) from dwh01.hits_shard;
select count(*) from dwh03.hits_shard;
На второй ноде
select count(*) from dwh01.hits_shard;
select count(*) from dwh02.hits_shard;
На третьей ноде:
select count(*) from dwh02.hits_shard;
select count(*) from dwh03.hits_shard;
У вас могут быть другие цифры, но в сумме они должны дать 100000.
Физически записи распределяются по разным базам.
В таблице данные хранятся в отсортированном виде по синтетическому ключу:
CounterID + EventDate + intHash32(UserID)
Если мы выполним этот же селект на реплике, то получим ровно такой же результат - т.е. это полноценные реплики.
Аналогичную картину мы получим для dwh02 и dwh03.
Т.е. мы видим, что загруженный набор из 100000 записей:
1. Разбился на 3 части и случайным образом распределился по 3-м базам
2. У каждой базы тут же появилась своя реплика в виде полной копии на соседней ноде - это важно,
потому что в случае краха какого-то хоста всегда есть реплика на другом хосте.