Skip to content

Commit

Permalink
Move util files to internal so they are not exported. (apache#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
cckellogg authored and merlimat committed Nov 12, 2019
1 parent 91906f5 commit 1325df1
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 12 deletions.
9 changes: 4 additions & 5 deletions pulsar/impl_partition_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (

"github.com/apache/pulsar-client-go/pkg/pb"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/util"
)

type producerState int
Expand Down Expand Up @@ -58,8 +57,8 @@ type partitionProducer struct {
// Channel where app is posting messages to be published
eventsChan chan interface{}

publishSemaphore util.Semaphore
pendingQueue util.BlockingQueue
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
lastSequenceID int64

partitionIdx int
Expand Down Expand Up @@ -92,8 +91,8 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, 1),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: make(util.Semaphore, maxPendingMessages),
pendingQueue: util.NewBlockingQueue(maxPendingMessages),
publishSemaphore: make(internal.Semaphore, maxPendingMessages),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
lastSequenceID: -1,
partitionIdx: partitionIdx,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package util
package internal

import (
"sync"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package util
package internal

import (
"fmt"
Expand Down
2 changes: 1 addition & 1 deletion util/semaphore.go → pulsar/internal/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package util
package internal

// Semaphore is a channel of bool, used to receive a bool type semaphore.
type Semaphore chan bool
Expand Down
8 changes: 4 additions & 4 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package pulsar
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar/internal"
"net/http"
"sync"
"testing"
"time"

"github.com/apache/pulsar-client-go/util"
"github.com/stretchr/testify/assert"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -124,7 +124,7 @@ func TestProducerAsyncSend(t *testing.T) {

wg := sync.WaitGroup{}
wg.Add(10)
errors := util.NewBlockingQueue(10)
errors := internal.NewBlockingQueue(10)

for i := 0; i < 10; i++ {
producer.SendAsync(context.Background(), &ProducerMessage{
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestFlushInProducer(t *testing.T) {

wg := sync.WaitGroup{}
wg.Add(5)
errors := util.NewBlockingQueue(10)
errors := internal.NewBlockingQueue(10)
for i := 0; i < numOfMessages/2; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
producer.SendAsync(ctx, &ProducerMessage{
Expand Down Expand Up @@ -404,7 +404,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
prefix := "msg-batch-async-"
wg := sync.WaitGroup{}
wg.Add(5)
errors := util.NewBlockingQueue(5)
errors := internal.NewBlockingQueue(5)
for i := 0; i < numOfMessages/2; i++ {
messageContent := prefix + fmt.Sprintf("%d", i)
producer.SendAsync(ctx, &ProducerMessage{
Expand Down

0 comments on commit 1325df1

Please sign in to comment.