diff --git a/go/cmd/vtgateclienttest/services/echo.go b/go/cmd/vtgateclienttest/services/echo.go index edb05ddcf9e..ce7604124a9 100644 --- a/go/cmd/vtgateclienttest/services/echo.go +++ b/go/cmd/vtgateclienttest/services/echo.go @@ -29,6 +29,7 @@ import ( "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/vtgate/vtgateservice" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" @@ -395,3 +396,33 @@ func (c *echoClient) UpdateStream(ctx context.Context, keyspace string, shard st } return c.fallbackClient.UpdateStream(ctx, keyspace, shard, keyRange, tabletType, timestamp, event, callback) } + +func (c *echoClient) VStream(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, callback func([]*binlogdatapb.VEvent) error) error { + if strings.HasPrefix(vgtid.ShardGtids[0].Shard, EchoPrefix) { + _ = callback([]*binlogdatapb.VEvent{ + { + Type: 1, + Timestamp: 1234, + Gtid: "echo-gtid-1", + Ddl: "echo-ddl-1", + Vgtid: vgtid, + RowEvent: &binlogdatapb.RowEvent{ + TableName:"echo-table-1", + }, + }, + { + Type: 2, + Timestamp: 4321, + Gtid: "echo-gtid-2", + Ddl: "echo-ddl-2", + Vgtid: vgtid, + FieldEvent: &binlogdatapb.FieldEvent{ + TableName:"echo-table-2", + }, + }, + }) + return nil + } + + return c.fallbackClient.VStream(ctx, tabletType, vgtid, filter, callback) +} diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index b56143a272c..fdfcd7e51ee 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -54,7 +54,7 @@ func (x TransactionMode) String() string { return proto.EnumName(TransactionMode_name, int32(x)) } func (TransactionMode) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{0} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{0} } // CommitOrder is used to designate which of the ShardSessions @@ -89,7 +89,7 @@ func (x CommitOrder) String() string { return proto.EnumName(CommitOrder_name, int32(x)) } func (CommitOrder) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{1} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{1} } // Session objects are exchanged like cookies through various @@ -138,7 +138,7 @@ func (m *Session) Reset() { *m = Session{} } func (m *Session) String() string { return proto.CompactTextString(m) } func (*Session) ProtoMessage() {} func (*Session) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{0} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{0} } func (m *Session) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Session.Unmarshal(m, b) @@ -240,7 +240,7 @@ func (m *Session_ShardSession) Reset() { *m = Session_ShardSession{} } func (m *Session_ShardSession) String() string { return proto.CompactTextString(m) } func (*Session_ShardSession) ProtoMessage() {} func (*Session_ShardSession) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{0, 0} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{0, 0} } func (m *Session_ShardSession) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_Session_ShardSession.Unmarshal(m, b) @@ -298,7 +298,7 @@ func (m *ExecuteRequest) Reset() { *m = ExecuteRequest{} } func (m *ExecuteRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteRequest) ProtoMessage() {} func (*ExecuteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{1} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{1} } func (m *ExecuteRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteRequest.Unmarshal(m, b) @@ -386,7 +386,7 @@ func (m *ExecuteResponse) Reset() { *m = ExecuteResponse{} } func (m *ExecuteResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteResponse) ProtoMessage() {} func (*ExecuteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{2} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{2} } func (m *ExecuteResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteResponse.Unmarshal(m, b) @@ -456,7 +456,7 @@ func (m *ExecuteShardsRequest) Reset() { *m = ExecuteShardsRequest{} } func (m *ExecuteShardsRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteShardsRequest) ProtoMessage() {} func (*ExecuteShardsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{3} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{3} } func (m *ExecuteShardsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteShardsRequest.Unmarshal(m, b) @@ -551,7 +551,7 @@ func (m *ExecuteShardsResponse) Reset() { *m = ExecuteShardsResponse{} } func (m *ExecuteShardsResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteShardsResponse) ProtoMessage() {} func (*ExecuteShardsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{4} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{4} } func (m *ExecuteShardsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteShardsResponse.Unmarshal(m, b) @@ -622,7 +622,7 @@ func (m *ExecuteKeyspaceIdsRequest) Reset() { *m = ExecuteKeyspaceIdsReq func (m *ExecuteKeyspaceIdsRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteKeyspaceIdsRequest) ProtoMessage() {} func (*ExecuteKeyspaceIdsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{5} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{5} } func (m *ExecuteKeyspaceIdsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteKeyspaceIdsRequest.Unmarshal(m, b) @@ -717,7 +717,7 @@ func (m *ExecuteKeyspaceIdsResponse) Reset() { *m = ExecuteKeyspaceIdsRe func (m *ExecuteKeyspaceIdsResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteKeyspaceIdsResponse) ProtoMessage() {} func (*ExecuteKeyspaceIdsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{6} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{6} } func (m *ExecuteKeyspaceIdsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteKeyspaceIdsResponse.Unmarshal(m, b) @@ -788,7 +788,7 @@ func (m *ExecuteKeyRangesRequest) Reset() { *m = ExecuteKeyRangesRequest func (m *ExecuteKeyRangesRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteKeyRangesRequest) ProtoMessage() {} func (*ExecuteKeyRangesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{7} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{7} } func (m *ExecuteKeyRangesRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteKeyRangesRequest.Unmarshal(m, b) @@ -883,7 +883,7 @@ func (m *ExecuteKeyRangesResponse) Reset() { *m = ExecuteKeyRangesRespon func (m *ExecuteKeyRangesResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteKeyRangesResponse) ProtoMessage() {} func (*ExecuteKeyRangesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{8} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{8} } func (m *ExecuteKeyRangesResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteKeyRangesResponse.Unmarshal(m, b) @@ -956,7 +956,7 @@ func (m *ExecuteEntityIdsRequest) Reset() { *m = ExecuteEntityIdsRequest func (m *ExecuteEntityIdsRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteEntityIdsRequest) ProtoMessage() {} func (*ExecuteEntityIdsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{9} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{9} } func (m *ExecuteEntityIdsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteEntityIdsRequest.Unmarshal(m, b) @@ -1055,7 +1055,7 @@ func (m *ExecuteEntityIdsRequest_EntityId) Reset() { *m = ExecuteEntityI func (m *ExecuteEntityIdsRequest_EntityId) String() string { return proto.CompactTextString(m) } func (*ExecuteEntityIdsRequest_EntityId) ProtoMessage() {} func (*ExecuteEntityIdsRequest_EntityId) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{9, 0} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{9, 0} } func (m *ExecuteEntityIdsRequest_EntityId) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteEntityIdsRequest_EntityId.Unmarshal(m, b) @@ -1115,7 +1115,7 @@ func (m *ExecuteEntityIdsResponse) Reset() { *m = ExecuteEntityIdsRespon func (m *ExecuteEntityIdsResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteEntityIdsResponse) ProtoMessage() {} func (*ExecuteEntityIdsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{10} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{10} } func (m *ExecuteEntityIdsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteEntityIdsResponse.Unmarshal(m, b) @@ -1180,7 +1180,7 @@ func (m *ExecuteBatchRequest) Reset() { *m = ExecuteBatchRequest{} } func (m *ExecuteBatchRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteBatchRequest) ProtoMessage() {} func (*ExecuteBatchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{11} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{11} } func (m *ExecuteBatchRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteBatchRequest.Unmarshal(m, b) @@ -1268,7 +1268,7 @@ func (m *ExecuteBatchResponse) Reset() { *m = ExecuteBatchResponse{} } func (m *ExecuteBatchResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteBatchResponse) ProtoMessage() {} func (*ExecuteBatchResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{12} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{12} } func (m *ExecuteBatchResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteBatchResponse.Unmarshal(m, b) @@ -1328,7 +1328,7 @@ func (m *BoundShardQuery) Reset() { *m = BoundShardQuery{} } func (m *BoundShardQuery) String() string { return proto.CompactTextString(m) } func (*BoundShardQuery) ProtoMessage() {} func (*BoundShardQuery) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{13} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{13} } func (m *BoundShardQuery) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BoundShardQuery.Unmarshal(m, b) @@ -1396,7 +1396,7 @@ func (m *ExecuteBatchShardsRequest) Reset() { *m = ExecuteBatchShardsReq func (m *ExecuteBatchShardsRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteBatchShardsRequest) ProtoMessage() {} func (*ExecuteBatchShardsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{14} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{14} } func (m *ExecuteBatchShardsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteBatchShardsRequest.Unmarshal(m, b) @@ -1477,7 +1477,7 @@ func (m *ExecuteBatchShardsResponse) Reset() { *m = ExecuteBatchShardsRe func (m *ExecuteBatchShardsResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteBatchShardsResponse) ProtoMessage() {} func (*ExecuteBatchShardsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{15} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{15} } func (m *ExecuteBatchShardsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteBatchShardsResponse.Unmarshal(m, b) @@ -1538,7 +1538,7 @@ func (m *BoundKeyspaceIdQuery) Reset() { *m = BoundKeyspaceIdQuery{} } func (m *BoundKeyspaceIdQuery) String() string { return proto.CompactTextString(m) } func (*BoundKeyspaceIdQuery) ProtoMessage() {} func (*BoundKeyspaceIdQuery) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{16} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{16} } func (m *BoundKeyspaceIdQuery) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BoundKeyspaceIdQuery.Unmarshal(m, b) @@ -1605,7 +1605,7 @@ func (m *ExecuteBatchKeyspaceIdsRequest) Reset() { *m = ExecuteBatchKeys func (m *ExecuteBatchKeyspaceIdsRequest) String() string { return proto.CompactTextString(m) } func (*ExecuteBatchKeyspaceIdsRequest) ProtoMessage() {} func (*ExecuteBatchKeyspaceIdsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{17} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{17} } func (m *ExecuteBatchKeyspaceIdsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteBatchKeyspaceIdsRequest.Unmarshal(m, b) @@ -1686,7 +1686,7 @@ func (m *ExecuteBatchKeyspaceIdsResponse) Reset() { *m = ExecuteBatchKey func (m *ExecuteBatchKeyspaceIdsResponse) String() string { return proto.CompactTextString(m) } func (*ExecuteBatchKeyspaceIdsResponse) ProtoMessage() {} func (*ExecuteBatchKeyspaceIdsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{18} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{18} } func (m *ExecuteBatchKeyspaceIdsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ExecuteBatchKeyspaceIdsResponse.Unmarshal(m, b) @@ -1750,7 +1750,7 @@ func (m *StreamExecuteRequest) Reset() { *m = StreamExecuteRequest{} } func (m *StreamExecuteRequest) String() string { return proto.CompactTextString(m) } func (*StreamExecuteRequest) ProtoMessage() {} func (*StreamExecuteRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{19} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{19} } func (m *StreamExecuteRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteRequest.Unmarshal(m, b) @@ -1829,7 +1829,7 @@ func (m *StreamExecuteResponse) Reset() { *m = StreamExecuteResponse{} } func (m *StreamExecuteResponse) String() string { return proto.CompactTextString(m) } func (*StreamExecuteResponse) ProtoMessage() {} func (*StreamExecuteResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{20} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{20} } func (m *StreamExecuteResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteResponse.Unmarshal(m, b) @@ -1880,7 +1880,7 @@ func (m *StreamExecuteShardsRequest) Reset() { *m = StreamExecuteShardsR func (m *StreamExecuteShardsRequest) String() string { return proto.CompactTextString(m) } func (*StreamExecuteShardsRequest) ProtoMessage() {} func (*StreamExecuteShardsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{21} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{21} } func (m *StreamExecuteShardsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteShardsRequest.Unmarshal(m, b) @@ -1957,7 +1957,7 @@ func (m *StreamExecuteShardsResponse) Reset() { *m = StreamExecuteShards func (m *StreamExecuteShardsResponse) String() string { return proto.CompactTextString(m) } func (*StreamExecuteShardsResponse) ProtoMessage() {} func (*StreamExecuteShardsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{22} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{22} } func (m *StreamExecuteShardsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteShardsResponse.Unmarshal(m, b) @@ -2009,7 +2009,7 @@ func (m *StreamExecuteKeyspaceIdsRequest) Reset() { *m = StreamExecuteKe func (m *StreamExecuteKeyspaceIdsRequest) String() string { return proto.CompactTextString(m) } func (*StreamExecuteKeyspaceIdsRequest) ProtoMessage() {} func (*StreamExecuteKeyspaceIdsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{23} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{23} } func (m *StreamExecuteKeyspaceIdsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteKeyspaceIdsRequest.Unmarshal(m, b) @@ -2086,7 +2086,7 @@ func (m *StreamExecuteKeyspaceIdsResponse) Reset() { *m = StreamExecuteK func (m *StreamExecuteKeyspaceIdsResponse) String() string { return proto.CompactTextString(m) } func (*StreamExecuteKeyspaceIdsResponse) ProtoMessage() {} func (*StreamExecuteKeyspaceIdsResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{24} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{24} } func (m *StreamExecuteKeyspaceIdsResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteKeyspaceIdsResponse.Unmarshal(m, b) @@ -2138,7 +2138,7 @@ func (m *StreamExecuteKeyRangesRequest) Reset() { *m = StreamExecuteKeyR func (m *StreamExecuteKeyRangesRequest) String() string { return proto.CompactTextString(m) } func (*StreamExecuteKeyRangesRequest) ProtoMessage() {} func (*StreamExecuteKeyRangesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{25} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{25} } func (m *StreamExecuteKeyRangesRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteKeyRangesRequest.Unmarshal(m, b) @@ -2215,7 +2215,7 @@ func (m *StreamExecuteKeyRangesResponse) Reset() { *m = StreamExecuteKey func (m *StreamExecuteKeyRangesResponse) String() string { return proto.CompactTextString(m) } func (*StreamExecuteKeyRangesResponse) ProtoMessage() {} func (*StreamExecuteKeyRangesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{26} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{26} } func (m *StreamExecuteKeyRangesResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_StreamExecuteKeyRangesResponse.Unmarshal(m, b) @@ -2261,7 +2261,7 @@ func (m *BeginRequest) Reset() { *m = BeginRequest{} } func (m *BeginRequest) String() string { return proto.CompactTextString(m) } func (*BeginRequest) ProtoMessage() {} func (*BeginRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{27} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{27} } func (m *BeginRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BeginRequest.Unmarshal(m, b) @@ -2308,7 +2308,7 @@ func (m *BeginResponse) Reset() { *m = BeginResponse{} } func (m *BeginResponse) String() string { return proto.CompactTextString(m) } func (*BeginResponse) ProtoMessage() {} func (*BeginResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{28} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{28} } func (m *BeginResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_BeginResponse.Unmarshal(m, b) @@ -2356,7 +2356,7 @@ func (m *CommitRequest) Reset() { *m = CommitRequest{} } func (m *CommitRequest) String() string { return proto.CompactTextString(m) } func (*CommitRequest) ProtoMessage() {} func (*CommitRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{29} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{29} } func (m *CommitRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CommitRequest.Unmarshal(m, b) @@ -2408,7 +2408,7 @@ func (m *CommitResponse) Reset() { *m = CommitResponse{} } func (m *CommitResponse) String() string { return proto.CompactTextString(m) } func (*CommitResponse) ProtoMessage() {} func (*CommitResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{30} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{30} } func (m *CommitResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_CommitResponse.Unmarshal(m, b) @@ -2444,7 +2444,7 @@ func (m *RollbackRequest) Reset() { *m = RollbackRequest{} } func (m *RollbackRequest) String() string { return proto.CompactTextString(m) } func (*RollbackRequest) ProtoMessage() {} func (*RollbackRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{31} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{31} } func (m *RollbackRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RollbackRequest.Unmarshal(m, b) @@ -2489,7 +2489,7 @@ func (m *RollbackResponse) Reset() { *m = RollbackResponse{} } func (m *RollbackResponse) String() string { return proto.CompactTextString(m) } func (*RollbackResponse) ProtoMessage() {} func (*RollbackResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{32} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{32} } func (m *RollbackResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_RollbackResponse.Unmarshal(m, b) @@ -2525,7 +2525,7 @@ func (m *ResolveTransactionRequest) Reset() { *m = ResolveTransactionReq func (m *ResolveTransactionRequest) String() string { return proto.CompactTextString(m) } func (*ResolveTransactionRequest) ProtoMessage() {} func (*ResolveTransactionRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{33} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{33} } func (m *ResolveTransactionRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ResolveTransactionRequest.Unmarshal(m, b) @@ -2581,7 +2581,7 @@ func (m *MessageStreamRequest) Reset() { *m = MessageStreamRequest{} } func (m *MessageStreamRequest) String() string { return proto.CompactTextString(m) } func (*MessageStreamRequest) ProtoMessage() {} func (*MessageStreamRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{34} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{34} } func (m *MessageStreamRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageStreamRequest.Unmarshal(m, b) @@ -2656,7 +2656,7 @@ func (m *MessageAckRequest) Reset() { *m = MessageAckRequest{} } func (m *MessageAckRequest) String() string { return proto.CompactTextString(m) } func (*MessageAckRequest) ProtoMessage() {} func (*MessageAckRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{35} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{35} } func (m *MessageAckRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageAckRequest.Unmarshal(m, b) @@ -2720,7 +2720,7 @@ func (m *IdKeyspaceId) Reset() { *m = IdKeyspaceId{} } func (m *IdKeyspaceId) String() string { return proto.CompactTextString(m) } func (*IdKeyspaceId) ProtoMessage() {} func (*IdKeyspaceId) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{36} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{36} } func (m *IdKeyspaceId) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_IdKeyspaceId.Unmarshal(m, b) @@ -2773,7 +2773,7 @@ func (m *MessageAckKeyspaceIdsRequest) Reset() { *m = MessageAckKeyspace func (m *MessageAckKeyspaceIdsRequest) String() string { return proto.CompactTextString(m) } func (*MessageAckKeyspaceIdsRequest) ProtoMessage() {} func (*MessageAckKeyspaceIdsRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{37} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{37} } func (m *MessageAckKeyspaceIdsRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_MessageAckKeyspaceIdsRequest.Unmarshal(m, b) @@ -2832,7 +2832,7 @@ func (m *ResolveTransactionResponse) Reset() { *m = ResolveTransactionRe func (m *ResolveTransactionResponse) String() string { return proto.CompactTextString(m) } func (*ResolveTransactionResponse) ProtoMessage() {} func (*ResolveTransactionResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{38} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{38} } func (m *ResolveTransactionResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_ResolveTransactionResponse.Unmarshal(m, b) @@ -2880,7 +2880,7 @@ type SplitQueryRequest struct { // SELECT FROM WHERE . // It must not contain subqueries nor any of the keywords // JOIN, GROUP BY, ORDER BY, LIMIT, DISTINCT. - // Furthermore,
must be a single “concrete” table. + // Furthermore,
must be a single "concrete" table. // It cannot be a view. Query *query.BoundQuery `protobuf:"bytes,3,opt,name=query,proto3" json:"query,omitempty"` // Each generated query-part will be restricted to rows whose values @@ -2950,7 +2950,7 @@ func (m *SplitQueryRequest) Reset() { *m = SplitQueryRequest{} } func (m *SplitQueryRequest) String() string { return proto.CompactTextString(m) } func (*SplitQueryRequest) ProtoMessage() {} func (*SplitQueryRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{39} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{39} } func (m *SplitQueryRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SplitQueryRequest.Unmarshal(m, b) @@ -3039,7 +3039,7 @@ func (m *SplitQueryResponse) Reset() { *m = SplitQueryResponse{} } func (m *SplitQueryResponse) String() string { return proto.CompactTextString(m) } func (*SplitQueryResponse) ProtoMessage() {} func (*SplitQueryResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{40} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{40} } func (m *SplitQueryResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SplitQueryResponse.Unmarshal(m, b) @@ -3080,7 +3080,7 @@ func (m *SplitQueryResponse_KeyRangePart) Reset() { *m = SplitQueryRespo func (m *SplitQueryResponse_KeyRangePart) String() string { return proto.CompactTextString(m) } func (*SplitQueryResponse_KeyRangePart) ProtoMessage() {} func (*SplitQueryResponse_KeyRangePart) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{40, 0} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{40, 0} } func (m *SplitQueryResponse_KeyRangePart) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SplitQueryResponse_KeyRangePart.Unmarshal(m, b) @@ -3128,7 +3128,7 @@ func (m *SplitQueryResponse_ShardPart) Reset() { *m = SplitQueryResponse func (m *SplitQueryResponse_ShardPart) String() string { return proto.CompactTextString(m) } func (*SplitQueryResponse_ShardPart) ProtoMessage() {} func (*SplitQueryResponse_ShardPart) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{40, 1} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{40, 1} } func (m *SplitQueryResponse_ShardPart) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SplitQueryResponse_ShardPart.Unmarshal(m, b) @@ -3181,7 +3181,7 @@ func (m *SplitQueryResponse_Part) Reset() { *m = SplitQueryResponse_Part func (m *SplitQueryResponse_Part) String() string { return proto.CompactTextString(m) } func (*SplitQueryResponse_Part) ProtoMessage() {} func (*SplitQueryResponse_Part) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{40, 2} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{40, 2} } func (m *SplitQueryResponse_Part) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_SplitQueryResponse_Part.Unmarshal(m, b) @@ -3242,7 +3242,7 @@ func (m *GetSrvKeyspaceRequest) Reset() { *m = GetSrvKeyspaceRequest{} } func (m *GetSrvKeyspaceRequest) String() string { return proto.CompactTextString(m) } func (*GetSrvKeyspaceRequest) ProtoMessage() {} func (*GetSrvKeyspaceRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{41} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{41} } func (m *GetSrvKeyspaceRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GetSrvKeyspaceRequest.Unmarshal(m, b) @@ -3282,7 +3282,7 @@ func (m *GetSrvKeyspaceResponse) Reset() { *m = GetSrvKeyspaceResponse{} func (m *GetSrvKeyspaceResponse) String() string { return proto.CompactTextString(m) } func (*GetSrvKeyspaceResponse) ProtoMessage() {} func (*GetSrvKeyspaceResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{42} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{42} } func (m *GetSrvKeyspaceResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_GetSrvKeyspaceResponse.Unmarshal(m, b) @@ -3327,7 +3327,7 @@ func (m *VStreamRequest) Reset() { *m = VStreamRequest{} } func (m *VStreamRequest) String() string { return proto.CompactTextString(m) } func (*VStreamRequest) ProtoMessage() {} func (*VStreamRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{43} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{43} } func (m *VStreamRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamRequest.Unmarshal(m, b) @@ -3387,7 +3387,7 @@ func (m *VStreamResponse) Reset() { *m = VStreamResponse{} } func (m *VStreamResponse) String() string { return proto.CompactTextString(m) } func (*VStreamResponse) ProtoMessage() {} func (*VStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{44} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{44} } func (m *VStreamResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_VStreamResponse.Unmarshal(m, b) @@ -3445,7 +3445,7 @@ func (m *UpdateStreamRequest) Reset() { *m = UpdateStreamRequest{} } func (m *UpdateStreamRequest) String() string { return proto.CompactTextString(m) } func (*UpdateStreamRequest) ProtoMessage() {} func (*UpdateStreamRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{45} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{45} } func (m *UpdateStreamRequest) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_UpdateStreamRequest.Unmarshal(m, b) @@ -3533,7 +3533,7 @@ func (m *UpdateStreamResponse) Reset() { *m = UpdateStreamResponse{} } func (m *UpdateStreamResponse) String() string { return proto.CompactTextString(m) } func (*UpdateStreamResponse) ProtoMessage() {} func (*UpdateStreamResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_vtgate_d9799c8e1157b676, []int{46} + return fileDescriptor_vtgate_339c92b13a08c8a7, []int{46} } func (m *UpdateStreamResponse) XXX_Unmarshal(b []byte) error { return xxx_messageInfo_UpdateStreamResponse.Unmarshal(m, b) @@ -3624,9 +3624,9 @@ func init() { proto.RegisterEnum("vtgate.CommitOrder", CommitOrder_name, CommitOrder_value) } -func init() { proto.RegisterFile("vtgate.proto", fileDescriptor_vtgate_d9799c8e1157b676) } +func init() { proto.RegisterFile("vtgate.proto", fileDescriptor_vtgate_339c92b13a08c8a7) } -var fileDescriptor_vtgate_d9799c8e1157b676 = []byte{ +var fileDescriptor_vtgate_339c92b13a08c8a7 = []byte{ // 2041 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x5a, 0xcd, 0x8f, 0x23, 0x47, 0x15, 0x4f, 0x77, 0xfb, 0xf3, 0xf9, 0x73, 0x6b, 0xbc, 0x1b, 0xc7, 0x19, 0x76, 0x26, 0x1d, 0x46, diff --git a/java/client/src/main/java/io/vitess/client/RpcClient.java b/java/client/src/main/java/io/vitess/client/RpcClient.java index 31bd7e51689..4c83df8ec4b 100644 --- a/java/client/src/main/java/io/vitess/client/RpcClient.java +++ b/java/client/src/main/java/io/vitess/client/RpcClient.java @@ -48,6 +48,8 @@ import io.vitess.proto.Vtgate.StreamExecuteKeyspaceIdsRequest; import io.vitess.proto.Vtgate.StreamExecuteRequest; import io.vitess.proto.Vtgate.StreamExecuteShardsRequest; +import io.vitess.proto.Vtgate.VStreamRequest; +import io.vitess.proto.Vtgate.VStreamResponse; import java.io.Closeable; import java.sql.SQLException; @@ -245,4 +247,16 @@ ListenableFuture splitQuery(Context ctx, SplitQueryRequest r */ ListenableFuture getSrvKeyspace( Context ctx, GetSrvKeyspaceRequest request) throws SQLException; + + /** + * Starts streaming the vstream binlog events. + * + * Stream begins at the specified VGTID. + * + *

See the + * proto + * definition for canonical documentation on this VTGate API. + */ + StreamIterator getVStream( + Context ctx, VStreamRequest vstreamRequest) throws SQLException; } diff --git a/java/client/src/test/java/io/vitess/client/RpcClientTest.java b/java/client/src/test/java/io/vitess/client/RpcClientTest.java index 67e69c93552..750cebff67f 100644 --- a/java/client/src/test/java/io/vitess/client/RpcClientTest.java +++ b/java/client/src/test/java/io/vitess/client/RpcClientTest.java @@ -21,6 +21,12 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; +import binlogdata.Binlogdata.FieldEvent; +import binlogdata.Binlogdata.RowEvent; +import binlogdata.Binlogdata.ShardGtid; +import binlogdata.Binlogdata.VEvent; +import binlogdata.Binlogdata.VEventType; +import binlogdata.Binlogdata.VGtid; import io.vitess.client.cursor.Cursor; import io.vitess.client.cursor.Row; import io.vitess.proto.Query; @@ -33,6 +39,8 @@ import io.vitess.proto.Topodata.SrvKeyspace.KeyspacePartition; import io.vitess.proto.Topodata.TabletType; import io.vitess.proto.Vtgate.SplitQueryResponse; +import io.vitess.proto.Vtgate.VStreamRequest; +import io.vitess.proto.Vtgate.VStreamResponse; import io.vitess.proto.Vtrpc.CallerID; import java.nio.charset.StandardCharsets; @@ -730,4 +738,51 @@ void execute(VTGateBlockingTx tx, String query) throws Exception { } }); } + + @Test + public void testVStream() throws Exception { + VGtid vgtid = VGtid.newBuilder() + .addShardGtids(ShardGtid.newBuilder() + .setGtid("gtid") + .setShard(ECHO_PREFIX + System.currentTimeMillis()) + .setKeyspace("keyspace: " + System.currentTimeMillis()) + .build()) + .build(); + + VStreamRequest vstreamRequest = VStreamRequest.newBuilder() + .setCallerId(CALLER_ID) + .setVgtid(vgtid) + .setTabletType(TABLET_TYPE) + .build(); + + StreamIterator vstream = client.getVStream(ctx, vstreamRequest); + VStreamResponse actual = vstream.next(); + Assert.assertFalse(vstream.hasNext()); + + VStreamResponse expected = VStreamResponse.newBuilder() + .addEvents(VEvent.newBuilder() + .setType(VEventType.forNumber(1)) + .setTimestamp(1234) + .setGtid("echo-gtid-1") + .setDdl("echo-ddl-1") + .setVgtid(vgtid) + .setRowEvent(RowEvent.newBuilder() + .setTableName("echo-table-1") + .build()) + .build()) + .addEvents(VEvent.newBuilder() + .setType(VEventType.forNumber(2)) + .setTimestamp(4321) + .setGtid("echo-gtid-2") + .setDdl("echo-ddl-2") + .setVgtid(vgtid) + .setFieldEvent(FieldEvent.newBuilder() + .setTableName("echo-table-2") + .build()) + .build()) + .build(); + + Assert.assertEquals(expected, actual); + } + } diff --git a/java/grpc-client/src/main/java/io/vitess/client/grpc/GrpcClient.java b/java/grpc-client/src/main/java/io/vitess/client/grpc/GrpcClient.java index cf8f8cdc94f..2f2587fbd3f 100644 --- a/java/grpc-client/src/main/java/io/vitess/client/grpc/GrpcClient.java +++ b/java/grpc-client/src/main/java/io/vitess/client/grpc/GrpcClient.java @@ -63,6 +63,7 @@ import io.vitess.proto.Vtgate.StreamExecuteResponse; import io.vitess.proto.Vtgate.StreamExecuteShardsRequest; import io.vitess.proto.Vtgate.StreamExecuteShardsResponse; +import io.vitess.proto.Vtgate.VStreamResponse; import io.vitess.proto.grpc.VitessGrpc; import io.vitess.proto.grpc.VitessGrpc.VitessFutureStub; import io.vitess.proto.grpc.VitessGrpc.VitessStub; @@ -283,6 +284,21 @@ public ListenableFuture getSrvKeyspace(Context ctx, new ExceptionConverter(), MoreExecutors.directExecutor()); } + @Override + public StreamIterator getVStream(Context ctx, + Vtgate.VStreamRequest vstreamRequest) { + GrpcStreamAdapter adapter = + new GrpcStreamAdapter() { + @Override + VStreamResponse getResult(VStreamResponse response) { + return response; + } + }; + + getAsyncStub(ctx).vStream(vstreamRequest, adapter); + return adapter; + } + /** * Converts an exception from the gRPC framework into the appropriate {@link SQLException}. */