
[Java] Unable to update VectorSchemaRoot field vectors. #46

Open
hnasrullakhan opened this issue Oct 15, 2024 · 12 comments
Labels
Type: usage (usage question)

Comments

@hnasrullakhan

hnasrullakhan commented Oct 15, 2024

Describe the usage question you have. Please include as many useful details as possible.

Hi community,

I am trying to modify the data buffers of a string column.
If I print the buffer's readable bytes after buffers.set(index, compressedBuf);, I see the right value.

    List<ArrowBuf> buffers = fieldVector.getFieldBuffers();
    int index = buffers.size() - 1;
    ArrowBuf compressedBuf = null;
    ArrowBuf originalBuf = buffers.get(index);
    ByteBuffer originalBuffer = originalBuf.nioBuffer(0, (int) originalBuf.readableBytes());

    // Compress the buffer using LZ4
    byte[] originalBytes = new byte[originalBuffer.remaining()];
    System.out.println("This is buffer id: " + index + " originalbytes: " + originalBuf.readableBytes());
    originalBuffer.get(originalBytes);

    int maxCompressedLength = compressor.maxCompressedLength(originalBytes.length);
    byte[] compressedBytes = new byte[maxCompressedLength];
    int compressedLength = compressor.compress(originalBytes, 0, originalBytes.length, compressedBytes, 0, maxCompressedLength);

    // Allocate a new ArrowBuf for the compressed data
    compressedBuf = allocator.buffer(compressedLength);
    compressedBuf.writeBytes(compressedBytes, 0, compressedLength);
    compressedBuf.writerIndex(compressedLength);
    System.out.println("This is buffer id: " + index + " compressedbytes: " + compressedBuf.readableBytes());

    buffers.set(index, compressedBuf);  // Replace with compressed buffer

Then I update my field vector:

    int nullCount = fieldVector.getNullCount();  // Get the null count
    ArrowFieldNode fieldNode = new ArrowFieldNode(fieldVectorValueCount, nullCount);
    ArrowBuf validityBuffer = fieldVector.getValidityBuffer(); // Assuming it exists
    ArrowBuf offsetBuffer = fieldVector.getOffsetBuffer(); // Assuming it exists

    // Update FieldVector with the new compressed buffers
    fieldVector.clear();  // Clear existing buffers (to prevent memory leaks)
    fieldVector.loadFieldBuffers(fieldNode, java.util.Arrays.asList(validityBuffer, offsetBuffer, compressedBuf));  // Load the new (compressed) buffers
    fieldVector.setValueCount(fieldVectorValueCount);  // Reset value count after buffer replacement
    Field field1 = fieldVector.getField();

But when I check the buffers again, they still reflect the old values.

Please advise.

Component(s)

Java
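A note on the first snippet above: in the Arrow Java sources, getFieldBuffers() is typically implemented to build and return a fresh list of the vector's buffers, so buffers.set(index, compressedBuf) only mutates that copy and never touches the vector itself. A dependency-free sketch of the pitfall (the Holder class is a hypothetical stand-in, not Arrow API):

```java
import java.util.ArrayList;
import java.util.List;

public class DefensiveCopyPitfall {
    // Hypothetical stand-in: like Arrow's getFieldBuffers(), it returns a
    // freshly built list on every call, not a live view of internal state.
    static class Holder {
        private final byte[][] internal = {{1}, {2}, {3}};

        List<byte[]> getFieldBuffers() {
            List<byte[]> copy = new ArrayList<>();
            for (byte[] b : internal) {
                copy.add(b);
            }
            return copy; // a new list each time
        }
    }

    public static void main(String[] args) {
        Holder h = new Holder();
        List<byte[]> buffers = h.getFieldBuffers();
        buffers.set(2, new byte[] {9}); // replaces an element of the copy only

        System.out.println(h.getFieldBuffers().get(2)[0]); // prints 3 (unchanged)
        System.out.println(buffers.get(2)[0]);             // prints 9 (copy only)
    }
}
```

This is why printing compressedBuf right after the set(...) call shows the new value, while the vector itself still serves the old data.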

@hnasrullakhan
Author

hnasrullakhan commented Oct 15, 2024

@vibhatha / @lidavidm Can you please help?

@hnasrullakhan hnasrullakhan changed the title Unable to update VectorSchemaRoot field vectors. [JAVA] Unable to update VectorSchemaRoot field vectors. Oct 15, 2024
@vibhatha
Contributor

Right, so if I simplify this, you want to know how to modify ArrowBuf, right?

@kou kou changed the title [JAVA] Unable to update VectorSchemaRoot field vectors. [Java] Unable to update VectorSchemaRoot field vectors. Oct 15, 2024
@vibhatha
Contributor

You could try something like the following, but since this does not use the recommended high-level Vector APIs, the user has to take responsibility for reserving memory and for keeping track of any explicitly created buffers.

A naive example that shifts each byte of a ["Hello", "World"] vector by 1.

try (VarCharVector v = new VarCharVector("myvec", allocator)) {
      v.allocateNewSafe();
      v.set(0, new Text("Hello"));
      v.set(1, new Text("World"));
      v.setValueCount(2);

      System.out.println("Vector");
      System.out.println(v);

      final ArrowBuf dataBuffer = v.getDataBuffer();
      byte[] resAtZero = new byte[10];
      dataBuffer.getBytes(0, resAtZero);
      for (int i = 0; i < resAtZero.length; i++) {
        System.out.print(resAtZero[i] + ", ");
        resAtZero[i] += 1;
      }

      dataBuffer.setBytes(0, resAtZero);
      System.out.println();
      System.out.println("After");

      System.out.println(v);
      
      /*
      * 
      * Vector
        [Hello, World]
        72, 101, 108, 108, 111, 87, 111, 114, 108, 100, 
        After
        [Ifmmp, Xpsme]
      * 
      */
    }

@vibhatha
Contributor

Or...

try (VarCharVector v = new VarCharVector("myvec", allocator)) {
      v.allocateNewSafe();
      v.set(0, new Text("Hello"));
      v.set(1, new Text("World"));
      v.setValueCount(2);

      System.out.println("Vector");
      System.out.println(v);

      final ArrowBuf dataBuffer = v.getDataBuffer();
      final ArrowBuf validityBuffer = v.getValidityBuffer();
      final ArrowBuf offsetBuffer = v.getOffsetBuffer();

      try (final ArrowBuf newDataBuffer = allocator.buffer(dataBuffer.capacity())) {
        byte[] resAtZero = new byte[20];
        dataBuffer.getBytes(0, resAtZero);

        for (int i = 0; i < resAtZero.length; i++) {
          System.out.print(resAtZero[i] + ", ");
          resAtZero[i] += 1;
        }

        newDataBuffer.writeBytes(resAtZero, 0, resAtZero.length);
        newDataBuffer.writerIndex(dataBuffer.writerIndex());

        // dataBuffer.setBytes(0, resAtZero);

        newDataBuffer.readerIndex(0);

        ArrowFieldNode fieldNode = new ArrowFieldNode(v.getValueCount(), v.getNullCount());
        v.loadFieldBuffers(fieldNode, List.of(validityBuffer, offsetBuffer, newDataBuffer));

        System.out.println();
        System.out.println("After");

        System.out.println(v);
      }

      /*
      *
      * Vector
        [Hello, World]
        72, 101, 108, 108, 111, 87, 111, 114, 108, 100,
        After
        [Ifmmp, Xpsme]
      *
      */
    }

@hnasrullakhan
Author

hnasrullakhan commented Oct 15, 2024

@vibhatha Thanks

    Field field = v.getField();

    if (field.getType().getTypeID() == org.apache.gluten.shaded.org.apache.arrow.vector.types.pojo.ArrowType.Utf8.TYPE_TYPE) {
        System.out.println("Vector");
        System.out.println(v);
        final int valueCount = v.getValueCount();
        final ArrowBuf dataBuffer = v.getDataBuffer();
        final ArrowBuf validityBuffer = v.getValidityBuffer();
        final ArrowBuf offsetBuffer = v.getOffsetBuffer();
        int maxCompressedLength = compressor.maxCompressedLength((int) dataBuffer.readableBytes());
        byte[] compressedBytes = new byte[maxCompressedLength];
        ByteBuffer originalBuffer = dataBuffer.nioBuffer(0, (int) dataBuffer.readableBytes());
        byte[] originalBytes = new byte[originalBuffer.remaining()];
        System.out.println(" originalbytes: " + originalBytes.length);
        originalBuffer.get(originalBytes);
        int compressedLength = compressor.compress(originalBytes, 0, originalBytes.length, compressedBytes, 0, maxCompressedLength);
        System.out.println(" compressedLength: " + compressedLength);

        try (final ArrowBuf newDataBuffer = allocator.buffer(compressedLength)) {
            if (compressedLength > newDataBuffer.capacity()) {
                throw new IllegalStateException("Compressed data exceeds buffer capacity.");
            }
            System.out.println(" compressedBytes: " + compressedBytes.length);

            newDataBuffer.writeBytes(compressedBytes, 0, compressedLength);
            System.out.println("writer index before=" + dataBuffer.writerIndex());
            newDataBuffer.writerIndex(compressedLength);
            System.out.println("writer index after=" + newDataBuffer.writerIndex());
            newDataBuffer.readerIndex(0);
            dataBuffer.readerIndex(0);

            ArrowFieldNode fieldNode = new ArrowFieldNode(v.getValueCount(), v.getNullCount());
            v.loadFieldBuffers(fieldNode, List.of(validityBuffer, offsetBuffer, newDataBuffer));
            System.out.println("newDataBuffer=" + newDataBuffer.readableBytes());
            final ArrowBuf mewDBuffer = v.getDataBuffer();
            System.out.println("mewDBuffer=" + mewDBuffer.readableBytes());
            System.out.println(v);
        }
    }

I tried this, and loadFieldBuffers doesn't load the new data:

Vector
[CMT, CMT, VTS, CMT, VTS, CMT, VTS, CMT, VTS, CMT, ... CMT, CMT, CMT, CMT, CMT, CMT, CMT, CMT, CMT, CMT]
 originalbytes: 12288
 compressedLength: 348
 compressedBytes: 12352
writer index before=12288
writer index after=348
newDataBuffer=348
mewDBuffer=348

I see that the new data buffer shows the right compressed size, but the last System.out.println(v); fails with the error below:

java.lang.IndexOutOfBoundsException: index: 12258, length: 3 (expected: range(0, 512))
	at org.apache.gluten.shaded.org.apache.arrow.memory.ArrowBuf.checkIndex(ArrowBuf.java:701)
	at org.apache.gluten.shaded.org.apache.arrow.memory.ArrowBuf.getBytes(ArrowBuf.java:728)
	at org.apache.gluten.shaded.org.apache.arrow.vector.util.ReusableByteArray.set(ReusableByteArray.java:63)
	at org.apache.gluten.shaded.org.apache.arrow.vector.VarCharVector.read(VarCharVector.java:142)
	at org.apache.gluten.shaded.org.apache.arrow.vector.VarCharVector.getObject(VarCharVector.java:128)
	at org.apache.gluten.shaded.org.apache.arrow.vector.VarCharVector.getObject(VarCharVector.java:40)
	at org.apache.gluten.shaded.org.apache.arrow.vector.util.ValueVectorUtility.lambda$getToString$0(ValueVectorUtility.java:58)
	at org.apache.gluten.shaded.org.apache.arrow.vector.util.ValueVectorUtility.getToString(ValueVectorUtility.java:95)
	at org.apache.gluten.shaded.org.apache.arrow.vector.util.ValueVectorUtility.getToString(ValueVectorUtility.java:58)
	at org.apache.gluten.shaded.org.apache.arrow.vector.BaseValueVector.toString(BaseValueVector.java:68)
	at java.base/java.lang.String.valueOf(String.java:2951)
	at java.base/java.io.PrintStream.println(PrintStream.java:897)

And the VectorSchemaRoot from which these field vectors are extracted still uses the old fieldVector.
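The exception above is consistent with the offset buffer still describing the uncompressed layout: the offsets were built for 12288 bytes of data, but the data buffer now holds only 348 compressed bytes, so reading a value indexes past the end. A minimal stand-in for that bounds check, using plain arrays rather than Arrow (the numbers are taken from the log above; the method is illustrative, not Arrow API):

```java
public class StaleOffsets {
    // Mimics reading value i of a varchar column: bytes [offsets[i], offsets[i+1]).
    static byte[] read(byte[] data, int[] offsets, int i) {
        int start = offsets[i];
        int len = offsets[i + 1] - start;
        if (start + len > data.length) { // the kind of check ArrowBuf.checkIndex performs
            throw new IndexOutOfBoundsException(
                "index: " + start + ", length: " + len
                + " (expected: range(0, " + data.length + "))");
        }
        byte[] out = new byte[len];
        System.arraycopy(data, start, out, 0, len);
        return out;
    }

    public static void main(String[] args) {
        int[] offsets = {0, 3, 6, 12288};   // offsets built for the uncompressed data
        byte[] compressed = new byte[348];  // data buffer after compression
        read(compressed, offsets, 0);       // fine: bytes [0, 3)
        read(compressed, offsets, 2);       // throws: offset 6..12288 exceeds 348 bytes
    }
}
```

Compressed bytes are opaque to the vector: the old offsets are only valid over the decompressed data, so they cannot be reused over the compressed buffer.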

@hnasrullakhan
Author

hnasrullakhan commented Oct 15, 2024

java.lang.IndexOutOfBoundsException: index: 0, length: 12288 (expected: range(0, 512))
	at org.apache.gluten.shaded.org.apache.arrow.memory.ArrowBuf.checkIndexD(ArrowBuf.java:319)
	at org.apache.gluten.shaded.org.apache.arrow.memory.ArrowBuf.chk(ArrowBuf.java:306)
	at org.apache.gluten.shaded.org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:223)
	at org.apache.gluten.shaded.org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
	at org.apache.gluten.shaded.org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
	at org.apache.gluten.shaded.org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
	at org.apache.gluten.shaded.org.apache.arrow.vector.ipc.ArrowWriter.writeRecordBatch(ArrowWriter.java:160)
	at org.apache.gluten.shaded.org.apache.arrow.vector.ipc.ArrowWriter.writeBatch(ArrowWriter.java:124)

If I try to serialize using ArrowStreamWriter, it uses the older offsets.
In fact, the VectorSchemaRoot readable bytes before and after are the same:

originalvectorSchema readableBytes=627084
newVsr readableBytes=627084

I tried v.setValueCount(valueCount); and it reset the field vector to the original. (12288 is the original data buffer length.)

@hnasrullakhan
Author

hnasrullakhan commented Oct 15, 2024

My use case:

  1. Get the string columns (field vectors) from the VectorSchemaRoot.
  2. LZ4-compress the data buffers of each string column's field vector.
  3. Update the original VectorSchemaRoot.
  4. Serialize the VectorSchemaRoot using ArrowStreamWriter.

Can we retain the original offset buffer and validity buffer?
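Worth noting for this use case: the Arrow IPC format has built-in record-batch body compression, which compresses each buffer at serialization time and leaves the in-memory vectors (offsets and validity included) untouched, so readers can decompress transparently. A sketch, assuming Arrow Java with the separate arrow-compression module on the classpath; the class and constructor names below reflect recent Arrow releases and should be verified against your version (and adjusted for Gluten's shaded package prefix if applicable):

```java
import java.io.ByteArrayOutputStream;
import java.nio.channels.Channels;

import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.compression.CompressionUtil;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.message.IpcOption;

public class CompressedIpc {
    // Serializes the root with LZ4-frame-compressed record-batch buffers.
    static byte[] serializeLz4(VectorSchemaRoot root) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ArrowStreamWriter writer = new ArrowStreamWriter(
                root,
                /* dictionary provider */ null,
                Channels.newChannel(out),
                IpcOption.DEFAULT,
                CommonsCompressionFactory.INSTANCE,
                CompressionUtil.CodecType.LZ4_FRAME)) {
            writer.start();
            writer.writeBatch();
            writer.end();
        }
        return out.toByteArray();
    }
}
```

With this approach there is no need to swap buffers by hand: the original offset and validity buffers are retained as-is, and an ArrowStreamReader on the other end decompresses automatically.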

@hnasrullakhan
Author

    List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
    for (FieldVector fieldVector : fieldVectors) {
        // System.out.println("fieldVector value column: " + fieldVector.getValueCount());
        int fieldVectorValueCount = fieldVector.getValueCount();
        Field field = fieldVector.getField();

Does this make a copy of the field vectors rather than updating the originals?

@vibhatha
Contributor

If you change a copy of the object, then you have to set it back; I am not sure whether in-place setting is possible. When working with buffers, make sure to clear the existing ones and set the updated ones. Also, when mutating buffers manually (which we don't recommend), the developer has to take responsibility for managing memory: if new buffers are created, they need to be released (i.e. use try-with-resources).

@vibhatha
Contributor

Right, I understand your problem. Just to save time for both of us: may I get a minimal code snippet with the end-to-end workflow? Then I can debug and try to get a working version. Would that be okay?

@hnasrullakhan
Author

    public VectorSchemaRoot lz4CompressStringColumns1(VectorSchemaRoot vectorSchemaRoot) throws IOException {
        System.out.println("Newlz4CompressStringColumns start originalReadableBytes: " + totaReadableBytes(vectorSchemaRoot));
        try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
            int rowCount = vectorSchemaRoot.getRowCount();
            // Get LZ4 compressor instance
            LZ4Factory factory = LZ4Factory.fastestInstance();
            LZ4Compressor compressor = factory.fastCompressor();

            // Compress string columns using LZ4
            int column = 0;
            int allColumns = vectorSchemaRoot.getFieldVectors().size();
            System.out.println("all columns=" + allColumns);
            // List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();

            for (int col = 0; col < allColumns; col++) {
                FieldVector v = vectorSchemaRoot.getVector(col);
                Field field = v.getField();

                if (field.getType().getTypeID() == org.apache.gluten.shaded.org.apache.arrow.vector.types.pojo.ArrowType.Utf8.TYPE_TYPE) {
                    System.out.println("Vector");
                    System.out.println(v);
                    final int valueCount = v.getValueCount();
                    final ArrowBuf dataBuffer = v.getDataBuffer();
                    final ArrowBuf validityBuffer = v.getValidityBuffer();
                    final ArrowBuf offsetBuffer = v.getOffsetBuffer();
                    int maxCompressedLength = compressor.maxCompressedLength((int) dataBuffer.readableBytes());
                    byte[] compressedBytes = new byte[maxCompressedLength];
                    ByteBuffer originalBuffer = dataBuffer.nioBuffer(0, (int) dataBuffer.readableBytes());
                    byte[] originalBytes = new byte[originalBuffer.remaining()];
                    System.out.println(" originalbytes: " + originalBytes.length);
                    originalBuffer.get(originalBytes);
                    int compressedLength = compressor.compress(originalBytes, 0, originalBytes.length, compressedBytes, 0, maxCompressedLength);
                    System.out.println(" compressedLength: " + compressedLength);

                    try (final ArrowBuf newDataBuffer = allocator.buffer(compressedLength)) {
                        if (compressedLength > newDataBuffer.capacity()) {
                            throw new IllegalStateException("Compressed data exceeds buffer capacity.");
                        }
                        System.out.println(" compressedBytes: " + compressedBytes.length);

                        newDataBuffer.writeBytes(compressedBytes, 0, compressedLength);
                        System.out.println("writer index before=" + dataBuffer.writerIndex());
                        newDataBuffer.writerIndex(compressedLength);
                        System.out.println("writer index after=" + newDataBuffer.writerIndex());
                        newDataBuffer.readerIndex(0);
                        dataBuffer.readerIndex(0);
                        v.setValueCount(valueCount);
                        ArrowFieldNode fieldNode = new ArrowFieldNode(v.getValueCount(), v.getNullCount());
                        v.loadFieldBuffers(fieldNode, List.of(validityBuffer, offsetBuffer, newDataBuffer));
                        System.out.println("newDataBuffer=" + newDataBuffer.readableBytes());
                        final ArrowBuf mewDBuffer = v.getDataBuffer();
                        System.out.println("mewDBuffer=" + mewDBuffer.readableBytes());
                    }
                }

                column++;
            }
            System.out.println("lz4CompressStringColumns compressedReadableBytes: " + totaReadableBytes(vectorSchemaRoot));

            return vectorSchemaRoot;
        } catch (Exception e) {
            e.printStackTrace();
            throw e;  // Rethrow any exceptions to handle them properly
        }
    }

    public byte[] serializeVectorSchemaRoot(VectorSchemaRoot vectorSchemaRoot) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // Serialize the full VectorSchemaRoot to a byte array
        try (ArrowStreamWriter writer = new ArrowStreamWriter(vectorSchemaRoot, null, byteArrayOutputStream)) {
            writer.start();
            writer.writeBatch();
            writer.end();
        } catch (Exception e) {
            e.printStackTrace();
            throw e;  // Rethrow any exceptions to handle them properly
        }
        System.out.println("lz4CompressStringColumns end");
        return byteArrayOutputStream.toByteArray();
    }


    val vsrNew = arrowAbiUtil.lz4CompressStringColumns2(vectorSchema)

    val serialBytes2 = arrowAbiUtil.serializeVectorSchemaRoot(vsrNew)

@hnasrullakhan
Author

@vibhatha I need your help.

@assignUser assignUser transferred this issue from apache/arrow Nov 26, 2024
@assignUser assignUser added the Type: usage usage question label Nov 26, 2024