
Error creating channel and connection: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 1 #101

alankala opened this issue Jan 31, 2017 · 6 comments

alankala commented Jan 31, 2017

spark-rabbitmq version - 0.5.1
spark version - 2.1.0 (scala version - 2.11.8)
rabbitmq version - 3.5.6

I'm using the distributed approach for streaming:

    List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>();

    distributedKeys.add(new JavaRabbitMQDistributedKey(prop.getProperty("queue.name"),
            new ExchangeAndRouting(prop.getProperty("queue.exchange")),
            rabbitMqConParams
    ));

    Function<Delivery, String> messageHandler = new Function<Delivery, String>() {

        public String call(Delivery message) {
            return new String(message.getBody());
        }
    };

    JavaInputDStream<String> receiverStream =
            RabbitMQUtils.createJavaDistributedStream(streamCtx, String.class, distributedKeys, rabbitMqConParams, messageHandler);

I keep getting:

    2017-01-31 12:48:10,376 ERROR [Executor task launch worker-4] executor.Executor: Exception in task 0.0 in stage 2.0 (TID 7)
    java.util.concurrent.TimeoutException
        at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:76)
        at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:110)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:372)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:583)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:508)
        at com.rabbitmq.client.impl.ChannelN.close(ChannelN.java:501)
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer.close(Consumer.scala:132)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.close(RabbitMQRDD.scala:234)
        at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:66)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:75)
        at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:215)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

    2017-01-31 12:48:10,424 INFO  [Executor task launch worker-4] executor.Executor: Running task 0.1 in stage 2.0 (TID 8)
    2017-01-31 12:48:10,432 WARN  [Executor task launch worker-4] consumer.Consumer: Failed to createChannel connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 1. Remove connection 
    2017-01-31 12:48:10,432 WARN  [Executor task launch worker-4] storage.BlockManager: Putting block rdd_3_0 failed due to an exception
    2017-01-31 12:48:10,432 WARN  [Executor task launch worker-4] storage.BlockManager: Block rdd_3_0 could not be removed as it was not found on disk or in memory
    2017-01-31 12:48:10,435 ERROR [Executor task launch worker-4] executor.Executor: Exception in task 0.1 in stage 2.0 (TID 8)
    org.apache.spark.SparkException: Error creating channel and connection: connection is already closed due to connection error; cause: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 1
        at org.apache.spark.streaming.rabbitmq.consumer.Consumer$.apply(Consumer.scala:211)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.getConsumer(RabbitMQRDD.scala:243)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD$RabbitMQRDDIterator.<init>(RabbitMQRDD.scala:166)
        at org.apache.spark.streaming.rabbitmq.distributed.RabbitMQRDD.compute(RabbitMQRDD.scala:143)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
        at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:957)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Has anyone had this issue before? Any suggestions on how to solve it?

Thanks
Akhila.

nelsou commented Apr 14, 2017

+1, any updates on this?
EDIT: For me, RabbitMQ was running on an old server. Migrating it to a more recent server with more CPU fixed the problem (3k msg/s).
EDIT 2: @alankala?

compae commented Aug 18, 2017

Any updates?

nelsou commented Aug 18, 2017

We had this problem when RabbitMQ was overloaded (10,000 msg/s in the same queue).
One of the solutions was to create 10 queues (distributed across different servers) with 2 consumers (the parallelism level) on each queue (more is useless); a consumer-side sketch follows below.

We also made a lot of changes in the receiver (no PR yet, but we will open one) to better handle the timeouts that were causing Spark Streaming to build up scheduling delay.
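
Roughly, that fan-out looks like the sketch below, built with the same distributed-key API used in the original report. The queue/exchange/routing-key names, the loop over 10 queues, and the three-argument ExchangeAndRouting constructor are illustrative assumptions, not taken from our setup.

    // Hypothetical sketch: one JavaRabbitMQDistributedKey per queue so the
    // load is spread across 10 queues instead of a single hot one.
    List<JavaRabbitMQDistributedKey> distributedKeys = new LinkedList<JavaRabbitMQDistributedKey>();
    for (int i = 0; i < 10; i++) {
        distributedKeys.add(new JavaRabbitMQDistributedKey(
                "events.queue." + i,                              // assumed queue naming scheme
                new ExchangeAndRouting("events.exchange", "direct", "events.rk." + i),
                rabbitMqConParams
        ));
    }

    JavaInputDStream<String> receiverStream =
            RabbitMQUtils.createJavaDistributedStream(streamCtx, String.class,
                    distributedKeys, rabbitMqConParams, messageHandler);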

@kaveh-hariri

I'm having the same issue. When the queue grows beyond a certain threshold, it looks like RabbitMQ will discard messages from memory, and when you consume them it has to read from disk. Our streaming app tends to get 5-6k ack rates, but when RabbitMQ is reading from disk that falls to 80-120 per second, which is horrible. With those low ack rates, you start to see failures like the above or other timeout exceptions.

For me the workaround was to run multiple streaming jobs during extremely heavy load periods, so that when the app pauses to write (our current maxReceiveTime setting is 0.8 of the streaming window), the queue doesn't go over the threshold. A sketch of that setting follows below.
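
As an illustration of where that setting lives, here is a minimal sketch of the connection-parameter map; maxReceiveTime is the parameter mentioned above, but the other keys, the property names, and the 60 s batch window are assumptions about a typical setup rather than a verified configuration.

    // Hypothetical sketch: maxReceiveTime (ms) set to 0.8x an assumed 60 s batch
    // window, so each partition stops consuming well before the batch ends.
    Map<String, String> rabbitMqConParams = new HashMap<String, String>();
    rabbitMqConParams.put("hosts", prop.getProperty("rabbitmq.hosts"));      // assumed property name
    rabbitMqConParams.put("queueName", prop.getProperty("queue.name"));
    rabbitMqConParams.put("maxReceiveTime", String.valueOf((long) (60000 * 0.8)));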

@kaveh-hariri

Is there any way we can increase the timeout settings?

nelsou commented Sep 16, 2017

Yes and no. Some timeouts are hard-coded (10 s) in the AMQP client.
Create more queues and use routing keys to dispatch messages across them; a producer-side sketch follows below.
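
On the producer side, dispatching over routing keys could look like the sketch below, using only the standard RabbitMQ Java client's basicPublish; the exchange name, routing-key scheme, and shard count are illustrative assumptions.

    // Hypothetical sketch: publish each message under one of N routing keys so
    // that N queues bound to a direct exchange share the load evenly.
    void publishSharded(com.rabbitmq.client.Channel channel, byte[] body, int shards)
            throws java.io.IOException {
        int shard = java.util.concurrent.ThreadLocalRandom.current().nextInt(shards);
        channel.basicPublish("events.exchange", "events.rk." + shard, null, body);
    }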
