python操作Kafka
citgpt 2024-10-25 11:50 20 浏览 0 评论
目录
一、python 操作 kafka
1. python 使用 kafka 生产者
2. python 使用 kafka 消费者
3. 使用 docker 中的 kafka
二、python操作kafka细节
2.1 生产者demo
2.2 消费者demo
2.3 消费者(消费者组)
2.4 消费者(读取目前最早可读的消息)
2.5 消费者(手动设置偏移量)
2.6 消费者(订阅多个主题)
2.7 消费者(手动拉取消息)
2.8 消费者(消息挂起与恢复)
2.8.1 消息挂起和恢复的实现
2.8.2 完整示例
2.9 Python创建自定义的Kafka Topic
说明:关于 kafka 的启动与安装,命令行的使用,请查看前面的几篇文章。本篇文章主要描述python 如何操作 kafka
使用 python 操作 kafka 首先安装如下的包
pip install kafka-python # 由于 python 3.7 后的版本中 async 的关键字发生了变化,因此需要安装kafka-python库;弃用kafka库
说明:python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区;
下面是将 kafka-python 库中生产者常用的方法进行封装,以便直接使用。更详细用法在下面第二节中
import json
import kafka
class Producer(object):
""" kafka 的生产者模型
"""
_coding = "utf-8"
def __init__(self,
broker='192.168.74.136:9092',
topic="add_topic",
max_request_size=104857600,
batch_size=0, # 即时发送,提高并发可以适当增加,但是会造成消息的延迟;
**kwargs):
"""初始化设置 kafka 生产者连接对象;参数不存在的情况下使用配置文件中的默认连接;
"""
self.broker = broker
self.topic = topic
self.max_request_size = max_request_size
# 实例化生产者对象
self.producer_json = kafka.KafkaProducer(
bootstrap_servers=self.broker,
max_request_size=self.max_request_size,
batch_size=batch_size,
key_serializer=lambda k: json.dumps(k).encode(self._coding), # 设置键的形式使用匿名函数进行转换
value_serializer=lambda v: json.dumps(v).encode(self._coding), # 当需要使用 json 传输地时候必须加上这两个参数
**kwargs
)
self.producer = kafka.KafkaProducer(
bootstrap_servers=broker,
max_request_size=self.max_request_size,
batch_size=batch_size,
api_version=(0, 10, 1),
**kwargs
)
def send(self, message: bytes, partition: int = 0):
"""
写入普通的消息;
Args:
message: bytes; 字节流数据;将字符串编码成 utf-8的格式;
partition: int; kafka 的分区,将消息发送到指定的分区之中;
Returns:
None
"""
future = self.producer.send(self.topic, message, partition=partition)
record_metadata = future.get(timeout=30)
if future.failed(): # 发送失败,记录异常到日志;
raise Exception("send message failed:%s)" % future.exception)
def send_json(self, key: str, value: dict, partition: int = 0):
"""
发送 json 形式的数据;
Args:
key: str; kafka 中键的值
value: dict; 发送的具体消息
partition: int; 分区的信息
Returns:
None
"""
future = self.producer_json.send(self.topic, key=key, value=value, partition=partition)
record_metadata = future.get(timeout=30)
if future.failed(): # 发送失败记录异常;
raise Exception("send json message failed:%s)" % future.exception)
def close(self):
"""
关闭kafka的连接。
Returns:
None
"""
self.producer_json.close()
self.producer.close()
if __name__ == '__main__':
'''脚本调用执行;'''
kafka_obj = Producer()
print(kafka_obj.broker)
kafka_obj.send("自动生成".encode())
发送的消息,主要是普通的字符串消息,和字典形式的消息,方便对接kafka
由于 kafka 消费者的特性,阻塞循环是一个必然的过程,可以使用 python 中的生成器进行优化,但是循环阻塞是无可避免的;
操作 kafka 的消费者依旧只需要安装上述的第三方依赖包 kafka-python;
下面是将 kafka-python 库中消费者常用的方法进行封装,以便直接使用。更详细用法在下面第二节中
import json
from kafka import KafkaConsumer, KafkaProducer
from kafka.structs import TopicPartition
class KConsumer(object):
"""kafka 消费者; 动态传参,非配置文件传入;
kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
"""
_encode = "UTF-8"
def __init__(self, topics="start_server", bootstrap_server=None, group_id="start_task", partitions=None, **kwargs):
""" 初始化kafka的消费者;
1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
3. 手动设置偏移量
Args:
topics: str; kafka 的消费主题;
bootstrap_server: list; kafka 的消费者地址;
group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
partitions: int; 消费的分区,当不使用分区的时候默认读取是所有分区;
**kwargs: dict; 其他原生kafka消费者参数的;
"""
if bootstrap_server is None:
bootstrap_server = ["192.168.74.136:9092"] # kafka集群的话就写多个
self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server)
exist = self.exist_topics(topics)
if not exist: # 需要的主题不存在;
# 创建一条
self.create_topics(topics)
if partitions is not None:
self.consumer = KafkaConsumer(
bootstrap_servers=bootstrap_server,
group_id=group_id,
# 目前只有一个消费者,根据情况是否需要进行修改;当扩展多个消费者的时候需要进行扩展;
**kwargs
)
# print("指定分区信息:", partitions, topics, type(partitions))
self.topic_set = TopicPartition(topics, int(partitions))
self.consumer.assign([self.topic_set])
else:
# 默认读取主题下的所有分区, 但是该操作不支持自定义 offset, 因为 offset 一定是在指定的分区中进行的;
self.consumer = KafkaConsumer(
topics,
bootstrap_servers=bootstrap_server,
group_id=group_id,
**kwargs
)
def exist_topics(self, topics):
"""
检查 kafka 中的主题是否存在;
Args:
topics: 主题名称;
Returns:
bool: True/False ; True,表示存在,False 表示不存在;
"""
topics_set = set(self.consumer.topics())
if topics not in topics_set:
return False
return True
@staticmethod
def create_topics(topics):
"""
创建相关的 kafka 主题信息;说明本方法可以实现用户自定义 kafka 的启动服务,默认是使用的是 start_server;
Args:
topics: str; 主题的名字;
Returns:
None
"""
producer = KafkaProducer(
bootstrap_servers='192.168.74.136:9092',
key_serializer=lambda k: json.dumps(k).encode('utf-8'),
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
producer.send(topics, key="start", value={"msg": "aaaa"})
producer.close()
def recv(self):
"""
接收消费中的数据
Returns:
使用生成器进行返回;
"""
for message in self.consumer:
# 这是一个永久阻塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都会有偏移
# print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (
# message.topic, message.partition, message.offset, message.key, message.value))
yield {"topic": message.topic, "partition": message.partition, "key": message.key,
"value": message.value.decode(self._encode)}
def recv_seek(self, offset):
"""
接收消费者中的数据,按照 offset 的指定消费位置;
Args:
offset: int; kafka 消费者中指定的消费位置;
Returns:
generator; 消费者消息的生成器;
"""
self.consumer.seek(self.topic_set, offset)
for message in self.consumer:
# print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (
# message.topic, message.partition, message.offset, message.key, message.value))
yield {"topic": message.topic, "partition": message.partition, "key": message.key,
"value": message.value.decode(self._encode)}
if __name__ == '__main__':
""" 测试使用;
"""
obj = KConsumer("exist_topic", bootstrap_server=['192.168.74.136:9092'])
for i in obj.recv():
print(i)
该消费者封装时多增加了一个需求,消费的主题不存在的时候会默认创建(创建成功的前提是kafka服务端的设置auto.create.topics.enable=true),下次就可以继续消费
以上两种脚本适用于 Kafka 的生产者和消费者在大多数情况下的使用,在使用的时候只需要将相关的配置信息修改即可;
docker 中使用 kafka 的时候与前面的配置稍有不同,当使用docker-compose部署 Kafka 的时候,地址在文件中经过修改,可能会被改变,因此只需要将相关的地址配好,即可;代码信息无需修改;
一般情况下如果是在 docker 中配置相关的参数,需要将端口映射出来,然后如果是 windows 可能需要将host的网络地址解析,与docker 中 kafka 的名称对应;
host 文件
127.0.0.1 kafka
当需要远程连接的时候,将地址改成该计算机在内网中的地址即可
# 需要安装的库如下
pip install kafka-python
pip install msgpack # msgpack库将数据序列化可以将数据转换为二进制格式,便于在网络中传输
# msgpack替换为python内置的bytes方法更简便 ,如: log = bytes(str(log), encoding="utf-8")
# msgpack的序列化
data = 'test_message'
msgpack.packb(data)
# msgpack的反序列化
packed_data = b'\x83\xa3age\x14\xa3name\xa3Tom\xa3sex\xa1M'
data = msgpack.unpackb(packed_data)
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(bootstrap_servers=['broker1:1234'])
# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')
# Block for 'synchronous' sends
try:
record_metadata = future.get(timeout=10)
except KafkaError:
# Decide what to do if produce request failed...
log.exception()
pass
# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)
# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')
# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})
# produce asynchronously
for _ in range(100):
producer.send('my-topic', b'msg')
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
log.error('I am an errback', exc_info=excp)
# handle exception
# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)
# block until all async messages are sent
producer.flush()
# configure multiple retries
producer = KafkaProducer(retries=5)
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092']) #参数为接收主题和kafka服务器地址
# 这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都有偏移
for message in consumer: # consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待直到新消息过来
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
from kafka import KafkaConsumer
# 为定义的消费者指定一个group_id,对于订阅同一个topic来的group来说,能够使得该group中的所有消费者共同消费其中的消息(group中每个消费者消费的是不同的消息),横向扩展处理能力
# 对于订阅同一个topic来的不同group来说,每个消费组都会获取这个topic中的所有消息进行消费
# 如果不指定group ID,consumer会使用一个自动生成的group id。这个group id的格式通常是 __consumer_offsets-<generated_uuid>,其中<generated_uuid>是一个随机生成的唯一通用标识符。那么相当于一个消费者就单独属于一个消费组,尽管它们订阅了同一个topic,也只能一个人单独消费该topic的全部消息
consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
auto_offset_reset:重置偏移量,earliest 移到最早的可用消息,latest 最新的消息,默认为 latest
源码定义: {‘smallest’: ‘earliest’, ‘largest’: ‘latest’}
# ==========读取指定位置消息===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])
print(consumer.partitions_for_topic("test")) #获取test主题的分区信息
print(consumer.topics()) #获取主题列表
print(consumer.subscription()) #获取当前消费者订阅的主题
topic_partition_set = consumer.assignment()
print(topic_partition_set) #获取当前消费者topic、分区信息
for topic_partition in topic_partition_set:
print(consumer.beginning_offsets(topic_partition)) # 获取当前消费者可消费的偏移量
offset = consumer.position(topic_partition)
consumer.seek(TopicPartition(topic='test', partition=0), 5) #重置偏移量,从第5个偏移量消费
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
# =======订阅多个消费者==========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0')) #订阅要消费的主题
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0))) #获取当前主题的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
from kafka import KafkaConsumer
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
msg = consumer.poll(timeout_ms=5) #从kafka获取消息
print(msg)
time.sleep(2)
# ==============消息恢复和挂起===========
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0)) # pause执行后,consumer不能读取,直到调用resume后恢复。
num = 0
while True:
print(num)
print(consumer.paused()) #获取当前挂起的消费者
msg = consumer.poll(timeout_ms=5)
print(msg)
time.sleep(2)
num = num + 1
if num == 10:
print("resume...")
consumer.resume(TopicPartition(topic='test', partition=0))
print("resume......")
pause 执行后,consumer不能读取,直到调用 resume 后恢复
import json
import msgpack
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)
# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))
# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)
# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)
# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')
# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
bootstrap_servers='my.server.com')
client = KafkaClient(bootstrap_servers=brokers)
if topic not in client.cluster.topics(exclude_internal_topics=True): # Topic不存在
request = admin.CreateTopicsRequest_v0(
create_topic_requests=[(
topic,
num_partitions,
-1, # replication unset.
[], # Partition assignment.
[(key, value) for key, value in configs.items()], # Configs
)],
timeout=timeout_ms
)
future = client.send(2, request) # 2是Controller,发送给其他Node都创建失败。
client.poll(timeout_ms=timeout_ms, future=future, sleep=False) # 这里
result = future.value
# error_code = result.topic_error_codes[0][1]
print("CREATE TOPIC RESPONSE: ", result) # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS
client.close()
else: # Topic已经存在
print("Topic already exists!")
return
相关推荐
- js中arguments详解
-
一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...
- firewall-cmd 常用命令
-
目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...
- epel-release 是什么
-
EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...
- FullGC详解 什么是 JVM 的 GC
-
前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...
-
2024-10-26 08:50 citgpt
- 跨域(CrossOrigin)
-
1.介绍 1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。 通俗讲...
- 微服务架构和分布式架构的区别
-
1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...
- 深入理解与应用CSS clip-path 属性
-
clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...
-
2024-10-25 11:51 citgpt
- Request.ServerVariables 大全
-
Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...
- python操作Kafka
-
目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的k...
-
2024-10-25 11:50 citgpt
- Runtime.getRuntime().exec详解
-
Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...
- promise.all详解 promise.all是干什么的
-
promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...
-
2024-10-24 16:21 citgpt
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- oracleclient (56)
- springbatch (59)
- oracle恢复数据 (56)
- 简单工厂模式 (68)
- 函数指针 (72)
- fill_parent (135)
- java配置环境变量 (140)
- linux文件系统 (56)
- 计算机操作系统教程 (60)
- 静态ip (63)
- notifyicon (55)
- 线程同步 (58)
- xcode 4 5 (60)
- 调试器 (60)
- c0000005 (63)
- html代码大全 (61)
- header utf 8 (61)
- 多线程多进程 (65)
- require_once (60)
- 百度网盘下载速度慢破解方法 (72)
- 谷歌浏览器免费入口 (72)
- npm list (64)
- 网站打开速度检测 (59)
- 网站建设流程图 (58)
- this关键字 (67)