Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-10821. Ensure ChunkBuffer fully writes buffer to FileChannel #6652

Merged
merged 9 commits into from
Dec 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Objects;
import java.util.function.Function;

import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.UncheckedAutoCloseable;

Expand Down Expand Up @@ -102,7 +103,7 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
return BufferUtils.writeFully(channel, buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
Expand Down Expand Up @@ -246,9 +248,12 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
long written = 0;
for (ByteBuffer buf : buffers) {
written += BufferUtils.writeFully(channel, buf);
}
findCurrent();
return bytes;
return written;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.utils.db.CodecBuffer;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
Expand Down Expand Up @@ -279,7 +280,11 @@ public List<ByteBuffer> asByteBufferList() {

@Override
public long writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffers.toArray(new ByteBuffer[0]));
long written = 0;
for (ByteBuffer buf : buffers) {
written += BufferUtils.writeFully(channel, buf);
}
return written;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.hadoop.ozone.common.utils;

import com.google.common.base.Preconditions;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
Expand Down Expand Up @@ -136,4 +139,19 @@ public static int getNumberOfBins(long numElements, int maxElementsPerBin) {
}
return Math.toIntExact(n);
}

/**
* Write all remaining bytes in buffer to the given channel.
*/
public static long writeFully(GatheringByteChannel ch, ByteBuffer bb) throws IOException {
long written = 0;
while (bb.remaining() > 0) {
int n = ch.write(bb);
if (n <= 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the javadoc, n == 0 is a valid case.

BTW, let's have another method to handle array? The GatheringByteChannel works more efficient in that way.

  public static long writeFully(GatheringByteChannel ch, ByteBuffer[] buffers) throws IOException {
    long written = 0;
    for(int i = 0; i < buffers.length; i++) {
      while (buffers[i].remaining() > 0) {
        final long n = ch.write(buffers, i, buffers.length - i);
        if (n < 0) {
          throw new IllegalStateException("GatheringByteChannel.write returns " + n + " for " + ch);
        }
        written += n;
      }
    }
    return written;
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @szetszwo for the suggestion. Can we address it in follow-up?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, let's do it in a follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @szetszwo. If n == 0 is valid, I guess we should change the condition to n < 0 in this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about merging this first? We can change it immediately in the follow-up pr.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw new IllegalStateException("no bytes written");
}
written += n;
}
return written;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,21 @@ public long write(ByteBuffer[] srcs) throws IOException {

@Override
public int write(ByteBuffer src) throws IOException {
return delegate.write(src);
// If src has more than 1 byte left, simulate partial write by adjusting limit.
// Remaining 1 byte should be written on next call.
// This helps verify that the caller ensures buffer is written fully.
final int adjustment = 1;
final boolean limitWrite = src.remaining() > adjustment;
if (limitWrite) {
src.limit(src.limit() - adjustment);
}
try {
return delegate.write(src);
} finally {
if (limitWrite) {
src.limit(src.limit() + adjustment);
}
}
}

@Override
Expand Down
Loading