@@ -17,6 +17,7 @@

package org.apache.spark.sql.parquet

import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder

import scala.collection.JavaConversions._
@@ -263,17 +264,23 @@ private[parquet] class CatalystRowConverter(
val scale = decimalType.scale
val bytes = value.getBytes

var unscaled = 0L
var i = 0
if (precision <= 8) {
// Constructs a `Decimal` with an unscaled `Long` value if possible.
var unscaled = 0L
var i = 0

while (i < bytes.length) {
unscaled = (unscaled << 8) | (bytes(i) & 0xff)
i += 1
}
while (i < bytes.length) {
unscaled = (unscaled << 8) | (bytes(i) & 0xff)
i += 1
}

val bits = 8 * bytes.length
unscaled = (unscaled << (64 - bits)) >> (64 - bits)
Decimal(unscaled, precision, scale)
val bits = 8 * bytes.length
unscaled = (unscaled << (64 - bits)) >> (64 - bits)
Decimal(unscaled, precision, scale)
} else {
// Otherwise, resorts to an unscaled `BigInteger` instead.
Decimal(new BigDecimal(new BigInteger(bytes), scale), precision, scale)
}
}
}
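
The read path above relies on a sign-extension trick: the fixed-length bytes are folded into a Long most-significant-byte first, then shifted left and arithmetically shifted right so that the sign bit of the top input byte fills the unused upper bits. A minimal standalone sketch of that decoding step (not part of the patch; the helper name is hypothetical):

    // Decode a big-endian two's-complement FIXED_LEN_BYTE_ARRAY into a
    // sign-extended unscaled Long (only valid when the value fits in 64 bits).
    def decodeUnscaledLong(bytes: Array[Byte]): Long = {
      var unscaled = 0L
      var i = 0
      while (i < bytes.length) {
        unscaled = (unscaled << 8) | (bytes(i) & 0xff) // fold in the next byte
        i += 1
      }
      val bits = 8 * bytes.length
      // The arithmetic right shift propagates the sign bit of the top input byte.
      (unscaled << (64 - bits)) >> (64 - bits)
    }

    // Example: the two bytes 0xff, 0x85 decode to -123.
    assert(decodeUnscaledLong(Array(0xff.toByte, 0x85.toByte)) == -123L)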

@@ -387,24 +387,18 @@ private[parquet] class CatalystSchemaConverter(
// =====================================

// Spark 1.4.x and prior versions only support decimals with a maximum precision of 18 and
// always store decimals in fixed-length byte arrays.
case DecimalType.Fixed(precision, scale)
if precision <= maxPrecisionForBytes(8) && !followParquetFormatSpec =>
// always store decimals in fixed-length byte arrays. To keep compatibility with these older
// versions, here we convert decimals with all precisions to `FIXED_LEN_BYTE_ARRAY` annotated
// by `DECIMAL`.
case DecimalType.Fixed(precision, scale) if !followParquetFormatSpec =>
Types
.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
.as(DECIMAL)
.precision(precision)
.scale(scale)
.length(minBytesForPrecision(precision))
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)
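
As a concrete illustration of the legacy mapping described in the comment above (not part of the patch; the field name `f` is hypothetical): a DecimalType(20, 2) column would be stored in a 9-byte FIXED_LEN_BYTE_ARRAY, because 2^(8*8-1) - 1 ~ 9.2e18 is smaller than 10^20 - 1 while 2^(8*9-1) - 1 ~ 2.4e21 is not, so 9 is the smallest width `minBytesForPrecision` returns for precision 20; the declared Parquet type would then print roughly as `required fixed_len_byte_array(9) f (DECIMAL(20,2))`.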

case dec @ DecimalType() if !followParquetFormatSpec =>
throw new AnalysisException(
s"Data type $dec is not supported. " +
s"When ${SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key} is set to false," +
"decimal precision and scale must be specified, " +
"and precision must be less than or equal to 18.")

// =====================================
// Decimals (follow Parquet format spec)
// =====================================
@@ -436,7 +430,7 @@ private[parquet] class CatalystSchemaConverter(
.as(DECIMAL)
.precision(precision)
.scale(scale)
.length(minBytesForPrecision(precision))
.length(CatalystSchemaConverter.minBytesForPrecision(precision))
.named(field.name)

// ===================================================
@@ -548,15 +542,6 @@ private[parquet] class CatalystSchemaConverter(
Math.pow(2, 8 * numBytes - 1) - 1))) // max value stored in numBytes
.asInstanceOf[Int]
}

// Min byte counts needed to store decimals with various precisions
private val minBytesForPrecision: Array[Int] = Array.tabulate(38) { precision =>
var numBytes = 1
while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
numBytes += 1
}
numBytes
}
}


@@ -580,4 +565,23 @@ private[parquet] object CatalystSchemaConverter {
throw new AnalysisException(message)
}
}

private def computeMinBytesForPrecision(precision : Int) : Int = {
var numBytes = 1
while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
numBytes += 1
}
numBytes
}

private val MIN_BYTES_FOR_PRECISION = Array.tabulate[Int](39)(computeMinBytesForPrecision)

// Returns the minimum number of bytes needed to store a decimal with a given `precision`.
def minBytesForPrecision(precision : Int) : Int = {
if (precision < MIN_BYTES_FOR_PRECISION.length) {
MIN_BYTES_FOR_PRECISION(precision)
} else {
computeMinBytesForPrecision(precision)
}
}
}
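
A standalone sanity check of the byte widths this helper yields (a sketch, not part of the patch; `minBytes` simply mirrors the loop above):

    // Smallest n such that an n-byte signed two's-complement integer, whose
    // maximum value is 2^(8n - 1) - 1, can hold any unscaled value of the
    // given decimal precision (at most 10^precision - 1).
    def minBytes(precision: Int): Int = {
      var numBytes = 1
      while (math.pow(2.0, 8 * numBytes - 1) < math.pow(10.0, precision)) {
        numBytes += 1
      }
      numBytes
    }

    assert(minBytes(1)  == 1)   // 2^7   = 128     >= 10^1
    assert(minBytes(9)  == 4)   // 2^31  ~ 2.1e9   >= 10^9
    assert(minBytes(18) == 8)   // 2^63  ~ 9.2e18  >= 10^18
    assert(minBytes(38) == 16)  // 2^127 ~ 1.7e38  >= 10^38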
@@ -17,6 +17,7 @@

package org.apache.spark.sql.parquet

import java.math.BigInteger
import java.nio.{ByteBuffer, ByteOrder}
import java.util.{HashMap => JHashMap}

@@ -114,11 +115,8 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
Binary.fromByteArray(value.asInstanceOf[UTF8String].getBytes))
case BinaryType => writer.addBinary(
Binary.fromByteArray(value.asInstanceOf[Array[Byte]]))
case d: DecimalType =>
if (d.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
}
writeDecimal(value.asInstanceOf[Decimal], d.precision)
case DecimalType.Fixed(precision, _) =>
writeDecimal(value.asInstanceOf[Decimal], precision)
case _ => sys.error(s"Do not know how to writer $schema to consumer")
}
}
@@ -199,20 +197,47 @@ private[parquet] class RowWriteSupport extends WriteSupport[InternalRow] with Lo
writer.endGroup()
}

// Scratch array used to write decimals as fixed-length binary
private[this] val scratchBytes = new Array[Byte](8)
// Scratch array used to write decimals as fixed-length byte array
private[this] var reusableDecimalBytes = new Array[Byte](16)

private[parquet] def writeDecimal(decimal: Decimal, precision: Int): Unit = {
val numBytes = ParquetTypesConverter.BYTES_FOR_PRECISION(precision)
val unscaledLong = decimal.toUnscaledLong
var i = 0
var shift = 8 * (numBytes - 1)
while (i < numBytes) {
scratchBytes(i) = (unscaledLong >> shift).toByte
i += 1
shift -= 8
val numBytes = CatalystSchemaConverter.minBytesForPrecision(precision)

def longToBinary(unscaled: Long): Binary = {
var i = 0
var shift = 8 * (numBytes - 1)
while (i < numBytes) {
reusableDecimalBytes(i) = (unscaled >> shift).toByte
i += 1
shift -= 8
}
Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)
}
writer.addBinary(Binary.fromByteArray(scratchBytes, 0, numBytes))

def bigIntegerToBinary(unscaled: BigInteger): Binary = {
unscaled.toByteArray match {
case bytes if bytes.length == numBytes =>
Binary.fromByteArray(bytes)

case bytes if bytes.length <= reusableDecimalBytes.length =>
val signedByte = (if (bytes.head < 0) -1 else 0).toByte
java.util.Arrays.fill(reusableDecimalBytes, 0, numBytes - bytes.length, signedByte)
System.arraycopy(bytes, 0, reusableDecimalBytes, numBytes - bytes.length, bytes.length)
Binary.fromByteArray(reusableDecimalBytes, 0, numBytes)

case bytes =>
reusableDecimalBytes = new Array[Byte](bytes.length)
bigIntegerToBinary(unscaled)
}
}

val binary = if (numBytes <= 8) {
longToBinary(decimal.toUnscaledLong)
} else {
bigIntegerToBinary(decimal.toJavaBigDecimal.unscaledValue())
}

writer.addBinary(binary)
}

// array used to write Timestamp as Int96 (fixed-length binary)
@@ -268,11 +293,8 @@ private[parquet] class MutableRowWriteSupport extends RowWriteSupport {
writer.addBinary(Binary.fromByteArray(record.getUTF8String(index).getBytes))
case BinaryType =>
writer.addBinary(Binary.fromByteArray(record.getBinary(index)))
case d: DecimalType =>
if (d.precision > 18) {
sys.error(s"Unsupported datatype $d, cannot write to consumer")
}
writeDecimal(record.getDecimal(index), d.precision)
case DecimalType.Fixed(precision, _) =>
writeDecimal(record.getDecimal(index), precision)
case _ => sys.error(s"Unsupported datatype $ctype, cannot write to consumer")
}
}
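
In the `numBytes > 8` path of `writeDecimal` above, `BigInteger.toByteArray` returns the minimal big-endian two's-complement encoding, which can be shorter than the fixed length the schema declares; the missing leading bytes are then filled with the sign byte (0x00 or 0xFF) before writing. A minimal standalone sketch of that padding step (not part of the patch; the helper name is hypothetical):

    import java.math.BigInteger

    // Left-pad a BigInteger's minimal two's-complement encoding with sign
    // bytes so that it fills the fixed length declared for the precision.
    def padToFixedLength(unscaled: BigInteger, numBytes: Int): Array[Byte] = {
      val bytes = unscaled.toByteArray // minimal big-endian two's complement
      require(bytes.length <= numBytes, s"value needs more than $numBytes bytes")
      val padded = new Array[Byte](numBytes)
      val signByte = (if (bytes.head < 0) -1 else 0).toByte
      java.util.Arrays.fill(padded, 0, numBytes - bytes.length, signByte)
      System.arraycopy(bytes, 0, padded, numBytes - bytes.length, bytes.length)
      padded
    }

    // Example: -123 padded to 4 bytes is 0xFF 0xFF 0xFF 0x85.
    assert(padToFixedLength(BigInteger.valueOf(-123), 4).toSeq ==
      Seq(-1, -1, -1, -123).map(_.toByte))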
@@ -106,21 +106,13 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
// Parquet doesn't allow column names with spaces, have to add an alias here
.select($"_1" cast decimal as "dec")

for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17), (19, 0), (38, 37))) {
Contributor: This should fail due to followParquetFormatSpec = false.

I was trying to test "withSQLConf" but couldn't get it to work: https://github.com/apache/spark/pull/6796/files#diff-82fab6131b7092c5faa4064fd04c3d72R135

(I have to find out why I can't run tests locally, ./build/sbt sql/test fails with a compiler assertion?!?)

Contributor Author: Why should it fail? Decimals with large precisions are available no matter whether followParquetFormatSpec is true or false, right? As for your local test failure, I guess a clean build would probably solve the problem.

Contributor Author: Note that, as stated in the PR description, this PR doesn't support writing decimals when followParquetFormatSpec is true, because that doesn't make sense until the whole Parquet write path is refactored to conform to the Parquet format spec.

Contributor: Ah, yes, you removed the < 8 check too. But shouldn't followParquetFormatSpec = false generate compatible files?

I'm getting a compiler assertion on test compile and I tried cleaning :-S Anyway, must be some sort of local ******.

Contributor Author: Hm, good question... Maybe we should just disable large decimal precisions in compatible mode? However, if we do that, this PR will only be able to read decimals with large precisions. I'll probably refactor the Parquet write path for the Parquet format spec in 1.5 and add proper decimal writing support then, but it's not a promise yet, since it's assigned a relatively low priority.

Contributor: I would prefer the way you currently wrote it.
I don't see a point in keeping a "store it in a way that an older version can read" flag. You should always try a new version first and then use it for real storage. And reading files written by old Spark versions will always be possible.

PS: solved the test thing. It looks like spark sbt somehow managed to use a local 2.9.6 scalac 0.o

Contributor Author: One scenario is this:

  1. You were using some old Spark version for writing Parquet files
  2. You developed some downstream tools to process those Parquet files
  3. Then you upgraded to Spark 1.5

The Parquet format spec is relatively new and few tools/systems have implemented it. So it's quite possible that the tools mentioned in 2 are bound to the legacy, non-standard Parquet format that older Spark versions adopt. If we don't provide a compatible mode, these tools break and must be rewritten.

The reason I added large decimal precision support to the compatible mode is that it just adds an extra ability that older versions don't have, without breaking anything that already exists. I guess keeping the current behavior is OK.

Contributor: Sounds like a changelog entry to me.
(We can now write Parquet files for decimals with precision > 18, so please check compatibility if you use Spark Parquet files elsewhere.)

withTempPath { dir =>
val data = makeDecimalRDD(DecimalType(precision, scale))
data.write.parquet(dir.getCanonicalPath)
checkAnswer(sqlContext.read.parquet(dir.getCanonicalPath), data.collect().toSeq)
}
}

// Decimals with precision above 18 are not yet supported
intercept[Throwable] {
withTempPath { dir =>
makeDecimalRDD(DecimalType(19, 10)).write.parquet(dir.getCanonicalPath)
sqlContext.read.parquet(dir.getCanonicalPath).collect()
}
}
}

test("date type") {