Logstash multiline filter for MySQL query logs

I'm pushing logs to Logstash from a mysql-proxy Lua script. An example log looks like this:

 [2015-03-09 11:13:47] USER:username IP:10.102.51.134:41420 DB:dbName Query: -- One Pager Trends 
 -- params:
 
 SELECT 
  date,
  SUM(t.rev) revenue,
  SUM(t.rev - t.cost) profit 
 FROM
  am.s_d t
  INNER JOIN am.event e 
   ON t.`event_id` = e.`event_id`
 WHERE 1=1 AND DATE BETWEEN '2014-12-08' AND '2015-03-08'
  AND t.source_id = 25
 GROUP BY date
 [2015-03-09 11:17:28] USER:mzupan IP:10.102.22.216:49843 DB: Query: show databases

A new log entry always begins with "[".
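That boundary rule is what a multiline merge relies on: any line that does not start with "[" belongs to the previous event. A minimal Python sketch of that grouping logic (an illustration only, not Logstash's actual implementation; the sample lines are abbreviated):

```python
import re

# The equivalent of multiline's pattern => "^\[", negate => true,
# what => "previous": lines NOT matching are folded into the prior event.
EVENT_START = re.compile(r"^\[")

def group_events(lines):
    """Merge continuation lines into the preceding event."""
    events = []
    for line in lines:
        if EVENT_START.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events

lines = [
    "[2015-03-09 11:13:47] USER:username ... Query: SELECT date,",
    "  SUM(t.rev) revenue",
    "FROM am.s_d t",
    "[2015-03-09 11:17:28] USER:mzupan ... Query: show databases",
]
events = group_events(lines)
# → 2 events: the three-line SELECT and the one-line "show databases"
```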

So I ship the logs with logstash-forwarder and process them with a filter like this:

 filter {
   if [type] == "mysql-proxy" {
     grok {
       match => { "message" => "\[%{TIMESTAMP_ISO8601}\] USER:%{WORD:user} IP:%{IP:ip}:%{INT} DB:%{DATA:db} Query: (?<query>(.|\r|\n)*)" }
     }
     multiline {
       pattern => "^\["
       what => "previous"
       negate => true
     }
     date {
       match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
     }
   }
 }
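Grok patterns compile down to named-capture regular expressions, so the match above can be approximated in plain Python to check it against a sample line (the %{WORD}, %{IP}, %{INT}, and %{DATA} expansions here are simplified stand-ins for the real grok definitions):

```python
import re

# Rough regex equivalent of the grok pattern; the %{...} expansions
# are simplified approximations, not the official grok definitions.
PATTERN = re.compile(
    r"\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] "
    r"USER:(?P<user>\w+) "
    r"IP:(?P<ip>\d{1,3}(?:\.\d{1,3}){3}):\d+ "
    r"DB:(?P<db>\S*) "
    r"Query: (?P<query>(?:.|\r|\n)*)"
)

line = ("[2015-03-09 11:17:28] USER:mzupan "
        "IP:10.102.22.216:49843 DB: Query: show databases")
m = PATTERN.match(line)
# m.group("user") == "mzupan"; m.group("db") == "" (empty DB field);
# m.group("query") == "show databases"
```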

My problem is that in Kibana the query shows up in JSON like this:

 {
   "_index": "logstash-2015.03.09",
   "_type": "mysql-proxy",
   "_id": "AUv_vj3u0BuDzneUoKKc",
   "_score": null,
   "_source": {
     "message": "[2015-03-09 11:13:47] USER:username IP:10.102.51.134:41420 DB:dbName Query: -- One Pager Trends \n-- params:\n\nSELECT \n date,\n SUM(t.rev) revenue,\n SUM(t.rev - t.cost) profit \nFROM\n am.s_d t\n INNER JOIN am.event e \n ON t.`event_id` = e.`event_id`\nWHERE 1=1 AND DATE BETWEEN '2014-12-08' AND '2015-03-08'\n AND t.source_id = 25\nGROUP BY date",
     "@version": "1",
     "@timestamp": "2015-03-09T18:13:52.287Z",
     "type": "mysql-proxy",
     "file": "/var/log/mysql-queries.log",
     "host": "an01.domain.com",
     "offset": [ "11855847", "11855943", "11855954", "11855955", "11855963", "11855971",
                 "11855993", "11856023", "11856028", "11856039", "11856064", "11856099",
                 "11856156", "11856179", "11856193", "11856194" ],
     "user": "username",
     "ip": "10.102.51.134",
     "db": "dbname",
     "query": "-- One Pager Trends ",
     "tags": [ "_grokparsefailure", "multiline" ]
   },
   "fields": { "@timestamp": [ 1425924832287 ] },
   "sort": [ 1425924832287 ]
 }

Even though Logstash seems to assemble the message field correctly, the query field only contains the first part of the query.

The multiline filter should come before the grok match section in your filter. Try a configuration like this:

 filter {
   if [type] == "mysql-proxy" {
     multiline {
       pattern => "^\["
       what => "previous"
       negate => true
     }
     grok {
       match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] USER:%{WORD:user} IP:%{IP:ip}:%{INT} DB:%{DATA:db} Query: (?<query>(.|\r|\n)*)" }
     }
     date {
       match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
     }
   }
 }

Note that the timestamp pattern is given a field name here (%{TIMESTAMP_ISO8601:timestamp}); without it the date filter has no timestamp field to match against.

This works for me on Logstash v1.4.2.
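Why the order matters can be illustrated in plain Python: parsing each raw line before merging captures only the first physical line of the query, while merging first captures the whole statement. This is only a sketch of the ordering effect, not Logstash internals; the pattern and sample lines are abbreviated:

```python
import re

# Simplified stand-in for the grok capture of the query field.
QUERY = re.compile(r"Query: (?P<query>(?:.|\n)*)")

lines = [
    "[2015-03-09 11:13:47] USER:u IP:1.2.3.4:1 DB:d Query: SELECT date,",
    "FROM am.s_d t",
]

# Parse-then-merge (grok before multiline): the pattern only ever sees
# the first physical line, so the captured query is truncated.
truncated = QUERY.search(lines[0]).group("query")

# Merge-then-parse (multiline before grok): continuation lines that do
# not start with "[" are folded into the event before parsing.
event = lines[0] + "\n" + lines[1]
full = QUERY.search(event).group("query")
# truncated == "SELECT date,"; full == "SELECT date,\nFROM am.s_d t"
```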