From ecbe3f2c906cb46cb7180994ea1320bd8c779499 Mon Sep 17 00:00:00 2001 From: Aditya Manthramurthy Date: Fri, 11 Mar 2022 16:01:44 -0800 Subject: [PATCH] Add support for log export and easier querying for recent logs (#1047) - Adds the `last` option to specify most recent 1 hour of logs with `last=1h` - Adds the `export=FORMAT` option to download logs in the given time range (and matching filter parameters) to a file. FORMAT can be `csv` or `ndjson`. --- logsearchapi/README.md | 29 +++-- logsearchapi/server/db.go | 239 ++++++++++++++++++++++++++++------ logsearchapi/server/query.go | 35 ++++- logsearchapi/server/server.go | 12 +- 4 files changed, 267 insertions(+), 48 deletions(-) diff --git a/logsearchapi/README.md b/logsearchapi/README.md index a4080526676..9da5133120b 100644 --- a/logsearchapi/README.md +++ b/logsearchapi/README.md @@ -60,15 +60,26 @@ The `token` parameter is used to authenticate the request and should be equal to Additional query parameters specify the logs to be retrieved. -| Query parameter | Value Description | Required | Default | -|----------------------|-------------------------------------------------------------------------------------------------------------------|----------|------------| -| `q` | `reqinfo` or `raw`. | Yes | - | -| `timeStart` | RFC3339 time or date. Examples: `2006-01-02T15:04:05.999999999Z07:00` or `2006-01-02`. | No | - | -| `timeEnd` | RFC3339 time or date. Examples: `2006-01-02T15:04:05.999999999Z07:00` or `2006-01-02`. | No | - | -| `timeAsc`/`timeDesc` | Flag parameter (no value); either one may be specified. Specifies result ordering. | No | `timeDesc` | -| `pageSize` | Number of results to return per API call. Allows values between 10 and 10000. | No | `10` | -| `pageNo` | 0-based page number of results. | No | `0` | -| `fp` | Repeatable parameter specifying key-value match filters. See the [filter parameters](#filter-parameters) section. 
| No | - |
+| Query parameter      | Value Description                                                                                                                                                                         | Required | Default    |
+|----------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------|
+| `q`                  | `reqinfo` or `raw`.                                                                                                                                                                       | Yes      | -          |
+| `timeStart`          | RFC3339 time or date. Examples: `2006-01-02T15:04:05.999999999Z07:00` or `2006-01-02`.                                                                                                    | No       | -          |
+| `timeEnd`            | RFC3339 time or date. Examples: `2006-01-02T15:04:05.999999999Z07:00` or `2006-01-02`.                                                                                                    | No       | -          |
+| `last`               | Represents an integer duration with unit (`24h` or `60m`). Use this to get logs for the most recent time window of the given length. Valid time units are "m" for minutes, "h" for hours. | No       | -          |
+| `timeAsc`/`timeDesc` | Flag parameter (no value); either one may be specified. Specifies result ordering.                                                                                                        | No       | `timeDesc` |
+| `fp`                 | Repeatable parameter specifying key-value match filters. See the [filter parameters](#filter-parameters) section.                                                                         | No       | -          |
+| `pageSize`           | Number of results to return per API call. Allows values between 10 and 10000.                                                                                                             | No       | `10`       |
+| `pageNo`             | 0-based page number of results.                                                                                                                                                           | No       | `0`        |
+| `export`             | Specify an export format. This skips pagination. `csv` and `ndjson` are supported.                                                                                                        
| No | - | + +For example, to get the last 24 hours of request-info logs dumped in line-delimited JSON format: + +``` +# If your token contains URL-unsafe characters, it must be URL-encoded appropriately as shown with curl below: +curl -XGET -s 'http://logsearch:8080/api/query?q=reqinfo&timeAsc&export=ndjson&last=24h' --data-urlencode 'token=xxx' > /tmp/output.ndjson +``` + + #### Filter Parameters diff --git a/logsearchapi/server/db.go b/logsearchapi/server/db.go index b159cf240df..d896cfbd994 100644 --- a/logsearchapi/server/db.go +++ b/logsearchapi/server/db.go @@ -17,6 +17,7 @@ package server import ( "context" "database/sql" + "encoding/csv" "encoding/json" "fmt" "io" @@ -280,18 +281,45 @@ type ReqInfoRow struct { ResponseContentLength *uint64 `json:"response_content_length"` } +func iPtrToStr(i *uint64) string { + if i == nil { + return "" + } + return fmt.Sprintf("%d", *i) +} + // Search executes a search query on the db. func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) error { ctx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() + var ( + logEventCSVHeader = []string{"event_time", "log"} + reqInfoCSVHeader = []string{ + "time", + "api_name", + "access_key", + "bucket", + "object", + "time_to_response_ns", + "remote_host", + "request_id", + "user_agent", + "response_status", + "response_status_code", + "request_content_length", + "response_content_length", + } + ) + const ( logEventSelect QTemplate = `SELECT event_time, log FROM %s %s ORDER BY event_time %s - OFFSET $1 LIMIT $2;` + %s;` + reqInfoSelect QTemplate = `SELECT time, api_name, access_key, @@ -308,7 +336,7 @@ func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) erro FROM %s %s ORDER BY time %s - OFFSET $1 LIMIT $2;` + %s;` ) timeOrder := "DESC" @@ -316,57 +344,123 @@ func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) erro timeOrder = "ASC" } - jw := json.NewEncoder(w) switch s.Query { case rawQ: 
whereClauses := []string{} // only filter by time if provided if s.TimeStart != nil { - timeRangeOp := ">=" - timeRangeClause := fmt.Sprintf("event_time %s '%s'", timeRangeOp, s.TimeStart.Format(time.RFC3339Nano)) + timeRangeClause := fmt.Sprintf("event_time >= '%s'", s.TimeStart.Format(time.RFC3339Nano)) whereClauses = append(whereClauses, timeRangeClause) } if s.TimeEnd != nil { - timeRangeOp := "<" - timeRangeClause := fmt.Sprintf("event_time %s '%s'", timeRangeOp, s.TimeEnd.Format(time.RFC3339Nano)) + timeRangeClause := fmt.Sprintf("event_time < '%s'", s.TimeEnd.Format(time.RFC3339Nano)) whereClauses = append(whereClauses, timeRangeClause) } + if s.LastDuration != nil { + // s.TimeEnd and s.TimeStart would be nil due to + // validation of s. + durationSeconds := int64(s.LastDuration.Seconds()) + timeRangeClause := fmt.Sprintf("event_time >= CURRENT_TIMESTAMP - '%d seconds'::interval", durationSeconds) + whereClauses = append(whereClauses, timeRangeClause) + } + whereClause := strings.Join(whereClauses, " AND ") if len(whereClauses) > 0 { whereClause = fmt.Sprintf("WHERE %s", whereClause) } - q := logEventSelect.build(auditLogEventsTable.Name, whereClause, timeOrder) - rows, err := c.QueryContext(ctx, q, s.PageNumber*s.PageSize, s.PageSize) + pagingClause := "" + if s.ExportFormat == "" { + pagingClause = "OFFSET $1 LIMIT $2" + } + + q := logEventSelect.build(auditLogEventsTable.Name, whereClause, timeOrder, pagingClause) + var rows *sql.Rows + var err error + if pagingClause == "" { + rows, err = c.QueryContext(ctx, q) + } else { + rows, err = c.QueryContext(ctx, q, s.PageNumber*s.PageSize, s.PageSize) + } if err != nil { return fmt.Errorf("Error querying db: %v", err) } - var logEventsRaw []logEventRawRow - if err := sqlscan.ScanAll(&logEventsRaw, rows); err != nil { - return fmt.Errorf("Error accessing db: %v", err) - } - // parse the encoded json string stored in the db into a json - // object for output - logEvents := make([]LogEventRow, len(logEventsRaw)) 
- for i, e := range logEventsRaw { - logEvents[i].EventTime = e.EventTime - logEvents[i].Log = make(map[string]interface{}) - if err := json.Unmarshal([]byte(e.Log), &logEvents[i].Log); err != nil { - return fmt.Errorf("Error decoding json log: %v", err) + defer rows.Close() + + switch s.ExportFormat { + case "ndjson": + jw := json.NewEncoder(w) + for rows.Next() { + var logEventRaw logEventRawRow + if err := sqlscan.ScanRow(&logEventRaw, rows); err != nil { + return fmt.Errorf("Error accessing db: %v", err) + } + var logEvent LogEventRow + logEvent.EventTime = logEventRaw.EventTime + logEvent.Log = make(map[string]interface{}) + if err := json.Unmarshal([]byte(logEventRaw.Log), &logEvent.Log); err != nil { + return fmt.Errorf("Error decoding json log: %v", err) + } + if err := jw.Encode(logEvent); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + } + + case "csv": + cw := csv.NewWriter(w) + + // Write CSV header + if err := cw.Write(logEventCSVHeader); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + + // Write rows + for rows.Next() { + var logEventRaw logEventRawRow + if err := sqlscan.ScanRow(&logEventRaw, rows); err != nil { + return fmt.Errorf("Error accessing db: %v", err) + } + record := []string{ + logEventRaw.EventTime.Format(time.RFC3339Nano), + logEventRaw.Log, + } + if err := cw.Write(record); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + } + cw.Flush() + if err := cw.Error(); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + + default: + // Send out one page of results in response. 
+ var logEventsRaw []logEventRawRow + if err := sqlscan.ScanAll(&logEventsRaw, rows); err != nil { + return fmt.Errorf("Error accessing db: %v", err) + } + // parse the encoded json string stored in the db into a json + // object for output + logEvents := make([]LogEventRow, len(logEventsRaw)) + for i, e := range logEventsRaw { + logEvents[i].EventTime = e.EventTime + logEvents[i].Log = make(map[string]interface{}) + if err := json.Unmarshal([]byte(e.Log), &logEvents[i].Log); err != nil { + return fmt.Errorf("Error decoding json log: %v", err) + } + } + jw := json.NewEncoder(w) + if err := jw.Encode(logEvents); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) } } - if err := jw.Encode(logEvents); err != nil { - return fmt.Errorf("Error writing to output stream: %v", err) - } - case reqInfoQ: - // For this query, $1 and $2 are used for offset and limit. - sqlArgs := []interface{}{s.PageNumber * s.PageSize, s.PageSize} - dollarStart := 3 + case reqInfoQ: + sqlArgs := []interface{}{} + dollarStart := 1 whereClauses := []string{} // only filter by time if provided if s.TimeStart != nil { - // $3 will be used for the time parameter timeRangeOp := ">=" timeRangeClause := fmt.Sprintf("time %s $%d", timeRangeOp, dollarStart) sqlArgs = append(sqlArgs, s.TimeStart.Format(time.RFC3339Nano)) @@ -382,9 +476,16 @@ func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) erro whereClauses = append(whereClauses, timeRangeClause) dollarStart++ } + if s.LastDuration != nil { + // s.TimeEnd and s.TimeStart would be nil due to + // validation of s. 
+			durationSeconds := int64(s.LastDuration.Seconds())
+			timeRangeClause := fmt.Sprintf("time >= CURRENT_TIMESTAMP - '%d seconds'::interval", durationSeconds)
+			whereClauses = append(whereClauses, timeRangeClause)
+		}
 		// Remaining dollar params are added for filter where clauses
-		filterClauses, filterArgs := generateFilterClauses(s.FParams, dollarStart)
+		filterClauses, filterArgs, dollarStart := generateFilterClauses(s.FParams, dollarStart)
 		whereClauses = append(whereClauses, filterClauses...)
 		sqlArgs = append(sqlArgs, filterArgs...)
@@ -392,17 +493,81 @@ func (c *DBClient) Search(ctx context.Context, s *SearchQuery, w io.Writer) erro
 		whereClause := strings.Join(whereClauses, " AND ")
 		if len(whereClauses) > 0 {
 			whereClause = fmt.Sprintf("WHERE %s", whereClause)
 		}
-		q := reqInfoSelect.build(requestInfoTable.Name, whereClause, timeOrder)
+
+		pagingClause := ""
+		if s.ExportFormat == "" {
+			sqlArgs = append(sqlArgs, s.PageNumber*s.PageSize, s.PageSize)
+			pagingClause = fmt.Sprintf("OFFSET $%d LIMIT $%d", dollarStart, dollarStart+1)
+		}
+
+		q := reqInfoSelect.build(requestInfoTable.Name, whereClause, timeOrder, pagingClause)
 		rows, err := c.QueryContext(ctx, q, sqlArgs...)
if err != nil { return fmt.Errorf("Error querying db: %v", err) } - var reqInfos []ReqInfoRow - if err := sqlscan.ScanAll(&reqInfos, rows); err != nil { - return fmt.Errorf("Error accessing db: %v", err) - } - if err := jw.Encode(reqInfos); err != nil { - return fmt.Errorf("Error writing to output stream: %v", err) + defer rows.Close() + + switch s.ExportFormat { + case "ndjson": + jw := json.NewEncoder(w) + for rows.Next() { + var reqInfo ReqInfoRow + if err := sqlscan.ScanRow(&reqInfo, rows); err != nil { + return fmt.Errorf("Error accessing db: %v", err) + } + if err := jw.Encode(reqInfo); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + } + + case "csv": + cw := csv.NewWriter(w) + + // Write CSV header + if err := cw.Write(reqInfoCSVHeader); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + + // Write rows + for rows.Next() { + var i ReqInfoRow + if err := sqlscan.ScanRow(&i, rows); err != nil { + return fmt.Errorf("Error accessing db: %v", err) + } + record := []string{ + i.Time.Format(time.RFC3339Nano), + i.APIName, + i.AccessKey, + i.Bucket, + i.Object, + fmt.Sprintf("%d", i.TimeToResponseNs), + i.RemoteHost, + i.RequestID, + i.UserAgent, + i.ResponseStatus, + fmt.Sprintf("%d", i.ResponseStatusCode), + iPtrToStr(i.RequestContentLength), + iPtrToStr(i.ResponseContentLength), + } + if err := cw.Write(record); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + } + cw.Flush() + if err := cw.Error(); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } + + default: + // Send out one page of results in response + var reqInfos []ReqInfoRow + if err := sqlscan.ScanAll(&reqInfos, rows); err != nil { + return fmt.Errorf("Error accessing db: %v", err) + } + jw := json.NewEncoder(w) + if err := jw.Encode(reqInfos); err != nil { + return fmt.Errorf("Error writing to output stream: %v", err) + } } default: return fmt.Errorf("Invalid query name: %v", 
s.Query) diff --git a/logsearchapi/server/query.go b/logsearchapi/server/query.go index 42a21ddadf1..5fed2585811 100644 --- a/logsearchapi/server/query.go +++ b/logsearchapi/server/query.go @@ -48,9 +48,11 @@ type SearchQuery struct { Query qType TimeStart *time.Time TimeEnd *time.Time + LastDuration *time.Duration TimeAscending bool PageNumber int PageSize int + ExportFormat string FParams map[fParam]string } @@ -106,8 +108,32 @@ func searchQueryFromRequest(r *http.Request) (*SearchQuery, error) { timeEnd = &ts } + var last *time.Duration + if lastDuration := values.Get("last"); lastDuration != "" { + d, err := time.ParseDuration(lastDuration) + if err != nil { + return nil, fmt.Errorf("Invalid `last` parameter: %s (Use for example `24h` or `90m`)", lastDuration) + } + if timeEnd != nil || timeStart != nil { + return nil, fmt.Errorf("`last` parameter cannot be specified with `timeStart` or `timeEnd`") + } + last = &d + } + + export := "" + if exportParam := values.Get("export"); exportParam != "" { + if exportParam != "csv" && exportParam != "ndjson" { + return nil, fmt.Errorf("Only `csv` and `ndjson` export formats are supported") + } + export = exportParam + } + pageSize := 10 if psParam := values.Get("pageSize"); psParam != "" { + if export != "" { + return nil, fmt.Errorf("`pageSize` may not be specified with `export`") + } + pageSize, err = strconv.Atoi(psParam) if err != nil { return nil, fmt.Errorf("Invalid pageSize parameter: %s", psParam) @@ -119,6 +145,10 @@ func searchQueryFromRequest(r *http.Request) (*SearchQuery, error) { var pageNumber int if pnParam := values.Get("pageStart"); pnParam != "" { + if export != "" { + return nil, fmt.Errorf("`pageStart` may not be specified with `export`") + } + pageNumber, err = strconv.Atoi(pnParam) if err != nil { return nil, fmt.Errorf("Invalid pageStart parameter: %s", pnParam) @@ -157,9 +187,11 @@ func searchQueryFromRequest(r *http.Request) (*SearchQuery, error) { Query: q, TimeStart: timeStart, TimeEnd: 
timeEnd, + LastDuration: last, TimeAscending: timeAscending, PageSize: pageSize, PageNumber: pageNumber, + ExportFormat: export, FParams: fParams, }, nil } @@ -180,7 +212,7 @@ func parseSQTimeString(s string) (r time.Time, err error) { return } -func generateFilterClauses(m map[fParam]string, dollarStart int) (clauses []string, args []interface{}) { +func generateFilterClauses(m map[fParam]string, dollarStart int) (clauses []string, args []interface{}, dollarEnd int) { for k, v := range m { arg, op := v, "=" if strings.Contains(v, ".") || strings.Contains(v, "*") { @@ -194,5 +226,6 @@ func generateFilterClauses(m map[fParam]string, dollarStart int) (clauses []stri args = append(args, arg) dollarStart++ } + dollarEnd = dollarStart return } diff --git a/logsearchapi/server/server.go b/logsearchapi/server/server.go index ccc90443163..1627682edea 100644 --- a/logsearchapi/server/server.go +++ b/logsearchapi/server/server.go @@ -134,7 +134,17 @@ func (ls *LogSearch) queryHandler(w http.ResponseWriter, r *http.Request) { return } - w.Header().Add("Content-Type", "application/json") + switch sq.ExportFormat { + case "csv": + w.Header().Add("Content-Type", "text/csv") + w.Header().Add("Content-Disposition", "attachment; filename=logs-export.csv") + case "ndjson": + // Ref: https://github.com/ndjson/ndjson-spec + w.Header().Add("Content-Type", "application/x-ndjson") + w.Header().Add("Content-Disposition", "attachment; filename=logs-export.ndjson") + default: + w.Header().Add("Content-Type", "application/json") + } err = ls.DBClient.Search(r.Context(), sq, w) if err != nil { w.Header().Del("Content-Type")