Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use gogofast to have in-place protobuf serialization #294

Merged
merged 1 commit into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/golang/protobuf v1.3.1
github.com/gogo/protobuf v1.3.1
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
github.com/pierrec/lz4 v2.0.5+incompatible
Expand Down
18 changes: 16 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.5 h1:7q6vHIqubShURwQz8cQK6yIe/xC3IF0Vm7TGfqjewrc=
github.com/klauspost/compress v1.10.5/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.10.8 h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
Expand Down Expand Up @@ -68,8 +73,17 @@ golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 1 addition & 1 deletion integration-tests/license_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var otherCheck = regexp.MustCompile(`#
`)

var skip = map[string]bool{
"../pulsar/internal/pb/PulsarApi.pb.go": true,
"../pulsar/internal/pulsar_proto/PulsarApi.pb.go": true,
}

func TestLicense(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"net/url"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

var ErrConsumerClosed = errors.New("consumer closed")
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

type consumerState int
Expand Down
2 changes: 1 addition & 1 deletion pulsar/consumer_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"testing"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"

"github.com/stretchr/testify/assert"

Expand Down
4 changes: 2 additions & 2 deletions pulsar/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import (

pkgerrors "github.com/pkg/errors"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

// NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error.
Expand Down
4 changes: 2 additions & 2 deletions pulsar/impl_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"sync"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

type messageID struct {
Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar/internal/compression"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"
)
Expand Down
39 changes: 22 additions & 17 deletions pulsar/internal/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

"github.com/apache/pulsar-client-go/pulsar/internal/compression"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

const (
Expand Down Expand Up @@ -202,37 +202,42 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
return cmd
}

func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
serialized, err := proto.Marshal(smm)
func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
metadataSize := uint32(smm.Size())
wb.WriteUint32(metadataSize)

wb.ResizeIfNeeded(metadataSize)
_, err := smm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
if err != nil {
log.WithError(err).Fatal("Protobuf serialization error")
}

wb.WriteUint32(uint32(len(serialized)))
wb.Write(serialized)
wb.WrittenBytes(metadataSize)
wb.Write(payload)
}

func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
func serializeBatch(wb Buffer,
cmdSend *pb.BaseCommand,
msgMetadata *pb.MessageMetadata,
uncompressedPayload Buffer,
compressionProvider compression.Provider) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
cmdSize := proto.Size(cmdSend)
msgMetadataSize := proto.Size(msgMetadata)
cmdSize := uint32(proto.Size(cmdSend))
msgMetadataSize := uint32(proto.Size(msgMetadata))

frameSizeIdx := wb.WriterIndex()
wb.WriteUint32(0) // Skip frame size until we now the size
frameStartIdx := wb.WriterIndex()

// Write cmd
wb.WriteUint32(uint32(cmdSize))
serialized, err := proto.Marshal(cmdSend)
wb.WriteUint32(cmdSize)
wb.ResizeIfNeeded(cmdSize)
_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
if err != nil {
log.WithError(err).Fatal("Protobuf error when serializing cmdSend")
}

wb.Write(serialized)
wb.WrittenBytes(cmdSize)

// Create checksum placeholder
wb.WriteUint16(magicCrc32c)
Expand All @@ -241,13 +246,13 @@ func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,

// Write metadata
metadataStartIdx := wb.WriterIndex()
wb.WriteUint32(uint32(msgMetadataSize))
serialized, err = proto.Marshal(msgMetadata)
wb.WriteUint32(msgMetadataSize)
wb.ResizeIfNeeded(msgMetadataSize)
_, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
if err != nil {
log.WithError(err).Fatal("Protobuf error when serializing msgMetadata")
}

wb.Write(serialized)
wb.WrittenBytes(msgMetadataSize)

// Make sure the buffer has enough space to hold the compressed data
// and perform the compression in-place
Expand Down
13 changes: 7 additions & 6 deletions pulsar/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"sync/atomic"
"time"

"github.com/golang/protobuf/proto"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal/auth"
"github.com/apache/pulsar-client-go/pulsar/internal/pb"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)

const (
Expand Down Expand Up @@ -390,24 +390,25 @@ func (c *connection) internalWriteData(data Buffer) {
}
}

func (c *connection) writeCommand(cmd proto.Message) {
func (c *connection) writeCommand(cmd *pb.BaseCommand) {
// Wire format
// [FRAME_SIZE] [CMD_SIZE][CMD]
cmdSize := uint32(proto.Size(cmd))
cmdSize := uint32(cmd.Size())
frameSize := cmdSize + 4

c.writeBufferLock.Lock()
defer c.writeBufferLock.Unlock()

c.writeBuffer.Clear()
c.writeBuffer.WriteUint32(frameSize)

c.writeBuffer.WriteUint32(cmdSize)
serialized, err := proto.Marshal(cmd)
_, err := cmd.MarshalToSizedBuffer(c.writeBuffer.WritableSlice()[:cmdSize])
if err != nil {
c.log.WithError(err).Fatal("Protobuf serialization error")
}

c.writeBuffer.Write(serialized)
c.writeBuffer.WrittenBytes(cmdSize)
c.internalWriteData(c.writeBuffer)
}

Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/connection_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"bufio"
"io"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
)

Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"fmt"
"net/url"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"

log "github.com/sirupsen/logrus"
)
Expand Down
4 changes: 2 additions & 2 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"net/url"
"testing"

"github.com/apache/pulsar-client-go/pulsar/internal/pb"
"github.com/golang/protobuf/proto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
)

Expand Down
Loading