From fd4470cf7f48a16b10e9e15f4d0bf831ef290e34 Mon Sep 17 00:00:00 2001 From: Bayu Aditya Date: Tue, 3 Sep 2024 15:35:57 +0700 Subject: [PATCH 01/31] feat: add HBase SDK for serving --- caraml-store-serving/build.gradle | 3 +- .../store/bigtable/BigTableStoreConfig.java | 15 ++ .../store/bigtable/HBaseOnlineRetriever.java | 163 ++++++++++++++++++ .../store/bigtable/HBaseSchemaRegistry.java | 103 +++++++++++ .../src/main/resources/application.yaml | 1 + 5 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java diff --git a/caraml-store-serving/build.gradle b/caraml-store-serving/build.gradle index ccdf03f..0d04182 100644 --- a/caraml-store-serving/build.gradle +++ b/caraml-store-serving/build.gradle @@ -8,7 +8,8 @@ dependencies { implementation 'org.apache.commons:commons-lang3:3.10' implementation 'org.apache.avro:avro:1.10.2' implementation platform('com.google.cloud:libraries-bom:26.43.0') - implementation 'com.google.cloud:google-cloud-bigtable:2.40.0' + implementation 'com.google.cloud:google-cloud-bigtable:2.39.2' + implementation 'com.google.cloud.bigtable:bigtable-hbase-2.x:2.14.3' implementation 'commons-codec:commons-codec:1.17.1' implementation 'io.lettuce:lettuce-core:6.2.0.RELEASE' implementation 'io.netty:netty-transport-native-epoll:4.1.52.Final:linux-x86_64' diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java index 9c0609a..10cad15 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java @@ -2,10 +2,13 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import com.google.cloud.bigtable.hbase.BigtableOptionsFactory; import dev.caraml.serving.store.OnlineRetriever; import java.io.IOException; import lombok.Getter; import lombok.Setter; +import org.apache.hadoop.hbase.client.Connection; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -23,9 +26,21 @@ public class BigTableStoreConfig { private String appProfileId; private Boolean enableClientSideMetrics; private Long timeoutMs; + private Boolean isUsingHBaseSDK; @Bean public OnlineRetriever getRetriever() { + // Using HBase SDK + if (isUsingHBaseSDK) { + org.apache.hadoop.conf.Configuration config = + BigtableConfiguration.configure(projectId, instanceId); + config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId); + + Connection connection = BigtableConfiguration.connect(config); + return new HBaseOnlineRetriever(connection); + } + + // Using BigTable SDK try { BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilder() diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java new file mode 100644 index 0000000..a52db32 --- /dev/null +++ 
b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -0,0 +1,163 @@ +package dev.caraml.serving.store.bigtable; + +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import dev.caraml.serving.store.AvroFeature; +import dev.caraml.serving.store.Feature; +import dev.caraml.store.protobuf.serving.ServingServiceProto; +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; + +public class HBaseOnlineRetriever implements SSTableOnlineRetriever { + private final Connection client; + private final HBaseSchemaRegistry schemaRegistry; + + public HBaseOnlineRetriever(Connection client) { + this.client = client; + this.schemaRegistry = new HBaseSchemaRegistry(client); + } + + @Override + public ByteString convertEntityValueToKey( + ServingServiceProto.GetOnlineFeaturesRequest.EntityRow entityRow, List entityNames) { + return ByteString.copyFrom( + entityNames.stream() + .sorted() + .map(entity -> entityRow.getFieldsMap().get(entity)) + .map(this::valueToString) + .collect(Collectors.joining("#")) + .getBytes()); + } + + @Override + public List> convertRowToFeature( + String tableName, + List rowKeys, + Map rows, + List featureReferences) { + BinaryDecoder reusedDecoder = DecoderFactory.get().binaryDecoder(new byte[0], null); + + return rowKeys.stream() + .map( + rowKey -> { + if (!rows.containsKey(rowKey)) { + return Collections.emptyList(); + } else { + Result row = rows.get(rowKey); + return featureReferences.stream() + .map(ServingServiceProto.FeatureReference::getFeatureTable) + .distinct() + .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .filter(ls -> !ls.isEmpty()) + .flatMap( + rowCells -> { + Cell rowCell = rowCells.get(0); // Latest cell + String family = Bytes.toString(rowCell.getFamilyArray()); + ByteString value = ByteString.copyFrom(rowCell.getValueArray()); + + List features; + List localFeatureReferences = + featureReferences.stream() + .filter( + featureReference -> + featureReference.getFeatureTable().equals(family)) + .collect(Collectors.toList()); + + try { + features = + decodeFeatures( + tableName, + value, + localFeatureReferences, + reusedDecoder, + rowCell.getTimestamp()); + } catch (IOException e) { + throw new RuntimeException("Failed to decode features from BigTable"); + } + + return features.stream(); + }) + .collect(Collectors.toList()); + } + }) + .collect(Collectors.toList()); + } + + @Override + public Map getFeaturesFromSSTable( + String tableName, List rowKeys, List columnFamilies) { + try { + Table table = this.client.getTable(TableName.valueOf(tableName)); + + // construct query get list + List queryGetList = new ArrayList<>(); + rowKeys.forEach( + rowKey -> { + Get get = new Get(rowKey.toByteArray()); + columnFamilies.forEach(cf -> get.addFamily(cf.getBytes())); + + queryGetList.add(get); + }); + + // fetch data from table + Result[] rows = table.get(queryGetList); + + // construct result + Map result = new HashMap<>(); + Arrays.stream(rows) + .filter(row -> !row.isEmpty()) + .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); + + return result; + 
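+      // HBase client I/O failures are propagated as unchecked exceptions by the catch below.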
} catch (IOException e) { + throw new RuntimeException(e); + } + } + + private List decodeFeatures( + String tableName, + ByteString value, + List featureReferences, + BinaryDecoder reusedDecoder, + long timestamp) + throws IOException { + ByteString schemaReferenceBytes = value.substring(0, 4); + byte[] featureValueBytes = value.substring(4).toByteArray(); + + HBaseSchemaRegistry.SchemaReference schemaReference = + new HBaseSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes); + + GenericDatumReader reader = this.schemaRegistry.getReader(schemaReference); + + reusedDecoder = DecoderFactory.get().binaryDecoder(featureValueBytes, reusedDecoder); + GenericRecord record = reader.read(null, reusedDecoder); + + return featureReferences.stream() + .map( + featureReference -> { + Object featureValue; + try { + featureValue = record.get(featureReference.getName()); + } catch (AvroRuntimeException e) { + // Feature is not found in schema + return null; + } + return new AvroFeature( + featureReference, + Timestamp.newBuilder().setSeconds(timestamp / 1000).build(), + Objects.requireNonNullElseGet(featureValue, Object::new)); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } +} diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java new file mode 100644 index 0000000..7802f19 --- /dev/null +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -0,0 +1,103 @@ +package dev.caraml.serving.store.bigtable; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; + +public class HBaseSchemaRegistry { + private final Connection hbaseClient; + private final LoadingCache> cache; + + private static String COLUMN_FAMILY = "metadata"; + private static String QUALIFIER = "avro"; + private static String KEY_PREFIX = "schema#"; + + public static class SchemaReference { + private final String tableName; + private final ByteString schemaHash; + + public SchemaReference(String tableName, ByteString schemaHash) { + this.tableName = tableName; + this.schemaHash = schemaHash; + } + + public String getTableName() { + return tableName; + } + + public ByteString getSchemaHash() { + return schemaHash; + } + + @Override + public int hashCode() { + int result = tableName.hashCode(); + result = 31 * result + schemaHash.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SchemaReference that = (SchemaReference) o; + + if (!tableName.equals(that.tableName)) return false; + return schemaHash.equals(that.schemaHash); + } + } + + public HBaseSchemaRegistry(Connection hbaseClient) { + this.hbaseClient = hbaseClient; + + CacheLoader> schemaCacheLoader = + 
CacheLoader.from(this::loadReader); + + cache = CacheBuilder.newBuilder().build(schemaCacheLoader); + } + + public GenericDatumReader getReader(SchemaReference reference) { + GenericDatumReader reader; + try { + reader = this.cache.get(reference); + } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) { + throw new RuntimeException(String.format("Unable to find Schema"), e); + } + return reader; + } + + private GenericDatumReader loadReader(SchemaReference reference) { + try { + Table table = this.hbaseClient.getTable(TableName.valueOf(reference.getTableName())); + + byte[] rowKey = + ByteString.copyFrom(KEY_PREFIX.getBytes()) + .concat(reference.getSchemaHash()) + .toByteArray(); + Get query = new Get(rowKey); + query.addColumn(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); + + Result result = table.get(query); + + Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); + Schema schema = new Schema.Parser().parse(Bytes.toString(last.getValueArray())); + return new GenericDatumReader<>(schema); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/caraml-store-serving/src/main/resources/application.yaml b/caraml-store-serving/src/main/resources/application.yaml index 5b01830..b963028 100644 --- a/caraml-store-serving/src/main/resources/application.yaml +++ b/caraml-store-serving/src/main/resources/application.yaml @@ -76,6 +76,7 @@ caraml: enableClientSideMetrics: false # Timeout configuration for BigTable client. Set 0 to use the default client configuration. timeoutMs: 0 + isUsingHBaseSDK: true grpc: server: From ad454e1b1b044c1cf97eb77368aab1c2c1e5e603 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 30 Aug 2024 16:51:44 +0800 Subject: [PATCH 02/31] Add spark changes to use hbase api --- .../scala/dev/caraml/spark/BasePipeline.scala | 4 ++ .../dev/caraml/spark/BatchPipeline.scala | 8 ++++ .../scala/dev/caraml/spark/IngestionJob.scala | 3 ++ .../dev/caraml/spark/IngestionJobConfig.scala | 1 + .../bigtable/BigTableSinkRelation.scala | 8 +++- .../spark/stores/bigtable/DefaultSource.scala | 47 +++++++++++++------ .../stores/bigtable/HbaseSinkRelation.scala | 17 +++++++ 7 files changed, 72 insertions(+), 16 deletions(-) create mode 100644 caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala index cb854aa..9fd6322 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala @@ -33,6 +33,10 @@ object BasePipeline { conf .set("spark.bigtable.projectId", projectId) .set("spark.bigtable.instanceId", instanceId) + case HBaseConfig(zookeeperQuorum, zookeeperPort) => + conf + .set("spark.hbase.zookeeper.quorum", zookeeperQuorum) + .set("spark.hbase.zookeeper.port", zookeeperPort.toString) } jobConfig.metrics match { diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala index 4ce5d55..d2757f7 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala @@ -66,11 +66,19 @@ object BatchPipeline extends BasePipeline { .map(metrics.incrementRead) .filter(rowValidator.allChecks) + val onlineStore = config.store match { + case _: 
RedisConfig => "redis" + case _: BigTableConfig => "bigtable" + case _: HBaseConfig => "hbase" + } + validRows.write .format(config.store match { case _: RedisConfig => "dev.caraml.spark.stores.redis" case _: BigTableConfig => "dev.caraml.spark.stores.bigtable" + case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" }) + .option("online_store", onlineStore) .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) .option("namespace", featureTable.name) .option("project_name", featureTable.project) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala index 69196c9..20939b2 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJob.scala @@ -87,6 +87,9 @@ object IngestionJob { opt[String](name = "bigtable") .action((x, c) => c.copy(store = parseJSON(x).camelizeKeys.extract[BigTableConfig])) + opt[String](name = "hbase") + .action((x, c) => c.copy(store = parseJSON(x).extract[HBaseConfig])) + opt[String](name = "statsd") .action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig]))) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala index a13524d..c69c64a 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala @@ -27,6 +27,7 @@ case class RedisWriteProperties( ratePerSecondLimit: Int = 50000 ) case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig +case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig sealed trait MetricConfig diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 8cf36b8..a60668b 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -4,7 +4,7 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration import dev.caraml.spark.serialization.Serializer import dev.caraml.spark.utils.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.{Admin, Connection, Put} import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.hadoop.mapred.JobConf @@ -30,8 +30,12 @@ class BigTableSinkRelation( override def schema: StructType = ??? 
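+  // Connection factory for the sink; HbaseSinkRelation overrides this to open a plain HBase connection via ConnectionFactory instead of the Bigtable HBase adapter.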
+ def getConnection(hadoopConfig: Configuration): Connection = { + BigtableConfiguration.connect(hadoopConfig) + } + def createTable(): Unit = { - val btConn = BigtableConfiguration.connect(hadoopConfig) + val btConn = getConnection(hadoopConfig) try { val admin = btConn.getAdmin diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala index 3c31c89..ebaf678 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala @@ -23,27 +23,43 @@ class DefaultSource extends CreatableRelationProvider { parameters: Map[String, String], data: DataFrame ): BaseRelation = { - val bigtableConf = BigtableConfiguration.configure( - sqlContext.getConf(PROJECT_KEY), - sqlContext.getConf(INSTANCE_KEY) - ) - - if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) { - bigtableConf.set( - BIGTABLE_EMULATOR_HOST_KEY, - sqlContext.getConf("spark.bigtable.emulatorHost") + val onlineStore = parameters.getOrElse("onlineStore", "bigtable") + var rel: BigTableSinkRelation = null + if (onlineStore == "bigtable") { + val bigtableConf = BigtableConfiguration.configure( + sqlContext.getConf(PROJECT_KEY), + sqlContext.getConf(INSTANCE_KEY) ) - } - configureBigTableClient(bigtableConf, sqlContext) + if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) { + bigtableConf.set( + BIGTABLE_EMULATOR_HOST_KEY, + sqlContext.getConf("spark.bigtable.emulatorHost") + ) + } + + configureBigTableClient(bigtableConf, sqlContext) - val rel = - new BigTableSinkRelation( + rel = + new BigTableSinkRelation( + sqlContext, + new AvroSerializer, + SparkBigtableConfig.parse(parameters), + bigtableConf + ) + } else if (onlineStore == "hbase"){ + val hbaseConf = new Configuration() + hbaseConf.set("hbase.zookeeper.quorum", sqlContext.getConf(ZOOKEEPER_QUOROM_KEY)) + hbaseConf.set("hbase.zookeeper.property.clientPort", sqlContext.getConf(ZOOKEEPER_PORT_KEY)) + rel = new HbaseSinkRelation( sqlContext, new AvroSerializer, SparkBigtableConfig.parse(parameters), - bigtableConf + hbaseConf ) + } else { + throw new UnsupportedOperationException(s"Unsupported online store: $onlineStore") + } rel.createTable() rel.saveWriteSchema(data) rel.insert(data, overwrite = false) @@ -79,4 +95,7 @@ object DefaultSource { private val THROTTLING_THRESHOLD_MILLIS_KEY = "spark.bigtable.throttlingThresholdMs" private val MAX_ROW_COUNT_KEY = "spark.bigtable.maxRowCount" private val MAX_INFLIGHT_KEY = "spark.bigtable.maxInflightRpcs" + + private val ZOOKEEPER_QUOROM_KEY = "spark.hbase.zookeeper.quorum" + private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port" } diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala new file mode 100644 index 0000000..f7596ea --- /dev/null +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -0,0 +1,17 @@ +package dev.caraml.spark.stores.bigtable + +import dev.caraml.spark.serialization.Serializer +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory} +import org.apache.spark.sql.SQLContext + +class HbaseSinkRelation( + sqlContext: SQLContext, + serializer: Serializer, + config: SparkBigtableConfig, + 
hadoopConfig: Configuration + ) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) { + override def getConnection(hadoopConfig: Configuration): Connection = { + ConnectionFactory.createConnection(hadoopConfig) + } +} \ No newline at end of file From 2528ffe9e19be569793a8011861a3a17eeac7750 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Mon, 2 Sep 2024 15:06:05 +0800 Subject: [PATCH 03/31] Fix connection issues --- .../caraml/spark/stores/bigtable/BigTableSinkRelation.scala | 2 +- .../scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index a60668b..62d7ab2 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -119,7 +119,7 @@ class BigTableSinkRelation( val qualifier = "avro".getBytes put.addColumn(metadataColumnFamily.getBytes, qualifier, schema.asInstanceOf[String].getBytes) - val btConn = BigtableConfiguration.connect(hadoopConfig) + val btConn = getConnection(hadoopConfig) try { val table = btConn.getTable(TableName.valueOf(tableName)) table.checkAndPut( diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala index ebaf678..2dea536 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala @@ -23,8 +23,9 @@ class DefaultSource extends CreatableRelationProvider { parameters: Map[String, String], data: DataFrame ): BaseRelation = { - val onlineStore = parameters.getOrElse("onlineStore", "bigtable") + val onlineStore = parameters.getOrElse("online_store", "bigtable") var rel: BigTableSinkRelation = null + println(s"onlineStore: $onlineStore") if (onlineStore == "bigtable") { val bigtableConf = BigtableConfiguration.configure( sqlContext.getConf(PROJECT_KEY), From 773eac10a9670766c6d66716b663f83f313db6f7 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Mon, 2 Sep 2024 15:51:49 +0800 Subject: [PATCH 04/31] Fix linting --- .../dev/caraml/spark/BatchPipeline.scala | 6 +++--- .../dev/caraml/spark/IngestionJobConfig.scala | 2 +- .../spark/stores/bigtable/DefaultSource.scala | 19 +++++++++---------- .../stores/bigtable/HbaseSinkRelation.scala | 12 ++++++------ 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala index d2757f7..733d7d2 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BatchPipeline.scala @@ -67,16 +67,16 @@ object BatchPipeline extends BasePipeline { .filter(rowValidator.allChecks) val onlineStore = config.store match { - case _: RedisConfig => "redis" + case _: RedisConfig => "redis" case _: BigTableConfig => "bigtable" - case _: HBaseConfig => "hbase" + case _: HBaseConfig => "hbase" } validRows.write .format(config.store match { case _: RedisConfig => "dev.caraml.spark.stores.redis" case _: BigTableConfig => 
"dev.caraml.spark.stores.bigtable" - case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" + case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" }) .option("online_store", onlineStore) .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala index c69c64a..cae6053 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala @@ -26,7 +26,7 @@ case class RedisWriteProperties( enableRateLimit: Boolean = false, ratePerSecondLimit: Int = 50000 ) -case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig +case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig sealed trait MetricConfig diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala index 2dea536..5838aab 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/DefaultSource.scala @@ -23,7 +23,7 @@ class DefaultSource extends CreatableRelationProvider { parameters: Map[String, String], data: DataFrame ): BaseRelation = { - val onlineStore = parameters.getOrElse("online_store", "bigtable") + val onlineStore = parameters.getOrElse("online_store", "bigtable") var rel: BigTableSinkRelation = null println(s"onlineStore: $onlineStore") if (onlineStore == "bigtable") { @@ -41,14 +41,13 @@ class DefaultSource extends CreatableRelationProvider { configureBigTableClient(bigtableConf, sqlContext) - rel = - new BigTableSinkRelation( - sqlContext, - new AvroSerializer, - SparkBigtableConfig.parse(parameters), - bigtableConf - ) - } else if (onlineStore == "hbase"){ + rel = new BigTableSinkRelation( + sqlContext, + new AvroSerializer, + SparkBigtableConfig.parse(parameters), + bigtableConf + ) + } else if (onlineStore == "hbase") { val hbaseConf = new Configuration() hbaseConf.set("hbase.zookeeper.quorum", sqlContext.getConf(ZOOKEEPER_QUOROM_KEY)) hbaseConf.set("hbase.zookeeper.property.clientPort", sqlContext.getConf(ZOOKEEPER_PORT_KEY)) @@ -98,5 +97,5 @@ object DefaultSource { private val MAX_INFLIGHT_KEY = "spark.bigtable.maxInflightRpcs" private val ZOOKEEPER_QUOROM_KEY = "spark.hbase.zookeeper.quorum" - private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port" + private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port" } diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala index f7596ea..6ebf53b 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -6,12 +6,12 @@ import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory} import org.apache.spark.sql.SQLContext class HbaseSinkRelation( - sqlContext: SQLContext, - serializer: Serializer, - config: SparkBigtableConfig, - hadoopConfig: Configuration - ) extends BigTableSinkRelation(sqlContext, serializer, 
config, hadoopConfig) { + sqlContext: SQLContext, + serializer: Serializer, + config: SparkBigtableConfig, + hadoopConfig: Configuration +) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) { override def getConnection(hadoopConfig: Configuration): Connection = { ConnectionFactory.createConnection(hadoopConfig) } -} \ No newline at end of file +} From fd3030ab2a88a9f83d1fdfc5bdb8e1a62574705e Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 10:58:53 +0800 Subject: [PATCH 05/31] Add configuration for hbase api --- .../store/bigtable/HBaseStoreConfig.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java new file mode 100644 index 0000000..83630f1 --- /dev/null +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java @@ -0,0 +1,40 @@ +package dev.caraml.serving.store.bigtable; + +import dev.caraml.serving.store.OnlineRetriever; +import lombok.Getter; +import lombok.Setter; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.io.IOException; + +@Configuration +@ConfigurationProperties(prefix = "caraml.store.hbase") +@ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase") +@Getter +@Setter +public class HBaseStoreConfig { + private String zookeeperQuorum; + private String zookeeperClientPort; + + @Bean + public OnlineRetriever getRetriever() { + org.apache.hadoop.conf.Configuration conf; + conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", zookeeperQuorum); + conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); + Connection connection; + try{ + connection = ConnectionFactory.createConnection(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new HBaseOnlineRetriever(connection); + } +} From cdfad3dca834fca499ac1f42d85a6a1de5e7fc31 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 15:29:45 +0800 Subject: [PATCH 06/31] Set platform to linux/amd64 --- caraml-store-spark/docker/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caraml-store-spark/docker/Dockerfile b/caraml-store-spark/docker/Dockerfile index 710d9b8..bb2c48b 100644 --- a/caraml-store-spark/docker/Dockerfile +++ b/caraml-store-spark/docker/Dockerfile @@ -1,4 +1,4 @@ -FROM apache/spark-py:v3.1.3 +FROM --platform=linux/amd64 apache/spark-py:v3.1.3 ARG GCS_CONNECTOR_VERSION=2.2.5 ARG BQ_CONNECTOR_VERSION=0.27.1 From 73dfd8ede974a3ce3f2bbef3af5cdc26b48c8b13 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 15:30:30 +0800 Subject: [PATCH 07/31] Update application.yaml to include hbase --- .../src/main/resources/application.yaml | 74 ++++++++++--------- 1 file changed, 39 insertions(+), 35 deletions(-) diff --git a/caraml-store-serving/src/main/resources/application.yaml 
b/caraml-store-serving/src/main/resources/application.yaml index b963028..969391f 100644 --- a/caraml-store-serving/src/main/resources/application.yaml +++ b/caraml-store-serving/src/main/resources/application.yaml @@ -33,41 +33,41 @@ caraml: maxExpectedCount: 150 store: - # Active store. Possible values: [redisCluster, redis, bigtable] + # Active store. Possible values: [hbase, redis, bigtable] active: redis - - redis: - host: localhost - port: 6379 - password: "" - ssl: false - - redisCluster: - # Connection string specifies the host:port of Redis instances in the redis cluster. - connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" - # Password authentication. Empty string if password is not set. - password: "" - readFrom: MASTER - # Redis operation timeout in ISO-8601 format - timeout: PT0.5S -# # Uncomment to customize netty behaviour -# tcp: -# # Epoll Channel Option: TCP_KEEPIDLE -# keepIdle: 15 -# # Epoll Channel Option: TCP_KEEPINTVL -# keepInterval: 5 -# # Epoll Channel Option: TCP_KEEPCNT -# keepConnection: 3 -# # Epoll Channel Option: TCP_USER_TIMEOUT -# userConnection: 60000 -# # Uncomment to customize redis cluster topology refresh config -# topologyRefresh: -# # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger) -# enableAllAdaptiveTriggerRefresh: true -# # enable periodic refresh -# enablePeriodicRefresh: false -# # topology refresh period in seconds -# refreshPeriodSecond: 30 + # + # redis: + # host: localhost + # port: 6379 + # password: "" + # ssl: false + # + # redisCluster: + # # Connection string specifies the host:port of Redis instances in the redis cluster. + # connectionString: "localhost:7000,localhost:7001,localhost:7002,localhost:7003,localhost:7004,localhost:7005" + # # Password authentication. Empty string if password is not set. 
+ # password: "" + # readFrom: MASTER + # # Redis operation timeout in ISO-8601 format + # timeout: PT0.5S + # # Uncomment to customize netty behaviour + # tcp: + # # Epoll Channel Option: TCP_KEEPIDLE + # keepIdle: 15 + # # Epoll Channel Option: TCP_KEEPINTVL + # keepInterval: 5 + # # Epoll Channel Option: TCP_KEEPCNT + # keepConnection: 3 + # # Epoll Channel Option: TCP_USER_TIMEOUT + # userConnection: 60000 + # # Uncomment to customize redis cluster topology refresh config + # topologyRefresh: + # # enable adaptive topology refresh from all triggers : MOVED_REDIRECT, ASK_REDIRECT, PERSISTENT_RECONNECTS, UNKNOWN_NODE (since 5.1), and UNCOVERED_SLOT (since 5.2) (see also reconnect attempts for the reconnect trigger) + # enableAllAdaptiveTriggerRefresh: true + # # enable periodic refresh + # enablePeriodicRefresh: false + # # topology refresh period in seconds + # refreshPeriodSecond: 30 bigtable: projectId: gcp-project-name @@ -78,6 +78,10 @@ caraml: timeoutMs: 0 isUsingHBaseSDK: true + hbase: + zookeeperQuorum: 127.0.0.1 + zookeeperClientPort: 2181 + grpc: server: port: 6566 @@ -96,4 +100,4 @@ spring: logging: level: - root: "info" \ No newline at end of file + root: "info" From acd54c2d48b8bbb71a998160698f611e616fe2e6 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 5 Sep 2024 16:59:49 +0800 Subject: [PATCH 08/31] Refator BigTableSinkRelation to use updated classes Use of deprecated classes result in ImmutableHTableDescriptor returned which throws java.lang.UnsupportedOperationException: HTableDescriptor is read-only error --- .../bigtable/BigTableSinkRelation.scala | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 62d7ab2..687037b 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -4,7 +4,7 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration import dev.caraml.spark.serialization.Serializer import dev.caraml.spark.utils.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Admin, Connection, Put} +import org.apache.hadoop.hbase.client.{Admin, ColumnFamilyDescriptorBuilder, Connection, Put, TableDescriptorBuilder} import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.hadoop.mapred.JobConf @@ -40,36 +40,50 @@ class BigTableSinkRelation( val admin = btConn.getAdmin val table = if (!admin.isTableAvailable(TableName.valueOf(tableName))) { - val t = new HTableDescriptor(TableName.valueOf(tableName)) - val metadataCF = new HColumnDescriptor(metadataColumnFamily) - t.addFamily(metadataCF) - t + val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) + val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily) + tableBuilder.setColumnFamily(cf) + val table = tableBuilder.build() + table +// val t = new HTableDescriptor(TableName.valueOf(tableName)) +// val metadataCF = new HColumnDescriptor(metadataColumnFamily) +// t.addFamily(metadataCF) +// t } else { - admin.getTableDescriptor(TableName.valueOf(tableName)) +// val t = admin.getTableDescriptor(TableName.valueOf(tableName)) + val t = 
btConn.getTable(TableName.valueOf(tableName)) + t.getDescriptor() } - val featuresCF = new HColumnDescriptor(config.namespace) - if (config.maxAge > 0) { - featuresCF.setTimeToLive(config.maxAge.toInt) +// val featuresCF = new HColumnDescriptor(config.namespace) +// if (config.maxAge > 0) { +// featuresCF.setTimeToLive(config.maxAge.toInt) +// } +// featuresCF.setMaxVersions(1) + val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes) + if (config.maxAge > 0){ + featuresCFBuilder.setTimeToLive(config.maxAge.toInt) } + featuresCFBuilder.setMaxVersions(1) + val featuresCF = featuresCFBuilder.build() - featuresCF.setMaxVersions(1) - + println("config.namespaces: ", config.namespace) + val tdb = TableDescriptorBuilder.newBuilder(table) if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { - table.addFamily(featuresCF) - +// table.addFamily(featuresCF) + tdb.setColumnFamily(featuresCF) if (!admin.isTableAvailable(table.getTableName)) { - admin.createTable(table) + admin.createTable(tdb.build()) } else { - admin.modifyTable(table) + admin.modifyTable(tdb.build()) } } else if ( config.maxAge > 0 && table .getColumnFamily(config.namespace.getBytes) .getTimeToLive != featuresCF.getTimeToLive ) { - table.modifyFamily(featuresCF) - admin.modifyTable(table) + tdb.modifyColumnFamily(featuresCF) + admin.modifyTable(tdb.build()) } } finally { btConn.close() From f685315f4fb3ea287f49a89760dc4156e0667387 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:43:19 +0800 Subject: [PATCH 09/31] Fix issue due to difference in bigtable and hbase response * Use offset and length to get rowCell values because hbase server returns slightly different response structure than bigtable * This is also applied when looking up the avro schema --- .../store/bigtable/HBaseOnlineRetriever.java | 26 ++++++++++++++++--- .../store/bigtable/HBaseSchemaRegistry.java | 10 ++++++- .../bigtable/BigTableSinkRelation.scala | 5 ++-- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a52db32..a9ff0a8 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -6,6 +6,7 @@ import dev.caraml.serving.store.Feature; import dev.caraml.store.protobuf.serving.ServingServiceProto; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; import org.apache.avro.AvroRuntimeException; @@ -57,13 +58,31 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; + }) +// .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - String family = Bytes.toString(rowCell.getFamilyArray()); - ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// String family = Bytes.toString(rowCell.getFamilyArray()); +// System.out.println("rowCell: " + 
rowCell.toString()); +// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// System.out.println("value: " + value); + ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -118,6 +137,7 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); + return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 7802f19..f9ed029 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -5,6 +5,7 @@ import com.google.common.cache.LoadingCache; import com.google.protobuf.ByteString; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -94,7 +95,14 @@ private GenericDatumReader loadReader(SchemaReference reference) Result result = table.get(query); Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); - Schema schema = new Schema.Parser().parse(Bytes.toString(last.getValueArray())); + if (last == null) { + throw new RuntimeException("Schema not found"); + } + ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); + Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 687037b..d9a01f7 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -72,10 +72,11 @@ class BigTableSinkRelation( if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { // table.addFamily(featuresCF) tdb.setColumnFamily(featuresCF) + val t = tdb.build() if (!admin.isTableAvailable(table.getTableName)) { - admin.createTable(tdb.build()) + admin.createTable(t) } else { - admin.modifyTable(tdb.build()) + admin.modifyTable(t) } } else if ( config.maxAge > 0 && table From 0054a146ac894226955f8ba198fb23b9af27ce78 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:49:23 +0800 Subject: [PATCH 10/31] Fix linting --- .../store/bigtable/HBaseOnlineRetriever.java | 48 ++++++++++--------- .../store/bigtable/HBaseSchemaRegistry.java | 12 ++--- .../store/bigtable/HBaseStoreConfig.java | 35 +++++++------- 
.../bigtable/BigTableSinkRelation.scala | 12 +++-- 4 files changed, 58 insertions(+), 49 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a9ff0a8..847c42d 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -17,7 +17,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseOnlineRetriever implements SSTableOnlineRetriever { private final Connection client; @@ -58,31 +57,37 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return rowCells; + .map( + cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; }) -// .map(cf -> row.getColumnCells(cf.getBytes(), null)) + // .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell -// String family = Bytes.toString(rowCell.getFamilyArray()); -// System.out.println("rowCell: " + rowCell.toString()); -// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); -// System.out.println("value: " + value); - ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) - .position(rowCell.getValueOffset()) - .limit(rowCell.getValueOffset() + rowCell.getValueLength()) - .slice(); - ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) - .position(rowCell.getFamilyOffset()) - .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) - .slice(); - String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); - ByteString value = ByteString.copyFrom(valueBuffer); + // String family = + // Bytes.toString(rowCell.getFamilyArray()); + // System.out.println("rowCell: " + + // rowCell.toString()); + // ByteString value = + // ByteString.copyFrom(rowCell.getValueArray()); + // System.out.println("value: " + value); + ByteBuffer valueBuffer = + ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = + ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -137,7 +142,6 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); - return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index f9ed029..3af2511 100644 --- 
a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -16,7 +16,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseSchemaRegistry { private final Connection hbaseClient; @@ -96,12 +95,13 @@ private GenericDatumReader loadReader(SchemaReference reference) Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); if (last == null) { - throw new RuntimeException("Schema not found"); + throw new RuntimeException("Schema not found"); } - ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) - .position(last.getValueOffset()) - .limit(last.getValueOffset() + last.getValueLength()) - .slice(); + ByteBuffer schemaBuffer = + ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java index 83630f1..d36203c 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java @@ -1,6 +1,7 @@ package dev.caraml.serving.store.bigtable; import dev.caraml.serving.store.OnlineRetriever; +import java.io.IOException; import lombok.Getter; import lombok.Setter; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -11,30 +12,28 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.io.IOException; - @Configuration @ConfigurationProperties(prefix = "caraml.store.hbase") @ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase") @Getter @Setter public class HBaseStoreConfig { - private String zookeeperQuorum; - private String zookeeperClientPort; + private String zookeeperQuorum; + private String zookeeperClientPort; - @Bean - public OnlineRetriever getRetriever() { - org.apache.hadoop.conf.Configuration conf; - conf = HBaseConfiguration.create(); - conf.set("hbase.zookeeper.quorum", zookeeperQuorum); - conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); - Connection connection; - try{ - connection = ConnectionFactory.createConnection(conf); - } catch (IOException e) { - throw new RuntimeException(e); - } - - return new HBaseOnlineRetriever(connection); + @Bean + public OnlineRetriever getRetriever() { + org.apache.hadoop.conf.Configuration conf; + conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", zookeeperQuorum); + conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); + Connection connection; + try { + connection = ConnectionFactory.createConnection(conf); + } catch (IOException e) { + throw new RuntimeException(e); } + + return new HBaseOnlineRetriever(connection); + } } diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala 
b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index d9a01f7..90d1a67 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -4,7 +4,13 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration import dev.caraml.spark.serialization.Serializer import dev.caraml.spark.utils.StringUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Admin, ColumnFamilyDescriptorBuilder, Connection, Put, TableDescriptorBuilder} +import org.apache.hadoop.hbase.client.{ + Admin, + ColumnFamilyDescriptorBuilder, + Connection, + Put, + TableDescriptorBuilder +} import org.apache.hadoop.hbase.mapred.TableOutputFormat import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName} import org.apache.hadoop.mapred.JobConf @@ -41,7 +47,7 @@ class BigTableSinkRelation( val table = if (!admin.isTableAvailable(TableName.valueOf(tableName))) { val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) - val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily) + val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily) tableBuilder.setColumnFamily(cf) val table = tableBuilder.build() table @@ -61,7 +67,7 @@ class BigTableSinkRelation( // } // featuresCF.setMaxVersions(1) val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes) - if (config.maxAge > 0){ + if (config.maxAge > 0) { featuresCFBuilder.setTimeToLive(config.maxAge.toInt) } featuresCFBuilder.setMaxVersions(1) From e1840ef9f07f1e967199a7bb69acf60c0b376ea3 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:43:19 +0800 Subject: [PATCH 11/31] Fix issue due to difference in bigtable and hbase response * Use offset and length to get rowCell values because hbase server returns slightly different response structure than bigtable * This is also applied when looking up the avro schema --- .../store/bigtable/HBaseOnlineRetriever.java | 26 ++++++++++++++++--- .../store/bigtable/HBaseSchemaRegistry.java | 10 ++++++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a52db32..a9ff0a8 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -6,6 +6,7 @@ import dev.caraml.serving.store.Feature; import dev.caraml.store.protobuf.serving.ServingServiceProto; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; import org.apache.avro.AvroRuntimeException; @@ -57,13 +58,31 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; + }) +// .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - String family = 
Bytes.toString(rowCell.getFamilyArray()); - ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// String family = Bytes.toString(rowCell.getFamilyArray()); +// System.out.println("rowCell: " + rowCell.toString()); +// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); +// System.out.println("value: " + value); + ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -118,6 +137,7 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); + return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 7802f19..f9ed029 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -5,6 +5,7 @@ import com.google.common.cache.LoadingCache; import com.google.protobuf.ByteString; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; @@ -94,7 +95,14 @@ private GenericDatumReader loadReader(SchemaReference reference) Result result = table.get(query); Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); - Schema schema = new Schema.Parser().parse(Bytes.toString(last.getValueArray())); + if (last == null) { + throw new RuntimeException("Schema not found"); + } + ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); + Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { throw new RuntimeException(e); From 0eb1b135f594ab9258b8cde6cc0c5c0a28f7f2f4 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 6 Sep 2024 22:49:23 +0800 Subject: [PATCH 12/31] Fix linting --- .../store/bigtable/HBaseOnlineRetriever.java | 48 ++++++++++--------- .../store/bigtable/HBaseSchemaRegistry.java | 12 ++--- .../store/bigtable/HBaseStoreConfig.java | 39 +++++++++++++++ 3 files changed, 71 insertions(+), 28 deletions(-) create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index a9ff0a8..847c42d 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -17,7 +17,6 @@ import org.apache.hadoop.hbase.Cell; 
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseOnlineRetriever implements SSTableOnlineRetriever { private final Connection client; @@ -58,31 +57,37 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map(cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return rowCells; + .map( + cf -> { + List rowCells = row.getColumnCells(cf.getBytes(), null); + System.out.println("Column Family: " + cf); + System.out.println("Row Cells: " + rowCells); + return rowCells; }) -// .map(cf -> row.getColumnCells(cf.getBytes(), null)) + // .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell -// String family = Bytes.toString(rowCell.getFamilyArray()); -// System.out.println("rowCell: " + rowCell.toString()); -// ByteString value = ByteString.copyFrom(rowCell.getValueArray()); -// System.out.println("value: " + value); - ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) - .position(rowCell.getValueOffset()) - .limit(rowCell.getValueOffset() + rowCell.getValueLength()) - .slice(); - ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) - .position(rowCell.getFamilyOffset()) - .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) - .slice(); - String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); - ByteString value = ByteString.copyFrom(valueBuffer); + // String family = + // Bytes.toString(rowCell.getFamilyArray()); + // System.out.println("rowCell: " + + // rowCell.toString()); + // ByteString value = + // ByteString.copyFrom(rowCell.getValueArray()); + // System.out.println("value: " + value); + ByteBuffer valueBuffer = + ByteBuffer.wrap(rowCell.getValueArray()) + .position(rowCell.getValueOffset()) + .limit(rowCell.getValueOffset() + rowCell.getValueLength()) + .slice(); + ByteBuffer familyBuffer = + ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); List features; List localFeatureReferences = @@ -137,7 +142,6 @@ public Map getFeaturesFromSSTable( .filter(row -> !row.isEmpty()) .forEach(row -> result.put(ByteString.copyFrom(row.getRow()), row)); - return result; } catch (IOException e) { throw new RuntimeException(e); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index f9ed029..3af2511 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -16,7 +16,6 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.util.Bytes; public class HBaseSchemaRegistry { private final Connection hbaseClient; @@ -96,12 +95,13 @@ private GenericDatumReader loadReader(SchemaReference reference) Cell last = 
result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); if (last == null) { - throw new RuntimeException("Schema not found"); + throw new RuntimeException("Schema not found"); } - ByteBuffer schemaBuffer = ByteBuffer.wrap(last.getValueArray()) - .position(last.getValueOffset()) - .limit(last.getValueOffset() + last.getValueLength()) - .slice(); + ByteBuffer schemaBuffer = + ByteBuffer.wrap(last.getValueArray()) + .position(last.getValueOffset()) + .limit(last.getValueOffset() + last.getValueLength()) + .slice(); Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java new file mode 100644 index 0000000..d36203c --- /dev/null +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseStoreConfig.java @@ -0,0 +1,39 @@ +package dev.caraml.serving.store.bigtable; + +import dev.caraml.serving.store.OnlineRetriever; +import java.io.IOException; +import lombok.Getter; +import lombok.Setter; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "caraml.store.hbase") +@ConditionalOnProperty(prefix = "caraml.store", name = "active", havingValue = "hbase") +@Getter +@Setter +public class HBaseStoreConfig { + private String zookeeperQuorum; + private String zookeeperClientPort; + + @Bean + public OnlineRetriever getRetriever() { + org.apache.hadoop.conf.Configuration conf; + conf = HBaseConfiguration.create(); + conf.set("hbase.zookeeper.quorum", zookeeperQuorum); + conf.set("hbase.zookeeper.property.clientPort", zookeeperClientPort); + Connection connection; + try { + connection = ConnectionFactory.createConnection(conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + + return new HBaseOnlineRetriever(connection); + } +} From 350783e2540b69e47a2d7f593a1b89217a628188 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Mon, 9 Sep 2024 12:28:52 +0800 Subject: [PATCH 13/31] Remove commented code --- .../store/bigtable/HBaseOnlineRetriever.java | 16 +--------------- .../store/bigtable/HBaseSchemaRegistry.java | 1 + 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 847c42d..145a901 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -57,25 +57,11 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map( - cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return 
rowCells; - }) - // .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - // String family = - // Bytes.toString(rowCell.getFamilyArray()); - // System.out.println("rowCell: " + - // rowCell.toString()); - // ByteString value = - // ByteString.copyFrom(rowCell.getValueArray()); - // System.out.println("value: " + value); ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) .position(rowCell.getValueOffset()) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 3af2511..6062fdf 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -95,6 +95,7 @@ private GenericDatumReader loadReader(SchemaReference reference) Cell last = result.getColumnLatestCell(COLUMN_FAMILY.getBytes(), QUALIFIER.getBytes()); if (last == null) { + // NOTE: this should never happen throw new RuntimeException("Schema not found"); } ByteBuffer schemaBuffer = From 38f7bc736f8a034e7502f0f374651ef75b240bc0 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Tue, 17 Sep 2024 15:51:09 +0800 Subject: [PATCH 14/31] Clean up comments --- .../store/bigtable/HBaseOnlineRetriever.java | 16 +--------------- .../stores/bigtable/BigTableSinkRelation.scala | 14 +------------- 2 files changed, 2 insertions(+), 28 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 847c42d..145a901 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -57,25 +57,11 @@ public List> convertRowToFeature( return featureReferences.stream() .map(ServingServiceProto.FeatureReference::getFeatureTable) .distinct() - .map( - cf -> { - List rowCells = row.getColumnCells(cf.getBytes(), null); - System.out.println("Column Family: " + cf); - System.out.println("Row Cells: " + rowCells); - return rowCells; - }) - // .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .map(cf -> row.getColumnCells(cf.getBytes(), null)) .filter(ls -> !ls.isEmpty()) .flatMap( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell - // String family = - // Bytes.toString(rowCell.getFamilyArray()); - // System.out.println("rowCell: " + - // rowCell.toString()); - // ByteString value = - // ByteString.copyFrom(rowCell.getValueArray()); - // System.out.println("value: " + value); ByteBuffer valueBuffer = ByteBuffer.wrap(rowCell.getValueArray()) .position(rowCell.getValueOffset()) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 90d1a67..36be800 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -51,21 +51,10 @@ class BigTableSinkRelation( tableBuilder.setColumnFamily(cf) val table = 
tableBuilder.build() table -// val t = new HTableDescriptor(TableName.valueOf(tableName)) -// val metadataCF = new HColumnDescriptor(metadataColumnFamily) -// t.addFamily(metadataCF) -// t } else { -// val t = admin.getTableDescriptor(TableName.valueOf(tableName)) val t = btConn.getTable(TableName.valueOf(tableName)) t.getDescriptor() } - -// val featuresCF = new HColumnDescriptor(config.namespace) -// if (config.maxAge > 0) { -// featuresCF.setTimeToLive(config.maxAge.toInt) -// } -// featuresCF.setMaxVersions(1) val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes) if (config.maxAge > 0) { featuresCFBuilder.setTimeToLive(config.maxAge.toInt) @@ -73,10 +62,9 @@ class BigTableSinkRelation( featuresCFBuilder.setMaxVersions(1) val featuresCF = featuresCFBuilder.build() - println("config.namespaces: ", config.namespace) + // TODO: Set compression type for column family val tdb = TableDescriptorBuilder.newBuilder(table) if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { -// table.addFamily(featuresCF) tdb.setColumnFamily(featuresCF) val t = tdb.build() if (!admin.isTableAvailable(table.getTableName)) { From 691c8d13af3cab9bcb9baba20816b9dc8e1bc842 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Tue, 17 Sep 2024 15:54:47 +0800 Subject: [PATCH 15/31] Fix application yaml --- caraml-store-serving/src/main/resources/application.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/caraml-store-serving/src/main/resources/application.yaml b/caraml-store-serving/src/main/resources/application.yaml index 969391f..e15d3a4 100644 --- a/caraml-store-serving/src/main/resources/application.yaml +++ b/caraml-store-serving/src/main/resources/application.yaml @@ -33,7 +33,7 @@ caraml: maxExpectedCount: 150 store: - # Active store. Possible values: [hbase, redis, bigtable] + # Active store. 
Possible values: [redisCluster, redis, bigtable, hbase] active: redis # # redis: From a0e44a38b5f32a0087d504476d21f27fd64b03f1 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Tue, 17 Sep 2024 17:09:45 +0800 Subject: [PATCH 16/31] Add option for hbase for stream ingestion jobs --- .../main/scala/dev/caraml/spark/StreamingPipeline.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala index 1620705..1fef66d 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala @@ -77,6 +77,12 @@ object StreamingPipeline extends BasePipeline with Serializable { case _ => Array() } + val onlineStore = config.store match { + case _: RedisConfig => "redis" + case _: BigTableConfig => "bigtable" + case _: HBaseConfig => "hbase" + } + val parsed = input .withColumn("features", featureStruct) .select(metadata :+ col("features.*"): _*) @@ -100,6 +106,7 @@ object StreamingPipeline extends BasePipeline with Serializable { val metadataColName: Array[String] = metadata.map(_.toString) + rowsAfterValidation .map(metrics.incrementRead) .filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks) @@ -108,7 +115,9 @@ object StreamingPipeline extends BasePipeline with Serializable { .format(config.store match { case _: RedisConfig => "dev.caraml.spark.stores.redis" case _: BigTableConfig => "dev.caraml.spark.stores.bigtable" + case _: HBaseConfig => "dev.caraml.spark.stores.bigtable" }) + .option("online_store", onlineStore) .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) .option("namespace", featureTable.name) .option("project_name", featureTable.project) From fa28a59f3e732fbab6246c56d1ede461d57a4d7d Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Wed, 18 Sep 2024 11:10:37 +0800 Subject: [PATCH 17/31] Fix linting --- .../src/main/scala/dev/caraml/spark/StreamingPipeline.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala index 1fef66d..fedae24 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/StreamingPipeline.scala @@ -106,7 +106,6 @@ object StreamingPipeline extends BasePipeline with Serializable { val metadataColName: Array[String] = metadata.map(_.toString) - rowsAfterValidation .map(metrics.incrementRead) .filter(if (config.doNotIngestInvalidRows) expr("_isValid") else rowValidator.allChecks) From f69981586a14e200ebcb8dcdd09e5675bde3397c Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Fri, 20 Sep 2024 10:32:37 +0800 Subject: [PATCH 18/31] Add region split policy for hbase --- .../bigtable/BigTableSinkRelation.scala | 13 ++--- .../stores/bigtable/HbaseSinkRelation.scala | 51 ++++++++++++++++++- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index 36be800..a428a5d 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ 
b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -64,6 +64,7 @@ class BigTableSinkRelation( // TODO: Set compression type for column family val tdb = TableDescriptorBuilder.newBuilder(table) + if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { tdb.setColumnFamily(featuresCF) val t = tdb.build() @@ -143,19 +144,19 @@ class BigTableSinkRelation( } } - private def tableName: String = { + protected def tableName: String = { val entities = config.entityColumns.sorted.mkString("__") StringUtils.trimAndHash(s"${config.projectName}__${entities}", maxTableNameLength) } - private def joinEntityKey: UserDefinedFunction = udf { r: Row => + protected def joinEntityKey: UserDefinedFunction = udf { r: Row => ((0 until r.size)).map(r.getString).mkString("#").getBytes } - private val metadataColumnFamily = "metadata" - private val schemaKeyPrefix = "schema#" - private val emptyQualifier = "" - private val maxTableNameLength = 50 + protected val metadataColumnFamily = "metadata" + protected val schemaKeyPrefix = "schema#" + protected val emptyQualifier = "" + protected val maxTableNameLength = 50 private def isSystemColumn(name: String) = (config.entityColumns ++ Seq(config.timestampColumn)).contains(name) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala index 6ebf53b..2c50b4a 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -2,7 +2,9 @@ package dev.caraml.spark.stores.bigtable import dev.caraml.spark.serialization.Serializer import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory} +import org.apache.hadoop.hbase.TableName +import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, Connection, ConnectionFactory, TableDescriptorBuilder} +import org.apache.hadoop.hbase.io.compress.Compression import org.apache.spark.sql.SQLContext class HbaseSinkRelation( @@ -14,4 +16,51 @@ class HbaseSinkRelation( override def getConnection(hadoopConfig: Configuration): Connection = { ConnectionFactory.createConnection(hadoopConfig) } + override def createTable(): Unit = { + val hbaseConn = getConnection(hadoopConfig) + try { + val admin = hbaseConn.getAdmin + + val table = if (!admin.isTableAvailable(TableName.valueOf(tableName))) { + val tableBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) + val cf = ColumnFamilyDescriptorBuilder.of(metadataColumnFamily) + tableBuilder.setColumnFamily(cf) + val table = tableBuilder.build() + table + } else { + val t = hbaseConn.getTable(TableName.valueOf(tableName)) + t.getDescriptor() + } + val featuresCFBuilder = ColumnFamilyDescriptorBuilder.newBuilder(config.namespace.getBytes) + if (config.maxAge > 0) { + featuresCFBuilder.setTimeToLive(config.maxAge.toInt) + } + featuresCFBuilder.setMaxVersions(1) + featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD) + val featuresCF = featuresCFBuilder.build() + + val tdb = TableDescriptorBuilder.newBuilder(table) + // TODO: make this configurable + tdb.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy") + + if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { + tdb.setColumnFamily(featuresCF) + val t = 
tdb.build() +        if (!admin.isTableAvailable(table.getTableName)) { +          admin.createTable(t) +        } else { +          admin.modifyTable(t) +        } +      } else if ( +        config.maxAge > 0 && table +          .getColumnFamily(config.namespace.getBytes) +          .getTimeToLive != featuresCF.getTimeToLive +      ) { +        tdb.modifyColumnFamily(featuresCF) +        admin.modifyTable(tdb.build()) +      } +    } finally { +      hbaseConn.close() +    } +  } } From a8a861bff60f7abc0c0c1e564e41307647c545f6 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Mon, 23 Sep 2024 11:11:58 +0800 Subject: [PATCH 19/31] Fix linting --- .../spark/stores/bigtable/HbaseSinkRelation.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala index 2c50b4a..4d0a186 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -3,7 +3,12 @@ package dev.caraml.spark.stores.bigtable import dev.caraml.spark.serialization.Serializer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.TableName -import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, Connection, ConnectionFactory, TableDescriptorBuilder} +import org.apache.hadoop.hbase.client.{ +  ColumnFamilyDescriptorBuilder, +  Connection, +  ConnectionFactory, +  TableDescriptorBuilder +} import org.apache.hadoop.hbase.io.compress.Compression import org.apache.spark.sql.SQLContext @@ -41,7 +46,9 @@ class HbaseSinkRelation( val tdb = TableDescriptorBuilder.newBuilder(table) // TODO: make this configurable - tdb.setRegionSplitPolicyClassName("org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy") + tdb.setRegionSplitPolicyClassName( + "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy" + ) if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { tdb.setColumnFamily(featuresCF) From 77052b5a908b9dc1757742062ff98617b7d512a8 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Wed, 25 Sep 2024 14:02:29 +0800 Subject: [PATCH 20/31] Refactor Bigtable schema registry classes --- .../store/bigtable/BaseSchemaRegistry.java    | 64 +++++++++++++++++++ .../bigtable/BigTableSchemaRegistry.java      | 58 +---------------- .../store/bigtable/HBaseOnlineRetriever.java  | 25 ++++++++ .../store/bigtable/HBaseSchemaRegistry.java   | 58 +---------------- 4 files changed, 95 insertions(+), 110 deletions(-) create mode 100644 caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java new file mode 100644 index 0000000..6abee93 --- /dev/null +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java @@ -0,0 +1,64 @@ +package dev.caraml.serving.store.bigtable; + +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.protobuf.ByteString; +import java.util.concurrent.ExecutionException; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; + +public abstract class BaseSchemaRegistry { +  protected LoadingCache<SchemaReference, GenericDatumReader<GenericRecord>> cache = null; + +  protected static String COLUMN_FAMILY = 
"metadata"; + protected static String QUALIFIER = "avro"; + protected static String KEY_PREFIX = "schema#"; + + public static class SchemaReference { + private final String tableName; + private final ByteString schemaHash; + + public SchemaReference(String tableName, ByteString schemaHash) { + this.tableName = tableName; + this.schemaHash = schemaHash; + } + + public String getTableName() { + return tableName; + } + + public ByteString getSchemaHash() { + return schemaHash; + } + + @Override + public int hashCode() { + int result = tableName.hashCode(); + result = 31 * result + schemaHash.hashCode(); + return result; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SchemaReference that = (SchemaReference) o; + + if (!tableName.equals(that.tableName)) return false; + return schemaHash.equals(that.schemaHash); + } + } + + public GenericDatumReader getReader(SchemaReference reference) { + GenericDatumReader reader; + try { + reader = this.cache.get(reference); + } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) { + throw new RuntimeException(String.format("Unable to find Schema"), e); + } + return reader; + } + + public abstract GenericDatumReader loadReader(SchemaReference reference); +} diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java index d37ae0e..054284b 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableSchemaRegistry.java @@ -6,57 +6,14 @@ import com.google.cloud.bigtable.data.v2.models.RowCell; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.Iterables; import com.google.protobuf.ByteString; -import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -public class BigTableSchemaRegistry { +public class BigTableSchemaRegistry extends BaseSchemaRegistry { private final BigtableDataClient client; - private final LoadingCache> cache; - - private static String COLUMN_FAMILY = "metadata"; - private static String QUALIFIER = "avro"; - private static String KEY_PREFIX = "schema#"; - - public static class SchemaReference { - private final String tableName; - private final ByteString schemaHash; - - public SchemaReference(String tableName, ByteString schemaHash) { - this.tableName = tableName; - this.schemaHash = schemaHash; - } - - public String getTableName() { - return tableName; - } - - public ByteString getSchemaHash() { - return schemaHash; - } - - @Override - public int hashCode() { - int result = tableName.hashCode(); - result = 31 * result + schemaHash.hashCode(); - return result; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - SchemaReference that = (SchemaReference) o; - - if (!tableName.equals(that.tableName)) return false; - return schemaHash.equals(that.schemaHash); - } - } public BigTableSchemaRegistry(BigtableDataClient client) { this.client = client; @@ -67,17 +24,8 @@ public BigTableSchemaRegistry(BigtableDataClient client) { cache 
= CacheBuilder.newBuilder().build(schemaCacheLoader); } -  public GenericDatumReader getReader(SchemaReference reference) { -    GenericDatumReader reader; -    try { -      reader = this.cache.get(reference); -    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) { -      throw new RuntimeException(String.format("Unable to find Schema"), e); -    } -    return reader; -  } - -  private GenericDatumReader loadReader(SchemaReference reference) { +  @Override +  public GenericDatumReader loadReader(SchemaReference reference) { Row row = client.readRow( reference.getTableName(), diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 145a901..9bed4d9 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -27,6 +27,13 @@ public HBaseOnlineRetriever(Connection client) { this.schemaRegistry = new HBaseSchemaRegistry(client); } +  /** +   * Generate the BigTable/HBase row key in the form of entity values joined by #. +   * +   * @param entityRow Single EntityRow representation in feature retrieval call +   * @param entityNames List of entities related to feature references in retrieval call +   * @return Row key composed of the sorted entity values joined by # +   */ @Override public ByteString convertEntityValueToKey( ServingServiceProto.GetOnlineFeaturesRequest.EntityRow entityRow, List entityNames) { @@ -39,6 +46,15 @@ public ByteString convertEntityValueToKey( .getBytes()); } +  /** +   * Converts the retrieved row cells into @NativeFeature values; HBase-specific implementation. +   * +   * @param tableName Name of SSTable +   * @param rowKeys List of keys of rows to retrieve +   * @param rows Map of rowKey to Row related to it +   * @param featureReferences List of feature references +   * @return List of features for each requested row key +   */ @Override public List> convertRowToFeature( String tableName, List rowKeys, Map rows, List featureReferences) { @@ -134,6 +150,15 @@ public Map getFeaturesFromSSTable( } } +  /** +   * @param tableName Name of SSTable used for the schema lookup +   * @param value Row cell value containing the schema reference and Avro-encoded feature values +   * @param featureReferences List of feature references to decode +   * @param reusedDecoder Binary decoder reused across rows to avoid re-instantiation +   * @param timestamp Timestamp of the row cell +   * @return List of decoded features +   * @throws IOException if the Avro-encoded feature values cannot be decoded +   */ private List decodeFeatures( String tableName, ByteString value, diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index 6062fdf..ff0a7b2 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -2,11 +2,9 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -17,49 +15,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -public class HBaseSchemaRegistry { +public class HBaseSchemaRegistry extends BaseSchemaRegistry { private final Connection hbaseClient; -  private final LoadingCache> cache; - -  private static String COLUMN_FAMILY = "metadata"; -  private static String QUALIFIER = "avro"; -  private static String KEY_PREFIX = 
"schema#"; - - public static class SchemaReference { - private final String tableName; - private final ByteString schemaHash; - - public SchemaReference(String tableName, ByteString schemaHash) { - this.tableName = tableName; - this.schemaHash = schemaHash; - } - - public String getTableName() { - return tableName; - } - - public ByteString getSchemaHash() { - return schemaHash; - } - - @Override - public int hashCode() { - int result = tableName.hashCode(); - result = 31 * result + schemaHash.hashCode(); - return result; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - SchemaReference that = (SchemaReference) o; - - if (!tableName.equals(that.tableName)) return false; - return schemaHash.equals(that.schemaHash); - } - } public HBaseSchemaRegistry(Connection hbaseClient) { this.hbaseClient = hbaseClient; @@ -70,17 +27,8 @@ public HBaseSchemaRegistry(Connection hbaseClient) { cache = CacheBuilder.newBuilder().build(schemaCacheLoader); } - public GenericDatumReader getReader(SchemaReference reference) { - GenericDatumReader reader; - try { - reader = this.cache.get(reference); - } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) { - throw new RuntimeException(String.format("Unable to find Schema"), e); - } - return reader; - } - - private GenericDatumReader loadReader(SchemaReference reference) { + @Override + public GenericDatumReader loadReader(SchemaReference reference) { try { Table table = this.hbaseClient.getTable(TableName.valueOf(reference.getTableName())); From 2c64047c0931c96a4aa423785a3f8961512b27a1 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Wed, 25 Sep 2024 15:40:37 +0800 Subject: [PATCH 21/31] Add tests to query bigtable (w hbase sdk) and hbase --- .../bigtable/BigTableOnlineRetrieverTest.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java index 7b51c5f..82655b6 100644 --- a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java @@ -8,6 +8,8 @@ import com.google.cloud.bigtable.data.v2.BigtableDataClient; import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.hbase.BigtableConfiguration; +import com.google.cloud.bigtable.hbase.BigtableOptionsFactory; import com.google.common.hash.Hashing; import com.google.protobuf.ByteString; import dev.caraml.serving.store.Feature; @@ -26,6 +28,8 @@ import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Connection; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.testcontainers.containers.GenericContainer; @@ -40,6 +44,7 @@ public class BigTableOnlineRetrieverTest { static final Integer BIGTABLE_EMULATOR_PORT = 8086; static final String FEAST_PROJECT = "default"; static BigtableDataClient client; + static Connection hbaseClient; static BigtableTableAdminClient adminClient; @Container @@ -74,6 +79,9 @@ public static void setup() throws IOException { 
.setProjectId(PROJECT_ID) .setInstanceId(INSTANCE_ID) .build()); + Configuration config = BigtableConfiguration.configure(PROJECT_ID, INSTANCE_ID); + config.set(BigtableOptionsFactory.BIGTABLE_EMULATOR_HOST_KEY, "localhost:" + bigtableEmulator.getMappedPort(BIGTABLE_EMULATOR_PORT)); + hbaseClient = BigtableConfiguration.connect(config); ingestData(); } @@ -227,4 +235,41 @@ public void shouldFilterOutMissingFeatureRef() { assertEquals(1, features.size()); assertEquals(0, features.get(0).size()); } + + @Test + public void shouldRetrieveFeaturesSuccessfullyWhenUsingHbase(){ + HBaseOnlineRetriever retriever = new HBaseOnlineRetriever(hbaseClient); + List featureReferences = + Stream.of("trip_cost", "trip_distance") + .map(f -> FeatureReference.newBuilder().setFeatureTable("rides").setName(f).build()) + .toList(); + List entityNames = List.of("driver"); + List entityRows = + List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); + List> featuresForRows = + retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); + assertEquals(1, featuresForRows.size()); + List features = featuresForRows.get(0); + assertEquals(2, features.size()); + assertEquals(5L, features.get(0).getFeatureValue(ValueType.Enum.INT64).getInt64Val()); + assertEquals(featureReferences.get(0), features.get(0).getFeatureReference()); + assertEquals(3.5, features.get(1).getFeatureValue(ValueType.Enum.DOUBLE).getDoubleVal()); + assertEquals(featureReferences.get(1), features.get(1).getFeatureReference()); + + } + + @Test + public void shouldFilterOutMissingFeatureRefUsingHbase() { + BigTableOnlineRetriever retriever = new BigTableOnlineRetriever(client); + List featureReferences = + List.of( + FeatureReference.newBuilder().setFeatureTable("rides").setName("not_exists").build()); + List entityNames = List.of("driver"); + List entityRows = + List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); + List> features = + retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); + assertEquals(1, features.size()); + assertEquals(0, features.get(0).size()); + } } From fbc5c3f1ead5bed38a3d73f415efc7cff5c14e02 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Wed, 25 Sep 2024 17:57:44 +0800 Subject: [PATCH 22/31] Fix linting --- .../bigtable/BigTableOnlineRetrieverTest.java | 27 +-- .../bigtable/GenericHbase2Container.java | 63 ++++++ .../bigtable/HbaseOnlineRetrieverTest.java | 209 ++++++++++++++++++ 3 files changed, 286 insertions(+), 13 deletions(-) create mode 100644 caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java create mode 100644 caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java index 82655b6..0e9fabe 100644 --- a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetrieverTest.java @@ -80,7 +80,9 @@ public static void setup() throws IOException { .setInstanceId(INSTANCE_ID) .build()); Configuration config = BigtableConfiguration.configure(PROJECT_ID, INSTANCE_ID); - config.set(BigtableOptionsFactory.BIGTABLE_EMULATOR_HOST_KEY, "localhost:" + 
bigtableEmulator.getMappedPort(BIGTABLE_EMULATOR_PORT)); + config.set( + BigtableOptionsFactory.BIGTABLE_EMULATOR_HOST_KEY, + "localhost:" + bigtableEmulator.getMappedPort(BIGTABLE_EMULATOR_PORT)); hbaseClient = BigtableConfiguration.connect(config); ingestData(); } @@ -237,17 +239,17 @@ public void shouldFilterOutMissingFeatureRef() { } @Test - public void shouldRetrieveFeaturesSuccessfullyWhenUsingHbase(){ + public void shouldRetrieveFeaturesSuccessfullyWhenUsingHbase() { HBaseOnlineRetriever retriever = new HBaseOnlineRetriever(hbaseClient); List featureReferences = - Stream.of("trip_cost", "trip_distance") - .map(f -> FeatureReference.newBuilder().setFeatureTable("rides").setName(f).build()) - .toList(); + Stream.of("trip_cost", "trip_distance") + .map(f -> FeatureReference.newBuilder().setFeatureTable("rides").setName(f).build()) + .toList(); List entityNames = List.of("driver"); List entityRows = - List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); + List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); List> featuresForRows = - retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); + retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); assertEquals(1, featuresForRows.size()); List features = featuresForRows.get(0); assertEquals(2, features.size()); @@ -255,20 +257,19 @@ public void shouldRetrieveFeaturesSuccessfullyWhenUsingHbase(){ assertEquals(featureReferences.get(0), features.get(0).getFeatureReference()); assertEquals(3.5, features.get(1).getFeatureValue(ValueType.Enum.DOUBLE).getDoubleVal()); assertEquals(featureReferences.get(1), features.get(1).getFeatureReference()); - } @Test public void shouldFilterOutMissingFeatureRefUsingHbase() { - BigTableOnlineRetriever retriever = new BigTableOnlineRetriever(client); + HBaseOnlineRetriever retriever = new HBaseOnlineRetriever(hbaseClient); List featureReferences = - List.of( - FeatureReference.newBuilder().setFeatureTable("rides").setName("not_exists").build()); + List.of( + FeatureReference.newBuilder().setFeatureTable("rides").setName("not_exists").build()); List entityNames = List.of("driver"); List entityRows = - List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); + List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); List> features = - retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); + retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); assertEquals(1, features.size()); assertEquals(0, features.get(0).size()); } diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java new file mode 100644 index 0000000..d0f269e --- /dev/null +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java @@ -0,0 +1,63 @@ +package dev.caraml.serving.store.bigtable; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.Arrays; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +public class 
GenericHbase2Container extends GenericContainer { + + private final String hostName; + private final Configuration hbase2Configuration = HBaseConfiguration.create(); + + public GenericHbase2Container() { + super(DockerImageName.parse("jcjabouille/hbase-standalone:2.4.9")); + { + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + } + int masterPort = 16010; + addExposedPort(masterPort); + int regionPort = 16011; + addExposedPort(regionPort); + addExposedPort(2181); + + withCreateContainerCmdModifier( + cmd -> { + cmd.withHostName(hostName); + }); + + waitingFor(Wait.forLogMessage(".*running regionserver.*", 1)); + withStartupTimeout(Duration.ofMinutes(10)); + + withEnv("HBASE_MASTER_PORT", Integer.toString(masterPort)); + withEnv("HBASE_REGION_PORT", Integer.toString(regionPort)); + setPortBindings( + Arrays.asList( + String.format("%d:%d", masterPort, masterPort), + String.format("%d:%d", regionPort, regionPort))); + } + + @Override + protected void doStart() { + super.doStart(); + + hbase2Configuration.set("hbase.client.pause", "200"); + hbase2Configuration.set("hbase.client.retries.number", "10"); + hbase2Configuration.set("hbase.rpc.timeout", "3000"); + hbase2Configuration.set("hbase.client.operation.timeout", "3000"); + hbase2Configuration.set("hbase.client.scanner.timeout.period", "10000"); + hbase2Configuration.set("zookeeper.session.timeout", "10000"); + hbase2Configuration.set("hbase.zookeeper.quorum", "localhost"); + hbase2Configuration.set( + "hbase.zookeeper.property.clientPort", Integer.toString(getMappedPort(2181))); + } +} diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java new file mode 100644 index 0000000..e30555a --- /dev/null +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java @@ -0,0 +1,209 @@ +package dev.caraml.serving.store.bigtable; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.common.hash.Hashing; +import dev.caraml.serving.store.Feature; +import dev.caraml.store.protobuf.serving.ServingServiceProto; +import dev.caraml.store.protobuf.types.ValueProto; +import dev.caraml.store.testutils.it.DataGenerator; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.List; +import java.util.stream.Stream; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.avro.io.Encoder; +import org.apache.avro.io.EncoderFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class HbaseOnlineRetrieverTest { + static Connection hbaseClient; + static HBaseAdmin admin; + static Configuration hbaseConfiguration = HBaseConfiguration.create(); + static final String FEAST_PROJECT = "default"; + + @Container public static GenericHbase2Container hbase = new GenericHbase2Container(); + + @BeforeAll + 
public static void setup() throws IOException { + hbaseConfiguration.set("hbase.zookeeper.quorum", hbase.getHost()); + hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181"); + hbaseClient = ConnectionFactory.createConnection(hbaseConfiguration); + admin = (HBaseAdmin) hbaseClient.getAdmin(); + ingestData(); + } + + private static void ingestData() throws IOException { + String featureTableName = "rides"; + + /** Single Entity Ingestion Workflow */ + Schema schema = + SchemaBuilder.record("DriverData") + .namespace(featureTableName) + .fields() + .requiredLong("trip_cost") + .requiredDouble("trip_distance") + .nullableString("trip_empty", "null") + .requiredString("trip_wrong_type") + .endRecord(); + createTable(FEAST_PROJECT, List.of("driver"), List.of(featureTableName)); + insertSchema(FEAST_PROJECT, List.of("driver"), schema); + + GenericRecord record = + new GenericRecordBuilder(schema) + .set("trip_cost", 5L) + .set("trip_distance", 3.5) + .set("trip_empty", null) + .set("trip_wrong_type", "test") + .build(); + String entityKey = String.valueOf(DataGenerator.createInt64Value(1).getInt64Val()); + insertRow(FEAST_PROJECT, List.of("driver"), entityKey, featureTableName, schema, record); + } + + private static String getTableName(String project, List entityNames) { + return String.format("%s__%s", project, String.join("__", entityNames)); + } + + private static byte[] serializedSchemaReference(Schema schema) { + return Hashing.murmur3_32().hashBytes(schema.toString().getBytes()).asBytes(); + } + + private static void createTable( + String project, List entityNames, List featureTables) { + String tableName = getTableName(project, entityNames); + + List columnFamilies = + Stream.concat(featureTables.stream(), Stream.of("metadata")).toList(); + TableDescriptorBuilder tb = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); + columnFamilies.forEach(cf -> tb.setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf))); + try { + if (admin.tableExists(TableName.valueOf(tableName))) { + return; + } + admin.createTable(tb.build()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static void insertSchema(String project, List entityNames, Schema schema) + throws IOException { + String tableName = getTableName(project, entityNames); + byte[] schemaReference = serializedSchemaReference(schema); + byte[] schemaKey = createSchemaKey(schemaReference); + Table table = hbaseClient.getTable(TableName.valueOf(tableName)); + Put put = new Put(schemaKey); + put.addColumn("metadata".getBytes(), "avro".getBytes(), schema.toString().getBytes()); + table.put(put); + table.close(); + } + + private static byte[] createSchemaKey(byte[] schemaReference) throws IOException { + String schemaKeyPrefix = "schema#"; + ByteArrayOutputStream concatOutputStream = new ByteArrayOutputStream(); + concatOutputStream.write(schemaKeyPrefix.getBytes()); + concatOutputStream.write(schemaReference); + return concatOutputStream.toByteArray(); + } + + private static byte[] createEntityValue(Schema schema, GenericRecord record) throws IOException { + byte[] schemaReference = serializedSchemaReference(schema); + // Entity-Feature Row + byte[] avroSerializedFeatures = recordToAvro(record, schema); + + ByteArrayOutputStream concatOutputStream = new ByteArrayOutputStream(); + concatOutputStream.write(schemaReference); + concatOutputStream.write("".getBytes()); + concatOutputStream.write(avroSerializedFeatures); + byte[] entityFeatureValue = concatOutputStream.toByteArray(); + + return 
entityFeatureValue; + } + + private static byte[] recordToAvro(GenericRecord datum, Schema schema) throws IOException { + GenericDatumWriter writer = new GenericDatumWriter<>(schema); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + Encoder encoder = EncoderFactory.get().binaryEncoder(output, null); + writer.write(datum, encoder); + encoder.flush(); + + return output.toByteArray(); + } + + private static void insertRow( + String project, + List entityNames, + String entityKey, + String featureTableName, + Schema schema, + GenericRecord record) + throws IOException { + byte[] entityFeatureValue = createEntityValue(schema, record); + String tableName = getTableName(project, entityNames); + + // Update Compound Entity-Feature Row + Table table = hbaseClient.getTable(TableName.valueOf(tableName)); + Put put = new Put(entityKey.getBytes()); + put.addColumn(featureTableName.getBytes(), "".getBytes(), entityFeatureValue); + table.put(put); + table.close(); + } + + @Test + public void shouldRetrieveFeaturesSuccessfully() { + HBaseOnlineRetriever retriever = new HBaseOnlineRetriever(hbaseClient); + List featureReferences = + Stream.of("trip_cost", "trip_distance") + .map( + f -> + ServingServiceProto.FeatureReference.newBuilder() + .setFeatureTable("rides") + .setName(f) + .build()) + .toList(); + List entityNames = List.of("driver"); + List entityRows = + List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); + List> featuresForRows = + retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); + assertEquals(1, featuresForRows.size()); + List features = featuresForRows.get(0); + assertEquals(2, features.size()); + assertEquals( + 5L, features.get(0).getFeatureValue(ValueProto.ValueType.Enum.INT64).getInt64Val()); + assertEquals(featureReferences.get(0), features.get(0).getFeatureReference()); + assertEquals( + 3.5, features.get(1).getFeatureValue(ValueProto.ValueType.Enum.DOUBLE).getDoubleVal()); + assertEquals(featureReferences.get(1), features.get(1).getFeatureReference()); + } + + @Test + public void shouldFilterOutMissingFeatureRefUsingHbase() { + HBaseOnlineRetriever retriever = new HBaseOnlineRetriever(hbaseClient); + List featureReferences = + List.of( + ServingServiceProto.FeatureReference.newBuilder() + .setFeatureTable("rides") + .setName("not_exists") + .build()); + List entityNames = List.of("driver"); + List entityRows = + List.of(DataGenerator.createEntityRow("driver", DataGenerator.createInt64Value(1), 100)); + List> features = + retriever.getOnlineFeatures(FEAST_PROJECT, entityRows, featureReferences, entityNames); + assertEquals(1, features.size()); + assertEquals(0, features.get(0).size()); + } +} From 536deb9526f352758fa5237da193e92a2d74acbf Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 26 Sep 2024 12:32:40 +0800 Subject: [PATCH 23/31] Make compressionAlgo and region split policy type configurable --- .../main/scala/dev/caraml/spark/BasePipeline.scala | 4 +++- .../scala/dev/caraml/spark/IngestionJobConfig.scala | 3 ++- .../spark/stores/bigtable/BigTableSinkRelation.scala | 1 - .../spark/stores/bigtable/HbaseSinkRelation.scala | 12 +++++++++--- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala index 9fd6322..2c29e3f 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala +++ 
b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala @@ -33,10 +33,12 @@ object BasePipeline { conf .set("spark.bigtable.projectId", projectId) .set("spark.bigtable.instanceId", instanceId) - case HBaseConfig(zookeeperQuorum, zookeeperPort) => + case HBaseConfig(zookeeperQuorum, zookeeperPort, hbaseProperties) => conf .set("spark.hbase.zookeeper.quorum", zookeeperQuorum) .set("spark.hbase.zookeeper.port", zookeeperPort.toString) + .set("spark.hbase.properties.regionSplitPolicyClassName", hbaseProperties.regionSplitPolicy) + .set("spark.hbase.properties.compressionAlgorithm", hbaseProperties.compressionAlgorithm) } jobConfig.metrics match { diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala index cae6053..f4eb803 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala @@ -27,7 +27,8 @@ case class RedisWriteProperties( ratePerSecondLimit: Int = 50000 ) case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig -case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig +case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int, hbaseProperties: HBaseProperties = HBaseProperties()) extends StoreConfig +case class HBaseProperties(regionSplitPolicy: String = "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy", compressionAlgorithm: String = "ZSTD") sealed trait MetricConfig diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala index a428a5d..a5049d3 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/BigTableSinkRelation.scala @@ -62,7 +62,6 @@ class BigTableSinkRelation( featuresCFBuilder.setMaxVersions(1) val featuresCF = featuresCFBuilder.build() - // TODO: Set compression type for column family val tdb = TableDescriptorBuilder.newBuilder(table) if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala index 4d0a186..82e3d44 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -41,13 +41,18 @@ class HbaseSinkRelation( featuresCFBuilder.setTimeToLive(config.maxAge.toInt) } featuresCFBuilder.setMaxVersions(1) - featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD) + sqlContext.getConf("spark.hbase.properties.compressionAlgorithm") match { + case "ZSTD" => featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD) + case "GZ" => featuresCFBuilder.setCompressionType(Compression.Algorithm.GZ) + case "LZ4" => featuresCFBuilder.setCompressionType(Compression.Algorithm.LZ4) + case "SNAPPY" => featuresCFBuilder.setCompressionType(Compression.Algorithm.SNAPPY) + case _ => featuresCFBuilder.setCompressionType(Compression.Algorithm.NONE) + } val featuresCF = featuresCFBuilder.build() val tdb = 
TableDescriptorBuilder.newBuilder(table) - // TODO: make this configurable tdb.setRegionSplitPolicyClassName( - "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy" + sqlContext.getConf("spark.hbase.properties.regionSplitPolicyClassName") ) if (!table.getColumnFamilyNames.contains(config.namespace.getBytes)) { @@ -66,6 +71,7 @@ class HbaseSinkRelation( tdb.modifyColumnFamily(featuresCF) admin.modifyTable(tdb.build()) } + } finally { hbaseConn.close() } From 58c09fc671475658e8ffab834c1a5929f6328f0f Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 26 Sep 2024 13:13:06 +0800 Subject: [PATCH 24/31] Test using network host mode --- .../store/bigtable/GenericHbase2Container.java | 13 ++++++++----- .../store/bigtable/HbaseOnlineRetrieverTest.java | 8 +++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java index d0f269e..24b0fd2 100644 --- a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java @@ -13,7 +13,7 @@ public class GenericHbase2Container extends GenericContainer { private final String hostName; - private final Configuration hbase2Configuration = HBaseConfiguration.create(); + public final Configuration hbase2Configuration = HBaseConfiguration.create(); public GenericHbase2Container() { super(DockerImageName.parse("jcjabouille/hbase-standalone:2.4.9")); @@ -40,10 +40,13 @@ public GenericHbase2Container() { withEnv("HBASE_MASTER_PORT", Integer.toString(masterPort)); withEnv("HBASE_REGION_PORT", Integer.toString(regionPort)); - setPortBindings( - Arrays.asList( - String.format("%d:%d", masterPort, masterPort), - String.format("%d:%d", regionPort, regionPort))); +// setPortBindings( +// Arrays.asList( +// String.format("%d:%d", masterPort, masterPort), +// String.format("%d:%d", regionPort, regionPort))); + + // Set network mode to host + withNetworkMode("host"); } @Override diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java index e30555a..87b0a44 100644 --- a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java @@ -38,9 +38,11 @@ public class HbaseOnlineRetrieverTest { @BeforeAll public static void setup() throws IOException { - hbaseConfiguration.set("hbase.zookeeper.quorum", hbase.getHost()); - hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181"); - hbaseClient = ConnectionFactory.createConnection(hbaseConfiguration); +// hbaseConfiguration.set("hbase.zookeeper.quorum", hbase.getHost()); +// hbaseConfiguration.set("hbase.zookeeper.property.clientPort", hbase.getMappedPort(2181).toString()); +// hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181"); +// hbaseClient = ConnectionFactory.createConnection(hbaseConfiguration); + hbaseClient = ConnectionFactory.createConnection(hbase.hbase2Configuration); admin = (HBaseAdmin) hbaseClient.getAdmin(); ingestData(); } From a04f6f775b6fc649cfbb1e559ce957f0c0fab919 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 26 Sep 2024 
13:14:39 +0800 Subject: [PATCH 25/31] Fix linting --- .../store/bigtable/GenericHbase2Container.java | 9 ++++----- .../store/bigtable/HbaseOnlineRetrieverTest.java | 9 +++++---- .../main/scala/dev/caraml/spark/BasePipeline.scala | 5 ++++- .../dev/caraml/spark/IngestionJobConfig.scala | 14 +++++++++++--- .../spark/stores/bigtable/HbaseSinkRelation.scala | 8 ++++---- 5 files changed, 28 insertions(+), 17 deletions(-) diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java index 24b0fd2..232bcbf 100644 --- a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java @@ -3,7 +3,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.time.Duration; -import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.testcontainers.containers.GenericContainer; @@ -40,10 +39,10 @@ public GenericHbase2Container() { withEnv("HBASE_MASTER_PORT", Integer.toString(masterPort)); withEnv("HBASE_REGION_PORT", Integer.toString(regionPort)); -// setPortBindings( -// Arrays.asList( -// String.format("%d:%d", masterPort, masterPort), -// String.format("%d:%d", regionPort, regionPort))); + // setPortBindings( + // Arrays.asList( + // String.format("%d:%d", masterPort, masterPort), + // String.format("%d:%d", regionPort, regionPort))); // Set network mode to host withNetworkMode("host"); diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java index 87b0a44..56a1881 100644 --- a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java @@ -38,10 +38,11 @@ public class HbaseOnlineRetrieverTest { @BeforeAll public static void setup() throws IOException { -// hbaseConfiguration.set("hbase.zookeeper.quorum", hbase.getHost()); -// hbaseConfiguration.set("hbase.zookeeper.property.clientPort", hbase.getMappedPort(2181).toString()); -// hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181"); -// hbaseClient = ConnectionFactory.createConnection(hbaseConfiguration); + // hbaseConfiguration.set("hbase.zookeeper.quorum", hbase.getHost()); + // hbaseConfiguration.set("hbase.zookeeper.property.clientPort", + // hbase.getMappedPort(2181).toString()); + // hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181"); + // hbaseClient = ConnectionFactory.createConnection(hbaseConfiguration); hbaseClient = ConnectionFactory.createConnection(hbase.hbase2Configuration); admin = (HBaseAdmin) hbaseClient.getAdmin(); ingestData(); diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala index 2c29e3f..82f1d08 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/BasePipeline.scala @@ -37,7 +37,10 @@ object BasePipeline { conf .set("spark.hbase.zookeeper.quorum", zookeeperQuorum) .set("spark.hbase.zookeeper.port", zookeeperPort.toString) - 
.set("spark.hbase.properties.regionSplitPolicyClassName", hbaseProperties.regionSplitPolicy) + .set( + "spark.hbase.properties.regionSplitPolicyClassName", + hbaseProperties.regionSplitPolicy + ) .set("spark.hbase.properties.compressionAlgorithm", hbaseProperties.compressionAlgorithm) } diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala index f4eb803..639f2e5 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/IngestionJobConfig.scala @@ -26,9 +26,17 @@ case class RedisWriteProperties( enableRateLimit: Boolean = false, ratePerSecondLimit: Int = 50000 ) -case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig -case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int, hbaseProperties: HBaseProperties = HBaseProperties()) extends StoreConfig -case class HBaseProperties(regionSplitPolicy: String = "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy", compressionAlgorithm: String = "ZSTD") +case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig +case class HBaseConfig( + zookeeperQuorum: String, + zookeeperPort: Int, + hbaseProperties: HBaseProperties = HBaseProperties() +) extends StoreConfig +case class HBaseProperties( + regionSplitPolicy: String = + "org.apache.hadoop.hbase.regionserver.IncreasingToUpperBoundRegionSplitPolicy", + compressionAlgorithm: String = "ZSTD" +) sealed trait MetricConfig diff --git a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala index 82e3d44..97cc575 100644 --- a/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala +++ b/caraml-store-spark/src/main/scala/dev/caraml/spark/stores/bigtable/HbaseSinkRelation.scala @@ -42,11 +42,11 @@ class HbaseSinkRelation( } featuresCFBuilder.setMaxVersions(1) sqlContext.getConf("spark.hbase.properties.compressionAlgorithm") match { - case "ZSTD" => featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD) - case "GZ" => featuresCFBuilder.setCompressionType(Compression.Algorithm.GZ) - case "LZ4" => featuresCFBuilder.setCompressionType(Compression.Algorithm.LZ4) + case "ZSTD" => featuresCFBuilder.setCompressionType(Compression.Algorithm.ZSTD) + case "GZ" => featuresCFBuilder.setCompressionType(Compression.Algorithm.GZ) + case "LZ4" => featuresCFBuilder.setCompressionType(Compression.Algorithm.LZ4) case "SNAPPY" => featuresCFBuilder.setCompressionType(Compression.Algorithm.SNAPPY) - case _ => featuresCFBuilder.setCompressionType(Compression.Algorithm.NONE) + case _ => featuresCFBuilder.setCompressionType(Compression.Algorithm.NONE) } val featuresCF = featuresCFBuilder.build() From efc1862c6534fc949ef8076a5f7892d967229f47 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 26 Sep 2024 17:41:39 +0800 Subject: [PATCH 26/31] Fix hbase unit test --- .../bigtable/GenericHbase2Container.java | 38 ++++--------------- .../bigtable/HbaseOnlineRetrieverTest.java | 5 --- 2 files changed, 8 insertions(+), 35 deletions(-) diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java index 232bcbf..58fa2bb 100644 --- 
a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/GenericHbase2Container.java @@ -1,7 +1,5 @@ package dev.caraml.serving.store.bigtable; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.time.Duration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -11,41 +9,21 @@ public class GenericHbase2Container extends GenericContainer { - private final String hostName; + private final String hostName = "hbase-docker"; public final Configuration hbase2Configuration = HBaseConfiguration.create(); public GenericHbase2Container() { - super(DockerImageName.parse("jcjabouille/hbase-standalone:2.4.9")); - { - try { - hostName = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - throw new RuntimeException(e); - } - } - int masterPort = 16010; - addExposedPort(masterPort); - int regionPort = 16011; - addExposedPort(regionPort); - addExposedPort(2181); - + super(DockerImageName.parse("dajobe/hbase:latest")); withCreateContainerCmdModifier( cmd -> { cmd.withHostName(hostName); }); - waitingFor(Wait.forLogMessage(".*running regionserver.*", 1)); - withStartupTimeout(Duration.ofMinutes(10)); - - withEnv("HBASE_MASTER_PORT", Integer.toString(masterPort)); - withEnv("HBASE_REGION_PORT", Integer.toString(regionPort)); - // setPortBindings( - // Arrays.asList( - // String.format("%d:%d", masterPort, masterPort), - // String.format("%d:%d", regionPort, regionPort))); - - // Set network mode to host withNetworkMode("host"); + withEnv("HBASE_DOCKER_HOSTNAME", "127.0.0.1"); + + waitingFor(Wait.forLogMessage(".*master.HMaster: Master has completed initialization.*", 1)); + withStartupTimeout(Duration.ofMinutes(10)); } @Override @@ -56,10 +34,10 @@ protected void doStart() { hbase2Configuration.set("hbase.client.retries.number", "10"); hbase2Configuration.set("hbase.rpc.timeout", "3000"); hbase2Configuration.set("hbase.client.operation.timeout", "3000"); + hbase2Configuration.set("hbase.rpc.timeout", "3000"); hbase2Configuration.set("hbase.client.scanner.timeout.period", "10000"); hbase2Configuration.set("zookeeper.session.timeout", "10000"); hbase2Configuration.set("hbase.zookeeper.quorum", "localhost"); - hbase2Configuration.set( - "hbase.zookeeper.property.clientPort", Integer.toString(getMappedPort(2181))); + hbase2Configuration.set("hbase.zookeeper.property.clientPort", "2181"); } } diff --git a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java index 56a1881..45fff5f 100644 --- a/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java +++ b/caraml-store-serving/src/test/java/dev/caraml/serving/store/bigtable/HbaseOnlineRetrieverTest.java @@ -38,11 +38,6 @@ public class HbaseOnlineRetrieverTest { @BeforeAll public static void setup() throws IOException { - // hbaseConfiguration.set("hbase.zookeeper.quorum", hbase.getHost()); - // hbaseConfiguration.set("hbase.zookeeper.property.clientPort", - // hbase.getMappedPort(2181).toString()); - // hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181"); - // hbaseClient = ConnectionFactory.createConnection(hbaseConfiguration); hbaseClient = ConnectionFactory.createConnection(hbase.hbase2Configuration); admin = (HBaseAdmin) 
hbaseClient.getAdmin(); ingestData(); From abcfb3edcb5aa075f763d99915e7c45d748c8d20 Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 26 Sep 2024 18:14:09 +0800 Subject: [PATCH 27/31] Refactor functions --- .../store/bigtable/HBaseOnlineRetriever.java | 33 ++++++++++++------- .../store/bigtable/HBaseSchemaRegistry.java | 13 +++++--- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 9bed4d9..38677c7 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -28,7 +28,7 @@ public HBaseOnlineRetriever(Connection client) { } /** - * Generate BigTable key in the form of entity values joined by #. + * Generate HBase key in the form of entity values joined by #. * * @param entityRow Single EntityRow representation in feature retrieval call * @param entityNames List of entities related to feature references in retrieval call @@ -53,7 +53,7 @@ public ByteString convertEntityValueToKey( * @param rowKeys List of keys of rows to retrieve * @param rows Map of rowKey to Row related to it * @param featureReferences List of feature references - * @return + * @return List of lists of Features associated with each rowKey */ @Override public List<List<Feature>> convertRowToFeature( @@ -79,10 +79,7 @@ public List<List<Feature>> convertRowToFeature( rowCells -> { Cell rowCell = rowCells.get(0); // Latest cell ByteBuffer valueBuffer = - ByteBuffer.wrap(rowCell.getValueArray()) - .position(rowCell.getValueOffset()) - .limit(rowCell.getValueOffset() + rowCell.getValueLength()) - .slice(); + HBaseSchemaRegistry.GetValueByteBufferFromRowCell(rowCell); ByteBuffer familyBuffer = ByteBuffer.wrap(rowCell.getFamilyArray()) .position(rowCell.getFamilyOffset()) @@ -119,6 +116,15 @@ public List<List<Feature>> convertRowToFeature( .collect(Collectors.toList()); } + /** + * Retrieve rows with required column families for each row entity by sending batch Get request, + * HBase specific implementation + * + * @param tableName Name of SSTable + * @param rowKeys List of keys of rows to retrieve + * @param columnFamilies List of column family names + * @return Map of row key to Result + */ @Override public Map<ByteString, Result> getFeaturesFromSSTable( String tableName, List<ByteString> rowKeys, List<String> columnFamilies) { @@ -151,12 +157,15 @@ public Map<ByteString, Result> getFeaturesFromSSTable( } /** - * @param tableName - * @param value - * @param featureReferences - * @param reusedDecoder - * @param timestamp - * @return + * Decode features from Avro-serialized bytes + * + * @param tableName Name of HBase table + * @param value Value of HBase cell where the first 4 bytes represent the schema reference and + * the remaining bytes represent the Avro-serialized features + * @param featureReferences List of feature references + * @param reusedDecoder Decoder for decoding feature values + * @param timestamp Timestamp of row cell + * @return List of @NativeFeature with values retrieved from the HBase cell * @throws IOException */ private List<Feature> decodeFeatures( diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java index ff0a7b2..8fc4612 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java +++ 
b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseSchemaRegistry.java @@ -46,15 +46,18 @@ public GenericDatumReader<GenericRecord> loadReader(SchemaReference reference) { // NOTE: this should never happen throw new RuntimeException("Schema not found"); } - ByteBuffer schemaBuffer = - ByteBuffer.wrap(last.getValueArray()) - .position(last.getValueOffset()) - .limit(last.getValueOffset() + last.getValueLength()) - .slice(); + ByteBuffer schemaBuffer = GetValueByteBufferFromRowCell(last); Schema schema = new Schema.Parser().parse(ByteString.copyFrom(schemaBuffer).toStringUtf8()); return new GenericDatumReader<>(schema); } catch (IOException e) { throw new RuntimeException(e); } } + + public static ByteBuffer GetValueByteBufferFromRowCell(Cell cell) { + return ByteBuffer.wrap(cell.getValueArray()) + .position(cell.getValueOffset()) + .limit(cell.getValueOffset() + cell.getValueLength()) + .slice(); + } } From 7e7e4172e99285c7db3382a65fb0f07febde12cd Mon Sep 17 00:00:00 2001 From: Shide Foo Date: Thu, 26 Sep 2024 18:24:08 +0800 Subject: [PATCH 28/31] Move avro schema length to BaseSchemaRegistry --- .../caraml/serving/store/bigtable/BaseSchemaRegistry.java | 1 + .../serving/store/bigtable/BigTableOnlineRetriever.java | 6 ++++-- .../caraml/serving/store/bigtable/HBaseOnlineRetriever.java | 6 ++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java index 6abee93..60f400d 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BaseSchemaRegistry.java @@ -13,6 +13,7 @@ public abstract class BaseSchemaRegistry { protected static String COLUMN_FAMILY = "metadata"; protected static String QUALIFIER = "avro"; protected static String KEY_PREFIX = "schema#"; + public static final int SCHEMA_REFERENCE_LENGTH = 4; public static class SchemaReference { private final String tableName; diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetriever.java index c10009b..921784b 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableOnlineRetriever.java @@ -161,8 +161,10 @@ private List<Feature> decodeFeatures( BinaryDecoder reusedDecoder, long timestamp) throws IOException { - ByteString schemaReferenceBytes = value.substring(0, 4); - byte[] featureValueBytes = value.substring(4).toByteArray(); + ByteString schemaReferenceBytes = + value.substring(0, BigTableSchemaRegistry.SCHEMA_REFERENCE_LENGTH); + byte[] featureValueBytes = + value.substring(BigTableSchemaRegistry.SCHEMA_REFERENCE_LENGTH).toByteArray(); BigTableSchemaRegistry.SchemaReference schemaReference = new BigTableSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes); diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index 38677c7..e9ebde7 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ 
b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -175,8 +175,10 @@ private List decodeFeatures( BinaryDecoder reusedDecoder, long timestamp) throws IOException { - ByteString schemaReferenceBytes = value.substring(0, 4); - byte[] featureValueBytes = value.substring(4).toByteArray(); + ByteString schemaReferenceBytes = + value.substring(0, HBaseSchemaRegistry.SCHEMA_REFERENCE_LENGTH); + byte[] featureValueBytes = + value.substring(HBaseSchemaRegistry.SCHEMA_REFERENCE_LENGTH).toByteArray(); HBaseSchemaRegistry.SchemaReference schemaReference = new HBaseSchemaRegistry.SchemaReference(tableName, schemaReferenceBytes); From 455e253feba123d61b371c96206d766192e0acb1 Mon Sep 17 00:00:00 2001 From: Bayu Aditya Date: Fri, 27 Sep 2024 09:39:07 +0700 Subject: [PATCH 29/31] add try catch block while create bigtable connection --- .../serving/store/bigtable/BigTableStoreConfig.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java index 10cad15..162c2f3 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java @@ -36,7 +36,13 @@ public OnlineRetriever getRetriever() { BigtableConfiguration.configure(projectId, instanceId); config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId); - Connection connection = BigtableConfiguration.connect(config); + Connection connection; + try { + connection = BigtableConfiguration.connect(config); + } catch (IllegalStateException e) { + throw new RuntimeException(e); + } + return new HBaseOnlineRetriever(connection); } From 4355a1a841a7534140af90882cfeeb9bb2b6c43f Mon Sep 17 00:00:00 2001 From: Bayu Aditya Date: Wed, 2 Oct 2024 11:56:23 +0700 Subject: [PATCH 30/31] move into convertRowCellsToFeatures function --- .../store/bigtable/HBaseOnlineRetriever.java | 99 +++++++++++-------- 1 file changed, 56 insertions(+), 43 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java index e9ebde7..3dbcc12 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/HBaseOnlineRetriever.java @@ -9,6 +9,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.avro.AvroRuntimeException; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -68,54 +69,66 @@ public List> convertRowToFeature( rowKey -> { if (!rows.containsKey(rowKey)) { return Collections.emptyList(); - } else { - Result row = rows.get(rowKey); - return featureReferences.stream() - .map(ServingServiceProto.FeatureReference::getFeatureTable) - .distinct() - .map(cf -> row.getColumnCells(cf.getBytes(), null)) - .filter(ls -> !ls.isEmpty()) - .flatMap( - rowCells -> { - Cell rowCell = rowCells.get(0); // Latest cell - ByteBuffer valueBuffer = - HBaseSchemaRegistry.GetValueByteBufferFromRowCell(rowCell); - ByteBuffer familyBuffer = - ByteBuffer.wrap(rowCell.getFamilyArray()) - 
.position(rowCell.getFamilyOffset()) - .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) - .slice(); - String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); - ByteString value = ByteString.copyFrom(valueBuffer); - - List<Feature> features; - List<ServingServiceProto.FeatureReference> localFeatureReferences = - featureReferences.stream() - .filter( - featureReference -> - featureReference.getFeatureTable().equals(family)) - .collect(Collectors.toList()); - - try { - features = - decodeFeatures( - tableName, - value, - localFeatureReferences, - reusedDecoder, - rowCell.getTimestamp()); - } catch (IOException e) { - throw new RuntimeException("Failed to decode features from BigTable"); - } - - return features.stream(); - }) - .collect(Collectors.toList()); } + + Result row = rows.get(rowKey); + return featureReferences.stream() + .map(ServingServiceProto.FeatureReference::getFeatureTable) + .distinct() + .map(cf -> row.getColumnCells(cf.getBytes(), null)) + .filter(ls -> !ls.isEmpty()) + .flatMap( + rowCells -> + this.convertRowCellsToFeatures( + featureReferences, reusedDecoder, tableName, rowCells)) + .collect(Collectors.toList()); }) .collect(Collectors.toList()); } + /** + * Converts the cells of a row into a stream of @NativeFeature values + * + * @param featureReferences List of feature references + * @param reusedDecoder Decoder for decoding feature values + * @param tableName Name of SSTable + * @param rowCells row cells data from SSTable + * @return Stream of @NativeFeature + * @throws RuntimeException failed to decode features + */ + private Stream<Feature> convertRowCellsToFeatures( + List<ServingServiceProto.FeatureReference> featureReferences, + BinaryDecoder reusedDecoder, + String tableName, + List<Cell> rowCells) { + + Cell rowCell = rowCells.get(0); // Latest cell + ByteBuffer valueBuffer = HBaseSchemaRegistry.GetValueByteBufferFromRowCell(rowCell); + ByteBuffer familyBuffer = + ByteBuffer.wrap(rowCell.getFamilyArray()) + .position(rowCell.getFamilyOffset()) + .limit(rowCell.getFamilyOffset() + rowCell.getFamilyLength()) + .slice(); + String family = ByteString.copyFrom(familyBuffer).toStringUtf8(); + ByteString value = ByteString.copyFrom(valueBuffer); + + List<Feature> features; + List<ServingServiceProto.FeatureReference> localFeatureReferences = + featureReferences.stream() + .filter(featureReference -> featureReference.getFeatureTable().equals(family)) + .collect(Collectors.toList()); + + try { + features = + decodeFeatures( + tableName, value, localFeatureReferences, reusedDecoder, rowCell.getTimestamp()); + } catch (IOException e) { + throw new RuntimeException("Failed to decode features from BigTable"); + } + + return features.stream(); + } + /** * Retrieve rows with required column families for each row entity by sending batch Get request, * HBase specific implementation From 3b4f01a0b6991e8847b2365857006a21146155ce Mon Sep 17 00:00:00 2001 From: Bayu Aditya Date: Wed, 2 Oct 2024 13:15:30 +0700 Subject: [PATCH 31/31] using single try catch block for connection --- .../store/bigtable/BigTableStoreConfig.java | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java index 162c2f3..bd80774 100644 --- a/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java +++ b/caraml-store-serving/src/main/java/dev/caraml/serving/store/bigtable/BigTableStoreConfig.java @@ -30,24 +30,18 @@ public class BigTableStoreConfig { @Bean public OnlineRetriever
getRetriever() { - // Using HBase SDK - if (isUsingHBaseSDK) { - org.apache.hadoop.conf.Configuration config = - BigtableConfiguration.configure(projectId, instanceId); - config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId); + try { + // Using HBase SDK + if (isUsingHBaseSDK) { + org.apache.hadoop.conf.Configuration config = + BigtableConfiguration.configure(projectId, instanceId); + config.set(BigtableOptionsFactory.APP_PROFILE_ID_KEY, appProfileId); - Connection connection; - try { - connection = BigtableConfiguration.connect(config); - } catch (IllegalStateException e) { - throw new RuntimeException(e); + Connection connection = BigtableConfiguration.connect(config); + return new HBaseOnlineRetriever(connection); } - return new HBaseOnlineRetriever(connection); - } - - // Using BigTable SDK - try { + // Using BigTable SDK BigtableDataSettings.Builder builder = BigtableDataSettings.newBuilder() .setProjectId(projectId) @@ -66,6 +60,7 @@ public OnlineRetriever getRetriever() { } BigtableDataClient client = BigtableDataClient.create(settings); return new BigTableOnlineRetriever(client); + } catch (IOException e) { throw new RuntimeException(e); }
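
Note (editorial, illustrative only; not part of the patches above): the retriever code in this series assumes a specific storage layout in HBase/BigTable. The row key is the entity values joined with "#", each feature table maps to a column family, and each cell value carries a 4-byte schema reference (SCHEMA_REFERENCE_LENGTH) followed by the Avro-serialized feature record; the referenced Avro schema itself lives under the "metadata" column family with a "schema#" key prefix. The sketch below shows that cell layout from the writer's point of view; the feature table name, field names, and schema-reference bytes are hypothetical, and the real writer is the Spark ingestion job (HbaseSinkRelation), not this snippet.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class HBaseValueLayoutExample {
  public static void main(String[] args) throws IOException {
    // Row key: entity values joined by "#" (e.g. driver_id=1, area_id=7).
    byte[] rowKey = "1#7".getBytes();

    // Hypothetical Avro schema for a feature table named "driver_stats".
    Schema schema =
        SchemaBuilder.record("driver_stats")
            .fields()
            .optionalDouble("avg_rating")
            .optionalLong("trips_today")
            .endRecord();

    GenericRecord record = new GenericData.Record(schema);
    record.put("avg_rating", 4.8);
    record.put("trips_today", 12L);

    // Avro-serialize the feature record.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    byte[] avroBytes = out.toByteArray();

    // Prefix with the 4-byte schema reference; the serving side strips these
    // SCHEMA_REFERENCE_LENGTH bytes and resolves the schema from the
    // "metadata" column family (key prefix "schema#") before decoding.
    byte[] schemaRef = {0, 0, 0, 1}; // hypothetical reference value
    byte[] cellValue =
        ByteBuffer.allocate(schemaRef.length + avroBytes.length)
            .put(schemaRef)
            .put(avroBytes)
            .array();

    System.out.printf("row key=%s, cell value=%d bytes%n", new String(rowKey), cellValue.length);
  }
}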