diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index f6a27ad..66803f8 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -22,15 +22,15 @@ jobs: runs-on: ubuntu-22.04 steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Login to GitHub Container Registry - uses: docker/login-action@v2 + uses: docker/login-action@v3 with: registry: ${{ env.img-registry }} username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 + uses: docker/setup-buildx-action@v3 - name: "Set env vars (develop, feature)" if: ${{ github.ref_name == 'develop' || startsWith(github.ref_name, 'feature') }} shell: bash @@ -45,7 +45,7 @@ jobs: echo "img-push=true" >> $GITHUB_ENV echo "img-platforms=linux/amd64,linux/arm64" >> $GITHUB_ENV - name: Build Docker image - uses: docker/build-push-action@v4 + uses: docker/build-push-action@v5 with: platforms: ${{ env.img-platforms }} push: ${{ env.img-push }} diff --git a/Dockerfile b/Dockerfile index d7785bb..ea3ab6c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,6 +7,7 @@ COPY cmd/ cmd/ COPY util/ util/ COPY proto/ proto/ COPY cli/ cli/ +COPY client/ client/ COPY msg/ msg/ COPY lake/ lake/ COPY relayer/ relayer/ diff --git a/cli/client/client.go b/cli/client/client.go index 5680b81..612e1c6 100644 --- a/cli/client/client.go +++ b/cli/client/client.go @@ -4,8 +4,6 @@ import ( "bufio" "bytes" "context" - "crypto/tls" - "encoding/json" "fmt" "io" "math/rand" @@ -18,12 +16,9 @@ import ( "github.com/spf13/cobra" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/credentials/insecure" - "github.com/postie-labs/go-postie-lib/crypto" + "github.com/h0n9/msg-lake/client" pb "github.com/h0n9/msg-lake/proto" ) @@ -38,9 +33,8 @@ var Cmd = &cobra.Command{ Use: "client", Short: "run msg lake client (interactive)", RunE: func(cmd *cobra.Command, args []string) error { - var ( - conn *grpc.ClientConn - ) + var msgLakeClient *client.Client + // init wg wg := sync.WaitGroup{} @@ -61,10 +55,8 @@ var Cmd = &cobra.Command{ return case s := <-sigCh: fmt.Printf("got signal %v, attempting graceful shutdown\n", s) - if conn != nil { - fmt.Printf("closing grpc client ... ") - conn.Close() - fmt.Printf("done\n") + if msgLakeClient != nil { + msgLakeClient.Close() } fmt.Printf("cancelling ctx ... ") cancel() @@ -79,81 +71,32 @@ var Cmd = &cobra.Command{ } pubKeyBytes := privKey.PubKey().Bytes() - // init grpc client - creds := grpc.WithTransportCredentials(insecure.NewCredentials()) - if tlsEnabled { - creds = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) - } - conn, err = grpc.Dial(hostAddr, creds) + // init msg lake client + msgLakeClient, err = client.NewClient(privKey, hostAddr, tlsEnabled) if err != nil { return err } - cli := pb.NewMsgLakeClient(conn) - - msg := Msg{ - Data: []byte(topicID), - } - data, err := json.Marshal(msg) - if err != nil { - return err - } - sigDataBytes, err := privKey.Sign(data) - if err != nil { - return err - } - - stream, err := cli.Subscribe(ctx, &pb.SubscribeReq{ - TopicId: topicID, - MsgCapsule: &pb.MsgCapsule{ - Data: data, - Signature: &pb.Signature{ - PubKey: pubKeyBytes, - Data: sigDataBytes, - }, - }, - }) - if err != nil { - return err - } - - // block until recieve subscribe ack msg - subRes, err := stream.Recv() - if err != nil { - return err - } - - // check subscribe ack msg - if subRes.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_ACK { - return fmt.Errorf("failed to receive subscribe ack from agent") - } - if !subRes.GetOk() { - return fmt.Errorf("failed to begin subscribing msgs") - } // execute goroutine (receiver) wg.Add(1) go func() { defer wg.Done() - for { - res, err := stream.Recv() - if err != nil { - fmt.Println(err) - cancel() - return - } - if res.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY { - continue - } - msgCapsule := res.GetMsgCapsule() + err := msgLakeClient.Subscribe(ctx, topicID, func(msgCapsule *pb.MsgCapsule) error { signature := msgCapsule.GetSignature() if bytes.Equal(signature.GetPubKey(), pubKeyBytes) { - continue + return nil } if len(msgCapsule.GetData()) == 0 { - continue + return nil } printOutput(true, msgCapsule) printInput(true) + return nil + }) + if err != nil { + fmt.Println(err) + cancel() + return } }() @@ -180,39 +123,10 @@ var Cmd = &cobra.Command{ if input == "" { continue } - go func() { - data, err := json.Marshal(input) - if err != nil { - fmt.Println(err) - return - } - sigDataBytes, err := privKey.Sign(data) - if err != nil { - fmt.Println(err) - return - } - - pubRes, err := cli.Publish(ctx, &pb.PublishReq{ - TopicId: topicID, - MsgCapsule: &pb.MsgCapsule{ - Data: data, - Signature: &pb.Signature{ - PubKey: pubKeyBytes, - Data: sigDataBytes, - }, - }, - }) - if err != nil { - fmt.Println(err) - return - } - - // check publish res - if !pubRes.GetOk() { - fmt.Println("failed to send message") - return - } - }() + err = msgLakeClient.Publish(ctx, topicID, input) + if err != nil { + fmt.Println(err) + } } } }() diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..11825d0 --- /dev/null +++ b/client/client.go @@ -0,0 +1,162 @@ +package client + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + + pb "github.com/h0n9/msg-lake/proto" + "github.com/postie-labs/go-postie-lib/crypto" +) + +type Client struct { + privKey *crypto.PrivKey + grpcClientConn *grpc.ClientConn + msgLakeClient pb.MsgLakeClient +} + +func NewClient(privKey *crypto.PrivKey, hostAddr string, tlsEnabled bool) (*Client, error) { + // init grpc client + creds := grpc.WithTransportCredentials(insecure.NewCredentials()) + if tlsEnabled { + creds = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) + } + grpcClientConn, err := grpc.Dial(hostAddr, creds) + if err != nil { + return nil, err + } + + // init msg lake client + msgLakeClient := pb.NewMsgLakeClient(grpcClientConn) + + return &Client{ + privKey: privKey, + grpcClientConn: grpcClientConn, + msgLakeClient: msgLakeClient, + }, nil +} + +// Close() closes the grpc client connection +func (c *Client) Close() { + err := c.grpcClientConn.Close() + if err != nil { + fmt.Printf("failed to close grpc client: %v\n", err) + } +} + +// Subscribe() subscribes to a topic +func (c *Client) Subscribe(ctx context.Context, topicID string, msgCapsuleHandler func(*pb.MsgCapsule) error) error { + // serialize topicID + data, err := json.Marshal(topicID) + if err != nil { + return err + } + + // sign the serialized topicID + sigDataBytes, err := c.privKey.Sign(data) + if err != nil { + return err + } + + // subscribe to the topic + stream, err := c.msgLakeClient.Subscribe(ctx, &pb.SubscribeReq{ + TopicId: topicID, + MsgCapsule: &pb.MsgCapsule{ + Data: data, + Signature: &pb.Signature{ + PubKey: c.privKey.PubKey().Bytes(), + Data: sigDataBytes, + }, + }, + }) + if err != nil { + return err + } + + // block until recieve subscribe ack msg + subRes, err := stream.Recv() + if err != nil { + return err + } + + // check subscribe ack msg + if subRes.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_ACK { + return fmt.Errorf("failed to receive subscribe ack from agent") + } + if !subRes.GetOk() { + return fmt.Errorf("failed to begin subscribing msgs") + } + + for { + select { + case <-ctx.Done(): + return nil + default: + res, err := stream.Recv() + if err != nil { + return err + } + + // check if the received message is a relay message + if res.GetType() != pb.SubscribeResType_SUBSCRIBE_RES_TYPE_RELAY { + continue + } + + // get a msgCapsule from the received message + msgCapsule := res.GetMsgCapsule() + + // check if the msgCapsule is empty + if len(msgCapsule.GetData()) == 0 { + continue + } + + // handle the received msgCapsule + err = msgCapsuleHandler(msgCapsule) + if err != nil { + fmt.Println(err) + } + } + } +} + +// Publish() publishes a message to a topic +func (c *Client) Publish(ctx context.Context, topicID, message string) error { + // serialize the message + data, err := json.Marshal(message) + if err != nil { + return err + } + + // sign the serialized message + sigDataBytes, err := c.privKey.Sign(data) + if err != nil { + return err + } + + // publish the message + pubRes, err := c.msgLakeClient.Publish(ctx, &pb.PublishReq{ + TopicId: topicID, + MsgCapsule: &pb.MsgCapsule{ + Data: data, + Signature: &pb.Signature{ + PubKey: c.privKey.PubKey().Bytes(), + Data: sigDataBytes, + }, + }, + }) + if err != nil { + return err + } + + // check publish ack msg + if !pubRes.GetOk() { + return fmt.Errorf("failed to publish msg") + } + + return nil +}