Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
maeb committed May 7, 2024
1 parent c648c08 commit 94d1cd7
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 43 deletions.
5 changes: 3 additions & 2 deletions cmd/verify/confirm/confirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package confirm
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/spf13/cobra"
"hermetic/internal/common_flags"
"hermetic/internal/teams"
confirmImplementation "hermetic/internal/verify/confirm"
"os"
"os/signal"

"github.com/segmentio/kafka-go"
"github.com/spf13/cobra"
)

func NewCommand() *cobra.Command {
Expand Down
Binary file added hermetic
Binary file not shown.
18 changes: 6 additions & 12 deletions internal/dps/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,34 @@ import (
"context"
"encoding/json"
"fmt"

"github.com/segmentio/kafka-go"
)

func ReadMessages(ctx context.Context, reader *kafka.Reader, callback func(*KafkaResponse) error) error {
func ReadMessage(ctx context.Context, reader *kafka.Reader) (*KafkaResponse, error) {
for {
// TODO log debug
fmt.Println("Reading next message...")
message, err := reader.ReadMessage(ctx)
if err != nil {
return fmt.Errorf("failed to read message: %w", err)
return nil, fmt.Errorf("failed to read message: %w", err)
}

var dpsResponse DigitalPreservationSystemResponse

err = json.Unmarshal(message.Value, &dpsResponse)
if err != nil {
return fmt.Errorf("could not unmarshal message at offset '%d': %w", message.Offset, err)
return nil, fmt.Errorf("could not unmarshal message at offset '%d': %w", message.Offset, err)
}

if !IsWebArchiveOwned(&dpsResponse) {
// TODO log debug message
fmt.Printf("Message at offset '%d' is not owned by web archive, skipping offset\n", message.Offset)
continue
}

response := &KafkaResponse{
return &KafkaResponse{
Offset: message.Offset,
Key: string(message.Key),
DPSResponse: dpsResponse,
}

err = callback(response)
if err != nil {
return err
}
}, nil
}
}
24 changes: 9 additions & 15 deletions internal/verify/confirm/confirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,18 @@ package confirmImplmentation
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"hermetic/internal/dps"

"github.com/segmentio/kafka-go"
)

func ReadConfirmTopic(ctx context.Context, reader *kafka.Reader) error {
err := dps.ReadMessages(ctx, reader, ProcessMessagesFromConfirmTopic)
if err != nil {
return fmt.Errorf("failed to read confirm-topic: `%w`", err)
}
return nil
}

func ProcessMessagesFromConfirmTopic(response *dps.KafkaResponse) error {
if response == nil {
panic("No response found")
for {
response, err := dps.ReadMessage(ctx, reader)
if err != nil {
return fmt.Errorf("failed to read confirm-topic: `%w`", err)
}

Check failure on line 17 in internal/verify/confirm/confirm.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofmt`-ed with `-s` (gofmt)
fmt.Printf("SIP successfully preserved: %+v\n", response)
}

fmt.Printf("SIP successfully preserved: %+v\n", response)

return nil
}
25 changes: 11 additions & 14 deletions internal/verify/reject/reject.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,27 @@ package rejectImplementation
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"hermetic/internal/dps"
"hermetic/internal/teams"

"github.com/segmentio/kafka-go"
)

func ReadRejectTopic(ctx context.Context, reader *kafka.Reader, teamsWebhookNotificationUrl string) error {
err := dps.ReadMessages(ctx, reader, func(response *dps.KafkaResponse) error {
if err := ProcessMessagesFromRejectTopic(reader, response, teamsWebhookNotificationUrl); err != nil {
return fmt.Errorf("failed to process message from reject-topic: `%w`", err)
for {
response, err := dps.ReadMessage(ctx, reader)
if err != nil {
return fmt.Errorf("failed to read reject-topic: `%w`", err)
}
return nil
})
if err != nil {
return fmt.Errorf("failed to read reject-topic: `%w`", err)

if err := ProcessMessagesFromRejectTopic(reader, response, teamsWebhookNotificationUrl); err != nil {
return fmt.Errorf("failed to process message from reject-topic: `%w`", err)

Check failure on line 20 in internal/verify/reject/reject.go

View workflow job for this annotation

GitHub Actions / Lint

File is not `gofmt`-ed with `-s` (gofmt)
}
}
return nil
}

func ProcessMessagesFromRejectTopic(reader *kafka.Reader, response *dps.KafkaResponse, teamsWebhookNotificationUrl string) error {
if response == nil {
panic("No response found")
}

// TODO log more info from reject message in case sending to teams fails
// TODO print more info from reject message in case sending to teams fails
fmt.Printf("Processing message with ContentCategory: '%s'\n", response.DPSResponse.ContentCategory)

payload := createTeamsDigitalPreservationSystemFailureMessage(response, reader.Config().Topic, reader.Config().Brokers)
Expand Down

0 comments on commit 94d1cd7

Please sign in to comment.