Skip to content

Commit

Permalink
Merge pull request #1566 from bstasyszyn/1565
Browse files Browse the repository at this point in the history
fix: Don't process anchor if it has already been processed
  • Loading branch information
fqutishat authored May 30, 2023
2 parents 23f5800 + e555fc2 commit e0596de
Show file tree
Hide file tree
Showing 8 changed files with 322 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pkg/activitypub/httpsig/signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewSigner(cfg SignerConfig, cr crypto, km keyManager, keyID string) *Signer
func (s *Signer) SignRequest(pubKeyID string, req *http.Request) error {
req.Header.Add(dateHeader, date())

logger.Debug("Signing request for %s. Public key ID [%s]. Headers: %s", logfields.WithRequestURLString(req.RequestURI),
logger.Debug("Signing request", logfields.WithRequestURLString(req.RequestURI),
logfields.WithKeyID(pubKeyID), logfields.WithRequestHeaders(req.Header))

if err := s.signer().Sign(pubKeyID, req); err != nil {
Expand Down
98 changes: 71 additions & 27 deletions pkg/anchor/handler/credential/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,27 @@ func New(anchorPublisher anchorPublisher, casResolver casResolver,
func (h *AnchorEventHandler) HandleAnchorEvent(ctx context.Context, actor, anchorRef, source *url.URL,
anchorEvent *vocab.AnchorEventType,
) error {
logger.Debug("Received request for anchor", logfields.WithActorIRI(actor), logfields.WithAnchorEventURI(anchorRef))
logger.Debugc(ctx, "Received request for anchor", logfields.WithActorIRI(actor), logfields.WithAnchorURI(anchorRef))

ok, err := h.isAnchorProcessed(anchorRef)
if err != nil {
return fmt.Errorf("is anchor processed [%s]: %w", anchorRef, err)
}

if ok {
logger.Infoc(ctx, "Anchor was already processed or processing is pending", logfields.WithAnchorURI(anchorRef))

return nil
}

var anchorLinksetBytes []byte

if anchorEvent != nil {
var err error
var e error

anchorLinksetBytes, err = canonicalizer.MarshalCanonical(anchorEvent.Object().Document())
if err != nil {
return fmt.Errorf("marshal anchor linkset: %w", err)
anchorLinksetBytes, e = canonicalizer.MarshalCanonical(anchorEvent.Object().Document())
if e != nil {
return fmt.Errorf("marshal anchor linkset: %w", e)
}
}

Expand Down Expand Up @@ -130,7 +141,7 @@ func (h *AnchorEventHandler) HandleAnchorEvent(ctx context.Context, actor, ancho
attributedTo = actor.String()
}

logger.Info("Processing anchor", logfields.WithAnchorEventURI(anchorRef))
logger.Infoc(ctx, "Processing anchor", logfields.WithAnchorURI(anchorRef))

var alternateSources []string

Expand Down Expand Up @@ -185,7 +196,37 @@ func (h *AnchorEventHandler) processAnchorEvent(ctx context.Context, anchorInfo
return fmt.Errorf("validate credential subject for anchor [%s]: %w", anchorLink.Anchor(), err)
}

return h.anchorPublisher.PublishAnchor(ctx, anchorInfo.AnchorInfo)
hl, err := url.Parse(anchorInfo.Hashlink)
if err != nil {
return fmt.Errorf("parse anchor hashlink [%s]: %w", anchorInfo.Hashlink, err)
}

// Check again if the anchor was processed. This further limits race conditions, especially
// when many anchors with the same parent are being processed concurrently.
processed, err := h.isAnchorProcessed(hl)
if err != nil {
return fmt.Errorf("is anchor processed [%s]: %w", hl, err)
}

if processed {
logger.Infoc(ctx, "Anchor was already processed or processing is pending.", logfields.WithAnchorURI(hl))

return nil
}

logger.Debugc(ctx, "Storing pending anchor link", logfields.WithAnchorURI(hl))

err = h.anchorLinkStore.PutPendingLinks([]*url.URL{hl})
if err != nil {
return fmt.Errorf("store pending anchor link: %w", err)
}

err = h.anchorPublisher.PublishAnchor(ctx, anchorInfo.AnchorInfo)
if err != nil {
return fmt.Errorf("publish anchor %s: %w", hl, err)
}

return nil
}

// ensureParentAnchorsAreProcessed checks all ancestors (parents, grandparents, etc.) of the given anchor event
Expand All @@ -196,21 +237,23 @@ func (h *AnchorEventHandler) ensureParentAnchorsAreProcessed(ctx context.Context
return fmt.Errorf("get unprocessed parent anchors for [%s]: %w", anchorRef, err)
}

if len(unprocessedParents) > 0 {
logger.Infoc(ctx, "Processing parents of anchor", logfields.WithTotal(len(unprocessedParents)),
logfields.WithAnchorURI(anchorRef), logfields.WithParents(unprocessedParents.HashLinks()))
if len(unprocessedParents) == 0 {
return nil
}

logger.Infoc(ctx, "Processing parents of anchor", logfields.WithTotal(len(unprocessedParents)),
logfields.WithAnchorURI(anchorRef), logfields.WithParents(unprocessedParents.HashLinks()))

spanCtx, span := h.tracer.Start(ctx, "process parent anchors",
trace.WithAttributes(tracing.AnchorEventURIAttribute(anchorRef.String())))
defer span.End()
spanCtx, span := h.tracer.Start(ctx, "process parent anchors",
trace.WithAttributes(tracing.AnchorEventURIAttribute(anchorRef.String())))
defer span.End()

for _, parentAnchorInfo := range unprocessedParents {
logger.Infoc(spanCtx, "Processing parent", logfields.WithAnchorURI(anchorRef), logfields.WithParent(parentAnchorInfo.Hashlink))
for _, parentAnchorInfo := range unprocessedParents {
logger.Debugc(spanCtx, "Processing parent", logfields.WithAnchorURI(anchorRef), logfields.WithParent(parentAnchorInfo.Hashlink))

err = h.processAnchorEvent(spanCtx, parentAnchorInfo)
if err != nil {
return fmt.Errorf("process anchor [%s]: %w", parentAnchorInfo.Hashlink, err)
}
err = h.processAnchorEvent(spanCtx, parentAnchorInfo)
if err != nil {
return fmt.Errorf("process anchor [%s]: %w", parentAnchorInfo.Hashlink, err)
}
}

Expand Down Expand Up @@ -263,12 +306,7 @@ func (h *AnchorEventHandler) getUnprocessedParentAnchors(hl string, anchorLink *
}

logger.Debug("Adding parent of anchor event to the unprocessed list",
logfields.WithAnchorEventURIString(hl), logfields.WithParentURI(parentHL))

err = h.anchorLinkStore.PutPendingLinks([]*url.URL{parentHL})
if err != nil {
return nil, fmt.Errorf("store pending parent anchor %s: %w", parentHL, err)
}
logfields.WithAnchorURIString(hl), logfields.WithParentURI(parentHL))

// Add the parent to the head of the list since it needs to be processed first.
unprocessed = append([]*anchorInfo{info}, unprocessed...)
Expand All @@ -294,7 +332,7 @@ func (h *AnchorEventHandler) getUnprocessedParentAnchor(hl string, parentHL *url
}

if isProcessed {
logger.Debug("Parent of anchor was already processed",
logger.Debug("Parent of anchor was already processed or processing is pending",
logfields.WithAnchorURIString(hl), logfields.WithParentURI(parentHL))

return true, nil, nil
Expand Down Expand Up @@ -361,7 +399,13 @@ func (h *AnchorEventHandler) isAnchorProcessed(hl *url.URL) (bool, error) {
return false, fmt.Errorf("get anchor event: %w", err)
}

return len(links) > 0, nil
for _, link := range links {
if link.String() == hl.String() {
return true, nil
}
}

return false, nil
}

type anchorInfo struct {
Expand Down
106 changes: 98 additions & 8 deletions pkg/anchor/handler/credential/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,25 +292,94 @@ func TestGetUnprocessedParentAnchorEvents(t *testing.T) {
}

func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}
t.Run("success", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)
handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

t.Run("success", func(t *testing.T) {
anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

err := handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{},
ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: ls.Link().Anchor().String(),
},
anchorLink: anchorLinkset.Link(),
})
require.NoError(t, err)
})

t.Run("already processed -> success", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

anchorRef := ls.Link().Anchor()

anchorLinkStore.GetProcessedAndPendingLinksReturns([]*url.URL{anchorRef}, nil)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: anchorRef.String(),
},
anchorLink: anchorLinkset.Link(),
})
require.NoError(t, err)
})

t.Run("is processed -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(sampleGrandparentAnchorLinkset), anchorLinkset))

ls, err := anchorLinkset.Link().Original().Linkset()
require.NoError(t, err)

anchorRef := ls.Link().Anchor()

errExpected := errors.New("injected query error")

anchorLinkStore.GetProcessedAndPendingLinksReturns(nil, errExpected)

err = handler.processAnchorEvent(context.Background(), &anchorInfo{
AnchorInfo: &info.AnchorInfo{
Hashlink: anchorRef.String(),
},
anchorLink: anchorLinkset.Link(),
})
require.Error(t, err)
require.Contains(t, err.Error(), errExpected.Error())
})

t.Run("no replies -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetNoReplies), anchorLinkset))

Expand All @@ -323,6 +392,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("invalid original content -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetInvalidContent), anchorLinkset))

Expand All @@ -335,6 +411,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("unsupported profile -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetUnsupportedProfile), anchorLinkset))

Expand All @@ -347,6 +430,13 @@ func TestAnchorEventHandler_processAnchorEvent(t *testing.T) {
})

t.Run("invalid anchor credential -> error", func(t *testing.T) {
casResolver := &mocks2.CASResolver{}
anchorLinkStore := &mocks.AnchorLinkStore{}

handler := New(&anchormocks.AnchorPublisher{}, casResolver, testutil.GetLoader(t),
time.Second, anchorLinkStore, generator.NewRegistry())
require.NotNil(t, handler)

anchorLinkset := &linkset.Linkset{}
require.NoError(t, json.Unmarshal([]byte(anchorLinksetInvalidVC), anchorLinkset))

Expand Down
Loading

0 comments on commit e0596de

Please sign in to comment.