ossec log-analysis
ossec log-analysis
1.前言
最近做主机安全监控恶意命令的时候用到了kafka,发现这是分布式处理大数据日志的一个神器。
大概流程:
- 1.主机agent端把当前主机执行命令的内容写入kafka(这里可以把kafka理解为一个高吞吐、低延迟的大数据专用队列)(这里用了公司内部的kafka服务器,公司用了19个Broker,不过它也承载了公司其它业务的读写压力,正常个人认为3以上个broker够了)
- 2.使用java做分布式的消费端,处理主机执行命令的日志内容。(开启了4个java消费端)
这个框架下,覆盖10000多台机器,处理主机执行命令的日志qps为20000。在处理大数据的日志中显示出很强的性能。
这个框架成功以后,考虑能不能把这个框架搬到ossec上,众所周知,ossec是一个著名的主机hids,其中它最强的功能要属他的主机安全日志处理能力,但是ossec有一个比较大的问题就是它扛不了大的并发,只要机器agent量一大,这个东西就不行了(每一台ossec-server默认支持256个agent,最大支持2048个agent,当然可以改。)
不过要是能把前面说的kafka的框架用到这里来,就可以解决上面所说的问题了。需要就是将ossec的安全日志分析逻辑进行分析并复制出来。
2.ossec Log analysised 逻辑
因为公司机器环境是centos的,所以这里只研究了ossec的syslog的主机安全日志分析。
开始时候下面两个文章帮忙很多:
1.http://ouyangjia7.iteye.com/blog/355675
2.http://vinc.top/2017/07/07/%E3%80%90ossec%E3%80%91%E6%97%A5%E5%BF%97%E6%B3%9B%E5%8C%96%E5%8F%8A%E5%91%8A%E8%AD%A6%E8%A7%84%E5%88%99%E9%85%8D%E7%BD%AE/
后面结合ossec源码,半看半猜把这套ossec的安全日志分析逻辑用java写了。【c++代码很多看不懂,感谢@程家大老爷 帮忙一起研究ossec源码】
以下参考ossec写的java的Log analysised相关逻辑:
- 1.日志预解码 Log pre-decoding
- 2.日志解码 Log decoding
- 3.日志分析 Log analysis
2.1.日志预解码 Log pre-decoding
目的:从日志中提取一般的信息。
例如:从系统日志头中获取主机名,程序名和时间等等。
条件:日志必须格式良好。
这一步其实syslog的解析
2.1.1.主机agent中先进行syslog格式化
由于我们自己的主机ids使用golang写的,所以直接用github.com/jeromer/syslogparser/rfc3164这个golang库,在agent进行syslog的解析,分析出syslog的log,host_name,ip,program_name,start_time然后直接写入kafka可以了。我这里目前只是做了/var/log/secure,/var/log/messages(messages目前只上传program_name为su的syslog)两个文件的监控。
另外:golang写kafka可能存在内存泄露的问题,具体解决方法:http://lday.me/2017/09/02/0012_a_memory_leak_detection_procedure/
(设置metrics.UseNilMetrics = true)
2.1.2.实例化syslog为eventinfo对象
在java的kafak消费端,我们把每个syslog看出一个Eventinfo事件(Eventinfo event_one)。Eventinfo的定义类似:
public class Eventinfo {
// 这个事件的详情
private String type;// (其实也是日志类型)指明decoder的类型, 包括:firewall、ids、web-log、windows、claw、squid、syslog、host-information
private String log; // log内容
private String host_name; // 主机名
private String ip; // 当前事件对应的ip
private String program_name;
private String start_time;
private int size; //字符串全部大小
private boolean is_fts; //标示是不是第一次
// 对应解析这个事件decoder解析完 对应的得到内容
private String user; // 用户
private String src_user; // su来源用户
private String dst_user; // su target用户
private String url; // sTTY PWD
private String extra_data; // extra_data
private String src_ip; // 来源ip
private String dst_ip; // target的ip
private String src_port; // 来源port
private String dst_port; // target的port
private String protocol; // protocol 类型
private String location; // location 类型 这里指ip或者。判断是否第一次执行时候有用。
private String action; // event action (deny, drop, accept, etc)
private String status; // event status (success, failure, etc)
private Decoder child_decoder;/* 指向生产这个事件的decoder对象 */
private List<String> group_list;// 规则对应的group_list
private List<String> sid_list;// 规则对应的sid_list
}
所以通过第一步日志预解码,eventinfo的log,host_name,ip,program_name,start_time,size这些信息都有了。
2.2.日志解码 Log decoding
日志解码是为了获得比预解码更深入的信息,这次不是从日志头中提取,而是从日志内容中用正则表达式(Regular Expresion)标识出某些关键字,一般我们需要提取源IP地址,用户名,ID号等等的信息。
我们有上百条默认的解码规则,它们被保存在decoder.xml文件中。(/etc/decoder.xml)
所以这一步就是要对日志进行进一步的处理,解析出Eventinfo event_one的is_fts,user,src_user,dst_user,url,extra_data,src_ip,dst_ip....等字段,最终得到一个更完整的Eventinfo的对象信息。
2.2.1.读取decoder.xml,获取parent_decoder_list
对decoder.xml仔细关注以后,我首先把每一个xml的子节点认为一个对象Decoder。Decoder对象类似于:
public class Decoder {
// xml解析器
private String type;// (其实也是日志类型)指明decoder的类型,
private String parent;// 父级解码
private String name;// 解码器的名字,name
private String program_name; // 程序名正则
private String regex; // 结果匹配,为了提取内容的regex
private String prematch; // 结果匹配,为了提取内容的prematch
private String order; // 对应解析到的eventinfo字段
private String fts; // 根据fts字段判断是不是第一次插入的
private List<Decoder> child_nodes; // 子child_node;
解析完decoder.xml,我的得到了一个List<Decoder> parent_decoder_list,具体过程为:
- 1.遍历全部的decoder.xml的DecoderAllList
- 2.查看当前Decoder节点[decoder_one]是否存在program_name字段。存在就新建decoder_one,赋值name,parent,program_name,prematch,把decoder_one存到parent_decoder_list中
- 3.当前decoder_one不存在program_name字段,就认为它是一个子节点,我们取当前decoder_one的parent字段[这时候decoder_one的parent一定存在]。然后遍历parent_decoder_list,如果decoder_one的parent字段和parent_decoder_list的某个decoder的name一样,把decoder_one作为子节点添加对应的decoder中。
这样我们就获取到一个List<Decoder> parent_decoder_list
2.2.2.根据parent_decoder_list,完善Eventinfo对象内容
接着我们就开始遍历parent_decoder_list,用里面的Decoder对象来完善Eventinfo对象内容。具体过程为:(注:这里面有用到正则匹配,ossec的正则有些不一样,分为两种:OS_Regex/OS_Match,具体看:http://ossec-docs.readthedocs.io/en/latest/syntax/regex.html 所以说decode的regex字段和match的字段匹配时候是有差别的),具体步骤如下:
- 1.遍历到parent_decoder_list。
- 2.获取到当前节点[parent_decoder_one]时。使用parent_decoder_one的program_name字段去匹配当前event_info对应的program_name(这里匹配的是用OS_Match的方式),如果命中把parent_decoder_one指向对应的event_info的child_decoder,并进入第4步。没有命中,到第3步。
- 3.如果parent_decoder_one不存在prematch字段跳到下一个节点,如果parent_decoder_one的prematch字段存在,就用prematch去匹配event_info对应的log(这里匹配的是用OS_Match的方式)。如果匹配到了,就把当前parent_decoder_one指向对应的event_info的child_decoder,并进入第4步,如果没有匹配到的话就跳到下一个parent_decoder_one。
- 4.进一步遍历parent_decoder_one的child_nodes。
- 5.获取当前节点child_one的prematch。如果不存在prematch字段,直接跳到第六步,如果prematch字段存在,就用prematch去匹配event_info对应的log(这里匹配的是用OS_Match的方式),没有匹配中就跳到下一个节点,匹配中就到第6步。
- 6.获取当前节点child_one的regex,如果不存在regex字段,直接跳到下一个节点,如果regex字段存在,就用regex去匹配event_info对应的log(这里匹配的是用OS_Regex的方式),如果没有匹配到直接跳到下一个节点;
如果匹配到了,这时候用order字段做对应的event_info进一步(regex和order一定同时存在),regex正则匹配的分组对应需要order的信息来对event_info进行完善,order的数据说明了要把regex捕获组的数据赋值到event_info的哪个字段,比如说regex为su (S+)' S+ for (S+) on S+,order为dstuser,srcusr,那么匹配到的group(0)赋值给event_info的dst_user,group(1)赋值给event_info的user_user。
接着把当前child_one指向对应的event_info的child_decoder。 - 7.获取当前节点child_one的fts字段。如果不存在fts字段,结束该部分的处理;因为fts这个字段的作用是用于标记这个事件是不是第一次发生的,比如说第一次登入系统,第一次执行sudo命令等等,所以如果fts字段存在,就会根据fts的字段,把event_info对应的字段继续md5计算获取hash,比如说fts为name,srcuser,location,那就把event_info的child_decder的name值加上src_user值,加上location做md5的hash值计算。
因为kafka的消费端会有多个,所以他们需要共享这个数据,所以我把hash存到redis中,把hash插入redis中的时候,查询hash在redis是否存在,如果不存在说明是第一次,这是把event_info的fts标志为true,插入该hash值;存在则不插入。
2.3.日志分析 Log analysis
日志被解码之后的下一步,是检查是否有与之相匹配的规则。
OSSEC有400多条默认的规则,用XML形式保存。(/etc/rule/规则.xml)
程序启动时,经过OSSEC处理,所有规则都被读入一个树状的结构中。
规则只能用来匹配被解码之后的日志信息。
由于解码器的存在,规则和日志的初始化之间没有直接的联系。
其实到这一步,我们的信息已经全部存到了event_info这个对象里面了。我们需要通过xml,对event_info进行分析,最终生成对应的安全日志报警。
2.3.1.根据xml文件建rule的树结构
我首先把每一个规则的xml的子节点认为一个对象rule,结构如下:
public class Rule {
private int deep; // 单曲树层数
private String id;
private String level;
private int maxsize;
private String frequency;
private String timeframe;
private String noalert;// 标志不报警,它不插入数据库中
private String group;// 规则对应的小group
private String group_name;// 规则对应的大group
private String match;// 规则对应的match
private String regex;// 规则对应的regex
private String decoded_as;
private String extra_data;
private String user;
private String program_name;
private String hostname;
private String url;
private String action;
private String status;
private String if_sid;
private String if_group;
private String if_level;
//timeframe frequency 特有的
private String if_matched_sid;
private String if_matched_group;
private String same_source_ip;
private String same_source_port;
private String same_dst_port;
private String same_location;
private String same_user;
private String description;
private String info;
private String if_fts;
private String options;
private List<Rule> child_rules; // 子child_rules list
按照ossec代码里的规则读取顺序去读xml(一定要按顺序读,不然会有问题的),xml的子节点形成一个rule对象,最终汇总到一个List<Rule> AllRulelists里。
接着遍历AllRulelists,最终目标是生成一个牛逼的rule树结构。至于为什么要使用树结构来匹配呢?因为这种方法十分有效,平均每条日志只要用匹配7-8条规则,而不是全部的400条,这样的话,极大的优化了效率的。虽然在构建树的过程中有点麻烦。。。。具体构建树的过程如下:
- 1.新建一个Rule对象,名为parent_rule,成为整个树结构的root节点;
- 2.遍历到AllRulelists,获取到当前节点[rule_one]时,判断如果rule_one的is_sid,is_group,is_level都不存在的话,把该parent_rule添加到root的child_rules中。
- 3.接下去是进入递归函数,继续遍历AllRulelists,获取到当前节点[rule_one],目的是根据rule_one的if_sid或者if_group,is_level把rule_one节点挂到parent_rule的某个节点上。这里注意if_sid只会让rule_one挂到一个节点上;而if_group,is_level可能导致一个rule_one挂到多个节点上,但是不允许当前rule_one挂自己,因为这样的情况会导致死循环。这个过程说起来可能有点绕,不过要是你仔细观察xml文件,还有源代码,就知道这个树建过程的详细逻辑了。
2.3.2.通过rule_list的树结构,对event_info进行分析生成报警。
到这里我们就有一个parent_rule的树结构来对event_info进行分析并最终生成报警了。
因为是一个树结构所以我们当然还是需要递归了,具体实现步骤如下:
- 1.遍历parent_rule的child_rules。
- 2.获取到当前节点[rule]的decoded_as,不存在就跳到第3步,存在使用rule的decoded_as去匹配event_one的child_decoder.parent,如果没有匹配到就跳到child_rules的下一个节点。匹配中了就跳到第3步
- 3.获取到当前节点[rule]的program_name,不存在就跳到第4步,存在使用rule的program_name去匹配event_one的program_name,如果没有匹配到就跳到child_rules的下一个节点。匹配中了就跳到第4步
- 4.获取到当前节点[rule]的match,不存在就跳到第5步,存在使用rule的match去匹配event_one的log,如果没有匹配到就跳到child_rules的下一个节点,(这里匹配的是用OS_Match的方式)匹配中了就跳到第5步。
- 5.获取到当前节点[rule]的regex,不存在就跳到第6步,存在使用rule的regex去匹配event_one的log,如果没有匹配到就跳到child_rules的下一个节点。(这里匹配的是用OS_Regex的方式),匹配中了就跳到第6步
- 6.下面的Action,Url,Use,Maxsize,Extra_data,Status都是类似的
- 7.获取到当前节点[rule]的if_fts,不存在就跳到第8步,存在就判断event_one.isIs_fts是否为真,如果为假的话跳到child_rules的下一个节点,如果为真就跳到第8步
- 8.判断当前节点[rule]的timeframe是否存在,不存在就跳到第9步,timeframe是一个特殊的规则,它是在指定时间(timeframe*2)内判断某个报警时间有没有出现对应frequency次,如果frequency不存在默认为1次。如果都符合进入第9步,不符合就跳到child_rules的下一个节点。
- 9.到第九步就认为规则被命中了,判断noalert是否存在,存在这个报警就忽略。不存在,就需要把这个报警保存到数据库,因为第8步的时候要用到,而且这些报警也是比较重要的所以都要保存起来。考虑到这个量级比较大,我用了es做报警存储,而且后面结合kibana可以直接作出漂亮图表报告会减少很多工作量;另外我们可以根据level级别来报警通知等等;最后就是重新进入递归,遍历当前节点[rule]的child_rules,而且当递归函数结束同时,还需要从前一个遍历parent_rule的child_rules的过程中break出来
3.结合kibana 作出报表
因为报警数据存储到es了,而kibana是一套很成熟的系统了,我们直接用kibana做报表:
4.结尾
因为ossec的c代码很多看不太懂,很多逻辑半猜半看,如果有些逻辑不对或者说的很乱的,请多参考ossec源码。
另外:文章拒绝转载哦
谢谢分享,学习了。