I have the following pipeline:
nginx -> unix_socket -> rsyslog -> omkafka module -> kafka
For omkafka I use the following configuration:
module(
    load="impstats"
    interval="10"                  # how often to generate stats
    resetCounters="on"             # to get deltas (eg # of messages submitted in the last 10 seconds)
    log.file="/var/log/impstats"   # file to write those stats to
    log.syslog="off"               # don't send stats through the normal processing pipeline. More on that in a bit
)

#### LOAD MODULES ####
module(load="omkafka")

#### DEFINE GLOBALS ####
$MaxMessageSize 64k
$EscapeControlCharactersOnReceive off

#### TEMPLATES ####
$template ngFormat, "%msg:4:$%"

input(type="imuxsock" Socket="/spool/syslog" Ruleset="outwriter")

ruleset(name="outwriter"){
    action(
        type="omkafka"
        broker=["kafka666:9092"]
        topic="nginx_logs"
        partitions.auto="on"
        template="cerberFormat"
        queue.type="linkedlist"
        queue.dequeueBatchSize="10000"       # numbers of messages to be parsed from queue
        queue.highWatermark="450000"         # max no. of events to hold in memory
        queue.lowWatermark="250000"          # use memory queue again, when it's back to this level
        queue.spoolDirectory="/spool/logs"   # where to write on disk
        queue.fileName="rsyslog_queue"
        queue.maxDiskSpace="100g"            # it will stop at this much disk space
        queue.size="500000"                  # or this many messages
        queue.saveOnShutdown="on"            # save memory queue contents to disk when rsyslog is exiting
    )
}

main_queue(
    queue.type="linkedlist"
    queue.dequeueBatchSize="10000"       # numbers of messages to be parsed from queue
    queue.highWatermark="450000"         # max no. of events to hold in memory
    queue.lowWatermark="250000"          # use memory queue again, when it's back to this level
    queue.spoolDirectory="/spool/logs"   # where to write on disk
    queue.fileName="rsyslog_main_queue"
    queue.maxDiskSpace="100g"            # it will stop at this much disk space
    queue.size="500000"                  # or this many messages
    queue.saveOnShutdown="on"            # save memory queue contents to disk when rsyslog is exiting
)
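I assumed that if the Kafka broker is unreachable, all omkafka messages would be placed in the configured DA (disk-assisted) queue. But when I watch the counters with impstats, the DA queue is always empty and omkafka uses its own output queue.
It looks like this: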
Tue Oct 4 13:02:09 2016: global: origin=dynstats
Tue Oct 4 13:02:09 2016: imuxsock: origin=imuxsock submitted=13060 ratelimit.discarded=0 ratelimit.numratelimiters=0
Tue Oct 4 13:02:09 2016: **omkafka**: submitted=0 **maxoutqsize=100000** failures=0 topicdynacache.skipped=0 topicdynacache.miss=0 topicdynacache.evicted=0
Tue Oct 4 13:02:09 2016: action 0: origin=core.action processed=13060 failed=13060 suspended=0 suspended.duration=300 resumed=0
Tue Oct 4 13:02:09 2016: action 1: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 3: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 4: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 5: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 6: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 7: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 8: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 9: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 10: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: action 11: origin=core.action processed=0 failed=0 suspended=0 suspended.duration=0 resumed=0
Tue Oct 4 13:02:09 2016: resource-usage: origin=impstats utime=24242276 stime=15882703 maxrss=125316 minflt=95642 majflt=0 inblock=0 oublock=632 nvcsw=1067580 nivcsw=513
Tue Oct 4 13:02:09 2016: **main Q[DA]:** origin=core.queue size=0 enqueued=0 full=0 discarded.full=0 discarded.nf=0 **maxqsize=0**
Tue Oct 4 13:02:09 2016: main Q: origin=core.queue size=0 enqueued=13060 full=0 discarded.full=0 discarded.nf=0 maxqsize=18
Is this a mistake in my configuration, or does omkafka not have a reliable queue?
Thanks!
You should add the following settings to your action block:
name="kafkaoutput" action.resumeretrycount="-1"
Once the queue holds more than 450000 messages (your queue.highWatermark), rsyslog will start writing to the DA queue.
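To show where those two settings go, here is a sketch of the action from the question with them added. The name "kafkaoutput" is only a label, and the remaining queue parameters are assumed to stay exactly as in the original configuration. With action.resumeRetryCount="-1" rsyslog keeps retrying a suspended action indefinitely instead of giving up on the messages, so they can accumulate in the action queue.

```
ruleset(name="outwriter"){
    action(
        type="omkafka"
        name="kafkaoutput"               # label for this action (any name works)
        broker=["kafka666:9092"]
        topic="nginx_logs"
        partitions.auto="on"
        template="cerberFormat"
        action.resumeRetryCount="-1"     # -1 = retry forever while the broker is unreachable
        queue.type="linkedlist"
        queue.dequeueBatchSize="10000"
        queue.highWatermark="450000"     # above this, messages start going to disk
        queue.lowWatermark="250000"
        queue.spoolDirectory="/spool/logs"
        queue.fileName="rsyslog_queue"
        queue.maxDiskSpace="100g"
        queue.size="500000"
        queue.saveOnShutdown="on"
    )
}
```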
I happened to find this:
queue.buffering.max.messages = 100000
So it looks like the messages should already have been consumed.
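queue.buffering.max.messages is a librdkafka setting, and its value of 100000 matches the maxoutqsize=100000 reported by impstats in the question. If you want to experiment with it, omkafka accepts librdkafka settings through its confParam array. The following is only a sketch: the value is illustrative, and I have not verified how changing it interacts with the rsyslog action queue.

```
action(
    type="omkafka"
    broker=["kafka666:9092"]
    topic="nginx_logs"
    # passed through to librdkafka; the value here is illustrative only
    confParam=["queue.buffering.max.messages=100000"]
)
```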