feat: add HBase SDK for serving #127

Merged · 33 commits · Oct 2, 2024

Commits:
fd4470c  feat: add HBase SDK for serving (bayu-aditya, Sep 3, 2024)
ad454e1  Add spark changes to use hbase api (shydefoo, Aug 30, 2024)
2528ffe  Fix connection issues (shydefoo, Sep 2, 2024)
773eac1  Fix linting (shydefoo, Sep 2, 2024)
fd3030a  Add configuration for hbase api (shydefoo, Sep 5, 2024)
cdfad3d  Set platform to linux/amd64 (shydefoo, Sep 5, 2024)
73dfd8e  Update application.yaml to include hbase (shydefoo, Sep 5, 2024)
acd54c2  Refator BigTableSinkRelation to use updated classes (shydefoo, Sep 5, 2024)
f685315  Fix issue due to difference in bigtable and hbase response (shydefoo, Sep 6, 2024)
0054a14  Fix linting (shydefoo, Sep 6, 2024)
e1840ef  Fix issue due to difference in bigtable and hbase response (shydefoo, Sep 6, 2024)
0eb1b13  Fix linting (shydefoo, Sep 6, 2024)
350783e  Remove commented code (shydefoo, Sep 9, 2024)
38f7bc7  Clean up comments (shydefoo, Sep 17, 2024)
691c8d1  Fix application yaml (shydefoo, Sep 17, 2024)
8855d0a  Merge branch 'bayu/hbase' into hbase-poc (shydefoo, Sep 17, 2024)
a0e44a3  Add option for hbase for stream ingestion jobs (shydefoo, Sep 17, 2024)
fa28a59  Fix linting (shydefoo, Sep 18, 2024)
7292b48  Merge pull request #126 from caraml-dev/hbase-poc (shydefoo, Sep 20, 2024)
f699815  Add region split policy for hbase (shydefoo, Sep 20, 2024)
a8a861b  Fix linting (shydefoo, Sep 23, 2024)
77052b5  Refactor Bigtable schema registry classes (shydefoo, Sep 25, 2024)
2c64047  Add tests to query bigtable (w hbase sdk) and hbase (shydefoo, Sep 25, 2024)
fbc5c3f  Fix linting (shydefoo, Sep 25, 2024)
536deb9  Make compressionAlgo and region split policy type configurable (shydefoo, Sep 26, 2024)
58c09fc  Test using network host mode (shydefoo, Sep 26, 2024)
a04f6f7  Fix linting (shydefoo, Sep 26, 2024)
efc1862  Fix hbase unit test (shydefoo, Sep 26, 2024)
abcfb3e  Refactor functions (shydefoo, Sep 26, 2024)
7e7e417  Move avro schema length to BaseSchemRegistry (shydefoo, Sep 26, 2024)
455e253  add try catch block while create bigtable connection (bayu-aditya, Sep 27, 2024)
4355a1a  move into convertRowCellsToFeatures function (bayu-aditya, Oct 2, 2024)
3b4f01a  using single try catch block for connection (bayu-aditya, Oct 2, 2024)

Files changed (from all commits):

caraml-store-serving/build.gradle (3 changes: 2 additions & 1 deletion)

@@ -8,7 +8,8 @@ dependencies {
     implementation 'org.apache.commons:commons-lang3:3.10'
     implementation 'org.apache.avro:avro:1.10.2'
     implementation platform('com.google.cloud:libraries-bom:26.43.0')
-    implementation 'com.google.cloud:google-cloud-bigtable:2.40.0'
+    implementation 'com.google.cloud:google-cloud-bigtable:2.39.2'
+    implementation 'com.google.cloud.bigtable:bigtable-hbase-2.x:2.14.3'
     implementation 'commons-codec:commons-codec:1.17.1'
     implementation 'io.lettuce:lettuce-core:6.2.0.RELEASE'
     implementation 'io.netty:netty-transport-native-epoll:4.1.52.Final:linux-x86_64'
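
The bigtable-hbase-2.x artifact is the bridge that lets the serving layer speak the HBase client API against Cloud Bigtable; google-cloud-bigtable is pinned to 2.39.2 (overriding the 26.43.0 BOM), presumably to stay compatible with bigtable-hbase 2.14.3. A minimal sketch of what the bridge provides — the project and instance IDs below are hypothetical placeholders, not values from this PR:

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import org.apache.hadoop.hbase.client.Connection;

public class HBaseBridgeConnectExample {
  public static void main(String[] args) throws Exception {
    // Open an HBase-API Connection backed by Cloud Bigtable.
    // Placeholder IDs; real values come from the service configuration.
    try (Connection connection =
        BigtableConfiguration.connect("example-project", "example-instance")) {
      System.out.println("connected: " + !connection.isClosed());
    }
  }
}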

BaseSchemaRegistry.java (new file, 65 additions)

package dev.caraml.serving.store.bigtable;

import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.protobuf.ByteString;
import java.util.concurrent.ExecutionException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public abstract class BaseSchemaRegistry {
  protected LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache = null;

  protected static String COLUMN_FAMILY = "metadata";
  protected static String QUALIFIER = "avro";
  protected static String KEY_PREFIX = "schema#";
  public static final int SCHEMA_REFERENCE_LENGTH = 4;

  public static class SchemaReference {
    private final String tableName;
    private final ByteString schemaHash;

    public SchemaReference(String tableName, ByteString schemaHash) {
      this.tableName = tableName;
      this.schemaHash = schemaHash;
    }

    public String getTableName() {
      return tableName;
    }

    public ByteString getSchemaHash() {
      return schemaHash;
    }

    @Override
    public int hashCode() {
      int result = tableName.hashCode();
      result = 31 * result + schemaHash.hashCode();
      return result;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      SchemaReference that = (SchemaReference) o;

      if (!tableName.equals(that.tableName)) return false;
      return schemaHash.equals(that.schemaHash);
    }
  }

  public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
    GenericDatumReader<GenericRecord> reader;
    try {
      reader = this.cache.get(reference);
    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
      throw new RuntimeException(String.format("Unable to find Schema"), e);
    }
    return reader;
  }

  public abstract GenericDatumReader<GenericRecord> loadReader(SchemaReference reference);
}
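
The contract here: subclasses populate the protected cache with a loader that delegates to their own loadReader, and callers go through the inherited getReader for cached access. An illustrative sketch (not part of the PR) of how a concrete registry might wire this up, mirroring what the BigTableSchemaRegistry change below does with a real metadata-table lookup; it assumes the same package as BaseSchemaRegistry:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class StaticSchemaRegistry extends BaseSchemaRegistry {
  private final Schema schema;

  public StaticSchemaRegistry(Schema schema) {
    this.schema = schema;
    // Wire the inherited cache to loadReader, as the registries in this PR do.
    CacheLoader<SchemaReference, GenericDatumReader<GenericRecord>> loader =
        CacheLoader.from(this::loadReader);
    cache = CacheBuilder.newBuilder().build(loader);
  }

  @Override
  public GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
    // A real implementation would fetch the Avro schema stored under
    // KEY_PREFIX + schemaHash in the "metadata" column family; this stub
    // always returns a reader for one fixed schema.
    return new GenericDatumReader<>(schema);
  }
}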

@@ -161,8 +161,10 @@ private List<Feature> decodeFeatures(
       BinaryDecoder reusedDecoder,
       long timestamp)
       throws IOException {
-    ByteString schemaReferenceBytes = value.substring(0, 4);
-    byte[] featureValueBytes = value.substring(4).toByteArray();
+    ByteString schemaReferenceBytes =
+        value.substring(0, BigTableSchemaRegistry.SCHEMA_REFERENCE_LENGTH);
+    byte[] featureValueBytes =
+        value.substring(BigTableSchemaRegistry.SCHEMA_REFERENCE_LENGTH).toByteArray();

     BigTableSchemaRegistry.SchemaReference schemaReference =
         new BigTableSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes);
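
The value layout this change makes explicit: each stored cell value is a 4-byte schema hash (SCHEMA_REFERENCE_LENGTH) followed by the Avro-encoded feature payload, so the decoder splits on that boundary before consulting the schema registry. A self-contained sketch of the split; the example bytes are made up:

import com.google.protobuf.ByteString;

public class SchemaPrefixSplitExample {
  public static void main(String[] args) {
    // Made-up cell value: 4 schema-hash bytes followed by a 3-byte payload.
    ByteString value = ByteString.copyFrom(new byte[] {1, 2, 3, 4, 42, 43, 44});

    int prefix = 4; // BaseSchemaRegistry.SCHEMA_REFERENCE_LENGTH in this PR
    ByteString schemaReferenceBytes = value.substring(0, prefix);
    byte[] featureValueBytes = value.substring(prefix).toByteArray();

    System.out.println("schema hash bytes: " + schemaReferenceBytes.size()); // 4
    System.out.println("payload bytes: " + featureValueBytes.length); // 3
  }
}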

BigTableSchemaRegistry.java

@@ -6,57 +6,14 @@
 import com.google.cloud.bigtable.data.v2.models.RowCell;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Iterables;
-import com.google.protobuf.ByteString;
-import java.util.concurrent.ExecutionException;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;

-public class BigTableSchemaRegistry {
+public class BigTableSchemaRegistry extends BaseSchemaRegistry {
   private final BigtableDataClient client;
-  private final LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache;
-
-  private static String COLUMN_FAMILY = "metadata";
-  private static String QUALIFIER = "avro";
-  private static String KEY_PREFIX = "schema#";
-
-  public static class SchemaReference {
-    private final String tableName;
-    private final ByteString schemaHash;
-
-    public SchemaReference(String tableName, ByteString schemaHash) {
-      this.tableName = tableName;
-      this.schemaHash = schemaHash;
-    }
-
-    public String getTableName() {
-      return tableName;
-    }
-
-    public ByteString getSchemaHash() {
-      return schemaHash;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = tableName.hashCode();
-      result = 31 * result + schemaHash.hashCode();
-      return result;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-
-      SchemaReference that = (SchemaReference) o;
-
-      if (!tableName.equals(that.tableName)) return false;
-      return schemaHash.equals(that.schemaHash);
-    }
-  }

   public BigTableSchemaRegistry(BigtableDataClient client) {
     this.client = client;
@@ -67,17 +24,8 @@ public BigTableSchemaRegistry(BigtableDataClient client) {
     cache = CacheBuilder.newBuilder().build(schemaCacheLoader);
   }

-  public GenericDatumReader<GenericRecord> getReader(SchemaReference reference) {
-    GenericDatumReader<GenericRecord> reader;
-    try {
-      reader = this.cache.get(reference);
-    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
-      throw new RuntimeException(String.format("Unable to find Schema"), e);
-    }
-    return reader;
-  }
-
-  private GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
+  @Override
+  public GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) {
     Row row =
         client.readRow(
             reference.getTableName(),

BigTableStoreConfig.java

@@ -2,10 +2,13 @@

 import com.google.cloud.bigtable.data.v2.BigtableDataClient;
 import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.hbase.BigtableConfiguration;
+import com.google.cloud.bigtable.hbase.BigtableOptionsFactory;
 import dev.caraml.serving.store.OnlineRetriever;
 import java.io.IOException;
 import lombok.Getter;
 import lombok.Setter;
+import org.apache.hadoop.hbase.client.Connection;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.context.annotation.Bean;
@@ -23,10 +26,22 @@ public class BigTableStoreConfig {
   private String appProfileId;
   private Boolean enableClientSideMetrics;
   private Long timeoutMs;
+  private Boolean isUsingHBaseSDK;

   @Bean
   public OnlineRetriever getRetriever() {
     try {
+      // Using HBase SDK
+      if (isUsingHBaseSDK) {
+        org.apache.hadoop.conf.Configuration config =
+            BigtableConfiguration.configure(projectId, instanceId);
+        config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId);
+
+        Connection connection = BigtableConfiguration.connect(config);
+        return new HBaseOnlineRetriever(connection);
+      }
+
+      // Using BigTable SDK
       BigtableDataSettings.Builder builder =
           BigtableDataSettings.newBuilder()
               .setProjectId(projectId)
@@ -45,6 +60,7 @@ public OnlineRetriever getRetriever() {
       }
       BigtableDataClient client = BigtableDataClient.create(settings);
       return new BigTableOnlineRetriever(client);
+
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
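
HBaseOnlineRetriever itself is not visible in this view, but given the Connection bean above it would issue reads through the standard HBase client surface, which the bigtable-hbase bridge translates to Bigtable calls. A hedged sketch of such a read path — the table and row key are hypothetical, and the retriever's actual logic lives elsewhere in the PR:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseReadSketch {
  // Fetch one row by key; the same code works against real HBase and
  // against Cloud Bigtable via the bridge configured above.
  public static Result readRow(Connection connection, String table, String rowKey)
      throws IOException {
    try (Table t = connection.getTable(TableName.valueOf(table))) {
      return t.get(new Get(Bytes.toBytes(rowKey)));
    }
  }
}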