The documentation can be found at http://www.gpudb.com/docs/5.2/index.html. The connector-specific documentation can be found at:
For changes to the connector API, please refer to CHANGELOG.md. For changes to GPUdb functions, please refer to CHANGELOG-FUNCTIONS.md.
The following guide provides step-by-step instructions to get started integrating GPUdb with Spark.
This project aims to make GPUdb accessible to Spark, meaning an RDD or DStream can be generated from a GPUdb table or saved to a GPUdb table.
Source code for the connector can be found at https://github.com/GPUdb/gpudb-connector-spark
The three connector classes that integrate GPUdb with Spark are:
com.gpudb.spark.input

* GPUdbReader - Reads data from a GPUdb table into an RDD
* GPUdbReceiver - A Spark streaming Receiver that receives data from a GPUdb table monitor stream

com.gpudb.spark.output

* GPUdbWriter - Writes data from an RDD or DStream into GPUdb
The GPUdb Spark connector uses the Spark configuration to pass GPUdb instance information to the Spark workers. Ensure that the following properties are set in the Spark configuration (SparkConf) of the Spark context (JavaSparkContext) when using each of the following connector interfaces (see the configuration sketch after the lists):
GPUdbReader

* gpudb.host - The hostname or IP address of the GPUdb instance
* gpudb.port - The port number on which the GPUdb service is listening
* gpudb.threads - The number of threads GPUdb should use
* gpudb.table - The name of the GPUdb table being accessed

GPUdbWriter

* gpudb.host - The hostname or IP address of the GPUdb instance
* gpudb.port - The port number on which the GPUdb service is listening
* gpudb.threads - The number of threads GPUdb should use
* gpudb.table - The name of the GPUdb table being accessed
* gpudb.insert.size - The number of records to queue before inserting into GPUdb
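A minimal configuration sketch, assuming illustrative values (the host name, port, thread count, and batch size all depend on your deployment; gpudb.insert.size applies only to GPUdbWriter)::

    // Illustrative values; adjust host, port, and sizes for your GPUdb deployment
    SparkConf sparkConf = new SparkConf().setAppName("gpudb-example");
    sparkConf.set("gpudb.host", "gpudb-host");    // hostname or IP of GPUdb
    sparkConf.set("gpudb.port", "9191");          // port the GPUdb service listens on
    sparkConf.set("gpudb.threads", "4");          // threads GPUdb should use
    sparkConf.set("gpudb.table", "PeopleTable");  // table to read from / write to
    sparkConf.set("gpudb.insert.size", "1000");   // records to queue per insert (writer only)

    JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);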
To read from a GPUdb table, create a class that extends RecordObject and implements Serializable. For example::
    import java.io.Serializable;

    import com.gpudb.ColumnProperty;
    import com.gpudb.RecordObject;

    public class PersonRecord extends RecordObject implements Serializable
    {
        @RecordObject.Column(order = 0, properties = { ColumnProperty.DATA })
        public long id;

        @RecordObject.Column(order = 1)
        public String name;

        @RecordObject.Column(order = 2)
        public long birthDate;

        public PersonRecord() {}
    }
Note: The column order specified in the class must correspond to the table schema.
Next, instantiate a GPUdbReader for that class and call the readTable method with an optional filter expression::
    GPUdbReader<PersonRecord> reader = new GPUdbReader<PersonRecord>(sparkConf);
    JavaRDD<PersonRecord> rdd = reader.readTable(PersonRecord.class, "PeopleTable", expression, sparkContext);
The expression in the readTable call is equivalent to a SQL WHERE clause. For details, read the Expressions section of the Concepts page.
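For instance, a hypothetical filter on the example table above might look like the following (the column name comes from PersonRecord; the value is illustrative)::

    String expression = "birthDate >= 631152000";
    JavaRDD<PersonRecord> rdd = reader.readTable(PersonRecord.class, "PeopleTable", expression, sparkContext);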
Creating a GPUdb table::

    GPUdbUtil.createTable(gpudbUrl, tableName, PersonRecord.class);
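Here gpudbUrl is the URL of the GPUdb instance; a typical value would be of the form http://gpudb-host:9191 (illustrative host, using the same port as gpudb.port above).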
Writing to a GPUdb table::

    final GPUdbWriter<PersonRecord> writer = new GPUdbWriter<PersonRecord>(sparkConf);
    writer.write(rdd);
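Putting it together, a minimal sketch of building an RDD of records to write (the field values are hypothetical)::

    // Build an illustrative record; fields are public on PersonRecord
    PersonRecord person = new PersonRecord();
    person.id = 1L;
    person.name = "Alice";
    person.birthDate = 631152000L; // e.g., seconds since the epoch

    // Parallelize into an RDD and write it to GPUdb
    JavaRDD<PersonRecord> people = sparkContext.parallelize(java.util.Arrays.asList(person));
    writer.write(people);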
The following creates a DStream from any new data inserted into the table tableName::
    GPUdbReceiver receiver = new GPUdbReceiver(gpudbUrl, gpudbStreamUrl, tableName);
    JavaReceiverInputDStream<AvroWrapper> dstream = javaStreamingContext.receiverStream(receiver);
Each record in the DStream is of type AvroWrapper, which is an Avro object packaged along with the schema needed to decode it.

Note: At this time, only the add and bulkadd functions will trigger GPUdb to publish added records to ZMQ to be received by the Spark streaming interface. New records can also be added via the GPUdb administration page.
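As a quick sanity check that records are arriving, the stream can be inspected with standard Spark streaming calls before wiring up any processing (a minimal sketch; nothing connector-specific is assumed)::

    // Print a sample of each incoming batch of AvroWrapper records
    dstream.print();

    // Start the streaming job and block until it terminates
    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();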
Creating a GPUdb table::

    GPUdbUtil.createTable(gpudbUrl, tableName, PersonRecord.class);
Writing to a GPUdb table::

    final GPUdbWriter<PersonRecord> writer = new GPUdbWriter<PersonRecord>(sparkConf);
    writer.write(dstream);
Examples can be found in the com.gpudb.spark package:

* BatchExample - Reading & writing GPUdb data via Spark using an RDD
* StreamExample - Reading & writing GPUdb data via Spark using a DStream
The example code provided in this project assumes it will be launched on a Spark server using /bin/spark-submit. The example.sh script can run each example with minimal configuration via the example.properties file.
To install the example, the Spark connector RPM needs to be deployed onto the Spark driver host. The RPM generated by this project should be installed, where <X.Y.Z> is the GPUdb version and <YYYYMMDDhhmmss> is the build date::

    [root@local]# yum -y install gpudb-connector-spark-<X.Y.Z>-<YYYYMMDDhhmmss>.noarch.rpm
Once this RPM is installed, the following files should exist::

    /opt/gpudb/connectors/spark/example.properties
    /opt/gpudb/connectors/spark/example.sh
    /opt/gpudb/connectors/spark/gpudb-spark-5.2.0.jar
    /opt/gpudb/connectors/spark/gpudb-spark-5.2.0-jar-with-dependencies.jar
    /opt/gpudb/connectors/spark/gpudb-spark-5.2.0-node-assembly.jar
    /opt/gpudb/connectors/spark/gpudb-spark-5.2.0-shaded.jar
    /opt/gpudb/connectors/spark/README.md
The gpudb.host property in example.properties should be modified to be the name of the GPUdb host being accessed.
To run the example, issue this Unix command with no parameters to display usage information::

    [gpudb@local]$ ./example.sh