Skip to content

Commit

Permalink
more stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Nov 23, 2024
1 parent 94acc5d commit 5d5cb65
Showing 1 changed file with 19 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ abstract class RocksDBDataEncoder(
val positiveValMarker: Byte = 0x01.toByte
val nullValMarker: Byte = 0x02.toByte


def unsupportedOperationForKeyStateEncoder(
operation: String
): UnsupportedOperationException = {
new UnsupportedOperationException(
s"Method $operation not supported for encoder spec type " +
s"${keyStateEncoderSpec.getClass.getSimpleName}")
}

/**
* Encode the UnsafeRow of N bytes as a N+1 byte array.
* @note This creates a new byte array and memcopies the UnsafeRow to the new array.
Expand Down Expand Up @@ -259,7 +268,7 @@ class UnsafeRowDataEncoder(
decodeToUnsafeRow(bytes, reusedKeyRow)
case PrefixKeyScanStateEncoderSpec(_, numColsPrefixKey) =>
decodeToUnsafeRow(bytes, numFields = numColsPrefixKey)
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("decodeKey")
}
}

Expand All @@ -269,7 +278,7 @@ class UnsafeRowDataEncoder(
decodeToUnsafeRow(bytes, numFields = numColsPrefixKey)
case RangeKeyScanStateEncoderSpec(_, orderingOrdinals) =>
decodeToUnsafeRow(bytes, keySchema.length - orderingOrdinals.length)
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("decodeRemainingKey")
}
}

Expand Down Expand Up @@ -354,7 +363,7 @@ class AvroStateEncoder(
private lazy val prefixKeySchema = keyStateEncoderSpec match {
case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
StructType(keySchema.take (numColsPrefixKey))
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("prefixKeySchema")
}
private lazy val prefixKeyAvroType = SchemaConverters.toAvroType(prefixKeySchema)
private lazy val prefixKeyProj = UnsafeProjection.create(prefixKeySchema)
Expand All @@ -367,7 +376,8 @@ class AvroStateEncoder(
val field = keySchema(ordinal)
(field, ordinal)
}
case _ => null
case _ =>
throw unsupportedOperationForKeyStateEncoder("rangeScanKey")
}

private lazy val rangeScanAvroSchema = StateStoreColumnFamilySchemaUtils.convertForRangeScan(
Expand All @@ -385,7 +395,7 @@ class AvroStateEncoder(
StructType(keySchema.drop(numColsPrefixKey))
case RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) =>
StructType(0.until(keySchema.length).diff(orderingOrdinals).map(keySchema(_)))
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("remainingKeySchema")
}

private lazy val remainingKeyAvroType = SchemaConverters.toAvroType(remainingKeySchema)
Expand Down Expand Up @@ -445,7 +455,7 @@ class AvroStateEncoder(
encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, keyAvroType, out)
case PrefixKeyScanStateEncoderSpec(_, _) =>
encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, prefixKeyAvroType, out)
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("encodeKey")
}
}

Expand All @@ -455,7 +465,7 @@ class AvroStateEncoder(
encodeUnsafeRowToAvro(row, avroEncoder.suffixKeySerializer.get, remainingKeyAvroType, out)
case RangeKeyScanStateEncoderSpec(_, _) =>
encodeUnsafeRowToAvro(row, avroEncoder.keySerializer, remainingKeyAvroType, out)
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("encodeRemainingKey")
}
}

Expand Down Expand Up @@ -608,7 +618,7 @@ class AvroStateEncoder(
case PrefixKeyScanStateEncoderSpec(_, _) =>
decodeFromAvroToUnsafeRow(
bytes, avroEncoder.keyDeserializer, prefixKeyAvroType, prefixKeyProj)
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("decodeKey")
}
}

Expand All @@ -621,7 +631,7 @@ class AvroStateEncoder(
case RangeKeyScanStateEncoderSpec(_, _) =>
decodeFromAvroToUnsafeRow(
bytes, avroEncoder.keyDeserializer, remainingKeyAvroType, remainingKeyAvroProjection)
case _ => null
case _ => throw unsupportedOperationForKeyStateEncoder("decodeRemainingKey")
}
}

Expand Down

0 comments on commit 5d5cb65

Please sign in to comment.