Skip to content

Commit

Permalink
formatterfunc and yaml parser
Browse files Browse the repository at this point in the history
  • Loading branch information
gavincabbage committed Apr 14, 2019
1 parent 18fb791 commit 5fe7c30
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 157 deletions.
95 changes: 40 additions & 55 deletions chiv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,87 +17,69 @@ var (
DefaultFormatFunc = CSV
// ErrRecordLength does not match the number of columns.
ErrRecordLength = errors.New("record length does not match number of columns")
// ErrParserRegex initialization problem.
ErrParserRegex = errors.New("initializing parser regex")
)

// Archiver archives arbitrarily large relational database tables to Amazon S3. It contains a database connection
// and upload client. Options set on creation apply to all calls to Archive unless overridden.
type Archiver struct {
db *sql.DB
s3 *s3manager.Uploader
config config
}

type config struct {
format FormatterFunc
key string
null []byte
type archiver struct {
db *sql.DB
s3 *s3manager.Uploader
format FormatterFunc
key string
extension string
null []byte
}

// NewArchiver constructs an Archiver with the given database connection, S3 uploader and options.
func NewArchiver(db *sql.DB, s3 *s3manager.Uploader, options ...Option) *Archiver {
a := Archiver{
db: db,
s3: s3,
config: config{
format: DefaultFormatFunc,
},
// NewArchiver constructs an archiver with the given database, S3 uploader and options. Options set on
// creation apply to all calls to Archive unless overridden.
func NewArchiver(db *sql.DB, s3 *s3manager.Uploader, options ...Option) *archiver {
a := archiver{
db: db,
s3: s3,
format: DefaultFormatFunc,
}

for _, option := range options {
option(&a.config)
option(&a)
}

return &a
}

// Archive a database table to S3.
func (a *Archiver) Archive(table, bucket string, options ...Option) error {
func (a *archiver) Archive(table, bucket string, options ...Option) error {
return a.ArchiveWithContext(context.Background(), table, bucket, options...)
}

// ArchiveWithContext is like Archive, with context.
func (a *Archiver) ArchiveWithContext(ctx context.Context, table, bucket string, options ...Option) error {
archiver := archiver{
db: a.db,
s3: a.s3,
ctx: ctx,
config: a.config,
}

func (a *archiver) ArchiveWithContext(ctx context.Context, table, bucket string, options ...Option) error {
for _, option := range options {
option(&archiver.config)
option(a)
}

return archiver.archive(table, bucket)
return a.archive(ctx, table, bucket)
}

type archiver struct {
db *sql.DB
s3 *s3manager.Uploader
ctx context.Context
config config
}

func (a *archiver) archive(table string, bucket string) error {
func (a *archiver) archive(ctx context.Context, table string, bucket string) error {
errs := make(chan error)
r, w := io.Pipe()
defer r.Close()
defer w.Close()

go a.download(w, table, errs)
go a.upload(r, table, bucket, errs)
go a.download(ctx, w, table, errs)
go a.upload(ctx, r, table, bucket, errs)

select {
case err := <-errs:
return err
case <-a.ctx.Done():
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
func (a *archiver) download(ctx context.Context, wc io.WriteCloser, table string, errs chan error) {
selectAll := fmt.Sprintf(`select * from "%s";`, table)
rows, err := a.db.QueryContext(a.ctx, selectAll)
rows, err := a.db.QueryContext(ctx, selectAll)
if err != nil {
errs <- err
return
Expand All @@ -110,7 +92,7 @@ func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
return
}

f, err := a.config.format(wc, columns)
f, err := a.format(wc, columns)
if err != nil {
errs <- err
return
Expand All @@ -133,8 +115,8 @@ func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
}

for i, raw := range rawBytes {
if raw == nil && a.config.null != nil {
record[i] = a.config.null
if raw == nil && a.null != nil {
record[i] = a.null
} else {
record[i] = raw
}
Expand Down Expand Up @@ -162,16 +144,19 @@ func (a *archiver) download(wc io.WriteCloser, table string, errs chan error) {
}
}

func (a *archiver) upload(r io.Reader, table string, bucket string, errs chan error) {
if a.config.key == "" {
// TODO if a.config.extension or something? can pass in '.json'? wish i could connect to formatter hm
a.config.key = table
func (a *archiver) upload(ctx context.Context, r io.Reader, table string, bucket string, errs chan error) {
if a.key == "" {
if a.extension != "" {
a.key = fmt.Sprintf("%s.%s", table, a.extension)
} else {
a.key = table
}
}

if _, err := a.s3.UploadWithContext(a.ctx, &s3manager.UploadInput{
if _, err := a.s3.UploadWithContext(ctx, &s3manager.UploadInput{
Body: r,
Bucket: aws.String(bucket),
Key: aws.String(a.config.key),
Key: aws.String(a.key),
}); err != nil {
errs <- err
}
Expand Down
Loading

0 comments on commit 5fe7c30

Please sign in to comment.