当前位置: 首页 >  业内资讯  >   >  正文

【世界时快讯】python 使用 kafka

  • 2023-03-25 21:21:57 来源:博客园

python 使用 kafka

说明:关于 kafka 的启动与安装,命令行的使用,此处不做过多的解释,本篇文章主要描述 kafka 在 python 中的使用;


(相关资料图)

1. python 使用 kafka 生产者

**说明:**python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区;

使用 python 操作 kafka 首先安装如下的包

pip install kafka pip install kafka-python # 由于 python 3.7 后的版本中 async 的关键字发生了变化,因此需要多安装该包;

常规的使用主要就是根据,第三方包的介绍使用,网上有许多基本的案例,此处不做介绍,下面直接将封装好的常用的方法进行封装;

import json​import kafka​​class Producer(object):  """ kafka 的生产者模型   """​  _coding = "utf-8"​  def __init__(self,         broker="192.168.74.136:9092",         topic="add_topic",         max_request_size=104857600,         batch_size=0, # 即时发送,提高并发可以适当增加,但是会造成消息的延迟;         **kwargs):    """初始化设置 kafka 生产者连接对象;参数不存在的情况下使用配置文件中的默认连接;     """    self.broker = broker    self.topic = topic    self.max_request_size = max_request_size    # 实例化生产者对象    self.producer_json = kafka.KafkaProducer(      bootstrap_servers=self.broker,      max_request_size=self.max_request_size,      batch_size=batch_size,      key_serializer=lambda k: json.dumps(k).encode(self._coding), # 设置键的形式使用匿名函数进行转换      value_serializer=lambda v: json.dumps(v).encode(self._coding), # 当需要使用 json 传输地时候必须加上这两个参数      **kwargs     )​    self.producer = kafka.KafkaProducer(      bootstrap_servers=broker,      max_request_size=self.max_request_size,      batch_size=batch_size,      api_version=(0, 10, 1),      **kwargs     )​  def send(self, message: bytes, partition: int = 0):    """     写入普通的消息;     Args:       message: bytes; 字节流数据;将字符串编码成 utf-8的格式;       partition: int; kafka 的分区,将消息发送到指定的分区之中;     Returns:       None     """    future = self.producer.send(self.topic, message, partition=partition)    record_metadata = future.get(timeout=30)    if future.failed(): # 发送失败,记录异常到日志;      raise Exception("send message failed:%s)" % future.exception)​  def send_json(self, key: str, value: dict, partition: int = 0):    """     发送 json 形式的数据;     Args:       key: str; kafka 中键的值       value: dict; 发送的具体消息       partition: int; 分区的信息     Returns:       None     """    future = self.producer_json.send(self.topic, key=key, value=value, partition=partition)    record_metadata = future.get(timeout=30)    if future.failed(): # 发送失败记录异常;      raise Exception("send json message failed:%s)" % future.exception)​  def close(self):    """     关闭kafka的连接。     Returns:       None     """    self.producer_json.close()    self.producer.close()​​if __name__ == "__main__":  """脚本调用执行;"""  kafka_obj = Producer()  print(kafka_obj.broker)  kafka_obj.send("自动生成".encode())​

发送的消息,主要是普通的字符串消息,和字典形式的消息,方便对接;

2. python 使用 kafka 消费者

由于 kafka 消费者的特性,阻塞循环是一个必然的过程,可以使用 python 中的生成器进行优化,但是循环阻塞是无可避免的;

操作 kafka 的消费者依旧只需要安装上述的两个第三方依赖包;

封装指定的操作

import json​from kafka import KafkaConsumer, KafkaProducerfrom kafka.structs import TopicPartition​​class KConsumer(object):  """kafka 消费者; 动态传参,非配置文件传入;    kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;   """​  _encode = "UTF-8"​  def __init__(self, topics="start_server", bootstrap_server=None, group_id="start_task", partitions=None, **kwargs):    """ 初始化kafka的消费者;       1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)       2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;       3. 手动设置偏移量     Args:       topics: str; kafka 的消费主题;       bootstrap_server: list; kafka 的消费者地址;       group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;       partitions: int; 消费的分区,当不使用分区的时候默认读取是所有分区;       **kwargs: dict; 其他原生kafka消费者参数的;     """​    if bootstrap_server is None:      bootstrap_server = ["192.168.74.136:9092", ]    self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server)    exist = self.exist_topics(topics)    if not exist: # 需要的主题不存在;      # 创建一条      self.create_topics(topics)    if partitions is not None:      self.consumer = KafkaConsumer(        bootstrap_servers=bootstrap_server,        group_id=group_id,         # 目前只有一个消费者,根据情况是否需要进行修改;当扩展多个消费者的时候需要进行扩展;        **kwargs       )      # print("指定分区信息:", partitions, topics, type(partitions))      self.topic_set = TopicPartition(topics, int(partitions))      self.consumer.assign([self.topic_set])    else:      # 默认读取主题下的所有分区, 但是该操作不支持自定义 offset, 因为 offset 一定是在指定的分区中进行的;      self.consumer = KafkaConsumer(        topics,        bootstrap_servers=bootstrap_server,        group_id=group_id,        **kwargs       )​  def exist_topics(self, topics):    """     检查 kafka 中的主题是否存在;     Args:       topics: 主题名称;​     Returns:       bool: True/False ; True,表示存在,False 表示不存在;     """    topics_set = set(self.consumer.topics())    if topics not in topics_set:      return False    return True​  @staticmethod  def create_topics(topics):    """     创建相关的 kafka 主题信息;说明本方法可以实现用户自定义 kafka 的启动服务,默认是使用的是 start_server;     Args:       topics: str; 主题的名字;​     Returns:       None     """    producer = KafkaProducer(      bootstrap_servers="192.168.74.136:9092",      key_serializer=lambda k: json.dumps(k).encode("utf-8"),      value_serializer=lambda v: json.dumps(v).encode("utf-8")     )    producer.send(topics, key="start", value={"msg": "aaaa"})    producer.close()​  def recv(self):    """     接收消费中的数据     Returns:       使用生成器进行返回;     """    for message in self.consumer:       # 这是一个永久阻塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都会有偏移      # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (      #   message.topic, message.partition, message.offset, message.key, message.value))      yield {"topic": message.topic, "partition": message.partition, "key": message.key,          "value": message.value.decode(self._encode)}​  def recv_seek(self, offset):    """     接收消费者中的数据,按照 offset 的指定消费位置;     Args:       offset: int; kafka 消费者中指定的消费位置;​     Returns:       generator; 消费者消息的生成器;     """    self.consumer.seek(self.topic_set, offset)    for message in self.consumer:      # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (      #   message.topic, message.partition, message.offset, message.key, message.value))      yield {"topic": message.topic, "partition": message.partition, "key": message.key,          "value": message.value.decode(self._encode)}​​if __name__ == "__main__":  """ 测试使用;   """​  obj = KConsumer("exist_topic", bootstrap_server=["192.168.74.136:9092"])  for i in obj.recv():    print(i)

该消费者多封装时增加了一个需求,消费的主题不存在的时候会默认创建,下次就可以继续消费

3. 使用 docker 中的 kafka

以上两种脚本适用于 Kafka 的生产者和消费者在大多数情况下的使用,在使用的时候只需要将相关的配置信息修改即可;

docker 中使用 kafka 的时候与前面的配置稍有不同,当使用docker-compose部署 Kafka 的时候,地址在文件中经过修改,可能会被改变,但是配置方式,因此只需要将相关的地址配好,即可;代码信息无需修改;

一般情况下如果是在 docker 中配置相关的参数,需要将端口映射出来,然后如果是 windows 可能需要将host的网络地址解析,与docker 中 kafka 的名称对应;

host 文件​127.0.0.1 kafka

当需要远程连接的时候,将地址改成该计算机在内网中的地址即可;

标签:

上一篇 :

下一篇 :

最新推荐

【世界时快讯】python 使用 kafka

python使用kafka说明:关于kafka的启动与安装,命令行的使用,此处不做过多的解释,本篇文章主要描述kaf...

今头条!中华信鸽网各地公棚_中华信鸽网

1、03为河北省信鸽协会足环全国统一代码,应该是河北省信鸽协会某会员的鸽子,具体是谁的。2、要逐级查...

天天资讯:bdm是什么职位的简称

1、BDM是业务拓展经理。2、BDM是BusinessDevelopmentManager,业务拓展经理的意思,一般BDM要有宏观的战...

全球热头条丨云南省首个5吉瓦高效产能异质结电池项目首线投产运行

云南省首个5吉瓦高效产能异质结电池项目首线投产运行

世界播报:定压功放70v 110v输出_定压功放70v与100v的区别

1、定压功放的音柱都有线间变压器(你要确定你的音柱是有线间变压器的。2、)只要所有音柱的总功率不超...

世界观察:3388算24点怎么算式加减乘除_3388算24点怎么算

1、首先,我们写出数字3384和24,仔细观察它们之间的关系。2、我们发现24可以通过公式(3388-4) 141来计算

全球速看:节外生枝的意思

1、节外生枝,汉语成语,拼音是jiéwàishēngzhī,意思是本不应该生枝的地方生枝,比喻在原有问题之外...

每日精选:南京养老金一个月多少钱 南京退休养老金什么时候调整

南京养老金一个月多少钱?2022南京退休养老金什么时候调整?下面同社保君来看看。往年南京退休人员养老金...

环球滚动:百舸争流 竞逐深圳大亚湾

作为中国首个、深圳创办的国际海上体育赛事品牌,中国杯帆船赛创办于2007年,已成功在深圳举办十三届,...

今日快讯:第一个南极考察基地长城站什么时候落成_在南极考察中使用的海洋破冰船

1、1)排水来体积浮力等于重力mg=Vρg,排水源体积V=1500t (1t m³)=1500m³2)牵引力=

焦点日报:国投资本(600061):2022年营业收入19.37亿元,与上期同比增加62.58%

国投资本(600061)(600061):2022年营业收入19 37亿元,与上期同比增加62 58%3月24日,国投资本2022年年报显示,本期营业

天天热讯:居民家庭户口什么意思

1、居民家庭户口简称家庭户,是与集体户相对应的一类住户所登记的户口类型。2、居民家庭户口包括农村居...

焦点!jumpserver的简单安装使用

服务器的管理停留在xshell登陆管理的时代,主机设备数量少,单人操作的时候还能满足使用。现在的主机数...

焦点信息:体坛:伊沃很可能追随恩师金奉吉,加盟中甲球队延边龙鼎

伊沃在仁川联效力时的主帅金奉吉于去年12月与延边龙鼎签约,球队将在本赛季征战中甲。金奉吉此前曾执教...

世界速读:交易软件无法正常登录交易 证监会要求东方财富(300059.SZ)说明原因并反馈整改情况

3月24日,中国证监会发布《东方财富证券股份有限公司申请上市证券做市交易业务资格反馈意见》

观焦点:启源软件:一致行动人魏永刚通过定向发行方式新增股份30万股

同壁AI讯,启源软件2023年03月24日发布公告称,西安启源软件股份有限公司进行了股票定向发行,一致行动...

每日聚焦:临时接到考试通知女生边爬山边写试卷

12月31日,河南洛阳。女子爬山时正遇期末考试,于是随地写试卷。拍下这一幕的当事人杜同学称:事先已与...

【天天快播报】1平方公里是多少亩

1、1平方公里等于1500亩。1平方公里=1平方千米=10^6平方米。1亩≈666 67平方米。10^6÷666 67≈1500(...

环球热议:TMT行情持续火爆,相关产品业绩却下跌,兴银基金怎么了?

TMT行情持续火爆,相关产品业绩却下跌,兴银基金怎么了?,重仓股,兴银基金,tmt行情,重要货币市场基金

环球滚动:上海市地方金融监督管理局副局长管小军:促进资本与实体经济精准对接 打造融资租赁生态高地

中国证券网讯(记者陈梦娜)3月24日,“‘新征程、振信心、开新局’—首届租赁业投融资高峰论坛(2023)暨上...

世界快看:嫦娥五号样品研究揭示月幔源区特性奥秘

揭示了嫦娥五号样品月帘源区的特点和月球青年火山活动的奥秘。据介绍,月球是人类深空探测的“前哨站”...

世界报道:NAND产业或将从第三季度迎来增长契机

《科创板日报》24日讯,全球存储器大厂遭遇财务亏损压力,2023年上半恐仍将难遏止跌价。根据市场统计,2...

环球热文:4g什么时候出来的_4g什么时候开始的

1、4G是2014年流行的。2、4G的标准完成在2008年左右,在世界上在2011年左右正式商用4G;2013年12月4

天天快看点丨联通携号转网套餐_移动转联通携号转网怎么办理

1、目前天津、海南、江西、湖北、云南五省开通移动电话携号转网业务。2、如您号码在此范围内。3、可拨打...

【聚看点】甪怎么读

lù。甪,释义:1、常用于名字。见于《康熙字典》。2、镇名,在江苏苏州吴中区东,与昆山市交界处。一称...

今日讯!优化生育(三孩)板块3月23日跌0.05%,孩子王领跌,主力资金净流出4.79亿元

3月23日优化生育(三孩)板块较上一交易日下跌0 05%,孩子王领跌。当日上证指数报收于3286 65,上涨0...

环球关注:俄外交部发言人强调俄方坚持一个中国原则

俄外交部发言人强调俄方坚持一个中国原则---新华社莫斯科3月23日电(记者华迪)俄罗斯外交部发言人扎哈...

焦点热议:全球金融中心指数香港排名第四位

据大湾区之声,第33期《全球金融中心指数》报告今日(23日)发布,香港的总排名维持全球第四位。香港特区...

天天观点:丹泉酒业2023洞藏封坛·春赏之旅拉开帷幕

本文转自:人民网-广西频道3月20日-22日,丹泉酒业2023洞藏封坛·春赏之旅拉开帷幕。一场春季封坛,丹泉...

环球短讯!市场监管总局下达一批适老化改造国家标准制修订专项计划

在家居环境和老年用品方面,重点制定家居产品适老化设计通用标准,并在老视成镜、无障碍洗浴机、轮椅车...

最新消息:传奇世界精密灵匣如何打开

开精密箱子步骤:1、精密一到海角村寻找铁匠,打开精密箱子;2、精密二到西域奇境寻找铁匠,打开精密箱...

环球速看:4月1日起 上海虹桥至香港西九龙间高铁恢复开行

上证报中国证券网讯(记者宋薇萍)上海证券报记者3月23日从中国铁路上海局集团有限公司(下简称上海局集...

世界通讯!绵竹贴吧2019_绵竹贴吧

1、【绵竹吧】活动筹备群QQ群号码100390752绵竹吧-城区(剑南、西南、东北)吧友交流1群【满】QQ群号码6

每日视讯:便民信息汇丨持续两天,推出岗位1000+ 本周末新城有一场招聘会

本周末,2023年新城春季招聘会将在新城凯虹广场举行。正在找工作的不要错过。本次招聘会由市人社局主办...

全球滚动:晋中市五险一金缴纳基数和比例是多少钱2023

晋中市五险一金缴纳基数和比例是多少钱2023,下文就随社保君来简单的了解一下吧。晋中市社保缴费介绍:...

环球快看:肆无忌惮的近义词褒义词(肆无忌惮的近义词)

1、肆无忌惮的同义词:愈演愈烈、持明火执仗、任性、罗氏接口、独霸他人、鲁莽、大胆、鲁莽、无法无天、...

百事通!抽检1/3不合格,水晶泥有“毒”?还能玩吗

玩具水晶泥以其独特的外观和手感,以及在各种儿童视频中的高出镜率,吸引了不少儿童消费者。

全球播报:韵母表怎么读

1、单韵母(6个):ɑ啊、o喔、e鹅、i衣、u乌、v迂。2、复韵母(8个):ai爱、ei诶、ui威、ao袄、ou欧、...

全球看点:如何区分花粉过敏和感冒?关键区别在这里

生活中,有的人一到春天就特别容易“感冒”,除了打喷嚏、流鼻涕、鼻塞外,还伴有鼻痒、流泪等症状。其...

环球观天下!万年县气象台发布大雾黄色预警信号【III级/较重】【2023-03-23】

万年县气象台2023年03月23日06时16分发布大雾黄色预警信号:预计未来12小时内,陈营镇、大源镇、湖云乡...

最新快讯!微信升级怎么升8._微信升级怎么升

1、打开手机微信,点击右边的“我”按钮。2、2、进入之后点击最后一项设置按钮,然后进入设置选项。3、3...

环球热消息:厦门逸夫中学小升初录取分数线_厦门逸夫中学

1、地址:厦门市洪莲中路209号公交:瑞景新村站下车(嘉盛豪园的后面)51926363095101520

世界焦点!重案六组4演员表

1、《重案六组4》是由海润影视制作有限公司出品、杨大晗、徐庆东执导,王茜、邢岷山、张潮、郭昊伦、肖...

焦点热文:罗德里:英超联赛非常艰难,你可以将其等同于欧冠

在接受采访时,曼城中场罗德里谈到了关于西班牙国家队以及曼城的一些情况。罗德里表示,英超联赛就像是...

每日播报!山回路转不见君的下一句自创_山回路转不见君的下一句

1、“山回路转不见君,雪上空留马行处”出自唐代诗人岑参的《白雪歌送武判官归京》。2、白雪

世界即时看!埃因霍温阵容,有人有0203赛季埃因霍温的球员名单吗急需

1,有人有0203赛季埃因霍温的球员名单吗急需埃因霍温2002-2003赛季一线队名单:门将:沃特鲁斯RonaldWat...

天天视讯!期货短线的方法和纪律

看趋势,做单向,多多观察少交易超短线做期货,首先要看清楚当前的走势和方向,在特定的时间内选择一个...

天天实时:17zwd一起做网店普宁_17一起做网店普宁

1、淘宝卖家轻松一键导入第一步:点击“注册”按钮;第二步:根据内容进行填写,并点击“完成”按钮;第...

天天滚动:南通社保卡密码如何修改?

1 社会保障卡的密码如何修改?(1)社会保障卡的密码分为社会保障应用密码和银行账户应用密码。(2)社...

全球微动态丨新华时评:用好法治手段 护航网络市场健康发展

新华时评:用好法治手段护航网络市场健康发展---题用好法治手段护航网络市场健康发展。随着我国数字经济...

X 广告
X 广告

精彩放送

头条焦点:广发期货:焦煤建议偏空对待

环球关注:钢限 |『经典套路玩不完』:网限HG XVX-016 风灵高达(帕梅特刻痕·第六阶段)预订开始!

天天快资讯丨注册安全工程师分类选择_注册安全工程师分类

世界微头条丨vivo xplay7配置怎么样vivo xplay7参数配置消息

全球微资讯!男子吐槽22元面仅盖碗底!店家退费并将加量20%

天天快消息!全球咖啡巨头竞争再起,星巴克中国首推“沿街取”,拉瓦萨开起艺术展

当前短讯!qq亲密度怎么到100_qq亲密度怎么刷到100

全球热头条丨2023年3月21日焦点图

每日报道:两支国字号PK新西兰:85级国足4次交锋全部战平,新老国脚隔空PK

天天观焦点:皇桐和正村:发展特色桑葚产业助力乡村振兴

世界热议:GOG喜加一!免费领取心理恐怖游戏《Lorelai》

要闻:夏普对皮蓬和前妻23年每晚做爱4次反应:这是皮蓬饱受背伤的原因

今日热讯:梗阻性黄疸以哪种胆红素升高为主_梗阻性黄疸

焦点讯息:明天A股怎么走?我做了一个大胆的预判。具体你请看我的手绘预测图。

世界观天下!黄鹤楼科技园(集团)有限公司是国企吗

【全球独家】李梦事件进展:曝和张隆已结婚戴婚戒图曝光 媒体人称法律上没事

全球最新:吉克曲布个人资料简介_吉克曲布与吉克隽逸是什么关系

天天信息:扎头发教程-扎头发

全球通讯!尹锡悦被批“做日本仆人”!韩国直接沦为全球笑柄!

全球观焦点:干咳嗽吃什么好的快尤其是饭后_干咳嗽吃什么好的快

每日热文:国际知名民调机构:中国人幸福感全球最高

天天讯息:韭菜子泡酒的配方_韭菜子泡酒有什么药用价值

天天滚动:航拍广西在建世界最大跨径拱桥

环球新消息丨伟大的1-0!亚洲第10夺冠了!剑指2026世界杯,国足出线更难了!

【全球速看料】吉水县气象台发布雷电黄色预警信号【III级/较重】【2023-03-19】

精彩看点:每天踮脚,竟有4个意想不到的好处!不花钱也能给内脏做“按摩”,会养生的人都在悄悄做

全球微速讯:广东东莞市寮步镇邮编_广东省东莞市寮步镇邮编是多少

【天天热闻】Intel 56核心至强超频5.5GHz

视点!iQOO7怎么插双卡

【天天报资讯】柯达破产的故事_柯达破产

【环球速看料】广州一居民楼起火15岁少年身亡,消火栓是否有水,业主希望广州消防局说明真像

世界最新:中国移动03月17日被沪股通减持91.07万股

【天天新要闻】木工板种类_木工板厚度

环球热头条丨佐力药业:接受富国基金等机构调研

环球讯息:激战插混市场,长城汽车称不会跟风定价

【世界时快讯】3月17日汇市晚评:欧美市场动荡下 日元再度受宠

全球简讯:桂发祥(002820)3月17日主力资金净卖出455.13万元

环球速看:多地鼓励医生兼职获取报酬,专家:很难成为普遍现象

天天微动态丨事关办酒席!四川对领导干部提出“八严禁”,退休3年内都得执行

精彩看点:3月17日F5G概念板块涨幅达3%

全球看热讯:碑林区:“云上招商”激发活力 “亿元楼宇”潜力无限

每日精选:快速学会催眠术教程_快速学习催眠术

全球热门:a1566是ipad几代cpu_a1566是ipad第几代

当前快看:全球连线|美联储激进加息是硅谷银行关闭主因

全球观察:2023广东春季高考线上咨询会怎么进?附官方入口

当前快讯:宁夏银川宝丰昱能供电工程具备送电条件

环球实时:2023年郑州高新区公租房位置在哪里?(持续更新)

天天通讯!纺织观察:棉纱弱势企稳 纺企担忧订单消退

环球短讯!远方信息3月16日快速反弹

每日信息:狗狗汪汪队玩具_狗狗汪汪队第二季

Copyright ©  2015-2022 纵横律师网版权所有  备案号:浙ICP备2022016517号-12   联系邮箱:51 46 76 11 3 @qq.com