Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add query for client #220

Merged
merged 5 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,14 @@ type ClientInterface interface {
// The nextCursor is the next curosr can be used to read logs at next time.
GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error)
GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error)
// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next cursor can be used to read logs at next time.
// @note if you want to pull logs continuous, set endCursor = ""
PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error)
PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error)
// GetHistograms query logs with [from, to) time range
GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
// GetLogs query logs with [from, to) time range
Expand Down
21 changes: 19 additions & 2 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,20 @@ func (c *Client) GetPrevCursorTime(project, logstore string, shardID int, cursor
// The nextCursor is the next curosr can be used to read logs at next time.
func (c *Client) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
ls := convertLogstore(c, project, logstore)
return ls.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount)
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesV2(plr)
}

func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.GetLogsBytesV2(plr)
}

// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
Expand All @@ -200,6 +212,11 @@ func (c *Client) PullLogs(project, logstore string, shardID int, cursor, endCurs
return ls.PullLogs(shardID, cursor, endCursor, logGroupMaxCount)
}

func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsV2(plr)
}

// GetHistograms query logs with [from, to) time range
func (c *Client) GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error) {
ls := convertLogstore(c, project, logstore)
Expand Down
2 changes: 2 additions & 0 deletions consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type LogHubConfig struct {
//:param SecurityToken: If you use sts token to consume data, you must make sure consumer will be stopped before this token expired.
//:param Project:
//:param Logstore:
//:param Query:
//:param ConsumerGroupName:
//:param ConsumerName:
//:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed.
Expand Down Expand Up @@ -43,6 +44,7 @@ type LogHubConfig struct {
AccessKeySecret string
Project string
Logstore string
Query string
ConsumerGroupName string
ConsumerName string
CursorPosition string
Expand Down
15 changes: 12 additions & 3 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,19 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err

func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) {
var logBytes []byte
plr := &sls.PullLogRequest{
Project: consumer.option.Project,
Logstore: consumer.option.Logstore,
ShardID: shardId,
Query: consumer.option.Query,
Cursor: cursor,
LogGroupMaxCount: consumer.option.MaxFetchLogGroupCount,
}
if plr.Query != "" {
plr.PullMode = "scan_on_stream"
}
for retry := 0; retry < 3; retry++ {
logBytes, nextCursor, err = consumer.client.GetLogsBytes(consumer.option.Project, consumer.option.Logstore, shardId,
cursor, "",
consumer.option.MaxFetchLogGroupCount)
logBytes, nextCursor, err = consumer.client.GetLogsBytesV2(plr)
if err == nil {
rawSize = len(logBytes)
gl, err = sls.LogsBytesDecode(logBytes)
Expand Down
36 changes: 25 additions & 11 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,25 +437,29 @@ func (s *LogStore) GetCursor(shardID int, from string) (cursor string, err error
return cursor, nil
}

func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.GetLogsBytesV2(plr)
}

// GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/x-protobuf",
"Accept-Encoding": "lz4",
}

uri := ""
if endCursor == "" {
uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&count=%v",
s.Name, shardID, cursor, logGroupMaxCount)
} else {
uri = fmt.Sprintf("/logstores/%v/shards/%v?type=logs&cursor=%v&end_cursor=%v&count=%v",
s.Name, shardID, cursor, endCursor, logGroupMaxCount)
}
urlVal := plr.ToURLParams()
uri := fmt.Sprintf("/logstores/%v/shards/%v?%s", s.Name, plr.ShardID, urlVal.Encode())

r, err := request(s.project, "GET", uri, h, nil)
if err != nil {
Expand Down Expand Up @@ -536,8 +540,18 @@ func LogsBytesDecode(data []byte) (gl *LogGroupList, err error) {
// @note if you want to pull logs continuous, set endCursor = ""
func (s *LogStore) PullLogs(shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
plr := &PullLogRequest{
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.PullLogsV2(plr)
}

func (s *LogStore) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {

out, nextCursor, err := s.GetLogsBytes(shardID, cursor, endCursor, logGroupMaxCount)
out, nextCursor, err := s.GetLogsBytesV2(plr)
if err != nil {
return nil, "", err
}
Expand Down
29 changes: 29 additions & 0 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,35 @@ func (glr *GetLogRequest) ToURLParams() url.Values {
return urlVal
}

type PullLogRequest struct {
Project string
Logstore string
ShardID int
Cursor string
EndCursor string
LogGroupMaxCount int
Query string
PullMode string
}

func (plr *PullLogRequest) ToURLParams() url.Values {
urlVal := url.Values{}
urlVal.Add("type", "logs")
urlVal.Add("cursor", plr.Cursor)
urlVal.Add("count", strconv.Itoa(plr.LogGroupMaxCount))
if plr.EndCursor != "" {
urlVal.Add("end_cursor", plr.EndCursor)
}
if plr.Query != "" {
urlVal.Add("query", plr.Query)
}
if plr.PullMode != "" {
urlVal.Add("pullMode", plr.PullMode)
}

return urlVal
}

// GetHistogramsResponse defines response from GetHistograms call
type SingleHistogram struct {
Progress string `json:"progress"`
Expand Down
28 changes: 26 additions & 2 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,8 +770,20 @@ func (c *TokenAutoUpdateClient) GetCursorTime(project, logstore string, shardID

func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error) {
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesV2(plr)
}

func (c *TokenAutoUpdateClient) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
for i := 0; i < c.maxTryTimes; i++ {
out, nextCursor, err = c.logClient.GetLogsBytes(project, logstore, shardID, cursor, endCursor, logGroupMaxCount)
out, nextCursor, err = c.logClient.GetLogsBytesV2(plr)
if !c.processError(err) {
return
}
Expand All @@ -781,8 +793,20 @@ func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID i

func (c *TokenAutoUpdateClient) PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error) {
plr := &PullLogRequest{
Project: project,
Logstore: logstore,
ShardID: shardID,
Cursor: cursor,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.PullLogsV2(plr)
}

func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
for i := 0; i < c.maxTryTimes; i++ {
gl, nextCursor, err = c.logClient.PullLogs(project, logstore, shardID, cursor, endCursor, logGroupMaxCount)
gl, nextCursor, err = c.logClient.PullLogsV2(plr)
if !c.processError(err) {
return
}
Expand Down
Loading