[feat] Add support to use hbase online store #126

Merged
merged 14 commits on Sep 20, 2024
Add spark changes to use hbase api
shydefoo committed Sep 5, 2024
commit ad454e1b1b044c1cf97eb77368aab1c2c1e5e603
@@ -33,6 +33,10 @@ object BasePipeline {
conf
.set("spark.bigtable.projectId", projectId)
.set("spark.bigtable.instanceId", instanceId)
case HBaseConfig(zookeeperQuorum, zookeeperPort) =>
conf
.set("spark.hbase.zookeeper.quorum", zookeeperQuorum)
.set("spark.hbase.zookeeper.port", zookeeperPort.toString)
}

jobConfig.metrics match {
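For orientation, the two spark.hbase.* settings added above are read back later in this diff by DefaultSource and copied into the HBase client configuration. A minimal sketch of that flow, with illustrative quorum and port values (in the pipeline they come from HBaseConfig; the object name is a stand-in):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

object HBaseConfFlowSketch {
  // Illustrative values; BasePipeline derives them from HBaseConfig.
  val sparkConf: SparkConf = new SparkConf()
    .set("spark.hbase.zookeeper.quorum", "zookeeper-0,zookeeper-1,zookeeper-2")
    .set("spark.hbase.zookeeper.port", "2181")

  // DefaultSource (further down in this diff) reads the same keys back and hands them
  // to the HBase client as hbase.zookeeper.quorum / hbase.zookeeper.property.clientPort.
  val hbaseConf: Configuration = new Configuration()
  hbaseConf.set("hbase.zookeeper.quorum", sparkConf.get("spark.hbase.zookeeper.quorum"))
  hbaseConf.set("hbase.zookeeper.property.clientPort", sparkConf.get("spark.hbase.zookeeper.port"))
}
```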
@@ -66,11 +66,19 @@ object BatchPipeline extends BasePipeline {
.map(metrics.incrementRead)
.filter(rowValidator.allChecks)

val onlineStore = config.store match {
case _: RedisConfig => "redis"
case _: BigTableConfig => "bigtable"
case _: HBaseConfig => "hbase"
}

validRows.write
.format(config.store match {
case _: RedisConfig => "dev.caraml.spark.stores.redis"
case _: BigTableConfig => "dev.caraml.spark.stores.bigtable"
case _: HBaseConfig => "dev.caraml.spark.stores.bigtable"
})
.option("online_store", onlineStore)
.option("entity_columns", featureTable.entities.map(_.name).mkString(","))
.option("namespace", featureTable.name)
.option("project_name", featureTable.project)
@@ -87,6 +87,9 @@ object IngestionJob {
opt[String](name = "bigtable")
.action((x, c) => c.copy(store = parseJSON(x).camelizeKeys.extract[BigTableConfig]))

opt[String](name = "hbase")
.action((x, c) => c.copy(store = parseJSON(x).extract[HBaseConfig]))

opt[String](name = "statsd")
.action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig])))

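The new --hbase flag takes a JSON payload that json4s extracts directly into HBaseConfig. A small parsing sketch, assuming the JSON keys match the case-class field names (unlike --bigtable, this option does not apply camelizeKeys); the quorum, port, and object name are illustrative:

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Stand-in mirroring the HBaseConfig case class added in this PR.
case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int)

object HBaseOptionSketch extends App {
  implicit val formats: Formats = DefaultFormats

  // e.g. passed on the CLI as: --hbase '{"zookeeperQuorum": "...", "zookeeperPort": 2181}'
  val raw   = """{"zookeeperQuorum": "zookeeper-0,zookeeper-1", "zookeeperPort": 2181}"""
  val hbase = parse(raw).extract[HBaseConfig]
  println(hbase) // HBaseConfig(zookeeper-0,zookeeper-1,2181)
}
```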
@@ -27,6 +27,7 @@ case class RedisWriteProperties(
ratePerSecondLimit: Int = 50000
)
case class BigTableConfig(projectId: String, instanceId: String) extends StoreConfig
case class HBaseConfig(zookeeperQuorum: String, zookeeperPort: Int) extends StoreConfig

sealed trait MetricConfig

@@ -4,7 +4,7 @@ import com.google.cloud.bigtable.hbase.BigtableConfiguration
import dev.caraml.spark.serialization.Serializer
import dev.caraml.spark.utils.StringUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.{Admin, Connection, Put}
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.mapred.JobConf
@@ -30,8 +30,12 @@ class BigTableSinkRelation(

override def schema: StructType = ???

def getConnection(hadoopConfig: Configuration): Connection = {
BigtableConfiguration.connect(hadoopConfig)
}

def createTable(): Unit = {
val btConn = BigtableConfiguration.connect(hadoopConfig)
val btConn = getConnection(hadoopConfig)
try {
val admin = btConn.getAdmin

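The point of extracting getConnection is that table creation only needs an org.apache.hadoop.hbase.client.Connection, wherever it came from. A simplified sketch of that template-method shape, using stand-in names rather than the actual BigTableSinkRelation code:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.Connection

// Simplified stand-in for BigTableSinkRelation: table creation depends only on the
// abstract getConnection step, so a subclass can swap the client implementation.
abstract class SinkRelationSketch(hadoopConfig: Configuration) {
  def getConnection(hadoopConfig: Configuration): Connection

  def createTable(): Unit = {
    val conn = getConnection(hadoopConfig)
    try {
      val admin = conn.getAdmin
      // ... check for the feature table / column families and create them via admin ...
    } finally {
      conn.close()
    }
  }
}
```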
@@ -23,27 +23,43 @@ class DefaultSource extends CreatableRelationProvider {
parameters: Map[String, String],
data: DataFrame
): BaseRelation = {
val onlineStore = parameters.getOrElse("online_store", "bigtable")
var rel: BigTableSinkRelation = null
if (onlineStore == "bigtable") {
  val bigtableConf = BigtableConfiguration.configure(
    sqlContext.getConf(PROJECT_KEY),
    sqlContext.getConf(INSTANCE_KEY)
  )

  if (sqlContext.getConf("spark.bigtable.emulatorHost", "").nonEmpty) {
    bigtableConf.set(
      BIGTABLE_EMULATOR_HOST_KEY,
      sqlContext.getConf("spark.bigtable.emulatorHost")
    )
  }

  configureBigTableClient(bigtableConf, sqlContext)

  rel =
    new BigTableSinkRelation(
      sqlContext,
      new AvroSerializer,
      SparkBigtableConfig.parse(parameters),
      bigtableConf
    )
} else if (onlineStore == "hbase") {
  val hbaseConf = new Configuration()
  hbaseConf.set("hbase.zookeeper.quorum", sqlContext.getConf(ZOOKEEPER_QUOROM_KEY))
  hbaseConf.set("hbase.zookeeper.property.clientPort", sqlContext.getConf(ZOOKEEPER_PORT_KEY))
  rel = new HbaseSinkRelation(
    sqlContext,
    new AvroSerializer,
    SparkBigtableConfig.parse(parameters),
    hbaseConf
  )
} else {
  throw new UnsupportedOperationException(s"Unsupported online store: $onlineStore")
}
rel.createTable()
rel.saveWriteSchema(data)
rel.insert(data, overwrite = false)
@@ -79,4 +95,7 @@ object DefaultSource {
private val THROTTLING_THRESHOLD_MILLIS_KEY = "spark.bigtable.throttlingThresholdMs"
private val MAX_ROW_COUNT_KEY = "spark.bigtable.maxRowCount"
private val MAX_INFLIGHT_KEY = "spark.bigtable.maxInflightRpcs"

private val ZOOKEEPER_QUOROM_KEY = "spark.hbase.zookeeper.quorum"
private val ZOOKEEPER_PORT_KEY = "spark.hbase.zookeeper.port"
}
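End to end, an ingestion job reaches the HBase branch by writing through the existing bigtable source package with online_store set to hbase; the ZooKeeper settings are picked up from the Spark session configured by BasePipeline. A hedged usage sketch (option values and the object name are illustrative, and SparkBigtableConfig.parse may expect additional options not shown here):

```scala
import org.apache.spark.sql.DataFrame

object HBaseWriteSketch {
  def writeFeatureRows(validRows: DataFrame): Unit = {
    validRows.write
      .format("dev.caraml.spark.stores.bigtable") // HBase reuses the BigTable sink package
      .option("online_store", "hbase")            // selects the HbaseSinkRelation branch above
      .option("entity_columns", "driver_id")
      .option("namespace", "driver_hourly_stats")
      .option("project_name", "default")
      .save()
  }
}
```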
@@ -0,0 +1,17 @@
package dev.caraml.spark.stores.bigtable

import dev.caraml.spark.serialization.Serializer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.spark.sql.SQLContext

class HbaseSinkRelation(
sqlContext: SQLContext,
serializer: Serializer,
config: SparkBigtableConfig,
hadoopConfig: Configuration
) extends BigTableSinkRelation(sqlContext, serializer, config, hadoopConfig) {
override def getConnection(hadoopConfig: Configuration): Connection = {
ConnectionFactory.createConnection(hadoopConfig)
}
}
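For comparison with the parent class: BigtableConfiguration.connect and ConnectionFactory.createConnection both return the same org.apache.hadoop.hbase.client.Connection interface, which is why overriding getConnection is the only change HbaseSinkRelation needs. A minimal sketch of the plain HBase path, with an illustrative object name and ZooKeeper settings:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

object HBaseConnectionSketch {
  // Plain HBase connection, as used by HbaseSinkRelation.getConnection.
  def connect(quorum: String, clientPort: Int): Connection = {
    val conf = new Configuration()
    conf.set("hbase.zookeeper.quorum", quorum)                           // e.g. "zookeeper-0,zookeeper-1"
    conf.set("hbase.zookeeper.property.clientPort", clientPort.toString) // e.g. "2181"
    ConnectionFactory.createConnection(conf)
  }
}
```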