Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling fixes #203

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/pipeline/events_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/moov-io/ach"
"github.com/moov-io/achgateway/internal/admintest"
"github.com/moov-io/achgateway/internal/events"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/achgateway/pkg/compliance"
"github.com/moov-io/achgateway/pkg/models"
"github.com/moov-io/base"
Expand All @@ -40,7 +41,7 @@ import (
func TestEventsAPI_FileUploaded(t *testing.T) {
adminServer := admintest.Server(t)

fr := testFileReceiver(t)
fr := testFileReceiver(t, func(conf *service.Config) {})
fr.RegisterAdminRoutes(adminServer)

// Write a file that's produced
Expand Down Expand Up @@ -125,7 +126,7 @@ func TestEventsAPI_FileUploaded(t *testing.T) {
func TestEventsAPI_FileUploadedErrors(t *testing.T) {
adminServer := admintest.Server(t)

fr := testFileReceiver(t)
fr := testFileReceiver(t, func(conf *service.Config) {})
fr.RegisterAdminRoutes(adminServer)

t.Run("Call /file-uploaded on a shard that doesn't exist", func(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions internal/pipeline/file_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (fr *FileReceiver) handleMessage(ctx context.Context, sub stream.Subscripti
go func() {
msg, err := sub.Receive(ctx)
if err != nil {
if err == context.Canceled {
if errors.Is(err, context.Canceled) {
return
}
if strings.Contains(err.Error(), "Subscription has been Shutdown") {
Expand Down Expand Up @@ -268,7 +268,8 @@ func (fr *FileReceiver) processMessage(msg *pubsub.Message) error {
// Optionally decode and decrypt message
data, err = compliance.Reveal(fr.transformConfig, data)
if err != nil {
return logger.LogErrorf("unable to reveal event: %v", err).Err()
logger.LogErrorf("unable to reveal event: %v", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this is going to be a workaround or a hack to get past where the event used in cancelling a transfer comes in without needing to be decrypted, or if this is going to be a permanent solution.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there are different event types for ACHFile and CancelACHFile. Wouldn't it be better to check the type, and not trying to decrypt CancelACHFile events. That way true decryption errors could fail as they should. You could just move the decryption down to line 307 or 315.

Copy link
Contributor Author

@jasonbornsteinMOOV jasonbornsteinMOOV Oct 4, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could try to check the event type. it would require reading first (to get the event type), possibly failing because it needs to be revealed, an(d then reading again). but it shouldn't depend on the event type. it should just be that what we got was either encrypted or it wasn't, and we have to either reveal it or read it first to find out.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I did not notice that. If we need to get something working soon, I am ok with this approach. However, I think that we should create a ticket to fix this in a better way. It looks to me like the achgateway can receive events over http and over kafka. The kafka events come from ach-orchestrator and are encrypted, the http events are in the clear. After receiving the event, they are passed to code that processes both types and trys to decrypt them. The decryption always fails for the events received over http.

The decryption should be moved into the kafka specific components (configurable) so that decrypted events get delivered to the common code in all cases. This would fix the achgateway correctly in all cases.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I did not notice that. If we need to get something working soon, I am ok with this approach. However, I think that we should create a ticket to fix this in a better way. It looks to me like the achgateway can receive events over http and over kafka. The kafka events come from ach-orchestrator and are encrypted, the http events are in the clear. After receiving the event, they are passed to code that processes both types and trys to decrypt them. The decryption always fails for the events received over http.

The decryption should be moved into the kafka specific components (configurable) so that decrypted events get delivered to the common code in all cases. This would fix the achgateway correctly in all cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good point. Can you raise a Linear card for that change?

Copy link
Contributor Author

@jasonbornsteinMOOV jasonbornsteinMOOV Oct 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data = msg.Body
}

event, readErr := models.Read(data)
Expand Down
82 changes: 79 additions & 3 deletions internal/pipeline/file_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (fr *TestFileReceiver) TriggerCutoff(t *testing.T) {
agg.cutoffTrigger <- waiter
}

func testFileReceiver(t *testing.T) *TestFileReceiver {
func testFileReceiver(t *testing.T, additionalConfig func(conf *service.Config)) *TestFileReceiver {
t.Helper()
if testing.Short() {
t.Skip("skipping integration test via -short")
Expand Down Expand Up @@ -110,6 +110,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {
},
},
}
additionalConfig(conf)

shardRepo := shards.NewInMemoryRepository()
shardRepo.Add(service.ShardMapping{ShardKey: "testing", ShardName: "testing"}, database.NopInTx)
Expand All @@ -133,7 +134,7 @@ func testFileReceiver(t *testing.T) *TestFileReceiver {
}

func TestFileReceiver__InvalidQueueFile(t *testing.T) {
fr := testFileReceiver(t)
fr := testFileReceiver(t, func(conf *service.Config) {})

file, err := ach.ReadFile(filepath.Join("..", "incoming", "odfi", "testdata", "return-no-batch-controls.ach"))
require.ErrorContains(t, err, ach.ErrFileHeader.Error())
Expand Down Expand Up @@ -165,8 +166,83 @@ func TestFileReceiver__InvalidQueueFile(t *testing.T) {
require.Contains(t, iqf.Error, "reading QueueACHFile failed: ImmediateDestination")
}

func TestFileReceiver__CancelFile_ConsumerEncrypted(t *testing.T) {
fr := testFileReceiver(t, func(conf *service.Config) {
if conf.Inbound.Kafka == nil {
conf.Inbound.Kafka = &service.KafkaConfig{}
}
conf.Inbound.Kafka.Transform = &models.TransformConfig{
Encoding: &models.EncodingConfig{
Base64: true,
Compress: true,
},
Encryption: &models.EncryptionConfig{
AES: &models.AESConfig{
Key: "1111111111",
},
},
}
})
s, err := storage.New(fr.cfg.Upload.Merging.Storage)
require.NoError(t, err)

bs, err := compliance.Protect(nil, models.Event{
Event: models.CancelACHFile{
FileID: "return-no-batch-controls",
ShardKey: "testing",
},
})
require.NoError(t, err)

err = fr.Publisher.Send(context.Background(), &pubsub.Message{
Body: bs,
})
require.NoError(t, err)

require.Eventually(t, func() bool {
file2, err := s.Open(filepath.Join("mergable", "testing", fmt.Sprintf("%s.canceled", "return-no-batch-controls.ach")))
if err != nil {
t.Logf("waiting for file to be canceled: %v", err)
}
if file2 != nil {
t.Logf("file2.Filename: %s", file2.Filename())
}
return file2 != nil
}, 60*time.Second, 500*time.Millisecond)
}

func TestFileReceiver__CancelFile_ConsumerNotEncrypted(t *testing.T) {
fr := testFileReceiver(t, func(conf *service.Config) {})
s, err := storage.New(fr.cfg.Upload.Merging.Storage)
require.NoError(t, err)

bs, err := compliance.Protect(nil, models.Event{
Event: models.CancelACHFile{
FileID: "return-no-batch-controls",
ShardKey: "testing",
},
})
require.NoError(t, err)

err = fr.Publisher.Send(context.Background(), &pubsub.Message{
Body: bs,
})
require.NoError(t, err)

require.Eventually(t, func() bool {
file2, err := s.Open(filepath.Join("mergable", "testing", fmt.Sprintf("%s.canceled", "return-no-batch-controls.ach")))
if err != nil {
t.Logf("waiting for file to be canceled: %v", err)
}
if file2 != nil {
t.Logf("file2.Filename: %s", file2.Filename())
}
return file2 != nil
}, 60*time.Second, 500*time.Millisecond)
}

func TestFileReceiver__shouldAutocommit(t *testing.T) {
fr := testFileReceiver(t)
fr := testFileReceiver(t, func(conf *service.Config) {})

// Ensure the setup is as we expect
require.Nil(t, fr.cfg.Inbound.Kafka)
Expand Down