Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.uniffle.client.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -110,11 +109,7 @@ public ShuffleReadClientImpl(
}

// copy blockIdBitmap to track all pending blocks
try {
pendingBlockIds = RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
} catch (IOException ioe) {
throw new RuntimeException("Can't create pending blockIds.", ioe);
}
pendingBlockIds = RssUtils.cloneBitMap(blockIdBitmap);

clientReadHandler = ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
}
Expand Down Expand Up @@ -213,11 +208,7 @@ private int read() {
@Override
public void checkProcessedBlockIds() {
Roaring64NavigableMap cloneBitmap;
try {
cloneBitmap = RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
} catch (IOException ioe) {
throw new RuntimeException("Can't validate processed blockIds.", ioe);
}
cloneBitmap = RssUtils.cloneBitMap(blockIdBitmap);
cloneBitmap.and(processedBlockIds);
if (!blockIdBitmap.equals(cloneBitmap)) {
throw new RssException("Blocks read inconsistent: expected " + blockIdBitmap.getLongCardinality()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ public static Roaring64NavigableMap deserializeBitMap(byte[] bytes) throws IOExc
return bitmap;
}

public static Roaring64NavigableMap cloneBitMap(Roaring64NavigableMap bitmap) {
Roaring64NavigableMap clone = Roaring64NavigableMap.bitmapOf();
clone.or(bitmap);
return clone;
}

public static List<ShuffleDataSegment> transIndexDataToSegments(
ShuffleIndexResult shuffleIndexResult, int readBufferSize) {
if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -88,6 +89,14 @@ public void testSerializeBitmap() throws Exception {
assertEquals(Roaring64NavigableMap.bitmapOf(), RssUtils.deserializeBitMap(new byte[]{}));
}

@Test
public void testCloneBitmap() {
Roaring64NavigableMap bitmap1 = Roaring64NavigableMap.bitmapOf(1, 2, 100, 10000);
Roaring64NavigableMap bitmap2 = RssUtils.cloneBitMap(bitmap1);
assertNotSame(bitmap1, bitmap2);
assertEquals(bitmap1, bitmap2);
}

@Test
public void testShuffleIndexSegment() {
ShuffleIndexResult shuffleIndexResult = new ShuffleIndexResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public void readTest9() throws Exception {
ShuffleReadClientImpl readClient;

createTestData(testAppId, expectedData, blockIdBitmap, taskIdBitmap);
Roaring64NavigableMap beforeAdded = RssUtils.deserializeBitMap(RssUtils.serializeBitMap(blockIdBitmap));
Roaring64NavigableMap beforeAdded = RssUtils.cloneBitMap(blockIdBitmap);
// write data by another task, read data again, the cache for index file should be updated
blocks = createShuffleBlockList(
0, 0, 1, 3, 25, blockIdBitmap, Maps.newHashMap(), mockSSI);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,9 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
if (System.currentTimeMillis() - start > commitTimeout) {
throw new RuntimeException("Shuffle data commit timeout for " + commitTimeout + " ms");
}
byte[] bitmapBytes;
synchronized (cachedBlockIds) {
bitmapBytes = RssUtils.serializeBitMap(cachedBlockIds);
cloneBlockIds = RssUtils.cloneBitMap(cachedBlockIds);
}
cloneBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
long expectedCommitted = cloneBlockIds.getLongCardinality();
shuffleBufferManager.commitShuffleTask(appId, shuffleId);
Roaring64NavigableMap committedBlockIds;
Expand All @@ -183,9 +181,8 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
while (true) {
committedBlockIds = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId);
synchronized (committedBlockIds) {
bitmapBytes = RssUtils.serializeBitMap(committedBlockIds);
cloneCommittedBlockIds = RssUtils.cloneBitMap(committedBlockIds);
}
cloneCommittedBlockIds = RssUtils.deserializeBitMap(bitmapBytes);
cloneBlockIds.andNot(cloneCommittedBlockIds);
if (cloneBlockIds.isEmpty()) {
break;
Expand Down