Skip to content

Commit

Permalink
Merge pull request #46 from h0n9/develop
Browse files Browse the repository at this point in the history
Prepare to release `v0.0.7`
  • Loading branch information
h0n9 authored Mar 27, 2024
2 parents 2b4e5e0 + 67ffa4e commit 62b04f5
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 110 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Login to GitHub Container Registry
uses: docker/login-action@v2
uses: docker/login-action@v3
with:
registry: ${{ env.img-registry }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
uses: docker/setup-buildx-action@v3
- name: "Set env vars (develop, feature)"
if: ${{ github.ref_name == 'develop' || startsWith(github.ref_name, 'feature') }}
shell: bash
Expand All @@ -45,7 +45,7 @@ jobs:
echo "img-push=true" >> $GITHUB_ENV
echo "img-platforms=linux/amd64,linux/arm64" >> $GITHUB_ENV
- name: Build Docker image
uses: docker/build-push-action@v4
uses: docker/build-push-action@v5
with:
platforms: ${{ env.img-platforms }}
push: ${{ env.img-push }}
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ COPY cmd/ cmd/
COPY util/ util/
COPY proto/ proto/
COPY cli/ cli/
COPY client/ client/
COPY msg/ msg/
COPY lake/ lake/
COPY relayer/ relayer/
Expand Down
126 changes: 20 additions & 106 deletions cli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"math/rand"
Expand All @@ -18,12 +16,9 @@ import (

"github.com/spf13/cobra"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

"github.com/postie-labs/go-postie-lib/crypto"

"github.com/h0n9/msg-lake/client"
pb "github.com/h0n9/msg-lake/proto"
)

Expand All @@ -38,9 +33,8 @@ var Cmd = &cobra.Command{
Use: "client",
Short: "run msg lake client (interactive)",
RunE: func(cmd *cobra.Command, args []string) error {
var (
conn *grpc.ClientConn
)
var msgLakeClient *client.Client

// init wg
wg := sync.WaitGroup{}

Expand All @@ -61,10 +55,8 @@ var Cmd = &cobra.Command{
return
case s := <-sigCh:
fmt.Printf("got signal %v, attempting graceful shutdown\n", s)
if conn != nil {
fmt.Printf("closing grpc client ... ")
conn.Close()
fmt.Printf("done\n")
if msgLakeClient != nil {
msgLakeClient.Close()
}
fmt.Printf("cancelling ctx ... ")
cancel()
Expand All @@ -79,81 +71,32 @@ var Cmd = &cobra.Command{
}
pubKeyBytes := privKey.PubKey().Bytes()

// init grpc client
creds := grpc.WithTransportCredentials(insecure.NewCredentials())
if tlsEnabled {
creds = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
}
conn, err = grpc.Dial(hostAddr, creds)
// init msg lake client
msgLakeClient, err = client.NewClient(privKey, hostAddr, tlsEnabled)
if err != nil {
return err
}
cli := pb.NewMsgLakeClient(conn)

msg := Msg{
Data: []byte(topicID),
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
sigDataBytes, err := privKey.Sign(data)
if err != nil {
return err
}

stream, err := cli.Subscribe(ctx, &pb.SubscribeReq{
TopicId: topicID,
MsgCapsule: &pb.MsgCapsule{
Data: data,
Signature: &pb.Signature{
PubKey: pubKeyBytes,
Data: sigDataBytes,
},
},
})
if err != nil {
return err
}

// block until recieve subscribe ack msg
subRes, err := stream.Recv()
if err != nil {
return err
}

// check subscribe ack msg
if subRes.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_ACK {
return fmt.Errorf("failed to receive subscribe ack from agent")
}
if !subRes.GetOk() {
return fmt.Errorf("failed to begin subscribing msgs")
}

// execute goroutine (receiver)
wg.Add(1)
go func() {
defer wg.Done()
for {
res, err := stream.Recv()
if err != nil {
fmt.Println(err)
cancel()
return
}
if res.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY {
continue
}
msgCapsule := res.GetMsgCapsule()
err := msgLakeClient.Subscribe(ctx, topicID, func(msgCapsule *pb.MsgCapsule) error {
signature := msgCapsule.GetSignature()
if bytes.Equal(signature.GetPubKey(), pubKeyBytes) {
continue
return nil
}
if len(msgCapsule.GetData()) == 0 {
continue
return nil
}
printOutput(true, msgCapsule)
printInput(true)
return nil
})
if err != nil {
fmt.Println(err)
cancel()
return
}
}()

Expand All @@ -180,39 +123,10 @@ var Cmd = &cobra.Command{
if input == "" {
continue
}
go func() {
data, err := json.Marshal(input)
if err != nil {
fmt.Println(err)
return
}
sigDataBytes, err := privKey.Sign(data)
if err != nil {
fmt.Println(err)
return
}

pubRes, err := cli.Publish(ctx, &pb.PublishReq{
TopicId: topicID,
MsgCapsule: &pb.MsgCapsule{
Data: data,
Signature: &pb.Signature{
PubKey: pubKeyBytes,
Data: sigDataBytes,
},
},
})
if err != nil {
fmt.Println(err)
return
}

// check publish res
if !pubRes.GetOk() {
fmt.Println("failed to send message")
return
}
}()
err = msgLakeClient.Publish(ctx, topicID, input)
if err != nil {
fmt.Println(err)
}
}
}
}()
Expand Down
162 changes: 162 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package client

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"

pb "github.com/h0n9/msg-lake/proto"
"github.com/postie-labs/go-postie-lib/crypto"
)

type Client struct {
privKey *crypto.PrivKey
grpcClientConn *grpc.ClientConn
msgLakeClient pb.MsgLakeClient
}

func NewClient(privKey *crypto.PrivKey, hostAddr string, tlsEnabled bool) (*Client, error) {
// init grpc client
creds := grpc.WithTransportCredentials(insecure.NewCredentials())
if tlsEnabled {
creds = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
}
grpcClientConn, err := grpc.Dial(hostAddr, creds)
if err != nil {
return nil, err
}

// init msg lake client
msgLakeClient := pb.NewMsgLakeClient(grpcClientConn)

return &Client{
privKey: privKey,
grpcClientConn: grpcClientConn,
msgLakeClient: msgLakeClient,
}, nil
}

// Close() closes the grpc client connection
func (c *Client) Close() {
err := c.grpcClientConn.Close()
if err != nil {
fmt.Printf("failed to close grpc client: %v\n", err)
}
}

// Subscribe() subscribes to a topic
func (c *Client) Subscribe(ctx context.Context, topicID string, msgCapsuleHandler func(*pb.MsgCapsule) error) error {
// serialize topicID
data, err := json.Marshal(topicID)
if err != nil {
return err
}

// sign the serialized topicID
sigDataBytes, err := c.privKey.Sign(data)
if err != nil {
return err
}

// subscribe to the topic
stream, err := c.msgLakeClient.Subscribe(ctx, &pb.SubscribeReq{
TopicId: topicID,
MsgCapsule: &pb.MsgCapsule{
Data: data,
Signature: &pb.Signature{
PubKey: c.privKey.PubKey().Bytes(),
Data: sigDataBytes,
},
},
})
if err != nil {
return err
}

// block until recieve subscribe ack msg
subRes, err := stream.Recv()
if err != nil {
return err
}

// check subscribe ack msg
if subRes.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_ACK {
return fmt.Errorf("failed to receive subscribe ack from agent")
}
if !subRes.GetOk() {
return fmt.Errorf("failed to begin subscribing msgs")
}

for {
select {
case <-ctx.Done():
return nil
default:
res, err := stream.Recv()
if err != nil {
return err
}

// check if the received message is a relay message
if res.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY {
continue
}

// get a msgCapsule from the received message
msgCapsule := res.GetMsgCapsule()

// check if the msgCapsule is empty
if len(msgCapsule.GetData()) == 0 {
continue
}

// handle the received msgCapsule
err = msgCapsuleHandler(msgCapsule)
if err != nil {
fmt.Println(err)
}
}
}
}

// Publish() publishes a message to a topic
func (c *Client) Publish(ctx context.Context, topicID, message string) error {
// serialize the message
data, err := json.Marshal(message)
if err != nil {
return err
}

// sign the serialized message
sigDataBytes, err := c.privKey.Sign(data)
if err != nil {
return err
}

// publish the message
pubRes, err := c.msgLakeClient.Publish(ctx, &pb.PublishReq{
TopicId: topicID,
MsgCapsule: &pb.MsgCapsule{
Data: data,
Signature: &pb.Signature{
PubKey: c.privKey.PubKey().Bytes(),
Data: sigDataBytes,
},
},
})
if err != nil {
return err
}

// check publish ack msg
if !pubRes.GetOk() {
return fmt.Errorf("failed to publish msg")
}

return nil
}

0 comments on commit 62b04f5

Please sign in to comment.