Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statestore prototype #14079

Merged
merged 6 commits into from
Nov 26, 2019
Merged

Conversation

urso
Copy link

@urso urso commented Oct 15, 2019

Requires: #14144

The change introduces the statestore package that provides some coordinated access to the registry file.

The store does not implement removal yet. I'm thinking to rather have some GC based approach. This would make it easier to clean stale entries of configs that have gone. Well, and I didn't want the PR to grow too much :)

Go doc:

Package statestore provides coordinated access to entries in the registry
via resources.

A Store supports selective update of fields using go structures. Data to be
stored in the registry should be split into resource identifying meta-data
and read state data. This allows inputs to have separate go-routines for
updating resource tracking meta-data (like file path upon file renames) and
for read state updates (like file offset).

The registry is only eventually consistent to the current state of the
store. When using (*Resource).Update, both the in-memory state and the
registry state will be updated immediately. But when using
(*Resource).UpdateOp, only the in memory state will be updated. The registry
state must be updated using the ResourceUpdateOp, after the associated
events have been ACKed by the outputs. Once all pending update operations
have been applied the in-memory state and the persistent state are assumed
to be in-sync, and the in-memory state is dropped so to free some memory.

The eventual consistency allows resources to be Unlocked and Locked by
another go-routine immediately, as the final read state from the former
go-routine is available right away. The lock guarantees exclusive access. In
the meantime older updates might still be applied to the registry file,
while the new go-routine can start creating new update operations
concurrently to be applied to after already pending updates.

VARIABLES

var (
	ErrCodeOpFailed = errors.New("operation failed")
)
var (
	ErrResourceInUse = errors.New("resource in use")
)

TYPES

type Error struct {
	// Has unexported fields.
}
    Error provides a common error type used by the statestore package. It
    reports the failed operation, custom message and the root cause if
    available.

func (e *Error) Code() error
    Code returns a sentinal error that can be used for checking the error type.

func (e *Error) Error() string
    Error builds the complete error string.

func (e *Error) Op() string
    Op returns the name of the operation that failed.

func (e *Error) Unwrap() error
    Unwrap returns the cause if available.

type Resource struct {
	// Has unexported fields.
}
    Resource is used to lock and modify a resource its registry contents in a
    store.

func (r *Resource) Has() (bool, error)
    Has check if resource is already in registry. Has does not require the lock
    to be taken.

func (r *Resource) IsLocked() bool
    IsLocked checks if the resource currently holds the lock to the shared
    registry entry.

func (r *Resource) Lock()
    Lock locks a resource held by the store. It blocks until the lock becomes
    available.

func (r *Resource) Read(to interface{}) error
    read current state of resource. If there are pending operations, this is the
    last in-memory state. If there are no operations, or all pending operations
    have been acked, we read directly from the registry. Read does not require
    the resource to be locked. But if the lock is not taken, you are subject to
    data races as the go-routine holding a lock on the resource can update its
    contents.

func (r *Resource) TryLock() bool
    TryLock attempts to lock the resource. It returns true if the lock has been
    acquired successfully.

func (r *Resource) Unlock()
    Unlock releases a resource.

func (r *Resource) Update(val interface{}) error
    Update the registry state with fields in val. If the resource key is
    unknown, then a new document is inserted into the registry. Update requires
    the resource to be locked.

    It is recommended to use Update only for resource meta-data updates, that
    allow us to track and identify a resource. Read state updates should be
    handled by UpdateOp.

    The update call is thread-safe, as the update operation itself is protected.
    But data races are still possible, if any two go-routines update an
    overlapping set of fields.

func (r *Resource) UpdateOp(val interface{}) (*ResourceUpdateOp, error)
    UpdateOp creates a resource update operation. The in memory state of the
    resource is updated right away, but the persistent registry state is not
    updated yet. Executing the returned operation updates the persistent state
    and invalidates the operation. As long as there are active operations, the
    in-memory state and the persistent state are not in sync yet. If in-memory
    state and active operations are not in sync, then read operations will use
    the in-memory state only.

    It is recommended to use UpdateOp for read state updates only. Resource
    metadata should be updated using Update instead.

type ResourceKey string

type ResourceUpdateOp struct {
	// Has unexported fields.
}
    ResourceUpdateOp defers a state update to be written to the persistent
    store. The operation can be applied to the registry using ApplyWith. Calling
    Close will mark the operation as done.

func (op *ResourceUpdateOp) ApplyWith(withTx func(*registry.Store, func(*registry.Tx) error) error) error
    ApplyWith applies the operation using the withTx helper function. The helper
    function is responsible for creating and maintaining a write transaction for
    the provided store. If possible the helper should keep the transaction open
    if an array of operations are applied.

func (op *ResourceUpdateOp) Close()
    Close marks the operation as done. ApplyWith can not be run anymore
    afterwards. If all pending operations have been closed, the persistent store
    is assumed to be in sync with the in memory state.

type Store struct {
	// Has unexported fields.
}
    Store provides some coordinates access to a registry.Store. All update and
    read operations require users to acquire an resource first. A Resource must
    be locked before it can be modified. This ensures that at most one
    go-routine has access to a resource. Lock/TryLock/Unlack can be used to
    coordinate resource access even between independent components.

func NewStore(log *logp.Logger, store *registry.Store) *Store
    NewStore creates a new state Store that is backed by a persistent store.

func (s *Store) Access(key ResourceKey) *Resource
    Access an unlocked resource. This creates a handle to a resource that may
    not yet exist in the persistent registry.

func (s *Store) Lock(key ResourceKey) *Resource
    Lock locks and returns the resource for a given key.

func (s *Store) TryLock(key ResourceKey) (res *Resource, locked bool)
    TryLock locks and returns the resource for a given key. The locked return
    value is set to false if TryLock failed, but the resource itself is always
    returned.

@urso urso added the in progress Pull request is currently in progress. label Oct 15, 2019
filebeat/input/v2/statestore/statestore.go Show resolved Hide resolved
filebeat/input/v2/statestore/errors.go Outdated Show resolved Hide resolved
filebeat/input/v2/statestore/errors.go Outdated Show resolved Hide resolved
filebeat/input/v2/statestore/statestore.go Show resolved Hide resolved
// predefined errors

var (
ErrResourceInUse = errors.New("resource in use")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported var ErrResourceInUse should have comment or be unexported

filebeat/input/v2/statestore/errors.go Outdated Show resolved Hide resolved
Copy link
Contributor

@ph ph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@urso code and direction looks good to me, we might want to add a few unit test around the code, I have added a few comments, suggestion around the code.

We will also need to address the hound's comments.

entry.value.mux.Lock()
defer entry.value.mux.Unlock()

invariant(entry.value.pending > 0, "there should be pending updates")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should it more often, especially on a beta in progress feature :)

// in case we miss an unlock operation (programmer error or panic that hash
// been caught) we set a finalizer to eventually free the resource.
// The Unlock operation will unsert the finalizer.
runtime.SetFinalizer(res, (*Resource).finalize)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never used finalizer in Golang before, I've used them in other languages. But after reading a bit more about them and in the context of usage we want to protect the storage as much possible of system and programmer errors.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Developers are still asked to unlock used resources. The finalizer is just a safety-net.

r.unlink()
}
return r.isLocked
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking maybe a Lock that accept a deadline, but we can do the same logic in the consumer with the tryLock().

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TryLock is supposed to return immediately. I don't want to change Lock/Unlock as is, but some LockWithContext would indeed be nice. E.g. we could unblock the call on shutdown.

Idea of TryLock is similar to current 'Finished' flag in filebeat. Assumed code pattern:

res, ok := store.TryLock(path)
if !ok {
  return fmt.Errorf("file %v is collected by another input", path)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sound good, The current API allow for that flexibility so lets keep it simple and add if needed or event make function that wrap that behavior.

//
// The update call is thread-safe, as the update operation itself is protected.
// But data races are still possible, if any two go-routines update
// an overlapping set of fields.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I that case, I think in this case the implementation recommendation would be to fetch entry merge fields and update in the lock?

Copy link
Author

@urso urso Nov 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A resource is just a 'key' for the store. But the key should be associated with some real-world resource like a file. Users wanting to collect a file must lock said file and only release the lock if the file should not be collected anymore.

The lock must be held for as long as the file is owned by the current process.

Like:

res, ok := store.TryLock(path)
if !ok {
  return <error>
}
defer res.Unlock()

// update file meta:
meta := struct{
  Path string
  LastOpen time.Time
  ...
}{ path, time.Now(), ... }

res.Update(&meta)

for { <tail file> }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the file collection use-case a resource might be updated by a file watcher and an active reader concurrently. This is perfectly valid, as the input holds the lock. In order to have a safe-update, the different go-routines must never update the same set of fields. The file watcher should update file metadata only (e.g. file is renamed), while the active reader should only update the offset. If go-routines adhere to updating distinct set of fields, there is no race.

filebeat/input/v2/statestore/store.go Outdated Show resolved Hide resolved
filebeat/input/v2/statestore/store.go Show resolved Hide resolved
filebeat/input/v2/statestore/store.go Outdated Show resolved Hide resolved
filebeat/input/v2/statestore/store.go Outdated Show resolved Hide resolved
Copy link
Contributor

@ph ph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrong radio button /facepalm

@urso
Copy link
Author

urso commented Nov 16, 2019

we might want to add a few unit test around the code

Unit tests will definitely come. As the interfaces of this layer is still somewhat in the flux I'd like to postpone these to a later PR. I'm planning to first stabilize all interfaces in the different layers + introduce some test support with reusable mocks for the registry itself (so different dependents can be tested more easily).

@urso urso marked this pull request as ready for review November 16, 2019 08:39
@urso urso added [zube]: In Review review and removed in progress Pull request is currently in progress. labels Nov 16, 2019
Copy link
Contributor

@kvch kvch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have posted a few comments. Are you going to add the remove functionality of the store in this PR?

@urso
Copy link
Author

urso commented Nov 20, 2019

Are you going to add the remove functionality of the store in this PR?

No.

@urso
Copy link
Author

urso commented Nov 21, 2019

@ph @kvch recent nits have been adress. Also added some missing vendors.

Copy link
Contributor

@kvch kvch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so far so good

urso added 5 commits November 21, 2019 14:45
Connector directly wraps a registry and gives access to many stores.
Each store accessed will create a session. The registry.Store will be
closed once all sessions are closed.

Go routines can create pending update operations that will be applied
asynchronously later to the persistent registry store. At the same time
a go routine can close a store instance if the store is not needed
anymore. So to keep the store alive for as long as is required to
finally sync the in memory and on-disk state, we introduce the concept
of store sessions. A session is active for as long as the Store instance
is alive or there are any pending updates in the system. The underlying
persistent store can be closed only once all sessions for said store are
have been finished.
Copy link
Contributor

@ph ph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, I did another review.

@urso urso merged commit f8cbeb2 into elastic:feature-new-registry-file Nov 26, 2019
@urso urso deleted the statestore-prototype branch November 26, 2019 13:49
@urso urso mentioned this pull request Jan 3, 2020
27 tasks
urso pushed a commit that referenced this pull request Jan 16, 2020
* State store prototype

* Add statestore close and deactivate support

* Introduce connector and store sessions

Connector directly wraps a registry and gives access to many stores.
Each store accessed will create a session. The registry.Store will be
closed once all sessions are closed.

Go routines can create pending update operations that will be applied
asynchronously later to the persistent registry store. At the same time
a go routine can close a store instance if the store is not needed
anymore. So to keep the store alive for as long as is required to
finally sync the in memory and on-disk state, we introduce the concept
of store sessions. A session is active for as long as the Store instance
is alive or there are any pending updates in the system. The underlying
persistent store can be closed only once all sessions for said store are
have been finished.

* minor cleanups

* review

* vendor missing dependency
@andresrc andresrc added the Team:Integrations Label for the Integrations team label Mar 6, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants