RFC VStream Copy: streaming events from the beginning #6277

Closed
rohit-nayak-ps opened this issue Jun 6, 2020 · 1 comment

@rohit-nayak-ps
Contributor

VStream Copy

Allow vstreams to stream entire databases or tables

Motivation

Currently, the VStream API streams events starting either from the current binlog position or from a position specified by the client. The VStream Copy feature adds support for streaming all events from the beginning, i.e., as if starting from the first position of the binlog.

A naive extension of the current mechanism would be to stream from the first binlog position. However, this is impractical for any database or table of reasonable size. Instead, we will extend VStream to use a bulk-copy mechanism similar to the one used by VReplication streams in the MoveTables and Reshard workflows.

Note that with VStream Copy, the client will not receive a faithful reproduction of the events in the binlog. The aim is instead to be eventually (and rapidly) consistent with the current database snapshot. This improves performance, since multiple row updates are merged into a single transaction.
Once we have caught up (i.e., the replication lag is small), binlog events will again be streamed directly, as in the current implementation.

Current API

Clients create vstreams by calling the VStream API on VTGate over gRPC. In Go, using the vtgateconn client package:

conn, _ := vtgateconn.Dial(ctx, "localhost:15991")
// tabletType is one of replica/master/rdonly; for vgtid and filter, see below
reader, _ := conn.VStream(ctx, tabletType, vgtid, filter)
for events, err := reader.Recv(); err == nil; events, err = reader.Recv() {
	fmt.Println(events) // process each batch of VEvents; Recv returns io.EOF when the stream ends
}

Network errors may occur, or the client process may fail. In addition, the vstreamer itself might fail at VTGate or at VTTablet. VTGate therefore sends its state to the client frequently, which allows VTGate to remain stateless and lets clients recover properly from failures.

When creating the stream, the client can also specify multiple shards and/or keyspaces from which to stream events.

The vgtid structure serves both purposes: it determines the stream sources and it maintains state. A vgtid is a list of (keyspace, shard, gtid) tuples. When a stream is created, gtid can be either “current” or a valid binlog position from which the vstream starts streaming events.

Some examples:

// stream from the current position of two shards
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "ks",
		Shard:    "-40",
		Gtid:     "current",
	}, {
		Keyspace: "ks",
		Shard:    "80-c0",
		Gtid:     "current",
	}},
}

// stream from a specific position from all shards in keyspace ks
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "ks",
		Gtid:     "MariaDB/0-41983-20",
	}},
}

// stream from the current position from all keyspaces
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Gtid: "current",
	}},
}

The data streamed is sourced from the listed keyspaces/shards, after applying the specified filter.
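The source-selection rule illustrated by the examples above (an omitted shard means every shard of the keyspace, an omitted keyspace means every keyspace) can be sketched in Go. This is a minimal sketch, not VTGate's code: `ShardGtid` is a simplified, hypothetical stand-in for `binlogdatapb.ShardGtid`, and the keyspace-to-shards map stands in for the topology VTGate would consult.

```go
package main

import (
	"fmt"
	"sort"
)

// ShardGtid is a hypothetical, simplified stand-in for binlogdatapb.ShardGtid.
type ShardGtid struct {
	Keyspace string
	Shard    string
	Gtid     string
}

// expandSources resolves vgtid entries into concrete (keyspace, shard) sources.
// An empty Keyspace selects every keyspace; an empty Shard selects every shard
// of the keyspace. topo is a hypothetical keyspace -> shards map.
func expandSources(vgtid []ShardGtid, topo map[string][]string) []ShardGtid {
	var out []ShardGtid
	for _, sg := range vgtid {
		keyspaces := []string{sg.Keyspace}
		if sg.Keyspace == "" {
			keyspaces = nil
			for ks := range topo {
				keyspaces = append(keyspaces, ks)
			}
			sort.Strings(keyspaces) // map order is random; sort for a stable result
		}
		for _, ks := range keyspaces {
			shards := []string{sg.Shard}
			if sg.Shard == "" {
				shards = topo[ks]
			}
			for _, shard := range shards {
				// Each concrete source starts from the gtid given in the entry.
				out = append(out, ShardGtid{Keyspace: ks, Shard: shard, Gtid: sg.Gtid})
			}
		}
	}
	return out
}

func main() {
	topo := map[string][]string{"ks": {"-40", "40-80", "80-c0", "c0-"}, "commerce": {"-"}}
	// All shards of keyspace ks, starting at a specific position.
	fmt.Println(expandSources([]ShardGtid{{Keyspace: "ks", Gtid: "MariaDB/0-41983-20"}}, topo))
	// Every shard of every keyspace, starting at the current position.
	fmt.Println(expandSources([]ShardGtid{{Gtid: "current"}}, topo))
}
```

Sorting the keyspace names only makes the sketch deterministic; the order in which sources are streamed is not something the RFC specifies.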

To maintain this state, whenever VTGate encounters a gtid event it sends the client a vgtid event containing VTGate's current vgtid state. Thus, if the stream breaks for any reason, the client simply needs to create a new vstream using the last vgtid it received.

Architecture/Design

During a copy there will be two distinct phases:

  1. Copy phase: the vstreamer sends row data in bulk, using the primary key to “paginate” the table
  2. Replication phase: once copying is complete, only binlog events are streamed from then on

The copy phase is nuanced: we copy a batch of rows, up to a particular PK, from a consistent snapshot. However, by the time the batch has been copied, the binlog position will have moved on, and the new events may contain updates to rows already transmitted.
Hence we need to perform a “catchup”, in which we replay the events up to the current position. During catchup we can only send updates to rows that have already been sent to the stream; changes to the remaining rows are picked up when those rows are copied.

After the catchup we send the next batch of rows and perform the related catchup. This copy-catchup loop continues until all tables are copied, after which it is business as usual and events are streamed as they appear in the binlog.
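The copy-catchup loop can be sketched for a single table as a small simulation. It is a model under stated assumptions, not the vstreamer implementation: the table is a hypothetical map from a positive integer PK to a value, the binlog is an in-memory slice of `Change` events, and concurrent writes are simulated by advancing the binlog position by a fixed number of events after every batch. A client that simply applies every row and event it receives ends up with the same contents as the source table.

```go
package main

import (
	"fmt"
	"sort"
)

// Change is a hypothetical binlog event: set row PK to Val ("" deletes the row).
type Change struct {
	PK  int
	Val string
}

func apply(t map[int]string, c Change) {
	if c.Val == "" {
		delete(t, c.PK)
	} else {
		t[c.PK] = c.Val
	}
}

// snapshot returns the table contents at binlog position pos (a consistent snapshot).
func snapshot(base map[int]string, binlog []Change, pos int) map[int]string {
	t := make(map[int]string, len(base))
	for pk, v := range base {
		t[pk] = v
	}
	for _, c := range binlog[:pos] {
		apply(t, c)
	}
	return t
}

// copyCatchup returns the table as seen by a client that applies everything it
// is sent. PKs must be positive and batchSize must be at least 1.
func copyCatchup(base map[int]string, binlog []Change, batchSize, writesPerStep int) map[int]string {
	client := map[int]string{}
	pos, lastPK := 0, 0
	for {
		// Copy: send the next batch of rows with PK > lastPK, in PK order,
		// from a consistent snapshot at the current position.
		snapPos, snap := pos, snapshot(base, binlog, pos)
		var pks []int
		for pk := range snap {
			if pk > lastPK {
				pks = append(pks, pk)
			}
		}
		sort.Ints(pks)
		final := len(pks) <= batchSize
		if !final {
			pks = pks[:batchSize]
		}
		for _, pk := range pks {
			apply(client, Change{PK: pk, Val: snap[pk]})
		}
		if final {
			// Replication: every row is copied, so stream all later events unfiltered.
			for _, c := range binlog[snapPos:] {
				apply(client, c)
			}
			return client
		}
		lastPK = pks[len(pks)-1]
		// Meanwhile the binlog has moved on. Catch up to the new position, but only
		// for rows already sent; later rows will appear in a future snapshot.
		pos += writesPerStep
		if pos > len(binlog) {
			pos = len(binlog)
		}
		for _, c := range binlog[snapPos:pos] {
			if c.PK <= lastPK {
				apply(client, c)
			}
		}
	}
}

func main() {
	base := map[int]string{1: "a", 2: "b", 3: "c", 4: "d", 5: "e"}
	binlog := []Change{{2, "B"}, {5, ""}, {6, "f"}, {1, "A"}, {4, "D"}, {7, "g"}}
	fmt.Println(copyCatchup(base, binlog, 2, 2)) // map[1:A 2:B 3:c 4:D 6:f 7:g]
}
```

The filter `c.PK <= lastPK` is what keeps the loop correct: an update to a row beyond the current batch is skipped during catchup because the next snapshot, taken at the new position, already reflects it.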

API Changes for VStream Copy

To use VStream Copy, you just need to pass an empty string as the position (gtid).
The only other change is to the vgtid structure, which now becomes a list of

(keyspace, shard, gtid, []LastTablePK)

While the copy is in progress, the LastTablePK list contains the last seen primary key for each table in that shard. Once the copy is complete and the stream is replicating, this list will be nil.
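To make the extended state concrete, the sketch below models one vgtid entry and reports how a stream started (or resumed) from it would proceed. Clients do not need to interpret this state; the code only illustrates what it encodes. `ShardGtid` and `TableLastPK` are hypothetical simplified types (the real structures are protobuf messages), and a primary key is modeled as a single `int`, with a nil `LastPK` meaning that no rows of the table have been copied yet.

```go
package main

import (
	"fmt"
	"strings"
)

// TableLastPK is a hypothetical stand-in for one LastTablePK entry. A PK is
// modeled as a single int; nil means the table has not been copied at all yet.
type TableLastPK struct {
	Table  string
	LastPK *int
}

// ShardGtid is a hypothetical stand-in for (keyspace, shard, gtid, []LastTablePK).
type ShardGtid struct {
	Keyspace, Shard, Gtid string
	TablePKs              []TableLastPK
}

// describe reports how a stream started (or resumed) from sg would proceed.
func describe(sg ShardGtid) string {
	if sg.Gtid != "" && len(sg.TablePKs) == 0 {
		// No LastTablePK list: the copy is done, so replicate from the position.
		return "replicate from " + sg.Gtid
	}
	// Empty gtid or a non-empty LastTablePK list: the copy is starting or in progress.
	parts := make([]string, 0, len(sg.TablePKs))
	for _, t := range sg.TablePKs {
		if t.LastPK == nil {
			parts = append(parts, t.Table+" from the first row")
		} else {
			parts = append(parts, fmt.Sprintf("%s after pk %d", t.Table, *t.LastPK))
		}
	}
	return "copy " + strings.Join(parts, ", ")
}

func main() {
	pk := 42
	// copy t1 from the first row, t2 from the first row
	fmt.Println(describe(ShardGtid{Keyspace: "ks", Shard: "-40",
		TablePKs: []TableLastPK{{Table: "t1"}, {Table: "t2"}}}))
	// copy t1 after pk 42, t2 from the first row
	fmt.Println(describe(ShardGtid{Keyspace: "ks", Shard: "-40", Gtid: "MariaDB/0-41983-20",
		TablePKs: []TableLastPK{{Table: "t1", LastPK: &pk}, {Table: "t2"}}}))
	// replicate from MariaDB/0-41983-20
	fmt.Println(describe(ShardGtid{Keyspace: "ks", Shard: "-40", Gtid: "MariaDB/0-41983-20"}))
}
```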

Note that once the vstream starts, the vgtid is opaque to the consumer of the VStream API: the client does not need to interpret the ongoing state, only to keep the last vgtid it received.

To start a VStream Copy, the user is expected to provide an empty gtid along with a list of tables to copy
(essentially a LastTablePK list with a nil PK for each). Some examples
(see https://github.com/vitessio/contrib/blob/master/vstream_client/vstream_client.go for a sample client):

// vstream copy two tables from two shards
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match:  "t2",
		Filter: "select id, val from t2",
	}, {
		Match:  "t1",
		Filter: "select * from t1",
	}},
}
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Keyspace: "ks",
		Shard:    "-40",
		Gtid:     "",
	}, {
		Keyspace: "ks",
		Shard:    "80-c0",
		Gtid:     "",
	}},
}

// stream the entire database: vstream copy all tables in all keyspaces
filter := &binlogdatapb.Filter{
	Rules: []*binlogdatapb.Rule{{
		Match: "/.*/",
	}},
}
vgtid := &binlogdatapb.VGtid{
	ShardGtids: []*binlogdatapb.ShardGtid{{
		Gtid: "",
	}},
}
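The examples above select tables in two ways: by exact table name (`t1`, `t2`) and by a pattern wrapped in slashes (`/.*/`). The sketch below shows how such rules could be resolved to a per-table query. The semantics are an assumption rather than a specification of Vitess's matcher: rules are tried in order, a slash-delimited `Match` is treated as an unanchored Go regular expression, any other `Match` must equal the table name, and an empty `Filter` streams all columns. `Rule` is a simplified, hypothetical copy of the two `binlogdatapb.Rule` fields used above.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Rule is a hypothetical copy of the Match and Filter fields of binlogdatapb.Rule.
type Rule struct {
	Match  string
	Filter string
}

// queryFor returns the query a table would be streamed with, or false if no
// rule matches. The first matching rule wins (an assumption of this sketch).
func queryFor(rules []Rule, table string) (string, bool) {
	for _, r := range rules {
		matched := r.Match == table
		if strings.HasPrefix(r.Match, "/") {
			// "/.../" is a regular expression; an invalid pattern matches nothing.
			re, err := regexp.Compile(strings.Trim(r.Match, "/"))
			matched = err == nil && re.MatchString(table)
		}
		if !matched {
			continue
		}
		if r.Filter == "" {
			return "select * from " + table, true
		}
		return r.Filter, true
	}
	return "", false
}

func main() {
	rules := []Rule{
		{Match: "t2", Filter: "select id, val from t2"},
		{Match: "t1", Filter: "select * from t1"},
	}
	for _, table := range []string{"t1", "t2", "t3"} {
		q, ok := queryFor(rules, table)
		fmt.Printf("%s: %q %v\n", table, q, ok)
	}
	q, _ := queryFor([]Rule{{Match: "/.*/"}}, "customer")
	fmt.Println(q) // select * from customer
}
```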

deepthi added this to the v7.0 milestone Jun 15, 2020
@deepthi
Member

deepthi commented Jun 22, 2020

Fixed by #6232
