@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- *
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.loader.subscriber
 
-import org.apache.hadoop.hbase.client.Put
+import com.typesafe.config.ConfigFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat}
@@ -36,6 +36,7 @@ import org.apache.spark.rdd.RDD
 import org.hbase.async.{PutRequest}
 import play.api.libs.json.Json
 import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext
 
 
 object TransferToHFile extends SparkApp with JSONParser {
@@ -144,48 +145,29 @@ object TransferToHFile extends SparkApp with JSONParser {
     val zkQuorum = args(2)
     val tableName = args(3)
     val dbUrl = args(4)
-    val maxHFilePerResionServer = args(5).toInt
-    val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
-    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
-    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
+    val labelMapping = if (args.length >= 6) GraphSubscriberHelper.toLabelMapping(args(5)) else Map.empty[String, String]
+    val autoEdgeCreate = if (args.length >= 7) args(6).toBoolean else false
+    val buildDegree = if (args.length >= 8) args(7).toBoolean else true
     val conf = sparkConf(s"$input: TransferToHFile")
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     conf.set("spark.kryoserializer.buffer.mb", "24")
 
     val sc = new SparkContext(conf)
 
-    GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
-
     /** set up hbase init */
     val hbaseConf = HBaseConfiguration.create()
     hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
     hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
     hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")
 
 
-    val rdd = sc.textFile(input)
+    val rdd = if (args.length >= 9) sc.textFile(input).repartition(args(8).toInt) else sc.textFile(input)
 
 
     val kvs = rdd.mapPartitions { iter =>
       val phase = System.getProperty("phase")
       GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
       toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
     }
-//
-//    val newRDD = if (!buildDegree) new HFileRDD(kvs)
-//    else {
-//      val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
-//        agg + current
-//      }.mapPartitions { iter =>
-//        val phase = System.getProperty("phase")
-//        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-//        toKeyValues(iter.toSeq)
-//      }
-//      new HFileRDD(kvs ++ degreeKVs)
-//    }
-//
-//    newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerResionServer, tmpPath)
     val merged = if (!buildDegree) kvs
     else {
       kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
@@ -211,3 +193,45 @@ object TransferToHFile extends SparkApp with JSONParser {
   }
 
 }
+
+
+object CmdHTableUtil {
+  val usages =
+    s"""
+       |Create an HBase table on the specified zkQuorum.
+       |The table can be configured with a files-per-RegionServer multiplier, a replication
+       |scope, and a compression algorithm from command-line arguments.
+       |
+       |params:
+       |0. zkQuorum: zkQuorum of the running HBase cluster.
+       |1. tableName: table name for this bulk upload.
+       |2. max files per RegionServer: number of region files per RegionServer.
+       |3. replicationScope [optional]: replication scope for the table's column families
+       |   (e.g. with one master and one slave cluster, only the master's column families carry scope 1).
+       |   - default = none
+       |4. compression algorithm [optional]: table compression algorithm (e.g. lz4, gz).
+       |   - default = lz4
+     """.stripMargin
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length < 3) {
+      println(usages)
+      sys.exit(1)
+    }
+
+    val zkQuorum = args(0)
+    val tableName = args(1)
+    val maxHFilePerResionServer = args(2).toInt
+    // Guard against the optional argument being absent before reading it.
+    val replicationScopeOpt = if (args.length < 4 || args(3) == "none") None else Some(args(3).toInt)
+    val compressionAlgorithm = if (args.length >= 5) args(4) else "lz4"
+
+    val config = ConfigFactory.load()
+    val s2graph = new Graph(config)(ExecutionContext.Implicits.global)
+    val storageManagement = new Management(s2graph)
+
+    storageManagement.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm, replicationScopeOpt = replicationScopeOpt)
+
+    sys.exit(0)
+  }
+
+}
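
Note for reviewers: table creation now happens in CmdHTableUtil rather than inside TransferToHFile's main, which is why args(5) through args(8) shift down and the compression argument disappears from the loader. A minimal sketch of driving the new entry point; the quorum, table name, and tuning values are illustrative placeholders, not values from this PR:

// Hypothetical driver for the new entry point; assumes CmdHTableUtil is on the classpath.
object CmdHTableUtilExample {
  def main(args: Array[String]): Unit = {
    CmdHTableUtil.main(Array(
      "zk1.example.com,zk2.example.com",  // 0. zkQuorum of the target HBase cluster (placeholder)
      "s2graph_bulk",                     // 1. table name for this bulk upload (placeholder)
      "2",                                // 2. HFiles-per-RegionServer multiplier
      "1",                                // 3. replication scope, or "none" to skip
      "lz4"                               // 4. compression algorithm (default: lz4)
    ))
  }
}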
@@ -297,8 +297,9 @@ class Management(graph: Graph) {
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
-    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+                  compressionAlgorithm: String = DefaultCompressionAlgorithm,
+                  replicationScopeOpt: Option[Int] = None): Unit =
+    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt = replicationScopeOpt)
 
   /** HBase specific code */
   def createService(serviceName: String,
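Note for reviewers: because replicationScopeOpt defaults to None, existing Management.createTable callers compile unchanged. A minimal sketch of opting in, assuming the same Graph/Management wiring as CmdHTableUtil above; the quorum and table name are placeholders:

import com.typesafe.config.ConfigFactory
import scala.concurrent.ExecutionContext

object ReplicatedTableExample {
  def main(args: Array[String]): Unit = {
    val graph = new Graph(ConfigFactory.load())(ExecutionContext.Implicits.global)
    val management = new Management(graph)

    // Some(1) marks the "e" and "v" column families for cross-cluster replication.
    management.createTable("zk.example.com", "s2graph_replicated", List("e", "v"),
      regionMultiplier = 2, ttl = None, compressionAlgorithm = "lz4",
      replicationScopeOpt = Some(1))
  }
}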
@@ -256,7 +256,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String): Unit
+                  compressionAlgorithm: String,
+                  replicationScopeOpt: Option[Int] = None): Unit
 
 
 
@@ -369,7 +369,8 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String): Unit = {
+                  compressionAlgorithm: String,
+                  replicationScopeOpt: Option[Int] = None): Unit = {
     logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
     val admin = getAdmin(zkAddr)
     val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
@@ -388,6 +389,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionContext)
         .setBlocksize(32768)
         .setBlockCacheEnabled(true)
       if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+      if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
       desc.addFamily(columnDesc)
     }

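Note for reviewers unfamiliar with HBase replication: a column family's replication scope controls whether its edits are shipped to peer clusters (0 = local only, 1 = replicated). The sketch below shows what the new branch does at the descriptor level, against the HBase 1.x client API this file already uses; the column family name is a placeholder:

import org.apache.hadoop.hbase.{HColumnDescriptor, HConstants}

object ReplicationScopeSketch {
  def main(args: Array[String]): Unit = {
    val columnDesc = new HColumnDescriptor("e")
    // Same effect as the diff's branch: only set the scope when one was given.
    val replicationScopeOpt: Option[Int] = Some(HConstants.REPLICATION_SCOPE_GLOBAL) // = 1
    replicationScopeOpt.foreach(columnDesc.setScope)
    println(columnDesc.getScope) // prints 1
  }
}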