TankGo is a stream processing library client for Tank written in Go. The project is still under development and until the latest release, some features like publishing messages are still missing. Currently only consuming from Tank server is supported. You can find more details regarding a high overview of the Tank protocol here. Also for more details regarding the Tank encoding scheme, which is the same that TankGo client follows refer to this document.
The basic struct your program will need to use in order to start communicating with Tank server is the TankSubscriber
. And the high-level API that this structs implements can be found here.
type Subscriber interface {
Connect(ctx context.Context, broker string, connectTimeout time.Duration, bufsize int) error
Subscribe(r *message.ConsumeRequest, maxConcurrentReads int) (<-chan message.Log, <-chan error)
Reset(ctx context.Context) error
GetTopicsHighWaterMark(r *message.ConsumeRequest) (map[string]uint64, error)
Ping() error
Close() error
}
In order to connect with Tank you should just create a new TankSubscriber
and call its Connect
method like this example.
s := &TankSubscriber{}
err := s.Connect(ctx, "ip:port", DefaultConTimeout, DefaultBufSize)
Connect
method uses a context as its first argument that is used if you need to handle the cancel of the initial underlying TCP connection with the Tank broker before connectTimeout
.
The connectTimeout
argument sets how long the Connection
should try to establish the underlying TCP connection before returning an error.
The last argument bufsize
sets the size of the internal read buffer.
On a successful connection and in order to start consuming messages from Tank you can now construct a new consume request described here. The previous document describes in detail what kind of values the consume request can have and how to use them in some special cases.
The respective TankGo struct is the ConsumeRequest
struct which can be found here. An example of a new consume request is:
req := message.ConsumeRequest{
ClientVersion: 1,
RequestID: 1,
Client: "my_go_client",
MaxWaitMS: 0,
MinBytes: 0,
Topics: []message.FetchRequestTopic{
{
Name: "test_topic",
Partitions: []message.FetchRequestTopicPartition{
{
PartitionID: 0,
ABSSequenceNumber: 1,
FetchSize: 1024,
},
},
},
},
}
your program should always call Ping()
method and on succes call the Subscribe(r *message.ConsumeRequest, maxConcurrentReads int) (<-chan message.Log, <-chan error)
method like this:
err := s.Ping()
// Check err here and on success continue.
messages, errChan := s.Subscribe(&req, 100)
Where req
is the consume request and maxConcurrentReads
sets the size of the buffered size returned.
Because TankGo is channel-based it is important your program should take into account all the pros and cons a channel-based approach can have.
For example, if the consumer blocks frequently and the channel is full that means the TankGo client will not be able to push new messages and will eventually slow down reading from the underlying stream. So it is very important that your program should take that into account and try to never block the gorouting that consumes from the returned channel.
Another important thing to notice is that Subscribe
returns also an error channel that should be also checked for errors if any. A good approach would be to range over the messages
channel and when this is over check the error in the errChan
. It will always have a value even if it is a nil one.
- Always call
defer Close()
to close the underlying connection with Tank. - On a successful consume request Tank server will start streaming all the available data specific for this request and it will expect to be consumed from the client. If the client exits early before consuming everything a new consume request to Tank will result to an error response.
- To handle such cases the
Reset(ctx context.Context)
can be used. - The method
GetTopicsHighWaterMark(r *message.ConsumeRequest)
is a hacky way to get the sequence number of the latest committed of the specified topic/partition. Because it returns early without consuming all the responses from Tank a call toReset
is needed afterward.