Skip to content

Commit

Permalink
Feat cgo-zstd (#284)
Browse files Browse the repository at this point in the history
* test: add two types of zstd compressor

* feat: mv cgo to extension

* feat: add cgo-extensions

* chore: update readme

* chore: readme

* chore: rm

* refine name

* rename package name

* refine readme

* update readme
  • Loading branch information
crimson-gao authored Jul 31, 2024
1 parent 8b8880e commit 9a152f5
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 21 deletions.
77 changes: 77 additions & 0 deletions cgo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
## 依赖
使用此扩展,需要设置 go env 中的 CGO_ENABLED=1,且环境中已安装了合适的编译器,linux 上通常是 gcc。
可以通过以下命令查看当前 CGO_ENABLED 是否打开。

```bash
go env | grep CGO_ENABLED
```
查看默认的编译器。
```bash
go env | grep CC
```

如果 CGO_ENABLED 值是 1,则可跳过下面开启 CGO_ENABLED 的步骤。

### 全局永久开启
```bash
go env -w CGO_ENABLED=1
```

### 临时开启
```bash
CGO_ENABLED=1 go build
```

## 使用方法
开启 cgo-zstd 扩展

```golang
import (
cgo "github.com/aliyun/aliyun-log-go-sdk/cgo"
sls "github.com/aliyun/aliyun-log-go-sdk"
)
cgo.SetZstdCgoCompressor(1)
```


使用 zstd 压缩写入日志的示例
```golang
import (
"time"

cgo "github.com/aliyun/aliyun-log-go-sdk/cgo"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/golang/protobuf/proto"
)

func main() {
cgo.SetZstdCgoCompressor(1)
client := sls.CreateNormalInterface("endpoint",
"accessKeyId", "accessKeySecret", "")
lg := &sls.LogGroup{
Logs: []*sls.Log{
{
Time: proto.Uint32(uint32(time.Now().Unix())),
Contents: []*sls.LogContent{
{
Key: proto.String("HELLO"),
Value: proto.String("world"),
},
},
},
},
}
err := client.PostLogStoreLogsV2(
"your-project",
"your-logstore",
&sls.PostLogStoreLogsRequest{
LogGroup: lg,
CompressType: sls.Compress_ZSTD, // 指定压缩方式为 ZSTD
},
)
if err != nil {
panic(err)
}

}
```
42 changes: 42 additions & 0 deletions cgo/compressor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cgo

import (
"sync"

"github.com/DataDog/zstd"
sls "github.com/aliyun/aliyun-log-go-sdk"
)

func SetZstdCgoCompressor(compressLevel int) error {
sls.SetZstdCompressor(newZstdCompressor(compressLevel))
return nil
}

type zstdCompressor struct {
ctxPool sync.Pool
level int
}

func newZstdCompressor(level int) *zstdCompressor {
res := &zstdCompressor{
level: level,
}
res.ctxPool = sync.Pool{
New: func() interface{} {
return zstd.NewCtx()
},
}
return res
}

func (c *zstdCompressor) Compress(src, dst []byte) ([]byte, error) {
zstdCtx := c.ctxPool.Get().(zstd.Ctx)
defer c.ctxPool.Put(zstdCtx)
return zstdCtx.CompressLevel(dst, src, c.level)
}

func (c *zstdCompressor) Decompress(src, dst []byte) ([]byte, error) {
zstdCtx := c.ctxPool.Get().(zstd.Ctx)
defer c.ctxPool.Put(zstdCtx)
return zstdCtx.Decompress(dst, src)
}
52 changes: 52 additions & 0 deletions compressor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sls

import (
"github.com/klauspost/compress/zstd"
)

var slsZstdCompressor LogCompressor = NewZstdCompressor(zstd.SpeedFastest)

func SetZstdCompressor(compressor LogCompressor) error {
slsZstdCompressor = compressor
return nil
}

type LogCompressor interface {
// Compress src into dst. If you have a buffer to use, you can pass it to
// prevent allocation. If it is too small, or if nil is passed, a new buffer
// will be allocated and returned.
Compress(src, dst []byte) ([]byte, error)
// Decompress src into dst. If you have a buffer to use, you can pass it to
// prevent allocation. If it is too small, or if nil is passed, a new buffer
// will be allocated and returned.
Decompress(src, dst []byte) ([]byte, error)
}

type ZstdCompressor struct {
writer *zstd.Encoder
reader *zstd.Decoder
level zstd.EncoderLevel
}

func NewZstdCompressor(level zstd.EncoderLevel) *ZstdCompressor {
res := &ZstdCompressor{
level: level,
}
res.writer, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(res.level))
res.reader, _ = zstd.NewReader(nil)
return res
}

func (c *ZstdCompressor) Compress(src, dst []byte) ([]byte, error) {
if dst != nil {
return c.writer.EncodeAll(src, dst[:0]), nil
}
return c.writer.EncodeAll(src, nil), nil
}

func (c *ZstdCompressor) Decompress(src, dst []byte) ([]byte, error) {
if dst != nil {
return c.reader.DecodeAll(src, dst[:0])
}
return c.reader.DecodeAll(src, nil)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/aliyun/aliyun-log-go-sdk
go 1.19

require (
github.com/DataDog/zstd v1.5.5
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d
github.com/alibabacloud-go/darabonba-openapi/v2 v2.0.4
github.com/alibabacloud-go/sts-20150401/v2 v2.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ=
github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d h1:wvStE9wLpws31NiWUx+38wny1msZ/tm+eL5xmm4Y7So=
github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d/go.mod h1:9XMFaCeRyW7fC9XJOWQ+NdAv8VLG7ys7l3x4ozEGLUQ=
Expand Down
26 changes: 5 additions & 21 deletions log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,12 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/klauspost/compress/zstd"
"github.com/pierrec/lz4"
)

// this file is deprecated and no maintenance
// see client_logstore.go

var (
zstdReader = newZstdReader()
zstdWriter = newZstdWriter()
)

func newZstdReader() *zstd.Decoder {
r, _ := zstd.NewReader(nil)
return r
}

func newZstdWriter() *zstd.Encoder {
w, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedDefault))
return w
}

// LogStore defines LogStore struct
type LogStore struct {
Name string `json:"logstoreName"`
Expand Down Expand Up @@ -185,7 +169,7 @@ func (s *LogStore) PutRawLog(rawLogData []byte) (err error) {
}
outLen = n
case Compress_ZSTD:
out = zstdWriter.EncodeAll(rawLogData, nil)
out, _ = slsZstdCompressor.Compress(rawLogData, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(rawLogData)),
Expand Down Expand Up @@ -255,7 +239,7 @@ func (s *LogStore) PostRawLogs(body []byte, hashKey *string) (err error) {
outLen = n
case Compress_ZSTD:
// Compress body with zstd
out = zstdWriter.EncodeAll(body, nil)
out, _ = slsZstdCompressor.Compress(body, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
Expand Down Expand Up @@ -327,7 +311,7 @@ func (s *LogStore) PutLogs(lg *LogGroup) (err error) {
outLen = n
case Compress_ZSTD:
// Compress body with zstd
out = zstdWriter.EncodeAll(body, nil)
out, _ = slsZstdCompressor.Compress(body, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
Expand Down Expand Up @@ -408,7 +392,7 @@ func (s *LogStore) PostLogStoreLogs(lg *LogGroup, hashKey *string) (err error) {
outLen = n
case Compress_ZSTD:
// Compress body with zstd
out = zstdWriter.EncodeAll(body, nil)
out, _ = slsZstdCompressor.Compress(body, nil)
h = map[string]string{
"x-log-compresstype": "zstd",
"x-log-bodyrawsize": strconv.Itoa(len(body)),
Expand Down Expand Up @@ -590,7 +574,7 @@ func (s *LogStore) GetLogsBytesWithQuery(plr *PullLogRequest) (out []byte, pullL
return nil, nil, fmt.Errorf("uncompressed size %d does not match 'x-log-bodyrawsize' %d", uncompressedSize, pullLogMeta.RawSize)
}
case Compress_ZSTD:
out, err = zstdReader.DecodeAll(buf, out[:0])
out, err = slsZstdCompressor.Decompress(buf, out)
if err != nil {
return nil, nil, err
}
Expand Down

0 comments on commit 9a152f5

Please sign in to comment.