Skip to content

Commit

Permalink
Closes finagle#75 - change PostgresClientImpl to not cache failures
Browse files Browse the repository at this point in the history
  • Loading branch information
leonmaia committed Nov 13, 2018
1 parent c6c1e75 commit 39b34fb
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@ package com.twitter.finagle.postgres
import java.nio.charset.{Charset, StandardCharsets}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.immutable.Queue
import scala.language.implicitConversions
import scala.util.Random

import com.twitter.finagle.Status
import com.twitter.finagle.postgres.codec.Errors
import com.twitter.cache.Refresh
import com.twitter.conversions.time._
import com.twitter.finagle.postgres.messages._
import com.twitter.finagle.postgres.values._
import com.twitter.finagle.{Service, ServiceFactory}
import com.twitter.finagle.{Service, ServiceFactory, Status}
import com.twitter.logging.Logger
import com.twitter.util._
import org.jboss.netty.buffer.ChannelBuffer

import scala.language.existentials
import scala.language.{existentials, implicitConversions}
import scala.util.Random

/*
* A Finagle client for communicating with Postgres.
Expand Down Expand Up @@ -80,20 +77,25 @@ class PostgresClientImpl(
customTypesResult
}

private[postgres] val typeMap = types.map(Future(_)).getOrElse(retrieveTypeMap())

// The OIDs to be used when sending parameters
private[postgres] val encodeOids = typeMap.map {
tm => tm.toIndexedSeq.map {
case (oid, PostgresClient.TypeSpecifier(receiveFn, typeName, elemOid)) => typeName -> oid
}.groupBy(_._1).mapValues(_.map(_._2).min)
private[postgres] val typeMap = Refresh.every(1.hour) {
types.map(Future(_)).getOrElse(retrieveTypeMap())
}

// The OIDs to be used when sending parameters
private[postgres] val encodeOids =
typeMap().map {
tm =>
tm.toIndexedSeq.map {
case (oid, PostgresClient.TypeSpecifier(receiveFn, typeName, elemOid)) => typeName -> oid
}.groupBy(_._1).mapValues(_.map(_._2).min)
}

/*
* Execute some actions inside of a transaction using a single connection
*/
override def inTransaction[T](fn: PostgresClient => Future[T]): Future[T] = for {
types <- typeMap
types <- typeMap()
service <- factory()
constFactory = ServiceFactory.const(service)
id = Random.alphanumeric.take(28).mkString
Expand All @@ -116,7 +118,7 @@ class PostgresClientImpl(
* Issue an arbitrary SQL query and get the response.
*/
override def query(sql: String): Future[QueryResponse] = sendQuery(sql) {
case SelectResult(fields, rows) => typeMap.map {
case SelectResult(fields, rows) => typeMap().map {
types => ResultSet(fields, charset, rows, types, receiveFunctions)
}
case CommandCompleteResponse(affected) => Future(OK(affected))
Expand All @@ -142,7 +144,7 @@ class PostgresClientImpl(
* Run a single SELECT query and wrap the results with the provided function.
*/
override def select[T](sql: String)(f: Row => T): Future[Seq[T]] = for {
types <- typeMap
types <- typeMap()
result <- fetch(sql)
} yield result match {
case SelectResult(fields, rows) => ResultSet(fields, charset, rows, types, receiveFunctions).rows.map(f)
Expand All @@ -152,25 +154,32 @@ class PostgresClientImpl(
* Issue a single, prepared SELECT query and wrap the response rows with the provided function.
*/
override def prepareAndQuery[T](sql: String, params: Param[_]*)(f: Row => T): Future[Seq[T]] = {
for {
service <- factory()
statement = new PreparedStatementImpl("", sql, service)
result <- statement.select(params: _*)(f)
} yield result
typeMap().flatMap { _ =>
for {
service <- factory()
statement = new PreparedStatementImpl("", sql, service)
result <- statement.select(params: _*)(f)
} yield result
}
}

/*
* Issue a single, prepared arbitrary query without an expected result set, and provide the affected row count
*/
override def prepareAndExecute(sql: String, params: Param[_]*):Future[Int] = for {
service <- factory()
statement = new PreparedStatementImpl("", sql, service)
OK(count) <- statement.exec(params: _*)
} yield count
override def prepareAndExecute(sql: String, params: Param[_]*): Future[Int] = {
typeMap().flatMap { _ =>
for {
service <- factory()
statement = new PreparedStatementImpl("", sql, service)
OK(count) <- statement.exec(params: _*)
} yield count
}
}


/**
* Close the underlying connection pool and make this Client eternally down
*
* @return
*/
override def close(): Future[Unit] = {
Expand Down Expand Up @@ -214,7 +223,7 @@ class PostgresClientImpl(
def closeService = service.close()

private[this] def parse(params: Param[_]*): Future[Unit] = {
val paramTypes = encodeOids.map {
val paramTypes = encodeOids().map {
oidMap => params.map {
param => oidMap.getOrElse(param.encoder.typeName, 0)
}
Expand Down Expand Up @@ -279,7 +288,7 @@ class PostgresClientImpl(
}

val f = for {
types <- typeMap
types <- typeMap()
pname <- parse(params: _*)
_ <- bind(paramBuffers)
fields <- describe()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class CustomTypesSpec extends Spec with GeneratorDrivenPropertyChecks {

s"A $mode postgres client with $paramsMode params" should {
"retrieve the available types from the remote DB" in {
val types = Await.result(client.typeMap)
val types = Await.result(client.typeMap())
assert(types.nonEmpty)
assert(types != PostgresClient.defaultTypes)
}
Expand Down

0 comments on commit 39b34fb

Please sign in to comment.