[WIP] Filebeat input v2 #14229
Conversation
log *logp.Logger
}

func NewTestRunner(
exported function NewTestRunner should have comment or be unexported
"github.com/elastic/go-concert/chorus"
)

// TestRunner can be used to manage the test run of one or multiple inputs.
comment on exported type InputTestRunner should be of the form "InputTestRunner ..." (with optional leading article)
filebeat/input/v2/loader.go
Outdated
"github.com/elastic/beats/libbeat/logp"
)

type Loader struct {
exported type Loader should have comment or be unexported
return i.Input.Test(closer, log)
}

func (i *ConfiguredInput) CreateRunner(ctx Context) (Runner, error) {
exported method ConfiguredInput.CreateRunner should have comment or be unexported
waiter sync.WaitGroup
}

func (i *ConfiguredInput) TestInput(closer *chorus.Closer, log *logp.Logger) error {
exported method ConfiguredInput.TestInput should have comment or be unexported
Input RunnableInput
}

type RunnableInput interface {
exported type RunnableInput should have comment or be unexported
"github.com/elastic/go-concert/chorus"
)

type ConfiguredInput struct {
exported type ConfiguredInput should have comment or be unexported
import "errors"

var (
	ErrNoTypeConfigured = errors.New("No type configured")
exported var ErrNoTypeConfigured should have comment or be unexported
@urso I like the proposal; we have been talking about this for a long time (years?). Just some notes related to the context of the Agent and Beats.
Adding a custom Observer is really powerful, and we can easily chain them together to make sure all events are sent. Here are a few use cases I can see:
- Expose the status of the plugins through the API (active, failed, error...).
- Expose monitoring states (number of running inputs, failures...).
- Return errors to Fleet in a uniform way; we can augment the errors if needed to attach more information.
Looking at the Plugin definition, I think the outputs and processors can match most of the defined interface. We address in this proposal how plugins are registered, but I think before making the change we also need to define how plugins are managed (started, stopped, removed).
Looking at the concrete configuration: do you think the end goal of this proposal is the following: receive an untyped configuration, resolve it to a concrete configuration, and apply the changes to the currently running system?
I am asking because I believe the following are all the same case:
- A configuration from the filesystem
- A dynamic configuration from autodiscover
- A configuration from the Agent
For me, all of the above is always about creating a config, and the system should be able to decide what needs to be done to run that configuration (either partially or completely). This changes the behavior of autodiscover: instead of creating configurations for inputs on the fly and starting the inputs directly, it would add or remove parts of the configuration, and the global current configuration is used to decide what needs to run in the system.
Note that the above is necessary to control the pressure and the number of events created in the system, and it allows all the different sources of configuration to take the same path and behave in a uniform manner.
// The input is only supposed to be a typed placeholder for the untyped
// configuration.
Configure func(log *logp.Logger, config *common.Config) (Input, error)
}
Can we also add a maturity level to the Plugin definition? Stable, Beta, Experimental. We could also automatically add this context or data to the logger passed to the plugin instance.
filebeat/input/v2/plugin.go
Outdated
// On 'stop' an input must unregister configured tasks
//
// XXX: currently in heartbeat, but we will move it to libbeat
Scheduler *scheduler.Scheduler
I presume we would have something like:
Scheduler.Register(p, Scheduler.Daily, func(...) )
Scheduler.Remove(p)
This would give us the opportunity to save the execution state of the scheduler across restarts.
Currently it is the heartbeat scheduler. The scheduler itself does not make any assumptions about the 'Schedule' besides expecting an interface implementing Next() time.Time.
One can simply add/remove tasks from the scheduler. A Task can return an array of continuation tasks, and a task is done only after all continuations are done. Most simple cases (e.g. Metricbeat) don't need the continuation functionality, but it's fundamental to Heartbeat, where it is used to check all known IPs after a DNS query.
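The continuation semantics described here can be sketched as follows. These are hypothetical types for illustration only; the real Heartbeat scheduler API differs:

```go
package main

import "fmt"

// Task is a hypothetical sketch of the scheduler's task shape: running a
// task may return continuation tasks (e.g. Heartbeat checking every IP
// returned by a DNS query). A task counts as "done" only once all of its
// continuations are done.
type Task func() []Task

// runAll runs a task and, recursively, every continuation it produces.
// It returns the total number of tasks executed.
func runAll(t Task) int {
	done := 1
	for _, cont := range t() {
		done += runAll(cont)
	}
	return done
}

func main() {
	// A "DNS query" task that fans out into two per-IP check tasks.
	check := Task(func() []Task { return nil })
	dnsQuery := Task(func() []Task {
		return []Task{check, check}
	})
	fmt.Println(runAll(dnsQuery)) // 3: 1 query task + 2 continuations
}
```

Metricbeat-style tasks would simply return nil continuations, collapsing to the plain "run once per tick" case.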
This would give us the opportunity to save the execution state of the scheduler across restarts.
Given that the schedule itself is just an interface, I think we could add some information like this in the future to the registry (as we do for input reading state). But this will require us to have a constant ID for inputs/tasks that does not change between restarts. I didn't really account for this in this POC.
As a task can produce multiple events, this would require some kind of ref-counting though. Only once all events that are produced by a Task are ACKed can we update the registry. I have an idea how this could work somewhat seamlessly, hiding most of the complexity from developers, but this would require quite some plumbing. Maybe in a follow up PR.
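A minimal sketch of that ref-counting idea (invented names, not actual Beats code): the registry commit for a task runs only after the last of its published events has been ACKed.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// taskState sketches the ref-counting described above: a task publishes N
// events, and the registry update runs exactly once, after the last of
// those events has been ACKed.
type taskState struct {
	pending int32     // events published but not yet ACKed
	commit  func()    // registry update to run when pending reaches zero
	once    sync.Once // guard so commit runs at most once
}

func (t *taskState) publish() { atomic.AddInt32(&t.pending, 1) }

func (t *taskState) ack() {
	if atomic.AddInt32(&t.pending, -1) == 0 {
		t.once.Do(t.commit)
	}
}

func main() {
	committed := false
	ts := &taskState{commit: func() { committed = true }}

	ts.publish()
	ts.publish()
	ts.ack()
	fmt.Println(committed) // false: one event still in flight
	ts.ack()
	fmt.Println(committed) // true: all events ACKed, registry updated
}
```

The plumbing mentioned in the comment would hide this counter behind the publisher pipeline's ACK callbacks, so input developers never touch it directly.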
What do you have in mind by this scheduler? I understand this could be used for Heartbeat monitors or Metricbeat modules. It would probably make sense to have another example using this API
haha... everyone is asking about the scheduler :)
This is one of many PRs... maybe I should remove it for now to reduce confusion.
The original goal is to use it for Heartbeat monitors. Heartbeat monitors already implement the Runner interface (Start and Stop methods). I "just" need to pass the scheduler to the constructor, and voilà, we can run Heartbeat monitors next to Metricbeat modules using the same framework.
Using the scheduler for modules is optional. I don't plan to add Metricbeat module support in this PR, and I haven't yet decided whether we want to use the scheduler for Metricbeat modules (we can discuss it when I add the integration). But I don't plan to force people to rewrite or adapt existing metricsets. Instead I will provide wrappers that support the different interfaces we have in Metricbeat for reuse. E.g. modules with metricsets only might be turned into Plugins like this in the future:
input.Plugin{
	Name: "elasticsearch",
	Doc:  "elasticsearch metrics collection",
	Configure: input.WithDatasets(
		input.Metricset{
			Name: "index",
			New:  index.New, // constructor from metricbeat/module/elasticsearch/index package
		},
		input.Metricset{
			Name:    "node",
			Default: true,
			New:     node.New, // constructor from metricbeat/module/elasticsearch/node package
		},
	),
}
+1 to removing the scheduler; it complicates the current discussion.
+1 to removing it now, but this looks promising!
type Input interface {
	InputTester
	RunnerFactory
}
I know I have raised the issue directly, but inputs should probably have some kind of IDs; we can make them automatically generated and definable by users. The latter is probably even better: enforcing a unique identifier on each input definition, say, if we want to correlate log events with a specific configuration or monitoring.
For monitoring or reporting events to Fleet we can get around that by saving that information in the observer, but it could be useful for log statements too.
Enforcing the ID on the logger ensures that by default everything is scoped to a specific instance, without a programmer having to add extra context to the logger, like the paths being watched.
I wonder where we should enforce the ID, and how to pass it. Maybe it should not be enforced by the plugin API; instead the architecture using the plugins would add/manage IDs. As the logger and overall context are passed to the plugin, we can have the underlying framework populate this information. In this case the ID could be passed as a parameter in the context, in case an input wants to report it.
If the ID is pushed top-down, it is the framework managing IDs. Maybe this could be used together with the scheduler. E.g. if the ID is given (static), then we enable support for storing the schedule/task state in the registry. In this case the input/plugin developer doesn't need to handle any kind of management info.
but the architecture using the plugins would add/manage ids. As the logger and overall context is passed to the plugin.
Correct, we would need some kind of bookkeeping of IDs, especially if we let people define them, to make sure they are indeed unique.
What if the ID is a system option (in the YAML) when defining the plugin, so plugin authors don't see it or have to define it? We could make it available in the context passed to the plugin, so an author can reference it if needed (to identify a client to a remote system, maybe?).
Maybe this could be used together with the scheduler. E.g. if the ID is given (static)
The goal of giving the scheduler that information is to recover execution information from the last tick()? I am assuming that if an input's configuration is changed we might want to keep the same ID as much as possible, to keep correlation possible and allow us to answer the following question: did the config change actually improve the performance of the input or not?
Thanks @urso for kicking this off. Have only skimmed through the code so far, so a few high level questions:
- Shouldn't this code be in libbeat/input/* instead of filebeat/...?
- Metricbeat modules: You touched on it quickly in your PR description. You mention "A module will be treated like an Input." My thinking here so far is that a module contains multiple metricsets and the metricsets are based on an input. So I expect the concept of modules and metricsets to stay around in the short and mid term, but that much more code can be shared because the metricsets depend on an input. Some of the modules and metricsets will become config only, like we have today in Filebeat.
- State on reloading: One challenge we had in the past is what happens on reloading an input config. The old config must shut down completely before the new one can be started. In your PR description you mention this is not needed anymore because of the ordered registry writes. Let's assume we have a file foo.log. An input tails it and sends all the lines until the end. All lines except the last 2 are ACKed. Now it is stopped, and a new input on the same file is started; it continues reading after the last line based on the cached registry state and sends the data. For some reason, the last 2 lines of the old input can never be ACKed. Will this mean the state of the new input will never be written because of the ordering? What happens in this case?
- Scheduler: You plan to migrate the scheduler concept from Heartbeat to the input. Will the current concept of defining period: 5s still exist, or will the user have to use cron definitions?
@ruflin Concerning the Scheduler, I think it's just an implementation detail. A cron entry or a period of 5s is the same implementation; only how a user defines it changes. So I think most of the time we would keep the period, because it's an easier concept to understand.
An input is not reconfigurable. If we introduce IDs, then we need some other component that keeps track of an ID's state. The per-ID state would be: current input (typed configuration), active runner, should-be input (typed configuration). On config change the system would need to signal stop to the active runner and create a new one in the meantime. The eventual consistency of the registry will help us reconfigure/restart a runner somewhat in time.
I'm not sure I can follow here. I agree we need some change in autodiscovery in how is-state and should-state are handled. But I don't think I want to push that responsibility into the input. Within this PR it is out of scope, as I don't plan to adapt autodiscovery yet (although I'm preparing for it).
I'm still experimenting with the interface. Originally Filebeat was the target. If we think it is reusable enough, we can still move it to libbeat later (this is still a feature branch and more changes will be coming).
Hm... maybe you are mixing some concepts here. As a more low-level entity we have sources; see the filebeat/inputsource package. Conceptually an Input is not the data collector itself; an input can set up and run many collectors/harvesters based on its configuration. Configuration-wise, a module is the entity to be configured. Users selectively enable metricsets within a module configuration. In this sense a Metricbeat module is rather similar to a Filebeat input (like the log collector). The main differentiator is: Filebeat inputs (like the log input) collect many independent sources of the same type, while modules collect dependent sources (same service) of different types. The aim is to reflect the entities that make up a module at the config level right in code. This is currently not the case; instead we deal with hidden registries, wrappers, and many different possible interfaces a module can implement. We want to reduce the amount of magic. Having a module implementation or not is optional. My idea is to always force the module plugin to be defined explicitly in code and reuse it. We can provide wrapper types that allow you to combine metricsets into a module definition. As I'm targeting Filebeat first, I don't really have sample code for this yet.
The state in the registry follows the in-memory state. All updates to the registry are still required to be ordered, but the registry is only updated after an event has been ACKed. Part of the idea is to relax the dependency of the 'observable state' on the internal state. Observability in this sense is a measure of how well we can infer the internal state from the observed state. Here the observable state is events published + registry written. As long as we continue from the last observed state, and as long as there is no chance of a gap, a user cannot tell for sure whether the observed state is really in line with the in-memory state; the in-memory state can be ahead. To some extent this is already the case: whenever we publish an event in Filebeat that is not yet ACKed (still in some queue), we have some internal read state ahead of the observable state. But once we want to close a file or reconfigure a harvester, we used to enforce some kind of sync between observable state and in-memory state, like a barrier; only after ensuring we are in sync would we continue. Requiring this final sync can prevent autodiscovery-based reconfiguration from working correctly if the outputs are blocked or the downstream system is slowed down => control functionality has a strong dependency on data publishing functionality. Given that the read state is bound to a file, for example, and only managed/updated by one input temporarily (while the input is active), all state updates for the file will be linear and ordered. We do not allow 2 inputs to collect the same file/resource concurrently. This allows us to "release" the resource and give it to any other input at any point in time. There is no real need to wait for the outputs.
This is still the case. In order to process a resource one needs to hold the exclusive lock on the resource. An input must shut down and release the resource before it can be acquired by another input. The new input will not read the current read state from the registry file, but from the internal state that might be ahead of the state in the registry file (we already have local state per input in Filebeat). All read state updates for the registry file (by the new input) will be scheduled after all the in-flight read state updates from the active old input configuration.
In Filebeat events are ordered. If the last 2 lines from the old input are not ACKed, then the new input will not make any progress, and the registry file will keep the old state. If the Beat is restarted, it will start again from the last 2 lines that have not been ACKed yet. If events can never be ACKed (e.g. due to mapping conflicts), the event is dropped (we don't have a dead letter queue yet). This allows the system to continue making progress, but this is already the case today.
I think it is not really required for Metricbeat modules, but maybe something we want to consider. For Heartbeat monitors the scheduler is a must. In Heartbeat we also support special keywords for the schedule, like …
This is looking awesome! I left some comments and questions
// of an input.
type StatusObserver interface {
	// Starting indicates that the input is about to be configured and started.
	Starting()
If I understand this correctly, developers should not call this one; should it be something private?
It can be public or not (when other people redefine it in their own package).
The observer is also for developers, but mostly for internal use. Let's say that in the context of the Agent we could have a special observer that keeps track of errors in the system and reports them to Fleet.
Other possible builtin/chainable observers:
- A standard reporter that targets the logs.
- One that keeps the local state of the system for the web API that the Beat exposes.
So in that sense the StatusObserver is similar to an io.Reader in its usefulness for internal development.
@exekias Do you mean the Starting method in particular, or the interface overall?
I was talking about the Starting method only. It seems folks implementing inputs won't need to call it?
It depends on how you implement the actual input. We provide some helpers, but the most bare-bones implementation would be compatible with Runner as used by autodiscovery (this is so we can integrate with reloading + autodiscovery).
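The Runner contract mentioned here is just Start and Stop. A hypothetical bare-bones input compatible with it could look like this simplified, self-contained sketch (the sketchInput type is invented for illustration):

```go
package main

import "fmt"

// Runner is the minimal contract used by reloading and autodiscovery:
// anything with Start and Stop can be managed by the framework.
type Runner interface {
	Start()
	Stop()
}

// sketchInput is a hypothetical bare-bones input implementing Runner.
type sketchInput struct {
	done chan struct{}
}

func (s *sketchInput) Start() {
	s.done = make(chan struct{})
	go func() {
		// Collect data here until Stop signals shutdown.
		<-s.done
	}()
}

func (s *sketchInput) Stop() { close(s.done) }

func main() {
	var r Runner = &sketchInput{}
	r.Start()
	r.Stop()
	fmt.Println("runner started and stopped")
}
```

Status reporting such as Starting() would be layered on top of this by the framework's helpers, not by each input implementation.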
// Closer provides support for shutdown signaling.
// It is compatible to context.Context, and can be used to cancel IO
// operations during shutdown.
Closer *chorus.Closer
How about actually using context.Context? It would be more friendly for newcomers.
They have a compatible interface, so you can use it directly anywhere that context.Context is available.
The difference is that chorus.Closer supports ripple-closing of goroutines. It's a library we created a few months ago to experiment with patterns: https://github.com/elastic/go-concert/blob/master/chorus/closeref.go
That sounds interesting. Is there any documentation I can look at? I'm not very familiar with "ripple close".
both context.Context and chorus.Closer build a hierarchy/tree for passing a closing signal down the chain, but stop if some intermediary has already been closed.
context.Context (besides some exceptions) builds the hierarchy by creating temporary go-routines. This is not optimal, but these go-routines are very short lived, as context.Context is supposed to be somewhat short lived.
The chorus.Closer on the other hand provides some helpers for calling a custom close function while the shutdown signal is propagated (useful for files, low-level network sockets, or where an io.Reader is expected). It works somewhat similarly to context.Context, but is supposed to have an extended lifetime. The type is guaranteed to be usable where context.Context is expected, so it can be used with any networking libs that support cancelling. The Closer builds a simple tree instead of starting goroutines for signal propagation (this reduces memory usage and the number of goroutines shown when tracing all active goroutines).
Would it make sense to put context.Context on the interface and then pass a chorus.Closer? My concern is more about maintainability and documentation. Newcomers seeing context.Context will understand it out of the box, while chorus.Closer requires some learning.
Might consider it in later PRs. Reasons I don't want to use context.Context yet:
- The chorus package doesn't yet accept an interface for some functionality. We need to work a little on the package.
- context.Value is inviting devs to add hacks to Beats (a context might be used by autodiscovery in the future).
- Actually we just want some means of shutdown signaling. The signaling is compatible to context.Context only for convenience. We might consider removing this compatibility, but provide a 'casting' function so you can use the signaler with context.Context.
// The test is supposed to not take too much time. Packages running a tester
// might run the test with a pre-configured timeout.
type InputTester interface {
	TestInput(closer *chorus.Closer, log *logp.Logger) error
This could make sense at the plugin level. You also mention there the schema for config validation; this would go beyond that and actually test the config.
The Plugin level is just a definition of a feature we can provide. The Input type is not an actual instance of an input collecting data, but a placeholder (interface type) for a configuration that has already been read. It is used to create an instance later on.
The idea (as described already) is to build a complete typed configuration object in a first step, which can then be used by a command in a second step (no partially initialized 'beater'). For example the Filebeat struct would be:
type filebeat struct {
	Inputs []input.Input
	...
}

func (fb *filebeat) Test(closer *chorus.Closer, log *logp.Logger) error {
	...
	for _, inp := range fb.Inputs {
		err := inp.TestInput(closer, log)
		...
	}
	...
}
With this in mind (long-term approach) each sub-command provided by Beats may have multiple phases until the command can be executed. The read-config phase would be the common first phase.
package v2

func WhileActive(context Context, fn func() error) error {
exported function WhileActive should have comment or be unexported
Input: &kafkaInput{
	config:       config,
	saramaConfig: saramaConfig,
}
missing ',' before newline in composite literal
return &InputLoader{plugins: m}, nil
}

func (l *InputLoader) Configure(log *logp.Logger, config *common.Config) (Input, error) {
exported method InputLoader.Configure should have comment or be unexported
Type string `config:"type"`
}

func NewInputLoader(
exported function NewInputLoader should have comment or be unexported
Configure(log *logp.Logger, config *common.Config) (Input, error)
}

type InputLoader struct {
exported type InputLoader should have comment or be unexported
ef8c2be
to
40bed88
Compare
f8cbeb2
to
dc050b0
Compare
19757f0
to
fb3b053
Compare
Closing. This PR will be replaced by smaller PRs and a meta issue with details of the current implementation.
Depends on: #14144, #14079, #12908
Note: This is a draft PR for discussing overall changes/approach. Once it stabilizes I will split the PR up into smaller ones to ease actual reviewing.
This PR experiments with an alternative input API for Beats. The goal is to provide a common library that can run Filebeat inputs, Metricbeat modules, and Heartbeat monitors.
Work is still in progress, and more PRs will follow. The new registry store has already been implemented in the feature-new-registry-file branch, and the interfaces defined here will make heavy use of it.

Currently, inputs/modules are registered globally by constructor. The constructor is responsible for parsing the untyped configuration and directly starting the data collection. There is no good way to only test a configuration, or to do any other kind of validation up front. Because of this we work with untyped configurations in too many places, and the Beats commands are all entangled with Beats startup in order to parse a subset of the configuration (the code in libbeat/cmd is very difficult to maintain).
The plugin API presented here encourages the use of concrete types. We also remove the need for the global registry.
Plugin authors define an input plugin by filling out the Plugin data structure:
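The Plugin structure itself was stripped from this page. Below is a minimal sketch of what such a definition could look like; the field names and the Config/Input stand-in types are illustrative, not the PR's actual API:

```go
package main

import "fmt"

// Config stands in for libbeat's *common.Config; Input stands in for the
// configured-input interface described below. Both are placeholders.
type Config map[string]interface{}
type Input interface{ Name() string }

// Plugin describes one input type: its name, lifecycle metadata, and a
// function that turns a raw configuration into a concrete Input.
type Plugin struct {
	Name       string
	Deprecated bool
	Configure  func(cfg Config) (Input, error)
}

type kafkaInput struct{ hosts []string }

func (k *kafkaInput) Name() string { return "kafka" }

// KafkaPlugin is how an input author might export their plugin definition.
func KafkaPlugin() Plugin {
	return Plugin{
		Name: "kafka",
		Configure: func(cfg Config) (Input, error) {
			hosts, _ := cfg["hosts"].([]string)
			return &kafkaInput{hosts: hosts}, nil
		},
	}
}

func main() {
	p := KafkaPlugin()
	input, err := p.Configure(Config{"hosts": []string{"localhost:9092"}})
	fmt.Println(input.Name(), err)
}
```

The key point is that the Configure function owns all config parsing, so everything downstream works with concrete types.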
A Beat links/provides input plugins by creating a Loader instance with all the plugins the Beat wants to provide. For example:
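The loader construction code did not survive extraction; this sketch is inferred from the "return &InputLoader{plugins: m}, nil" fragments in the PR's diff, with placeholder types:

```go
package main

import "fmt"

// Plugin is a stand-in for the plugin definition sketched earlier.
type Plugin struct{ Name string }

// InputLoader indexes the plugins a Beat explicitly chooses to provide,
// replacing the old global registry.
type InputLoader struct{ plugins map[string]Plugin }

// NewInputLoader builds a loader from an explicit plugin list.
// Duplicate names are rejected at construction time.
func NewInputLoader(plugins ...Plugin) (*InputLoader, error) {
	m := make(map[string]Plugin, len(plugins))
	for _, p := range plugins {
		if _, exists := m[p.Name]; exists {
			return nil, fmt.Errorf("duplicate input plugin: %v", p.Name)
		}
		m[p.Name] = p
	}
	return &InputLoader{plugins: m}, nil
}

func main() {
	loader, err := NewInputLoader(Plugin{Name: "kafka"}, Plugin{Name: "tcp"})
	fmt.Println(len(loader.plugins), err)
}
```

Because the plugin list is passed explicitly, each Beat controls exactly which inputs it ships, and nothing registers itself via package init side effects.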
A loader only implements the interface { Configure(log *logp.Logger, config *common.Config) (Input, error) }.

NOTE: To avoid repeating input definitions, I'm considering profile packages that export a set of default Plugins.
NOTE: The loader is still subject to change, e.g. so that we can mix multiple profiles.
The Input returned by Configure is supposed to be an interface that replaces the untyped common.Config. Once established, it should not be modifiable. Using this pattern (hopefully with other features in the future), Beats will be able to parse the complete configuration into a typed AST, such that *common.Config is not required anymore after the parsing step. This should help to decouple config validation, parsing, and command execution. Each sub-command will be able to operate on the config AST, instead of a partially-initialized Beats instance.
NOTE: Full config validation (also checking for indentation errors and typos) is a tricky task due to the flexibility of Beats, and is therefore not addressed here. Two possible solutions come to mind:
An Input needs to implement TestInput, which can be used via the CLI, or by the Agent, to test a configuration without actually running it. An input could, for example, try to connect to the host it should collect data from.

The Input also acts as a Runner factory. The Runner factory somewhat 'complicates' the input's interface and setup phases, but by being a runner factory, we can more easily reuse the input API as-is with autodiscovery and other Beats subsystems. To help developers implement the Input, we provide a ConfiguredInput type:
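The ConfiguredInput definition was not captured here; the sketch below follows the shape discussed in the PR (a Test/Run pair adapted to a Runner factory), with placeholder Context and Runner types:

```go
package main

import "fmt"

// Context is a placeholder for the PR's input.Context.
type Context struct{ Name string }

// Runner is what autodiscovery and config reloading operate on.
type Runner interface {
	Start()
	Stop()
}

// managedInput is the minimal contract an input author implements.
type managedInput interface {
	Test() error
	Run(ctx Context) error
}

// ConfiguredInput adapts a simple Test/Run pair to the Runner-factory
// interface, so input authors don't write runner plumbing themselves.
type ConfiguredInput struct {
	Input managedInput
}

func (c *ConfiguredInput) TestInput() error { return c.Input.Test() }

func (c *ConfiguredInput) CreateRunner(ctx Context) (Runner, error) {
	return &runner{input: c.Input, ctx: ctx}, nil
}

type runner struct {
	input managedInput
	ctx   Context
}

func (r *runner) Start() { go r.input.Run(r.ctx) }
func (r *runner) Stop()  {}

type fakeInput struct{}

func (fakeInput) Test() error           { return nil }
func (fakeInput) Run(ctx Context) error { return nil }

func main() {
	ci := &ConfiguredInput{Input: fakeInput{}}
	r, err := ci.CreateRunner(Context{Name: "demo"})
	fmt.Println(r != nil, err, ci.TestInput())
}
```

The adapter keeps the runner lifecycle (Start/Stop, wait groups) in one shared place instead of in every input.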
For example, this code sets up the kafka input (all Filebeat inputs can be set up using the same pattern):
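The original kafka setup snippet is missing; this sketch reconstructs the pattern from the "Input: &kafkaInput{config: config, saramaConfig: saramaConfig}" fragments in the diff, with placeholder config types instead of the real libbeat/sarama ones:

```go
package main

import "fmt"

// Placeholder types standing in for libbeat / sarama configuration.
type commonConfig map[string]interface{}
type saramaConfig struct{ ClientID string }
type kafkaConfig struct{ Hosts []string }

// ConfiguredInput mirrors the helper type described above (sketch).
type ConfiguredInput struct{ Input interface{} }

type kafkaInput struct {
	config       kafkaConfig
	saramaConfig saramaConfig
}

// configureKafka parses the raw config into concrete types up front, then
// wraps the configured input in a ConfiguredInput. No data collection
// starts here; that happens only when Run is called.
func configureKafka(raw commonConfig) (*ConfiguredInput, error) {
	hosts, _ := raw["hosts"].([]string)
	config := kafkaConfig{Hosts: hosts}
	sc := saramaConfig{ClientID: "beats"}
	return &ConfiguredInput{
		Input: &kafkaInput{
			config:       config,
			saramaConfig: sc,
		},
	}, nil
}

func main() {
	ci, err := configureKafka(commonConfig{"hosts": []string{"localhost:9092"}})
	fmt.Println(ci.Input.(*kafkaInput).config.Hosts[0], err)
}
```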
The kafka input no longer has Start/Stop/Wait methods for handling shutdown. We also get rid of the required factory/constructor that was used to hold state for shutdown handling. All required state (e.g. network connections) is managed in the Run method only.
If the input is started, Run will be called. If Run returns, the input is stopped. If autodiscovery/config-reloading/agent calls Stop, a shutdown signal is propagated via context.Closer, which is also compatible with context.Context.

The context provided to the Input is defined like this:
Note: we do not pass monitoring support. The StatusObserver is required for the input to report its status. For monitoring support we should find concrete types/interfaces to report common metrics in a unified way.
Inputs/modules/monitors support (still TODO in later PRs) is provided via input.Context.

Input status is a new concern we have in Beats. For use with the HTTP status API, the Agent, or the Beats monitoring UI, we want to be able to collect and display a per-input status. The current interface to report the status is defined as:
The kafka input is a good example, with temporary network errors that might require the Beat to reconnect to the cluster. The kafka input's Run method becomes:
One main issue we want to address with this API is the state handling in the registry. The store and resource part is implemented in #14079. The input.Context provides a StoreAccessor. Using the accessor, inputs can configure the registry store they want to access. Resource locks and resource state are local to a shared store: all inputs accessing a store X have a reference to the same shared store. The store acts as a key-value store.

Resources in a store are accessed by key. A resource must be locked in order to modify it. This guarantees that no two inputs can read and modify global state for the same resource.
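A toy sketch of the shared store and per-resource locking described above (not the #14079 implementation; the key format and types are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// resource is one keyed entry in a shared store; the lock guarantees that
// no two inputs read-modify-write the same state concurrently.
type resource struct {
	mu    sync.Mutex
	state map[string]interface{}
}

// store is the shared key-value store: all inputs opening the same store
// get references to the same resources.
type store struct {
	mu        sync.Mutex
	resources map[string]*resource
}

func newStore() *store { return &store{resources: map[string]*resource{}} }

// Access returns the resource for a key, creating it on first use.
func (s *store) Access(key string) *resource {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, ok := s.resources[key]
	if !ok {
		r = &resource{state: map[string]interface{}{}}
		s.resources[key] = r
	}
	return r
}

func main() {
	st := newStore()
	res := st.Access("file::/var/log/syslog")
	res.mu.Lock()
	res.state["offset"] = 1024 // safe: we hold the resource lock
	res.mu.Unlock()

	same := st.Access("file::/var/log/syslog") // same shared resource
	fmt.Println(same.state["offset"])
}
```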
Registry file update operations can be immediate or deferred. An update operation can update selected fields only. Immediate operations are directly written to the store. Deferred updates are supposed to be written to the registry after the event has been ACKed by the output.
A resource's state should be split into resource metadata and resource read state. Use immediate updates for metadata only (which is required to correctly track the resource in the future), and use deferred updates for read-state changes (e.g. the file offset).
So, to decouple state updates from output ACK delays, deferred updates are actually written to an intermediate cache layer in the Store. If an input unlocks a resource and returns, the state in the store can be ahead of the state in the registry file; the registry file is eventually consistent. The cached state is removed from the caching layer only after all deferred updates have been written to the registry file (i.e. the registry file is consistent with all state updates so far).
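The immediate/deferred split can be sketched as a two-layer structure: a persisted map standing in for the registry file, and a cache of pending updates flushed on ACK. This is a model of the behavior described above, not the actual store code:

```go
package main

import "fmt"

// update is one partial state change for a resource key.
type update struct {
	key    string
	fields map[string]interface{}
}

// registry models the two layers: persisted is what the registry file
// holds, cache holds deferred updates not yet ACKed by the output.
type registry struct {
	persisted map[string]map[string]interface{}
	cache     []update
}

func merge(dst map[string]map[string]interface{}, u update) {
	if dst[u.key] == nil {
		dst[u.key] = map[string]interface{}{}
	}
	for k, v := range u.fields {
		dst[u.key][k] = v
	}
}

// Immediate writes metadata straight to the registry file.
func (r *registry) Immediate(u update) { merge(r.persisted, u) }

// Deferred queues a read-state change; it hits the file only on ACK.
func (r *registry) Deferred(u update) { r.cache = append(r.cache, u) }

// OnACK flushes queued updates to the file, in order.
func (r *registry) OnACK() {
	for _, u := range r.cache {
		merge(r.persisted, u)
	}
	r.cache = nil
}

func main() {
	r := &registry{persisted: map[string]map[string]interface{}{}}
	r.Immediate(update{key: "file::a.log", fields: map[string]interface{}{"id": "a"}})
	r.Deferred(update{key: "file::a.log", fields: map[string]interface{}{"offset": 4096}})
	fmt.Println(r.persisted["file::a.log"]["offset"]) // not persisted yet
	r.OnACK()
	fmt.Println(r.persisted["file::a.log"]["offset"])
}
```

Until OnACK runs, a reader of the file sees the old offset while the in-memory store (persisted plus cache) is already ahead, which is exactly the eventual consistency the text describes.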
By allowing the registry to be eventually consistent, we do not have to block on shutdown and can allow an input to be reconfigured immediately (or allow another input to pick up a resource). The reconfigured input can immediately start from the cached state.
As update operations need to pass through the publisher pipeline, it is the publisher pipeline (with ordered ACKs) that guarantees that read-state updates are ACKed (and written to the registry) in the correct order. This also helps reloading via the Agent or autodiscovery, as inputs can be stopped more promptly.
TODO: per-input autodiscovery hints support.
Right now, autodiscovery hints support is a per-Beat global subsystem that constructs configuration objects from docker/k8s event metadata. The implementation is not really aware of the configured inputs' capabilities, but happily configures settings that might apply. This creates a coupling between autodiscovery hints and inputs (you need to know the right setting names in the inputs), although both are managed independently in code. In addition, unsupported settings might be configured for the wrong input, without the user noticing.

Using the Plugin definition, I'm considering moving hints support into the input itself, so that input configuration and hints are managed by the input author. We might also consider some kind of validation to point out invalid hints for the input being configured.