说明:关于 kafka 的启动与安装,命令行的使用,此处不做过多的解释,本篇文章主要描述 kafka 在 python 中的使用;
(相关资料图)
**说明:**python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区;
使用 python 操作 kafka 首先安装如下的包
pip install kafka pip install kafka-python # 由于 python 3.7 后的版本中 async 的关键字发生了变化,因此需要多安装该包;
常规的使用主要就是根据,第三方包的介绍使用,网上有许多基本的案例,此处不做介绍,下面直接将封装好的常用的方法进行封装;
import jsonimport kafkaclass Producer(object): """ kafka 的生产者模型 """ _coding = "utf-8" def __init__(self, broker="192.168.74.136:9092", topic="add_topic", max_request_size=104857600, batch_size=0, # 即时发送,提高并发可以适当增加,但是会造成消息的延迟; **kwargs): """初始化设置 kafka 生产者连接对象;参数不存在的情况下使用配置文件中的默认连接; """ self.broker = broker self.topic = topic self.max_request_size = max_request_size # 实例化生产者对象 self.producer_json = kafka.KafkaProducer( bootstrap_servers=self.broker, max_request_size=self.max_request_size, batch_size=batch_size, key_serializer=lambda k: json.dumps(k).encode(self._coding), # 设置键的形式使用匿名函数进行转换 value_serializer=lambda v: json.dumps(v).encode(self._coding), # 当需要使用 json 传输地时候必须加上这两个参数 **kwargs ) self.producer = kafka.KafkaProducer( bootstrap_servers=broker, max_request_size=self.max_request_size, batch_size=batch_size, api_version=(0, 10, 1), **kwargs ) def send(self, message: bytes, partition: int = 0): """ 写入普通的消息; Args: message: bytes; 字节流数据;将字符串编码成 utf-8的格式; partition: int; kafka 的分区,将消息发送到指定的分区之中; Returns: None """ future = self.producer.send(self.topic, message, partition=partition) record_metadata = future.get(timeout=30) if future.failed(): # 发送失败,记录异常到日志; raise Exception("send message failed:%s)" % future.exception) def send_json(self, key: str, value: dict, partition: int = 0): """ 发送 json 形式的数据; Args: key: str; kafka 中键的值 value: dict; 发送的具体消息 partition: int; 分区的信息 Returns: None """ future = self.producer_json.send(self.topic, key=key, value=value, partition=partition) record_metadata = future.get(timeout=30) if future.failed(): # 发送失败记录异常; raise Exception("send json message failed:%s)" % future.exception) def close(self): """ 关闭kafka的连接。 Returns: None """ self.producer_json.close() self.producer.close()if __name__ == "__main__": """脚本调用执行;""" kafka_obj = Producer() print(kafka_obj.broker) kafka_obj.send("自动生成".encode())
发送的消息,主要是普通的字符串消息,和字典形式的消息,方便对接;
由于 kafka 消费者的特性,阻塞循环是一个必然的过程,可以使用 python 中的生成器进行优化,但是循环阻塞是无可避免的;
操作 kafka 的消费者依旧只需要安装上述的两个第三方依赖包;
封装指定的操作
import jsonfrom kafka import KafkaConsumer, KafkaProducerfrom kafka.structs import TopicPartitionclass KConsumer(object): """kafka 消费者; 动态传参,非配置文件传入; kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中; """ _encode = "UTF-8" def __init__(self, topics="start_server", bootstrap_server=None, group_id="start_task", partitions=None, **kwargs): """ 初始化kafka的消费者; 1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值) 2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数; 3. 手动设置偏移量 Args: topics: str; kafka 的消费主题; bootstrap_server: list; kafka 的消费者地址; group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id; partitions: int; 消费的分区,当不使用分区的时候默认读取是所有分区; **kwargs: dict; 其他原生kafka消费者参数的; """ if bootstrap_server is None: bootstrap_server = ["192.168.74.136:9092", ] self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server) exist = self.exist_topics(topics) if not exist: # 需要的主题不存在; # 创建一条 self.create_topics(topics) if partitions is not None: self.consumer = KafkaConsumer( bootstrap_servers=bootstrap_server, group_id=group_id, # 目前只有一个消费者,根据情况是否需要进行修改;当扩展多个消费者的时候需要进行扩展; **kwargs ) # print("指定分区信息:", partitions, topics, type(partitions)) self.topic_set = TopicPartition(topics, int(partitions)) self.consumer.assign([self.topic_set]) else: # 默认读取主题下的所有分区, 但是该操作不支持自定义 offset, 因为 offset 一定是在指定的分区中进行的; self.consumer = KafkaConsumer( topics, bootstrap_servers=bootstrap_server, group_id=group_id, **kwargs ) def exist_topics(self, topics): """ 检查 kafka 中的主题是否存在; Args: topics: 主题名称; Returns: bool: True/False ; True,表示存在,False 表示不存在; """ topics_set = set(self.consumer.topics()) if topics not in topics_set: return False return True @staticmethod def create_topics(topics): """ 创建相关的 kafka 主题信息;说明本方法可以实现用户自定义 kafka 的启动服务,默认是使用的是 start_server; Args: topics: str; 主题的名字; Returns: None """ producer = KafkaProducer( bootstrap_servers="192.168.74.136:9092", key_serializer=lambda k: json.dumps(k).encode("utf-8"), value_serializer=lambda v: json.dumps(v).encode("utf-8") ) producer.send(topics, key="start", value={"msg": "aaaa"}) producer.close() def recv(self): """ 接收消费中的数据 Returns: 使用生成器进行返回; """ for message in self.consumer: # 这是一个永久阻塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都会有偏移 # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % ( # message.topic, message.partition, message.offset, message.key, message.value)) yield {"topic": message.topic, "partition": message.partition, "key": message.key, "value": message.value.decode(self._encode)} def recv_seek(self, offset): """ 接收消费者中的数据,按照 offset 的指定消费位置; Args: offset: int; kafka 消费者中指定的消费位置; Returns: generator; 消费者消息的生成器; """ self.consumer.seek(self.topic_set, offset) for message in self.consumer: # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % ( # message.topic, message.partition, message.offset, message.key, message.value)) yield {"topic": message.topic, "partition": message.partition, "key": message.key, "value": message.value.decode(self._encode)}if __name__ == "__main__": """ 测试使用; """ obj = KConsumer("exist_topic", bootstrap_server=["192.168.74.136:9092"]) for i in obj.recv(): print(i)
该消费者多封装时增加了一个需求,消费的主题不存在的时候会默认创建,下次就可以继续消费
以上两种脚本适用于 Kafka 的生产者和消费者在大多数情况下的使用,在使用的时候只需要将相关的配置信息修改即可;
docker 中使用 kafka 的时候与前面的配置稍有不同,当使用docker-compose
部署 Kafka 的时候,地址在文件中经过修改,可能会被改变,但是配置方式,因此只需要将相关的地址配好,即可;代码信息无需修改;
一般情况下如果是在 docker 中配置相关的参数,需要将端口映射出来,然后如果是 windows 可能需要将host的网络地址解析,与docker 中 kafka 的名称对应;
host 文件127.0.0.1 kafka
当需要远程连接的时候,将地址改成该计算机在内网中的地址即可;
标签:
最新推荐
python使用kafka说明:关于kafka的启动与安装,命令行的使用,此处不做过多的解释,本篇文章主要描述kaf...
1、03为河北省信鸽协会足环全国统一代码,应该是河北省信鸽协会某会员的鸽子,具体是谁的。2、要逐级查...
1、BDM是业务拓展经理。2、BDM是BusinessDevelopmentManager,业务拓展经理的意思,一般BDM要有宏观的战...
云南省首个5吉瓦高效产能异质结电池项目首线投产运行
1、定压功放的音柱都有线间变压器(你要确定你的音柱是有线间变压器的。2、)只要所有音柱的总功率不超...
1、首先,我们写出数字3384和24,仔细观察它们之间的关系。2、我们发现24可以通过公式(3388-4) 141来计算
1、节外生枝,汉语成语,拼音是jiéwàishēngzhī,意思是本不应该生枝的地方生枝,比喻在原有问题之外...
南京养老金一个月多少钱?2022南京退休养老金什么时候调整?下面同社保君来看看。往年南京退休人员养老金...
作为中国首个、深圳创办的国际海上体育赛事品牌,中国杯帆船赛创办于2007年,已成功在深圳举办十三届,...
1、1)排水来体积浮力等于重力mg=Vρg,排水源体积V=1500t (1t m³)=1500m³2)牵引力=
国投资本(600061)(600061):2022年营业收入19 37亿元,与上期同比增加62 58%3月24日,国投资本2022年年报显示,本期营业
1、居民家庭户口简称家庭户,是与集体户相对应的一类住户所登记的户口类型。2、居民家庭户口包括农村居...
服务器的管理停留在xshell登陆管理的时代,主机设备数量少,单人操作的时候还能满足使用。现在的主机数...
伊沃在仁川联效力时的主帅金奉吉于去年12月与延边龙鼎签约,球队将在本赛季征战中甲。金奉吉此前曾执教...
3月24日,中国证监会发布《东方财富证券股份有限公司申请上市证券做市交易业务资格反馈意见》
同壁AI讯,启源软件2023年03月24日发布公告称,西安启源软件股份有限公司进行了股票定向发行,一致行动...
12月31日,河南洛阳。女子爬山时正遇期末考试,于是随地写试卷。拍下这一幕的当事人杜同学称:事先已与...
1、1平方公里等于1500亩。1平方公里=1平方千米=10^6平方米。1亩≈666 67平方米。10^6÷666 67≈1500(...
TMT行情持续火爆,相关产品业绩却下跌,兴银基金怎么了?,重仓股,兴银基金,tmt行情,重要货币市场基金
中国证券网讯(记者陈梦娜)3月24日,“‘新征程、振信心、开新局’—首届租赁业投融资高峰论坛(2023)暨上...
揭示了嫦娥五号样品月帘源区的特点和月球青年火山活动的奥秘。据介绍,月球是人类深空探测的“前哨站”...
《科创板日报》24日讯,全球存储器大厂遭遇财务亏损压力,2023年上半恐仍将难遏止跌价。根据市场统计,2...
1、4G是2014年流行的。2、4G的标准完成在2008年左右,在世界上在2011年左右正式商用4G;2013年12月4
1、目前天津、海南、江西、湖北、云南五省开通移动电话携号转网业务。2、如您号码在此范围内。3、可拨打...
lù。甪,释义:1、常用于名字。见于《康熙字典》。2、镇名,在江苏苏州吴中区东,与昆山市交界处。一称...
3月23日优化生育(三孩)板块较上一交易日下跌0 05%,孩子王领跌。当日上证指数报收于3286 65,上涨0...
俄外交部发言人强调俄方坚持一个中国原则---新华社莫斯科3月23日电(记者华迪)俄罗斯外交部发言人扎哈...
据大湾区之声,第33期《全球金融中心指数》报告今日(23日)发布,香港的总排名维持全球第四位。香港特区...
本文转自:人民网-广西频道3月20日-22日,丹泉酒业2023洞藏封坛·春赏之旅拉开帷幕。一场春季封坛,丹泉...
在家居环境和老年用品方面,重点制定家居产品适老化设计通用标准,并在老视成镜、无障碍洗浴机、轮椅车...
开精密箱子步骤:1、精密一到海角村寻找铁匠,打开精密箱子;2、精密二到西域奇境寻找铁匠,打开精密箱...
上证报中国证券网讯(记者宋薇萍)上海证券报记者3月23日从中国铁路上海局集团有限公司(下简称上海局集...
1、【绵竹吧】活动筹备群QQ群号码100390752绵竹吧-城区(剑南、西南、东北)吧友交流1群【满】QQ群号码6
本周末,2023年新城春季招聘会将在新城凯虹广场举行。正在找工作的不要错过。本次招聘会由市人社局主办...
晋中市五险一金缴纳基数和比例是多少钱2023,下文就随社保君来简单的了解一下吧。晋中市社保缴费介绍:...
1、肆无忌惮的同义词:愈演愈烈、持明火执仗、任性、罗氏接口、独霸他人、鲁莽、大胆、鲁莽、无法无天、...
玩具水晶泥以其独特的外观和手感,以及在各种儿童视频中的高出镜率,吸引了不少儿童消费者。
1、单韵母(6个):ɑ啊、o喔、e鹅、i衣、u乌、v迂。2、复韵母(8个):ai爱、ei诶、ui威、ao袄、ou欧、...
生活中,有的人一到春天就特别容易“感冒”,除了打喷嚏、流鼻涕、鼻塞外,还伴有鼻痒、流泪等症状。其...
万年县气象台2023年03月23日06时16分发布大雾黄色预警信号:预计未来12小时内,陈营镇、大源镇、湖云乡...
1、打开手机微信,点击右边的“我”按钮。2、2、进入之后点击最后一项设置按钮,然后进入设置选项。3、3...
1、地址:厦门市洪莲中路209号公交:瑞景新村站下车(嘉盛豪园的后面)51926363095101520
1、《重案六组4》是由海润影视制作有限公司出品、杨大晗、徐庆东执导,王茜、邢岷山、张潮、郭昊伦、肖...
在接受采访时,曼城中场罗德里谈到了关于西班牙国家队以及曼城的一些情况。罗德里表示,英超联赛就像是...
1、“山回路转不见君,雪上空留马行处”出自唐代诗人岑参的《白雪歌送武判官归京》。2、白雪
1,有人有0203赛季埃因霍温的球员名单吗急需埃因霍温2002-2003赛季一线队名单:门将:沃特鲁斯RonaldWat...
看趋势,做单向,多多观察少交易超短线做期货,首先要看清楚当前的走势和方向,在特定的时间内选择一个...
1、淘宝卖家轻松一键导入第一步:点击“注册”按钮;第二步:根据内容进行填写,并点击“完成”按钮;第...
1 社会保障卡的密码如何修改?(1)社会保障卡的密码分为社会保障应用密码和银行账户应用密码。(2)社...
新华时评:用好法治手段护航网络市场健康发展---题用好法治手段护航网络市场健康发展。随着我国数字经济...
成都抗疫的外籍志愿者:愿为城市“康复”贡献力量
环球关注:钢限 |『经典套路玩不完』:网限HG XVX-016 风灵高达(帕梅特刻痕·第六阶段)预订开始!
世界微头条丨vivo xplay7配置怎么样vivo xplay7参数配置消息
全球微资讯!男子吐槽22元面仅盖碗底!店家退费并将加量20%
天天快消息!全球咖啡巨头竞争再起,星巴克中国首推“沿街取”,拉瓦萨开起艺术展
每日报道:两支国字号PK新西兰:85级国足4次交锋全部战平,新老国脚隔空PK
世界热议:GOG喜加一!免费领取心理恐怖游戏《Lorelai》
要闻:夏普对皮蓬和前妻23年每晚做爱4次反应:这是皮蓬饱受背伤的原因
焦点讯息:明天A股怎么走?我做了一个大胆的预判。具体你请看我的手绘预测图。
【全球独家】李梦事件进展:曝和张隆已结婚戴婚戒图曝光 媒体人称法律上没事
全球最新:吉克曲布个人资料简介_吉克曲布与吉克隽逸是什么关系
全球观焦点:干咳嗽吃什么好的快尤其是饭后_干咳嗽吃什么好的快
环球新消息丨伟大的1-0!亚洲第10夺冠了!剑指2026世界杯,国足出线更难了!
【全球速看料】吉水县气象台发布雷电黄色预警信号【III级/较重】【2023-03-19】
精彩看点:每天踮脚,竟有4个意想不到的好处!不花钱也能给内脏做“按摩”,会养生的人都在悄悄做
全球微速讯:广东东莞市寮步镇邮编_广东省东莞市寮步镇邮编是多少
【环球速看料】广州一居民楼起火15岁少年身亡,消火栓是否有水,业主希望广州消防局说明真像
【世界时快讯】3月17日汇市晚评:欧美市场动荡下 日元再度受宠
全球简讯:桂发祥(002820)3月17日主力资金净卖出455.13万元
天天微动态丨事关办酒席!四川对领导干部提出“八严禁”,退休3年内都得执行
全球看热讯:碑林区:“云上招商”激发活力 “亿元楼宇”潜力无限
全球热门:a1566是ipad几代cpu_a1566是ipad第几代
Copyright © 2015-2022 纵横律师网版权所有 备案号:浙ICP备2022016517号-12 联系邮箱:51 46 76 11 3 @qq.com