json.go (forked from lukasmartinelli/pgfutter)
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"os"
)

// tryUnmarshal checks that b is valid JSON by attempting to decode it.
func tryUnmarshal(b []byte) error {
	var v interface{}
	return json.Unmarshal(b, &v)
}
// copyJSONRows imports newline-delimited JSON rows and returns the first
// error (if any) together with the number of successful and failed rows.
func copyJSONRows(i *Import, reader *bufio.Reader, ignoreErrors bool) (error, int, int) {
	success := 0
	failed := 0

	for {
		// ReadBytes instead of a Scanner because it can deal with very long
		// lines, which happens often with big JSON objects.
		line, err := reader.ReadBytes('\n')

		if err == io.EOF {
			// A trailing line without a final newline is returned together
			// with io.EOF and is not imported.
			break
		}
		if err != nil {
			return fmt.Errorf("%s: %s", err, line), success, failed
		}

		// Validate the line before sending it to the database.
		err = tryUnmarshal(line)
		if err != nil {
			failed++
			if ignoreErrors {
				os.Stderr.WriteString(string(line))
				continue
			}
			return fmt.Errorf("%s: %s", err, line), success, failed
		}

		err = i.AddRow(string(line))
		if err != nil {
			failed++
			if ignoreErrors {
				os.Stderr.WriteString(string(line))
				continue
			}
			return fmt.Errorf("%s: %s", err, line), success, failed
		}

		success++
	}

	return nil, success, failed
}
// importJSONObject reads a single JSON document from a file (or stdin when
// filename is empty) and imports it into the target table as one row.
func importJSONObject(filename string, connStr string, schema string, tableName string, dataType string) error {
	db, err := connect(connStr, schema)
	if err != nil {
		return err
	}
	defer db.Close()

	// The entire file is read into memory because it has to be added to the
	// PostgreSQL transaction as a single row. This will hit memory limits
	// for very big JSON objects.
	var bytes []byte
	if filename == "" {
		bytes, err = ioutil.ReadAll(os.Stdin)
	} else {
		bytes, err = ioutil.ReadFile(filename)
	}
	if err != nil {
		return err
	}

	i, err := NewJSONImport(db, schema, tableName, "data", dataType)
	if err != nil {
		return err
	}

	// The JSON is not validated on the client side; it is copied into the
	// database as-is. If it is corrupt, PostgreSQL will complain when the
	// column is queried.
	err = i.AddRow(string(bytes))
	if err != nil {
		return err
	}

	return i.Commit()
}
// importJSON imports newline-delimited JSON (one object per line) from a
// file (or stdin when filename is empty) into the target table.
func importJSON(filename string, connStr string, schema string, tableName string, ignoreErrors bool, dataType string) error {
	db, err := connect(connStr, schema)
	if err != nil {
		return err
	}
	defer db.Close()

	i, err := NewJSONImport(db, schema, tableName, "data", dataType)
	if err != nil {
		return err
	}

	var success, failed int
	if filename == "" {
		reader := bufio.NewReader(os.Stdin)
		err, success, failed = copyJSONRows(i, reader, ignoreErrors)
	} else {
		// Assign with = instead of := so that err is not shadowed inside
		// this block; otherwise the error from copyJSONRows would be lost.
		var file *os.File
		file, err = os.Open(filename)
		if err != nil {
			return err
		}
		defer file.Close()

		bar := NewProgressBar(file)
		reader := bufio.NewReader(io.TeeReader(file, bar))
		bar.Start()
		err, success, failed = copyJSONRows(i, reader, ignoreErrors)
		bar.Finish()
	}

	if err != nil {
		lineNumber := success + failed
		return fmt.Errorf("line %d: %s", lineNumber, err)
	}

	fmt.Printf("%d rows imported into %s.%s\n", success, schema, tableName)
	if ignoreErrors && failed > 0 {
		fmt.Printf("%d rows could not be imported into %s.%s and have been written to stderr.\n", failed, schema, tableName)
	}
	return i.Commit()
}
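
A rough usage sketch of how these functions are meant to be called. It is not part of the file: the connection string, schema, table, data type, and input file below are hypothetical, and connect, NewJSONImport, and NewProgressBar are defined elsewhere in this repository.

package main

import "log"

func main() {
	// Hypothetical connection string and target table.
	connStr := "postgres://user:pass@localhost:5432/db?sslmode=disable"

	// data.json is expected to be newline-delimited JSON, one object per line:
	//   {"name": "Alice", "age": 30}
	//   {"name": "Bob", "age": 25}
	// Each line is stored in a single "data" column; with ignoreErrors set to
	// true, lines that fail to parse are counted and written to stderr.
	// The last argument is assumed to be the PostgreSQL column type, e.g. "json".
	if err := importJSON("data.json", connStr, "import", "people", true, "json"); err != nil {
		log.Fatal(err)
	}
}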