这里通过两种方式测试下kafka集群,区别只是一个是新起一个容器进行测试,另一个则是在原来的基础进行测试:

一、方式一

新起一个容器

1.运行一个kafka-client,用于连接kafka集群

[root@k8s-master01 kafka]#  kubectl run kafka-client --restart='Never' --image registry.cn-hangzhou.aliyuncs.com/abroad_images/kafka:3.5.0-debian-11-r1 --namespace public-service --command -- sleep infinity

上面参数说明:

  • kubectl run kafka-client: 使用 kubectl 命令创建一个名为 kafka-client 的 Pod
  • --restart='Never': 设置 Pod 的重启策略为 "Never",这意味着 Pod 不会自动重启
  • --image registry.cn-hangzhou.aliyuncs.com/abroad_images/kafka:3.5.0-debian-11-r1: 指定要在 Pod 中使用的容器镜像。这里使用的是 registry.cn-hangzhou.aliyuncs.com/abroad_images/kafka:3.5.0-debian-11-r1 镜像
  • --namespace public-service: 指定要在名为 public-service 的命名空间中创建 Pod
  • --command -- sleep infinity: 在容器中执行命令 sleep infinity,以保持 Pod 持续运行。--command 表示后面的内容是一个命令而不是一个参数,sleep infinity 是一个常用的命令,使得容器无限期地休眠

查看pod,已成功建立

[root@k8s-master01 kafka]# kubectl get po  -n  public-service
NAME           READY   STATUS    RESTARTS   AGE
kafka-0        1/1     Running   0          163m
kafka-1        1/1     Running   0          163m
kafka-2        1/1     Running   0          163m
kafka-client   1/1     Running   0          52s
zookeeper-0    1/1     Running   0          3h4m
zookeeper-1    1/1     Running   0          3h4m
zookeeper-2    1/1     Running   0          3h4m

2.在k8s-master01节点上开启两个窗口,一个用于生产者,一个用作消费者。 (1)生产者窗口

进入kafka创建一个名为test的topic,出现>代表成功

[root@k8s-master01 kafka]# kubectl exec -it  kafka-client -n  public-service -- bash
I have no name!@kafka-client:/$ cd /opt/bitnami/kafka/bin
I have no name!@kafka-client:/opt/bitnami/kafka/bin$ kafka-console-producer.sh --broker-list kafka-0.kafka-headless.public-service.svc.cluster.local:9092,kafka-1.kafka-headless.publiservice.svc.cluster.local:9092,kafka-2.kafka-headless.public-service.svc.cluster.local:9092 --topic test
>

上面参数说明:

  • kafka-console-producer.sh:用于创建生产者

  • --broker-list kafka-0.kafka-headless.public-service.svc.cluster.local:9092,kafka-1.kafka-headless.public-service.svc.cluster.local:9092,kafka-2.kafka-headless.public-service.svc.cluster.local:9092:指定要连接的 Kafka Broker 列表。使用逗号分隔多个 Broker 的地址。在这里,指定了三个 Kafka Broker 的地址

  • --topic test:指定要发布消息的主题名称,这里使用的是 "test"

(2)消费者窗口

[root@k8s-master01 kafka]# kubectl exec -it  kafka-client -n  public-service -- bash
I have no name!@kafka-client:/$ cd  /opt/bitnami/kafka/bin/
I have no name!@kafka-client:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server kafka.public-service.svc.cluster.local:9092  --topic test  --from-beginning

上面参数说明:

  • kafka-console-consumer.sh:用于启动消费者
  • --bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口
  • --topic test:指定要发布消息的主题名称,这里使用的是 "test"
  • --from-beginning:设置消费者从主题的开始处开始消费消息。这意味着消费者将从主题中的最早可用消息开始消费

3.开始测试,观察到消费正常

(1)生产者窗口

>test2
>test1
>

(2)消费者窗口

test2
test1

二、方式二

1.进入kafka创建一个名为testtopic的topic

[root@k8s-master01 kafka]# kubectl exec -it kafka-0 -n public-service -- bash
[root@k8s-master01 kafka]# cd /opt/bitnami/kafka/bin
I have no name!@kafka-0:/opt/bitnami/kafka/bin$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic testtopic
Created topic testtopic.

上面参数说明:

  • --create:指示 kafka-topics.sh 命令创建一个新的主题
  • kafka-topics.sh:用于创建topic
  • --bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口
  • --replication-factor 1:设置主题的副本因子(replication factor),指定每个分区的副本数量。
  • --partitions 1:设置主题的分区数,指定要创建的分区数量
  • --topic testtopic:指定要创建的主题的名称,这里使用的是 "testtopic"

2.启动消费者

I have no name!@kafka-0:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic

上面参数说明:

  • kafka-console-consumer.sh:用于创建消费者
  • --bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口

3.新起一个窗口后,进入kafka,启动一个生产者后,输出hello字段

[root@k8s-master01 kafka]# kubectl exec -it kafka-0 -n public-service -- bash
I have no name!@kafka-0:/$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic testtopic
>hello

上面参数说明:

  • kafka-console-consumer.sh:用于创建生产者
  • --bootstrap-server localhost:9092:指定用于引导连接到 Kafka 集群的 Kafka Broker 的地址。使用的是本地主机(localhost)上的 Kafka Broker,并监听 9092 端口

4.在消费者窗口上进行查看,观察到消费正常

I have no name!@kafka-0:/opt/bitnami/kafka/bin$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testtopic

hello