Skip to content

Commit

Permalink
scaladoc
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Nov 23, 2024
1 parent 19f34ab commit 94acc5d
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ sealed trait RocksDBValueStateEncoder {
def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
}

/**
* The DataEncoder can encode UnsafeRows into raw bytes in two ways:
* - Using the direct byte layout of the UnsafeRow
* - Converting the UnsafeRow into an Avro row, and encoding that
* In both of these cases, the raw bytes that are written into RockDB have
* headers, footers and other metadata, but they also have data that is provided
* by the callers. The metadata in each row does not need to be written as Avro or UnsafeRow,
* but the actual data provided by the caller does.
*/
trait DataEncoder {
def encodeKey(row: UnsafeRow): Array[Byte]
def encodeRemainingKey(row: UnsafeRow): Array[Byte]
Expand Down Expand Up @@ -333,11 +342,12 @@ class AvroStateEncoder(
avroEncoder: AvroEncoder) extends RocksDBDataEncoder(keyStateEncoderSpec, valueSchema)
with Logging {

// Avro schema used by the avro encoders
private lazy val keyAvroType: Schema = SchemaConverters.toAvroType(keySchema)
private lazy val keyProj = UnsafeProjection.create(keySchema)
private lazy val valueProj = UnsafeProjection.create(valueSchema)

private lazy val valueAvroType: Schema = SchemaConverters.toAvroType(valueSchema)
private lazy val valueProj = UnsafeProjection.create(valueSchema)

// Prefix Key schema and projection definitions used by the Avro Serializers
// and Deserializers
Expand All @@ -346,10 +356,11 @@ class AvroStateEncoder(
StructType(keySchema.take (numColsPrefixKey))
case _ => null
}

private lazy val prefixKeyAvroType = SchemaConverters.toAvroType(prefixKeySchema)
private lazy val prefixKeyProj = UnsafeProjection.create(prefixKeySchema)

// Range Key schema nd projection definitions used by the Avro Serializers and
// Deserializers
private lazy val rangeScanKeyFieldsWithOrdinal = keyStateEncoderSpec match {
case RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) =>
orderingOrdinals.map { ordinal =>
Expand All @@ -366,7 +377,7 @@ class AvroStateEncoder(

private lazy val rangeScanAvroProjection = UnsafeProjection.create(rangeScanAvroSchema)

// Existing remainder key schema stuff
// Existing remainder key schema definitions
// Remaining Key schema and projection definitions used by the Avro Serializers
// and Deserializers
private val remainingKeySchema = keyStateEncoderSpec match {
Expand Down Expand Up @@ -427,6 +438,7 @@ class AvroStateEncoder(
}

private val out = new ByteArrayOutputStream

override def encodeKey(row: UnsafeRow): Array[Byte] = {
keyStateEncoderSpec match {
case NoPrefixKeyStateEncoderSpec(_) =>
Expand All @@ -447,8 +459,27 @@ class AvroStateEncoder(
}
}

override def encodePrefixKeyForRangeScan(
row: UnsafeRow): Array[Byte] = {
/**
* Encodes an UnsafeRow into an Avro-compatible byte array format for range scan operations.
*
* This method transforms row data into a binary format that preserves ordering when
* used in range scans.
* For each field in the row:
* - A marker byte is written to indicate null status or sign (for numeric types)
* - The value is written in big-endian format
*
* Special handling is implemented for:
* - Null values: marked with nullValMarker followed by zero bytes
* - Negative numbers: marked with negativeValMarker
* - Floating point numbers: bit manipulation to handle sign and NaN values correctly
*
* @param row The UnsafeRow to encode
* @param avroType The Avro schema defining the structure for encoding
* @return Array[Byte] containing the Avro-encoded data that preserves ordering for range scans
* @throws UnsupportedOperationException if a field's data type is not supported for range
* scan encoding
*/
override def encodePrefixKeyForRangeScan(row: UnsafeRow): Array[Byte] = {
val record = new GenericData.Record(rangeScanAvroType)
var fieldIdx = 0
rangeScanKeyFieldsWithOrdinal.zipWithIndex.foreach { case (fieldWithOrdinal, idx) =>
Expand Down Expand Up @@ -594,6 +625,25 @@ class AvroStateEncoder(
}
}

/**
* Decodes an Avro-encoded byte array back into an UnsafeRow for range scan operations.
*
* This method reverses the encoding process performed by encodePrefixKeyForRangeScan:
* - Reads the marker byte to determine null status or sign
* - Reconstructs the original values from big-endian format
* - Handles special cases for floating point numbers by reversing bit manipulations
*
* The decoding process preserves the original data types and values, including:
* - Null values marked by nullValMarker
* - Sign information for numeric types
* - Proper restoration of negative floating point values
*
* @param bytes The Avro-encoded byte array to decode
* @param avroType The Avro schema defining the structure for decoding
* @return UnsafeRow containing the decoded data
* @throws UnsupportedOperationException if a field's data type is not supported for range
* scan decoding
*/
override def decodePrefixKeyForRangeScan(bytes: Array[Byte]): UnsafeRow = {
val reader = new GenericDatumReader[GenericRecord](rangeScanAvroType)
val decoder = DecoderFactory.get().binaryDecoder(bytes, 0, bytes.length, null)
Expand Down Expand Up @@ -730,21 +780,21 @@ abstract class RocksDBKeyStateEncoderBase(

object RocksDBStateEncoder {
def getKeyEncoder(
stateEncoder: RocksDBDataEncoder,
dataEncoder: RocksDBDataEncoder,
keyStateEncoderSpec: KeyStateEncoderSpec,
useColumnFamilies: Boolean,
virtualColFamilyId: Option[Short] = None): RocksDBKeyStateEncoder = {
// Return the key state encoder based on the requested type
keyStateEncoderSpec match {
case NoPrefixKeyStateEncoderSpec(keySchema) =>
new NoPrefixKeyStateEncoder(stateEncoder, keySchema, useColumnFamilies, virtualColFamilyId)
new NoPrefixKeyStateEncoder(dataEncoder, keySchema, useColumnFamilies, virtualColFamilyId)

case PrefixKeyScanStateEncoderSpec(keySchema, numColsPrefixKey) =>
new PrefixKeyScanStateEncoder(stateEncoder, keySchema, numColsPrefixKey,
new PrefixKeyScanStateEncoder(dataEncoder, keySchema, numColsPrefixKey,
useColumnFamilies, virtualColFamilyId)

case RangeKeyScanStateEncoderSpec(keySchema, orderingOrdinals) =>
new RangeKeyScanStateEncoder(stateEncoder, keySchema, orderingOrdinals,
new RangeKeyScanStateEncoder(dataEncoder, keySchema, orderingOrdinals,
useColumnFamilies, virtualColFamilyId)

case _ =>
Expand All @@ -754,13 +804,13 @@ object RocksDBStateEncoder {
}

def getValueEncoder(
stateEncoder: RocksDBDataEncoder,
dataEncoder: RocksDBDataEncoder,
valueSchema: StructType,
useMultipleValuesPerKey: Boolean): RocksDBValueStateEncoder = {
if (useMultipleValuesPerKey) {
new MultiValuedStateEncoder(stateEncoder, valueSchema)
new MultiValuedStateEncoder(dataEncoder, valueSchema)
} else {
new SingleValueStateEncoder(stateEncoder, valueSchema)
new SingleValueStateEncoder(dataEncoder, valueSchema)
}
}

Expand All @@ -774,12 +824,13 @@ object RocksDBStateEncoder {
/**
* RocksDB Key Encoder for UnsafeRow that supports prefix scan
*
* @param dataEncoder - the encoder that handles actual encoding/decoding of data
* @param keySchema - schema of the key to be encoded
* @param numColsPrefixKey - number of columns to be used for prefix key
* @param useColumnFamilies - if column family is enabled for this encoder
*/
class PrefixKeyScanStateEncoder(
stateEncoder: RocksDBDataEncoder,
dataEncoder: RocksDBDataEncoder,
keySchema: StructType,
numColsPrefixKey: Int,
useColumnFamilies: Boolean = false,
Expand Down Expand Up @@ -812,8 +863,8 @@ class PrefixKeyScanStateEncoder(
private val joinedRowOnKey = new JoinedRow()

override def encodeKey(row: UnsafeRow): Array[Byte] = {
val prefixKeyEncoded = stateEncoder.encodeKey(extractPrefixKey(row))
val remainingEncoded = stateEncoder.encodeRemainingKey(remainingKeyProjection(row))
val prefixKeyEncoded = dataEncoder.encodeKey(extractPrefixKey(row))
val remainingEncoded = dataEncoder.encodeRemainingKey(remainingKeyProjection(row))

val (encodedBytes, startingOffset) = encodeColumnFamilyPrefix(
prefixKeyEncoded.length + remainingEncoded.length + 4
Expand Down Expand Up @@ -845,9 +896,9 @@ class PrefixKeyScanStateEncoder(
Platform.copyMemory(keyBytes, decodeKeyStartOffset + 4 + prefixKeyEncodedLen,
remainingKeyEncoded, Platform.BYTE_ARRAY_OFFSET, remainingKeyEncodedLen)

val prefixKeyDecoded = stateEncoder.decodeKey(
val prefixKeyDecoded = dataEncoder.decodeKey(
prefixKeyEncoded)
val remainingKeyDecoded = stateEncoder.decodeRemainingKey(remainingKeyEncoded)
val remainingKeyDecoded = dataEncoder.decodeRemainingKey(remainingKeyEncoded)

restoreKeyProjection(joinedRowOnKey.withLeft(prefixKeyDecoded).withRight(remainingKeyDecoded))
}
Expand All @@ -857,7 +908,7 @@ class PrefixKeyScanStateEncoder(
}

override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
val prefixKeyEncoded = stateEncoder.encodeKey(prefixKey)
val prefixKeyEncoded = dataEncoder.encodeKey(prefixKey)
val (prefix, startingOffset) = encodeColumnFamilyPrefix(
prefixKeyEncoded.length + 4
)
Expand Down Expand Up @@ -898,12 +949,13 @@ class PrefixKeyScanStateEncoder(
* the right lexicographical ordering. For the rationale around this, please check the link
* here: https://en.wikipedia.org/wiki/IEEE_754#Design_rationale
*
* @param dataEncoder - the encoder that handles the actual encoding/decoding of data
* @param keySchema - schema of the key to be encoded
* @param orderingOrdinals - the ordinals for which the range scan is constructed
* @param useColumnFamilies - if column family is enabled for this encoder
*/
class RangeKeyScanStateEncoder(
stateEncoder: RocksDBDataEncoder,
dataEncoder: RocksDBDataEncoder,
keySchema: StructType,
orderingOrdinals: Seq[Int],
useColumnFamilies: Boolean = false,
Expand Down Expand Up @@ -985,10 +1037,10 @@ class RangeKeyScanStateEncoder(
override def encodeKey(row: UnsafeRow): Array[Byte] = {
// This prefix key has the columns specified by orderingOrdinals
val prefixKey = extractPrefixKey(row)
val rangeScanKeyEncoded = stateEncoder.encodePrefixKeyForRangeScan(prefixKey)
val rangeScanKeyEncoded = dataEncoder.encodePrefixKeyForRangeScan(prefixKey)

val result = if (orderingOrdinals.length < keySchema.length) {
val remainingEncoded = stateEncoder.encodeRemainingKey(remainingKeyProjection(row))
val remainingEncoded = dataEncoder.encodeRemainingKey(remainingKeyProjection(row))
val (encodedBytes, startingOffset) = encodeColumnFamilyPrefix(
rangeScanKeyEncoded.length + remainingEncoded.length + 4
)
Expand Down Expand Up @@ -1025,7 +1077,7 @@ class RangeKeyScanStateEncoder(
Platform.copyMemory(keyBytes, decodeKeyStartOffset + 4,
prefixKeyEncoded, Platform.BYTE_ARRAY_OFFSET, prefixKeyEncodedLen)

val prefixKeyDecoded = stateEncoder.decodePrefixKeyForRangeScan(
val prefixKeyDecoded = dataEncoder.decodePrefixKeyForRangeScan(
prefixKeyEncoded)

if (orderingOrdinals.length < keySchema.length) {
Expand All @@ -1038,7 +1090,7 @@ class RangeKeyScanStateEncoder(
remainingKeyEncoded, Platform.BYTE_ARRAY_OFFSET,
remainingKeyEncodedLen)

val remainingKeyDecoded = stateEncoder.decodeRemainingKey(remainingKeyEncoded)
val remainingKeyDecoded = dataEncoder.decodeRemainingKey(remainingKeyEncoded)

val joined = joinedRowOnKey.withLeft(prefixKeyDecoded).withRight(remainingKeyDecoded)
val restored = restoreKeyProjection(joined)
Expand All @@ -1051,7 +1103,7 @@ class RangeKeyScanStateEncoder(
}

override def encodePrefixKey(prefixKey: UnsafeRow): Array[Byte] = {
val rangeScanKeyEncoded = stateEncoder.encodePrefixKeyForRangeScan(prefixKey)
val rangeScanKeyEncoded = dataEncoder.encodePrefixKeyForRangeScan(prefixKey)
val (prefix, startingOffset) = encodeColumnFamilyPrefix(rangeScanKeyEncoded.length + 4)

Platform.putInt(prefix, startingOffset, rangeScanKeyEncoded.length)
Expand All @@ -1076,7 +1128,7 @@ class RangeKeyScanStateEncoder(
* then the generated array byte will be N+1 bytes.
*/
class NoPrefixKeyStateEncoder(
stateEncoder: RocksDBDataEncoder,
dataEncoder: RocksDBDataEncoder,
keySchema: StructType,
useColumnFamilies: Boolean = false,
virtualColFamilyId: Option[Short] = None)
Expand All @@ -1087,9 +1139,9 @@ class NoPrefixKeyStateEncoder(

override def encodeKey(row: UnsafeRow): Array[Byte] = {
if (!useColumnFamilies) {
stateEncoder.encodeKey(row)
dataEncoder.encodeKey(row)
} else {
val bytesToEncode = stateEncoder.encodeKey(row)
val bytesToEncode = dataEncoder.encodeKey(row)
val (encodedBytes, startingOffset) = encodeColumnFamilyPrefix(
bytesToEncode.length +
STATE_ENCODING_NUM_VERSION_BYTES
Expand Down Expand Up @@ -1122,12 +1174,12 @@ class NoPrefixKeyStateEncoder(
dataBytes,
Platform.BYTE_ARRAY_OFFSET,
dataLength)
stateEncoder.decodeKey(dataBytes)
dataEncoder.decodeKey(dataBytes)
} else {
null
}
} else {
stateEncoder.decodeKey(keyBytes)
dataEncoder.decodeKey(keyBytes)
}
}

Expand All @@ -1152,12 +1204,12 @@ class NoPrefixKeyStateEncoder(
* operation.
*/
class MultiValuedStateEncoder(
stateEncoder: RocksDBDataEncoder,
dataEncoder: RocksDBDataEncoder,
valueSchema: StructType)
extends RocksDBValueStateEncoder with Logging {

override def encodeValue(row: UnsafeRow): Array[Byte] = {
val bytes = stateEncoder.encodeValue(row)
val bytes = dataEncoder.encodeValue(row)
val numBytes = bytes.length

val encodedBytes = new Array[Byte](java.lang.Integer.BYTES + bytes.length)
Expand All @@ -1176,7 +1228,7 @@ class MultiValuedStateEncoder(
val encodedValue = new Array[Byte](numBytes)
Platform.copyMemory(valueBytes, java.lang.Integer.BYTES + Platform.BYTE_ARRAY_OFFSET,
encodedValue, Platform.BYTE_ARRAY_OFFSET, numBytes)
stateEncoder.decodeValue(encodedValue)
dataEncoder.decodeValue(encodedValue)
}
}

Expand All @@ -1202,7 +1254,7 @@ class MultiValuedStateEncoder(

pos += numBytes
pos += 1 // eat the delimiter character
stateEncoder.decodeValue(encodedValue)
dataEncoder.decodeValue(encodedValue)
}
}
}
Expand All @@ -1224,11 +1276,11 @@ class MultiValuedStateEncoder(
* then the generated array byte will be N+1 bytes.
*/
class SingleValueStateEncoder(
stateEncoder: RocksDBDataEncoder,
dataEncoder: RocksDBDataEncoder,
valueSchema: StructType)
extends RocksDBValueStateEncoder {

override def encodeValue(row: UnsafeRow): Array[Byte] = stateEncoder.encodeValue(row)
override def encodeValue(row: UnsafeRow): Array[Byte] = dataEncoder.encodeValue(row)

/**
* Decode byte array for a value to a UnsafeRow.
Expand All @@ -1237,7 +1289,7 @@ class SingleValueStateEncoder(
* the given byte array.
*/
override def decodeValue(valueBytes: Array[Byte]): UnsafeRow = {
stateEncoder.decodeValue(valueBytes)
dataEncoder.decodeValue(valueBytes)
}

override def supportsMultipleValuesPerKey: Boolean = false
Expand Down
Loading

0 comments on commit 94acc5d

Please sign in to comment.