Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage/dataflux): address deadlock when reading from ranges #11303

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions storage/dataflux/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func TestMain(m *testing.M) {

// Lists all the objects in the bucket.
func TestIntegration_NextBatch_All(t *testing.T) {
t.Skip("#11198")
if testing.Short() {
t.Skip("Integration tests skipped in short mode")
}
Expand All @@ -97,7 +96,6 @@ func TestIntegration_NextBatch_All(t *testing.T) {
}

func TestIntegration_NextBatch(t *testing.T) {
t.Skip("#11196")
// Accessing public bucket to list large number of files in batches.
// See https://cloud.google.com/storage/docs/public-datasets/landsat
if testing.Short() {
Expand Down
4 changes: 4 additions & 0 deletions storage/dataflux/range_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ func (rs *rangeSplitter) convertStringRangeToMinimalIntRange(

// charPosition returns the index of the character in the alphabet set.
func (rs *rangeSplitter) charPosition(ch rune) (int, error) {
rs.mu.Lock() // Acquire the lock
jdnurme marked this conversation as resolved.
Show resolved Hide resolved
defer rs.mu.Unlock() // Release the lock when the function exits
if idx, ok := rs.alphabetMap[ch]; ok {
return idx, nil
}
Expand All @@ -337,6 +339,8 @@ func (rs *rangeSplitter) charPosition(ch rune) (int, error) {
// convertRangeStringToArray transforms the range string into a rune slice while
// verifying that each character is present in the alphabet.
func (rs *rangeSplitter) convertRangeStringToArray(rangeString string) ([]rune, error) {
rs.mu.Lock() // Acquire the lock
defer rs.mu.Unlock() // Release the lock when the function exits
for _, char := range rangeString {
if _, exists := rs.alphabetMap[char]; !exists {
return nil, fmt.Errorf("character %c in range string %q is not found in the alphabet array", char, rangeString)
Expand Down
13 changes: 5 additions & 8 deletions storage/dataflux/worksteal.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func (c *Lister) workstealListing(ctx context.Context) ([]*storage.ObjectAttrs,
if err != nil {
return nil, fmt.Errorf("creating new range splitter: %w", err)
}

g, ctx := errgroup.WithContext(ctx)
// Initialize all workers as idle.
for i := 0; i < c.parallelism; i++ {
Expand Down Expand Up @@ -126,13 +125,12 @@ func (w *worker) doWorkstealListing(ctx context.Context) error {
// If a worker is idle, sleep for a while before checking the next update.
// Worker status is changed to active when it finds work in range channel.
if w.status == idle {
if len(w.lister.ranges) == 0 {
time.Sleep(sleepDurationWhenIdle)
continue
} else {
newRange := <-w.lister.ranges
select {
case newRange := <-w.lister.ranges:
<-w.idleChannel
w.updateWorker(newRange.startRange, newRange.endRange, active)
case <-time.After(sleepDurationWhenIdle):
continue
}
}
// Active worker to list next page of objects within the range
Expand All @@ -153,7 +151,7 @@ func (w *worker) doWorkstealListing(ctx context.Context) error {

// If listing not complete and idle workers are available, split the range
// and give half of work to idle worker.
for len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil {
if len(w.idleChannel)-len(w.lister.ranges) > 0 && ctx.Err() == nil {
// Split range and upload half of work for idle worker.
splitPoint, err := w.rangesplitter.splitRange(w.startRange, w.endRange, 1)
if err != nil {
Expand Down Expand Up @@ -191,7 +189,6 @@ func (w *worker) shutDownSignal() bool {
w.result.mu.Unlock()

alreadyListedBatchSizeObjects := w.lister.batchSize > 0 && lenResult >= w.lister.batchSize

return noMoreObjects || alreadyListedBatchSizeObjects
}

Expand Down
1 change: 0 additions & 1 deletion storage/dataflux/worksteal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
)

func TestWorkstealListingEmulated(t *testing.T) {
t.Skip("https://github.com/googleapis/google-cloud-go/issues/11205")
transportClientTest(context.Background(), t, func(t *testing.T, ctx context.Context, project, bucket string, client *storage.Client) {

attrs := &storage.BucketAttrs{
Expand Down
Loading