概述
随着业务线数据量的突飞猛进、服务器的意外宕机,这些都是底层基础服务会遇到的问题,因此 ClickHouse 就设计了集群、副本和分片这三个帮手来帮忙。
集群是副本和分片的基础,他将 ClickHouse. 的服务拓扑由单节点延伸为多节点,但是他又不像 hadoop 那样的系统,要求所有的节点都组成一个大集群。ClickHouse 的集群配置非常灵活,用户既可以将所有节点组成一个大集群,也可以按照业务的诉求将节点划分为多个小集群。
在每个小集群区域之间,他们的节点、分区和副本数量可以各不相同。从总体来看,集群定义了多个节点的拓扑关系,这些节点在后续服务中会相互合作,而执行层面的具体内容则是由副本和分片来执行。
那么如何区分副本和分片呢?
在数据层面上,副本之间的数据完全相同,而分片之间数据是不同的。在功能层面上,副本的作用在于防止数据丢失,增加存储数据的冗余,而分片的目的在于实现数据的水平切分。
副本
之前有说过 ReplicatedMergeTree 复制表引擎,该引擎可以实现应用副本的能力,他是在 MergeTree 表引擎的基础上实现了分布式协同的能力。
在 MergeTree 中,一个数据分区从开始创建到全部完成,会经历两类存储区域:
- 内存,数据首先会被写入到内存缓冲区。
- 本地磁盘,数据接着会被写入
tmp临时目录分区,待全部完成后再将临时目录重命名为正式分区。
而 ReplicatedMergeTree 在上述的基础上增加了 ZooKeeper 的部分,他会进一步在 ZooKeeper 内部创建一系列的监听节点,并以此实现多个实例之间的通信,并且在整个通信过程中,ZooKeeper 不会涉及到任何的数据传输。
那么我们总结下副本的特点:
- 依赖
ZooKeeper,在执行insert和alter查询时ReplicatedMergeTree需要借助ZooKeeper的分布式协同能力,以实现多个副本之间的同步,但是在select副本时并不需要使用。 - 表级别的副本,副本是在表级别定义的,所以每张表的副本配置都可以按照他的实际需求进行个性化定义,包括副本的数量、副本在集群中的分布位置等。
- 多主架构,可以在任意一个副本上执行
insert和alter查询,他们的效果都是相同的。 Block数据块,在执行insert命令时,会依据max_insert_block_size的大小将数据切分为若干个Block数据块,因此Block数据块是写入的基本单元,并且具有写入的唯一性和原子行。- 原子性,在数据写入时,一个
Block块内的数据要么全部写入成功,要么全部失败。 - 唯一性,在写入一个
Block数据块时,会按照当前Block数据块的数据顺序、数据行和数据大小等指标计算Hash信息摘要并记录在案。如果后续遇到相同的Hash摘要则该数据块会被忽略。
ZooKeeper 配置方式
首先新建一个 metrika.xml 的配置文件内容如下:
1 |
|
接着在全局配置 config.xml 中使用 <include_from> 标签导入刚才定义的配置:
1 | <include_from>/etc/clickhouse-server/cpnfig.d/metrika.xml</include_from> |
incl 与 metrika.xml 配置文件中的节点名称彼此要相互对应。
另外 ClickHouse 还在系统表中提供了一张 zookeeper 的代理表,通过这个表可以使用 SQL 查询读取远端 ZooKeeper 内的数据,不过查询时需要指定 path 条件才能查询到数据。
副本定义形式
首先由于增加了数据的冗余存储,所以降低了数据丢失的风险;其次由于副本采用多主架构,所以每个副本实例都可以作为数据读、写的入口,但这都增加了节点的负载。
ReplicatedMergeTree 定义方式如下:
1 | ENGINE = ReplicatedMergeTree('zk_path', 'replica_name') |
在上述配置中有 zk_path 和 replica_name 两项配置:
zk_path用于指定在ZooKeeper中创建的数据表的路径,路径名称是自定义的,可以设置成自己希望的任何路径。
当然也有一些约定俗成的配置:/clickhouse/tables/是约定的路径固定前缀,表示存放数据表的根路径。{shard}表示分片编号,通常使用数字来替代。table_name表示数据表的名称,为了维护方便通常采用与物理表相同的名字。
replica_name是定义副本名称,该名称是区分不同副本实例的唯一标识。
对于 zk_path 而言,同一张数据表的同一个分片的不同副本,应该定义相同的路径。而对于 replica_name 而言,同一张数据表的同一个分片的不同副本,应该定义不同的名称。
ReplicatedMergeTree 原理解析
在 ReplicatedMergeTree 的核心逻辑中,大量运用了 ZooKeeper 的能力,以实现多个 ReplicatedMergeTree 副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和 BlockID 去重判断等。
在执行 INSERT 数据写入、MERGE 分区和 MUTATION 操作的时候都会涉及到 ZooKeeper 的通信,但是在通信的过程中,并不会涉及到任何表数据的传输,在查询数据时也不会访问 ZooKeeper。
ZooKeeper 节点结构
ReplicatedMergeTree 依赖 ZooKeeper 的事件监听机制以实现各个副本之间的协同。因此在每个 ReplicatedMergeTree 表的创建过程中,会以 zk_path 为根路径创建一组监听节点,按照作用不同,监听节点可以大致分为一下几点:
- 元数据
/metadata保存元数据信息,包括主键、分区键、采样表达式等。/columns保存列字段信息,包括列名称和数据类型。/replicas保存副本名称,对应设置参数中的replica_name。
- 判断标识
/leader_election用于主副本的选举工作,主副本会主导MERGE和MUTATION操作,这些任务都是在主副本完成之后再借助ZooKeeper将消息事件分发到其他副本。/blocks记录Block数据块的Hash信息摘要,以及对应的partition_id。通过Hash摘要能够判断Block数据块是否重复;通过partition_id则能找到需要同步的数据分区。block_numbers按照分区的写入顺序,以相同的顺序记录partition_id,各个副本在本地进行MERGE时都会依照相同的block_numbers顺序进行。quorum记录quorum的数量,当至少有quorum数量的副本写入成功后,整个写入操作才算成功。quorum的数量有insert_quorum参数控制,默认值为0。
- 操作日志
/log常规操作日志,保存副本需要执行的任务指令。/mutations操作日志,作用与/log日志类似,当执行ALERT DELETE或ALERT UPDATE查询时,操作指令会被添加到这个节点。/replicas/{replica_name}/*每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令/queue任务队列节点,用于执行具体的操作任务。/log_pointerlog日志指针节点,记录最后一次执行的log日志下标信息。/mutation_pointermutations日志指针节点,记录了最后一次执行的mutatutions日志名称。
数据结构
/log 和 /mutations 他们犹如通信路由器,是分发操作指令的信息通道,而发送指令的方式则是为这些父节点添加子节点。所有的副本实例都会监听父节点的变化,当有子节点被添加时都会被其他副本实时感知。
被添加的子节点统一被抽象为 Entry 对象,而具体实现则是 LogEntry 和 MutationEntry 对象承载,分别对应 /log 和 /mutations 节点:
LogEntry用于封装/log子节点信息,核心属性如下:source replica发送这条Log指令的副本来源,对应replica_name。type操作指令类型,主要有get、merge和mutate三种,分别对应从远程副本下载分区、合并分区和MUTATION操作。block_id当前分区的BlockID,对应/blocks路径下子节点的名称。partition_name当前分区目录的名称。
MutationEntry用于封装/mutations子节点信息,核心属性如下:source replica发送这条MUTATION指令的副本来源,对应replica_name。commands操作指令,主要有ALERT DELETE和ALERT UPDATE。mutation_idMUTATION操作的版本号。partition_id当权分区目录的ID。
副本协同流程
写入执行流程
当需要在 ReplicatedMergeTree 中执行 INSERT 查询以写入数据时,即会进入 INSERT 核心流程,整体流程从上至下按照时间顺序进行,大致可以分为八个步骤。
创建第一个副本实例
1
2
3
4
5
6
7CREATE TABLE replica_sales_1 {
id String,
price Float64,
create_time DateTime
} ENGINW = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'ch5.vgbhfive.cn')
PARTITION BY toYYYYMM(create_time)
ORDER BY id在创建的过程中
ReplicatedMergeTree会进行一些初始化操作:- 根据
zk_path初始化所有的ZooKeeper节点。 - 在
/replicas/节点下注册自己的副本实例ch5.vgbhfive.cn。 - 启动监听任务,监听
/log日志节点。 - 参与副本选举,选举出主副本,选举的方式是向
/leader_election/插入子节点,第一个插入成功的副本就是主副本。
- 根据
创建第二个副本实例
与上述第一个创建副本实例类似,不同之处在于实例名称。1
2
3
4
5
6
7CREATE TABLE replica_sales_1 {
id String,
price Float64,
create_time DateTime
} ENGINW = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'ch6.vgbhfive.cn')
PARTITION BY toYYYYMM(create_time)
ORDER BY id在创建过程中,第二个
ReplicatedMergeTree同样会进行一些初始化操作:- 在
/replicas/节点下注册自己的副本实例ch6.vgbhfive.cn。 - 启动监听任务,监听
/log日志节点。 - 参数副本选举,选举出主副本。
- 在
向第一个实例中写入数据
现在尝试向第一个副本ch5写入数据:1
INSERT INTO TABLE replicated_sales_1 VALUES('A001', 100, '2022-05-15 00:00:00')
上述命令执行完毕后,首先会在本地完成分区目录的写入:
1
Renaming temporary part tmp_insert_202205_1_1_0 to 202205_0_0_0
接着向
/blocks节点写入该数据分区的block_id:1
Wrote block with ID '202205_xxxxx_yyyy'
该
block_id将作为后续去重操作的判断依据。如果此时再重复执行刚才的语句,试图写入重复数据,则会抛出异常,即副本会自动忽略block_id重复的待写入数据。
此外如果设置了insert_quorum(默认参数为0),并且insert_quorum >= 2,则ch5会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于insert_quorum时,整个写入操作才会成功。由第一个副本实例推送
Log日志
在3步骤完成之后,会继续执行insert的副本向/log节点推送操作日志。日志的编号是/log/log-00000000,而LogEntry的核心属性如下:1
2
3
4
5/log/log-00000000
source replica: ch5.vgbhfive.cn
block_id: 202205_xxxxx
type: get
partition_name: 202205_0_0_0从日志内容中可以看出,操作类型为
get下载,而需要下载的分区时202205_0_0_0。其余所有副本都会基于Log日志以相同的顺序执行命令。第二个副本实例拉取
Log日志ch6副本会一直监听/log节点变化,当ch5推送日志之后,ch6便会触发日志的拉取任务并更新log_pointer,将其指向最新日志下标:1
/replicas/ch6.vgbhfive.cn/log_pointer: 0
在拉取
LogEntry之后,并不会直接执行,而是将其转为任务对象放至队列:1
2/replicas/ch6.vgbhfive.cn/queues/
Pulling 1 entries to queue: log-00000000 to log-00000000上述
LogEntry放入队列是因为在复杂的情况下,会连续收到许多个LogEntry,所以使用队列消化任务也是一种合理的设计。第二个副本实例向其他副本发起下载请求
ch6基于/queue队列开始执行任务,当看到type为get时,ReplicatedMergeTree会明白此时在远端的其他副本已经成功写入数据分区,而自己需要同步这些数据。ch6上的第二个副本实例会开始选择一个远端的其他副本作为数据的下载来源。远端副本的选择算法大致是这样的:- 从
/replicas节点拿到所有的副本节点。 - 遍历这些副本选取其中一个。选取的副本需要拥有最大的
log_pointer下标,并且/queue子节点数量最少。log_pointer下标最大,则意味该副本执行的日志最多,数据应该更加完整;而/queue最小,则意味着该副本目前的任务执行负担最小。
在这个实例中,算法选择的副本实例是
ch5。- 从
第一个副本实例响应数据下载
ch5的DataPartsExchange端口服务接收到调用请求,在得知对方来意之后,根据参数做出响应,将本地分区202205_0_0_0基于DataPartsExchange的服务响应发送回ch6:1
Sending part 202205_0_0_0
第二个实例下载数据并完成本地写入
ch6副本在接收到ch5的分区数据后,首先将其写入到临时目录中:1
tmp_fetch_202205_0_0_0
待全部数据接收完成之后,重命名该目录:
1
Renaming temporary part tmp_fetch_202205_0_0_0 to 202205_0_0_0
至此,整个写入结束。
在整个 insert 的写入过程中,ZooKeeper 不会进行任何实质性的数据传输。本着谁执行谁负责的原则,由写入数据的实例负责发送 Log 日志、通知其他实例、监控是否完成、返回下载数据。
MERGE 执行流程
当需要在 ReplicatedMergeTree 中触发分区合并动作时,即会进入这个部分的流程,无论 MERGE 操作从哪个副本发起,其合并计划都会交由主副本来制定。
创建远程连接,尝试与主副本通信。
首先在ch6节点执行OPTIMIZE强制执行MERGE合并,此时ch6通过replicas找到主副本ch5,并尝试建立与它的远程连接。1
2optimize table replicated_sales_1
Connection (ch5.vgbhfive.cn:9000): Connecting. Database: default. User: default主副本接收通信
主副本ch5接收并建立来自远端副本ch6的连接。1
Connected ClickHouse Follower replica version 19.17.0, revision: 54428, database: default, user: default
由主副本制定
MERGE计划并推送Log日志
由主副本ch5制定MERGE计划,并判断哪些分区需要被合并。在选定之后ch5将合并计划转换为Log日志对象并推送Log日志,以通知所有副本开始合并。日志的核心信息如下:1
2
3
4
5type: merge
202205_0_0_0
202205_1_1_0
into
202205_0_1_1从日志内容可以看出操作类型为
MERGE合并,而这次需要合并的分区目录是202205_0_0_0和202205_0_1_1。与此同时主副本还会锁住执行线程,对日志的接收情况进行监听:1
Waiting for queue-0000000002 to disapper from ch5.vgbhfive.cn queue
其监听行为由
replication_alter_partitions_sync参数控制,默认值为1。- 参数为
0时,不做任何等待。 - 参数为
1时,只等主副本自身完成。 - 参数为
0时,等待所有副本拉取完成。
- 参数为
各个副本分别拉取
Log日志ch5和ch6两个副本实例将分别监听/log/log-00000002日志的推送,他们分别会拉取日志到本地,并推送到各自的/queue任务队列:1
Pulling 1 entries to queue : log-00000002 - log-000000002
各个副本分别在本地执行
MERGEch5和ch6基于各自的/queue队列开始执行任务:1
Executing log entry to merge parts 202205_0_0_0, 202205_1_1_0 to 202205_0_1_1
各个副本开始在本地执行
MERGE:1
Merged 2 parts: from 202205_0_0_0 to 202205_1_1_0
至此 MERGE 的合并过程 ZooKeeper 不会进行任何实质性的数据传输,所有的合并操作最终都是由各个副本在本地完成的。而无论合并动作在哪个副本被触发,最终都会交由主副本负责合并计划的制定、消息日志的推送以及日志接收情况的监控。
MUTATION 执行流程
当对 ReplicatedMergeTree 执行 ALTER DELTE 或者 ALTER UPDATE 操作的时候,即会进入 MUTATION 部分的逻辑,与 MERGE 类似,无论 MUTATION 操作从哪个副本发起,首先都会由主副本进行响应。
- 推送
MUTATION日志
在ch6节点尝试通过DELETE来删除一行数据,执行如下命令:上述命令执行之后该副本会接着执行两个重要事项:1
ALTER TABLE replicated_sales_1 DELETE where id = 1;
- 创建
MUTATION ID:1
Created mutation with ID 000000000
- 将
MUTATION操作转换为MutatutionEntry日志,并推送到/mutations/00000000。Mutation的核心属性如下:1
2
3
4
5/mutations/000000000
source replica: ch6.vgbhfive.cn
mutation_id: 2
partition_id: 202205
commands: DELETE where id = \'1\'
所有副本实例各自监听
MUTATION日志ch5和ch6都会监听/mutations节点,因此有新的日志子节点加入都会被实时感知:1
Loading 1 mutation entries: 000000000 - 000000000
当监听到有新的
MUTATION日志加入时,并不是所有副本都会直接做出响应,他们首先会判断自己是否为主副本。由主副本实例响应
MUTATION日志并推送Log日志
只有主副本才会响应MUTATION日志,主副本会将MUTATION日志转换为LogEntry日志并推送到/log节点,以通知各个副本执行具体的操作。日志的核心信息如下:1
2
3
4
5/log/log-000000003
source replica: ch5.vgbhfive.cn
block_id:
type: mutate
202005_0_1_1 to 202205_0_1_1_2各个副本实例分别拉取
Log日志ch5和ch6两个副本分别监听/log/log-000000002日志的推送,他们也会分别拉取日志到本地,并推送到各自的/queue任务队列:1
Pulling 1 entries to queue: log-0000000002 - log-0000000002
各个副本实例分别在本地执行
MUTATIONch5和ch6基于各自的/queue队列开始执行任务:1
Executing log entry to mutate part 202205_0_1_1 to 202205_0_1_1_2
各个副本开始在本地执行
MUTATION:1
2Cloning part 202205_0_1_1 to tmp_clone_202205_0_1_1_2
Renaming temporary part tmp_clone_202205_0_1_1_2 to 202205_0_1_1_2.
至此在 MUTATION 的整个过程中 ZooKeeper 同样不会进行任何实质性的数据传输。所有的 MUTATION 操作,最终都是由各个副本在本地完成的,而 MUTATION 操作是经过 /mutations 节点实现分发的。本着谁执行谁负责的原则,执行命令的副本负责消息推送,但是无论在哪个副本执行最终都会被交由主副本,再由主副本负责推送 Log 日志,以通知各个副本最终的 MUTATION 逻辑,同时也由主副本对日志接收的情况进行监控。
ALTER 执行流程
当对 ReplicatedMergeTree 执行 ALTER 操作的时候,即会进入 ALTER 部分的逻辑,与前几个类似 ALTER 流程会简单许多,其执行流程并不会涉及 /log 日志的分发。
修改共享元数据
在ch6节点尝试增加一个列字段,执行语句如下:1
ALTER TABLE replicated_sales_1 ADD COLUMN id2 String
执行之后,
ch6会修改ZooKeeper内的共享元数据节点:1
2/metadata, /columns
Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.数据修改之后,节点的版本号也会同时提升:
1
Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.
与此同时,
ch6还会负责监听所有副本的修改完成情况:1
2Waiting for ch5.vgbhfive.cn to apply changes.
Waiting for ch6.vgbhfive.cn to apply changes.监听共享元数据变更并各自执行本地修改
ch5和ch6两个副本分别监听共享元数据的变更,之后会分别对本地的元数据版本号与共享版本号进行对比。1
2Metadata changed in ZooKeeper. Applying changes locally.
Applied changes to the metadata of the table.确认所有副本完成修改
确认所有副本均已完成修改:1
2ALTER finished.
Done processing query.
至此整个 ALTER 流程结束。在执行过程中,ZooKeeper 没有参与实质性的数据传输,所有的 ALTER 都是各个副本在本地完成的。
分片
通过引入数据副本可以降低数据丢失的风险,并提升查询的性能,但是仍然有一个问题没有解决,那就是数据表的容量问题,到目前为止每个副本都是保存全量的数据。
ClickHouse 中的每个节点都可以称为一个 shard (分片),对于一个完整的方案来说,还需要考虑数据在写入时数据如何被均匀地被写入到各个分片中,以及在数据查询时如何路由到每个分片并组合成结果集,所以 ClickHouse 地数据分片需要结合 Distributed 表引擎一同使用。
Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。
集群的配置方式
在 ClickHouse 中集群配置用 shard 代表分片,用 replica 代表副本。
1 | # 1 分片 0 副本 |
这样的配置貌似有点问题,其实分片更像是逻辑层面的分组,而无论是分片或时副本,承载他们的都是 replica,所以从某种角度来看,副本也是分片。
集群分布式 DDL
在默认情况下 CREATE 、 DROP 、 RENAME 、 ALTER 等 DDL 语句并不支持分布式执行,但是在加入集群配置后使用新的语法实现分布式 DDL 执行:
1 | CREATE/DROP/RENAME/ALTER TABLE OM CLUSTER cluster_name |
cluster_name 对应了配置文件中的集群名称,ClickHouse 会根据集群配置信息分别去各个节点执行 DDL 语句。
数据结构
与 ReplicatedMergeTree 类似,分布式 DDL 语句在执行的过程中也需要借助 ZooKeeper 的协同能力,以实现日志分发。
ZooKeeper内的节点结构
默认情况下分布式DDL在ZooKeeper内使用的根路径为:1
/clickhouse/task_queue/ddl
该地址可以在
config.xml中的distributed_ddl配置指定。
当然在此节点之下还有其他的监听节点,包含/query-[seq],其内包含DDL操作日志,每执行一次分布式DDL查询,在该节点下新增一条操作日志。DDL操作日志使用ZooKeeper的持久顺序型节点,每条指令的名称以query-为前缀,后面的序号递增。在该操作日志下,还有两条状态节点:/query-[seq]/active用于状态监控等用途,在任务的执行过程中,在该节点下会临时保存当前集群内状态为active的节点。/query-[seq]/finished用于检查任务完成情况,在任务的执行过程中,每当集群内的某个host执行完毕之后,就会在该节点下写入记录。
DDLLongEntry日志对象的数据结构
在/query-[seq]下记录的日志信息由DDLLogEntry承载,拥有以下几个核心属性:query记录DDL查询的执行语句1
query: DROP TABLE default.test_1_local ON CLUSTER shard_2
hosts记录指定集群的主机列表,集群由分布式DDL语句中的ON CLUSTER指定1
hosts: ['ch5.vgbhfive.cn:9000', 'ch6.vgbhfive.cn:9000']
initiator记录初始化host主机的名称,主机列表来源于初始化host节点上的集群1
initiator: ch5.vgbhfive.cn
分布式
DDL的核心执行流程
整个流程从上到下按照时间顺序进行,其大致分为三个步骤:- 推送
DDL日志,在节点执行语句,本着谁执行谁负责原则,会由这个节点创建DDLLogEntry日志并将日志推送到ZooKeeper,同时也会由这个节点负责监控任务的执行进度。 - 拉取日志并执行,其余节点监听到日志推送,于是拉取日志到本地。首先判断自身
host是否被包含在DDLLogEntry的hosts列表中,如果包含在内则进入执行流程,执行完毕后将状态写入finished节点;如果不包含则忽略这次日子的推送。 - 确认执行进度,在执行
DDL语句之后,客户端会阻塞等待180秒后,以期望所有host执行完毕。如果等待时间超过180秒,则进入后台线程继续等待(等待时间由distributed_ddl_task_timeout参数执行,默认180秒)。
- 推送
Distributed 原理解析
Distributed 表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以 Distributed 表引擎需要和其他数据表引擎一起协同工作。
从实体表层面来看,一张分片由两部分组成:
- 本地表:通常以
_local为后缀进行命名。本地表是承接数据的载体,可以使用非Distributed的任意表引擎,一张本地表对应了一个数据分片。 - 分布式表:通常以
_all为后缀进行命名。分布式表是能使用Distributed表引擎,他与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。
Distirbuted 表引擎采用读时检查,即在查询时才会抛出错误,而不会在创建表时检查。
定义形式
Distributed 表引擎的定义形式如下:
1 | ENGINE = Distributed(cluster, database, table [, sharding_key]) |
其中各个参数的含义如下:
cluster:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,他会使用集群的配置信息来找到相应的host节点。dataabse和table:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。sharding_key:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个host节点的本地表。
查询分类
Distributed 表引擎的查询操作分类如下:
- 会作用于本地表的查询,对于
INSERT和SELECT查询,Distributed将会以分布式的方式作用于local本地表。 - 只会影响
Distributed自身,不会作用于本地表的查询:Distributed支持部分元数据操作,包括CREATE、DROP、REANME、ALTER,其中ALTER并不包括分区的操作。 - 不支持的查询:
Distributed表不支持任何MUTATION类型的操作,包括ALTER DELETE和ALTER UPDATE。
分片规则
分片键要求返回一个整数类型的取值,包括 Int 类型和 UInt 类型。
1 | Distributed(cluster, database, table, userid) # 按照用户 id 的余数划分 |
如果不声明分片键,那么分布式表则只会有一个分片,意味者只能映射一张本地表。当然如果分布式表只包含一个分片,那也就失去了使用的意义。
关于数据如何被具体的划分,需要明确以下几个概念:
- 分片权重
在集群的配置中,还有一项分片权重(weight)的设置:分片权重会影响数据在分片中的倾斜程度,一个分片权重值越大,那么被写入的数据就会越多。1
2
3
4
5
6<shard>
<weight>10</weight>
</shard>
<shard>
<weight>20</weight>
</shard> slot(槽)
slot可以理解成许多个小水槽,如果把数据比作成水的话,那么数据之水会顺着这些水槽流进每个数据分片。slot的数量等于所有分片的权重之和。- 选择函数
选择函数用于判断一行待写入的数据应该被写入到哪个分片,那整个步骤会被分为两个步骤:- 找出
slot的取值,计算公式如下:其中1
slot = shard_value - sum_weight
shard_value是分片键的取值,sum_weight是所有分片的权重之和。 - 基于
slot值找到对应的数据分片
- 找出
分布式核心流程
分布式写入流程
在向集群内的分片写入数据时,通常有两种思路:一种是借助外部计算系统,事先将数据均匀分片,再借由计算系统直接写入 ClickHouse集群的各个本地表。第二种则是通过 Distributed 表引擎代理写入分片数据的。
将数据写入分片的核心流程
在对 Distributed 表执行 INSERT 查询的时候,会进入数据写入分片的执行逻辑,可以分为五个步骤,整体流程如下:
在第一个分片节点写入本地分片数据
在ch5节点对本地表test_shard_2_all执行INSERT查询,尝试写入10, 30, 200, 50四行数据。执行之后分布式表主要分做两件事情:其一,根据分片规则划分数据;第二,将属于当前分片的数据直接写入本地表test_shard_2_all。第一个分片建立远端连接,准备发送远端分片数据
将归至远端分片的数据以分区为单位,分别写入test_shard_2_all存储目录下的临时bin文件,数据文件的命名规则如下:1
/database@host:port/[increase_num].bin
由于在这个示例中只有一个远端分片
ch6,所以临时数据文件如下:1
/test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin
10, 200, 50三行数据会被写入上述临时数据文件。接着会尝试与远端ch6分片建立连接:1
Connection (ch6.vgbhfive.cn) : Connected to ClickHouse server
第一个分片向远端分片发送数据
此时会有另一组监听任务负责监听/test_shard_2_all目录下的文件变化,这些任务负责将目录数据发送至远端分片:1
2test_shard_2_all.Distributed.DirectoryMonitor:
Started processing /test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin其中,每份目录将会由独立的线程负责发送,数据在传输之前会被压缩。
第二个分片接收数据并写入本地
ch6分片节点确认建立与ch5的连接:1
2TCPHandlerFactory: TCP Request. Address: ch5:459127
TCPHandler: Connected ClickHouse server在接收到
ch5发送的数据后,将他们写入本地表:1
2
3
4
5
6
7
8
9
10executeQuery: (from ch5) INSERT INTO default.test_shard_2_local
-- 第一个分区
Reserving 1.00 MB on disk 'default'
Renaming temporary part tmp_insert_10_1_1_0 to 10_1_1_0.
-- 第二个分区
Reserving 1.00 MB on disk 'default'
Renaming temporary part tmp_insert_200_1_1_0 to 200_1_1_0.
-- 第三个分区
Reserving 1.00 MB on disk 'default'
Renaming temporary part tmp_insert_50_1_1_0 to 50_1_1_0.由第一个分片确认完成写入
最后由ch5分片确认所有的数据发送完毕:1
Finished processing /test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin
至此整个流程结束,Distributed 表负责所有分片的写入工作。在由 Distirbuted 表负责向远端分片发送数据时,有异步和同步两种写模式:如果是异步写入则在 Distributed 表写完本地分片之后,INSERT 查询就会返回成功写入的消息;如果是同步写入则在 INSERT 查询之后,会等待所有分片完成写入。使用何种模式由 insert_distributed_sync 参数控制,默认为 false,即异步写入;如果将其设置为 true,则可以进一步通过 insert_distributed_timeout 参数控制同步等待的超时时间。
副本复制数据流程
除了刚才的分片写入流程之外,还会触发副本数据的复制流程。数据在多个副本之间,有两种复制实现方式:一种是继续借助 Distributed 表引擎,由它将数据写入副本。另一种则是借助 ReplicatedMergeTree 表引擎实现副本数据的分发。
分布式查询流程
与数据写入有所不同,在面向集群查询数据的时候,只能通过 Distributed 表引擎实现。当 Distributed 表接收到 SELECT 查询的时候,他会依次查询每个分片的数据,再合并汇总返回。
多副本的路由规则
在查询数据的时候,如果集群中的一个shard,拥有多个replica,那么Distributed表引擎需要面临副本选择的问题。他会使用负载均衡算法从众多replica中选择一个,而具体使用何种负载均衡算法,则由load_balancing参数控制:1
load_balancing = random / nearest_hostname / in_order / first_or_random
有四种负载均衡算法:
randomnearest_hostnamein_orderfirst_or_random
多分片查询
分布式查询与分布式写入类似,同样本着谁执行谁负责的原则,他会接收SELECT查询的Distributed表并负责串联起整个过程。首先他会针对分布式表的SQL语句,按照分片数量将查询拆分为若干个本地表的子查询,然后向各个分片发起查询,然后再汇总各个分片的返回结果。
引用
个人备注
此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!