1. Project Background
Monitor whether the number of records filebeat collects and ships to Kafka matches the original log line count.
Implementation: consume the Kafka topic's data from a 3-minute window starting 6 minutes ago, then take a one-minute sample from that window and compare the source log line count against the Kafka message count for that minute.
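The window selection can be sketched with the standard library alone. Note that Kafka's offsets_for_times() expects epoch milliseconds, hence the *1000 conversion used in the example code below:

```python
from datetime import datetime, timedelta

now = datetime.now()
# Start of the 3-minute window: 6 minutes ago, aligned to the whole minute.
start_time = (now - timedelta(minutes=6)).replace(second=0, microsecond=0)
sample_time = start_time + timedelta(minutes=1)  # one-minute sample for the comparison
end_time = start_time + timedelta(minutes=3)
# offsets_for_times() takes timestamps as epoch milliseconds.
start_ms = int(start_time.timestamp() * 1000)
```

Aligning to the whole minute makes the sample minute match the timestamps in the source log exactly, rather than an arbitrary sub-minute boundary.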
Environment used for the examples in this article:
- OS: CentOS Linux release 7
- Python: 3.6.10
- pip: 21.3
- Kafka server: 2.2.0
2. Installing confluent_kafka
Upgrade pip to the latest version first; with an older pip the install may fall back to building from source, which requires the librdkafka dependency to be installed separately.
pip install -U pip
pip install 'confluent-kafka==1.7.0'
3. Example Code
#!/usr/bin/env python
from datetime import datetime, timedelta

from confluent_kafka import Consumer, TopicPartition, KafkaException

conf = {
    'bootstrap.servers': '192.168.1.150:9092,192.168.1.148:9092,192.168.1.149:9092',
    'group.id': 'mygroup1',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

now = datetime.now()
# 3-minute consumption window starting 6 minutes ago, aligned to the minute.
start_time = (now - timedelta(minutes=6)).replace(second=0, microsecond=0)
sample_time = start_time + timedelta(minutes=1)
end_time = start_time + timedelta(minutes=3)
print('Current time %s, Kafka consumption window: %s - %s, log comparison minute: %s'
      % (now.strftime('%Y-%m-%d %H:%M:%S'),
         start_time.strftime('%Y-%m-%d %H:%M:%S'),
         end_time.strftime('%Y-%m-%d %H:%M:%S'),
         sample_time.strftime('%Y-%m-%d %H:%M:%S')))

topic = 'my_topic'
c = Consumer(conf)

# Map each partition's window boundaries to offsets.
# offsets_for_times() expects epoch milliseconds.
partitions = c.list_topics(topic=topic).topics[topic].partitions
start_offsets = c.offsets_for_times(
    [TopicPartition(topic, p, int(start_time.timestamp() * 1000))
     for p in range(len(partitions))])
end_offsets = c.offsets_for_times(
    [TopicPartition(topic, p, int(end_time.timestamp() * 1000))
     for p in range(len(partitions))])

def read_kafka():
    with open('msg.txt', 'w') as f:
        for p in start_offsets:
            if p.offset < 0:
                continue  # no message at or after start_time in this partition
            c.assign([p])
            while True:
                msg = c.poll(1.0)
                if msg is None:
                    break
                if msg.error():
                    raise KafkaException(msg.error())
                end = end_offsets[msg.partition()].offset
                # end < 0 means no message at or after end_time yet: keep
                # reading until poll() times out at the log head.
                if end < 0 or msg.offset() < end:
                    f.write(msg.value().decode() + '\n')
                else:
                    # Reached the end of the window for this partition.
                    c.unassign()
                    break
    c.close()

read_kafka()