diff --git a/README.md b/README.md
index 2143eb4..adf591c 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,17 @@
 # chiv
 
+[![Codeship Status for gavincabbage/chiv](https://app.codeship.com/projects/d63650d0-31a3-0137-a334-52c4eded8102/status?branch=master)](https://app.codeship.com/projects/332043)
+[![Go Report](https://goreportcard.com/badge/github.com/gavincabbage/chiv)](https://goreportcard.com/report/github.com/gavincabbage/chiv)
+[![GoDoc](https://godoc.org/github.com/gavincabbage/chiv?status.svg)](https://godoc.org/github.com/gavincabbage/chiv)
+[![License](http://img.shields.io/badge/License-MIT-blue.svg)](https://github.com/gavincabbage/chiv/blob/master/LICENSE)
+
 Archive arbitrarily large relational database tables to Amazon S3.
 
+## Example
+
+TODO
+
+## CLI
+
+TODO
diff --git a/chiv.go b/chiv.go
index 369d1d2..aca5746 100644
--- a/chiv.go
+++ b/chiv.go
@@ -12,14 +12,18 @@ import (
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 )
 
+// Archiver archives arbitrarily large relational database tables to Amazon S3. It wraps a database connection
+// and an S3 upload client. Options set on creation apply to all calls to Archive unless overridden.
 type Archiver struct {
 	db     *sql.DB
 	s3     *s3manager.Uploader
-	key    string
 	format Format
+	key    string
+	null   []byte
 }
 
 const (
+	// DefaultFormat is CSV.
 	DefaultFormat = CSV
 )
@@ -66,106 +70,115 @@ type archiver struct {
 	config *Archiver
 }
 
-func (a *archiver) archive(table, bucket string) error {
+func (a *archiver) archive(table string, bucket string) error {
 	errs := make(chan error)
 
 	r, w := io.Pipe()
 	defer r.Close()
 	defer w.Close()
 
-	go func() {
-		cw := csv.NewWriter(w)
-
-		selectAll := fmt.Sprintf(`select * from "%s";`, table)
-		rows, err := a.db.QueryContext(a.ctx, selectAll)
-		if err != nil {
-			errs <- err
-			return
-		}
-		defer rows.Close()
+	go a.download(w, table, errs)
+	go a.upload(r, table, bucket, errs)
 
-		columns, err := rows.Columns()
-		if err != nil {
-			errs <- err
-			return
-		}
+	select {
+	case err := <-errs:
+		return err
+	case <-a.ctx.Done():
+		return nil
+	}
+}
 
-		if err := cw.Write(columns); err != nil {
-			errs <- err
-			return
-		}
+// download queries the table and writes each row through the configured
+// formatter to wc, reporting any failure on errs.
+func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
+	var w formatter
+	switch a.config.format {
+	case YAML:
+		// yamlFormatter is still a stub; see formatters.go.
+		w = &yamlFormatter{}
+	case JSON:
+		w = &jsonFormatter{w: wc}
+	default:
+		w = &csvFormatter{w: csv.NewWriter(wc)}
+	}
 
-		var (
-			rawBytes = make([]sql.RawBytes, len(columns))
-			record   = make([]string, len(columns))
-			dest     = make([]interface{}, len(columns))
-		)
-		for i := range rawBytes {
-			dest[i] = &rawBytes[i]
-		}
+	selectAll := fmt.Sprintf(`select * from "%s";`, table)
+	rows, err := a.db.QueryContext(a.ctx, selectAll)
+	if err != nil {
+		errs <- err
+		return
+	}
+	defer rows.Close()
 
-		for rows.Next() {
-			err = rows.Scan(dest...)
-			if err != nil {
-				errs <- err
-				return
-			}
+	columns, err := rows.ColumnTypes()
+	if err != nil {
+		errs <- err
+		return
+	}
 
-			for i, raw := range rawBytes {
-				if raw == nil {
-					record[i] = "\\N"
-				} else {
-					record[i] = string(raw)
-				}
-			}
+	if err := w.Begin(columns); err != nil {
+		errs <- err
+		return
+	}
 
-			if err := cw.Write(record); err != nil {
-				errs <- err
-				return
-			}
-		}
+	// Scan each row into rawBytes; record holds the pointers rows.Scan needs.
+	var (
+		rawBytes = make([]sql.RawBytes, len(columns))
+		record   = make([]interface{}, len(columns))
+	)
+	for i := range rawBytes {
+		record[i] = &rawBytes[i]
+	}
 
-		if err := rows.Err(); err != nil {
+	for rows.Next() {
+		err = rows.Scan(record...)
+		if err != nil {
 			errs <- err
 			return
 		}
 
-		cw.Flush()
-		if err := cw.Error(); err != nil {
-			errs <- err
-			return
+		// Substitute the configured null string for NULL values.
+		for i, raw := range rawBytes {
+			if raw == nil && a.config.null != nil {
+				rawBytes[i] = a.config.null
+			}
 		}
 
-		if err := w.Close(); err != nil {
+		if err := w.Write(rawBytes); err != nil {
 			errs <- err
 			return
 		}
-	}()
-
-	go func() {
-		if a.config.key == "" {
-			switch a.config.format {
-			case CSV:
-				a.config.key = fmt.Sprintf("%s.csv", table)
-			case JSON:
-				a.config.key = fmt.Sprintf("%s.json", table)
-			}
-		}
+	}
 
-		if _, err := a.s3.UploadWithContext(a.ctx, &s3manager.UploadInput{
-			Body:   r,
-			Bucket: aws.String(bucket),
-			Key:    aws.String(a.config.key),
-		}); err != nil {
-			errs <- err
-		}
+	if err := rows.Err(); err != nil {
+		errs <- err
+		return
+	}
 
-		errs <- nil
-	}()
+	if err := w.End(); err != nil {
+		errs <- err
+		return
+	}
 
-	select {
-	case err := <-errs:
-		return err
-	case <-a.ctx.Done():
-		return nil
+	if err := wc.Close(); err != nil {
+		errs <- err
+		return
 	}
 }
+
+// upload streams everything written to r into the configured S3 bucket,
+// deriving the object key from the table name and format when unset.
+func (a *archiver) upload(r io.Reader, table string, bucket string, errs chan error) {
+	if a.config.key == "" {
+		switch a.config.format {
+		case YAML:
+			a.config.key = fmt.Sprintf("%s.yml", table)
+		case JSON:
+			a.config.key = fmt.Sprintf("%s.json", table)
+		default:
+			a.config.key = fmt.Sprintf("%s.csv", table)
+		}
+	}
+
+	if _, err := a.s3.UploadWithContext(a.ctx, &s3manager.UploadInput{
+		Body:   r,
+		Bucket: aws.String(bucket),
+		Key:    aws.String(a.config.key),
+	}); err != nil {
+		errs <- err
+		return // avoid also sending nil below after an error
+	}
+
+	errs <- nil
+}
diff --git a/chiv_test.go b/chiv_test.go
index c4e8d50..9e5e78f 100644
--- a/chiv_test.go
+++ b/chiv_test.go
@@ -38,26 +38,54 @@ func TestArchiver_Archive(t *testing.T) {
 			name:     "postgres to csv",
 			driver:   "postgres",
 			database: os.Getenv("POSTGRES_URL"),
-			setup:    "./testdata/postgres_to_csv_setup.sql",
-			teardown: "./testdata/postgres_to_csv_teardown.sql",
-			expected: "./testdata/postgres_to_csv.csv",
-			bucket:   "postgres_to_csv_bucket",
-			table:    "postgres_to_csv_table",
-			key:      "postgres_to_csv_table.csv",
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres.csv",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_table.csv",
 			options:  []chiv.Option{},
 		},
 		{
 			name:     "postgres to csv key override",
 			driver:   "postgres",
 			database: os.Getenv("POSTGRES_URL"),
-			setup:    "./testdata/postgres_to_csv_setup.sql",
-			teardown: "./testdata/postgres_to_csv_teardown.sql",
-			expected: "./testdata/postgres_to_csv.csv",
-			bucket:   "postgres_to_csv_bucket",
-			table:    "postgres_to_csv_table",
-			key:      "postgres_to_csv_custom_key",
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres.csv",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_custom_key",
 			options: []chiv.Option{
-				chiv.WithKey("postgres_to_csv_custom_key"),
+				chiv.WithKey("postgres_custom_key"),
+			},
+		},
+		{
+			name:     "postgres to csv null override",
+			driver:   "postgres",
+			database: os.Getenv("POSTGRES_URL"),
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres_with_null.csv",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_table.csv",
+			options: []chiv.Option{
+				chiv.WithNull("custom_null"),
+			},
+		},
+		{
+			name:     "postgres to json",
+			driver:   "postgres",
+			database: os.Getenv("POSTGRES_URL"),
+			setup:    "./testdata/postgres_setup.sql",
+			teardown: "./testdata/postgres_teardown.sql",
+			expected: "./testdata/postgres.json",
+			bucket:   "postgres_bucket",
+			table:    "postgres_table",
+			key:      "postgres_table.json",
+			options: []chiv.Option{
+				chiv.WithFormat(chiv.JSON),
 			},
 		},
 	}
@@ -82,8 +110,7 @@ func TestArchiver_Archive(t *testing.T) {
 
 			require.NoError(t, subject.Archive(test.table, test.bucket, test.options...))
 
-			n, actual := download(t, downloader, test.bucket, test.key)
-			require.Equal(t, len([]byte(expected)), n)
+			actual := download(t, downloader, test.bucket, test.key)
 			require.Equal(t, expected, actual)
 		})
 	}
@@ -142,9 +169,9 @@ func readFile(t *testing.T, path string) string {
 	return string(contents)
 }
 
-func download(t *testing.T, downloader *s3manager.Downloader, bucket string, key string) (int, string) {
+func download(t *testing.T, downloader *s3manager.Downloader, bucket string, key string) string {
 	b := &aws.WriteAtBuffer{}
-	n, err := downloader.Download(b, &s3.GetObjectInput{
+	_, err := downloader.Download(b, &s3.GetObjectInput{
 		Bucket: aws.String(bucket),
 		Key:    aws.String(key),
 	})
@@ -152,5 +179,5 @@ func download(t *testing.T, downloader *s3manager.Downloader, bucket string, key
 		t.Error(err)
 	}
 
-	return int(n), string(b.Bytes())
+	return string(b.Bytes())
 }
diff --git a/formatters.go b/formatters.go
new file mode 100644
index 0000000..072ee36
--- /dev/null
+++ b/formatters.go
@@ -0,0 +1,170 @@
+package chiv
+
+import (
+	"database/sql"
+	"encoding/csv"
+	"encoding/json"
+	"errors"
+	"io"
+	"regexp"
+	"strconv"
+)
+
+const (
+	openBracket  = byte('[')
+	closeBracket = byte(']')
+	comma        = byte(',')
+)
+
+var (
+	// ErrRecordLength is returned when a record's length does not match the number of columns.
+	ErrRecordLength = errors.New("record length does not match number of keys")
+)
+
+// formatter writes records in an archive file format.
+type formatter interface {
+	Begin([]*sql.ColumnType) error
+	Write([]sql.RawBytes) error
+	End() error
+}
+
+// csvFormatter formats columns in CSV format.
+type csvFormatter struct {
+	w     *csv.Writer
+	count int
+}
+
+func (c *csvFormatter) Begin(columns []*sql.ColumnType) error {
+	c.count = len(columns)
+
+	header := make([]string, 0, c.count)
+	for _, column := range columns {
+		header = append(header, column.Name())
+	}
+
+	return c.w.Write(header)
+}
+
+func (c *csvFormatter) Write(record []sql.RawBytes) error {
+	if c.count != len(record) {
+		return ErrRecordLength
+	}
+
+	strings := make([]string, c.count)
+	for i, item := range record {
+		strings[i] = string(item)
+	}
+
+	return c.w.Write(strings)
+}
+
+func (c *csvFormatter) End() error {
+	c.w.Flush()
+	return c.w.Error()
+}
+
+// yamlFormatter formats columns in YAML format. It is currently a stub that
+// validates record length but writes no output.
+type yamlFormatter struct {
+	columns []*sql.ColumnType
+}
+
+func (c *yamlFormatter) Begin(columns []*sql.ColumnType) error {
+	c.columns = columns
+	return nil
+}
+
+func (c *yamlFormatter) Write(record []sql.RawBytes) error {
+	if len(c.columns) != len(record) {
+		return ErrRecordLength
+	}
+
+	return nil
+}
+
+func (c *yamlFormatter) End() error {
+	return nil
+}
+
+// jsonFormatter formats columns in JSON format.
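+// Records are streamed as a single JSON array: Begin writes the opening
+// bracket, Write appends each record as a comma-separated object and End
+// writes the closing bracket.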
+type jsonFormatter struct {
+	w        io.Writer
+	columns  []*sql.ColumnType
+	notFirst bool
+}
+
+func (c *jsonFormatter) Begin(columns []*sql.ColumnType) error {
+	c.columns = columns
+	return writeByte(c.w, openBracket)
+}
+
+func (c *jsonFormatter) Write(record []sql.RawBytes) error {
+	if len(c.columns) != len(record) {
+		return ErrRecordLength
+	}
+
+	m := make(map[string]interface{})
+	for i, column := range c.columns {
+		r, err := parse(record[i], c.columns[i].DatabaseTypeName())
+		if err != nil {
+			return err
+		}
+		m[column.Name()] = r
+	}
+
+	b, err := json.Marshal(m)
+	if err != nil {
+		return err
+	}
+
+	if c.notFirst {
+		err := writeByte(c.w, comma)
+		if err != nil {
+			return err
+		}
+	}
+
+	n, err := c.w.Write(b)
+	if err != nil {
+		return err
+	} else if n != len(b) {
+		return io.ErrShortWrite
+	}
+
+	c.notFirst = true
+	return nil
+}
+
+func (c *jsonFormatter) End() error {
+	return writeByte(c.w, closeBracket)
+}
+
+func writeByte(w io.Writer, b byte) error {
+	n, err := w.Write([]byte{b})
+	if err != nil {
+		return err
+	} else if n != 1 {
+		return io.ErrShortWrite
+	}
+
+	return nil
+}
+
+// parse converts raw column bytes to a Go value based on the column's
+// database type name, so numbers and booleans survive JSON marshaling.
+func parse(b sql.RawBytes, t string) (interface{}, error) {
+	if b == nil {
+		return nil, nil
+	}
+
+	var (
+		s            = string(b)
+		boolRegex    = regexp.MustCompile("BOOL*")
+		intRegex     = regexp.MustCompile("INT*")
+		decimalRegex = regexp.MustCompile("DECIMAL*")
+	)
+	switch {
+	case boolRegex.MatchString(t):
+		return strconv.ParseBool(s)
+	case intRegex.MatchString(t):
+		return strconv.Atoi(s)
+	case decimalRegex.MatchString(t):
+		return strconv.ParseFloat(s, 64)
+	default:
+		return s, nil
+	}
+}
diff --git a/options.go b/options.go
index 9d0c3d3..86935d4 100644
--- a/options.go
+++ b/options.go
@@ -1,22 +1,37 @@
 package chiv
 
+// Option configures the Archiver. Options can be provided when creating an Archiver or on each call to Archive.
 type Option func(*Archiver)
 
+// Format is the file format uploaded to S3.
 type Format int
 
 const (
+	// CSV file format.
 	CSV Format = iota
+	// YAML file format.
+	YAML
+	// JSON file format.
 	JSON
 )
 
+// WithFormat configures the upload format.
 func WithFormat(f Format) Option {
 	return func(a *Archiver) {
 		a.format = f
 	}
 }
 
-func WithKey(k string) Option {
+// WithKey configures the upload object key in S3.
+func WithKey(s string) Option {
 	return func(a *Archiver) {
-		a.key = k
+		a.key = s
+	}
+}
+
+// WithNull configures a custom null string.
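+// The configured string replaces database NULLs in the archived file; if
+// unset, NULLs become empty strings in CSV output and null in JSON.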
+func WithNull(s string) Option {
+	return func(a *Archiver) {
+		a.null = []byte(s)
+	}
+}
diff --git a/testdata/postgres_to_csv.csv b/testdata/postgres.csv
similarity index 74%
rename from testdata/postgres_to_csv.csv
rename to testdata/postgres.csv
index ed88dfe..93013d1 100644
--- a/testdata/postgres_to_csv.csv
+++ b/testdata/postgres.csv
@@ -1,3 +1,4 @@
 id,text_column,char_column,int_column,bool_column,ts_column
 ea09d13c-f441-4550-9492-115f8b409c96,some text,some chars,42,true,2018-01-04T00:00:00Z
+4289a9e3-32d5-4bad-b79b-034c528e8f41,some other text,,100,true,2018-02-04T00:00:00Z
 7530a381-526a-42aa-a9ba-97fb2bca283f,some more text,some more chars,101,false,2018-02-05T00:00:00Z
diff --git a/testdata/postgres.json b/testdata/postgres.json
new file mode 100644
index 0000000..b6d8a20
--- /dev/null
+++ b/testdata/postgres.json
@@ -0,0 +1 @@
+[{"bool_column":true,"char_column":"some chars","id":"ea09d13c-f441-4550-9492-115f8b409c96","int_column":42,"text_column":"some text","ts_column":"2018-01-04T00:00:00Z"},{"bool_column":true,"char_column":null,"id":"4289a9e3-32d5-4bad-b79b-034c528e8f41","int_column":100,"text_column":"some other text","ts_column":"2018-02-04T00:00:00Z"},{"bool_column":false,"char_column":"some more chars","id":"7530a381-526a-42aa-a9ba-97fb2bca283f","int_column":101,"text_column":"some more text","ts_column":"2018-02-05T00:00:00Z"}]
\ No newline at end of file
diff --git a/testdata/postgres_to_csv_setup.sql b/testdata/postgres_setup.sql
similarity index 57%
rename from testdata/postgres_to_csv_setup.sql
rename to testdata/postgres_setup.sql
index e0fab46..2f4ebb2 100644
--- a/testdata/postgres_to_csv_setup.sql
+++ b/testdata/postgres_setup.sql
@@ -1,4 +1,4 @@
-CREATE TABLE IF NOT EXISTS "postgres_to_csv_table" (
+CREATE TABLE IF NOT EXISTS "postgres_table" (
   id UUID PRIMARY KEY,
   text_column TEXT,
   char_column VARCHAR(50),
@@ -7,7 +7,7 @@ CREATE TABLE IF NOT EXISTS "postgres_to_csv_table" (
   ts_column TIMESTAMP
 );
 
-INSERT INTO "postgres_to_csv_table" VALUES (
+INSERT INTO "postgres_table" VALUES (
   'ea09d13c-f441-4550-9492-115f8b409c96',
   'some text',
   'some chars',
@@ -16,7 +16,16 @@ INSERT INTO "postgres_to_csv_table" VALUES (
   '2018-01-04'::timestamp
 );
 
-INSERT INTO "postgres_to_csv_table" VALUES (
+INSERT INTO "postgres_table" VALUES (
+  '4289a9e3-32d5-4bad-b79b-034c528e8f41',
+  'some other text',
+  null,
+  100,
+  true,
+  '2018-02-04'::timestamp
+);
+
+INSERT INTO "postgres_table" VALUES (
   '7530a381-526a-42aa-a9ba-97fb2bca283f',
   'some more text',
   'some more chars',
diff --git a/testdata/postgres_teardown.sql b/testdata/postgres_teardown.sql
new file mode 100644
index 0000000..4b72634
--- /dev/null
+++ b/testdata/postgres_teardown.sql
@@ -0,0 +1 @@
+DROP TABLE "postgres_table";
\ No newline at end of file
diff --git a/testdata/postgres_to_csv_teardown.sql b/testdata/postgres_to_csv_teardown.sql
deleted file mode 100644
index 68e61e7..0000000
--- a/testdata/postgres_to_csv_teardown.sql
+++ /dev/null
@@ -1 +0,0 @@
-DROP TABLE "postgres_to_csv_table";
\ No newline at end of file
diff --git a/testdata/postgres_with_null.csv b/testdata/postgres_with_null.csv
new file mode 100644
index 0000000..daaddfa
--- /dev/null
+++ b/testdata/postgres_with_null.csv
@@ -0,0 +1,4 @@
+id,text_column,char_column,int_column,bool_column,ts_column
+ea09d13c-f441-4550-9492-115f8b409c96,some text,some chars,42,true,2018-01-04T00:00:00Z
+4289a9e3-32d5-4bad-b79b-034c528e8f41,some other text,custom_null,100,true,2018-02-04T00:00:00Z
+7530a381-526a-42aa-a9ba-97fb2bca283f,some more text,some more chars,101,false,2018-02-05T00:00:00Z
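
For reference, the archiver introduced above can be driven end to end as follows. This is a minimal sketch: the `chiv.NewArchiver(db, uploader)` constructor and the `lib/pq` driver import are assumptions, since only the `Archive` method, the options, and the `Archiver` fields appear in this diff.

```go
package main

import (
	"database/sql"
	"log"
	"os"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	_ "github.com/lib/pq" // assumed Postgres driver, matching the "postgres" driver name in the tests

	"github.com/gavincabbage/chiv"
)

func main() {
	// Connect to the database to archive; the tests read POSTGRES_URL the same way.
	db, err := sql.Open("postgres", os.Getenv("POSTGRES_URL"))
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// The Archiver wraps an s3manager.Uploader for the actual upload.
	uploader := s3manager.NewUploader(session.Must(session.NewSession()))

	// NewArchiver is an assumed constructor name; it is not shown in this diff.
	archiver := chiv.NewArchiver(db, uploader)

	// Archive postgres_table to the named bucket as JSON under a custom key.
	// With no options, the table is archived as CSV to "<table>.csv".
	if err := archiver.Archive("postgres_table", "postgres_bucket",
		chiv.WithFormat(chiv.JSON),
		chiv.WithKey("archive/postgres_table.json"),
	); err != nil {
		log.Fatal(err)
	}
}
```

Per the options above, `chiv.WithNull` can additionally replace database NULLs with a custom string, as exercised by the `postgres_with_null.csv` fixture.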