Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
Signed-off-by: David Lawrence <david.lawrence@docker.com> (github: endophage)
  • Loading branch information
David Lawrence committed Aug 28, 2017
1 parent 07a5c04 commit 55f635c
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 22 deletions.
7 changes: 6 additions & 1 deletion server/handlers/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ func Changefeed(ctx context.Context, w http.ResponseWriter, r *http.Request) err

func changefeed(logger ctxu.Logger, store storage.MetaStore, gun, changeID string, records int64) ([]byte, error) {
changes, err := store.GetChanges(changeID, int(records), gun)
if err != nil {
switch err.(type) {
case nil:
// no error to return
case storage.ErrBadQuery:
return nil, errors.ErrInvalidParams.WithDetail(err)
default:
logger.Errorf("%d GET could not retrieve records: %s", http.StatusInternalServerError, err.Error())
return nil, errors.ErrUnknown.WithDetail(err)
}
Expand Down
18 changes: 18 additions & 0 deletions server/handlers/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package handlers

import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"io"
"net/http"
Expand Down Expand Up @@ -114,9 +116,24 @@ func atomicUpdateHandler(ctx context.Context, w http.ResponseWriter, r *http.Req
logger.Errorf("500 POST error applying update request: %v", err)
return errors.ErrUpdating.WithDetail(nil)
}

logTS(logger, gun.String(), updates)

return nil
}

// logTS logs the timestamp update at Info level
func logTS(logger ctxu.Logger, gun string, updates []storage.MetaUpdate) {
for _, update := range updates {
if update.Role == data.CanonicalTimestampRole {
checksumBin := sha256.Sum256(update.Data)
checksum := hex.EncodeToString(checksumBin[:])
logger.Infof("updated %s to timestamp version %d, checksum %s", gun, update.Version, checksum)
break
}
}
}

// GetHandler returns the json for a specified role and GUN.
func GetHandler(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
defer r.Body.Close()
Expand Down Expand Up @@ -174,6 +191,7 @@ func DeleteHandler(ctx context.Context, w http.ResponseWriter, r *http.Request)
logger.Error("500 DELETE repository")
return errors.ErrUnknown.WithDetail(err)
}
logger.Infof("trust data deleted for %s", gun)
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions server/storage/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ type ErrNoKey struct {
func (err ErrNoKey) Error() string {
return fmt.Sprintf("Error, no timestamp key found for %s", err.gun)
}

// ErrBadQuery is used when the parameters provided cannot be appropriately
// coerced.
type ErrBadQuery struct {
msg string
}

func (err ErrBadQuery) Error() string {
return fmt.Sprintf("did not recognize parameters: %s", err.msg)
}
2 changes: 1 addition & 1 deletion server/storage/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (st *MemStorage) GetChanges(changeID string, records int, filterName string
} else {
id, err = strconv.ParseInt(changeID, 10, 32)
if err != nil {
return nil, err
return nil, ErrBadQuery{msg: fmt.Sprintf("change ID expected to be integer, provided ID was: %d", changeID)}
}
}
var (
Expand Down
35 changes: 25 additions & 10 deletions server/storage/rethinkdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ import (
"gopkg.in/dancannon/gorethink.v3"
)

// RethinkDB has eventual consistency. This represents a 60 second blackout
// period of the most recent changes in the changefeed which will not be
// returned while the eventual consistency works itself out.
// It's a var not a const so that the tests can turn it down to zero rather
// than have to include a sleep.
var blackoutTime = 60

// RDBTUFFile is a TUF file record
Expand All @@ -35,12 +40,12 @@ func (r RDBTUFFile) TableName() string {

// Change defines the the fields required for an object in the changefeed
type Change struct {
ID string `gorethink:"id,omitempty"`
ID string `gorethink:"id,omitempty" gorm:"primary_key" sql:"not null"`
CreatedAt time.Time `gorethink:"created_at"`
GUN string `gorethink:"gun"`
Version int `gorethink:"version"`
SHA256 string `gorethink:"sha256"`
Category string `gorethink:"category"`
GUN string `gorethink:"gun" gorm:"column:gun" sql:"type:varchar(255);not null"`
Version int `gorethink:"version" sql:"not null"`
SHA256 string `gorethink:"sha256" gorm:"column:sha256" sql:"type:varchar(64);"`
Category string `gorethink:"category" sql:"type:varchar(20);not null;"`
}

// TableName sets a specific table name for Changefeed
Expand Down Expand Up @@ -342,7 +347,8 @@ func (rdb RethinkDB) writeChange(gun string, version int, sha256, category strin
return err
}

// GetChanges is not implemented for RethinkDB
// GetChanges returns up to pageSize changes starting from changeID. It uses the
// blackout to account for RethinkDB's eventual consistency model
func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string) ([]Change, error) {
var (
lower, upper, bound []interface{}
Expand Down Expand Up @@ -386,8 +392,15 @@ func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string

changes := make([]Change, 0, pageSize)

// Between returns a slice of results from the rethinkdb table.
// The results are ordered using BetweenOpts.Index, which will
// default to the index of the immediately preceding OrderBy.
// The lower and upper are the start and end points for the slice
// and the Left/RightBound values determine whether the lower and
// upper values are included in the result per normal set semantics
// of "open" and "closed"
res, err := gorethink.DB(rdb.dbName).
Table(Change{}.TableName()).
Table(Change{}.TableName(), gorethink.TableOpts{ReadMode: "majority"}).
OrderBy(order).
Between(
lower,
Expand All @@ -414,10 +427,12 @@ func (rdb RethinkDB) GetChanges(changeID string, pageSize int, filterName string
return changes, res.All(&changes)
}

// bound creates the correct boundary based in the index that should be used for
// querying the changefeed.
func (rdb RethinkDB) bound(changeID, filterName string) ([]interface{}, string) {
term := gorethink.DB(rdb.dbName).Table(Change{}.TableName()).Get(changeID).Field("created_at")
createdAtTerm := gorethink.DB(rdb.dbName).Table(Change{}.TableName()).Get(changeID).Field("created_at")
if filterName != "" {
return []interface{}{filterName, term, changeID}, "rdb_gun_created_at_id"
return []interface{}{filterName, createdAtTerm, changeID}, "rdb_gun_created_at_id"
}
return []interface{}{term, changeID}, "rdb_created_at_id"
return []interface{}{createdAtTerm, changeID}, "rdb_created_at_id"
}
2 changes: 1 addition & 1 deletion server/storage/sql_models.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (g TUFFile) TableName() string {

// SQLChange defines the the fields required for an object in the changefeed
type SQLChange struct {
ID uint `gorm:"primary_key" sql:";not null" json:",string"`
ID uint `gorm:"primary_key" sql:"not null" json:",string"`
CreatedAt time.Time
GUN string `gorm:"column:gun" sql:"type:varchar(255);not null"`
Version int `sql:"not null"`
Expand Down
2 changes: 1 addition & 1 deletion server/storage/sqldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (db *SQLStorage) GetChanges(changeID string, records int, filterName string
} else {
id, err = strconv.ParseInt(changeID, 10, 32)
if err != nil {
return nil, err
return nil, ErrBadQuery{msg: fmt.Sprintf("change ID expected to be integer, provided ID was: %d", changeID)}
}
}

Expand Down
20 changes: 12 additions & 8 deletions server/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,6 @@ func testGetChanges(t *testing.T, s MetaStore) {
require.NoError(t, err)
require.Len(t, c, 0)

//c, err = s.GetChanges(full[7].ID, -4, "")
//require.NoError(t, err)
//require.Len(t, c, 4)
//for i := 0; i < 4; i++ {
// require.Equal(t, "busybox", c[i].GUN)
// require.Equal(t, i+1, c[i].Version)
//}

c, err = s.GetChanges(full[6].ID, -4, "")
require.NoError(t, err)
require.Len(t, c, 4)
Expand Down Expand Up @@ -437,6 +429,17 @@ func testGetChanges(t *testing.T, s MetaStore) {
require.NoError(t, err)
require.Equal(t, before, after)

_, err1 := s.GetChanges("1000", 0, "")
_, err2 := s.GetChanges("doesn't exist", 0, "")
if _, ok := s.(RethinkDB); ok {
require.Error(t, err1)
require.Error(t, err2)
} else {
require.NoError(t, err1)
require.Error(t, err2)
require.IsType(t, ErrBadQuery{}, err2)
}

// do a deletion and check is shows up.
require.NoError(t, s.Delete("alpine"))
c, err = s.GetChanges("-1", -1, "")
Expand All @@ -453,4 +456,5 @@ func testGetChanges(t *testing.T, s MetaStore) {
require.Len(t, c, 2)
require.NotEqual(t, changeCategoryDeletion, c[0].Category)
require.NotEqual(t, "alpine", c[0].GUN)

}

0 comments on commit 55f635c

Please sign in to comment.