如何使用logstashparsingaudit.log

我想用logstash来收集日志文件,文件的格式是这样的:

type=USER_START msg=audit(1404170401.294:157): user pid=29228 uid=0 auid=0 ses=7972 subj=system_u:system_r:crond_t:s0-s0:c0.c1023 msg='op=PAM:session_open acct="root" exe="/usr/sbin/crond" hostname=? addr=? terminal=cron res=success' 

我应该用哪个filter来匹配这条线? 或者有另一种方式来处理它。

任何帮助,将不胜感激。


使用下面的模式来匹配grokdebugging器的行,但仍然得到一个No matches消息。

 type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): user pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message} 

但是,当我删除subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message} ,它成功,并得到这样的JSON对象。

 { "audit_type": [ [ "USER_END" ] ], "audit_epoch": [ [ "1404175981.491" ] ], "BASE10NUM": [ [ "1404175981.491", "524", "1465", "0", "0" ] ], "audit_counter": [ [ "524" ] ], "audit_pid": [ [ "1465" ] ], "audit_uid": [ [ "0" ] ], "audit_audid": [ [ "0" ] ] } 

不知道为什么subjmsg不能工作。

快速search在github上find这个

 AUDIT type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): user pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message} AUDITLOGIN type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): login pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} old auid=%{NUMBER:old_auid} new auid=%{NUMBER:new_auid} old ses=%{NUMBER:old_ses} new ses=%{NUMBER:new_ses} 

粗略的审查表明,这可能是你在找什么。

审计日志被写成一系列的key = value对,这些对可以很容易的使用kvfilter来提取。 但是我注意到,密钥msg有时被使用了两次,也是一系列的密钥=值对。

第一个grok被用来获取字段audit_typeaudit_epochaudit_countersub_msg (第二个msg字段)

 grok { pattern => [ "type=%{DATA:audit_type}\smsg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):.*?( msg=\'(?<sub_msg>.*?)\')?$" ] named_captures_only => true } 

kv用于提取除msg和type之外的所有key = value对,因为我们已经使用grok获取了该数据:

 kv { exclude_keys => [ "msg", "type" ] } 

kv再次用于parsingsub_msg中的键=值对(如果存在):

 kv { source => "sub_msg" } 

date用于将date设置为audit_epoch中的值,使用date格式UNIX将parsing浮点或整数时间戳:

 date { match => [ "audit_epoch", "UNIX" ] } 

最后使用mutate删除冗余字段:

 mutate { remove_field => ['sub_msg', 'audit_epoch'] } 

你也可以重命名像sysadmin1138build议的字段:

 mutate { rename => [ "auid", "uid_audit", "fsuid", "uid_fs", "suid", "uid_set", "ses", "session_id" ] remove_field => ['sub_msg', 'audit_epoch'] } 

所有组合的filter看起来像这样:

 filter { grok { pattern => [ "type=%{DATA:audit_type}\smsg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):.*?( msg=\'(?<sub_msg>.*?)\')?$" ] named_captures_only => true } kv { exclude_keys => [ "msg", "type" ] } kv { source => "sub_msg" } date { match => [ "audit_epoch", "UNIX" ] } mutate { rename => [ "auid", "uid_audit", "fsuid", "uid_fs", "suid", "uid_set", "ses", "session_id" ] remove_field => ['sub_msg', 'audit_epoch'] } } 

比grok更好的解决scheme可能是使用kv滤波器。 这将parsing以“key = value”格式configuration的字段,这是大多数审计日志所属的字段。 与Grok不同的是,这将处理string,有时甚至有时不字段。 但是,这些字段名称的用处不大,因此您可能需要进行字段重命名。

 filter { kv { } } 

这将使您获得最多的信息,并且这些字段将与日志中显示的内容相匹配。 所有的数据types都是string 。 为了使田野人性化,

 filter { kv { } mutate { rename => { "type" => "audit_type" "auid" => "uid_audit" "fsuid => "uid_fs" "suid" => "uid_set" "ses" => "session_id" } } } 

但是,包含时间戳和事件ID的msg字段仍然需要进行修改。 其他答案显示如何做到这一点。

 filter { kv { } grok { match => { "msg" => "audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):" } mutate { rename => { "type" => "audit_type" "auid" => "uid_audit" "fsuid => "uid_fs" "suid" => "uid_set" "ses" => "session_id" } } } 

grok的格式已经改变了,所以看看这个:

 filter { grok { # example: type=CRED_DISP msg=audit(1431084081.914:298): pid=1807 uid=0 auid=1000 ses=7 msg='op=PAM:setcred acct="user1" exe="/usr/sbin/sshd" hostname=host1 addr=192.168.160.1 terminal=ssh res=success' match => { "message" => "type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} ses=%{NUMBER:ses} msg=\'op=%{WORD:operation}:%{WORD:detail_operation} acct=\"%{WORD:acct_user}\" exe=\"%{GREEDYDATA:exec}\" hostname=%{GREEDYDATA:hostname} addr=%{GREEDYDATA:ipaddr} terminal=%{WORD:terminal} res=%{WORD:result}\'" } } date { match => [ "audit_epoch", "UNIX_MS" ] } } 

这将使用来自audit_epoch的date作为@datetime。