From 77052b5a908b9dc1757742062ff98617b7d512a8 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Wed, 25 Sep 2024 14:02:29 +0800 Subject: [PATCH] Refactor Bigtable schema registry classes --- .../store/bigtable/BaseSchemaRegistry.java | 64 +++++++++++++++++++ .../bigtable/BigTableSchemaRegistry.java | 58 +---------------- .../store/bigtable/HBaseOnlineRetriever.java | 25 ++++++++ .../store/bigtable/HBaseSchemaRegistry.java | 58 +---------------- 4 files changed, 95 insertions(+), 110 deletions(-) create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java new file mode 100644 index 0000000..6abee93 --- /dev/null +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java @@ -0,0 +1,64 @@ +package dev.caraml.serving.store.bigtable; + +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.protobuf.ByteString; +import java.util.concurrent.ExecutionException; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; + +/** Shared base of the schema registries: owns the reader cache, the {@link SchemaReference} key type and {@link #getReader}; subclasses supply {@link #loadReader}. */ public abstract class BaseSchemaRegistry { + /** Reader cache looked up by SchemaReference; null here and assigned in the subclass constructors. */ protected LoadingCache> cache = null; + + /* Column family, qualifier and key prefix; not used in this class, presumably where the Avro schema rows live (confirm in subclasses). */ protected static String COLUMN_FAMILY = "metadata"; + protected static String QUALIFIER = "avro"; + protected static String KEY_PREFIX = "schema#"; + + /** Cache key that identifies a schema by table name and schema hash; equality and hash code use both fields. */ public static class SchemaReference { + private final String tableName; + private final ByteString schemaHash; + + public SchemaReference(String tableName, ByteString schemaHash) { + this.tableName = tableName; + this.schemaHash = schemaHash; + } + + public String getTableName() { + return tableName; + } + + public ByteString getSchemaHash() { + return schemaHash; + } + + @Override + public int hashCode() { + int result = tableName.hashCode(); 
+ result = 31 * result + schemaHash.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SchemaReference that = (SchemaReference) o; + + if (!tableName.equals(that.tableName)) return false; + return schemaHash.equals(that.schemaHash); + } + } + + /** Returns the reader cached for {@code reference}. A failed cache load (ExecutionException or InvalidCacheLoadException) is rethrown as a RuntimeException with message "Unable to find Schema" and the original exception as cause. */ public GenericDatumReader getReader(SchemaReference reference) { + GenericDatumReader reader; + try { + reader = this.cache.get(reference); + } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) { + throw new RuntimeException(String.format("Unable to find Schema"), e); + } + return reader; + } + + /** Loads the reader for {@code reference}; implemented by BigTableSchemaRegistry and HBaseSchemaRegistry. NOTE(review): presumably invoked by the cache loader each subclass builds, which is not visible in this patch. */ public abstract GenericDatumReader loadReader(SchemaReference reference); +} diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java index d37ae0e..054284b 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java @@ -6,57 +6,14 @@ import com.google.cloud.bigtable.data.v2.models.RowCell; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; -import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -public class BigTableSchemaRegistry { +public class BigTableSchemaRegistry extends BaseSchemaRegistry { private final BigtableDataClient client; - private final LoadingCache> cache; - - private static String COLUMN_FAMILY = "metadata"; - private static String QUALIFIER = "avro"; - private static String KEY_PREFIX = 
"schema#"; - - public static class SchemaReference { - private final String tableName; - private final ByteString schemaHash; - - public SchemaReference(String tableName, ByteString schemaHash) { - this.tableName = tableName; - this.schemaHash = schemaHash; - } - - public String getTableName() { - return tableName; - } - - public ByteString getSchemaHash() { - return schemaHash; - } - - @Override - public int hashCode() { - int result = tableName.hashCode(); - result = 31 * result + schemaHash.hashCode(); - return result; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - SchemaReference that = (SchemaReference) o; - - if (!tableName.equals(that.tableName)) return false; - return schemaHash.equals(that.schemaHash); - } - } public BigTableSchemaRegistry(BigtableDataClient client) { this.client = client; @@ -67,17 +24,8 @@ public BigTableSchemaRegistry(BigtableDataClient client) { cache = CacheBuilder.newBuilder().build(schemaCacheLoader); } - public GenericDatumReader getReader(SchemaReference reference) { - GenericDatumReader reader; - try { - reader = this.cache.get(reference); - } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) { - throw new RuntimeException(String.format("Unable to find Schema"), e); - } - return reader; - } - - private GenericDatumReader loadReader(SchemaReference reference) { + @Override + public GenericDatumReader loadReader(SchemaReference reference) { Row row = client.readRow( reference.getTableName(), diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 145a901..9bed4d9 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ 
b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -27,6 +27,13 @@ public HBaseOnlineRetriever(Connection client) { this.schemaRegistry = new HBaseSchemaRegistry(client); } + /** + * Generate BigTable key in the form of entity values joined by #. + * + * @param entityRow Single EntityRow representation in feature retrieval call + * @param entityNames List of entities related to feature references in retrieval call + * @return Generated key as a ByteString + */ @Override public ByteString convertEntityValueToKey( ServingServiceProto.GetOnlineFeaturesRequest.EntityRow entityRow, List entityNames) { @@ -39,6 +46,15 @@ public ByteString convertEntityValueToKey( .getBytes()); } + /** + * Converts rowCell feature into {@code NativeFeature} type, HBase specific implementation. + * + * @param tableName Name of SSTable + * @param rowKeys List of keys of rows to retrieve + * @param rows Map of rowKey to Row related to it + * @param featureReferences List of feature references + * @return Converted features (presumably one inner list per row key; TODO confirm) + */ @Override public List> convertRowToFeature( String tableName, @@ -134,6 +150,15 @@ public Map getFeaturesFromSSTable( } } + /** Decodes features from a stored value. NOTE(review): method body is not visible here, descriptions below are inferred from names and must be confirmed. + * @param tableName Name of the table the value belongs to (presumably; confirm against caller) + * @param value Stored feature payload as a ByteString (presumably Avro encoded; confirm) + * @param featureReferences Feature references to decode (presumably; confirm) + * @param reusedDecoder Decoder instance reused across calls (inferred from name; confirm) + * @param timestamp Timestamp associated with the value (inferred from name; confirm) + * @return List of decoded features + * @throws IOException Presumably when the value cannot be decoded (confirm) + */ private List decodeFeatures( String tableName, ByteString value, diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 6062fdf..ff0a7b2 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -2,11 +2,9 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.protobuf.ByteString; import 
java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -17,49 +15,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -public class HBaseSchemaRegistry { +public class HBaseSchemaRegistry extends BaseSchemaRegistry { private final Connection hbaseClient; - private final LoadingCache> cache; - - private static String COLUMN_FAMILY = "metadata"; - private static String QUALIFIER = "avro"; - private static String KEY_PREFIX = "schema#"; - - public static class SchemaReference { - private final String tableName; - private final ByteString schemaHash; - - public SchemaReference(String tableName, ByteString schemaHash) { - this.tableName = tableName; - this.schemaHash = schemaHash; - } - - public String getTableName() { - return tableName; - } - - public ByteString getSchemaHash() { - return schemaHash; - } - - @Override - public int hashCode() { - int result = tableName.hashCode(); - result = 31 * result + schemaHash.hashCode(); - return result; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - SchemaReference that = (SchemaReference) o; - - if (!tableName.equals(that.tableName)) return false; - return schemaHash.equals(that.schemaHash); - } - } public HBaseSchemaRegistry(Connection hbaseClient) { this.hbaseClient = hbaseClient; @@ -70,17 +27,8 @@ public HBaseSchemaRegistry(Connection hbaseClient) { cache = CacheBuilder.newBuilder().build(schemaCacheLoader); } - public GenericDatumReader getReader(SchemaReference reference) { - GenericDatumReader reader; - try { - reader = this.cache.get(reference); - } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) { - throw new RuntimeException(String.format("Unable to find Schema"), e); - } - return 
reader; - } - - private GenericDatumReader loadReader(SchemaReference reference) { + @Override + public GenericDatumReader loadReader(SchemaReference reference) { try { Table table = this.hbaseClient.getTable(TableName.valueOf(reference.getTableName()));