1. Project Background

Monitor whether the number of records collected and reported by filebeat matches the original log.

Implementation: consume the 3-minute window of the Kafka topic that starts 6 minutes ago (aligned to a whole minute), and take one minute of that window as the sample for comparing the source log record count against the Kafka record count.

Environment used for the examples in this article:

  1. OS: CentOS Linux release 7
  2. Python: 3.6.10
  3. pip: 21.3
  4. Kafka server: 2.2.0

2. Installing confluent_kafka

It is recommended to upgrade pip to the latest version first; otherwise the install may fall back to building confluent-kafka from source, which requires the librdkafka dependency to be installed separately.

pip install -U pip
pip install 'confluent-kafka==1.7.0'
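
To confirm the installation, you can print the Python client version and the bundled librdkafka version:

python -c "import confluent_kafka; print(confluent_kafka.version(), confluent_kafka.libversion())"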

3. Example Code

#!/usr/bin/env python
from datetime import datetime, timedelta
from confluent_kafka import Consumer, TopicPartition, KafkaException


conf = {
    'bootstrap.servers': '192.168.1.150:9092,192.168.1.148:9092,192.168.1.149:9092',
    'group.id': 'mygroup1',
    # Start from the earliest offset when the group has no committed offset.
    'auto.offset.reset': 'earliest',
    # Offsets are managed manually via assign(); never commit them, so
    # repeated runs of this check do not interfere with each other.
    'enable.auto.commit': False
}

now = datetime.now()
# Consume the 3-minute window that starts 6 minutes ago, aligned to a
# whole minute; the second minute of that window is the comparison sample.
start_time = (now - timedelta(minutes=6)).replace(second=0, microsecond=0)
sample_time = start_time + timedelta(minutes=1)
end_time = start_time + timedelta(minutes=3)


print('Current time %s, consuming Kafka window: %s - %s, log comparison sample starts at: %s'
      % (now.strftime('%Y-%m-%d %H:%M:%S'),
         start_time.strftime('%Y-%m-%d %H:%M:%S'),
         end_time.strftime('%Y-%m-%d %H:%M:%S'),
         sample_time.strftime('%Y-%m-%d %H:%M:%S')))
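# Example output (hypothetical timestamps):
# Current time 2021-06-01 12:06:30, consuming Kafka window: 2021-06-01 12:00:00 - 2021-06-01 12:03:00, log comparison sample starts at: 2021-06-01 12:01:00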

topic = 'my_topic'
c = Consumer(conf)

# Fetch the topic's partition metadata so that one TopicPartition can
# be built per partition.
partitions = c.list_topics(topic=topic).topics[topic].partitions

# offsets_for_times() maps each (partition, timestamp-in-ms) pair to the
# earliest offset whose message timestamp is >= that time; the returned
# offset is -1 if the partition has no such message.
start_topic_partitions_to_search = [
    TopicPartition(topic, p, int(start_time.timestamp() * 1000))
    for p in range(len(partitions))]
start_offset = c.offsets_for_times(start_topic_partitions_to_search)

end_topic_partitions_to_search = [
    TopicPartition(topic, p, int(end_time.timestamp() * 1000))
    for p in range(len(partitions))]
end_offset = c.offsets_for_times(end_topic_partitions_to_search)


def read_kafka():
    with open('msg.txt', 'w') as f:
        for p in start_offset:
            # offset == -1 means the partition has no message at or after
            # start_time, so there is nothing to read from it (assigning
            # it anyway would fall back to auto.offset.reset).
            if p.offset == -1:
                continue
            c.assign([p])
            while True:
                msg = c.poll(1.0)
                if not msg:
                    break
                if msg.error():
                    raise KafkaException(msg.error())
                end = end_offset[msg.partition()].offset
                # end == -1 means no message at or after end_time was
                # found, so everything from start_time onward is kept.
                if end == -1 or msg.offset() < end:
                    f.write(msg.value().decode() + '\n')
                else:
                    c.unassign()
                    break
    c.close()

read_kafka()
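
Once msg.txt is written, the one-minute sample can be counted against the source log. The sketch below is a minimal example, assuming each Kafka message is a filebeat JSON event carrying an ISO-8601 '@timestamp' field in UTC, and that the sample minute printed by the consumer script is filled in by hand; adjust the field name, format, and timezone to your own pipeline.

import json
from datetime import datetime, timedelta

# Fill in the sample minute printed by the consumer script (hypothetical value).
sample_start = datetime(2021, 6, 1, 12, 1, 0)
sample_end = sample_start + timedelta(minutes=1)

count = 0
with open('msg.txt') as f:
    for line in f:
        # filebeat events typically carry '@timestamp' in UTC, e.g.
        # '2021-06-01T04:01:00.123Z'; convert to local time if the
        # sample minute above is local (assumption).
        ts = datetime.strptime(json.loads(line)['@timestamp'],
                               '%Y-%m-%dT%H:%M:%S.%fZ')
        if sample_start <= ts < sample_end:
            count += 1

print('Kafka records in the sample minute:', count)

Compare this count with the number of lines the source log produced in the same minute (for example via grep and wc -l) to confirm that filebeat has neither dropped nor duplicated records.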