Collectd – > Elasticsearch,如果远程主机无法连接到中央elasticsearch

目标

中央存储和分析性能数字的方法:

  • CPU负载
  • 内存使用率

目前的战略

我想实现这样的设置:

  1. collectd
  2. logstash
  3. elasticsearch
  4. kibana

就像这里解释的一样: https : //mtalavera.wordpress.com/2015/02/16/monitoring-with-collectd-and-kibana/

问题:远程主机无法推送数据

约束:

  • 我们只有从中央服务器ssh到远程主机。
  • ssh从远程主机到中央服务器不能工作,因为networking设置(我不能改变的东西)。
  • networkingstream量跨越几个非公共networking。 由于pipe理员使用防火墙规则,因此每个月两次都无法访问主机。 我不想松散一条线。 这就是为什么我想将日志存储在远程主机上并获取(压缩)的数据。

解?

我怎样才能每小时获取数据?

有了上面列出的问题,您需要在远端缓冲统计信息,以免丢失任何信息。

有很多方法可以做到这一点,但没有一个过于简单,需要进行大量的testing才能确保它们的可行性。 它们都涉及在本地编写collectd的输出,然后使用一些方法在中央服务器上获取它。

我没有testing下面的任何一个,所以有些可能根本不工作。

没有特别的顺序或复杂性:

  1. 套接字/networking输出到脚本
    Collectd的输出写入一个套接字或IP /端口,PHP / Perl / Python / Bash脚本正在侦听将命令写入文件。

    然后这些文件可以被中央服务器推送/拉出并由Logstash摄取。

    优点 :简单的脚本来捕获输出; 使用标准的Linux命令
    缺点 :如果你拉大量的统计信息,则不可扩展; 需要维护脚本; 不知道LS是否会处理普通协议

  2. Redis / AMQP / Kafka / MongoDBCollectd的输出写入其中一个可能的“缓冲区”。 他们每个人的工作方式都有所不同,并且有不同的部署选项,所以我会留给你们弄清楚哪个是最好的,因为这个问题已经超出了范围。 也就是说,他们中的任何一个都应该工作

    然后,您需要一种方法将缓冲解决scheme中的数据返回到中央服务器。 应用程序本地复制/镜像/群集或运行每X间隔运行数据(在任一端运行)的脚本有两种可能性。

    优点 :非常灵活的部署选项; 应该很好地扩展; 使用众所周知的工具/程序
    缺点 :缓冲区编程可能需要大量资源,或者安装了很多软件包

  3. 套接字/networking输出到Logstash这与选项1几乎相同,但不是将输出收集到脚本/ progarm,而是写入每个远程主机上的本地Logstash实例。

    然后,Logstash将在本地写入CSV / JSON,并且可以使用任何方法将这些文件返回到中央服务器,包括LS本身。

    优点 :整套解决scheme的单套工具; 提供了一种在边缘转换数据的方法,然后集中采集; 很less的移动部件缺点 :所有远程主机上都需要Java / LS

除了每个选项的优点和缺点之外,所有这些缺点都是你需要find一种方法来在所有服务器上保持一致的configuration。 如果你有很多远程节点(或者一般情况下只有很多节点),你可能已经有一个configurationpipe理系统,这将是微不足道的。

编辑:Achtung! 警告!

请使用这个docker-compose而不是我连接的那个docker-compose (它确实需要dockercompose ,也许是machine但它已经为你做了更多的事情,你将不得不less挣扎。

CELK: https : //github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf

因此,从这里开始获得一个工作系统的好概述。 他们已经为你做了一些工作,所以你只需要担心你的问题,这涉及到configuration和部署。

即使你没有最终使用Docker,这仍然会让你走上成功的道路, 并且还有向你展示它如何组合在一起的额外好处。

首先获得stream浪汉,并build立stream浪汉形象的stream浪汉

如果你不知道什么是stream浪者,那真是太好了。 这是一个允许人们共享整套虚拟机和configuration器的程序,以便您可以只定义一个虚拟机及其configuration,而不是共享整个虚拟机,而且“正常工作”。 这感觉很神奇,但它确实只是可靠的系统工作。

  • (使用上面的链接到CELK): https : //github.com/pblittle/docker-logstash

你将需要安装Vagrant来使用它。 去做就对了! 然后,您不必安装docker因为它将在Vagrant虚拟机上运行。

你有四个select你想如何使用它,但首先,让stream浪汉准备与粗体的命令….

vagrant up

决定你需要运行哪些程序

您的select是:

  • 全套房或ELK(elasticsearch,logstash,kibana)
  • 仅限代理(Logstash收集器)
  • 只有Kibana

还有其他configuration可用,但仅用于testing。

开演时间

现在是时候configurationLogstash了,这是真正唯一具有复杂行为的部分。

Logstashconfiguration文件是纯文本文件,以conf结尾,也可以使用tar或gunzip gz组合在一起。

您可以通过以下两种方式获取configuration文件:

  • 你从互联网上下载它们,使用环境variablesLOGSTASH_CONFIG_URL指向你的configuration文件的url,如果你得到的url不对,或者有一些问题,并且它不能从url获得configuration,url或其他
  • 从磁盘读取它们 – 因为这是docker工具,所以您实际上正在创build一个 (现在),并且每次运行容器时都会挂载卷。

以下是从Internet上使用configuration运行时的样子:

 $ docker run -d \ -e LOGSTASH_CONFIG_URL=https://secretlogstashesstash.com/myconfig.tar.gz \ -p 9292:9292 \ -p 9200:9200 \ pblittle/docker-logstash 

docker 提醒你

默认的logstash.conf只监听stdin和文件input。 如果您希望configurationtcp和/或udpinput,请使用您自己的logstashconfiguration文件并自行公开端口。 有关configuration语法和更多信息,请参阅logstash文档。

注意:什么是默认的logstash conf ?

回想一下,当你没有在所需的环境variablesLOGSTASH_CONFIG_URL中input正确的URL时,

这是input部分:

 // As the author warned, this is all you get. StdIn and Syslog file inputs. input { stdin { type => "stdin-type" } file { type => "syslog" path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ] } file { type => "logstash" path => [ "/var/log/logstash/logstash.log" ] start_position => "beginning" } } 

超越默认

在网站上阅读更多关于logstash的信息。

现在, logstash已经有插件将数据推送到input 。 插件完全按照您的预期变化; 这里有几个:

  • s3从亚马逊(文件系统事件)
  • stdinlogstash (默认,读取stdin缓冲区)
  • httplogstash (你的猜测)
  • …等等…

例如:UDP套接字

UDP是一种无连接且快速的协议,在L4 (传输)的底部运行,支持多重传输,处理故障,通常是logging数据传输的一个很好的select。

你select你想要的端口; 其他select取决于你在做什么。

TCP以相同的方式工作。

udp {port => 9999 codec => json buffer_size => 1452}

例2:来自collectd UDP套接字被过滤和输出

 This is stolen from https://github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf input { udp { port => 25826 # 25826 matches port specified in collectd.conf buffer_size => 1452 # 1452 is the default buffer size for Collectd codec => collectd { } # specific Collectd codec to invoke type => collectd } } output { elasticsearch { host => elasticsearch cluster => logstash protocol => http } } 

而过滤就是一个很好的例子,那就是,它真的很长,我认为它是有用的

 filter { # TEST implementation of parse for collectd if [type] == "collectd" { if [plugin] { mutate { rename => { "plugin" => "collectd_plugin" } } } if [plugin_instance] { mutate { rename => { "plugin_instance" => "collectd_plugin_instance" } } } if [type_instance] { mutate { rename => { "type_instance" => "collectd_type_instance" } } } if [value] { mutate { rename => { "value" => "collectd_value" } } mutate { convert => { "collectd_value" => "float" } } } if [collectd_plugin] == "interface" { mutate { add_field => { "collectd_value_instance" => "rx" "collectd_value" => "%{rx}" } } mutate { convert => { "tx" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "tx" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] == 'tx' event['collectd_value_instance'] = 'tx' event['collectd_value'] = event['tx'] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "rx", "tx" ] } } if [collectd_plugin] == "disk" { mutate { add_field => { "collectd_value_instance" => "read" "collectd_value" => "%{read}" } } mutate { convert => { "write" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "write" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] == 'write' event['collectd_value_instance'] = 'write' event['collectd_value'] = event['write'] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "read", "write" ] } } if [collectd_plugin] == "df" { mutate { add_field => { "collectd_value_instance" => "free" "collectd_value" => "%{free}" } } mutate { convert => { "used" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "used" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] == 'used' event['collectd_value_instance'] = 'used' event['collectd_value'] = event['used'] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "used", "free" ] } } if [collectd_plugin] == "load" { mutate { add_field => { "collectd_value_instance" => "shortterm" "collectd_value" => "%{shortterm}" } } mutate { convert => { "longterm" => "float" "midterm" => "float" "collectd_value" => "float" } } # force clone for kibana3 clone { clones => [ "longterm", "midterm" ] } ##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working ##### ruby { code => " if event['type'] != 'collectd' event['collectd_value_instance'] = event['type'] event['collectd_value'] = event[event['type']] end " } mutate { replace => { "_type" => "collectd" } replace => { "type" => "collectd" } remove_field => [ "longterm", "midterm", "shortterm" ] } } } } 

编辑3:我可能不应该为你做你的工作,但没关系。

像任何好的软件一样收集某些丑陋或难以处理的方面,并试图使它容易,因为它看起来像你发送数据(在这种情况下是一个元组),而不是与序列化相混淆。

你的例子:

(date_time, current_cpu_load), for example ('2016-0-04-24 11:09:12', 12.3)

我不会花我的时间来弄清楚你是如何形成的。 如果你能够使用CPU插件获得这些数据,那就太好了。 我要复制并粘贴我在网上find的一个,以方便我。

这就是说,想一想…只是一点点,它不会伤害。

你看到CPU插件下面加载。

您看到conf文件中collectd的界面太小而无法指定字段。

所以如果你只是这样做,它会工作,但你会得到比CPU负载更多的数据。

这就是你可以使用filter的地方。 但是我想你也可以在Kibana中做到这一点。 所以我宁愿不浪费时间写一个filter,你不需要和b)如果你花了一些时间,可以很容易地写。

 ## In `collectd`: # For each instance where collectd is running, we define # hostname proper to that instance. When metrics from # multiple instances are aggregated, hostname will tell # us were they came from. Hostname "**YOUR_HOSTNAME**" # Fully qualified domain name, false for our little lab FQDNLookup false # Plugins we are going to use with their configurations, # if needed LoadPlugin cpu LoadPlugin df <Plugin df> Device "/dev/sda1" MountPoint "/" FSType "ext4" ReportReserved "true" </Plugin> LoadPlugin interface <Plugin interface> Interface "eth0" IgnoreSelected false </Plugin> LoadPlugin network <Plugin network> Server "**YOUR.HOST.IP.ADDR**" "**PORTNUMBER**" </Plugin> LoadPlugin memory LoadPlugin syslog <Plugin syslog> LogLevel info </Plugin> LoadPlugin swap <Include "/etc/collectd/collectd.conf.d"> Filter ".conf" </Include> 

你的logstashconfiguration

  input { udp { port => **PORTNUMBER** # 25826 matches port specified in collectd.conf buffer_size => **1452** **# 1452 is the default buffer size for Collectd** codec => collectd { } # specific Collectd codec to invoke type => collectd } } output { elasticsearch { cluster => **ELASTICSEARCH_CLUSTER_NAME** # this matches out elasticsearch cluster.name protocol => http } } 

来自collectar的aaron的更新

在Logstash 1.3.x中,我们引入了collectdinput插件。 太棒了! 我们可以在Logstash中处理度量标准,将它们存储在Elasticsearch中,并用Kibana查看它们。 唯一的缺点是你只能通过插件每秒获得3100个事件。 使用Logstash 1.4.0,我们引入了一个新改进的UDPinput插件,它是multithreading的,并有一个队列。 我重构了collectdinput插件作为编解码器(来自我的同事和社区的一些帮助),以利用这个巨大的性能提升。 现在我的双核心MacBook Air上只有3个线程,我可以通过collectd编解码器每秒获得超过45,000个事件!

所以,我想提供一些可以用来改变你的插件configuration来代替使用编解码器的简单例子。

旧的方式: input { collectd {} }新的方法:

 input { udp { port => 25826 # Must be specified. 25826 is the default for collectd buffer_size => 1452 # Should be specified. 1452 is the default for recent versions of collectd codec => collectd { } # This will invoke the default options for the codec type => "collectd" } } This new configuration will use 2 threads and a queue size of 2000 by default for the UDP input plugin. With this you should easily be able to break 30,000 events per second! 

我已经提供了一些其他configuration示例的要点。 有关更多信息,请查看collectd编解码器的Logstash文档。

快乐的日志!