
Introduce spooling to disk #6581

Merged
merged 4 commits into from
Apr 5, 2018

Conversation

urso

@urso urso commented Mar 16, 2018

This PR implements the queue.Queue interface, adding spooling-to-disk functionality to all beats. The queue interface requires all queues to provide 'connections' for Producers and Consumers. The interface also requires all ACKs to be executed asynchronously. Once an event is ACKed, it must not be presented to a Consumer (the outputs worker queue) anymore.

The new queue type is marked as Beta-feature.

In the spool, the events are stored in a dynamic ring buffer, in one single file only. The maximum file size must be configured at startup (default 100MiB). The file layer needs to use the available file space for event data and additional meta data. Events are first written into a write buffer, which is flushed once it is full or when the flush timeout triggers. No limit on event sizes is enforced. An event bigger than the write buffer will still be accepted by the spool, but will trigger a flush right away. A successful flush also requires 2 fsync operations (1. data, 2. file header update). The spool blocks all producers once it cannot allocate any more space within the spool file. Writing never grows the file past the configured maximum file size. All producers are handled by the inBroker. There is no direct communication between producers and consumers. All required signaling and synchronisation is provided by the file layer.
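
A minimal sketch of the flush policy just described (flush when the write buffer runs full or when the flush timeout fires). The writeBuffer type, its fields and methods are illustrative assumptions, not the PR's actual inBroker or file-layer code, and the fsync/transaction handling is reduced to a print statement:

package main

import (
	"bytes"
	"fmt"
	"sync"
	"time"
)

type writeBuffer struct {
	mu      sync.Mutex
	buf     bytes.Buffer
	limit   int           // configured write buffer size
	timeout time.Duration // flush_timeout
	timer   *time.Timer
}

// Publish appends an encoded event. Events larger than the buffer are still
// accepted, but force an immediate flush, as described above.
func (w *writeBuffer) Publish(encoded []byte) {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.buf.Len() == 0 {
		w.timer = time.AfterFunc(w.timeout, w.Flush) // arm flush_timeout on first event
	}
	w.buf.Write(encoded)
	if w.buf.Len() >= w.limit {
		w.flushLocked()
	}
}

func (w *writeBuffer) Flush() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.flushLocked()
}

func (w *writeBuffer) flushLocked() {
	if w.buf.Len() == 0 {
		return
	}
	if w.timer != nil {
		w.timer.Stop()
		w.timer = nil
	}
	// In the real queue this is where data pages and the file header are
	// written and fsync'ed (two fsyncs per successful flush).
	fmt.Printf("flushing %d bytes\n", w.buf.Len())
	w.buf.Reset()
}

func main() {
	w := &writeBuffer{limit: 1 << 20, timeout: time.Second}
	w.Publish([]byte(`{"message": "hello"}`))
	w.Flush()
}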

Once the write buffer is flushed, a signal is returned to each individual Producer, notifying it that its events have been published. This signal is used by filebeat/winlogbeat to update the registry file.

Consumers are handled by the outBroker. Consumers request a batch of events. The broker reads up to N messages from the file and forwards these. The reading process is 'readonly' and does not update the on-disk read pointers (only in-memory read pointers are updated).
The file is memory-mapped for reading, which significantly increases the process's reported memory usage.

The outputs asynchronously ACK batches. The ACK signals are processed by the broker's ackLoop. Due to load-balancing or retries, ACKs can be received out of order. The broker guarantees that ACKs are applied in the same order the events have been read from the queue. Once a continuous set of events (starting from the last on-disk read pointer) is ACKed, the on-disk read pointer is updated and the space occupied by ACKed events is freed. As free space is tracked by the file, the file meta-data must be updated. If no more space for file meta-data updates is available, the file can potentially grow a few pages past max-size. Growing is required to guarantee progress (otherwise the spool might be stalled forever). In case the file did grow on ACK, the file layer will try to free the space with later write/ACK operations, potentially truncating the file again.
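
A hedged illustration of the ACK-ordering rule described here, reduced to its core: batches may be ACKed out of order, but only a contiguous, fully-ACKed prefix releases space and advances the on-disk read pointer. The ackTracker type and sequence numbering are assumptions for illustration, not the broker's ackLoop code:

package main

import "fmt"

type ackTracker struct {
	next  uint64          // first batch sequence number not yet released
	acked map[uint64]bool // out-of-order ACKs waiting for their predecessors
}

func newAckTracker() *ackTracker {
	return &ackTracker{acked: map[uint64]bool{}}
}

// Ack records an ACK for batch seq and returns how many batches, starting at
// the current read pointer, can now be released (i.e. freed on disk).
func (t *ackTracker) Ack(seq uint64) int {
	t.acked[seq] = true
	released := 0
	for t.acked[t.next] {
		delete(t.acked, t.next)
		t.next++
		released++
	}
	return released
}

func main() {
	t := newAckTracker()
	fmt.Println(t.Ack(2)) // 0: batch 0 is still outstanding
	fmt.Println(t.Ack(0)) // 1: batch 0 released, batch 1 still missing
	fmt.Println(t.Ack(1)) // 2: batches 1 and 2 released together
}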

The file layer is provided by go-txfile. The file is split into pages of equal size. The layer provides transactional access to pages only. All writes (ACK, flushing the write buffer) are handled concurrently, using write transactions. The reader is isolated from concurrent writes/reads using a read transaction. The last committed transaction state is never overwritten in the file. If a beat crashes during a write transaction, the most recent committed transaction is still available, so the beat can continue from the last known state upon restart. No additional repair phase is required.

Known limitations (To be addressed in future PRs):

  • File maximum size cannot be changed once the file is generated:
    • Add support to grow max size
    • Add support to shrink max size. Shrinking could be made dynamic, trying to return space once pages at the end of the file are freed
    • Add a command to transfer the queue into a new queue file, in case fragmentation prevents dynamic shrinking or the user doesn't want to wait.
  • Monitoring metrics do not report already available events upon beats restart (requires some more changes to the queue interface itself).
  • No file related metrics available yet.
  • If the file is too small and the write buffer too big, the queue can become stalled. A potential solution requires all of the following (a validation sketch follows this list):
    • Limit maximum event size
    • Generously preallocate the meta-area (reserve pages for meta-data only on file creation)
    • Ensure the usable data area is always > 2*write buffer -> partial write buffer flush?
    • Startup check validating the combination of max_size, data vs. meta area size, event size, write buffer size
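
A rough sketch of what the startup check proposed in the last bullet could look like; the meta-area reservation, field names and thresholds are assumptions for illustration, not the shipped configuration validation:

package main

import (
	"errors"
	"fmt"
)

type spoolConfig struct {
	MaxSize     uint64 // configured maximum spool file size
	PageSize    uint64
	WriteBuffer uint64
	MaxEvent    uint64 // hypothetical hard limit on a single encoded event
}

func (c spoolConfig) Validate() error {
	meta := 32 * c.PageSize // generously pre-allocated meta area (assumption)
	if c.MaxSize <= meta {
		return errors.New("max_size leaves no room for event data")
	}
	data := c.MaxSize - meta
	if data < 2*c.WriteBuffer {
		return errors.New("usable data area must be at least twice the write buffer")
	}
	if c.MaxEvent > c.WriteBuffer {
		return errors.New("a single event must fit into the write buffer")
	}
	return nil
}

func main() {
	cfg := spoolConfig{MaxSize: 100 << 20, PageSize: 4096, WriteBuffer: 1 << 20, MaxEvent: 512 << 10}
	fmt.Println(cfg.Validate()) // <nil>
}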

@urso urso added the in progress Pull request is currently in progress. label Mar 16, 2018
handlers []func()
}

type Settings struct {


exported type Settings should have comment or be unexported

@@ -8,7 +8,7 @@ import (
"github.com/elastic/beats/libbeat/publisher/queue"
)

type QueueFactory func() queue.Queue
type QueueFactory func(t *testing.T) queue.Queue


exported type QueueFactory should have comment or be unexported

@@ -68,7 +81,7 @@ func configTest(t *testing.T, typ string, configs []string, fn func(t *testing.T
for _, config := range configs {
config := config
t.Run(testName(typ, config), func(t *testing.T) {
t.Parallel()
// t.Parallel()
Author

TODO: restore capability to run tests in parallel (will need randomized file names)

type outLogger struct {
}

var defaultLogger logger = logp.NewLogger("spool")


should omit type logger from declaration of var defaultLogger; it will be inferred from the right-hand side

Member

@ruflin ruflin left a comment

I skimmed through the code and left some high level comments. It definitely needs a more detailed review, also with knowledge of the file spooler code. One thing that is missing is docs for this PR.

One way I was looking at the code is whether we could release this early. As the queue registers itself and only comes into action in case it's enabled, it should have no effect on the rest of the code.

This is a feature that needs lots of automated and manual testing to make sure it's bullet proof. Because of this I'm tempted to merge this rather soonish as experimental or beta so people can start to play around with it and provide feedback.

For the testing, one thing I'm especially curious about is how it works across restarts. Does it continue sending events, for example? To me it seems some additional system tests would be best here.

# is full or the flush_timeout is triggered.
# Once ACKed by the output, events are removed immediately from the queue,
# making space for new events to be persisted.
#spool:
Member

Why not call this file or disk? It would make it more obvious that it is persistent.

Contributor

We should probably align with the naming/convention of the Logstash persistent queue if/when possible.

In Logstash we are using queue.type, accepting these two values: memory (default) and persisted

Author

@urso urso Mar 28, 2018

@ph @ruflin

Hmmm... introducing queue.type would be a bc-breaking change. As one can have only one queue type, the queue config follows the dictionary-style config pattern like:

queue.<type>:

What's wrong with spool?

Right now this one implements a FIFO. For metricbeat use cases we might actually introduce another persistent LIFO queue type, which would need yet another type name. How would you name either of these then?
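
To illustrate the dictionary-style pattern @urso describes: the queue section maps exactly one type name to that type's settings, so a separate queue.type field is unnecessary. This is a simplified sketch with assumed names, not libbeat's actual config loading:

package main

import (
	"errors"
	"fmt"
)

// selectQueueType returns the single configured queue type, or the default.
func selectQueueType(queue map[string]interface{}) (string, interface{}, error) {
	switch len(queue) {
	case 0:
		return "mem", nil, nil // no queue section configured: fall back to the memory queue
	case 1:
		for name, settings := range queue {
			return name, settings, nil
		}
	}
	return "", nil, errors.New("only one queue type may be configured")
}

func main() {
	name, settings, err := selectQueueType(map[string]interface{}{
		"spool": map[string]interface{}{"file": map[string]interface{}{"size": "100MiB"}},
	})
	fmt.Println(name, settings, err)
}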

Contributor

Re bc-breaking: after looking at filebeat.reference.yml

#queue:
  # Queue type by name (default 'mem')
  # The memory queue will present all available events (up to the outputs
  # bulk_max_size) to the output, the moment the output is ready to server
  # another batch of events.
  #mem:
    # Max number of events the queue can buffer.

You are right, actually I think our config makes it cleaner. @ruflin @kvch I would vote to keep @urso's suggestion.
Concerning the naming: spool or disk or file, I think spool is a really common name for that kind of setting.

Contributor

For LIFO vs. FIFO, it's more like a priority. It could also become more complex and take the kind of events into consideration, e.g. on a storage system disk usage vs. CPU (probably a bad example).

Member

I don't think we should introduce a type because of the reasons mentioned by @urso. There can only be one queue at a time.

For me spooling can be to disk or memory. Historically we had a spooler in filebeat which kept the data in memory. Another option would be to call it file?

+1 on the proposal from @ph about the priority. FIFO or LIFO is a config option of the queue. It can mean a completely different implementation in the background, but the user should not have to worry about that.

Author

Regarding priority I was initially thinking the same. But if we have a priority setting, the user must be allowed to change the setting between restarts.

The async ACK behaviour of the queue makes the on-disk structure a little more complicated, though. When introducing stack-like functionality, we will end up with many holes. That is, freeing space will be somewhat more complicated in the LIFO case. I'd like to solve the LIFO case separately, potentially merging both cases into a common file format later on. A priority-based queue using heaps might become even more complicated.

Contributor

@urso Looks like a good compromise.

@ruflin You have a point; disk or file at the discretion of @urso.

# will have no more effect.
#file:
# Location of spool file. This value must be configured.
#path: ""
Member

Should this default to our path.data directory?

Contributor

I believe we should provide a default, and path.data makes sense in that case, so we can remove the "This value must be configured".

Author

Yeah, was thinking the same. Will default to ${path.data}/spool.dat.

@@ -148,6 +148,50 @@ auditbeat.modules:
# if the number of events stored in the queue is < min_flush_events.
#flush.timeout: 1s

# The spool queue will store events in a local spool file, before
Member

I assume only 1 queue can be used at the same time. Should we mention that in a comment on line 130?

Author

@urso urso Mar 28, 2018

Do we need to? If you configure 2 queue types, beats will report an error that only one is allowed. It's using the dictionary-style plugin config we have in quite a few places in beats.

Member

I mainly brought it up as people keep trying to use 2 outputs in Beats, and they will for queues too. So having it documented makes it easy to point people to it, and they know it already from the docs before running into it.

@@ -0,0 +1 @@
// Do not remove
Member

Perhaps add a note on why it should not be removed.

Author

Will change the tests to create temporary directories. It was just nice to have the files available after tests (for inspection).

When removing the file, git will remove the directory -> tests would fail.

}

// reg := monitoring.NewRegistry()
pipeline, err := pipeline.Load(info, nil, config.Pipeline, config.Output)
if err != nil {
return err
return fmt.Errorf("Loading pipeline failed: %v", err)
Member

Start the error message with a lower-case letter.

const (
// Note: Never change order. Codec IDs must not change in the future. Only
// adding new IDs is allowed.
codecUnknown codecID = iota
Member

Not sure I understand why we need different codecs. It seems it could be configurable through the config but it's not listed in the reference file. I assume it would also cause issues if one changes the codec after the beat has been running.

It's good for testing to have the different codecs, but should we just set one for now?

Contributor

@ruflin the codec is encoded in the header so we can choose the correct encoding on reading. So the queue on disk supports events with mixed codecs, and users can change that setting and it should be fine even if there are already events on disk.

Contributor

I see that we use CBORL as the default serialization method; we should probably benchmark that, use the fastest possible route, and keep a trace of that.

Also, I am not sure at this point that we should support different formats; it seems premature and will increase the support area.

Author

The JSON codec is nice for debugging (assuming I would have to use a hex editor). Checking out a file in a hex editor can be complicated enough. No need to make debugging more complicated by having binary data only.

The UBJSON can be removed.

Benchmarks are available in the go-structform tests (using beats events):

BenchmarkDecodeBeatsEvents/packetbeat/structform-json-4         	     200	   9210052 ns/op	  74.12 MB/s	 3251196 B/op	   62062 allocs/op
BenchmarkDecodeBeatsEvents/packetbeat/structform-ubjson-4       	     200	   7440986 ns/op	  87.19 MB/s	 3171845 B/op	   56362 allocs/op
BenchmarkDecodeBeatsEvents/packetbeat/structform-cborl-4        	     200	   7310081 ns/op	  81.46 MB/s	 3171266 B/op	   56361 allocs/op
BenchmarkDecodeBeatsEvents/metricbeat/structform-json-4         	     500	   3333191 ns/op	  62.41 MB/s	 1387710 B/op	   24852 allocs/op
BenchmarkDecodeBeatsEvents/metricbeat/structform-ubjson-4       	     500	   2955721 ns/op	  72.92 MB/s	 1387472 B/op	   24758 allocs/op
BenchmarkDecodeBeatsEvents/metricbeat/structform-cborl-4        	     500	   2816873 ns/op	  67.70 MB/s	 1387136 B/op	   24758 allocs/op
BenchmarkDecodeBeatsEvents/filebeat/structform-json-4           	      30	  46966885 ns/op	 105.49 MB/s	15055009 B/op	  390517 allocs/op
BenchmarkDecodeBeatsEvents/filebeat/structform-ubjson-4         	      30	  38593911 ns/op	 126.92 MB/s	14874314 B/op	  390499 allocs/op
BenchmarkDecodeBeatsEvents/filebeat/structform-cborl-4          	      50	  36702929 ns/op	 123.75 MB/s	14874551 B/op	  390500 allocs/op
BenchmarkEncodeBeatsEvents/packetbeat/structform-json-4         	     200	   5970464 ns/op	 114.26 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/packetbeat/structform-ubjson-4       	     500	   3749375 ns/op	 173.04 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/packetbeat/structform-cborl-4        	     500	   2829715 ns/op	 210.43 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/metricbeat/structform-json-4         	    1000	   2274028 ns/op	  91.03 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/metricbeat/structform-ubjson-4       	    1000	   1672348 ns/op	 128.88 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/metricbeat/structform-cborl-4        	    1000	   1236829 ns/op	 154.18 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/filebeat/structform-json-4           	      50	  32523323 ns/op	 151.97 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/filebeat/structform-ubjson-4         	     100	  22533875 ns/op	 217.37 MB/s	       0 B/op	       0 allocs/op
BenchmarkEncodeBeatsEvents/filebeat/structform-cborl-4          	     100	  15919294 ns/op	 285.32 MB/s	       0 B/op	       0 allocs/op

ubjson is faster on decode, cborl is much faster on encode :)

Author

@urso urso Mar 29, 2018

Btw, the codec setting is not documented for now. But as I have some plans to replace the codec in the future, I added the codec to the header, so upgrades can be smooth (no need to drop old events).

Contributor

@urso +1 for thinking ahead on that; since it's beta we have the luxury of changing things.

Member

For not documenting it: If the config is there, it should be in the reference file. We did this "trick" in the past and I think it didn't benefit us as people discovered the option and started using it. Better have it in the reference file as experimental and say what it does.

If I understand the comment from ph correctly, a user can have codec A and then configure codec B on restart, and everything will keep working smoothly? The events are read in with codec A, and the next time the file is written, it will be all in codec B.

Author

@urso urso Apr 2, 2018

Added codec setting to the reference file.

If I understand the comment from ph correctly, a user can have codec A and then configure codec B on restart, and everything will keep working smoothly? The events are read in with codec A, and the next time the file is written, it will be all in codec B.

The codec can be changed between restarts. The setting is only used by the writer. The reader can easily deal with all codecs within a file.
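
A hedged sketch of per-event codec tagging in this spirit: the writer stores a codec ID with every event, so the reader can pick the right decoder even when the configured codec changed between restarts and the file holds a mix of encodings. The byte layout, names, and the use of encoding/json as a stand-in codec are assumptions, not the queue's on-disk format:

package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

type codecID byte

const (
	codecUnknown codecID = iota
	codecJSON
	codecCBOR // stand-in for a binary codec; not wired up in this sketch
)

// encodeEvent prefixes the encoded event with the codec ID used to write it.
func encodeEvent(id codecID, event map[string]interface{}) ([]byte, error) {
	payload, err := json.Marshal(event) // JSON stands in for the configured codec
	if err != nil {
		return nil, err
	}
	return append([]byte{byte(id)}, payload...), nil
}

// decodeEvent dispatches on the stored codec ID, not on the current setting.
func decodeEvent(raw []byte) (map[string]interface{}, error) {
	if len(raw) == 0 {
		return nil, errors.New("empty event")
	}
	var event map[string]interface{}
	switch codecID(raw[0]) {
	case codecJSON:
		err := json.Unmarshal(raw[1:], &event)
		return event, err
	default:
		return nil, fmt.Errorf("no decoder wired up for codec id %d in this sketch", raw[0])
	}
}

func main() {
	raw, _ := encodeEvent(codecJSON, map[string]interface{}{"message": "hello"})
	fmt.Println(decodeEvent(raw))
}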

}

if c.MaxSize < humanize.MiByte {
errs = append(errs, errors.New("max size must be larger 1MiB"))
Member

What happens if someone has a multiline event of 2MB and the MaxSize here is 1.1MiB?

Author

Well, there are some "limits" that are not supported.

If an event is > the total file size, we will block forever.

As event sizes are dynamic and we don't want to employ hard limits on event sizes, we have kind of a weird effect here. There are potential parameter combinations + event sizes that can block the queue. The best we could do is introduce a max event size and drop events if they hit the limit. This way we can precompute the worst page usage of an event and figure out whether max size must be even larger. It's kinda tricky; that's why I haven't done it yet.
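
A back-of-the-envelope helper for the precomputation @urso mentions: with a hard cap on event size, the worst-case number of pages one event can occupy is easy to bound and can feed a startup check of max_size. The page size and per-event overhead below are assumed values for illustration only:

package main

import "fmt"

const (
	pageSize      = 4096
	perEventMeta  = 32      // assumed per-event framing/header overhead in bytes
	maxEventBytes = 2 << 20 // hypothetical hard limit on a single encoded event
)

// pagesForEvent rounds the framed event size up to whole pages.
func pagesForEvent(eventBytes int) int {
	framed := eventBytes + perEventMeta
	return (framed + pageSize - 1) / pageSize
}

func main() {
	worst := pagesForEvent(maxEventBytes)
	fmt.Printf("worst case: %d pages (%d bytes) per event\n", worst, worst*pageSize)
}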

# is full or the flush_timeout is triggered.
# Once ACKed by the output, events are removed immediately from the queue,
# making space for new events to be persisted.
#spool:
Member

We should mark this beta at first, in the code as a log message, here in the config and in the docs.

Author

Done.

// the producer. The producers event (after close) might be processed or
// ignored in the future.
//
// stateBlocked transitions:
Member

In the case of Filebeat this means the harvesters are blocked. In the case of Metricbeat, does this mean the latest incoming events will be dropped? So if the output becomes available again, all events in the queue will be sent, and the events which came in between "queue full" and "output available again" will be lost.

Contributor

I believe that would be the behavior we want; not sure how it is done in the current form of this PR.

Few notes:

  1. If the queue is blocked for a short time, a small amount of metrics will be lost and metrics will smooth over the small gaps, which should still give us a good view of the situation?

  2. If the queue is blocked for a long period of time, we could drop a huge amount of metrics. I guess the monitoring cluster should detect that behavior by either a) a full queue for a long period of time, or b) not receiving any metrics snapshot for some time.

Author

@urso urso Mar 28, 2018

It's the same behavior as for the in-memory queue. If the queue is full, it is full! It blocks (no event is dropped).

Block vs. drop (besides having similar effects on metricbeat) do have subtle differences. Metricbeat could report/monitor long block periods by having a watchdog that checks whether metricsets are actually reporting events within the configured period (or a multiple of it); see the sketch below.

It's by design. Administrators pre-allocate/assign disk space for use by beats, and beats guarantees (best effort) it does not use any more disk space once the queue is full. Better to have beats block when the queue is full than make a system run out of disk space.
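
A minimal sketch of the watchdog idea mentioned above (assumed names, not Metricbeat code): record the time of the last successfully published event and warn if nothing was reported within a few collection periods:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type watchdog struct {
	period   time.Duration // the metricset's configured collection period
	lastUnix atomic.Int64  // UnixNano of the last published event
}

func (w *watchdog) Reported() { w.lastUnix.Store(time.Now().UnixNano()) }

// Check warns when no events were published for several periods, which would
// hint at a full (blocked) queue.
func (w *watchdog) Check() {
	last := time.Unix(0, w.lastUnix.Load())
	if stalled := time.Since(last); stalled > 3*w.period {
		fmt.Printf("no events published for %v; queue may be full/blocked\n", stalled)
	}
}

func main() {
	w := &watchdog{period: time.Second}
	w.Reported()
	w.Check() // quiet: an event was just reported
}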

#file:
# Location of spool file. This value must be configured.
#path: ""

Contributor

My vim shows an unnecessary tab here.


# Configure file permissions if file is created. The default value is 0600.
#permissions: 0600

Contributor

Also, an unnecessary tab here.


type size uint64

func (c *pathConfig) Validate() error {
Contributor

Yay for Validate functions!

active atomic.Bool
done chan struct{}

handlers []func()
Contributor

I think calling the list of functions which users set to run when a spoolCtx is closed handlers is too general. Handlers can be called whenever they are needed, but this list contains only a special kind of handler which runs while closing the session.
I would rather name it onCloseCBs, onCloseCallbacks or closingCallbacks. This change would make it more obvious when these functions are called and what their role is.

Contributor

agreed ^

Author

Urgh... I felt dirty when introducing the handlers. Now I'm feeling even more "dirty" giving the impression we want multiple of these callbacks. Let's see if I can get rid of handlers or other callbacks.

Also note, these are for internal use only. No external lib/component is ever allowed to install a custom handler! The idea of spoolCtx is to provide a very small required subset of context.Context in order to deal with queue shutdown.
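
For illustration, a minimal sketch of such a shutdown-only context (assumed names and structure; the PR's spoolCtx differs in detail): just a done channel plus Closed/Close, the small subset of context.Context the queue needs:

package main

import (
	"fmt"
	"sync"
)

type spoolCtx struct {
	once sync.Once
	done chan struct{}
}

func newSpoolCtx() *spoolCtx { return &spoolCtx{done: make(chan struct{})} }

// Done exposes the channel producers/consumers select on during shutdown.
func (c *spoolCtx) Done() <-chan struct{} { return c.done }

// Closed reports whether Close has been called.
func (c *spoolCtx) Closed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

// Close is idempotent; closing the channel wakes up all waiters.
func (c *spoolCtx) Close() { c.once.Do(func() { close(c.done) }) }

func main() {
	ctx := newSpoolCtx()
	fmt.Println(ctx.Closed()) // false
	ctx.Close()
	fmt.Println(ctx.Closed()) // true
}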

Author

Haha... handlers hasn't been used for quite some time -> removing it \o/

func (c *consumer) Get(sz int) (queue.Batch, error) {
log := c.ctx.logger

if c.closed.Load() || c.ctx.Closed() {
Contributor

It would be nice to wrap c.closed.Load into a function named c.Closed. Just like it is done in spoolCtx.

Contributor

@ph ph left a comment

Good job @urso !

I went through the code, looks good. I have noted a few things to change. Can you check the logging statements in the source code? I believe you were using trace with formatted strings before moving to log.Debug, so we need to clear out any unnecessary spaces.

On my side, I need to do the following:

  1. Revise the tests; at first glance I think we are missing a few unit tests on some of the classes.

  2. Real world human testing on it.

I expect we do these tasks in follow-up PRs:

  1. E2E integration tests?
  2. Stress tests?
  3. Recovery scenario?



# Maximum duration after which events are flushed, if the write buffer
# is not full yet. The default value is 1s.
#flush_timeout: 1s
Contributor

This is related to fsync to disk, should we add a mention that lower values could impact performance?

Author

I'd rather not document here the effects of lower and higher values. Maybe we want to add some of the tradeoffs to the final asciidoc.

if (m & 0200) == 0 {
errs = append(errs, errors.New("file must be writable by current user"))
}
}
Contributor

Open question: Should we do a strict check on the permissions of the PQ when starting up? Is there a risk of it containing sensitive data?

Author

Yeah, an extra check would make sense. As the queue is not encrypted, there is indeed a chance of it holding sensitive data.

Author

NewSpool checks the file permissions if the file exists.

type decoder struct {
// TODO: replace with a more intelligent custom buffer, so as to limit the amount of memory
// the buffer holds onto after reading a 'large' event.
// Alternatively use structform decoder types (with shared io.Buffer).
Contributor

Maybe remove the TODO and create an issue so we can discuss pros and cons of doing it.

err := b.queue.ACK(uint(ackCh.total))
if err != nil {
log.Debug("ack failed with:", err)
time.Sleep(1 * time.Second)
Contributor

My understanding of this part is that we want to give some backoff on the acking when an error occurs.
Suggestion: use an exponential backoff with a fixed cap; using 1s by default seems a bit high?
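
A small sketch of that suggestion (exponential backoff with a fixed cap) around the ACK retry; the constants and function names are illustrative, not proposed defaults:

package main

import (
	"errors"
	"fmt"
	"time"
)

// ackWithBackoff retries ack until it succeeds, doubling the wait up to a cap.
func ackWithBackoff(ack func() error) {
	const (
		initial = 50 * time.Millisecond
		maxWait = 1 * time.Second
	)
	backoff := initial
	for {
		if err := ack(); err == nil {
			return
		}
		time.Sleep(backoff)
		if backoff *= 2; backoff > maxWait {
			backoff = maxWait
		}
	}
}

func main() {
	tries := 0
	ackWithBackoff(func() error {
		tries++
		if tries < 3 {
			return errors.New("disk write failed") // simulated transient I/O error
		}
		return nil
	})
	fmt.Println("acked after", tries, "tries")
}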

Author

The question is how often this could happen. The only reason ACKing fails is disk/write failures (network shares).

With exponential backoff we might want to introduce more configurable parameters. I'd prefer not to introduce additional config parameters for dealing with this kind of fatal disk/IO issue.

Contributor

OK, good for now.

b.active = req
b.total = total
b.timer.Start()
log.Debug(" outbroker (stateActive): switch to stateWithTimer")
Contributor

unneeded space?

b.initState()

case n := <-b.sigFlushed:
// yay, more events \o/
Contributor

🎉

states, s.clients = s.clients[:n], s.clients[n:]
s.mux.Unlock()
return states
}
Contributor

nitpick, I would have used a defer for the unlock.


@kvch
Contributor

kvch commented Mar 26, 2018

I've finished the first round of review. I added comments with my questions/requests and agree with most of what @ph noted. I am waiting for your changes/responses.

I tested it manually and I haven't found anything problematic so far.

@ph
Contributor

ph commented Mar 26, 2018

I did some tests manually too over the weekend; it was green for me, but it's still high-level testing.

@urso @kvch I think we need to come up with a solution for stress testing this. Do we want to do some chaos-monkey suite for that?

@urso
Author

urso commented Mar 28, 2018

@ph > So we need to clear out any unnecessary space.

The spaces in logp.Debug are on purpose. It's like a 'multiline' log. The indentation helps me a little to keep context. :/

@ph
Contributor

ph commented Mar 29, 2018

The spaces in logp.Debug are on purpose. It's like a 'multiline' log. The indentation helps me a little to keep context. :/

@urso Wouldn't using a contextual logger help in that case?

l := logp.NewLogger("spool").With("ABC", "WYZ")
l.Debugw("something wrong", "abc", 1234)

:(

@ph
Contributor

ph commented Mar 29, 2018

Some files need to be generated:

NOTICE.txt: needs update
auditbeat/auditbeat.reference.yml: needs update
filebeat/filebeat.reference.yml: needs update
heartbeat/heartbeat.reference.yml: needs update
metricbeat/metricbeat.reference.yml: needs update
packetbeat/docs/fields.asciidoc: needs update
packetbeat/packetbeat.reference.yml: needs update
winlogbeat/winlogbeat.reference.yml: needs update
make: *** [check] Error 1

@urso
Author

urso commented Mar 29, 2018

Using a contextual logger wouldn't help in that case?

All log statements are from the same thread + only one thread exists. As the state machines can return to the very same state, the spaces help a little in figuring out which logs are part of the current execution of this state. With context-based logging, I'd have to create an index and prepare a tmp logger for every step.

Also note, the package provides its own local logger, not relying on logp, which pretty much leaks the zap (sugar) implementation. The state machine is in the fast path, being executed on every single event. I'd rather not allocate another temporary context per event.

@ph
Contributor

ph commented Mar 29, 2018

Also note, the package provides it's local logger, not relying on logp, which pretty much leaks the implementation of zap (sugar) implementation. The state machine is in the fast path, being executed on every single event. I'd rather prefer not to allocate another temporary context per event.

Makes sense; anyway, it's more for debugging so I think it's OK in that case.

Contributor

@ph ph left a comment

Code looks good with the changes, will do a bit of testing.


case codecCBORL:
visitor = cborl.NewVisitor(&e.buf)
case codecUBJSON:
visitor = ubjson.NewVisitor(&e.buf)
Contributor

Thanks for clarifying that!

cfgPerm := settings.Mode.Perm()

// check if file has permissions set, that must not be set via config
if (perm | cfgPerm) != cfgPerm {
Contributor

We might want to use common.IsStrictPerms() to skip that?

@ph
Contributor

ph commented Mar 29, 2018

make check fails with an asciidoc from packetbeat, are you missing a commit?

@ph
Contributor

ph commented Mar 29, 2018

@urso I did some testing on the spool with the following scenario:

Scenario:

  • Filebeat
  • /tmp/ph.log 10000 events. (172KB)
  • Output only elasticsearch configured
  • Output Elasticsearch is down.
  • Started with debug -d "*"
  • Removed the registry and spool.dat between runs

Memory queue

Start Filebeat with the default memory queue; FB should block because ES is down, but you should see that we are trying to connect to it. You should also see a few publish statements with dumps of the events.

2018-03-29T16:29:45.149-0400	DEBUG	[elasticsearch]	elasticsearch/client.go:666	ES Ping(url=http://localhost:9200)
2018-03-29T16:29:45.150-0400	DEBUG	[elasticsearch]	elasticsearch/client.go:670	Ping request failed with: Get http://localhost:9200: dial tcp 127.0.0.1:9200: getsockopt: connection refused

Spool

Now enable spooling to disk with the default settings and start FB: you see one event in the logs but no connection error from ES.

2018-03-29T16:40:24.407-0400	DEBUG	[publish]	pipeline/processor.go:277	Publish event: {
  "@timestamp": "2018-03-29T20:40:24.406Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "doc",
    "version": "7.0.0-alpha1"
  },
  "source": "/tmp/ph.log",
  "offset": 14,
  "prospector": {
    "type": "log"
  },
  "input": {
    "type": "log"
  },
  "beat": {
    "name": "sashimi",
    "hostname": "sashimi",
    "version": "7.0.0-alpha1"
  },
  "message": "0 hello world"
}
2018-03-29T16:40:24.407-0400	DEBUG	[input]	log/input.go:168	input states cleaned up. Before: 0, After: 0, Pending: 0
2018-03-29T16:40:24.407-0400	DEBUG	[registrar]	registrar/registrar.go:286	Registry file updated. 1 states written.
2018-03-29T16:40:34.407-0400	DEBUG	[input]	input/input.go:124	Run input
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:147	Start next scan
2018-03-29T16:40:34.407-0400	DEBUG	[input]	input/input.go:124	Run input
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:147	Start next scan
2018-03-29T16:40:34.407-0400	DEBUG	[input]	input/input.go:124	Run input
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:147	Start next scan
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:168	input states cleaned up. Before: 0, After: 0, Pending: 0
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:362	Check file for harvesting: /tmp/ph.log
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:448	Update existing file for harvesting: /tmp/ph.log, offset: 14
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:500	Harvester for file is still running: /tmp/ph.log
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:168	input states cleaned up. Before: 1, After: 1, Pending: 0
2018-03-29T16:40:34.407-0400	DEBUG	[input]	log/input.go:168	input states cleaned up. Before: 0, After: 0, Pending: 0
2018-03-29T16:40:44.409-0400	DEBUG	[input]	input/input.go:124	Run input
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:147	Start next scan
2018-03-29T16:40:44.409-0400	DEBUG	[input]	input/input.go:124	Run input
2018-03-29T16:40:44.409-0400	DEBUG	[input]	input/input.go:124	Run input
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:168	input states cleaned up. Before: 0, After: 0, Pending: 0
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:147	Start next scan
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:147	Start next scan
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:362	Check file for harvesting: /tmp/ph.log
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:448	Update existing file for harvesting: /tmp/ph.log, offset: 14
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:500	Harvester for file is still running: /tmp/ph.log
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:168	input states cleaned up. Before: 1, After: 1, Pending: 0
2018-03-29T16:40:44.409-0400	DEBUG	[input]	log/input.go:168	input states cleaned up. Before: 0, After: 0, Pending: 0

Starting Elasticsearch afterwards doesn't change the situation.

@ph
Contributor

ph commented Mar 29, 2018

As discussed in Slack @urso, I've run the stress tests and they are all green on my machine.

PASS
ok  	github.com/elastic/beats/libbeat/publisher/pipeline/stress	101.419s

@urso
Author

urso commented Apr 2, 2018

@ph Fixed filebeat hanging. The fix with details is in this commit, plus a safety-net commit in case the user sets flush events to -1.

@urso urso changed the title [WIP] Introduce spooling to disk Introduce spooling to disk Apr 3, 2018
@urso urso added review and removed in progress Pull request is currently in progress. labels Apr 3, 2018
@ph
Contributor

ph commented Apr 3, 2018

@urso Your last commit fixed the situation; the beats now recover and send events to Elasticsearch.

I have done a bit more testing and found another issue: this PR breaks the update to disk of the registry file.

Scenario

Environment

  • Elasticsearch is stopped.
  • Spool default settings

Test 1

  1. Start Filebeat
  2. Read a 10000 events file
  3. Let FB run for 3 minutes
  4. Offset is accurate in the log
2018-04-03T10:01:31.863-0400    DEBUG   [input] log/input.go:448        Update existing file for harvesting: /tmp/ph.log, offset: 168890
2018-04-03T10:01:31.864-0400    DEBUG   [input] log/input.go:500        Harvester for file is still running: /tmp/ph.log
  5. Offset is not in sync in the registry on disk?
[{"source":"/tmp/ph.log","offset":0,"timestamp":"2018-04-03T10:01:01.859088-04:00","ttl":-1,"type":"log","FileStateOS":{"inode":4311757072,"device":16777220}}]
  6. Start Elasticsearch
  7. FB recovers and sends the 10 000 events to ES http://localhost:9200/filebeat-7.0.0-alpha1-2018.04.03/_count
  8. Offset is not updated in the registry on disk.
  9. Offset is OK in memory.
  10. Restarting FB rereads all the events.
  11. Registry on disk is not updated.

@ph
Contributor

ph commented Apr 3, 2018

I did a few tests concerning ES being unreachable and recovering in the middle of a read; I didn't see any problems there.

@ph
Contributor

ph commented Apr 3, 2018

Found an issue with the Logstash output which might be linked to the issue with the registry.

Scenario

Spooling to disk is off.

  • Use the Logstash output without SSL, default settings (pipelining: 2)
  • FB reads a 500 000 events file. (EOF is reached)
  • Check Elasticsearch: 500 000 events are present.

Spooling to disk is on.

  • Use the Logstash output without SSL, default settings (pipelining: 2)
  • FB reads a 500 000 events file. (EOF is reached)
  • Check Elasticsearch: only ~31K events are present.
  • No more events are sent

@ph
Contributor

ph commented Apr 3, 2018

Same issue with Redis; I would expect Kafka to have the same problem. Goroutine dump at https://gist.github.com/ph/410ccdbe48cd00cd42603a0ec47cdc6a

@urso
Author

urso commented Apr 3, 2018

Checking the stack trace + the filebeat offset update problems: the queue was missing a signal to the publisher pipeline that the file had been flushed. Signaling is done via channels. After a few flushes the signalling channels filled up and the producers' ACK handling deadlocked.
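
A simplified illustration of this class of bug (not the broker's actual code): flush notifications go over a small buffered channel, and if the consumer side never drains it, the sender blocks as soon as the buffer fills up and the ACK path stalls:

package main

import (
	"fmt"
	"time"
)

func main() {
	flushed := make(chan int, 2) // small signal buffer

	go func() {
		for i := 1; ; i++ {
			flushed <- i // blocks forever once the buffer is full and nobody reads
			fmt.Println("sent flush signal", i)
		}
	}()

	time.Sleep(100 * time.Millisecond) // nobody drains `flushed`
	fmt.Println("pending signals:", len(flushed), "- sender is now blocked")
}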

@ph
Contributor

ph commented Apr 3, 2018

@urso More testing of Logstash with recovering and killing the instance. It worked; I had some duplicates, which are expected, but didn't lose events.

Can we clean up this log statement a bit? I think we should have a space between collected and 2048.

2018-04-03T15:56:47.446-0400    DEBUG   [spool] spool/outbroker.go:239    outbroker (stateActive): events collected2048 2048 <nil>

Also, the registry is now correctly updated, and the logs show it correctly.

Copy link
Contributor

@ph ph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 🎉. We have one failure in the stress tests; I presume it's a timing issue on Travis?

@urso
Author

urso commented Apr 3, 2018

Couldn't reproduce the stress tests failing. I'm still investigating. CI should be at least somewhat stable before merging.

@elastic elastic deleted a comment from houndci-bot Apr 4, 2018
@ruflin
Member

ruflin commented Apr 4, 2018

I think this small change is worth a changelog entry ;-)

@ph
Contributor

ph commented Apr 4, 2018

Amazing work @urso! Waiting on Jenkins to go green.

urso added 4 commits April 5, 2018 18:26
- Improve stability by increasing test duration, timeouts and watchdog
  timer
- Add test start/stop messages if run with `-v`. Help with travis timing
  out tests with 10min without any output
- Add all active go-routine stack traces to errors
@ph
Contributor

ph commented Apr 5, 2018

One of the metricbeat test suites failed on Travis, related to a MySQL test: it just took too much time to execute and the job was killed by their watchdog. It's the first time I've seen this error; I believe it's a red herring and I've restarted the job.

@ph ph merged commit 8ffb220 into elastic:master Apr 5, 2018
@@ -1223,6 +1287,18 @@
"revision": "8e703b9968693c15f25cabb6ba8be4370cf431d0",
"revisionTime": "2016-08-17T18:24:57Z"
},
{
"checksumSHA1": "H7tCgNt2ajKK4FBJIDNlevu9MAc=",
"path": "github.com/urso/go-bin",
Member

@urso Could we move this and the dependency below to the elastic org?

@andrewkroh andrewkroh mentioned this pull request Apr 12, 2018
9 tasks
@urso urso mentioned this pull request Apr 13, 2018
39 tasks
@urso urso deleted the feature/queue-spool branch February 19, 2019 18:54
@urso urso removed the needs_docs label Nov 15, 2019