diff --git a/internal/environment.go b/internal/environment.go index e4a8f81..42c3a69 100644 --- a/internal/environment.go +++ b/internal/environment.go @@ -176,7 +176,7 @@ func NewEnvironment(env *Environment) (*Environment, error) { env.PublicRouter.Path("/ping").Methods("GET").HandlerFunc(addPingRoute) // append HTTP routes - web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles).AppendRoutes(env.PublicRouter) + web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, inmemEvents, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter) // shard mapping HTTP routes shardMappingService, err := shards.NewShardMappingService(stime.NewStaticTimeService(), env.Config.Logger, shardRepository) diff --git a/internal/incoming/models.go b/internal/incoming/models.go index 00777d0..37dafa9 100644 --- a/internal/incoming/models.go +++ b/internal/incoming/models.go @@ -46,3 +46,9 @@ type CancelACHFile struct { FileID string `json:"id"` ShardKey string `json:"shardKey"` } + +type FileCancellationResponse struct { + FileID string `json:"id"` + ShardKey string `json:"shardKey"` + Successful bool `json:"successful"` +} diff --git a/internal/incoming/web/api_files.go b/internal/incoming/web/api_files.go index 9cf7775..100ce2a 100644 --- a/internal/incoming/web/api_files.go +++ b/internal/incoming/web/api_files.go @@ -20,10 +20,12 @@ package web import ( "bytes" "context" + "encoding/json" "fmt" "io" "net/http" "strings" + "sync" "github.com/moov-io/ach" "github.com/moov-io/achgateway/internal/incoming" @@ -40,11 +42,13 @@ import ( "gocloud.dev/pubsub" ) -func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher) *FilesController { +func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher, cancellationResponses chan models.FileCancellationResponse) *FilesController { return &FilesController{ logger: logger, cfg: cfg, publisher: pub, + + cancellationResponses: cancellationResponses, } } @@ -52,6 +56,25 @@ type FilesController struct { logger log.Logger cfg service.HTTPConfig publisher stream.Publisher + + cancellationLock sync.Mutex + activeCancellations map[string]chan models.FileCancellationResponse + cancellationResponses chan models.FileCancellationResponse +} + +func (c *FilesController) listenForCancellations() { + go func() { + select { + case cancel := <-c.cancellationResponses: + c.cancellationLock.Lock() + out, exists := c.activeCancellations[cancel.FileID] + if exists { + delete(c.activeCancellations[cancel.FileID]) + out <- cancel + } + c.cancellationLock.UnLock() + } + }() } func (c *FilesController) AppendRoutes(router *mux.Router) *mux.Router { @@ -166,7 +189,13 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque )) defer span.End() - if err := c.cancelFile(ctx, shardKey, fileID); err != nil { + waiter := make(chan models.FileCancellationResponse, 1) + defer func() { + close(waiter) + }() + + err := c.cancelFile(ctx, shardKey, fileID, waiter) + if err != nil { c.logger.With(log.Fields{ "shard_key": log.String(shardKey), "file_id": log.String(fileID), @@ -176,13 +205,20 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque return } + response := <-waiter + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) } -func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string) error { +func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string, waiter chan models.FileCancellationResponse) error { // Remove .ach suffix if the request added it fileID = strings.TrimSuffix(fileID, ".ach") + c.cancellationLock.Lock() + c.activeCancellations[fileID] = waiter + c.cancellationLock.Unlock() + bs, err := compliance.Protect(c.cfg.Transform, models.Event{ Event: incoming.CancelACHFile{ FileID: fileID, @@ -201,5 +237,4 @@ func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID strin Body: bs, Metadata: meta, }) - } diff --git a/internal/pipeline/aggregate.go b/internal/pipeline/aggregate.go index e1b5626..5d4cbc8 100644 --- a/internal/pipeline/aggregate.go +++ b/internal/pipeline/aggregate.go @@ -163,8 +163,21 @@ func (xfagg *aggregator) acceptFile(ctx context.Context, msg incoming.ACHFile) e return xfagg.merger.HandleXfer(ctx, msg) } -func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) error { - return xfagg.merger.HandleCancel(ctx, msg) +func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) (models.FileCancellationResponse, error) { + response, err := xfagg.merger.HandleCancel(ctx, msg) + if err != nil { + return err + } + + // Send the response back + out := models.FileCancellationResponse(response) + err = xfagg.eventEmitter.Send(ctx, models.Event{ + Event: out, + }) + if err != nil { + return out, fmt.Errorf("problem emitting file cancellation response: %w", err) + } + return out, nil } func (xfagg *aggregator) withEachFile(when time.Time) error { diff --git a/internal/pipeline/file_receiver.go b/internal/pipeline/file_receiver.go index 45f1517..70eaf5a 100644 --- a/internal/pipeline/file_receiver.go +++ b/internal/pipeline/file_receiver.go @@ -62,6 +62,8 @@ type FileReceiver struct { httpFiles stream.Subscription streamFiles stream.Subscription + CancellationResponses chan models.FileCancellationResponse + transformConfig *models.TransformConfig } @@ -77,15 +79,16 @@ func newFileReceiver( ) (*FileReceiver, error) { // Create FileReceiver and connect streamFiles fr := &FileReceiver{ - logger: logger, - cfg: cfg, - eventEmitter: eventEmitter, - defaultShardName: cfg.Sharding.Default, - shardRepository: shardRepository, - shardAggregators: shardAggregators, - fileRepository: fileRepository, - httpFiles: httpFiles, - transformConfig: transformConfig, + logger: logger, + cfg: cfg, + eventEmitter: eventEmitter, + defaultShardName: cfg.Sharding.Default, + shardRepository: shardRepository, + shardAggregators: shardAggregators, + fileRepository: fileRepository, + httpFiles: httpFiles, + CancellationResponses: make(chan models.FileCancellationResponse, 1000), + transformConfig: transformConfig, } err := fr.reconnect() if err != nil { @@ -511,11 +514,13 @@ func (fr *FileReceiver) cancelACHFile(ctx context.Context, cancel *models.Cancel logger.Log("begin canceling ACH file") evt := incoming.CancelACHFile(*cancel) - err = agg.cancelFile(ctx, evt) + response, err := agg.cancelFile(ctx, evt) if err != nil { return logger.Error().LogErrorf("problem canceling file: %v", err).Err() } + fr.CancellationResponses <- response + logger.Log("finished cancel of file") return nil } diff --git a/internal/pipeline/merging.go b/internal/pipeline/merging.go index 5d32eff..288d88c 100644 --- a/internal/pipeline/merging.go +++ b/internal/pipeline/merging.go @@ -53,7 +53,7 @@ import ( // each merged file for an upload. type XferMerging interface { HandleXfer(ctx context.Context, xfer incoming.ACHFile) error - HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error + HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) WithEachMerged(ctx context.Context, f func(context.Context, int, upload.Agent, *ach.File) (string, error)) (*processedFiles, error) } @@ -127,15 +127,30 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF return nil } -func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error { +func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) { path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", cancel.FileID)) + // Check if the file exists already + file, _ := m.storage.Open(path) + if file != nil { + defer file.Close() + } + // Write the canceled File err := m.storage.ReplaceFile(path, path+".canceled") if err != nil { telemetry.RecordError(ctx, err) } - return err + + // File was found and we didn't error during the rename + var successful bool = file != nil && err == nil + + out := incoming.FileCancellationResponse{ + FileID: cancel.FileID, + ShardKey: cancel.ShardKey, + Successful: successful, + } + return out, err } func (m *filesystemMerging) isolateMergableDir(ctx context.Context) (string, error) { diff --git a/pkg/models/events.go b/pkg/models/events.go index eae369b..6c1099a 100644 --- a/pkg/models/events.go +++ b/pkg/models/events.go @@ -116,6 +116,10 @@ func ReadWithOpts(data []byte, opts *ach.ValidateOpts) (*Event, error) { var file CancelACHFile event.Event = &file + case "FileCancellationResponse": + var response FileCancellationResponse + event.Event = &response + case "FileUploaded": var file FileUploaded event.Event = &file @@ -287,6 +291,9 @@ func (evt *InvalidQueueFile) SetValidation(opts *ach.ValidateOpts) { // See the Event struct for wrapping steps. type CancelACHFile incoming.CancelACHFile +// FileCancellationResponse is a response to the CancelACHFile event signaling if the cancellation was successful. +type FileCancellationResponse incoming.FileCancellationResponse + // FileUploaded is an event sent after a queued file has been uploaded to the ODFI. // The entries and batches may have been merged into a larger file to optimize on cost, // network performance, or other configuration.