我有几个运行Ubuntu 14.04的Linux服务器,并把它们变成一个Spark集群。 此时,我想要测量服务器之间的networkingstream量,以毫秒为单位,以便进行一些推理和分析任务,供研究之用。 我已经尝试了以下没有成功:
1)我从脚本中反复调用“iptables -nvx -L”并获取其输出,其中包含我想要的信息(数据包数和字节数)。 但是,调用它需要超过1ms,因为我在Python(使用subprocess模块)和Bash脚本中实现。
python:
args = ['iptables','-nvx','-L','Log']
raw_traffic = sp.Popen('iptables -nvx -L Log',shell = True,stdout = sp.PIPE).stdout
(上面的代码需要运行1.2ms)
击:
start_time = $(date+%s%N)
std_output = $(iptables -nvx -L Log)
echo(($(date +%s%N) – $ start_time)/ 1000000)“| bc -l
与输出:
$ sudo sh foo.sh
1.17057700000000000000
2)我试图让iptables从服务器的IP地址logging每个数据包。 这当然是为了达到目的,因为它能够以最细致的方式衡量一切。 但是,这会减慢stream量并造成大的开销。 服务器之间的链接是10G,因此全速大约每秒钟为每个IP地址生成1M行日志,这是不可行的。 我现在需要的信息是服务器之间的毫秒数据包和字节。
是否有任何1毫秒的监测变通办法? 我想这应该可以用iptables来做,也许可以用一些黑客。 任何build议都不胜感激。 提前致谢!
你可以做到这一点,但是你需要使用NF_LOG iptables目标,并编写一个专门满足你的需求的程序。
NF_LOG将数据包发送到一个接收应用程序(给定有多快,你设法做你在做什么)将计数的数据包是一个毫秒的水平。
您可以避免写出每个数据包,并截断数据,以确保您可以尝试获得所需的100万pps。
然而问题是没有这样的接收程序存在 – 你需要写一个。
在做了NF_LOG之前,你的声明LOG减慢了stream量,我怀疑它的数据包写入磁盘,可能会播下来的东西。
我仍然怀疑每秒百万个数据包仍然是一个很高的要求。
另一种select(不是实时的)是使用tcpdump捕获数据包,稍后使用程序读取pcap文件并进行相应的毫秒计算。
无论你如何处理这个信息,你可能需要移动到像C这样的低级语言来获得性能。 你甚至可能需要去multithreading。
我没有尝试过,但netfilter库libnetfilter_log看起来很有希望。 iptables本质上是netfilter命令行界面 , netfilter是Linux内核中的包filter。 所以这将是你的数据包filter的直接库。 听起来很有效率。
http://www.netfilter.org/projects/libnetfilter_log/index.html
什么是libnetfilter_log?
libnetfilter_log是一个用户空间库,为内核数据包filterlogging的数据包提供接口。 这是系统的一部分,反对旧的基于syslog / dmesg的数据包日志logging。 这个库以前被称为libnfnetlink_log。
主要特点
接收来自内核nfnetlink_log子系统的待logging数据包
这是一些示例代码 (从netfilter站点复制的例子/不是很糟糕的权利?)
00002 #include <stdio.h> 00003 #include <stdlib.h> 00004 #include <unistd.h> 00005 #include <netinet/in.h> 00006 00007 #include <libnetfilter_log/libnetfilter_log.h> 00008 00009 static int print_pkt(struct nflog_data *ldata) 00010 { 00011 struct nfulnl_msg_packet_hdr *ph = nflog_get_msg_packet_hdr(ldata); 00012 u_int32_t mark = nflog_get_nfmark(ldata); 00013 u_int32_t indev = nflog_get_indev(ldata); 00014 u_int32_t outdev = nflog_get_outdev(ldata); 00015 char *prefix = nflog_get_prefix(ldata); 00016 char *payload; 00017 int payload_len = nflog_get_payload(ldata, &payload); 00018 00019 if (ph) { 00020 printf("hw_protocol=0x%04x hook=%u ", 00021 ntohs(ph->hw_protocol), ph->hook); 00022 } 00023 00024 printf("mark=%u ", mark); 00025 00026 if (indev > 0) 00027 printf("indev=%u ", indev); 00028 00029 if (outdev > 0) 00030 printf("outdev=%u ", outdev); 00031 00032 00033 if (prefix) { 00034 printf("prefix=\"%s\" ", prefix); 00035 } 00036 if (payload_len >= 0) 00037 printf("payload_len=%d ", payload_len); 00038 00039 fputc('\n', stdout); 00040 return 0; 00041 } 00042 00043 static int cb(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg, 00044 struct nflog_data *nfa, void *data) 00045 { 00046 print_pkt(nfa); 00047 } 00048 00049 00050 int main(int argc, char **argv) 00051 { 00052 struct nflog_handle *h; 00053 struct nflog_g_handle *qh; 00054 struct nflog_g_handle *qh100; 00055 int rv, fd; 00056 char buf[4096]; 00057 00058 h = nflog_open(); 00059 if (!h) { 00060 fprintf(stderr, "error during nflog_open()\n"); 00061 exit(1); 00062 } 00063 00064 printf("unbinding existing nf_log handler for AF_INET (if any)\n"); 00065 if (nflog_unbind_pf(h, AF_INET) < 0) { 00066 fprintf(stderr, "error nflog_unbind_pf()\n"); 00067 exit(1); 00068 } 00069 00070 printf("binding nfnetlink_log to AF_INET\n"); 00071 if (nflog_bind_pf(h, AF_INET) < 0) { 00072 fprintf(stderr, "error during nflog_bind_pf()\n"); 00073 exit(1); 00074 } 00075 printf("binding this socket to group 0\n"); 00076 qh = nflog_bind_group(h, 0); 00077 if (!qh) { 00078 fprintf(stderr, "no handle for grup 0\n"); 00079 exit(1); 00080 } 00081 00082 printf("binding this socket to group 100\n"); 00083 qh100 = nflog_bind_group(h, 100); 00084 if (!qh100) { 00085 fprintf(stderr, "no handle for group 100\n"); 00086 exit(1); 00087 } 00088 00089 printf("setting copy_packet mode\n"); 00090 if (nflog_set_mode(qh, NFULNL_COPY_PACKET, 0xffff) < 0) { 00091 fprintf(stderr, "can't set packet copy mode\n"); 00092 exit(1); 00093 } 00094 00095 fd = nflog_fd(h); 00096 00097 printf("registering callback for group 0\n"); 00098 nflog_callback_register(qh, &cb, NULL); 00099 00100 printf("going into main loop\n"); 00101 while ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) { 00102 struct nlmsghdr *nlh; 00103 printf("pkt received (len=%u)\n", rv); 00104 00105 /* handle messages in just-received packet */ 00106 nflog_handle_packet(h, buf, rv); 00107 } 00108 00109 printf("unbinding from group 100\n"); 00110 nflog_unbind_group(qh100); 00111 printf("unbinding from group 0\n"); 00112 nflog_unbind_group(qh); 00113 00114 #ifdef INSANE 00115 /* norally, applications SHOULD NOT issue this command, 00116 * since it detaches other programs/sockets from AF_INET, too ! */ 00117 printf("unbinding from AF_INET\n"); 00118 nflog_unbind_pf(h, AF_INET); 00119 #endif 00120 00121 printf("closing handle\n"); 00122 nflog_close(h); 00123 00124 exit(0); 00125 }