A Brief Analysis of Filebeat (Part 2)

Preface

Having covered filebeat's basic architecture and design, let's now look at how filebeat actually implements these features. This post only discusses a few of the more important flows; the remaining design topics will be covered in later articles. The flows are:

  • The startup process
  • The harvester loop (using the log input as an example)
  • The outputs flow (using kafka as an example)

Analysis

Startup process

The first step, of course, is to find the entry point. For a Go application that is easy: just look for main(). It lives in beats/filebeat/main.go:

package main

import (
    "os"

    "github.com/elastic/beats/v7/filebeat/cmd"
    inputs "github.com/elastic/beats/v7/filebeat/input/default-inputs"
)

// The basic model of execution:
// - input: finds files in paths/globs to harvest, starts harvesters
// - harvester: reads a file, sends events to the spooler
// - spooler: buffers events until ready to flush to the publisher
// - publisher: writes to the network, notifies registrar
// - registrar: records positions of files read
// Finally, input uses the registrar information, on restart, to
// determine where in each file to restart a harvester.
func main() {
    if err := cmd.Filebeat(inputs.Init).Execute(); err != nil {
        os.Exit(1)
    }
}

Here inputs.Init is the factory that assembles filebeat's built-in inputs and hands them the state store, which holds (or initializes) the state from previous runs — chiefly the offsets that harvesters have already read. cmd.Filebeat initializes the Filebeat root command: it loads and initializes the relevant configuration and builds the filebeat object. In the code below, the beater.New call is what constructs the filebeat beater, and GenRootCmdWithSettings builds the commands used to start filebeat.

// Filebeat builds the beat root command for executing filebeat and its subcommands.
func Filebeat(inputs beater.PluginFactory) *cmd.BeatsRootCmd {
    var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError)
    runFlags.AddGoFlag(flag.CommandLine.Lookup("once"))
    runFlags.AddGoFlag(flag.CommandLine.Lookup("modules"))
    settings := instance.Settings{
        RunFlags:      runFlags,
        Name:          Name,
        HasDashboards: true,
    }

    // This is the key line
    command := cmd.GenRootCmdWithSettings(beater.New(inputs), settings)

    command.PersistentFlags().AddGoFlag(flag.CommandLine.Lookup("M"))
    command.TestCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
    command.SetupCmd.Flags().AddGoFlag(flag.CommandLine.Lookup("modules"))
    command.AddCommand(cmd.GenModulesCmd(Name, "", buildModulesManager))
    command.AddCommand(genGenerateCmd())
    return command
}

The GenRootCmdWithSettings function is in libbeat/cmd/root.go.

func GenRootCmdWithSettings(beatCreator beat.Creator, settings instance.Settings) *BeatsRootCmd {
    ...

    rootCmd.RunCmd = genRunCmd(settings, beatCreator)
    ...
    return rootCmd
}

This function calls func genRunCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Command in libbeat/cmd/run.go.

func genRunCmd(settings instance.Settings, beatCreator beat.Creator) *cobra.Command {
    name := settings.Name
    runCmd := cobra.Command{
        Use:   "run",
        Short: "Run " + name,
        Run: func(cmd *cobra.Command, args []string) {
            err := instance.Run(settings, beatCreator)
            if err != nil {
                os.Exit(1)
            }
        },
    }
    ...
}

The method name alone points us to the important call:

instance.Run(settings, beatCreator)

The Run method constructs the Beat object and then calls the launch method.

// Run initializes and runs a Beater implementation. name is the name of the
// Beat (e.g. packetbeat or metricbeat). version is version number of the Beater
// implementation. bt is the `Creator` callback for creating a new beater
// instance.
// XXX Move this as a *Beat method?
func Run(settings Settings, bt beat.Creator) error {
    ...
    return handleError(func() error {
        ...
        b, err := NewBeat(name, idxPrefix, version)
        ...
        return b.launch(settings, bt)
    }())
}

Inside launch, a Beater is constructed (Beater is an interface), and at the end the Beater's Run method is called to start it.

func (b *Beat) launch(settings Settings, bt beat.Creator) error {

    ...

    beater, err := b.createBeater(bt)

    ...

    return beater.Run(&b.Beat)
}

This interface has multiple implementations, but we only need to look at one of them: the Run method in filebeat/beater/filebeat.go, which is the implementation that starts filebeat.
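
For reference, the Beater interface defined in libbeat looks roughly like this (paraphrased from libbeat/beat/beat.go; the comments are abridged, so check the source of your version for the exact wording):

// Beater is the interface that every Beat (Filebeat, Metricbeat, ...) implements.
type Beater interface {
    // Run starts the main loop. It blocks until Stop is called or the
    // Beat decides to shut down on its own.
    Run(b *Beat) error

    // Stop signals Run to finish as soon as possible. It is invoked at most once.
    Stop()
}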

Inside Run, a crawler object is built. The crawler is what collects data; under the hood it is a wrapper around the Inputs, one of the core components described in the official filebeat documentation.

// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
    var err error
    config := fb.config

    if !fb.moduleRegistry.Empty() {
        err = fb.loadModulesPipelines(b)
        if err != nil {
            return err
        }
    }

    // Build the signal waiters (channels) used for shutdown bookkeeping
    waitFinished := newSignalWait()
    waitEvents := newSignalWait()

    // count active events for waiting on shutdown
    wgEvents := &eventCounter{
        count: monitoring.NewInt(nil, "filebeat.events.active"),
        added: monitoring.NewUint(nil, "filebeat.events.added"),
        done:  monitoring.NewUint(nil, "filebeat.events.done"),
    }
    finishedLogger := newFinishedLogger(wgEvents)

    registryMigrator := registrar.NewMigrator(config.Registry)
    if err := registryMigrator.Run(); err != nil {
        logp.Err("Failed to migrate registry file: %+v", err)
        return err
    }

    // The inputs' per-file progress information: offsets and so on
    stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
    if err != nil {
        logp.Err("Failed to open state store: %+v", err)
        return err
    }
    defer stateStore.Close()

    // Build a registrar to persist that state
    // Setup registrar to persist state
    registrar, err := registrar.New(stateStore, finishedLogger, config.Registry.FlushTimeout)
    if err != nil {
        logp.Err("Could not init registrar: %v", err)
        return err
    }

    // Make sure all events that were published in
    registrarChannel := newRegistrarLogger(registrar)

    // Build the pipeline (internal queue) that ties upstream and downstream into a loop:
    // input --> harvester --> event --> processors --> queue --> publisher --> outputs
    // setup event counting for startup and a global common ACKer, such that all events will be
    // routed to the registrar after they've been ACKed.
    // Events with Private==nil or the type of private != file.State are directly
    // forwarded to `finishedLogger`. Events from the `logs` input will first be forwarded
    // to the registrar via `registrarChannel`, which finally forwards the events to finishedLogger as well.
    // The finishedLogger decrements the counters in wgEvents after all events have been securely processed
    // by the registry.
    fb.pipeline = withPipelineEventCounter(b.Publisher, wgEvents)
    fb.pipeline = pipetool.WithACKer(fb.pipeline, eventACKer(finishedLogger, registrarChannel))

    // Filebeat by default required infinite retry. Let's configure this for all
    // inputs by default. Inputs (and InputController) can overwrite the sending
    // guarantees explicitly when connecting with the pipeline.
    fb.pipeline = pipetool.WithDefaultGuarantees(fb.pipeline, beat.GuaranteedSend)

    outDone := make(chan struct{}) // outDone closes down all active pipeline connections
    pipelineConnector := channel.NewOutletFactory(outDone).Create

    // Create a ES connection factory for dynamic modules pipeline loading
    var pipelineLoaderFactory fileset.PipelineLoaderFactory
    if b.Config.Output.Name() == "elasticsearch" {
        pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config())
    } else {
        logp.Warn(pipelinesWarning)
    }

    inputsLogger := logp.NewLogger("input")
    v2Inputs := fb.pluginFactory(b.Info, inputsLogger, stateStore)
    v2InputLoader, err := v2.NewLoader(inputsLogger, v2Inputs, "type", cfg.DefaultType)
    if err != nil {
        panic(err) // loader detected invalid state.
    }

    var inputTaskGroup unison.TaskGroup
    defer inputTaskGroup.Stop()
    if err := v2InputLoader.Init(&inputTaskGroup, v2.ModeRun); err != nil {
        logp.Err("Failed to initialize the input managers: %v", err)
        return err
    }

    inputLoader := channel.RunnerFactoryWithCommonInputSettings(b.Info, compat.Combine(
        compat.RunnerFactory(inputsLogger, b.Info, v2InputLoader),
        input.NewRunnerFactory(pipelineConnector, registrar, fb.done),
    ))
    moduleLoader := fileset.NewFactory(inputLoader, b.Info, pipelineLoaderFactory, config.OverwritePipelines)

    // Input related: build the crawler
    crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once)
    if err != nil {
        logp.Err("Could not init crawler: %v", err)
        return err
    }

    // The order of starting and stopping is important. Stopping is inverted to the starting order.
    // The current order is: registrar, publisher, spooler, crawler
    // That means, crawler is stopped first.

    // Start the registrar
    err = registrar.Start()
    if err != nil {
        return fmt.Errorf("Could not start registrar: %v", err)
    }

    // Stopping registrar will write last state
    defer registrar.Stop()

    // Stopping publisher (might potentially drop items)
    defer func() {
        // Closes first the registrar logger to make sure not more events arrive at the registrar
        // registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
        registrarChannel.Close()
        close(outDone) // finally close all active connections to publisher pipeline
    }()

    // Wait for all events to be processed or timeout
    defer waitEvents.Wait()

    if config.OverwritePipelines {
        logp.Debug("modules", "Existing Ingest pipelines will be updated")
    }

    err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules)
    if err != nil {
        crawler.Stop()
        return fmt.Errorf("Failed to start crawler: %+v", err)
    }

    // If run once, add crawler completion check as alternative to done signal
    if *once {
        runOnce := func() {
            logp.Info("Running filebeat once. Waiting for completion ...")
            crawler.WaitForCompletion()
            logp.Info("All data collection completed. Shutting down.")
        }
        waitFinished.Add(runOnce)
    }

    // Register reloadable list of inputs and modules
    inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline)
    reload.Register.MustRegisterList("filebeat.inputs", inputs)
    // Load the modules
    modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline)
    reload.Register.MustRegisterList("filebeat.modules", modules)

    var adiscover *autodiscover.Autodiscover
    if fb.config.Autodiscover != nil {
        adiscover, err = autodiscover.NewAutodiscover(
            "filebeat",
            fb.pipeline,
            cfgfile.MultiplexedRunnerFactory(
                cfgfile.MatchHasField("module", moduleLoader),
                cfgfile.MatchDefault(inputLoader),
            ),
            autodiscover.QueryConfig(),
            config.Autodiscover,
            b.Keystore,
        )
        if err != nil {
            return err
        }
    }
    adiscover.Start()

    // Add done channel to wait for shutdown signal
    waitFinished.AddChan(fb.done)
    waitFinished.Wait()

    // Stop reloadable lists, autodiscover -> Stop crawler -> stop inputs -> stop harvesters
    // Note: waiting for crawlers to stop here in order to install wgEvents.Wait
    // after all events have been enqueued for publishing. Otherwise wgEvents.Wait
    // or publisher might panic due to concurrent updates.
    inputs.Stop()
    modules.Stop()
    adiscover.Stop()
    crawler.Stop()

    timeout := fb.config.ShutdownTimeout
    // Checks if on shutdown it should wait for all events to be published
    waitPublished := fb.config.ShutdownTimeout > 0 || *once
    if waitPublished {
        // Wait for registrar to finish writing registry
        waitEvents.Add(withLog(wgEvents.Wait,
            "Continue shutdown: All enqueued events being published."))
        // Wait for either timeout or all events having been ACKed by outputs.
        if fb.config.ShutdownTimeout > 0 {
            logp.Info("Shutdown output timer started. Waiting for max %v.", timeout)
            waitEvents.Add(withLog(waitDuration(timeout),
                "Continue shutdown: Time out waiting for events being published."))
        } else {
            waitEvents.AddChan(fb.done)
        }
    }

    return nil
}

At this point the overall filebeat startup flow should be fairly clear.
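
As a recap, the startup call chain we just walked through can be sketched as follows (a summary of the walkthrough above, not an exhaustive trace; file locations refer to the version of the code shown here):

// main()                                                                 // filebeat/main.go
//   └── cmd.Filebeat(inputs.Init).Execute()                              // filebeat/cmd/root.go
//         └── cmd.GenRootCmdWithSettings(beater.New(inputs), settings)   // libbeat/cmd/root.go
//               └── genRunCmd(settings, beatCreator)                     // libbeat/cmd/run.go
//                     └── instance.Run(settings, beatCreator)
//                           └── (*Beat).launch(settings, bt)
//                                 └── beater.Run(&b.Beat)                // (*Filebeat).Run in filebeat/beater/filebeat.go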

Harvesting

Data collection is actually carried out by the crawler, whose main component is the harvester. Let's first look at how the crawler is created: the entry point is crawler, err := newCrawler(inputLoader, moduleLoader, config.Inputs, fb.done, *once) in the Run method of filebeat/beater/filebeat.go, with the implementation in filebeat/beater/crawler.go. The crawler is then started by err = crawler.Start(fb.pipeline, config.ConfigInput, config.ConfigModules). Let's look at that method:

// Start starts the crawler with all inputs
func (c *crawler) Start(
    pipeline beat.PipelineConnector,
    configInputs *common.Config,
    configModules *common.Config,
) error {
    log := c.log

    log.Infof("Loading Inputs: %v", len(c.inputConfigs))

    // Prospect the globs/paths given on the command line and launch harvesters
    for _, inputConfig := range c.inputConfigs {
        // 1. Build the input object and run it
        err := c.startInput(pipeline, inputConfig)
        if err != nil {
            return fmt.Errorf("starting input failed: %+v", err)
        }
    }

    if configInputs.Enabled() {
        // Build a reloader that watches for config file changes at runtime; not part of the main flow, so not covered here
        c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
        if err := c.inputReloader.Check(c.inputsFactory); err != nil {
            return fmt.Errorf("creating input reloader failed: %+v", err)
        }
    }

    if configModules.Enabled() {
        // Same as above, but for modules
        c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
        if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
            return fmt.Errorf("creating module reloader failed: %+v", err)
        }
    }

    if c.inputReloader != nil {
        go func() {
            c.inputReloader.Run(c.inputsFactory)
        }()
    }
    if c.modulesReloader != nil {
        go func() {
            c.modulesReloader.Run(c.modulesFactory)
        }()
    }

    log.Infof("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))

    return nil
}

The main job of this code is to start the input objects. Let's continue into the startInput method:

func (c *crawler) startInput(
    pipeline beat.PipelineConnector,
    config *common.Config,
) error {
    if !config.Enabled() {
        return nil
    }

    var h map[string]interface{}
    config.Unpack(&h)
    id, err := hashstructure.Hash(h, nil)
    if err != nil {
        return fmt.Errorf("can not compute id from configuration: %v", err)
    }
    if _, ok := c.inputs[id]; ok {
        return fmt.Errorf("input with same ID already exists: %v", id)
    }

    runner, err := c.inputsFactory.Create(pipeline, config)
    if err != nil {
        return fmt.Errorf("Error while initializing input: %+v", err)
    }
    if inputRunner, ok := runner.(*input.Runner); ok {
        inputRunner.Once = c.once
    }

    c.inputs[id] = runner

    c.log.Infof("Starting input (ID: %d)", id)
    runner.Start()

    return nil
}

Note the runner.Start() at the end of the method: that is what starts the input. The Runner here is a wrapper around an input that manages its lifecycle. Its Start method calls the runner's Run method in a goroutine, and Run in turn boils down to calling the underlying input's Run method.

type Input interface {
    Run()
    Stop()
    Wait()
}

type Runner struct {
    config   inputConfig
    input    Input
    done     chan struct{}
    wg       *sync.WaitGroup
    Once     bool
    beatDone chan struct{}
}
// Start starts the input
func (p *Runner) Start() {
    p.wg.Add(1)

    onceWg := sync.WaitGroup{}
    if p.Once {
        // Make sure start is only completed when Run did a complete first scan
        defer onceWg.Wait()
    }

    onceWg.Add(1)
    inputList.Add(p.config.Type)
    // Add waitgroup to make sure input is finished
    go func() {
        defer func() {
            onceWg.Done()
            p.stop()
            p.wg.Done()
        }()

        p.Run()
    }()
}

// Run starts scanning through all the file paths and fetch the related files. Start a harvester for each file
func (p *Runner) Run() {
    // Initial input run
    p.input.Run()

    // Shuts down after the first complete run of all input
    if p.Once {
        return
    }

    for {
        select {
        case <-p.done:
            logp.Info("input ticker stopped")
            return
        case <-time.After(p.config.ScanFrequency):
            logp.Debug("input", "Run input")
            p.input.Run()
        }
    }
}

Since the most common use of filebeat is collecting logs, we'll take the log input as our example and look at its concrete flow. The file is filebeat/input/log/input.go:

// Run runs the input
func (p *Input) Run() {
    logp.Debug("input", "Start next scan")

    // TailFiles is like ignore_older = 1ns and only on startup
    if p.config.TailFiles {
        ignoreOlder := p.config.IgnoreOlder

        // Overwrite ignore_older for the first scan
        p.config.IgnoreOlder = 1
        defer func() {
            // Reset ignore_older after first run
            p.config.IgnoreOlder = ignoreOlder
            // Disable tail_files after the first run
            p.config.TailFiles = false
        }()
    }

    // Scan: the core method
    p.scan()

    // It is important that a first scan is run before cleanup to make sure all new states are read first
    if p.config.CleanInactive > 0 || p.config.CleanRemoved {
        beforeCount := p.states.Count()
        cleanedStates, pendingClean := p.states.Cleanup()
        logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d",
            beforeCount, beforeCount-cleanedStates, pendingClean)
    }

    // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first
    if p.config.CleanRemoved {
        for _, state := range p.states.GetStates() {
            // os.Stat will return an error in case the file does not exist
            stat, err := os.Stat(state.Source)
            if err != nil {
                if os.IsNotExist(err) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state for file as file removed: %s", state.Source)
                } else {
                    logp.Err("input state for %s was not removed: %s", state.Source, err)
                }
            } else {
                // Check if existing source on disk and state are the same. Remove if not the case.
                newState := file.NewState(stat, state.Source, p.config.Type, p.meta, p.fileStateIdentifier)
                if state.IdentifierName != newState.IdentifierName {
                    logp.Debug("input", "file_identity configuration for file has changed from %s to %s, generating new id", state.IdentifierName, newState.IdentifierName)
                    state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state)
                }
                if !state.IsEqual(&newState) {
                    p.removeState(state)
                    logp.Debug("input", "Remove state of file as its identity has changed: %s", state.Source)
                }
            }
        }
    }
}
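
Before moving on, it helps to see where the fields used above (TailFiles, IgnoreOlder, CleanRemoved, ScanFrequency, ...) come from: they map directly to the log input's configuration. A minimal illustrative filebeat.yml might look like this (the values are examples, not recommendations):

filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    scan_frequency: 10s      # how often Runner.Run re-invokes Input.Run / scan()
    ignore_older: 24h        # files older than this are skipped (isIgnoreOlder)
    tail_files: false        # if true, the first scan starts reading at the end of each file
    clean_removed: true      # drop registry state for files that no longer exist on disk
    close_inactive: 5m       # close the harvester after this period of inactivity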

Here p.scan() is the core method; let's follow it into the implementation:

// Scan starts a scanGlob for each provided path/glob
func (p *Input) scan() {
    var sortInfos []FileSortInfo
    var files []string

    paths := p.getFiles()

    var err error

    if p.config.ScanSort != "" {
        sortInfos, err = getSortedFiles(p.config.ScanOrder, p.config.ScanSort, getSortInfos(paths))
        if err != nil {
            logp.Err("Failed to sort files during scan due to error %s", err)
        }
    }

    if sortInfos == nil {
        files = getKeys(paths)
    }

    for i := 0; i < len(paths); i++ {

        var path string
        var info os.FileInfo

        if sortInfos == nil {
            path = files[i]
            info = paths[path]
        } else {
            path = sortInfos[i].path
            info = sortInfos[i].info
        }

        select {
        case <-p.done:
            logp.Info("Scan aborted because input stopped.")
            return
        default:
        }

        newState, err := getFileState(path, info, p)
        if err != nil {
            logp.Err("Skipping file %s due to error %s", path, err)
        }

        // Load last state
        isNewState := p.states.IsNew(newState)

        // Ignores all files which fall under ignore_older
        if p.isIgnoreOlder(newState) {
            err := p.handleIgnoreOlder(isNewState, newState)
            if err != nil {
                logp.Err("Updating ignore_older state error: %s", err)
            }
            continue
        }

        // Decides if previous state exists
        if isNewState {
            logp.Debug("input", "Start harvester for new file: %s", newState.Source)
            // A new file was found: build a new harvester and run it
            err := p.startHarvester(newState, 0)
            if err == errHarvesterLimit {
                logp.Debug("input", harvesterErrMsg, newState.Source, err)
                continue
            }
            if err != nil {
                logp.Err(harvesterErrMsg, newState.Source, err)
            }
        } else {
            lastState := p.states.FindPrevious(newState)
            p.harvestExistingFile(newState, lastState)
        }
    }
}

The scan method first gathers all matching files. It then fetches each file's state and uses that state to decide whether to start collecting new data or to resume a previously seen file. Collecting a file means building a Harvester object, and p.startHarvester(newState, 0) is the core call:

// startHarvester starts a new harvester with the given offset
// In case the HarvesterLimit is reached, an error is returned
func (p *Input) startHarvester(state file.State, offset int64) error {
    if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 {
        p.numHarvesters.Dec()
        harvesterSkipped.Add(1)
        return errHarvesterLimit
    }
    // Set state to "not" finished to indicate that a harvester is running
    state.Finished = false
    state.Offset = offset

    // Create harvester with state
    h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() })
    if err != nil {
        p.numHarvesters.Dec()
        return err
    }

    // Configure the harvester
    err = h.Setup()
    if err != nil {
        p.numHarvesters.Dec()
        return fmt.Errorf("error setting up harvester: %s", err)
    }

    // Update state before starting harvester
    // This makes sure the states is set to Finished: false
    // This is synchronous state update as part of the scan
    h.SendStateUpdate()
    // Start the harvester
    if err = p.harvesters.Start(h); err != nil {
        p.numHarvesters.Dec()
    }
    return err
}

The core of the method can be summarized in three steps:

  1. p.createHarvester builds the harvester
  2. h.Setup configures the harvester; the Setup method initializes the file-related state and builds the file reader
  3. p.harvesters.Start(h) runs the harvester

Let's look more closely at p.harvesters.Start(h). Its job is to start Harvester.Run in yet another goroutine:

// Start starts the given harvester and add its to the registry
func (r *Registry) Start(h Harvester) error {
    // Make sure stop is not called during starting a harvester
    r.Lock()
    defer r.Unlock()

    // Make sure no new harvesters are started after stop was called
    if !r.active() {
        return errors.New("registry already stopped")
    }

    r.wg.Add(1)

    // Add the harvester to the registry and share the lock with stop making sure Start() and Stop()
    // have a consistent view of the harvesters.
    r.harvesters[h.ID()] = h

    go func() {
        defer func() {
            r.remove(h)
            r.wg.Done()
        }()
        // Starts harvester and picks the right type. In case type is not set, set it to default (log)
        err := h.Run()
        if err != nil {
            logp.Err("Error running input: %v", err)
        }
    }()
    return nil
}

h.Run is again an interface method. Sticking with the log example, we follow the Run method in filebeat/input/log/harvester.go:

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
    // Allow for some cleanup on termination
    if h.onTerminate != nil {
        defer h.onTerminate()
    }

    outlet := channel.CloseOnSignal(h.outletFactory(), h.done)
    forwarder := harvester.NewForwarder(outlet)

    // This is to make sure a harvester is not started anymore if stop was already
    // called before the harvester was started. The waitgroup is not incremented afterwards
    // as otherwise it could happened that between checking for the close channel and incrementing
    // the waitgroup, the harvester could be stopped.
    // Here stopLock is used to prevent a data race where stopWg.Add(1) below is called
    // while stopWg.Wait() is executing in a different goroutine, which is forbidden
    // according to sync.WaitGroup docs.
    h.stopLock.Lock()
    h.stopWg.Add(1)
    h.stopLock.Unlock()
    select {
    case <-h.done:
        h.stopWg.Done()
        return nil
    default:
    }

    defer func() {
        // Channel to stop internal harvester routines
        h.stop()

        // Makes sure file is properly closed when the harvester is stopped
        h.cleanup()

        harvesterRunning.Add(-1)

        // Marks harvester stopping completed
        h.stopWg.Done()
    }()

    harvesterStarted.Add(1)
    harvesterRunning.Add(1)

    // Closes reader after timeout or when done channel is closed
    // This routine is also responsible to properly stop the reader
    go func(source string) {
        closeTimeout := make(<-chan time.Time)
        // starts close_timeout timer
        if h.config.CloseTimeout > 0 {
            closeTimeout = time.After(h.config.CloseTimeout)
        }

        select {
        // Applies when timeout is reached
        case <-closeTimeout:
            logp.Info("Closing harvester because close_timeout was reached: %s", source)
        // Required when reader loop returns and reader finished
        case <-h.done:
        }

        h.stop()
        err := h.reader.Close()
        if err != nil {
            logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err)
        }
    }(h.state.Source)

    logp.Info("Harvester started for file: %s", h.state.Source)

    h.doneWg.Add(1)
    go func() {
        h.monitorFileSize()
        h.doneWg.Done()
    }()

    for {
        select {
        case <-h.done:
            return nil
        default:
        }

        message, err := h.reader.Next()
        if err != nil {
            switch err {
            case ErrFileTruncate:
                logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
                h.state.Offset = 0
                filesTruncated.Add(1)
            case ErrRemoved:
                logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
            case ErrRenamed:
                logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
            case ErrClosed:
                logp.Info("Reader was closed: %s. Closing.", h.state.Source)
            case io.EOF:
                logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
            case ErrInactive:
                logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
            case reader.ErrLineUnparsable:
                logp.Info("Skipping unparsable line in file: %v", h.state.Source)
                // line unparsable, go to next line
                continue
            default:
                logp.Err("Read line error: %v; File: %v", err, h.state.Source)
            }
            return nil
        }

        // Get copy of state to work on
        // This is important in case sending is not successful so on shutdown
        // the old offset is reported
        state := h.getState()
        startingOffset := state.Offset
        state.Offset += int64(message.Bytes)

        // Stop harvester in case of an error
        if !h.onMessage(forwarder, state, message, startingOffset) {
            return nil
        }

        // Update state of harvester as successfully sent
        h.state = state

        // Update metrics of harvester as event was sent
        h.metrics.readOffset.Set(state.Offset)
        h.metrics.lastPublished.Set(time.Now())
        h.metrics.lastPublishedEventTimestamp.Set(message.Ts)
    }
}

The method breaks down into a few steps:

  1. Create a forwarder
  2. Loop forever, reading content from the reader
  3. Forward the data downstream through the forwarder

The call h.onMessage(forwarder, state, message, startingOffset) is what ships the collected data downstream; internally it builds an event from the message and hands it to the forwarder.

With that, the whole harvesting flow is essentially complete.
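
Putting the pieces together, the harvesting path can be sketched like this (again a summary of the walkthrough above, not an exhaustive trace):

// (*Filebeat).Run
//   └── crawler.Start(pipeline, ...)                        // filebeat/beater/crawler.go
//         └── c.startInput → runner.Start()                 // one Runner per configured input
//               └── go Runner.Run()                         // rescans every scan_frequency
//                     └── Input.Run → p.scan()              // filebeat/input/log/input.go
//                           └── p.startHarvester → p.harvesters.Start(h)
//                                 └── go Harvester.Run()    // filebeat/input/log/harvester.go
//                                       └── h.reader.Next() → h.onMessage(forwarder, ...)
//                                             └── forwarder → outlet → publisher pipeline → outputs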

Closing thoughts

From the flows analyzed above, it is clear that filebeat makes heavy use of goroutines, one of Go's defining features. Filebeat is essentially one big manager: it runs n groups of goroutines, one group per input/file, so the groups do not interfere with each other; and because goroutines are much lighter than threads and incur far fewer CPU context switches, the cost to the system stays small. So why do we sometimes see filebeat using close to 20% of a CPU? That cost comes from what happens after the forwarder hands events off: if you add too many processors, or your output does CPU-heavy work such as compression, filebeat will naturally consume more CPU. When adopting it, make sure to evaluate how much of the system's resources your configuration will actually consume.
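
As a concrete illustration of where that CPU goes, below is a hypothetical fragment of a filebeat.yml that adds per-event processors and enables compression on a kafka output; both run on the filebeat host and therefore show up in its CPU usage (the hosts, topic, and field values are placeholders):

processors:
  - dissect:                      # per-event parsing: CPU cost scales with event rate
      tokenizer: "%{ts} %{level} %{msg}"
      field: "message"
  - drop_fields:
      fields: ["agent.ephemeral_id"]

output.kafka:
  hosts: ["kafka-1:9092", "kafka-2:9092"]
  topic: "app-logs"
  compression: gzip               # compression is done by filebeat before sending
  max_message_bytes: 1000000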