-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
support PostgreSQL data source #62
Changes from 7 commits
3ff17b5
6234df9
bb05d90
244d854
17dc0d1
d8ae6e9
a321d75
0dd8046
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,16 +6,7 @@ | |
package com.vesoft.nebula.exchange.reader | ||
|
||
import com.google.common.collect.Maps | ||
import com.vesoft.exchange.common.config.{ | ||
ClickHouseConfigEntry, | ||
HBaseSourceConfigEntry, | ||
HiveSourceConfigEntry, | ||
JanusGraphSourceConfigEntry, | ||
MaxComputeConfigEntry, | ||
MySQLSourceConfigEntry, | ||
Neo4JSourceConfigEntry, | ||
ServerDataSourceConfigEntry | ||
} | ||
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, ServerDataSourceConfigEntry} | ||
import org.apache.hadoop.hbase.HBaseConfiguration | ||
import org.apache.hadoop.hbase.client.Result | ||
import org.apache.hadoop.hbase.io.ImmutableBytesWritable | ||
|
@@ -27,10 +18,7 @@ import org.apache.spark.rdd.RDD | |
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema | ||
import org.apache.spark.sql.types.{DataTypes, StructType} | ||
import org.apache.spark.sql.{DataFrame, Row, SparkSession} | ||
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ | ||
ClusterCountMapReduce, | ||
PeerPressureVertexProgram | ||
} | ||
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ClusterCountMapReduce, PeerPressureVertexProgram} | ||
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer | ||
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD | ||
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory | ||
|
@@ -94,6 +82,30 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo | |
} | ||
} | ||
|
||
/** | ||
* The PostgreSQLReader extends the ServerBaseReader | ||
* | ||
* @param session | ||
* @param postgreConfig | ||
*/ | ||
class PostgreSQLReader(override val session: SparkSession, postgreConfig: PostgreSQLSourceConfigEntry) | ||
extends ServerBaseReader(session, postgreConfig.sentence) { | ||
override def read(): DataFrame = { | ||
val url = | ||
s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}" | ||
val df = session.read | ||
.format("jdbc") | ||
.option("driver", "org.postgresql.Driver") | ||
.option("url", url) | ||
.option("dbtable", postgreConfig.table) | ||
.option("user", postgreConfig.user) | ||
.option("password", postgreConfig.password) | ||
.load() | ||
df.createOrReplaceTempView(postgreConfig.table) | ||
session.sql(sentence) | ||
Review comment: In the configs, if the user does not configure `sentence`, its default value is `""`, which will throw an exception when `session.sql(sentence)` is executed. So, please add a check for an empty `sentence`.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if I check it when user set the configs ?
It would save many repeated checks in the readers.
Review comment: That contradicts the default `sentence` value, which is `""`.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If sentence is not configured, executing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
No need to execute |
||
} | ||
} | ||
|
||
/** | ||
* Neo4JReader extends the ServerBaseReader | ||
* this reader support checkpoint by sacrificing performance | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,16 +6,7 @@ | |
package com.vesoft.nebula.exchange.reader | ||
|
||
import com.google.common.collect.Maps | ||
import com.vesoft.exchange.common.config.{ | ||
ClickHouseConfigEntry, | ||
HBaseSourceConfigEntry, | ||
HiveSourceConfigEntry, | ||
JanusGraphSourceConfigEntry, | ||
MaxComputeConfigEntry, | ||
MySQLSourceConfigEntry, | ||
Neo4JSourceConfigEntry, | ||
ServerDataSourceConfigEntry | ||
} | ||
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, ServerDataSourceConfigEntry} | ||
import com.vesoft.exchange.common.utils.HDFSUtils | ||
import com.vesoft.nebula.exchange.utils.Neo4jUtils | ||
import org.apache.hadoop.hbase.HBaseConfiguration | ||
|
@@ -29,10 +20,7 @@ import org.apache.spark.rdd.RDD | |
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema | ||
import org.apache.spark.sql.types.{DataTypes, StructType} | ||
import org.apache.spark.sql.{DataFrame, Row, SparkSession} | ||
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ | ||
ClusterCountMapReduce, | ||
PeerPressureVertexProgram | ||
} | ||
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ClusterCountMapReduce, PeerPressureVertexProgram} | ||
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer | ||
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD | ||
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory | ||
|
@@ -96,6 +84,28 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo | |
} | ||
} | ||
|
||
/**
  * Reader that loads a PostgreSQL table through Spark's JDBC data source.
  *
  * The table named in the config is loaded with the `org.postgresql.Driver` driver.
  * When a query sentence is configured, the loaded table is registered as a temp view
  * under its own name and the sentence is executed against it.
  *
  * @param session       Spark session used to read the table and run the sentence
  * @param postgreConfig connection settings (host, port, database, table, user, password)
  *                      and the optional query sentence
  */
class PostgreSQLReader(override val session: SparkSession,
                       postgreConfig: PostgreSQLSourceConfigEntry)
    extends ServerBaseReader(session, postgreConfig.sentence) {

  /**
    * Loads the configured table and applies the configured sentence, if any.
    *
    * @return the result of the sentence when one is configured, otherwise the whole table
    */
  override def read(): DataFrame = {
    val url =
      s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}"
    val df = session.read
      .format("jdbc")
      .option("driver", "org.postgresql.Driver")
      .option("url", url)
      .option("dbtable", postgreConfig.table)
      .option("user", postgreConfig.user)
      .option("password", postgreConfig.password)
      .load()

    // Per the review of this change, an unconfigured sentence defaults to "" and
    // passing it to session.sql fails. Only run the query when there is one;
    // otherwise return the loaded table as-is.
    Option(sentence).map(_.trim).filter(_.nonEmpty) match {
      case Some(query) =>
        // The view is only needed so the sentence can refer to the table by name.
        df.createOrReplaceTempView(postgreConfig.table)
        session.sql(query)
      case None =>
        df
    }
  }
}
|
||
/** | ||
* Neo4JReader extends the ServerBaseReader | ||
* this reader support checkpoint by sacrificing performance | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate test