Skip to content

Commit

Permalink
Merge pull request #6 from gavincabbage/format
Browse files Browse the repository at this point in the history
formatter func and yaml parser
  • Loading branch information
gavincabbage authored Apr 14, 2019
2 parents 145bceb + 5fe7c30 commit 7f82240
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 205 deletions.
120 changes: 46 additions & 74 deletions chiv.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,39 @@ package chiv
import (
"context"
"database/sql"
"encoding/csv"
"errors"
"fmt"
"io"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// Archiver archives arbitrarily large relational database tables to Amazon S3. It contains a database connection
// and upload client. Options set on creation apply to all calls to Archive unless overridden.
type Archiver struct {
db *sql.DB
s3 *s3manager.Uploader
format Format
key string
null []byte
}

const (
var (
// DefaultFormat is CSV.
DefaultFormat = CSV
DefaultFormatFunc = CSV
// ErrRecordLength does not match the number of columns.
ErrRecordLength = errors.New("record length does not match number of columns")
// ErrParserRegex initialization problem.
ErrParserRegex = errors.New("initializing parser regex")
)

// NewArchiver constructs an Archiver with the given database connection, S3 uploader and options.
func NewArchiver(db *sql.DB, s3 *s3manager.Uploader, options ...Option) *Archiver {
a := Archiver{
type archiver struct {
db *sql.DB
s3 *s3manager.Uploader
format FormatterFunc
key string
extension string
null []byte
}

// NewArchiver constructs an archiver with the given database, S3 uploader and options. Options set on
// creation apply to all calls to Archive unless overridden.
func NewArchiver(db *sql.DB, s3 *s3manager.Uploader, options ...Option) *archiver {
a := archiver{
db: db,
s3: s3,
format: DefaultFormat,
format: DefaultFormatFunc,
}

for _, option := range options {
Expand All @@ -43,69 +47,39 @@ func NewArchiver(db *sql.DB, s3 *s3manager.Uploader, options ...Option) *Archive
}

// Archive a database table to S3.
func (a *Archiver) Archive(table, bucket string, options ...Option) error {
func (a *archiver) Archive(table, bucket string, options ...Option) error {
return a.ArchiveWithContext(context.Background(), table, bucket, options...)
}

// ArchiveWithContext is like Archive, with context.
func (a *Archiver) ArchiveWithContext(ctx context.Context, table, bucket string, options ...Option) error {
archiver := archiver{
db: a.db,
s3: a.s3,
ctx: ctx,
config: a,
}

func (a *archiver) ArchiveWithContext(ctx context.Context, table, bucket string, options ...Option) error {
for _, option := range options {
option(archiver.config)
option(a)
}

return archiver.archive(table, bucket)
return a.archive(ctx, table, bucket)
}

type archiver struct {
db *sql.DB
s3 *s3manager.Uploader
ctx context.Context
config *Archiver
}

func (a *archiver) archive(table string, bucket string) error {
func (a *archiver) archive(ctx context.Context, table string, bucket string) error {
errs := make(chan error)
r, w := io.Pipe()
defer r.Close()
defer w.Close()

go a.download(w, table, errs)
go a.upload(r, table, bucket, errs)
go a.download(ctx, w, table, errs)
go a.upload(ctx, r, table, bucket, errs)

select {
case err := <-errs:
return err
case <-a.ctx.Done():
return nil
case <-ctx.Done():
return ctx.Err()
}
}

type formatter interface {
Begin([]*sql.ColumnType) error
Write([][]byte) error
End() error
}

func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
var w formatter
switch a.config.format {
case YAML:
w = &yamlFormatter{}
case JSON:
w = &jsonFormatter{w: wc}
default:
w = &csvFormatter{w: csv.NewWriter(wc)}
}

func (a *archiver) download(ctx context.Context, wc io.WriteCloser, table string, errs chan error) {
selectAll := fmt.Sprintf(`select * from "%s";`, table)
rows, err := a.db.QueryContext(a.ctx, selectAll)
rows, err := a.db.QueryContext(ctx, selectAll)
if err != nil {
errs <- err
return
Expand All @@ -118,7 +92,8 @@ func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
return
}

if err := w.Begin(columns); err != nil {
f, err := a.format(wc, columns)
if err != nil {
errs <- err
return
}
Expand All @@ -140,14 +115,14 @@ func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
}

for i, raw := range rawBytes {
if raw == nil && a.config.null != nil {
record[i] = a.config.null
if raw == nil && a.null != nil {
record[i] = a.null
} else {
record[i] = raw
}
}

if err := w.Write(record); err != nil {
if err := f.Format(record); err != nil {
errs <- err
return
}
Expand All @@ -158,7 +133,7 @@ func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
return
}

if err := w.End(); err != nil {
if err := f.Close(); err != nil {
errs <- err
return
}
Expand All @@ -169,22 +144,19 @@ func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
}
}

func (a *archiver) upload(r io.Reader, table string, bucket string, errs chan error) {
if a.config.key == "" {
switch a.config.format {
case YAML:
a.config.key = fmt.Sprintf("%s.yml", table)
case JSON:
a.config.key = fmt.Sprintf("%s.json", table)
default:
a.config.key = fmt.Sprintf("%s.csv", table)
func (a *archiver) upload(ctx context.Context, r io.Reader, table string, bucket string, errs chan error) {
if a.key == "" {
if a.extension != "" {
a.key = fmt.Sprintf("%s.%s", table, a.extension)
} else {
a.key = table
}
}

if _, err := a.s3.UploadWithContext(a.ctx, &s3manager.UploadInput{
if _, err := a.s3.UploadWithContext(ctx, &s3manager.UploadInput{
Body: r,
Bucket: aws.String(bucket),
Key: aws.String(a.config.key),
Key: aws.String(a.key),
}); err != nil {
errs <- err
}
Expand Down
Loading

0 comments on commit 7f82240

Please sign in to comment.