Skip to content

Commit

Permalink
formatter open method to allow for extensioner pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
gavincabbage committed Oct 6, 2019
1 parent 0f26c47 commit 4703e01
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 63 deletions.
48 changes: 26 additions & 22 deletions chiv.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ func (a *Archiver) ArchiveWithContext(ctx context.Context, table, bucket string,

rows, err := b.query(ctx, table)
if err != nil {
return fmt.Errorf("chiv: querying '%s': %w", table, err)
return errorf("querying '%s': %w", table, err)
}
defer func() {
if e := rows.Close(); e != nil && err == nil {
err = fmt.Errorf("chiv: closing rows: %w", e)
err = errorf("closing rows: %w", e)
}
}()

Expand All @@ -119,12 +119,18 @@ func (a *Archiver) ArchiveRowsWithContext(ctx context.Context, rows Rows, bucket
}

func (a *Archiver) archive(ctx context.Context, rows Rows, table, bucket string) (err error) {
columns, err := interfaced(rows.ColumnTypes())
if err != nil {
return errorf("getting column types from rows: %w", err)
}

var (
r, w = io.Pipe()
g, gctx = errgroup.WithContext(ctx)
r, w = io.Pipe()
formatter = a.format(w, columns)
g, gctx = errgroup.WithContext(ctx)
)
g.Go(func() error {
return a.download(gctx, rows, w)
return a.download(gctx, rows, columns, formatter, w)
})
g.Go(func() error {
return a.upload(gctx, r, table, bucket)
Expand All @@ -133,21 +139,15 @@ func (a *Archiver) archive(ctx context.Context, rows Rows, table, bucket string)
return g.Wait()
}

func (a *Archiver) download(ctx context.Context, rows Rows, w io.WriteCloser) (err error) {
func (a *Archiver) download(ctx context.Context, rows Rows, columns []Column, formatter Formatter, w io.WriteCloser) (err error) {
defer func() {
if e := w.Close(); e != nil && err == nil {
err = fmt.Errorf("chiv: downloading: closing writer: %w", e)
err = errorf("downloading: closing writer: %w", e)
}
}()

columns, err := interfaced(rows.ColumnTypes())
if err != nil {
return fmt.Errorf("chiv: downloading: getting column types: %w", err)
}

formatter, err := a.format(w, columns)
if err != nil {
return fmt.Errorf("chiv: downloading: opening formatter: %w", err)
if err := formatter.Open(); err != nil {
return errorf("downloading: opening formatter: %w", err)
}

var (
Expand All @@ -166,7 +166,7 @@ func (a *Archiver) download(ctx context.Context, rows Rows, w io.WriteCloser) (e
default:
err = rows.Scan(scanned...)
if err != nil {
return fmt.Errorf("chiv: downloading: scanning row: %w", err)
return errorf("downloading: scanning row: %w", err)
}

for i, raw := range rawBytes {
Expand All @@ -178,17 +178,17 @@ func (a *Archiver) download(ctx context.Context, rows Rows, w io.WriteCloser) (e
}

if err := formatter.Format(record); err != nil {
return fmt.Errorf("chiv: downloading: formatting row: %w", err)
return errorf("downloading: formatting row: %w", err)
}
}
}

if err := rows.Err(); err != nil {
return fmt.Errorf("chiv: downloading: scanning rows: %w", err)
return errorf("downloading: scanning rows: %w", err)
}

if err := formatter.Close(); err != nil {
return fmt.Errorf("chiv: downloading: closing formatter: %w", err)
return errorf("downloading: closing formatter: %w", err)
}

return nil
Expand All @@ -207,14 +207,14 @@ func (a *Archiver) query(ctx context.Context, table string) (*sql.Rows, error) {
columns = b.String()
}

query := fmt.Sprintf(`select %s from %s;`, columns, table)
query := fmt.Sprintf(`SELECT %s FROM %s;`, columns, table)
return a.db.QueryContext(ctx, query)
}

func (a *Archiver) upload(ctx context.Context, r io.ReadCloser, table string, bucket string) (err error) {
defer func() {
if e := r.Close(); e != nil && err == nil {
err = fmt.Errorf("chiv: uploading: closing reader: %w", e)
err = errorf("uploading: closing reader: %w", e)
}
}()

Expand All @@ -231,7 +231,7 @@ func (a *Archiver) upload(ctx context.Context, r io.ReadCloser, table string, bu
Bucket: aws.String(bucket),
Key: aws.String(a.key),
}); err != nil {
return fmt.Errorf("chiv: uploading: %w", err)
return errorf("uploading: %w", err)
}

return nil
Expand All @@ -245,3 +245,7 @@ func interfaced(in []*sql.ColumnType, err error) ([]Column, error) {

return out, err
}

func errorf(format string, args ...interface{}) error {
return fmt.Errorf("chiv: "+format, args...)
}
68 changes: 43 additions & 25 deletions chiv_formatters.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,66 @@ import (
"gopkg.in/yaml.v2"
)

// Column reports its name, database type name and scan type.
type Column interface {
DatabaseTypeName() string
Name() string
DatabaseTypeName() string
ScanType() reflect.Type
}

// FormatterFunc returns an initialized Formatter.
type FormatterFunc func(io.Writer, []Column) (Formatter, error)
type FormatterFunc func(io.Writer, []Column) Formatter

// Formatter formats and writes records.
// Formatter formats and writes records. A custom Formatter may
// implement Extensioner to provide chiv with a default file extension.
type Formatter interface {
// FormatterFunc and write a single record.
// Open the Formatter and perform any format-specific initialization.
Open() error
// Format and write a single record.
Format([][]byte) error
// Close the formatter and perform any format-specific cleanup operations.
// Close the Formatter and perform any format-specific cleanup.
Close() error
}

// Extensioner is a Formatter that provides a default extension.
type Extensioner interface {
Extension() string
}

type csvFormatter struct {
w *csv.Writer
count int
w *csv.Writer
columns []Column
}

// CSV writes column headers and returns an initialized CSV formatter.
func CSV(w io.Writer, columns []Column) (Formatter, error) {
f := &csvFormatter{
w: csv.NewWriter(w),
count: len(columns),
func CSV(w io.Writer, columns []Column) Formatter {
return &csvFormatter{
w: csv.NewWriter(w),
columns: columns,
}
}

header := make([]string, f.count)
for i, column := range columns {
// Open the CSV formatter by writing the CSV header.
func (f *csvFormatter) Open() error {
header := make([]string, len(f.columns))
for i, column := range f.columns {
header[i] = column.Name()
}

if err := f.w.Write(header); err != nil {
return nil, fmt.Errorf("writing header: %w", err)
return fmt.Errorf("writing header: %w", err)
}

return f, nil
return nil
}

// Format a CSV record.
func (f *csvFormatter) Format(record [][]byte) error {
if f.count != len(record) {
if len(f.columns) != len(record) {
return errors.New("record length does not match number of columns")
}

strings := make([]string, f.count)
strings := make([]string, len(f.columns))
for i, item := range record {
strings[i] = string(item)
}
Expand All @@ -84,13 +96,16 @@ type yamlFormatter struct {
}

// YAML returns an initialized YAML formatter.
func YAML(w io.Writer, columns []Column) (Formatter, error) {
f := yamlFormatter{
func YAML(w io.Writer, columns []Column) Formatter {
return &yamlFormatter{
w: w,
columns: columns,
}
}

return &f, nil
// Open the YAML formatter.
func (_ *yamlFormatter) Open() error {
return nil
}

// Format a YAML record.
Expand Down Expand Up @@ -130,17 +145,20 @@ type jsonFormatter struct {
}

// JSON opens a JSON array and returns an initialized JSON formatter.
func JSON(w io.Writer, columns []Column) (Formatter, error) {
f := jsonFormatter{
func JSON(w io.Writer, columns []Column) Formatter {
return &jsonFormatter{
w: w,
columns: columns,
}
}

// Open the JSON array.
func (f *jsonFormatter) Open() error {
if err := f.writeByte(openBracket); err != nil {
return nil, fmt.Errorf("writing json: %w", err)
return fmt.Errorf("writing json: %w", err)
}

return &f, nil
return nil
}

// Format a JSON record.
Expand Down Expand Up @@ -169,7 +187,7 @@ func (f *jsonFormatter) Format(record [][]byte) error {
return nil
}

// Close the jsonFormatter after closing the JSON array.
// Close the JSON formatter after closing the JSON array.
func (f *jsonFormatter) Close() error {
if err := f.writeByte(closeBracket); err != nil {
return fmt.Errorf("closing json formatter: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions chiv_formatters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func test(t *testing.T, expected []string, format chiv.FormatterFunc) {
columns[i] = test.columns[i]
}

subject, err := format(&b, columns)
assert.NoError(t, err)
subject := format(&b, columns)
assert.NoError(t, subject.Open())

for _, record := range test.records {
assert.NoError(t, subject.Format(record))
Expand Down
28 changes: 16 additions & 12 deletions chiv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ func TestArchiveRows(t *testing.T) {
uploader *uploader
bucket string
formatter *formatter
formatErr error
options []chiv.Option
expectedErr string
}{
Expand Down Expand Up @@ -62,7 +61,7 @@ func TestArchiveRows(t *testing.T) {
scan: [][]string{{"first", "second"}},
columnTypesErr: errors.New("column types"),
},
expectedErr: "chiv: downloading: getting column types: column types",
expectedErr: "chiv: getting column types from rows: column types",
uploader: &uploader{},
formatter: &formatter{},
},
Expand All @@ -72,10 +71,11 @@ func TestArchiveRows(t *testing.T) {
columns: []string{"first_column", "second_column"},
scan: [][]string{{"first", "second"}},
},
formatErr: errors.New("formatter func"),
expectedErr: "chiv: downloading: opening formatter: formatter func",
expectedErr: "chiv: downloading: opening formatter: opening formatter",
uploader: &uploader{},
formatter: &formatter{},
formatter: &formatter{
openErr: errors.New("opening formatter"),
},
},
{
name: "scan error",
Expand Down Expand Up @@ -140,7 +140,7 @@ func TestArchiveRows(t *testing.T) {
for _, test := range cases {
t.Run(test.name, func(t *testing.T) {
var (
options = append(test.options, chiv.WithFormat(format(test.formatter, test.formatErr)))
options = append(test.options, chiv.WithFormat(format(test.formatter)))
err = chiv.ArchiveRows(test.rows, test.uploader, "bucket", options...)
)

Expand Down Expand Up @@ -215,16 +215,20 @@ func (u *uploader) UploadWithContext(ctx aws.Context, input *s3manager.UploadInp
return nil, u.uploadErr
}

func format(f chiv.Formatter, err error) chiv.FormatterFunc {
return func(_ io.Writer, _ []chiv.Column) (chiv.Formatter, error) {
return f, err
func format(f chiv.Formatter) chiv.FormatterFunc {
return func(_ io.Writer, _ []chiv.Column) chiv.Formatter {
return f
}
}

type formatter struct {
closed bool
written [][]string
formatErr, closeErr error
closed bool
written [][]string
openErr, formatErr, closeErr error
}

func (f *formatter) Open() error {
return f.openErr
}

func (f *formatter) Format(record [][]byte) error {
Expand Down
1 change: 0 additions & 1 deletion cmd/chiv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
var version = "dev"

func main() {

app := cli.App{
Name: "chiv",
HelpName: "chiv",
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:

test:
build: .
command: go test -p 1 -tags=unit,integration -covermode=atomic -timeout=60s ./...
command: go test -p 1 -tags=unit,integration -covermode=count -timeout=60s ./...
environment:
- AWS_ACCESS_KEY_ID=bogus
- AWS_SECRET_KEY=bogus
Expand Down
Binary file modified img/chiv.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 4703e01

Please sign in to comment.