Skip to content

Commit

Permalink
benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
gavincabbage committed Sep 24, 2019
1 parent 3c788d3 commit 345ea19
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
with:
go-version: ${{ matrix.go-version }}
- name: Install linter
run: curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s v1.18.0
run: curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s v1.18.0
- name: Run linter
run: ./bin/golangci-lint run

Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
dist/
chiv.test
*.out
59 changes: 31 additions & 28 deletions chiv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,40 +13,58 @@ import (
"golang.org/x/sync/errgroup"
)

// Database queries tables for rows.
type Database interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

// Uploader uploads input to S3.
type Uploader interface {
UploadWithContext(ctx aws.Context, input *s3manager.UploadInput, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
}

// Archive a database table to S3.
func Archive(db database, s3 uploader, table, bucket string, options ...Option) error {
func Archive(db Database, s3 Uploader, table, bucket string, options ...Option) error {
return ArchiveWithContext(context.Background(), db, s3, table, bucket, options...)
}

// ArchiveWithContext is like Archive, with context.
func ArchiveWithContext(ctx context.Context, db database, s3 uploader, table, bucket string, options ...Option) error {
func ArchiveWithContext(ctx context.Context, db Database, s3 Uploader, table, bucket string, options ...Option) error {
return NewArchiver(db, s3).ArchiveWithContext(ctx, table, bucket, options...)
}

// Rows reports its column types and is iterable.
type Rows interface {
ColumnTypes() ([]*sql.ColumnType, error)
Next() bool
Scan(...interface{}) error
Err() error
}

// ArchiveRows to S3.
func ArchiveRows(rows rows, s3 uploader, bucket string, options ...Option) error {
func ArchiveRows(rows Rows, s3 Uploader, bucket string, options ...Option) error {
return ArchiveRowsWithContext(context.Background(), rows, s3, bucket, options...)
}

// ArchiveRowsWithContext is like ArchiveRows, with context.
func ArchiveRowsWithContext(ctx context.Context, rows rows, s3 uploader, bucket string, options ...Option) error {
func ArchiveRowsWithContext(ctx context.Context, rows Rows, s3 Uploader, bucket string, options ...Option) error {
return NewArchiver(nil, s3).ArchiveRowsWithContext(ctx, rows, bucket, options...)
}

// Archiver archives database tables to Amazon S3.
type Archiver struct {
db database
s3 uploader
db Database
s3 Uploader
format FormatterFunc
key string
extension string
null []byte
columns []string
}

// NewArchiver constructs an archiver with the given database, S3 uploader and options.
// NewArchiver constructs an archiver with the given Database, S3 uploader and options.
// Options set on creation apply to all calls to Archive unless overridden.
func NewArchiver(db database, s3 uploader, options ...Option) *Archiver {
func NewArchiver(db Database, s3 Uploader, options ...Option) *Archiver {
a := Archiver{
db: db,
s3: s3,
Expand All @@ -60,7 +78,7 @@ func NewArchiver(db database, s3 uploader, options ...Option) *Archiver {
return &a
}

// Archive a database table to S3. Any options provided override those set on creation.
// Archive a Database table to S3. Any options provided override those set on creation.
func (a *Archiver) Archive(table, bucket string, options ...Option) error {
return a.ArchiveWithContext(context.Background(), table, bucket, options...)
}
Expand All @@ -86,12 +104,12 @@ func (a *Archiver) ArchiveWithContext(ctx context.Context, table, bucket string,
}

// ArchiveRows to S3.
func (a *Archiver) ArchiveRows(rows rows, bucket string, options ...Option) (err error) {
func (a *Archiver) ArchiveRows(rows Rows, bucket string, options ...Option) (err error) {
return a.ArchiveRowsWithContext(context.Background(), rows, bucket)
}

// ArchiveRowsWithContext is like ArchiveRows, with context.
func (a *Archiver) ArchiveRowsWithContext(ctx context.Context, rows rows, bucket string, options ...Option) (err error) {
func (a *Archiver) ArchiveRowsWithContext(ctx context.Context, rows Rows, bucket string, options ...Option) (err error) {
b := *a
for _, option := range options {
option(&b)
Expand All @@ -100,18 +118,7 @@ func (a *Archiver) ArchiveRowsWithContext(ctx context.Context, rows rows, bucket
return b.archive(ctx, rows, "", bucket)
}

type database interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}

type rows interface {
ColumnTypes() ([]*sql.ColumnType, error)
Next() bool
Scan(...interface{}) error
Err() error
}

func (a *Archiver) archive(ctx context.Context, rows rows, table, bucket string) (err error) {
func (a *Archiver) archive(ctx context.Context, rows Rows, table, bucket string) (err error) {
var (
r, w = io.Pipe()
g, gctx = errgroup.WithContext(ctx)
Expand All @@ -126,7 +133,7 @@ func (a *Archiver) archive(ctx context.Context, rows rows, table, bucket string)
return g.Wait()
}

func (a *Archiver) download(ctx context.Context, rows rows, w io.WriteCloser) (err error) {
func (a *Archiver) download(ctx context.Context, rows Rows, w io.WriteCloser) (err error) {
defer func() {
if e := w.Close(); e != nil && err == nil {
err = e
Expand Down Expand Up @@ -204,10 +211,6 @@ func (a *Archiver) query(ctx context.Context, table string) (*sql.Rows, error) {
return a.db.QueryContext(ctx, query)
}

type uploader interface {
UploadWithContext(ctx aws.Context, input *s3manager.UploadInput, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
}

func (a *Archiver) upload(ctx context.Context, r io.ReadCloser, table string, bucket string) (err error) {
defer func() {
if e := r.Close(); e != nil && err == nil {
Expand Down
18 changes: 13 additions & 5 deletions chiv_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import (

func BenchmarkArchiver_Archive(b *testing.B) {
var (
benchmarks = []int{1, 10, 100, 1_000, 5_000, 10_000}
benchmarks = []int{1, 10, 100, 1_000, 5_000, 10_000, 1_000_000}
ctx = context.Background()
rows = &benchmarkRows{columnTypes: make([]*sql.ColumnType, 8)}
uploader = &uploader{}
bucket = "benchmark_bucket"
format = chiv.WithFormat(formatterFunc(&benchmarkFormatter{}, nil))
rows = &benchmarkRows{
columnTypes: make([]*sql.ColumnType, 10),
column: sql.RawBytes("column_value"),
}
uploader = &uploader{}
bucket = "benchmark_bucket"
format = chiv.WithFormat(formatterFunc(&benchmarkFormatter{}, nil))
)

for _, count := range benchmarks {
Expand All @@ -39,6 +42,7 @@ func BenchmarkArchiver_Archive(b *testing.B) {

type benchmarkRows struct {
columnTypes []*sql.ColumnType
column sql.RawBytes
ndx, max int
}

Expand All @@ -51,6 +55,10 @@ func (r *benchmarkRows) Next() bool {
}

func (r *benchmarkRows) Scan(c ...interface{}) error {
for i := range c {
*(c[i].(*sql.RawBytes)) = r.column
}

r.ndx++
return nil
}
Expand Down

0 comments on commit 345ea19

Please sign in to comment.