中央存储和分析性能数字的方法:
我想实现这样的设置:
就像这里解释的一样: https : //mtalavera.wordpress.com/2015/02/16/monitoring-with-collectd-and-kibana/
约束:
ssh
到远程主机。 ssh
从远程主机到中央服务器不能工作,因为networking设置(我不能改变的东西)。 我怎样才能每小时获取数据?
有了上面列出的问题,您需要在远端缓冲统计信息,以免丢失任何信息。
有很多方法可以做到这一点,但没有一个过于简单,需要进行大量的testing才能确保它们的可行性。 它们都涉及在本地编写collectd
的输出,然后使用一些方法在中央服务器上获取它。
我没有testing下面的任何一个,所以有些可能根本不工作。
没有特别的顺序或复杂性:
套接字/networking输出到脚本
将Collectd
的输出写入一个套接字或IP /端口,PHP / Perl / Python / Bash脚本正在侦听将命令写入文件。
然后这些文件可以被中央服务器推送/拉出并由Logstash摄取。
优点 :简单的脚本来捕获输出; 使用标准的Linux命令
缺点 :如果你拉大量的统计信息,则不可扩展; 需要维护脚本; 不知道LS是否会处理普通协议
Redis / AMQP / Kafka / MongoDB将Collectd
的输出写入其中一个可能的“缓冲区”。 他们每个人的工作方式都有所不同,并且有不同的部署选项,所以我会留给你们弄清楚哪个是最好的,因为这个问题已经超出了范围。 也就是说,他们中的任何一个都应该工作
然后,您需要一种方法将缓冲解决scheme中的数据返回到中央服务器。 应用程序本地复制/镜像/群集或运行每X间隔运行数据(在任一端运行)的脚本有两种可能性。
优点 :非常灵活的部署选项; 应该很好地扩展; 使用众所周知的工具/程序
缺点 :缓冲区编程可能需要大量资源,或者安装了很多软件包
套接字/networking输出到Logstash这与选项1几乎相同,但不是将输出收集到脚本/ progarm,而是写入每个远程主机上的本地Logstash实例。
然后,Logstash将在本地写入CSV / JSON,并且可以使用任何方法将这些文件返回到中央服务器,包括LS本身。
优点 :整套解决scheme的单套工具; 提供了一种在边缘转换数据的方法,然后集中采集; 很less的移动部件缺点 :所有远程主机上都需要Java / LS
除了每个选项的优点和缺点之外,所有这些缺点都是你需要find一种方法来在所有服务器上保持一致的configuration。 如果你有很多远程节点(或者一般情况下只有很多节点),你可能已经有一个configurationpipe理系统,这将是微不足道的。
请使用这个docker-compose
而不是我连接的那个docker-compose
(它确实需要docker
和compose
,也许是machine
但它已经为你做了更多的事情,你将不得不less挣扎。
CELK: https : //github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf
也
因此,从这里开始获得一个工作系统的好概述。 他们已经为你做了一些工作,所以你只需要担心你的问题,这涉及到configuration和部署。
即使你没有最终使用Docker,这仍然会让你走上成功的道路, 并且还有向你展示它如何组合在一起的额外好处。
如果你不知道什么是stream浪者,那真是太好了。 这是一个允许人们共享整套虚拟机和configuration器的程序,以便您可以只定义一个虚拟机及其configuration,而不是共享整个虚拟机,而且“正常工作”。 这感觉很神奇,但它确实只是可靠的系统工作。
你将需要安装Vagrant来使用它。 去做就对了! 然后,您不必安装docker
因为它将在Vagrant虚拟机上运行。
你有四个select你想如何使用它,但首先,让stream浪汉准备与粗体的命令….
vagrant up
您的select是:
还有其他configuration可用,但仅用于testing。
现在是时候configurationLogstash了,这是真正唯一具有复杂行为的部分。
Logstashconfiguration文件是纯文本文件,以conf
结尾,也可以使用tar
或gunzip gz
组合在一起。
您可以通过以下两种方式获取configuration文件:
LOGSTASH_CONFIG_URL
指向你的configuration文件的url,如果你得到的url不对,或者有一些问题,并且它不能从url获得configuration,url或其他 以下是从Internet上使用configuration运行时的样子:
$ docker run -d \ -e LOGSTASH_CONFIG_URL=https://secretlogstashesstash.com/myconfig.tar.gz \ -p 9292:9292 \ -p 9200:9200 \ pblittle/docker-logstash
docker
提醒你 :
默认的logstash.conf只监听stdin和文件input。 如果您希望configurationtcp和/或udpinput,请使用您自己的logstashconfiguration文件并自行公开端口。 有关configuration语法和更多信息,请参阅logstash文档。
回想一下,当你没有在所需的环境variablesLOGSTASH_CONFIG_URL
中input正确的URL时,
// As the author warned, this is all you get. StdIn and Syslog file inputs. input { stdin { type => "stdin-type" } file { type => "syslog" path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ] } file { type => "logstash" path => [ "/var/log/logstash/logstash.log" ] start_position => "beginning" } }
在网站上阅读更多关于logstash
的信息。
现在, logstash
已经有插件将数据推送到input
。 插件完全按照您的预期变化; 这里有几个:
s3
从亚马逊(文件系统事件) stdin
从logstash
(默认,读取stdin
缓冲区) http
从logstash
(你的猜测) UDP
是一种无连接且快速的协议,在L4
(传输)的底部运行,支持多重传输,处理故障,通常是logging数据传输的一个很好的select。
你select你想要的端口; 其他select取决于你在做什么。
TCP以相同的方式工作。
udp {port => 9999 codec => json buffer_size => 1452}
collectd
UDP套接字被过滤和输出 This is stolen from https://github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf input { udp { port => 25826 # 25826 matches port specified in collectd.conf buffer_size => 1452 # 1452 is the default buffer size for Collectd codec => collectd { } # specific Collectd codec to invoke type => collectd } } output { elasticsearch { host => elasticsearch cluster => logstash protocol => http } }
而过滤就是一个很好的例子,那就是,它真的很长,我认为它是有用的
filter { # TEST implementation of parse for collectd if [type] == "collectd" { if [plugin] { mutate { rename => { "plugin" => "collectd_plugin" } } } if [plugin_instance] { mutate { rename => { "plugin_instance" => "collectd_plugin_instance" } } } if [type_instance] { mutate { rename => { "type_instance" => "collectd_type_instance" } } } if [value] { mutate { rename => { "value" => "collectd_value" } } mutate { convert => { "collectd_value" => "float" } } } if [collectd_plugin] == "interface" { mutate { add_field => { "collectd_value_instance" => "rx" "collectd_value" => "%{rx}" } } mutate { convert => { "tx" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "tx" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] == 'tx' event['collectd_value_instance'] = 'tx' event['collectd_value'] = event['tx'] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "rx", "tx" ] } } if [collectd_plugin] == "disk" { mutate { add_field => { "collectd_value_instance" => "read" "collectd_value" => "%{read}" } } mutate { convert => { "write" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "write" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] == 'write' event['collectd_value_instance'] = 'write' event['collectd_value'] = event['write'] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "read", "write" ] } } if [collectd_plugin] == "df" { mutate { add_field => { "collectd_value_instance" => "free" "collectd_value" => "%{free}" } } mutate { convert => { "used" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "used" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] == 'used' event['collectd_value_instance'] = 'used' event['collectd_value'] = event['used'] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "used", "free" ] } } if [collectd_plugin] == "load" { mutate { add_field => { "collectd_value_instance" => "shortterm" "collectd_value" => "%{shortterm}" } } mutate { convert => { "longterm" => "float" "midterm" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "longterm", "midterm" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] != 'collectd' event['collectd_value_instance'] = event['type'] event['collectd_value'] = event[event['type']] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "longterm", "midterm", "shortterm" ] } } } }
像任何好的软件一样收集某些丑陋或难以处理的方面,并试图使它容易,因为它看起来像你发送数据(在这种情况下是一个元组),而不是与序列化相混淆。
你的例子:
(date_time, current_cpu_load), for example ('2016-0-04-24 11:09:12', 12.3)
我不会花我的时间来弄清楚你是如何形成的。 如果你能够使用CPU插件获得这些数据,那就太好了。 我要复制并粘贴我在网上find的一个,以方便我。
这就是说,想一想…只是一点点,它不会伤害。
你看到CPU插件下面加载。
您看到conf
文件中collectd
的界面太小而无法指定字段。
所以如果你只是这样做,它会工作,但你会得到比CPU负载更多的数据。
这就是你可以使用filter的地方。 但是我想你也可以在Kibana中做到这一点。 所以我宁愿不浪费时间写一个filter,你不需要和b)如果你花了一些时间,可以很容易地写。
## In `collectd`: # For each instance where collectd is running, we define # hostname proper to that instance. When metrics from # multiple instances are aggregated, hostname will tell # us were they came from. Hostname "**YOUR_HOSTNAME**" # Fully qualified domain name, false for our little lab FQDNLookup false # Plugins we are going to use with their configurations, # if needed LoadPlugin cpu LoadPlugin df <Plugin df> Device "/dev/sda1" MountPoint "/" FSType "ext4" ReportReserved "true" </Plugin> LoadPlugin interface <Plugin interface> Interface "eth0" IgnoreSelected false </Plugin> LoadPlugin network <Plugin network> Server "**YOUR.HOST.IP.ADDR**" "**PORTNUMBER**" </Plugin> LoadPlugin memory LoadPlugin syslog <Plugin syslog> LogLevel info </Plugin> LoadPlugin swap <Include "/etc/collectd/collectd.conf.d"> Filter ".conf" </Include>
input { udp { port => **PORTNUMBER** # 25826 matches port specified in collectd.conf buffer_size => **1452** **# 1452 is the default buffer size for Collectd** codec => collectd { } # specific Collectd codec to invoke type => collectd } } output { elasticsearch { cluster => **ELASTICSEARCH_CLUSTER_NAME** # this matches out elasticsearch cluster.name protocol => http } }
在Logstash 1.3.x中,我们引入了collectdinput插件。 太棒了! 我们可以在Logstash中处理度量标准,将它们存储在Elasticsearch中,并用Kibana查看它们。 唯一的缺点是你只能通过插件每秒获得3100个事件。 使用Logstash 1.4.0,我们引入了一个新改进的UDPinput插件,它是multithreading的,并有一个队列。 我重构了collectdinput插件作为编解码器(来自我的同事和社区的一些帮助),以利用这个巨大的性能提升。 现在我的双核心MacBook Air上只有3个线程,我可以通过collectd编解码器每秒获得超过45,000个事件!
所以,我想提供一些可以用来改变你的插件configuration来代替使用编解码器的简单例子。
旧的方式:
input { collectd {} }
新的方法:
input { udp { port => 25826 # Must be specified. 25826 is the default for collectd buffer_size => 1452 # Should be specified. 1452 is the default for recent versions of collectd codec => collectd { } # This will invoke the default options for the codec type => "collectd" } } This new configuration will use 2 threads and a queue size of 2000 by default for the UDP input plugin. With this you should easily be able to break 30,000 events per second!
我已经提供了一些其他configuration示例的要点。 有关更多信息,请查看collectd编解码器的Logstash文档。
快乐的日志!