Skip to content

Commit

Permalink
Merge pull request #16047 from smartcontractkit/chore/release/2.20.0/…
Browse files Browse the repository at this point in the history
…add-data-streams-changes

Chore/release/2.20.0/add data streams changes
  • Loading branch information
Bwest981 authored Jan 23, 2025
2 parents c750920 + f35d73f commit 039217d
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 52 deletions.
5 changes: 0 additions & 5 deletions .changeset/six-camels-smell.md

This file was deleted.

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

- [#15899](https://github.com/smartcontractkit/chainlink/pull/15899) [`796357b17c`](https://github.com/smartcontractkit/chainlink/commit/796357b17ca875ba80e157fc08b0da5db4ed1644) - #updated feat:create tron chain config on operator ui

- [#16019](https://github.com/smartcontractkit/chainlink/pull/16019) [`c75092086f`](https://github.com/smartcontractkit/chainlink/commit/c75092086f790d273abb08f18f1b03f7934e30dc) - Bump to start the next version

### Patch Changes

- [#15357](https://github.com/smartcontractkit/chainlink/pull/15357) [`18cb44e891`](https://github.com/smartcontractkit/chainlink/commit/18cb44e891a00edff7486640ffc8e0c9275a04f8) - #updated use real contracts in ccipreader_tests where possible
Expand Down
20 changes: 0 additions & 20 deletions core/services/llo/channel_definition_cache_factory.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package llo

import (
"fmt"
"net/http"
"sync"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
Expand All @@ -26,8 +22,6 @@ func NewChannelDefinitionCacheFactory(lggr logger.Logger, orm ChannelDefinitionC
orm,
lp,
client,
make(map[common.Address]map[uint32]struct{}),
sync.Mutex{},
}
}

Expand All @@ -36,9 +30,6 @@ type channelDefinitionCacheFactory struct {
orm ChannelDefinitionCacheORM
lp logpoller.LogPoller
client *http.Client

caches map[common.Address]map[uint32]struct{}
mu sync.Mutex
}

func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) {
Expand All @@ -50,16 +41,5 @@ func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (ll
fromBlock := cfg.ChannelDefinitionsContractFromBlock
donID := cfg.DonID

f.mu.Lock()
defer f.mu.Unlock()

if _, exists := f.caches[addr][donID]; exists {
// This shouldn't really happen and isn't supported
return nil, fmt.Errorf("cache already exists for contract address %s and don ID %d", addr.Hex(), donID)
}
if _, exists := f.caches[addr]; !exists {
f.caches[addr] = make(map[uint32]struct{})
}
f.caches[addr][donID] = struct{}{}
return NewChannelDefinitionCache(f.lggr, f.orm, f.client, f.lp, addr, donID, fromBlock), nil
}
7 changes: 4 additions & 3 deletions core/services/llo/channel_definition_cache_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ func Test_ChannelDefinitionCacheFactory(t *testing.T) {
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)

// returns error if you try to do it again with the same addr/donID
_, err = cdcFactory.NewCache(lloconfig.PluginConfig{
// creates another one if you try to do it again with the same addr/donID
cdc, err = cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
DonID: 1,
})
require.EqualError(t, err, "cache already exists for contract address 0xaAaAaAaaAaAaAaaAaAAAAAAAAaaaAaAaAaaAaaAa and don ID 1")
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)

// is fine if you do it again with different addr
cdc, err = cdcFactory.NewCache(lloconfig.PluginConfig{
Expand Down
25 changes: 18 additions & 7 deletions core/services/llo/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package grpc
import (
"context"
"crypto/ed25519"
"encoding/hex"
"errors"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
Expand All @@ -29,9 +31,10 @@ type client struct {
services.Service
eng *services.Engine

clientPrivKey ed25519.PrivateKey
serverPubKey ed25519.PublicKey
serverURL string
clientPrivKey ed25519.PrivateKey
clientPubKeyHex string
serverPubKey ed25519.PublicKey
serverURL string

conn *grpc.ClientConn
client rpc.TransmitterClient
Expand All @@ -50,9 +53,10 @@ func NewClient(opts ClientOpts) Client {

func newClient(opts ClientOpts) Client {
c := &client{
clientPrivKey: opts.ClientPrivKey,
serverPubKey: opts.ServerPubKey,
serverURL: opts.ServerURL,
clientPrivKey: opts.ClientPrivKey,
clientPubKeyHex: hex.EncodeToString(opts.ClientPrivKey.Public().(ed25519.PublicKey)),
serverPubKey: opts.ServerPubKey,
serverURL: opts.ServerURL,
}
c.Service, c.eng = services.Config{
Name: "GRPCClient",
Expand Down Expand Up @@ -103,7 +107,14 @@ func (c *client) close() error {
}

func (c *client) Transmit(ctx context.Context, req *rpc.TransmitRequest) (resp *rpc.TransmitResponse, err error) {
return c.client.Transmit(ctx, req)
err = c.eng.IfStarted(func() error {
// This is a self-identified client ID
// It is not cryptographically verified
transmitCtx := metadata.AppendToOutgoingContext(ctx, "client_public_key", c.clientPubKeyHex)
resp, err = c.client.Transmit(transmitCtx, req)
return err
})
return
}

func (c *client) LatestReport(ctx context.Context, req *rpc.LatestReportRequest) (resp *rpc.LatestReportResponse, err error) {
Expand Down
136 changes: 136 additions & 0 deletions core/services/llo/grpc/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package grpc

import (
"context"
"crypto/ed25519"
"crypto/rand"
"encoding/hex"
"net"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink-data-streams/rpc"
"github.com/smartcontractkit/chainlink-data-streams/rpc/mtls"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

func Test_Client(t *testing.T) {
ctx := tests.Context(t)
clientPrivKey := ed25519.NewKeyFromSeed(randomBytes(t, 32))
serverPrivKey := ed25519.NewKeyFromSeed(randomBytes(t, 32))

t.Run("Transmit errors if not started", func(t *testing.T) {
c := NewClient(ClientOpts{
Logger: logger.TestLogger(t),
ClientPrivKey: clientPrivKey,
ServerPubKey: serverPrivKey.Public().(ed25519.PublicKey),
ServerURL: "example.com",
})

resp, err := c.Transmit(tests.Context(t), &rpc.TransmitRequest{})
assert.Nil(t, resp)
require.EqualError(t, err, "service is Unstarted, not started")
})
t.Run("Transmits report including client public key metadata", func(t *testing.T) {
ch := make(chan packet, 100)
srv := newMercuryServer(t, serverPrivKey, ch)
serverURL := srv.start(t, []ed25519.PublicKey{clientPrivKey.Public().(ed25519.PublicKey)})

c := NewClient(ClientOpts{
Logger: logger.TestLogger(t),
ClientPrivKey: clientPrivKey,
ServerPubKey: serverPrivKey.Public().(ed25519.PublicKey),
ServerURL: serverURL,
})

servicetest.Run(t, c)

req := &rpc.TransmitRequest{
Payload: []byte("report"),
ReportFormat: 42,
}
resp, err := c.Transmit(ctx, req)
require.NoError(t, err)

assert.Equal(t, "", resp.Error)
assert.Equal(t, int32(1), resp.Code)

select {
case p := <-ch:
assert.Equal(t, req.Payload, p.req.Payload)
assert.Equal(t, req.ReportFormat, p.req.ReportFormat)
m, ok := metadata.FromIncomingContext(p.ctx)
require.True(t, ok)
require.Len(t, m["client_public_key"], 1)
assert.Equal(t, hex.EncodeToString(clientPrivKey.Public().(ed25519.PublicKey)), m["client_public_key"][0])
default:
t.Fatal("expected request to be received")
}
})
}

func randomBytes(t *testing.T, n int) (r []byte) {
r = make([]byte, n)
_, err := rand.Read(r)
require.NoError(t, err)
return
}

type packet struct {
ctx context.Context //nolint:containedctx // this is used solely for test purposes
req *rpc.TransmitRequest
}

type mercuryServer struct {
rpc.UnimplementedTransmitterServer
privKey ed25519.PrivateKey
packetsCh chan packet
t *testing.T
}

func newMercuryServer(t *testing.T, privKey ed25519.PrivateKey, packetsCh chan packet) *mercuryServer {
return &mercuryServer{rpc.UnimplementedTransmitterServer{}, privKey, packetsCh, t}
}

func (srv *mercuryServer) start(t *testing.T, clientPubKeys []ed25519.PublicKey) (serverURL string) {
// Set up the grpc server
lis, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("[MAIN] failed to listen: %v", err)
}
serverURL = lis.Addr().String()
sMtls, err := mtls.NewTransportCredentials(srv.privKey, clientPubKeys)
require.NoError(t, err)
s := grpc.NewServer(grpc.Creds(sMtls))

// Register mercury implementation with the wsrpc server
rpc.RegisterTransmitterServer(s, srv)

// Start serving
go func() {
s.Serve(lis) //nolint:errcheck // don't care about errors in tests
}()

t.Cleanup(s.Stop)

return
}

func (srv *mercuryServer) Transmit(ctx context.Context, req *rpc.TransmitRequest) (*rpc.TransmitResponse, error) {
srv.packetsCh <- packet{ctx, req}

return &rpc.TransmitResponse{
Code: 1,
Error: "",
}, nil
}

func (srv *mercuryServer) LatestReport(ctx context.Context, lrr *rpc.LatestReportRequest) (*rpc.LatestReportResponse, error) {
panic("should not be called")
}
51 changes: 39 additions & 12 deletions core/services/llo/mercurytransmitter/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,31 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
mt.lggr.Debugw("Loading transmit requests from database")
}

{
var startClosers []services.StartClose
for _, s := range mt.servers {
transmissions, err := s.pm.Load(ctx)
g, startCtx := errgroup.WithContext(ctx)
for _, s := range mt.servers {
// concurrent start of all servers
g.Go(func() error {
// Load DB transmissions and populate server transmit queue
transmissions, err := s.pm.Load(startCtx)
if err != nil {
return err
}
s.q.Init(transmissions)

// Start all associated services
//
// starting pm after loading from it is fine because it simply
// spawns some garbage collection/prune goroutines
startClosers = append(startClosers, s.c, s.q, s.pm)
//
// client, queue etc should be started before spawning server loops
startClosers := []services.StartClose{s.c, s.q, s.pm}
if err := (&services.MultiStart{}).Start(startCtx, startClosers...); err != nil {
return err
}

// Number of goroutines per server will be roughly
// Spawn loops for the server
//
// Number of goroutines per transmitter will be roughly
// 2*nServers*TransmitConcurrency because each server has a
// delete queue and a transmit queue.
//
Expand All @@ -194,13 +206,11 @@ func (mt *transmitter) Start(ctx context.Context) (err error) {
go s.runDeleteQueueLoop(mt.stopCh, mt.wg)
go s.runQueueLoop(mt.stopCh, mt.wg, donIDStr)
}
}
if err := (&services.MultiStart{}).Start(ctx, startClosers...); err != nil {
return err
}
return nil
})
}

return nil
return g.Wait()
})
}

Expand Down Expand Up @@ -245,7 +255,24 @@ func (mt *transmitter) Transmit(
digest types.ConfigDigest,
seqNr uint64,
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
sigs []types.AttributedOnchainSignature) error {
sigs []types.AttributedOnchainSignature,
) (err error) {
ok := mt.IfStarted(func() {
err = mt.transmit(ctx, digest, seqNr, report, sigs)
})
if !ok {
return errors.New("transmitter is not started")
}
return
}

func (mt *transmitter) transmit(
ctx context.Context,
digest types.ConfigDigest,
seqNr uint64,
report ocr3types.ReportWithInfo[llotypes.ReportInfo],
sigs []types.AttributedOnchainSignature,
) error {
transmissions := make([]*Transmission, 0, len(mt.servers))
for serverURL := range mt.servers {
transmissions = append(transmissions, &Transmission{
Expand Down
Loading

0 comments on commit 039217d

Please sign in to comment.