Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 55 additions & 17 deletions internal/handlers/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ type LokiData struct {

// LokiEntry represents a single log entry from Loki
type LokiEntry struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"` // [timestamp, log line]
Stream map[string]string `json:"stream"` // For log queries
Metric map[string]string `json:"metric"` // For metric queries
Values [][]interface{} `json:"values"` // [timestamp, log line or numeric value]
}

// SSEEvent represents an event to be sent via SSE
Expand Down Expand Up @@ -346,6 +347,7 @@ func executeLokiQuery(ctx context.Context, queryURL string, username, password,
return nil, err
}


// Check for HTTP errors
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("HTTP error: %d - %s", resp.StatusCode, string(body))
Expand Down Expand Up @@ -389,11 +391,18 @@ func formatLokiResults(result *LokiResult, format string) (string, error) {
// Return raw log lines with timestamps and labels in simple format
var output string
for _, entry := range result.Data.Result {
// Build labels string
// Build labels string from either stream or metric labels
var labels string
var labelMap map[string]string
if len(entry.Stream) > 0 {
labelParts := make([]string, 0, len(entry.Stream))
for k, v := range entry.Stream {
labelMap = entry.Stream
} else if len(entry.Metric) > 0 {
labelMap = entry.Metric
}

if len(labelMap) > 0 {
labelParts := make([]string, 0, len(labelMap))
for k, v := range labelMap {
labelParts = append(labelParts, fmt.Sprintf("%s=%s", k, v))
}
labels = "{" + strings.Join(labelParts, ",") + "} "
Expand All @@ -402,17 +411,29 @@ func formatLokiResults(result *LokiResult, format string) (string, error) {
for _, val := range entry.Values {
if len(val) >= 2 {
// Parse timestamp and convert to readable format
ts, err := strconv.ParseFloat(val[0], 64)
timestampStr := fmt.Sprintf("%v", val[0])
ts, err := strconv.ParseFloat(timestampStr, 64)
var timestamp string
if err == nil {
// Convert to time - Loki returns timestamps in nanoseconds
t := time.Unix(0, int64(ts))
// Detect if timestamp is in seconds or nanoseconds
// Timestamps after year 2000 in nanoseconds are > 1e15
// Timestamps in seconds are typically < 1e12
var t time.Time
if ts > 1e15 {
// Nanoseconds (log queries)
t = time.Unix(0, int64(ts))
} else {
// Seconds (metric queries)
t = time.Unix(int64(ts), 0)
}
timestamp = t.Format(time.RFC3339)
} else {
timestamp = val[0]
timestamp = timestampStr
}

output += fmt.Sprintf("%s %s%s\n", timestamp, labels, val[1])
// Convert value to string (handles both log lines and numeric values)
value := fmt.Sprintf("%v", val[1])
output += fmt.Sprintf("%s %s%s\n", timestamp, labels, value)
}
}
}
Expand All @@ -424,12 +445,19 @@ func formatLokiResults(result *LokiResult, format string) (string, error) {
output = fmt.Sprintf("Found %d streams:\n\n", len(result.Data.Result))

for i, entry := range result.Data.Result {
// Format stream labels
// Format labels from either stream or metric
streamInfo := "Stream "
var labelMap map[string]string
if len(entry.Stream) > 0 {
labelMap = entry.Stream
} else if len(entry.Metric) > 0 {
labelMap = entry.Metric
}

if len(labelMap) > 0 {
streamInfo += "("
first := true
for k, v := range entry.Stream {
for k, v := range labelMap {
if !first {
streamInfo += ", "
}
Expand All @@ -445,13 +473,23 @@ func formatLokiResults(result *LokiResult, format string) (string, error) {
for _, val := range entry.Values {
if len(val) >= 2 {
// Parse timestamp
ts, err := strconv.ParseFloat(val[0], 64)
timestampStr := fmt.Sprintf("%v", val[0])
ts, err := strconv.ParseFloat(timestampStr, 64)
if err == nil {
// Convert to time - Loki returns timestamps in nanoseconds already
timestamp := time.Unix(0, int64(ts))
output += fmt.Sprintf("[%s] %s\n", timestamp.Format(time.RFC3339), val[1])
// Detect if timestamp is in seconds or nanoseconds
var timestamp time.Time
if ts > 1e15 {
// Nanoseconds (log queries)
timestamp = time.Unix(0, int64(ts))
} else {
// Seconds (metric queries)
timestamp = time.Unix(int64(ts), 0)
}
value := fmt.Sprintf("%v", val[1])
output += fmt.Sprintf("[%s] %s\n", timestamp.Format(time.RFC3339), value)
} else {
output += fmt.Sprintf("[%s] %s\n", val[0], val[1])
value := fmt.Sprintf("%v", val[1])
output += fmt.Sprintf("[%s] %s\n", timestampStr, value)
}
}
}
Expand Down
132 changes: 119 additions & 13 deletions internal/handlers/loki_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestFormatLokiResults_TimestampParsing(t *testing.T) {
"job": "test-job",
"level": "info",
},
Values: [][]string{
Values: [][]interface{}{
{timestampStr, "Test log message"},
},
},
Expand All @@ -35,7 +35,7 @@ func TestFormatLokiResults_TimestampParsing(t *testing.T) {
}

// Format the results
output, err := formatLokiResults(result)
output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestFormatLokiResults_MultipleTimestamps(t *testing.T) {
Stream: map[string]string{
"job": "test-job",
},
Values: [][]string{
Values: [][]interface{}{
{"1705312245000000000", "First log message"}, // 2024-01-15T10:30:45Z
{"1705312260000000000", "Second log message"}, // 2024-01-15T10:31:00Z
{"1705312275000000000", "Third log message"}, // 2024-01-15T10:31:15Z
Expand All @@ -83,7 +83,7 @@ func TestFormatLokiResults_MultipleTimestamps(t *testing.T) {
},
}

output, err := formatLokiResults(result)
output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}
Expand Down Expand Up @@ -111,22 +111,22 @@ func TestFormatLokiResults_InvalidTimestamp(t *testing.T) {
Stream: map[string]string{
"job": "test-job",
},
Values: [][]string{
Values: [][]interface{}{
{"invalid-timestamp", "Log with invalid timestamp"},
},
},
},
},
}

output, err := formatLokiResults(result)
output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}

// Should contain the original invalid timestamp as fallback
if !strings.Contains(output, "[invalid-timestamp]") {
t.Errorf("Expected output to contain '[invalid-timestamp]' as fallback, but got:\n%s", output)
if !strings.Contains(output, "invalid-timestamp") {
t.Errorf("Expected output to contain 'invalid-timestamp' as fallback, but got:\n%s", output)
}

// Should still contain the log message
Expand All @@ -145,7 +145,7 @@ func TestFormatLokiResults_EmptyResult(t *testing.T) {
},
}

output, err := formatLokiResults(result)
output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}
Expand All @@ -172,15 +172,15 @@ func TestFormatLokiResults_RecentTimestamp(t *testing.T) {
Stream: map[string]string{
"job": "recent-test",
},
Values: [][]string{
Values: [][]interface{}{
{timestampStr, "Recent log message"},
},
},
},
},
}

output, err := formatLokiResults(result)
output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}
Expand Down Expand Up @@ -233,15 +233,15 @@ func TestFormatLokiResults_NoYear2262Bug(t *testing.T) {
Stream: map[string]string{
"job": "regression-test",
},
Values: [][]string{
Values: [][]interface{}{
{tc.timestampNs, "Test log message"},
},
},
},
},
}

output, err := formatLokiResults(result)
output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}
Expand All @@ -258,3 +258,109 @@ func TestFormatLokiResults_NoYear2262Bug(t *testing.T) {
})
}
}

// TestFormatLokiResults_NumericValues tests handling of numeric values from aggregation functions
func TestFormatLokiResults_NumericValues(t *testing.T) {
// This simulates a count_over_time() result which returns numeric values
result := &LokiResult{
Status: "success",
Data: LokiData{
ResultType: "matrix",
Result: []LokiEntry{
{
Stream: map[string]string{
"cluster": "mia2",
"container": "ds-microservices-router",
},
Values: [][]interface{}{
{"1705312245", 42.0}, // timestamp as string, count as float64
{"1705312260", 35.0}, // timestamp as string, count as float64
{"1705312275", 18.0}, // timestamp as string, count as float64
},
},
},
},
}

output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}

// Should contain the numeric values as strings
if !strings.Contains(output, "42") {
t.Errorf("Expected output to contain '42', but got:\n%s", output)
}

if !strings.Contains(output, "35") {
t.Errorf("Expected output to contain '35', but got:\n%s", output)
}

if !strings.Contains(output, "18") {
t.Errorf("Expected output to contain '18', but got:\n%s", output)
}

// Should contain the cluster and container labels
if !strings.Contains(output, "cluster=mia2") {
t.Errorf("Expected output to contain 'cluster=mia2', but got:\n%s", output)
}

if !strings.Contains(output, "container=ds-microservices-router") {
t.Errorf("Expected output to contain 'container=ds-microservices-router', but got:\n%s", output)
}
}

// TestFormatLokiResults_MetricQuery tests handling of metric query responses with "metric" field
func TestFormatLokiResults_MetricQuery(t *testing.T) {
result := &LokiResult{
Status: "success",
Data: LokiData{
ResultType: "matrix",
Result: []LokiEntry{
{
Metric: map[string]string{
"job": "prometheus",
"instance": "localhost:9090",
},
Values: [][]interface{}{
{"1705312245", 100.0}, // timestamp in seconds, metric value
{"1705312260", 95.0},
{"1705312275", 88.0},
},
},
},
},
}

output, err := formatLokiResults(result, "raw")
if err != nil {
t.Fatalf("formatLokiResults failed: %v", err)
}

// Should contain the metric values
if !strings.Contains(output, "100") {
t.Errorf("Expected output to contain '100', but got:\n%s", output)
}

if !strings.Contains(output, "95") {
t.Errorf("Expected output to contain '95', but got:\n%s", output)
}

if !strings.Contains(output, "88") {
t.Errorf("Expected output to contain '88', but got:\n%s", output)
}

// Should contain the metric labels
if !strings.Contains(output, "job=prometheus") {
t.Errorf("Expected output to contain 'job=prometheus', but got:\n%s", output)
}

if !strings.Contains(output, "instance=localhost:9090") {
t.Errorf("Expected output to contain 'instance=localhost:9090', but got:\n%s", output)
}

// Should contain properly formatted timestamps (seconds format)
if !strings.Contains(output, "2024-01-") {
t.Errorf("Expected output to contain proper timestamp format '2024-01-', but got:\n%s", output)
}
}