Skip to content

Commit

Permalink
[CR] Remove probably-unused fields from monitoring
Browse files Browse the repository at this point in the history
After consulting @johnnyaug:
- These fields are not reported in callhome
- We do not know of anyone using these fields
- At least one of these fields may have been added recently by Go.  (It's in `database/sql`,
  which is a part of Go and therefore timeless and versionless, so we actually don't know!)
  • Loading branch information
arielshaqed committed Oct 28, 2020
1 parent 4ec6f1d commit 72c25aa
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 9 deletions.
14 changes: 14 additions & 0 deletions catalog/cataloger.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ type ExportConfigurator interface {
PutExportConfiguration(repository string, branch string, conf *ExportConfiguration) error
}

type ExportStateHandler interface {
// ExportMarkStart starts an export operation on branch of repo and returns the ref of
// the previous export. If the previous export failed it returns ErrExportFailed. If
// another export is running return state ExportStatusInProgress -- and caller should
// clean it up by removing and adding the "next export" withint this transaction. If
// another transaction concurrently runs ExportMarkStart on branchID, one blocks until
// the other is done.
ExportMarkStart(tx db.Tx, repo string, branch string, newRef string) (string, CatalogBranchExportStatus, error)
// Delete any export state for repo. Mostly useful in tests: in a living system the
// export state is part of the state of the world.
ExportStateDelete(tx db.Tx, repo string, branch string) error
}

type Cataloger interface {
RepositoryCataloger
BranchCataloger
Expand All @@ -178,6 +191,7 @@ type Cataloger interface {
Merger
Hookser
ExportConfigurator
ExportStateHandler
io.Closer
}

Expand Down
68 changes: 67 additions & 1 deletion catalog/cataloger_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strings"

"github.com/georgysavva/scany/pgxscan"
"github.com/jackc/pgx/v4"
"github.com/lib/pq"
"github.com/treeverse/lakefs/db"
)
Expand Down Expand Up @@ -101,7 +102,7 @@ func (c *cataloger) GetExportConfigurations() ([]ExportConfigurationForBranch, e
e.export_path export_path, e.export_status_path export_status_path,
e.last_keys_in_prefix_regexp last_keys_in_prefix_regexp
FROM catalog_branches_export e JOIN catalog_branches b ON e.branch_id = b.id
JOIN catalog_repositories r ON b.repository_id = r.id`)
JOIN catalog_repositories r ON b.repository_id = r.id`)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -133,3 +134,68 @@ func (c *cataloger) PutExportConfiguration(repository string, branch string, con
})
return err
}

type errExportFailed struct {
Message string // Error string reported in database
}

func (e errExportFailed) Error() string {
return e.Message
}

var ErrExportFailed = errExportFailed{}

func (c *cataloger) ExportMarkStart(tx db.Tx, repo string, branch string, newRef string) (oldRef string, state CatalogBranchExportStatus, err error) {
var res struct {
CurrentRef string
State CatalogBranchExportStatus
ErrorMessage string
}
branchID, err := c.getBranchIDCache(tx, repo, branch)
if err != nil {
return
}
err = tx.Get(&res, `
SELECT current_ref, state, error_message
FROM catalog_branches_export_state
WHERE branch_id=$1 FOR UPDATE`,
branchID)
if err != nil && !errors.Is(err, ErrEntryNotFound) {
err = fmt.Errorf("ExportMarkStart: failed to get existing state: %w", err)
return
}
oldRef = res.CurrentRef
state = res.State

tag, err := tx.Exec(`
UPDATE catalog_branches
SET current_ref=$2, state='in-progress', error_message=NULL
WHERE branch_id=$1`,
branchID, newRef)
if err != nil {
return
}
if tag.RowsAffected() != 1 {
err = fmt.Errorf("[I] ExportMarkStart: Updated %d rows instead of just 1: %w", pgx.ErrNoRows, err)
return
}
if state == ExportStatusFailed {
err = errExportFailed{res.ErrorMessage}
}
return
}

func (c *cataloger) ExportStateDelete(tx db.Tx, repo string, branch string) error {
branchID, err := c.getBranchIDCache(tx, repo, branch)
if err != nil {
return err
}
tag, err := tx.Exec(`DELETE FROM catalog_branches_export_state WHERE branch_id=$1`, branchID)
if err != nil {
return err
}
if tag.RowsAffected() != 1 {
return fmt.Errorf("[I] ExportStateDelete: deleted %d rows instead of just 1: %w", tag.RowsAffected(), pgx.ErrNoRows)
}
return nil
}
90 changes: 86 additions & 4 deletions catalog/cataloger_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package catalog
import (
"context"
"errors"
"fmt"
"regexp/syntax"
"sort"
"testing"

"github.com/go-test/deep"
"github.com/jackc/pgx"
"github.com/jackc/pgx/v4/pgxpool"
"github.com/lib/pq"
"github.com/treeverse/lakefs/db"
)

const (
Expand Down Expand Up @@ -37,10 +41,6 @@ func (s configForBranchSlice) Swap(i int, j int) {
}

func TestExportConfiguration(t *testing.T) {
const (
branchID = 17
anotherBranchID = 29
)
ctx := context.Background()
c := testCataloger(t)
repo := testCatalogerRepo(t, ctx, c, prefix, defaultBranch)
Expand Down Expand Up @@ -148,3 +148,85 @@ func TestExportConfiguration(t *testing.T) {
}
})
}

func TestExportState(t *testing.T) {
const (
ref1 = "this commit"
ref2 = "that commit"
)
ctx := context.Background()
c := testCataloger(t)
repo := testCatalogerRepo(t, ctx, c, prefix, defaultBranch)

cases := []struct {
name string
startRef string // start with this ref (and state) if set, otherwise start with no row
startState CatalogBranchExportStatus
setRef string
expectState CatalogBranchExportStatus
expectErr func(t *testing.T, err error)
}{
{
name: "clean",
setRef: ref2,
expectState: ExportStatusInProgress,
}, {
name: "reset",
startRef: ref1,
setRef: ref2,
expectState: ExportStatusInProgress,
}, {
name: "previousFailed",
startRef: ref1,
startState: ExportStatusFailed,
setRef: ref2,
expectState: ExportStatusInProgress,
expectErr: func(t *testing.T, err error) {
if !errors.Is(err, ErrExportFailed) {
t.Errorf("expected ErrExportFailed but got %s", err)
}
},
},
}
for _, tt := range cases {
pool, err := pgxpool.Connect(ctx, c.DbConnURI)
if err != nil {
t.Fatalf(err.Error())
}
err = db.Ping(ctx, pool)
if err != nil {
t.Fatalf(err.Error())
}
d := db.NewPgxDatabase(pool)
t.Run(tt.name, func(t *testing.T) {
_, err = d.Transact(func(tx db.Tx) (interface{}, error) {
if tt.startRef != "" {
// This also ends up testing ExportMarkStart in the same way
// each time.
if _, _, err := c.ExportMarkStart(tx, repo, defaultBranch, tt.startRef); err != nil {
return nil, fmt.Errorf("setup (mark previous): %w", err)
}
} else {
if err := c.ExportStateDelete(tx, repo, defaultBranch); err != nil && !errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("setup (delete): %w", err)
}
}

gotRef, gotState, err := c.ExportMarkStart(tx, repo, defaultBranch, tt.setRef)
if tt.expectErr != nil {
tt.expectErr(t, err)
}
if gotRef != tt.startRef {
t.Errorf("expected to old ref %s but got %s", tt.startRef, gotRef)
}
if tt.startState != "" && gotState != tt.startState {
t.Errorf("expected previous state %s but got %s", tt.startState, gotState)
}
return nil, nil
})
if err != nil {
t.Errorf(err.Error())
}
})
}
}
7 changes: 3 additions & 4 deletions db/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,8 @@ func (d *PgxDatabase) Stats() sql.DBStats {
// waiting. The two are close enough (but race each other).
WaitCount: stat.EmptyAcquireCount(),
// Time to acquire is close enough to time spent waiting; fudge.
WaitDuration: stat.AcquireDuration(),
MaxIdleClosed: 0, // TODO(ariels): Does pgx have this anywhere?
MaxIdleTimeClosed: 0, // TODO(ariels): Could generate this with post-connect/release hooks
MaxLifetimeClosed: 0, // TODO(ariels): Is this even possible in pgx?
WaitDuration: stat.AcquireDuration(),
// Not clear that pgx can report MaxIdleClosed, MaxIdleTimeClosed,
// MaxLifetimeClosed.
}
}
4 changes: 4 additions & 0 deletions ddl/000009_export_current.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ CREATE TABLE IF NOT EXISTS catalog_branches_export_state (
ALTER TABLE catalog_branches_export_state
ADD CONSTRAINT branches_export_state_branches_fk
FOREIGN KEY (branch_id) REFERENCES catalog_branches(id)
-- Does *not* reference catalog_branches_export - state is independent of configuration,
-- e.g. when configuration is changed.
ON DELETE CASCADE;

ALTER TABLE catalog_branches_export_state
ADD CONSTRAINT catalog_branches_export_error_on_failure
CHECK ((state = 'export-failed') = (error_message IS NOT NULL));

-- BUG(ariels): reset export state when catalog_branches_export changes it physical address.

END;
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ require (
github.com/jackc/pgerrcode v0.0.0-20190803225404-afa3381909a6
github.com/jackc/pgproto3/v2 v2.0.4 // indirect
github.com/jackc/pgtype v1.4.2
github.com/jackc/pgx v3.6.2+incompatible
github.com/jackc/pgx/v4 v4.8.1
github.com/jamiealquiza/tachymeter v2.0.0+incompatible
github.com/jedib0t/go-pretty v4.3.0+incompatible
Expand Down

0 comments on commit 72c25aa

Please sign in to comment.