-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
data.go
188 lines (171 loc) · 6.1 KB
/
data.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package distsqlpb
import (
"fmt"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/pkg/errors"
)
// ConvertToColumnOrdering translates an Ordering (the message declared in
// data.proto) into the equivalent sqlbase.ColumnOrdering. The result has one
// entry per column of specOrdering, in the same order. A column whose
// direction is Ordering_Column_ASC becomes encoding.Ascending; any other
// direction becomes encoding.Descending.
func ConvertToColumnOrdering(specOrdering Ordering) sqlbase.ColumnOrdering {
	result := make(sqlbase.ColumnOrdering, len(specOrdering.Columns))
	for idx, col := range specOrdering.Columns {
		result[idx].ColIdx = int(col.ColIdx)
		switch col.Direction {
		case Ordering_Column_ASC:
			result[idx].Direction = encoding.Ascending
		default:
			result[idx].Direction = encoding.Descending
		}
	}
	return result
}
// ConvertToSpecOrdering translates a sqlbase.ColumnOrdering into an Ordering
// (the message declared in data.proto), keeping the column indices unchanged.
func ConvertToSpecOrdering(columnOrdering sqlbase.ColumnOrdering) Ordering {
	// A nil column map makes ConvertToMappedSpecOrdering use each ColIdx as-is.
	var noColMap []int
	return ConvertToMappedSpecOrdering(columnOrdering, noColMap)
}
// ConvertToMappedSpecOrdering translates a sqlbase.ColumnOrdering into an
// Ordering (the message declared in data.proto).
//
// When planToStreamColMap is non-nil, each column index is first looked up in
// it and the mapped value is what ends up in the result; a mapped value of -1
// causes a panic. When planToStreamColMap is nil the indices are copied over
// unchanged. An encoding.Ascending direction becomes Ordering_Column_ASC and
// any other direction becomes Ordering_Column_DESC.
func ConvertToMappedSpecOrdering(
	columnOrdering sqlbase.ColumnOrdering, planToStreamColMap []int,
) Ordering {
	cols := make([]Ordering_Column, len(columnOrdering))
	for i := range columnOrdering {
		planCol := columnOrdering[i].ColIdx
		streamCol := planCol
		if planToStreamColMap != nil {
			if streamCol = planToStreamColMap[planCol]; streamCol == -1 {
				panic(fmt.Sprintf("column %d in sort ordering not available", planCol))
			}
		}
		cols[i].ColIdx = uint32(streamCol)

		switch columnOrdering[i].Direction {
		case encoding.Ascending:
			cols[i].Direction = Ordering_Column_ASC
		default:
			cols[i].Direction = Ordering_Column_DESC
		}
	}
	return Ordering{Columns: cols}
}
// ExprFmtCtxBase builds the base FmtCtx used for serializing expressions. The
// returned context uses the tree.FmtCheckEquivalence flags and formats every
// placeholder as the value it evaluates to under evalCtx. Callers still need to
// add a proper IndexedVar formatting function on top of it.
//
// Formatting panics if a placeholder cannot be evaluated.
func ExprFmtCtxBase(evalCtx *tree.EvalContext) *tree.FmtCtx {
	// Emit the evaluated datum in place of the placeholder itself.
	formatPlaceholder := func(ctx *tree.FmtCtx, ph *tree.Placeholder) {
		datum, err := ph.Eval(evalCtx)
		if err != nil {
			panic(fmt.Sprintf("failed to serialize placeholder: %s", err))
		}
		datum.Format(ctx)
	}

	base := tree.NewFmtCtx(tree.FmtCheckEquivalence)
	base.SetPlaceholderFormat(formatPlaceholder)
	return base
}
// Expression is the Go representation of a SQL expression; the corresponding
// proto message is defined in data.proto. The proto's automatic type
// declaration is suppressed via the typedecl=false option so that this
// hand-written struct can carry the extra LocalExpr field, which is not
// serialized. LocalExpr never needs to be serialized because it is only used
// when we know the expression will not be sent, as a proto, to another machine.
//
// An Expression whose Expr is empty and whose LocalExpr is nil is considered
// empty (see Empty).
type Expression struct {
// Version is unused.
Version string
// Expr, if present, is the string representation of this expression.
// SQL expressions are passed as a string, with ordinal references
// (@1, @2, @3 ..) used for "input" variables.
Expr string
// LocalExpr is an unserialized field that's used to pass expressions to local
// flows without serializing/deserializing them. When it is non-nil, String
// formats it in preference to Expr.
LocalExpr tree.TypedExpr
}
// Empty reports whether the expression carries no content at all, i.e. Expr is
// the empty string and LocalExpr is nil.
func (e *Expression) Empty() bool {
	if e.Expr != "" {
		return false
	}
	return e.LocalExpr == nil
}
// String implements the Stringer interface. LocalExpr takes precedence: when
// set, it is formatted with the tree.FmtCheckEquivalence flags. Otherwise a
// non-empty Expr is returned verbatim, and an empty expression is rendered as
// "none".
func (e Expression) String() string {
	switch {
	case e.LocalExpr != nil:
		fmtCtx := tree.NewFmtCtx(tree.FmtCheckEquivalence)
		fmtCtx.FormatNode(e.LocalExpr)
		return fmtCtx.CloseAndGetString()
	case e.Expr != "":
		return e.Expr
	default:
		return "none"
	}
}
// String implements fmt.Stringer. It returns the message of the error produced
// by ErrorDetail, or "<nil>" when ErrorDetail yields no error.
func (e *Error) String() string {
	detail := e.ErrorDetail()
	if detail == nil {
		return "<nil>"
	}
	return detail.Error()
}
// NewError creates an Error from an error, to be sent on the wire. It
// recognizes certain errors and marshals them accordingly:
//
//   - an error for which pgerror.GetPGCause finds a PG error is sent as that
//     PG error;
//   - a *roachpb.UnhandledRetryableError is sent as a retryable txn error;
//   - a *roachpb.NodeUnavailableError is sent as a PG error with code
//     pgerror.CodeConnectionExceptionError;
//   - anything else is wrapped in an assertion-failure PG error whose message
//     includes the full original error.
func NewError(err error) *Error {
	// Work on the root cause: a Wrap() layer could otherwise hide the roachpb
	// error from the type checks below. The original err is kept so that the
	// fallback message still carries the wrapping context.
	cause := errors.Cause(err)

	if pgErr, ok := pgerror.GetPGCause(cause); ok {
		return &Error{Detail: &Error_PGError{PGError: pgErr}}
	}

	if retryErr, ok := cause.(*roachpb.UnhandledRetryableError); ok {
		return &Error{Detail: &Error_RetryableTxnError{RetryableTxnError: retryErr}}
	}

	if unavailableErr, ok := cause.(*roachpb.NodeUnavailableError); ok {
		// Node failures are common enough that we shouldn't fail with
		// assertion errors upon them. Simply signal them in a way that
		// may make sense to a client.
		return &Error{
			Detail: &Error_PGError{
				PGError: pgerror.Newf(pgerror.CodeConnectionExceptionError, "%v", unavailableErr),
			},
		}
	}

	// Anything unrecognized is an "internal error".
	return &Error{
		Detail: &Error_PGError{
			PGError: pgerror.AssertionFailedf("uncaught error: %+v", err),
		},
	}
}
// ErrorDetail returns the payload as a Go error. A nil receiver yields nil. A
// PG error or retryable txn error payload is returned as-is; any other detail
// (including none at all) is reported as an assertion-failure error.
func (e *Error) ErrorDetail() error {
	if e == nil {
		return nil
	}
	if pg, ok := e.Detail.(*Error_PGError); ok {
		return pg.PGError
	}
	if retry, ok := e.Detail.(*Error_RetryableTxnError); ok {
		return retry.RetryableTxnError
	}
	// We're receiving an error we don't know about. It's all right,
	// it's still an error, just one we didn't expect. Let it go
	// through. We'll pick it up in reporting.
	return pgerror.AssertionFailedf("unknown error detail type: %+v", e.Detail)
}