Skip to content

Commit

Permalink
test(pubsub): cleanup ordered keys json integration test (googleapis#…
Browse files Browse the repository at this point in the history
…8575)

* test(pubsub): remove checking of publish result to avoid creating too many goroutines

* add check for receive to exit before deleting subscription
  • Loading branch information
hongalex authored Sep 21, 2023
1 parent 6ef1945 commit 53f17c4
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,23 +1263,18 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
key := parts[0]
msg := parts[1]
publishData = append(publishData, testutil2.OrderedKeyMsg{Key: key, Data: msg})
r := topic.Publish(ctx, &Message{
topic.Publish(ctx, &Message{
Data: []byte(msg),
OrderingKey: key,
})
go func() {
_, err := r.Get(ctx)
if err != nil {
// Can't fail inside goroutine, so just log the error.
t.Logf("publish error for message(%s): %v", msg, err)
}
}()
wg.Add(1)
}
if err := scanner.Err(); err != nil {
t.Fatal(err)
}

receiveDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
mu.Lock()
Expand All @@ -1298,6 +1293,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
t.Error(err)
}
}
close(receiveDone)
}()

done := make(chan struct{})
Expand All @@ -1308,6 +1304,7 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {

select {
case <-done:
cancel()
case <-time.After(5 * time.Minute):
t.Fatal("timed out after 5m waiting for all messages to be received")
}
Expand All @@ -1317,6 +1314,13 @@ func TestIntegration_OrderedKeys_JSON(t *testing.T) {
if err := testutil2.VerifyKeyOrdering(publishData, receiveData); err != nil {
t.Fatalf("VerifyKeyOrdering error: %v", err)
}

select {
case <-receiveDone:
case <-time.After(5 * time.Minute):
t.Fatal("timed out after 5m waiting for receive to exit")
}

}

func TestIntegration_OrderedKeys_ResumePublish(t *testing.T) {
Expand Down

0 comments on commit 53f17c4

Please sign in to comment.