Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Grpc plugin archive storage support #2317

Merged
merged 28 commits into from
Sep 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3317808
Grpc plugin archive storage support
m8rge Jun 29, 2020
b44e907
Introduce service PluginCapabilities
m8rge Jul 20, 2020
dd2e2ed
Add comments
m8rge Jul 20, 2020
b78bb2f
Introduce PluginServices
m8rge Jul 21, 2020
ba50ecd
Format imports, insert copyright
m8rge Jul 21, 2020
c2a286a
Bubble up error on Capabilities() call
m8rge Jul 21, 2020
d60bb79
Add empty_test.go
m8rge Jul 21, 2020
7017e62
Remove ArchiveReader, ArchiveWriter
m8rge Aug 7, 2020
aa7bdcf
Pass config.PluginServices to grpc.Serve
m8rge Aug 7, 2020
037e8e4
Introduce shared.ArchiveReader/ArchiveWriter
m8rge Aug 10, 2020
347f0e7
Improve ArchiveReader/ArchiveWriter according PR comments
m8rge Aug 31, 2020
8295249
Test fixes
m8rge Aug 31, 2020
100657e
Validate plugin type
m8rge Sep 4, 2020
a35182d
Add context to WriteSpan method
m8rge Sep 4, 2020
dff9797
Return plugin capabilities according ArchiveImpl property
m8rge Sep 10, 2020
08e9862
Apply changes from master
m8rge Sep 10, 2020
3f19fac
Extract PluginServices to shared package, introduce ClientPluginServi…
m8rge Sep 10, 2020
f93bd8f
Improve error text
m8rge Sep 11, 2020
9ceb37f
Rebase-related updates
m8rge Sep 14, 2020
16044f0
Rename memoryStore to memoryStorePlugin
m8rge Sep 14, 2020
5162979
minor clean-up
yurishkuro Sep 15, 2020
37779f7
Return codes.NotFound in grpc plugin on missing trace
m8rge Sep 21, 2020
b86cd5d
make fmt
yurishkuro Sep 21, 2020
b418f35
Handle spanstore.ErrTraceNotFound in grpcServer.GetArchiveTrace
m8rge Sep 22, 2020
239639a
Undo unnecessary change
Sep 23, 2020
a3111df
Increase factory code coverage
m8rge Sep 23, 2020
c7896e9
Trigger codecov again
m8rge Sep 25, 2020
5d81ec4
Disable coverage in plugin/storage/grpc/config/
yurishkuro Sep 25, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -45,21 +46,37 @@ func main() {
opts := memory.Options{}
opts.InitFromViper(v)

grpc.Serve(&memoryStore{store: memory.NewStore()})
plugin := &memoryStorePlugin{
store: memory.NewStore(),
archiveStore: memory.NewStore(),
}
grpc.Serve(&shared.PluginServices{
Store: plugin,
ArchiveStore: plugin,
})
}

type memoryStore struct {
store *memory.Store
type memoryStorePlugin struct {
store *memory.Store
archiveStore *memory.Store
}

func (ns *memoryStore) DependencyReader() dependencystore.Reader {
func (ns *memoryStorePlugin) DependencyReader() dependencystore.Reader {
return ns.store
}

func (ns *memoryStore) SpanReader() spanstore.Reader {
func (ns *memoryStorePlugin) SpanReader() spanstore.Reader {
return ns.store
}

func (ns *memoryStore) SpanWriter() spanstore.Writer {
func (ns *memoryStorePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *memoryStorePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}

func (ns *memoryStorePlugin) ArchiveSpanWriter() spanstore.Writer {
return ns.archiveStore
}
1 change: 1 addition & 0 deletions plugin/storage/grpc/config/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requires gRPC plugin binary
38 changes: 31 additions & 7 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,19 @@ type Configuration struct {
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
type PluginBuilder interface {
Build() (shared.StoragePlugin, error)
Build() (*ClientPluginServices, error)
}

// Build instantiates a StoragePlugin
func (c *Configuration) Build() (shared.StoragePlugin, error) {
// Build instantiates a PluginServices
func (c *Configuration) Build() (*ClientPluginServices, error) {
// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)

Expand All @@ -60,18 +66,36 @@ func (c *Configuration) Build() (shared.StoragePlugin, error) {

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %s", err)
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
}

raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %s", err)
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %w", err)
}

// in practice, the type of `raw` is *shared.grpcClient, and type casts below cannot fail
storagePlugin, ok := raw.(shared.StoragePlugin)
if !ok {
return nil, fmt.Errorf("unexpected type for plugin \"%s\"", shared.StoragePluginIdentifier)
return nil, fmt.Errorf("unable to cast %T to shared.StoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
archiveStoragePlugin, ok := raw.(shared.ArchiveStoragePlugin)
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}

return storagePlugin, nil
return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
},
Capabilities: capabilities,
}, nil
}
15 changes: 0 additions & 15 deletions plugin/storage/grpc/config/empty_test.go

This file was deleted.

41 changes: 38 additions & 3 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -36,7 +37,9 @@ type Factory struct {

builder config.PluginBuilder

store shared.StoragePlugin
store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
capabilities shared.PluginCapabilities
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -65,12 +68,14 @@ func (f *Factory) InitFromOptions(opts Options) {
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger

store, err := f.builder.Build()
services, err := f.builder.Build()
if err != nil {
return fmt.Errorf("grpc-plugin builder failed to create a store: %w", err)
}

f.store = store
f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand All @@ -89,3 +94,33 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store.DependencyReader(), nil
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanReader {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanReader(), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if f.capabilities == nil {
return nil, storage.ErrArchiveStorageNotSupported
}
capabilities, err := f.capabilities.Capabilities()
if err != nil {
return nil, err
}
if capabilities == nil || !capabilities.ArchiveSpanWriter {
return nil, storage.ErrArchiveStorageNotSupported
}
return f.archiveStore.ArchiveSpanWriter(), nil
}
149 changes: 145 additions & 4 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/config"
grpcConfig "github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/mocks"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand All @@ -41,19 +42,45 @@ type mockPluginBuilder struct {
err error
}

func (b *mockPluginBuilder) Build() (shared.StoragePlugin, error) {
func (b *mockPluginBuilder) Build() (*grpcConfig.ClientPluginServices, error) {
if b.err != nil {
return nil, b.err
}
return b.plugin, nil

services := &grpcConfig.ClientPluginServices{
PluginServices: shared.PluginServices{
Store: b.plugin,
ArchiveStore: b.plugin,
},
}
if b.plugin.capabilities != nil {
services.Capabilities = b.plugin
}

return services, nil
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) {
return mp.capabilities.Capabilities()
}

func (mp *mockPlugin) ArchiveSpanReader() spanstore.Reader {
return mp.archiveReader
}

func (mp *mockPlugin) ArchiveSpanWriter() spanstore.Writer {
return mp.archiveWriter
}

func (mp *mockPlugin) SpanReader() spanstore.Reader {
return mp.spanReader
}
Expand Down Expand Up @@ -84,6 +111,9 @@ func TestGRPCStorageFactory(t *testing.T) {
plugin: &mockPlugin{
spanWriter: new(spanStoreMocks.Writer),
spanReader: new(spanStoreMocks.Reader),
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
capabilities: new(mocks.PluginCapabilities),
dependencyReader: new(dependencyStoreMocks.Reader),
},
}
Expand All @@ -101,14 +131,125 @@ func TestGRPCStorageFactory(t *testing.T) {
assert.Equal(t, f.store.DependencyReader(), depReader)
}

func TestGRPCStorageFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.NoError(t, err)
assert.NotNil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.NoError(t, err)
assert.NotNil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

capabilities := new(mocks.PluginCapabilities)
customError := errors.New("made-up error")
capabilities.On("Capabilities").
Return(nil, customError)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, writer)
}

func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

assert.NotNil(t, f.store)
reader, err := f.CreateArchiveSpanReader()
assert.Equal(t, err, storage.ErrArchiveStorageNotSupported)
assert.Nil(t, reader)
writer, err := f.CreateArchiveSpanWriter()
assert.Equal(t, err, storage.ErrArchiveStorageNotSupported)
assert.Nil(t, writer)
}

func TestWithConfiguration(t *testing.T) {
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{
err := command.ParseFlags([]string{
"--grpc-storage-plugin.log-level=debug",
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
})
assert.NoError(t, err)
f.InitFromViper(v)
assert.Equal(t, f.options.Configuration.PluginBinary, "noop-grpc-plugin")
assert.Equal(t, f.options.Configuration.PluginConfigurationFile, "config.json")
Expand Down
Loading