ossec log-analysis

ossec log-analysis

1.前言

最近做主机安全监控恶意命令的时候用到了kafka,发现这是分布式处理大数据日志的一个神器。

大概流程:

  • 1.主机agent端把当前主机执行命令的内容写入kafka(这里可以把kafka理解为一个高吞吐、低延迟的大数据专用队列)(这里用了公司内部的kafka服务器,公司用了19个Broker,不过它也承载了公司其它业务的读写压力,正常个人认为3以上个broker够了)
  • 2.使用java做分布式的消费端,处理主机执行命令的日志内容。(开启了4个java消费端)

这个框架下,覆盖10000多台机器,处理主机执行命令的日志qps为20000。在处理大数据的日志中显示出很强的性能。

这个框架成功以后,考虑能不能把这个框架搬到ossec上,众所周知,ossec是一个著名的主机hids,其中它最强的功能要属他的主机安全日志处理能力,但是ossec有一个比较大的问题就是它扛不了大的并发,只要机器agent量一大,这个东西就不行了(每一台ossec-server默认支持256个agent,最大支持2048个agent,当然可以改。)

不过要是能把前面说的kafka的框架用到这里来,就可以解决上面所说的问题了。需要就是将ossec的安全日志分析逻辑进行分析并复制出来。

2.ossec Log analysised 逻辑

因为公司机器环境是centos的,所以这里只研究了ossec的syslog的主机安全日志分析。

开始时候下面两个文章帮忙很多:

1.http://ouyangjia7.iteye.com/blog/355675
2.http://vinc.top/2017/07/07/%E3%80%90ossec%E3%80%91%E6%97%A5%E5%BF%97%E6%B3%9B%E5%8C%96%E5%8F%8A%E5%91%8A%E8%AD%A6%E8%A7%84%E5%88%99%E9%85%8D%E7%BD%AE/

后面结合ossec源码,半看半猜把这套ossec的安全日志分析逻辑用java写了。【c++代码很多看不懂,感谢@程家大老爷 帮忙一起研究ossec源码】

以下参考ossec写的java的Log analysised相关逻辑:

  • 1.日志预解码 Log pre-decoding
  • 2.日志解码 Log decoding
  • 3.日志分析 Log analysis

2.1.日志预解码 Log pre-decoding

目的:从日志中提取一般的信息。
例如:从系统日志头中获取主机名,程序名和时间等等。
条件:日志必须格式良好。
这一步其实syslog的解析

2.1.1.主机agent中先进行syslog格式化

由于我们自己的主机ids使用golang写的,所以直接用github.com/jeromer/syslogparser/rfc3164这个golang库,在agent进行syslog的解析,分析出syslog的log,host_name,ip,program_name,start_time然后直接写入kafka可以了。我这里目前只是做了/var/log/secure,/var/log/messages(messages目前只上传program_name为su的syslog)两个文件的监控。

另外:golang写kafka可能存在内存泄露的问题,具体解决方法:http://lday.me/2017/09/02/0012_a_memory_leak_detection_procedure/
(设置metrics.UseNilMetrics = true)

2.1.2.实例化syslog为eventinfo对象

在java的kafak消费端,我们把每个syslog看出一个Eventinfo事件(Eventinfo event_one)。Eventinfo的定义类似:

public class Eventinfo {
    // 这个事件的详情
    private String type;// (其实也是日志类型)指明decoder的类型, 包括:firewall、ids、web-log、windows、claw、squid、syslog、host-information
    private String log; // log内容
    private String host_name; // 主机名
    private String ip; // 当前事件对应的ip
    private String program_name;
    private String start_time;
    private int size; //字符串全部大小
    private boolean is_fts; //标示是不是第一次
    // 对应解析这个事件decoder解析完   对应的得到内容
    private String user; // 用户
    private String src_user; // su来源用户
    private String dst_user; // su target用户
    private String url; // sTTY PWD
    private String extra_data; // extra_data
    private String src_ip; // 来源ip
    private String dst_ip; // target的ip
    private String src_port; // 来源port
    private String dst_port; // target的port
    private String protocol; // protocol 类型
    private String location; // location 类型 这里指ip或者。判断是否第一次执行时候有用。
    private String action; // event action (deny, drop, accept, etc)
    private String status; // event status (success, failure, etc)
    private Decoder child_decoder;/* 指向生产这个事件的decoder对象 */
    private List<String> group_list;// 规则对应的group_list
    private List<String> sid_list;// 规则对应的sid_list
}

所以通过第一步日志预解码,eventinfo的log,host_name,ip,program_name,start_time,size这些信息都有了。

2.2.日志解码 Log decoding

日志解码是为了获得比预解码更深入的信息,这次不是从日志头中提取,而是从日志内容中用正则表达式(Regular Expresion)标识出某些关键字,一般我们需要提取源IP地址,用户名,ID号等等的信息。

我们有上百条默认的解码规则,它们被保存在decoder.xml文件中。(/etc/decoder.xml)

所以这一步就是要对日志进行进一步的处理,解析出Eventinfo event_one的is_fts,user,src_user,dst_user,url,extra_data,src_ip,dst_ip....等字段,最终得到一个更完整的Eventinfo的对象信息。

2.2.1.读取decoder.xml,获取parent_decoder_list

对decoder.xml仔细关注以后,我首先把每一个xml的子节点认为一个对象Decoder。Decoder对象类似于:

public class Decoder {
    // xml解析器
    private String type;// (其实也是日志类型)指明decoder的类型,
    private String parent;// 父级解码
    private String name;// 解码器的名字,name
    private String program_name; // 程序名正则
    private String regex;       // 结果匹配,为了提取内容的regex
    private String prematch;    // 结果匹配,为了提取内容的prematch
    private String order;       // 对应解析到的eventinfo字段
    private String fts;         // 根据fts字段判断是不是第一次插入的
    private List<Decoder> child_nodes; // 子child_node;

解析完decoder.xml,我的得到了一个List<Decoder> parent_decoder_list,具体过程为:

  • 1.遍历全部的decoder.xml的DecoderAllList
  • 2.查看当前Decoder节点[decoder_one]是否存在program_name字段。存在就新建decoder_one,赋值name,parent,program_name,prematch,把decoder_one存到parent_decoder_list中
  • 3.当前decoder_one不存在program_name字段,就认为它是一个子节点,我们取当前decoder_one的parent字段[这时候decoder_one的parent一定存在]。然后遍历parent_decoder_list,如果decoder_one的parent字段和parent_decoder_list的某个decoder的name一样,把decoder_one作为子节点添加对应的decoder中。

这样我们就获取到一个List<Decoder> parent_decoder_list

2.2.2.根据parent_decoder_list,完善Eventinfo对象内容

接着我们就开始遍历parent_decoder_list,用里面的Decoder对象来完善Eventinfo对象内容。具体过程为:(注:这里面有用到正则匹配,ossec的正则有些不一样,分为两种:OS_Regex/OS_Match,具体看:http://ossec-docs.readthedocs.io/en/latest/syntax/regex.html 所以说decode的regex字段和match的字段匹配时候是有差别的),具体步骤如下:

  • 1.遍历到parent_decoder_list。
  • 2.获取到当前节点[parent_decoder_one]时。使用parent_decoder_one的program_name字段去匹配当前event_info对应的program_name(这里匹配的是用OS_Match的方式),如果命中把parent_decoder_one指向对应的event_info的child_decoder,并进入第4步。没有命中,到第3步。
  • 3.如果parent_decoder_one不存在prematch字段跳到下一个节点,如果parent_decoder_one的prematch字段存在,就用prematch去匹配event_info对应的log(这里匹配的是用OS_Match的方式)。如果匹配到了,就把当前parent_decoder_one指向对应的event_info的child_decoder,并进入第4步,如果没有匹配到的话就跳到下一个parent_decoder_one。
  • 4.进一步遍历parent_decoder_one的child_nodes。
  • 5.获取当前节点child_one的prematch。如果不存在prematch字段,直接跳到第六步,如果prematch字段存在,就用prematch去匹配event_info对应的log(这里匹配的是用OS_Match的方式),没有匹配中就跳到下一个节点,匹配中就到第6步。
  • 6.获取当前节点child_one的regex,如果不存在regex字段,直接跳到下一个节点,如果regex字段存在,就用regex去匹配event_info对应的log(这里匹配的是用OS_Regex的方式),如果没有匹配到直接跳到下一个节点;
    如果匹配到了,这时候用order字段做对应的event_info进一步(regex和order一定同时存在),regex正则匹配的分组对应需要order的信息来对event_info进行完善,order的数据说明了要把regex捕获组的数据赋值到event_info的哪个字段,比如说regex为su (S+)' S+ for (S+) on S+,order为dstuser,srcusr,那么匹配到的group(0)赋值给event_info的dst_user,group(1)赋值给event_info的user_user。
    接着把当前child_one指向对应的event_info的child_decoder。
  • 7.获取当前节点child_one的fts字段。如果不存在fts字段,结束该部分的处理;因为fts这个字段的作用是用于标记这个事件是不是第一次发生的,比如说第一次登入系统,第一次执行sudo命令等等,所以如果fts字段存在,就会根据fts的字段,把event_info对应的字段继续md5计算获取hash,比如说fts为name,srcuser,location,那就把event_info的child_decder的name值加上src_user值,加上location做md5的hash值计算。
    因为kafka的消费端会有多个,所以他们需要共享这个数据,所以我把hash存到redis中,把hash插入redis中的时候,查询hash在redis是否存在,如果不存在说明是第一次,这是把event_info的fts标志为true,插入该hash值;存在则不插入。

2.3.日志分析 Log analysis

日志被解码之后的下一步,是检查是否有与之相匹配的规则。
OSSEC有400多条默认的规则,用XML形式保存。(/etc/rule/规则.xml)
程序启动时,经过OSSEC处理,所有规则都被读入一个树状的结构中。
规则只能用来匹配被解码之后的日志信息。
由于解码器的存在,规则和日志的初始化之间没有直接的联系。

其实到这一步,我们的信息已经全部存到了event_info这个对象里面了。我们需要通过xml,对event_info进行分析,最终生成对应的安全日志报警。

2.3.1.根据xml文件建rule的树结构

我首先把每一个规则的xml的子节点认为一个对象rule,结构如下:

public class Rule {
    private int deep; // 单曲树层数
    private String id;
    private String level;
    private int maxsize;
    private String frequency;
    private String timeframe;
    private String noalert;// 标志不报警,它不插入数据库中
    private String group;// 规则对应的小group
    private String group_name;// 规则对应的大group
    private String match;// 规则对应的match
    private String regex;// 规则对应的regex
    private String decoded_as;
    private String extra_data;
    private String user;
    private String program_name;
    private String hostname;
    private String url;
    private String action;
    private String status;
    private String if_sid;
    private String if_group;
    private String if_level;
    //timeframe frequency 特有的
    private String if_matched_sid;
    private String if_matched_group;
    private String same_source_ip;
    private String same_source_port;
    private String same_dst_port;
    private String same_location;
    private String same_user;
    private String description;
    private String info;
    private String if_fts;
    private String options;
    private List<Rule> child_rules; // 子child_rules list

按照ossec代码里的规则读取顺序去读xml(一定要按顺序读,不然会有问题的),xml的子节点形成一个rule对象,最终汇总到一个List<Rule> AllRulelists里。

接着遍历AllRulelists,最终目标是生成一个牛逼的rule树结构。至于为什么要使用树结构来匹配呢?因为这种方法十分有效,平均每条日志只要用匹配7-8条规则,而不是全部的400条,这样的话,极大的优化了效率的。虽然在构建树的过程中有点麻烦。。。。具体构建树的过程如下:

  • 1.新建一个Rule对象,名为parent_rule,成为整个树结构的root节点;
  • 2.遍历到AllRulelists,获取到当前节点[rule_one]时,判断如果rule_one的is_sid,is_group,is_level都不存在的话,把该parent_rule添加到root的child_rules中。
  • 3.接下去是进入递归函数,继续遍历AllRulelists,获取到当前节点[rule_one],目的是根据rule_one的if_sid或者if_group,is_level把rule_one节点挂到parent_rule的某个节点上。这里注意if_sid只会让rule_one挂到一个节点上;而if_group,is_level可能导致一个rule_one挂到多个节点上,但是不允许当前rule_one挂自己,因为这样的情况会导致死循环。这个过程说起来可能有点绕,不过要是你仔细观察xml文件,还有源代码,就知道这个树建过程的详细逻辑了。

2.3.2.通过rule_list的树结构,对event_info进行分析生成报警。

到这里我们就有一个parent_rule的树结构来对event_info进行分析并最终生成报警了。
因为是一个树结构所以我们当然还是需要递归了,具体实现步骤如下:

    • 1.遍历parent_rule的child_rules。
    • 2.获取到当前节点[rule]的decoded_as,不存在就跳到第3步,存在使用rule的decoded_as去匹配event_one的child_decoder.parent,如果没有匹配到就跳到child_rules的下一个节点。匹配中了就跳到第3步
    • 3.获取到当前节点[rule]的program_name,不存在就跳到第4步,存在使用rule的program_name去匹配event_one的program_name,如果没有匹配到就跳到child_rules的下一个节点。匹配中了就跳到第4步
    • 4.获取到当前节点[rule]的match,不存在就跳到第5步,存在使用rule的match去匹配event_one的log,如果没有匹配到就跳到child_rules的下一个节点,(这里匹配的是用OS_Match的方式)匹配中了就跳到第5步。
    • 5.获取到当前节点[rule]的regex,不存在就跳到第6步,存在使用rule的regex去匹配event_one的log,如果没有匹配到就跳到child_rules的下一个节点。(这里匹配的是用OS_Regex的方式),匹配中了就跳到第6步
    • 6.下面的Action,Url,Use,Maxsize,Extra_data,Status都是类似的
    • 7.获取到当前节点[rule]的if_fts,不存在就跳到第8步,存在就判断event_one.isIs_fts是否为真,如果为假的话跳到child_rules的下一个节点,如果为真就跳到第8步
    • 8.判断当前节点[rule]的timeframe是否存在,不存在就跳到第9步,timeframe是一个特殊的规则,它是在指定时间(timeframe*2)内判断某个报警时间有没有出现对应frequency次,如果frequency不存在默认为1次。如果都符合进入第9步,不符合就跳到child_rules的下一个节点。
    • 9.到第九步就认为规则被命中了,判断noalert是否存在,存在这个报警就忽略。不存在,就需要把这个报警保存到数据库,因为第8步的时候要用到,而且这些报警也是比较重要的所以都要保存起来。考虑到这个量级比较大,我用了es做报警存储,而且后面结合kibana可以直接作出漂亮图表报告会减少很多工作量;另外我们可以根据level级别来报警通知等等;最后就是重新进入递归,遍历当前节点[rule]的child_rules,而且当递归函数结束同时,还需要从前一个遍历parent_rule的child_rules的过程中break出来

    3.结合kibana 作出报表

    因为报警数据存储到es了,而kibana是一套很成熟的系统了,我们直接用kibana做报表:
    10.50.186.131_8080_app_kibana-1.png

    4.结尾

    因为ossec的c代码很多看不太懂,很多逻辑半猜半看,如果有些逻辑不对或者说的很乱的,请多参考ossec源码。

    另外:文章拒绝转载哦

    标签: ossec

    添加新评论