-
Notifications
You must be signed in to change notification settings - Fork 2
/
connection.go
151 lines (135 loc) · 4.73 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package fireboltgosdk
import (
"context"
"database/sql/driver"
"errors"
"fmt"
)
// fireboltConnection holds the state of a single connection: the client used
// to run queries, the engine URL they are sent to and the parameters that
// accompany them.
type fireboltConnection struct {
// client runs the queries; Close sets it to nil.
client Client
// engineUrl is passed to client.Query with every statement; Close resets it to "".
engineUrl string
// parameters are passed to client.Query with every statement and are updated by setParameter.
parameters map[string]string
// connector owns cachedParameters, which setParameter and resetParameters keep in step with parameters.
connector *FireboltConnector
}
// Prepare wraps query in a firebolt prepared statement bound to this connection.
// It fails when the connection has no client or no engine URL, which is the
// state of a connection that was never initialized or has been closed.
func (c *fireboltConnection) Prepare(query string) (driver.Stmt, error) {
	if c.client == nil || c.engineUrl == "" {
		return nil, errors.New("fireboltConnection isn't properly initialized")
	}

	// The connection itself acts as both the execer and the queryer of the statement.
	stmt := &fireboltStmt{execer: c, queryer: c, query: query}
	return stmt, nil
}
// Close makes the fireboltConnection unusable: the engine URL is cleared, the
// parameters are replaced with an empty map and the client is dropped.
// It always returns nil.
func (c *fireboltConnection) Close() error {
	c.engineUrl = ""
	c.parameters = map[string]string{}
	c.client = nil
	return nil
}
// Begin always returns an error, because firebolt doesn't support transactions.
func (c *fireboltConnection) Begin() (driver.Tx, error) {
	// No formatting verbs are involved, so a plain errors.New carries the same message.
	return nil, errors.New("Transactions are not implemented in firebolt")
}
// ExecContext runs a single statement on the engine and discards any rows it
// produces. The returned result is always an empty FireboltResult; the error,
// if any, is the one reported by queryContextInternal. Multi-statement queries
// are rejected.
func (c *fireboltConnection) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
	const allowMultiStatement = false

	result := &FireboltResult{}
	_, execErr := c.queryContextInternal(ctx, query, args, allowMultiStatement)
	return result, execErr
}
// QueryContext runs the query on the engine and returns the resulting
// fireboltRows. Multi-statement queries are allowed.
func (c *fireboltConnection) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
	const allowMultiStatement = true

	rows, err := c.queryContextInternal(ctx, query, args, allowMultiStatement)
	return rows, err
}
// queryContextInternal passes query and args through prepareStatement, splits
// the result into individual statements with SplitStatements and runs them in
// order.
//
// Statements handled by processSetStatement contribute an empty QueryResponse
// to the result; every other statement is sent with c.client.Query and its
// response is appended. When isMultiStatementAllowed is false and the query
// contains more than one statement, an error is returned before anything is
// executed.
//
// A connection without a client (for example after Close) yields an error
// instead of a nil dereference. When a statement fails during execution, the
// rows collected so far are returned together with the error.
func (c *fireboltConnection) queryContextInternal(ctx context.Context, query string, args []driver.NamedValue, isMultiStatementAllowed bool) (driver.Rows, error) {
	if c.client == nil {
		// Same condition and message as Prepare uses for a closed connection.
		return nil, errors.New("fireboltConnection isn't properly initialized")
	}

	query, err := prepareStatement(query, args)
	if err != nil {
		return nil, ConstructNestedError("error during preparing a statement", err)
	}

	queries, err := SplitStatements(query)
	if err != nil {
		return nil, ConstructNestedError("error during splitting query", err)
	}
	if len(queries) > 1 && !isMultiStatementAllowed {
		return nil, fmt.Errorf("multistatement is not allowed")
	}

	var rows fireboltRows
	for _, stmt := range queries {
		isSetStatement, err := processSetStatement(ctx, c, stmt)
		if err != nil {
			// Report the error whatever the flag says: ignoring it when the flag
			// is false would send a rejected SET statement to the engine as a
			// regular query.
			return &rows, ConstructNestedError("statement recognized as an invalid set statement", err)
		}
		if isSetStatement {
			rows.response = append(rows.response, QueryResponse{})
			continue
		}

		response, err := c.client.Query(ctx, c.engineUrl, stmt, c.parameters, connectionControl{
			updateParameters: c.setParameter,
			setEngineURL:     c.setEngineURL,
			resetParameters:  c.resetParameters,
		})
		if err != nil {
			return &rows, ConstructNestedError("error during query execution", err)
		}
		rows.response = append(rows.response, *response)
	}
	return &rows, nil
}
// processSetStatement checks whether query is a set statement and, if it is,
// applies it to the connection c.
//
// The parameter is first checked with validateSetStatement and then verified
// by running "SELECT 1" with the connection's parameters plus the new one.
// Only when that query succeeds is the parameter stored via c.setParameter.
//
// The returned bool reports whether query was recognized as a set statement:
//   - (false, nil): query did not parse as a set statement and should be
//     executed as a regular query;
//   - (true, nil): the parameter was accepted and stored;
//   - (true, err): query is a set statement but was rejected, either by
//     validateSetStatement or by the verification query.
func processSetStatement(ctx context.Context, c *fireboltConnection, query string) (bool, error) {
	setKey, setValue, err := parseSetStatement(query)
	if err != nil {
		// if parsing of set statement returned an error, we will not handle the request as a set statement
		return false, nil
	}

	// The statement did parse as a set statement, so it is reported as one even
	// when validation fails. Reporting false here would let a caller that only
	// inspects the error for set statements drop it and execute the statement.
	if err := validateSetStatement(setKey); err != nil {
		return true, err
	}

	// combine parameters from connection and set statement
	combinedParameters := make(map[string]string, len(c.parameters)+1)
	for k, v := range c.parameters {
		combinedParameters[k] = v
	}
	combinedParameters[setKey] = setValue

	// Verify the new parameter before persisting it on the connection.
	if _, err := c.client.Query(ctx, c.engineUrl, "SELECT 1", combinedParameters, connectionControl{
		updateParameters: c.setParameter,
		setEngineURL:     c.setEngineURL,
		resetParameters:  c.resetParameters,
	}); err != nil {
		return true, err
	}

	c.setParameter(setKey, setValue)
	return true, nil
}
// setParameter stores key=value on the connection and mirrors it into the
// connector's cachedParameters, creating either map on first use.
func (c *fireboltConnection) setParameter(key, value string) {
	if c.parameters == nil {
		c.parameters = map[string]string{}
	}
	c.parameters[key] = value

	// Cache parameter in connector as well in case connection will be recreated by the pool
	owner := c.connector
	if owner.cachedParameters == nil {
		owner.cachedParameters = map[string]string{}
	}
	owner.cachedParameters[key] = value
}
// setEngineURL replaces the engine URL that this connection passes to
// client.Query. It is also handed to the client as the setEngineURL callback
// of connectionControl.
func (c *fireboltConnection) setEngineURL(engineUrl string) {
c.engineUrl = engineUrl
}
// resetParameters removes every parameter from the connection and from the
// connector's cachedParameters, except the keys listed by
// getUseParametersList and getDisallowedParametersList.
func (c *fireboltConnection) resetParameters() {
	preserved := append(getUseParametersList(), getDisallowedParametersList()...)

	// prune deletes all keys that are not preserved. Ranging over a nil map is
	// a no-op and deleting entries while ranging is allowed, so no extra
	// guards are needed.
	prune := func(params map[string]string) {
		for name := range params {
			if contains(preserved, name) {
				continue
			}
			delete(params, name)
		}
	}

	prune(c.parameters)
	prune(c.connector.cachedParameters)
}