Skip to content

Commit

Permalink
update collector(v2): unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
kainonly committed Oct 22, 2024
1 parent 161609b commit 5a4e346
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 21 deletions.
21 changes: 11 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,28 @@ func New(js nats.JetStreamContext) (x *Client, err error) {
}

type StreamOption struct {
Key string `msgpack:"key"`
Description string `msgpack:"description"`
Key string `json:"key"`
Description string `json:"description"`
}

func (x *Client) Get(key string) (result map[string]interface{}, err error) {
result = make(map[string]interface{})
type Result struct {
Option StreamOption `json:"option"`
Info *nats.StreamInfo `json:"info"`
}

func (x *Client) Get(key string) (result *Result, err error) {
result = new(Result)
var entry nats.KeyValueEntry
if entry, err = x.Kv.Get(key); err != nil {
return
}
var option StreamOption
if err = sonic.Unmarshal(entry.Value(), &option); err != nil {
if err = sonic.Unmarshal(entry.Value(), &result.Option); err != nil {
return
}
result["option"] = option
name := fmt.Sprintf(`COLLECT_%s`, key)
var info *nats.StreamInfo
if info, err = x.Js.StreamInfo(name); err != nil {
if result.Info, err = x.Js.StreamInfo(name); err != nil {
return
}
result["info"] = *info
return
}

Expand Down
26 changes: 15 additions & 11 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,47 +45,49 @@ func UseNats(ctx context.Context) (err error) {
if js, err = nc.JetStream(nats.PublishAsyncMaxPending(256), nats.Context(ctx)); err != nil {
return
}
if _, err = js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "collector"}); err != nil {
if _, err = js.KeyValue("collector"); err != nil {
return
}
return
}

func TestTransfer_Set(t *testing.T) {
err := x.Set(context.TODO(), client.StreamOption{
Key: "system",
Description: "system example",
Key: "beta",
Description: "beta example",
})
assert.Nil(t, err)
}

func TestTransfer_Update(t *testing.T) {
err := x.Update(context.TODO(), client.StreamOption{
Key: "system",
Description: "system example 123",
Key: "beta",
Description: "beta example 123",
})
assert.Nil(t, err)
}

func TestTransfer_Get(t *testing.T) {
_, err := x.Get("not_exists")
assert.Error(t, err)
result, err := x.Get("system")
result, err := x.Get("beta")
assert.Nil(t, err)
t.Log(result)
t.Log(result.Option)
t.Log(result.Info)
}

func TestTransfer_Publish(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
subjectName := fmt.Sprintf(`collects.%s`, "system")
queueName := fmt.Sprintf(`COLLECT_%s`, "system")
subjectName := fmt.Sprintf(`collects.%s`, "beta")
queueName := fmt.Sprintf(`COLLECT_%s`, "beta")
now := time.Now()
data := map[string]interface{}{
"uuid": "0ff5483a-7ddc-44e0-b723-c3417988663f",
"msg": "hi",
}
go js.QueueSubscribe(subjectName, queueName, func(msg *nats.Msg) {
t.Log("get", string(msg.Data))
var payload common.Payload
if err := gob.NewDecoder(bytes.NewBuffer(msg.Data)).
Decode(&payload); err != nil {
Expand All @@ -96,11 +98,13 @@ func TestTransfer_Publish(t *testing.T) {
assert.Equal(t, now.UnixNano(), payload.Timestamp.UnixNano())
wg.Done()
})
err := x.Publish(context.TODO(), "system", common.Payload{
time.Sleep(time.Second)
err := x.Publish(context.TODO(), "beta", common.Payload{
Data: data,
Timestamp: now,
})
assert.NoError(t, err)
t.Log("send")
wg.Wait()
}

Expand Down Expand Up @@ -139,6 +143,6 @@ func TestTransfer_Publish(t *testing.T) {
//}

func TestTransfer_Remove(t *testing.T) {
err := x.Remove("system")
err := x.Remove("beta")
assert.Nil(t, err)
}

0 comments on commit 5a4e346

Please sign in to comment.