-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor plugin management #495
Conversation
One thing I want to do is some benchmarking of running plugins such as backup item actions. It seemed "slow" to me watching the backup in real time, but maybe that's just because I never paid attention before. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initial comments from everything except pkg/plugin
examples/gcp/00-ark-config.yaml
Outdated
@@ -22,8 +22,8 @@ persistentVolumeProvider: | |||
name: gcp | |||
backupStorageProvider: | |||
name: gcp | |||
bucket: <YOUR_BUCKET> | |||
backupSyncPeriod: 30m | |||
bucket: andy-heptio-ark-data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert changes to this file
func (br *backupService) UploadBackup(bucket, backupName string, metadata, backup, log io.Reader) error { | ||
// Uploading the log file is best-effort; if it fails, we log the error but it doesn't impact the | ||
// backup's status. | ||
func UploadBackupLog(objectStore ObjectStore, bucket, backupName string, log io.Reader) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
godoc
key := getRestoreResultsKey(backup, restore) | ||
return br.objectStore.PutObject(bucket, key, results) | ||
return objectStore.PutObject(bucket, key, results) | ||
} | ||
|
||
// cachedBackupService wraps a real backup service with a cache for getting cloud backups. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this going to stick around?
pkg/cmd/server/server.go
Outdated
if err := s.initBackupService(config); err != nil { | ||
return err | ||
} | ||
// if err := s.initBackupService(config); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete?
pkg/cmd/server/server.go
Outdated
if err != nil { | ||
return err | ||
} | ||
// func (s *server) initBackupService(config *api.Config) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delete?
pkg/controller/backup_controller.go
Outdated
@@ -377,6 +384,48 @@ func (controller *backupController) runBackup(backup *api.Backup, bucket string) | |||
return kerrors.NewAggregate(errs) | |||
} | |||
|
|||
func (controller *backupController) UploadBackup(logger logrus.FieldLogger, objectStore cloudprovider.ObjectStore, bucket, backupName string, metadata, backup, log io.Reader) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would make sense to extract this to pkg/backup
so that it's part of the reusable lib?
pkg/restore/restore.go
Outdated
backupService cloudprovider.BackupService | ||
discoveryHelper discovery.Helper | ||
dynamicFactory client.DynamicFactory | ||
// backupService cloudprovider.BackupService |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm line
pkg/restore/restore.go
Outdated
@@ -137,7 +137,7 @@ func prioritizeResources(helper discovery.Helper, priorities []string, includedR | |||
func NewKubernetesRestorer( | |||
discoveryHelper discovery.Helper, | |||
dynamicFactory client.DynamicFactory, | |||
backupService cloudprovider.BackupService, | |||
// backupService cloudprovider.BackupService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rm line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once I got the model I found it fairly easy to follow, but it was confusing the first time through. I think some package-level godoc that explains in some detail would be helpful (how plugins and processes relate, how restartIfNeeded
flows, how multiplexing works and what the different structs on the client and server do to implement it). Also added some thoughts on possible alternate names. I'd probably err on the side of longer & clearer names here since there is a lot of overloading of terms like plugin and client which makes it hard to understand initially.
@@ -17,7 +17,6 @@ limitations under the License. | |||
package cloudprovider |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we probably want to rename this file
@@ -36,90 +32,89 @@ func NewCommand(f client.Factory) *cobra.Command { | |||
logger := arkplugin.NewLogger() | |||
|
|||
c := &cobra.Command{ | |||
Use: "run-plugin [KIND] [NAME]", | |||
Use: "run-plugin", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want to pluralize this now (run-plugins
)
pkg/plugin/client_builder.go
Outdated
|
||
return b | ||
} | ||
|
||
func (b *clientBuilder) withCommand(name string, args ...string) *clientBuilder { | ||
b.config.Cmd = exec.Command(name, args...) | ||
// withCommand sets the clientBuilder's commandName to name and commandArgs to args. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this still have a variadic args arg? I see they're being used in client()
to set the Cmd
. If not, edit the comment to remove the ref and probably remove the field from the clientBuilder
struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need it as an arg. Plugin authors must expect their plugin executable file to be invoked directly, with no arguments. We do still need it in the struct, as it's used for the special ark run-plugin
case.
pkg/plugin/client_mux.go
Outdated
|
||
// clientMux supports the initialization and retrieval of multiple implementations for a single plugin kind, such as | ||
// "aws" and "azure" implementations of the object store plugin. | ||
type clientMux struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if something like clientFactory
would be clearer here. I think mux
is slightly confusing because this struct is actually just handing out different clients as opposed to actually doing any multiplexing itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per previous - what about clientDispenser
?
pkg/plugin/manager.go
Outdated
arkCommand := os.Args[0] | ||
// getWrapper returns a wrapper for a plugin identified by kind and name, creating a wrapper if it is the first time it | ||
// has been requested. | ||
func (m *manager) getWrapper(kind PluginKind, name string) (*restartablePluginProcess, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Trying to think of a name that is a bit more descriptive than getWrapper
-- getRestartablePluginProc
, getPluginProcessHandle
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This missed getting renamed when I changed wrapper to restartablePluginProcess. It should be called getRestartablePluginProcess.
pkg/plugin/manager.go
Outdated
} | ||
|
||
// GetObjectStore returns the plugin implementation of the cloudprovider.ObjectStore | ||
// interface with the specified name. | ||
// GetObjectStore returns a resumableObjectStore for name. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/resumable/restartable
pkg/plugin/manager.go
Outdated
} | ||
|
||
// GetBlockStore returns the plugin implementation of the cloudprovider.BlockStore | ||
// interface with the specified name. | ||
// GetBlockStore returns a resumableBlockStore for name. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/resumable/restartable
} | ||
|
||
// Currently all plugins except for PluginLister dispense clientMux instances. | ||
if mux, ok := dispensed.(*clientMux); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love that the restartablePluginProcess
struct is the one that knows about clientMux
and what to do with it to get an actual client - feels like a completely separate concerrn to restarting plugin processes. I'm not exactly sure how I'd do this differently, but wonder if there's a generic interface that all GRPCClient
funcs could return that enables callers to get an actual client/proxy from the result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it the "restartable" part of the name that's tripping you up? Other than the registry that calls Dispense()
to list plugins, this is the only other place in the code where we call Dispense()
. We could call this something other than restartablePluginProcess
.
Or if it's less the name and more that restarting & dispensing seem like 2 different things, maybe we could find something. We need the protocolClient
to be able to dispense, so we could introduce some other struct that solely focuses on dispensing using that. And then have the restartablePluginProcess
hold on to those.
wonder if there's a generic interface that all GRPCClient funcs could return that enables callers to get an actual client/proxy from the result
Different from the type assertion to *clientMux
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have some local changes where I extracted the dispensing into a pluginDispenser
struct. It's a small change but it separates that logic, at least.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pushed up the changes. I quickly replaced pluginDispenser
with process
, responsible for managing a single execution of a plugin process.
pkg/plugin/server_mux.go
Outdated
type ServerInitializer func() (interface{}, error) | ||
|
||
// serverMux manages multiple implementations of a single plugin kind, such as pod and pvc BackupItemActions. | ||
type serverMux struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about referring to server-side implementations of pluggable interfaces as handlers
, analagous to net/http
nomenclature (e.g. the aws blockstore handler)? Trying not to overload plugin
too much, and impl/implementation is kind of generic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM
pkg/plugin/backup_item_action.go
Outdated
} | ||
|
||
// GRPCClient returns a BackupItemAction gRPC client. | ||
// GRPCClient returns a clientMux for BackupItemAction gRPC clients. | ||
func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The naming that we're locked into here (GRPCClient(...)
) due to go-plugin is unfortunate, since what we're returning is not actually a gRPC client but something that makes clients. Might be helpful to expand the godoc to explain that this is called by go-plugin, and can return any type but it should be a client or something that can be used to create a client
The updates LGTM, I like the additional types and revised names! |
a071d82
to
f5d5cec
Compare
@@ -24,7 +24,7 @@ PKG := github.com/heptio/ark | |||
REGISTRY ?= gcr.io/heptio-images | |||
|
|||
# Which architecture to build - see $(ALL_ARCH) for options. | |||
ARCH ?= linux-amd64 | |||
ARCH ?= $(shell go env GOOS)-$(shell go env GOARCH) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we do this only for make local
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I realized that this will hurt make container
if you're running on macOS. I'll see what I can do. We don't have to include the Makefile changes in this PR if it's too much of a pain.
// BackupGetter knows how to list backups in object storage. | ||
type BackupGetter interface { | ||
// GetAllBackups lists all the api.Backups in object storage for the given bucket. | ||
GetAllBackups(bucket string) ([]*api.Backup, error) | ||
} | ||
|
||
type XXXBackupGetter interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe this should just be BackupGetter
and the current BackupGetter
should be BackupLister
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You read my mind :-) haven't done the rename yet.
@@ -203,10 +189,27 @@ func (br *backupService) GetAllBackups(bucket string) ([]*api.Backup, error) { | |||
return output, nil | |||
} | |||
|
|||
func (br *backupService) GetBackup(bucket, backupName string) (*api.Backup, error) { | |||
type liveXXXBackupGetter struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
obv needs a rename - if we rename the interfaces per my prev comment this could just be liveBackupGetter
clock clock.Clock | ||
} | ||
|
||
type backupStorageDeleter interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the need to do this kind of thing is an annoying downside to having package-level funcs vs. methods on a struct :/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll see if I can find something simpler
@@ -211,7 +278,9 @@ func (controller *restoreController) processRestore(key string) error { | |||
logContext.Debug("Running processRestore") | |||
ns, name, err := cache.SplitMetaNamespaceKey(key) | |||
if err != nil { | |||
return errors.Wrap(err, "error splitting queue key") | |||
logContext.WithError(err).Error("unable to process restore: error splitting queue key") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we have this returning an error in most/all of our existing controllers, we probably want to add an issue to go change that across the board
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
pkg/plugin/backup_item_action.go
Outdated
func (p *BackupItemActionPlugin) GRPCClient(c *grpc.ClientConn) (interface{}, error) { | ||
return &BackupItemActionGRPCClient{grpcClient: proto.NewBackupItemActionClient(c), log: p.log}, nil | ||
return newClientMux(c, newBackupItemActionGRPCClient), nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what did you think about renaming clientMux
-> clientDispenser
(and this func as well)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we talked about that, but I overlooked it. I'll make the change.
pkg/plugin/client_mux.go
Outdated
|
||
// clientMux supports the initialization and retrieval of multiple implementations for a single plugin kind, such as | ||
// "aws" and "azure" implementations of the object store plugin. | ||
type clientMux struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per previous - what about clientDispenser
?
pkg/plugin/server_mux.go
Outdated
type serverMux struct { | ||
kind PluginKind | ||
initializers map[string]ServerInitializer | ||
instances map[string]interface{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYT about using handler
in place of instance
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
Refactor plugin management: - support multiple plugins per executable - support restarting a plugin process in the event it terminates - simplify plugin lifecycle management by using separate managers for each scope (server vs backup vs restore) Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
pkg/plugin/server_mux.go
Outdated
@@ -42,10 +42,10 @@ func (m *serverMux) setServerLog(log logrus.FieldLogger) { | |||
m.serverLog = log | |||
} | |||
|
|||
// getInstance returns the instance for a plugin with the given name. If an instance has already been initialized, | |||
// getHandler returns the instance for a plugin with the given name. If an instance has already been initialized, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit s/instance/handler
pkg/plugin/restartable_process.go
Outdated
@@ -80,8 +81,13 @@ func (p *restartableProcess) reset() error { | |||
// | |||
// Callers of resetLH *must* acquire the lock before calling it. | |||
func (p *restartableProcess) resetLH() error { | |||
if p.resetFailures > 10 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are you thinking about including any sort of backoff for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't fully fleshed this out yet, but we can/should discuss. Ideally we can handle the following scenarios:
- process dies for whatever reason (e.g. oom kill), and restarting succeeds the first try
- process dies for whatever reason, attempt(s) to restart fail early and quickly -> we surface the issue somehow (ark server shuts itself down, or we create events, or ?)
I would like to try to avoid the situation where the process dies, and it takes minutes or more to finish all the attempts to restart it.
haven't gone through the tests themselves yet, but went through the rest of the rename & refactor stuff and LGTM. |
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Signed-off-by: Andy Goldstein <andy.goldstein@gmail.com>
Replaced by #710 |
Refactor plugin management:
each scope (server vs backup vs restore)
To-dos:
Fixes #481
Things to verify:
Signed-off-by: Andy Goldstein andy.goldstein@gmail.com