Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
api, chunk, network, pushsync, storage: remove ModeSetSync in favor o…
Browse files Browse the repository at this point in the history
…f ModeSetSyncPull and ModeSetSyncPush
  • Loading branch information
acud committed Sep 30, 2019
1 parent a3c3a5e commit 0750fea
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 23 deletions.
2 changes: 1 addition & 1 deletion api/http/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func InitUploadTag(h http.Handler, tags *chunk.Tags) http.Handler {
}

log.Trace("creating tag", "tagName", tagName, "estimatedTotal", estimatedTotal)
anon := anonTag == "true"
anon := strings.ToLower(anonTag) == "true"
t, err := tags.Create(tagName, estimatedTotal, anon)
if err != nil {
log.Error("error creating tag", "err", err, "tagName", tagName)
Expand Down
10 changes: 6 additions & 4 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,10 @@ func (m ModeSet) String() string {
switch m {
case ModeSetAccess:
return "Access"
case ModeSetSync:
return "Sync"
case ModeSetSyncPush:
return "SyncPush"
case ModeSetSyncPull:
return "SyncPull"
case ModeSetRemove:
return "Remove"
case ModeSetPin:
Expand All @@ -229,9 +231,9 @@ func (m ModeSet) String() string {
const (
// ModeSetAccess: when an update request is received for a chunk or chunk is retrieved for delivery
ModeSetAccess ModeSet = iota
// ModeSetSync: when a chunk is added to a pull sync batch or when a push sync receipt is received
ModeSetSync
// ModeSetSyncPush: when a push sync receipt is received for a chunk
ModeSetSyncPush
// ModeSetSyncPull: when a chunk is added to a pull sync batch
ModeSetSyncPull
// ModeSetRemove: when a chunk is removed
ModeSetRemove
Expand Down
2 changes: 1 addition & 1 deletion network/stream/v2/sync_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (s *syncProvider) Get(ctx context.Context, addr ...chunk.Address) ([]chunk.

// Set the supplied addrs as synced in order to allow for garbage collection
func (s *syncProvider) Set(ctx context.Context, addrs ...chunk.Address) error {
err := s.netStore.Set(ctx, chunk.ModeSetSync, addrs...)
err := s.netStore.Set(ctx, chunk.ModeSetSyncPull, addrs...)
if err != nil {
metrics.GetOrRegisterCounter("syncProvider.set-sync-err", nil).Inc(1)
return err
Expand Down
2 changes: 1 addition & 1 deletion pushsync/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (p *Pusher) sync() {
}

// set chunk status to synced, insert to db GC index
if err := p.store.Set(ctx, chunk.ModeSetSync, syncedAddrs...); err != nil {
if err := p.store.Set(ctx, chunk.ModeSetSyncPush, syncedAddrs...); err != nil {
log.Error("pushsync: error setting chunks to synced", "err", err)
}

Expand Down
14 changes: 7 additions & 7 deletions storage/localstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func testDBCollectGarbageWorker(t *testing.T) {
t.Fatal(err)
}

err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestPinGC(t *testing.T) {
t.Fatal(err)
}

err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -264,14 +264,14 @@ func TestGCAfterPin(t *testing.T) {
t.Fatal(err)
}

// Pin before adding to GC in ModeSetSync
// Pin before adding to GC in ModeSetSyncPull
err = db.Set(context.Background(), chunk.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
pinAddrs = append(pinAddrs, ch.Address())

err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
t.Fatal(err)
}

err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestDB_gcSize(t *testing.T) {
t.Fatal(err)
}

err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions storage/localstore/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestDB_gcIndex(t *testing.T) {
t.Run("sync one chunk", func(t *testing.T) {
ch := chunks[0]

err := db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err := db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand All @@ -147,7 +147,7 @@ func TestDB_gcIndex(t *testing.T) {

t.Run("sync all chunks", func(t *testing.T) {
for i := range chunks {
err := db.Set(context.Background(), chunk.ModeSetSync, chunks[i].Address())
err := db.Set(context.Background(), chunk.ModeSetSyncPull, chunks[i].Address())
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion storage/localstore/mode_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestModeGetRequest(t *testing.T) {
})

// set chunk to synced state
err = db.Set(context.Background(), chunk.ModeSetSync, ch.Address())
err = db.Set(context.Background(), chunk.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
Expand Down
5 changes: 3 additions & 2 deletions storage/localstore/mode_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (db *DB) set(mode chunk.ModeSet, addrs ...chunk.Address) (err error) {
db.binIDs.PutInBatch(batch, uint64(po), id)
}

case chunk.ModeSetSync, chunk.ModeSetSyncPush, chunk.ModeSetSyncPull:
case chunk.ModeSetSyncPush, chunk.ModeSetSyncPull:
for _, addr := range addrs {
c, err := db.setSync(batch, addr, mode)
if err != nil {
Expand Down Expand Up @@ -228,8 +228,9 @@ func (db *DB) setSync(batch *leveldb.Batch, addr chunk.Address, mode chunk.ModeS
switch mode {
case chunk.ModeSetSyncPull:
if tag.Anonymous {
// this will not get called twice because we remove the item in L217
// this will not get called twice because we remove the item once after the !moveToGc check
tag.Inc(chunk.StateSent)
tag.Inc(chunk.StateSynced)
moveToGc = true
}
case chunk.ModeSetSyncPush:
Expand Down
48 changes: 45 additions & 3 deletions storage/localstore/mode_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func TestModeSetAccess(t *testing.T) {
}
}

// TestModeSetSync validates ModeSetSync index values on the provided DB.
func TestModeSetSync(t *testing.T) {
// TestModeSetSyncPull validates ModeSetSyncPull index values on the provided DB.
func TestModeSetSyncPull(t *testing.T) {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
Expand All @@ -82,7 +82,49 @@ func TestModeSetSync(t *testing.T) {
t.Fatal(err)
}

err = db.Set(context.Background(), chunk.ModeSetSync, chunkAddresses(chunks)...)
err = db.Set(context.Background(), chunk.ModeSetSyncPull, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}

binIDs := make(map[uint8]uint64)

for _, ch := range chunks {
po := db.po(ch.Address())
binIDs[po]++

newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)(t)
newPushIndexTest(db, ch, wantTimestamp, leveldb.ErrNotFound)(t)
newGCIndexTest(db, ch, wantTimestamp, wantTimestamp, binIDs[po])(t)
}

t.Run("gc index count", newItemsCountTest(db.gcIndex, tc.count))

t.Run("gc size", newIndexGCSizeTest(db))
})
}
}

// TestModeSetSyncPush validates ModeSetSyncPush index values on the provided DB.
func TestModeSetSyncPush(t *testing.T) {
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()

chunks := generateTestRandomChunks(tc.count)

wantTimestamp := time.Now().UTC().UnixNano()
defer setNow(func() (t int64) {
return wantTimestamp
})()

_, err := db.Put(context.Background(), chunk.ModePutUpload, chunks...)
if err != nil {
t.Fatal(err)
}

err = db.Set(context.Background(), chunk.ModeSetSyncPush, chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion storage/localstore/retrieval_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) {
b.StartTimer()

for i := 0; i < count; i++ {
err := db.Set(context.Background(), chunk.ModeSetSync, addrs[i])
err := db.Set(context.Background(), chunk.ModeSetSyncPull, addrs[i])
if err != nil {
b.Fatal(err)
}
Expand Down

0 comments on commit 0750fea

Please sign in to comment.