From 8a1940408eed4fc9e21fe2863ccb6c97299571c5 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Tue, 7 Jan 2020 10:44:09 +0800 Subject: [PATCH 1/2] [ARROW-7505][Java] Remove Netty dependency for ArrowBuf --- .../main/java/io/netty/buffer/ArrowBuf.java | 83 ++++++++++--------- .../apache/arrow/memory/util/MemoryUtil.java | 79 ++++++++++++++++++ 2 files changed, 121 insertions(+), 41 deletions(-) create mode 100644 java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java diff --git a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java index 89e699d0b42..1938e12af99 100644 --- a/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java +++ b/java/memory/src/main/java/io/netty/buffer/ArrowBuf.java @@ -35,10 +35,9 @@ import org.apache.arrow.memory.BufferManager; import org.apache.arrow.memory.ReferenceManager; import org.apache.arrow.memory.util.HistoricalLog; +import org.apache.arrow.memory.util.MemoryUtil; import org.apache.arrow.util.Preconditions; -import io.netty.util.internal.PlatformDependent; - /** * ArrowBuf serves as a facade over underlying memory by providing * several access APIs to read/write data into a chunk of direct @@ -349,7 +348,7 @@ private void checkIndexD(long index, long fieldLength) { */ public long getLong(long index) { chk(index, LONG_SIZE); - return PlatformDependent.getLong(addr(index)); + return MemoryUtil.UNSAFE.getLong(addr(index)); } /** @@ -361,7 +360,7 @@ public long getLong(long index) { */ public void setLong(long index, long value) { chk(index, LONG_SIZE); - PlatformDependent.putLong(addr(index), value); + MemoryUtil.UNSAFE.putLong(addr(index), value); } /** @@ -384,7 +383,7 @@ public float getFloat(long index) { */ public void setFloat(long index, float value) { chk(index, FLOAT_SIZE); - PlatformDependent.putInt(addr(index), Float.floatToRawIntBits(value)); + MemoryUtil.UNSAFE.putInt(addr(index), Float.floatToRawIntBits(value)); } /** @@ -407,7 +406,7 @@ public double getDouble(long index) { */ public void setDouble(long index, double value) { chk(index, DOUBLE_SIZE); - PlatformDependent.putLong(addr(index), Double.doubleToRawLongBits(value)); + MemoryUtil.UNSAFE.putLong(addr(index), Double.doubleToRawLongBits(value)); } /** @@ -430,7 +429,7 @@ public char getChar(long index) { */ public void setChar(long index, int value) { chk(index, SHORT_SIZE); - PlatformDependent.putShort(addr(index), (short) value); + MemoryUtil.UNSAFE.putShort(addr(index), (short) value); } /** @@ -442,7 +441,7 @@ public void setChar(long index, int value) { */ public int getInt(long index) { chk(index, INT_SIZE); - return PlatformDependent.getInt(addr(index)); + return MemoryUtil.UNSAFE.getInt(addr(index)); } /** @@ -454,7 +453,7 @@ public int getInt(long index) { */ public void setInt(long index, int value) { chk(index, INT_SIZE); - PlatformDependent.putInt(addr(index), value); + MemoryUtil.UNSAFE.putInt(addr(index), value); } /** @@ -466,7 +465,7 @@ public void setInt(long index, int value) { */ public short getShort(long index) { chk(index, SHORT_SIZE); - return PlatformDependent.getShort(addr(index)); + return MemoryUtil.UNSAFE.getShort(addr(index)); } /** @@ -489,7 +488,7 @@ public void setShort(long index, int value) { */ public void setShort(long index, short value) { chk(index, SHORT_SIZE); - PlatformDependent.putShort(addr(index), value); + MemoryUtil.UNSAFE.putShort(addr(index), value); } /** @@ -501,7 +500,7 @@ public void setShort(long index, short value) { */ public void setByte(long index, int value) { chk(index, 1); - PlatformDependent.putByte(addr(index), (byte) value); + MemoryUtil.UNSAFE.putByte(addr(index), (byte) value); } /** @@ -513,7 +512,7 @@ public void setByte(long index, int value) { */ public void setByte(long index, byte value) { chk(index, 1); - PlatformDependent.putByte(addr(index), value); + MemoryUtil.UNSAFE.putByte(addr(index), value); } /** @@ -525,7 +524,7 @@ public void setByte(long index, byte value) { */ public byte getByte(long index) { chk(index, 1); - return PlatformDependent.getByte(addr(index)); + return MemoryUtil.UNSAFE.getByte(addr(index)); } @@ -603,7 +602,7 @@ public void readBytes(byte[] dst) { */ public void writeByte(byte value) { ensureWritable(1); - PlatformDependent.putByte(addr(writerIndex), value); + MemoryUtil.UNSAFE.putByte(addr(writerIndex), value); ++writerIndex; } @@ -614,7 +613,7 @@ public void writeByte(byte value) { */ public void writeByte(int value) { ensureWritable(1); - PlatformDependent.putByte(addr(writerIndex), (byte)value); + MemoryUtil.UNSAFE.putByte(addr(writerIndex), (byte)value); ++writerIndex; } @@ -647,7 +646,7 @@ public void writeBytes(byte[] src, int srcIndex, int length) { */ public void writeShort(int value) { ensureWritable(SHORT_SIZE); - PlatformDependent.putShort(addr(writerIndex), (short) value); + MemoryUtil.UNSAFE.putShort(addr(writerIndex), (short) value); writerIndex += SHORT_SIZE; } @@ -657,7 +656,7 @@ public void writeShort(int value) { */ public void writeInt(int value) { ensureWritable(INT_SIZE); - PlatformDependent.putInt(addr(writerIndex), value); + MemoryUtil.UNSAFE.putInt(addr(writerIndex), value); writerIndex += INT_SIZE; } @@ -667,7 +666,7 @@ public void writeInt(int value) { */ public void writeLong(long value) { ensureWritable(LONG_SIZE); - PlatformDependent.putLong(addr(writerIndex), value); + MemoryUtil.UNSAFE.putLong(addr(writerIndex), value); writerIndex += LONG_SIZE; } @@ -677,7 +676,7 @@ public void writeLong(long value) { */ public void writeFloat(float value) { ensureWritable(FLOAT_SIZE); - PlatformDependent.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); + MemoryUtil.UNSAFE.putInt(addr(writerIndex), Float.floatToRawIntBits(value)); writerIndex += FLOAT_SIZE; } @@ -687,7 +686,7 @@ public void writeFloat(float value) { */ public void writeDouble(double value) { ensureWritable(DOUBLE_SIZE); - PlatformDependent.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); + MemoryUtil.UNSAFE.putLong(addr(writerIndex), Double.doubleToRawLongBits(value)); writerIndex += DOUBLE_SIZE; } @@ -753,7 +752,7 @@ public void getBytes(long index, byte[] dst, int dstIndex, int length) { if (length != 0) { // copy "length" bytes from this ArrowBuf starting at addr(index) address // into dst byte array at dstIndex onwards - PlatformDependent.copyMemory(addr(index), dst, dstIndex, (long)length); + MemoryUtil.UNSAFE.copyMemory(null, addr(index), dst, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + dstIndex, length); } } @@ -790,7 +789,7 @@ public void setBytes(long index, byte[] src, int srcIndex, long length) { if (length > 0) { // copy "length" bytes from src byte array at the starting index (srcIndex) // into this ArrowBuf starting at address "addr(index)" - PlatformDependent.copyMemory(src, srcIndex, addr(index), length); + MemoryUtil.UNSAFE.copyMemory(src, MemoryUtil.BYTE_ARRAY_BASE_OFFSET + srcIndex, null, addr(index), length); } } @@ -815,8 +814,8 @@ public void getBytes(long index, ByteBuffer dst) { // copy dst.remaining() bytes of data from this ArrowBuf starting // at address srcAddress into the dst ByteBuffer starting at // address dstAddress - final long dstAddress = PlatformDependent.directBufferAddress(dst) + (long)dst.position(); - PlatformDependent.copyMemory(srcAddress, dstAddress, (long)dst.remaining()); + final long dstAddress = MemoryUtil.getByteBufferAddress(dst) + dst.position(); + MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, dst.remaining()); // after copy, bump the next write position for the dst ByteBuffer dst.position(dst.position() + dst.remaining()); } else if (dst.hasArray()) { @@ -824,7 +823,8 @@ public void getBytes(long index, ByteBuffer dst) { // at address srcAddress into the dst ByteBuffer starting at // index dstIndex final int dstIndex = dst.arrayOffset() + dst.position(); - PlatformDependent.copyMemory(srcAddress, dst.array(), dstIndex, (long)dst.remaining()); + MemoryUtil.UNSAFE.copyMemory( + null, srcAddress, dst.array(), MemoryUtil.BYTE_ARRAY_BASE_OFFSET + dstIndex, dst.remaining()); // after copy, bump the next write position for the dst ByteBuffer dst.position(dst.position() + dst.remaining()); } else { @@ -851,34 +851,35 @@ public void setBytes(long index, ByteBuffer src) { if (src.isDirect()) { // copy src.remaining() bytes of data from src ByteBuffer starting at // address srcAddress into this ArrowBuf starting at address dstAddress - final long srcAddress = PlatformDependent.directBufferAddress(src) + (long)src.position(); - PlatformDependent.copyMemory(srcAddress, dstAddress, (long)length); + final long srcAddress = MemoryUtil.getByteBufferAddress(src) + src.position(); + MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length); // after copy, bump the next read position for the src ByteBuffer src.position(src.position() + length); } else if (src.hasArray()) { // copy src.remaining() bytes of data from src ByteBuffer starting at // index srcIndex into this ArrowBuf starting at address dstAddress final int srcIndex = src.arrayOffset() + src.position(); - PlatformDependent.copyMemory(src.array(), srcIndex, dstAddress, (long)length); + MemoryUtil.UNSAFE.copyMemory( + src.array(), MemoryUtil.BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddress, length); // after copy, bump the next read position for the src ByteBuffer src.position(src.position() + length); } else { // copy word at a time while (length - 128 >= LONG_SIZE) { for (int x = 0; x < 16; x++) { - PlatformDependent.putLong(dstAddress, src.getLong()); + MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong()); length -= LONG_SIZE; dstAddress += LONG_SIZE; } } while (length >= LONG_SIZE) { - PlatformDependent.putLong(dstAddress, src.getLong()); + MemoryUtil.UNSAFE.putLong(dstAddress, src.getLong()); length -= LONG_SIZE; dstAddress += LONG_SIZE; } // copy last byte while (length > 0) { - PlatformDependent.putByte(dstAddress, src.get()); + MemoryUtil.UNSAFE.putByte(dstAddress, src.get()); --length; ++dstAddress; } @@ -903,9 +904,9 @@ public void setBytes(long index, ByteBuffer src, int srcIndex, int length) { if (src.isDirect()) { // copy length bytes of data from src ByteBuffer starting at address // srcAddress into this ArrowBuf at address dstAddress - final long srcAddress = PlatformDependent.directBufferAddress(src) + srcIndex; + final long srcAddress = MemoryUtil.getByteBufferAddress(src) + srcIndex; final long dstAddress = addr(index); - PlatformDependent.copyMemory(srcAddress, dstAddress, length); + MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length); } else { if (srcIndex == 0 && src.capacity() == length) { // copy the entire ByteBuffer from start to end of length @@ -945,7 +946,7 @@ public void getBytes(long index, ArrowBuf dst, int dstIndex, int length) { // dstAddress final long srcAddress = addr(index); final long dstAddress = dst.memoryAddress() + (long)dstIndex; - PlatformDependent.copyMemory(srcAddress, dstAddress, (long)length); + MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length); } } @@ -975,7 +976,7 @@ public void setBytes(long index, ArrowBuf src, long srcIndex, long length) { // dstAddress final long srcAddress = src.memoryAddress() + srcIndex; final long dstAddress = addr(index); - PlatformDependent.copyMemory(srcAddress, dstAddress, length); + MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length); } } @@ -995,7 +996,7 @@ public void setBytes(long index, ArrowBuf src) { checkIndex(index, length); final long srcAddress = src.memoryAddress() + src.readerIndex; final long dstAddress = addr(index); - PlatformDependent.copyMemory(srcAddress, dstAddress, length); + MemoryUtil.UNSAFE.copyMemory(null, srcAddress, null, dstAddress, length); src.readerIndex(src.readerIndex + length); } @@ -1020,7 +1021,7 @@ public int setBytes(long index, InputStream in, int length) throws IOException { if (readBytes > 0) { // copy readBytes length of data from the tmp byte array starting // at srcIndex 0 into this ArrowBuf starting at address addr(index) - PlatformDependent.copyMemory(tmp, 0, addr(index), readBytes); + MemoryUtil.UNSAFE.copyMemory(tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, null, addr(index), readBytes); } } return readBytes; @@ -1042,7 +1043,7 @@ public void getBytes(long index, OutputStream out, int length) throws IOExceptio // copy length bytes of data from this ArrowBuf starting at // address addr(index) into the tmp byte array starting at index 0 byte[] tmp = new byte[length]; - PlatformDependent.copyMemory(addr(index), tmp, 0, length); + MemoryUtil.UNSAFE.copyMemory(null, addr(index), tmp, MemoryUtil.BYTE_ARRAY_BASE_OFFSET, length); // write the copied data to output stream out.write(tmp); } @@ -1170,7 +1171,7 @@ public ArrowBuf writerIndex(long writerIndex) { public ArrowBuf setZero(long index, long length) { if (length != 0) { this.checkIndex(index, length); - PlatformDependent.setMemory(this.addr + index, length, (byte) 0); + MemoryUtil.UNSAFE.setMemory(this.addr + index, length, (byte) 0); } return this; } @@ -1185,7 +1186,7 @@ public ArrowBuf setZero(long index, long length) { public ArrowBuf setOne(int index, int length) { if (length != 0) { this.checkIndex(index, length); - PlatformDependent.setMemory(this.addr + index, length, (byte) 0xff); + MemoryUtil.UNSAFE.setMemory(this.addr + index, length, (byte) 0xff); } return this; } diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java new file mode 100644 index 00000000000..3775daf0a3a --- /dev/null +++ b/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory.util; + +import java.lang.reflect.Field; +import java.nio.ByteBuffer; + +import io.netty.buffer.ByteBuf; +import sun.misc.Unsafe; + +/** + * Utilities for memory related operations. + */ +public class MemoryUtil { + + /** + * The unsafe object from which to access the off-heap memory. + */ + public static final Unsafe UNSAFE; + + /** + * The start offset of array data relative to the start address of the array object. + */ + public static final long BYTE_ARRAY_BASE_OFFSET; + + /** + * The offset of the address field with the {@link java.nio.ByteBuffer} object. + */ + static final long BYTE_BUFFER_ADDRESS_OFFSET; + + static { + // get the unsafe object + try { + Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); + unsafeField.setAccessible(true); + UNSAFE = (Unsafe) unsafeField.get(null); + } catch (Throwable e) { + throw new Error("Failed to get the unsafe object.", e); + } + + BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + try { + Field addressField = java.nio.Buffer.class.getDeclaredField("address"); + addressField.setAccessible(true); + BYTE_BUFFER_ADDRESS_OFFSET = UNSAFE.objectFieldOffset(addressField); + } catch (NoSuchFieldException e) { + throw new Error("Failed to get java.nio.Buffer#address field.", e); + } + } + + /** + * Given a {@link ByteBuf}, gets the address the underlying memory space. + * + * @param buf the byte buffer. + * @return address of the underlying memory. + */ + public static long getByteBufferAddress(ByteBuffer buf) { + return UNSAFE.getLong(buf, BYTE_BUFFER_ADDRESS_OFFSET); + } + + private MemoryUtil() { + } +} From 97b84ae9b9532a87d223ba336105296d0f89ece9 Mon Sep 17 00:00:00 2001 From: liyafan82 Date: Fri, 7 Feb 2020 21:29:29 +0800 Subject: [PATCH 2/2] [ARROW-7505][Java] Mimic PlatformDependent implementation --- .../apache/arrow/memory/util/MemoryUtil.java | 41 ++++++++++++++----- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java b/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java index 3775daf0a3a..4a83fd7426c 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/util/MemoryUtil.java @@ -19,8 +19,11 @@ import java.lang.reflect.Field; import java.nio.ByteBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; import io.netty.buffer.ByteBuf; +import io.netty.util.internal.ReflectionUtil; import sun.misc.Unsafe; /** @@ -44,23 +47,39 @@ public class MemoryUtil { static final long BYTE_BUFFER_ADDRESS_OFFSET; static { - // get the unsafe object try { - Field unsafeField = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); - unsafeField.setAccessible(true); - UNSAFE = (Unsafe) unsafeField.get(null); - } catch (Throwable e) { - throw new Error("Failed to get the unsafe object.", e); - } + // try to get the unsafe object + final Object maybeUnsafe = AccessController.doPrivileged(new PrivilegedAction() { + @Override + public Object run() { + try { + final Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe"); + Throwable cause = ReflectionUtil.trySetAccessible(unsafeField, false); + if (cause != null) { + return cause; + } + return unsafeField.get(null); + } catch (Throwable e) { + return e; + } + } + }); - BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + if (maybeUnsafe instanceof Throwable) { + throw (Throwable) maybeUnsafe; + } - try { + UNSAFE = (Unsafe) maybeUnsafe; + + // get the offset of the data inside a byte array object + BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + + // get the offset of the address field in a java.nio.Buffer object Field addressField = java.nio.Buffer.class.getDeclaredField("address"); addressField.setAccessible(true); BYTE_BUFFER_ADDRESS_OFFSET = UNSAFE.objectFieldOffset(addressField); - } catch (NoSuchFieldException e) { - throw new Error("Failed to get java.nio.Buffer#address field.", e); + } catch (Throwable e) { + throw new RuntimeException("Failed to initialize MemoryUtil.", e); } }