Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions java/adapter/avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@
<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<artifactId>arrow-memory-netty</artifactId>
<version>${project.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector -->
<dependency>
<groupId>org.apache.arrow</groupId>
Expand Down
3 changes: 1 addition & 2 deletions java/adapter/jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,9 @@
<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-memory -->
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<artifactId>arrow-memory-netty</artifactId>
<version>${project.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.arrow/arrow-vector -->
<dependency>
<groupId>org.apache.arrow</groupId>
Expand Down
2 changes: 1 addition & 1 deletion java/adapter/orc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<artifactId>arrow-memory-netty</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion java/algorithm/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<artifactId>arrow-memory-netty</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion java/flight/flight-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<artifactId>arrow-memory-netty</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion java/flight/flight-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<artifactId>arrow-memory-netty</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
Expand Down
2 changes: 1 addition & 1 deletion java/gandiva/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory</artifactId>
<artifactId>arrow-memory-netty</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
Expand Down
8 changes: 0 additions & 8 deletions java/memory/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,6 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
final AllocationListener listener;
private final BaseAllocator parentAllocator;
private final ArrowByteBufAllocator thisAsByteBufAllocator;
//private final ArrowByteBufAllocator thisAsByteBufAllocator;
private final Map<BaseAllocator, Object> childAllocators;
private final ArrowBuf empty;
// members used purely for debugging
Expand Down Expand Up @@ -99,7 +99,7 @@ protected BaseAllocator(
this.parentAllocator = parentAllocator;
this.name = name;

this.thisAsByteBufAllocator = new ArrowByteBufAllocator(this);
//this.thisAsByteBufAllocator = new ArrowByteBufAllocator(this);
this.childAllocators = Collections.synchronizedMap(new IdentityHashMap<>());

if (DEBUG) {
Expand Down Expand Up @@ -281,7 +281,7 @@ public ArrowBuf buffer(final long initialRequestSize) {
}

private ArrowBuf createEmpty() {
return new ArrowBuf(ReferenceManager.NO_OP, null, 0, NettyAllocationManager.EMPTY.memoryAddress(), true);
return new ArrowBuf(ReferenceManager.NO_OP, null, 0, 0, true);
}

@Override
Expand Down Expand Up @@ -355,10 +355,10 @@ private AllocationManager newAllocationManager(BaseAllocator accountingAllocator
return allocationManagerFactory.create(accountingAllocator, size);
}

@Override
public ArrowByteBufAllocator getAsByteBufAllocator() {
return thisAsByteBufAllocator;
}
//@Override
//public ArrowByteBufAllocator getAsByteBufAllocator() {
// return thisAsByteBufAllocator;
//}

@Override
public BufferAllocator newChildAllocator(
Expand Down Expand Up @@ -765,7 +765,7 @@ abstract static class Config {
*/
@Value.Default
AllocationManager.Factory getAllocationManagerFactory() {
return NettyAllocationManager.FACTORY;
return TrivialAllocationManager.FACTORY;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,45 @@ public RootAllocator(final AllocationListener listener, final long limit) {
/**
* Constructor.
*
* @param listener the allocation listener
* @param limit max allocation size in bytes
* @param roundingPolicy the policy for rounding the buffer size
* @param listener the allocation listener
* @param limit max allocation size in bytes
* @param roundingPolicy the policy for rounding the buffer size
*/
public RootAllocator(final AllocationListener listener, final long limit, RoundingPolicy roundingPolicy) {
this(configBuilder()
.listener(listener)
.maxAllocation(limit)
.roundingPolicy(roundingPolicy)
.allocationManagerFactory(getFactory())
.build()
);
}

public RootAllocator(Config config) {
super(null, "ROOT", config);

}

private static AllocationManager.Factory getFactory() {
String className = System.getProperty("arrow.default.allocation-manager");
if (className == null) {
try {
return getFactory("org.apache.arrow.memory.NettyAllocationManager");
} catch (Throwable t) {
return getFactory("org.apache.arrow.memory.TrivialAllocationManager");
}
} else {
return getFactory(className);
}
}

private static AllocationManager.Factory getFactory(String className) {
try {
Class<?> clazz = Class.forName(className);
return (AllocationManager.Factory) clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new RuntimeException("Could not instantiate AllocationManager with type " + className, t);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.memory;

import java.lang.reflect.Field;

import org.apache.arrow.memory.util.LargeMemoryUtil;

import sun.misc.Unsafe;

/**
* trivial allocation manager for the case when no other allocation manager exists.
*/
public class TrivialAllocationManager extends AllocationManager {

public static final Factory FACTORY = new Factory();

private final Unsafe unsafe = getUnsafe();
private final long address;
private final int requestedSize;

protected TrivialAllocationManager(BaseAllocator accountingAllocator, int requestedSize) {
super(accountingAllocator);
this.requestedSize = requestedSize;
address = unsafe.allocateMemory(requestedSize);
}

@Override
protected long memoryAddress() {
return address;
}

@Override
protected void release0() {
unsafe.setMemory(address, requestedSize, (byte) 0);
unsafe.freeMemory(address);
}

@Override
public long getSize() {
return requestedSize;
}

private Unsafe getUnsafe() {
Field f = null;
try {
f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (Unsafe) f.get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
} finally {
if (f != null) {
f.setAccessible(false);
}
}
}

/**
* Factory for creating {@link TrivialAllocationManager}.
*/
public static class Factory implements AllocationManager.Factory {

@Override
public AllocationManager create(BaseAllocator accountingAllocator, long size) {
return new TrivialAllocationManager(accountingAllocator, LargeMemoryUtil.checkedCastToInt(size));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.lang.reflect.Field;

import org.apache.arrow.memory.BaseAllocator;
import org.apache.arrow.memory.NettyAllocationManager;

/**
* The default rounding policy. That is, if the requested size is within the chunk size,
Expand All @@ -38,7 +37,8 @@ public class DefaultRoundingPolicy implements RoundingPolicy {

private DefaultRoundingPolicy() {
try {
Field field = NettyAllocationManager.class.getDeclaredField("CHUNK_SIZE");
Class<?> clazz = Class.forName("org.apache.arrow.memory.NettyAllocationManager");
Field field = clazz.getDeclaredField("CHUNK_SIZE");
field.setAccessible(true);
chunkSize = (Long) field.get(null);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@

package org.apache.arrow.memory.util;

import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;

import io.netty.buffer.ByteBuf;
import io.netty.util.internal.ReflectionUtil;
import sun.misc.Unsafe;

/**
Expand Down Expand Up @@ -54,7 +53,7 @@ public class MemoryUtil {
public Object run() {
try {
final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
Throwable cause = ReflectionUtil.trySetAccessible(unsafeField, false);
Throwable cause = trySetAccessible(unsafeField, false);
if (cause != null) {
return cause;
}
Expand Down Expand Up @@ -84,7 +83,7 @@ public Object run() {
}

/**
* Given a {@link ByteBuf}, gets the address the underlying memory space.
* Given a {@link ByteBuffer}, gets the address the underlying memory space.
*
* @param buf the byte buffer.
* @return address of the underlying memory.
Expand All @@ -95,4 +94,45 @@ public static long getByteBufferAddress(ByteBuffer buf) {

private MemoryUtil() {
}

/**
* Try to call {@link AccessibleObject#setAccessible(boolean)} but will catch any {@link SecurityException} and
* {@link java.lang.reflect.InaccessibleObjectException} and return it.
* The caller must check if it returns {@code null} and if not handle the returned exception.
*/
private static Throwable trySetAccessible(AccessibleObject object, boolean checkAccessible) {
if (checkAccessible && javaVersion() > 9) {
return new UnsupportedOperationException("Reflective setAccessible(true) disabled");
}
try {
object.setAccessible(true);
return null;
} catch (SecurityException e) {
return e;
} catch (RuntimeException e) {
if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) {
return e;
}
throw e;
}
}

private static int javaVersion() {
return majorVersion(System.getProperty("java.specification.version", "1.6"));
}

private static int majorVersion(final String javaSpecVersion) {
final String[] components = javaSpecVersion.split("\\.");
final int[] version = new int[components.length];
for (int i = 0; i < components.length; i++) {
version[i] = Integer.parseInt(components[i]);
}

if (version[0] == 1) {
assert version[1] >= 6;
return version[1];
} else {
return version[0];
}
}
}
Loading