Skip to content

Commit 2ee5838

Browse files
committed
adding max interval
1 parent b4b1936 commit 2ee5838

File tree

6 files changed

+63
-48
lines changed

6 files changed

+63
-48
lines changed

README.md

+18-18
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ type Option struct {
5858
Level slog.Leveler
5959

6060
// parquet rows buffer
61-
Buffer parquet.ParquetBuffer
61+
Buffer slogparquet.ParquetBuffer
6262

6363
// optional: customize json payload builder
6464
Converter Converter
@@ -68,7 +68,7 @@ type Option struct {
6868
### Parquet buffer
6969

7070
```go
71-
func NewParquetBuffer(bucket objstore.Bucket, prefix string, rows int) parquet.ParquetBuffer
71+
func NewParquetBuffer(bucket objstore.Bucket, prefix string, maxRecords int, maxInterval time.Duration) slogparquet.ParquetBuffer
7272
```
7373

7474
Attributes will be injected in log payload.
@@ -88,20 +88,20 @@ import (
8888
)
8989

9090
func main() {
91-
bucket, _ := s3.NewBucketWithConfig(
92-
slogparquet.NewLogger(),
93-
s3.Config{
94-
Endpoint: os.Getenv("AWS_S3_ENDPOINT"),
95-
Region: os.Getenv("AWS_S3_REGION"),
96-
Bucket: os.Getenv("AWS_S3_BUCKET"),
97-
AccessKey: os.Getenv("AWS_ACCESS_KEY"),
98-
SecretKey: os.Getenv("AWS_SECRET_KEY"),
99-
PartSize: 16 * 1024 * 1024, // 16MB
100-
},
101-
"logger",
102-
)
103-
104-
buffer := slogparquet.NewParquetBuffer(bucket, "api/logs", 10*1024*1024)
91+
bucket, _ := s3.NewBucketWithConfig(
92+
slogparquet.NewLogger(),
93+
s3.Config{
94+
Endpoint: os.Getenv("AWS_S3_ENDPOINT"),
95+
Region: os.Getenv("AWS_S3_REGION"),
96+
Bucket: os.Getenv("AWS_S3_BUCKET"),
97+
AccessKey: os.Getenv("AWS_ACCESS_KEY"),
98+
SecretKey: os.Getenv("AWS_SECRET_KEY"),
99+
PartSize: 16 * 1024 * 1024, // 16MB
100+
},
101+
"logger",
102+
)
103+
104+
buffer := slogparquet.NewParquetBuffer(bucket, "api/logs", 10*1024*1024)
105105

106106
logger := slog.New(slogparquet.Option{Level: slog.LevelDebug, Buffer: buffer}.NewParquetHandler())
107107
logger = logger.
@@ -126,8 +126,8 @@ func main() {
126126
).
127127
Info("user registration")
128128

129-
buffer.Flush(true)
130-
bucket.Close()
129+
buffer.Flush(true)
130+
bucket.Close()
131131
}
132132
```
133133

docker-compose.yml

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
version: '3'
2+
3+
services:
4+
minio:
5+
image: minio/minio
6+
ports:
7+
- "9000:9000" # s3-compatible api
8+
- "9001:9001" # web ui
9+
environment:
10+
MINIO_ROOT_USER: helloworld
11+
MINIO_ROOT_PASSWORD: helloworld
12+
command: server --console-address ":9001" /data

example/example.go

+22-21
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@ import (
1313
)
1414

1515
func main() {
16+
// export AWS_ACCESS_KEY=helloworld
17+
// export AWS_SECRET_KEY=helloworld
18+
// export AWS_S3_REGION=fr-par
19+
// export AWS_S3_ENDPOINT=localhost:9000
20+
// export AWS_S3_BUCKET=slog-test
21+
// go run *.go
22+
1623
bucket, err := s3.NewBucketWithConfig(
1724
slogparquet.NewLogger(),
1825
s3.Config{
@@ -29,7 +36,7 @@ func main() {
2936
log.Fatal(err)
3037
}
3138

32-
buffer := slogparquet.NewParquetBuffer(bucket, "logs", 10*1024*1024)
39+
buffer := slogparquet.NewParquetBuffer(bucket, "logs", 10*1024*1024, 1*time.Second)
3340

3441
logger := slog.New(slogparquet.Option{Level: slog.LevelDebug, Buffer: buffer}.NewParquetHandler())
3542
logger = logger.
@@ -54,26 +61,20 @@ func main() {
5461
).
5562
Info("user registration")
5663

64+
for i := 0; i < 10_000_000; i++ {
65+
logger.
66+
With(
67+
slog.Group("user",
68+
slog.String("id", "user-123"),
69+
slog.Time("created_at", time.Now().AddDate(0, 0, -1)),
70+
),
71+
).
72+
With("a", i).
73+
With("environment", "dev").
74+
With("error", fmt.Errorf("an error")).
75+
Error("A message")
76+
}
77+
5778
buffer.Flush(true)
5879
bucket.Close()
59-
60-
// logger := slog.New(slogparquet.Option{Level: slog.LevelDebug, Buffer: buffer}.NewParquetHandler())
61-
// logger = logger.With("release", "v1.0.0")
62-
63-
// for i := 0; i < 10_000_000; i++ {
64-
// logger.
65-
// With(
66-
// slog.Group("user",
67-
// slog.String("id", "user-123"),
68-
// slog.Time("created_at", time.Now().AddDate(0, 0, -1)),
69-
// ),
70-
// ).
71-
// With("a", i).
72-
// With("environment", "dev").
73-
// With("error", fmt.Errorf("an error")).
74-
// Error("A message")
75-
// }
76-
77-
// buffer.Flush(true)
78-
// bucket.Close()
7980
}

example/go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ require (
77
github.com/thanos-io/objstore v0.0.0-20230816175749-20395bffdf26
88
)
99

10+
replace github.com/samber/slog-parquet => ../
11+
1012
require (
1113
github.com/andybalholm/brotli v1.0.5 // indirect
1214
github.com/aws/aws-sdk-go-v2 v1.16.0 // indirect

example/go.sum

-2
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,6 @@ github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
311311
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
312312
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
313313
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
314-
github.com/samber/slog-parquet v0.0.0-20230819003546-f6e0e0bb6a8a h1:Bi9kTzF4F75Lry3MDi7hRk9FaNIYwTgXbWz9Ipth97s=
315-
github.com/samber/slog-parquet v0.0.0-20230819003546-f6e0e0bb6a8a/go.mod h1:mz88Z+AwBfpgRltNWNbD3Mu4yxNNgHFsraPVzmCCNgk=
316314
github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg=
317315
github.com/segmentio/encoding v0.3.6 h1:E6lVLyDPseWEulBmCmAKPanDd3jiyGDo5gMcugCRwZQ=
318316
github.com/segmentio/encoding v0.3.6/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oHAA4E3G3OxM=

parquet.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ type parquetBuffer struct {
2626
prefix string
2727
id string
2828

29-
size int
29+
maxRecords int
30+
maxInterval time.Duration
3031
rowGroupSize int
3132

3233
mutex sync.Mutex // ☢️
@@ -36,13 +37,14 @@ type parquetBuffer struct {
3637
buffer *parquet.GenericBuffer[log]
3738
}
3839

39-
func NewParquetBuffer(bucket objstore.Bucket, prefix string, size int) ParquetBuffer {
40+
func NewParquetBuffer(bucket objstore.Bucket, prefix string, maxRecords int, maxInterval time.Duration) ParquetBuffer {
4041
return &parquetBuffer{
4142
prefix: prefix,
4243
id: lo.Must(uuid.NewV4()).String()[0:5],
4344

44-
size: size,
45-
rowGroupSize: int(math.Ceil(float64(size) / 10)),
45+
maxRecords: maxRecords,
46+
maxInterval: maxInterval,
47+
rowGroupSize: int(math.Ceil(float64(maxRecords) / 10)),
4648

4749
mutex: sync.Mutex{},
4850
bucket: bucket,
@@ -52,7 +54,7 @@ func NewParquetBuffer(bucket objstore.Bucket, prefix string, size int) ParquetBu
5254
}
5355
}
5456

55-
func (b *parquetBuffer) Append(time time.Time, logLevel slog.Level, message string, attributes map[string]any) error {
57+
func (b *parquetBuffer) Append(tIme time.Time, logLevel slog.Level, message string, attributes map[string]any) error {
5658
serializedAttrs, err := json.Marshal(attributes)
5759
if err != nil {
5860
return err
@@ -62,7 +64,7 @@ func (b *parquetBuffer) Append(time time.Time, logLevel slog.Level, message stri
6264

6365
_, err = b.buffer.Write([]log{
6466
{
65-
Time: time,
67+
Time: tIme,
6668
LogLevel: logLevel.String(),
6769
Message: message,
6870
Attributes: serializedAttrs,
@@ -73,7 +75,7 @@ func (b *parquetBuffer) Append(time time.Time, logLevel slog.Level, message stri
7375
return err
7476
}
7577

76-
if b.buffer.Len() >= b.size {
78+
if b.buffer.Len() >= b.maxRecords || b.start.Add(b.maxInterval).Before(time.Now()) {
7779
b.mutex.Unlock()
7880
return b.Flush(false)
7981
}

0 commit comments

Comments
 (0)