Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix initialization after reconnect debasishg/scala-redis#185 #255

Merged
merged 1 commit into from
Oct 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/scala/com/redis/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ trait IO extends Log {
socket != null && socket.isBound && !socket.isClosed && socket.isConnected && !socket.isInputShutdown && !socket.isOutputShutdown
}

def onConnect(): Unit

// Connects the socket, and sets the input and output streams.
def connect: Boolean = {
try {
Expand All @@ -31,6 +33,7 @@ trait IO extends Log {

out = socket.getOutputStream
in = new BufferedInputStream(socket.getInputStream)
onConnect()
true
} catch {
case x: Throwable =>
Expand Down
31 changes: 9 additions & 22 deletions src/main/scala/com/redis/RedisClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ trait Redis extends IO with Protocol {
result
} catch {
case e: RedisConnectionException =>
if (reconnect) send(command, args)(result)
if (disconnect) send(command, args)(result)
else throw e
case e: SocketException =>
if (reconnect) send(command, args)(result)
if (disconnect) send(command, args)(result)
else throw e
}

Expand All @@ -42,10 +42,10 @@ trait Redis extends IO with Protocol {
result
} catch {
case e: RedisConnectionException =>
if (reconnect) send(command)(result)
if (disconnect) send(command)(result)
else throw e
case e: SocketException =>
if (reconnect) send(command)(result)
if (disconnect) send(command)(result)
else throw e
}

Expand All @@ -54,11 +54,6 @@ trait Redis extends IO with Protocol {
protected def flattenPairs(in: Iterable[Product2[Any, Any]]): List[Any] =
in.iterator.flatMap(x => Iterator(x._1, x._2)).toList

def reconnect: Boolean = {
disconnect && initialize
}

protected def initialize : Boolean
}

trait RedisCommand extends Redis
Expand All @@ -78,16 +73,11 @@ trait RedisCommand extends Redis
val database: Int = 0
val secret: Option[Any] = None

override def initialize : Boolean = {
if(connect) {
secret.foreach {s =>
auth(s)
}
selectDatabase()
true
} else {
false
override def onConnect: Unit = {
secret.foreach {s =>
auth(s)
}
selectDatabase()
}

private def selectDatabase(): Unit = {
Expand All @@ -106,8 +96,6 @@ class RedisClient(override val host: String, override val port: Int,
override val database: Int = 0, override val secret: Option[Any] = None, override val timeout : Int = 0)
extends RedisCommand with PubSub {

initialize

def this() = this("localhost", 6379)
def this(connectionUri: java.net.URI) = this(
host = connectionUri.getHost,
Expand Down Expand Up @@ -217,13 +205,12 @@ class RedisClient(override val host: String, override val port: Int,
// TODO: Find a better abstraction
override def connected = parent.connected
override def connect = parent.connect
override def reconnect = parent.reconnect
override def disconnect = parent.disconnect
override def clearFd = parent.clearFd
override def write(data: Array[Byte]) = parent.write(data)
override def readLine = parent.readLine
override def readCounted(count: Int) = parent.readCounted(count)
override def initialize = parent.initialize
override def onConnect() = parent.onConnect()

override def close(): Unit = parent.close()
}
Expand Down
1 change: 0 additions & 1 deletion src/main/scala/com/redis/ds/Deque.scala
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class RedisDequeClient(val h: String, val p: Int, val d: Int = 0, val s: Option[
val key = k
override val database = d
override val secret = s
initialize

override def close(): Unit = disconnect
}
Expand Down
46 changes: 43 additions & 3 deletions src/test/scala/com/redis/RedisClientSpec.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package com.redis

import java.net.URI
import java.net.{ServerSocket, URI}

import com.github.dockerjava.core.DefaultDockerClientConfig
import com.redis.api.ApiSpec
import org.scalatest.FunSpec
import org.scalatest.Matchers
import com.whisk.docker.DockerContainerManager
import com.whisk.docker.impl.dockerjava.Docker
import org.scalatest.{FunSpec, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration._

class RedisClientSpec extends FunSpec
with Matchers with ApiSpec {
Expand Down Expand Up @@ -67,4 +72,39 @@ class RedisClientSpec extends FunSpec
r.get("vvl:qm")
r.close()
}}

describe("test reconnect") {
it("should re-init after server restart") {
val docker = new Docker(DefaultDockerClientConfig.createDefaultConfigBuilder().build()).client

val port = {
val s = new ServerSocket(0)
val p = s.getLocalPort
s.close()
p
}

val manager = new DockerContainerManager(
createContainer(ports = Map(redisPort -> port)) :: Nil, dockerFactory.createExecutor()
)

val key = "test-1"
val value = "test-value-1"

val (cs, _) :: _ = Await.result(manager.initReadyAll(20.seconds), 21.second)
val id = Await.result(cs.id, 10.seconds)

val c = new RedisClient(redisContainerHost, port, 8, timeout = 10.seconds.toMillis.toInt)
c.set(key, value)
docker.stopContainerCmd(id).exec()
try {c.get(key)} catch { case e: Throwable => }
docker.startContainerCmd(id).exec()
val got = c.get(key)
c.close()
docker.removeContainerCmd(id).withForce(true).withRemoveVolumes(true).exec()
docker.close()

got shouldBe Some(value)
}
}
}