Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocol Changes #8

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 64 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,65 @@
# liftbridge-grpc
Liftbridge API
==============

Protobuf definitions for the [Liftbridge](https://github.com/liftbridge-io/liftbridge) gRPC API.
This repository contains the public API definitions for
[Liftbridge](https://github.com/liftbridge-io/liftbridge). It is primarily intended for
Liftbridge client developers.

## gRPC API

The client-facing gRPC API is defined in [api.fbs](api.fbs).

## Direct NATS API

It is also possible for a client to publish messages to Liftbridge via NATS directly.
Liftbridge accepts plain NATS messages, allowing it to make existing subjects durable
without any publisher changes. However, these messages will not have some features such
as acks.

In order to opt into Liftbridge-specific features, the message must be prefixed with the
following header and be encoded as a `PublishRequest` (defined in
[api.fbs](api.fbs)).

### Liftbridge Envelope Header

```
0 8 16 24 32
├───────────────┴───────────────┴───────────────┴───────────────┤
│ Magic Number │
├───────────────┬───────────────┬───────────────┬───────────────┤
│ Version │ HeaderLen │ Flags │ Reserved │
├───────────────┴───────────────┴───────────────┴───────────────┤
│ CRC-32C (optional) │
└───────────────────────────────────────────────────────────────┘
```


#### Magic number [4 bytes]

The Liftbridge magic number is `B9 0E 43 B4`. This was chosen by random but deliberately
restricted to invalid UTF-8 to reduce the chance of a collision. This was also verified
to not match known file signatures.

#### Version [1 byte]

The version byte allows for future protocol upgrades. This should only be bumped if the
envelope format changes or if the message encoding changes in a non-backwards-compatible
way. Adding fields to the messages should not require a version bump.

#### HeaderLen [1 byte]

The header length is the offset of the payload. This is included primarily for safety.

#### Flags [1 byte]

The flag bits are defined as follows:

| Bit | Description |
| --- | --------------- |
| 0 | CRC-32C enabled |

#### CRC-32C [4 bytes, optional]

The CRC-32C (Castagnoli) is the checksum of the payload (i.e. from HeaderLen to the
end). This is optional but should significantly reduce the chance that a random NATS
message is interpreted as a Liftbridge message.
140 changes: 140 additions & 0 deletions api.fbs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
namespace proto;

table MessageHeader {
key : string (id: 0);
value : [ubyte] (id: 1);
}

// Message is the core immutable unit that is propagated/stored.
table Message {
value : [ubyte] (id: 0);
key : [ubyte] (id: 1); // Message key used for log compaction
reply : string (id: 2); // NATS subject to reply to
headers : [MessageHeader] (id: 3); // Message headers
subject : string (id: 4); // NATS subject message was received on
offset : int64 (id: 5); // Monotonic message offset in the stream
}

table CreateStreamRequest {
subject : string (id: 0); // NATS subject to attach to
stream : string (id: 1); // Stream name (unique per subject)
group : string (id: 2); // Partitions NATS subject amongst group members
replicationFactor : int32 (id: 3); // Number of stream replicas (-1 to replicate to the entire cluster)
partitions : uint32 (id: 4); // Number of stream partitions
}

table CreateStreamResponse {
}

// AckPolicy controls the behavior of message acknowledgements.
enum AckPolicy : uint8 {
LEADER = 0, // The ack will be sent once the leader has written the message to its log
ALL = 1, // The ack will be sent after the ISR replicas have written the message to their logs
NONE = 2, // No ack will be sent
}

// PublishRequest is used for both gRPC Publish requests as well as direct NATS
// publishes (see the README for the envelope format).
table PublishRequest {
message : [ubyte] (id: 0, nested_flatbuffer: "Message");
correlationId : [ubyte] (id: 1); // User-supplied id to correlate acks to publishes
ackInbox : string (id: 2); // NATS subject to publish acks to
ackPolicy : AckPolicy (id: 3); // Controls the behavior of acks
}

table PublishResponse {
ack : Ack (id: 0); // The ack for the published message if AckPolicy was not NONE
}

// Ack represents an acknowledgement that a message was committed to a stream partition.
table Ack {
correlationId : [ubyte] (id: 0); // User-supplied value from the message
stream : string (id: 1); // Name of the stream
partitionSubject : string (id: 2); // NATS subject the partition is attached to
msgSubject : string (id: 3); // NATS subject the message was received on
offset : int64 (id: 4); // Stream offset the message was committed to
ackInbox : string (id: 5); // NATS subject to publish acks to
ackPolicy : AckPolicy (id: 6); // The AckPolicy sent on the message
}

enum StartPosition : uint8 {
NEW_ONLY = 0, // Start at new messages after the latest
OFFSET = 1, // Start at a specified offset
EARLIEST = 2, // Start at the oldest message
LATEST = 3, // Start at the newest message
TIMESTAMP = 4, // Start at a specified (ack) timestamp
}
Copy link
Contributor

@caioaao caioaao Jan 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imo start position could be an Union instead. This should make consistency easier (eg: not setting an offset when start position type is timestamp)


table SubscribeRequest {
stream : string (id: 0); // Stream name to subscribe to
partitionId : uint32 (id: 1); // Stream partition to subscribe to
startPosition : StartPosition (id: 2); // Where to begin consuming from
startOffset : int64 (id: 3); // Offset to begin consuming from
startTimestamp : int64 (id: 4); // Timestamp to begin consuming from
}

table StreamMessage {
message : [ubyte] (id: 0, nested_flatbuffer: "Message");
subject : string (id: 1); // NATS subject message was received on
offset : int64 (id: 2); // Monotonic message offset in the stream
timestamp : int64 (id: 3); // When the message was received by the broker
}

table Broker {
id : string (id: 0); // Broker id
host : string (id: 1); // Broker host
port : int32 (id: 2); // Broker port
}

// PartitionMetadata contains information for a stream partition.
table PartitionMetadata {
id : uint32 (id: 0); // Partition id
leader : string (id: 1); // Broker id of the partition leader
replicas : [string] (id: 2); // Broker ids of the partition replicas
isr : [string] (id: 3); // Broker ids of the in-sync replica set
}

enum StreamError : uint8 {
OK = 0,
UNKNOWN_STREAM = 1,
}

table StreamMetadata {
stream : string (id: 0);
subject : string (id: 1); // NATS subject the stream is attached to
error : StreamError (id: 2); // Indicates if there was something wrong with the requested stream
partitions : [PartitionMetadata] (id: 3); // Information for the stream partitions
}

table FetchMetadataRequest {
streams : [string] (id: 0); // The streams to fetch metadata for (all if empty)
}

table FetchMetadataResponse {
brokers : [Broker] (id: 0);
metadata : [StreamMetadata] (id: 1);
}

// API is the main Liftbridge server interface clients interact with.
rpc_service API {
// CreateStream creates a new stream attached to a NATS subject. It returns
// an AlreadyExists status code if a stream with the given subject and name
// already exists.
CreateStream(CreateStreamRequest) : CreateStreamResponse;

// FetchMetadata retrieves the latest cluster metadata, including stream
// broker information.
FetchMetadata(FetchMetadataRequest) : FetchMetadataResponse;

// Publish a new message to a subject. If the AckPolicy is not NONE and a
// deadline is provided, this will synchronously block until the ack is
// received. If the ack is not received in time, a DeadlineExceeded status
// code is returned.
Publish(PublishRequest) : PublishResponse;

// Subscribe creates an ephemeral subscription for the given stream. It
// begins to receive messages starting at the given offset and waits for
// new messages when it reaches the end of the stream. Use the request
// context to close the subscription.
Subscribe(SubscribeRequest) : StreamMessage (streaming: "server");
}
189 changes: 189 additions & 0 deletions go/API_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
//Generated by gRPC Go plugin
//If you make any local changes, they will be lost
//source: api

package proto

import (
context "context"
grpc "google.golang.org/grpc"
)

// Client API for API service
type APIClient interface{
CreateStream(ctx context.Context, in *CreateStreamRequest,
opts... grpc.CallOption) (* CreateStreamResponse, error)
Subscribe(ctx context.Context, in *SubscribeRequest,
opts... grpc.CallOption) (API_SubscribeClient, error)
FetchMetadata(ctx context.Context, in *FetchMetadataRequest,
opts... grpc.CallOption) (* FetchMetadataResponse, error)
Publish(ctx context.Context, in *PublishRequest,
opts... grpc.CallOption) (* PublishResponse, error)
}

type aPIClient struct {
cc *grpc.ClientConn
}

func NewAPIClient(cc *grpc.ClientConn) APIClient {
return &aPIClient{cc}
}

func (c *aPIClient) CreateStream(ctx context.Context, in *CreateStreamRequest,
opts... grpc.CallOption) (* CreateStreamResponse, error) {
out := new(CreateStreamResponse)
err := grpc.Invoke(ctx, "/proto.API/CreateStream", in, out, c.cc, opts...)
if err != nil { return nil, err }
return out, nil
}

func (c *aPIClient) Subscribe(ctx context.Context, in *SubscribeRequest,
opts... grpc.CallOption) (API_SubscribeClient, error) {
stream, err := grpc.NewClientStream(ctx, &_API_serviceDesc.Streams[0], c.cc, "/proto.API/Subscribe", opts...)
if err != nil { return nil, err }
x := &aPISubscribeClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil { return nil, err }
if err := x.ClientStream.CloseSend(); err != nil { return nil, err }
return x,nil
}

type API_SubscribeClient interface {
Recv() (*Message, error)
grpc.ClientStream
}

type aPISubscribeClient struct{
grpc.ClientStream
}

func (x *aPISubscribeClient) Recv() (*Message, error) {
m := new(Message)
if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err }
return m, nil
}

func (c *aPIClient) FetchMetadata(ctx context.Context, in *FetchMetadataRequest,
opts... grpc.CallOption) (* FetchMetadataResponse, error) {
out := new(FetchMetadataResponse)
err := grpc.Invoke(ctx, "/proto.API/FetchMetadata", in, out, c.cc, opts...)
if err != nil { return nil, err }
return out, nil
}

func (c *aPIClient) Publish(ctx context.Context, in *PublishRequest,
opts... grpc.CallOption) (* PublishResponse, error) {
out := new(PublishResponse)
err := grpc.Invoke(ctx, "/proto.API/Publish", in, out, c.cc, opts...)
if err != nil { return nil, err }
return out, nil
}

// Server API for API service
type APIServer interface {
CreateStream(context.Context, *CreateStreamRequest) (*CreateStreamResponse, error)
Subscribe(*SubscribeRequest, API_SubscribeServer) error
FetchMetadata(context.Context, *FetchMetadataRequest) (*FetchMetadataResponse, error)
Publish(context.Context, *PublishRequest) (*PublishResponse, error)
}

func RegisterAPIServer(s *grpc.Server, srv APIServer) {
s.RegisterService(&_API_serviceDesc, srv)
}

func _API_CreateStream_Handler(srv interface{}, ctx context.Context,
dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateStreamRequest)
if err := dec(in); err != nil { return nil, err }
if interceptor == nil { return srv.(APIServer).CreateStream(ctx, in) }
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.API/CreateStream",
}

handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(APIServer).CreateStream(ctx, req.(* CreateStreamRequest))
}
return interceptor(ctx, in, info, handler)
}


func _API_Subscribe_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeRequest)
if err := stream.RecvMsg(m); err != nil { return err }
return srv.(APIServer).Subscribe(m, &aPISubscribeServer{stream})
}

type API_SubscribeServer interface {
Send(* Message) error
grpc.ServerStream
}

type aPISubscribeServer struct {
grpc.ServerStream
}

func (x *aPISubscribeServer) Send(m *Message) error {
return x.ServerStream.SendMsg(m)
}


func _API_FetchMetadata_Handler(srv interface{}, ctx context.Context,
dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(FetchMetadataRequest)
if err := dec(in); err != nil { return nil, err }
if interceptor == nil { return srv.(APIServer).FetchMetadata(ctx, in) }
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.API/FetchMetadata",
}

handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(APIServer).FetchMetadata(ctx, req.(* FetchMetadataRequest))
}
return interceptor(ctx, in, info, handler)
}


func _API_Publish_Handler(srv interface{}, ctx context.Context,
dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PublishRequest)
if err := dec(in); err != nil { return nil, err }
if interceptor == nil { return srv.(APIServer).Publish(ctx, in) }
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/proto.API/Publish",
}

handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(APIServer).Publish(ctx, req.(* PublishRequest))
}
return interceptor(ctx, in, info, handler)
}


var _API_serviceDesc = grpc.ServiceDesc{
ServiceName: "proto.API",
HandlerType: (*APIServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "CreateStream",
Handler: _API_CreateStream_Handler,
},
{
MethodName: "FetchMetadata",
Handler: _API_FetchMetadata_Handler,
},
{
MethodName: "Publish",
Handler: _API_Publish_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Subscribe",
Handler: _API_Subscribe_Handler,
ServerStreams: true,
},
},
}

Loading