support PostgreSQL data source #62

Merged · 8 commits · Jan 27, 2022
@@ -571,6 +571,7 @@ object Configs {
case "HBASE" => SourceCategory.HBASE
case "MAXCOMPUTE" => SourceCategory.MAXCOMPUTE
case "CLICKHOUSE" => SourceCategory.CLICKHOUSE
case "POSTGRESQL" => SourceCategory.POSTGRESQL
case _ => throw new IllegalArgumentException(s"${category} not support")
}
}
@@ -659,6 +660,17 @@ object Configs {
config.getString("password"),
getOrElse(config, "sentence", "")
)
case SourceCategory.POSTGRESQL =>
PostgreSQLSourceConfigEntry(
SourceCategory.POSTGRESQL,
config.getString("host"),
config.getInt("port"),
config.getString("database"),
config.getString("table"),
config.getString("user"),
config.getString("password"),
getOrElse(config, "sentence", "")
)
case SourceCategory.KAFKA =>
val intervalSeconds =
if (config.hasPath("interval.seconds")) config.getInt("interval.seconds")
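For context, the POSTGRESQL branch above simply reads each connection attribute from the Typesafe Config block of a tag or edge. A minimal sketch of those reads (the parsed string below is made up for illustration; getOrElse is the existing helper in Configs for optional keys like "sentence"):

import com.typesafe.config.ConfigFactory

// a stand-in for the source block of a tag/edge config
val config = ConfigFactory.parseString(
  """
    |host: "127.0.0.1"
    |port: 5432
    |database: database
    |table: table
    |user: root
    |password: nebula
    |sentence: "select * from table"
  """.stripMargin)

config.getString("host")   // "127.0.0.1"
config.getInt("port")      // 5432
config.hasPath("sentence") // true, so the optional "sentence" key is picked up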
@@ -26,6 +26,7 @@ object SourceCategory extends Enumeration {
val HBASE = Value("HBASE")
val MAXCOMPUTE = Value("MAXCOMPUTE")
val CLICKHOUSE = Value("CLICKHOUSE")
val POSTGRESQL = Value("POSTGRESQL")

val SOCKET = Value("SOCKET")
val KAFKA = Value("KAFKA")
@@ -157,6 +158,37 @@ case class MySQLSourceConfigEntry(override val category: SourceCategory.Value,
}
}

/**
 * PostgreSQLSourceConfigEntry
 *
 * @param category data source category, here SourceCategory.POSTGRESQL
 * @param host     PostgreSQL server host
 * @param port     PostgreSQL server port
 * @param database database to read from
 * @param table    table to read from
 * @param user     user name for the JDBC connection
 * @param password password for the JDBC connection
 * @param sentence query to execute against the table; empty means select all fields
 */
case class PostgreSQLSourceConfigEntry(override val category: SourceCategory.Value,
host: String,
port: Int,
database: String,
table: String,
user: String,
password: String,
override val sentence: String
)
extends ServerDataSourceConfigEntry {
require(
host.trim.length != 0 && port > 0 && database.trim.length > 0 && table.trim.length > 0 && user.trim.length > 0)

override def toString: String = {
s"PostgreSql source host: ${host}, port: ${port}, database: ${database}, table: ${table}, " +
s"user: ${user}, password: ${password}, sentence: ${sentence}"
}
}
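A quick construction sketch for the case class above (all values are placeholders, not taken from this PR):

val entry = PostgreSQLSourceConfigEntry(
  SourceCategory.POSTGRESQL,
  "127.0.0.1", // host
  5432,        // port
  "database",
  "table",
  "root",   // user
  "nebula", // password
  ""        // sentence: empty means the reader selects all fields
)
// The require above throws IllegalArgumentException for a blank host, database,
// table or user, or for a non-positive port; note that password is not validated.
println(entry)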

/**
* TODO: Support more com.vesoft.exchange.common.config item about Kafka Consumer
*
25 changes: 25 additions & 0 deletions exchange-common/src/test/resources/application.conf
@@ -266,6 +266,31 @@
batch: 256
partition: 32
}

# PostgreSQL
{
name: tag9
type: {
source: postgresql
sink: client
}
host: "127.0.0.1"
port: "5432"
database: database
table: table
user: root
password: nebula
sentence: "select postgre-field0, postgre-field1, postgre-field2 from table"
fields: [postgre-field-0, postgre-field-1, postgre-field-2]
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
vertex: {
      field: postgre-field-0
# policy: "hash"
}
batch: 256
partition: 32
}
]

# Processing edges
@@ -17,6 +17,7 @@ import com.vesoft.exchange.common.config.{
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
MySQLSourceConfigEntry,
  Neo4JSourceConfigEntry,
  PostgreSQLSourceConfigEntry,
SinkCategory,
SourceCategory
@@ -156,6 +157,17 @@ class ConfigsSuite {
assert(mysql.database.equals("database"))
assert(mysql.table.equals("table"))
}
case SourceCategory.POSTGRESQL => {
val postgresql = tagConfig.dataSourceConfigEntry.asInstanceOf[PostgreSQLSourceConfigEntry]
assert(label.equals("tag9"))
assert(postgresql.database.equals("database"))
assert(postgresql.host.equals("127.0.0.1"))
assert(postgresql.port == 5432)
assert(postgresql.user.equals("root"))
assert(postgresql.password.equals("nebula"))
assert(postgresql.database.equals("database"))
[Contributor review comment: duplicate test]
assert(postgresql.table.equals("table"))
}
case _ => {}
}
}
@@ -6,41 +6,12 @@
package com.vesoft.nebula.exchange

import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File

import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
ORCReader,
ParquetReader,
PulsarReader
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, FileBaseSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SinkCategory, SourceCategory}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
@@ -285,6 +256,11 @@ object Exchange {
LOG.info(s"Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}")
val reader = new MySQLReader(session, mysqlConfig)
Some(reader.read())
case SourceCategory.POSTGRESQL =>
val postgreConfig = config.asInstanceOf[PostgreSQLSourceConfigEntry]
LOG.info(s"Loading from postgresql com.vesoft.exchange.common.config: ${postgreConfig}")
val reader = new PostgreSQLReader(session, postgreConfig)
Some(reader.read())
case SourceCategory.PULSAR =>
val pulsarConfig = config.asInstanceOf[PulsarSourceConfigEntry]
LOG.info(s"Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}")
@@ -6,16 +6,7 @@
package com.vesoft.nebula.exchange.reader

import com.google.common.collect.Maps
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
ServerDataSourceConfigEntry
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, ServerDataSourceConfigEntry}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -27,10 +18,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{
ClusterCountMapReduce,
PeerPressureVertexProgram
}
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ClusterCountMapReduce, PeerPressureVertexProgram}
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
@@ -94,6 +82,31 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
}
}

/**
 * The PostgreSQLReader extends the ServerBaseReader
 *
 * @param session       spark session
 * @param postgreConfig PostgreSQL source config entry
 */
class PostgreSQLReader(override val session: SparkSession, postgreConfig: PostgreSQLSourceConfigEntry)
extends ServerBaseReader(session, postgreConfig.sentence) {
override def read(): DataFrame = {
val url =
s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}"
val df = session.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", url)
.option("dbtable", postgreConfig.table)
.option("user", postgreConfig.user)
.option("password", postgreConfig.password)
.load()
df.createOrReplaceTempView(postgreConfig.table)
if(!"".equals(sentence.trim)) session.sql(sentence)
else session.sql(s"select * from ${postgreConfig.table}")
}
}
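The read() above always pulls the full table into a DataFrame, registers a temp view, and only then applies sentence in Spark. An alternative, not part of this PR, is to push the query down to PostgreSQL through the JDBC dbtable option; a sketch, assuming sentence is a plain SELECT statement:

override def read(): DataFrame = {
  val url =
    s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}"
  // Wrap the query as a derived table so PostgreSQL executes it,
  // instead of Spark filtering after a full table scan.
  val dbtable =
    if (postgreConfig.sentence.trim.isEmpty) postgreConfig.table
    else s"(${postgreConfig.sentence}) as t"
  session.read
    .format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", url)
    .option("dbtable", dbtable)
    .option("user", postgreConfig.user)
    .option("password", postgreConfig.password)
    .load()
}

This transfers only the selected rows and columns over JDBC, at the cost of losing the temp view that the current implementation registers.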

/**
* Neo4JReader extends the ServerBaseReader
* this reader support checkpoint by sacrificing performance
@@ -6,41 +6,12 @@
package com.vesoft.nebula.exchange

import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.File

import java.io.File
import com.vesoft.exchange.Argument
import com.vesoft.exchange.common.{CheckPointHandler, ErrorHandler}
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
Configs,
DataSourceConfigEntry,
FileBaseSourceConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
PulsarSourceConfigEntry,
SinkCategory,
SourceCategory
}
import com.vesoft.nebula.exchange.reader.{
CSVReader,
ClickhouseReader,
HBaseReader,
HiveReader,
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
ORCReader,
ParquetReader,
PulsarReader
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, Configs, DataSourceConfigEntry, FileBaseSourceConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, KafkaSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, PulsarSourceConfigEntry, SinkCategory, SourceCategory}
import com.vesoft.nebula.exchange.reader.{CSVReader, ClickhouseReader, HBaseReader, HiveReader, JSONReader, JanusGraphReader, KafkaReader, MaxcomputeReader, MySQLReader, Neo4JReader, ORCReader, ParquetReader, PostgreSQLReader, PulsarReader}
import com.vesoft.exchange.common.processor.ReloadProcessor
import com.vesoft.nebula.exchange.processor.{EdgeProcessor, VerticesProcessor}
import org.apache.log4j.Logger
@@ -285,6 +256,11 @@ object Exchange {
LOG.info(s"Loading from mysql com.vesoft.exchange.common.config: ${mysqlConfig}")
val reader = new MySQLReader(session, mysqlConfig)
Some(reader.read())
case SourceCategory.POSTGRESQL =>
val postgreConfig = config.asInstanceOf[PostgreSQLSourceConfigEntry]
LOG.info(s"Loading from postgresql com.vesoft.exchange.common.config: ${postgreConfig}")
val reader = new PostgreSQLReader(session, postgreConfig)
Some(reader.read())
case SourceCategory.PULSAR =>
val pulsarConfig = config.asInstanceOf[PulsarSourceConfigEntry]
LOG.info(s"Loading from pulsar com.vesoft.exchange.common.config: ${pulsarConfig}")
@@ -6,16 +6,7 @@
package com.vesoft.nebula.exchange.reader

import com.google.common.collect.Maps
import com.vesoft.exchange.common.config.{
ClickHouseConfigEntry,
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
ServerDataSourceConfigEntry
}
import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, ServerDataSourceConfigEntry}
import com.vesoft.exchange.common.utils.HDFSUtils
import com.vesoft.nebula.exchange.utils.Neo4jUtils
import org.apache.hadoop.hbase.HBaseConfiguration
@@ -29,10 +20,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{
ClusterCountMapReduce,
PeerPressureVertexProgram
}
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ClusterCountMapReduce, PeerPressureVertexProgram}
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
@@ -96,6 +84,29 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
}
}

/**
 * The PostgreSQLReader extends the ServerBaseReader
 *
 * @param session       spark session
 * @param postgreConfig PostgreSQL source config entry
 */
class PostgreSQLReader(override val session: SparkSession, postgreConfig: PostgreSQLSourceConfigEntry)
extends ServerBaseReader(session, postgreConfig.sentence) {
override def read(): DataFrame = {
val url =
s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}"
val df = session.read
.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", url)
.option("dbtable", postgreConfig.table)
.option("user", postgreConfig.user)
.option("password", postgreConfig.password)
.load()
df.createOrReplaceTempView(postgreConfig.table)
if(!"".equals(sentence.trim)) session.sql(sentence)
else session.sql(s"select * from ${postgreConfig.table}")
}
}

/**
* Neo4JReader extends the ServerBaseReader
* this reader support checkpoint by sacrificing performance