forked from freeeve/cq
-
Notifications
You must be signed in to change notification settings - Fork 11
/
statement.go
143 lines (126 loc) · 3.02 KB
/
statement.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package cq
import (
"bytes"
"database/sql/driver"
"encoding/json"
"errors"
"io"
"io/ioutil"
"net/http"
"strconv"
"gopkg.in/cq.v1/types"
)
// rows is the result cursor that cypherStmt.Query hands back. It wraps the
// fully decoded response together with a read position.
type rows struct {
// stmt is the statement that produced this result.
stmt *cypherStmt
// result is the decoded response; Close sets it to nil.
result *cypherResult
// pos is the index into result.Data of the row the next call to Next copies out.
pos int
}
// cypherStmt is a Cypher query bound to a connection.
type cypherStmt struct {
// c is the connection the statement runs on; Exec and Query consult its
// transaction field, and Query posts to its cypherURL.
c *conn
// query points at the Cypher text; Close sets it to nil.
query *string
}
// cypherResult is the JSON document decoded from the server's reply to a
// query. Query reports a failed query when ErrorMessage is non-empty.
type cypherResult struct {
// Columns is decoded from "columns"; rows.Columns returns it unchanged.
Columns []string `json:"columns"`
// Data is decoded from "data": one inner slice per row. rows.Next copies the
// Val of each entry into the caller's destination slice.
Data [][]types.CypherValue `json:"data"`
// ErrorMessage is decoded from "message".
ErrorMessage string `json:"message"`
// ErrorException is decoded from "exception" (presumably the server-side
// exception name; confirm against the server API).
ErrorException string `json:"exception"`
// ErrorFullname is decoded from "fullname" (presumably the fully qualified
// exception name; confirm against the server API).
ErrorFullname string `json:"fullname"`
// ErrorStacktrace is decoded from "stacktrace".
ErrorStacktrace []string `json:"stacktrace"`
}
// cypherRequest is the JSON body that Query POSTs to the connection's
// cypherURL.
type cypherRequest struct {
// Query is the Cypher text, sent under the "query" key.
Query *string `json:"query"`
// Params is sent under "params" and left out of the JSON when the map is
// empty. Query fills it from makeArgsMap, which keys every argument by its
// position ("0", "1", ...).
Params map[string]interface{} `json:"params,omitempty"`
}
// Close drops the statement's reference to its query text. It never fails and
// may be called more than once.
func (stmt *cypherStmt) Close() error {
	// Forget the query; the statement holds nothing else that needs releasing.
	stmt.query = nil

	return nil
}
// Exec runs the statement for its side effects and discards any rows.
//
// When the connection has an open transaction the query is handed to that
// transaction. Otherwise it is sent through Query and the returned rows, if
// any, are closed when Exec returns. The reported affected-row count is
// always 0; the error is whatever the transaction or Query returned.
func (stmt *cypherStmt) Exec(args []driver.Value) (driver.Result, error) {
	// TODO add counts and error support
	const affected = driver.RowsAffected(0)

	if tx := stmt.c.transaction; tx != nil {
		return affected, tx.query(stmt.query, args)
	}

	res, err := stmt.Query(args)
	if res != nil {
		defer res.Close()
	}
	return affected, err
}
// NumInput reports the number of placeholder parameters as unknown (-1), so
// that the caller's argument-count sanity check is skipped.
func (stmt *cypherStmt) NumInput() int {
	const unknown = -1 // avoid sanity check
	return unknown
}
// Query sends the statement to the connection's cypherURL and returns the
// decoded result as driver.Rows.
//
// The query text and args (keyed by position via makeArgsMap) are JSON-encoded
// and POSTed with the package-level client. The whole reply is decoded before
// Query returns, so the returned rows never touch the network again.
//
// Errors:
//   - the connection has an open transaction (transactions only support Exec);
//   - the request cannot be encoded, built, or sent;
//   - the reply is not valid JSON for a cypherResult;
//   - the reply carries an error: its "message", or its "exception" when the
//     message is empty, is returned prefixed with "Cypher error: ".
func (stmt *cypherStmt) Query(args []driver.Value) (driver.Rows, error) {
	if stmt.c.transaction != nil {
		return nil, errors.New("transactions only support Exec")
	}

	// this only happens outside of a transaction
	cyphReq := cypherRequest{
		Query:  stmt.query,
		Params: makeArgsMap(args),
	}
	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(cyphReq); err != nil {
		return nil, err
	}

	req, err := http.NewRequest("POST", stmt.c.cypherURL, &buf)
	if err != nil {
		return nil, err
	}
	setDefaultHeaders(req)

	res, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() {
		// Drain whatever the decoder left unread so the transport can reuse
		// the connection, then close the body exactly once. Both steps are
		// best-effort clean-up; their errors cannot change the outcome.
		_, _ = io.Copy(ioutil.Discard, res.Body)
		_ = res.Body.Close()
	}()

	cyphRes := cypherResult{}
	if err := json.NewDecoder(res.Body).Decode(&cyphRes); err != nil {
		return nil, err
	}

	switch {
	case cyphRes.ErrorMessage != "":
		return nil, errors.New("Cypher error: " + cyphRes.ErrorMessage)
	case cyphRes.ErrorException != "":
		// An error reply without a message must not be mistaken for an empty
		// successful result.
		return nil, errors.New("Cypher error: " + cyphRes.ErrorException)
	}

	return &rows{stmt: stmt, result: &cyphRes, pos: 0}, nil
}
// Close releases the decoded result held by the cursor. It never fails.
func (rs *rows) Close() error {
	// Drop the reference to the decoded response.
	rs.result = nil

	return nil
}
// Columns returns the column names of the decoded result, exactly as the
// result stores them.
func (rs *rows) Columns() []string {
	res := rs.result
	return res.Columns
}
// Next copies the values of the current row into dest and advances to the
// following row.
//
// It returns io.EOF once every row has been consumed, and also when the rows
// have already been closed. If the current row holds fewer values than dest
// has slots, Next returns an error and leaves the position unchanged instead
// of indexing past the end of the row.
func (rs *rows) Next(dest []driver.Value) error {
	// TODO handle transaction
	if rs.result == nil || rs.pos >= len(rs.result.Data) {
		return io.EOF
	}

	row := rs.result.Data[rs.pos]
	if len(row) < len(dest) {
		return errors.New("cq: result row has fewer values than columns")
	}
	for i := range dest {
		dest[i] = row[i].Val
	}

	rs.pos++
	return nil
}
// makeArgsMap turns positional driver arguments into the parameter map sent
// with a query. Each argument is stored under its index rendered as a decimal
// string ("0", "1", ...).
//
// A []byte argument that unmarshals as a JSON-encoded types.CypherValue is
// replaced by that value's Val; a []byte that does not unmarshal, and every
// other kind of argument, is stored as-is. The returned map is never nil.
func makeArgsMap(args []driver.Value) map[string]interface{} {
	argsmap := make(map[string]interface{}, len(args))
	for idx, e := range args {
		key := strconv.Itoa(idx)
		if raw, ok := e.([]byte); ok {
			cv := types.CypherValue{}
			if err := json.Unmarshal(raw, &cv); err == nil {
				argsmap[key] = cv.Val
				continue
			}
			// Not a serialized CypherValue: fall through and pass the raw bytes.
		}
		argsmap[key] = e
	}
	return argsmap
}
// ColumnConverter returns the value converter for the column at idx. Every
// column gets the same converter, a zero-value types.CypherValue; idx is not
// consulted.
func (cs cypherStmt) ColumnConverter(idx int) driver.ValueConverter {
	var conv types.CypherValue
	return conv
}