ClickHouse-分布式

概述

随着业务线数据量的突飞猛进、服务器的意外宕机,这些都是底层基础服务会遇到的问题,因此 ClickHouse 就设计了集群副本分片这三个帮手来帮忙。

集群是副本和分片的基础,他将 ClickHouse. 的服务拓扑由单节点延伸为多节点,但是他又不像 hadoop 那样的系统,要求所有的节点都组成一个大集群。ClickHouse 的集群配置非常灵活,用户既可以将所有节点组成一个大集群,也可以按照业务的诉求将节点划分为多个小集群。
在每个小集群区域之间,他们的节点、分区和副本数量可以各不相同。从总体来看,集群定义了多个节点的拓扑关系,这些节点在后续服务中会相互合作,而执行层面的具体内容则是由副本和分片来执行。

那么如何区分副本和分片呢?
在数据层面上,副本之间的数据完全相同,而分片之间数据是不同的。在功能层面上,副本的作用在于防止数据丢失,增加存储数据的冗余,而分片的目的在于实现数据的水平切分。


副本

之前有说过 ReplicatedMergeTree 复制表引擎,该引擎可以实现应用副本的能力,他是在 MergeTree 表引擎的基础上实现了分布式协同的能力。
MergeTree 中,一个数据分区从开始创建到全部完成,会经历两类存储区域:

  • 内存,数据首先会被写入到内存缓冲区。
  • 本地磁盘,数据接着会被写入 tmp 临时目录分区,待全部完成后再将临时目录重命名为正式分区。

ReplicatedMergeTree 在上述的基础上增加了 ZooKeeper 的部分,他会进一步在 ZooKeeper 内部创建一系列的监听节点,并以此实现多个实例之间的通信,并且在整个通信过程中,ZooKeeper 不会涉及到任何的数据传输。

那么我们总结下副本的特点:

  • 依赖 ZooKeeper,在执行 insertalter 查询时 ReplicatedMergeTree 需要借助 ZooKeeper 的分布式协同能力,以实现多个副本之间的同步,但是在 select 副本时并不需要使用。
  • 表级别的副本,副本是在表级别定义的,所以每张表的副本配置都可以按照他的实际需求进行个性化定义,包括副本的数量、副本在集群中的分布位置等。
  • 多主架构,可以在任意一个副本上执行 insertalter 查询,他们的效果都是相同的。
  • Block 数据块,在执行 insert 命令时,会依据 max_insert_block_size 的大小将数据切分为若干个 Block 数据块,因此 Block 数据块是写入的基本单元,并且具有写入的唯一性和原子行。
  • 原子性,在数据写入时,一个 Block 块内的数据要么全部写入成功,要么全部失败。
  • 唯一性,在写入一个 Block 数据块时,会按照当前 Block 数据块的数据顺序、数据行和数据大小等指标计算 Hash 信息摘要并记录在案。如果后续遇到相同的 Hash 摘要则该数据块会被忽略。

ZooKeeper 配置方式

首先新建一个 metrika.xml 的配置文件内容如下:

1
2
3
4
5
6
7
8
9
<?xml version="1.0"?>
<yandex>
<zookeeper-servers> <!-- ZooKeeper 配置,名称自定义 -->
<node index="1"> <!-- 节点配置,可以配置多个地址 -->
<host>host1.vgbhfive.cn</host>
<port>2181</port>
</node>
</zookeeper-servers>
</yandex>

接着在全局配置 config.xml 中使用 <include_from> 标签导入刚才定义的配置:

1
<include_from>/etc/clickhouse-server/cpnfig.d/metrika.xml</include_from>

inclmetrika.xml 配置文件中的节点名称彼此要相互对应。

另外 ClickHouse 还在系统表中提供了一张 zookeeper 的代理表,通过这个表可以使用 SQL 查询读取远端 ZooKeeper 内的数据,不过查询时需要指定 path 条件才能查询到数据。

副本定义形式

首先由于增加了数据的冗余存储,所以降低了数据丢失的风险;其次由于副本采用多主架构,所以每个副本实例都可以作为数据读、写的入口,但这都增加了节点的负载。

ReplicatedMergeTree 定义方式如下:

1
ENGINE = ReplicatedMergeTree('zk_path', 'replica_name')

在上述配置中有 zk_pathreplica_name 两项配置:

  • zk_path 用于指定在 ZooKeeper 中创建的数据表的路径,路径名称是自定义的,可以设置成自己希望的任何路径。
    当然也有一些约定俗成的配置:
    • /clickhouse/tables/ 是约定的路径固定前缀,表示存放数据表的根路径。
    • {shard} 表示分片编号,通常使用数字来替代。
    • table_name 表示数据表的名称,为了维护方便通常采用与物理表相同的名字。
  • replica_name 是定义副本名称,该名称是区分不同副本实例的唯一标识。

对于 zk_path 而言,同一张数据表的同一个分片的不同副本,应该定义相同的路径。而对于 replica_name 而言,同一张数据表的同一个分片的不同副本,应该定义不同的名称。

ReplicatedMergeTree 原理解析

ReplicatedMergeTree 的核心逻辑中,大量运用了 ZooKeeper 的能力,以实现多个 ReplicatedMergeTree 副本实例之间的协同,包括主副本选举、副本状态感知、操作日志分发、任务队列和 BlockID 去重判断等。
在执行 INSERT 数据写入、MERGE 分区和 MUTATION 操作的时候都会涉及到 ZooKeeper 的通信,但是在通信的过程中,并不会涉及到任何表数据的传输,在查询数据时也不会访问 ZooKeeper

ZooKeeper 节点结构

ReplicatedMergeTree 依赖 ZooKeeper 的事件监听机制以实现各个副本之间的协同。因此在每个 ReplicatedMergeTree 表的创建过程中,会以 zk_path 为根路径创建一组监听节点,按照作用不同,监听节点可以大致分为一下几点:

  • 元数据
    • /metadata 保存元数据信息,包括主键、分区键、采样表达式等。
    • /columns 保存列字段信息,包括列名称和数据类型。
    • /replicas 保存副本名称,对应设置参数中的 replica_name
  • 判断标识
    • /leader_election 用于主副本的选举工作,主副本会主导 MERGEMUTATION 操作,这些任务都是在主副本完成之后再借助 ZooKeeper 将消息事件分发到其他副本。
    • /blocks 记录 Block 数据块的 Hash 信息摘要,以及对应的 partition_id。通过 Hash 摘要能够判断 Block 数据块是否重复;通过 partition_id 则能找到需要同步的数据分区。
    • block_numbers 按照分区的写入顺序,以相同的顺序记录 partition_id,各个副本在本地进行 MERGE 时都会依照相同的 block_numbers 顺序进行。
    • quorum 记录 quorum 的数量,当至少有 quorum 数量的副本写入成功后,整个写入操作才算成功。quorum 的数量有 insert_quorum 参数控制,默认值为 0
  • 操作日志
    • /log 常规操作日志,保存副本需要执行的任务指令。
    • /mutations 操作日志,作用与 /log 日志类似,当执行 ALERT DELETEALERT UPDATE 查询时,操作指令会被添加到这个节点。
    • /replicas/{replica_name}/* 每个副本各自的节点下的一组监听节点,用于指导副本在本地执行具体的任务指令
      • /queue 任务队列节点,用于执行具体的操作任务。
      • /log_pointer log 日志指针节点,记录最后一次执行的 log 日志下标信息。
      • /mutation_pointer mutations 日志指针节点,记录了最后一次执行的 mutatutions 日志名称。
数据结构

/log/mutations 他们犹如通信路由器,是分发操作指令的信息通道,而发送指令的方式则是为这些父节点添加子节点。所有的副本实例都会监听父节点的变化,当有子节点被添加时都会被其他副本实时感知。

被添加的子节点统一被抽象为 Entry 对象,而具体实现则是 LogEntryMutationEntry 对象承载,分别对应 /log/mutations 节点:

  • LogEntry 用于封装 /log 子节点信息,核心属性如下:
    • source replica 发送这条 Log 指令的副本来源,对应 replica_name
    • type 操作指令类型,主要有 getmergemutate 三种,分别对应从远程副本下载分区、合并分区和 MUTATION 操作。
    • block_id 当前分区的 BlockID,对应 /blocks 路径下子节点的名称。
    • partition_name 当前分区目录的名称。
  • MutationEntry用于封装 /mutations 子节点信息,核心属性如下:
    • source replica 发送这条 MUTATION 指令的副本来源,对应 replica_name
    • commands 操作指令,主要有 ALERT DELETEALERT UPDATE
    • mutation_id MUTATION 操作的版本号。
    • partition_id 当权分区目录的 ID
副本协同流程
写入执行流程

当需要在 ReplicatedMergeTree 中执行 INSERT 查询以写入数据时,即会进入 INSERT 核心流程,整体流程从上至下按照时间顺序进行,大致可以分为八个步骤。

  1. 创建第一个副本实例

    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE replica_sales_1 {
    id String,
    price Float64,
    create_time DateTime
    } ENGINW = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'ch5.vgbhfive.cn')
    PARTITION BY toYYYYMM(create_time)
    ORDER BY id

    在创建的过程中 ReplicatedMergeTree 会进行一些初始化操作:

    • 根据 zk_path 初始化所有的 ZooKeeper 节点。
    • /replicas/ 节点下注册自己的副本实例 ch5.vgbhfive.cn
    • 启动监听任务,监听 /log 日志节点。
    • 参与副本选举,选举出主副本,选举的方式是向 /leader_election/ 插入子节点,第一个插入成功的副本就是主副本。
  2. 创建第二个副本实例
    与上述第一个创建副本实例类似,不同之处在于实例名称。

    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE replica_sales_1 {
    id String,
    price Float64,
    create_time DateTime
    } ENGINW = ReplicatedMergeTree('/clickhouse/tables/01/replicated_sales_1', 'ch6.vgbhfive.cn')
    PARTITION BY toYYYYMM(create_time)
    ORDER BY id

    在创建过程中,第二个 ReplicatedMergeTree 同样会进行一些初始化操作:

    • /replicas/ 节点下注册自己的副本实例 ch6.vgbhfive.cn
    • 启动监听任务,监听 /log 日志节点。
    • 参数副本选举,选举出主副本。
  3. 向第一个实例中写入数据
    现在尝试向第一个副本 ch5 写入数据:

    1
    INSERT INTO TABLE replicated_sales_1 VALUES('A001', 100, '2022-05-15 00:00:00')

    上述命令执行完毕后,首先会在本地完成分区目录的写入:

    1
    Renaming temporary part tmp_insert_202205_1_1_0 to 202205_0_0_0

    接着向 /blocks 节点写入该数据分区的 block_id

    1
    Wrote block with ID '202205_xxxxx_yyyy'

    block_id 将作为后续去重操作的判断依据。如果此时再重复执行刚才的语句,试图写入重复数据,则会抛出异常,即副本会自动忽略 block_id 重复的待写入数据。
    此外如果设置了 insert_quorum (默认参数为 0),并且 insert_quorum >= 2,则 ch5 会进一步监控已完成写入操作的副本个数,只有当写入副本个数大于或等于 insert_quorum 时,整个写入操作才会成功。

  4. 由第一个副本实例推送 Log 日志
    3 步骤完成之后,会继续执行 insert 的副本向 /log 节点推送操作日志。日志的编号是 /log/log-00000000,而 LogEntry 的核心属性如下:

    1
    2
    3
    4
    5
    /log/log-00000000
    source replica: ch5.vgbhfive.cn
    block_id: 202205_xxxxx
    type: get
    partition_name: 202205_0_0_0

    从日志内容中可以看出,操作类型为 get 下载,而需要下载的分区时 202205_0_0_0。其余所有副本都会基于 Log 日志以相同的顺序执行命令。

  5. 第二个副本实例拉取 Log 日志
    ch6 副本会一直监听 /log 节点变化,当 ch5 推送日志之后,ch6 便会触发日志的拉取任务并更新 log_pointer,将其指向最新日志下标:

    1
    /replicas/ch6.vgbhfive.cn/log_pointer: 0

    在拉取 LogEntry 之后,并不会直接执行,而是将其转为任务对象放至队列:

    1
    2
    /replicas/ch6.vgbhfive.cn/queues/
    Pulling 1 entries to queue: log-00000000 to log-00000000

    上述 LogEntry 放入队列是因为在复杂的情况下,会连续收到许多个 LogEntry ,所以使用队列消化任务也是一种合理的设计。

  6. 第二个副本实例向其他副本发起下载请求
    ch6 基于 /queue 队列开始执行任务,当看到 typeget 时,ReplicatedMergeTree 会明白此时在远端的其他副本已经成功写入数据分区,而自己需要同步这些数据。
    ch6 上的第二个副本实例会开始选择一个远端的其他副本作为数据的下载来源。远端副本的选择算法大致是这样的:

    • /replicas 节点拿到所有的副本节点。
    • 遍历这些副本选取其中一个。选取的副本需要拥有最大的 log_pointer 下标,并且 /queue 子节点数量最少。log_pointer 下标最大,则意味该副本执行的日志最多,数据应该更加完整;而 /queue 最小,则意味着该副本目前的任务执行负担最小。

    在这个实例中,算法选择的副本实例是 ch5

  7. 第一个副本实例响应数据下载
    ch5DataPartsExchange 端口服务接收到调用请求,在得知对方来意之后,根据参数做出响应,将本地分区 202205_0_0_0 基于 DataPartsExchange 的服务响应发送回 ch6

    1
    Sending part 202205_0_0_0
  8. 第二个实例下载数据并完成本地写入
    ch6 副本在接收到 ch5 的分区数据后,首先将其写入到临时目录中:

    1
    tmp_fetch_202205_0_0_0

    待全部数据接收完成之后,重命名该目录:

    1
    Renaming temporary part tmp_fetch_202205_0_0_0 to 202205_0_0_0

    至此,整个写入结束。

在整个 insert 的写入过程中,ZooKeeper 不会进行任何实质性的数据传输。本着谁执行谁负责的原则,由写入数据的实例负责发送 Log 日志、通知其他实例、监控是否完成、返回下载数据。

MERGE 执行流程

当需要在 ReplicatedMergeTree 中触发分区合并动作时,即会进入这个部分的流程,无论 MERGE 操作从哪个副本发起,其合并计划都会交由主副本来制定。

  1. 创建远程连接,尝试与主副本通信。
    首先在 ch6 节点执行 OPTIMIZE 强制执行 MERGE 合并,此时 ch6 通过 replicas 找到主副本 ch5,并尝试建立与它的远程连接。

    1
    2
    optimize table replicated_sales_1
    Connection (ch5.vgbhfive.cn:9000): Connecting. Database: default. User: default
  2. 主副本接收通信
    主副本 ch5 接收并建立来自远端副本 ch6 的连接。

    1
    Connected ClickHouse Follower replica version 19.17.0, revision: 54428, database: default, user: default
  3. 由主副本制定 MERGE 计划并推送 Log 日志
    由主副本 ch5 制定 MERGE 计划,并判断哪些分区需要被合并。在选定之后 ch5 将合并计划转换为 Log 日志对象并推送 Log 日志,以通知所有副本开始合并。日志的核心信息如下:

    1
    2
    3
    4
    5
    type: merge
    202205_0_0_0
    202205_1_1_0
    into
    202205_0_1_1

    从日志内容可以看出操作类型为 MERGE 合并,而这次需要合并的分区目录是 202205_0_0_0202205_0_1_1。与此同时主副本还会锁住执行线程,对日志的接收情况进行监听:

    1
    Waiting for queue-0000000002 to disapper from ch5.vgbhfive.cn queue

    其监听行为由 replication_alter_partitions_sync 参数控制,默认值为 1

    • 参数为 0 时,不做任何等待。
    • 参数为 1 时,只等主副本自身完成。
    • 参数为 0 时,等待所有副本拉取完成。
  4. 各个副本分别拉取 Log 日志
    ch5ch6 两个副本实例将分别监听 /log/log-00000002 日志的推送,他们分别会拉取日志到本地,并推送到各自的 /queue 任务队列:

    1
    Pulling 1 entries to queue : log-00000002 - log-000000002
  5. 各个副本分别在本地执行 MERGE
    ch5ch6 基于各自的 /queue 队列开始执行任务:

    1
    Executing log entry to merge parts 202205_0_0_0, 202205_1_1_0 to 202205_0_1_1

    各个副本开始在本地执行 MERGE

    1
    Merged 2 parts: from 202205_0_0_0 to 202205_1_1_0

至此 MERGE 的合并过程 ZooKeeper 不会进行任何实质性的数据传输,所有的合并操作最终都是由各个副本在本地完成的。而无论合并动作在哪个副本被触发,最终都会交由主副本负责合并计划的制定、消息日志的推送以及日志接收情况的监控。

MUTATION 执行流程

当对 ReplicatedMergeTree 执行 ALTER DELTE 或者 ALTER UPDATE 操作的时候,即会进入 MUTATION 部分的逻辑,与 MERGE 类似,无论 MUTATION 操作从哪个副本发起,首先都会由主副本进行响应。

  1. 推送 MUTATION 日志
    ch6 节点尝试通过 DELETE 来删除一行数据,执行如下命令:
    1
    ALTER TABLE replicated_sales_1 DELETE where id = 1;
    上述命令执行之后该副本会接着执行两个重要事项:
  • 创建 MUTATION ID
    1
    Created mutation with ID 000000000
  • MUTATION 操作转换为 MutatutionEntry 日志,并推送到 /mutations/00000000Mutation 的核心属性如下:
    1
    2
    3
    4
    5
    /mutations/000000000
    source replica: ch6.vgbhfive.cn
    mutation_id: 2
    partition_id: 202205
    commands: DELETE where id = \'1\'
  1. 所有副本实例各自监听 MUTATION 日志
    ch5ch6 都会监听 /mutations 节点,因此有新的日志子节点加入都会被实时感知:

    1
    Loading 1 mutation entries: 000000000 - 000000000

    当监听到有新的 MUTATION 日志加入时,并不是所有副本都会直接做出响应,他们首先会判断自己是否为主副本。

  2. 由主副本实例响应 MUTATION 日志并推送 Log 日志
    只有主副本才会响应 MUTATION 日志,主副本会将 MUTATION 日志转换为 LogEntry 日志并推送到 /log 节点,以通知各个副本执行具体的操作。日志的核心信息如下:

    1
    2
    3
    4
    5
    /log/log-000000003
    source replica: ch5.vgbhfive.cn
    block_id:
    type: mutate
    202005_0_1_1 to 202205_0_1_1_2
  3. 各个副本实例分别拉取 Log 日志
    ch5ch6 两个副本分别监听 /log/log-000000002 日志的推送,他们也会分别拉取日志到本地,并推送到各自的 /queue 任务队列:

    1
    Pulling 1 entries to queue: log-0000000002 - log-0000000002
  4. 各个副本实例分别在本地执行 MUTATION
    ch5ch6 基于各自的 /queue 队列开始执行任务:

    1
    Executing log entry to mutate part 202205_0_1_1 to 202205_0_1_1_2

    各个副本开始在本地执行 MUTATION

    1
    2
    Cloning part 202205_0_1_1 to tmp_clone_202205_0_1_1_2
    Renaming temporary part tmp_clone_202205_0_1_1_2 to 202205_0_1_1_2.

至此在 MUTATION 的整个过程中 ZooKeeper 同样不会进行任何实质性的数据传输。所有的 MUTATION 操作,最终都是由各个副本在本地完成的,而 MUTATION 操作是经过 /mutations 节点实现分发的。本着谁执行谁负责的原则,执行命令的副本负责消息推送,但是无论在哪个副本执行最终都会被交由主副本,再由主副本负责推送 Log 日志,以通知各个副本最终的 MUTATION 逻辑,同时也由主副本对日志接收的情况进行监控。

ALTER 执行流程

当对 ReplicatedMergeTree 执行 ALTER 操作的时候,即会进入 ALTER 部分的逻辑,与前几个类似 ALTER 流程会简单许多,其执行流程并不会涉及 /log 日志的分发。

  1. 修改共享元数据
    ch6 节点尝试增加一个列字段,执行语句如下:

    1
    ALTER TABLE replicated_sales_1 ADD COLUMN id2 String

    执行之后,ch6 会修改 ZooKeeper 内的共享元数据节点:

    1
    2
    /metadata, /columns
    Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.

    数据修改之后,节点的版本号也会同时提升:

    1
    Version of metadata nodes in ZooKeeper changed. Waiting for structure write lock.

    与此同时,ch6 还会负责监听所有副本的修改完成情况:

    1
    2
    Waiting for ch5.vgbhfive.cn to apply changes.
    Waiting for ch6.vgbhfive.cn to apply changes.
  2. 监听共享元数据变更并各自执行本地修改
    ch5ch6 两个副本分别监听共享元数据的变更,之后会分别对本地的元数据版本号与共享版本号进行对比。

    1
    2
    Metadata changed in ZooKeeper. Applying changes locally.
    Applied changes to the metadata of the table.
  3. 确认所有副本完成修改
    确认所有副本均已完成修改:

    1
    2
    ALTER finished.
    Done processing query.

至此整个 ALTER 流程结束。在执行过程中,ZooKeeper 没有参与实质性的数据传输,所有的 ALTER 都是各个副本在本地完成的。


分片

通过引入数据副本可以降低数据丢失的风险,并提升查询的性能,但是仍然有一个问题没有解决,那就是数据表的容量问题,到目前为止每个副本都是保存全量的数据。

ClickHouse 中的每个节点都可以称为一个 shard (分片),对于一个完整的方案来说,还需要考虑数据在写入时数据如何被均匀地被写入到各个分片中,以及在数据查询时如何路由到每个分片并组合成结果集,所以 ClickHouse 地数据分片需要结合 Distributed 表引擎一同使用。

Distributed 表引擎自身不存储任何数据,它能够作为分布式表的一层透明代理,在集群内部自动开展数据的写入、分发、查询、路由等工作。

集群的配置方式

ClickHouse 中集群配置用 shard 代表分片,用 replica 代表副本。

1
2
3
4
5
6
7
8
9
10
11
12
# 1 分片 0 副本
<shard><!-- 分片 -->
<replica><!-- 副本 -->
</replica>
</shard>
# 1 分片 1 副本
<shard><!-- 分片 -->
<replica><!-- 副本 -->
</replica>
<replica>
</replica>
</shard>

这样的配置貌似有点问题,其实分片更像是逻辑层面的分组,而无论是分片或时副本,承载他们的都是 replica,所以从某种角度来看,副本也是分片。

集群分布式 DDL

在默认情况下 CREATEDROPRENAMEALTERDDL 语句并不支持分布式执行,但是在加入集群配置后使用新的语法实现分布式 DDL 执行:

1
CREATE/DROP/RENAME/ALTER TABLE OM CLUSTER cluster_name

cluster_name 对应了配置文件中的集群名称,ClickHouse 会根据集群配置信息分别去各个节点执行 DDL 语句。

数据结构

ReplicatedMergeTree 类似,分布式 DDL 语句在执行的过程中也需要借助 ZooKeeper 的协同能力,以实现日志分发。

  1. ZooKeeper 内的节点结构
    默认情况下分布式 DDLZooKeeper 内使用的根路径为:

    1
    /clickhouse/task_queue/ddl

    该地址可以在 config.xml 中的 distributed_ddl 配置指定。
    当然在此节点之下还有其他的监听节点,包含 /query-[seq],其内包含 DDL 操作日志,每执行一次分布式 DDL 查询,在该节点下新增一条操作日志。DDL 操作日志使用 ZooKeeper 的持久顺序型节点,每条指令的名称以 query- 为前缀,后面的序号递增。在该操作日志下,还有两条状态节点:

    • /query-[seq]/active 用于状态监控等用途,在任务的执行过程中,在该节点下会临时保存当前集群内状态为 active 的节点。
    • /query-[seq]/finished 用于检查任务完成情况,在任务的执行过程中,每当集群内的某个 host 执行完毕之后,就会在该节点下写入记录。
  2. DDLLongEntry 日志对象的数据结构
    /query-[seq] 下记录的日志信息由 DDLLogEntry 承载,拥有以下几个核心属性:

    • query 记录 DDL 查询的执行语句
      1
      query: DROP TABLE default.test_1_local ON CLUSTER shard_2
    • hosts 记录指定集群的主机列表,集群由分布式 DDL 语句中的 ON CLUSTER 指定
      1
      hosts: ['ch5.vgbhfive.cn:9000', 'ch6.vgbhfive.cn:9000']
    • initiator 记录初始化 host 主机的名称,主机列表来源于初始化 host 节点上的集群
      1
      initiator: ch5.vgbhfive.cn
  3. 分布式 DDL 的核心执行流程
    click-4-1.png
    整个流程从上到下按照时间顺序进行,其大致分为三个步骤:

    • 推送 DDL 日志,在节点执行语句,本着谁执行谁负责原则,会由这个节点创建 DDLLogEntry 日志并将日志推送到 ZooKeeper ,同时也会由这个节点负责监控任务的执行进度。
    • 拉取日志并执行,其余节点监听到日志推送,于是拉取日志到本地。首先判断自身 host 是否被包含在 DDLLogEntryhosts 列表中,如果包含在内则进入执行流程,执行完毕后将状态写入 finished 节点;如果不包含则忽略这次日子的推送。
    • 确认执行进度,在执行 DDL 语句之后,客户端会阻塞等待 180 秒后,以期望所有 host 执行完毕。如果等待时间超过 180 秒,则进入后台线程继续等待(等待时间由 distributed_ddl_task_timeout 参数执行,默认 180 秒)。

Distributed 原理解析

Distributed 表引擎是分布式表的代名词,它自身不存储任何数据,而是作为数据分片的透明代理,能够自动路由数据至集群中的各个节点,所以 Distributed 表引擎需要和其他数据表引擎一起协同工作。
从实体表层面来看,一张分片由两部分组成:

  • 本地表:通常以 _local 为后缀进行命名。本地表是承接数据的载体,可以使用非 Distributed 的任意表引擎,一张本地表对应了一个数据分片。
  • 分布式表:通常以 _all 为后缀进行命名。分布式表是能使用 Distributed 表引擎,他与本地表形成一对多的映射关系,日后将通过分布式表代理操作多张本地表。

Distirbuted 表引擎采用读时检查,即在查询时才会抛出错误,而不会在创建表时检查。

定义形式

Distributed 表引擎的定义形式如下:

1
ENGINE = Distributed(cluster, database, table [, sharding_key])

其中各个参数的含义如下:

  • cluster:集群名称,与集群配置中的自定义名称相对应。在对分布式表执行写入和查询的过程中,他会使用集群的配置信息来找到相应的 host 节点。
  • dataabsetable:分别对应数据库和表的名称,分布式表使用这组配置映射到本地表。
  • sharding_key:分片键,选填参数。在数据写入的过程中,分布式表会依据分片键的规则,将数据分布到各个 host 节点的本地表。
查询分类

Distributed 表引擎的查询操作分类如下:

  • 会作用于本地表的查询,对于 INSERTSELECT 查询,Distributed 将会以分布式的方式作用于 local 本地表。
  • 只会影响 Distributed 自身,不会作用于本地表的查询: Distributed 支持部分元数据操作,包括 CREATEDROPREANMEALTER,其中 ALTER 并不包括分区的操作。
  • 不支持的查询:Distributed 表不支持任何 MUTATION 类型的操作,包括 ALTER DELETEALTER UPDATE
分片规则

分片键要求返回一个整数类型的取值,包括 Int 类型和 UInt 类型。

1
Distributed(cluster, database, table, userid) # 按照用户 id 的余数划分

如果不声明分片键,那么分布式表则只会有一个分片,意味者只能映射一张本地表。当然如果分布式表只包含一个分片,那也就失去了使用的意义。

关于数据如何被具体的划分,需要明确以下几个概念:

  • 分片权重
    在集群的配置中,还有一项分片权重(weight)的设置:
    1
    2
    3
    4
    5
    6
    <shard>
    <weight>10</weight>
    </shard>
    <shard>
    <weight>20</weight>
    </shard>
    分片权重会影响数据在分片中的倾斜程度,一个分片权重值越大,那么被写入的数据就会越多。
  • slot(槽)
    slot 可以理解成许多个小水槽,如果把数据比作成水的话,那么数据之水会顺着这些水槽流进每个数据分片。slot 的数量等于所有分片的权重之和。
  • 选择函数
    选择函数用于判断一行待写入的数据应该被写入到哪个分片,那整个步骤会被分为两个步骤:
    • 找出 slot 的取值,计算公式如下:
      1
      slot = shard_value - sum_weight
      其中 shard_value 是分片键的取值,sum_weight 是所有分片的权重之和。
    • 基于 slot 值找到对应的数据分片
分布式核心流程
分布式写入流程

在向集群内的分片写入数据时,通常有两种思路:一种是借助外部计算系统,事先将数据均匀分片,再借由计算系统直接写入 ClickHouse集群的各个本地表。第二种则是通过 Distributed 表引擎代理写入分片数据的。

将数据写入分片的核心流程

在对 Distributed 表执行 INSERT 查询的时候,会进入数据写入分片的执行逻辑,可以分为五个步骤,整体流程如下:

  1. 在第一个分片节点写入本地分片数据
    ch5 节点对本地表 test_shard_2_all 执行 INSERT 查询,尝试写入 10, 30, 200, 50 四行数据。执行之后分布式表主要分做两件事情:其一,根据分片规则划分数据;第二,将属于当前分片的数据直接写入本地表 test_shard_2_all

  2. 第一个分片建立远端连接,准备发送远端分片数据
    将归至远端分片的数据以分区为单位,分别写入 test_shard_2_all 存储目录下的临时 bin 文件,数据文件的命名规则如下:

    1
    /database@host:port/[increase_num].bin

    由于在这个示例中只有一个远端分片 ch6,所以临时数据文件如下:

    1
    /test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin

    10, 200, 50 三行数据会被写入上述临时数据文件。接着会尝试与远端 ch6 分片建立连接:

    1
    Connection (ch6.vgbhfive.cn) : Connected to ClickHouse server
  3. 第一个分片向远端分片发送数据
    此时会有另一组监听任务负责监听 /test_shard_2_all 目录下的文件变化,这些任务负责将目录数据发送至远端分片:

    1
    2
    test_shard_2_all.Distributed.DirectoryMonitor:
    Started processing /test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin

    其中,每份目录将会由独立的线程负责发送,数据在传输之前会被压缩。

  4. 第二个分片接收数据并写入本地
    ch6 分片节点确认建立与 ch5 的连接:

    1
    2
    TCPHandlerFactory: TCP Request. Address: ch5:459127
    TCPHandler: Connected ClickHouse server

    在接收到 ch5 发送的数据后,将他们写入本地表:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    executeQuery: (from ch5) INSERT INTO default.test_shard_2_local
    -- 第一个分区
    Reserving 1.00 MB on disk 'default'
    Renaming temporary part tmp_insert_10_1_1_0 to 10_1_1_0.
    -- 第二个分区
    Reserving 1.00 MB on disk 'default'
    Renaming temporary part tmp_insert_200_1_1_0 to 200_1_1_0.
    -- 第三个分区
    Reserving 1.00 MB on disk 'default'
    Renaming temporary part tmp_insert_50_1_1_0 to 50_1_1_0.
  5. 由第一个分片确认完成写入
    最后由 ch5 分片确认所有的数据发送完毕:

    1
    Finished processing /test_shard_2_all/default@ch6.vgbhfive.cn:9000/1.bin

至此整个流程结束,Distributed 表负责所有分片的写入工作。在由 Distirbuted 表负责向远端分片发送数据时,有异步和同步两种写模式:如果是异步写入则在 Distributed 表写完本地分片之后,INSERT 查询就会返回成功写入的消息;如果是同步写入则在 INSERT 查询之后,会等待所有分片完成写入。使用何种模式由 insert_distributed_sync 参数控制,默认为 false,即异步写入;如果将其设置为 true,则可以进一步通过 insert_distributed_timeout 参数控制同步等待的超时时间。

副本复制数据流程

除了刚才的分片写入流程之外,还会触发副本数据的复制流程。数据在多个副本之间,有两种复制实现方式:一种是继续借助 Distributed 表引擎,由它将数据写入副本。另一种则是借助 ReplicatedMergeTree 表引擎实现副本数据的分发。

分布式查询流程

与数据写入有所不同,在面向集群查询数据的时候,只能通过 Distributed 表引擎实现。当 Distributed 表接收到 SELECT 查询的时候,他会依次查询每个分片的数据,再合并汇总返回。

  1. 多副本的路由规则
    在查询数据的时候,如果集群中的一个 shard,拥有多个 replica,那么 Distributed 表引擎需要面临副本选择的问题。他会使用负载均衡算法从众多 replica 中选择一个,而具体使用何种负载均衡算法,则由 load_balancing 参数控制:

    1
    load_balancing = random / nearest_hostname / in_order / first_or_random

    有四种负载均衡算法:

    • random
    • nearest_hostname
    • in_order
    • first_or_random
  2. 多分片查询
    分布式查询与分布式写入类似,同样本着谁执行谁负责的原则,他会接收 SELECT 查询的 Distributed 表并负责串联起整个过程。首先他会针对分布式表的 SQL 语句,按照分片数量将查询拆分为若干个本地表的子查询,然后向各个分片发起查询,然后再汇总各个分片的返回结果。


引用


个人备注

此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!