feat: respond to CancelACHFile events with FileCancellationResponse #228

Merged · 4 commits · Mar 13, 2024
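Summary of the change: the DELETE file-cancellation endpoint now responds with a JSON FileCancellationResponse body instead of an empty 200. A rough client-side sketch of the new behavior is below; the route shape and response field names come from this PR's diff, while the base URL, shard key, and file ID are illustrative assumptions.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// fileCancellationResponse mirrors the JSON body added by this PR
// (field names come from the struct tags in internal/incoming/models.go).
type fileCancellationResponse struct {
	FileID     string `json:"id"`
	ShardKey   string `json:"shardKey"`
	Successful bool   `json:"successful"`
}

func main() {
	// Assumed address of a locally running achgateway HTTP server; adjust as needed.
	baseURL := "http://localhost:8484"

	// DELETE /shards/{shardKey}/files/{fileID} now returns a JSON body.
	// The shard key "s2" and file ID "f2.ach" are illustrative values.
	req, err := http.NewRequest("DELETE", baseURL+"/shards/s2/files/f2.ach", nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var out fileCancellationResponse
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		panic(err)
	}
	fmt.Printf("canceled=%v fileID=%q shardKey=%q\n", out.Successful, out.FileID, out.ShardKey)
}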
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -42,7 +42,7 @@ services:
- "demo:password:::inbound,outbound,reconciliation,returned"

kafka1:
image: docker.redpanda.com/vectorized/redpanda:v22.3.21
image: docker.redpanda.com/vectorized/redpanda:v22.3.25
container_name: kafka1
healthcheck:
{
2 changes: 1 addition & 1 deletion internal/environment.go
@@ -176,7 +176,7 @@ func NewEnvironment(env *Environment) (*Environment, error) {
env.PublicRouter.Path("/ping").Methods("GET").HandlerFunc(addPingRoute)

// append HTTP routes
web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles).AppendRoutes(env.PublicRouter)
web.NewFilesController(env.Config.Logger, env.Config.Inbound.HTTP, httpFiles, fileReceiver.CancellationResponses).AppendRoutes(env.PublicRouter)

// shard mapping HTTP routes
shardMappingService, err := shards.NewShardMappingService(stime.NewStaticTimeService(), env.Config.Logger, shardRepository)
6 changes: 6 additions & 0 deletions internal/incoming/models.go
@@ -46,3 +46,9 @@ type CancelACHFile struct {
FileID string `json:"id"`
ShardKey string `json:"shardKey"`
}

type FileCancellationResponse struct {
FileID string `json:"id"`
ShardKey string `json:"shardKey"`
Successful bool `json:"successful"`
}
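For reference, a quick sketch of the JSON this new type marshals to, based on the struct tags above; the values are illustrative and the local struct simply mirrors incoming.FileCancellationResponse.

package main

import (
	"encoding/json"
	"fmt"
)

// Local mirror of incoming.FileCancellationResponse, using the same struct tags.
type FileCancellationResponse struct {
	FileID     string `json:"id"`
	ShardKey   string `json:"shardKey"`
	Successful bool   `json:"successful"`
}

func main() {
	bs, err := json.Marshal(FileCancellationResponse{FileID: "f2", ShardKey: "s2", Successful: true})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(bs)) // {"id":"f2","shardKey":"s2","successful":true}
}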
65 changes: 60 additions & 5 deletions internal/incoming/web/api_files.go
@@ -20,10 +20,13 @@ package web
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
"time"

"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/incoming"
@@ -40,18 +43,48 @@ import (
"gocloud.dev/pubsub"
)

func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher) *FilesController {
return &FilesController{
func NewFilesController(logger log.Logger, cfg service.HTTPConfig, pub stream.Publisher, cancellationResponses chan models.FileCancellationResponse) *FilesController {
controller := &FilesController{
logger: logger,
cfg: cfg,
publisher: pub,

activeCancellations: make(map[string]chan models.FileCancellationResponse),
cancellationResponses: cancellationResponses,
}
controller.listenForCancellations()
return controller
}

type FilesController struct {
logger log.Logger
cfg service.HTTPConfig
publisher stream.Publisher

cancellationLock sync.Mutex
activeCancellations map[string]chan models.FileCancellationResponse
cancellationResponses chan models.FileCancellationResponse
}

func (c *FilesController) listenForCancellations() {
c.logger.Info().Log("listening for cancellation responses")
go func() {
for {
// Wait for a message
cancel := <-c.cancellationResponses
c.logger.Info().Logf("received cancellation response: %#v", cancel)

fileID := strings.TrimSuffix(cancel.FileID, ".ach")

c.cancellationLock.Lock()
out, exists := c.activeCancellations[fileID]
if exists {
out <- cancel
delete(c.activeCancellations, fileID)
}
c.cancellationLock.Unlock()
}
}()
}

func (c *FilesController) AppendRoutes(router *mux.Router) *mux.Router {
@@ -166,7 +199,11 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque
))
defer span.End()

if err := c.cancelFile(ctx, shardKey, fileID); err != nil {
waiter := make(chan models.FileCancellationResponse, 1)
defer func() { close(waiter) }()

err := c.cancelFile(ctx, shardKey, fileID, waiter)
if err != nil {
c.logger.With(log.Fields{
"shard_key": log.String(shardKey),
"file_id": log.String(fileID),
@@ -176,13 +213,32 @@ func (c *FilesController) CancelFileHandler(w http.ResponseWriter, r *http.Reque
return
}

var response models.FileCancellationResponse
select {
case resp := <-waiter:
response = resp

case <-time.After(10 * time.Second):
response = models.FileCancellationResponse{
FileID: fileID,
ShardKey: shardKey,
Successful: false,
}
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
}

func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string) error {
func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID string, waiter chan models.FileCancellationResponse) error {
// Remove .ach suffix if the request added it
fileID = strings.TrimSuffix(fileID, ".ach")

c.cancellationLock.Lock()
c.activeCancellations[fileID] = waiter
c.cancellationLock.Unlock()

bs, err := compliance.Protect(c.cfg.Transform, models.Event{
Event: incoming.CancelACHFile{
FileID: fileID,
@@ -201,5 +257,4 @@ func (c *FilesController) cancelFile(ctx context.Context, shardKey, fileID strin
Body: bs,
Metadata: meta,
})

}
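The handler above registers a per-request buffered channel with the background listener and then waits on it with a 10-second timeout. A minimal standalone sketch of that select/timeout pattern follows; the names here are illustrative and not taken from the codebase.

package main

import (
	"fmt"
	"time"
)

type result struct{ Successful bool }

func main() {
	// Buffered so a late responder never blocks if the waiter has already timed out.
	waiter := make(chan result, 1)

	// Simulate the background listener delivering a cancellation response.
	go func() {
		time.Sleep(500 * time.Millisecond)
		waiter <- result{Successful: true}
	}()

	select {
	case resp := <-waiter:
		fmt.Printf("got response: %+v\n", resp)
	case <-time.After(10 * time.Second):
		fmt.Println("timed out waiting for cancellation response")
	}
}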
28 changes: 25 additions & 3 deletions internal/incoming/web/api_files_test.go
@@ -20,12 +20,14 @@ package web
import (
"bytes"
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strings"
"testing"
"time"

"github.com/moov-io/achgateway/internal/incoming"
"github.com/moov-io/achgateway/internal/incoming/stream/streamtest"
@@ -40,7 +42,8 @@ import (
func TestCreateFileHandler(t *testing.T) {
topic, sub := streamtest.InmemStream(t)

controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic)
cancellationResponses := make(chan models.FileCancellationResponse)
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses)
r := mux.NewRouter()
controller.AppendRoutes(r)

@@ -68,7 +71,8 @@ func TestCreateFileHandlerErr(t *testing.T) {
func TestCreateFileHandlerErr(t *testing.T) {
topic, _ := streamtest.InmemStream(t)

controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic)
cancellationResponses := make(chan models.FileCancellationResponse)
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses)
r := mux.NewRouter()
controller.AppendRoutes(r)

@@ -84,13 +88,24 @@ func TestCreateFileHandlerErr(t *testing.T) {
func TestCancelFileHandler(t *testing.T) {
topic, sub := streamtest.InmemStream(t)

controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic)
cancellationResponses := make(chan models.FileCancellationResponse)
controller := NewFilesController(log.NewTestLogger(), service.HTTPConfig{}, topic, cancellationResponses)
r := mux.NewRouter()
controller.AppendRoutes(r)

// Cancel our file
req := httptest.NewRequest("DELETE", "/shards/s2/files/f2.ach", nil)

// Setup the response
go func() {
time.Sleep(time.Second)
cancellationResponses <- models.FileCancellationResponse{
FileID: "f2.ach",
ShardKey: "s2",
Successful: true,
}
}()

w := httptest.NewRecorder()
r.ServeHTTP(w, req)
require.Equal(t, http.StatusOK, w.Code)
@@ -104,4 +119,11 @@

require.Equal(t, "f2", file.FileID) // make sure .ach suffix is trimmed
require.Equal(t, "s2", file.ShardKey)

var response incoming.FileCancellationResponse
json.NewDecoder(w.Body).Decode(&response)

require.Equal(t, "f2.ach", response.FileID)
require.Equal(t, "s2", response.ShardKey)
require.True(t, response.Successful)
}
17 changes: 15 additions & 2 deletions internal/pipeline/aggregate.go
@@ -163,8 +163,21 @@ func (xfagg *aggregator) acceptFile(ctx context.Context, msg incoming.ACHFile) e
return xfagg.merger.HandleXfer(ctx, msg)
}

func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) error {
return xfagg.merger.HandleCancel(ctx, msg)
func (xfagg *aggregator) cancelFile(ctx context.Context, msg incoming.CancelACHFile) (models.FileCancellationResponse, error) {
response, err := xfagg.merger.HandleCancel(ctx, msg)
if err != nil {
return models.FileCancellationResponse{}, err
}

// Send the response back
out := models.FileCancellationResponse(response)
err = xfagg.eventEmitter.Send(ctx, models.Event{
Event: out,
})
if err != nil {
return out, fmt.Errorf("problem emitting file cancellation response: %w", err)
}
return out, nil
}

func (xfagg *aggregator) withEachFile(when time.Time) error {
25 changes: 15 additions & 10 deletions internal/pipeline/file_receiver.go
@@ -62,6 +62,8 @@ type FileReceiver struct {
httpFiles stream.Subscription
streamFiles stream.Subscription

CancellationResponses chan models.FileCancellationResponse

transformConfig *models.TransformConfig
}

@@ -77,15 +79,16 @@ func newFileReceiver(
) (*FileReceiver, error) {
// Create FileReceiver and connect streamFiles
fr := &FileReceiver{
logger: logger,
cfg: cfg,
eventEmitter: eventEmitter,
defaultShardName: cfg.Sharding.Default,
shardRepository: shardRepository,
shardAggregators: shardAggregators,
fileRepository: fileRepository,
httpFiles: httpFiles,
transformConfig: transformConfig,
logger: logger,
cfg: cfg,
eventEmitter: eventEmitter,
defaultShardName: cfg.Sharding.Default,
shardRepository: shardRepository,
shardAggregators: shardAggregators,
fileRepository: fileRepository,
httpFiles: httpFiles,
CancellationResponses: make(chan models.FileCancellationResponse, 1000),
transformConfig: transformConfig,
}
err := fr.reconnect()
if err != nil {
@@ -511,11 +514,13 @@ func (fr *FileReceiver) cancelACHFile(ctx context.Context, cancel *models.Cancel
logger.Log("begin canceling ACH file")

evt := incoming.CancelACHFile(*cancel)
err = agg.cancelFile(ctx, evt)
response, err := agg.cancelFile(ctx, evt)
if err != nil {
return logger.Error().LogErrorf("problem canceling file: %v", err).Err()
}

fr.CancellationResponses <- response

logger.Log("finished cancel of file")
return nil
}
21 changes: 18 additions & 3 deletions internal/pipeline/merging.go
@@ -53,7 +53,7 @@ import (
// each merged file for an upload.
type XferMerging interface {
HandleXfer(ctx context.Context, xfer incoming.ACHFile) error
HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error
HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error)

WithEachMerged(ctx context.Context, f func(context.Context, int, upload.Agent, *ach.File) (string, error)) (*processedFiles, error)
}
@@ -127,15 +127,30 @@ func (m *filesystemMerging) writeACHFile(ctx context.Context, xfer incoming.ACHF
return nil
}

func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) error {
func (m *filesystemMerging) HandleCancel(ctx context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) {
path := filepath.Join("mergable", m.shard.Name, fmt.Sprintf("%s.ach", cancel.FileID))

// Check if the file exists already
file, _ := m.storage.Open(path)

Reviewer comment: Why don't we check the error here?

Author reply: We're just looking for if the file exists. Cancellation can only be successful if the file gets renamed.

👍

if file != nil {
defer file.Close()
}

// Write the canceled File
err := m.storage.ReplaceFile(path, path+".canceled")
if err != nil {
telemetry.RecordError(ctx, err)
}
return err

// File was found and we didn't error during the rename
var successful bool = file != nil && err == nil

out := incoming.FileCancellationResponse{
FileID: cancel.FileID,
ShardKey: cancel.ShardKey,
Successful: successful,
}
return out, err
}

func (m *filesystemMerging) isolateMergableDir(ctx context.Context) (string, error) {
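The review exchange above concerns the success condition in HandleCancel: cancellation is reported as successful only when the mergable file existed and the rename to the ".canceled" name succeeded. A minimal standalone sketch of that check using plain os calls is shown here; the path is illustrative, and the real code goes through the shard's storage abstraction rather than the os package.

package main

import (
	"fmt"
	"os"
)

// cancelByRename reports success only if the file existed and the rename worked,
// mirroring the logic in HandleCancel above (simplified to plain os calls).
func cancelByRename(path string) (bool, error) {
	f, _ := os.Open(path) // used only as an existence check; error intentionally ignored
	if f != nil {
		defer f.Close()
	}
	err := os.Rename(path, path+".canceled")
	return f != nil && err == nil, err
}

func main() {
	ok, err := cancelByRename("mergable/testing/f2.ach") // illustrative path
	fmt.Println(ok, err)
}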
11 changes: 6 additions & 5 deletions internal/pipeline/mock_xfer_merging.go
@@ -26,9 +26,10 @@ import (
)

type MockXferMerging struct {
LatestFile *incoming.ACHFile
LatestCancel *incoming.CancelACHFile
processed *processedFiles
LatestFile *incoming.ACHFile
LatestCancel *incoming.CancelACHFile
CancellationResponse incoming.FileCancellationResponse
processed *processedFiles

Err error
}
@@ -38,9 +39,9 @@ func (merge *MockXferMerging) HandleXfer(_ context.Context, xfer incoming.ACHFil
return merge.Err
}

func (merge *MockXferMerging) HandleCancel(_ context.Context, cancel incoming.CancelACHFile) error {
func (merge *MockXferMerging) HandleCancel(_ context.Context, cancel incoming.CancelACHFile) (incoming.FileCancellationResponse, error) {
merge.LatestCancel = &cancel
return merge.Err
return merge.CancellationResponse, merge.Err
}

func (merge *MockXferMerging) WithEachMerged(_ context.Context, f func(context.Context, int, upload.Agent, *ach.File) (string, error)) (*processedFiles, error) {
8 changes: 4 additions & 4 deletions internal/test/upload_test.go
@@ -167,17 +167,17 @@ func TestUploads(t *testing.T) {
require.NoError(t, err)
defer streamTopic.Shutdown(context.Background())

fileController := web.NewFilesController(logger, service.HTTPConfig{}, httpPub)
r := mux.NewRouter()
fileController.AppendRoutes(r)

outboundPath := setupTestDirectory(t, uploadConf)
fileRepo := &files.MockRepository{}

fileReceiver, err := pipeline.Start(ctx, logger, uploadConf, shardRepo, fileRepo, httpSub)
require.NoError(t, err)
t.Cleanup(func() { fileReceiver.Shutdown() })

fileController := web.NewFilesController(logger, service.HTTPConfig{}, httpPub, fileReceiver.CancellationResponses)
r := mux.NewRouter()
fileController.AppendRoutes(r)

adminServer := admintest.Server(t)
fileReceiver.RegisterAdminRoutes(adminServer)
