Connector does not work in local Spark mode #15

Open
maxim-lixakov opened this issue Sep 10, 2024 · 3 comments

maxim-lixakov commented Sep 10, 2024

Description

The Spark-Greenplum connector does not work correctly in local Spark mode (local master).
Read operations take an unusually long time,
and write operations fail with a timeout error.
The problem reproduces on Spark 3.x when writing data to Greenplum via spark-greenplum-connector_2.12-3.1.jar.

Steps to reproduce

To reproduce this issue you need:

  1. Start the Greenplum container:

docker-compose.yml:

services:
  greenplum:
    image: andruche/greenplum:6
    restart: unless-stopped
    ports:
      - 5432:5432
    extra_hosts:
      - host.docker.internal:host-gateway
    sysctls:
      - net.ipv6.conf.all.disable_ipv6=1
docker compose up -d greenplum
  2. Create a test database and a table with some test data:
CREATE DATABASE test;
    
CREATE TABLE public.employee (
    employee_id SERIAL PRIMARY KEY,    
    first_name VARCHAR(50),            
    last_name VARCHAR(50),             
    birth_date DATE,                   
    salary NUMERIC(10, 2),            
    is_active BOOLEAN                  
)
DISTRIBUTED BY (employee_id);           

INSERT INTO public.employee (first_name, last_name, birth_date, salary, is_active)
VALUES 
('John', 'Doe', '1985-05-15', 55000.00, TRUE),
('Jane', 'Smith', '1990-10-25', 62000.00, TRUE),
('Mark', 'Johnson', '1978-03-12', 70000.00, FALSE),
('Lucy', 'Williams', '1983-07-19', 48000.00, TRUE);

SELECT * FROM employee;

  3. Build spark-greenplum-connector_2.12-3.1.jar from source. Out of the box it does not work in local mode;
    the problem is described in issue "The show method for the query data is stuck" #14, so I applied
    the changes suggested in that issue to the source code:
  def guessMaxParallelTasks(): Int = {
    val sparkContext = SparkContext.getOrCreate
    var guess: Int = -1
    val osName = System.getProperty("os.name")
    var isLocal: Boolean = false
    // Change from issue #14: treat Windows and macOS as local mode
    if (osName.toLowerCase().contains("windows") || osName.toLowerCase().contains("mac")) {
      isLocal = true
    }
    if (isLocal) {
      // In local mode derive the guess from spark.default.parallelism
      // instead of waiting for executors to register
      guess = sparkContext.getConf.getInt("spark.default.parallelism", 1) - 1
    } else {
      while ((guess <= 0) && !Thread.currentThread().isInterrupted) {
        guess = sparkContext.getExecutorMemoryStatus.keys.size - 1
        if (sparkContext.deployMode == "cluster")
          guess -= 1
      }
    }
    guess
  }
  4. Launch a Spark session, specifying spark.default.parallelism=2 so that the guess value is non-zero (a sanity check of this follows the steps):
spark-shell --jars /path/to/spark-greenplum-connector/target/spark-greenplum-connector_2.12-3.1.jar --conf spark.default.parallelism=2
  5. Run the code below and observe that the connector does not work properly:
val gpdf = spark.read.format("its-greenplum").option("url", "jdbc:postgresql://localhost:5432/test").option("user", "gpadmin").option("password", "").option("dbtable","employee").load()
spark.sparkContext.setLogLevel("DEBUG")

// takes an average of 2 minutes
gpdf.show()

// takes an average of 2 minutes
gpdf.count()

val newEmployeeData = Seq(
  (9, "Alice", "Green", "1992-09-12", 58000.00, true),
  (10, "Bob", "White", "1980-01-23", 64000.00, false)
).toDF("employee_id", "first_name", "last_name", "birth_date", "salary", "is_active")


// fails with error:
// `Caused by: java.lang.Exception: Timeout 60000 elapsed for func=checkIn, {"queryId": "5b0d274c-e585-4d1d-a698-de00ab2bcf25", "partId": "1",
//                                                                         "instanceId": "1:4:0", "nodeIp": "192.168.1.69", "dir": "W",
//                                                                          "executorId": "driver", "batchNo": null, "gpSegmentId": null,
//                                                                         "rowCount": 0, "status": "null", "gpfdistUrl": "null"}`
newEmployeeData.write.format("its-greenplum").option("url", "jdbc:postgresql://localhost:5432/test").option("user", "gpadmin").option("password", "").option("dbtable","employee").mode("append").save()
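
For reference, the effect of step 4 can be checked in the same spark-shell session. This is only an illustration of what the patched guessMaxParallelTasks would compute from the configuration, not a call into the connector itself:

// Sanity check (illustrative only): with --conf spark.default.parallelism=2 the
// patched method returns 2 - 1 = 1, i.e. a single gpfdist instance.
val guess = sc.getConf.getInt("spark.default.parallelism", 1) - 1
println(s"guessMaxParallelTasks would return: $guess")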

Logs

Logs while trying to read data from the table:

24/09/10 16:34:58 INFO RMISlave: 
coordinatorAsks: sqlTransferAbort, {"queryId": "a42adbf508224efa858b9a722482522d", "partId": "0",
                                    "instanceId": "0:0:2", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56701/output.pipe"}
24/09/10 16:34:58 DEBUG ProgressTracker: gpfCommitMs took 1038 ms
24/09/10 16:35:27 DEBUG RMIMaster: 
749 handlerAsks: checkIn, New gpfdist instance started on {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}
24/09/10 16:35:27 INFO RMIMaster: New batch: 0,
seg2ServiceProvider.keySet={0}
address2Seg={10.195.113.139 -> Set(0)}
pcbByInstanceId=
{0:0:3 -> {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}}
24/09/10 16:35:27 DEBUG RMIMaster: 
waitBatch success, localBatchNo=0, batchSize=1

24/09/10 16:35:27 INFO RMISlave: 
coordinatorAsks: sqlTransferComplete, {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 0, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}
24/09/10 16:35:27 DEBUG RMISlave: End of stream: threadId=132, instanceId=0:0:3
24/09/10 16:35:27 DEBUG GreenplumInputPartitionReader: gpfdist://10.195.113.139:56747/output.pipe epoch=0 end of stream, rowCount=4
24/09/10 16:35:27 WARN BufferExchange: doFlush: buff==null
24/09/10 16:35:27 INFO RMISlave: Calling commit on {"queryId": "6e031f3880a04c3ebfda01e1684f8a63", "partId": "0",
                                    "instanceId": "0:0:3", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 4, "status": "i", "gpfdistUrl": "gpfdist://10.195.113.139:56747/output.pipe"}
24/09/10 16:35:58 INFO RMIMaster: Batch 0: disconnected {"queryId": "a42adbf508224efa858b9a722482522d", "partId": "0",
                                    "instanceId": "0:0:2", "nodeIp": "10.195.113.139", "dir": "R",
                                    "executorId": "driver", "batchNo": 0, "gpSegmentId": 0,
                                    "rowCount": 1, "status": "a", "gpfdistUrl": "gpfdist://10.195.113.139:56701/output.pipe"}

As can be seen from the logs, the interaction between RMISlave and RMIMaster takes more than 30 seconds.

Logs while trying to write data to the table:

24/09/05 14:41:09 WARN TaskSetManager: Lost task 0.0 in stage 4.0 (TID 3) (192.168.1.69 executor driver): TaskKilled (Stage cancelled: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 4) (192.168.1.69 executor driver): java.rmi.UnexpectedException: unexpected exception; nested exception is: 
	java.lang.Exception: Timeout 60000 elapsed for func=checkIn, {"queryId": "5b0d274c-e585-4d1d-a698-de00ab2bcf25", "partId": "1",
                                    "instanceId": "1:4:0", "nodeIp": "192.168.1.69", "dir": "W",
                                    "executorId": "driver", "batchNo": null, "gpSegmentId": null,
                                    "rowCount": 0, "status": "null", "gpfdistUrl": "null"}
	at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:253)
	at java.rmi.server.RemoteObjectInvocationHandler.invoke(RemoteObjectInvocationHandler.java:180)
	at com.sun.proxy.$Proxy29.handlerAsks(Unknown Source)
	at com.itsumma.gpconnector.rmi.RMISlave.<init>(RMISlave.scala:204)
	at com.itsumma.gpconnector.writer.GreenplumDataWriter.init(GreenplumDataWriter.scala:50)
	at com.itsumma.gpconnector.writer.GreenplumDataWriter.write(GreenplumDataWriter.scala:74)
	at com.itsumma.gpconnector.writer.GreenplumDataWriter.write(GreenplumDataWriter.scala:27)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
	at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Timeout 60000 elapsed for func=checkIn, {"queryId": "5b0d274c-e585-4d1d-a698-de00ab2bcf25", "partId": "1",
                                    "instanceId": "1:4:0", "nodeIp": "192.168.1.69", "dir": "W",
                                    "executorId": "driver", "batchNo": null, "gpSegmentId": null,
                                    "rowCount": 0, "status": "null", "gpfdistUrl": "null"}
	at com.itsumma.gpconnector.rmi.RMIMaster.handlerAsks(RMIMaster.scala:383)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357)
	at sun.rmi.transport.Transport$1.run(Transport.java:200)
	at sun.rmi.transport.Transport$1.run(Transport.java:197)
	at java.security.AccessController.doPrivileged(Native Method)
	at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
	at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:573)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:834)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:688)
	at java.security.AccessController.doPrivileged(Native Method)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:687)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
	at sun.rmi.transport.StreamRemoteCall.exceptionReceivedFromServer(StreamRemoteCall.java:303)
	at sun.rmi.transport.StreamRemoteCall.executeCall(StreamRemoteCall.java:279)
	at sun.rmi.server.UnicastRef.invoke(UnicastRef.java:164)
	at java.rmi.server.RemoteObjectInvocationHandler.invokeRemoteMethod(RemoteObjectInvocationHandler.java:235)
	... 24 more

Driver stacktrace:)

Environment

Connector version: spark-greenplum-connector_2.12-3.1.jar
Java version, Scala version: Java 1.8.0, Scala 2.12
OS: macOS 13.4.1 (22F82)

maxim-lixakov changed the title from the Russian "Коннектор не работает в локальном режиме Spark" ("Connector does not work in local Spark mode") to "Connector does not work in local Spark mode" on Sep 10, 2024

hovercraft-github (Collaborator) commented

Hello!
The reason is probably that your Greenplum instance, running inside a Docker container, is not able to reach the gpfdist server which the connector starts in the context of your Spark session.
For example, as can be seen from the log, at this moment the gpfdist server is listening on the following address:
gpfdist://10.195.113.139:56701/output.pipe
where port 56701 is a dynamic random port that changes on every operation.
So you can try adjusting your network routing rules to let the containerized application reach an arbitrary TCP port
on the 10.195.113.139 address (a rough connectivity check is sketched below).
However, we don't recommend running Greenplum or Spark in a container, because we haven't tested such a scenario and believe it makes little sense.
Also, there could be yet another network-related problem in your setup:
the second log reveals the address 192.168.1.69: how is it related to 10.195.113.139?
Do you have several network cards?
Or did the network configuration change between passes?
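
The check below is not from the thread; it is a minimal sketch of how one might verify that the container can open an arbitrary TCP port on the host address, under these assumptions: it is run from the same spark-shell session in the directory containing the docker-compose.yml, the compose service is named greenplum, the image ships bash, and 10.195.113.139 is the address the connector advertised in the read logs (it may differ in your setup).

// Rough connectivity probe (assumptions as stated above, not the connector's own code).
import java.net.ServerSocket
import scala.sys.process._

val probe = new ServerSocket(0)            // bind an ephemeral port, like gpfdist does
val port  = probe.getLocalPort
val check = Seq("docker", "compose", "exec", "-T", "greenplum", "bash", "-c",
  s"timeout 3 bash -c '</dev/tcp/10.195.113.139/$port' && echo reachable || echo unreachable")
println(check.!!)                          // "reachable" means the container can open the port
probe.close()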

dolfinus commented Sep 20, 2024

The reason is probably that your Greenplum instance, running inside a Docker container, is not able to reach the gpfdist server which the connector starts in the context of your Spark session.

Reading data from the Greenplum container to the Spark executor is working, but for some reason it takes a minute to read a table with just 4 rows.
Also, writing from the Spark executor to the same Greenplum container fails with a timeout. Having network access for INSERT INTO WRITABLE EXTERNAL TABLE -> Spark executor gpfdist server, but not for SELECT FROM READABLE EXTERNAL TABLE -> Spark executor gpfdist server, does not sound plausible to me.

hovercraft-github (Collaborator) commented

Oh, I see, you are right - reading "somehow" works, and writing doesn't at all.
By the way, I doubt that applying the guessMaxParallelTasks patch you mentioned is a good idea.
The purpose of guessMaxParallelTasks is to find the number of executor instances, and that doesn't necessarily correlate with the number of partitions in the DataFrame (spark.default.parallelism).
I will try to reproduce your case, but it may take some time.
For now, I'd play with the number of threads, as in this post (a sketch of what that usually means for local mode follows below).
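
The linked post is not quoted in the thread, but for local mode "number of threads" usually refers to the N in the local[N] master setting. A minimal sketch under that assumption, with the session built programmatically (the app name and thread count below are illustrative, not values from the thread):

// Illustrative only: give the local master more scheduler threads and keep the
// spark.default.parallelism workaround from the reproduction steps.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gp-connector-local-test")
  .master("local[4]")                        // 4 threads instead of a single-threaded local master
  .config("spark.default.parallelism", "2")
  .getOrCreate()

The spark-shell equivalent would be adding --master "local[4]" to the command from step 4.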
