Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support spl #251

Merged
merged 16 commits into from
Jan 29, 2024
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ go get -u github.com/aliyun/aliyun-log-go-sdk
Client = sls.CreateNormalInterfaceV2(Endpoint, credentialsProvider)
```

为了防止出现配置错误,您可以在创建 Client 之后,测试一下 Client 是否能成功调用 SLS API
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

一下 去掉

```go
_, err := Client.ListProject()
if err != nil {
panic(err)
}
```

2. **创建project**

参考 [log_project.go](https://github.com/aliyun/aliyun-log-go-sdk/blob/master/example/project/log_project.go)文件
Expand Down
4 changes: 2 additions & 2 deletions client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,14 @@ type ClientInterface interface {
// The nextCursor is the next curosr can be used to read logs at next time.
GetLogsBytes(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (out []byte, nextCursor string, err error)
GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error)
GetLogsBytesV2(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error)
// PullLogs gets logs from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next cursor can be used to read logs at next time.
// @note if you want to pull logs continuous, set endCursor = ""
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetLogsBytesV2 也是要废弃的吧?注解

PullLogs(project, logstore string, shardID int, cursor, endCursor string,
logGroupMaxCount int) (gl *LogGroupList, nextCursor string, err error)
PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error)
PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, pullLogMeta *PullLogMeta, err error)
// GetHistograms query logs with [from, to) time range
GetHistograms(project, logstore string, topic string, from int64, to int64, queryExp string) (*GetHistogramsResponse, error)
// GetLogs query logs with [from, to) time range
Expand Down
14 changes: 11 additions & 3 deletions client_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ func convertLogstore(c *Client, project, logstore string) *LogStore {
}
}

func convertByteV1(out []byte, meta *PullLogMeta, err error) ([]byte, string, error) {
return out, meta.NextCursor, err
}

func convertGlV1(gl *LogGroupList, meta *PullLogMeta, err error) (*LogGroupList, string, error) {
return gl, meta.NextCursor, err
}

// ListShards returns shard id list of this logstore.
func (c *Client) ListShards(project, logstore string) (shardIDs []*Shard, err error) {
ls := convertLogstore(c, project, logstore)
Expand Down Expand Up @@ -194,10 +202,10 @@ func (c *Client) GetLogsBytes(project, logstore string, shardID int, cursor, end
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesV2(plr)
return convertByteV1(c.GetLogsBytesV2(plr))
}

func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
func (c *Client) GetLogsBytesV2(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.GetLogsBytesV2(plr)
}
Expand All @@ -212,7 +220,7 @@ func (c *Client) PullLogs(project, logstore string, shardID int, cursor, endCurs
return ls.PullLogs(shardID, cursor, endCursor, logGroupMaxCount)
}

func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
func (c *Client) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, pullLogMeta *PullLogMeta, err error) {
ls := convertLogstore(c, plr.Project, plr.Logstore)
return ls.PullLogsV2(plr)
}
Expand Down
1 change: 1 addition & 0 deletions consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ LogHubConfig是提供给用户的配置类,用于配置消费策略,您可
|SecurityToken|aliyun SecurityToken|非必填,参考https://help.aliyun.com/document_detail/47277.html|
|AutoCommitDisabled|是否禁用sdk自动提交checkpoint|非必填,默认不会禁用|
|AutoCommitIntervalInMS|自动提交checkpoint的时间间隔|非必填,单位为MS,默认时间为60s|
|Query|过滤规则 基于规则消费时必须设置对应规则 如 *| where a = 'xxx'|非必填|

2.**覆写消费逻辑**

Expand Down
2 changes: 1 addition & 1 deletion consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type LogHubConfig struct {
//:param CredentialsProvider: CredentialsProvider that providers credentials(AccessKeyID, AccessKeySecret, StsToken)
//:param Project:
//:param Logstore:
//:param Query:
//:param Query: Filter rules Corresponding rules must be set when consuming based on rules, such as *| where a = 'xxx'
//:param ConsumerGroupName:
//:param ConsumerName:
//:param CursorPosition: This options is used for initialization, will be ignored once consumer group is created and each shard has beeen started to be consumed.
Expand Down
12 changes: 2 additions & 10 deletions consumer/consumer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ func (consumer *ConsumerClient) getCursor(shardId int, from string) (string, err
return cursor, err
}

func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, nextCursor string, rawSize int, err error) {
var logBytes []byte
func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.LogGroupList, pullLogMeta *sls.PullLogMeta, err error) {
plr := &sls.PullLogRequest{
Project: consumer.option.Project,
Logstore: consumer.option.Logstore,
Expand All @@ -145,14 +144,7 @@ func (consumer *ConsumerClient) pullLogs(shardId int, cursor string) (gl *sls.Lo
plr.PullMode = "scan_on_stream"
}
for retry := 0; retry < 3; retry++ {
logBytes, nextCursor, err = consumer.client.GetLogsBytesV2(plr)
if err == nil {
rawSize = len(logBytes)
gl, err = sls.LogsBytesDecode(logBytes)
if err == nil {
break
}
}
gl, pullLogMeta, err = consumer.client.PullLogsV2(plr)
if err != nil {
slsError, ok := err.(sls.Error)
if ok {
Expand Down
16 changes: 13 additions & 3 deletions consumer/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,26 @@ func (consumer *ShardConsumerWorker) nextFetchTask() error {
// update last fetch time, for control fetch frequency
consumer.lastFetchTime = time.Now()

logGroup, nextCursor, rawSize, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
logGroup, pullLogMeta, err := consumer.client.pullLogs(consumer.shardId, consumer.nextFetchCursor)
if err != nil {
return err
}
// set cursors user to decide whether to save according to the execution of `process`
consumer.consumerCheckPointTracker.setCurrentCursor(consumer.nextFetchCursor)
consumer.lastFetchLogGroupList = logGroup
consumer.nextFetchCursor = nextCursor
consumer.lastFetchRawSize = rawSize
consumer.nextFetchCursor = pullLogMeta.NextCursor
consumer.lastFetchRawSize = pullLogMeta.DataSize
consumer.lastFetchGroupCount = GetLogGroupCount(consumer.lastFetchLogGroupList)
if consumer.client.option.Query != "" {
consumer.lastFetchRawSize = pullLogMeta.RawDataSize
consumer.lastFetchGroupCount = pullLogMeta.RawDataCount
if consumer.lastFetchRawSize == -1 {
consumer.lastFetchRawSize = 0
}
if consumer.lastFetchGroupCount == -1 {
consumer.lastFetchGroupCount = 0
}
}
consumer.consumerCheckPointTracker.setNextCursor(consumer.nextFetchCursor)
level.Debug(consumer.logger).Log(
"shardId", consumer.shardId,
Expand Down
10 changes: 8 additions & 2 deletions example/consumer/query_demo/simple_demo_with_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func main() {
ConsumerName: "",
// This options is used for initialization, will be ignored once consumer group is created and each shard has been started to be consumed.
// Could be "begin", "end", "specific time format in time stamp", it's log receiving time.
CursorPosition: consumerLibrary.BEGIN_CURSOR,
CursorPosition: consumerLibrary.SPECIAL_TIMER_CURSOR,
CursorStartTime: 1706077849,
// Query is for log pre-handling before return to client, more info refer to https://www.alibabacloud.com/help/zh/sls/user-guide/rule-based-consumption
Query: "* | where cast(body_bytes_sent as bigint) > 14000",
}
Expand All @@ -43,7 +44,12 @@ func main() {
// Fill in your consumption logic here, and be careful not to change the parameters of the function and the return value,
// otherwise you will report errors.
func process(shardId int, logGroupList *sls.LogGroupList, checkpointTracker consumerLibrary.CheckPointTracker) (string, error) {
fmt.Println(shardId, logGroupList)
for _, logGroup := range logGroupList.LogGroups {
for _, log := range logGroup.Logs {
fmt.Println("log_content: ", log.Contents)
}
}
fmt.Printf("shardId %v processing works sucess\n", shardId)
checkpointTracker.SaveCheckPoint(false)
return "", nil
}
57 changes: 41 additions & 16 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,13 +445,13 @@ func (s *LogStore) GetLogsBytes(shardID int, cursor, endCursor string,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.GetLogsBytesV2(plr)
return convertByteV1(s.GetLogsBytesV2(plr))
}

// GetLogsBytes gets logs binary data from shard specified by shardId according cursor and endCursor.
// The logGroupMaxCount is the max number of logGroup could be returned.
// The nextCursor is the next curosr can be used to read logs at next time.
func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) {
h := map[string]string{
"x-log-bodyrawsize": "0",
"Accept": "application/x-protobuf",
Expand All @@ -463,12 +463,12 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor s

r, err := request(s.project, "GET", uri, h, nil)
AVloger marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, "", err
return
}
defer r.Body.Close()
buf, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, "", err
return
}

if r.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -500,22 +500,47 @@ func (s *LogStore) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor s
err = fmt.Errorf("can't find 'x-log-cursor' header")
return
}
nextCursor = v[0]
pullLogMeta = &PullLogMeta{
RawDataSize: -1,
RawDataCount: -1,
}

pullLogMeta.NextCursor = v[0]

v, ok = r.Header["X-Log-Bodyrawsize"]
if !ok || len(v) == 0 {
err = fmt.Errorf("can't find 'x-log-bodyrawsize' header")
return
}
bodyRawSize, err := strconv.Atoi(v[0])
pullLogMeta.DataSize, err = strconv.Atoi(v[0])
if err != nil {
return nil, "", err
return
}

out = make([]byte, bodyRawSize)
if bodyRawSize != 0 {
out = make([]byte, pullLogMeta.DataSize)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

L520/L521换一下顺序

if pullLogMeta.DataSize != 0 {
len := 0
if len, err = lz4.UncompressBlock(buf, out); err != nil || len != bodyRawSize {
if len, err = lz4.UncompressBlock(buf, out); err != nil || len != pullLogMeta.DataSize {
return
}
}
// If it is not in scan mode, exit early
if plr.PullMode != "scan_on_stream" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

最好这样改:
if plr.PullMode == "scan_on_stream" {
xxx
}

return
}
//datasize before data processing
v = r.Header["X-Log-Rawdatasize"]
if len(v) > 0 {
pullLogMeta.RawDataSize, err = strconv.Atoi(v[0])
if err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

header非法或取不到,需要返回用户更容易理解的信息

可以参考以前的:

v, ok = r.Header["X-Log-Bodyrawsize"]
if !ok || len(v) == 0 {
	err = fmt.Errorf("can't find 'x-log-bodyrawsize' header")
	return
}

return
}
}
//lines before data processing
v = r.Header["X-Log-Rawdatacount"]
if len(v) > 0 {
pullLogMeta.RawDataCount, err = strconv.Atoi(v[0])
if err != nil {
return
}
}
Expand Down Expand Up @@ -546,22 +571,22 @@ func (s *LogStore) PullLogs(shardID int, cursor, endCursor string,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return s.PullLogsV2(plr)
return convertGlV1(s.PullLogsV2(plr))
}

func (s *LogStore) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
func (s *LogStore) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, pullLogMeta *PullLogMeta, err error) {

out, nextCursor, err := s.GetLogsBytesV2(plr)
out, pullLogMeta, err := s.GetLogsBytesV2(plr)
if err != nil {
return nil, "", err
return nil, nil, err
}

gl, err = LogsBytesDecode(out)
if err != nil {
return nil, "", err
return nil, nil, err
}

return gl, nextCursor, nil
return
}

// GetHistograms query logs with [from, to) time range
Expand Down
5 changes: 5 additions & 0 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (plr *PullLogRequest) ToURLParams() url.Values {
return urlVal
}

type PullLogMeta struct {
NextCursor string
DataSize, RawDataSize, RawDataCount int
}

// GetHistogramsResponse defines response from GetHistograms call
type SingleHistogram struct {
Progress string `json:"progress"`
Expand Down
12 changes: 6 additions & 6 deletions token_auto_update_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,12 +813,12 @@ func (c *TokenAutoUpdateClient) GetLogsBytes(project, logstore string, shardID i
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.GetLogsBytesV2(plr)
return convertByteV1(c.GetLogsBytesV2(plr))
}

func (c *TokenAutoUpdateClient) GetLogsBytesV2(plr *PullLogRequest) (out []byte, nextCursor string, err error) {
func (c *TokenAutoUpdateClient) GetLogsBytesV2(plr *PullLogRequest) (out []byte, pullLogMeta *PullLogMeta, err error) {
for i := 0; i < c.maxTryTimes; i++ {
out, nextCursor, err = c.logClient.GetLogsBytesV2(plr)
out, pullLogMeta, err = c.logClient.GetLogsBytesV2(plr)
if !c.processError(err) {
return
}
Expand All @@ -836,12 +836,12 @@ func (c *TokenAutoUpdateClient) PullLogs(project, logstore string, shardID int,
EndCursor: endCursor,
LogGroupMaxCount: logGroupMaxCount,
}
return c.PullLogsV2(plr)
return convertGlV1(c.logClient.PullLogsV2(plr))
}

func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, nextCursor string, err error) {
func (c *TokenAutoUpdateClient) PullLogsV2(plr *PullLogRequest) (gl *LogGroupList, pullLogMeta *PullLogMeta, err error) {
for i := 0; i < c.maxTryTimes; i++ {
gl, nextCursor, err = c.logClient.PullLogsV2(plr)
gl, pullLogMeta, err = c.logClient.PullLogsV2(plr)
if !c.processError(err) {
return
}
Expand Down