Skip to content

Commit

Permalink
fix: delete tag from chunck if tag deleted or not found (#4995)
Browse files Browse the repository at this point in the history
Co-authored-by: istae <14264581+istae@users.noreply.github.com>
  • Loading branch information
martinconic and istae authored Feb 21, 2025
1 parent 00d14c3 commit b840507
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 36 deletions.
93 changes: 57 additions & 36 deletions pkg/storer/internal/upload/uploadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,24 @@ func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state

indexStore := st.IndexStore()

deleteFunc := func() error {
// Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as
// we no longer need to keep track of this chunk. We also need to cleanup
// the pushItem.
pi := &pushItem{
Timestamp: ui.Uploaded,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}

return errors.Join(
indexStore.Delete(pi),
chunkstamp.Delete(indexStore, uploadScope, pi.Address, pi.BatchID),
st.ChunkStore().Delete(ctx, chunk.Address()),
indexStore.Delete(ui),
)
}

err := indexStore.Get(ui)
if err != nil {
// because of the nature of the feed mechanism of the uploadstore/pusher, a chunk that is in inflight may be sent more than once to the pusher.
Expand All @@ -575,49 +593,52 @@ func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state
return fmt.Errorf("failed to read uploadItem %x: %w", ui.BatchID, err)
}

ti := &TagItem{TagID: ui.TagID}
err = indexStore.Get(ti)
if err != nil {
return fmt.Errorf("failed getting tag: %w", err)
}

switch state {
case storage.ChunkSent:
ti.Sent++
case storage.ChunkStored:
ti.Stored++
// also mark it as synced
fallthrough
case storage.ChunkSynced:
ti.Synced++
case storage.ChunkCouldNotSync:
break
}
if ui.TagID > 0 {
ti := &TagItem{TagID: ui.TagID}
err = indexStore.Get(ti)
if err != nil {
if errors.Is(err, storage.ErrNotFound) { // tag was deleted by user
if state == storage.ChunkSent {
ui.TagID = 0
err = indexStore.Put(ui)
if err != nil {
return fmt.Errorf("failed updating empty tag for chunk: %w", err)
}

return nil
}

// state != sent
return deleteFunc()

} else {
return fmt.Errorf("failed getting tag: %w", err)
}
}
switch state {
case storage.ChunkSent:
ti.Sent++
case storage.ChunkStored:
ti.Stored++
// also mark it as synced
fallthrough
case storage.ChunkSynced:
ti.Synced++
case storage.ChunkCouldNotSync:
break
}

err = indexStore.Put(ti)
if err != nil {
return fmt.Errorf("failed updating tag: %w", err)
err = indexStore.Put(ti)
if err != nil {
return fmt.Errorf("failed updating tag: %w", err)
}
}

if state == storage.ChunkSent {
return nil
}

// Once the chunk is stored/synced/failed to sync, it is deleted from the upload store as
// we no longer need to keep track of this chunk. We also need to cleanup
// the pushItem.
pi := &pushItem{
Timestamp: ui.Uploaded,
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}

return errors.Join(
indexStore.Delete(pi),
chunkstamp.Delete(indexStore, uploadScope, pi.Address, pi.BatchID),
st.ChunkStore().Delete(ctx, chunk.Address()),
indexStore.Delete(ui),
)
return deleteFunc()
}

var (
Expand Down
123 changes: 123 additions & 0 deletions pkg/storer/internal/upload/uploadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,129 @@ func TestChunkReporter(t *testing.T) {
})
}

func TestDeleteTagReporter(t *testing.T) {

t.Parallel()

ts := newTestStorage(t)

var (
tag upload.TagItem
putter internal.PutterCloserWithReference
err error
)

if err := ts.Run(context.Background(), func(s transaction.Store) error {
tag, err = upload.NextTag(s.IndexStore())
return err
}); err != nil {
t.Fatalf("failed creating tag: %v", err)
}

if err := ts.Run(context.Background(), func(s transaction.Store) error {
putter, err = upload.NewPutter(s.IndexStore(), tag.TagID)
return err
}); err != nil {
t.Fatalf("failed creating putter: %v", err)
}

t.Run("delete tag while uploading", func(t *testing.T) {

chunk := chunktest.GenerateTestRandomChunks(1)[0]

if err := ts.Run(context.Background(), func(s transaction.Store) error {
return putter.Put(context.Background(), s, chunk)
}); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

report := func(ch swarm.Chunk, state int) {
t.Helper()
if err := ts.Run(context.Background(), func(s transaction.Store) error {
return upload.Report(context.Background(), s, ch, state)
}); err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}
}

tagItem := &upload.TagItem{TagID: tag.TagID}
if err := ts.Run(context.Background(), func(s transaction.Store) error {
return s.IndexStore().Get(tagItem)
}); err != nil {
t.Fatalf("Get(...): unexpected error: %v", err)
}

if err := ts.Run(context.Background(), func(s transaction.Store) error {
return s.IndexStore().Delete(tagItem)
}); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

t.Run("mark sent", func(t *testing.T) {
report(chunk, storage.ChunkSent)
})

t.Run("verify internal state", func(t *testing.T) {

ui := &upload.UploadItem{Address: chunk.Address(), BatchID: chunk.Stamp().BatchID()}
err := ts.IndexStore().Get(ui)
if err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}

if diff := cmp.Diff(uint64(0), ui.TagID); diff != "" {
t.Fatalf("Get(...): unexpected TagItem (-want +have):\n%s", diff)
}
})
})

t.Run("delete tag while uploading and not sent", func(t *testing.T) {
// Generate a chunk.
chunk := chunktest.GenerateTestRandomChunks(1)[0]

// Store the chunk (which creates the uploadItem).
if err := ts.Run(context.Background(), func(s transaction.Store) error {
return putter.Put(context.Background(), s, chunk)
}); err != nil {
t.Fatalf("Put(...): unexpected error: %v", err)
}

// Confirm the upload item exists.
ui := &upload.UploadItem{
Address: chunk.Address(),
BatchID: chunk.Stamp().BatchID(),
}
if err := ts.Run(context.Background(), func(s transaction.Store) error {
return s.IndexStore().Get(ui)
}); err != nil {
t.Fatalf("Get(...): unexpected error: %v", err)
}

// Delete the tag item to simulate a user deleting it.
tagItem := &upload.TagItem{TagID: tag.TagID}
if err := ts.Run(context.Background(), func(s transaction.Store) error {
return s.IndexStore().Delete(tagItem)
}); err != nil {
t.Fatalf("Delete(...): unexpected error: %v", err)
}

// Now report with a state other than ChunkSent, e.g. ChunkStored.
if err := ts.Run(context.Background(), func(s transaction.Store) error {
return upload.Report(context.Background(), s, chunk, storage.ChunkStored)
}); err != nil {
t.Fatalf("Report(...): unexpected error: %v", err)
}

// Verify that the upload item was deleted (cleanup via deleteFunc).
if err := ts.Run(context.Background(), func(s transaction.Store) error {
return s.IndexStore().Get(ui)
}); !errors.Is(err, storage.ErrNotFound) {
t.Fatalf("expected uploadItem to be deleted, got error: %v", err)
}
})

}

func TestNextTagID(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit b840507

Please sign in to comment.