Skip to content

Commit

Permalink
Refactor Bigtable schema registry classes
Browse files Browse the repository at this point in the history
  • Loading branch information
shydefoo committed Sep 25, 2024
1 parent a8a861b commit 77052b5
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 110 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package dev.caraml.serving.store.bigtable;

import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutionException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

/**
 * Common base for schema registries that hand out Avro {@link GenericDatumReader}s keyed by a
 * {@link SchemaReference}.
 *
 * <p>This class only declares the reader cache; it never assigns it. Concrete subclasses are
 * expected to assign {@link #cache} in their constructors before {@link #getReader} is called.
 */
public abstract class BaseSchemaRegistry {
  /** Reader cache keyed by schema reference; left unset here and assigned by subclasses. */
  protected LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;

  // Storage-layout constants shared by the concrete registries. Declared final so that one
  // subclass cannot accidentally reassign a value that every other registry relies on.
  protected static final String COLUMN_FAMILY = "metadata";
  protected static final String QUALIFIER = "avro";
  protected static final String KEY_PREFIX = "schema#";

  /** Immutable cache key: a table name paired with the hash identifying one of its schemas. */
  public static class SchemaReference {
    private final String tableName;
    private final ByteString schemaHash;

    /**
     * @param tableName name of the table the schema belongs to
     * @param schemaHash hash identifying the schema
     */
    public SchemaReference(String tableName, ByteString schemaHash) {
      this.tableName = tableName;
      this.schemaHash = schemaHash;
    }

    /** @return the table name given at construction */
    public String getTableName() {
      return tableName;
    }

    /** @return the schema hash given at construction */
    public ByteString getSchemaHash() {
      return schemaHash;
    }

    @Override
    public int hashCode() {
      int result = tableName.hashCode();
      result = 31 * result + schemaHash.hashCode();
      return result;
    }

    /** Two references are equal when they are of the same class and both fields are equal. */
    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      SchemaReference that = (SchemaReference) o;
      return tableName.equals(that.tableName) && schemaHash.equals(that.schemaHash);
    }
  }

  /**
   * Returns the reader for the given schema reference, going through {@link #cache}.
   *
   * @param reference table name and schema hash identifying the schema
   * @return the reader held by (or loaded into) the cache for {@code reference}
   * @throws RuntimeException wrapping the original {@link ExecutionException} or {@link
   *     CacheLoader.InvalidCacheLoadException} when the cache cannot supply a reader
   */
  public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
    try {
      return this.cache.get(reference);
    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
      // Name the table so the failure can be traced to a concrete lookup; the cause is preserved.
      throw new RuntimeException(
          String.format("Unable to find Schema for table %s", reference.getTableName()), e);
    }
  }

  /**
   * Loads the reader for a schema reference from the backing store.
   *
   * @param reference table name and schema hash identifying the schema
   * @return a reader for the referenced schema
   */
  public abstract GenericDatumReader<GenericRecord> loadReader(SchemaReference reference);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,14 @@
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class BigTableSchemaRegistry {
public class BigTableSchemaRegistry extends BaseSchemaRegistry {
private final BigtableDataClient client;
private final LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;

private static String COLUMN_FAMILY = "metadata";
private static String QUALIFIER = "avro";
private static String KEY_PREFIX = "schema#";

public static class SchemaReference {
private final String tableName;
private final ByteString schemaHash;

public SchemaReference(String tableName, ByteString schemaHash) {
this.tableName = tableName;
this.schemaHash = schemaHash;
}

public String getTableName() {
return tableName;
}

public ByteString getSchemaHash() {
return schemaHash;
}

@Override
public int hashCode() {
int result = tableName.hashCode();
result = 31 * result + schemaHash.hashCode();
return result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SchemaReference that = (SchemaReference) o;

if (!tableName.equals(that.tableName)) return false;
return schemaHash.equals(that.schemaHash);
}
}

public BigTableSchemaRegistry(BigtableDataClient client) {
this.client = client;
Expand All @@ -67,17 +24,8 @@ public BigTableSchemaRegistry(BigtableDataClient client) {
cache = CacheBuilder.newBuilder().build(schemaCacheLoader);
}

public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
GenericDatumReader<GenericRecord> reader;
try {
reader = this.cache.get(reference);
} catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
throw new RuntimeException(String.format("Unable to find Schema"), e);
}
return reader;
}

private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
@Override
public GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
Row row =
client.readRow(
reference.getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ public HBaseOnlineRetriever(Connection client) {
this.schemaRegistry = new HBaseSchemaRegistry(client);
}

/**
* Generates the BigTable key in the form of entity values joined by {@code #}.
*
* @param entityRow Single EntityRow representation in feature retrieval call
* @param entityNames List of entities related to feature references in retrieval call
* @return the generated key as a {@code ByteString}
*/
@Override
public ByteString convertEntityValueToKey(
ServingServiceProto.GetOnlineFeaturesRequest.EntityRow entityRow, List<String> entityNames) {
Expand All @@ -39,6 +46,15 @@ public ByteString convertEntityValueToKey(
.getBytes());
}

/**
* Converts rowCell features into the {@code NativeFeature} type; HBase-specific implementation.
*
* @param tableName Name of SSTable
* @param rowKeys List of keys of rows to retrieve
* @param rows Map of rowKey to the Row related to it
* @param featureReferences List of feature references
* @return the converted features (presumably one list per row key; TODO confirm)
*/
@Override
public List<List<Feature>> convertRowToFeature(
String tableName,
Expand Down Expand Up @@ -134,6 +150,15 @@ public Map<ByteString, Result> getFeaturesFromSSTable(
}
}

/**
* @param tableName
* @param value
* @param featureReferences
* @param reusedDecoder
* @param timestamp
* @return
* @throws IOException
*/
private List<Feature> decodeFeatures(
String tableName,
ByteString value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
Expand All @@ -17,49 +15,8 @@
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;

public class HBaseSchemaRegistry {
public class HBaseSchemaRegistry extends BaseSchemaRegistry {
private final Connection hbaseClient;
private final LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;

private static String COLUMN_FAMILY = "metadata";
private static String QUALIFIER = "avro";
private static String KEY_PREFIX = "schema#";

public static class SchemaReference {
private final String tableName;
private final ByteString schemaHash;

public SchemaReference(String tableName, ByteString schemaHash) {
this.tableName = tableName;
this.schemaHash = schemaHash;
}

public String getTableName() {
return tableName;
}

public ByteString getSchemaHash() {
return schemaHash;
}

@Override
public int hashCode() {
int result = tableName.hashCode();
result = 31 * result + schemaHash.hashCode();
return result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SchemaReference that = (SchemaReference) o;

if (!tableName.equals(that.tableName)) return false;
return schemaHash.equals(that.schemaHash);
}
}

public HBaseSchemaRegistry(Connection hbaseClient) {
this.hbaseClient = hbaseClient;
Expand All @@ -70,17 +27,8 @@ public HBaseSchemaRegistry(Connection hbaseClient) {
cache = CacheBuilder.newBuilder().build(schemaCacheLoader);
}

public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
GenericDatumReader<GenericRecord> reader;
try {
reader = this.cache.get(reference);
} catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
throw new RuntimeException(String.format("Unable to find Schema"), e);
}
return reader;
}

private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
@Override
public GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
try {
Table table = this.hbaseClient.getTable(TableName.valueOf(reference.getTableName()));

Expand Down

0 comments on commit 77052b5

Please sign in to comment.