Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncer #13427

Merged
merged 12 commits into from
Jun 13, 2024
5 changes: 5 additions & 0 deletions .changeset/shaggy-ears-share.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#internal Add RegistrySyncer
110 changes: 110 additions & 0 deletions core/capabilities/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package capabilities

import (
"context"
"encoding/json"

"github.com/smartcontractkit/chainlink-common/pkg/types"
kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

type remoteRegistryReader struct {
r types.ContractReader
}

var _ reader = (*remoteRegistryReader)(nil)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why isn't this part of the syncer.go file? Is it to keep the file size smaller or some go pattern?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean this type assertion specifically? It's located here because it makes assertions about the reader abstraction and the interface it satisfies. Separating these out also makes it easier to mock the reader so we don't have to set up a chain and make the relevant registry calls.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should've been clearer 🙈 I was wondering why the reader.go was split from the syncer.go. After reviewing the whole PR, it might make sense for the reader to do the reading and state augmentation though and the syncer being a simple service that manages the reader 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the reason why it's separate is so that the syncer can accept a mocked reader rather than a concrete implementation based on ChainReader (which therefore depends on a mocked chain + all of the state that comes with it)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense


type hashedCapabilityID [32]byte
type donID uint32

type state struct {
IDsToDONs map[donID]kcr.CapabilityRegistryDONInfo
IDsToNodes map[p2ptypes.PeerID]kcr.CapabilityRegistryNodeInfo
IDsToCapabilities map[hashedCapabilityID]kcr.CapabilityRegistryCapability
}

func (r *remoteRegistryReader) state(ctx context.Context) (state, error) {
dons := []kcr.CapabilityRegistryDONInfo{}
err := r.r.GetLatestValue(ctx, "capabilityRegistry", "getDONs", nil, &dons)
if err != nil {
return state{}, err
}

idsToDONs := map[donID]kcr.CapabilityRegistryDONInfo{}
for _, d := range dons {
idsToDONs[donID(d.Id)] = d
}

caps := kcr.GetCapabilities{}
err = r.r.GetLatestValue(ctx, "capabilityRegistry", "getCapabilities", nil, &caps)
if err != nil {
return state{}, err
}

idsToCapabilities := map[hashedCapabilityID]kcr.CapabilityRegistryCapability{}
for i, c := range caps.Capabilities {
idsToCapabilities[caps.HashedCapabilityIds[i]] = c
}

nodes := &kcr.GetNodes{}
err = r.r.GetLatestValue(ctx, "capabilityRegistry", "getNodes", nil, &nodes)
if err != nil {
return state{}, err
}

idsToNodes := map[p2ptypes.PeerID]kcr.CapabilityRegistryNodeInfo{}
for _, node := range nodes.NodeInfo {
idsToNodes[node.P2pId] = node
}

return state{IDsToDONs: idsToDONs, IDsToCapabilities: idsToCapabilities, IDsToNodes: idsToNodes}, nil
}

type contractReaderFactory interface {
NewContractReader(context.Context, []byte) (types.ContractReader, error)
}

func newRemoteRegistryReader(ctx context.Context, relayer contractReaderFactory, remoteRegistryAddress string) (*remoteRegistryReader, error) {
contractReaderConfig := evmrelaytypes.ChainReaderConfig{
Contracts: map[string]evmrelaytypes.ChainContractReader{
"capabilityRegistry": {
ContractABI: kcr.CapabilityRegistryABI,
Configs: map[string]*evmrelaytypes.ChainReaderDefinition{
"getDONs": {
ChainSpecificName: "getDONs",
},
"getCapabilities": {
ChainSpecificName: "getCapabilities",
},
"getNodes": {
ChainSpecificName: "getNodes",
},
},
},
},
}

contractReaderConfigEncoded, err := json.Marshal(contractReaderConfig)
if err != nil {
return nil, err
}

cr, err := relayer.NewContractReader(ctx, contractReaderConfigEncoded)
if err != nil {
return nil, err
}

err = cr.Bind(ctx, []types.BoundContract{
{
Address: remoteRegistryAddress,
Name: "capabilityRegistry",
},
})
if err != nil {
return nil, err
}

return &remoteRegistryReader{r: cr}, err
}
210 changes: 210 additions & 0 deletions core/capabilities/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package capabilities

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/types"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
evmrelaytypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
)

var writeChainCapability = kcr.CapabilityRegistryCapability{
LabelledName: "write-chain",
Version: "1.0.1",
ResponseType: uint8(1),
}

func startNewChainWithRegistry(t *testing.T) (*kcr.CapabilityRegistry, common.Address, *bind.TransactOpts, *backends.SimulatedBackend) {
owner := testutils.MustNewSimTransactor(t)

oneEth, _ := new(big.Int).SetString("100000000000000000000", 10)
gasLimit := ethconfig.Defaults.Miner.GasCeil * 2 // 60 M blocks

simulatedBackend := backends.NewSimulatedBackend(core.GenesisAlloc{owner.From: {
Balance: oneEth,
}}, gasLimit)
simulatedBackend.Commit()

capabilityRegistryAddress, _, capabilityRegistry, err := kcr.DeployCapabilityRegistry(owner, simulatedBackend)
require.NoError(t, err, "DeployCapabilityRegistry failed")

fmt.Println("Deployed CapabilityRegistry at", capabilityRegistryAddress.Hex())
simulatedBackend.Commit()

return capabilityRegistry, capabilityRegistryAddress, owner, simulatedBackend
}

type crFactory struct {
lggr logger.Logger
logPoller logpoller.LogPoller
client evmclient.Client
}

func (c *crFactory) NewContractReader(ctx context.Context, cfg []byte) (types.ContractReader, error) {
crCfg := &evmrelaytypes.ChainReaderConfig{}
if err := json.Unmarshal(cfg, crCfg); err != nil {
return nil, err
}
svc, err := evm.NewChainReaderService(ctx, c.lggr, c.logPoller, c.client, *crCfg)
if err != nil {
return nil, err
}

return svc, svc.Start(ctx)
}

func newContractReaderFactory(t *testing.T, simulatedBackend *backends.SimulatedBackend) *crFactory {
lggr := logger.TestLogger(t)
client := evmclient.NewSimulatedBackendClient(
t,
simulatedBackend,
testutils.SimulatedChainID,
)
db := pgtest.NewSqlxDB(t)
lp := logpoller.NewLogPoller(
logpoller.NewORM(testutils.SimulatedChainID, db, lggr),
client,
lggr,
logpoller.Opts{
PollPeriod: 100 * time.Millisecond,
FinalityDepth: 2,
BackfillBatchSize: 3,
RpcBatchSize: 2,
KeepFinalizedBlocksDepth: 1000,
},
)
return &crFactory{
lggr: lggr,
client: client,
logPoller: lp,
}
}

func randomWord() [32]byte {
word := make([]byte, 32)
_, err := rand.Read(word)
if err != nil {
panic(err)
}
return [32]byte(word)
}

func TestReader_Integration(t *testing.T) {
ctx := testutils.Context(t)
reg, regAddress, owner, sim := startNewChainWithRegistry(t)

_, err := reg.AddCapabilities(owner, []kcr.CapabilityRegistryCapability{writeChainCapability})
require.NoError(t, err, "AddCapability failed for %s", writeChainCapability.LabelledName)
sim.Commit()

cid, err := reg.GetHashedCapabilityId(&bind.CallOpts{}, writeChainCapability.LabelledName, writeChainCapability.Version)
require.NoError(t, err)

_, err = reg.AddNodeOperators(owner, []kcr.CapabilityRegistryNodeOperator{
{
Admin: owner.From,
Name: "TEST_NOP",
},
})
require.NoError(t, err)

nodeSet := [][32]byte{
randomWord(),
randomWord(),
randomWord(),
}

nodes := []kcr.CapabilityRegistryNodeInfo{
{
// The first NodeOperatorId has id 1 since the id is auto-incrementing.
NodeOperatorId: uint32(1),
Signer: randomWord(),
P2pId: nodeSet[0],
HashedCapabilityIds: [][32]byte{cid},
},
{
// The first NodeOperatorId has id 1 since the id is auto-incrementing.
NodeOperatorId: uint32(1),
Signer: randomWord(),
P2pId: nodeSet[1],
HashedCapabilityIds: [][32]byte{cid},
},
{
// The first NodeOperatorId has id 1 since the id is auto-incrementing.
NodeOperatorId: uint32(1),
Signer: randomWord(),
P2pId: nodeSet[2],
HashedCapabilityIds: [][32]byte{cid},
},
}
_, err = reg.AddNodes(owner, nodes)
require.NoError(t, err)

cfgs := []kcr.CapabilityRegistryCapabilityConfiguration{
{
CapabilityId: cid,
Config: []byte(`{"hello": "world"}`),
},
}
_, err = reg.AddDON(
owner,
nodeSet,
cfgs,
true,
true,
1,
)
sim.Commit()

require.NoError(t, err)

factory := newContractReaderFactory(t, sim)
reader, err := newRemoteRegistryReader(ctx, factory, regAddress.Hex())
require.NoError(t, err)

s, err := reader.state(ctx)
require.NoError(t, err)
assert.Len(t, s.IDsToCapabilities, 1)

gotCap := s.IDsToCapabilities[cid]
assert.Equal(t, writeChainCapability, gotCap)

assert.Len(t, s.IDsToDONs, 1)
assert.Equal(t, kcr.CapabilityRegistryDONInfo{
Id: 1, // initial Id
ConfigCount: 1, // initial Count
IsPublic: true,
AcceptsWorkflows: true,
F: 1,
NodeP2PIds: nodeSet,
CapabilityConfigurations: cfgs,
}, s.IDsToDONs[1])

assert.Len(t, s.IDsToNodes, 3)
assert.Equal(t, map[p2ptypes.PeerID]kcr.CapabilityRegistryNodeInfo{
nodeSet[0]: nodes[0],
nodeSet[1]: nodes[1],
nodeSet[2]: nodes[2],
}, s.IDsToNodes)
}
7 changes: 6 additions & 1 deletion core/capabilities/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ package capabilities

import (
"context"
"errors"
"fmt"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/logger"
)

var (
ErrCapabilityAlreadyExists = errors.New("capability already exists")
)

// Registry is a struct for the registry of capabilities.
// Registry is safe for concurrent use.
type Registry struct {
Expand Down Expand Up @@ -141,7 +146,7 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error
id := info.ID
_, ok := r.m[id]
if ok {
return fmt.Errorf("capability with id: %s already exists", id)
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
}

r.m[id] = c
Expand Down
3 changes: 2 additions & 1 deletion core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package capabilities_test

import (
"context"
"errors"
"fmt"
"testing"

Expand Down Expand Up @@ -85,7 +86,7 @@ func TestRegistry_NoDuplicateIDs(t *testing.T) {
c2 := &mockCapability{CapabilityInfo: ci}

err = r.Add(ctx, c2)
assert.ErrorContains(t, err, "capability with id: capability-1@1.0.0 already exists")
assert.True(t, errors.Is(err, coreCapabilities.ErrCapabilityAlreadyExists))
}

func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions core/capabilities/remote/trigger_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
//
// TriggerPublisher communicates with corresponding TriggerSubscribers on remote nodes.
type triggerPublisher struct {
config types.RemoteTriggerConfig
config *types.RemoteTriggerConfig
underlying commoncap.TriggerCapability
capInfo commoncap.CapabilityInfo
capDonInfo commoncap.DON
Expand All @@ -48,7 +48,7 @@ type pubRegState struct {
var _ types.Receiver = &triggerPublisher{}
var _ services.Service = &triggerPublisher{}

func NewTriggerPublisher(config types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
func NewTriggerPublisher(config *types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher {
config.ApplyDefaults()
return &triggerPublisher{
config: config,
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/remote/trigger_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestTriggerPublisher_Register(t *testing.T) {
}

dispatcher := remoteMocks.NewDispatcher(t)
config := remotetypes.RemoteTriggerConfig{
config := &remotetypes.RemoteTriggerConfig{
RegistrationRefreshMs: 100,
RegistrationExpiryMs: 100_000,
MinResponsesToAggregate: 1,
Expand Down
Loading
Loading