Skip to content

Commit

Permalink
feat: implement reconciler retrieve ingestion logs (#166)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoparente authored Sep 11, 2024
1 parent c170a04 commit c091b45
Show file tree
Hide file tree
Showing 3 changed files with 565 additions and 4 deletions.
168 changes: 168 additions & 0 deletions diode-server/reconciler/logs_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package reconciler

import (
"bytes"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"fmt"
"strconv"

"github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb"
)

type extraAttributesWrapper struct {
ExtraAttributes string `json:"$"`
IngestionTs string `json:"ingestion_ts"`
}
type redisLogResult struct {
ExtraAttributes extraAttributesWrapper `json:"extra_attributes"`
ID string `json:"id"`
}

type redisLogsResponse struct {
Results []redisLogResult `json:"results"`
TotalResults int `json:"total_results"`
}

func convertMapInterface(data interface{}) interface{} {
switch v := data.(type) {
case map[interface{}]interface{}:
converted := make(map[string]interface{})
for key, value := range v {
converted[fmt.Sprintf("%v", key)] = convertMapInterface(value) // Recursive conversion for nested maps
}
return converted
case []interface{}:
// If the value is a slice, apply conversion recursively to each element
for i, value := range v {
v[i] = convertMapInterface(value)
}
return v
default:
return v
}
}

func encodeInt64ToBase64(num int64) string {
// Create a buffer to hold the binary representation
buf := new(bytes.Buffer)

// Write the int64 value as a binary value into the buffer
err := binary.Write(buf, binary.BigEndian, num)
if err != nil {
fmt.Println("Error writing binary:", err)
}

// Encode the binary data to base64
encoded := base64.StdEncoding.EncodeToString(buf.Bytes())
return encoded
}

func decodeBase64ToInt64(encoded string) (int64, error) {
// Decode the base64 string back to bytes
data, err := base64.StdEncoding.DecodeString(encoded)
if err != nil {
return 0, err
}

// Convert the byte slice back to int64
var num int64
buf := bytes.NewReader(data)
err = binary.Read(buf, binary.BigEndian, &num)
if err != nil {
return 0, err
}

return num, nil
}

func retrieveIngestionLogs(ctx context.Context, client RedisClient, in *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) {
logs := make([]*reconcilerpb.IngestionLog, 0)
pageSize := in.GetPageSize()
if pageSize == 0 {
pageSize = 10 // Default to 10
}

var err error
var ingestionTs int64

//Check start TS filter
var startTs int64
if in.GetIngestionTsStart() != 0 {
startTs = in.GetIngestionTsStart()
}
query := fmt.Sprintf("@ingestion_ts:[%d inf]", startTs)

if in.PageToken != "" {
ingestionTs, err = decodeBase64ToInt64(in.PageToken)
if err != nil {
return nil, fmt.Errorf("error decoding page token: %w", err)
}
query = fmt.Sprintf("@ingestion_ts:[%d %d]", startTs, ingestionTs)
}

// Construct the base FT.SEARCH query
queryArgs := []interface{}{
"FT.SEARCH",
"ingest-entity", // Index name
query,
}

queryIndex := len(queryArgs) - 1

// Apply optional filters
if in.State != nil {
stateFilter := fmt.Sprintf("@state:[%d %d]", in.GetState(), in.GetState())
queryArgs[queryIndex] = fmt.Sprintf("%s %s", queryArgs[queryIndex], stateFilter)
}
if in.GetDataType() != "" {
dataType := fmt.Sprintf("@data_type:%s", in.GetDataType())
queryArgs[queryIndex] = fmt.Sprintf("%s %s", queryArgs[queryIndex], dataType)
}

// Apply sorting by ingestion_ts in descending order
queryArgs = append(queryArgs, "SORTBY", "ingestion_ts", "DESC")

// Apply limit for pagination
queryArgs = append(queryArgs, "LIMIT", 0, pageSize)

// Execute the query using Redis
result, err := client.Do(ctx, queryArgs...).Result()
if err != nil {
return nil, fmt.Errorf("failed to retrieve logs: %w", err)
}

res := convertMapInterface(result)

jsonBytes, err := json.Marshal(res)
if err != nil {
return nil, fmt.Errorf("error marshaling logs: %w", err)
}

var response redisLogsResponse

// Unmarshal the result into the struct
err = json.Unmarshal([]byte(jsonBytes), &response)
if err != nil {
return nil, fmt.Errorf("error parsing JSON: %w", err)
}

for _, logsResult := range response.Results {
var extraAttr *reconcilerpb.IngestionLog
err := json.Unmarshal([]byte(logsResult.ExtraAttributes.ExtraAttributes), &extraAttr)
if err != nil {
return nil, fmt.Errorf("error parsing ExtraAttributes JSON: %v", err)
}

logs = append(logs, extraAttr)

ingestionTs, err = strconv.ParseInt(logsResult.ExtraAttributes.IngestionTs, 10, 64)
if err != nil {
return nil, fmt.Errorf("error converting ingestion timestamp: %w", err)
}
}

return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, NextPageToken: encodeInt64ToBase64(ingestionTs)}, nil
}
7 changes: 3 additions & 4 deletions diode-server/reconciler/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Server struct {
logger *slog.Logger
grpcListener net.Listener
grpcServer *grpc.Server
redisClient *redis.Client
redisClient RedisClient
apiKeys APIKeys
}

Expand Down Expand Up @@ -139,9 +139,8 @@ func (s *Server) RetrieveIngestionDataSources(_ context.Context, in *reconcilerp
}

// RetrieveIngestionLogs retrieves logs
func (s *Server) RetrieveIngestionLogs(_ context.Context, _ *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) {
logs := make([]*reconcilerpb.IngestionLog, 0)
return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs}, nil
func (s *Server) RetrieveIngestionLogs(ctx context.Context, in *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) {
return retrieveIngestionLogs(ctx, s.redisClient, in)
}

func validateRetrieveIngestionDataSourcesRequest(in *reconcilerpb.RetrieveIngestionDataSourcesRequest) error {
Expand Down
Loading

0 comments on commit c091b45

Please sign in to comment.