侧边栏壁纸
博主头像
XiaoLin's Blog博主等级

XiaoLin的个人博客~

  • 累计撰写 33 篇文章
  • 累计创建 33 个标签
  • 累计收到 1 条评论

目 录CONTENT

文章目录

kafka 的基本使用

XiaoLin
2024-08-20 / 0 评论 / 1 点赞 / 36 阅读 / 16667 字
温馨提示:
本文最后更新于 2024-08-22,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Kafka 介绍

Apache Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开发,并于 2011 年贡献给 Apache 软件基金会。Kafka 主要用于构建实时数据管道和流处理应用程序,能够高效地处理大量数据流。

Kafka 的核心概念

  1. 主题(Topic):消息的分类或名称。生产者将消息发送到特定的主题,而消费者从主题中读取消息。
  2. 生产者(Producer):负责向 Kafka 主题发布消息的应用程序。
  3. 消费者(Consumer):从 Kafka 主题中订阅和读取消息的应用程序。
  4. 消费者组(Consumer Group):一组消费者共同消费一个或多个主题中的消息,实现负载均衡。每条消息只会被同一组内的一个消费者消费。
  5. 代理(Broker):Kafka 集群中的服务器,负责存储数据并提供客户端的读写请求。
  6. 分区(Partition):每个主题可以分为多个分区,以提高并行性和吞吐量。每个分区是有序的,并且可以独立地进行读写操作。
  7. 偏移量(Offset):每条消息在分区中的唯一标识符,表示该消息在分区中的位置。

Kafka 的特点

  • 高吞吐量:Kafka 可以处理大量的数据,每秒可以处理数百万条消息。
  • 可扩展性:通过增加更多的代理节点,可以轻松扩展 Kafka 集群。
  • 持久性:数据被持久化到磁盘中,即使系统崩溃也不会丢失。
  • 容错性:Kafka 内置复制机制,可以确保数据在某些节点故障时依然可用。
  • 实时性:Kafka 支持低延迟的数据传输,适合实时数据处理场景。

使用场景

  • 日志聚合
  • 实时分析和监控
  • 数据流处理
  • 消息队列
  • 网站活动跟踪
  • 流媒体处理

Kafka 的基本概念

Topic - 主题

在 Kafka 中,主题(Topic)是用于对消息进行分类的逻辑分组。每个主题可以看作是一个消息类别,生产者会将特定类型的消息发送到相应的主题,而消费者会订阅这些主题以接收消息。

主题的特点:

  1. 多分区支持:每个主题可以被划分为多个分区(Partition)。这意味着同一主题的数据可以被分散到不同的分区上,从而提高并行处理能力和吞吐量。不同的生产者和消费者可以同时向同一个主题的不同分区执行读写操作。
  2. 日志结构:每个分区是一个有序且不可变的日志序列,新的消息会被追加到日志末尾。消费者通过偏移量(Offset)来跟踪自己已经读取到的位置。
  3. 复制机制:为了保证数据可靠性,每个分区的数据可以在集群中的多个代理节点上进行复制。这样即使某些节点发生故障,数据也能从其他副本中恢复。
  4. 持久化存储:Kafka 将每个主题的消息持久化存储到磁盘中,这样即便系统出现故障,也不会导致数据丢失。
  5. 灵活消费:不同的消费者组可以独立地消费同一主题的数据,并且互不影响。这允许多个应用程序或服务使用相同的数据流但以不同方式处理这些数据。

使用场景:

  • 不同行业和领域中需要对海量实时数据进行分类、传输和处理时,可以利用 Kafka 的主题功能来实现高效的数据管理和传递。
  • 在微服务架构中,通过将相关服务之间需要交换的数据定义为特定的主题,可以实现松耦合和高效通信。
  • 在分析系统中,将原始数据流按来源或类型划分为不同的主题,以便于分别处理和分析。
  1. 多分区支持:每个主题可以划分为多个分区(Partition),这意味着同一主题的数据可以分散到不同的分区中,从而提高并行处理能力和吞吐量。不同的生产者和消费者可以同时对同一主题的不同分区进行读写操作。
  2. 日志结构:每个分区是一个有序且不可变的日志序列,新的消息会被追加到日志末尾。消费者通过偏移量(Offset)来跟踪已经读取到的位置。
  3. 复制机制:为了保证数据可靠性,可以在集群中的多个代理节点上复制每个分区的数据。这样即使某些节点发生故障,数据也能从其他副本中恢复。
  4. 持久化存储:Kafka 将每个主题的消息持久化存储到磁盘中,即便系统出现故障,也不会导致数据丢失。
  5. 灵活消费:不同的消费者组可以独立地消费同一主题的数据,并且互不影响。这允许多个应用程序或服务使用相同的数据流,但以不同方式处理这些数据。

Partion - 分区

Kafka 的分区是其核心机制之一,使其能够实现高吞吐量和可扩展性。每个 Kafka 主题可以被划分为多个分区,每个分区都是一个有序且不可变的消息序列,并不断追加新消息。以下是关于 Kafka 分区的一些关键点:

  1. 并行处理:通过将主题划分为多个分区,Kafka 可以在多台服务器上并行处理数据,提高数据处理能力和速度。
  2. 负载均衡:不同的消费者可以消费不同的分区,实现负载均衡。这意味着多个消费者可以同时读取同一主题的数据,但从不同的分区读取。
  3. 顺序保证:在单个分区内,消息是有序的,即消费者从特定分区读取消息时,会按生产者发送的顺序接收。但不同分区之间没有顺序保证。
  4. 容错性:每个分区可以配置副本,这些副本存储在不同的代理节点上,以确保即使某个节点故障,数据也不会丢失。
  5. 伸缩性:通过增加分区数量,可以轻松扩展 Kafka 系统以处理更多数据和更高吞吐量。不过,在主题创建后增加分区可能会导致原有数据重新平衡,从而打破现有的数据消费顺序。
  6. 偏移量管理:每个消费者在每个分区中都有一个偏移量,用于跟踪已消费到哪条消息。偏移量可以手动提交,也可以自动管理。

replication-factor

Kafka 的 replication-factor 是一个关键配置参数,决定了主题(topic)在集群中每个分区(partition)的副本数量。具体来说:

  1. 可靠性:增加 replication-factor 可以提高数据的可靠性和可用性。当一个 broker(Kafka 服务器节点)不可用时,其他副本可以确保数据仍然可访问。
  2. 故障恢复:如果某个分区的 leader 副本故障,Kafka 可以选举另一个副本为 leader,实现快速故障恢复。
  3. 配置建议:通常建议将 replication-factor 设置为至少 3。这意味着即使有两个 broker 出现问题,数据仍然是安全的。
  4. 注意事项replication-factor 不应超过集群中的 broker 数量,否则 Kafka 无法为每个分区创建足够的副本。如果集群有 5 个 broker,则 replication-factor 最大只能设置为 5。

如当分区值设置为 3 时,会选择一个副本做为主分区(leader),所有数据的写入都是写入到主分区之中,数据的读取也是从主分区读取。
其他两个分支称之为从分区(follwer),这两个分区仅从主分区中复制数据,保持数据的一致性。
kafka 会监控几个副本之间数据同步的状态,这个维护的状态集合就是 ISR
在上图的例子之中,我们需要同步数据的节点有 101、102、103,所以 ISR 之中就存储了这些节点的名称。
如果某个副本不能正常的同步数据,kafka 会将他们剔除直到再次跟上同步进度后才会加回 ISR 之中。
当分区值设置为 3 时,会选择一个副本作为主分区(leader),所有数据的写入和读取都通过主分区进行。其他两个副本称为从分区(follower),它们仅从主分区复制数据,以保持数据一致性。

Kafka 会监控各个副本之间的数据同步状态,维护的状态集合称为 ISR。在上图的例子中,需要同步数据的节点有 101、102 和 103,所以 ISR 中存储了这些节点的名称。

如果某个副本无法正常同步数据,Kafka 会将其移出 ISR,直到它再次跟上同步进度后才会被重新加入。

Broker - 消息代理

Kafka 集群中的 Broker 是一个重要的组件。每个 Kafka 集群由多个 Broker 组成,它们负责接收、存储和转发消息数据。Broker 的主要职责包括:

  1. 消息存储:Broker 将接收到的消息持久化在磁盘上,以确保数据的可靠性和持久性。
  2. 分区管理:每个主题(topic)可以被划分为多个分区(partition),这些分区分布在不同的 Broker 上。Broker 负责管理其上存储的分区,确保数据的可用性和负载均衡。
  3. 消费者服务:Broker 处理来自消费者的请求,为它们提供所需的数据流。消费者可以从 Broker 中读取不同主题和分区的数据。
  4. 副本管理:为了提高容错性,每个分区通常有多个副本(replica),这些副本存储在不同的 Broker 上。Kafka 使用“领导者-追随者”机制来管理这些副本,其中一个 Broker 被选为该分区的领导者。
  5. 故障恢复:如果某个 Broker 出现故障,Kafka 会自动将其上的领导者角色转移到其他正常运行的 Broker 上,以确保集群继续正常运作。

通过集成多个 Broker,Kafka 实现了高吞吐量、可扩展性和高容错性,使其成为一种强大的流处理平台。

监听器和内外部网络

在 Kafka 中,监听器(Listener)用于配置 Kafka 集群与外部客户端的通信方式。监听器定义了客户端如何连接到 Kafka 集群中的各个代理(Broker)。Kafka 支持多种协议,通过配置不同的监听器,可以让 Kafka 代理在不同的网络接口上提供服务,这对于支持内外部网络访问尤为重要。

以下是关于监听器和内外部网络的一些关键点:

  1. 监听器类型

    • PLAINTEXT:不使用加密或认证的简单文本连接。
    • SSL:通过 SSL/TLS 加密的安全连接。
    • SASL_PLAINTEXT 和 SASL_SSL:结合 SASL 认证机制进行身份验证,分别支持非加密和加密通信。
  2. 多监听器配置

    • Kafka 支持在同一个代理上配置多个监听器,你可以为内部流量和外部流量指定不同的网络接口和协议。
    • 通常,你会看到类似 PLAINTEXT://localhost:9092 用于本地访问,以及 SSL://<external-host>:9093 用于外部访问。
  3. 内外部网络访问

    • 内部网络通常用于集群内部节点之间及与内部应用程序之间的通信。
    • 外部网络则用于允许来自互联网或公司广域网(WAN)的客户端进行访问。
  4. 广告监听器(Advertised Listeners)

    • 广告监听器用于告诉客户端如何连接到代理。尤其是在使用 Docker 或 Kubernetes 等环境时,这一点尤为重要,因为容器内部地址可能与容器外部地址不同。
    • 配置项 advertised.listeners 用于指定客户端应使用哪个地址进行连接。
  5. 示例配置

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
advertised.listeners=PLAINTEXT://internal-host:9092,SSL://external-host:9093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
inter.broker.listener.name=PLAINTEXT

在这个示例中,Kafka 将同时在本地所有可用 IP 地址上侦听端口 9092 和 9093,但它将对内广告其内网地址,而对外广告其公网或特定外网地址。

  1. 安全性注意事项
    • 在暴露给公共互联网时确保使用加密(如 SSL/TLS)。
    • 使用适当的认证机制(如 SASL)来保护 Kafka 集群不被未授权用户访问。

Kafka 的消息模型

在 Kafka 中,分区是最小的并行单位。一个消费者可以消费多个分区。一个分区可以被多个消费者组里的消费者消费。但是,一个分区不能被同一个消费者组里的多个消费者消费。

发布订阅

根据分区与消费者组的概念我们知道,如果要想实现发布订阅模式,即每条发布的消息都需要被每个消费者消费,那么我们可以将每个消费者都设置为不同的消费组,即可实现发布订阅模式。

点对点(一对一)

将所有消费者都放在一个组里即是点对点模式,同时他也是一个负载均衡的模式

分区与消息的顺序

在 Apache Kafka 中,分区和消息顺序是两个密切相关的概念。

  1. 单个分区内的顺序:在一个特定的分区内,Kafka 保证消息是按照它们被写入的顺序进行存储和读取的。这意味着,如果你发送了两条消息 A 和 B 到同一个分区,并且 A 先于 B,那么消费者将总是先看到 A 然后再看到 B。
  2. 跨分区的无序性:如果一个主题有多个分区,Kafka 不保证跨不同分区之间的消息顺序。这意味着,如果两条消息发送到了不同的分区,消费者可能会以与它们被生产者发送时不同的顺序接收到这些消息。
  3. 生产者控制顺序:生产者可以通过指定相同的键(key)来确保具有相同键的所有消息都发送到同一个分区,从而保持该子集内消息的一致顺序。

实践中的考虑

  • 确定正确数量的分区:在设计 Kafka 主题时,需要根据应用程序的吞吐量需求和并行处理能力来决定合适数量的分区。

  • 使用键来控制顺序:如果应用需要保证某些子集的数据保持严格顺序,可以使用键控策略将这些数据固定到特定的分区中。

  • 处理无序数据:对于跨多个分区消费的数据流,应用程序需要实现机制来处理可能出现的数据无序问题,比如在消费端进行排序或使用某种标记机制以重建原始顺序。

Kafka 快速部署和使用

Docker 部署

我们可以使用 Docker 来快速部署我们的 kafka 学习环境,这里使用 bitnami 所封装的镜像,已经对 kafka 所有常用配置进行了环境变量封装,我们可以很轻易的使用 Docker Compose 一键运行起来。

Kafka Raft 模式是 Apache Kafka 中的一种新的集群协议实现,用于管理元数据和日志复制。自 Kafka 2.8 版本开始引入,Kafka Raft 模式(KRaft)旨在替代之前依赖于 Apache ZooKeeper 的架构,从而简化部署、提高可扩展性和增强系统的可靠性。

以下是 Kafka Raft 模式的一些关键特性:

  1. 去中心化的元数据管理:在 KRaft 中,元数据不再依赖于 ZooKeeper,而是由 Kafka 自身的控制器节点来管理。这些控制器节点使用 Raft 共识算法来确保元数据的一致性和高可用性。
  2. Raft 共识算法:Raft 是一种一致性算法,用于在分布式系统中实现日志复制和领导者选举。通过使用 Raft,Kafka 可以更高效地处理元数据更新,并在节点故障时更快地进行故障恢复。
  3. 简化运维:由于不再需要维护 ZooKeeper 集群,运维复杂度降低。所有的操作都可以通过 Kafka 自身的 API 完成。
  4. 更好的可扩展性:KRaft 提升了 Kafka 的可扩展能力,尤其是在大规模集群中。在这种模式下,Kafka 能够更有效地处理大量主题和分区。
  5. 改进的容错能力:由于采用了强一致性的 Raft 算法,KRaft 提供了更好的容错能力,即使在网络分区或节点故障情况下,也能保证系统的一致性和可用性。

目前kafka 的最新版本中,Raft 模式已经是默认的分布式协调协议,无需在额外的部署 Zookeeper 即可搭建 Kafka

镜像主页:bitnami/kafka - Docker Image | Docker Hub
使用以下docker-compose.yml一键部署

version: "3.9"
services:
  kafka:
    image: "bitnami/kafka:latest"
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    volumes:
      - ./volume/data:/bitnami/kafka

networks:
  default:
    name: dev
    external: true

基本使用

kafka 默认并没有携带 web 控制台,他对 Topic、副本、分区等操作是通过 CLI 脚本提供的,他就是 kafka-topics.sh,我们可以通过 kafka-topics.sh --help 来查看所有的命令帮助。

创建 Topic

要使用kafka-topics.sh创建一个 Kafka Topic,并查看可用的选项,你可以使用以下命令格式:

kafka-topics.sh --create --topic <topic_name> --bootstrap-server <kafka_broker>

以下是一些创建 Topic 时常用的可选参数:

  • --replication-factor <replication_factor>:指定副本因子,即每个分区在集群中的副本数量。
  • --partitions <num_partitions>:指定分区数量。
  • --config <key=value>:设置主题的配置项。例如,--config cleanup.policy=compact

完整的命令示例:

kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 2 --config cleanup.policy=compact

这将创建一个名为 “my-topic” 的 Topic,具有3个分区和2个副本,且配置了清理策略为紧凑型。你可以根据需要调整这些参数。

然后可以使用 --describe 查看目标 Topic 的状态

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

使用生产者和消费者

  1. 创建消费者
kafka-console-consumer.sh --bootstrap-server <kafka_broker> --topic <topic_name>

以下是一些常用的选项,你可以根据需求来配置消费者:

  • --bootstrap-server <kafka_broker>:指定Kafka集群的地址和端口。
  • --topic <topic_name>:指定要消费的主题名称。
  • --from-beginning:如果你想从主题的开头开始读取消息,可以添加这个选项。
  • --group <group_id>:指定消费者组ID,以便启用消费者组的管理。
  • --max-messages <num_messages>:限制消费的消息数量。

完整的命令示例:

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning

以上命令将在本地主机上的Kafka集群中,从主题"my-topic"的开头开始消费消息。另外,如果你想确认消费者组的管理,可以指定--group

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --group my-consumer-group

这样,你的消费者会被添加到名为"my-consumer-group"的消费者组中进行管理。选择合适的参数以满足你的使用需求。

使用生产者发送消息

基本命令格式

kafka-console-producer.sh --bootstrap-server <kafka_broker> --topic <topic_name>

输入命令
使用正确的Kafka broker地址和目标主题名称,输入以下命令:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic
  • --bootstrap-server localhost:9092:指定Kafka broker的地址和端口。
  • --topic my-topic:指定要发送消息的主题名称。

可选参数

  • --producer-property <key=value>:设置生产者的配置属性。例如,--producer-property acks=all

完整的命令示例,包含可选参数:

kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my-topic --producer-property acks=all

这种方式非常适合快速测试消息的发送,确保Kafka集群和主题配置正确。如果需要发送特定格式的数据或进行复杂的生产逻辑,可能需要编写自定义的生产者代码。

1

评论区