changefeed_dist.go
// Copyright 2018 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package changefeedccl

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
	"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
	"github.com/cockroachdb/cockroach/pkg/sql/rowexec"
	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
	"github.com/cockroachdb/cockroach/pkg/sql/types"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)
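
// init registers the changefeed processor constructors with the rowexec
// package so that DistSQL can construct ChangeAggregator and ChangeFrontier
// processors from their specs.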
func init() {
	rowexec.NewChangeAggregatorProcessor = newChangeAggregatorProcessor
	rowexec.NewChangeFrontierProcessor = newChangeFrontierProcessor
}
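
// Human-readable names for the changefeed processor cores.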
const (
	changeAggregatorProcName = `changeagg`
	changeFrontierProcName   = `changefntr`
)
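
// changefeedResultTypes is the schema of the rows that flow through a
// changefeed: a marshaled jobspb.ResolvedSpan in column 0 and the emitted
// topic, key, and value in columns 1-3 (see the distChangefeedFlow comment
// below).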
var changefeedResultTypes = []*types.T{
	types.Bytes,  // resolved span
	types.String, // topic
	types.Bytes,  // key
	types.Bytes,  // value
}

// distChangefeedFlow plans and runs a distributed changefeed.
//
// One or more ChangeAggregator processors watch table data for changes. These
// transform the changed kvs into changed rows and either emit them to a sink
// (such as kafka) or, if there is no sink, forward them in columns 1,2,3 (where
// they will be eventually returned directly via pgwire). In either case,
// periodically a span will become resolved as of some timestamp, meaning that
// no new rows will ever be emitted at or below that timestamp. These span-level
// resolved timestamps are emitted as a marshaled `jobspb.ResolvedSpan` proto in
// column 0.
//
// The flow will always have exactly one ChangeFrontier processor which all the
// ChangeAggregators feed into. It collects all span-level resolved timestamps
// and aggregates them into a changefeed-level resolved timestamp, which is the
// minimum of the span-level resolved timestamps. This changefeed-level resolved
// timestamp is emitted into the changefeed sink (or returned to the gateway if
// there is no sink) whenever it advances. ChangeFrontier also updates the
// progress of the changefeed's corresponding system job.
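//
// The resulting physical plan is, roughly:
//
//	ChangeAggregator --\
//	ChangeAggregator ---> ChangeFrontier --> sink (or gateway, if sinkless)
//	ChangeAggregator --/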
func distChangefeedFlow(
	ctx context.Context,
	execCtx sql.JobExecContext,
	jobID int64,
	details jobspb.ChangefeedDetails,
	progress jobspb.Progress,
	resultsCh chan<- tree.Datums,
) error {
	var err error
	details, err = validateDetails(details)
	if err != nil {
		return err
	}

	// NB: A non-empty high water indicates that we have checkpointed a resolved
	// timestamp. Skipping the initial scan is equivalent to starting the
	// changefeed from a checkpoint at its start time. Initialize the progress
	// based on whether we should perform an initial scan.
	{
		h := progress.GetHighWater()
		noHighWater := (h == nil || h.IsEmpty())
		// We want to set the high-water mark, and thus skip the initial scan,
		// if either a cursor was specified and no initial scan was explicitly
		// requested, or no cursor was specified but the user asked to skip the
		// initial scan.
		if noHighWater && !initialScanFromOptions(details.Opts) {
			// If there is a cursor, the statement time has already been set to it.
			progress.Progress = &jobspb.Progress_HighWater{HighWater: &details.StatementTime}
		}
	}

	spansTS := details.StatementTime
	var initialHighWater hlc.Timestamp
	if h := progress.GetHighWater(); h != nil && !h.IsEmpty() {
		initialHighWater = *h
		// If we have a high-water set, use it to compute the spans, since the
		// ones at the statement time may have been garbage collected by now.
		spansTS = initialHighWater
	}

	execCfg := execCtx.ExecCfg()
	trackedSpans, err := fetchSpansForTargets(ctx, execCfg.DB, execCfg.Codec, details.Targets, spansTS)
	if err != nil {
		return err
	}

	// Changefeed flows handle transactional consistency themselves.
	var noTxn *kv.Txn

	dsp := execCtx.DistSQLPlanner()
	evalCtx := execCtx.ExtendedEvalContext()
	planCtx := dsp.NewPlanningCtx(ctx, evalCtx, nil /* planner */, noTxn, execCfg.Codec.ForSystemTenant() /* distribute */)

	var spanPartitions []sql.SpanPartition
	if details.SinkURI == `` {
		// Sinkless feeds get one ChangeAggregator on the gateway.
		spanPartitions = []sql.SpanPartition{{Node: dsp.GatewayID(), Spans: trackedSpans}}
	} else {
		// All other feeds get a ChangeAggregator local on the leaseholder.
		spanPartitions, err = dsp.PartitionSpans(planCtx, trackedSpans)
		if err != nil {
			return err
		}
	}

	corePlacement := make([]physicalplan.ProcessorCorePlacement, len(spanPartitions))
	for i, sp := range spanPartitions {
		// TODO(dan): Merge these watches with the span-level resolved
		// timestamps from the job progress.
		watches := make([]execinfrapb.ChangeAggregatorSpec_Watch, len(sp.Spans))
		for watchIdx, nodeSpan := range sp.Spans {
			watches[watchIdx] = execinfrapb.ChangeAggregatorSpec_Watch{
				Span:            nodeSpan,
				InitialResolved: initialHighWater,
			}
		}

		corePlacement[i].NodeID = sp.Node
		corePlacement[i].Core.ChangeAggregator = &execinfrapb.ChangeAggregatorSpec{
			Watches:   watches,
			Feed:      details,
			UserProto: execCtx.User().EncodeProto(),
		}
	}

	// NB: This SpanFrontier processor depends on the set of tracked spans being
	// static. Currently there is no way for them to change after the changefeed
	// is created, even if it is paused and unpaused, but #28982 describes some
	// ways that this might happen in the future.
	changeFrontierSpec := execinfrapb.ChangeFrontierSpec{
		TrackedSpans: trackedSpans,
		Feed:         details,
		JobID:        jobID,
		UserProto:    execCtx.User().EncodeProto(),
	}

	p := planCtx.NewPhysicalPlan()
	p.AddNoInputStage(corePlacement, execinfrapb.PostProcessSpec{}, changefeedResultTypes, execinfrapb.Ordering{})
	p.AddSingleGroupStage(
		dsp.GatewayID(),
		execinfrapb.ProcessorCoreUnion{ChangeFrontier: &changeFrontierSpec},
		execinfrapb.PostProcessSpec{},
		changefeedResultTypes,
	)

	p.PlanToStreamColMap = []int{1, 2, 3}
	dsp.FinalizePlan(planCtx, p)

	resultRows := makeChangefeedResultWriter(resultsCh)
	recv := sql.MakeDistSQLReceiver(
		ctx,
		resultRows,
		tree.Rows,
		execCfg.RangeDescriptorCache,
		noTxn,
		nil, /* clockUpdater */
		evalCtx.Tracing,
	)
	defer recv.Release()

	var finishedSetupFn func()
	if details.SinkURI != `` {
		// We abuse the job's results channel to make CREATE CHANGEFEED wait for
		// this before returning to the user to ensure the setup went okay. Job
		// resumption doesn't have the same hack, but at the moment ignores
		// results and so is currently okay. Return nil instead of anything
		// meaningful so that if we start doing anything with the results
		// returned by resumed jobs, then it breaks instead of returning
		// nonsense.
		finishedSetupFn = func() { resultsCh <- tree.Datums(nil) }
	}

	// Copy the evalCtx, as dsp.Run() might change it.
	evalCtxCopy := *evalCtx
	dsp.Run(planCtx, noTxn, p, recv, &evalCtxCopy, finishedSetupFn)()
	return resultRows.Err()
}
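
// fetchSpansForTargets returns the primary index span of every target table,
// reading the table descriptors at the provided timestamp.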
func fetchSpansForTargets(
	ctx context.Context,
	db *kv.DB,
	codec keys.SQLCodec,
	targets jobspb.ChangefeedTargets,
	ts hlc.Timestamp,
) ([]roachpb.Span, error) {
	var spans []roachpb.Span
	err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		spans = nil
		txn.SetFixedTimestamp(ctx, ts)
		// Note that all targets are currently guaranteed to be tables.
		for tableID := range targets {
			tableDesc, err := catalogkv.MustGetTableDescByID(ctx, txn, codec, tableID)
			if err != nil {
				return err
			}
			spans = append(spans, tableDesc.PrimaryIndexSpan(codec))
		}
		return nil
	})
	return spans, err
}

// changefeedResultWriter implements the `rowexec.resultWriter` interface,
// sending the rows it receives back over the given channel.
type changefeedResultWriter struct {
	rowsCh       chan<- tree.Datums
	rowsAffected int
	err          error
}
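
// makeChangefeedResultWriter returns a changefeedResultWriter that forwards
// every row it is given to rowsCh.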
func makeChangefeedResultWriter(rowsCh chan<- tree.Datums) *changefeedResultWriter {
	return &changefeedResultWriter{rowsCh: rowsCh}
}

func (w *changefeedResultWriter) AddRow(ctx context.Context, row tree.Datums) error {
	// Copy the row because it's not guaranteed to exist after this function
	// returns.
	row = append(tree.Datums(nil), row...)

	select {
	case <-ctx.Done():
		return ctx.Err()
	case w.rowsCh <- row:
		return nil
	}
}
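
// IncrementRowsAffected adds n to the running count of rows affected.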
func (w *changefeedResultWriter) IncrementRowsAffected(n int) {
	w.rowsAffected += n
}
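
// SetError records err so that it is returned by later calls to Err.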
func (w *changefeedResultWriter) SetError(err error) {
	w.err = err
}
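
// Err returns the error recorded by SetError, if any.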
func (w *changefeedResultWriter) Err() error {
	return w.err
}