什么是Kafka?

kafka是分布式流处理平台,也是一种消息传递的中间件,更适用大量数据的消息传递以及异步消息处理,由消息生产者、消息消费者、broker消息节点和topic消息类型组合成消息传递过程,通过生产者基于某一种topic消息类型发布消息基于broker存储消息,消费者再根据对应的topic进行消息的消费。

kafka核心元素

  • broker:kafka的节点或实例

  • topic:消息的类型

  • Producer:消息的生产者,向Topic中发送消息。

  • Consumer:消息的消费者,从Topic中消费消息。

  • zookeeper/kraft:集群协调的角色(topic、broker、权限等)

kafka特性

  • 业务数据解耦,实现异步消息处理

  • 相对于其他消息队列,性能更强

  • 可以灵活调配实例,满足各体量的数据传输

  • 数据持久化,占用大量的磁盘IO

  • kafka特性:每一次进行消息的拉取或订阅 拉取完都会清空对应的数据,在kafka中数据都是一次性的

日常项目中,用到kafka的场景

  1. 底层服务间的消息传输(异步消息处理、轻量化)

  2. 解耦,一般来说可以用kafka替换http协议实现解耦

  3. 日志聚合或处理

如何避免kafka的消息积压

方案一:控制消息生产者服务的消息生产速率,比方说接入api网关进行接口的流量控制,触发阈值时可拒绝多余的请求或阻塞等待。

方案二:提高消费者消费速率,开发可以分析消费者可优化的业务逻辑等。

方案三:前置完成消息传递的性能测试(压力测试,了解单实例消费的瓶颈),可以了解消息生产者、消息消费者单实例的消息生产、消息消费速率,等到线上环境部署时评估线上可能存在的消息体量,再部署时进行精准的实例分配。

kafka在某些高并发场景有可能会遇到消息积压的情况,比如说用户量突然剧增,消息的消费速度跟不上消息的生成速度。

解决方案:

增加消费者,扩容消费者的实例提高整体的消费速率,直至不存在消息积压。

注:消息速率慢的原因也有可能是broker节点数据处理速率没有跟上

kafka生产者demo:

from flask import Flask, request
from confluent_kafka import Producer

app = Flask(__name__)
kafka_conf = {
    'bootstrap.servers': 'localhost:9092',  # 更改为你的 Kafka broker 地址
    'queue.buffering.max.messages': 500000,
    'queue.buffering.max.ms': 60000
}

producer = Producer(kafka_conf)


@app.route('/push', methods=['POST'])
def push():
    try:
        data = request.json
        num = data['num']  # 广告发布数据量
        topic = data['topic']  # 广告类型
        gg_data = data['data']
    except Exception as e:
        print(e)
        return {'msg': 'params error'}, 403
    for i in range(num):
        producer.produce(topic, gg_data)
        producer.flush()

    return {'msg': 'push success'}, 200


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=899, debug=True)

kafka消费者demo:

from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import json
import logging
from typing import Dict, Any

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('wps_file_consumer')


class WPSFileConsumer:
    def __init__(self, config: Dict[str, Any]):
        """
        初始化Kafka消费者

        :param config: Kafka消费者配置
        """
        self.consumer = Consumer(config)
        self.running = False

    def subscribe_to_topic(self, topic: str = 'wps.file'):
        """
        订阅指定的topic

        :param topic: 要订阅的topic名称,默认为'wps.file'
        """
        self.consumer.subscribe([topic])
        logger.info(f"成功订阅topic: {topic}")

    def consume_messages(self):
        """
        开始消费消息
        """
        self.running = True
        try:
            while self.running:
                msg = self.consumer.poll(timeout=1.0)
                if msg is None:
                    continue

                if msg.error():
                    self.handle_error(msg.error())
                    continue

                try:
                    # 假设消息是JSON格式
                    message_value = msg.value().decode('utf-8')
                    self.process_message(message_value, msg)
                except json.JSONDecodeError:
                    logger.warning(f"无法解析JSON消息: {msg.value()}")
                except Exception as e:
                    logger.error(f"处理消息时出错: {str(e)}", exc_info=True)

        except KeyboardInterrupt:
            logger.info("接收到中断信号,准备关闭消费者...")
        finally:
            self.shutdown()

    def process_message(self, message, msg):
        """
        处理接收到的消息

        :param message: 解析后的消息内容
        :param msg: 原始kafka消息对象
        """
        # 这里实现你的业务逻辑
        logger.info(f"收到消息 [分区 {msg.partition()} | 偏移量 {msg.offset()}]: {message}")

        # 示例处理逻辑 - 根据实际需求修改
        offer_date = message

        logger.info(f"处理文件操作: 广告内容 {offer_date}")

    def handle_error(self, error):
        """
        处理消费错误

        :param error: 错误对象
        """
        if error.code() == KafkaError._PARTITION_EOF:
            # 分区已到达末尾
            logger.info("已到达分区末尾")
        else:
            logger.error(f"消费错误: {error.str()}")
            if error.fatal():
                self.running = False

    def shutdown(self):
        """
        关闭消费者
        """
        logger.info("正在关闭消费者...")
        self.running = False
        self.consumer.close()
        logger.info("消费者已关闭")


if __name__ == '__main__':
    # Kafka消费者配置
    consumer_config = {
        'bootstrap.servers': 'localhost:9092',  # Kafka服务器地址
        'group.id': 'wps.file.consumer.group',  # 消费者组ID
        'auto.offset.reset': 'earliest',  # 从最早的消息开始消费
        'enable.auto.commit': False,  # 手动提交偏移量
        'max.poll.interval.ms': 300000,  # 最大poll间隔
        'session.timeout.ms': 10000  # 会话超时时间
    }

    # 创建并启动消费者
    try:
        consumer = WPSFileConsumer(consumer_config)
        consumer.subscribe_to_topic('usa.offer')
        logger.info("启动WPS文件消费者服务...")
        consumer.consume_messages()
    except Exception as e:
        logger.error(f"消费者服务启动失败: {str(e)}", exc_info=True)
        sys.exit(1)

kafka的测试点

功能点:

  • 验证topic的正确性

  • 验证生产者服务消息发布的过程

  • 验证消费者服务消息消费的过程,验证所消费数据的外在表现

  • 对唯一性数据进行重复消费

testcase:模拟单一业务数据的消息生产和消息消费过程,期望确保消息能被下游服务消费

正向的用例demo:

  • 用例名:XX业务数据消息消费

  • 前置步骤:请求生产者服务的对外接口,构建消息数据A

  • 操作步骤:请求消费者服务的消费接口/查询消费者消费后存储的数据源

  • 期望值:消费者服务的相关查询结果查询消费情况,找到结果关键字是否 == 数据A(数据对齐、数据格式和设计文档协议一致)

注:消费者结果验证方式可能通过服务日志、也可能通过服务下游逻辑

逆向的用例场景:在没有发布消息的情况下,请求消费者服务的消费接口,有没有进行正常的处理,如果消费者对外接口出现loading情况,说明未对该场景进行处理。

性能点:

模拟线上的消息积压的场景,分析当前服务实例的瓶颈值。获取到的结果:

1.消费者的消费的速率,可以满足多少的并发数不会出现消息积压的情况。

2.生产者基于外部接口的消息生产速率。

3.broker是否支撑目前的消息积压情况。

kafka监控平台一般只能提供有哪些topic,每个topic当前消息积压的情况。

监控平台推荐:grafana+普罗米修斯+kafka exporter

什么样的消息会进入死信队列

在Kafka中,死信队列是用于存储处理失败或无法正常消费的消息的特殊主题。以下情况通常会导致消息进入死信队列:

  1. 消息处理多次失败:当消费者对某条消息的重试次数达到最大限制后仍无法成功处理

  2. 消息格式错误:消息内容不符合预期格式,无法被反序列化或解析

  3. 业务逻辑拒绝:消息内容违反业务规则被明确拒绝处理

  4. 超时未处理:消息在指定时间内未被成功处理

  5. 权限问题:消费者没有足够的权限处理该消息

死信队列消息的常见处理方式

  1. 人工审核:检查消息内容,确定失败原因(最常见的解决方案)

  2. 修复后重新投递:修正消息内容后重新发送到主主题

  3. 报警通知:触发报警机制通知相关人员

  4. 日志记录:将错误消息记录到日志系统供后续分析

  5. 丢弃处理:对于确实无法处理的消息,可以选择丢弃并记录