From 33598b2d472f5a3ca79666e5d68db2bc371d5b21 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Wed, 6 Mar 2024 17:11:42 -0600 Subject: [PATCH 1/4] feat: respond to CancelACHFile events with FileCancellationResponse Issue: https://github.com/moov-io/achgateway/issues/141 Issue: https://github.com/moov-io/achgateway/issues/142 --- internal/environment.go | 2 +- internal/incoming/models.go | 6 +++ internal/incoming/web/api_files.go | 49 ++++++++++++++++++++++--- internal/incoming/web/api_files_test.go | 28 ++++++++++++-- internal/pipeline/aggregate.go | 17 ++++++++- internal/pipeline/file_receiver.go | 25 ++++++++----- internal/pipeline/merging.go | 21 +++++++++-- pkg/models/events.go | 7 ++++ 8 files changed, 131 insertions(+), 24 deletions(-) diff --git a/internal/environment.go b/internal/environment.go index e4a8f81..42c3a69 100644 --- a/internal/environment.go +++ b/internal/environment.go @@ -176,7 +176,7 @@ func NewEnvironment(env *Environment) (*Environment, error) { env.PublicRouter.Path("/ping").Methods("GET").HandlerFunc(addPingRoute) // append HTTP routes - web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles).AppendRoutes(env.PublicRouter) + web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, inmemEvents, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter) // shard mapping HTTP routes shardMappingService, err := shards.NewShardMappingService(stime.NewStaticTimeService(), env.Config.Logger, shardRepository) diff --git a/internal/incoming/models.go b/internal/incoming/models.go index 00777d0..37dafa9 100644 --- a/internal/incoming/models.go +++ b/internal/incoming/models.go @@ -46,3 +46,9 @@ type CancelACHFile struct { FileID string `json:"id"` ShardKey string `json:"shardKey"` } + +type FileCancellationResponse struct { + FileID string `json:"id"` + ShardKey string `json:"shardKey"` + Successful bool `json:"successful"` +} diff --git a/internal/incoming/web/api_files.go b/internal/incoming/web/api_files.go index 9cf7775..116b8f9 100644 --- a/internal/incoming/web/api_files.go +++ b/internal/incoming/web/api_files.go @@ -20,10 +20,12 @@ package web import ( "bytes" "context" + "encoding/json" "fmt" "io" "net/http" "strings" + "sync" "github.com/moov-io/ach" "github.com/moov-io/achgateway/internal/incoming" @@ -40,18 +42,42 @@ import ( "gocloud.dev/pubsub" ) -func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher) *FilesController { - return &FilesController{ +func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher, cancellationResponses chan models.FileCancellationResponse) *FilesController { + controller := &FilesController{ logger: logger, cfg: cfg, publisher: pub, + + activeCancellations: make(map[string]chan models.FileCancellationResponse), + cancellationResponses: cancellationResponses, } + controller.listenForCancellations() + return controller } type FilesController struct { logger log.Logger cfg service.HTTPConfig publisher stream.Publisher + + cancellationLock sync.RWMutex + activeCancellations map[string]chan models.FileCancellationResponse + cancellationResponses chan models.FileCancellationResponse +} + +func (c *FilesController) listenForCancellations() { + go func() { + select { + case cancel := <-c.cancellationResponses: + c.cancellationLock.Lock() + out, exists := c.activeCancellations[cancel.FileID] + if exists { + delete(c.activeCancellations, cancel.FileID) + out <- cancel + } + c.cancellationLock.Unlock() + } + }() } func (c *FilesController) AppendRoutes(router *mux.Router) *mux.Router { @@ -166,7 +192,13 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque )) defer span.End() - if err := c.cancelFile(ctx, shardKey, fileID); err != nil { + waiter := make(chan models.FileCancellationResponse, 1) + defer func() { + close(waiter) + }() + + err := c.cancelFile(ctx, shardKey, fileID, waiter) + if err != nil { c.logger.With(log.Fields{ "shard_key": log.String(shardKey), "file_id": log.String(fileID), @@ -176,13 +208,21 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque return } + response := <-waiter + + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) } -func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string) error { +func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string, waiter chan models.FileCancellationResponse) error { // Remove .ach suffix if the request added it fileID = strings.TrimSuffix(fileID, ".ach") + c.cancellationLock.RLock() + c.activeCancellations[fmt.Sprintf("%s.ach", fileID)] = waiter + c.cancellationLock.RUnlock() + bs, err := compliance.Protect(c.cfg.Transform, models.Event{ Event: incoming.CancelACHFile{ FileID: fileID, @@ -201,5 +241,4 @@ func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID strin Body: bs, Metadata: meta, }) - } diff --git a/internal/incoming/web/api_files_test.go b/internal/incoming/web/api_files_test.go index 606c75a..42f6aa6 100644 --- a/internal/incoming/web/api_files_test.go +++ b/internal/incoming/web/api_files_test.go @@ -20,12 +20,14 @@ package web import ( "bytes" "context" + "encoding/json" "net/http" "net/http/httptest" "os" "path/filepath" "strings" "testing" + "time" "github.com/moov-io/achgateway/internal/incoming" "github.com/moov-io/achgateway/internal/incoming/stream/streamtest" @@ -40,7 +42,8 @@ import ( func TestCreateFileHandler(t *testing.T) { topic, sub := streamtest.InmemStream(t) - controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic) + cancellationResponses := make(chan models.FileCancellationResponse) + controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses) r := mux.NewRouter() controller.AppendRoutes(r) @@ -68,7 +71,8 @@ func TestCreateFileHandler(t *testing.T) { func TestCreateFileHandlerErr(t *testing.T) { topic, _ := streamtest.InmemStream(t) - controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic) + cancellationResponses := make(chan models.FileCancellationResponse) + controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses) r := mux.NewRouter() controller.AppendRoutes(r) @@ -84,13 +88,24 @@ func TestCreateFileHandlerErr(t *testing.T) { func TestCancelFileHandler(t *testing.T) { topic, sub := streamtest.InmemStream(t) - controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic) + cancellationResponses := make(chan models.FileCancellationResponse) + controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses) r := mux.NewRouter() controller.AppendRoutes(r) // Cancel our file req := httptest.NewRequest("DELETE", "/shards/s2/files/f2.ach", nil) + // Setup the response + go func() { + time.Sleep(time.Second) + cancellationResponses <- models.FileCancellationResponse{ + FileID: "f2.ach", + ShardKey: "s2", + Successful: true, + } + }() + w := httptest.NewRecorder() r.ServeHTTP(w, req) require.Equal(t, http.StatusOK, w.Code) @@ -104,4 +119,11 @@ func TestCancelFileHandler(t *testing.T) { require.Equal(t, "f2", file.FileID) // make sure .ach suffix is trimmed require.Equal(t, "s2", file.ShardKey) + + var response incoming.FileCancellationResponse + json.NewDecoder(w.Body).Decode(&response) + + require.Equal(t, "f2.ach", response.FileID) + require.Equal(t, "s2", response.ShardKey) + require.True(t, response.Successful) } diff --git a/internal/pipeline/aggregate.go b/internal/pipeline/aggregate.go index e1b5626..5d4cbc8 100644 --- a/internal/pipeline/aggregate.go +++ b/internal/pipeline/aggregate.go @@ -163,8 +163,21 @@ func (xfagg *aggregator) acceptFile(ctx context.Context, msg incoming.ACHFile) e return xfagg.merger.HandleXfer(ctx, msg) } -func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) error { - return xfagg.merger.HandleCancel(ctx, msg) +func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) (models.FileCancellationResponse, error) { + response, err := xfagg.merger.HandleCancel(ctx, msg) + if err != nil { + return err + } + + // Send the response back + out := models.FileCancellationResponse(response) + err = xfagg.eventEmitter.Send(ctx, models.Event{ + Event: out, + }) + if err != nil { + return out, fmt.Errorf("problem emitting file cancellation response: %w", err) + } + return out, nil } func (xfagg *aggregator) withEachFile(when time.Time) error { diff --git a/internal/pipeline/file_receiver.go b/internal/pipeline/file_receiver.go index 45f1517..70eaf5a 100644 --- a/internal/pipeline/file_receiver.go +++ b/internal/pipeline/file_receiver.go @@ -62,6 +62,8 @@ type FileReceiver struct { httpFiles stream.Subscription streamFiles stream.Subscription + CancellationResponses chan models.FileCancellationResponse + transformConfig *models.TransformConfig } @@ -77,15 +79,16 @@ func newFileReceiver( ) (*FileReceiver, error) { // Create FileReceiver and connect streamFiles fr := &FileReceiver{ - logger: logger, - cfg: cfg, - eventEmitter: eventEmitter, - defaultShardName: cfg.Sharding.Default, - shardRepository: shardRepository, - shardAggregators: shardAggregators, - fileRepository: fileRepository, - httpFiles: httpFiles, - transformConfig: transformConfig, + logger: logger, + cfg: cfg, + eventEmitter: eventEmitter, + defaultShardName: cfg.Sharding.Default, + shardRepository: shardRepository, + shardAggregators: shardAggregators, + fileRepository: fileRepository, + httpFiles: httpFiles, + CancellationResponses: make(chan models.FileCancellationResponse, 1000), + transformConfig: transformConfig, } err := fr.reconnect() if err != nil { @@ -511,11 +514,13 @@ func (fr *FileReceiver) cancelACHFile(ctx context.Context, cancel *models.Cancel logger.Log("begin canceling ACH file") evt := incoming.CancelACHFile(*cancel) - err = agg.cancelFile(ctx, evt) + response, err := agg.cancelFile(ctx, evt) if err != nil { return logger.Error().LogErrorf("problem canceling file: %v", err).Err() } + fr.CancellationResponses <- response + logger.Log("finished cancel of file") return nil } diff --git a/internal/pipeline/merging.go b/internal/pipeline/merging.go index 5d32eff..288d88c 100644 --- a/internal/pipeline/merging.go +++ b/internal/pipeline/merging.go @@ -53,7 +53,7 @@ import ( // each merged file for an upload. type XferMerging interface { HandleXfer(ctx context.Context, xfer incoming.ACHFile) error - HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error + HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) WithEachMerged(ctx context.Context, f func(context.Context, int, upload.Agent, *ach.File) (string, error)) (*processedFiles, error) } @@ -127,15 +127,30 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF return nil } -func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error { +func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) { path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", cancel.FileID)) + // Check if the file exists already + file, _ := m.storage.Open(path) + if file != nil { + defer file.Close() + } + // Write the canceled File err := m.storage.ReplaceFile(path, path+".canceled") if err != nil { telemetry.RecordError(ctx, err) } - return err + + // File was found and we didn't error during the rename + var successful bool = file != nil && err == nil + + out := incoming.FileCancellationResponse{ + FileID: cancel.FileID, + ShardKey: cancel.ShardKey, + Successful: successful, + } + return out, err } func (m *filesystemMerging) isolateMergableDir(ctx context.Context) (string, error) { diff --git a/pkg/models/events.go b/pkg/models/events.go index eae369b..6c1099a 100644 --- a/pkg/models/events.go +++ b/pkg/models/events.go @@ -116,6 +116,10 @@ func ReadWithOpts(data []byte, opts *ach.ValidateOpts) (*Event, error) { var file CancelACHFile event.Event = &file + case "FileCancellationResponse": + var response FileCancellationResponse + event.Event = &response + case "FileUploaded": var file FileUploaded event.Event = &file @@ -287,6 +291,9 @@ func (evt *InvalidQueueFile) SetValidation(opts *ach.ValidateOpts) { // See the Event struct for wrapping steps. type CancelACHFile incoming.CancelACHFile +// FileCancellationResponse is a response to the CancelACHFile event signaling if the cancellation was successful. +type FileCancellationResponse incoming.FileCancellationResponse + // FileUploaded is an event sent after a queued file has been uploaded to the ODFI. // The entries and batches may have been merged into a larger file to optimize on cost, // network performance, or other configuration. From 96751d02893d85e70fbfc2609a95ae66c01979e3 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Thu, 7 Mar 2024 14:04:57 -0600 Subject: [PATCH 2/4] build: update redpanda image to v22.3.25 --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index b0e5ce4..ec687c6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,7 +42,7 @@ services: - "demo:password:::inbound,outbound,reconciliation,returned" kafka1: - image: docker.redpanda.com/vectorized/redpanda:v22.3.21 + image: docker.redpanda.com/vectorized/redpanda:v22.3.25 container_name: kafka1 healthcheck: { From dd50370ea964e41a5b308741790dcfbabdbb92f0 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Thu, 7 Mar 2024 14:06:47 -0600 Subject: [PATCH 3/4] chore: fix build --- internal/environment.go | 2 +- internal/incoming/web/api_files.go | 6 ++++-- internal/pipeline/aggregate.go | 2 +- internal/pipeline/mock_xfer_merging.go | 11 ++++++----- internal/test/upload_test.go | 8 ++++---- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/internal/environment.go b/internal/environment.go index 42c3a69..87d772f 100644 --- a/internal/environment.go +++ b/internal/environment.go @@ -176,7 +176,7 @@ func NewEnvironment(env *Environment) (*Environment, error) { env.PublicRouter.Path("/ping").Methods("GET").HandlerFunc(addPingRoute) // append HTTP routes - web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, inmemEvents, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter) + web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter) // shard mapping HTTP routes shardMappingService, err := shards.NewShardMappingService(stime.NewStaticTimeService(), env.Config.Logger, shardRepository) diff --git a/internal/incoming/web/api_files.go b/internal/incoming/web/api_files.go index 116b8f9..4f714a6 100644 --- a/internal/incoming/web/api_files.go +++ b/internal/incoming/web/api_files.go @@ -67,8 +67,10 @@ type FilesController struct { func (c *FilesController) listenForCancellations() { go func() { - select { - case cancel := <-c.cancellationResponses: + for { + // Wait for a message + cancel := <-c.cancellationResponses + c.cancellationLock.Lock() out, exists := c.activeCancellations[cancel.FileID] if exists { diff --git a/internal/pipeline/aggregate.go b/internal/pipeline/aggregate.go index 5d4cbc8..38b3cc0 100644 --- a/internal/pipeline/aggregate.go +++ b/internal/pipeline/aggregate.go @@ -166,7 +166,7 @@ func (xfagg *aggregator) acceptFile(ctx context.Context, msg incoming.ACHFile) e func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) (models.FileCancellationResponse, error) { response, err := xfagg.merger.HandleCancel(ctx, msg) if err != nil { - return err + return models.FileCancellationResponse{}, err } // Send the response back diff --git a/internal/pipeline/mock_xfer_merging.go b/internal/pipeline/mock_xfer_merging.go index 32241ab..47d10ca 100644 --- a/internal/pipeline/mock_xfer_merging.go +++ b/internal/pipeline/mock_xfer_merging.go @@ -26,9 +26,10 @@ import ( ) type MockXferMerging struct { - LatestFile *incoming.ACHFile - LatestCancel *incoming.CancelACHFile - processed *processedFiles + LatestFile *incoming.ACHFile + LatestCancel *incoming.CancelACHFile + CancellationResponse incoming.FileCancellationResponse + processed *processedFiles Err error } @@ -38,9 +39,9 @@ func (merge *MockXferMerging) HandleXfer(_ context.Context, xfer incoming.ACHFil return merge.Err } -func (merge *MockXferMerging) HandleCancel(_ context.Context, cancel incoming.CancelACHFile) error { +func (merge *MockXferMerging) HandleCancel(_ context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) { merge.LatestCancel = &cancel - return merge.Err + return merge.CancellationResponse, merge.Err } func (merge *MockXferMerging) WithEachMerged(_ context.Context, f func(context.Context, int, upload.Agent, *ach.File) (string, error)) (*processedFiles, error) { diff --git a/internal/test/upload_test.go b/internal/test/upload_test.go index ff5ba3f..b0adb95 100644 --- a/internal/test/upload_test.go +++ b/internal/test/upload_test.go @@ -167,10 +167,6 @@ func TestUploads(t *testing.T) { require.NoError(t, err) defer streamTopic.Shutdown(context.Background()) - fileController := web.NewFilesController(logger, service.HTTPConfig{}, httpPub) - r := mux.NewRouter() - fileController.AppendRoutes(r) - outboundPath := setupTestDirectory(t, uploadConf) fileRepo := &files.MockRepository{} @@ -178,6 +174,10 @@ func TestUploads(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { fileReceiver.Shutdown() }) + fileController := web.NewFilesController(logger, service.HTTPConfig{}, httpPub, fileReceiver.CancellationResponses) + r := mux.NewRouter() + fileController.AppendRoutes(r) + adminServer := admintest.Server(t) fileReceiver.RegisterAdminRoutes(adminServer) From 450a04b06287e9f1fe91c9a04df4b3914fa2c7a1 Mon Sep 17 00:00:00 2001 From: Adam Shannon Date: Fri, 8 Mar 2024 15:38:06 -0600 Subject: [PATCH 4/4] incoming/web: better wait timeout for FileCancellationResponse --- internal/incoming/web/api_files.go | 34 +++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 10 deletions(-) diff --git a/internal/incoming/web/api_files.go b/internal/incoming/web/api_files.go index 4f714a6..9287c00 100644 --- a/internal/incoming/web/api_files.go +++ b/internal/incoming/web/api_files.go @@ -26,6 +26,7 @@ import ( "net/http" "strings" "sync" + "time" "github.com/moov-io/ach" "github.com/moov-io/achgateway/internal/incoming" @@ -60,22 +61,26 @@ type FilesController struct { cfg service.HTTPConfig publisher stream.Publisher - cancellationLock sync.RWMutex + cancellationLock sync.Mutex activeCancellations map[string]chan models.FileCancellationResponse cancellationResponses chan models.FileCancellationResponse } func (c *FilesController) listenForCancellations() { + c.logger.Info().Log("listening for cancellation responses") go func() { for { // Wait for a message cancel := <-c.cancellationResponses + c.logger.Info().Logf("received cancellation response: %#v", cancel) + + fileID := strings.TrimSuffix(cancel.FileID, ".ach") c.cancellationLock.Lock() - out, exists := c.activeCancellations[cancel.FileID] + out, exists := c.activeCancellations[fileID] if exists { - delete(c.activeCancellations, cancel.FileID) out <- cancel + delete(c.activeCancellations, fileID) } c.cancellationLock.Unlock() } @@ -195,9 +200,7 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque defer span.End() waiter := make(chan models.FileCancellationResponse, 1) - defer func() { - close(waiter) - }() + defer func() { close(waiter) }() err := c.cancelFile(ctx, shardKey, fileID, waiter) if err != nil { @@ -210,7 +213,18 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque return } - response := <-waiter + var response models.FileCancellationResponse + select { + case resp := <-waiter: + response = resp + + case <-time.After(10 * time.Second): + response = models.FileCancellationResponse{ + FileID: fileID, + ShardKey: shardKey, + Successful: false, + } + } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) @@ -221,9 +235,9 @@ func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID strin // Remove .ach suffix if the request added it fileID = strings.TrimSuffix(fileID, ".ach") - c.cancellationLock.RLock() - c.activeCancellations[fmt.Sprintf("%s.ach", fileID)] = waiter - c.cancellationLock.RUnlock() + c.cancellationLock.Lock() + c.activeCancellations[fileID] = waiter + c.cancellationLock.Unlock() bs, err := compliance.Protect(c.cfg.Transform, models.Event{ Event: incoming.CancelACHFile{