Commit

json formatter and null handling
gavincabbage committed Mar 31, 2019
1 parent 073a91b commit 99ebb01
Showing 11 changed files with 354 additions and 102 deletions.
12 changes: 12 additions & 0 deletions README.md
@@ -1,5 +1,17 @@
 # chiv
 
+[![Codeship Status for gavincabbage/chiv](https://app.codeship.com/projects/d63650d0-31a3-0137-a334-52c4eded8102/status?branch=master)](https://app.codeship.com/projects/332043)
+[![Go Report](https://goreportcard.com/badge/github.com/gavincabbage/chiv)](https://goreportcard.com/report/github.com/gavincabbage/chiv)
+[![GoDoc](https://godoc.org/github.com/gavincabbage/chiv?status.svg)](https://godoc.org/github.com/gavincabbage/chiv)
+[![License](http://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/gavincabbage/chiv/blob/master/LICENSE)
+
 Archive arbitrarily large relational database tables to Amazon S3.
 
+## Example
+
+TODO
+
+## CLI
+
+TODO
 
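The README's Example section is still a TODO in this commit. A minimal usage sketch of the API the tests below exercise, assuming a chiv.NewArchiver constructor (not shown in this diff) alongside the WithFormat option:

package main

import (
	"database/sql"
	"log"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/gavincabbage/chiv"
	_ "github.com/lib/pq"
)

func main() {
	// Open the database to archive; the tests in this commit run against Postgres.
	db, err := sql.Open("postgres", "postgres://localhost:5432/app?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}

	// s3manager streams multipart uploads, so large tables never buffer fully in memory.
	uploader := s3manager.NewUploader(session.Must(session.NewSession()))

	// Options set at construction apply to every Archive call unless overridden.
	a := chiv.NewArchiver(db, uploader, chiv.WithFormat(chiv.CSV))

	// Per-call options override the defaults, e.g. switching this table to JSON.
	if err := a.Archive("users", "archive_bucket", chiv.WithFormat(chiv.JSON)); err != nil {
		log.Fatal(err)
	}
}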
169 changes: 91 additions & 78 deletions chiv.go
@@ -12,14 +12,18 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 )
 
 // Archiver archives arbitrarily large relational database tables to Amazon S3. It contains a database connection
 // and upload client. Options set on creation apply to all calls to Archive unless overridden.
 type Archiver struct {
 	db     *sql.DB
 	s3     *s3manager.Uploader
-	key    string
+	format Format
+	key    string
+	null   []byte
 }
 
 const (
 	// DefaultFormat is CSV.
 	DefaultFormat = CSV
 )

@@ -66,106 +70,115 @@ type archiver struct {
 	config *Archiver
 }
 
-func (a *archiver) archive(table, bucket string) error {
+func (a *archiver) archive(table string, bucket string) error {
 	errs := make(chan error)
 	r, w := io.Pipe()
 	defer r.Close()
 	defer w.Close()
 
-	go func() {
-		cw := csv.NewWriter(w)
-
-		selectAll := fmt.Sprintf(`select * from "%s";`, table)
-		rows, err := a.db.QueryContext(a.ctx, selectAll)
-		if err != nil {
-			errs <- err
-			return
-		}
-		defer rows.Close()
+	go a.download(w, table, errs)
+	go a.upload(r, table, bucket, errs)
 
-		columns, err := rows.Columns()
-		if err != nil {
-			errs <- err
-			return
-		}
+	select {
+	case err := <-errs:
+		return err
+	case <-a.ctx.Done():
+		return nil
+	}
+}
 
-		if err := cw.Write(columns); err != nil {
-			errs <- err
-			return
-		}
+func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
+	var w formatter
+	switch a.config.format {
+	case YAML:
+		w = &yamlFormatter{}
+	case JSON:
+		w = &jsonFormatter{w: wc}
+	default:
+		w = &csvFormatter{w: csv.NewWriter(wc)}
+	}
 
-		var (
-			rawBytes = make([]sql.RawBytes, len(columns))
-			record   = make([]string, len(columns))
-			dest     = make([]interface{}, len(columns))
-		)
-		for i := range rawBytes {
-			dest[i] = &rawBytes[i]
-		}
+	selectAll := fmt.Sprintf(`select * from "%s";`, table)
+	rows, err := a.db.QueryContext(a.ctx, selectAll)
+	if err != nil {
+		errs <- err
+		return
+	}
+	defer rows.Close()
 
-		for rows.Next() {
-			err = rows.Scan(dest...)
-			if err != nil {
-				errs <- err
-				return
-			}
+	columns, err := rows.ColumnTypes()
+	if err != nil {
+		errs <- err
+		return
+	}
 
-			for i, raw := range rawBytes {
-				if raw == nil {
-					record[i] = "\\N"
-				} else {
-					record[i] = string(raw)
-				}
-			}
+	if err := w.Begin(columns); err != nil {
+		errs <- err
+		return
+	}
 
-			if err := cw.Write(record); err != nil {
-				errs <- err
-				return
-			}
-		}
+	var (
+		rawBytes = make([]sql.RawBytes, len(columns))
+		record   = make([]interface{}, len(columns))
+	)
+	for i := range rawBytes {
+		record[i] = &rawBytes[i]
+	}
 
-		if err := rows.Err(); err != nil {
+	for rows.Next() {
+		err = rows.Scan(record...)
+		if err != nil {
 			errs <- err
 			return
 		}
 
-		cw.Flush()
-		if err := cw.Error(); err != nil {
-			errs <- err
-			return
+		for i, raw := range rawBytes {
+			if raw == nil && a.config.null != nil {
+				rawBytes[i] = a.config.null
+			}
 		}
 
-		if err := w.Close(); err != nil {
+		if err := w.Write(rawBytes); err != nil {
 			errs <- err
 			return
 		}
-	}()
+	}
 
-	go func() {
-		if a.config.key == "" {
-			switch a.config.format {
-			case CSV:
-				a.config.key = fmt.Sprintf("%s.csv", table)
-			case JSON:
-				a.config.key = fmt.Sprintf("%s.json", table)
-			}
-		}
+	if err := rows.Err(); err != nil {
+		errs <- err
+		return
+	}
 
-		if _, err := a.s3.UploadWithContext(a.ctx, &s3manager.UploadInput{
-			Body:   r,
-			Bucket: aws.String(bucket),
-			Key:    aws.String(a.config.key),
-		}); err != nil {
-			errs <- err
-		}
+	if err := w.End(); err != nil {
+		errs <- err
+		return
+	}
 
-		errs <- nil
-	}()
+	if err := wc.Close(); err != nil {
+		errs <- err
+		return
 	}
 }
 
-	select {
-	case err := <-errs:
-		return err
-	case <-a.ctx.Done():
-		return nil
-	}
-}
+func (a *archiver) upload(r io.Reader, table string, bucket string, errs chan error) {
+	if a.config.key == "" {
+		switch a.config.format {
+		case YAML:
+			a.config.key = fmt.Sprintf("%s.yml", table)
+		case JSON:
+			a.config.key = fmt.Sprintf("%s.json", table)
+		default:
+			a.config.key = fmt.Sprintf("%s.csv", table)
+		}
+	}
 
+	if _, err := a.s3.UploadWithContext(a.ctx, &s3manager.UploadInput{
+		Body:   r,
+		Bucket: aws.String(bucket),
+		Key:    aws.String(a.config.key),
+	}); err != nil {
+		errs <- err
+	}
+
+	errs <- nil
+}
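The download path above now writes rows through a formatter instead of a bare csv.Writer, and null substitution happens before Write, so every format sees the same bytes. The csvFormatter, jsonFormatter, and yamlFormatter are among the changed files this page does not reproduce; a plausible shape for their shared interface, inferred from the Begin/Write/End calls in download:

// Sketch of the formatter contract implied by download above; the real
// definitions live in changed files not shown on this page.
type formatter interface {
	// Begin receives column metadata before any rows, letting the CSV
	// implementation write a header row and the JSON implementation
	// record column names for its objects.
	Begin(columns []*sql.ColumnType) error
	// Write encodes one row of raw column values.
	Write(record []sql.RawBytes) error
	// End flushes buffered output, e.g. the underlying csv.Writer or a
	// closing JSON array bracket, before the pipe is closed.
	End() error
}

Under this contract download owns the io.PipeWriter: it closes the pipe only after End succeeds, which is what lets the S3 upload reading from the other end terminate cleanly.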
63 changes: 45 additions & 18 deletions chiv_test.go
@@ -38,26 +38,54 @@ func TestArchiver_Archive(t *testing.T) {
 			name:     "postgres to csv",
 			driver:   "postgres",
 			database: os.Getenv("POSTGRES_URL"),
-			setup:    "./testdata/postgres_to_csv_setup.sql",
-			teardown: "./testdata/postgres_to_csv_teardown.sql",
-			expected: "./testdata/postgres_to_csv.csv",
-			bucket:   "postgres_to_csv_bucket",
-			table:    "postgres_to_csv_table",
-			key:      "postgres_to_csv_table.csv",
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres.csv",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_table.csv",
 			options:  []chiv.Option{},
 		},
 		{
 			name:     "postgres to csv key override",
 			driver:   "postgres",
 			database: os.Getenv("POSTGRES_URL"),
-			setup:    "./testdata/postgres_to_csv_setup.sql",
-			teardown: "./testdata/postgres_to_csv_teardown.sql",
-			expected: "./testdata/postgres_to_csv.csv",
-			bucket:   "postgres_to_csv_bucket",
-			table:    "postgres_to_csv_table",
-			key:      "postgres_to_csv_custom_key",
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres.csv",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_custom_key",
 			options: []chiv.Option{
-				chiv.WithKey("postgres_to_csv_custom_key"),
+				chiv.WithKey("postgres_custom_key"),
 			},
 		},
+		{
+			name:     "postgres to csv null override",
+			driver:   "postgres",
+			database: os.Getenv("POSTGRES_URL"),
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres_with_null.csv",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_table.csv",
+			options: []chiv.Option{
+				chiv.WithNull("custom_null"),
+			},
+		},
+		{
+			name:     "postgres to json",
+			driver:   "postgres",
+			database: os.Getenv("POSTGRES_URL"),
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres.json",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_table.json",
+			options: []chiv.Option{
+				chiv.WithFormat(chiv.JSON),
+			},
+		},
 	}
@@ -82,8 +110,7 @@ func TestArchiver_Archive(t *testing.T) {

 			require.NoError(t, subject.Archive(test.table, test.bucket, test.options...))
 
-			n, actual := download(t, downloader, test.bucket, test.key)
-			require.Equal(t, len([]byte(expected)), n)
+			actual := download(t, downloader, test.bucket, test.key)
 			require.Equal(t, expected, actual)
 		})
 	}
@@ -142,15 +169,15 @@ func readFile(t *testing.T, path string) string {
 	return string(contents)
 }
 
-func download(t *testing.T, downloader *s3manager.Downloader, bucket string, key string) (int, string) {
+func download(t *testing.T, downloader *s3manager.Downloader, bucket string, key string) string {
 	b := &aws.WriteAtBuffer{}
-	n, err := downloader.Download(b, &s3.GetObjectInput{
+	_, err := downloader.Download(b, &s3.GetObjectInput{
 		Bucket: aws.String(bucket),
 		Key:    aws.String(key),
 	})
 	if err != nil {
 		t.Error(err)
 	}
 
-	return int(n), string(b.Bytes())
+	return string(b.Bytes())
 }
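The tests above exercise WithKey, WithNull, and WithFormat, but the options file is not among the diffs shown here. A plausible shape, assuming the functional-options pattern the Archiver struct suggests; these names and signatures are inferred, not confirmed by the diff:

// Option configures an Archiver. Per the Archiver doc comment, options
// set on creation apply to all calls to Archive unless overridden.
type Option func(*Archiver)

// WithKey overrides the S3 key otherwise derived from table and format.
func WithKey(key string) Option {
	return func(a *Archiver) { a.key = key }
}

// WithFormat selects the output format: CSV, YAML, or JSON.
func WithFormat(f Format) Option {
	return func(a *Archiver) { a.format = f }
}

// WithNull sets the bytes written in place of SQL NULL; download swaps
// them into the row before the formatter sees it.
func WithNull(s string) Option {
	return func(a *Archiver) { a.null = []byte(s) }
}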