百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

python操作Kafka

citgpt 2024-10-25 11:50 20 浏览 0 评论

目录

一、python 操作 kafka

python操作Kafka

1. python 使用 kafka 生产者

2. python 使用 kafka 消费者

3. 使用 docker 中的 kafka

二、python操作kafka细节

2.1 生产者demo

2.2 消费者demo

2.3 消费者(消费者组)

2.4 消费者(读取目前最早可读的消息)

2.5 消费者(手动设置偏移量)

2.6 消费者(订阅多个主题)

2.7 消费者(手动拉取消息)

2.8 消费者(消息挂起与恢复)

2.8.1 消息挂起和恢复的实现

2.8.2 完整示例

2.9 Python创建自定义的Kafka Topic

说明:关于 kafka 的启动与安装,命令行的使用,请查看前面的几篇文章。本篇文章主要描述python 如何操作 kafka

使用 python 操作 kafka 首先安装如下的包

pip install kafka-python  # 由于 python 3.7 后的版本中 async 的关键字发生了变化,因此需要安装kafka-python库;弃用kafka库

说明:python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区;


下面是将 kafka-python 库中生产者常用的方法进行封装,以便直接使用。更详细用法在下面第二节中


 import json

 import kafka


 

 class Producer(object):

     """ kafka 的生产者模型

     """

 

     _coding = "utf-8"

 

     def __init__(self,

                  broker='192.168.74.136:9092',

                  topic="add_topic",

                  max_request_size=104857600,

                  batch_size=0,  # 即时发送,提高并发可以适当增加,但是会造成消息的延迟;

                  **kwargs):

         """初始化设置 kafka 生产者连接对象;参数不存在的情况下使用配置文件中的默认连接;

         """

         self.broker = broker

         self.topic = topic

         self.max_request_size = max_request_size

         # 实例化生产者对象

         self.producer_json = kafka.KafkaProducer(

             bootstrap_servers=self.broker,

             max_request_size=self.max_request_size,

             batch_size=batch_size,

             key_serializer=lambda k: json.dumps(k).encode(self._coding),  # 设置键的形式使用匿名函数进行转换

             value_serializer=lambda v: json.dumps(v).encode(self._coding),  # 当需要使用 json 传输地时候必须加上这两个参数

             **kwargs

         )

 

         self.producer = kafka.KafkaProducer(

             bootstrap_servers=broker,

             max_request_size=self.max_request_size,

             batch_size=batch_size,

             api_version=(0, 10, 1),

             **kwargs

         )

 

     def send(self, message: bytes, partition: int = 0):

         """

         写入普通的消息;

         Args:

             message: bytes; 字节流数据;将字符串编码成 utf-8的格式;

             partition: int; kafka 的分区,将消息发送到指定的分区之中;

         Returns:

             None

         """

         future = self.producer.send(self.topic, message, partition=partition)

         record_metadata = future.get(timeout=30)

         if future.failed():  # 发送失败,记录异常到日志;

             raise Exception("send message failed:%s)" % future.exception)

 

     def send_json(self, key: str, value: dict, partition: int = 0):

         """

         发送 json 形式的数据;

         Args:

             key: str; kafka 中键的值

             value: dict; 发送的具体消息

             partition: int; 分区的信息

         Returns:

             None

         """

         future = self.producer_json.send(self.topic, key=key, value=value, partition=partition)

         record_metadata = future.get(timeout=30)

         if future.failed():  # 发送失败记录异常;

             raise Exception("send json message failed:%s)" % future.exception)

 

     def close(self):

         """

         关闭kafka的连接。

         Returns:

             None

         """

         self.producer_json.close()

         self.producer.close()

 

 

 if __name__ == '__main__':

     '''脚本调用执行;'''

     kafka_obj = Producer()

     print(kafka_obj.broker)

     kafka_obj.send("自动生成".encode())

 

发送的消息,主要是普通的字符串消息,和字典形式的消息,方便对接kafka

由于 kafka 消费者的特性,阻塞循环是一个必然的过程,可以使用 python 中的生成器进行优化,但是循环阻塞是无可避免的;


操作 kafka 的消费者依旧只需要安装上述的第三方依赖包 kafka-python;


下面是将 kafka-python 库中消费者常用的方法进行封装,以便直接使用。更详细用法在下面第二节中


 import json

 

 from kafka import KafkaConsumer, KafkaProducer

 from kafka.structs import TopicPartition

 

 

 class KConsumer(object):

     """kafka 消费者; 动态传参,非配置文件传入;

        kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;

     """

 

     _encode = "UTF-8"

 

     def __init__(self, topics="start_server", bootstrap_server=None, group_id="start_task", partitions=None, **kwargs):

         """ 初始化kafka的消费者;

             1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)

             2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;

             3. 手动设置偏移量

         Args:

             topics: str; kafka 的消费主题;

             bootstrap_server: list; kafka 的消费者地址;

             group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;

             partitions: int; 消费的分区,当不使用分区的时候默认读取是所有分区;

             **kwargs: dict; 其他原生kafka消费者参数的;

         """

 

         if bootstrap_server is None:

             bootstrap_server = ["192.168.74.136:9092"]  # kafka集群的话就写多个

         self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server)

         exist = self.exist_topics(topics)

         if not exist:  # 需要的主题不存在;

             # 创建一条

             self.create_topics(topics)

         if partitions is not None:

             self.consumer = KafkaConsumer(

                 bootstrap_servers=bootstrap_server,

                 group_id=group_id,  

                 # 目前只有一个消费者,根据情况是否需要进行修改;当扩展多个消费者的时候需要进行扩展;

                 **kwargs

             )

             # print("指定分区信息:", partitions, topics, type(partitions))

             self.topic_set = TopicPartition(topics, int(partitions))

             self.consumer.assign([self.topic_set])

         else:

             # 默认读取主题下的所有分区, 但是该操作不支持自定义 offset, 因为 offset 一定是在指定的分区中进行的;

             self.consumer = KafkaConsumer(

                 topics,

                 bootstrap_servers=bootstrap_server,

                 group_id=group_id,

                 **kwargs

             )

 

     def exist_topics(self, topics):

         """

         检查 kafka 中的主题是否存在;

         Args:

             topics: 主题名称;

 

         Returns:

             bool: True/False ; True,表示存在,False 表示不存在;

         """

         topics_set = set(self.consumer.topics())

         if topics not in topics_set:

             return False

         return True

 

     @staticmethod

     def create_topics(topics):

         """

         创建相关的 kafka 主题信息;说明本方法可以实现用户自定义 kafka 的启动服务,默认是使用的是 start_server;

         Args:

             topics: str; 主题的名字;

 

         Returns:

             None

         """

         producer = KafkaProducer(

             bootstrap_servers='192.168.74.136:9092',

             key_serializer=lambda k: json.dumps(k).encode('utf-8'),

             value_serializer=lambda v: json.dumps(v).encode("utf-8")

         )

         producer.send(topics, key="start", value={"msg": "aaaa"})

         producer.close()

 

     def recv(self):

         """

         接收消费中的数据

         Returns:

             使用生成器进行返回;

         """

         for message in self.consumer:  

             # 这是一个永久阻塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都会有偏移

             # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (

             #     message.topic, message.partition, message.offset, message.key, message.value))

             yield {"topic": message.topic, "partition": message.partition, "key": message.key,

                    "value": message.value.decode(self._encode)}

 

     def recv_seek(self, offset):

         """

         接收消费者中的数据,按照 offset 的指定消费位置;

         Args:

             offset: int; kafka 消费者中指定的消费位置;

 

         Returns:

             generator; 消费者消息的生成器;

         """

         self.consumer.seek(self.topic_set, offset)

         for message in self.consumer:

             # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (

             #     message.topic, message.partition, message.offset, message.key, message.value))

             yield {"topic": message.topic, "partition": message.partition, "key": message.key,

                    "value": message.value.decode(self._encode)}

 

 

 if __name__ == '__main__':

     """ 测试使用;

     """

 

     obj = KConsumer("exist_topic", bootstrap_server=['192.168.74.136:9092'])

     for i in obj.recv():

         print(i)

该消费者封装时多增加了一个需求,消费的主题不存在的时候会默认创建(创建成功的前提是kafka服务端的设置auto.create.topics.enable=true),下次就可以继续消费

以上两种脚本适用于 Kafka 的生产者和消费者在大多数情况下的使用,在使用的时候只需要将相关的配置信息修改即可;


docker 中使用 kafka 的时候与前面的配置稍有不同,当使用docker-compose部署 Kafka 的时候,地址在文件中经过修改,可能会被改变,因此只需要将相关的地址配好,即可;代码信息无需修改;


一般情况下如果是在 docker 中配置相关的参数,需要将端口映射出来,然后如果是 windows 可能需要将host的网络地址解析,与docker 中 kafka 的名称对应;


 host 文件

 

 127.0.0.1 kafka

当需要远程连接的时候,将地址改成该计算机在内网中的地址即可

# 需要安装的库如下

pip install kafka-python

pip install msgpack  # msgpack库将数据序列化可以将数据转换为二进制格式,便于在网络中传输


# msgpack替换为python内置的bytes方法更简便 ,如: log = bytes(str(log), encoding="utf-8")  


# msgpack的序列化

data = 'test_message'

msgpack.packb(data)


# msgpack的反序列化

packed_data = b'\x83\xa3age\x14\xa3name\xa3Tom\xa3sex\xa1M'

data = msgpack.unpackb(packed_data)

from kafka import KafkaProducer

from kafka.errors import KafkaError


producer = KafkaProducer(bootstrap_servers=['broker1:1234'])


# Asynchronous by default

future = producer.send('my-topic', b'raw_bytes')


# Block for 'synchronous' sends

try:

    record_metadata = future.get(timeout=10)

except KafkaError:

    # Decide what to do if produce request failed...

    log.exception()

    pass


# Successful result returns assigned partition and offset

print (record_metadata.topic)

print (record_metadata.partition)

print (record_metadata.offset)


# produce keyed messages to enable hashed partitioning

producer.send('my-topic', key=b'foo', value=b'bar')


# encode objects via msgpack

producer = KafkaProducer(value_serializer=msgpack.dumps)

producer.send('msgpack-topic', {'key': 'value'})


# produce json messages

producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))

producer.send('json-topic', {'key': 'value'})


# produce asynchronously

for _ in range(100):

    producer.send('my-topic', b'msg')


def on_send_success(record_metadata):

    print(record_metadata.topic)

    print(record_metadata.partition)

    print(record_metadata.offset)


def on_send_error(excp):

    log.error('I am an errback', exc_info=excp)

    # handle exception


# produce asynchronously with callbacks

producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)


# block until all async messages are sent

producer.flush()


# configure multiple retries

producer = KafkaProducer(retries=5)

from kafka import KafkaConsumer


consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])  #参数为接收主题和kafka服务器地址


# 这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都有偏移

for message in consumer:  # consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待直到新消息过来

    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

from kafka import KafkaConsumer


# 为定义的消费者指定一个group_id,对于订阅同一个topic来的group来说,能够使得该group中的所有消费者共同消费其中的消息(group中每个消费者消费的是不同的消息),横向扩展处理能力

# 对于订阅同一个topic来的不同group来说,每个消费组都会获取这个topic中的所有消息进行消费


# 如果不指定group ID,consumer会使用一个自动生成的group id。这个group id的格式通常是 __consumer_offsets-<generated_uuid>,其中<generated_uuid>是一个随机生成的唯一通用标识符。那么相当于一个消费者就单独属于一个消费组,尽管它们订阅了同一个topic,也只能一个人单独消费该topic的全部消息

consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['127.0.0.1:9092'])

for message in consumer:

    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

from kafka import KafkaConsumer


consumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['127.0.0.1:9092'])


for message in consumer:

    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

auto_offset_reset:重置偏移量,earliest 移到最早的可用消息,latest 最新的消息,默认为 latest

源码定义: {‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

# ==========读取指定位置消息===============

from kafka import KafkaConsumer

from kafka.structs import TopicPartition


consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])


print(consumer.partitions_for_topic("test"))  #获取test主题的分区信息

print(consumer.topics())  #获取主题列表

print(consumer.subscription())  #获取当前消费者订阅的主题

topic_partition_set = consumer.assignment()

print(topic_partition_set)  #获取当前消费者topic、分区信息

for topic_partition in topic_partition_set:

    print(consumer.beginning_offsets(topic_partition))  # 获取当前消费者可消费的偏移量

    offset = consumer.position(topic_partition)


consumer.seek(TopicPartition(topic='test', partition=0), 5)  #重置偏移量,从第5个偏移量消费

for message in consumer:

    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

# =======订阅多个消费者==========


from kafka import KafkaConsumer

from kafka.structs import TopicPartition


consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])

consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题

print(consumer.topics())

print(consumer.position(TopicPartition(topic='test', partition=0))) #获取当前主题的最新偏移量

for message in consumer:

    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

from kafka import KafkaConsumer

import time


consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])

consumer.subscribe(topics=('test','test0'))

while True:

    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息

    print(msg)

    time.sleep(2)

# ==============消息恢复和挂起===========


from kafka import KafkaConsumer

from kafka.structs import TopicPartition

import time


consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])

consumer.subscribe(topics=('test'))

consumer.topics()

consumer.pause(TopicPartition(topic=u'test', partition=0))  # pause执行后,consumer不能读取,直到调用resume后恢复。

num = 0

while True:

    print(num)

    print(consumer.paused())   #获取当前挂起的消费者

    msg = consumer.poll(timeout_ms=5)

    print(msg)

    time.sleep(2)

    num = num + 1

    if num == 10:

        print("resume...")

        consumer.resume(TopicPartition(topic='test', partition=0))

        print("resume......")

pause 执行后,consumer不能读取,直到调用 resume 后恢复

import json

import msgpack

from kafka import KafkaConsumer


# To consume latest messages and auto-commit offsets

consumer = KafkaConsumer('my-topic',

                         group_id='my-group',

                         bootstrap_servers=['localhost:9092'])

for message in consumer:

    # message value and key are raw bytes -- decode if necessary!

    # e.g., for unicode: `message.value.decode('utf-8')`

    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,

                                          message.offset, message.key,

                                          message.value))


# consume earliest available messages, don't commit offsets

KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)


# consume json messages

KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))


# consume msgpack

KafkaConsumer(value_deserializer=msgpack.unpackb)


# StopIteration if no message after 1sec

KafkaConsumer(consumer_timeout_ms=1000)


# Subscribe to a regex topic pattern

consumer = KafkaConsumer()

consumer.subscribe(pattern='^awesome.*')


# Use multiple consumers in parallel w/ 0.9 kafka brokers

# typically you would run each on a different server / process / CPU

consumer1 = KafkaConsumer('my-topic',

                          group_id='my-group',

                          bootstrap_servers='my.server.com')

consumer2 = KafkaConsumer('my-topic',

                          group_id='my-group',

                          bootstrap_servers='my.server.com')

client = KafkaClient(bootstrap_servers=brokers)


if topic not in client.cluster.topics(exclude_internal_topics=True):  # Topic不存在


    request = admin.CreateTopicsRequest_v0(

        create_topic_requests=[(

            topic,

            num_partitions,

            -1,  # replication unset.

            [],  # Partition assignment.

            [(key, value) for key, value in configs.items()],  # Configs

        )],

        timeout=timeout_ms

    )


    future = client.send(2, request)  # 2是Controller,发送给其他Node都创建失败。

    client.poll(timeout_ms=timeout_ms, future=future, sleep=False)  # 这里


    result = future.value

    # error_code = result.topic_error_codes[0][1]

    print("CREATE TOPIC RESPONSE: ", result)  # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS

    client.close()

else:  # Topic已经存在

    print("Topic already exists!")

    return


相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka
python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的k...

2024-10-25 11:50 citgpt

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: