diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index 06079a7d..63dc950c 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,7 +19,7 @@
 
 package org.apache.s2graph.loader.subscriber
 
-import org.apache.hadoop.hbase.client.Put
+import com.typesafe.config.ConfigFactory
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
 import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat}
@@ -36,6 +36,7 @@ import org.apache.spark.rdd.RDD
 import org.hbase.async.{PutRequest}
 import play.api.libs.json.Json
 
 import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext
 
 object TransferToHFile extends SparkApp with JSONParser {
@@ -144,27 +145,22 @@ object TransferToHFile extends SparkApp with JSONParser {
     val zkQuorum = args(2)
     val tableName = args(3)
     val dbUrl = args(4)
-    val maxHFilePerResionServer = args(5).toInt
-    val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
-    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
-    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
+    val labelMapping = if (args.length >= 6) GraphSubscriberHelper.toLabelMapping(args(5)) else Map.empty[String, String]
+    val autoEdgeCreate = if (args.length >= 7) args(6).toBoolean else false
+    val buildDegree = if (args.length >= 8) args(7).toBoolean else true
 
     val conf = sparkConf(s"$input: TransferToHFile")
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     conf.set("spark.kryoserializer.buffer.mb", "24")
 
     val sc = new SparkContext(conf)
 
 
-    GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
-
     /** set up hbase init */
     val hbaseConf = HBaseConfiguration.create()
     hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
     hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
     hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")
-
-    val rdd = sc.textFile(input)
+    val rdd = if (args.length >= 9) sc.textFile(input).repartition(args(8).toInt) else sc.textFile(input)
 
     val kvs = rdd.mapPartitions { iter =>
@@ -172,20 +168,6 @@ object TransferToHFile extends SparkApp with JSONParser {
       GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
       toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
     }
-    //
-    // val newRDD = if (!buildDegree) new HFileRDD(kvs)
-    // else {
-    //   val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
-    //     agg + current
-    //   }.mapPartitions { iter =>
-    //     val phase = System.getProperty("phase")
-    //     GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-    //     toKeyValues(iter.toSeq)
-    //   }
-    //   new HFileRDD(kvs ++ degreeKVs)
-    // }
-    //
-    // newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerResionServer, tmpPath)
     val merged = if (!buildDegree) kvs
     else {
       kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) => agg + current }
@@ -211,3 +193,46 @@
   }
 
 }
+
+
+object CmdHTableUtil {
+  val usages =
+    s"""
+       |Create an HBase table on the specified zkQuorum.
+       |The table's column families can be configured with a region multiplier, a replication scope and a
+       |compression algorithm from the command-line arguments below.
+       |
+       |params:
+       |0. zkQuorum: ZooKeeper quorum of the running HBase cluster.
+       |1. tableName: table name for this bulk upload.
+       |2. region multiplier: number of regions per RegionServer (total regions = live servers * multiplier).
+       |3. replicationScope [optional]: replication scope for the table's column families, or "none" (e.g. with one master and one slave cluster, scope 1 on the master replicates edits to the slave).
+       |   - default = none
+       |4. compression algorithm [optional]: table compression algorithm (e.g. lz4, gz, snappy).
+       |   - default = lz4
+     """.stripMargin
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length < 3) {
+      println(usages)
+      sys.exit(1)
+    }
+
+    val zkQuorum = args(0)
+    val tableName = args(1)
+    val maxHFilePerRegionServer = args(2).toInt
+    // replicationScope is optional: guard against a missing args(3) before reading it.
+    val replicationScopeOpt = if (args.length >= 4 && args(3) != "none") Some(args(3).toInt) else None
+    val compressionAlgorithm = if (args.length >= 5) args(4) else "lz4"
+
+    val config = ConfigFactory.load()
+    val s2graph = new Graph(config)(ExecutionContext.Implicits.global)
+    val storageManagement = new Management(s2graph)
+
+    storageManagement.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerRegionServer, None, compressionAlgorithm, replicationScopeOpt = replicationScopeOpt)
+
+    sys.exit(0)
+  }
+
+}
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
index f9b74314..c3ff7ead 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/Management.scala
@@ -297,8 +297,9 @@
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
-    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+                  compressionAlgorithm: String = DefaultCompressionAlgorithm,
+                  replicationScopeOpt: Option[Int] = None): Unit =
+    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt = replicationScopeOpt)
 
   /** HBase specific code */
   def createService(serviceName: String,
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
index fd968add..812ca8b9 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/Storage.scala
@@ -256,7 +256,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String): Unit
+                  compressionAlgorithm: String,
+                  replicationScopeOpt: Option[Int] = None): Unit
 
diff --git a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 66a1be48..224bd1e1 100644
--- a/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/org/apache/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -369,7 +369,8 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String): Unit = {
+                  compressionAlgorithm: String,
+                  replicationScopeOpt: Option[Int] = None): Unit = {
     logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
     val admin = getAdmin(zkAddr)
     val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
@@ -388,6 +389,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
           .setBlocksize(32768)
           .setBlockCacheEnabled(true)
         if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+        if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
         desc.addFamily(columnDesc)
       }
 
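
Reviewer note: a minimal smoke-test sketch of the new entry point's argument order. The quorum hosts, table name, and the wrapper object are made up for illustration; only the positional argument layout comes from CmdHTableUtil above.

    import org.apache.s2graph.loader.subscriber.CmdHTableUtil

    object CmdHTableUtilSmoke {
      def main(args: Array[String]): Unit =
        CmdHTableUtil.main(Array(
          "zk1.example.com,zk2.example.com", // 0. zkQuorum (hypothetical hosts)
          "s2graph-bulk",                    // 1. tableName (hypothetical)
          "2",                               // 2. region multiplier
          "1",                               // 3. replicationScope, or "none" to skip
          "gz"                               // 4. compression algorithm (defaults to lz4 when omitted)
        ))
    }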
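
Callers that already hold a Graph instance can set the same replication scope programmatically through the extended Management.createTable signature. A minimal sketch, assuming ConfigFactory.load() yields a valid s2graph configuration and the global execution context is acceptable (as CmdHTableUtil itself assumes); the quorum and table name are placeholders:

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.core.{Graph, Management}

    import scala.concurrent.ExecutionContext

    object CreateReplicatedTable {
      def main(args: Array[String]): Unit = {
        val s2graph = new Graph(ConfigFactory.load())(ExecutionContext.Implicits.global)
        val management = new Management(s2graph)

        // Some(1) marks the "e"/"v" column families with REPLICATION_SCOPE = 1 via
        // HColumnDescriptor.setScope; None keeps HBase's default scope of 0 (no replication).
        management.createTable(
          "zk1.example.com",      // placeholder quorum
          "s2graph-replicated",   // placeholder table name
          List("e", "v"),
          regionMultiplier = 2,
          ttl = None,
          compressionAlgorithm = "lz4",
          replicationScopeOpt = Some(1))
      }
    }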