Filebeat 浅析(二)
前言
说完了filebeat
的基础架构和相关设计后,我们就来看下filebeat
是如何来实现这些功能的。这里只对几个相对来说比较重要的几个流程进行讨论。其余的设计会在之后的文章中进行阐述。几个流程为:
- 启动过程
- harvester 监听过程(以log为例)
- outputs 处理流程(以kafka为例)
分析
启动过程
首先当然是找到入口函数,对于go
应用来说很简单,直接找main()
就可以了。因此我们可以看到:beats/filebeat/main.go
的
1 | package main |
其中ipnuts.Init
是用来获取该filebeat
在之前的一些状态信息或者初始化一份状态信息,主要是用来记录目前harvester
已监听到、采集到的offset
相关信息。cmd.Filebeat
用来初始化一个Filebeat
对象。cmd.root.Filebeat()
,加载&初始化相关的配置,以及构建filebeat
对象。下面代码里的beater.New
方法会构建了filebeat
对象。GenRootCmdWithSettings
方法内会构建启动filebeat
相关命令。
1 | // Filebeat build the beat root command for executing filebeat and it's subcommands. |
GenRootCmdWithSettings
函数在filebeat/libbeat/cmd/root.go
文件内。
1 | func GenRootCmdWithSettings(beatCreator beat.Creator, settings instance.Settings) *BeatsRootCmd { |
此函数内会调用filebeat/libbeat/cmd/run.go
文件内的func genRunCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Command
方法。
1 | func genRunCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Command { |
根据方法名字就能关注到重点
1 | instance.Run(settings, beatCreator) |
Run
方法内会构建Beat
对象,并调用launch
方法。
1 | // Run initializes and runs a Beater implementation. name is the name of the |
launch
方法内又构建了Beater
对象(Beater
是个接口),并且在最后调用了Beater
接口的Run
方法启动。
1 | func (b *Beat) launch(settings Settings, bt beat.Creator) error { |
这个接口有多个实现,但是我们这里只需要关注一个,文件filebeat/beater/filebeat.go
文件内的Run
方法,这里就是启动filebeat
的实现。
在Run
方法内,会构建一个crawler
对象,此对象用来采集数据,工作原理其实是对Inputs
的包装,Inputs
就是filebeat
官网介绍的核心组件之一。
1 | // Run allows the beater to be run as a beat. |
至此整个filebeat
启动流程基本清晰了。
采集
数据采集其实就是由crawler
来完成的,他的主要组件就是harvester
。那我们现在就来先看crawler
的创建过程,入口函数在filebeat/beater/filebeat.go
文件内的Run
方法中的crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
,具体在filebeat/beater/crawler.go
中,然后就是启动err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)`,具体看这个方法:
1 | // Start starts the crawler with all inputs |
整个代码主要就是启动input对象,我们继续看startInput
这个方法,
1 | func (c *crawler) startInput( |
注意方法最后的runner.Start()
,就是在启动input
。这里的Runner
是对input
的一个封装,用于管理input
的生命周期,而Start
方法就是通过goroutine
调用其的Run
方法,而Run
的本质就是调用对于input
的Run
方法。
1 | type Input interface { |
1 | // Start starts the input |
由于我们最常用的就是通过filebeat
来采集日志,我们这里就以log
为例,来看其的具体流程,文件为filebeat/input/log/input.go
:
1 | // Run runs the input |
其中p.scan()
是其中的核心方法,跟踪进去看具体的实现:
1 | // Scan starts a scanGlob for each provided path/glob |
scan
方法内首先获取所有的文件。其次获取文件状态,根据状态来判定收集最新数据,还是从历史文件收集。文件收集会构建Harvester
对象。p.startHarvester(newState, 0)
是其核心方法:
1 | // startHarvester starts a new harvester with the given offset |
整个方法的核心内容可以概括成三步:
p.createHarvester
构建harvester
p.Setup
配置harvester
。Setup
方法内会初始化文件相关的内容,以及构建文件reader
。p.harvesters.Start(h)
运行harvester
这里我们具体看下p.harvesters.Start(h)
这个方法,其的作用就是继续在一个go routine
中启动harvester.run
。
1 | // Start starts the given harvester and add its to the registry |
而h.Run
其实又是一个interface方法,这里我们继续以log
为例,因此我们这里跟踪filebeat/input/log/harvester.go
中的Run
方法:
1 | // Run start the harvester and reads files line by line and sends events to the defined output |
整个方法主要分为这么几步:
- 创建一个
forwarder
转发器 - 死循环从
reader
里的读取内容 - 通过
farwarder
向下转发
h.sendEvent(data, forwarder)
这段代码将采集的数据发送到下游,内部其实就是用forwarder
转发了数据。
到此整个采集的流程就基本完成了。
感慨
通过以上这些流程的分析,可以看出filebeat中其实大量使用到了go routine
协程。而这也是go
的最大一个特性。filebeat
其实相当于一个大的管理器,他管理者以单个input
文件为单位的n组协程,这样的好处就是他们之间是相互没有影响的,并且由于协程比线程要来的轻量很多,并且对于cpu的上下文切换也要少很多,对于系统资源的消耗也会小很多。那为什么有时候我们又会看到filebeat
占用了近20%的cpu利用率,这个其实就在于forwarder
转发后的处理流程了,如果你引入了过多的计算process
,或者你的output
里有重cpu的计算例如压缩,那当然会消耗更多的cpu资源,因此在使用时还是要评估好你的方案对系统资源的消耗。