diff --git a/go.mod b/go.mod index a677d1a34..1193c28d9 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/scylladb/go-reflectx v1.0.1 github.com/shopspring/decimal v1.4.0 github.com/smartcontractkit/chain-selectors v1.0.67 - github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.9-0.20251020164035-ab562b473fe2 + github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.9-0.20251028143833-a5d28bc58c36 github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251021010742-3f8d3dba17d8 github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b diff --git a/go.sum b/go.sum index 4f3a56c66..e87541c76 100644 --- a/go.sum +++ b/go.sum @@ -326,8 +326,8 @@ github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMB github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/smartcontractkit/chain-selectors v1.0.67 h1:gxTqP/JC40KDe3DE1SIsIKSTKTZEPyEU1YufO1admnw= github.com/smartcontractkit/chain-selectors v1.0.67/go.mod h1:xsKM0aN3YGcQKTPRPDDtPx2l4mlTN1Djmg0VVXV40b8= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.9-0.20251020164035-ab562b473fe2 h1:p79eZtyBbZYumftwZGCkyKSNDvUralW7lqcTD99Ovmw= -github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.9-0.20251020164035-ab562b473fe2/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.9-0.20251028143833-a5d28bc58c36 h1:4nO2wE9ozApuVwwL7Pbebc/pfNPcVYEvN0sD+YVfacA= +github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.9-0.20251028143833-a5d28bc58c36/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4 h1:GCzrxDWn3b7jFfEA+WiYRi8CKoegsayiDoJBCjYkneE= github.com/smartcontractkit/chainlink-protos/billing/go v0.0.0-20251024234028-0988426d98f4/go.mod h1:HHGeDUpAsPa0pmOx7wrByCitjQ0mbUxf0R9v+g67uCA= github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251021010742-3f8d3dba17d8 h1:hPeEwcvRVtwhyNXH45qbzqmscqlbygu94cROwbjyzNQ= diff --git a/pkg/beholder/chip_client.go b/pkg/beholder/chip_client.go deleted file mode 100644 index 2a26f4422..000000000 --- a/pkg/beholder/chip_client.go +++ /dev/null @@ -1,49 +0,0 @@ -package beholder - -import ( - "context" - "fmt" - "io" - - "github.com/smartcontractkit/chainlink-common/pkg/chipingress" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" -) - -type ChipIngressClient interface { - RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) - io.Closer -} - -type chipIngressClient struct { - client chipingress.Client -} - -func NewChipIngressClient(client chipingress.Client) (ChipIngressClient, error) { - if client == nil { - return nil, fmt.Errorf("chip ingress client is nil") - } - - return &chipIngressClient{ - client: client, - }, nil -} -func (sr *chipIngressClient) Close() error { return nil } - -// RegisterSchema registers one or more schemas with the Chip Ingress service. Returns a map of subject to version for each registered schema. -func (sr *chipIngressClient) RegisterSchema(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) { - request := &pb.RegisterSchemaRequest{ - Schemas: schemas, - } - - resp, err := sr.client.RegisterSchema(ctx, request) - if err != nil { - return nil, fmt.Errorf("failed to register schema: %w", err) - } - - registeredMap := make(map[string]int) - for _, schema := range resp.Registered { - registeredMap[schema.Subject] = int(schema.Version) - } - - return registeredMap, err -} diff --git a/pkg/beholder/chip_client_test.go b/pkg/beholder/chip_client_test.go deleted file mode 100644 index 52eb307c0..000000000 --- a/pkg/beholder/chip_client_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package beholder_test - -import ( - "fmt" - "github.com/smartcontractkit/chainlink-common/pkg/beholder" - - "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" - "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "testing" -) - -func TestNewChipClient(t *testing.T) { - t.Run("returns error when client is nil", func(t *testing.T) { - registry, err := beholder.NewChipIngressClient(nil) - assert.Nil(t, registry) - assert.EqualError(t, err, "chip ingress client is nil") - }) - - t.Run("returns schema registry when client is valid", func(t *testing.T) { - mockClient := mocks.NewClient(t) - registry, err := beholder.NewChipIngressClient(mockClient) - require.NoError(t, err) - assert.NotNil(t, registry) - }) -} - -func TestRegisterSchema(t *testing.T) { - t.Run("successfully registers schemas", func(t *testing.T) { - mockClient := mocks.NewClient(t) - mockClient. - On("RegisterSchema", mock.Anything, mock.Anything). - Return(&pb.RegisterSchemaResponse{ - Registered: []*pb.RegisteredSchema{ - {Subject: "schema1", Version: 1}, - {Subject: "schema2", Version: 2}, - }, - }, nil) - - registry, err := beholder.NewChipIngressClient(mockClient) - require.NoError(t, err) - - schemas := []*pb.Schema{ - {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, - {Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2}, - } - - result, err := registry.RegisterSchema(t.Context(), schemas...) - require.NoError(t, err) - assert.Equal(t, map[string]int{"schema1": 1, "schema2": 2}, result) - }) - - t.Run("returns error when registration fails", func(t *testing.T) { - mockClient := mocks.NewClient(t) - mockClient. - On("RegisterSchema", mock.Anything, mock.Anything). - Return(nil, fmt.Errorf("registration failed")) - - registry, err := beholder.NewChipIngressClient(mockClient) - require.NoError(t, err) - - schemas := []*pb.Schema{ - {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, - } - - result, err := registry.RegisterSchema(t.Context(), schemas...) - assert.Nil(t, result) - assert.EqualError(t, err, "failed to register schema: registration failed") - }) -} diff --git a/pkg/beholder/client.go b/pkg/beholder/client.go index 253dc23b8..315b8358b 100644 --- a/pkg/beholder/client.go +++ b/pkg/beholder/client.go @@ -45,7 +45,7 @@ type Client struct { // Message Emitter Emitter Emitter // Chip - Chip ChipIngressClient + Chip chipingress.Client // Providers LoggerProvider otellog.LoggerProvider @@ -185,8 +185,8 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro // This will eventually be removed in favor of chip-ingress emitter // and logs will be sent via OTLP using the regular Logger instead of calling Emit emitter := NewMessageEmitter(messageLogger) - var chipIngressClient chipingress.Client + var chipIngressClient chipingress.Client = &chipingress.NoopClient{} // if chip ingress is enabled, create dual source emitter that sends to both otel collector and chip ingress // eventually we will remove the dual source emitter and just use chip ingress if cfg.ChipIngressEmitterEnabled || cfg.ChipIngressEmitterGRPCEndpoint != "" { @@ -232,22 +232,13 @@ func NewGRPCClient(cfg Config, otlploggrpcNew otlploggrpcFactory) (*Client, erro } } - // Create interface/wrapper to chip-ingress for schema registry - var chip ChipIngressClient - if chipIngressClient != nil { - chip, err = NewChipIngressClient(chipIngressClient) - if err != nil { - return nil, fmt.Errorf("failed to create interface to chip ingress: %w", err) - } - } - onClose := func() (err error) { for _, provider := range []shutdowner{messageLoggerProvider, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider} { err = errors.Join(err, provider.Shutdown(context.Background())) } return } - return &Client{cfg, logger, tracer, meter, emitter, chip, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil + return &Client{cfg, logger, tracer, meter, emitter, chipIngressClient, loggerProvider, tracerProvider, meterProvider, messageLoggerProvider, signer, onClose}, nil } // Closes all providers, flushes all data and stops all background processes diff --git a/pkg/beholder/client_test.go b/pkg/beholder/client_test.go index 71e32e6c7..e64780db3 100644 --- a/pkg/beholder/client_test.go +++ b/pkg/beholder/client_test.go @@ -21,6 +21,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/beholder" "github.com/smartcontractkit/chainlink-common/pkg/beholder/internal/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + chipmocks "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" ) type MockExporter struct { @@ -491,6 +494,13 @@ func TestNewClient_Chip(t *testing.T) { require.NoError(t, err) assert.NotNil(t, client) assert.NotNil(t, client.Chip) + + // Verify it implements the Client interface + var _ chipingress.Client = client.Chip + + // Verify the emitter is configured as dual source + assert.NotNil(t, client.Emitter) + assert.IsType(t, &beholder.DualSourceEmitter{}, client.Emitter) }) t.Run("chip interface can be enabled when chip ingress dual emitter is not enabled ", func(t *testing.T) { @@ -503,6 +513,11 @@ func TestNewClient_Chip(t *testing.T) { require.NoError(t, err) assert.NotNil(t, client) assert.NotNil(t, client.Chip) + + // Verify it implements the Client interface + var _ chipingress.Client = client.Chip + + // Verify emitter is not dual source when dual emitter is disabled assert.NotNil(t, client.Emitter) }) @@ -739,3 +754,191 @@ func TestNewGRPCClientChipIngressAuth(t *testing.T) { assert.True(t, ok, "Expected Emitter to be a DualSourceEmitter") }) } + +func TestChipIngressClient(t *testing.T) { + ctx := context.Background() + + t.Run("mock client implements interface", func(t *testing.T) { + mockClient := chipmocks.NewClient(t) + var _ chipingress.Client = mockClient + }) + + t.Run("noop client implements interface", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + var _ chipingress.Client = noopClient + }) + + t.Run("noop client Close returns no error", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + err := noopClient.Close() + assert.NoError(t, err) + }) + + t.Run("noop client Ping returns success", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + resp, err := noopClient.Ping(ctx, &pb.EmptyRequest{}) + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, "pong", resp.Message) + }) + + t.Run("noop client Publish returns empty response", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + event, err := chipingress.NewEvent("test-domain", "test.type", []byte("test"), nil) + require.NoError(t, err) + + eventPb, err := chipingress.EventToProto(event) + require.NoError(t, err) + + resp, err := noopClient.Publish(ctx, eventPb) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + + t.Run("noop client PublishBatch returns empty response", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + events := []chipingress.CloudEvent{} + for i := 0; i < 3; i++ { + event, err := chipingress.NewEvent(fmt.Sprintf("domain-%d", i), "test.type", []byte("test"), nil) + require.NoError(t, err) + events = append(events, event) + } + + batch, err := chipingress.EventsToBatch(events) + require.NoError(t, err) + + resp, err := noopClient.PublishBatch(ctx, batch) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + + t.Run("noop client RegisterSchema returns empty response", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + req := &pb.RegisterSchemaRequest{ + Schemas: []*pb.Schema{ + {Subject: "test-subject", Schema: `{"type":"record"}`, Format: 1}, + }, + } + + resp, err := noopClient.RegisterSchema(ctx, req) + assert.NoError(t, err) + assert.NotNil(t, resp) + }) + + t.Run("noop client RegisterSchemas returns empty map", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + schemas := []*pb.Schema{ + {Subject: "test-subject-1", Schema: `{"type":"record"}`, Format: 1}, + {Subject: "test-subject-2", Schema: `{"type":"record"}`, Format: 2}, + } + + result, err := noopClient.RegisterSchemas(ctx, schemas...) + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Empty(t, result) + }) + + t.Run("noop client StreamEvents returns nil", func(t *testing.T) { + noopClient := &chipingress.NoopClient{} + stream, err := noopClient.StreamEvents(ctx) + assert.NoError(t, err) + assert.Nil(t, stream) + }) + + t.Run("mock client Ping with expectations", func(t *testing.T) { + mockClient := chipmocks.NewClient(t) + + expectedResp := &pb.PingResponse{Message: "pong"} + mockClient.EXPECT().Ping(ctx, &pb.EmptyRequest{}).Return(expectedResp, nil) + + resp, err := mockClient.Ping(ctx, &pb.EmptyRequest{}) + assert.NoError(t, err) + assert.Equal(t, expectedResp, resp) + assert.Equal(t, "pong", resp.Message) + }) + + t.Run("mock client Publish with expectations", func(t *testing.T) { + mockClient := chipmocks.NewClient(t) + + event, err := chipingress.NewEvent("test-domain", "test.type", []byte("test"), nil) + require.NoError(t, err) + + eventPb, err := chipingress.EventToProto(event) + require.NoError(t, err) + + expectedResp := &pb.PublishResponse{ + Results: []*pb.PublishResult{{EventId: "123"}}, + } + + mockClient.EXPECT().Publish(ctx, eventPb).Return(expectedResp, nil) + + resp, err := mockClient.Publish(ctx, eventPb) + assert.NoError(t, err) + assert.Equal(t, expectedResp, resp) + }) + + t.Run("mock client PublishBatch with expectations", func(t *testing.T) { + mockClient := chipmocks.NewClient(t) + + events := []chipingress.CloudEvent{} + for i := 0; i < 2; i++ { + event, err := chipingress.NewEvent(fmt.Sprintf("domain-%d", i), "test.type", []byte("test"), nil) + require.NoError(t, err) + events = append(events, event) + } + + batch, err := chipingress.EventsToBatch(events) + require.NoError(t, err) + + expectedResp := &pb.PublishResponse{ + Results: []*pb.PublishResult{ + {EventId: "event-1"}, + {EventId: "event-2"}, + }, + } + + mockClient.EXPECT().PublishBatch(ctx, batch).Return(expectedResp, nil) + + resp, err := mockClient.PublishBatch(ctx, batch) + assert.NoError(t, err) + assert.Equal(t, expectedResp, resp) + }) + + t.Run("mock client RegisterSchemas with expectations", func(t *testing.T) { + mockClient := chipmocks.NewClient(t) + + schemas := []*pb.Schema{ + {Subject: "subject-1", Schema: `{"type":"record"}`, Format: 1}, + {Subject: "subject-2", Schema: `{"type":"record"}`, Format: 2}, + } + + expectedMap := map[string]int{"subject-1": 1, "subject-2": 2} + + mockClient.EXPECT().RegisterSchemas(ctx, schemas[0], schemas[1]).Return(expectedMap, nil) + + result, err := mockClient.RegisterSchemas(ctx, schemas...) + assert.NoError(t, err) + assert.Equal(t, expectedMap, result) + }) + + t.Run("mock client handles errors", func(t *testing.T) { + mockClient := chipmocks.NewClient(t) + + expectedErr := errors.New("connection error") + mockClient.EXPECT().Ping(ctx, &pb.EmptyRequest{}).Return(nil, expectedErr) + + resp, err := mockClient.Ping(ctx, &pb.EmptyRequest{}) + assert.Error(t, err) + assert.Nil(t, resp) + assert.Equal(t, expectedErr, err) + }) + + t.Run("mock client Close with expectations", func(t *testing.T) { + mockClient := chipmocks.NewClient(t) + + mockClient.EXPECT().Close().Return(nil) + + err := mockClient.Close() + assert.NoError(t, err) + }) +} diff --git a/pkg/beholder/noop.go b/pkg/beholder/noop.go index 21c07847c..67580ec15 100644 --- a/pkg/beholder/noop.go +++ b/pkg/beholder/noop.go @@ -7,6 +7,7 @@ import ( "os" "time" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" "go.opentelemetry.io/otel/exporters/stdout/stdoutlog" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" @@ -35,7 +36,10 @@ func NewNoopClient() *Client { // MessageEmitter messageEmitter := noopMessageEmitter{} - return &Client{cfg, logger, tracer, meter, messageEmitter, nil, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose} + // ChipIngress + chipClient := &chipingress.NoopClient{} + + return &Client{cfg, logger, tracer, meter, messageEmitter, chipClient, loggerProvider, tracerProvider, meterProvider, loggerProvider, nil, noopOnClose} } // NewStdoutClient creates a new Client with exporters which send telemetry data to standard output @@ -94,7 +98,7 @@ func NewWriterClient(w io.Writer) (*Client, error) { return } - return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, Chip: nil, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, lazySigner: nil, OnClose: onClose}, nil + return &Client{Config: cfg.Config, Logger: logger, Tracer: tracer, Meter: meter, Emitter: emitter, Chip: &chipingress.NoopClient{}, LoggerProvider: loggerProvider, TracerProvider: tracerProvider, MeterProvider: meterProvider, MessageLoggerProvider: loggerProvider, lazySigner: nil, OnClose: onClose}, nil } type noopMessageEmitter struct{} diff --git a/pkg/beholder/noop_test.go b/pkg/beholder/noop_test.go index b9a43a12f..17c9cb04e 100644 --- a/pkg/beholder/noop_test.go +++ b/pkg/beholder/noop_test.go @@ -13,6 +13,8 @@ import ( "go.opentelemetry.io/otel/trace" "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" ) func TestNoopClient(t *testing.T) { @@ -51,6 +53,39 @@ func TestNoopClient(t *testing.T) { )) rootSpan.End() + // Chip - verify noop chip client is initialized and functional + assert.NotNil(t, noopClient.Chip) + var _ chipingress.Client = noopClient.Chip + assert.IsType(t, &chipingress.NoopClient{}, noopClient.Chip) + + // Test Chip methods return no errors + pingResp, err := noopClient.Chip.Ping(t.Context(), &pb.EmptyRequest{}) + assert.NoError(t, err) + assert.NotNil(t, pingResp) + assert.Equal(t, "pong", pingResp.Message) + + // Test Publish returns no error + event, err := chipingress.NewEvent("test-domain", "test.type", []byte("test"), nil) + require.NoError(t, err) + eventPb, err := chipingress.EventToProto(event) + require.NoError(t, err) + publishResp, err := noopClient.Chip.Publish(t.Context(), eventPb) + assert.NoError(t, err) + assert.NotNil(t, publishResp) + + // Test RegisterSchemas returns empty map + schemas := []*pb.Schema{ + {Subject: "test-subject", Schema: `{"type":"record"}`, Format: 1}, + } + schemaResult, err := noopClient.Chip.RegisterSchemas(t.Context(), schemas...) + assert.NoError(t, err) + assert.NotNil(t, schemaResult) + assert.Empty(t, schemaResult) + + // Test Chip Close returns no error + err = noopClient.Chip.Close() + assert.NoError(t, err) + err = noopClient.Close() assert.NoError(t, err) } diff --git a/pkg/chipingress/client.go b/pkg/chipingress/client.go index c4044c071..f3acafb3a 100644 --- a/pkg/chipingress/client.go +++ b/pkg/chipingress/client.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/metadata" ceformat "github.com/cloudevents/sdk-go/binding/format/protobuf/v2" + cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb" ce "github.com/cloudevents/sdk-go/v2" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" @@ -27,6 +28,7 @@ import ( type Client interface { pb.ChipIngressClient Close() error + RegisterSchemas(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) } type client struct { @@ -128,6 +130,23 @@ func (c *client) Close() error { return c.conn.Close() } +// RegisterSchemas registers one or more schemas with the Chip Ingress service. +func (c *client) RegisterSchemas(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) { + request := &pb.RegisterSchemaRequest{Schemas: schemas} + + resp, err := c.client.RegisterSchema(ctx, request) + if err != nil { + return nil, fmt.Errorf("failed to register schema: %w", err) + } + + registeredMap := make(map[string]int) + for _, schema := range resp.Registered { + registeredMap[schema.Subject] = int(schema.Version) + } + + return registeredMap, nil +} + // WithBasicAuth sets the basic-auth credentials for the ChipIngress service. // Default is to require TLS for security. func WithBasicAuth(user, pass string) Opt { @@ -281,3 +300,44 @@ func EventsToBatch(events []CloudEvent) (*CloudEventBatch, error) { } return batch, nil } + +var _ Client = (*NoopClient)(nil) + +// NoopClient is a no-op implementation of the Client interface. +// All methods return successfully without performing any actual operations. +type NoopClient struct{} + +// Close is a no-op +func (NoopClient) Close() error { + return nil +} + +// Ping is a no-op +func (NoopClient) Ping(ctx context.Context, in *pb.EmptyRequest, opts ...grpc.CallOption) (*pb.PingResponse, error) { + return &pb.PingResponse{Message: "pong"}, nil +} + +// Publish is a no-op +func (NoopClient) Publish(ctx context.Context, in *cepb.CloudEvent, opts ...grpc.CallOption) (*pb.PublishResponse, error) { + return &pb.PublishResponse{}, nil +} + +// PublishBatch is a no-op +func (NoopClient) PublishBatch(ctx context.Context, in *pb.CloudEventBatch, opts ...grpc.CallOption) (*pb.PublishResponse, error) { + return &pb.PublishResponse{}, nil +} + +// StreamEvents is a no-op +func (NoopClient) StreamEvents(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[pb.StreamEventsRequest, pb.StreamEventsResponse], error) { + return nil, nil +} + +// RegisterSchema is a no-op +func (NoopClient) RegisterSchema(ctx context.Context, in *pb.RegisterSchemaRequest, opts ...grpc.CallOption) (*pb.RegisterSchemaResponse, error) { + return &pb.RegisterSchemaResponse{}, nil +} + +// RegisterSchemas is a no-op +func (NoopClient) RegisterSchemas(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) { + return make(map[string]int), nil +} diff --git a/pkg/chipingress/client_test.go b/pkg/chipingress/client_test.go index 8bbd381ed..979bf02cb 100644 --- a/pkg/chipingress/client_test.go +++ b/pkg/chipingress/client_test.go @@ -14,6 +14,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/protobuf/proto" + "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks" "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb" ) @@ -56,6 +57,33 @@ func TestClient(t *testing.T) { assert.NotNil(t, client) }) + t.Run("NoopClient", func(t *testing.T) { + client := &NoopClient{} + assert.NotNil(t, client) + + // Test that it implements the Client interface + var _ Client = client + + // Test Close returns no error + err := client.Close() + assert.NoError(t, err) + + // Test Ping returns success + pingResp, err := client.Ping(context.Background(), &pb.EmptyRequest{}) + assert.NoError(t, err) + assert.NotNil(t, pingResp) + assert.Equal(t, "pong", pingResp.Message) + + // Test RegisterSchemas returns empty map + schemas := []*pb.Schema{ + {Subject: "test", Schema: `{"test":"value"}`, Format: 1}, + } + result, err := client.RegisterSchemas(context.Background(), schemas...) + assert.NoError(t, err) + assert.NotNil(t, result) + assert.Empty(t, result) + }) + } func TestNewEvent(t *testing.T) { @@ -619,3 +647,62 @@ func TestNewClientWithTLS(t *testing.T) { assert.NotNil(t, client) } } + +func TestClient_RegisterSchemas(t *testing.T) { + t.Run("successfully registers schemas", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().RegisterSchema( + context.Background(), + &pb.RegisterSchemaRequest{ + Schemas: []*pb.Schema{ + {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, + {Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2}, + }, + }, + ).Return(&pb.RegisterSchemaResponse{ + Registered: []*pb.RegisteredSchema{ + {Subject: "schema1", Version: 1}, + {Subject: "schema2", Version: 2}, + }, + }, nil) + + client := &client{ + client: mockClient, + conn: nil, + } + + schemas := []*pb.Schema{ + {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, + {Subject: "schema2", Schema: `{"type":"record","name":"Test2","fields":[{"name":"field2"}]}`, Format: 2}, + } + + result, err := client.RegisterSchemas(context.Background(), schemas...) + assert.NoError(t, err) + assert.Equal(t, map[string]int{"schema1": 1, "schema2": 2}, result) + }) + + t.Run("returns error when registration fails", func(t *testing.T) { + mockClient := mocks.NewClient(t) + mockClient.EXPECT().RegisterSchema( + context.Background(), + &pb.RegisterSchemaRequest{ + Schemas: []*pb.Schema{ + {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, + }, + }, + ).Return(nil, fmt.Errorf("registration failed")) + + client := &client{ + client: mockClient, + conn: nil, + } + + schemas := []*pb.Schema{ + {Subject: "schema1", Schema: `{"type":"record","name":"Test","fields":[{"name":"field1"}]}`, Format: 1}, + } + + result, err := client.RegisterSchemas(context.Background(), schemas...) + assert.Nil(t, result) + assert.EqualError(t, err, "failed to register schema: registration failed") + }) +} diff --git a/pkg/chipingress/mocks/client.go b/pkg/chipingress/mocks/client.go index f8542e388..33b086905 100644 --- a/pkg/chipingress/mocks/client.go +++ b/pkg/chipingress/mocks/client.go @@ -368,6 +368,79 @@ func (_c *Client_RegisterSchema_Call) RunAndReturn(run func(context.Context, *pb return _c } +// RegisterSchemas provides a mock function with given fields: ctx, schemas +func (_m *Client) RegisterSchemas(ctx context.Context, schemas ...*pb.Schema) (map[string]int, error) { + _va := make([]interface{}, len(schemas)) + for _i := range schemas { + _va[_i] = schemas[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for RegisterSchemas") + } + + var r0 map[string]int + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, ...*pb.Schema) (map[string]int, error)); ok { + return rf(ctx, schemas...) + } + if rf, ok := ret.Get(0).(func(context.Context, ...*pb.Schema) map[string]int); ok { + r0 = rf(ctx, schemas...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]int) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, ...*pb.Schema) error); ok { + r1 = rf(ctx, schemas...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Client_RegisterSchemas_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterSchemas' +type Client_RegisterSchemas_Call struct { + *mock.Call +} + +// RegisterSchemas is a helper method to define mock.On call +// - ctx context.Context +// - schemas ...*pb.Schema +func (_e *Client_Expecter) RegisterSchemas(ctx interface{}, schemas ...interface{}) *Client_RegisterSchemas_Call { + return &Client_RegisterSchemas_Call{Call: _e.mock.On("RegisterSchemas", + append([]interface{}{ctx}, schemas...)...)} +} + +func (_c *Client_RegisterSchemas_Call) Run(run func(ctx context.Context, schemas ...*pb.Schema)) *Client_RegisterSchemas_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]*pb.Schema, len(args)-1) + for i, a := range args[1:] { + if a != nil { + variadicArgs[i] = a.(*pb.Schema) + } + } + run(args[0].(context.Context), variadicArgs...) + }) + return _c +} + +func (_c *Client_RegisterSchemas_Call) Return(_a0 map[string]int, _a1 error) *Client_RegisterSchemas_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *Client_RegisterSchemas_Call) RunAndReturn(run func(context.Context, ...*pb.Schema) (map[string]int, error)) *Client_RegisterSchemas_Call { + _c.Call.Return(run) + return _c +} + // StreamEvents provides a mock function with given fields: ctx, opts func (_m *Client) StreamEvents(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[pb.StreamEventsRequest, pb.StreamEventsResponse], error) { _va := make([]interface{}, len(opts))