feat: ingestion logs pagination and metrics adjustments #171

Merged
merged 4 commits on Sep 18, 2024
Changes from all commits
21 changes: 11 additions & 10 deletions diode-proto/diode/v1/reconciler.proto
@@ -72,16 +72,17 @@ message IngestionMetrics {

// An ingestion log
message IngestionLog {
string data_type = 1;
State state = 2;
string request_id = 3;
int64 ingestion_ts = 4;
string producer_app_name = 5;
string producer_app_version = 6;
string sdk_name = 7;
string sdk_version = 8;
diode.v1.Entity entity = 9;
IngestionError error = 10;
string id = 1;
string data_type = 2;
State state = 3;
string request_id = 4;
int64 ingestion_ts = 5;
string producer_app_name = 6;
string producer_app_version = 7;
string sdk_name = 8;
string sdk_version = 9;
diode.v1.Entity entity = 10;
IngestionError error = 11;
}

// The request to retrieve ingestion logs
53 changes: 31 additions & 22 deletions diode-server/gen/diode/v1/reconcilerpb/reconciler.pb.go

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions diode-server/go.mod
@@ -15,6 +15,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/oklog/run v1.1.0
github.com/redis/go-redis/v9 v9.5.1
github.com/segmentio/ksuid v1.0.4
github.com/stretchr/testify v1.9.0
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
2 changes: 2 additions & 0 deletions diode-server/go.sum
@@ -48,6 +48,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8=
github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/segmentio/ksuid v1.0.4 h1:sBo2BdShXjmcugAMwjugoGUdUV0pcxY5mW4xKRn3v4c=
github.com/segmentio/ksuid v1.0.4/go.mod h1:/XUiZBD3kVx5SmUOl55voK5yeAbBNNIed+2O73XgrPE=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
7 changes: 5 additions & 2 deletions diode-server/reconciler/ingestion_processor.go
@@ -9,9 +9,9 @@ import (
"regexp"
"strconv"

"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
"github.com/redis/go-redis/v9"
"github.com/segmentio/ksuid"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

@@ -195,10 +195,13 @@ func (p *IngestionProcessor) handleStreamMessage(ctx context.Context, msg redis.
continue
}

key := fmt.Sprintf("ingest-entity:%s-%d-%s", objectType, ingestionTs, uuid.NewString())
ingestionLogID := ksuid.New().String()

key := fmt.Sprintf("ingest-entity:%s-%d-%s", objectType, ingestionTs, ingestionLogID)
p.logger.Debug("ingest entity key", "key", key)

ingestionLog := &reconcilerpb.IngestionLog{
Id: ingestionLogID,
RequestId: ingestReq.GetId(),
ProducerAppName: ingestReq.GetProducerAppName(),
ProducerAppVersion: ingestReq.GetProducerAppVersion(),
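Worth noting: the switch from uuid to ksuid above is what makes the new id usable as a sort key, since KSUIDs begin with a timestamp component and therefore sort roughly by creation time. A minimal standalone sketch of that property (illustrative only, using the segmentio/ksuid package added in go.mod):

```go
package main

import (
	"fmt"
	"time"

	"github.com/segmentio/ksuid"
)

func main() {
	first := ksuid.New()
	time.Sleep(time.Second) // the KSUID timestamp has one-second resolution
	second := ksuid.New()

	// Lexicographic comparison of the string form follows creation time,
	// which is what lets the logs retriever sort the index by id DESC
	// and get newest-first ordering.
	fmt.Println(first.String() < second.String()) // true
}
```
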
133 changes: 75 additions & 58 deletions diode-server/reconciler/logs_retriever.go
@@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"log/slog"
"strconv"

"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/encoding/protojson"
@@ -49,51 +48,51 @@ func convertMapInterface(data interface{}) interface{} {
}
}

func encodeInt64ToBase64(num int64) string {
func encodeIntToBase64(num int32) string {
// Create a buffer to hold the binary representation
buf := new(bytes.Buffer)

// Write the int64 value as a binary value into the buffer
err := binary.Write(buf, binary.BigEndian, num)
if err != nil {
fmt.Println("Error writing binary:", err)
// Write the int value as a binary value into the buffer
if err := binary.Write(buf, binary.BigEndian, num); err != nil {
fmt.Println("error writing binary:", err)
}

// Encode the binary data to base64
encoded := base64.StdEncoding.EncodeToString(buf.Bytes())
return encoded
return base64.StdEncoding.EncodeToString(buf.Bytes())
}

func decodeBase64ToInt64(encoded string) (int64, error) {
func decodeBase64ToInt(encoded string) (int32, error) {
// Decode the base64 string back to bytes
data, err := base64.StdEncoding.DecodeString(encoded)
decoded, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return 0, err
}

// Convert the byte slice back to int64
var num int64
buf := bytes.NewReader(data)
err = binary.Read(buf, binary.BigEndian, &num)
if err != nil {
buf := bytes.NewReader(decoded)
var num int32
if err := binary.Read(buf, binary.BigEndian, &num); err != nil {
return 0, err
}

return num, nil
}
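
A quick round-trip of the two helpers above, written as a hypothetical example test (it would live in a _test.go file in this package and needs only fmt); the token is simply the big-endian offset encoded as base64:

```go
// Illustrative only: round-trips a page offset through the helpers above.
func Example_pageTokenRoundTrip() {
	token := encodeIntToBase64(100)
	offset, err := decodeBase64ToInt(token)
	fmt.Println(token, offset, err)
	// Output: AAAAZA== 100 <nil>
}
```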

func retrieveIngestionMetrics(ctx context.Context, client RedisClient) (*reconcilerpb.RetrieveIngestionLogsResponse, error) {

pipe := client.Pipeline()

results := make([]*redis.Cmd, 0)
results = append(results, pipe.Do(ctx, "FT.SEARCH", "ingest-entity", "*", "LIMIT", 0, 0))
results := []*redis.Cmd{
pipe.Do(ctx, "FT.SEARCH", "ingest-entity", "*", "LIMIT", 0, 0),
}
for s := reconcilerpb.State_NEW; s <= reconcilerpb.State_NO_CHANGES; s++ {
results = append(results, pipe.Do(ctx, "FT.SEARCH", "ingest-entity", fmt.Sprintf("@state:[%d %d]", s, s), "LIMIT", 0, 0))
stateName, ok := reconcilerpb.State_name[int32(s)]
if !ok {
return nil, fmt.Errorf("failed to retrieve ingestion logs: failed to get state name of %d", s)
}
results = append(results, pipe.Do(ctx, "FT.SEARCH", "ingest-entity", fmt.Sprintf("@state:%s", stateName), "LIMIT", 0, 0))
}

_, err := pipe.Exec(ctx)
if err != nil {
if _, err := pipe.Exec(ctx); err != nil {
return nil, fmt.Errorf("failed to retrieve ingestion logs: %w", err)
}

@@ -133,27 +132,11 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, client Redi
}

pageSize := in.GetPageSize()
if in.PageSize == nil {
if in.PageSize == nil || pageSize >= 1000 {
pageSize = 100 // Default to 100
}

var err error
var ingestionTs int64

//Check start TS filter
var startTs int64
if in.GetIngestionTsStart() != 0 {
startTs = in.GetIngestionTsStart()
}
query := fmt.Sprintf("@ingestion_ts:[%d inf]", startTs)

if in.PageToken != "" {
ingestionTs, err = decodeBase64ToInt64(in.PageToken)
if err != nil {
return nil, fmt.Errorf("error decoding page token: %w", err)
}
query = fmt.Sprintf("@ingestion_ts:[%d %d]", startTs, ingestionTs)
}
query := buildQueryFilter(in)

// Construct the base FT.SEARCH query
queryArgs := []interface{}{
@@ -162,24 +145,20 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, client Redi
query,
}

queryIndex := len(queryArgs) - 1

// Apply optional filters
if in.State != nil {
stateFilter := fmt.Sprintf("@state:[%d %d]", in.GetState(), in.GetState())
queryArgs[queryIndex] = fmt.Sprintf("%s %s", queryArgs[queryIndex], stateFilter)
}
// Apply sorting by id in descending order
queryArgs = append(queryArgs, "SORTBY", "id", "DESC")

if in.GetDataType() != "" {
dataType := fmt.Sprintf("@data_type:%s", in.GetDataType())
queryArgs[queryIndex] = fmt.Sprintf("%s %s", queryArgs[queryIndex], dataType)
}

// Apply sorting by ingestion_ts in descending order
queryArgs = append(queryArgs, "SORTBY", "ingestion_ts", "DESC")
var err error

// Apply limit for pagination
queryArgs = append(queryArgs, "LIMIT", 0, pageSize)
var offset int32
if in.PageToken != "" {
offset, err = decodeBase64ToInt(in.PageToken)
if err != nil {
logger.Warn("error decoding page token", "error", err)
}
}
queryArgs = append(queryArgs, "LIMIT", offset, pageSize)

logger.Debug("retrieving ingestion logs", "query", queryArgs)

@@ -212,11 +191,13 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, client Redi
}

logs = append(logs, ingestionLog)
}

ingestionTs, err = strconv.ParseInt(logsResult.ExtraAttributes.IngestionTs, 10, 64)
if err != nil {
return nil, fmt.Errorf("error converting ingestion timestamp: %w", err)
}
var nextPageToken string

if len(logs) == int(pageSize) {
offset += int32(len(logs))
nextPageToken = encodeIntToBase64(offset)
}

// Fill metrics
@@ -237,5 +218,41 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, client Redi
metrics.Total = response.TotalResults
}

return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: &metrics, NextPageToken: encodeInt64ToBase64(ingestionTs)}, nil
return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: &metrics, NextPageToken: nextPageToken}, nil
}

func buildQueryFilter(req *reconcilerpb.RetrieveIngestionLogsRequest) string {
queryFilter := "*"

// apply optional filters for ingestion timestamps (start and end)
if req.GetIngestionTsStart() > 0 || req.GetIngestionTsEnd() > 0 {
ingestionTsFilter := fmt.Sprintf("@ingestion_ts:[%d inf]", req.GetIngestionTsStart())

if req.GetIngestionTsEnd() > 0 {
ingestionTsFilter = fmt.Sprintf("@ingestion_ts:[%d %d]", req.GetIngestionTsStart(), req.GetIngestionTsEnd())
}

queryFilter = ingestionTsFilter
}

// apply optional filters for ingestion state
if req.State != nil {
stateFilter := fmt.Sprintf("@state:[%d %d]", req.GetState(), req.GetState())
if queryFilter == "*" {
queryFilter = stateFilter
} else {
queryFilter = fmt.Sprintf("%s %s", queryFilter, stateFilter)
}
}

if req.GetDataType() != "" {
dataTypeFilter := fmt.Sprintf("@data_type:%s", req.GetDataType())
if queryFilter == "*" {
queryFilter = dataTypeFilter
} else {
queryFilter = fmt.Sprintf("%s %s", queryFilter, dataTypeFilter)
}
}

return queryFilter
}
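
The net effect is that pagination is now offset-based: the next-page token is the base64-encoded offset of the next row and is only emitted when a full page came back. A rough sketch of how a caller inside this package might walk all pages (the exact signature of retrieveIngestionLogs is truncated in this diff, so the parameter list below is an assumption based on how the function body uses "in"):

```go
// Sketch only: drains every page of ingestion logs via the offset-based token.
func drainIngestionLogs(ctx context.Context, logger *slog.Logger, client RedisClient) ([]*reconcilerpb.IngestionLog, error) {
	var all []*reconcilerpb.IngestionLog
	req := &reconcilerpb.RetrieveIngestionLogsRequest{} // page size defaults to 100 server-side
	for {
		resp, err := retrieveIngestionLogs(ctx, logger, client, req)
		if err != nil {
			return nil, err
		}
		all = append(all, resp.GetLogs()...)
		// An empty token means the last page was short, i.e. there are no more results.
		if resp.GetNextPageToken() == "" {
			return all, nil
		}
		req.PageToken = resp.GetNextPageToken()
	}
}
```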
5 changes: 5 additions & 0 deletions diode-server/reconciler/migration.go
@@ -123,6 +123,11 @@ func initialMigration() func(context.Context, *slog.Logger, RedisClient) error {
"1",
"ingest-entity:",
"SCHEMA",
"$.id",
"AS",
"id",
"TEXT",
"SORTABLE",
"$.dataType",
"AS",
"data_type",
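With id indexed as a SORTABLE TEXT field, the SORTBY id DESC issued by the logs retriever works directly against this index. A hedged sketch of the equivalent raw query via go-redis (rdb is assumed to be a *redis.Client; the index name and key prefix come from this migration):

```go
// Rough equivalent of the paginated query built in logs_retriever.go:
// newest 100 logs, offset 0, ordered by the KSUID id field.
res, err := rdb.Do(ctx, "FT.SEARCH", "ingest-entity", "*", "SORTBY", "id", "DESC", "LIMIT", 0, 100).Result()
if err != nil {
	// handle RediSearch errors (e.g. index not yet created)
}
_ = res // the retriever unmarshals these results into IngestionLog entries
```
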
5 changes: 5 additions & 0 deletions diode-server/reconciler/migration_test.go
@@ -69,6 +69,11 @@ func TestMigrate(t *testing.T) {
"1",
"ingest-entity:",
"SCHEMA",
"$.id",
"AS",
"id",
"TEXT",
"SORTABLE",
"$.dataType",
"AS",
"data_type",