
HDDS-11220. Initialize block length using the chunk list from DataNode before seek #7221

Merged Oct 28, 2024 (6 commits; changes shown from 5 commits)
@@ -163,9 +163,13 @@ public synchronized void initialize() throws IOException {
try {
blockData = getBlockData();
chunks = blockData.getChunksList();
LOG.debug("Block {} has block data {}", blockID, blockData);
@ChenSammi (Contributor) commented on Oct 25, 2024:

@jojochuang, are these logs still needed? If so, could you add an if (LOG.isDebugEnabled()) check around these newly added logs?

@jojochuang (Contributor, Author) replied:

Parameterized logging with a simple object reference has negligible overhead, but let's remove it; it's redundant.
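For illustration, a minimal sketch of the two styles under discussion, assuming the SLF4J API that the LOG.debug("...", x) calls in this diff imply (the class and helper names here are made up):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingStyles {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingStyles.class);

  void demo(Object blockID, Object blockData) {
    // Parameterized form: the message is only formatted, and the arguments'
    // toString() only invoked, when DEBUG is enabled; passing plain object
    // references costs almost nothing.
    LOG.debug("Block {} has block data {}", blockID, blockData);

    // Guarded form: worth it only when building the arguments is itself
    // expensive (e.g. serializing a large chunk list into a String).
    if (LOG.isDebugEnabled()) {
      LOG.debug("Block {} has block data {}", blockID, expensiveSummary(blockData));
    }
  }

  private String expensiveSummary(Object blockData) {
    return String.valueOf(blockData);   // stand-in for costly formatting
  }
}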

if (blockInfo != null && blockInfo.isUnderConstruction()) {
// use the block length from DN if block is under construction.
length = blockData.getSize();
LOG.debug("Updated block length to {} for block {}", length, blockID);
} else {
LOG.debug("blockInfo = {}", blockInfo);
Contributor commented:

Is this debug log still necessary?

@jojochuang (Contributor, Author) replied:

Redundant. Removing it.

}
break;
// If we get a StorageContainerException or an IOException due to
@@ -34,7 +34,7 @@
public class MultipartInputStream extends ExtendedInputStream {

private final String key;
private final long length;
private long length;

// List of PartInputStream, one for each part of the key
private final List<? extends PartInputStream> partStreams;
@@ -56,6 +56,8 @@ public class MultipartInputStream extends ExtendedInputStream {
// can be reset if a new position is seeked.
private int prevPartIndex;

private boolean initialized = false;

public MultipartInputStream(String keyName,
List<? extends PartInputStream> inputStreams) {

@@ -130,6 +132,9 @@ protected void checkPartBytesRead(int numBytesToRead, int numBytesRead,
@Override
public synchronized void seek(long pos) throws IOException {
checkOpen();
if (!initialized) {
initialize();
}
if (pos == 0 && length == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
@@ -173,6 +178,26 @@ public synchronized void seek(long pos) throws IOException {
prevPartIndex = partIndex;
}

public synchronized void initialize() throws IOException {
@ChenSammi (Contributor) commented on Sep 20, 2024:

@jojochuang, I think the problem we are now facing is how to make sure a reader of an open file will succeed. Two things were left from last time that I haven't fully thought through:
a. What if the writer of the open file is slower than the reader? Say block A's size in OM is 10 bytes; when the reader starts, it fetches block A's length from the DN, which is 80 bytes. Later the writer writes more data and the length becomes 90 bytes. Should the reader refetch the block size from the DN in this case?
b. What if the writer allocates a new block?

@jojochuang (Contributor, Author) replied:

For the HBase replication case, the application expects the input stream to read up to the hsync'ed length returned by the output stream before the input stream is instantiated. So in your case the read shouldn't go beyond 80 bytes, and therefore new blocks added by the writer shouldn't matter either.
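A sketch of that contract, assuming a Hadoop-compatible FileSystem and that the reader's view is pinned to the hsync'ed length at open time; the class name and byte counts mirror the scenario in the question and are not Ozone APIs:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class HsyncReadContract {
  static void demo(FileSystem fs, Path path, byte[] data) throws Exception {
    try (FSDataOutputStream out = fs.create(path, true)) {
      out.write(data, 0, 80);
      out.hsync();                      // 80 bytes are now durable

      // Reader instantiated here: per the contract above, its view is
      // pinned to the hsync'ed length at open time.
      try (FSDataInputStream in = fs.open(path)) {
        out.write(data, 80, 10);        // writer grows the file to 90 bytes
        out.hsync();

        byte[] buf = new byte[100];
        int n = in.read(buf);           // expected: n <= 80
      }
    }
  }
}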

@jojochuang (Contributor, Author) replied:

BTW, I have applied the fix to the HBase cluster. It looks like it's working with no errors, though I'm still in the process of verifying that data actually gets written to the destination cluster.

Contributor replied:

If the HBase reader only needs to read up to the output stream's length at the time the reader is instantiated, then we don't have to handle the two cases mentioned above, and this patch looks good to me. Thanks @jojochuang.

// Pre-check that the stream has not been initialized already
if (initialized) {
return;
}

for (PartInputStream partInputStream : partStreams) {
if (partInputStream instanceof BlockInputStream) {
((BlockInputStream) partInputStream).initialize();
}
}

long streamLength = 0L;
for (PartInputStream partInputStream : partStreams) {
streamLength += partInputStream.getLength();
}
this.length = streamLength;
initialized = true;
}
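For context, this is the general shape of the lazy-initialization pattern the diff adds, as a standalone sketch: the expensive per-part length refresh is deferred until the first seek, then cached behind a flag. Every name below is hypothetical rather than an Ozone class.

import java.io.IOException;
import java.util.List;

class LazyLengthStream {
  interface LengthSource {
    void refresh() throws IOException;  // e.g. fetch the chunk list from the DN
    long getLength();
  }

  private final List<LengthSource> parts;
  private long length;                  // possibly stale until initialized
  private boolean initialized = false;

  LazyLengthStream(List<LengthSource> parts, long staleLength) {
    this.parts = parts;
    this.length = staleLength;
  }

  synchronized void seek(long pos) throws IOException {
    if (!initialized) {
      initialize();                     // one refresh, on the first seek only
    }
    if (pos < 0 || pos > length) {
      throw new IOException("Seek position " + pos + " out of range 0.." + length);
    }
    // ... position the underlying part streams ...
  }

  synchronized void initialize() throws IOException {
    if (initialized) {
      return;
    }
    long total = 0L;
    for (LengthSource part : parts) {
      part.refresh();
      total += part.getLength();
    }
    length = total;
    initialized = true;
  }
}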

@Override
public synchronized long getPos() throws IOException {
return length == 0 ? 0 :
@@ -172,6 +172,7 @@ public class TestHSync {
private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
private static final int SERVICE_INTERVAL = 100;
private static final int EXPIRE_THRESHOLD_MS = 140;
private static final int WAL_HEADER_LEN = 83;

private static OpenKeyCleanupService openKeyCleanupService;

@@ -417,6 +418,45 @@ private static String getChunkPathOnDataNode(FSDataOutputStream outputStream)
return chunkPath;
}

@Test
public void testHSyncSeek() throws Exception {
// Set the fs.defaultFS
final String rootPath = String.format("%s://%s.%s/",
OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);

final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
final Path key1 = new Path(dir, "key-hsync-seek");

final byte[] data = new byte[1024];
final byte[] buffer = new byte[1024];
ThreadLocalRandom.current().nextBytes(data);

try (FileSystem fs = FileSystem.get(CONF)) {
// Create key1
try (FSDataOutputStream os = fs.create(key1, true)) {
os.write(data, 0, WAL_HEADER_LEN);
// the first hsync will update the correct length in the key info at OM
os.hsync();
os.write(data, 0, data.length);
os.hsync(); // the second hsync will not update the length at OM
try (FSDataInputStream in = fs.open(key1)) {
// the actual key length is 1025, but the length in OM is 1
Contributor commented:

Are these 1025 and 1 correct? I think they are WAL_HEADER_LEN + 1024 and WAL_HEADER_LEN.

@jojochuang (Contributor, Author) replied:

Ah, you're absolutely right. Updated.
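// For reference, the corrected arithmetic, using the WAL_HEADER_LEN = 83
// constant added at the top of TestHSync:
//   length recorded at OM  = WAL_HEADER_LEN               = 83          (first hsync)
//   actual key length      = WAL_HEADER_LEN + data.length = 83 + 1024 = 1107
//   readable after seek(WAL_HEADER_LEN + 1)               = 1107 - 84  = 1023 == buffer.length - 1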

in.seek(WAL_HEADER_LEN + 1);
final int n = in.read(buffer, 1, buffer.length - 1);
// expect to read 1023 bytes
assertEquals(buffer.length - 1, n);
for (int i = 1; i < buffer.length; i++) {
assertEquals(data[i], buffer[i], "expected at i=" + i);
}
}
} finally {
fs.delete(key1, false);
}
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testO3fsHSync(boolean incrementalChunkList) throws Exception {