
Add support for log export and easier querying for recent logs
- Adds the `last` option to query only the most recent logs, e.g. `last=1h` for the most recent 1 hour of logs

- Adds the `export=FORMAT` option to download logs in the given time range (and
matching filter parameters) to a file. FORMAT can be `csv` or `ndjson`.
donatello committed Mar 9, 2022
1 parent d9ac7af commit 7ec03b9
Showing 4 changed files with 267 additions and 48 deletions.
29 changes: 20 additions & 9 deletions logsearchapi/README.md
@@ -60,15 +60,26 @@ The `token` parameter is used to authenticate the request and should be equal to

Additional query parameters specify the logs to be retrieved.

| Query parameter | Value Description | Required | Default |
|----------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------|
| `q` | `reqinfo` or `raw`. | Yes | - |
| `timeStart` | RFC3339 time or date. Examples: `2006-01-02T15:04:05.999999999Z07:00` or `2006-01-02`. | No | - |
| `timeEnd` | RFC3339 time or date. Examples: `2006-01-02T15:04:05.999999999Z07:00` or `2006-01-02`. | No | - |
| `last` | An integer duration with a unit suffix: `m` for minutes or `h` for hours (e.g. `24h` or `60m`). Returns logs from the most recent time window of the given length. Cannot be combined with `timeStart`/`timeEnd`. | No | - |
| `timeAsc`/`timeDesc` | Flag parameter (no value); either one may be specified. Specifies result ordering. | No | `timeDesc` |
| `fp` | Repeatable parameter specifying key-value match filters. See the [filter parameters](#filter-parameters) section. | No | - |
| `pageSize` | Number of results to return per API call. Allows values between 10 and 10000. | No | `10` |
| `pageNo` | 0-based page number of results. | No | `0` |
| `export` | Export format for the results. When set, pagination (`pageSize`/`pageNo`) is skipped. `csv` and `ndjson` are supported. | No | - |
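
For instance, to fetch the second page of 100 request-info results (the host and `token` value here are placeholders; `-G` makes curl append the URL-encoded token to the query string):

```
curl -s -G 'http://logsearch:8080/api/query?q=reqinfo&pageSize=100&pageNo=1' --data-urlencode 'token=xxx'
```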

For example, to download the last 24 hours of request-info logs in line-delimited JSON (ndjson) format:

```
# If your token contains URL-unsafe characters, use -G with --data-urlencode so
# curl URL-encodes it and appends it to the query string:
curl -s -G 'http://logsearch:8080/api/query?q=reqinfo&timeAsc&export=ndjson&last=24h' --data-urlencode 'token=xxx' > /tmp/output.ndjson
```
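
Similarly, to export the most recent hour of request-info logs for a single bucket as CSV (the `fp` filter shown is illustrative; see the [filter parameters](#filter-parameters) section for the exact matching syntax):

```
curl -s -G 'http://logsearch:8080/api/query?q=reqinfo&export=csv&last=1h' --data-urlencode 'fp=bucket:mybucket' --data-urlencode 'token=xxx' > /tmp/output.csv
```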



#### Filter Parameters

239 changes: 202 additions & 37 deletions logsearchapi/server/db.go
@@ -17,6 +17,7 @@ package server
import (
"context"
"database/sql"
"encoding/csv"
"encoding/json"
"fmt"
"io"
@@ -280,18 +281,45 @@ type ReqInfoRow struct {
ResponseContentLength *uint64 `json:"response_content_length"`
}

// iPtrToStr formats a *uint64 as a decimal string, returning "" when nil.
func iPtrToStr(i *uint64) string {
if i == nil {
return ""
}
return fmt.Sprintf("%d", *i)
}

// Search executes a search query on the db.
func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) error {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()

var (
logEventCSVHeader = []string{"event_time", "log"}
reqInfoCSVHeader = []string{
"time",
"api_name",
"access_key",
"bucket",
"object",
"time_to_response_ns",
"remote_host",
"request_id",
"user_agent",
"response_status",
"response_status_code",
"request_content_length",
"response_content_length",
}
)

const (
logEventSelect QTemplate = `SELECT event_time,
log
FROM %s
%s
ORDER BY event_time %s
OFFSET $1 LIMIT $2;`
%s;`

reqInfoSelect QTemplate = `SELECT time,
api_name,
access_key,
@@ -308,65 +336,131 @@ func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) erro
FROM %s
%s
ORDER BY time %s
OFFSET $1 LIMIT $2;`
%s;`
)

timeOrder := "DESC"
if s.TimeAscending {
timeOrder = "ASC"
}

jw := json.NewEncoder(w)
switch s.Query {
case rawQ:
whereClauses := []string{}
// only filter by time if provided
if s.TimeStart != nil {
timeRangeOp := ">="
timeRangeClause := fmt.Sprintf("event_time %s '%s'", timeRangeOp, s.TimeStart.Format(time.RFC3339Nano))
timeRangeClause := fmt.Sprintf("event_time >= '%s'", s.TimeStart.Format(time.RFC3339Nano))
whereClauses = append(whereClauses, timeRangeClause)
}
if s.TimeEnd != nil {
timeRangeOp := "<"
timeRangeClause := fmt.Sprintf("event_time %s '%s'", timeRangeOp, s.TimeEnd.Format(time.RFC3339Nano))
timeRangeClause := fmt.Sprintf("event_time < '%s'", s.TimeEnd.Format(time.RFC3339Nano))
whereClauses = append(whereClauses, timeRangeClause)
}
if s.LastDuration != nil {
// s.TimeEnd and s.TimeStart would be nil due to
// validation of s.
durationSeconds := int64(s.LastDuration.Seconds())
timeRangeClause := fmt.Sprintf("event_time >= CURRENT_TIMESTAMP - '%d seconds'::interval", durationSeconds)
whereClauses = append(whereClauses, timeRangeClause)
}

whereClause := strings.Join(whereClauses, " AND ")
if len(whereClauses) > 0 {
whereClause = fmt.Sprintf("WHERE %s", whereClause)
}

q := logEventSelect.build(auditLogEventsTable.Name, whereClause, timeOrder)
rows, err := c.QueryContext(ctx, q, s.PageNumber*s.PageSize, s.PageSize)
pagingClause := ""
if s.ExportFormat == "" {
pagingClause = "OFFSET $1 LIMIT $2"
}

q := logEventSelect.build(auditLogEventsTable.Name, whereClause, timeOrder, pagingClause)
var rows *sql.Rows
var err error
if pagingClause == "" {
rows, err = c.QueryContext(ctx, q)
} else {
rows, err = c.QueryContext(ctx, q, s.PageNumber*s.PageSize, s.PageSize)
}
if err != nil {
return fmt.Errorf("Error querying db: %v", err)
}
var logEventsRaw []logEventRawRow
if err := sqlscan.ScanAll(&logEventsRaw, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
// parse the encoded json string stored in the db into a json
// object for output
logEvents := make([]LogEventRow, len(logEventsRaw))
for i, e := range logEventsRaw {
logEvents[i].EventTime = e.EventTime
logEvents[i].Log = make(map[string]interface{})
if err := json.Unmarshal([]byte(e.Log), &logEvents[i].Log); err != nil {
return fmt.Errorf("Error decoding json log: %v", err)
defer rows.Close()

switch s.ExportFormat {
case "ndjson":
jw := json.NewEncoder(w)
for rows.Next() {
var logEventRaw logEventRawRow
if err := sqlscan.ScanRow(&logEventRaw, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
var logEvent LogEventRow
logEvent.EventTime = logEventRaw.EventTime
logEvent.Log = make(map[string]interface{})
if err := json.Unmarshal([]byte(logEventRaw.Log), &logEvent.Log); err != nil {
return fmt.Errorf("Error decoding json log: %v", err)
}
if err := jw.Encode(logEvent); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}
}

case "csv":
cw := csv.NewWriter(w)

// Write CSV header
if err := cw.Write(logEventCSVHeader); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}

// Write rows
for rows.Next() {
var logEventRaw logEventRawRow
if err := sqlscan.ScanRow(&logEventRaw, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
record := []string{
logEventRaw.EventTime.Format(time.RFC3339Nano),
logEventRaw.Log,
}
if err := cw.Write(record); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}
}
cw.Flush()
if err := cw.Error(); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}

default:
// Send out one page of results in response.
var logEventsRaw []logEventRawRow
if err := sqlscan.ScanAll(&logEventsRaw, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
// parse the encoded json string stored in the db into a json
// object for output
logEvents := make([]LogEventRow, len(logEventsRaw))
for i, e := range logEventsRaw {
logEvents[i].EventTime = e.EventTime
logEvents[i].Log = make(map[string]interface{})
if err := json.Unmarshal([]byte(e.Log), &logEvents[i].Log); err != nil {
return fmt.Errorf("Error decoding json log: %v", err)
}
}
jw := json.NewEncoder(w)
if err := jw.Encode(logEvents); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}
}
if err := jw.Encode(logEvents); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}
case reqInfoQ:
// For this query, $1 and $2 are used for offset and limit.
sqlArgs := []interface{}{s.PageNumber * s.PageSize, s.PageSize}

dollarStart := 3
case reqInfoQ:
sqlArgs := []interface{}{}
dollarStart := 1
whereClauses := []string{}
// only filter by time if provided
if s.TimeStart != nil {
// $3 will be used for the time parameter
timeRangeOp := ">="
timeRangeClause := fmt.Sprintf("time %s $%d", timeRangeOp, dollarStart)
sqlArgs = append(sqlArgs, s.TimeStart.Format(time.RFC3339Nano))
@@ -382,27 +476,98 @@ func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) erro
whereClauses = append(whereClauses, timeRangeClause)
dollarStart++
}
if s.LastDuration != nil {
// s.TimeEnd and s.TimeStart would be nil due to
// validation of s.
durationSeconds := int64(s.LastDuration.Seconds())
timeRangeClause := fmt.Sprintf("event_time >= CURRENT_TIMESTAMP - '%d seconds'::interval", durationSeconds)
whereClauses = append(whereClauses, timeRangeClause)
}

// Remaining dollar params are added for filter where clauses
filterClauses, filterArgs := generateFilterClauses(s.FParams, dollarStart)
filterClauses, filterArgs, dollarStart := generateFilterClauses(s.FParams, dollarStart)
whereClauses = append(whereClauses, filterClauses...)
sqlArgs = append(sqlArgs, filterArgs...)

whereClause := strings.Join(whereClauses, " AND ")
if len(whereClauses) > 0 {
whereClause = fmt.Sprintf("WHERE %s", whereClause)
}
q := reqInfoSelect.build(requestInfoTable.Name, whereClause, timeOrder)

pagingClause := ""
if s.ExportFormat == "" {
sqlArgs = append(sqlArgs, s.PageNumber*s.PageSize, s.PageSize)
pagingClause = fmt.Sprintf("OFFSET $%d LIMIT $%d", dollarStart, dollarStart+1)
}

q := reqInfoSelect.build(requestInfoTable.Name, whereClause, timeOrder, pagingClause)
rows, err := c.QueryContext(ctx, q, sqlArgs...)
if err != nil {
return fmt.Errorf("Error querying db: %v", err)
}
var reqInfos []ReqInfoRow
if err := sqlscan.ScanAll(&reqInfos, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
if err := jw.Encode(reqInfos); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
defer rows.Close()

switch s.ExportFormat {
case "ndjson":
jw := json.NewEncoder(w)
for rows.Next() {
var reqInfo ReqInfoRow
if err := sqlscan.ScanRow(&reqInfo, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
if err := jw.Encode(reqInfo); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}
}

case "csv":
cw := csv.NewWriter(w)

// Write CSV header
if err := cw.Write(reqInfoCSVHeader); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}

// Write rows
for rows.Next() {
var i ReqInfoRow
if err := sqlscan.ScanRow(&i, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
record := []string{
i.Time.Format(time.RFC3339Nano),
i.APIName,
i.AccessKey,
i.Bucket,
i.Object,
fmt.Sprintf("%d", i.TimeToResponseNs),
i.RemoteHost,
i.RequestID,
i.UserAgent,
i.ResponseStatus,
fmt.Sprintf("%d", i.ResponseStatusCode),
iPtrToStr(i.RequestContentLength),
iPtrToStr(i.ResponseContentLength),
}
if err := cw.Write(record); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}
}
cw.Flush()
if err := cw.Error(); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}

default:
// Send out one page of results in response
var reqInfos []ReqInfoRow
if err := sqlscan.ScanAll(&reqInfos, rows); err != nil {
return fmt.Errorf("Error accessing db: %v", err)
}
jw := json.NewEncoder(w)
if err := jw.Encode(reqInfos); err != nil {
return fmt.Errorf("Error writing to output stream: %v", err)
}
}
default:
return fmt.Errorf("Invalid query name: %v", s.Query)
