Python连接Kafka收发数据等操作

目录

一、Kafka

二、发送端(生产者)

三、接收端(消费者)

四、其他操作


一、Kafka

Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。

kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。

安装命令如下:

pip install kafka-python

二、发送端(生产者)

自动创建test主题,并每隔一秒发送一条数据,示例代码如下:

from kafka import KafkaProducer
import json
import time

# Kafka服务器地址
bootstrap_servers = ['localhost:9092']

# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# 发送消息的函数
def send_message(topic, message):
    # 将消息转换为字节
    producer.send(topic, json.dumps(message).encode('utf-8'))
    producer.flush()

if __name__ == '__main__':
    # 创建'test'主题
    topic = 'test'
    # 发送消息
    i = 1
    while True:
        message = {'num': i, 'msg': f'Hello Kafka {i}'}
        send_message(topic, message)
        i += 1
        time.sleep(1)

三、接收端(消费者)

代码如下:

from kafka import KafkaConsumer
import json

# Kafka服务器地址
bootstrap_servers = ['localhost:9092']

# 创建KafkaConsumer实例
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',  # 从最新的消息开始消费
    # auto_offset_reset='earliest',  # 从最早的offset开始消费
    enable_auto_commit=True,  # 自动提交offset
    group_id='my-group'  # 消费者组ID
)

# 消费消息
for message in consumer:
    # 将接收到的消息解码并转换为字典
    message = json.loads(message.value.decode('utf-8'))
    print(f"Received message: {message}")

消费者参数如下:

1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。

2、enable_auto_commit

该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。

3、group_id

该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)

四、其他操作

list_topics():获取主题元数据。

create_topics():创建新主题。

delete_topics():删除主题。

from kafka.admin import KafkaAdminClient, NewTopic

# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)


# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)

# 删除主题
admin_client.delete_topics(topics=['test-topic'])

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/882932.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CentOS 安装 JAVA环境(JDK 1.8)

镜像选择 推荐国内镜像直接下载 清华镜像 https://mirrors.tuna.tsinghua.edu.cn/Adoptium 关于重命名 AdoptOpenJDK 镜像为 Adoptium 的通知 编程宝库 http://www.codebaoku.com/jdk/jdk-index.html 这个镜像站,包含Oracle JDK、OpenJDK、AdoptOpenJDK、阿里…

视频汇聚EasyCVR视频监控平台调取接口提示“认证过期”是什么原因?

视频汇聚EasyCVR视频监控平台,作为一款智能视频监控综合管理平台,凭借其强大的视频融合汇聚能力和灵活的视频能力,在各行各业的应用中发挥着越来越重要的作用。EasyCVR平台具备强大的拓展性和灵活性,支持多种视频流的外部分发&…

【Elasticsearch】-实现图片向量相似检索

1、http请求方式 如果elasticsearch服务设置账号密码,则在请求的header中添加 Basic Auth 认证 请求方式:Post 请求地址:/index_name/_search 请求body:json格式 {"size": 10, //返回条数"min_score": 0.…

利用低代码快速搭建电商小程序之商品列表页

目标: 搭建商城的一个商品列表页面(先做静态页) 开发环境: 访问白码低代码平台:https://www.bnocode.com/ 白码的新自定义页功能(使用vue框架) 前期准备: 需要先准备商品数据表…

用 Pygame 实现一个乒乓球游戏

用 Pygame 实现一个乒乓球游戏 伸手需要一瞬间,牵手却要很多年,无论你遇见谁,他都是你生命该出现的人,绝非偶然。若无相欠,怎会相见。 引言 在这篇文章中,我将带领大家使用 Pygame 库开发一个简单的乒乓球…

Servlet入门:服务端小程序的初试(自己学习整理的资料)

目录 一.前言 二.建立基础结构​编辑 三.具体步骤 找到Tomcat文件并打开Tomcat。 在webapps中创建一个自己的文件夹。 在classes中新建一个Java文件。 在lib中导入需要的jar文件包。 配置环境变量 在Java文件的目录下打开cmd并输入 javac -d . HelloServlet.java进行…

如何设计出一个比较全面的测试用例

目录 1. 测试用例的基本要素(不需要执行结果) 2. 测试用例的给我们带来的好处 3. 用例编写步骤 4. 设计测试用例的方法 4.1 基于需求进行测试用例的设计 4.2 具体的设计方法 1.等价类 2.边界值 3.判定表(因果图) 4.正交表法 5.场景设计法 6.错误猜测…

[产品管理-33]:实验室技术与商业化产品的距离,实验室技术在商业化过程中要越过多少道“坎”?

目录 一、实验室技术 1.1 实验室研究性技术 1.2 技术发展的S曲线 技术发展S曲线的主要阶段和特点 技术发展S曲线的意义和应用 二、实验室技术商业化的路径 2.1 实验室技术与商业化产品的距离 1、技术成熟度与稳定性 - 技术自身 2、市场需求与适应性 - 技术是满足需求 …

快递物流短信API接口代码

官网:快递鸟 API参数 用户信息类 一.短信模版 1.接口说明 使用快递鸟短信功能时,预先设置好短信模板和对应的发送规则,快递鸟短信API将根据设置的好的模板和规则,进行短信的发送和反馈。 (1)仅支持Json格式。 (2)请求指令810…

org.eclipse.paho.client.mqttv3.MqttException: 无效客户机标识

需求背景 最近有一个项目,需要用到阿里云物联网,不是MQ。发现使用原来EMQX的代码去连接阿里云MQTT直接报错,试了很多种方案都不行。最终还是把错误分析和教程都整理一下。 需要注意的是,阿里云物联网平台和MQ不一样。方向别走偏了。 概念描述 EMQX和阿里云MQTT有什么区别…

国标GB28181视频融合监控汇聚平台的方案实现及场景应用

Liveweb国标视频融合云平台基于端-边-云一体化架构,部署轻量简单、功能灵活多样,平台可支持多协议(GB28181/RTSP/Onvif/海康SDK/Ehome/大华SDK/RTMP推流等)、多类型设备接入(IPC/NVR/监控平台),在视频能力上&#xff0…

CNS-WRFID-01地标卡读写器|写卡器DEMO软件读、写操作说明

CNS-WRFID-01地标卡读写器|写卡器是一款高频读写设备,支持ISO15693协议芯片卡,地标标签读写,支持兴颂系列抗金属|非抗金属RFID标签,如:CNS-CRFID-01、CNS-CRFID-02、CNS-CRFID-03、CNS-CRFID-04、CNS-CRFID-05、CNS-CR…

chorme浏览器 您的连接不是私密连接

‌当浏览器显示“您的连接不是私密连接,攻击者可能会试图从 localhost 窃取您的信息(例如:密码、消息或信用卡信息)”的警告时,这通常意味着您正在尝试访问的网站的安全证书存在问题,可能是因为它使用的是自…

2015年国赛高教杯数学建模A题太阳影子定位解题全过程文档及程序

2015年国赛高教杯数学建模 A题 太阳影子定位 技术就是通过分析视频中物体的太阳影子变化,确定视频拍摄的地点和日期的一种方法。   1.建立影子长度变化的数学模型,分析影子长度关于各个参数的变化规律,并应用你们建立的模型画出2015年10月…

ESP32-WROOM-32 [创建AP站点-客户端-TCP透传]

简介 基于ESP32-WROOM-32 开篇(刚买), 本篇讲的是基于固件 ESP32-WROOM-32-AT-V3.4.0.0(内含用户指南, 有AT指令说明)的TCP透传设置与使用 设备连接 TTL转USB线, 接ESP32 板 的 GND,RX2, TX2 指令介绍 注意,下面指…

利士策分享,如何在有限的时间内过上富足的生活?

利士策分享,如何在有限的时间内过上富足的生活? 在快节奏的现代生活中,追求富足不仅仅是物质上的丰盈,更是心灵的满足与生活的平衡。 如何在有限的时间内实现这一目标,是许多人心中的疑问。 以下是一些实用建议&#…

Linux centerOS 服务器搭建NTP服务

1,安装 NTP软件 sudo yum -y install ntp2,编辑配置文件 sudo vim /etc/ntp.conf 3,修改配置 在ntp.conf文件中,可以配置服务器从哪些上游时间源同步时间。如果你想让你的服务器对外同步时间,可以去掉restrict d…

BGP实验

1、实验拓扑 EBGP:位于不同AS的BGP路由器之间的BGP对等体关系。两台路由器之间要建立EBGP对等体关系,必须满足两个条件: 两个路由器所属AS不同(即AS号不同)。 在配置EBGP时,Peer命令所指定的对等体IP地址要…

【Docker】Docker快速入门

Docker学习笔记 一、Docker概述 为什么会出现Docker? 安卓开发流程:apk(java开发的)发布到应用商店,用户安装apk即可使用。 后端开发流程: jar(java开发的)带上环境发布到Docker仓库,用户从Docker仓库拉取镜像并部署。 总结…

智慧安防监控EasyCVR视频汇聚管理平台如何修改视频流分辨率?

智慧安防监控EasyCVR视频管理平台能在复杂的网络环境中,将前端监控设备进行统一集中接入与汇聚管理。EasyCVR平台支持H.264/H.265视频压缩技术,可在4G/5G/WIFI/宽带等网络环境下,传输720P/1080P/2K/4K高清视频。视频流经平台处理后&#xff0…