Skip to content

Commit

Permalink
Merge pull request #44 from nlnwa/feat/log_confirm_messages
Browse files Browse the repository at this point in the history
Log confirm messages
  • Loading branch information
maeb authored Jul 24, 2024
2 parents f3ea607 + 3a4a4a4 commit 6dfdeed
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 125 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ hermetic verify \
#### Confirm
```shell
hermetic verify \
--kafka-endpoints=<list-of-kafka-endpoints> \
confirm
--confirm-topic <topic-name>
--kafka-endpoints=<list-of-kafka-endpoints> \
confirm
--confirm-topic <topic-name>
```

### Acquisition upload
Expand Down
4 changes: 2 additions & 2 deletions cmd/send/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package send
import (
"fmt"
"hermetic/internal/common_flags"
sendImplementation "hermetic/internal/send"
"hermetic/internal/dps"
"hermetic/internal/teams"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -52,7 +52,7 @@ func parseArgumentsAndCallSend(cmd *cobra.Command, args []string) error {
return fmt.Errorf("getting stage artifacts root failed, original error: '%w'", err)
}

err = sendImplementation.PrepareAndSendSubmissionInformationPackage(common_flags.KafkaEndpoints, transferTopicName, stageArtifactsRoot)
err = dps.PrepareAndSendSubmissionInformationPackage(common_flags.KafkaEndpoints, transferTopicName, stageArtifactsRoot)
if err != nil {
err = fmt.Errorf("transfer error, cause: `%w`", err)
fmt.Printf("Sending error message to Teams\n")
Expand Down
19 changes: 16 additions & 3 deletions cmd/verify/confirm/confirm.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package confirm

import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/spf13/cobra"
"hermetic/internal/common_flags"
"hermetic/internal/teams"
confirmImplementation "hermetic/internal/verify/confirm"
"os"
"os/signal"
)

func NewCommand() *cobra.Command {
command := &cobra.Command{
Use: "confirm",
Short: "Continuously report all successfully preserved data",
Args: cobra.NoArgs,
RunE: parseArgumentsAndCallVerify,
RunE: parseArgumentsAndReadConfirmTopic,
}
confirmTopicFlagName := "confirm-topic"
command.Flags().String(confirmTopicFlagName, "", "name of confirm-topic")
Expand All @@ -23,13 +27,22 @@ func NewCommand() *cobra.Command {
return command
}

func parseArgumentsAndCallVerify(cmd *cobra.Command, args []string) error {
func parseArgumentsAndReadConfirmTopic(cmd *cobra.Command, args []string) error {
confirmTopicName, err := cmd.Flags().GetString("confirm-topic")
if err != nil {
return fmt.Errorf("failed to get confirm-topic flag, cause: `%w`", err)
}

err = confirmImplementation.Verify(confirmTopicName)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: common_flags.KafkaEndpoints,
Topic: confirmTopicName,
GroupID: "nettarkivet-hermetic-verify-confirm",
})

err = confirmImplementation.ReadConfirmTopic(ctx, reader)
if err != nil {
err = fmt.Errorf("verification error, cause: `%w`", err)
fmt.Printf("Sending error message to Teams\n")
Expand Down
19 changes: 16 additions & 3 deletions cmd/verify/reject/reject.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package reject

import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"github.com/spf13/cobra"
"hermetic/internal/common_flags"
"hermetic/internal/teams"
rejectImplementation "hermetic/internal/verify/reject"
"os"
"os/signal"
)

func NewCommand() *cobra.Command {
command := &cobra.Command{
Use: "reject",
Short: "Continuously report all rejected data",
Args: cobra.NoArgs,
RunE: parseArgumentsAndCallVerify,
RunE: parseArgumentsAndReadRejectTopic,
}
rejectTopicFlagName := "reject-topic"
command.Flags().String(rejectTopicFlagName, "", "name of reject-topic")
Expand All @@ -23,13 +27,22 @@ func NewCommand() *cobra.Command {
return command
}

func parseArgumentsAndCallVerify(cmd *cobra.Command, args []string) error {
func parseArgumentsAndReadRejectTopic(cmd *cobra.Command, args []string) error {
rejectTopicName, err := cmd.Flags().GetString("reject-topic")
if err != nil {
return fmt.Errorf("failed to get reject-topic flag, cause: `%w`", err)
}

err = rejectImplementation.Verify(rejectTopicName, common_flags.KafkaEndpoints, common_flags.TeamsWebhookNotificationUrl)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()

reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: common_flags.KafkaEndpoints,
Topic: rejectTopicName,
GroupID: "nettarkivet-hermetic-verify-reject",
})

err = rejectImplementation.ReadRejectTopic(ctx, reader, common_flags.TeamsWebhookNotificationUrl)
if err != nil {
err = fmt.Errorf("verification error, cause: `%w`", err)
fmt.Printf("Sending error message to Teams\n")
Expand Down
42 changes: 42 additions & 0 deletions internal/dps/read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package dps

import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/segmentio/kafka-go"
)

func ReadMessages(ctx context.Context, reader *kafka.Reader) (*KafkaResponse, error) {
for {
fmt.Println("Reading next message...")
message, err := reader.ReadMessage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to read message: %w", err)
}

var dpsResponse DigitalPreservationSystemResponse

err = json.Unmarshal(message.Value, &dpsResponse)
if err != nil {
syntaxError := new(json.SyntaxError)
if errors.As(err, &syntaxError) {
fmt.Printf("Could not read message at offset '%d', syntax error in message, skipping offset\n", message.Offset)
continue
}
return nil, fmt.Errorf("failed to unmarshal json, original error: '%w'", err)
}

if !IsWebArchiveOwned(&dpsResponse) {
fmt.Printf("Message at offset '%d' is not owned by web archive, skipping offset\n", message.Offset)
continue
}

return &KafkaResponse{
Offset: message.Offset,
Key: string(message.Key),
DPSResponse: dpsResponse,
}, nil
}
}
2 changes: 1 addition & 1 deletion internal/send/send.go → internal/dps/send.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sendImplementation
package dps

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion internal/send/send_test.go → internal/dps/send_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package sendImplementation
package dps

import (
"hermetic/internal/submission_information_package"
Expand Down
28 changes: 28 additions & 0 deletions internal/dps/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package dps

type Check struct {
Status string
Message string
Reason string
File string
}

type DigitalPreservationSystemResponse struct {
Date string
Identifier string
Urn string
Path string
ContentType string
ContentCategory string
Checks []Check
}

type KafkaResponse struct {
Offset int64
Key string
DPSResponse DigitalPreservationSystemResponse
}

func IsWebArchiveOwned(message *DigitalPreservationSystemResponse) bool {
return message.ContentCategory == "nettarkiv"
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package rejectImplementation
package dps

import (
"testing"
)

func TestIsWebArchiveOwnedInvalid(t *testing.T) {
notWebArchive := kafkaResponse{
notWebArchive := KafkaResponse{
Offset: int64(0),
Key: "key",

DPSResponse: digitalPreservationSystemResponse{
DPSResponse: DigitalPreservationSystemResponse{
ContentCategory: "something else",
Date: "date",
Identifier: "identifier",
Urn: "urn",
Path: "path",
ContentType: "contentType",
Checks: []check{
Checks: []Check{
{
Status: "status",
Message: "message",
Expand All @@ -26,23 +26,23 @@ func TestIsWebArchiveOwnedInvalid(t *testing.T) {
},
},
}
if isWebArchiveOwned(notWebArchive) {
if IsWebArchiveOwned(&notWebArchive.DPSResponse) {
t.Errorf("Expected false, got true")
}
}
func TestIsWebArchiveOwnedValid(t *testing.T) {
webArchiveResponse := kafkaResponse{
webArchiveResponse := KafkaResponse{
Offset: int64(0),
Key: "key",

DPSResponse: digitalPreservationSystemResponse{
DPSResponse: DigitalPreservationSystemResponse{
ContentCategory: "nettarkiv",
Date: "date",
Identifier: "identifier",
Urn: "urn",
Path: "path",
ContentType: "contentType",
Checks: []check{
Checks: []Check{
{
Status: "status",
Message: "message",
Expand All @@ -52,7 +52,7 @@ func TestIsWebArchiveOwnedValid(t *testing.T) {
},
},
}
if !isWebArchiveOwned(webArchiveResponse) {
if !IsWebArchiveOwned(&webArchiveResponse.DPSResponse) {
t.Errorf("Expected true, got false")
}

Expand Down
16 changes: 11 additions & 5 deletions internal/verify/confirm/confirm.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package confirmImplmentation

import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"hermetic/internal/dps"
)

func Verify(confirmTopicName string) error {
fmt.Printf("Reading messages from topic '%s'\n", confirmTopicName)
fmt.Printf("This command is not implemented yet\n" +
"Aims to solve issue https://github.com/nlnwa/hermetic/issues/3 ")
return nil
func ReadConfirmTopic(ctx context.Context, reader *kafka.Reader) error {
for {
response, err := dps.ReadMessages(ctx, reader)
if err != nil {
return fmt.Errorf("failed to read message from confirm-topic: `%w`", err)
}
fmt.Printf("SIP successfully preserved: %+v\n", response)
}
}
14 changes: 0 additions & 14 deletions internal/verify/confirm/confirm_test.go

This file was deleted.

Loading

0 comments on commit 6dfdeed

Please sign in to comment.