Skip to content

Commit

Permalink
Merge pull request #127 from caraml-dev/bayu/hbase
Browse files Browse the repository at this point in the history
feat: add HBase SDK for serving
  • Loading branch information
bayu-aditya authored Oct 2, 2024
2 parents 9178b66 + 3b4f01a commit b343616
Show file tree
Hide file tree
Showing 21 changed files with 937 additions and 131 deletions.
3 changes: 2 additions & 1 deletion caraml-store-serving/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ dependencies {
implementation 'org.apache.commons:commons-lang3:3.10'
implementation 'org.apache.avro:avro:1.10.2'
implementation platform('com.google.cloud:libraries-bom:26.43.0')
implementation 'com.google.cloud:google-cloud-bigtable:2.40.0'
implementation 'com.google.cloud:google-cloud-bigtable:2.39.2'
implementation 'com.google.cloud.bigtable:bigtable-hbase-2.x:2.14.3'
implementation 'commons-codec:commons-codec:1.17.1'
implementation 'io.lettuce:lettuce-core:6.2.0.RELEASE'
implementation 'io.netty:netty-transport-native-epoll:4.1.52.Final:linux-x86_64'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package dev.caraml.serving.store.bigtable;

import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutionException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

/**
 * Base class for Avro schema registries backed by a wide-column store (BigTable or HBase).
 *
 * <p>Subclasses populate {@link #cache} with a {@link LoadingCache} whose loader delegates to
 * {@link #loadReader(SchemaReference)}; callers resolve readers via
 * {@link #getReader(SchemaReference)}.
 */
public abstract class BaseSchemaRegistry {
  /** Lazily-populated cache of Avro readers, keyed by schema reference. Set by subclasses. */
  protected LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache = null;

  // Storage layout shared by all implementations: schemas live in the "metadata" column family,
  // under the "avro" qualifier, with row keys prefixed by "schema#".
  // NOTE: declared final — these are constants and must not be reassigned by subclasses.
  protected static final String COLUMN_FAMILY = "metadata";
  protected static final String QUALIFIER = "avro";
  protected static final String KEY_PREFIX = "schema#";

  /** Number of leading bytes of a stored feature value that encode the schema reference hash. */
  public static final int SCHEMA_REFERENCE_LENGTH = 4;

  /**
   * Immutable cache key identifying a schema: the table it belongs to plus the schema hash bytes.
   * Implements {@code equals}/{@code hashCode} so it can be used as a {@link LoadingCache} key.
   */
  public static class SchemaReference {
    private final String tableName;
    private final ByteString schemaHash;

    public SchemaReference(String tableName, ByteString schemaHash) {
      this.tableName = tableName;
      this.schemaHash = schemaHash;
    }

    public String getTableName() {
      return tableName;
    }

    public ByteString getSchemaHash() {
      return schemaHash;
    }

    @Override
    public int hashCode() {
      int result = tableName.hashCode();
      result = 31 * result + schemaHash.hashCode();
      return result;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      SchemaReference that = (SchemaReference) o;

      if (!tableName.equals(that.tableName)) return false;
      return schemaHash.equals(that.schemaHash);
    }
  }

  /**
   * Returns the cached Avro reader for {@code reference}, loading it via
   * {@link #loadReader(SchemaReference)} on a cache miss.
   *
   * @param reference table name + schema hash identifying the schema
   * @return a datum reader for records written with the referenced schema
   * @throws RuntimeException if the schema cannot be loaded (wraps the cache failure cause)
   */
  public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
    try {
      return this.cache.get(reference);
    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
      // Include the table name so operators can tell which store is missing its schema row.
      // (Original used a no-arg String.format with no context.)
      throw new RuntimeException(
          String.format("Unable to find schema for table %s", reference.getTableName()), e);
    }
  }

  /**
   * Loads the Avro reader for {@code reference} from the backing store. Invoked by the cache on a
   * miss; implementations read the schema row keyed by {@code KEY_PREFIX} + schema hash from
   * {@code COLUMN_FAMILY}/{@code QUALIFIER}.
   */
  public abstract GenericDatumReader<GenericRecord> loadReader(SchemaReference reference);
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ private List<Feature> decodeFeatures(
BinaryDecoder reusedDecoder,
long timestamp)
throws IOException {
ByteString schemaReferenceBytes = value.substring(0, 4);
byte[] featureValueBytes = value.substring(4).toByteArray();
ByteString schemaReferenceBytes =
value.substring(0, BigTableSchemaRegistry.SCHEMA_REFERENCE_LENGTH);
byte[] featureValueBytes =
value.substring(BigTableSchemaRegistry.SCHEMA_REFERENCE_LENGTH).toByteArray();

BigTableSchemaRegistry.SchemaReference schemaReference =
new BigTableSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,57 +6,14 @@
import com.google.cloud.bigtable.data.v2.models.RowCell;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class BigTableSchemaRegistry {
public class BigTableSchemaRegistry extends BaseSchemaRegistry {
private final BigtableDataClient client;
private final LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;

private static String COLUMN_FAMILY = "metadata";
private static String QUALIFIER = "avro";
private static String KEY_PREFIX = "schema#";

public static class SchemaReference {
private final String tableName;
private final ByteString schemaHash;

public SchemaReference(String tableName, ByteString schemaHash) {
this.tableName = tableName;
this.schemaHash = schemaHash;
}

public String getTableName() {
return tableName;
}

public ByteString getSchemaHash() {
return schemaHash;
}

@Override
public int hashCode() {
int result = tableName.hashCode();
result = 31 * result + schemaHash.hashCode();
return result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

SchemaReference that = (SchemaReference) o;

if (!tableName.equals(that.tableName)) return false;
return schemaHash.equals(that.schemaHash);
}
}

public BigTableSchemaRegistry(BigtableDataClient client) {
this.client = client;
Expand All @@ -67,17 +24,8 @@ public BigTableSchemaRegistry(BigtableDataClient client) {
cache = CacheBuilder.newBuilder().build(schemaCacheLoader);
}

public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
GenericDatumReader<GenericRecord> reader;
try {
reader = this.cache.get(reference);
} catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
throw new RuntimeException(String.format("Unable to find Schema"), e);
}
return reader;
}

private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
@Override
public GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
Row row =
client.readRow(
reference.getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
import dev.caraml.serving.store.OnlineRetriever;
import java.io.IOException;
import lombok.Getter;
import lombok.Setter;
import org.apache.hadoop.hbase.client.Connection;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
Expand All @@ -23,10 +26,22 @@ public class BigTableStoreConfig {
private String appProfileId;
private Boolean enableClientSideMetrics;
private Long timeoutMs;
private Boolean isUsingHBaseSDK;

@Bean
public OnlineRetriever getRetriever() {
try {
// Using HBase SDK
if (isUsingHBaseSDK) {
org.apache.hadoop.conf.Configuration config =
BigtableConfiguration.configure(projectId, instanceId);
config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId);

Connection connection = BigtableConfiguration.connect(config);
return new HBaseOnlineRetriever(connection);
}

// Using BigTable SDK
BigtableDataSettings.Builder builder =
BigtableDataSettings.newBuilder()
.setProjectId(projectId)
Expand All @@ -45,6 +60,7 @@ public OnlineRetriever getRetriever() {
}
BigtableDataClient client = BigtableDataClient.create(settings);
return new BigTableOnlineRetriever(client);

} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Loading

0 comments on commit b343616

Please sign in to comment.