Skip to content

Commit

Permalink
feat: respond to CancelACHFile events with FileCancellationResponse
Browse files Browse the repository at this point in the history
Issue: #141
Issue: #142
  • Loading branch information
adamdecaf committed Mar 6, 2024
1 parent 8722d44 commit 00ba2ed
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 20 deletions.
2 changes: 1 addition & 1 deletion internal/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func NewEnvironment(env *Environment) (*Environment, error) {
env.PublicRouter.Path("/ping").Methods("GET").HandlerFunc(addPingRoute)

// append HTTP routes
web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles).AppendRoutes(env.PublicRouter)
web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, inmemEvents, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter)

// shard mapping HTTP routes
shardMappingService, err := shards.NewShardMappingService(stime.NewStaticTimeService(), env.Config.Logger, shardRepository)
Expand Down
6 changes: 6 additions & 0 deletions internal/incoming/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ type CancelACHFile struct {
FileID string `json:"id"`
ShardKey string `json:"shardKey"`
}

type FileCancellationResponse struct {
FileID string `json:"id"`
ShardKey string `json:"shardKey"`
Successful bool `json:"successful"`
}
43 changes: 39 additions & 4 deletions internal/incoming/web/api_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ package web
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/incoming"
Expand All @@ -40,18 +42,39 @@ import (
"gocloud.dev/pubsub"
)

func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher) *FilesController {
func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher, cancellationResponses chan models.FileCancellationResponse) *FilesController {
return &FilesController{
logger: logger,
cfg: cfg,
publisher: pub,

cancellationResponses: cancellationResponses,
}
}

type FilesController struct {
logger log.Logger
cfg service.HTTPConfig
publisher stream.Publisher

cancellationLock sync.Mutex
activeCancellations map[string]chan models.FileCancellationResponse
cancellationResponses chan models.FileCancellationResponse
}

func (c *FilesController) listenForCancellations() {
go func() {
select {
case cancel := <-c.cancellationResponses:
c.cancellationLock.Lock()
out, exists := c.activeCancellations[cancel.FileID]
if exists {
delete(c.activeCancellations[cancel.FileID])

Check failure on line 72 in internal/incoming/web/api_files.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

invalid operation: not enough arguments for delete(c.activeCancellations[cancel.FileID]) (expected 2, found 1)
out <- cancel
}
c.cancellationLock.UnLock()

Check failure on line 75 in internal/incoming/web/api_files.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

c.cancellationLock.UnLock undefined (type sync.Mutex has no field or method UnLock)
}
}()
}

func (c *FilesController) AppendRoutes(router *mux.Router) *mux.Router {
Expand Down Expand Up @@ -166,7 +189,13 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque
))
defer span.End()

if err := c.cancelFile(ctx, shardKey, fileID); err != nil {
waiter := make(chan models.FileCancellationResponse, 1)
defer func() {
close(waiter)
}()

err := c.cancelFile(ctx, shardKey, fileID, waiter)
if err != nil {
c.logger.With(log.Fields{
"shard_key": log.String(shardKey),
"file_id": log.String(fileID),
Expand All @@ -176,13 +205,20 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque
return
}

response := <-waiter
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
}

func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string) error {
func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string, waiter chan models.FileCancellationResponse) error {
// Remove .ach suffix if the request added it
fileID = strings.TrimSuffix(fileID, ".ach")

c.cancellationLock.Lock()
c.activeCancellations[fileID] = waiter
c.cancellationLock.Unlock()

bs, err := compliance.Protect(c.cfg.Transform, models.Event{
Event: incoming.CancelACHFile{
FileID: fileID,
Expand All @@ -201,5 +237,4 @@ func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID strin
Body: bs,
Metadata: meta,
})

}
17 changes: 15 additions & 2 deletions internal/pipeline/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,21 @@ func (xfagg *aggregator) acceptFile(ctx context.Context, msg incoming.ACHFile) e
return xfagg.merger.HandleXfer(ctx, msg)
}

func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) error {
return xfagg.merger.HandleCancel(ctx, msg)
func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) (models.FileCancellationResponse, error) {
response, err := xfagg.merger.HandleCancel(ctx, msg)
if err != nil {
return err

Check failure on line 169 in internal/pipeline/aggregate.go

View workflow job for this annotation

GitHub Actions / CodeQL-Build

not enough return values
}

// Send the response back
out := models.FileCancellationResponse(response)
err = xfagg.eventEmitter.Send(ctx, models.Event{
Event: out,
})
if err != nil {
return out, fmt.Errorf("problem emitting file cancellation response: %w", err)
}
return out, nil
}

func (xfagg *aggregator) withEachFile(when time.Time) error {
Expand Down
25 changes: 15 additions & 10 deletions internal/pipeline/file_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type FileReceiver struct {
httpFiles stream.Subscription
streamFiles stream.Subscription

CancellationResponses chan models.FileCancellationResponse

transformConfig *models.TransformConfig
}

Expand All @@ -77,15 +79,16 @@ func newFileReceiver(
) (*FileReceiver, error) {
// Create FileReceiver and connect streamFiles
fr := &FileReceiver{
logger: logger,
cfg: cfg,
eventEmitter: eventEmitter,
defaultShardName: cfg.Sharding.Default,
shardRepository: shardRepository,
shardAggregators: shardAggregators,
fileRepository: fileRepository,
httpFiles: httpFiles,
transformConfig: transformConfig,
logger: logger,
cfg: cfg,
eventEmitter: eventEmitter,
defaultShardName: cfg.Sharding.Default,
shardRepository: shardRepository,
shardAggregators: shardAggregators,
fileRepository: fileRepository,
httpFiles: httpFiles,
CancellationResponses: make(chan models.FileCancellationResponse, 1000),
transformConfig: transformConfig,
}
err := fr.reconnect()
if err != nil {
Expand Down Expand Up @@ -511,11 +514,13 @@ func (fr *FileReceiver) cancelACHFile(ctx context.Context, cancel *models.Cancel
logger.Log("begin canceling ACH file")

evt := incoming.CancelACHFile(*cancel)
err = agg.cancelFile(ctx, evt)
response, err := agg.cancelFile(ctx, evt)
if err != nil {
return logger.Error().LogErrorf("problem canceling file: %v", err).Err()
}

fr.CancellationResponses <- response

logger.Log("finished cancel of file")
return nil
}
21 changes: 18 additions & 3 deletions internal/pipeline/merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import (
// each merged file for an upload.
type XferMerging interface {
HandleXfer(ctx context.Context, xfer incoming.ACHFile) error
HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error
HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error)

WithEachMerged(ctx context.Context, f func(context.Context, int, upload.Agent, *ach.File) (string, error)) (*processedFiles, error)
}
Expand Down Expand Up @@ -127,15 +127,30 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF
return nil
}

func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error {
func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) {
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", cancel.FileID))

// Check if the file exists already
file, _ := m.storage.Open(path)
if file != nil {
defer file.Close()
}

// Write the canceled File
err := m.storage.ReplaceFile(path, path+".canceled")
if err != nil {
telemetry.RecordError(ctx, err)
}
return err

// File was found and we didn't error during the rename
var successful bool = file != nil && err == nil

out := incoming.FileCancellationResponse{
FileID: cancel.FileID,
ShardKey: cancel.ShardKey,
Successful: successful,
}
return out, err
}

func (m *filesystemMerging) isolateMergableDir(ctx context.Context) (string, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func ReadWithOpts(data []byte, opts *ach.ValidateOpts) (*Event, error) {
var file CancelACHFile
event.Event = &file

case "FileCancellationResponse":
var response FileCancellationResponse
event.Event = &response

case "FileUploaded":
var file FileUploaded
event.Event = &file
Expand Down Expand Up @@ -287,6 +291,9 @@ func (evt *InvalidQueueFile) SetValidation(opts *ach.ValidateOpts) {
// See the Event struct for wrapping steps.
type CancelACHFile incoming.CancelACHFile

// FileCancellationResponse is a response to the CancelACHFile event signaling if the cancellation was successful.
type FileCancellationResponse incoming.FileCancellationResponse

// FileUploaded is an event sent after a queued file has been uploaded to the ODFI.
// The entries and batches may have been merged into a larger file to optimize on cost,
// network performance, or other configuration.
Expand Down

0 comments on commit 00ba2ed

Please sign in to comment.