kafka快速入门

前提

什么是消息系统

消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不担心如何共享它。 而分布式消息传递是基于可靠消息队列的概念,消息在客户端应用程序和消息传递系统之间异步排队。 有两种类型的消息模式可用 - 一种是点对点,另一种是发布 - 订阅(pub-sub)消息系统。 他们大多数消息模式都遵循 pub-sub 模式。

点对点消息系统

在点对点系统中,消息被保留在队列中。 一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。 一旦消费者读取队列中的消息,它就从该队列中消失。 该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。

发布 - 订阅消息系统

在发布 - 订阅系统中,消息被保留在主题中。 与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。 在发布 - 订阅系统中,消息生产者称为发布者,消息使用者称为订阅者。 一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。


简介

Apache Kafka 是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。

Kafka 适合离线和在线消息消费。 Kafka 消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka 构建在ZooKeeper 同步服务之上。 它与Apache Storm 和Spark 非常好地集成,用于实时流式数据分析。

Apache Kafka 起源于LinkedIn,后来于2011年成为开源Apache 项目,然后于2012年成为First-class Apache 项目。Kafka 是用Scala 和Java 编写的。 Apache Kafka 是基于发布订阅的容错消息系统。 它是快速,可扩展和设计分布。

Kafka专为分布式高吞吐量系统而设计。 Kafka 往往工作得很好,作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka 具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。

优势

  • 高吞吐量、低延迟: kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒
  • 持久性、可靠性: Kafka 是分布式、分区、复制和容错的。
  • 可扩展性: Kafka 消息传递系统轻松缩放,无需停机。
  • 耐用性: Kafka 使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
  • 性能: Kafka 对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。Kafka 非常快,并保证零停机和零数据丢失。

使用场景

  • 指标: Kafka 通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。
  • 日志聚合解决方案: Kafka 可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。
  • 流处理: 流行的框架(如Storm和Spark Streaming) 从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。

术语

  • Topics(主题): 属于特定类别的消息流称为主题。 数据存储在主题中。 主题被拆分成分区。 对于每个主题,Kafka 保存一个分区的数据。 每个这样的分区包含不可变有序序列的消息。
  • Partition(分区): 主题可能有许多分区,因此它可以处理任意数量的数据。
  • Partition offset(分区偏移): 每个分区消息具有称为 offset 的唯一序列标识。
  • Replicas of partition(分区备份): 副本只是一个分区的备份。 副本从不读取或写入数据。 它们用于防止数据丢失。
  • Brokers(经纪人):
    • 代理是负责维护发布数据的简单系统。 每个代理中的每个主题可以具有零个或多个分区。 假设,如果在一个主题和N 个代理中有N 个分区,每个代理将有一个分区。
    • 假设在一个主题中有N个分区并且多于N 个代理(n + m) ,则第一个N 代理将具有一个分区,并且下一个M 代理将不具有用于该特定主题的任何分区。
    • 假设在一个主题中有N个分区并且小于N 个代理(n - m) ,每个代理将在它们之间具有一个或多个分区共享。 由于代理之间的负载分布不相等,不推荐使用此方案。
  • Kafka Cluster(Kafka 集群): Kafka 有多个代理被称为Kafka 集群。 可以扩展Kafka 集群,无需停机。 这些集群用于管理消息数据的持久性和复制。
  • Producers(生产者): 生产者是发送给一个或多个Kafka 主题的消息的发布者。
  • Consumers(消费者): 消费者订阅一个或多个主题,并通过从代理中提取数据来使用已发布的消息。
  • Leader(领导者): Leader 是负责给定分区的所有读取和写入的节点。 每个分区都有一个服务器充当Leader 。
  • Follower(追随者): 跟随领导者指令的节点被称为Follower。 如果领导失败,一个追随者将自动成为新的领导者。 跟随者作为正常消费者,拉取消息并更新其自己的数据存储。

工作流程

Kafka 是分为一个或多个分区的主题的集合。 Kafka 分区是消息的线性有序序列,其中每个消息由它们的索引(称为偏移)来标识。 Kafka 集群中的所有数据都是不相连的分区联合。 传入消息写在分区的末尾,消息由消费者顺序读取。 通过将消息复制到不同的代理提供持久性。

Kafka 以快速,可靠,持久,容错和零停机的方式提供基于 pub-sub 和队列的消息系统。 在这两种情况下,生产者只需将消息发送到主题,消费者可以根据自己的需要选择任何一种类型的消息传递系统。

JYLBx1.png

发布 - 订阅消息的工作流程

  • 生产者定期向主题发送消息。
  • Kafka 代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka 将在第一分区中存储一个消息,在第二分区中存储第二消息。
  • 消费者订阅特定主题。
  • 一旦消费者订阅主题,Kafka 将向消费者提供主题的当前偏移,并且还将偏移保存在Zookeeper 系综中。
  • 消费者将定期请求Kafka (如100 Ms)新消息。
  • 一旦Kafka 收到来自生产者的消息,它将这些消息转发给消费者。
  • 消费者将收到消息并进行处理。
  • 一旦消息被处理,消费者将向Kafka 代理发送确认。
  • 一旦Kafka 收到确认,它将偏移更改为新值,并在Zookeeper 中更新它。 由于偏移在Zookeeper 中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。
  • 以上流程将重复,直到消费者停止请求。
  • 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。

队列消息/用户组的工作流

在队列消息传递系统而不是单个消费者中,具有相同组ID 的一组消费者将订阅主题。 简单来说,订阅具有相同 Group ID 的主题的消费者被认为是单个组,并且消息在它们之间共享。

实际工作流程:

  • 生产者以固定间隔向某个主题发送消息。
  • Kafka 存储在为该特定主题配置的分区中的所有消息,类似于前面的方案。
  • 单个消费者订阅特定主题,假设 Topic-01 为 Group ID 为 Group-1 。
  • Kafka 以与发布 - 订阅消息相同的方式与消费者交互,直到新消费者以相同的组ID 订阅相同主题 Topic-01 1 。
  • 一旦新消费者到达,Kafka 将其操作切换到共享模式,并在两个消费者之间共享数据。 此共享将继续,直到用户数达到为该特定主题配置的分区数。
  • 一旦消费者的数量超过分区的数量,新消费者将不会接收任何进一步的消息,直到现有消费者取消订阅任何一个消费者。 出现这种情况是因为Kafka 中的每个消费者将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将必须等待。
  • 此功能也称为使用者组。 同样,Kafka 将以非常简单和高效的方式提供两个系统中最好的。

Zookeeper 的作用

Apache Kafka 的一个关键依赖是Apache Zookeeper,它是一个分布式配置和同步服务。 Zookeeper 是Kafka 代理和消费者之间的协调接口。 Kafka 服务器通过Zookeeper 集群共享信息。 Kafka 在Zookeeper 中存储基本元数据。 例如关于主题,代理,消费者偏移(队列读取器)等的信息。

由于所有关键信息存储在Zookeeper 中,并且它通常在其整体上复制此数据,因此Kafka 代理/ Zookeeper 的故障不会影响Kafka 集群的状态。 Kafka 将恢复状态,一旦Zookeeper 重新启动。 这为Kafka 带来了零停机时间。 同时Kafka 代理之间的领导者选举也通过使用Zookeeper 在领导者失败的情况下完成。


基本操作

安装Kafka

  1. 下载并安装Zookeeper
  • 要在您的计算机上安装ZooKeeper框架,请访问以下链接并下载最新版本的ZooKeeper。http://zookeeper.apache.org/releases.html
  • 解压提取文件
  • 修改配置
  • 启动Zookeeper 服务
    1
    zkServer.sh start
  1. 下载并安装Kafka
  1. 启动Kafka (Kafka Broker)
    1
    bin/kafka-server-start.sh config/server.properties

简单生产者与消费者示例

这里我们仅使用单节点单代理的配置,首先我们需要安装Java 并启动Zookeeper 、和Kafka 。

  • 启动Zookeeper

    1
    zkServer.sh start
  • 启动Kafka Broker

    1
    bin/kafka-server-start.sh config/server.properties
  • 创建Topic 主题

    1
    2
    3
    4
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name # 语法

    # 示例
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka

    创建了一个名为Hello-Kafka 的主题,其中包含一个分区和一个副本因子。

  • 查看已存在的注意列表

    1
    bin/kafka-topics.sh --list --zookeeper localhost:2181
  • 启动生产者产生消息

    1
    2
    3
    4
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name # 语法

    # 示例
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
  • 启动消费者消费消息

    1
    2
    3
    4
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name --from-beginning # 语法

    # 示例
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning
  • 结果展示
    生产者:

    1
    2
    3
    Hello
    My first message
    My second message

消费者:

1
2
3
Hello
My first message
My second message

代码示例

消息事务

在Kafka 中设计事务的目的主要是为了满足读取-处理-写入这种模式的应用程序。这种模式下数据的读写是异步的,比如Kafka 的Topics ,这种应用程序更广泛的被称之为流处理应用程序。

多分区原子写入

事务能够保证Kafka topic 下每个分区的原子写入。 事务中所有的消息都将被成功写入或者丢弃。

在Kafka中,我们通过写入一个名为offsets topic 的内部Kafka topic 来记录offset commit 。消息仅在其offset 被提交给offsets topic 时才被认为成功消费。

由于offset commit 只是对Kafka topic 的另一次写入,并且由于消息仅在提交偏移量时被视为成功消费,所以跨多个主题和分区的原子写入也启用原子“读取-处理-写入”循环:提交偏移量X到offset topic和消息B到tp1的写入将是单个事务的一部分,所以整个步骤都是原子的。

僵尸实例

我们通过为每个事务Producer 分配一个称为transactional.id 的唯一标识符来解决僵尸实例的问题。在进程重新启动时能够识别相同的Producer 实例。

读事务消息

Kafka 保证Consumer 最终只能提供非事务性消息或提交事务性消息。它将保留来自未完成事务的消息,并过滤掉已中止事务的消息。

零拷贝(备份机制)

Kafka 的备份的单元是Partition ,也就是每个Partition 都都会有leader Partiton 和follow Partiton 。其中leader Partition 是用来进行和Producer 进行写交互,follow 从leader 副本进行拉数据进行同步,从而保证数据的冗余,防止数据丢失的目的。

这同时也是Kafka 高可靠性的原因之一,在leader Partition 出现异常之后,follow 之间会重新选举出一个leader Partition 接替之前异常的leader Partition继续提供服务,并且由于备份机制的存在,follow 和leader 之间的数据也不会存在差异。

关于标题为什么是零拷贝,这是因为Kafka 在实现leader 和follow 之间备份的时候采用了零拷贝的技术。

传统的读取文件并发送到网络的步骤:

  • 操作系统将数据从磁盘文件中读取到内核空间的页面缓存;
  • 应用程序将数据从内核空间读入用户空间缓冲区;
  • 应用程序将读到数据写回内核空间并放入socket缓冲区;
  • 操作系统将数据从socket缓冲区复制到网卡接口,此时数据才能通过网络发送。

零拷贝技术:
JNw2C9.jpg
零拷贝技术只用将磁盘文件的数据复制到页面缓存中一次,然后将数据从页面缓存直接发送到网络中(发送给不同的订阅者时,都可以使用同一个页面缓存),避免了重复复制操作。


高级操作

整合Storm

Spark 集成

官方工具

官方工具分为系统工具和复制工具。

系统工具

可以使用运行类脚本从命令行运行系统工具。

1
bin/kafka-run-class.sh package.class - - options
  • Kafka迁移工具: 此工具用于将代理从一个版本迁移到另一个版本。
  • Mirror Maker: 此工具用于向另一个Kafka集群提供镜像。
  • 消费者偏移检查器: 此工具显示指定的主题和使用者组的消费者组,主题,分区,偏移量,日志大小,所有者

复制工具

Kafka复制是一个高级设计工具。 添加复制工具的目的是为了更强的耐用性和更高的可用性。

  • 创建主题工具: 这将创建一个带有默认分区数,复制因子的主题,并使用Kafka的默认方案进行副本分配。
  • 列表主题工具: 此工具列出了指定主题列表的信息。 如果命令行中没有提供主题,该工具将查询Zookeeper以获取所有主题并列出它们的信息。 工具显示的字段是主题名称,分区,leader,replicas,isr。
  • 添加分区工具: 创建主题,必须指定主题的分区数。 稍后,当主题的卷将增加时,可能需要用于主题的更多分区。 此工具有助于为特定主题添加更多分区,还允许手动复制分配已添加的分区。

后记

使用Kafka 应用

Kafka支持许多当今最好的工业应用。

  • Twitter
    Twitter是一种在线社交网络服务,提供发送和接收用户推文的平台。 注册用户可以阅读和发布tweet,但未注册的用户只能阅读tweets。 Twitter使用Storm-Kafka作为其流处理基础架构的一部分。

  • LinkedIn
    Apache Kafka在LinkedIn中用于活动流数据和操作度量。 Kafka消息系统帮助LinkedIn的各种产品,如LinkedIn Newsfeed,LinkedIn今天的在线消息消费,以及离线分析系统,如Hadoop。 Kafka的强耐久性也是与LinkedIn相关的关键因素之一。

  • Netflix
    Netflix是美国跨国公司的按需流媒体提供商。 Netflix使用Kafka进行实时监控和事件处理。

  • Mozilla
    Mozilla是一个自由软件社区,由Netscape成员于1998年创建。 Kafka很快将更换Mozilla当前生产系统的一部分,以从最终用户的浏览器收集性能和使用数据,如遥测,测试试验等项目。

  • Oracle
    Oracle通过其名为OSB(Oracle Service Bus)的Enterprise Service Bus产品提供与Kafka的本地连接,该产品允许开发人员利用OSB内置中介功能实现分阶段的数据管道。

官方资源

引用

https://www.w3cschool.cn/apache_kafka/apache_kafka_introduction.html
https://www.cnblogs.com/likehua/p/3999538.html

总结

学习,学习,还是学习。


个人备注

此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!