From 2f4e14213c873d07648c07798f2d3c65670a27a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20B=C3=B8rsheim?= Date: Thu, 18 Apr 2024 16:15:29 +0200 Subject: [PATCH 1/2] refactor: consolidate common functions As both verify reject/confirm is similar, a refactor has been made to allow for sharing of code used by both. - Setup of the kafka reader been moved to the cmd argument parsing, to simplify function calls down the line, and hopefully making it easier to mock in tests. - Changed the groupID in the kafka reader config to separate between verify confirm and verify reject - Now supplying context to the read functions, for a graceful shutdown when running i within a container. --- README.md | 6 +- cmd/send/send.go | 4 +- cmd/verify/reject/reject.go | 19 ++++- internal/dps/read.go | 42 +++++++++++ internal/{send => dps}/send.go | 2 +- internal/{send => dps}/send_test.go | 2 +- internal/dps/types.go | 28 ++++++++ .../reject_test.go => dps/types_test.go} | 18 ++--- internal/verify/reject/reject.go | 72 +++++-------------- internal/verify/reject/teams_message.go | 3 +- internal/verify/reject/teams_message_test.go | 8 ++- internal/verify/reject/types.go | 24 ------- 12 files changed, 125 insertions(+), 103 deletions(-) create mode 100644 internal/dps/read.go rename internal/{send => dps}/send.go (99%) rename internal/{send => dps}/send_test.go (98%) create mode 100644 internal/dps/types.go rename internal/{verify/reject/reject_test.go => dps/types_test.go} (73%) delete mode 100644 internal/verify/reject/types.go diff --git a/README.md b/README.md index f3ea992..d39e381 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,9 @@ hermetic verify \ #### Confirm ```shell hermetic verify \ ---kafka-endpoints= \ -confirm ---confirm-topic + --kafka-endpoints= \ + confirm + --confirm-topic ``` ### Acquisition upload diff --git a/cmd/send/send.go b/cmd/send/send.go index 8a73fad..b0b1eac 100644 --- a/cmd/send/send.go +++ b/cmd/send/send.go @@ -3,7 +3,7 @@ package send import ( "fmt" "hermetic/internal/common_flags" - sendImplementation "hermetic/internal/send" + "hermetic/internal/dps" "hermetic/internal/teams" "github.com/spf13/cobra" @@ -52,7 +52,7 @@ func parseArgumentsAndCallSend(cmd *cobra.Command, args []string) error { return fmt.Errorf("getting stage artifacts root failed, original error: '%w'", err) } - err = sendImplementation.PrepareAndSendSubmissionInformationPackage(common_flags.KafkaEndpoints, transferTopicName, stageArtifactsRoot) + err = dps.PrepareAndSendSubmissionInformationPackage(common_flags.KafkaEndpoints, transferTopicName, stageArtifactsRoot) if err != nil { err = fmt.Errorf("transfer error, cause: `%w`", err) fmt.Printf("Sending error message to Teams\n") diff --git a/cmd/verify/reject/reject.go b/cmd/verify/reject/reject.go index 657bf6d..836be25 100644 --- a/cmd/verify/reject/reject.go +++ b/cmd/verify/reject/reject.go @@ -1,11 +1,15 @@ package reject import ( + "context" "fmt" + "github.com/segmentio/kafka-go" "github.com/spf13/cobra" "hermetic/internal/common_flags" "hermetic/internal/teams" rejectImplementation "hermetic/internal/verify/reject" + "os" + "os/signal" ) func NewCommand() *cobra.Command { @@ -13,7 +17,7 @@ func NewCommand() *cobra.Command { Use: "reject", Short: "Continuously report all rejected data", Args: cobra.NoArgs, - RunE: parseArgumentsAndCallVerify, + RunE: parseArgumentsAndReadRejectTopic, } rejectTopicFlagName := "reject-topic" command.Flags().String(rejectTopicFlagName, "", "name of reject-topic") @@ -23,13 +27,22 @@ func NewCommand() *cobra.Command { return command } -func parseArgumentsAndCallVerify(cmd *cobra.Command, args []string) error { +func parseArgumentsAndReadRejectTopic(cmd *cobra.Command, args []string) error { rejectTopicName, err := cmd.Flags().GetString("reject-topic") if err != nil { return fmt.Errorf("failed to get reject-topic flag, cause: `%w`", err) } - err = rejectImplementation.Verify(rejectTopicName, common_flags.KafkaEndpoints, common_flags.TeamsWebhookNotificationUrl) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: common_flags.KafkaEndpoints, + Topic: rejectTopicName, + GroupID: "nettarkivet-hermetic-verify-reject", + }) + + err = rejectImplementation.ReadRejectTopic(ctx, reader, common_flags.TeamsWebhookNotificationUrl) if err != nil { err = fmt.Errorf("verification error, cause: `%w`", err) fmt.Printf("Sending error message to Teams\n") diff --git a/internal/dps/read.go b/internal/dps/read.go new file mode 100644 index 0000000..68d5084 --- /dev/null +++ b/internal/dps/read.go @@ -0,0 +1,42 @@ +package dps + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/segmentio/kafka-go" +) + +func ReadMessages(ctx context.Context, reader *kafka.Reader) (*KafkaResponse, error) { + for { + fmt.Println("Reading next message...") + message, err := reader.ReadMessage(ctx) + if err != nil { + return nil, fmt.Errorf("failed to read message: %w", err) + } + + var dpsResponse DigitalPreservationSystemResponse + + err = json.Unmarshal(message.Value, &dpsResponse) + if err != nil { + syntaxError := new(json.SyntaxError) + if errors.As(err, &syntaxError) { + fmt.Printf("Could not read message at offset '%d', syntax error in message, skipping offset\n", message.Offset) + continue + } + return nil, fmt.Errorf("failed to unmarshal json, original error: '%w'", err) + } + + if !IsWebArchiveOwned(&dpsResponse) { + fmt.Printf("Message at offset '%d' is not owned by web archive, skipping offset\n", message.Offset) + continue + } + + return &KafkaResponse{ + Offset: message.Offset, + Key: string(message.Key), + DPSResponse: dpsResponse, + }, nil + } +} diff --git a/internal/send/send.go b/internal/dps/send.go similarity index 99% rename from internal/send/send.go rename to internal/dps/send.go index fc5d96a..6ddd1ea 100644 --- a/internal/send/send.go +++ b/internal/dps/send.go @@ -1,4 +1,4 @@ -package sendImplementation +package dps import ( "context" diff --git a/internal/send/send_test.go b/internal/dps/send_test.go similarity index 98% rename from internal/send/send_test.go rename to internal/dps/send_test.go index be9bf48..bb90754 100644 --- a/internal/send/send_test.go +++ b/internal/dps/send_test.go @@ -1,4 +1,4 @@ -package sendImplementation +package dps import ( "hermetic/internal/submission_information_package" diff --git a/internal/dps/types.go b/internal/dps/types.go new file mode 100644 index 0000000..e7df0e9 --- /dev/null +++ b/internal/dps/types.go @@ -0,0 +1,28 @@ +package dps + +type Check struct { + Status string + Message string + Reason string + File string +} + +type DigitalPreservationSystemResponse struct { + Date string + Identifier string + Urn string + Path string + ContentType string + ContentCategory string + Checks []Check +} + +type KafkaResponse struct { + Offset int64 + Key string + DPSResponse DigitalPreservationSystemResponse +} + +func IsWebArchiveOwned(message *DigitalPreservationSystemResponse) bool { + return message.ContentCategory == "nettarkiv" +} diff --git a/internal/verify/reject/reject_test.go b/internal/dps/types_test.go similarity index 73% rename from internal/verify/reject/reject_test.go rename to internal/dps/types_test.go index 12d427f..ba95502 100644 --- a/internal/verify/reject/reject_test.go +++ b/internal/dps/types_test.go @@ -1,22 +1,22 @@ -package rejectImplementation +package dps import ( "testing" ) func TestIsWebArchiveOwnedInvalid(t *testing.T) { - notWebArchive := kafkaResponse{ + notWebArchive := KafkaResponse{ Offset: int64(0), Key: "key", - DPSResponse: digitalPreservationSystemResponse{ + DPSResponse: DigitalPreservationSystemResponse{ ContentCategory: "something else", Date: "date", Identifier: "identifier", Urn: "urn", Path: "path", ContentType: "contentType", - Checks: []check{ + Checks: []Check{ { Status: "status", Message: "message", @@ -26,23 +26,23 @@ func TestIsWebArchiveOwnedInvalid(t *testing.T) { }, }, } - if isWebArchiveOwned(notWebArchive) { + if IsWebArchiveOwned(¬WebArchive.DPSResponse) { t.Errorf("Expected false, got true") } } func TestIsWebArchiveOwnedValid(t *testing.T) { - webArchiveResponse := kafkaResponse{ + webArchiveResponse := KafkaResponse{ Offset: int64(0), Key: "key", - DPSResponse: digitalPreservationSystemResponse{ + DPSResponse: DigitalPreservationSystemResponse{ ContentCategory: "nettarkiv", Date: "date", Identifier: "identifier", Urn: "urn", Path: "path", ContentType: "contentType", - Checks: []check{ + Checks: []Check{ { Status: "status", Message: "message", @@ -52,7 +52,7 @@ func TestIsWebArchiveOwnedValid(t *testing.T) { }, }, } - if !isWebArchiveOwned(webArchiveResponse) { + if !IsWebArchiveOwned(&webArchiveResponse.DPSResponse) { t.Errorf("Expected true, got false") } diff --git a/internal/verify/reject/reject.go b/internal/verify/reject/reject.go index 10991a5..7e3fa10 100644 --- a/internal/verify/reject/reject.go +++ b/internal/verify/reject/reject.go @@ -2,70 +2,30 @@ package rejectImplementation import ( "context" - "encoding/json" - "errors" "fmt" - "hermetic/internal/teams" - "time" - - kafkaHelpers "hermetic/internal/kafka" - "github.com/segmentio/kafka-go" + "hermetic/internal/dps" + "hermetic/internal/teams" ) -func isWebArchiveOwned(message kafkaResponse) bool { - return message.DPSResponse.ContentCategory == "nettarkiv" -} - -func Verify(rejectTopicName string, kafkaEndpoints []string, teamsWebhookNotificationUrl string) error { - reader := kafkaHelpers.MessageReader{ - Reader: kafka.NewReader(kafka.ReaderConfig{ - Brokers: kafkaEndpoints, - Topic: rejectTopicName, - GroupID: "nettarkivet-hermetic-verify", - }), - } - readTimeout := 10 * time.Second - cycleSleepDuration := 1 * time.Minute - +func ReadRejectTopic(ctx context.Context, reader *kafka.Reader, teamsWebhookNotificationUrl string) error { for { - fmt.Printf("Reading next message with timeout '%s'\n", readTimeout) - message, err := reader.ReadMessageWithTimeout(readTimeout) - if errors.Is(err, context.DeadlineExceeded) { - fmt.Printf("Reading message timed out, sleeping for '%s'\n", cycleSleepDuration) - time.Sleep(cycleSleepDuration) - continue - } + response, err := dps.ReadMessages(ctx, reader) if err != nil { - return fmt.Errorf("failed to read message, cause: `%w`", err) + return fmt.Errorf("failed to read message from reject-topic: `%w`", err) } - var parsedMessage digitalPreservationSystemResponse - - err = json.Unmarshal(message.Value, &parsedMessage) - if err != nil { - syntaxError := new(json.SyntaxError) - if errors.As(err, &syntaxError) { - fmt.Printf("Could not read message at offset '%d', syntax error in message, skipping offset\n", message.Offset) - continue - } - return fmt.Errorf("failed to unmarshal json, original error: '%w'", err) - } - response := kafkaResponse{ - Offset: message.Offset, - Key: string(message.Key), - DPSResponse: parsedMessage, - } - - if !isWebArchiveOwned(response) { - fmt.Printf("Skipping message with ContentCategory: '%s'\n", response.DPSResponse.ContentCategory) - continue + if err := ProcessMessagesFromRejectTopic(reader, response, teamsWebhookNotificationUrl); err != nil { + return fmt.Errorf("failed to process message from reject-topic: `%w`", err) } + } +} - fmt.Printf("Processing message with ContentCategory: '%s'\n", response.DPSResponse.ContentCategory) - payload := createTeamsDigitalPreservationSystemFailureMessage(response, rejectTopicName, kafkaEndpoints) - err = teams.SendMessage(payload, teamsWebhookNotificationUrl) - if err != nil { - return fmt.Errorf("failed to send message to teams, cause: `%w`", err) - } +func ProcessMessagesFromRejectTopic(reader *kafka.Reader, response *dps.KafkaResponse, teamsWebhookNotificationUrl string) error { + fmt.Printf("Processing message with ContentCategory: '%s'\n", response.DPSResponse.ContentCategory) + payload := createTeamsDigitalPreservationSystemFailureMessage(response, reader.Config().Topic, reader.Config().Brokers) + err := teams.SendMessage(payload, teamsWebhookNotificationUrl) + if err != nil { + return fmt.Errorf("failed to send message to teams, cause: `%w`", err) } + return nil } diff --git a/internal/verify/reject/teams_message.go b/internal/verify/reject/teams_message.go index 1c3415d..ec46aa4 100644 --- a/internal/verify/reject/teams_message.go +++ b/internal/verify/reject/teams_message.go @@ -2,12 +2,13 @@ package rejectImplementation import ( "fmt" + "hermetic/internal/dps" "hermetic/internal/teams" "strconv" "strings" ) -func createTeamsDigitalPreservationSystemFailureMessage(message kafkaResponse, rejectTopicName string, kafkaEndpoints []string) teams.Message { +func createTeamsDigitalPreservationSystemFailureMessage(message *dps.KafkaResponse, rejectTopicName string, kafkaEndpoints []string) teams.Message { facts := []teams.Fact{ { Name: "Kafka message offset", diff --git a/internal/verify/reject/teams_message_test.go b/internal/verify/reject/teams_message_test.go index 7704387..335a66c 100644 --- a/internal/verify/reject/teams_message_test.go +++ b/internal/verify/reject/teams_message_test.go @@ -3,6 +3,7 @@ package rejectImplementation import ( "encoding/json" "fmt" + "hermetic/internal/dps" "hermetic/internal/teams" "testing" @@ -10,17 +11,17 @@ import ( ) func TestCreateTeamsMessage(t *testing.T) { - kafkaResponse := kafkaResponse{ + kafkaResponse := &dps.KafkaResponse{ Offset: 0, Key: "key", - DPSResponse: digitalPreservationSystemResponse{ + DPSResponse: dps.DigitalPreservationSystemResponse{ Date: "date", Identifier: "identifier", Urn: "urn", Path: "path", ContentType: "contentType", ContentCategory: "contentCategory", - Checks: []check{ + Checks: []dps.Check{ { Status: "status", Message: "message", @@ -30,6 +31,7 @@ func TestCreateTeamsMessage(t *testing.T) { }, }, } + rejectTopicName := "rejectTopicName" kafkaEndpoints := []string{"kafkaEndpoints"} diff --git a/internal/verify/reject/types.go b/internal/verify/reject/types.go deleted file mode 100644 index 5b57d9e..0000000 --- a/internal/verify/reject/types.go +++ /dev/null @@ -1,24 +0,0 @@ -package rejectImplementation - -type check struct { - Status string - Message string - Reason string - File string -} - -type digitalPreservationSystemResponse struct { - Date string - Identifier string - Urn string - Path string - ContentType string - ContentCategory string - Checks []check -} - -type kafkaResponse struct { - Offset int64 - Key string - DPSResponse digitalPreservationSystemResponse -} From 3a4a4a4347915540a82fe43c960708286cdc47ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20B=C3=B8rsheim?= Date: Wed, 8 May 2024 16:35:25 +0200 Subject: [PATCH 2/2] feat: log confirm messages Heavily inspired by the verify reject command this command now prints the messages from confirm-topic. --- cmd/verify/confirm/confirm.go | 19 ++++++++++++++++--- internal/verify/confirm/confirm.go | 16 +++++++++++----- internal/verify/confirm/confirm_test.go | 14 -------------- 3 files changed, 27 insertions(+), 22 deletions(-) delete mode 100644 internal/verify/confirm/confirm_test.go diff --git a/cmd/verify/confirm/confirm.go b/cmd/verify/confirm/confirm.go index fd21f3b..e7304a8 100644 --- a/cmd/verify/confirm/confirm.go +++ b/cmd/verify/confirm/confirm.go @@ -1,11 +1,15 @@ package confirm import ( + "context" "fmt" + "github.com/segmentio/kafka-go" "github.com/spf13/cobra" "hermetic/internal/common_flags" "hermetic/internal/teams" confirmImplementation "hermetic/internal/verify/confirm" + "os" + "os/signal" ) func NewCommand() *cobra.Command { @@ -13,7 +17,7 @@ func NewCommand() *cobra.Command { Use: "confirm", Short: "Continuously report all successfully preserved data", Args: cobra.NoArgs, - RunE: parseArgumentsAndCallVerify, + RunE: parseArgumentsAndReadConfirmTopic, } confirmTopicFlagName := "confirm-topic" command.Flags().String(confirmTopicFlagName, "", "name of confirm-topic") @@ -23,13 +27,22 @@ func NewCommand() *cobra.Command { return command } -func parseArgumentsAndCallVerify(cmd *cobra.Command, args []string) error { +func parseArgumentsAndReadConfirmTopic(cmd *cobra.Command, args []string) error { confirmTopicName, err := cmd.Flags().GetString("confirm-topic") if err != nil { return fmt.Errorf("failed to get confirm-topic flag, cause: `%w`", err) } - err = confirmImplementation.Verify(confirmTopicName) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt) + defer cancel() + + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: common_flags.KafkaEndpoints, + Topic: confirmTopicName, + GroupID: "nettarkivet-hermetic-verify-confirm", + }) + + err = confirmImplementation.ReadConfirmTopic(ctx, reader) if err != nil { err = fmt.Errorf("verification error, cause: `%w`", err) fmt.Printf("Sending error message to Teams\n") diff --git a/internal/verify/confirm/confirm.go b/internal/verify/confirm/confirm.go index d375191..3c09e11 100644 --- a/internal/verify/confirm/confirm.go +++ b/internal/verify/confirm/confirm.go @@ -1,12 +1,18 @@ package confirmImplmentation import ( + "context" "fmt" + "github.com/segmentio/kafka-go" + "hermetic/internal/dps" ) -func Verify(confirmTopicName string) error { - fmt.Printf("Reading messages from topic '%s'\n", confirmTopicName) - fmt.Printf("This command is not implemented yet\n" + - "Aims to solve issue https://github.com/nlnwa/hermetic/issues/3 ") - return nil +func ReadConfirmTopic(ctx context.Context, reader *kafka.Reader) error { + for { + response, err := dps.ReadMessages(ctx, reader) + if err != nil { + return fmt.Errorf("failed to read message from confirm-topic: `%w`", err) + } + fmt.Printf("SIP successfully preserved: %+v\n", response) + } } diff --git a/internal/verify/confirm/confirm_test.go b/internal/verify/confirm/confirm_test.go deleted file mode 100644 index 353ef19..0000000 --- a/internal/verify/confirm/confirm_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package confirmImplmentation - -import ( - "testing" -) - -func TestVerify(t *testing.T) { - t.Run("Verify function test", func(t *testing.T) { - err := Verify(("test-topic")) - if err != nil { - t.Errorf("Verify() returned error: %v", err) - } - }) -}