文章目录

  • 一、实验目的
  • 二、实验准备
  • 三、实验内容
      • 1. 运行Zookeeper
      • 2. 运行kafka
      • 3. 创建topics
      • 4. Kafka与MySQL的组合使用,把JSON格式数据放入Kafka发送出去,再从Kafka中获取并写入到MySQL数据库,p97
      • 5. Kafka与MySQL的组合使用,把MySQL数据库数据取出,转化为JSON格式,放入Kafka发送出去,再从Kafka中获取,p100
      • 6. 手动提交偏移量,把JSON格式数据放入Kafka发送出去,p100
      • 7. 消费订阅分区,p100
      • 8. 关闭kafka
      • 9. 关闭Zookeeper
  • 四、实验小结

一、实验目的

  1. 掌握Zookeeper的配置和使用。
  2. 掌握Kafka的配置和使用。
  3. 掌握Kafka的基本的操作命令。

二、实验准备

  1. Zookeeper
  2. Kafka

三、实验内容

1. 运行Zookeeper

打开cmd然后执行zkserver 命令

2. 运行kafka

打开命令提示窗口(win+R,输入cmd,进入),进入E:\kafka\kafka37文件内输入并执行以下命令打开kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

3. 创建topics

打开命令提示窗口,进入E:\kafka\kafka37\bin\windows文件内
创建topics,可以把topic理解为文件夹,partition为topic下面的子文件夹,log在partition下,而消息保存在log中

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic test1

查看topics,这里查看一下是否创建成功

kafka-topics --list --bootstrap-server localhost:9092

查看 topic 的详细信息,partition 数量和副本数量等

kafka-topics --bootstrap-server localhost:9092  --describe --topic test1

修改当前topic分区(只能增加分区,如果减少会报错)

kafka-topics.bat --bootstrap-server localhost:9092 --alter --topic test1 --partitions 4

查看修改后的分区

kafka-topics --bootstrap-server localhost:9092  --describe --topic test1

查看消息组消息队列

kafka-consumer-groups.bat --describe --bootstrap-server localhost:9092 --all-groups

4. Kafka与MySQL的组合使用,把JSON格式数据放入Kafka发送出去,再从Kafka中获取并写入到MySQL数据库,p97

5. Kafka与MySQL的组合使用,把MySQL数据库数据取出,转化为JSON格式,放入Kafka发送出去,再从Kafka中获取,p100

目标:读取student表的数据内容,将其转为JSON格式,发送给Kafka
向student表中插入两条记录的SQL语句如下:

insert into student values(95002,’Fopn’,’M’,22);
insert into student values(95003,’Tom’,’M’,23);

编写一个生产者程序mysql_producer.py

from kafka import KafkaProducer
import json
import pymysql.cursors

producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))


connect=pymysql.Connect(
   host='localhost',
   port=3306,
   user='root',
   passwd='123456',
   db='test',
   charset='utf8'
   )
cursor=connect.cursor()
sql="select sno,sname,ssex,sage from student;"
cursor.execute(sql)
data = cursor.fetchall()
connect.commit()

for msg in data:
   res={}
   res['sno']=msg[0]
   res['name']=msg[1]
   res['sex']=msg[2]
   res['age']=msg[3]
   producer.send('mysql_topic', res)

connect.close()
producer.close()

再从Kafka中获取到JSON格式数据,打印出来;
编写一个消费者程序mysql_consumer.py

from kafka import KafkaConsumer
import json
import pymysql.cursors

consumer = KafkaConsumer('mysql_topic',bootstrap_servers=['localhost:9092'],group_id=None,auto_offset_reset='earliest')
for msg in consumer:
   msg1=str(msg.value,encoding="utf-8")
   data=json.loads(msg1)
   print(data)

6. 手动提交偏移量,把JSON格式数据放入Kafka发送出去,p100

data.json文件

[{"sno": "95001", "name": "John1", "sex": "M", "age": 23},
{"sno": "95002", "name": "John2", "sex": "M", "age": 23},
{"sno": "95003", "name": "John3", "sex": "M", "age": 23}]

根据上面给出的data.json文件,执行如下操作:
编写生产者程序,将json文件数据发送给Kafka;
编写一个生产者程序commit_producer.py

from kafka import KafkaProducer
import json # 引入模块
# 打开一个json文件
data = open("./data.json")
# 转换为python对象
strJson = json.load(data)
producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v:json.dumps(v).encode('utf-8'))
producer.send('json_topic', strJson)
producer.close()
编写消费者程序,读取前面发送的数据,并手动提交偏移量;
编写一个消费者程序commit_consumer.py
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
import json

class Consumer():
    def __init__(self):
        self.server = 'localhost:9092'
        self.topic = 'json_topic'
        self.consumer = None
        self.tp = None
        self.consumer_timeout_ms = 5000
        self.group_id = 'test1'

    def get_connect(self):
        self.consumer = KafkaConsumer('json_topic',group_id=self.group_id,auto_offset_reset='earliest',bootstrap_servers=self.server,enable_auto_commit=False,consumer_timeout_ms=self.consumer_timeout_ms)

    def beginConsumer(self):
        now_offset = 0
        while True:
            for message in self.consumer:
                now_offset = message.offset
                data = message.value.decode('utf-8')
                data = json.loads(data)
                print(data)
                self.consumer.commit()
        consumer.close()

c = Consumer()
c.get_connect()
c.beginConsumer()

7. 消费订阅分区,p100

手动创建主题“assign_topic”,分区数量2

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic assign_topic

(1)编写生产者程序,以通用唯一标识符UUID作为消息,发送给主题assign_topic;
编写一个生产者程序assign_producer.py:

from kafka import KafkaProducer,TopicPartition
import time
import uuid


producer = KafkaProducer(bootstrap_servers='localhost:9092')
display_interval = 5

print('Producing messages to topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
start_time = time.time()
while True:
    identifier = str(uuid.uuid4())  
    producer.send('assign_topic', identifier.encode('utf-8'))  
    message_count += 1
    now = time.time()
    if now - start_time > display_interval:
        print('No.%i iter %i messages produced at %.0f messages / second' % (
            display_iteration,
            message_count,
            message_count / (now - start_time)))
        display_iteration += 1
        message_count = 0
        start_time = time.time()

(2)编写消费者程序1,订阅主题的分区0,只消费分区0数据;
编写第一个消费者程序assgin_consumer1.py:

from kafka import KafkaConsumer,TopicPartition
import time
import uuid

display_interval = 5

consumer1 = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
consumer1.assign([TopicPartition('assign_topic', 0)])
print('Consuming messagse from topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
partitions = set()
start_time = time.time()
while True:
    message = next(consumer1)
    identifier = str(message.value,encoding="utf-8")
    message_count += 1
    partitions.add(message.partition)
    now = time.time()
    if now - start_time > display_interval:
        print('No.%i  %i messages consumed at %.0f messages / second - from partitions %r' % (
            display_iteration,
            message_count,
            message_count / (now - start_time),
            sorted(partitions)))
        display_iteration += 1
        message_count = 0
        partitions = set()
        start_time = time.time()

(3)编写消费者程序2,订阅主题的分区1,只消费分区1数据;
编写第二个消费者程序assgin_consumer2.py:

from kafka import KafkaConsumer,TopicPartition
import time
import uuid

display_interval = 5

consumer2 = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest')
consumer2.assign([TopicPartition('assign_topic', 1)])
print('Consuming messagse from topic assign_topic. Press Ctrl-C to interrupt.')
display_iteration = 0
message_count = 0
partitions = set()
start_time = time.time()
while True:
    message = next(consumer2)
    identifier = str(message.value,encoding="utf-8")
    message_count += 1
    partitions.add(message.partition)
    now = time.time()
    if now - start_time > display_interval:
        print('No.%i  %i messages consumed at %.0f messages / second - from partitions %r' % (
            display_iteration,
            message_count,
            message_count / (now - start_time),
            sorted(partitions)))
        display_iteration += 1
        message_count = 0
        partitions = set()
        start_time = time.time()

8. 关闭kafka

.\bin\windows\kafka-server-stop.bat

9. 关闭Zookeeper

.\bin\windows\zookeeper-server-stop.bat

注意:在运行实验期间,1,2打开的窗口不要关闭

四、实验小结

Kafka在大数据生态系统中的作用?

  • 消息中间件:Kafka作为一个分布式流处理平台,其核心作用是作为数据管道,实现实时数据传输。它可以接收从各种数据源产生的大量事件流数据,并将其高效地分发给下游的数据消费者,如实时处理引擎、搜索引擎索引、数据仓库、报表系统等。
  • 缓冲区:Kafka可以作为临时存储层,允许系统在处理能力与数据生成速度不匹配时起到缓冲作用,减轻系统的压力。
  • 解耦:在大数据处理过程中,Kafka使数据生产者与消费者之间解耦,使得数据生产不受消费速率的影响,同时支持多种消费者类型对同一流数据进行并行处理。
  • 持久化和容错:Kafka提供高持久性保障,即使在故障情况下也能保留数据,这对于需要处理不可丢失的实时事件流的应用至关重要。
  • 流处理和实时分析:结合Spark、Flink等流处理框架,Kafka能够支撑实时数据流处理和复杂事件处理。
    2、Kafka总体架构中各个组件的功能有?
  • Producer(生产者):负责发布消息至Kafka的一个或多个Topic,可以选择将消息发送到特定的分区。
    • Broker(代理):Kafka服务器实例,构成了集群,它们保存和复制消息,处理来自生产者的消息发布请求和消费者的订阅请求。
    • Topic(主题):消息分类的逻辑概念,类似于数据库表,消息按照主题进行归类。
    • Partition(分区):Topic内部进一步细分,每个分区是一个有序且不可变的消息序列,有助于水平扩展和并行处理。
    • Consumer(消费者):从Kafka订阅并消费消息的客户端,属于某个消费者组,共同消费一个主题的所有分区,实现负载均衡。
    • Consumer Group(消费者组):一组具有相同组ID的消费者集合,确保消息在组内公平分配和仅一次消费。
    • ZooKeeper(在较早版本中):提供集群协调服务,管理broker的元数据和消费者组状态,但在较新的Kafka版本中,已逐渐移除对ZooKeeper的依赖,改为使用KRaft协议实现自我管理和协调。
      3、Kafka的主要应用场景?
  • 日志收集:聚合不同系统的日志事件,便于集中分析和监控。
    • 用户行为追踪:记录用户在网站或应用程序上的行为轨迹,用于实时监控、推荐系统或异常检测。
    • 流式处理:作为实时流数据处理管道的一部分,配合Storm、Spark Streaming等工具进行实时数据分析和计算。
    • 消息传递:在微服务架构中作为异步通信机制,实现服务之间的解耦和异步操作。
    • 数据集成:通过Kafka Connect API连接不同的数据源和目标系统,实现实时ETL(抽取、转换、加载)过程。
    • 事件驱动架构:触发基于事件的响应和服务,例如订单状态变更通知、库存更新同步等。
本站无任何商业行为
个人在线分享 » 【数据采集】实验07-Kafka的常用命令及使用
E-->