Skip to content

Commit

Permalink
update pullData and consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
weilong.pwl committed Aug 14, 2023
1 parent 2323c24 commit b258497
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 7 deletions.
1 change: 0 additions & 1 deletion client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ type PullLogRequest struct {
LogGroupMaxCount int
Query string
PullMode string
ResponseWithMeta bool
}

func convertLogstore(c *Client, project, logstore string) *LogStore {
Expand Down
3 changes: 3 additions & 0 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo
Cursor: cursor,
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
}
if plr.Query != "" {
plr.PullMode = "scan_on_stream"
}
for retry := 0; retry < 3; retry++ {
logBytes, nextCursor, err = consumer.client.GetLogsBytesWithQuery(plr)
if err == nil {
Expand Down
9 changes: 3 additions & 6 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,13 +464,10 @@ func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, nextC
uri += fmt.Sprintf("&end_cursor=%v", plr.EndCursor)
}
if plr.Query != "" {
if plr.PullMode == "" {
plr.PullMode = "scan_on_stream"
}
uri += fmt.Sprintf("&pullMode=%v&query=%v", plr.PullMode, plr.Query)
uri += fmt.Sprintf("&query=%v", plr.Query)
}
if plr.ResponseWithMeta {
uri += "&responseWithMeta=true"
if plr.PullMode != "" {
uri += fmt.Sprintf("&pullMode=%v", plr.PullMode)
}

r, err := request(s.project, "GET", uri, h, nil)
Expand Down

0 comments on commit b258497

Please sign in to comment.