Skip to content

Commit

Permalink
update collector(v2): continue using msgpack transfers
Browse files Browse the repository at this point in the history
  • Loading branch information
kainonly committed Oct 22, 2024
1 parent 487cf47 commit 1be40db
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 39 deletions.
45 changes: 26 additions & 19 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package app

import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"github.com/bytedance/sonic"
"github.com/nats-io/nats.go"
"github.com/vmihailenco/msgpack/v5"
"github.com/weplanx/collector/client"
"github.com/weplanx/collector/common"
"go.uber.org/zap"
Expand Down Expand Up @@ -163,8 +163,7 @@ func (x *App) RemoveSubscribe(key string) (err error) {

func (x *App) Push(key string, msg *nats.Msg) (err error) {
var payload common.Payload
if err = gob.NewDecoder(bytes.NewReader(msg.Data)).
Decode(&payload); err != nil {
if err = msgpack.Unmarshal(msg.Data, &payload); err != nil {
common.Log.Error("decoding fail",
zap.String("subject", msg.Subject),
zap.String("data", string(msg.Data)),
Expand Down Expand Up @@ -235,36 +234,44 @@ func (x *App) Pipe(input M, paths []string, kind interface{}) (err error) {
var data interface{}
switch kind {
case "date":
if data, err = time.Parse(time.RFC1123, unknow.(string)); err != nil {
return
if v, ok := unknow.(string); ok {
if data, err = time.Parse(time.RFC1123, v); err != nil {
return
}
}
break
case "dates":
dates := unknow.([]interface{})
for i, date := range dates {
if dates[i], err = time.Parse(time.RFC1123, date.(string)); err != nil {
return
if dates, ok := unknow.([]interface{}); ok {
for i, date := range dates {
if dates[i], err = time.Parse(time.RFC1123, date.(string)); err != nil {
return
}
}
data = dates
}
data = dates
break
case "timestamp":
if data, err = time.Parse(time.RFC3339, unknow.(string)); err != nil {
return
if v, ok := unknow.(string); ok {
if data, err = time.Parse(time.RFC3339, v); err != nil {
return
}
}
break
case "timestamps":
timestamps := unknow.([]interface{})
for i, timestamp := range timestamps {
if timestamps[i], err = time.Parse(time.RFC3339, timestamp.(string)); err != nil {
return
if timestamps, ok := unknow.([]interface{}); ok {
for i, timestamp := range timestamps {
if timestamps[i], err = time.Parse(time.RFC3339, timestamp.(string)); err != nil {
return
}
}
data = timestamps
}
data = timestamps
break
case "json":
if err = sonic.Unmarshal(unknow.([]byte), &data); err != nil {
return
if b, ok := unknow.([]byte); ok {
if err = sonic.Unmarshal(b, &data); err != nil {
return
}
}
break
}
Expand Down
10 changes: 4 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package client

import (
"bytes"
"context"
"encoding/gob"
"fmt"
"github.com/bytedance/sonic"
"github.com/nats-io/nats.go"
"github.com/vmihailenco/msgpack/v5"
"github.com/weplanx/collector/common"
)

Expand Down Expand Up @@ -106,13 +105,12 @@ func (x *Client) Remove(key string) (err error) {
}

func (x *Client) Publish(ctx context.Context, key string, payload common.Payload) (err error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err = encoder.Encode(payload); err != nil {
var b []byte
if b, err = msgpack.Marshal(payload); err != nil {
return
}
subject := fmt.Sprintf(`collects.%s`, key)
if _, err = x.Js.Publish(subject, buf.Bytes(), nats.Context(ctx)); err != nil {
if _, err = x.Js.Publish(subject, b, nats.Context(ctx)); err != nil {
return
}
return
Expand Down
38 changes: 24 additions & 14 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package client_test

import (
"bytes"
"encoding/gob"
"fmt"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/vmihailenco/msgpack/v5"
"github.com/weplanx/collector/client"
"github.com/weplanx/collector/common"
"os"
Expand Down Expand Up @@ -95,8 +94,7 @@ func TestTransfer_Publish(t *testing.T) {
go js.QueueSubscribe(subjectName, queueName, func(msg *nats.Msg) {
t.Log("get", string(msg.Data))
var payload common.Payload
if err := gob.NewDecoder(bytes.NewBuffer(msg.Data)).
Decode(&payload); err != nil {
if err := msgpack.Unmarshal(msg.Data, &payload); err != nil {
t.Error()
}
t.Log(payload)
Expand All @@ -116,33 +114,45 @@ func TestTransfer_Publish(t *testing.T) {

//func TestTransfer_ManualPublish(t *testing.T) {
// now := time.Now()
// err := client.Publish(context.TODO(), "system", transfer.Payload{
// err := x.Publish(context.TODO(), "beta", common.Payload{
// Timestamp: now,
// Data: map[string]interface{}{
// "metadata": map[string]interface{}{
// "user_id": "640e7c2c7d8a24d6f831e9bf",
// },
// "msg": "123456",
// },
// Format: map[string]interface{}{
// "metadata.user_id": "oid",
// },
// XData: map[string]interface{}{},
// })
// assert.NoError(t, err)
//}

//
//func TestTransfer_ManualPublishNone(t *testing.T) {
// now := time.Now()
// err := client.Publish(context.TODO(), "system", transfer.Payload{
// err := x.Publish(context.TODO(), "beta", common.Payload{
// Timestamp: now,
// Data: map[string]interface{}{
// "metadata": map[string]interface{}{
// "user_id": "",
// "now": now.Format(time.RFC1123),
// "range": []string{
// now.Format(time.RFC1123),
// now.Add(time.Hour).Format(time.RFC1123),
// },
// "ts": now.Format(time.RFC3339),
// "ts-range": []string{
// now.Format(time.RFC3339),
// now.Add(time.Hour).Format(time.RFC3339),
// },
// },
// "msg": "123456",
// "msg": "123456",
// "data": []byte(`{"name":"kain"}`),
// },
// Format: map[string]interface{}{
// "metadata.user_id": "oid",
// XData: map[string]interface{}{
// "metadata.now": "date",
// "metadata.range": "dates",
// "metadata.ts": "timestamp",
// "metadata.ts-range": "timestamps",
// "data": "json",
// },
// })
// assert.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/google/wire v0.6.0
github.com/nats-io/nats.go v1.37.0
github.com/stretchr/testify v1.9.0
github.com/vmihailenco/msgpack/v5 v5.4.1
go.uber.org/zap v1.27.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -26,6 +27,7 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8=
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY=
go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE=
Expand Down

0 comments on commit 1be40db

Please sign in to comment.