Skip to content

Commit

Permalink
move into convertRowCellsToFeatures function
Browse files Browse the repository at this point in the history
  • Loading branch information
bayu-aditya committed Oct 2, 2024
1 parent 455e253 commit 4355a1a
Showing 1 changed file with 56 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -68,54 +69,66 @@ public List<List<Feature>> convertRowToFeature(
rowKey -> {
if (!rows.containsKey(rowKey)) {
return Collections.<Feature>emptyList();
} else {
Result row = rows.get(rowKey);
return featureReferences.stream()
.map(ServingServiceProto.FeatureReference::getFeatureTable)
.distinct()
.map(cf -> row.getColumnCells(cf.getBytes(), null))
.filter(ls -> !ls.isEmpty())
.flatMap(
rowCells -> {
Cell rowCell = rowCells.get(0); // Latest cell
ByteBuffer valueBuffer =
HBaseSchemaRegistry.GetValueByteBufferFromRowCell(rowCell);
ByteBuffer familyBuffer =
ByteBuffer.wrap(rowCell.getFamilyArray())
.position(rowCell.getFamilyOffset())
.limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
.slice();
String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
ByteString value = ByteString.copyFrom(valueBuffer);

List<Feature> features;
List<ServingServiceProto.FeatureReference> localFeatureReferences =
featureReferences.stream()
.filter(
featureReference ->
featureReference.getFeatureTable().equals(family))
.collect(Collectors.toList());

try {
features =
decodeFeatures(
tableName,
value,
localFeatureReferences,
reusedDecoder,
rowCell.getTimestamp());
} catch (IOException e) {
throw new RuntimeException("Failed to decode features from BigTable");
}

return features.stream();
})
.collect(Collectors.toList());
}

Result row = rows.get(rowKey);
return featureReferences.stream()
.map(ServingServiceProto.FeatureReference::getFeatureTable)
.distinct()
.map(cf -> row.getColumnCells(cf.getBytes(), null))
.filter(ls -> !ls.isEmpty())
.flatMap(
rowCells ->
this.convertRowCellsToFeatures(
featureReferences, reusedDecoder, tableName, rowCells))
.collect(Collectors.toList());
})
.collect(Collectors.toList());
}

/**
* Converts rowCells feature into stream @NativeFeature type
*
* @param featureReferences List of feature references
* @param reusedDecoder Decoder for decoding feature values
* @param tableName Name of SSTable
* @param rowCells row cells data from SSTable
* @return Stream of @NativeFeature
* @throws RuntimeException failed to decode features
*/
private Stream<Feature> convertRowCellsToFeatures(
List<ServingServiceProto.FeatureReference> featureReferences,
BinaryDecoder reusedDecoder,
String tableName,
List<Cell> rowCells) {

Cell rowCell = rowCells.get(0); // Latest cell
ByteBuffer valueBuffer = HBaseSchemaRegistry.GetValueByteBufferFromRowCell(rowCell);
ByteBuffer familyBuffer =
ByteBuffer.wrap(rowCell.getFamilyArray())
.position(rowCell.getFamilyOffset())
.limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength())
.slice();
String family = ByteString.copyFrom(familyBuffer).toStringUtf8();
ByteString value = ByteString.copyFrom(valueBuffer);

List<Feature> features;
List<ServingServiceProto.FeatureReference> localFeatureReferences =
featureReferences.stream()
.filter(featureReference -> featureReference.getFeatureTable().equals(family))
.collect(Collectors.toList());

try {
features =
decodeFeatures(
tableName, value, localFeatureReferences, reusedDecoder, rowCell.getTimestamp());
} catch (IOException e) {
throw new RuntimeException("Failed to decode features from BigTable");
}

return features.stream();
}

/**
* Retrieve rows with required column families for each row entity by sending batch Get request,
* HBase specific implementation
Expand Down

0 comments on commit 4355a1a

Please sign in to comment.