Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AutoBufferLedger #31

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cpp/src/arrow/jniutil/jni_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,15 @@ class ReservationListenableMemoryPool::Impl {
}
int64_t bytes_granted = (new_block_count - blocks_reserved_) * block_size_;
blocks_reserved_ = new_block_count;
if (bytes_reserved_ > max_bytes_reserved_) {
max_bytes_reserved_ = bytes_reserved_;
}
return bytes_granted;
}

int64_t bytes_allocated() { return pool_->bytes_allocated(); }
int64_t bytes_allocated() const { return bytes_reserved_; }

int64_t max_memory() { return pool_->max_memory(); }
int64_t max_memory() const { return max_bytes_reserved_; }

std::string backend_name() { return pool_->backend_name(); }

Expand All @@ -128,6 +131,7 @@ class ReservationListenableMemoryPool::Impl {
int64_t block_size_;
int64_t blocks_reserved_;
int64_t bytes_reserved_;
int64_t max_bytes_reserved_ = 0L;
std::mutex mutex_;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.arrow.memory.util.MemoryUtil;
import org.apache.arrow.util.VisibleForTesting;

/**
Expand All @@ -29,20 +30,6 @@
* "-XX:MaxDirectMemorySize".
*/
public class DirectReservationListener implements ReservationListener {
private final Method methodReserve;
private final Method methodUnreserve;

private DirectReservationListener() {
try {
final Class<?> classBits = Class.forName("java.nio.Bits");
methodReserve = classBits.getDeclaredMethod("reserveMemory", long.class, int.class);
methodReserve.setAccessible(true);
methodUnreserve = classBits.getDeclaredMethod("unreserveMemory", long.class, int.class);
methodUnreserve.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static final DirectReservationListener INSTANCE = new DirectReservationListener();

Expand All @@ -55,43 +42,22 @@ public static DirectReservationListener instance() {
*/
@Override
public void reserve(long size) {
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("reserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodReserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
MemoryUtil.reserveDirectMemory(size);
}

/**
* Unreserve bytes by invoking java.nio.java.Bitjava.nio.Bitss#unreserveMemory.
*/
@Override
public void unreserve(long size) {
try {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("unreserve size should not be larger than Integer.MAX_VALUE (0x7fffffff)");
}
methodUnreserve.invoke(null, (int) size, (int) size);
} catch (Exception e) {
throw new RuntimeException(e);
}
MemoryUtil.unreserveDirectMemory(size);
}

/**
* Get current reservation of jVM direct memory. Visible for testing.
*/
@VisibleForTesting
public long getCurrentDirectMemReservation() {
try {
final Class<?> classBits = Class.forName("java.nio.Bits");
final Field f = classBits.getDeclaredField("reservedMemory");
f.setAccessible(true);
return ((AtomicLong) f.get(null)).get();
} catch (Exception e) {
throw new RuntimeException(e);
}
return MemoryUtil.getCurrentDirectMemReservation();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@
import org.apache.arrow.flatbuf.Message;
import org.apache.arrow.flatbuf.MessageHeader;
import org.apache.arrow.flatbuf.RecordBatch;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.BufferLedger;
import org.apache.arrow.memory.NativeUnderlyingMemory;
import org.apache.arrow.memory.*;
import org.apache.arrow.memory.util.LargeMemoryUtil;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.compression.NoCompressionCodec;
Expand Down Expand Up @@ -131,8 +128,8 @@ public static ArrowRecordBatch deserializeUnsafe(
final int size = LargeMemoryUtil.checkedCastToInt(bufferMeta.length());
final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
size, nativeBufferRef, bufferMeta.offset());
BufferLedger ledger = am.associate(allocator);
ArrowBuf buf = new ArrowBuf(ledger, null, size, bufferMeta.offset());
ReferenceManager rm = am.createReferenceManager(allocator);
ArrowBuf buf = new ArrowBuf(rm, null, size, bufferMeta.offset());
buffers.add(buf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int
return new NativeUnderlyingMemory(bufferAllocator, size, nativeBufferId, address);
}

public BufferLedger associate(BufferAllocator allocator) {
return super.associate(allocator);
public ReferenceManager createReferenceManager(BufferAllocator allocator) {
return super.associate(allocator).newReferenceManager();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private BufferLedger associate(final BufferAllocator allocator, final boolean re
return ledger;
}

ledger = new BufferLedger(allocator, this);
ledger = allocator.getBufferLedgerFactory().create(allocator, this);

if (retain) {
// the new reference manager will have a ref count of 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,15 @@ public long getId() {
return id;
}

/**
* Create a logger of this {@link ArrowBuf}.
*
* @return the newly created logger
*/
Logger createLogger() {
return new Logger(id, memoryAddress(), length, historicalLog);
}

/**
* Prints information of this buffer into <code>sb</code> at the given
* indentation and verbosity level.
Expand All @@ -1103,12 +1112,7 @@ public long getId() {
*
*/
public void print(StringBuilder sb, int indent, Verbosity verbosity) {
CommonUtil.indent(sb, indent).append(toString());

if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) {
sb.append("\n");
historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
}
new Logger(id, addr, length, historicalLog).print(sb, indent, verbosity);
}

/**
Expand Down Expand Up @@ -1242,4 +1246,36 @@ public ArrowBuf setIndex(int readerIndex, int writerIndex) {
}
}

/**
* Create a logger for an {@link ArrowBuf}. This is currently used in debugging or historical logging
* in code of {@link BufferLedger} to avoid directly holding a strong reference to {@link ArrowBuf}.
* So that GC could be able to involved in auto cleaning logic in {@link AutoBufferLedger}.
*/
static class Logger {
private final long id;
private final long addr;
private final long length;
private final HistoricalLog historicalLog;

public Logger(long id, long addr, long length, HistoricalLog historicalLog) {
this.id = id;
this.addr = addr;
this.length = length;
this.historicalLog = historicalLog;
}

public void print(StringBuilder sb, int indent, Verbosity verbosity) {
CommonUtil.indent(sb, indent).append(toString());

if (BaseAllocator.DEBUG && verbosity.includeHistoricalLog) {
sb.append("\n");
historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
}
}

@Override
public String toString() {
return String.format("ArrowBuf.Logger[%d], address:%d, length:%d", id, addr, length);
}
}
}
Loading