Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Don't process anchor if it has already been processed #1566

Merged
merged 1 commit into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/activitypub/httpsig/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewSigner(cfg SignerConfig, cr crypto, km keyManager, keyID string) *Signer
func (s *Signer) SignRequest(pubKeyID string, req *http.Request) error {
req.Header.Add(dateHeader, date())

logger.Debug("Signing request for %s. Public key ID [%s]. Headers: %s", logfields.WithRequestURLString(req.RequestURI),
logger.Debug("Signing request", logfields.WithRequestURLString(req.RequestURI),
logfields.WithKeyID(pubKeyID), logfields.WithRequestHeaders(req.Header))

if err := s.signer().Sign(pubKeyID, req); err != nil {
Expand Down
98 changes: 71 additions & 27 deletions pkg/anchor/handler/credential/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,27 @@ func New(anchorPublisher anchorPublisher, casResolver casResolver,
func (h *AnchorEventHandler) HandleAnchorEvent(ctx context.Context, actor, anchorRef, source *url.URL,
anchorEvent *vocab.AnchorEventType,
) error {
logger.Debug("Received request for anchor", logfields.WithActorIRI(actor), logfields.WithAnchorEventURI(anchorRef))
logger.Debugc(ctx, "Received request for anchor", logfields.WithActorIRI(actor), logfields.WithAnchorURI(anchorRef))

ok, err := h.isAnchorProcessed(anchorRef)
if err != nil {
return fmt.Errorf("is anchor processed [%s]: %w", anchorRef, err)
}

if ok {
logger.Infoc(ctx, "Anchor was already processed or processing is pending", logfields.WithAnchorURI(anchorRef))

return nil
}

var anchorLinksetBytes []byte

if anchorEvent != nil {
var err error
var e error

anchorLinksetBytes, err = canonicalizer.MarshalCanonical(anchorEvent.Object().Document())
if err != nil {
return fmt.Errorf("marshal anchor linkset: %w", err)
anchorLinksetBytes, e = canonicalizer.MarshalCanonical(anchorEvent.Object().Document())
if e != nil {
return fmt.Errorf("marshal anchor linkset: %w", e)
}
}

Expand Down Expand Up @@ -130,7 +141,7 @@ func (h *AnchorEventHandler) HandleAnchorEvent(ctx context.Context, actor, ancho
attributedTo = actor.String()
}

logger.Info("Processing anchor", logfields.WithAnchorEventURI(anchorRef))
logger.Infoc(ctx, "Processing anchor", logfields.WithAnchorURI(anchorRef))

var alternateSources []string

Expand Down Expand Up @@ -185,7 +196,37 @@ func (h *AnchorEventHandler) processAnchorEvent(ctx context.Context, anchorInfo
return fmt.Errorf("validate credential subject for anchor [%s]: %w", anchorLink.Anchor(), err)
}

return h.anchorPublisher.PublishAnchor(ctx, anchorInfo.AnchorInfo)
hl, err := url.Parse(anchorInfo.Hashlink)
if err != nil {
return fmt.Errorf("parse anchor hashlink [%s]: %w", anchorInfo.Hashlink, err)
}

// Check again if the anchor was processed. This further limits race conditions, especially
// when many anchors with the same parent are being processed concurrently.
processed, err := h.isAnchorProcessed(hl)
if err != nil {
return fmt.Errorf("is anchor processed [%s]: %w", hl, err)
}

if processed {
logger.Infoc(ctx, "Anchor was already processed or processing is pending.", logfields.WithAnchorURI(hl))

return nil
}

logger.Debugc(ctx, "Storing pending anchor link", logfields.WithAnchorURI(hl))

err = h.anchorLinkStore.PutPendingLinks([]*url.URL{hl})
if err != nil {
return fmt.Errorf("store pending anchor link: %w", err)
}

err = h.anchorPublisher.PublishAnchor(ctx, anchorInfo.AnchorInfo)
if err != nil {
return fmt.Errorf("publish anchor %s: %w", hl, err)
}

return nil
}

// ensureParentAnchorsAreProcessed checks all ancestors (parents, grandparents, etc.) of the given anchor event
Expand All @@ -196,21 +237,23 @@ func (h *AnchorEventHandler) ensureParentAnchorsAreProcessed(ctx context.Context
return fmt.Errorf("get unprocessed parent anchors for [%s]: %w", anchorRef, err)
}

if len(unprocessedParents) > 0 {
logger.Infoc(ctx, "Processing parents of anchor", logfields.WithTotal(len(unprocessedParents)),
logfields.WithAnchorURI(anchorRef), logfields.WithParents(unprocessedParents.HashLinks()))
if len(unprocessedParents) == 0 {
return nil
}

logger.Infoc(ctx, "Processing parents of anchor", logfields.WithTotal(len(unprocessedParents)),
logfields.WithAnchorURI(anchorRef), logfields.WithParents(unprocessedParents.HashLinks()))

spanCtx, span := h.tracer.Start(ctx, "process parent anchors",
trace.WithAttributes(tracing.AnchorEventURIAttribute(anchorRef.String())))
defer span.End()
spanCtx, span := h.tracer.Start(ctx, "process parent anchors",
trace.WithAttributes(tracing.AnchorEventURIAttribute(anchorRef.String())))
defer span.End()

for _, parentAnchorInfo := range unprocessedParents {
logger.Infoc(spanCtx, "Processing parent", logfields.WithAnchorURI(anchorRef), logfields.WithParent(parentAnchorInfo.Hashlink))
for _, parentAnchorInfo := range unprocessedParents {
logger.Debugc(spanCtx, "Processing parent", logfields.WithAnchorURI(anchorRef), logfields.WithParent(parentAnchorInfo.Hashlink))

err = h.processAnchorEvent(spanCtx, parentAnchorInfo)
if err != nil {
return fmt.Errorf("process anchor [%s]: %w", parentAnchorInfo.Hashlink, err)
}
err = h.processAnchorEvent(spanCtx, parentAnchorInfo)
if err != nil {
return fmt.Errorf("process anchor [%s]: %w", parentAnchorInfo.Hashlink, err)
}
}

Expand Down Expand Up @@ -263,12 +306,7 @@ func (h *AnchorEventHandler) getUnprocessedParentAnchors(hl string, anchorLink *
}

logger.Debug("Adding parent of anchor event to the unprocessed list",
logfields.WithAnchorEventURIString(hl), logfields.WithParentURI(parentHL))

err = h.anchorLinkStore.PutPendingLinks([]*url.URL{parentHL})
if err != nil {
return nil, fmt.Errorf("store pending parent anchor %s: %w", parentHL, err)
}
logfields.WithAnchorURIString(hl), logfields.WithParentURI(parentHL))

// Add the parent to the head of the list since it needs to be processed first.
unprocessed = append([]*anchorInfo{info}, unprocessed...)
Expand All @@ -294,7 +332,7 @@ func (h *AnchorEventHandler) getUnprocessedParentAnchor(hl string, parentHL *url
}

if isProcessed {
logger.Debug("Parent of anchor was already processed",
logger.Debug("Parent of anchor was already processed or processing is pending",
logfields.WithAnchorURIString(hl), logfields.WithParentURI(parentHL))

return true, nil, nil
Expand Down Expand Up @@ -361,7 +399,13 @@ func (h *AnchorEventHandler) isAnchorProcessed(hl *url.URL) (bool, error) {
return false, fmt.Errorf("get anchor event: %w", err)
}

return len(links) > 0, nil
for _, link := range links {
if link.String() == hl.String() {
return true, nil
}
}

return false, nil
}

type anchorInfo struct {
Expand Down
106 changes: 98 additions & 8 deletions pkg/anchor/handler/credential/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,25 +292,94 @@ func TestGetUnprocessedParentAnchorEvents(t *testing.T) {
}

func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}
t.Run("success", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)
handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

t.Run("success", func(t *testing.T) {
anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

err := handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{},
ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: ls.Link().Anchor().String(),
},
anchorLink: anchorLinkset.Link(),
})
require.NoError(t, err)
})

t.Run("already processed -> success", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

anchorRef := ls.Link().Anchor()

anchorLinkStore.GetProcessedAndPendingLinksReturns([]*url.URL{anchorRef}, nil)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: anchorRef.String(),
},
anchorLink: anchorLinkset.Link(),
})
require.NoError(t, err)
})

t.Run("is processed -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

anchorRef := ls.Link().Anchor()

errExpected := errors.New("injected query error")

anchorLinkStore.GetProcessedAndPendingLinksReturns(nil, errExpected)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: anchorRef.String(),
},
anchorLink: anchorLinkset.Link(),
})
require.Error(t, err)
require.Contains(t, err.Error(), errExpected.Error())
})

t.Run("no replies -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetNoReplies), anchorLinkset))

Expand All @@ -323,6 +392,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("invalid original content -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetInvalidContent), anchorLinkset))

Expand All @@ -335,6 +411,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("unsupported profile -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetUnsupportedProfile), anchorLinkset))

Expand All @@ -347,6 +430,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("invalid anchor credential -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetInvalidVC), anchorLinkset))

Expand Down
Loading