From ddc1520d66c00d7ae2b8f1d87f4a9a6cb56e91b9 Mon Sep 17 00:00:00 2001 From: Ullaakut Date: Wed, 31 Aug 2022 15:34:46 +0200 Subject: [PATCH 1/3] Handle application-level deletions --- models/results/error.go | 6 ++++++ network/web2/metadata_fetcher.go | 12 ++++-------- service/pipeline/addition_stage.go | 4 ++-- service/workers/addition_handler.go | 19 ++++++++++++++++++- 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/models/results/error.go b/models/results/error.go index c234c310..e023ab09 100644 --- a/models/results/error.go +++ b/models/results/error.go @@ -1,5 +1,11 @@ package results +import ( + "errors" +) + +var ErrTokenNotFound = errors.New("token not found") + type Error struct { Message string `json:"errorMessage"` Type string `json:"errorType"` diff --git a/network/web2/metadata_fetcher.go b/network/web2/metadata_fetcher.go index ee944e80..9b50faa4 100644 --- a/network/web2/metadata_fetcher.go +++ b/network/web2/metadata_fetcher.go @@ -33,22 +33,18 @@ func NewMetadataFetcher(options ...MetadataOption) *MetadataFetcher { return &m } -func (m *MetadataFetcher) Payload(_ context.Context, uri string) ([]byte, error) { +func (m *MetadataFetcher) Payload(_ context.Context, uri string) ([]byte, int, error) { res, err := m.client.Get(uri) if err != nil { - return nil, fmt.Errorf("could not execute request: %w", err) + return nil, 0, fmt.Errorf("could not execute request: %w", err) } defer res.Body.Close() - if res.StatusCode != 200 { - return nil, fmt.Errorf("bad response code (%d)", res.StatusCode) - } - payload, err := io.ReadAll(res.Body) if err != nil { - return nil, fmt.Errorf("could not read response body: %w", err) + return nil, res.StatusCode, fmt.Errorf("could not read response body: %w", err) } - return payload, nil + return payload, res.StatusCode, nil } diff --git a/service/pipeline/addition_stage.go b/service/pipeline/addition_stage.go index 5fa0e36e..64fe0795 100644 --- a/service/pipeline/addition_stage.go +++ b/service/pipeline/addition_stage.go @@ -3,8 +3,8 @@ package pipeline import ( "context" "encoding/json" + "errors" "fmt" - "strings" "github.com/nsqio/go-nsq" "github.com/rs/zerolog" @@ -139,7 +139,7 @@ func (a *AdditionStage) process(payload []byte) error { if err != nil { return fmt.Errorf("could not decode execution error: %w", err) } - if execErr != nil && strings.Contains(execErr.Error(), "URI query for nonexistent token") { + if errors.Is(execErr, results.ErrTokenNotFound) { return a.delete(payload) } if execErr != nil { diff --git a/service/workers/addition_handler.go b/service/workers/addition_handler.go index 5e3b0317..8478c4f7 100644 --- a/service/workers/addition_handler.go +++ b/service/workers/addition_handler.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "net/http" "strings" "github.com/ipfs/go-cid" @@ -169,13 +170,29 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) ( // Finally, we check if we have a payload already, or if we need to fetch it remotely. var payload []byte + var code int switch { case strings.HasPrefix(privateURI, protocol.HTTP), strings.HasPrefix(privateURI, protocol.HTTPS): - payload, err = fetchMetadata.Payload(ctx, privateURI) + payload, code, err = fetchMetadata.Payload(ctx, privateURI) if err != nil { return nil, fmt.Errorf("could not fetch remote metadata: %w", err) } + if code == http.StatusInternalServerError { + var reqErr *results.Error + err = json.Unmarshal(payload, &reqErr) + if err != nil { + return nil, fmt.Errorf("could not decode execution error: %w", err) + } + switch reqErr.Error() { + case "Token not found", + "URI query for nonexistent token": + // This is an application-level deletion. + return nil, results.ErrTokenNotFound + default: + // This is just a standard 500 error, no need to trigger an error on our side. + } + } log.Debug(). Str("payload", string(payload)). Msg("remote payload fetched") From 3d3fc5b2f4af6e921cb0c6fa2260fe5428ba7c76 Mon Sep 17 00:00:00 2001 From: Ullaakut Date: Wed, 31 Aug 2022 17:21:27 +0200 Subject: [PATCH 2/3] Handle ipfs-based deletions --- service/workers/addition_handler.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/service/workers/addition_handler.go b/service/workers/addition_handler.go index 8478c4f7..f14a9259 100644 --- a/service/workers/addition_handler.go +++ b/service/workers/addition_handler.go @@ -178,19 +178,18 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) ( if err != nil { return nil, fmt.Errorf("could not fetch remote metadata: %w", err) } - if code == http.StatusInternalServerError { + switch code { + case http.StatusInternalServerError, http.StatusNotFound: var reqErr *results.Error err = json.Unmarshal(payload, &reqErr) if err != nil { return nil, fmt.Errorf("could not decode execution error: %w", err) } - switch reqErr.Error() { - case "Token not found", - "URI query for nonexistent token": + if reqErr.Error() == "Token not found" || + strings.Contains(reqErr.Error(), "URI query for nonexistent token") || + strings.Contains(reqErr.Error(), "no link named") { // This is an application-level deletion. return nil, results.ErrTokenNotFound - default: - // This is just a standard 500 error, no need to trigger an error on our side. } } log.Debug(). From 96d5fd13604c368a5d7d6959d62ae9f8ca974aed Mon Sep 17 00:00:00 2001 From: Ullaakut Date: Fri, 2 Sep 2022 08:46:40 +0200 Subject: [PATCH 3/3] Apply PR suggestion --- service/workers/addition_handler.go | 43 ++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/service/workers/addition_handler.go b/service/workers/addition_handler.go index f14a9259..b1ce3ca3 100644 --- a/service/workers/addition_handler.go +++ b/service/workers/addition_handler.go @@ -178,19 +178,11 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) ( if err != nil { return nil, fmt.Errorf("could not fetch remote metadata: %w", err) } - switch code { - case http.StatusInternalServerError, http.StatusNotFound: - var reqErr *results.Error - err = json.Unmarshal(payload, &reqErr) - if err != nil { - return nil, fmt.Errorf("could not decode execution error: %w", err) - } - if reqErr.Error() == "Token not found" || - strings.Contains(reqErr.Error(), "URI query for nonexistent token") || - strings.Contains(reqErr.Error(), "no link named") { - // This is an application-level deletion. - return nil, results.ErrTokenNotFound - } + if code == http.StatusInternalServerError && isTokenNotFound(payload) { + return nil, results.ErrTokenNotFound + } + if code == http.StatusNotFound && isIPFSMissingLink(payload) { + return nil, results.ErrTokenNotFound } log.Debug(). Str("payload", string(payload)). @@ -280,3 +272,28 @@ func (a *AdditionHandler) Handle(ctx context.Context, addition *jobs.Addition) ( return &result, nil } + +func isTokenNotFound(payload []byte) bool { + var reqErr *results.Error + err := json.Unmarshal(payload, &reqErr) + if err != nil { + return false + } + if reqErr.Error() == "Token not found" { + return true + } + return false +} + +func isIPFSMissingLink(payload []byte) bool { + var reqErr *results.Error + err := json.Unmarshal(payload, &reqErr) + if err != nil { + return false + } + if strings.Contains(reqErr.Error(), "URI query for nonexistent token") || + strings.Contains(reqErr.Error(), "no link named") { + return true + } + return false +}