Skip to content

Commit

Permalink
feat: breaks topic client into standalone client (#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
eaddingtonwhite authored and cprice404 committed Mar 14, 2023
1 parent 3388ea2 commit 078ccc4
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 119 deletions.
35 changes: 18 additions & 17 deletions internal/momentoerrors/scs_errors.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package momentoerrors

import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -36,38 +37,38 @@ const (

func ConvertSvcErr(err error) MomentoSvcErr {
if grpcStatus, ok := status.FromError(err); ok {
switch grpcStatus.Code().String() {
case "InvalidArgument":
switch grpcStatus.Code() {
case codes.InvalidArgument:
return NewMomentoSvcErr(InvalidArgumentError, grpcStatus.Message(), err)
case "Unimplemented":
case codes.Unimplemented:
return NewMomentoSvcErr(BadRequestError, grpcStatus.Message(), err)
case "OutOfRange":
case codes.OutOfRange:
return NewMomentoSvcErr(BadRequestError, grpcStatus.Message(), err)
case "FailedPrecondition":
case codes.FailedPrecondition:
return NewMomentoSvcErr(FailedPreconditionError, grpcStatus.Message(), err)
case "Canceled":
case codes.Canceled:
return NewMomentoSvcErr(CanceledError, grpcStatus.Message(), err)
case "DeadlineExceeded":
case codes.DeadlineExceeded:
return NewMomentoSvcErr(TimeoutError, grpcStatus.Message(), err)
case "PermissionDenied":
case codes.PermissionDenied:
return NewMomentoSvcErr(PermissionError, grpcStatus.Message(), err)
case "Unauthenticated":
case codes.Unauthenticated:
return NewMomentoSvcErr(AuthenticationError, grpcStatus.Message(), err)
case "ResourceExhausted":
case codes.ResourceExhausted:
return NewMomentoSvcErr(LimitExceededError, grpcStatus.Message(), err)
case "NotFound":
case codes.NotFound:
return NewMomentoSvcErr(NotFoundError, grpcStatus.Message(), err)
case "AlreadyExists":
case codes.AlreadyExists:
return NewMomentoSvcErr(AlreadyExistsError, grpcStatus.Message(), err)
case "Unknown":
case codes.Unknown:
return NewMomentoSvcErr(UnknownServiceError, grpcStatus.Message(), err)
case "Aborted":
case codes.Aborted:
return NewMomentoSvcErr(InternalServerError, grpcStatus.Message(), err)
case "Internal":
case codes.Internal:
return NewMomentoSvcErr(InternalServerError, grpcStatus.Message(), err)
case "Unavailable":
case codes.Unavailable:
return NewMomentoSvcErr(ServerUnavailableError, grpcStatus.Message(), err)
case "DataLoss":
case codes.DataLoss:
return NewMomentoSvcErr(InternalServerError, grpcStatus.Message(), err)
default:
return NewMomentoSvcErr(UnknownServiceError, InternalServerErrorMessage, err)
Expand Down
96 changes: 3 additions & 93 deletions momento/simple_cache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ package momento

import (
"context"
"fmt"
"strings"
"time"

"github.com/momentohq/client-sdk-go/auth"
"github.com/momentohq/client-sdk-go/config"
"github.com/momentohq/client-sdk-go/internal/models"
"github.com/momentohq/client-sdk-go/internal/momentoerrors"
pb "github.com/momentohq/client-sdk-go/internal/protos"
"github.com/momentohq/client-sdk-go/internal/services"

"github.com/momentohq/client-sdk-go/auth"
"github.com/momentohq/client-sdk-go/config"
)

type CacheClient interface {
Expand All @@ -24,9 +23,6 @@ type CacheClient interface {
Get(ctx context.Context, r *GetRequest) (GetResponse, error)
Delete(ctx context.Context, r *DeleteRequest) (DeleteResponse, error)

TopicSubscribe(ctx context.Context, request *TopicSubscribeRequest) (TopicSubscription, error)
TopicPublish(ctx context.Context, request *TopicPublishRequest) (TopicPublishResponse, error)

SortedSetFetch(ctx context.Context, r *SortedSetFetchRequest) (SortedSetFetchResponse, error)
SortedSetPut(ctx context.Context, r *SortedSetPutRequest) (SortedSetPutResponse, error)
SortedSetGetScore(ctx context.Context, r *SortedSetGetScoreRequest) (SortedSetGetScoreResponse, error)
Expand Down Expand Up @@ -67,7 +63,6 @@ type defaultScsClient struct {
credentialProvider auth.CredentialProvider
controlClient *services.ScsControlClient
dataClient *scsDataClient
pubSubClient *pubSubClient
}

type CacheClientProps struct {
Expand Down Expand Up @@ -98,14 +93,6 @@ func NewCacheClient(configuration config.Configuration, credentialProvider auth.
return nil, convertMomentoSvcErrorToCustomerError(momentoerrors.ConvertSvcErr(err))
}

pubSubClient, err := newPubSubClient(&models.PubSubClientRequest{
CredentialProvider: props.CredentialProvider,
Configuration: props.Configuration,
})
if err != nil {
return nil, convertMomentoSvcErrorToCustomerError(momentoerrors.ConvertSvcErr(err))
}

if props.DefaultTtl == 0 {
return nil, convertMomentoSvcErrorToCustomerError(
momentoerrors.NewMomentoSvcErr(
Expand All @@ -125,7 +112,6 @@ func NewCacheClient(configuration config.Configuration, credentialProvider auth.

client.dataClient = dataClient
client.controlClient = controlClient
client.pubSubClient = pubSubClient

return client, nil
}
Expand Down Expand Up @@ -196,82 +182,6 @@ func (c defaultScsClient) Delete(ctx context.Context, r *DeleteRequest) (DeleteR
return r.response, nil
}

func (c defaultScsClient) TopicSubscribe(ctx context.Context, request *TopicSubscribeRequest) (TopicSubscription, error) {
if err := isCacheNameValid(request.CacheName); err != nil {
return nil, err
}

if _, err := prepareName(request.TopicName, "Topic name"); err != nil {
return nil, err
}

clientStream, err := c.pubSubClient.TopicSubscribe(ctx, &TopicSubscribeRequest{
CacheName: request.CacheName,
TopicName: request.TopicName,
})
if err != nil {
return nil, err
}

// Ping the stream to provide a nice error message if the cache does not exist.
rawMsg := new(pb.XSubscriptionItem)
err = clientStream.RecvMsg(rawMsg)
if err != nil {
return nil, momentoerrors.NewMomentoSvcErr(
momentoerrors.NotFoundError,
fmt.Sprintf("Did not get a heartbeat from topic %v in cache %v", request.TopicName, request.CacheName),
err,
)
}
switch rawMsg.Kind.(type) {
case *pb.XSubscriptionItem_Heartbeat:
// The first message to a new subscription will always be a heartbeat.
default:
return nil, momentoerrors.NewMomentoSvcErr(
momentoerrors.InternalServerError,
fmt.Sprintf("expected a heartbeat message, got: %T", rawMsg.Kind),
err,
)
}

return &topicSubscription{
grpcClient: clientStream,
momentoTopicClient: c.pubSubClient,
cacheName: request.CacheName,
topicName: request.TopicName,
}, nil
}

func (c defaultScsClient) TopicPublish(ctx context.Context, request *TopicPublishRequest) (TopicPublishResponse, error) {
if err := isCacheNameValid(request.CacheName); err != nil {
return nil, err
}

if _, err := prepareName(request.TopicName, "Topic name"); err != nil {
return nil, err
}

if request.Value == nil {
return nil, convertMomentoSvcErrorToCustomerError(
momentoerrors.NewMomentoSvcErr(
momentoerrors.InvalidArgumentError, "value cannot be nil", nil,
),
)
}

err := c.pubSubClient.TopicPublish(ctx, &TopicPublishRequest{
CacheName: request.CacheName,
TopicName: request.TopicName,
Value: request.Value,
})

if err != nil {
return nil, momentoerrors.ConvertSvcErr(err)
}

return &TopicPublishSuccess{}, err
}

func (c defaultScsClient) SortedSetFetch(ctx context.Context, r *SortedSetFetchRequest) (SortedSetFetchResponse, error) {
if err := c.dataClient.makeRequest(ctx, r); err != nil {
return nil, err
Expand Down
8 changes: 7 additions & 1 deletion momento/test_helpers/shared_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type SharedContext struct {
Client momento.CacheClient
TopicClient momento.TopicClient
CacheName string
CollectionName string
Ctx context.Context
Expand All @@ -29,13 +30,18 @@ func NewSharedContext() SharedContext {
shared.Configuration = config.LatestLaptopConfig()
shared.DefaultTtl = 3 * time.Second

var err error
client, err := momento.NewCacheClient(shared.Configuration, shared.CredentialProvider, shared.DefaultTtl)
if err != nil {
panic(err)
}

topicClient, err := momento.NewTopicClient(shared.Configuration, shared.CredentialProvider)
if err != nil {
panic(err)
}

shared.Client = client
shared.TopicClient = topicClient

shared.CacheName = uuid.NewString()
shared.CollectionName = uuid.NewString()
Expand Down
123 changes: 123 additions & 0 deletions momento/topic_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Package momento represents API CacheClient interface accessors including control/data operations, errors, operation requests and responses for the SDK.
package momento

import (
"context"
"fmt"

"github.com/momentohq/client-sdk-go/auth"
"github.com/momentohq/client-sdk-go/config"
"github.com/momentohq/client-sdk-go/internal/models"
"github.com/momentohq/client-sdk-go/internal/momentoerrors"
pb "github.com/momentohq/client-sdk-go/internal/protos"
)

type TopicClient interface {
Subscribe(ctx context.Context, request *TopicSubscribeRequest) (TopicSubscription, error)
Publish(ctx context.Context, request *TopicPublishRequest) (TopicPublishResponse, error)

Close()
}

// defaultScsClient represents all information needed for momento client to enable cache control and data operations.
type defaultTopicClient struct {
credentialProvider auth.CredentialProvider
pubSubClient *pubSubClient
}

// NewTopicClient returns a new TopicClient with provided configuration and credential provider arguments.
func NewTopicClient(configuration config.Configuration, credentialProvider auth.CredentialProvider) (TopicClient, error) {
if configuration.GetClientSideTimeout() < 1 {
return nil, momentoerrors.NewMomentoSvcErr(momentoerrors.InvalidArgumentError, "request timeout must not be 0", nil)
}
client := &defaultTopicClient{
credentialProvider: credentialProvider,
}

pubSubClient, err := newPubSubClient(&models.PubSubClientRequest{
CredentialProvider: credentialProvider,
Configuration: configuration,
})
if err != nil {
return nil, convertMomentoSvcErrorToCustomerError(momentoerrors.ConvertSvcErr(err))
}

client.pubSubClient = pubSubClient

return client, nil
}

func (c defaultTopicClient) Subscribe(ctx context.Context, request *TopicSubscribeRequest) (TopicSubscription, error) {
if err := isCacheNameValid(request.CacheName); err != nil {
return nil, err
}

if _, err := prepareName(request.TopicName, "Topic name"); err != nil {
return nil, err
}

clientStream, err := c.pubSubClient.TopicSubscribe(ctx, &TopicSubscribeRequest{
CacheName: request.CacheName,
TopicName: request.TopicName,
})
if err != nil {
return nil, err
}

// Ping the stream to provide a nice error message if the cache does not exist.
rawMsg := new(pb.XSubscriptionItem)
err = clientStream.RecvMsg(rawMsg)
if err != nil {
return nil, momentoerrors.ConvertSvcErr(err)
}
switch rawMsg.Kind.(type) {
case *pb.XSubscriptionItem_Heartbeat:
// The first message to a new subscription will always be a heartbeat.
default:
return nil, momentoerrors.NewMomentoSvcErr(
momentoerrors.InternalServerError,
fmt.Sprintf("expected a heartbeat message, got: %T", rawMsg.Kind),
err,
)
}

return &topicSubscription{
grpcClient: clientStream,
momentoTopicClient: c.pubSubClient,
cacheName: request.CacheName,
topicName: request.TopicName,
}, nil
}

func (c defaultTopicClient) Publish(ctx context.Context, request *TopicPublishRequest) (TopicPublishResponse, error) {
if err := isCacheNameValid(request.CacheName); err != nil {
return nil, err
}

if _, err := prepareName(request.TopicName, "Topic name"); err != nil {
return nil, err
}

if request.Value == nil {
return nil, convertMomentoSvcErrorToCustomerError(
momentoerrors.NewMomentoSvcErr(
momentoerrors.InvalidArgumentError, "value cannot be nil", nil,
),
)
}

err := c.pubSubClient.TopicPublish(ctx, &TopicPublishRequest{
CacheName: request.CacheName,
TopicName: request.TopicName,
Value: request.Value,
})

if err != nil {
return nil, momentoerrors.ConvertSvcErr(err)
}

return &TopicPublishSuccess{}, err
}
func (c defaultTopicClient) Close() {
defer c.pubSubClient.Close()
}
Loading

0 comments on commit 078ccc4

Please sign in to comment.