diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index 8fd84d38..b38c0079 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -19,6 +19,7 @@ import com.google.common.base.Preconditions; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BaseIntVector; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.DateDayVector; @@ -31,6 +32,7 @@ import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TimeStampMicroVector; import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -44,6 +46,7 @@ import org.apache.doris.sdk.thrift.TScanBatchResult; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.rest.models.Schema; +import org.apache.doris.spark.util.IPUtils; import org.apache.spark.sql.types.Decimal; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +72,8 @@ import java.util.NoSuchElementException; import java.util.Objects; +import static org.apache.doris.spark.util.IPUtils.convertLongToIPv4Address; + /** * row batch data container. */ @@ -246,6 +251,20 @@ public void convertArrowToRowBatch() throws DorisException { } } break; + case "IPV4": + Preconditions.checkArgument(mt.equals(Types.MinorType.UINT4) || mt.equals(Types.MinorType.INT), + typeMismatchMessage(currentType, mt)); + BaseIntVector ipv4Vector; + if (mt.equals(Types.MinorType.INT)) { + ipv4Vector = (IntVector) curFieldVector; + } else { + ipv4Vector = (UInt4Vector) curFieldVector; + } + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + Object fieldValue = ipv4Vector.isNull(rowIndex) ? null : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex)); + addValueToRow(rowIndex, fieldValue); + } + break; case "FLOAT": Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4), typeMismatchMessage(currentType, mt)); @@ -314,7 +333,7 @@ public void convertArrowToRowBatch() throws DorisException { case "DATE": case "DATEV2": Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR) - || mt.equals(Types.MinorType.DATEDAY), typeMismatchMessage(currentType, mt)); + || mt.equals(Types.MinorType.DATEDAY), typeMismatchMessage(currentType, mt)); if (mt.equals(Types.MinorType.VARCHAR)) { VarCharVector date = (VarCharVector) curFieldVector; for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { @@ -392,6 +411,20 @@ public void convertArrowToRowBatch() throws DorisException { addValueToRow(rowIndex, value); } break; + case "IPV6": + Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR), + typeMismatchMessage(currentType, mt)); + VarCharVector ipv6VarcharVector = (VarCharVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { + if (ipv6VarcharVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); + break; + } + String ipv6Str = new String(ipv6VarcharVector.get(rowIndex)); + String ipv6Address = IPUtils.fromBigInteger(new BigInteger(ipv6Str)); + addValueToRow(rowIndex, ipv6Address); + } + break; case "ARRAY": Preconditions.checkArgument(mt.equals(Types.MinorType.LIST), typeMismatchMessage(currentType, mt)); diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala index 914190a8..76d231ae 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala @@ -130,6 +130,8 @@ private[spark] object SchemaUtils { case "MAP" => MapType(DataTypes.StringType, DataTypes.StringType) case "STRUCT" => DataTypes.StringType case "VARIANT" => DataTypes.StringType + case "IPV4" => DataTypes.StringType + case "IPV6" => DataTypes.StringType case "HLL" => throw new DorisException("Unsupported type " + dorisType) case _ => diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/IPUtils.java b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/IPUtils.java new file mode 100644 index 00000000..b086d0fc --- /dev/null +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/IPUtils.java @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.util; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.LongBuffer; +import java.util.Arrays; + +/** + * + */ +public class IPUtils { + /** + * Create an IPv6 address from a (positive) {@link java.math.BigInteger}. The magnitude of the + * {@link java.math.BigInteger} represents the IPv6 address value. Or in other words, the {@link + * java.math.BigInteger} with value N defines the Nth possible IPv6 address. + * + * @param bigInteger {@link java.math.BigInteger} value + * @return IPv6 address + */ + public static String fromBigInteger(BigInteger bigInteger) { + byte[] bytes = bigInteger.toByteArray(); + if (bytes[0] == 0) { + bytes = Arrays.copyOfRange(bytes, 1, bytes.length); // Skip leading zero byte + } + bytes = prefixWithZeroBytes(bytes); + long[] ipv6Bits = fromByteArray(bytes); + return toIPv6String(ipv6Bits[0], ipv6Bits[1]); + } + + private static byte[] prefixWithZeroBytes(byte[] original) { + byte[] target = new byte[16]; + System.arraycopy(original, 0, target, 16 - original.length, original.length); + return target; + } + + /** + * Create an IPv6 address from a byte array. + * + * @param bytes byte array with 16 bytes (interpreted unsigned) + * @return IPv6 address + */ + public static long[] fromByteArray(byte[] bytes) { + if (bytes == null || bytes.length != 16) { + throw new IllegalArgumentException("Byte array must be exactly 16 bytes long"); + } + ByteBuffer buf = ByteBuffer.wrap(bytes); + LongBuffer longBuffer = buf.asLongBuffer(); + return new long[] {longBuffer.get(), longBuffer.get()}; + } + + private static String toShortHandNotationString(long highBits, long lowBits) { + String[] strings = toArrayOfShortStrings(highBits, lowBits); + StringBuilder result = new StringBuilder(); + int[] shortHandNotationPositionAndLength = + startAndLengthOfLongestRunOfZeroes(highBits, lowBits); + int shortHandNotationPosition = shortHandNotationPositionAndLength[0]; + int shortHandNotationLength = shortHandNotationPositionAndLength[1]; + boolean useShortHandNotation = shortHandNotationLength > 1; + + for (int i = 0; i < strings.length; ++i) { + if (useShortHandNotation && i == shortHandNotationPosition) { + if (i == 0) { + result.append("::"); + } else { + result.append(":"); + } + } else if (i <= shortHandNotationPosition + || i >= shortHandNotationPosition + shortHandNotationLength) { + result.append(strings[i]); + if (i < 7) { + result.append(":"); + } + } + } + + return result.toString().toLowerCase(); + } + + private static String[] toArrayOfShortStrings(long highBits, long lowBits) { + short[] shorts = toShortArray(highBits, lowBits); + String[] strings = new String[shorts.length]; + + for (int i = 0; i < shorts.length; ++i) { + strings[i] = String.format("%x", shorts[i]); + } + + return strings; + } + + private static short[] toShortArray(long highBits, long lowBits) { + short[] shorts = new short[8]; + + for (int i = 0; i < 8; ++i) { + if (inHighRange(i)) { + shorts[i] = (short) ((int) (highBits << i * 16 >>> 48 & 0xFFFF)); + } else { + shorts[i] = (short) ((int) (lowBits << i * 16 >>> 48 & 0xFFFF)); + } + } + + return shorts; + } + + private static int[] startAndLengthOfLongestRunOfZeroes(long highBits, long lowBits) { + int longestConsecutiveZeroes = 0; + int longestConsecutiveZeroesPos = -1; + short[] shorts = toShortArray(highBits, lowBits); + + for (int pos = 0; pos < shorts.length; ++pos) { + int consecutiveZeroesAtCurrentPos = countConsecutiveZeroes(shorts, pos); + if (consecutiveZeroesAtCurrentPos > longestConsecutiveZeroes) { + longestConsecutiveZeroes = consecutiveZeroesAtCurrentPos; + longestConsecutiveZeroesPos = pos; + } + } + + return new int[] {longestConsecutiveZeroesPos, longestConsecutiveZeroes}; + } + + private static boolean inHighRange(int shortNumber) { + return shortNumber >= 0 && shortNumber < 4; + } + + private static int countConsecutiveZeroes(short[] shorts, int offset) { + int count = 0; + + for (int i = offset; i < shorts.length && shorts[i] == 0; ++i) { + ++count; + } + + return count; + } + + public static String toIPv6String(long highBits, long lowBits) { + + if (isIPv4Mapped(highBits, lowBits)) { + return toIPv4MappedAddressString(lowBits); + } else if (isIPv4Compatibility(highBits, lowBits)) { + return toIPv4CompatibilityAddressString(lowBits); + } + + return toShortHandNotationString(highBits, lowBits); + } + + public static String convertLongToIPv4Address(long lowBits) { + return String.format( + "%d.%d.%d.%d", + (lowBits >> 24) & 0xff, + (lowBits >> 16) & 0xff, + (lowBits >> 8) & 0xff, + lowBits & 0xff); + } + + private static String toIPv4MappedAddressString(long lowBits) { + return "::ffff:" + convertLongToIPv4Address(lowBits); + } + + private static String toIPv4CompatibilityAddressString(long lowBits) { + return "::" + convertLongToIPv4Address(lowBits); + } + + /** + * Returns true if the address is an IPv4-mapped IPv6 address. In these addresses, the first 80 + * bits are zero, the next 16 bits are one, and the remaining 32 bits are the IPv4 address. + * + * @return true if the address is an IPv4-mapped IPv6 addresses. + */ + private static boolean isIPv4Mapped(long highBits, long lowBits) { + return highBits == 0 + && (lowBits & 0xFFFF000000000000L) == 0 + && (lowBits & 0x0000FFFF00000000L) == 0x0000FFFF00000000L; + } + + /** + * Checks if the given IPv6 address is in IPv4 compatibility format. IPv4 compatibility format + * is characterized by having the high 96 bits of the IPv6 address set to zero, while the low 32 + * bits represent an IPv4 address. The criteria for determining IPv4 compatibility format are as + * follows: 1. The high 96 bits of the IPv6 address are all zeros. 2. The low 32 bits are within + * the range from 0 to 4294967295 (0xFFFFFFFF). 3. The first 16 bits of the low 32 bits are all + * ones (0xFFFF), indicating the special identifier for IPv4 compatibility format. + * + * @return True if the given IPv6 address is in IPv4 compatibility format; otherwise, false. + */ + private static boolean isIPv4Compatibility(long highBits, long lowBits) { + return highBits == 0L && lowBits <= 0xFFFFFFFFL && (lowBits & 65536L) == 65536L; + } +} diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java index 348895d4..0c830500 100644 --- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java +++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java @@ -19,6 +19,7 @@ import org.apache.arrow.vector.DateDayVector; import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.types.DateUnit; import org.apache.arrow.vector.types.TimeUnit; import org.apache.doris.sdk.thrift.TScanBatchResult; @@ -73,13 +74,12 @@ import java.sql.Date; import java.time.LocalDateTime; import java.time.ZoneId; -import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; -import java.util.TimeZone; import static org.hamcrest.core.StringStartsWith.startsWith; +import static org.junit.Assert.assertEquals; public class TestRowBatch { private final static Logger logger = LoggerFactory.getLogger(TestRowBatch.class); @@ -481,7 +481,7 @@ public void testDate() throws DorisException, IOException { root.setRowCount(1); FieldVector vector = root.getVector("k1"); - VarCharVector dateVector = (VarCharVector)vector; + VarCharVector dateVector = (VarCharVector) vector; dateVector.setInitialCapacity(1); dateVector.allocateNew(); dateVector.setIndexDefined(0); @@ -491,7 +491,7 @@ public void testDate() throws DorisException, IOException { vector = root.getVector("k2"); - VarCharVector dateV2Vector = (VarCharVector)vector; + VarCharVector dateV2Vector = (VarCharVector) vector; dateV2Vector.setInitialCapacity(1); dateV2Vector.allocateNew(); dateV2Vector.setIndexDefined(0); @@ -500,7 +500,7 @@ public void testDate() throws DorisException, IOException { vector.setValueCount(1); vector = root.getVector("k3"); - DateDayVector dateNewVector = (DateDayVector)vector; + DateDayVector dateNewVector = (DateDayVector) vector; dateNewVector.setInitialCapacity(1); dateNewVector.allocateNew(); dateNewVector.setIndexDefined(0); @@ -563,7 +563,7 @@ public void testLargeInt() throws DorisException, IOException { root.setRowCount(1); FieldVector vector = root.getVector("k1"); - VarCharVector lageIntVector = (VarCharVector)vector; + VarCharVector lageIntVector = (VarCharVector) vector; lageIntVector.setInitialCapacity(1); lageIntVector.allocateNew(); lageIntVector.setIndexDefined(0); @@ -573,7 +573,7 @@ public void testLargeInt() throws DorisException, IOException { vector = root.getVector("k2"); - FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector)vector; + FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector) vector; lageIntVector1.setInitialCapacity(1); lageIntVector1.allocateNew(); lageIntVector1.setIndexDefined(0); @@ -777,7 +777,7 @@ public void testDateTime() throws IOException, DorisException { root.setRowCount(3); FieldVector vector = root.getVector("k1"); - VarCharVector datetimeVector = (VarCharVector)vector; + VarCharVector datetimeVector = (VarCharVector) vector; datetimeVector.setInitialCapacity(3); datetimeVector.allocateNew(); datetimeVector.setIndexDefined(0); @@ -869,7 +869,7 @@ public void testVariant() throws DorisException, IOException { root.setRowCount(3); FieldVector vector = root.getVector("k1"); - VarCharVector datetimeVector = (VarCharVector)vector; + VarCharVector datetimeVector = (VarCharVector) vector; datetimeVector.setInitialCapacity(3); datetimeVector.allocateNew(); datetimeVector.setIndexDefined(0); @@ -921,4 +921,244 @@ public void testVariant() throws DorisException, IOException { } + @Test + public void testIPv4() throws DorisException, IOException { + + ImmutableList.Builder childrenBuilder = ImmutableList.builder(); + childrenBuilder.add( + new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null), + new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null)); + + VectorSchemaRoot root = + VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema( + childrenBuilder.build(), null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = + new ArrowStreamWriter( + root, new DictionaryProvider.MapDictionaryProvider(), outputStream); + + arrowStreamWriter.start(); + root.setRowCount(5); + + FieldVector vector = root.getVector("k1"); + UInt4Vector uInt4Vector = (UInt4Vector) vector; + uInt4Vector.setInitialCapacity(5); + uInt4Vector.allocateNew(); + uInt4Vector.setIndexDefined(0); + uInt4Vector.setSafe(0, 0); + uInt4Vector.setIndexDefined(1); + uInt4Vector.setSafe(1, 255); + uInt4Vector.setIndexDefined(2); + uInt4Vector.setSafe(2, 65535); + uInt4Vector.setIndexDefined(3); + uInt4Vector.setSafe(3, 16777215); + uInt4Vector.setIndexDefined(4); + uInt4Vector.setWithPossibleTruncate(4, 4294967295L); + + FieldVector vector1 = root.getVector("k2"); + IntVector int4Vector = (IntVector) vector1; + int4Vector.setInitialCapacity(5); + int4Vector.allocateNew(); + int4Vector.setIndexDefined(0); + int4Vector.setSafe(0, 0); + int4Vector.setIndexDefined(1); + int4Vector.setSafe(1, 255); + int4Vector.setIndexDefined(2); + int4Vector.setSafe(2, 65535); + int4Vector.setIndexDefined(3); + int4Vector.setSafe(3, 16777215); + int4Vector.setIndexDefined(4); + int4Vector.setWithPossibleTruncate(4, 4294967295L); + + vector.setValueCount(5); + vector1.setValueCount(5); + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + String schemaStr = + "{\"properties\":[" + + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}, " + + "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}" + + "], \"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema); + Assert.assertTrue(rowBatch.hasNext()); + List actualRow0 = rowBatch.next(); + assertEquals("0.0.0.0", actualRow0.get(0)); + assertEquals("0.0.0.0", actualRow0.get(1)); + List actualRow1 = rowBatch.next(); + assertEquals("0.0.0.255", actualRow1.get(0)); + assertEquals("0.0.0.255", actualRow1.get(1)); + Assert.assertTrue(rowBatch.hasNext()); + List actualRow2 = rowBatch.next(); + assertEquals("0.0.255.255", actualRow2.get(0)); + assertEquals("0.0.255.255", actualRow2.get(1)); + Assert.assertTrue(rowBatch.hasNext()); + List actualRow3 = rowBatch.next(); + assertEquals("0.255.255.255", actualRow3.get(0)); + assertEquals("0.255.255.255", actualRow3.get(1)); + List actualRow4 = rowBatch.next(); + assertEquals("255.255.255.255", actualRow4.get(0)); + assertEquals("255.255.255.255", actualRow4.get(1)); + Assert.assertFalse(rowBatch.hasNext()); + thrown.expect(NoSuchElementException.class); + thrown.expectMessage(startsWith("Get row offset:")); + rowBatch.next(); + } + + @Test + public void testIPv6() throws DorisException, IOException { + + ImmutableList.Builder childrenBuilder = ImmutableList.builder(); + childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null)); + + VectorSchemaRoot root = + VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema( + childrenBuilder.build(), null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = + new ArrowStreamWriter( + root, new DictionaryProvider.MapDictionaryProvider(), outputStream); + + arrowStreamWriter.start(); + root.setRowCount(13); + + FieldVector vector = root.getVector("k1"); + VarCharVector ipv6Vector = (VarCharVector) vector; + ipv6Vector.setInitialCapacity(13); + ipv6Vector.allocateNew(); + ipv6Vector.setIndexDefined(0); + ipv6Vector.setValueLengthSafe(0, 1); + ipv6Vector.setSafe(0, "0".getBytes()); + + ipv6Vector.setIndexDefined(1); + ipv6Vector.setValueLengthSafe(0, 1); + ipv6Vector.setSafe(1, "1".getBytes()); + + ipv6Vector.setIndexDefined(2); + ipv6Vector.setSafe(2, "65535".getBytes()); + + ipv6Vector.setIndexDefined(3); + ipv6Vector.setSafe(3, "65536".getBytes()); + + ipv6Vector.setIndexDefined(4); + ipv6Vector.setSafe(4, "4294967295".getBytes()); + + ipv6Vector.setIndexDefined(5); + ipv6Vector.setSafe(5, "4294967296".getBytes()); + + ipv6Vector.setIndexDefined(6); + ipv6Vector.setSafe(6, "8589934591".getBytes()); + + ipv6Vector.setIndexDefined(7); + ipv6Vector.setSafe(7, "281470681743359".getBytes()); + + ipv6Vector.setIndexDefined(8); + ipv6Vector.setSafe(8, "281470681743360".getBytes()); + + ipv6Vector.setIndexDefined(9); + ipv6Vector.setSafe(9, "281474976710655".getBytes()); + + ipv6Vector.setIndexDefined(10); + ipv6Vector.setSafe(10, "281474976710656".getBytes()); + + ipv6Vector.setIndexDefined(11); + ipv6Vector.setSafe(11, "340277174624079928635746639885392347137".getBytes()); + + ipv6Vector.setIndexDefined(12); + ipv6Vector.setSafe(12, "340282366920938463463374607431768211455".getBytes()); + + vector.setValueCount(13); + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + String schemaStr = + "{\"properties\":[" + + "{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}" + + "], \"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema); + Assert.assertTrue(rowBatch.hasNext()); + List actualRow0 = rowBatch.next(); + assertEquals("::", actualRow0.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow1 = rowBatch.next(); + assertEquals("::1", actualRow1.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow2 = rowBatch.next(); + assertEquals("::ffff", actualRow2.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow3 = rowBatch.next(); + assertEquals("::0.1.0.0", actualRow3.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow4 = rowBatch.next(); + assertEquals("::255.255.255.255", actualRow4.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow5 = rowBatch.next(); + assertEquals("::1:0:0", actualRow5.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow6 = rowBatch.next(); + assertEquals("::1:ffff:ffff", actualRow6.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow7 = rowBatch.next(); + assertEquals("::fffe:ffff:ffff", actualRow7.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow8 = rowBatch.next(); + assertEquals("::ffff:0.0.0.0", actualRow8.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow9 = rowBatch.next(); + assertEquals("::ffff:255.255.255.255", actualRow9.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow10 = rowBatch.next(); + assertEquals("::1:0:0:0", actualRow10.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow11 = rowBatch.next(); + assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0)); + + Assert.assertTrue(rowBatch.hasNext()); + List actualRow12 = rowBatch.next(); + assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", actualRow12.get(0)); + + Assert.assertFalse(rowBatch.hasNext()); + thrown.expect(NoSuchElementException.class); + thrown.expectMessage(startsWith("Get row offset:")); + rowBatch.next(); + } }