什么是Kafka?
kafka是分布式流处理平台,也是一种消息传递的中间件,更适用大量数据的消息传递以及异步消息处理,由消息生产者、消息消费者、broker消息节点和topic消息类型组合成消息传递过程,通过生产者基于某一种topic消息类型发布消息基于broker存储消息,消费者再根据对应的topic进行消息的消费。
kafka核心元素
broker:kafka的节点或实例
topic:消息的类型
Producer:消息的生产者,向Topic中发送消息。
Consumer:消息的消费者,从Topic中消费消息。
zookeeper/kraft:集群协调的角色(topic、broker、权限等)
kafka特性
业务数据解耦,实现异步消息处理
相对于其他消息队列,性能更强
可以灵活调配实例,满足各体量的数据传输
数据持久化,占用大量的磁盘IO
kafka特性:每一次进行消息的拉取或订阅 拉取完都会清空对应的数据,在kafka中数据都是一次性的
日常项目中,用到kafka的场景
底层服务间的消息传输(异步消息处理、轻量化)
解耦,一般来说可以用kafka替换http协议实现解耦
日志聚合或处理
如何避免kafka的消息积压
方案一:控制消息生产者服务的消息生产速率,比方说接入api网关进行接口的流量控制,触发阈值时可拒绝多余的请求或阻塞等待。
方案二:提高消费者消费速率,开发可以分析消费者可优化的业务逻辑等。
方案三:前置完成消息传递的性能测试(压力测试,了解单实例消费的瓶颈),可以了解消息生产者、消息消费者单实例的消息生产、消息消费速率,等到线上环境部署时评估线上可能存在的消息体量,再部署时进行精准的实例分配。
kafka在某些高并发场景有可能会遇到消息积压的情况,比如说用户量突然剧增,消息的消费速度跟不上消息的生成速度。
解决方案:
增加消费者,扩容消费者的实例提高整体的消费速率,直至不存在消息积压。
注:消息速率慢的原因也有可能是broker节点数据处理速率没有跟上
kafka生产者demo:
from flask import Flask, request
from confluent_kafka import Producer
app = Flask(__name__)
kafka_conf = {
'bootstrap.servers': 'localhost:9092', # 更改为你的 Kafka broker 地址
'queue.buffering.max.messages': 500000,
'queue.buffering.max.ms': 60000
}
producer = Producer(kafka_conf)
@app.route('/push', methods=['POST'])
def push():
try:
data = request.json
num = data['num'] # 广告发布数据量
topic = data['topic'] # 广告类型
gg_data = data['data']
except Exception as e:
print(e)
return {'msg': 'params error'}, 403
for i in range(num):
producer.produce(topic, gg_data)
producer.flush()
return {'msg': 'push success'}, 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=899, debug=True)kafka消费者demo:
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import json
import logging
from typing import Dict, Any
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('wps_file_consumer')
class WPSFileConsumer:
def __init__(self, config: Dict[str, Any]):
"""
初始化Kafka消费者
:param config: Kafka消费者配置
"""
self.consumer = Consumer(config)
self.running = False
def subscribe_to_topic(self, topic: str = 'wps.file'):
"""
订阅指定的topic
:param topic: 要订阅的topic名称,默认为'wps.file'
"""
self.consumer.subscribe([topic])
logger.info(f"成功订阅topic: {topic}")
def consume_messages(self):
"""
开始消费消息
"""
self.running = True
try:
while self.running:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
self.handle_error(msg.error())
continue
try:
# 假设消息是JSON格式
message_value = msg.value().decode('utf-8')
self.process_message(message_value, msg)
except json.JSONDecodeError:
logger.warning(f"无法解析JSON消息: {msg.value()}")
except Exception as e:
logger.error(f"处理消息时出错: {str(e)}", exc_info=True)
except KeyboardInterrupt:
logger.info("接收到中断信号,准备关闭消费者...")
finally:
self.shutdown()
def process_message(self, message, msg):
"""
处理接收到的消息
:param message: 解析后的消息内容
:param msg: 原始kafka消息对象
"""
# 这里实现你的业务逻辑
logger.info(f"收到消息 [分区 {msg.partition()} | 偏移量 {msg.offset()}]: {message}")
# 示例处理逻辑 - 根据实际需求修改
offer_date = message
logger.info(f"处理文件操作: 广告内容 {offer_date}")
def handle_error(self, error):
"""
处理消费错误
:param error: 错误对象
"""
if error.code() == KafkaError._PARTITION_EOF:
# 分区已到达末尾
logger.info("已到达分区末尾")
else:
logger.error(f"消费错误: {error.str()}")
if error.fatal():
self.running = False
def shutdown(self):
"""
关闭消费者
"""
logger.info("正在关闭消费者...")
self.running = False
self.consumer.close()
logger.info("消费者已关闭")
if __name__ == '__main__':
# Kafka消费者配置
consumer_config = {
'bootstrap.servers': 'localhost:9092', # Kafka服务器地址
'group.id': 'wps.file.consumer.group', # 消费者组ID
'auto.offset.reset': 'earliest', # 从最早的消息开始消费
'enable.auto.commit': False, # 手动提交偏移量
'max.poll.interval.ms': 300000, # 最大poll间隔
'session.timeout.ms': 10000 # 会话超时时间
}
# 创建并启动消费者
try:
consumer = WPSFileConsumer(consumer_config)
consumer.subscribe_to_topic('usa.offer')
logger.info("启动WPS文件消费者服务...")
consumer.consume_messages()
except Exception as e:
logger.error(f"消费者服务启动失败: {str(e)}", exc_info=True)
sys.exit(1)
kafka的测试点
功能点:
验证topic的正确性
验证生产者服务消息发布的过程
验证消费者服务消息消费的过程,验证所消费数据的外在表现
对唯一性数据进行重复消费
testcase:模拟单一业务数据的消息生产和消息消费过程,期望确保消息能被下游服务消费
正向的用例demo:
用例名:XX业务数据消息消费
前置步骤:请求生产者服务的对外接口,构建消息数据A
操作步骤:请求消费者服务的消费接口/查询消费者消费后存储的数据源
期望值:消费者服务的相关查询结果查询消费情况,找到结果关键字是否 == 数据A(数据对齐、数据格式和设计文档协议一致)
注:消费者结果验证方式可能通过服务日志、也可能通过服务下游逻辑
逆向的用例场景:在没有发布消息的情况下,请求消费者服务的消费接口,有没有进行正常的处理,如果消费者对外接口出现loading情况,说明未对该场景进行处理。
性能点:
模拟线上的消息积压的场景,分析当前服务实例的瓶颈值。获取到的结果:
1.消费者的消费的速率,可以满足多少的并发数不会出现消息积压的情况。
2.生产者基于外部接口的消息生产速率。
3.broker是否支撑目前的消息积压情况。
kafka监控平台一般只能提供有哪些topic,每个topic当前消息积压的情况。
监控平台推荐:grafana+普罗米修斯+kafka exporter
什么样的消息会进入死信队列
在Kafka中,死信队列是用于存储处理失败或无法正常消费的消息的特殊主题。以下情况通常会导致消息进入死信队列:
消息处理多次失败:当消费者对某条消息的重试次数达到最大限制后仍无法成功处理
消息格式错误:消息内容不符合预期格式,无法被反序列化或解析
业务逻辑拒绝:消息内容违反业务规则被明确拒绝处理
超时未处理:消息在指定时间内未被成功处理
权限问题:消费者没有足够的权限处理该消息
死信队列消息的常见处理方式
人工审核:检查消息内容,确定失败原因(最常见的解决方案)
修复后重新投递:修正消息内容后重新发送到主主题
报警通知:触发报警机制通知相关人员
日志记录:将错误消息记录到日志系统供后续分析
丢弃处理:对于确实无法处理的消息,可以选择丢弃并记录