概述
随着业务线数据量的突飞猛进、服务器的意外宕机,这些都是底层基础服务会遇到的问题,因此 ClickHouse
就设计了集群、副本和分片这三个帮手来帮忙。
集群是副本和分片的基础,他将 ClickHouse
. 的服务拓扑由单节点延伸为多节点,但是他又不像 hadoop
那样的系统,要求所有的节点都组成一个大集群。ClickHouse
的集群配置非常灵活,用户既可以将所有节点组成一个大集群,也可以按照业务的诉求将节点划分为多个小集群。
在每个小集群区域之间,他们的节点、分区和副本数量可以各不相同。从总体来看,集群定义了多个节点的拓扑关系,这些节点在后续服务中会相互合作,而执行层面的具体内容则是由副本和分片来执行。
那么如何区分副本和分片呢?
在数据层面上,副本之间的数据完全相同,而分片之间数据是不同的。在功能层面上,副本的作用在于防止数据丢失,增加存储数据的冗余,而分片的目的在于实现数据的水平切分。
副本
之前有说过 ReplicatedMergeTree
复制表引擎,该引擎可以实现应用副本的能力,他是在 MergeTree
表引擎的基础上实现了分布式协同的能力。
在 MergeTree
中,一个数据分区从开始创建到全部完成,会经历两类存储区域:
- 内存,数据首先会被写入到内存缓冲区。
- 本地磁盘,数据接着会被写入
tmp
临时目录分区,待全部完成后再将临时目录重命名为正式分区。
而 ReplicatedMergeTree
在上述的基础上增加了 ZooKeeper
的部分,他会进一步在 ZooKeeper
内部创建一系列的监听节点,并以此实现多个实例之间的通信,并且在整个通信过程中,ZooKeeper
不会涉及到任何的数据传输。
那么我们总结下副本的特点:
- 依赖
ZooKeeper
,在执行insert
和alter
查询时ReplicatedMergeTree
需要借助ZooKeeper
的分布式协同能力,以实现多个副本之间的同步,但是在select
副本时并不需要使用。 - 表级别的副本,副本是在表级别定义的,所以每张表的副本配置都可以按照他的实际需求进行个性化定义,包括副本的数量、副本在集群中的分布位置等。
- 多主架构,可以在任意一个副本上执行
insert
和alter
查询,他们的效果都是相同的。 Block
数据块,在执行insert
命令时,会依据max_insert_block_size
的大小将数据切分为若干个Block
数据块,因此Block
数据块是写入的基本单元,并且具有写入的唯一性和原子行。- 原子性,在数据写入时,一个
Block
块内的数据要么全部写入成功,要么全部失败。 - 唯一性,在写入一个
Block
数据块时,会按照当前Block
数据块的数据顺序、数据行和数据大小等指标计算Hash
信息摘要并记录在案。如果后续遇到相同的Hash
摘要则该数据块会被忽略。
ZooKeeper
配置方式
首先新建一个 metrika.xml
的配置文件内容如下:
1 |
|
接着在全局配置 config.xml
中使用 <include_from>
标签导入刚才定义的配置:
1 | <include_from>/etc/clickhouse-server/cpnfig.d/metrika.xml</include_from> |
incl
与 metrika.xml
配置文件中的节点名称彼此要相互对应。
另外 ClickHouse
还在系统表中提供了一张 zookeeper
的代理表,通过这个表可以使用 SQL
查询读取远端 ZooKeeper
内的数据,不过查询时需要指定 path
条件才能查询到数据。
副本定义形式
首先由于增加了数据的冗余存储,所以降低了数据丢失的风险;其次由于副本采用多主架构,所以每个副本实例都可以作为数据读、写的入口,但这都增加了节点的负载。
ReplicatedMergeTree
定义方式如下:
1 | ENGINE = ReplicatedMergeTree('zk_path', 'replica_name') |
在上述配置中有 zk_path
和 replica_name
两项配置:
zk_path
用于指定在ZooKeeper
中创建的数据表的路径,路径名称是自定义的,可以设置成自己希望的任何路径。
当然也有一些约定俗成的配置:/clickhouse/tables/
是约定的路径固定前缀,表示存放数据表的根路径。{shard}
表示分片编号,通常使用数字来替代。table_name
表示数据表的名称,为了维护方便通常采用与物理表相同的名字。
replica_name
是定义副本名称,该名称是区分不同副本实例的唯一标识。
对于 zk_path
而言,同一张数据表的同一个分片的不同副本,应该定义相同的路径。而对于 replica_name
而言,同一张数据表的同一个分片的不同副本,应该定义不同的名称。
ReplicatedMergeTree
原理解析
在 ReplicatedMergeTree
的核心逻辑中,大量运用了 ZooKeeper
的能力,以实现多个 ReplicatedMergeTree
副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和 BlockID
去重判断等。
在执行 INSERT
数据写入、MERGE
分区和 MUTATION
操作的时候都会涉及到 ZooKeeper
的通信,但是在通信的过程中,并不会涉及到任何表数据的传输,在查询数据时也不会访问 ZooKeeper
。
ZooKeeper
节点结构
ReplicatedMergeTree
依赖 ZooKeeper
的事件监听机制以实现各个副本之间的协同。因此在每个 ReplicatedMergeTree
表的创建过程中,会以 zk_path
为根路径创建一组监听节点,按照作用不同,监听节点可以大致分为一下几点:
- 元数据
/metadata
保存元数据信息,包括主键、分区键、采样表达式等。/columns
保存列字段信息,包括列名称和数据类型。/replicas
保存副本名称,对应设置参数中的replica_name
。
- 判断标识
/leader_election
用于主副本的选举工作,主副本会主导MERGE
和MUTATION
操作,这些任务都是在主副本完成之后再借助ZooKeeper
将消息事件分发到其他副本。/blocks
记录Block
数据块的Hash
信息摘要,以及对应的partition_id
。通过Hash
摘要能够判断Block
数据块是否重复;通过partition_id
则能找到需要同步的数据分区。block_numbers
按照分区的写入顺序,以相同的顺序记录partition_id
,各个副本在本地进行MERGE
时都会依照相同的block_numbers
顺序进行。quorum
记录quorum
的数量,当至少有quorum
数量的副本写入成功后,整个写入操作才算成功。quorum
的数量有insert_quorum
参数控制,默认值为0
。
- 操作日志
/log
常规操作日志,保存副本需要执行的任务指令。/mutations
操作日志,作用与/log
日志类似,当执行ALERT DELETE
或ALERT UPDATE
查询时,操作指令会被添加到这个节点。/replicas/{replica_name}/*
每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令/queue
任务队列节点,用于执行具体的操作任务。/log_pointer
log
日志指针节点,记录最后一次执行的log
日志下标信息。/mutation_pointer
mutations
日志指针节点,记录了最后一次执行的mutatutions
日志名称。
数据结构
/log
和 /mutations
他们犹如通信路由器,是分发操作指令的信息通道,而发送指令的方式则是为这些父节点添加子节点。所有的副本实例都会监听父节点的变化,当有子节点被添加时都会被其他副本实时感知。
被添加的子节点统一被抽象为 Entry
对象,而具体实现则是 LogEntry
和 MutationEntry
对象承载,分别对应 /log
和 /mutations
节点:
LogEntry
用于封装/log
子节点信息,核心属性如下:source replica
发送这条Log
指令的副本来源,对应replica_name
。type
操作指令类型,主要有get
、merge
和mutate
三种,分别对应从远程副本下载分区、合并分区和MUTATION
操作。block_id
当前分区的BlockID
,对应/blocks
路径下子节点的名称。partition_name
当前分区目录的名称。
MutationEntry
用于封装/mutations
子节点信息,核心属性如下:source replica
发送这条MUTATION
指令的副本来源,对应replica_name
。commands
操作指令,主要有ALERT DELETE
和ALERT UPDATE
。mutation_id
MUTATION
操作的版本号。partition_id
当权分区目录的ID
。
副本协同流程
写入执行流程
当需要在 ReplicatedMergeTree
中执行 INSERT
查询以写入数据时,即会进入 INSERT
核心流程,整体流程从上至下按照时间顺序进行,大致可以分为八个步骤。
创建第一个副本实例
1
2
3
4
5
6
7CREATE TABLE replica_sales_1 {
id String,
price Float64,
create_time DateTime
} ENGINW = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'ch5.vgbhfive.cn')
PARTITION BY toYYYYMM(create_time)
ORDER BY id在创建的过程中
ReplicatedMergeTree
会进行一些初始化操作:- 根据
zk_path
初始化所有的ZooKeeper
节点。 - 在
/replicas/
节点下注册自己的副本实例ch5.vgbhfive.cn
。 - 启动监听任务,监听
/log
日志节点。 - 参与副本选举,选举出主副本,选举的方式是向
/leader_election/
插入子节点,第一个插入成功的副本就是主副本。
- 根据
创建第二个副本实例
与上述第一个创建副本实例类似,不同之处在于实例名称。1
2
3
4
5
6
7CREATE TABLE replica_sales_1 {
id String,
price Float64,
create_time DateTime
} ENGINW = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'ch6.vgbhfive.cn')
PARTITION BY toYYYYMM(create_time)
ORDER BY id在创建过程中,第二个
ReplicatedMergeTree
同样会进行一些初始化操作:- 在
/replicas/
节点下注册自己的副本实例ch6.vgbhfive.cn
。 - 启动监听任务,监听
/log
日志节点。 - 参数副本选举,选举出主副本。
- 在
向第一个实例中写入数据
现在尝试向第一个副本ch5
写入数据:1
INSERT INTO TABLE replicated_sales_1 VALUES('A001', 100, '2022-05-15 00:00:00')
上述命令执行完毕后,首先会在本地完成分区目录的写入:
1
Renaming temporary part tmp_insert_202205_1_1_0 to 202205_0_0_0
接着向
/blocks
节点写入该数据分区的block_id
:1
Wrote block with ID '202205_xxxxx_yyyy'
该
block_id
将作为后续去重操作的判断依据。如果此时再重复执行刚才的语句,试图写入重复数据,则会抛出异常,即副本会自动忽略block_id
重复的待写入数据。
此外如果设置了insert_quorum
(默认参数为0
),并且insert_quorum >= 2
,则ch5
会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于insert_quorum
时,整个写入操作才会成功。由第一个副本实例推送
Log
日志
在3
步骤完成之后,会继续执行insert
的副本向/log
节点推送操作日志。日志的编号是/log/log-00000000
,而LogEntry
的核心属性如下:1
2
3
4
5/log/log-00000000
source replica: ch5.vgbhfive.cn
block_id: 202205_xxxxx
type: get
partition_name: 202205_0_0_0从日志内容中可以看出,操作类型为
get
下载,而需要下载的分区时202205_0_0_0
。其余所有副本都会基于Log
日志以相同的顺序执行命令。第二个副本实例拉取
Log
日志ch6
副本会一直监听/log
节点变化,当ch5
推送日志之后,ch6
便会触发日志的拉取任务并更新log_pointer
,将其指向最新日志下标:1
/replicas/ch6.vgbhfive.cn/log_pointer: 0
在拉取
LogEntry
之后,并不会直接执行,而是将其转为任务对象放至队列:1
2/replicas/ch6.vgbhfive.cn/queues/
Pulling 1 entries to queue: log-00000000 to log-00000000上述
LogEntry
放入队列是因为在复杂的情况下,会连续收到许多个LogEntry
,所以使用队列消化任务也是一种合理的设计。第二个副本实例向其他副本发起下载请求
ch6
基于/queue
队列开始执行任务,当看到type
为get
时,ReplicatedMergeTree
会明白此时在远端的其他副本已经成功写入数据分区,而自己需要同步这些数据。ch6
上的第二个副本实例会开始选择一个远端的其他副本作为数据的下载来源。远端副本的选择算法大致是这样的:- 从
/replicas
节点拿到所有的副本节点。 - 遍历这些副本选取其中一个。选取的副本需要拥有最大的
log_pointer
下标,并且/queue
子节点数量最少。log_pointer
下标最大,则意味该副本执行的日志最多,数据应该更加完整;而/queue
最小,则意味着该副本目前的任务执行负担最小。
在这个实例中,算法选择的副本实例是
ch5
。- 从
第一个副本实例响应数据下载
ch5
的DataPartsExchange
端口服务接收到调用请求,在得知对方来意之后,根据参数做出响应,将本地分区202205_0_0_0
基于DataPartsExchange
的服务响应发送回ch6
:1
Sending part 202205_0_0_0
第二个实例下载数据并完成本地写入
ch6
副本在接收到ch5
的分区数据后,首先将其写入到临时目录中:1
tmp_fetch_202205_0_0_0
待全部数据接收完成之后,重命名该目录:
1
Renaming temporary part tmp_fetch_202205_0_0_0 to 202205_0_0_0
至此,整个写入结束。
在整个 insert
的写入过程中,ZooKeeper
不会进行任何实质性的数据传输。本着谁执行谁负责的原则,由写入数据的实例负责发送 Log
日志、通知其他实例、监控是否完成、返回下载数据。
MERGE
执行流程
当需要在 ReplicatedMergeTree
中触发分区合并动作时,即会进入这个部分的流程,无论 MERGE
操作从哪个副本发起,其合并计划都会交由主副本来制定。
创建远程连接,尝试与主副本通信。
首先在ch6
节点执行OPTIMIZE
强制执行MERGE
合并,此时ch6
通过replicas
找到主副本ch5
,并尝试建立与它的远程连接。1
2optimize table replicated_sales_1
Connection (ch5.vgbhfive.cn:9000): Connecting. Database: default. User: default主副本接收通信
主副本ch5
接收并建立来自远端副本ch6
的连接。1
Connected ClickHouse Follower replica version 19.17.0, revision: 54428, database: default, user: default
由主副本制定
MERGE
计划并推送Log
日志
由主副本ch5
制定MERGE
计划,并判断哪些分区需要被合并。在选定之后ch5
将合并计划转换为Log
日志对象并推送Log
日志,以通知所有副本开始合并。日志的核心信息如下:1
2
3
4
5type: merge
202205_0_0_0
202205_1_1_0
into
202205_0_1_1从日志内容可以看出操作类型为
MERGE
合并,而这次需要合并的分区目录是202205_0_0_0
和202205_0_1_1
。与此同时主副本还会锁住执行线程,对日志的接收情况进行监听:1
Waiting for queue-0000000002 to disapper from ch5.vgbhfive.cn queue
其监听行为由
replication_alter_partitions_sync
参数控制,默认值为1
。- 参数为
0
时,不做任何等待。 - 参数为
1
时,只等主副本自身完成。 - 参数为
0
时,等待所有副本拉取完成。
- 参数为
各个副本分别拉取
Log
日志ch5
和ch6
两个副本实例将分别监听/log/log-00000002
日志的推送,他们分别会拉取日志到本地,并推送到各自的/queue
任务队列:1
Pulling 1 entries to queue : log-00000002 - log-000000002
各个副本分别在本地执行
MERGE
ch5
和ch6
基于各自的/queue
队列开始执行任务:1
Executing log entry to merge parts 202205_0_0_0, 202205_1_1_0 to 202205_0_1_1
各个副本开始在本地执行
MERGE
:1
Merged 2 parts: from 202205_0_0_0 to 202205_1_1_0
至此 MERGE
的合并过程 ZooKeeper
不会进行任何实质性的数据传输,所有的合并操作最终都是由各个副本在本地完成的。而无论合并动作在哪个副本被触发,最终都会交由主副本负责合并计划的制定、消息日志的推送以及日志接收情况的监控。
MUTATION
执行流程
当对 ReplicatedMergeTree
执行 ALTER DELTE
或者 ALTER UPDATE
操作的时候,即会进入 MUTATION
部分的逻辑,与 MERGE
类似,无论 MUTATION
操作从哪个副本发起,首先都会由主副本进行响应。
- 推送
MUTATION
日志
在ch6
节点尝试通过DELETE
来删除一行数据,执行如下命令:上述命令执行之后该副本会接着执行两个重要事项:1
ALTER TABLE replicated_sales_1 DELETE where id = 1;
- 创建
MUTATION ID
:1
Created mutation with ID 000000000
- 将
MUTATION
操作转换为MutatutionEntry
日志,并推送到/mutations/00000000
。Mutation
的核心属性如下:1
2
3
4
5/mutations/000000000
source replica: ch6.vgbhfive.cn
mutation_id: 2
partition_id: 202205
commands: DELETE where id = \'1\'
所有副本实例各自监听
MUTATION
日志ch5
和ch6
都会监听/mutations
节点,因此有新的日志子节点加入都会被实时感知:1
Loading 1 mutation entries: 000000000 - 000000000
当监听到有新的
MUTATION
日志加入时,并不是所有副本都会直接做出响应,他们首先会判断自己是否为主副本。由主副本实例响应
MUTATION
日志并推送Log
日志
只有主副本才会响应MUTATION
日志,主副本会将MUTATION
日志转换为LogEntry
日志并推送到/log
节点,以通知各个副本执行具体的操作。日志的核心信息如下:1
2
3
4
5/log/log-000000003
source replica: ch5.vgbhfive.cn
block_id:
type: mutate
202005_0_1_1 to 202205_0_1_1_2各个副本实例分别拉取
Log
日志ch5
和ch6
两个副本分别监听/log/log-000000002
日志的推送,他们也会分别拉取日志到本地,并推送到各自的/queue
任务队列:1
Pulling 1 entries to queue: log-0000000002 - log-0000000002
各个副本实例分别在本地执行
MUTATION
ch5
和ch6
基于各自的/queue
队列开始执行任务:1
Executing log entry to mutate part 202205_0_1_1 to 202205_0_1_1_2
各个副本开始在本地执行
MUTATION
:1
2Cloning part 202205_0_1_1 to tmp_clone_202205_0_1_1_2
Renaming temporary part tmp_clone_202205_0_1_1_2 to 202205_0_1_1_2.
至此在 MUTATION
的整个过程中 ZooKeeper
同样不会进行任何实质性的数据传输。所有的 MUTATION
操作,最终都是由各个副本在本地完成的,而 MUTATION
操作是经过 /mutations
节点实现分发的。本着谁执行谁负责的原则,执行命令的副本负责消息推送,但是无论在哪个副本执行最终都会被交由主副本,再由主副本负责推送 Log
日志,以通知各个副本最终的 MUTATION
逻辑,同时也由主副本对日志接收的情况进行监控。
ALTER
执行流程
当对 ReplicatedMergeTree
执行 ALTER
操作的时候,即会进入 ALTER
部分的逻辑,与前几个类似 ALTER
流程会简单许多,其执行流程并不会涉及 /log
日志的分发。
修改共享元数据
在ch6
节点尝试增加一个列字段,执行语句如下:1
ALTER TABLE replicated_sales_1 ADD COLUMN id2 String
执行之后,
ch6
会修改ZooKeeper
内的共享元数据节点:1
2/metadata, /columns
Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.数据修改之后,节点的版本号也会同时提升:
1
Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.
与此同时,
ch6
还会负责监听所有副本的修改完成情况:1
2Waiting for ch5.vgbhfive.cn to apply changes.
Waiting for ch6.vgbhfive.cn to apply changes.监听共享元数据变更并各自执行本地修改
ch5
和ch6
两个副本分别监听共享元数据的变更,之后会分别对本地的元数据版本号与共享版本号进行对比。1
2Metadata changed in ZooKeeper. Applying changes locally.
Applied changes to the metadata of the table.确认所有副本完成修改
确认所有副本均已完成修改:1
2ALTER finished.
Done processing query.
至此整个 ALTER
流程结束。在执行过程中,ZooKeeper
没有参与实质性的数据传输,所有的 ALTER
都是各个副本在本地完成的。
分片
通过引入数据副本可以降低数据丢失的风险,并提升查询的性能,但是仍然有一个问题没有解决,那就是数据表的容量问题,到目前为止每个副本都是保存全量的数据。
ClickHouse
中的每个节点都可以称为一个 shard
(分片),对于一个完整的方案来说,还需要考虑数据在写入时数据如何被均匀地被写入到各个分片中,以及在数据查询时如何路由到每个分片并组合成结果集,所以 ClickHouse
地数据分片需要结合 Distributed
表引擎一同使用。
Distributed
表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。
集群的配置方式
在 ClickHouse
中集群配置用 shard
代表分片,用 replica
代表副本。
1 | # 1 分片 0 副本 |
这样的配置貌似有点问题,其实分片更像是逻辑层面的分组,而无论是分片或时副本,承载他们的都是 replica
,所以从某种角度来看,副本也是分片。
集群分布式 DDL
在默认情况下 CREATE
、 DROP
、 RENAME
、 ALTER
等 DDL
语句并不支持分布式执行,但是在加入集群配置后使用新的语法实现分布式 DDL
执行:
1 | CREATE/DROP/RENAME/ALTER TABLE OM CLUSTER cluster_name |
cluster_name
对应了配置文件中的集群名称,ClickHouse
会根据集群配置信息分别去各个节点执行 DDL
语句。
数据结构
与 ReplicatedMergeTree
类似,分布式 DDL
语句在执行的过程中也需要借助 ZooKeeper
的协同能力,以实现日志分发。
ZooKeeper
内的节点结构
默认情况下分布式DDL
在ZooKeeper
内使用的根路径为:1
/clickhouse/task_queue/ddl
该地址可以在
config.xml
中的distributed_ddl
配置指定。
当然在此节点之下还有其他的监听节点,包含/query-[seq]
,其内包含DDL
操作日志,每执行一次分布式DDL
查询,在该节点下新增一条操作日志。DDL
操作日志使用ZooKeeper
的持久顺序型节点,每条指令的名称以query-
为前缀,后面的序号递增。在该操作日志下,还有两条状态节点:/query-[seq]/active
用于状态监控等用途,在任务的执行过程中,在该节点下会临时保存当前集群内状态为active
的节点。/query-[seq]/finished
用于检查任务完成情况,在任务的执行过程中,每当集群内的某个host
执行完毕之后,就会在该节点下写入记录。
DDLLongEntry
日志对象的数据结构
在/query-[seq]
下记录的日志信息由DDLLogEntry
承载,拥有以下几个核心属性:query
记录DDL
查询的执行语句1
query: DROP TABLE default.test_1_local ON CLUSTER shard_2
hosts
记录指定集群的主机列表,集群由分布式DDL
语句中的ON CLUSTER
指定1
hosts: ['ch5.vgbhfive.cn:9000', 'ch6.vgbhfive.cn:9000']
initiator
记录初始化host
主机的名称,主机列表来源于初始化host
节点上的集群1
initiator: ch5.vgbhfive.cn
分布式
DDL
的核心执行流程
整个流程从上到下按照时间顺序进行,其大致分为三个步骤:- 推送
DDL
日志,在节点执行语句,本着谁执行谁负责原则,会由这个节点创建DDLLogEntry
日志并将日志推送到ZooKeeper
,同时也会由这个节点负责监控任务的执行进度。 - 拉取日志并执行,其余节点监听到日志推送,于是拉取日志到本地。首先判断自身
host
是否被包含在DDLLogEntry
的hosts
列表中,如果包含在内则进入执行流程,执行完毕后将状态写入finished
节点;如果不包含则忽略这次日子的推送。 - 确认执行进度,在执行
DDL
语句之后,客户端会阻塞等待180
秒后,以期望所有host
执行完毕。如果等待时间超过180
秒,则进入后台线程继续等待(等待时间由distributed_ddl_task_timeout
参数执行,默认180
秒)。
- 推送
Distributed
原理解析
Distributed
表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以 Distributed
表引擎需要和其他数据表引擎一起协同工作。
从实体表层面来看,一张分片由两部分组成:
- 本地表:通常以
_local
为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed
的任意表引擎,一张本地表对应了一个数据分片。 - 分布式表:通常以
_all
为后缀进行命名。分布式表是能使用Distributed
表引擎,他与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。
Distirbuted
表引擎采用读时检查,即在查询时才会抛出错误,而不会在创建表时检查。
定义形式
Distributed
表引擎的定义形式如下:
1 | ENGINE = Distributed(cluster, database, table [, sharding_key]) |
其中各个参数的含义如下:
cluster
:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,他会使用集群的配置信息来找到相应的host
节点。dataabse
和table
:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。sharding_key
:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个host
节点的本地表。
查询分类
Distributed
表引擎的查询操作分类如下:
- 会作用于本地表的查询,对于
INSERT
和SELECT
查询,Distributed
将会以分布式的方式作用于local
本地表。 - 只会影响
Distributed
自身,不会作用于本地表的查询:Distributed
支持部分元数据操作,包括CREATE
、DROP
、REANME
、ALTER
,其中ALTER
并不包括分区的操作。 - 不支持的查询:
Distributed
表不支持任何MUTATION
类型的操作,包括ALTER DELETE
和ALTER UPDATE
。
分片规则
分片键要求返回一个整数类型的取值,包括 Int
类型和 UInt
类型。
1 | Distributed(cluster, database, table, userid) # 按照用户 id 的余数划分 |
如果不声明分片键,那么分布式表则只会有一个分片,意味者只能映射一张本地表。当然如果分布式表只包含一个分片,那也就失去了使用的意义。
关于数据如何被具体的划分,需要明确以下几个概念:
- 分片权重
在集群的配置中,还有一项分片权重(weight
)的设置:分片权重会影响数据在分片中的倾斜程度,一个分片权重值越大,那么被写入的数据就会越多。1
2
3
4
5
6<shard>
<weight>10</weight>
</shard>
<shard>
<weight>20</weight>
</shard> slot
(槽)
slot
可以理解成许多个小水槽,如果把数据比作成水的话,那么数据之水会顺着这些水槽流进每个数据分片。slot
的数量等于所有分片的权重之和。- 选择函数
选择函数用于判断一行待写入的数据应该被写入到哪个分片,那整个步骤会被分为两个步骤:- 找出
slot
的取值,计算公式如下:其中1
slot = shard_value - sum_weight
shard_value
是分片键的取值,sum_weight
是所有分片的权重之和。 - 基于
slot
值找到对应的数据分片
- 找出
分布式核心流程
分布式写入流程
在向集群内的分片写入数据时,通常有两种思路:一种是借助外部计算系统,事先将数据均匀分片,再借由计算系统直接写入 ClickHouse
集群的各个本地表。第二种则是通过 Distributed
表引擎代理写入分片数据的。
将数据写入分片的核心流程
在对 Distributed
表执行 INSERT
查询的时候,会进入数据写入分片的执行逻辑,可以分为五个步骤,整体流程如下:
在第一个分片节点写入本地分片数据
在ch5
节点对本地表test_shard_2_all
执行INSERT
查询,尝试写入10, 30, 200, 50
四行数据。执行之后分布式表主要分做两件事情:其一,根据分片规则划分数据;第二,将属于当前分片的数据直接写入本地表test_shard_2_all
。第一个分片建立远端连接,准备发送远端分片数据
将归至远端分片的数据以分区为单位,分别写入test_shard_2_all
存储目录下的临时bin
文件,数据文件的命名规则如下:1
/database@host:port/[increase_num].bin
由于在这个示例中只有一个远端分片
ch6
,所以临时数据文件如下:1
/test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin
10, 200, 50
三行数据会被写入上述临时数据文件。接着会尝试与远端ch6
分片建立连接:1
Connection (ch6.vgbhfive.cn) : Connected to ClickHouse server
第一个分片向远端分片发送数据
此时会有另一组监听任务负责监听/test_shard_2_all
目录下的文件变化,这些任务负责将目录数据发送至远端分片:1
2test_shard_2_all.Distributed.DirectoryMonitor:
Started processing /test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin其中,每份目录将会由独立的线程负责发送,数据在传输之前会被压缩。
第二个分片接收数据并写入本地
ch6
分片节点确认建立与ch5
的连接:1
2TCPHandlerFactory: TCP Request. Address: ch5:459127
TCPHandler: Connected ClickHouse server在接收到
ch5
发送的数据后,将他们写入本地表:1
2
3
4
5
6
7
8
9
10executeQuery: (from ch5) INSERT INTO default.test_shard_2_local
-- 第一个分区
Reserving 1.00 MB on disk 'default'
Renaming temporary part tmp_insert_10_1_1_0 to 10_1_1_0.
-- 第二个分区
Reserving 1.00 MB on disk 'default'
Renaming temporary part tmp_insert_200_1_1_0 to 200_1_1_0.
-- 第三个分区
Reserving 1.00 MB on disk 'default'
Renaming temporary part tmp_insert_50_1_1_0 to 50_1_1_0.由第一个分片确认完成写入
最后由ch5
分片确认所有的数据发送完毕:1
Finished processing /test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin
至此整个流程结束,Distributed
表负责所有分片的写入工作。在由 Distirbuted
表负责向远端分片发送数据时,有异步和同步两种写模式:如果是异步写入则在 Distributed
表写完本地分片之后,INSERT
查询就会返回成功写入的消息;如果是同步写入则在 INSERT
查询之后,会等待所有分片完成写入。使用何种模式由 insert_distributed_sync
参数控制,默认为 false
,即异步写入;如果将其设置为 true
,则可以进一步通过 insert_distributed_timeout
参数控制同步等待的超时时间。
副本复制数据流程
除了刚才的分片写入流程之外,还会触发副本数据的复制流程。数据在多个副本之间,有两种复制实现方式:一种是继续借助 Distributed
表引擎,由它将数据写入副本。另一种则是借助 ReplicatedMergeTree
表引擎实现副本数据的分发。
分布式查询流程
与数据写入有所不同,在面向集群查询数据的时候,只能通过 Distributed
表引擎实现。当 Distributed
表接收到 SELECT
查询的时候,他会依次查询每个分片的数据,再合并汇总返回。
多副本的路由规则
在查询数据的时候,如果集群中的一个shard
,拥有多个replica
,那么Distributed
表引擎需要面临副本选择的问题。他会使用负载均衡算法从众多replica
中选择一个,而具体使用何种负载均衡算法,则由load_balancing
参数控制:1
load_balancing = random / nearest_hostname / in_order / first_or_random
有四种负载均衡算法:
random
nearest_hostname
in_order
first_or_random
多分片查询
分布式查询与分布式写入类似,同样本着谁执行谁负责的原则,他会接收SELECT
查询的Distributed
表并负责串联起整个过程。首先他会针对分布式表的SQL
语句,按照分片数量将查询拆分为若干个本地表的子查询,然后向各个分片发起查询,然后再汇总各个分片的返回结果。
引用
个人备注
此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!