forked from gocraft/dbr
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dbr.go
216 lines (190 loc) · 5.43 KB
/
dbr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
// Package dbr provides additions to Go's database/sql for super fast performance and convenience.
package dbr
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/gocraft/dbr/v2/dialect"
)
// Open creates a Connection.
// log can be nil to ignore logging.
func Open(driver, dsn string, log EventReceiver) (*Connection, error) {
if log == nil {
log = nullReceiver
}
conn, err := sql.Open(driver, dsn)
if err != nil {
return nil, err
}
var d Dialect
switch driver {
case "mysql":
d = dialect.MySQL
case "postgres", "pgx":
d = dialect.PostgreSQL
case "sqlite3":
d = dialect.SQLite3
default:
return nil, ErrNotSupported
}
return &Connection{DB: conn, EventReceiver: log, Dialect: d}, nil
}
const (
placeholder = "?"
)
// Connection wraps sql.DB with an EventReceiver
// to send events, errors, and timings.
type Connection struct {
*sql.DB
Dialect
EventReceiver
}
// Session represents a business unit of execution.
//
// All queries in gocraft/dbr are made in the context of a session.
// This is because when instrumenting your app, it's important
// to understand which business action the query took place in.
//
// A custom EventReceiver can be set.
//
// Timeout specifies max duration for an operation like Select.
type Session struct {
*Connection
EventReceiver
Timeout time.Duration
}
// GetTimeout returns current timeout enforced in session.
func (sess *Session) GetTimeout() time.Duration {
return sess.Timeout
}
// NewSession instantiates a Session from Connection.
// If log is nil, Connection EventReceiver is used.
func (conn *Connection) NewSession(log EventReceiver) *Session {
if log == nil {
log = conn.EventReceiver // Use parent instrumentation
}
return &Session{Connection: conn, EventReceiver: log}
}
// Ensure that tx and session are session runner
var (
_ SessionRunner = (*Tx)(nil)
_ SessionRunner = (*Session)(nil)
)
// SessionRunner can do anything that a Session can except start a transaction.
// Both Session and Tx implements this interface.
type SessionRunner interface {
Select(column ...string) *SelectBuilder
SelectBySql(query string, value ...interface{}) *SelectBuilder
InsertInto(table string) *InsertBuilder
InsertBySql(query string, value ...interface{}) *InsertBuilder
Update(table string) *UpdateBuilder
UpdateBySql(query string, value ...interface{}) *UpdateBuilder
DeleteFrom(table string) *DeleteBuilder
DeleteBySql(query string, value ...interface{}) *DeleteBuilder
}
type runner interface {
GetTimeout() time.Duration
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
func exec(ctx context.Context, runner runner, log EventReceiver, builder Builder, d Dialect) (sql.Result, error) {
timeout := runner.GetTimeout()
if timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
i := interpolator{
Buffer: NewBuffer(),
Dialect: d,
IgnoreBinary: true,
}
err := i.encodePlaceholder(builder, true)
query, value := i.String(), i.Value()
if err != nil {
return nil, log.EventErrKv("dbr.exec.interpolate", err, kvs{
"sql": query,
"args": fmt.Sprint(value),
})
}
startTime := time.Now()
defer func() {
log.TimingKv("dbr.exec", time.Since(startTime).Nanoseconds(), kvs{
"sql": query,
})
}()
traceImpl, hasTracingImpl := log.(TracingEventReceiver)
if hasTracingImpl {
ctx = traceImpl.SpanStart(ctx, "dbr.exec", query)
defer traceImpl.SpanFinish(ctx)
}
result, err := runner.ExecContext(ctx, query, value...)
if err != nil {
if hasTracingImpl {
traceImpl.SpanError(ctx, err)
}
return result, log.EventErrKv("dbr.exec.exec", err, kvs{
"sql": query,
})
}
return result, nil
}
func queryRows(ctx context.Context, runner runner, log EventReceiver, builder Builder, d Dialect) (string, *sql.Rows, error) {
// discard the timeout set in the runner, the context should not be canceled
// implicitly here but explicitly by the caller since the returned *sql.Rows
// may still listening to the context
i := interpolator{
Buffer: NewBuffer(),
Dialect: d,
IgnoreBinary: true,
}
err := i.encodePlaceholder(builder, true)
query, value := i.String(), i.Value()
if err != nil {
return query, nil, log.EventErrKv("dbr.select.interpolate", err, kvs{
"sql": query,
"args": fmt.Sprint(value),
})
}
startTime := time.Now()
defer func() {
log.TimingKv("dbr.select", time.Since(startTime).Nanoseconds(), kvs{
"sql": query,
})
}()
traceImpl, hasTracingImpl := log.(TracingEventReceiver)
if hasTracingImpl {
ctx = traceImpl.SpanStart(ctx, "dbr.select", query)
defer traceImpl.SpanFinish(ctx)
}
rows, err := runner.QueryContext(ctx, query, value...)
if err != nil {
if hasTracingImpl {
traceImpl.SpanError(ctx, err)
}
return query, nil, log.EventErrKv("dbr.select.load.query", err, kvs{
"sql": query,
})
}
return query, rows, nil
}
func query(ctx context.Context, runner runner, log EventReceiver, builder Builder, d Dialect, dest interface{}) (int, error) {
timeout := runner.GetTimeout()
if timeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
query, rows, err := queryRows(ctx, runner, log, builder, d)
if err != nil {
return 0, err
}
count, err := Load(rows, dest)
if err != nil {
return 0, log.EventErrKv("dbr.select.load.scan", err, kvs{
"sql": query,
})
}
return count, nil
}