Broken Pipe Exception from Pool #524

Open
channingwalton opened this issue Aug 12, 2021 · 14 comments

@channingwalton

I see this exception, which seems related to Skunk. It happens randomly, even when there is no interaction with the DB.

There was a suggestion that it is a connection loss, which I'm investigating.

java.io.IOException: Broken pipe
at java.base/sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at java.base/sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:113)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:79)
at java.base/sun.nio.ch.IOUtil.write(IOUtil.java:50)
at java.base/sun.nio.ch.UnixAsynchronousSocketChannelImpl.onEvent(UnixAsynchronousSocketChannelImpl.java:213)
at java.base/sun.nio.ch.EPollPort$EventHandlerTask.run(EPollPort.java:306)
at java.base/sun.nio.ch.AsynchronousChannelGroupImpl$1.run(AsynchronousChannelGroupImpl.java:112)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
at async_ @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.go$2(SocketPlatform.scala:126)
at flatMap @ fs2.io.net.SocketCompanionPlatform$AsyncSocket.go$2(SocketPlatform.scala:132)
at acquireN @ fs2.Stream.waitN$1(Stream.scala:1392)
at apply @ skunk.util.Pool$.poolImpl$1(Pool.scala:80)
@sebastianvoss
Contributor

@channingwalton I'm currently seeing similar errors. Did you figure out the root cause?

@channingwalton
Author

@channingwalton I'm currently seeing similar errors. Did you figure out the root cause?

Hi, no unfortunately I didn’t have time. We’ve switched to ember which doesn’t suffer the same issue.

@ahjohannessen

ahjohannessen commented May 2, 2022

I am also getting java.io.IOException: Broken pipe and skunk.exception.EofException: EOF was reached on the network socket. — using Session.pooled(max = 5). This is using Consul Connect with Envoy for mTLS.

It only happens after a while, and hitting the same endpoint a couple of times makes things work again, so I suppose it is some kind of timeout issue.

@sebastianvoss Did you figure out what was wrong in your case?

@sebastianvoss
Contributor

@ahjohannessen The root cause is connections that get into an invalid state (losing connectivity to the DB, or being closed by Postgres after an idle timeout). Those connections remain in the pool and won't be removed, as the health check only runs when a connection is returned to the pool. We solved this for our specific use case, but my plan is also to look into a more generic solution and contribute it.
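To illustrate that failure mode in isolation: a pool that validates only when a connection is *returned* can still hand out a connection that went stale while sitting idle, whereas validating at *borrow* time discards it. A minimal plain-Scala sketch (the pool and connection types are hypothetical toys, not Skunk's actual `Pool` internals):

```scala
import scala.collection.mutable

// Toy connection: can be killed externally (e.g. by an idle timeout).
final class Conn {
  var alive: Boolean = true
  def isValid: Boolean = alive
}

// Validate-on-return pool: broken connections are dropped when given back,
// but nothing re-checks connections that go stale while idle in the pool.
final class ReturnCheckedPool {
  private val idle = mutable.Queue.empty[Conn]
  def borrow(): Conn = if (idle.nonEmpty) idle.dequeue() else new Conn
  def giveBack(c: Conn): Unit = if (c.isValid) idle.enqueue(c)
}

// Validate-on-borrow pool: stale idle connections are discarded at checkout
// and replaced with a fresh one.
final class BorrowCheckedPool {
  private val idle = mutable.Queue.empty[Conn]
  def borrow(): Conn = {
    while (idle.nonEmpty) {
      val c = idle.dequeue()
      if (c.isValid) return c
    }
    new Conn
  }
  def giveBack(c: Conn): Unit = if (c.isValid) idle.enqueue(c)
}
```

With `ReturnCheckedPool`, a connection that dies while idle is handed straight back to the next caller, which is exactly the broken-pipe symptom described in this thread.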

@wvandermerwe

@sebastianvoss How did you manage to work around this issue?

@fdietze
Contributor

fdietze commented Aug 8, 2022

Here's how I'm currently working around the problem in Scala.js / AWS Lambda: I only use Skunk for typesafe query construction, but submit the queries using node-postgres (pg) and pg-connection-string (via ScalablyTyped). It also gave me a massive performance boost.

import skunk._

import scala.async.Async.{async, await}
import scala.concurrent.{ExecutionContext, Future}
import scala.scalajs.js
import scala.scalajs.js.annotation.JSImport
import scala.annotation.nowarn
import typings.pg.mod.ClientConfig
import typings.pg.mod.QueryArrayConfig
import typings.pg.mod.{Client => PgClient}
import cats.effect.std.Semaphore
import cats.effect.IO
import cats.implicits._

import cats.effect.unsafe.implicits.{global => unsafeIORuntimeGlobal}

import scala.scalajs.js.JSConverters._
import scala.util.{Failure, Success}

import skunk.implicits._

@js.native
@JSImport("pg-connection-string", JSImport.Namespace)
@nowarn("msg=never used")
object PgConnectionString extends js.Object {
  def parse(arg: String): ClientConfig = js.native
}

class PostgresClient(connectionString: String)(implicit ec: ExecutionContext) {

  private val client: PgClient              = new PgClient(PgConnectionString.parse(connectionString))
  private lazy val connection: Future[Unit] = client.connect().toFuture

  val transactionSemaphore: Future[Semaphore[IO]] = Semaphore[IO](1).unsafeToFuture()

  def command[PARAMS](
    command: Command[PARAMS],
    params: PARAMS = Void,
  ): Future[Unit] = async {
    await(connection)
    await(
      client
        .query(
          command.sql,
          command.encoder.encode(params).map(_.orNull).toJSArray,
        )
        .toFuture,
    )
    ()
  }

  def query[PARAMS, ROW](
    query: Query[PARAMS, ROW],
    params: PARAMS = Void,
  ): Future[Vector[ROW]] = async {

    await(connection) // wait until connection is ready
    val result = await(
      client
        .query[js.Array[js.Any], js.Array[js.Any]](
          QueryArrayConfig[js.Array[js.Any]](query.sql),
          query.encoder.encode(params).map(_.orNull.asInstanceOf[js.Any]).toJSArray,
        )
        .toFuture,
    )
    result.rows.view.map { row =>
      query.decoder.decode(
        0,
        row.view
          .map(any =>
            Option(any: Any).map {
              // TODO: tests for these data types
              case bool: Boolean => if (bool) "t" else "f"
              case date: js.Date =>
                // toString on js.Date localizes the date. `toISOString`
                // also adds time information that doesn't exist in the original
                // `Date` object in the database. This overhead gets cut away by
                // calling `substring`.
                date.toISOString().substring(0, 10)
              case other => other.toString
            },
          )
          .toList,
      ) match {
        case Left(err)         => throw new Exception(err.message)
        case Right(decodedRow) => decodedRow
      }
    }.toVector
  }

  def querySingleRow[PARAMS, ROW](queryFragment: Query[PARAMS, ROW], params: PARAMS = Void): Future[ROW] = async {
    val rows = await(query[PARAMS, ROW](queryFragment, params))
    if (rows.isEmpty) throw new RuntimeException("Requested single row, but got no rows.")
    rows.head
  }

  @nowarn("msg=dead code")
  def tx[T](code: => Future[T]) = async {
    await(connection) // wait until connection is ready
    val semaphore: Semaphore[IO] = await(transactionSemaphore)

    await(semaphore.acquire.unsafeToFuture()) // wait until other transaction has finished
    await(command(sql"BEGIN".command))        // begin transaction

    await(code.transformWith {
      case Success(result) =>
        async {
          val committed = await(command(sql"COMMIT".command).attempt)
          await(semaphore.release.unsafeToFuture())
          committed match {
            case Right(_)  => result
            case Left(err) => throw err
          }
        }
      case Failure(exception) =>
        async {
          println("Transaction failed. Rolling back.")
          val rolledBack = await(command(sql"ROLLBACK".command).attempt)
          await(semaphore.release.unsafeToFuture())
          rolledBack match {
            case Right(_) => throw exception
            case Left(err) =>
              val finalException = new Exception("ROLLBACK FAILED")
              finalException.addSuppressed(err)
              finalException.addSuppressed(exception)

              throw finalException
          }
        }
    })
  }
}

@pmsfc

pmsfc commented Jun 20, 2023

@tpolecat This continues to happen in the latest version (0.6.0) when using an RDS proxy.
The proxy applies an IdleClientTimeout to every connection (default 30 minutes), so if the pool is slightly bigger than we need, some sessions stay idle for more than 30 minutes and throw skunk.exception.EofException when used.

For now we've switched to a smaller pool size and bumped the RDS proxy timeout to 4 h; this makes it happen far less often, but it still feels hacky.

Is there a way to apply health checks to idle sessions or recover easily from this exception?

@armanbilge
Member

Ember Client had a similar problem with connections going stale, here's how we fixed it:

@yilinwei
Contributor

@pmsfc

It seems to be a common enough issue that libpq has a keep-alive parameter for it, which it sets on the underlying socket.

fs2-io supports the option via SocketOption.keepAlive, which you can pass in when creating the pooled session.
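A sketch of what passing that option looks like, assuming skunk 0.6.x's `Session.pooled` signature (check the parameters against your version; host and credentials below are placeholders, and the implicit tracing setup skunk requires is omitted):

```scala
import cats.effect._
import fs2.io.net.SocketOption
import skunk._

// Pooled sessions with TCP keep-alive enabled on the underlying socket,
// alongside the usual TCP_NODELAY setting.
val pooled: Resource[IO, Resource[IO, Session[IO]]] =
  Session.pooled[IO](
    host     = "localhost",
    port     = 5432,
    user     = "postgres",
    database = "mydb",
    password = Some("secret"),
    max      = 10,
    socketOptions = List(SocketOption.noDelay(true), SocketOption.keepAlive(true))
  )
```

Note that `SO_KEEPALIVE` probe intervals are OS-level settings (e.g. `tcp_keepalive_time` on Linux, typically 2 hours by default), so enabling the flag alone may not probe often enough to beat a 30-minute proxy timeout.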

@pmsfc

pmsfc commented Jun 23, 2023

Thanks! I'll try it and come back here with the results.

@pmsfc

pmsfc commented Jun 23, 2023

I'm still seeing issues with keep-alive set to true (socketOptions = List(SocketOption.keepAlive(true))). I went back to the default 30-minute idle timeout on the RDS proxy, and after roughly 30 minutes the errors appeared.

@pmsfc

pmsfc commented Jun 23, 2023

We moved to Doobie; hopefully no more of these in JDBC land.

@armanbilge armanbilge self-assigned this Sep 18, 2023
@TonioGela
Member

TonioGela commented Jan 29, 2024

@pmsfc

It seems to be a common enough issue that libpq has a keep alive parameter associated with it, which it sets on the underlying socket.

fs2-io does have support for the option under .keepAlive, which you can pass in during creation of the pooled session.

Won't this fix the issue only when the connection CAN be kept alive?
At the moment I'm experiencing it because my Postgres instance, if untouched for at least 30 minutes, gets "garbage collected" by the infrastructure and spins up again whenever you attempt a new query/command. In this case the keepAlive parameter is useless, as the connection can't be kept alive (the target doesn't exist in network terms).
Also, I want to point out that calling the corresponding GET API that performs that Postgres call a second time ALWAYS works.
Something useful would probably be a health check that closes the session/resource when the DB is not reachable.
I haven't taken a look at the source tbf; it's all theoretical.

[EDIT] The easy fix I have in place is of course a retry, which fixes this issue 99.9% of the time.
[EDIT 2] I've looked at the code, and Recyclers should serve this purpose, so I can't tell where the problem is coming from.
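The retry mentioned in the [EDIT] above can be sketched library-free in plain Scala; in a cats-effect application you would express the same idea with `IO#handleErrorWith` / `IO#attempt` or a library such as cats-retry, matching only on transient failures like `EofException`. The helper name below is hypothetical:

```scala
import scala.annotation.tailrec

// Hypothetical helper: run `action` up to `maxAttempts` times, treating any
// exception as transient (a real version should match only connection-level
// failures, e.g. skunk.exception.EofException or java.io.IOException).
def withRetry[A](maxAttempts: Int)(action: () => A): A = {
  require(maxAttempts >= 1, "need at least one attempt")
  @tailrec
  def loop(attempt: Int): A =
    try action()
    catch {
      case _: Exception if attempt < maxAttempts => loop(attempt + 1)
    }
  loop(1)
}
```

Because the first call after the outage re-establishes the connection, a single retry is usually enough, which matches the observation that the second GET always works.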

@ThatScalaGuy

I encountered the issue when running my application and database in a Docker Swarm environment. After precisely 30 minutes of inactivity, connections are terminated but remain in the connection pool. Interestingly, this problem does not occur when running the application natively with the database in Docker on my development machine.

The exact point in the network stack where the connection termination occurs remains unclear. To mitigate this issue, I implemented two potential solutions:

1. Custom Connection Pool Implementation:
I developed a custom implementation of the ConnectionPool that periodically checks and validates the connections available in the pool.

2. Idle Connection Termination:
A more efficient approach involves terminating idle connections after a specified period. This method is likely to be more effective and resource-friendly.
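The idle-termination idea (option 2 above) can be illustrated with a small plain-Scala sketch: each idle connection remembers when it was returned, and a sweep closes anything idle longer than a threshold, before a remote 30-minute timeout can silently kill it. In a real cats-effect setup the sweep would run on a background fiber; here it is driven explicitly by the caller, and all types are hypothetical toys:

```scala
import scala.collection.mutable

// Toy pool that evicts connections which have been idle too long.
final class IdleEvictingPool(maxIdleMillis: Long) {
  final class Conn { var closed: Boolean = false }

  // Each idle entry carries the timestamp at which it was returned.
  private val idle = mutable.Queue.empty[(Conn, Long)]

  def giveBack(c: Conn, now: Long): Unit = idle.enqueue((c, now))

  def borrow(now: Long): Conn = {
    sweep(now) // evict stale entries before handing anything out
    if (idle.nonEmpty) idle.dequeue()._1 else new Conn
  }

  // Close and drop connections idle longer than maxIdleMillis.
  def sweep(now: Long): Unit = {
    val (stale, fresh) = idle.partition { case (_, t) => now - t > maxIdleMillis }
    stale.foreach { case (c, _) => c.closed = true }
    idle.clear()
    idle ++= fresh
  }

  def idleCount: Int = idle.size
}
```

Setting the eviction threshold below the infrastructure's idle timeout (e.g. 25 minutes against a 30-minute proxy limit) means the pool never holds a connection the other side has already killed.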
