Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore: add check for context cancel or timeout
Browse files Browse the repository at this point in the history
pgautier404 committed Feb 3, 2023
1 parent e7e5fff commit 3dd0ce8
Showing 4 changed files with 160 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -61,6 +61,6 @@ jobs:
- name: Run test
run: |
go vet ./...
go test -v ./momento
go test -v ./momento ./incubating
228 changes: 151 additions & 77 deletions incubating/simple_cache_client_test.go
Original file line number Diff line number Diff line change
@@ -12,18 +12,72 @@ import (
"github.com/momentohq/client-sdk-go/momento"
)

// Basic happy path test - create a cache, operate set/get, and delete the cache
func TestBasicHappyPathLocalPubSub(t *testing.T) {
var client ScsClient

func TestMain(m *testing.M) {
setup()
m.Run()
teardown()
}

func getClient() ScsClient {
credProvider, err := auth.NewEnvMomentoTokenProvider("TEST_AUTH_TOKEN")
if err != nil {
panic(err)
}
client, err := NewScsClient(&momento.SimpleCacheClientProps{
Configuration: config.LatestLaptopConfig(),
CredentialProvider: credProvider,
})
if err != nil {
panic(err)
}
return client
}

func setup() {
ctx := context.Background()
testPortToUse := 3000
go func() {
newMomentoLocalTestServer(testPortToUse)
}()
client = getClient()
err := client.CreateCache(ctx, &momento.CreateCacheRequest{
CacheName: "test-cache",
})
if err != nil {
var momentoErr momento.MomentoError
if errors.As(err, &momentoErr) {
if momentoErr.Code() != momento.AlreadyExistsError {
panic(err)
}
}
}
}

func teardown() {
client.Close()
}

func publishTopic(pubClient ScsClient, i int, ctx context.Context) {
var topicVal TopicValue

if i%2 == 0 {
topicVal = &TopicValueString{Text: "hello txt"}
} else {
topicVal = &TopicValueBytes{Bytes: []byte("hello bytes")}
}

client, err := newLocalScsClient(testPortToUse)
err := pubClient.PublishTopic(ctx, &TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: topicVal,
})
if err != nil {
panic(err)
}
}

// Basic happy path test using a context which we cancel
func TestHappyPathPubSubCancelContext(t *testing.T) {
ctx := context.Background()
cancelContext, cancelFunction := context.WithCancel(ctx)

sub, err := client.SubscribeTopic(ctx, &TopicSubscribeRequest{
CacheName: "test-cache",
@@ -33,121 +87,141 @@ func TestBasicHappyPathLocalPubSub(t *testing.T) {
panic(err)
}

// TODO: use a channel instead of a counter variable
numMessagesReceived := 0
go func() {
err := sub.Recv(context.Background(), func(ctx context.Context, m TopicValue) {
switch msg := m.(type) {
// Just block and make sure we get stubbed messages for now for quick test
err := sub.Recv(cancelContext, func(ctx context.Context, m TopicValue) {
switch m.(type) {
case *TopicValueString:
fmt.Printf("got a msg! val=%s\n", msg.Text)
case *TopicValueBytes:
fmt.Printf("got a msg! val=%s\n", msg.Bytes)
numMessagesReceived++
}
})
if err != nil {
panic(err)
}
}()

cancelAtNumber := 5
for i := 0; i < 10; i++ {
if i%2 == 0 {
err := client.PublishTopic(ctx, &TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: &TopicValueString{
Text: fmt.Sprintf("string hello %d", i),
},
})
if err != nil {
panic(err)
}
} else {
err := client.PublishTopic(ctx, &TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: &TopicValueBytes{
Bytes: []byte(fmt.Sprintf("byte hello %d", i)),
},
})
if err != nil {
panic(err)
}
publishTopic(client, i, ctx)
// Call the cancel function here and make sure it stops the messages.
if i == cancelAtNumber {
cancelFunction()
}
time.Sleep(time.Second)
}

// Ensure cancelFunction is called to quiet a context leak warning
cancelFunction()

// if we have received more than cancelAtNumber, our cancel failed
if numMessagesReceived > cancelAtNumber {
t.Errorf("expected no more than %d messages but received %d", cancelAtNumber, numMessagesReceived)
}
}

// Basic happy path pubsub integration test
func TestBasicHappyPathPubSubIntegrationTest(t *testing.T) {
// Basic happy path test using a context with a timeout
func TestHappyPathPubSubTimeoutContext(t *testing.T) {
var timeoutUnits time.Duration = 5
ctx := context.Background()
credProvider, err := auth.NewEnvMomentoTokenProvider("TEST_AUTH_TOKEN")
if err != nil {
panic(err)
}
client, err := NewScsClient(&momento.SimpleCacheClientProps{
Configuration: config.LatestLaptopConfig(),
CredentialProvider: credProvider,
timeoutContext, cancelFunc := context.WithTimeout(ctx, timeoutUnits*time.Second)

sub, err := client.SubscribeTopic(ctx, &TopicSubscribeRequest{
CacheName: "test-cache",
TopicName: "test-topic",
})
if err != nil {
panic(err)
}
err = client.CreateCache(ctx, &momento.CreateCacheRequest{
CacheName: "test-cache",
})
if err != nil {
var momentoErr momento.MomentoError
if errors.As(err, &momentoErr) {
if momentoErr.Code() != momento.AlreadyExistsError {
panic(err)

// TODO: use a channel instead of a counter variable
numMessagesReceived := 0
go func() {
// Just block and make sure we get stubbed messages for now for quick test
err := sub.Recv(timeoutContext, func(ctx context.Context, m TopicValue) {
switch m.(type) {
case *TopicValueString:
case *TopicValueBytes:
numMessagesReceived++
}
})
if err != nil {
panic(err)
}
}()

for i := 0; i < 10; i++ {
publishTopic(client, i, ctx)
time.Sleep(time.Second)
}

sub, err := client.SubscribeTopic(ctx, &TopicSubscribeRequest{
cancelFunc()

// at a rate of 1 per second, we should not get back more than timeoutUnits messages
if numMessagesReceived > int(timeoutUnits) {
t.Errorf("expected no more than %d messages but received %d", timeoutUnits, numMessagesReceived)
}

}

// Basic happy path test using local test server
// TODO: are we going to keep the local client and server around?
func TestBasicHappyPathLocalPubSub(t *testing.T) {
ctx := context.Background()
testPortToUse := 3000
go func() {
newMomentoLocalTestServer(testPortToUse)
}()

localClient, err := newLocalScsClient(testPortToUse)
if err != nil {
panic(err)
}

sub, err := localClient.SubscribeTopic(ctx, &TopicSubscribeRequest{
CacheName: "test-cache",
TopicName: "test-topic",
})
if err != nil {
panic(err)
}

numMessagesReceived := 0
numMessagesToSend := 10
go func() {
// Just block and make sure we get stubbed messages for now for quick test
err := sub.Recv(context.Background(), func(ctx context.Context, m TopicValue) {
switch msg := m.(type) {
case *TopicValueString:
fmt.Printf("got a msg! val=%s\n", msg.Text)
case *TopicValueBytes:
fmt.Printf("got a msg! val=%s\n", msg.Bytes)
}
numMessagesReceived++
})
if err != nil {
panic(err)
}
}()

for i := 0; i < 10; i++ {
for i := 0; i < numMessagesToSend; i++ {
var topicVal TopicValue
if i%2 == 0 {
err := client.PublishTopic(ctx, &TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: &TopicValueString{
Text: fmt.Sprintf("string hello %d", i),
},
})
if err != nil {
panic(err)
topicVal = &TopicValueString{
Text: fmt.Sprintf("string hello %d", i),
}
} else {
err := client.PublishTopic(ctx, &TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: &TopicValueBytes{
Bytes: []byte(fmt.Sprintf("byte hello %d", i)),
},
})
if err != nil {
panic(err)
topicVal = &TopicValueBytes{
Bytes: []byte(fmt.Sprintf("byte hello %d", i)),
}
}
err := localClient.PublishTopic(ctx, &TopicPublishRequest{
CacheName: "test-cache",
TopicName: "test-topic",
Value: topicVal,
})
if err != nil {
panic(err)
}
time.Sleep(time.Second)
}

if numMessagesToSend != numMessagesReceived {
t.Errorf("expected %d messages but got %d", numMessagesToSend, numMessagesReceived)
}
}
7 changes: 7 additions & 0 deletions incubating/subscription.go
Original file line number Diff line number Diff line change
@@ -28,6 +28,13 @@ func (s *Subscription) Recv(ctx context.Context, f func(ctx context.Context, m T
return err
}

select {
case <-ctx.Done():
return nil
default:
// pass
}

switch typedMsg := rawMsg.Kind.(type) {
case *pb.XSubscriptionItem_Discontinuity:
// Don't pass discontinuity messages back to user for now
2 changes: 1 addition & 1 deletion internal/services/pubsub_data_service.go
Original file line number Diff line number Diff line change
@@ -78,7 +78,7 @@ func (client *PubSubClient) Publish(ctx context.Context, request *models.TopicPu
},
})
return err
case models.TopicValueBytes:
case *models.TopicValueBytes:
_, err := client.unaryGrpcClient.Publish(ctx, &pb.XPublishRequest{
CacheName: request.CacheName,
Topic: request.TopicName,

0 comments on commit 3dd0ce8

Please sign in to comment.