From 76a3dc8c9b0b23076faefc32d19dfa1dc0f5b9cc Mon Sep 17 00:00:00 2001
From: "June.Kay"
Date: Mon, 29 Feb 2016 17:17:23 +0900
Subject: [PATCH 1/3] S2GRAPH-52 add create table utility class and reflect
 changes of core

- can specify the HTable column family's replication scope option
  (for master/slave replication)
- add a command-line utility to create tables
---
 .../scala/subscriber/GraphSubscriber.scala       |  2 +-
 .../scala/subscriber/TransferToHFile.scala       | 56 ++++++++++++++++---
 .../com/kakao/s2graph/core/Management.scala      |  5 +-
 .../kakao/s2graph/core/storage/Storage.scala     |  3 +-
 .../storage/hbase/AsynchbaseStorage.scala        |  4 +-
 5 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/loader/src/main/scala/subscriber/GraphSubscriber.scala b/loader/src/main/scala/subscriber/GraphSubscriber.scala
index f4f78657..9c647fbc 100644
--- a/loader/src/main/scala/subscriber/GraphSubscriber.scala
+++ b/loader/src/main/scala/subscriber/GraphSubscriber.scala
@@ -66,7 +66,7 @@ object GraphSubscriberHelper extends WithKafka {
     if (g == null) {
       val ec = ExecutionContext.Implicits.global
       g = new Graph(config)(ec)
-      management = new Management(g)(ec)
+      management = new Management(g)
     }
   }

diff --git a/loader/src/main/scala/subscriber/TransferToHFile.scala b/loader/src/main/scala/subscriber/TransferToHFile.scala
index 516bb39a..cad5e2ea 100644
--- a/loader/src/main/scala/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/subscriber/TransferToHFile.scala
@@ -1,9 +1,12 @@
 package subscriber

+import java.util.concurrent.Executors
+
 import com.kakao.s2graph.core._
 import com.kakao.s2graph.core.mysqls.{LabelMeta, Label}
 import com.kakao.s2graph.core.types.{InnerValLikeWithTs, LabelWithDirection, SourceVertexId}
+import com.typesafe.config.ConfigFactory
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.compress.Compression.Algorithm
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding
@@ -18,6 +21,7 @@
 import play.api.libs.json.Json
 import s2.spark.{SparkApp}
 import spark.{FamilyHFileWriteOptions, KeyFamilyQualifier, HBaseContext}
 import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext

 object TransferToHFile extends SparkApp with JSONParser {
@@ -126,19 +130,15 @@ object TransferToHFile extends SparkApp with JSONParser {
     val zkQuorum = args(2)
     val tableName = args(3)
     val dbUrl = args(4)
-    val maxHFilePerResionServer = args(5).toInt
-    val labelMapping = if (args.length >= 7) GraphSubscriberHelper.toLabelMapping(args(6)) else Map.empty[String, String]
-    val autoEdgeCreate = if (args.length >= 8) args(7).toBoolean else false
-    val buildDegree = if (args.length >= 9) args(8).toBoolean else true
-    val compressionAlgorithm = if (args.length >= 10) args(9) else "lz4"
+    val labelMapping = if (args.length >= 6) GraphSubscriberHelper.toLabelMapping(args(5)) else Map.empty[String, String]
+    val autoEdgeCreate = if (args.length >= 7) args(6).toBoolean else false
+    val buildDegree = if (args.length >= 8) args(7).toBoolean else true

     val conf = sparkConf(s"$input: TransferToHFile")
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     conf.set("spark.kryoserializer.buffer.mb", "24")

     val sc = new SparkContext(conf)

-    Management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
-
     /** set up hbase init */
     val hbaseConf = HBaseConfiguration.create()
     hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
@@ -193,3 +193,45 @@ object TransferToHFile extends SparkApp with JSONParser {
     }
   }
 }
+
+
+object CmdHTableUtil {
+  val usages =
+    s"""
+       |create an HBase table on the specified zkQuorum.
+       |note that the table's column families can be configured with a max-file-per-RegionServer multiplier,
+       |a replication scope, and a compression algorithm from the command-line arguments.
+       |
+       |params:
+       |0. zkQuorum: zkQuorum of the running HBase cluster.
+       |1. tableName: name of the table to create.
+       |2. max file per region server: number of region files per RegionServer.
+       |3. replicationScope[optional]: replication scope for the table's column families (e.g. with 1 master and 1 slave, only the master has scope 1)
+       |   - default = none
+       |4. compression algorithm[optional]: table compression algorithm (e.g. lz4, gz, snappy)
+       |   - default = lz4
+     """.stripMargin
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length < 3) {
+      println(usages)
+      sys.exit(1)
+    }
+
+    val zkQuorum = args(0)
+    val tableName = args(1)
+    val maxHFilePerRegionServer = args(2).toInt
+    val replicationScopeOpt = if (args.length < 4 || args(3) == "none") None else Some(args(3).toInt)
+    val compressionAlgorithm = if (args.length >= 5) args(4) else "lz4"
+
+    val config = ConfigFactory.load()
+    val s2graph = new Graph(config)(ExecutionContext.Implicits.global)
+    val storageManagement = new Management(s2graph)
+
+    storageManagement.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerRegionServer, None, compressionAlgorithm, replicationScopeOpt = replicationScopeOpt)
+
+    sys.exit(0)
+  }
+
+}
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
index ccf9d1f5..75f7da63 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/Management.scala
@@ -278,8 +278,9 @@ class Management(graph: Graph) {
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String = DefaultCompressionAlgorithm): Unit =
-    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm)
+                  compressionAlgorithm: String = DefaultCompressionAlgorithm,
+                  replicationScopeOpt: Option[Int] = None): Unit =
+    storage.createTable(zkAddr, tableName, cfs, regionMultiplier, ttl, compressionAlgorithm, replicationScopeOpt = replicationScopeOpt)

   /** HBase specific code */
   def createService(serviceName: String,
diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
index bca8df35..3569fa31 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/Storage.scala
@@ -193,7 +193,8 @@ abstract class Storage[R](val config: Config)(implicit ec: ExecutionContext) {
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String): Unit
+                  compressionAlgorithm: String,
+                  replicationScopeOpt: Option[Int] = None): Unit

diff --git a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
index 7c05aedf..ef1f5d58 100644
--- a/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
+++ b/s2core/src/main/scala/com/kakao/s2graph/core/storage/hbase/AsynchbaseStorage.scala
@@ -297,7 +297,8 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
                   cfs: List[String],
                   regionMultiplier: Int,
                   ttl: Option[Int],
-                  compressionAlgorithm: String): Unit = {
+                  compressionAlgorithm: String,
+                  replicationScopeOpt: Option[Int] = None): Unit = {
     logger.info(s"create table: $tableName on $zkAddr, $cfs, $regionMultiplier, $compressionAlgorithm")
     val admin = getAdmin(zkAddr)
     val regionCount = admin.getClusterStatus.getServersSize * regionMultiplier
@@ -316,6 +317,7 @@ class AsynchbaseStorage(override val config: Config)(implicit ec: ExecutionConte
           .setBlocksize(32768)
           .setBlockCacheEnabled(true)
         if (ttl.isDefined) columnDesc.setTimeToLive(ttl.get)
+        if (replicationScopeOpt.isDefined) columnDesc.setScope(replicationScopeOpt.get)
         desc.addFamily(columnDesc)
       }

From d9b626a7de9c4cd67e02f8e21a546a9b18ab3263 Mon Sep 17 00:00:00 2001
From: "elric.kang"
Date: Fri, 4 Mar 2016 19:37:46 +0900
Subject: [PATCH 2/3] add optional param to repartition input

---
 loader/src/main/scala/subscriber/TransferToHFile.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/loader/src/main/scala/subscriber/TransferToHFile.scala b/loader/src/main/scala/subscriber/TransferToHFile.scala
index cad5e2ea..c6336aad 100644
--- a/loader/src/main/scala/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/subscriber/TransferToHFile.scala
@@ -145,8 +145,7 @@ object TransferToHFile extends SparkApp with JSONParser {
     hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
     hbaseConf.set("hadoop.tmp.dir", s"/tmp/$tableName")

-
-    val rdd = sc.textFile(input)
+    val rdd = if (args.length >= 9) sc.textFile(input).repartition(args(8).toInt) else sc.textFile(input)

     val kvs = rdd.mapPartitions { iter =>

From cb6988b200a81ed1855b1e3c8992dde8cfd39109 Mon Sep 17 00:00:00 2001
From: "wishoping (Junki Kim)"
Date: Wed, 30 Mar 2016 13:40:57 +0900
Subject: [PATCH 3/3] Remove redundant create table code

---
 .../loader/subscriber/TransferToHFile.scala | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
index b14263d2..63dc950c 100644
--- a/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
+++ b/loader/src/main/scala/org/apache/s2graph/loader/subscriber/TransferToHFile.scala
@@ -154,8 +154,6 @@ object TransferToHFile extends SparkApp with JSONParser {

     val sc = new SparkContext(conf)

-    GraphSubscriberHelper.management.createTable(zkQuorum, tableName, List("e", "v"), maxHFilePerResionServer, None, compressionAlgorithm)
-
     /** set up hbase init */
     val hbaseConf = HBaseConfiguration.create()
     hbaseConf.set("hbase.zookeeper.quorum", zkQuorum)
@@ -170,20 +168,6 @@ object TransferToHFile extends SparkApp with JSONParser {
       GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
       toKeyValues(iter.toSeq, labelMapping, autoEdgeCreate)
     }
-//
-//    val newRDD = if (!buildDegree) new HFileRDD(kvs)
-//    else {
-//      val degreeKVs = buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
-//        agg + current
-//      }.mapPartitions { iter =>
-//        val phase = System.getProperty("phase")
-//        GraphSubscriberHelper.apply(phase, dbUrl, "none", "none")
-//        toKeyValues(iter.toSeq)
-//      }
-//      new HFileRDD(kvs ++ degreeKVs)
-//    }
-//
-//    newRDD.toHFile(hbaseConf, zkQuorum, tableName, maxHFilePerResionServer, tmpPath)

     val merged = if (!buildDegree) kvs
     else {
       kvs ++ buildDegrees(rdd, labelMapping, autoEdgeCreate).reduceByKey { (agg, current) =>
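
A minimal usage sketch of the replication-scope option, assuming the
Management.createTable signature added in PATCH 1/3; the ZooKeeper quorum
and table name below are hypothetical placeholders, not values taken from
the patches.

    import scala.concurrent.ExecutionContext

    import com.kakao.s2graph.core.{Graph, Management}
    import com.typesafe.config.ConfigFactory

    object CreateReplicatedTableSketch {
      def main(args: Array[String]): Unit = {
        val config = ConfigFactory.load()
        // Graph needs an ExecutionContext for its asynchronous storage calls.
        val graph = new Graph(config)(ExecutionContext.Implicits.global)
        val management = new Management(graph)

        // replicationScopeOpt = Some(1) sets REPLICATION_SCOPE = 1 on the "e"
        // and "v" column families, marking them for master/slave HBase
        // replication; None keeps HBase's default scope of 0 (no replication).
        management.createTable(
          zkAddr = "zk.example.com",    // hypothetical quorum address
          tableName = "s2graph_demo",   // hypothetical table name
          cfs = List("e", "v"),
          regionMultiplier = 1,
          ttl = None,
          compressionAlgorithm = "lz4",
          replicationScopeOpt = Some(1))

        sys.exit(0)
      }
    }

The command-line route through the new utility takes the same values
positionally: CmdHTableUtil <zkQuorum> <tableName> <maxFilePerRegionServer>
[replicationScope|none] [compressionAlgorithm].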