diff --git a/pkg/plugin/backup_item_action.go b/pkg/plugin/backup_item_action.go index fc5a16f76b..7168b6deea 100644 --- a/pkg/plugin/backup_item_action.go +++ b/pkg/plugin/backup_item_action.go @@ -90,7 +90,8 @@ type backupItemClientMux struct { clients map[string]*BackupItemActionGRPCClient } -func (m *backupItemClientMux) GetByName(name string) interface{} { +// clientFor returns a BackupItemActionGRPCClient for the BackupItemAction with the given name. +func (m *backupItemClientMux) clientFor(name string) interface{} { if client, found := m.clients[name]; found { return client } diff --git a/pkg/plugin/wrapper.go b/pkg/plugin/wrapper.go index 8f459adbfe..d8b8ed2c2d 100644 --- a/pkg/plugin/wrapper.go +++ b/pkg/plugin/wrapper.go @@ -24,10 +24,13 @@ type wrapper struct { reinitializers map[kindAndName]reinitializer } +// reinitializer is capable of reinitializing a resumable plugin instance using the newly dispensed plugin. type reinitializer interface { + // reinitialize reinitializes a resumable plugin instance using the newly dispensed plugin. reinitialize(dispensed interface{}) error } +// newWrapper creates a new wrapper for the given command and options. func newWrapper(command string, options ...wrapperOption) (*wrapper, error) { var args []string if command == os.Args[0] { @@ -46,19 +49,23 @@ func newWrapper(command string, options ...wrapperOption) (*wrapper, error) { f(w) } + // This launches the process err := w.reset() return w, err } +// wrapperOption is a functional option for configuring wrapper. type wrapperOption func(w *wrapper) +// withLog is a wrapperOption that configures the supplied logger and log level in the wrapper's client builder. func withLog(logger logrus.FieldLogger, level logrus.Level) wrapperOption { return func(w *wrapper) { w.builder.withLogger(&logrusAdapter{impl: logger, level: level}) } } +// addReinitializer registers the reinitializer r for key. func (w *wrapper) addReinitializer(key kindAndName, r reinitializer) { w.lock.Lock() defer w.lock.Unlock() @@ -66,6 +73,7 @@ func (w *wrapper) addReinitializer(key kindAndName, r reinitializer) { w.reinitializers[key] = r } +// reset acquires the lock and calls resetLH. func (w *wrapper) reset() error { w.lock.Lock() defer w.lock.Unlock() @@ -73,35 +81,45 @@ func (w *wrapper) reset() error { return w.resetLH() } +// resetLH (re)launches the plugin process. It redispenses all previously dispensed plugins and reinitializes all the +// registered reinitializers using the newly dispensed plugins. +// +// Callers of resetLH *must* acquire the lock before calling it. func (w *wrapper) resetLH() error { + // This creates a new go-plugin Client that has its own unique exec.Cmd for launching the plugin process. w.client = w.builder.client() + // This launches the plugin process. protocolClient, err := w.client.Client() if err != nil { return err } w.protocolClient = protocolClient + // Redispense any previously dispensed plugins, reinitializing if necessary. newPlugins := make(map[kindAndName]interface{}) for key := range w.plugins { - // re-dispense + // Re-dispense dispensed, err := w.dispenseLH(key) if err != nil { return err } newPlugins[key] = dispensed + // Reinitialize if r, found := w.reinitializers[key]; found { if err := r.reinitialize(dispensed); err != nil { return err } } } + // Make sure we update the wrapper's plugins! w.plugins = newPlugins return nil } +// resetIfNeeded checks if the plugin process has exited and resets the wrapper if it has. func (w *wrapper) resetIfNeeded() (bool, error) { w.lock.Lock() defer w.lock.Unlock() @@ -113,6 +131,7 @@ func (w *wrapper) resetIfNeeded() (bool, error) { return false, nil } +// getByKindAndName acquires the lock and calls getByKindAndNameLH. func (w *wrapper) getByKindAndName(key kindAndName) (interface{}, error) { w.lock.Lock() defer w.lock.Unlock() @@ -120,6 +139,8 @@ func (w *wrapper) getByKindAndName(key kindAndName) (interface{}, error) { return w.getByKindAndNameLH(key) } +// getByKindAndNameLH returns the dispensed plugin for key, dispensing a new plugin if it hasn't previously been +// dispensed. func (w *wrapper) getByKindAndNameLH(key kindAndName) (interface{}, error) { dispensed, found := w.plugins[key] if found { @@ -129,21 +150,33 @@ func (w *wrapper) getByKindAndNameLH(key kindAndName) (interface{}, error) { return w.dispenseLH(key) } +// dispenseLH dispenses a plugin for key. If the dispensed plugin is a clientMux, dispenseLH retrieves the plugin +// instance for key.name. func (w *wrapper) dispenseLH(key kindAndName) (interface{}, error) { dispensed, err := w.protocolClient.Dispense(key.kind.String()) if err != nil { return nil, errors.WithStack(err) } - if getByNamer, ok := dispensed.(GetByNamer); ok { - dispensed = getByNamer.GetByName(key.name) + if mux, ok := dispensed.(clientMux); ok { + dispensed = mux.clientFor(key.name) } + // Make sure we record what we dispensed! w.plugins[key] = dispensed return dispensed, nil } +// clientMux allows a dispensed plugin to support multiple implementations, such as AWS and GCP object stores. +type clientMux interface { + // clientFor returns a gRPC client for the plugin named name. Note, the return type is interface{} because there + // isn't an interface for a gRPC client itself; the returned object must implement one of our plugin interfaces, + // such as ObjectStore. + clientFor(name string) interface{} +} + +// stop terminates the plugin process. func (w *wrapper) stop() { w.lock.Lock() w.client.Kill()