Logstashparsing包含多个日志条目的xml文档

我目前正在评估logstash和elasticsearch是否对我们的用例有用。 我有一个包含多个表单的日志文件

<root> <entry> <fieldx>...</fieldx> <fieldy>...</fieldy> <fieldz>...</fieldz> ... <fieldarray> <fielda>...</fielda> <fielda>...</fielda> ... </fieldarray> </entry> <entry> ... </entry> ... <root> 

每个entry元素将包含一个日志事件。 (如果您有兴趣,该文件实际上是Tempo Timesheets(Atlassian JIRA插件)工作日志导出。)

是否有可能将这样的文件转换成多个日志事件,而无需编写我自己的编解码器?

    好的,我find了一个适合我的解决scheme。 这个解决scheme的最大问题是XML插件不太稳定,但是文档不完整,有缺陷,或者logging不准确和错误。

    TLDR

    Bash命令行:

     gzcat -d file.xml.gz | tr -d "\n\r" | xmllint --format - | logstash -f logstash-csv.conf 

    Logstashconfiguration:

     input { stdin {} } filter { # add all lines that have more indentation than double-space to the previous line multiline { pattern => "^\s\s(\s\s|\<\/entry\>)" what => previous } # multiline filter adds the tag "multiline" only to lines spanning multiple lines # We _only_ want those here. if "multiline" in [tags] { # Add the encoding line here. Could in theory extract this from the # first line with a clever filter. Not worth the effort at the moment. mutate { replace => ["message",'<?xml version="1.0" encoding="UTF-8" ?>%{message}'] } # This filter exports the hierarchy into the field "entry". This will # create a very deep structure that elasticsearch does not really like. # Which is why I used add_field to flatten it. xml { target => entry source => message add_field => { fieldx => "%{[entry][fieldx]}" fieldy => "%{[entry][fieldy]}" fieldz => "%{[entry][fieldz]}" # With deeper nested fields, the xml converter actually creates # an array containing hashes, which is why you need the [0] # -- took me ages to find out. fielda => "%{[entry][fieldarray][0][fielda]}" fieldb => "%{[entry][fieldarray][0][fieldb]}" fieldc => "%{[entry][fieldarray][0][fieldc]}" } } # Remove the intermediate fields before output. "message" contains the # original message (XML). You may or may-not want to keep that. mutate { remove_field => ["message"] remove_field => ["entry"] } } } output { ... } 

    详细

    我的解决scheme的工作原理至less直到entry级,我的XMLinput是非常一致的,因此可以通过某种模式匹配来处理。

    由于导出基本上是一个非常长的XML行,并且logstash xml插件本质上只适用于包含XML数据的字段(读取:行),所以我不得不将数据更改为更有用的格式。

    shell:准备文件

    • gzcat -d file.xml.gz | :太多的数据 – 显然你可以跳过
    • tr -d "\n\r" | :删除XML元素内的换行符:某些元素可以包含换行符作为字符数据。 下一步需要删除或以某种方式进行编码。 即使它假定在这一点上,你所有的XML代码在一个巨大的线,这是没有关系,如果这个命令删除任何元素之间的空白

    • xmllint --format - | :用xmllint格式化XML(附带libxml)

      这里XML( <root><entry><fieldx>...</fieldx></entry></root> )的一个巨大的意大利细面条线格式正确:

       <root> <entry> <fieldx>...</fieldx> <fieldy>...</fieldy> <fieldz>...</fieldz> <fieldarray> <fielda>...</fielda> <fieldb>...</fieldb> ... </fieldarray> </entry> <entry> ... </entry> ... </root> 

    Logstash

     logstash -f logstash-csv.conf 

    (请参阅TL; DR部分中的.conf文件的完整内容。)

    在这里, multilinefilter的伎俩。 它可以将多行代码合并成一个日志消息。 这就是为什么用xmllint格式化是必要的:

     filter { # add all lines that have more indentation than double-space to the previous line multiline { pattern => "^\s\s(\s\s|\<\/entry\>)" what => previous } } 

    这基本上说,每一行缩进超过两个空格(或者是</entry> / xmllint默认使用两个空格缩进)属于上一行。 这也意味着字符数据不能包含换行符(在shell中用tr去掉),并且xml必须被标准化(xmllint)

    我也有类似的情况。 parsing这个XML:

     <ROOT number="34"> <EVENTLIST> <EVENT name="hey"/> <EVENT name="you"/> </EVENTLIST> </ROOT> 

    我使用这个configurationlogstash:

     input { file { path => "/path/events.xml" start_position => "beginning" sincedb_path => "/dev/null" codec => multiline { pattern => "<ROOT" negate => "true" what => "previous" auto_flush_interval => 1 } } } filter { xml { source => "message" target => "xml_content" } split { field => "xml_content[EVENTLIST]" } split { field => "xml_content[EVENTLIST][EVENT]" } mutate { add_field => { "number" => "%{xml_content[number]}" } add_field => { "name" => "%{xml_content[EVENTLIST][EVENT][name]}" } remove_field => ['xml_content', 'message', 'path'] } } output { stdout { codec => rubydebug } } 

    我希望这可以帮助别人。 我需要很长时间才能得到它。