- * When used within the default serialization provider, there are additional requirements.
- * The implementations must define either or both of the two constructors:
- * <dl>
- * <dt><pre>Serializer(ClassLoader loader)</pre></dt>
- * <dd>This constructor is used to initialize the serializer for transient caches.</dd>
- * <dt><pre>Serializer(ClassLoader loader, FileBasedPersistenceContext persistenceContext)</pre></dt>
- * <dd>This constructor is used to initialize the serializer for persistent caches and allows them to store any relevant
- * state in the provided repository.</dd>
- * </dl>
+ * When used within the default serialization provider, there is an additional requirement.
+ * The implementations must define a constructor that takes in a {@code ClassLoader}.
* The {@code ClassLoader} value may be {@code null}. If not {@code null}, the class loader
* instance provided should be used during deserialization to load classes needed by the deserialized objects.
*
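To make the constructor contract above concrete, here is a minimal sketch of a serializer that satisfies it. This is illustrative only and not part of the change: the class name and the use of plain Java serialization are invented; only the {@code Serializer} methods and the ClassLoader constructor requirement come from the javadoc above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.nio.ByteBuffer;

import org.ehcache.spi.serialization.Serializer;
import org.ehcache.spi.serialization.SerializerException;

// Sketch of a transient-cache serializer; the single ClassLoader constructor
// is what the default serialization provider looks up reflectively.
public class PlainJavaSerializer<T> implements Serializer<T> {

  private final ClassLoader classLoader;

  public PlainJavaSerializer(ClassLoader classLoader) {
    this.classLoader = classLoader; // may legitimately be null, per the contract
  }

  @Override
  public ByteBuffer serialize(T object) throws SerializerException {
    try {
      ByteArrayOutputStream bout = new ByteArrayOutputStream();
      ObjectOutputStream oout = new ObjectOutputStream(bout);
      oout.writeObject(object);
      oout.close();
      return ByteBuffer.wrap(bout.toByteArray());
    } catch (IOException e) {
      throw new SerializerException(e);
    }
  }

  @Override
  @SuppressWarnings("unchecked")
  public T read(ByteBuffer binary) throws ClassNotFoundException, SerializerException {
    byte[] bytes = new byte[binary.remaining()];
    binary.get(bytes);
    try {
      ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
          // Use the injected loader when present, as the javadoc above requires
          return classLoader == null ? super.resolveClass(desc) : Class.forName(desc.getName(), false, classLoader);
        }
      };
      return (T) oin.readObject();
    } catch (IOException e) {
      throw new SerializerException(e);
    }
  }

  @Override
  public boolean equals(T object, ByteBuffer binary) throws ClassNotFoundException, SerializerException {
    return object.equals(read(binary));
  }
}
```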
diff --git a/api/src/main/java/org/ehcache/spi/serialization/StatefulSerializer.java b/api/src/main/java/org/ehcache/spi/serialization/StatefulSerializer.java
new file mode 100644
index 0000000000..3b3d0e0ff0
--- /dev/null
+++ b/api/src/main/java/org/ehcache/spi/serialization/StatefulSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.ehcache.spi.serialization;
+
+import org.ehcache.spi.persistence.StateRepository;
+
+/**
+ * Implementations of this interface can have their state maintained in a {@code StateRepository}.
+ * The state will be maintained by the authoritative tier of the cache for which this is configured.
+ *
+ * Implementations must be thread-safe.
+ *
+ *
+ * When used within the default serialization provider, there is an additional constructor requirement.
+ * The implementations must define a constructor that takes in a {@code ClassLoader}.
+ * Post instantiation, the state repository will be injected via the {@code init} method invocation.
+ * This is guaranteed to happen before any serialization/deserialization interaction.
+ *
+ *
+ * @param <T> the type of the instances to serialize
+ *
+ * @see Serializer
+ */
+public interface StatefulSerializer<T> extends Serializer<T> {
+
+ /**
+ * This method is used to inject a {@code StateRepository} into the serializer
+ * by the authoritative tier of a cache during cache initialization.
+ * The passed-in state repository will have the persistent properties of the injecting tier.
+ *
+ * @param stateRepository the state repository
+ */
+ void init(StateRepository stateRepository);
+}
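A hedged sketch of how the two requirements compose, the ClassLoader constructor plus the injected state, might look like this. The class name, the "format"/"version" keys and the version check are invented for illustration; only {@code StatefulSerializer}, {@code StateRepository} and {@code getPersistentStateHolder} come from this change.

```java
import java.nio.ByteBuffer;
import java.nio.charset.Charset;

import org.ehcache.spi.persistence.StateHolder;
import org.ehcache.spi.persistence.StateRepository;
import org.ehcache.spi.serialization.StatefulSerializer;

public class VersionCheckingSerializer implements StatefulSerializer<String> {

  private static final Charset UTF8 = Charset.forName("UTF-8");

  public VersionCheckingSerializer(ClassLoader loader) {
    // the ClassLoader constructor the default provider still requires; unused here
  }

  @Override
  public void init(StateRepository stateRepository) {
    // Runs before any serialization traffic; persists the wire-format version
    // alongside the cache data so a later incompatible version can be detected.
    StateHolder<String, Integer> state =
        stateRepository.getPersistentStateHolder("format", String.class, Integer.class);
    Integer existing = state.putIfAbsent("version", 1);
    if (existing != null && existing != 1) {
      throw new IllegalStateException("Unsupported persisted format version: " + existing);
    }
  }

  @Override
  public ByteBuffer serialize(String object) {
    return ByteBuffer.wrap(object.getBytes(UTF8));
  }

  @Override
  public String read(ByteBuffer binary) {
    byte[] bytes = new byte[binary.remaining()];
    binary.get(bytes);
    return new String(bytes, UTF8);
  }

  @Override
  public boolean equals(String object, ByteBuffer binary) {
    return object.equals(read(binary));
  }
}
```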
diff --git a/api/src/main/java/org/ehcache/spi/service/MaintainableService.java b/api/src/main/java/org/ehcache/spi/service/MaintainableService.java
index d5fc4adce7..a9d3e76adf 100644
--- a/api/src/main/java/org/ehcache/spi/service/MaintainableService.java
+++ b/api/src/main/java/org/ehcache/spi/service/MaintainableService.java
@@ -22,11 +22,23 @@
*/
@PluralService
public interface MaintainableService extends Service {
+
+ /**
+ * Defines Maintenance scope
+ */
+ enum MaintenanceScope {
+ /** Will impact the cache manager */
+ CACHE_MANAGER,
+ /** Will impact one or many caches */
+ CACHE
+ }
+
/**
* Start this service for maintenance, based on its default configuration.
- *
* @param serviceProvider enables depending on other maintainable services
+ * @param maintenanceScope the scope of the maintenance
+ *
*/
- void startForMaintenance(ServiceProvider<MaintainableService> serviceProvider);
+ void startForMaintenance(ServiceProvider<? super MaintainableService> serviceProvider, MaintenanceScope maintenanceScope);
}
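To illustrate the new signature, a minimal implementing service might branch on the scope as below. The class name and log messages are invented; {@code start} and {@code stop} come from the base {@code Service} interface.

```java
import org.ehcache.spi.service.MaintainableService;
import org.ehcache.spi.service.Service;
import org.ehcache.spi.service.ServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalMaintenanceService implements MaintainableService {

  private static final Logger LOGGER = LoggerFactory.getLogger(LocalMaintenanceService.class);

  @Override
  public void start(ServiceProvider<Service> serviceProvider) {
    // normal (non-maintenance) startup
  }

  @Override
  public void startForMaintenance(ServiceProvider<? super MaintainableService> serviceProvider,
                                  MaintenanceScope maintenanceScope) {
    switch (maintenanceScope) {
      case CACHE_MANAGER:
        LOGGER.info("Entering maintenance over the whole cache manager");
        break;
      case CACHE:
        LOGGER.info("Entering maintenance over one or many caches");
        break;
    }
  }

  @Override
  public void stop() {
    // release any maintenance resources
  }
}
```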
diff --git a/build.gradle b/build.gradle
index b850391e31..c5aafb303e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -14,32 +14,93 @@
* limitations under the License.
*/
import scripts.*
+import org.gradle.internal.jvm.Jvm
+
+buildscript {
+ repositories {
+ mavenCentral()
+ }
+ dependencies {
+ classpath "io.codearte.gradle.nexus:gradle-nexus-staging-plugin:0.5.3"
+ }
+}
+
+// This adds tasks to auto close or release nexus staging repos
+// see https://github.com/Codearte/gradle-nexus-staging-plugin/
+project.plugins.apply 'io.codearte.nexus-staging'
+project.nexusStaging {
+ username = project.sonatypeUser
+ password = project.sonatypePwd
+ packageGroup = 'org.ehcache'
+}
+
+// Disable automatic promotion for added safety
+closeAndPromoteRepository.enabled = false
+
ext {
- baseVersion = '3.1.2-SNAPSHOT'
+
+ baseVersion = findProperty('overrideVersion') ?: '3.2.1-SNAPSHOT'
// Third parties
- offheapVersion = '2.2.2'
- statisticVersion = '1.1.0'
+ offheapVersion = '2.3.2'
+ statisticVersion = '1.4.1'
jcacheVersion = '1.0.0'
slf4jVersion = '1.7.7'
sizeofVersion = '0.3.0'
// Clustered
- terracottaPlatformVersion = '5.0.6.beta5'
+ terracottaPlatformVersion = '5.1.1-pre3'
managementVersion = terracottaPlatformVersion
- terracottaApisVersion = '1.0.6.beta'
- terracottaCoreVersion = '5.0.6-beta2'
+ terracottaApisVersion = '1.1.0'
+ terracottaCoreVersion = '5.1.1-pre2'
offheapResourceVersion = terracottaPlatformVersion
entityApiVersion = terracottaApisVersion
- terracottaPassthroughTestingVersion = '1.0.6.beta2'
+ terracottaPassthroughTestingVersion = '1.1.1-pre2'
entityTestLibVersion = terracottaPassthroughTestingVersion
- galvanVersion = '1.0.6-beta2'
+ galvanVersion = '1.1.1-pre2'
+
+ // Tools
+ findbugsVersion = '3.0.1'
utils = new Utils(baseVersion, logger)
isReleaseVersion = !baseVersion.endsWith('SNAPSHOT')
isCloudbees = System.getenv('JENKINS_URL')?.contains('cloudbees')
- logger.info("Is cloudbees? $isCloudbees")
+}
+
+if (deployUrl.contains('nexus')) {
+ ext {
+ deployUser = tcDeployUser
+ deployPwd = tcDeployPassword
+ }
+} else {
+ ext {
+ deployUser = sonatypeUser
+ deployPwd = sonatypePwd
+ }
+}
+
+// Java 6 build setup
+def java6Error = 'Set the property \'java6Home\' in your $HOME/.gradle/gradle.properties pointing to a Java 6 installation'
+assert (JavaVersion.current().isJava8Compatible()) : 'The Ehcache 3 build requires Java 8 to run and a configured Java 6 installation\n' + java6Error
+assert hasProperty('java6Home') : 'The Ehcache 3 build requires a configured Java 6 installation\n' + java6Error
+def java6HomeLocation = new File(java6Home)
+def testJavaHomeLocation = java6HomeLocation
+
+if (hasProperty('testVM')) {
+ switch (testVM) {
+ case '6':
+ break
+ case '7':
+ assert hasProperty('java7Home') : 'Set the property \'java7Home\' in your $HOME/.gradle/gradle.properties pointing to a Java 7 installation'
+ testJavaHomeLocation = new File(java7Home)
+ break
+ case '8':
+ testJavaHomeLocation = Jvm.current().javaHome
+ break
+ default:
+ throw new AssertionError("Unrecognized 'testVM' value $testVM - Accepted values are 6, 7 or 8")
+ }
}
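For reference, a machine satisfying these assertions declares the JDK locations in $HOME/.gradle/gradle.properties; the paths below are illustrative only:

    java6Home=/opt/jdk1.6.0_45
    java7Home=/opt/jdk1.7.0_80

The VM used to run tests is then selected per invocation, e.g. ./gradlew test -PtestVM=8, while compilation always forks to the configured Java 6 javac (see the AbstractCompile block further down).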
subprojects {
@@ -58,6 +119,9 @@ subprojects {
targetCompatibility = 1.6
repositories {
+ if (project.hasProperty('mvnlocal')) {
+ mavenLocal()
+ }
mavenCentral()
maven { url "http://repo.terracotta.org/maven2" }
}
@@ -72,12 +136,12 @@ subprojects {
}
dependencies {
- if (JavaVersion.current().compareTo(JavaVersion.VERSION_1_7) >= 0) {
- compileOnly 'com.google.code.findbugs:annotations:3.0.0'
- } else {
- compileOnly 'com.google.code.findbugs:annotations:2.0.3'
+ compileOnly "com.google.code.findbugs:annotations:$parent.findbugsVersion"
+ testCompileOnly "com.google.code.findbugs:annotations:$parent.findbugsVersion"
+ testCompile 'junit:junit:4.12', 'org.assertj:assertj-core:1.7.1', 'org.hamcrest:hamcrest-library:1.3'
+ testCompile('org.mockito:mockito-core:1.9.5') {
+ exclude group:'org.hamcrest', module:'hamcrest-core'
}
- testCompile 'junit:junit:4.11', 'org.hamcrest:hamcrest-library:1.3', 'org.mockito:mockito-core:1.9.5'
testRuntime "org.slf4j:slf4j-simple:$parent.slf4jVersion"
}
@@ -87,7 +151,7 @@ subprojects {
}
test {
- maxHeapSize = "512m"
+ maxHeapSize = "1408m"
systemProperty 'java.awt.headless', 'true'
if (parent.isCloudbees) {
systemProperty 'disable.concurrent.tests', 'true'
@@ -113,14 +177,6 @@ subprojects {
exclude '**/internal/**'
}
- if (JavaVersion.current().isJava8Compatible()) {
- allprojects {
- tasks.withType(Javadoc) {
- options.addStringOption('Xdoclint:none', '-quiet')
- }
- }
- }
-
task javadocJar(type: Jar, dependsOn: javadoc) {
from javadoc.destinationDir
classifier = 'javadoc'
@@ -136,17 +192,13 @@ subprojects {
checkstyle {
configFile = file("$rootDir/config/checkstyle.xml")
configProperties = ['projectDir':projectDir, 'rootDir':rootDir]
- toolVersion = '5.7'
+ toolVersion = '5.9'
}
findbugs {
ignoreFailures = false
sourceSets = [sourceSets.main]
- if (JavaVersion.current().compareTo(JavaVersion.VERSION_1_7) >= 0) {
- findbugs.toolVersion = '3.0.1'
- } else {
- findbugs.toolVersion = '2.0.3'
- }
+ findbugs.toolVersion = parent.findbugsVersion
}
jacoco {
@@ -159,6 +211,28 @@ subprojects {
csv.enabled false
}
}
+
+ tasks.withType(AbstractCompile) {
+ options.with {
+ fork = true
+ forkOptions.executable = utils.executables(java6HomeLocation).javac
+ }
+ }
+ tasks.withType(Test) {
+ executable = utils.executables(testJavaHomeLocation).java
+ }
+ tasks.withType(JavaExec) {
+ executable = utils.executables(testJavaHomeLocation).java
+ }
+ tasks.withType(Javadoc) {
+ options.addStringOption('Xdoclint:none', '-quiet')
+ }
+
+ configurations.all {
+ resolutionStrategy {
+ failOnVersionConflict()
+ }
+ }
}
allprojects {
diff --git a/buildSrc/src/main/groovy/EhDeploy.groovy b/buildSrc/src/main/groovy/EhDeploy.groovy
index 282c40b8e1..6c280a54b8 100644
--- a/buildSrc/src/main/groovy/EhDeploy.groovy
+++ b/buildSrc/src/main/groovy/EhDeploy.groovy
@@ -1,6 +1,8 @@
import org.gradle.api.Plugin
import org.gradle.api.Project
+import org.gradle.api.artifacts.maven.Conf2ScopeMappingContainer
import org.gradle.api.artifacts.maven.MavenDeployment
+import org.gradle.api.plugins.MavenPlugin
import org.gradle.plugins.signing.Sign
import scripts.Utils
@@ -32,35 +34,56 @@ class EhDeploy implements Plugin<Project> {
project.plugins.apply 'signing'
project.plugins.apply 'maven'
+ project.configurations {
+ provided
+ }
+
+ project.sourceSets {
+ main {
+ compileClasspath += project.configurations.provided
+ }
+ test {
+ compileClasspath += project.configurations.provided
+ runtimeClasspath += project.configurations.provided
+ }
+ }
+
project.signing {
required { project.isReleaseVersion && project.gradle.taskGraph.hasTask("uploadArchives") }
sign project.configurations.getByName('archives')
}
+ def artifactFiltering = {
+ pom.scopeMappings.addMapping(MavenPlugin.COMPILE_PRIORITY, project.configurations.provided, Conf2ScopeMappingContainer.PROVIDED)
+
+ utils.pomFiller(pom, project.subPomName, project.subPomDesc)
+
+ }
+
+ project.install {
+ repositories.mavenInstaller artifactFiltering
+ }
+
project.uploadArchives {
repositories {
- mavenDeployer {
+ mavenDeployer ({
beforeDeployment { MavenDeployment deployment -> project.signing.signPom(deployment)}
if (project.isReleaseVersion) {
- repository(id: 'sonatype-nexus-staging', url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2/') {
- authentication(userName: project.sonatypeUser, password: project.sonatypePwd)
+ repository(url: project.deployUrl) {
+ authentication(userName: project.deployUser, password: project.deployPwd)
}
} else {
repository(id: 'sonatype-nexus-snapshot', url: 'https://oss.sonatype.org/content/repositories/snapshots') {
authentication(userName: project.sonatypeUser, password: project.sonatypePwd)
}
}
- }
+ } << artifactFiltering)
}
}
def installer = project.install.repositories.mavenInstaller
def deployer = project.uploadArchives.repositories.mavenDeployer
- [installer, deployer]*.pom*.whenConfigured {pom ->
- utils.pomFiller(pom, project.subPomName, project.subPomDesc)
- }
-
}
}
diff --git a/buildSrc/src/main/groovy/EhDistribute.groovy b/buildSrc/src/main/groovy/EhDistribute.groovy
index af43ca6361..a815108ba5 100644
--- a/buildSrc/src/main/groovy/EhDistribute.groovy
+++ b/buildSrc/src/main/groovy/EhDistribute.groovy
@@ -40,6 +40,7 @@ class EhDistribute implements Plugin<Project> {
def OSGI_OVERRIDE_KEYS = ['Import-Package', 'Export-Package', 'Private-Package', 'Tool', 'Bnd-LastModified', 'Created-By', 'Require-Capability']
project.configurations {
+ shadowCompile
shadowProvided
}
diff --git a/buildSrc/src/main/groovy/EhOsgi.groovy b/buildSrc/src/main/groovy/EhOsgi.groovy
index 1704876074..521d58f400 100644
--- a/buildSrc/src/main/groovy/EhOsgi.groovy
+++ b/buildSrc/src/main/groovy/EhOsgi.groovy
@@ -54,7 +54,7 @@ class EhOsgi implements Plugin<Project> {
if (project.hasProperty('shadowJar')) {
classesDir = project.shadowJar.archivePath
- classpath = project.files(project.configurations.shadow, project.configurations.shadowProvided)
+ classpath = project.files(project.configurations.shadowCompile, project.configurations.shadowProvided)
} else {
classesDir = new File(project.buildDir, 'classes/main') //can't figure out where to get this value
classpath = project.sourceSets.main.compileClasspath
diff --git a/buildSrc/src/main/groovy/EhPomMangle.groovy b/buildSrc/src/main/groovy/EhPomMangle.groovy
index a20274e57e..271271ab3d 100644
--- a/buildSrc/src/main/groovy/EhPomMangle.groovy
+++ b/buildSrc/src/main/groovy/EhPomMangle.groovy
@@ -27,7 +27,7 @@ import scripts.Utils
* Removes all implicit dependencies from the pom
* and adds only what is specified in (from shadowJar)
*
- * project.configurations.shadow (as compile)
+ * project.configurations.shadowCompile (as compile)
* project.configurations.shadowProvided (as provided)
*
* as well as (these do not affect shadow)
@@ -49,7 +49,7 @@ class EhPomMangle implements Plugin<Project> {
project.plugins.apply 'signing'
project.configurations {
- shadow
+ shadowCompile
shadowProvided
pomOnlyCompile
pomOnlyProvided
@@ -60,14 +60,14 @@ class EhPomMangle implements Plugin {
pom.scopeMappings.mappings.remove(project.configurations.runtime)
pom.scopeMappings.mappings.remove(project.configurations.testCompile)
pom.scopeMappings.mappings.remove(project.configurations.testRuntime)
- pom.scopeMappings.addMapping(MavenPlugin.COMPILE_PRIORITY, project.configurations.shadow, Conf2ScopeMappingContainer.COMPILE)
+ pom.scopeMappings.addMapping(MavenPlugin.COMPILE_PRIORITY, project.configurations.shadowCompile, Conf2ScopeMappingContainer.COMPILE)
pom.scopeMappings.addMapping(MavenPlugin.COMPILE_PRIORITY, project.configurations.shadowProvided, Conf2ScopeMappingContainer.PROVIDED)
//Anything extra to add to pom that isn't in the shadowed jar or compilation
pom.scopeMappings.addMapping(MavenPlugin.COMPILE_PRIORITY, project.configurations.pomOnlyCompile, Conf2ScopeMappingContainer.COMPILE)
pom.scopeMappings.addMapping(MavenPlugin.COMPILE_PRIORITY, project.configurations.pomOnlyProvided, Conf2ScopeMappingContainer.PROVIDED)
- utils.pomFiller(pom, 'Ehcache', 'Ehcache single jar, containing all modules')
+ utils.pomFiller(pom, project.subPomName, project.subPomDesc)
}
@@ -81,8 +81,8 @@ class EhPomMangle implements Plugin {
beforeDeployment { MavenDeployment deployment -> project.signing.signPom(deployment)}
if (project.isReleaseVersion) {
- repository(id: 'sonatype-nexus-staging', url: 'https://oss.sonatype.org/service/local/staging/deploy/maven2/') {
- authentication(userName: project.sonatypeUser, password: project.sonatypePwd)
+ repository(url: project.deployUrl) {
+ authentication(userName: project.deployUser, password: project.deployPwd)
}
} else {
repository(id: 'sonatype-nexus-snapshot', url: 'https://oss.sonatype.org/content/repositories/snapshots') {
diff --git a/buildSrc/src/main/groovy/MavenToolchain.groovy b/buildSrc/src/main/groovy/MavenToolchain.groovy
deleted file mode 100644
index af5b930343..0000000000
--- a/buildSrc/src/main/groovy/MavenToolchain.groovy
+++ /dev/null
@@ -1,50 +0,0 @@
-import org.gradle.api.JavaVersion
-import org.gradle.api.resources.MissingResourceException;
-import org.gradle.internal.os.OperatingSystem;
-
-/**
- * Emulates maven toolchains support by looking at the user's
- * ~/.m2/toolchains.xml
- *
- * Throws if this file is not found
- *
- * Provides a closure to use to find the correct jvm's executable, eg:
- * MavenToolchain.javaExecutable(JavaVersion.VERSION_1_8, 'javac')
- */
-class MavenToolchain {
-
- static def mavenToolchainDefinitions = {
- String userHome = System.getProperty("user.home");
- File toolchain = new File(userHome, ".m2" + File.separator + "toolchains.xml")
- if (toolchain.isFile()) {
- def xmlSlurper = new XmlSlurper()
- return new XmlSlurper().parse(toolchain)
- } else {
- throw new MissingResourceException("toolchain file not found at ${toolchain}" );
- }
- }
-
- static def toolchains;
- static {
- def xml = mavenToolchainDefinitions()
- if (xml == null) {
- toolchains = [:]
- } else {
- toolchains = xml.toolchain.findAll({ it.type.text() == 'jdk' }).collectEntries{[JavaVersion.toVersion(it.provides.version.text()), it.configuration.jdkHome.text()]}
- }
- }
-
- private static def exe = OperatingSystem.current().isWindows() ? '.exe' : ''
-
- static def javaHome = { v ->
- def jdk = toolchains.get(v);
- if (jdk == null) {
-
- throw new MissingResourceException("JDK $v not available - check your toolchains.xml")
- } else {
- return jdk;
- }
- }
-
- static def javaExecutable = { v, exec -> MavenToolchain.javaHome(v) + ['', 'bin', exec].join(File.separator) + exe }
-}
diff --git a/buildSrc/src/main/groovy/scripts/Utils.groovy b/buildSrc/src/main/groovy/scripts/Utils.groovy
index 1388470e90..40a88267d3 100644
--- a/buildSrc/src/main/groovy/scripts/Utils.groovy
+++ b/buildSrc/src/main/groovy/scripts/Utils.groovy
@@ -14,12 +14,15 @@
* limitations under the License.
*/
-package scripts;
+package scripts
+
+import org.gradle.internal.os.OperatingSystem
class Utils {
String version
String revision
+ Map<File, Map<String, File>> executablesPath = [:]
Utils(version, logger) {
this.version = version
@@ -85,4 +88,18 @@ class Utils {
}
}
}
+
+ def executables(path) {
+ def execMap = executablesPath.get(path)
+ if (execMap == null) {
+ execMap = [:].withDefault { execName ->
+ def extension = OperatingSystem.current().isWindows() ? ".exe" : ""
+ def executable = new File(path, 'bin' + File.separator + execName + extension)
+ assert executable.exists(): "There is no ${execName} executable in ${path}"
+ executable
+ }
+ executablesPath.put(path, execMap)
+ }
+ execMap
+ }
}
diff --git a/clustered/client/build.gradle b/clustered/client/build.gradle
index 9e1c16d5d3..fd4dc8a556 100644
--- a/clustered/client/build.gradle
+++ b/clustered/client/build.gradle
@@ -14,13 +14,16 @@
* limitations under the License.
*/
+import org.gradle.internal.jvm.Jvm
+
apply plugin: EhDeploy
dependencies {
compileOnly project(':api')
compileOnly project(':xml')
- compile project(':clustered:common')
- compile "org.terracotta:entity-client-api:$parent.entityApiVersion"
+ compile project(':clustered:common'), "org.slf4j:slf4j-api:$parent.slf4jVersion"
+ provided "org.terracotta:entity-client-api:$parent.entityApiVersion"
+ provided "org.terracotta:runnel:$parent.terracottaPlatformVersion"
testCompile project(':api')
testCompile project(':xml')
@@ -31,3 +34,17 @@ dependencies {
testCompile "org.terracotta:entity-test-lib:$parent.entityTestLibVersion"
testCompile "org.terracotta:passthrough-server:$parent.terracottaPassthroughTestingVersion"
}
+
+compileTestJava {
+ options.forkOptions.executable = Jvm.current().javacExecutable
+ sourceCompatibility = 1.8
+ targetCompatibility = 1.8
+}
+
+test {
+ executable = Jvm.current().javaExecutable
+}
+
+tasks.withType(JavaCompile) {
+ options.compilerArgs += ['-Werror']
+}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntity.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntity.java
index 7fd43c4d1a..803109f3df 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntity.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntity.java
@@ -27,15 +27,17 @@
import org.ehcache.clustered.common.ServerSideConfiguration;
import org.ehcache.clustered.common.internal.ServerStoreConfiguration;
import org.ehcache.clustered.common.internal.exceptions.ClusterException;
+import org.ehcache.clustered.common.internal.exceptions.InvalidClientIdException;
import org.ehcache.clustered.common.internal.exceptions.ResourceBusyException;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.Failure;
-import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse.Type;
+import org.ehcache.clustered.common.internal.messages.EhcacheResponseType;
+import org.ehcache.clustered.common.internal.messages.EhcacheMessageType;
+import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;
import org.ehcache.clustered.common.internal.messages.LifeCycleMessageFactory;
-import org.ehcache.clustered.common.internal.messages.LifecycleMessage;
-import org.ehcache.clustered.common.internal.messages.ReconnectDataCodec;
-import org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.ServerStoreOp;
+import org.ehcache.clustered.common.internal.messages.ReconnectMessage;
+import org.ehcache.clustered.common.internal.messages.ReconnectMessageCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.terracotta.connection.entity.Entity;
@@ -46,8 +48,9 @@
import org.terracotta.entity.MessageCodecException;
import org.terracotta.exception.EntityException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -56,9 +59,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
-import static org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.ServerStoreOp.GET;
-import static org.ehcache.clustered.common.internal.messages.ServerStoreOpMessage.ServerStoreOp.getServerStoreOp;
+import java.util.concurrent.atomic.AtomicLong;
/**
* The client-side {@link Entity} through which clustered cache operations are performed.
@@ -69,9 +70,6 @@ public class EhcacheClientEntity implements Entity {
private static final Logger LOGGER = LoggerFactory.getLogger(EhcacheClientEntity.class);
- private Set<String> reconnectData = new HashSet<String>();
- private int reconnectDatalen = 0;
-
public interface ResponseListener<T extends EhcacheEntityResponse> {
void onResponse(T response);
}
@@ -80,12 +78,22 @@ public interface DisconnectionListener {
void onDisconnection();
}
+ public interface ReconnectListener {
+ void onHandleReconnect(ReconnectMessage reconnectMessage);
+ }
+
+ private final AtomicLong sequenceGenerator = new AtomicLong(0L);
+
private final EntityClientEndpoint<EhcacheEntityMessage, EhcacheEntityResponse> endpoint;
private final LifeCycleMessageFactory messageFactory;
private final Map<Class<? extends EhcacheEntityResponse>, List<ResponseListener<? extends EhcacheEntityResponse>>> responseListeners = new ConcurrentHashMap<Class<? extends EhcacheEntityResponse>, List<ResponseListener<? extends EhcacheEntityResponse>>>();
private final List<DisconnectionListener> disconnectionListeners = new CopyOnWriteArrayList<DisconnectionListener>();
- private final ReconnectDataCodec reconnectDataCodec = new ReconnectDataCodec();
+ private final List<ReconnectListener> reconnectListeners = new ArrayList<ReconnectListener>();
+ private final ReconnectMessageCodec reconnectMessageCodec = new ReconnectMessageCodec();
private volatile boolean connected = true;
+ private final Set<String> caches = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ private final Object lock = new Object();
+ private volatile UUID clientId;
private Timeouts timeouts = Timeouts.builder().build();
@@ -95,6 +103,7 @@ public EhcacheClientEntity(EntityClientEndpoint<EhcacheEntityMessage, EhcacheEntityResponse> endpoint) {
- private void fireResponseEvent(EhcacheEntityResponse response) {
- List<ResponseListener<? extends EhcacheEntityResponse>> responseListeners = this.responseListeners.get(response.getClass());
+ private <T extends EhcacheEntityResponse> void fireResponseEvent(T response) {
+ @SuppressWarnings("unchecked")
+ List<ResponseListener<T>> responseListeners = (List) this.responseListeners.get(response.getClass());
if (responseListeners == null) {
+ LOGGER.warn("Ignoring the response {} as no registered response listener could be found.", response);
return;
}
LOGGER.debug("{} registered response listener(s) for {}", responseListeners.size(), response.getClass());
- for (ResponseListener responseListener : responseListeners) {
+ for (ResponseListener<T> responseListener : responseListeners) {
responseListener.onResponse(response);
}
}
+ public UUID getClientId() {
+ if (clientId == null) {
+ throw new IllegalStateException("Client Id cannot be null");
+ }
+ return this.clientId;
+ }
+
public boolean isConnected() {
return connected;
}
@@ -146,6 +170,30 @@ public void addDisconnectionListener(DisconnectionListener listener) {
disconnectionListeners.add(listener);
}
+ public void removeDisconnectionListener(DisconnectionListener listener) {
+ disconnectionListeners.remove(listener);
+ }
+
+ public List<DisconnectionListener> getDisconnectionListeners() {
+ return Collections.unmodifiableList(disconnectionListeners);
+ }
+
+ public void addReconnectListener(ReconnectListener listener) {
+ synchronized (lock) {
+ reconnectListeners.add(listener);
+ }
+ }
+
+ public void removeReconnectListener(ReconnectListener listener) {
+ synchronized (lock) {
+ reconnectListeners.remove(listener);
+ }
+ }
+
+ public List<ReconnectListener> getReconnectListeners() {
+ return Collections.unmodifiableList(reconnectListeners);
+ }
+
public <T extends EhcacheEntityResponse> void addResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
List<ResponseListener<? extends EhcacheEntityResponse>> responseListeners = this.responseListeners.get(responseType);
if (responseListeners == null) {
@@ -155,6 +203,13 @@ public void addResponseListener(Class respo
responseListeners.add(responseListener);
}
+ public <T extends EhcacheEntityResponse> void removeResponseListener(Class<T> responseType, ResponseListener<T> responseListener) {
+ List<ResponseListener<? extends EhcacheEntityResponse>> responseListeners = this.responseListeners.get(responseType);
+ if (responseListeners != null) {
+ responseListeners.remove(responseListener);
+ }
+ }
+
public UUID identity() {
return ClusteredEhcacheIdentity.deserialize(endpoint.getEntityConfiguration());
}
@@ -162,11 +217,23 @@ public UUID identity() {
@Override
public void close() {
endpoint.close();
+ this.responseListeners.clear();
+ this.disconnectionListeners.clear();
+ this.reconnectListeners.clear();
}
public void validate(ServerSideConfiguration config) throws ClusteredTierManagerValidationException, TimeoutException {
try {
- invokeInternal(timeouts.getLifecycleOperationTimeout(), messageFactory.validateStoreManager(config), false);
+ while (true) {
+ try {
+ clientId = UUID.randomUUID();
+ this.messageFactory.setClientId(clientId);
+ invokeInternal(timeouts.getLifecycleOperationTimeout(), messageFactory.validateStoreManager(config), false);
+ break;
+ } catch (InvalidClientIdException e) {
+ //nothing to do - loop again since the earlier generated UUID is already tracked by the server
+ }
+ }
} catch (ClusterException e) {
throw new ClusteredTierManagerValidationException("Error validating server clustered tier manager", e);
}
@@ -174,6 +241,8 @@ public void validate(ServerSideConfiguration config) throws ClusteredTierManager
public void configure(ServerSideConfiguration config) throws ClusteredTierManagerConfigurationException, TimeoutException {
try {
+ clientId = UUID.randomUUID();
+ this.messageFactory.setClientId(clientId);
invokeInternal(timeouts.getLifecycleOperationTimeout(), messageFactory.configureStoreManager(config), true);
} catch (ClusterException e) {
throw new ClusteredTierManagerConfigurationException("Error configuring clustered tier manager", e);
@@ -184,7 +253,7 @@ public void createCache(String name, ServerStoreConfiguration serverStoreConfigu
throws ClusteredTierCreationException, TimeoutException {
try {
invokeInternal(timeouts.getLifecycleOperationTimeout(), messageFactory.createServerStore(name, serverStoreConfiguration), true);
- addReconnectData(name);
+ caches.add(name);
} catch (ClusterException e) {
throw new ClusteredTierCreationException("Error creating clustered tier '" + name + "'", e);
}
@@ -194,7 +263,7 @@ public void validateCache(String name, ServerStoreConfiguration serverStoreConfi
throws ClusteredTierValidationException, TimeoutException {
try {
invokeInternal(timeouts.getLifecycleOperationTimeout(), messageFactory.validateServerStore(name , serverStoreConfiguration), false);
- addReconnectData(name);
+ caches.add(name);
} catch (ClusterException e) {
throw new ClusteredTierValidationException("Error validating clustered tier '" + name + "'", e);
}
@@ -203,7 +272,7 @@ public void validateCache(String name, ServerStoreConfiguration serverStoreConfi
public void releaseCache(String name) throws ClusteredTierReleaseException, TimeoutException {
try {
invokeInternal(timeouts.getLifecycleOperationTimeout(), messageFactory.releaseServerStore(name), false);
- removeReconnectData(name);
+ caches.remove(name);
} catch (ClusterException e) {
throw new ClusteredTierReleaseException("Error releasing clustered tier '" + name + "'", e);
}
@@ -212,7 +281,6 @@ public void releaseCache(String name) throws ClusteredTierReleaseException, Time
public void destroyCache(String name) throws ClusteredTierDestructionException, TimeoutException {
try {
invokeInternal(timeouts.getLifecycleOperationTimeout(), messageFactory.destroyServerStore(name), true);
- removeReconnectData(name);
} catch (ResourceBusyException e) {
throw new ClusteredTierDestructionException(e.getMessage(), e);
} catch (ClusterException e) {
@@ -220,18 +288,6 @@ public void destroyCache(String name) throws ClusteredTierDestructionException,
}
}
- private void addReconnectData(String name) {
- reconnectData.add(name);
- reconnectDatalen += name.length();
- }
-
- private void removeReconnectData(String name) {
- if (!reconnectData.contains(name)) {
- reconnectData.remove(name);
- reconnectDatalen -= name.length();
- }
- }
-
/**
* Sends a message to the {@code EhcacheActiveEntity} associated with this {@code EhcacheClientEntity} and
* awaits a response.
@@ -246,24 +302,23 @@ private void removeReconnectData(String name) {
*/
public EhcacheEntityResponse invoke(EhcacheEntityMessage message, boolean replicate)
throws ClusterException, TimeoutException {
- TimeoutDuration timeLimit;
- if (message.getType() == EhcacheEntityMessage.Type.SERVER_STORE_OP
- && GET_STORE_OPS.contains(getServerStoreOp(message.getOpCode()))) {
- timeLimit = timeouts.getReadOperationTimeout();
- } else {
- timeLimit = timeouts.getMutativeOperationTimeout();
+ TimeoutDuration timeLimit = timeouts.getMutativeOperationTimeout();
+ if (message instanceof EhcacheOperationMessage) {
+ if (GET_STORE_OPS.contains(((EhcacheOperationMessage) message).getMessageType())) {
+ timeLimit = timeouts.getReadOperationTimeout();
+ }
}
return invokeInternal(timeLimit, message, replicate);
}
- private static final Set<ServerStoreOp> GET_STORE_OPS = EnumSet.of(GET);
+ private static final Set<EhcacheMessageType> GET_STORE_OPS = EnumSet.of(EhcacheMessageType.GET_STORE);
private EhcacheEntityResponse invokeInternal(TimeoutDuration timeLimit, EhcacheEntityMessage message, boolean replicate)
throws ClusterException, TimeoutException {
try {
EhcacheEntityResponse response = waitFor(timeLimit, invokeAsync(message, replicate));
- if (Type.FAILURE.equals(response.getType())) {
+ if (EhcacheResponseType.FAILURE.equals(response.getResponseType())) {
throw ((Failure)response).getCause();
} else {
return response;
@@ -283,12 +338,11 @@ private EhcacheEntityResponse invokeInternal(TimeoutDuration timeLimit, EhcacheE
public InvokeFuture<EhcacheEntityResponse> invokeAsync(EhcacheEntityMessage message, boolean replicate)
throws MessageCodecException {
+ getClientId();
if (replicate) {
- return endpoint.beginInvoke().message(message).replicate(true).invoke(); //TODO: remove replicate call once
- //https://github.com/Terracotta-OSS/terracotta-apis/issues/139 is fixed
- } else {
- return endpoint.beginInvoke().message(message).replicate(false).invoke();
+ message.setId(sequenceGenerator.getAndIncrement());
}
+ return endpoint.beginInvoke().message(message).replicate(replicate).invoke();
}
private static <T extends EntityResponse> T waitFor(TimeoutDuration timeLimit, InvokeFuture<T> future)
@@ -317,7 +371,7 @@ private static <T extends EntityResponse> T waitFor(TimeoutDuration timeLimit, InvokeFuture<T> future)
*/
public static final class Timeouts {
- public static final TimeoutDuration DEFAULT_READ_OPERATION_TIMEOUT = TimeoutDuration.of(5, TimeUnit.SECONDS);
+ public static final TimeoutDuration DEFAULT_READ_OPERATION_TIMEOUT = TimeoutDuration.of(20, TimeUnit.SECONDS);
private final TimeoutDuration readOperationTimeout;
private final TimeoutDuration mutativeOperationTimeout;
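The timeout selection in invoke() above can be condensed into one hypothetical helper for reading purposes. This is not part of the patch: only {@code getMessageType}, {@code EhcacheMessageType.GET_STORE} and the two {@code Timeouts} getters appear in this change, and the import locations are assumptions.

```java
import org.ehcache.clustered.client.config.TimeoutDuration;
import org.ehcache.clustered.client.internal.EhcacheClientEntity.Timeouts;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheMessageType;
import org.ehcache.clustered.common.internal.messages.EhcacheOperationMessage;

final class TimeoutSelection {
  // Only GET_STORE traffic gets the read timeout (raised here from 5 s to 20 s);
  // every other message is treated as mutative.
  static TimeoutDuration timeLimitFor(EhcacheEntityMessage message, Timeouts timeouts) {
    if (message instanceof EhcacheOperationMessage
        && ((EhcacheOperationMessage) message).getMessageType() == EhcacheMessageType.GET_STORE) {
      return timeouts.getReadOperationTimeout();
    }
    return timeouts.getMutativeOperationTimeout();
  }
}
```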
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactory.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactory.java
index 3391af2f1f..0b91b03c86 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactory.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactory.java
@@ -27,9 +27,11 @@
import org.terracotta.connection.Connection;
import org.terracotta.connection.entity.EntityRef;
import org.terracotta.exception.EntityAlreadyExistsException;
+import org.terracotta.exception.EntityConfigurationException;
import org.terracotta.exception.EntityNotFoundException;
import org.terracotta.exception.EntityNotProvidedException;
import org.terracotta.exception.EntityVersionMismatchException;
+import org.terracotta.exception.PermanentEntityException;
import java.util.Map;
import java.util.UUID;
@@ -91,7 +93,7 @@ public void abandonLeadership(String entityIdentifier) {
* lifecycle operation timeout
*/
public void create(final String identifier, final ServerSideConfiguration config)
- throws EntityAlreadyExistsException, EhcacheEntityCreationException, EntityBusyException, TimeoutException {
+ throws EntityAlreadyExistsException, EhcacheEntityCreationException, EntityBusyException, TimeoutException {
Hold existingMaintenance = maintenanceHolds.get(identifier);
Hold localMaintenance = null;
if (existingMaintenance == null) {
@@ -99,43 +101,60 @@ public void create(final String identifier, final ServerSideConfiguration config
}
if (existingMaintenance == null && localMaintenance == null) {
throw new EntityBusyException("Unable to create clustered tier manager for id "
- + identifier + ": another client owns the maintenance lease");
- } else {
+ + identifier + ": another client owns the maintenance lease");
+ }
+
+ boolean finished = false;
+
+ try {
+ EntityRef<EhcacheClientEntity, UUID> ref = getEntityRef(identifier);
try {
- EntityRef<EhcacheClientEntity, UUID> ref = getEntityRef(identifier);
- try {
- while (true) {
- ref.create(UUID.randomUUID());
+ while (true) {
+ ref.create(UUID.randomUUID());
+ try {
+ EhcacheClientEntity entity = ref.fetchEntity();
try {
- EhcacheClientEntity entity = ref.fetchEntity();
- try {
- entity.setTimeouts(entityTimeouts);
- entity.configure(config);
- return;
- } finally {
+ entity.setTimeouts(entityTimeouts);
+ entity.configure(config);
+ finished = true;
+ return;
+ } finally {
+ if (finished) {
entity.close();
+ } else {
+ silentlyClose(entity, identifier);
}
- } catch (ClusteredTierManagerConfigurationException e) {
- try {
- ref.destroy();
- } catch (EntityNotFoundException f) {
- //ignore
- }
- throw new EhcacheEntityCreationException("Unable to configure clustered tier manager for id " + identifier, e);
- } catch (EntityNotFoundException e) {
- //continue;
}
+ } catch (ClusteredTierManagerConfigurationException e) {
+ try {
+ ref.destroy();
+ } catch (EntityNotFoundException f) {
+ //ignore
+ }
+ throw new EhcacheEntityCreationException("Unable to configure clustered tier manager for id " + identifier, e);
+ } catch (EntityNotFoundException e) {
+ //continue;
}
- } catch (EntityNotProvidedException e) {
- LOGGER.error("Unable to create clustered tier manager for id {}", identifier, e);
- throw new AssertionError(e);
- } catch (EntityVersionMismatchException e) {
- LOGGER.error("Unable to create clustered tier manager for id {}", identifier, e);
- throw new AssertionError(e);
}
- } finally {
- if (localMaintenance != null) {
+ } catch (EntityNotProvidedException e) {
+ LOGGER.error("Unable to create clustered tier manager for id {}", identifier, e);
+ throw new AssertionError(e);
+ } catch (EntityVersionMismatchException e) {
+ LOGGER.error("Unable to create clustered tier manager for id {}", identifier, e);
+ throw new AssertionError(e);
+ } catch (PermanentEntityException e) {
+ LOGGER.error("Unable to create entity - server indicates it is permanent", e);
+ throw new AssertionError(e);
+ } catch (EntityConfigurationException e) {
+ LOGGER.error("Unable to create entity - configuration exception", e);
+ throw new AssertionError(e);
+ }
+ } finally {
+ if (localMaintenance != null) {
+ if (finished) {
localMaintenance.unlock();
+ } else {
+ silentlyUnlock(localMaintenance, identifier);
}
}
}
@@ -156,64 +175,97 @@ public void create(final String identifier, final ServerSideConfiguration config
* lifecycle operation timeout
*/
public EhcacheClientEntity retrieve(String identifier, ServerSideConfiguration config)
- throws EntityNotFoundException, EhcacheEntityValidationException, TimeoutException {
+ throws EntityNotFoundException, EhcacheEntityValidationException, TimeoutException {
+
+ Hold fetchHold = createAccessLockFor(identifier).readLock();
+
+ EhcacheClientEntity entity;
try {
- Hold fetchHold = createAccessLockFor(identifier).readLock();
- EhcacheClientEntity entity = getEntityRef(identifier).fetchEntity();
- /*
- * Currently entities are never closed as doing so can stall the client
- * when the server is dead. Instead the connection is forcibly closed,
- * which suits our purposes since that will unlock the fetchHold too.
- */
- boolean validated = false;
- try {
- entity.setTimeouts(entityTimeouts);
- entity.validate(config);
- validated = true;
- return entity;
- } catch (ClusteredTierManagerValidationException e) {
- throw new EhcacheEntityValidationException("Unable to validate clustered tier manager for id " + identifier, e);
- } finally {
- if (!validated) {
- entity.close();
- fetchHold.unlock();
- }
- }
+ entity = getEntityRef(identifier).fetchEntity();
} catch (EntityVersionMismatchException e) {
LOGGER.error("Unable to retrieve clustered tier manager for id {}", identifier, e);
+ silentlyUnlock(fetchHold, identifier);
throw new AssertionError(e);
}
+
+ /*
+ * Currently entities are never closed as doing so can stall the client
+ * when the server is dead. Instead the connection is forcibly closed,
+ * which suits our purposes since that will unlock the fetchHold too.
+ */
+ boolean validated = false;
+ try {
+ entity.setTimeouts(entityTimeouts);
+ entity.validate(config);
+ validated = true;
+ return entity;
+ } catch (ClusteredTierManagerValidationException e) {
+ throw new EhcacheEntityValidationException("Unable to validate clustered tier manager for id " + identifier, e);
+ } finally {
+ if (!validated) {
+ silentlyClose(entity, identifier);
+ silentlyUnlock(fetchHold, identifier);
+ }
+ }
}
public void destroy(final String identifier) throws EhcacheEntityNotFoundException, EntityBusyException {
Hold existingMaintenance = maintenanceHolds.get(identifier);
Hold localMaintenance = null;
+
if (existingMaintenance == null) {
localMaintenance = createAccessLockFor(identifier).tryWriteLock();
}
+
if (existingMaintenance == null && localMaintenance == null) {
throw new EntityBusyException("Destroy operation failed; " + identifier + " clustered tier's maintenance lease held");
- } else {
+ }
+
+ boolean finished = false;
+
+ try {
+ EntityRef<EhcacheClientEntity, UUID> ref = getEntityRef(identifier);
try {
- EntityRef<EhcacheClientEntity, UUID> ref = getEntityRef(identifier);
- try {
- if (!ref.destroy()) {
- throw new EntityBusyException("Destroy operation failed; " + identifier + " clustered tier in use by other clients");
- }
- } catch (EntityNotProvidedException e) {
- LOGGER.error("Unable to delete clustered tier manager for id {}", identifier, e);
- throw new AssertionError(e);
- } catch (EntityNotFoundException e) {
- throw new EhcacheEntityNotFoundException(e);
+ if (!ref.destroy()) {
+ throw new EntityBusyException("Destroy operation failed; " + identifier + " clustered tier in use by other clients");
}
- } finally {
- if (localMaintenance != null) {
+ finished = true;
+ } catch (EntityNotProvidedException e) {
+ LOGGER.error("Unable to delete clustered tier manager for id {}", identifier, e);
+ throw new AssertionError(e);
+ } catch (EntityNotFoundException e) {
+ throw new EhcacheEntityNotFoundException(e);
+ } catch (PermanentEntityException e) {
+ LOGGER.error("Unable to destroy entity - server says it is permanent", e);
+ throw new AssertionError(e);
+ }
+ } finally {
+ if (localMaintenance != null) {
+ if (finished) {
localMaintenance.unlock();
+ } else {
+ silentlyUnlock(localMaintenance, identifier);
}
}
}
}
+ private void silentlyClose(EhcacheClientEntity entity, String identifier) {
+ try {
+ entity.close();
+ } catch (Exception e) {
+ LOGGER.error("Failed to close entity {}", identifier, e);
+ }
+ }
+
+ private void silentlyUnlock(Hold localMaintenance, String identifier) {
+ try {
+ localMaintenance.unlock();
+ } catch(Exception e) {
+ LOGGER.error("Failed to unlock for id {}", identifier, e);
+ }
+ }
+
private VoltronReadWriteLock createAccessLockFor(String entityIdentifier) {
return new VoltronReadWriteLock(connection, "EhcacheClientEntityFactory-AccessLock-" + entityIdentifier);
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityService.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityService.java
index bc7c03e6c5..b45d486f70 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityService.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/EhcacheClientEntityService.java
@@ -19,10 +19,15 @@
import java.util.UUID;
import org.ehcache.clustered.common.internal.ClusteredEhcacheIdentity;
+import org.ehcache.clustered.common.internal.messages.CommonConfigCodec;
import org.ehcache.clustered.common.internal.messages.EhcacheCodec;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityMessage;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
+import org.ehcache.clustered.common.internal.messages.LifeCycleMessageCodec;
+import org.ehcache.clustered.common.internal.messages.ResponseCodec;
+import org.ehcache.clustered.common.internal.messages.ServerStoreOpCodec;
+import org.ehcache.clustered.common.internal.messages.StateRepositoryOpCodec;
import org.terracotta.entity.EntityClientEndpoint;
import org.terracotta.entity.EntityClientService;
import org.terracotta.entity.MessageCodec;
@@ -45,12 +50,13 @@ public UUID deserializeConfiguration(byte[] configuration) {
}
@Override
- public EhcacheClientEntity create(EntityClientEndpoint endpoint) {
+ public EhcacheClientEntity create(EntityClientEndpoint<EhcacheEntityMessage, EhcacheEntityResponse> endpoint) {
return new EhcacheClientEntity(endpoint);
}
@Override
public MessageCodec<EhcacheEntityMessage, EhcacheEntityResponse> getMessageCodec() {
- return EhcacheCodec.messageCodec();
+ return new EhcacheCodec(new ServerStoreOpCodec(), new LifeCycleMessageCodec(new CommonConfigCodec()),
+ new StateRepositoryOpCodec(), new ResponseCodec());
}
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/ClusteredResourcePoolImpl.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/ClusteredResourcePoolImpl.java
index 8c74da7ead..9d1da5223a 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/ClusteredResourcePoolImpl.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/ClusteredResourcePoolImpl.java
@@ -50,7 +50,7 @@ public boolean isPersistent() {
@Override
public void validateUpdate(ResourcePool newPool) {
- super.validateUpdate(newPool);
+ throw new UnsupportedOperationException("Updating CLUSTERED resource is not supported");
}
@Override
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/DedicatedClusteredResourcePoolImpl.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/DedicatedClusteredResourcePoolImpl.java
index f5ea3c82f3..e49d2b99b0 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/DedicatedClusteredResourcePoolImpl.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/DedicatedClusteredResourcePoolImpl.java
@@ -18,6 +18,7 @@
import org.ehcache.clustered.client.config.ClusteredResourceType;
import org.ehcache.clustered.common.PoolAllocation;
+import org.ehcache.config.ResourcePool;
import org.ehcache.config.units.MemoryUnit;
import org.ehcache.core.config.SizedResourcePoolImpl;
@@ -64,6 +65,11 @@ public PoolAllocation getPoolAllocation() {
return new PoolAllocation.Dedicated(this.getFromResource(), this.getUnit().toBytes(this.getSize()));
}
+ @Override
+ public void validateUpdate(final ResourcePool newPool) {
+ throw new UnsupportedOperationException("Updating CLUSTERED resource is not supported");
+ }
+
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Pool {");
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/SharedClusteredResourcePoolImpl.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/SharedClusteredResourcePoolImpl.java
index 7d394beea5..647f133654 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/SharedClusteredResourcePoolImpl.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/config/SharedClusteredResourcePoolImpl.java
@@ -19,6 +19,7 @@
import org.ehcache.clustered.client.config.ClusteredResourceType;
import org.ehcache.clustered.client.config.SharedClusteredResourcePool;
import org.ehcache.clustered.common.PoolAllocation;
+import org.ehcache.config.ResourcePool;
import org.ehcache.core.config.AbstractResourcePool;
/**
@@ -60,6 +61,11 @@ public PoolAllocation getPoolAllocation() {
return new PoolAllocation.Shared(this.getSharedResourcePool());
}
+ @Override
+ public void validateUpdate(final ResourcePool newPool) {
+ throw new UnsupportedOperationException("Updating CLUSTERED resource is not supported");
+ }
+
@Override
public String toString() {
return "Pool {"
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLock.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLock.java
index 153bc111a5..b6d9d6e991 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLock.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLock.java
@@ -19,15 +19,21 @@
import java.io.Closeable;
import org.ehcache.clustered.common.internal.lock.LockMessaging.HoldType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.terracotta.connection.Connection;
import org.terracotta.connection.entity.EntityRef;
import org.terracotta.exception.EntityAlreadyExistsException;
+import org.terracotta.exception.EntityConfigurationException;
import org.terracotta.exception.EntityNotFoundException;
import org.terracotta.exception.EntityNotProvidedException;
import org.terracotta.exception.EntityVersionMismatchException;
+import org.terracotta.exception.PermanentEntityException;
public class VoltronReadWriteLock {
+ private static final Logger LOGGER = LoggerFactory.getLogger(VoltronReadWriteLock.class);
+
private final EntityRef<VoltronReadWriteLockClient, Void> reference;
public VoltronReadWriteLock(Connection connection, String id) {
@@ -71,13 +77,19 @@ private Hold tryLock(final HoldType type) {
}
}
- private boolean tryDestroy() {
+ private void tryDestroy() {
try {
- return reference.destroy();
+ boolean destroyed = reference.destroy();
+ if (destroyed) {
+ LOGGER.debug("Destroyed lock entity " + reference.getName());
+ }
} catch (EntityNotProvidedException e) {
throw new AssertionError(e);
} catch (EntityNotFoundException e) {
- return false;
+ // Nothing to do
+ } catch (PermanentEntityException e) {
+ LOGGER.error("Failed to destroy lock entity - server says it is permanent", e);
+ throw new AssertionError(e);
}
}
@@ -109,7 +121,9 @@ public void close() {
public void unlock() {
client.unlock(type);
client.close();
- tryDestroy();
+ if (type == HoldType.WRITE) {
+ tryDestroy();
+ }
}
}
@@ -118,8 +132,12 @@ private VoltronReadWriteLockClient createClientEntity() {
while (true) {
try {
reference.create(null);
+ LOGGER.debug("Created lock entity " + reference.getName());
} catch (EntityAlreadyExistsException f) {
//ignore
+ } catch (EntityConfigurationException e) {
+ LOGGER.error("Error creating lock entity - configuration exception", e);
+ throw new AssertionError(e);
}
try {
return reference.fetchEntity();
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/AbstractClientEntityFactory.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/AbstractClientEntityFactory.java
index 82733fdab0..b07093a170 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/AbstractClientEntityFactory.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/AbstractClientEntityFactory.java
@@ -23,9 +23,11 @@
import org.terracotta.connection.entity.Entity;
import org.terracotta.connection.entity.EntityRef;
import org.terracotta.exception.EntityAlreadyExistsException;
+import org.terracotta.exception.EntityConfigurationException;
import org.terracotta.exception.EntityNotFoundException;
import org.terracotta.exception.EntityNotProvidedException;
import org.terracotta.exception.EntityVersionMismatchException;
+import org.terracotta.exception.PermanentEntityException;
abstract class AbstractClientEntityFactory<E extends Entity, C> implements ClientEntityFactory<E, C> {
@@ -86,6 +88,9 @@ public void create() throws EntityAlreadyExistsException {
} catch (EntityVersionMismatchException e) {
LOGGER.error("Unable to create entity {} for id {}", entityType.getName(), entityIdentifier, e);
throw new AssertionError(e);
+ } catch (EntityConfigurationException e) {
+ LOGGER.error("Unable to create entity - configuration exception", e);
+ throw new AssertionError(e);
}
}
@@ -109,6 +114,9 @@ public void destroy() throws EntityNotFoundException, EntityBusyException {
} catch (EntityNotProvidedException e) {
LOGGER.error("Unable to destroy entity {} for id {}", entityType.getName(), entityIdentifier, e);
throw new AssertionError(e);
+ } catch (PermanentEntityException e) {
+ LOGGER.error("Unable to destroy entity - server says it is permanent", e);
+ throw new AssertionError(e);
}
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ConcurrentClusteredMap.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateHolder.java
similarity index 58%
rename from clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ConcurrentClusteredMap.java
rename to clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateHolder.java
index 3d1f96569c..d0693a7458 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ConcurrentClusteredMap.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateHolder.java
@@ -21,18 +21,17 @@
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.StateRepositoryMessageFactory;
import org.ehcache.clustered.common.internal.messages.StateRepositoryOpMessage;
+import org.ehcache.spi.persistence.StateHolder;
import java.util.AbstractMap;
-import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import static org.ehcache.clustered.client.internal.service.ValueCodecFactory.getCodecForClass;
-public class ConcurrentClusteredMap<K, V> implements ConcurrentMap<K, V> {
+public class ClusteredStateHolder<K, V> implements StateHolder<K, V> {
private final StateRepositoryMessageFactory messageFactory;
private final EhcacheClientEntity entity;
@@ -40,35 +39,16 @@ public class ClusteredStateHolder<K, V> implements StateHolder<K, V> {
private final ValueCodec<K> keyCodec;
private final ValueCodec<V> valueCodec;
- public ConcurrentClusteredMap(final String cacheId, final String mapId, final EhcacheClientEntity entity, Class<K> keyClass, Class<V> valueClass) {
+ public ClusteredStateHolder(final String cacheId, final String mapId, final EhcacheClientEntity entity, Class<K> keyClass, Class<V> valueClass) {
this.keyClass = keyClass;
this.keyCodec = getCodecForClass(keyClass);
this.valueCodec = getCodecForClass(valueClass);
- this.messageFactory = new StateRepositoryMessageFactory(cacheId, mapId);
+ this.messageFactory = new StateRepositoryMessageFactory(cacheId, mapId, entity.getClientId());
this.entity = entity;
}
@Override
- public int size() {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public boolean isEmpty() {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public boolean containsKey(final Object key) {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public boolean containsValue(final Object value) {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
+ @SuppressWarnings("unchecked")
public V get(final Object key) {
if (!keyClass.isAssignableFrom(key.getClass())) {
return null;
@@ -90,41 +70,12 @@ private Object getResponse(StateRepositoryOpMessage message) {
}
@Override
- public V put(final K key, final V value) {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public V remove(final Object key) {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public void putAll(final Map<? extends K, ? extends V> m) {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public void clear() {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public Set<K> keySet() {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public Collection<V> values() {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public Set<Entry<K, V>> entrySet() {
+ @SuppressWarnings("unchecked")
+ public Set<Map.Entry<K, V>> entrySet() {
@SuppressWarnings("unchecked")
- Set<Entry<Object, Object>> response = (Set<Entry<Object, Object>>) getResponse(messageFactory.entrySetMessage());
- Set<Entry<K, V>> entries = new HashSet<Entry<K, V>>();
- for (Entry<Object, Object> objectEntry : response) {
+ Set<Map.Entry<Object, Object>> response = (Set<Map.Entry<Object, Object>>) getResponse(messageFactory.entrySetMessage());
+ Set<Map.Entry<K, V>> entries = new HashSet<Map.Entry<K, V>>();
+ for (Map.Entry<Object, Object> objectEntry : response) {
entries.add(new AbstractMap.SimpleEntry<K, V>(keyCodec.decode(objectEntry.getKey()),
valueCodec.decode(objectEntry.getValue())));
}
@@ -132,23 +83,10 @@ public Set<Entry<K, V>> entrySet() {
}
@Override
+ @SuppressWarnings("unchecked")
public V putIfAbsent(final K key, final V value) {
Object response = getResponse(messageFactory.putIfAbsentMessage(keyCodec.encode(key), valueCodec.encode(value)));
return valueCodec.decode(response);
}
- @Override
- public boolean remove(final Object key, final Object value) {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public boolean replace(final K key, final V oldValue, final V newValue) {
- throw new UnsupportedOperationException("TODO");
- }
-
- @Override
- public V replace(final K key, final V value) {
- throw new UnsupportedOperationException("TODO");
- }
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java
index c80d91d240..d4205d028e 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteredStateRepository.java
@@ -18,11 +18,10 @@
import org.ehcache.clustered.client.internal.EhcacheClientEntity;
import org.ehcache.clustered.client.service.ClusteringService;
-import org.ehcache.impl.internal.concurrent.ConcurrentHashMap;
+import org.ehcache.spi.persistence.StateHolder;
import org.ehcache.spi.persistence.StateRepository;
import java.io.Serializable;
-import java.util.concurrent.ConcurrentMap;
/**
* ClusteredStateRepository
@@ -40,7 +39,7 @@ class ClusteredStateRepository implements StateRepository {
}
@Override
- public <K extends Serializable, V extends Serializable> ConcurrentMap<K, V> getPersistentConcurrentMap(String name, Class<K> keyClass, Class<V> valueClass) {
- return new ConcurrentClusteredMap<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity, keyClass, valueClass);
+ public <K extends Serializable, V extends Serializable> StateHolder<K, V> getPersistentStateHolder(String name, Class<K> keyClass, Class<V> valueClass) {
+ return new ClusteredStateHolder<K, V>(clusterCacheIdentifier.getId(), composedId + "-" + name, clientEntity, keyClass, valueClass);
}
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteringServiceFactory.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteringServiceFactory.java
index fb12d4c83c..d835ae6407 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteringServiceFactory.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/ClusteringServiceFactory.java
@@ -27,6 +27,7 @@
*
* @author Clifford W. Johnson
*/
+@ServiceFactory.RequiresConfiguration
public class ClusteringServiceFactory implements ServiceFactory {
@Override
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/DefaultClusteringService.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/DefaultClusteringService.java
index 77f46ff091..8a9fc2eabb 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/DefaultClusteringService.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/service/DefaultClusteringService.java
@@ -26,7 +26,6 @@
import org.ehcache.clustered.client.internal.EhcacheEntityNotFoundException;
import org.ehcache.clustered.client.internal.EhcacheEntityValidationException;
import org.ehcache.clustered.client.internal.config.ExperimentalClusteringServiceConfiguration;
-import org.ehcache.clustered.client.internal.store.ClusteredStore;
import org.ehcache.clustered.client.internal.store.EventualServerStoreProxy;
import org.ehcache.clustered.client.internal.store.ServerStoreProxy;
import org.ehcache.clustered.client.internal.store.StrongServerStoreProxy;
@@ -45,7 +44,6 @@
import org.ehcache.spi.persistence.StateRepository;
import org.ehcache.spi.service.MaintainableService;
import org.ehcache.spi.service.Service;
-import org.ehcache.spi.service.ServiceDependencies;
import org.ehcache.spi.service.ServiceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,7 +66,6 @@
/**
* Provides support for accessing server-based cluster services.
*/
-@ServiceDependencies(ClusteredStore.Provider.class)
class DefaultClusteringService implements ClusteringService, EntityService {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultClusteringService.class);
@@ -81,9 +78,9 @@ class DefaultClusteringService implements ClusteringService, EntityService {
private final ConcurrentMap<String, ClusteredSpace> knownPersistenceSpaces = new ConcurrentHashMap<String, ClusteredSpace>();
private final EhcacheClientEntity.Timeouts operationTimeouts;
- private Connection clusterConnection;
+ private volatile Connection clusterConnection;
private EhcacheClientEntityFactory entityFactory;
- private EhcacheClientEntity entity;
+ EhcacheClientEntity entity;
private volatile boolean inMaintenance = false;
@@ -125,7 +122,7 @@ public <E extends Entity, C> ClientEntityFactory<E, C> newClientEntityFactory(St
return new AbstractClientEntityFactory<E, C>(entityIdentifier, entityType, entityVersion, configuration) {
@Override
protected Connection getConnection() {
- if (clusterConnection == null) {
+ if (!isConnected()) {
throw new IllegalStateException(getClass().getSimpleName() + " not started.");
}
return clusterConnection;
@@ -133,6 +130,11 @@ protected Connection getConnection() {
};
}
+ @Override
+ public boolean isConnected() {
+ return clusterConnection != null;
+ }
+
@Override
public void start(final ServiceProvider<Service> serviceProvider) {
initClusterConnection();
@@ -153,16 +155,25 @@ public void start(final ServiceProvider serviceProvider) {
}
} catch (RuntimeException e) {
entityFactory = null;
- try {
- clusterConnection.close();
- clusterConnection = null;
- } catch (IOException ex) {
- LOGGER.warn("Error closing cluster connection: " + ex);
- }
+ closeConnection();
throw e;
}
}
+ @Override
+ public void startForMaintenance(ServiceProvider<? super MaintainableService> serviceProvider, MaintenanceScope maintenanceScope) {
+ initClusterConnection();
+ createEntityFactory();
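+ // A cluster-wide lease is only needed when maintenance affects the whole cache manager; cache-scoped maintenance proceeds without acquiring leadership.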
+ if(maintenanceScope == MaintenanceScope.CACHE_MANAGER) {
+ if (!entityFactory.acquireLeadership(entityIdentifier)) {
+ entityFactory = null;
+ closeConnection();
+ throw new IllegalStateException("Couldn't acquire cluster-wide maintenance lease");
+ }
+ }
+ inMaintenance = true;
+ }
+
private void createEntityFactory() {
entityFactory = new EhcacheClientEntityFactory(clusterConnection, operationTimeouts);
}
@@ -204,24 +215,6 @@ private EhcacheClientEntity autoCreateEntity() throws EhcacheEntityValidationExc
}
}
- @Override
- public void startForMaintenance(ServiceProvider<MaintainableService> serviceProvider) {
- initClusterConnection();
- createEntityFactory();
-
- if (!entityFactory.acquireLeadership(entityIdentifier)) {
- entityFactory = null;
- try {
- clusterConnection.close();
- clusterConnection = null;
- } catch (IOException e) {
- LOGGER.warn("Error closing cluster connection: " + e);
- }
- throw new IllegalStateException("Couldn't acquire cluster-wide maintenance lease");
- }
- inMaintenance = true;
- }
-
@Override
public void stop() {
LOGGER.info("stop called for clustered tiers on {}", this.clusterUri);
@@ -238,14 +231,7 @@ public void stop() {
entity = null;
- try {
- if (clusterConnection != null) {
- clusterConnection.close();
- clusterConnection = null;
- }
- } catch (IOException ex) {
- throw new RuntimeException(ex);
- }
+ closeConnection();
}
@Override
@@ -315,38 +301,42 @@ public StateRepository getStateRepositoryWithin(PersistenceSpaceIdentifier<?> id
}
}
+ private void checkStarted() {
+ if(!isStarted()) {
+ throw new IllegalStateException(getClass().getName() + " should be started to call destroy");
+ }
+ }
+
@Override
public void destroy(String name) throws CachePersistenceException {
- boolean wasStarted = isStarted();
- // If the cluster isn't started, start it first to be able to destroy the cache
- if(!wasStarted) {
- initClusterConnection();
- createEntityFactory();
+ checkStarted();
+
+ // entity is null when the service was started in maintenance mode
+ if(entity == null) {
try {
entity = entityFactory.retrieve(entityIdentifier, configuration.getServerConfiguration());
} catch (EntityNotFoundException e) {
// No entity on the server, so no need to destroy anything
} catch (TimeoutException e) {
throw new CachePersistenceException("Could not connect to the clustered tier manager '" + entityIdentifier
- + "'; retrieve operation timed out", e);
+ + "'; retrieve operation timed out", e);
}
}
+
try {
- entity.destroyCache(name);
+ if (entity != null) {
+ entity.destroyCache(name);
+ }
} catch (ClusteredTierDestructionException e) {
throw new CachePersistenceException(e.getMessage() + " (on " + clusterUri + ")", e);
} catch (TimeoutException e) {
throw new CachePersistenceException("Could not destroy clustered tier '" + name + "' on " + clusterUri
+ "; destroy operation timed out" + clusterUri, e);
- } finally {
- if (!wasStarted) {
- stop();
- }
}
}
protected boolean isStarted() {
- return entity != null;
+ return entityFactory != null;
}
@Override
@@ -376,6 +366,19 @@ public <K, V> ServerStoreProxy getServerStoreProxy(final ClusteredCacheIdentifie
throw new IllegalStateException("A clustered resource is required for a clustered cache");
}
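+ // The proxy is built before the create/validate exchange so it can be closed if that exchange fails (see the catch blocks below).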
+ ServerStoreProxy serverStoreProxy;
+ ServerStoreMessageFactory messageFactory = new ServerStoreMessageFactory(cacheId, entity.getClientId());
+ switch (configuredConsistency) {
+ case STRONG:
+ serverStoreProxy = new StrongServerStoreProxy(messageFactory, entity);
+ break;
+ case EVENTUAL:
+ serverStoreProxy = new EventualServerStoreProxy(messageFactory, entity);
+ break;
+ default:
+ throw new AssertionError("Unknown consistency : " + configuredConsistency);
+ }
+
final ServerStoreConfiguration clientStoreConfiguration = new ServerStoreConfiguration(
clusteredResourcePool.getPoolAllocation(),
storeConfig.getKeyType().getName(),
@@ -390,40 +393,28 @@ public <K, V> ServerStoreProxy getServerStoreProxy(final ClusteredCacheIdentifie
try {
if (configuration.isAutoCreate()) {
try {
- this.entity.validateCache(cacheId, clientStoreConfiguration);
- } catch (ClusteredTierValidationException ex) {
- if (ex.getCause() instanceof InvalidStoreException) {
- try {
- this.entity.createCache(cacheId, clientStoreConfiguration);
- } catch (TimeoutException e) {
- throw new CachePersistenceException("Unable to create clustered tier proxy '"
- + cacheIdentifier.getId() + "' for entity '" + entityIdentifier
- + "'; create operation timed out", e);
- }
- } else {
- throw ex;
+ entity.createCache(cacheId, clientStoreConfiguration);
+ } catch (ClusteredTierCreationException e) {
+ // An InvalidStoreException means the cache already exists. That's fine, the validateCache will then work
+ if (!(e.getCause() instanceof InvalidStoreException)) {
+ throw e;
}
+ entity.validateCache(cacheId, clientStoreConfiguration);
}
} else {
- this.entity.validateCache(cacheId, clientStoreConfiguration);
+ entity.validateCache(cacheId, clientStoreConfiguration);
}
} catch (ClusteredTierException e) {
+ serverStoreProxy.close();
throw new CachePersistenceException("Unable to create clustered tier proxy '" + cacheIdentifier.getId() + "' for entity '" + entityIdentifier + "'", e);
} catch (TimeoutException e) {
+ serverStoreProxy.close();
throw new CachePersistenceException("Unable to create clustered tier proxy '"
+ cacheIdentifier.getId() + "' for entity '" + entityIdentifier
+ "'; validate operation timed out", e);
}
- ServerStoreMessageFactory messageFactory = new ServerStoreMessageFactory(cacheId);
- switch (configuredConsistency) {
- case STRONG:
- return new StrongServerStoreProxy(messageFactory, entity);
- case EVENTUAL:
- return new EventualServerStoreProxy(messageFactory, entity);
- default:
- throw new AssertionError("Unknown consistency : " + configuredConsistency);
- }
+ return serverStoreProxy;
}
@Override
@@ -444,6 +435,18 @@ public void releaseServerStoreProxy(ServerStoreProxy storeProxy) {
}
}
+ private void closeConnection() {
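+ // Clear the volatile reference before closing so concurrent callers of isConnected() stop seeing a connection that is going away; a failure to close is only logged.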
+ Connection conn = clusterConnection;
+ clusterConnection = null;
+ if(conn != null) {
+ try {
+ conn.close();
+ } catch (IOException e) {
+ LOGGER.warn("Error closing cluster connection: " + e);
+ }
+ }
+ }
+
/**
* Supplies the identifier to use for identifying a client-side cache to its server counterparts.
*/
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java
index 17a8b29ddf..311f6c6607 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ClusteredStore.java
@@ -46,21 +46,29 @@
import org.ehcache.core.spi.store.events.StoreEventSource;
import org.ehcache.core.spi.store.StoreAccessException;
import org.ehcache.core.spi.store.tiering.AuthoritativeTier;
+import org.ehcache.core.statistics.AuthoritativeTierOperationOutcomes;
import org.ehcache.core.statistics.StoreOperationOutcomes;
import org.ehcache.core.spi.time.TimeSource;
import org.ehcache.core.spi.time.TimeSourceService;
+import org.ehcache.core.statistics.TierOperationOutcomes;
import org.ehcache.impl.config.loaderwriter.DefaultCacheLoaderWriterConfiguration;
import org.ehcache.impl.internal.events.NullStoreEventDispatcher;
+import org.ehcache.spi.persistence.StateRepository;
+import org.ehcache.spi.serialization.Serializer;
+import org.ehcache.spi.serialization.StatefulSerializer;
+import org.ehcache.spi.service.ServiceDependencies;
import org.ehcache.spi.service.ServiceProvider;
import org.ehcache.spi.service.Service;
import org.ehcache.spi.service.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.terracotta.context.annotations.ContextAttribute;
+import org.terracotta.statistics.MappedOperationStatistic;
import org.terracotta.statistics.StatisticsManager;
import org.terracotta.statistics.observer.OperationObserver;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -68,9 +76,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
-import static java.util.Collections.singleton;
import static org.ehcache.core.exceptions.StorePassThroughException.handleRuntimeException;
import static org.ehcache.core.internal.service.ServiceLocator.findSingletonAmongst;
import static org.terracotta.statistics.StatisticBuilder.operation;
@@ -80,7 +88,8 @@
*/
public class ClusteredStore<K, V> implements AuthoritativeTier<K, V> {
- private static final String STATISTICS_TAG = "clustered-store";
+ private static final String STATISTICS_TAG = "Clustered";
+ private static final int TIER_HEIGHT = ClusteredResourceType.Types.UNKNOWN.getTierHeight(); //TierHeight is the same for all ClusteredResourceType.Types
private final OperationsCodec<K, V> codec;
private final ChainResolver<K, V> resolver;
@@ -99,8 +108,8 @@ public class ClusteredStore<K, V> implements AuthoritativeTier<K, V> {
private final OperationObserver<StoreOperationOutcomes.ConditionalReplaceOutcome> conditionalReplaceObserver;
// Needed for JSR-107 compatibility even if unused
private final OperationObserver<StoreOperationOutcomes.EvictionOutcome> evictionObserver;
+ private final OperationObserver<AuthoritativeTierOperationOutcomes.GetAndFaultOutcome> getAndFaultObserver;
- private final ClusteredStoreStatsSettings clusteredStoreStatsSettings;
private ClusteredStore(final OperationsCodec<K, V> codec, final ChainResolver<K, V> resolver, TimeSource timeSource) {
this.codec = codec;
@@ -115,8 +124,36 @@ private ClusteredStore(final OperationsCodec codec, final ChainResolver tags = new HashSet(Arrays.asList(STATISTICS_TAG, "tier"));
+ Map properties = new HashMap();
+ properties.put("discriminator", STATISTICS_TAG);
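+ // These pass-through statistics report -1: the client cannot observe server-side mapping counts or memory usage for a clustered tier.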
+ StatisticsManager.createPassThroughStatistic(this, "mappings", tags, properties, new Callable() {
+ @Override
+ public Number call() throws Exception {
+ return -1L;
+ }
+ });
+ StatisticsManager.createPassThroughStatistic(this, "maxMappings", tags, properties, new Callable() {
+ @Override
+ public Number call() throws Exception {
+ return -1L;
+ }
+ });
+ StatisticsManager.createPassThroughStatistic(this, "allocatedMemory", tags, properties, new Callable() {
+ @Override
+ public Number call() throws Exception {
+ return -1L;
+ }
+ });
+ StatisticsManager.createPassThroughStatistic(this, "occupiedMemory", tags, properties, new Callable() {
+ @Override
+ public Number call() throws Exception {
+ return -1L;
+ }
+ });
- this.clusteredStoreStatsSettings = new ClusteredStoreStatsSettings(this);
}
/**
@@ -406,7 +443,7 @@ public Map<K, ValueHolder<V>> bulkCompute(final Set<? extends K> keys, final Fun
throws StoreAccessException {
Map<K, ValueHolder<V>> valueHolderMap = new HashMap<K, ValueHolder<V>>();
if(remappingFunction instanceof Ehcache.PutAllFunction) {
- Ehcache.PutAllFunction putAllFunction = (Ehcache.PutAllFunction)remappingFunction;
+ Ehcache.PutAllFunction<K, V> putAllFunction = (Ehcache.PutAllFunction<K, V>)remappingFunction;
Map<K, V> entriesToRemap = putAllFunction.getEntriesToRemap();
for(Map.Entry<K, V> entry: entriesToRemap.entrySet()) {
PutStatus putStatus = silentPut(entry.getKey(), entry.getValue());
@@ -416,7 +453,7 @@ public Map<K, ValueHolder<V>> bulkCompute(final Set<? extends K> keys, final Fun
}
}
} else if(remappingFunction instanceof Ehcache.RemoveAllFunction) {
- Ehcache.RemoveAllFunction removeAllFunction = (Ehcache.RemoveAllFunction)remappingFunction;
+ Ehcache.RemoveAllFunction<K, V> removeAllFunction = (Ehcache.RemoveAllFunction<K, V>)remappingFunction;
for (K key : keys) {
boolean removed = silentRemove(key);
if(removed) {
@@ -472,7 +509,21 @@ public List<CacheConfigurationChangeListener> getConfigurationChangeListeners()
@Override
public ValueHolder<V> getAndFault(K key) throws StoreAccessException {
- return get(key);
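+ // getAndFault is now tracked with its own observer; a timeout is recorded as its own outcome and surfaced as a miss (null) instead of failing the caller.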
+ getAndFaultObserver.begin();
+ V value;
+ try {
+ value = getInternal(key);
+ } catch (TimeoutException e) {
+ getAndFaultObserver.end(AuthoritativeTierOperationOutcomes.GetAndFaultOutcome.TIMEOUT);
+ return null;
+ }
+ if(value == null) {
+ getAndFaultObserver.end(AuthoritativeTierOperationOutcomes.GetAndFaultOutcome.MISS);
+ return null;
+ } else {
+ getAndFaultObserver.end(AuthoritativeTierOperationOutcomes.GetAndFaultOutcome.HIT);
+ return new ClusteredValueHolder<V>(value);
+ }
}
@Override
@@ -495,6 +546,7 @@ public void setInvalidationValve(InvalidationValve valve) {
/**
* Provider of {@link ClusteredStore} instances.
*/
+ @ServiceDependencies({TimeSourceService.class, ClusteringService.class})
public static class Provider implements Store.Provider, AuthoritativeTier.Provider {
private static final Logger LOGGER = LoggerFactory.getLogger(Provider.class);
@@ -510,16 +562,36 @@ public static class Provider implements Store.Provider, AuthoritativeTier.Provid
private volatile ClusteringService clusteringService;
private final Map<Store<?, ?>, StoreConfig> createdStores = new ConcurrentWeakIdentityHashMap<Store<?, ?>, StoreConfig>();
+ private final Map<ClusteredStore<?, ?>, Collection<MappedOperationStatistic<?, ?>>> tierOperationStatistics = new ConcurrentWeakIdentityHashMap<ClusteredStore<?, ?>, Collection<MappedOperationStatistic<?, ?>>>();
@Override
public <K, V> ClusteredStore<K, V> createStore(final Configuration<K, V> storeConfig, final ServiceConfiguration<?>... serviceConfigs) {
+ ClusteredStore<K, V> store = createStoreInternal(storeConfig, serviceConfigs);
+ Collection<MappedOperationStatistic<?, ?>> tieredOps = new ArrayList<MappedOperationStatistic<?, ?>>();
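+ // The mapped statistics translate store-level outcomes into tier-level ones; keeping them in tierOperationStatistics keeps them strongly reachable while the store is alive.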
+
+ MappedOperationStatistic<StoreOperationOutcomes.GetOutcome, TierOperationOutcomes.GetOutcome> get =
+ new MappedOperationStatistic<StoreOperationOutcomes.GetOutcome, TierOperationOutcomes.GetOutcome>(
+ store, TierOperationOutcomes.GET_TRANSLATION, "get", TIER_HEIGHT, "get", STATISTICS_TAG);
+ StatisticsManager.associate(get).withParent(store);
+ tieredOps.add(get);
+
+ MappedOperationStatistic<StoreOperationOutcomes.EvictionOutcome, TierOperationOutcomes.EvictionOutcome> evict =
+ new MappedOperationStatistic<StoreOperationOutcomes.EvictionOutcome, TierOperationOutcomes.EvictionOutcome>(
+ store, TierOperationOutcomes.EVICTION_TRANSLATION, "eviction", TIER_HEIGHT, "eviction", STATISTICS_TAG);
+ StatisticsManager.associate(evict).withParent(store);
+ tieredOps.add(evict);
+
+ tierOperationStatistics.put(store, tieredOps);
+ return store;
+ }
- DefaultCacheLoaderWriterConfiguration loaderWriterConfiguration = findSingletonAmongst(DefaultCacheLoaderWriterConfiguration.class, (Object[])serviceConfigs);
+ private <K, V> ClusteredStore<K, V> createStoreInternal(Configuration<K, V> storeConfig, Object[] serviceConfigs) {
+ DefaultCacheLoaderWriterConfiguration loaderWriterConfiguration = findSingletonAmongst(DefaultCacheLoaderWriterConfiguration.class, serviceConfigs);
if (loaderWriterConfiguration != null) {
throw new IllegalStateException("CacheLoaderWriter is not supported with clustered tiers");
}
- CacheEventListenerConfiguration eventListenerConfiguration = findSingletonAmongst(CacheEventListenerConfiguration.class, (Object[])serviceConfigs);
+ CacheEventListenerConfiguration eventListenerConfiguration = findSingletonAmongst(CacheEventListenerConfiguration.class, serviceConfigs);
if (eventListenerConfiguration != null) {
throw new IllegalStateException("CacheEventListener is not supported with clustered tiers");
}
@@ -539,11 +611,11 @@ public <K, V> ClusteredStore<K, V> createStore(final Configuration<K, V> storeCo
throw new IllegalStateException(Provider.class.getCanonicalName() + ".createStore can not create clustered tier with multiple clustered resources");
}
- ClusteredStoreConfiguration clusteredStoreConfiguration = findSingletonAmongst(ClusteredStoreConfiguration.class, (Object[])serviceConfigs);
+ ClusteredStoreConfiguration clusteredStoreConfiguration = findSingletonAmongst(ClusteredStoreConfiguration.class, serviceConfigs);
if (clusteredStoreConfiguration == null) {
clusteredStoreConfiguration = new ClusteredStoreConfiguration();
}
- ClusteredCacheIdentifier cacheId = findSingletonAmongst(ClusteredCacheIdentifier.class, (Object[]) serviceConfigs);
+ ClusteredCacheIdentifier cacheId = findSingletonAmongst(ClusteredCacheIdentifier.class, serviceConfigs);
TimeSource timeSource = serviceProvider.getService(TimeSourceService.class).getTimeSource();
@@ -552,7 +624,7 @@ public <K, V> ClusteredStore<K, V> createStore(final Configuration<K, V> storeCo
ClusteredStore<K, V> store = new ClusteredStore<K, V>(codec, resolver, timeSource);
- StatisticsManager.associate(store.clusteredStoreStatsSettings).withParent(store);
+
createdStores.put(store, new StoreConfig(cacheId, storeConfig, clusteredStoreConfiguration.getConsistency()));
return store;
}
@@ -564,7 +636,8 @@ public void releaseStore(final Store<?, ?> resource) {
}
ClusteredStore<?, ?> clusteredStore = (ClusteredStore<?, ?>)resource;
this.clusteringService.releaseServerStoreProxy(clusteredStore.storeProxy);
- StatisticsManager.dissociate(clusteredStore.clusteredStoreStatsSettings).fromParent(clusteredStore);
+ StatisticsManager.nodeFor(clusteredStore).clean();
+ tierOperationStatistics.remove(clusteredStore);
}
@Override
@@ -573,15 +646,40 @@ public void initStore(final Store<?, ?> resource) {
if (storeConfig == null) {
throw new IllegalArgumentException("Given clustered tier is not managed by this provider : " + resource);
}
- final ClusteredStore clusteredStore = (ClusteredStore) resource;
+ final ClusteredStore<?, ?> clusteredStore = (ClusteredStore<?, ?>) resource;
+ ClusteredCacheIdentifier cacheIdentifier = storeConfig.getCacheIdentifier();
try {
- clusteredStore.storeProxy = clusteringService.getServerStoreProxy(storeConfig.getCacheIdentifier(), storeConfig.getStoreConfig(), storeConfig.getConsistency());
+ clusteredStore.storeProxy = clusteringService.getServerStoreProxy(cacheIdentifier, storeConfig.getStoreConfig(), storeConfig.getConsistency());
} catch (CachePersistenceException e) {
- throw new RuntimeException("Unable to create clustered tier proxy - " + storeConfig.getCacheIdentifier(), e);
+ throw new RuntimeException("Unable to create clustered tier proxy - " + cacheIdentifier, e);
+ }
+
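+ // If a configured serializer is stateful, inject a server-backed StateRepository (suffixed "-Key"/"-Value") before any data flows through the store.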
+ Serializer keySerializer = clusteredStore.codec.getKeySerializer();
+ if (keySerializer instanceof StatefulSerializer) {
+ StateRepository stateRepository = null;
+ try {
+ stateRepository = clusteringService.getStateRepositoryWithin(cacheIdentifier, cacheIdentifier.getId() + "-Key");
+ } catch (CachePersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ ((StatefulSerializer)keySerializer).init(stateRepository);
}
+ Serializer valueSerializer = clusteredStore.codec.getValueSerializer();
+ if (valueSerializer instanceof StatefulSerializer) {
+ StateRepository stateRepository = null;
+ try {
+ stateRepository = clusteringService.getStateRepositoryWithin(cacheIdentifier, cacheIdentifier.getId() + "-Value");
+ } catch (CachePersistenceException e) {
+ throw new RuntimeException(e);
+ }
+ ((StatefulSerializer)valueSerializer).init(stateRepository);
+ }
+
clusteredStore.storeProxy.addInvalidationListener(new ServerStoreProxy.InvalidationListener() {
@Override
public void onInvalidateHash(long hash) {
+ StoreOperationOutcomes.EvictionOutcome result = StoreOperationOutcomes.EvictionOutcome.SUCCESS;
+ clusteredStore.evictionObserver.begin();
if (clusteredStore.invalidationValve != null) {
try {
LOGGER.debug("CLIENT: calling invalidation valve for hash {}", hash);
@@ -589,8 +687,10 @@ public void onInvalidateHash(long hash) {
} catch (StoreAccessException sae) {
//TODO: what should be done here? delegate to resilience strategy?
LOGGER.error("Error invalidating hash {}", hash, sae);
+ result = StoreOperationOutcomes.EvictionOutcome.FAILURE;
}
}
+ clusteredStore.evictionObserver.end(result);
}
@Override
@@ -640,7 +740,23 @@ public void stop() {
@Override
public <K, V> AuthoritativeTier<K, V> createAuthoritativeTier(Configuration<K, V> storeConfig, ServiceConfiguration<?>... serviceConfigs) {
- return createStore(storeConfig, serviceConfigs);
+ ClusteredStore<K, V> authoritativeTier = createStoreInternal(storeConfig, serviceConfigs);
+ Collection<MappedOperationStatistic<?, ?>> tieredOps = new ArrayList<MappedOperationStatistic<?, ?>>();
+
+ MappedOperationStatistic<AuthoritativeTierOperationOutcomes.GetAndFaultOutcome, TierOperationOutcomes.GetOutcome> get =
+ new MappedOperationStatistic<AuthoritativeTierOperationOutcomes.GetAndFaultOutcome, TierOperationOutcomes.GetOutcome>(
+ authoritativeTier, TierOperationOutcomes.GET_AND_FAULT_TRANSLATION, "get", TIER_HEIGHT, "getAndFault", STATISTICS_TAG);
+ StatisticsManager.associate(get).withParent(authoritativeTier);
+ tieredOps.add(get);
+
+ MappedOperationStatistic<StoreOperationOutcomes.EvictionOutcome, TierOperationOutcomes.EvictionOutcome> evict =
+ new MappedOperationStatistic<StoreOperationOutcomes.EvictionOutcome, TierOperationOutcomes.EvictionOutcome>(
+ authoritativeTier, TierOperationOutcomes.EVICTION_TRANSLATION, "eviction", TIER_HEIGHT, "eviction", STATISTICS_TAG);
+ StatisticsManager.associate(evict).withParent(authoritativeTier);
+ tieredOps.add(evict);
+
+ tierOperationStatistics.put(authoritativeTier, tieredOps);
+ return authoritativeTier;
}
@Override
@@ -657,16 +773,16 @@ public void initAuthoritativeTier(AuthoritativeTier<?, ?> resource) {
private static class StoreConfig {
private final ClusteredCacheIdentifier cacheIdentifier;
- private final Store.Configuration storeConfig;
+ private final Store.Configuration<?, ?> storeConfig;
private final Consistency consistency;
- StoreConfig(ClusteredCacheIdentifier cacheIdentifier, Configuration storeConfig, Consistency consistency) {
+ StoreConfig(ClusteredCacheIdentifier cacheIdentifier, Configuration<?, ?> storeConfig, Consistency consistency) {
this.cacheIdentifier = cacheIdentifier;
this.storeConfig = storeConfig;
this.consistency = consistency;
}
- public Configuration getStoreConfig() {
+ public Configuration<?, ?> getStoreConfig() {
return this.storeConfig;
}
@@ -678,14 +794,4 @@ public Consistency getConsistency() {
return consistency;
}
}
-
- private static final class ClusteredStoreStatsSettings {
- @ContextAttribute("tags") private final Set tags = singleton("store");
- @ContextAttribute("authoritativeTier") private final ClusteredStore, ?> authoritativeTier;
-
- ClusteredStoreStatsSettings(ClusteredStore<?, ?> store) {
- this.authoritativeTier = store;
- }
- }
-
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java
new file mode 100644
index 0000000000..8bc103a176
--- /dev/null
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/CommonServerStoreProxy.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.ehcache.clustered.client.internal.store;
+
+import org.ehcache.clustered.client.internal.EhcacheClientEntity;
+import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
+import org.ehcache.clustered.common.internal.messages.EhcacheResponseType;
+import org.ehcache.clustered.common.internal.messages.ServerStoreMessageFactory;
+import org.ehcache.clustered.common.internal.store.Chain;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Provides client-side access to the services of a {@code ServerStore}.
+ */
+class CommonServerStoreProxy implements ServerStoreProxy {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(CommonServerStoreProxy.class);
+
+ private final ServerStoreMessageFactory messageFactory;
+ private final EhcacheClientEntity entity;
+
+ private final List<InvalidationListener> invalidationListeners = new CopyOnWriteArrayList<InvalidationListener>();
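+ // Response listeners are kept in this map so close() can deregister every one of them from the entity.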
+ private final Map<Class<? extends EhcacheEntityResponse>, EhcacheClientEntity.ResponseListener<? extends EhcacheEntityResponse>> responseListeners
+ = new ConcurrentHashMap<Class<? extends EhcacheEntityResponse>, EhcacheClientEntity.ResponseListener<? extends EhcacheEntityResponse>>();
+
+ CommonServerStoreProxy(final ServerStoreMessageFactory messageFactory, final EhcacheClientEntity entity) {
+ this.messageFactory = messageFactory;
+ this.entity = entity;
+ this.responseListeners.put(EhcacheEntityResponse.ServerInvalidateHash.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ServerInvalidateHash>() {
+ @Override
+ public void onResponse(EhcacheEntityResponse.ServerInvalidateHash response) {
+ if (response.getCacheId().equals(messageFactory.getCacheId())) {
+ long key = response.getKey();
+ LOGGER.debug("CLIENT: on cache {}, server requesting hash {} to be invalidated", messageFactory.getCacheId(), key);
+ for (InvalidationListener listener : invalidationListeners) {
+ listener.onInvalidateHash(key);
+ }
+ } else {
+ LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
+ }
+ }
+ });
+ this.responseListeners.put(EhcacheEntityResponse.ClientInvalidateHash.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ClientInvalidateHash>() {
+ @Override
+ public void onResponse(EhcacheEntityResponse.ClientInvalidateHash response) {
+ final String cacheId = response.getCacheId();
+ final long key = response.getKey();
+ final int invalidationId = response.getInvalidationId();
+
+ if (cacheId.equals(messageFactory.getCacheId())) {
+ LOGGER.debug("CLIENT: doing work to invalidate hash {} from cache {} (ID {})", key, cacheId, invalidationId);
+ for (InvalidationListener listener : invalidationListeners) {
+ listener.onInvalidateHash(key);
+ }
+
+ try {
+ LOGGER.debug("CLIENT: ack'ing invalidation of hash {} from cache {} (ID {})", key, cacheId, invalidationId);
+ entity.invokeAsync(messageFactory.clientInvalidationAck(invalidationId), false);
+ } catch (Exception e) {
+ //TODO: what should be done here?
+ LOGGER.error("error acking client invalidation of hash {} on cache {}", key, cacheId, e);
+ }
+ } else {
+ LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
+ }
+ }
+ });
+ this.responseListeners.put(EhcacheEntityResponse.ClientInvalidateAll.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ClientInvalidateAll>() {
+ @Override
+ public void onResponse(EhcacheEntityResponse.ClientInvalidateAll response) {
+ final String cacheId = response.getCacheId();
+ final int invalidationId = response.getInvalidationId();
+
+ if (cacheId.equals(messageFactory.getCacheId())) {
+ LOGGER.debug("CLIENT: doing work to invalidate all from cache {} (ID {})", cacheId, invalidationId);
+ for (InvalidationListener listener : invalidationListeners) {
+ listener.onInvalidateAll();
+ }
+
+ try {
+ LOGGER.debug("CLIENT: ack'ing invalidation of all from cache {} (ID {})", cacheId, invalidationId);
+ entity.invokeAsync(messageFactory.clientInvalidationAck(invalidationId), false);
+ } catch (Exception e) {
+ //TODO: what should be done here?
+ LOGGER.error("error acking client invalidation of all on cache {}", cacheId, e);
+ }
+ } else {
+ LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
+ }
+ }
+ });
+
+ addResponseListenersToEntity();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void addResponseListenersToEntity() {
+ for (Map.Entry<Class<? extends EhcacheEntityResponse>, EhcacheClientEntity.ResponseListener<? extends EhcacheEntityResponse>> classResponseListenerEntry :
+ this.responseListeners.entrySet()) {
+ this.entity.addResponseListener(classResponseListenerEntry.getKey(), (EhcacheClientEntity.ResponseListener)classResponseListenerEntry.getValue());
+ }
+ }
+
+ @Override
+ public String getCacheId() {
+ return messageFactory.getCacheId();
+ }
+
+ @Override
+ public void addInvalidationListener(InvalidationListener listener) {
+ invalidationListeners.add(listener);
+ }
+
+ @Override
+ public boolean removeInvalidationListener(InvalidationListener listener) {
+ return invalidationListeners.remove(listener);
+ }
+
+ <T extends EhcacheEntityResponse> void addResponseListeners(Class<T> listenerClass, EhcacheClientEntity.ResponseListener<T> listener) {
+ this.responseListeners.put(listenerClass, listener);
+ this.entity.addResponseListener(listenerClass, listener);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void close() {
+ for (Map.Entry<Class<? extends EhcacheEntityResponse>, EhcacheClientEntity.ResponseListener<? extends EhcacheEntityResponse>> classResponseListenerEntry :
+ this.responseListeners.entrySet()) {
+ this.entity.removeResponseListener(classResponseListenerEntry.getKey(), (EhcacheClientEntity.ResponseListener) classResponseListenerEntry.getValue());
+ }
+ }
+
+ @Override
+ public Chain get(long key) throws TimeoutException {
+ EhcacheEntityResponse response;
+ try {
+ response = entity.invoke(messageFactory.getOperation(key), false);
+ } catch (TimeoutException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ServerStoreProxyException(e);
+ }
+ if (response != null && response.getResponseType() == EhcacheResponseType.GET_RESPONSE) {
+ return ((EhcacheEntityResponse.GetResponse)response).getChain();
+ } else {
+ throw new ServerStoreProxyException("Response for get operation was invalid : " +
+ (response != null ? response.getResponseType() : "null message"));
+ }
+ }
+
+ @Override
+ public void append(long key, ByteBuffer payLoad) throws TimeoutException {
+ try {
+ entity.invoke(messageFactory.appendOperation(key, payLoad), true);
+ } catch (TimeoutException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ServerStoreProxyException(e);
+ }
+ }
+
+ @Override
+ public Chain getAndAppend(long key, ByteBuffer payLoad) throws TimeoutException {
+ EhcacheEntityResponse response;
+ try {
+ response = entity.invoke(messageFactory.getAndAppendOperation(key, payLoad), true);
+ } catch (TimeoutException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ServerStoreProxyException(e);
+ }
+ if (response != null && response.getResponseType() == EhcacheResponseType.GET_RESPONSE) {
+ return ((EhcacheEntityResponse.GetResponse)response).getChain();
+ } else {
+ throw new ServerStoreProxyException("Response for getAndAppend operation was invalid : " +
+ (response != null ? response.getResponseType() : "null message"));
+ }
+ }
+
+ @Override
+ public void replaceAtHead(long key, Chain expect, Chain update) {
+ // TODO: Optimize this method to just send sequences for expect Chain
+ try {
+ entity.invokeAsync(messageFactory.replaceAtHeadOperation(key, expect, update), true);
+ } catch (Exception e) {
+ throw new ServerStoreProxyException(e);
+ }
+ }
+
+ @Override
+ public void clear() throws TimeoutException {
+ try {
+ entity.invoke(messageFactory.clearOperation(), true);
+ } catch (TimeoutException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ServerStoreProxyException(e);
+ }
+ }
+}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/EventualServerStoreProxy.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/EventualServerStoreProxy.java
index 7858717bc8..6920e447f6 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/EventualServerStoreProxy.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/EventualServerStoreProxy.java
@@ -16,76 +16,18 @@
package org.ehcache.clustered.client.internal.store;
import org.ehcache.clustered.client.internal.EhcacheClientEntity;
-import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
import org.ehcache.clustered.common.internal.messages.ServerStoreMessageFactory;
import org.ehcache.clustered.common.internal.store.Chain;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeoutException;
-/**
- * @author Ludovic Orban
- */
public class EventualServerStoreProxy implements ServerStoreProxy {
- private static final Logger LOGGER = LoggerFactory.getLogger(EventualServerStoreProxy.class);
-
private final ServerStoreProxy delegate;
- private final List<InvalidationListener> invalidationListeners = new CopyOnWriteArrayList<InvalidationListener>();
public EventualServerStoreProxy(final ServerStoreMessageFactory messageFactory, final EhcacheClientEntity entity) {
- this.delegate = new NoInvalidationServerStoreProxy(messageFactory, entity);
- entity.addResponseListener(EhcacheEntityResponse.ServerInvalidateHash.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ServerInvalidateHash>() {
- @Override
- public void onResponse(EhcacheEntityResponse.ServerInvalidateHash response) {
- if (response.getCacheId().equals(messageFactory.getCacheId())) {
- long key = response.getKey();
- LOGGER.debug("CLIENT: on cache {}, server requesting hash {} to be invalidated", messageFactory.getCacheId(), key);
- for (InvalidationListener listener : invalidationListeners) {
- listener.onInvalidateHash(key);
- }
- } else {
- LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
- }
- }
- });
- entity.addResponseListener(EhcacheEntityResponse.ClientInvalidateHash.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ClientInvalidateHash>() {
- @Override
- public void onResponse(EhcacheEntityResponse.ClientInvalidateHash response) {
- final String cacheId = response.getCacheId();
- final long key = response.getKey();
- final int invalidationId = response.getInvalidationId();
-
- if (cacheId.equals(messageFactory.getCacheId())) {
- LOGGER.debug("CLIENT: doing work to invalidate hash {} from cache {} (ID {})", key, cacheId, invalidationId);
- for (InvalidationListener listener : invalidationListeners) {
- listener.onInvalidateHash(key);
- }
- } else {
- LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
- }
- }
- });
- entity.addResponseListener(EhcacheEntityResponse.ClientInvalidateAll.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ClientInvalidateAll>() {
- @Override
- public void onResponse(EhcacheEntityResponse.ClientInvalidateAll response) {
- final String cacheId = response.getCacheId();
- final int invalidationId = response.getInvalidationId();
-
- if (cacheId.equals(messageFactory.getCacheId())) {
- LOGGER.debug("CLIENT: doing work to invalidate all from cache {} (ID {})", cacheId, invalidationId);
- for (InvalidationListener listener : invalidationListeners) {
- listener.onInvalidateAll();
- }
- } else {
- LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
- }
- }
- });
+ this.delegate = new CommonServerStoreProxy(messageFactory, entity);
}
@Override
@@ -95,12 +37,17 @@ public String getCacheId() {
@Override
public void addInvalidationListener(InvalidationListener listener) {
- invalidationListeners.add(listener);
+ delegate.addInvalidationListener(listener);
}
@Override
public boolean removeInvalidationListener(InvalidationListener listener) {
- return invalidationListeners.remove(listener);
+ return delegate.removeInvalidationListener(listener);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
}
@Override
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/NoInvalidationServerStoreProxy.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/NoInvalidationServerStoreProxy.java
deleted file mode 100644
index b2a6c4c1c0..0000000000
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/NoInvalidationServerStoreProxy.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright Terracotta, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.ehcache.clustered.client.internal.store;
-
-import org.ehcache.clustered.client.internal.EhcacheClientEntity;
-import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
-import org.ehcache.clustered.common.internal.messages.ServerStoreMessageFactory;
-import org.ehcache.clustered.common.internal.store.Chain;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Provides client-side access to the services of a {@code ServerStore}.
- */
-class NoInvalidationServerStoreProxy implements ServerStoreProxy {
-
- private final ServerStoreMessageFactory messageFactory;
- private final EhcacheClientEntity entity;
-
- NoInvalidationServerStoreProxy(ServerStoreMessageFactory messageFactory, final EhcacheClientEntity entity) {
- this.messageFactory = messageFactory;
- this.entity = entity;
- }
-
- @Override
- public String getCacheId() {
- return messageFactory.getCacheId();
- }
-
- @Override
- public void addInvalidationListener(InvalidationListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean removeInvalidationListener(InvalidationListener listener) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Chain get(long key) throws TimeoutException {
- EhcacheEntityResponse response;
- try {
- response = entity.invoke(messageFactory.getOperation(key), false);
- } catch (TimeoutException e) {
- throw e;
- } catch (Exception e) {
- throw new ServerStoreProxyException(e);
- }
- if (response != null && response.getType() == EhcacheEntityResponse.Type.GET_RESPONSE) {
- return ((EhcacheEntityResponse.GetResponse)response).getChain();
- } else {
- throw new ServerStoreProxyException("Response for get operation was invalid : " +
- (response != null ? response.getType().toString() : "null message"));
- }
- }
-
- @Override
- public void append(long key, ByteBuffer payLoad) throws TimeoutException {
- try {
- entity.invoke(messageFactory.appendOperation(key, payLoad), true);
- } catch (TimeoutException e) {
- throw e;
- } catch (Exception e) {
- throw new ServerStoreProxyException(e);
- }
- }
-
- @Override
- public Chain getAndAppend(long key, ByteBuffer payLoad) throws TimeoutException {
- EhcacheEntityResponse response;
- try {
- response = entity.invoke(messageFactory.getAndAppendOperation(key, payLoad), true);
- } catch (TimeoutException e) {
- throw e;
- } catch (Exception e) {
- throw new ServerStoreProxyException(e);
- }
- if (response != null && response.getType() == EhcacheEntityResponse.Type.GET_RESPONSE) {
- return ((EhcacheEntityResponse.GetResponse)response).getChain();
- } else {
- throw new ServerStoreProxyException("Response for getAndAppend operation was invalid : " +
- (response != null ? response.getType().toString() : "null message"));
- }
- }
-
- @Override
- public void replaceAtHead(long key, Chain expect, Chain update) {
- // TODO: Optimize this method to just send sequences for expect Chain
- try {
- entity.invokeAsync(messageFactory.replaceAtHeadOperation(key, expect, update), true);
- } catch (Exception e) {
- throw new ServerStoreProxyException(e);
- }
- }
-
- @Override
- public void clear() throws TimeoutException {
- try {
- entity.invoke(messageFactory.clearOperation(), true);
- } catch (TimeoutException e) {
- throw e;
- } catch (Exception e) {
- throw new ServerStoreProxyException(e);
- }
- }
-}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ServerStoreProxy.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ServerStoreProxy.java
index dadb81a480..c8b93f178e 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ServerStoreProxy.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/ServerStoreProxy.java
@@ -60,4 +60,9 @@ interface InvalidationListener {
*/
boolean removeInvalidationListener(InvalidationListener listener);
+ /**
+ * Closes this proxy.
+ */
+ void close();
+
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxy.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxy.java
index 2f9f34182a..f9bd1ff898 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxy.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/StrongServerStoreProxy.java
@@ -17,41 +17,51 @@
import org.ehcache.clustered.client.internal.EhcacheClientEntity;
import org.ehcache.clustered.common.internal.messages.EhcacheEntityResponse;
+import org.ehcache.clustered.common.internal.messages.ReconnectMessage;
import org.ehcache.clustered.common.internal.messages.ServerStoreMessageFactory;
import org.ehcache.clustered.common.internal.store.Chain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-/**
- * @author Ludovic Orban
- */
public class StrongServerStoreProxy implements ServerStoreProxy {
private static final Logger LOGGER = LoggerFactory.getLogger(StrongServerStoreProxy.class);
- private final ServerStoreProxy delegate;
+ private final CommonServerStoreProxy delegate;
private final ConcurrentMap<Long, CountDownLatch> hashInvalidationsInProgress = new ConcurrentHashMap<Long, CountDownLatch>();
private final Lock invalidateAllLock = new ReentrantLock();
- private CountDownLatch invalidateAllLatch;
- private final List invalidationListeners = new CopyOnWriteArrayList();
+ private volatile CountDownLatch invalidateAllLatch;
private final EhcacheClientEntity entity;
+ private final EhcacheClientEntity.ReconnectListener reconnectListener;
+ private final EhcacheClientEntity.DisconnectionListener disconnectionListener;
public StrongServerStoreProxy(final ServerStoreMessageFactory messageFactory, final EhcacheClientEntity entity) {
- this.delegate = new NoInvalidationServerStoreProxy(messageFactory, entity);
+ this.delegate = new CommonServerStoreProxy(messageFactory, entity);
this.entity = entity;
- entity.addResponseListener(EhcacheEntityResponse.HashInvalidationDone.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.HashInvalidationDone>() {
+ this.reconnectListener = new EhcacheClientEntity.ReconnectListener() {
+ @Override
+ public void onHandleReconnect(ReconnectMessage reconnectMessage) {
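+ // On reconnect, tell the server which hash invalidations and clears this client is still waiting on, so tracking can resume on the new active.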
+ Set<Long> inflightInvalidations = hashInvalidationsInProgress.keySet();
+ reconnectMessage.addInvalidationsInProgress(delegate.getCacheId(), inflightInvalidations);
+ if (invalidateAllLatch != null) {
+ reconnectMessage.addClearInProgress(delegate.getCacheId());
+ }
+ }
+ };
+ entity.addReconnectListener(reconnectListener);
+
+ delegate.addResponseListeners(EhcacheEntityResponse.HashInvalidationDone.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.HashInvalidationDone>() {
@Override
public void onResponse(EhcacheEntityResponse.HashInvalidationDone response) {
if (response.getCacheId().equals(messageFactory.getCacheId())) {
@@ -66,7 +76,7 @@ public void onResponse(EhcacheEntityResponse.HashInvalidationDone response) {
}
}
});
- entity.addResponseListener(EhcacheEntityResponse.AllInvalidationDone.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.AllInvalidationDone>() {
+ delegate.addResponseListeners(EhcacheEntityResponse.AllInvalidationDone.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.AllInvalidationDone>() {
@Override
public void onResponse(EhcacheEntityResponse.AllInvalidationDone response) {
if (response.getCacheId().equals(messageFactory.getCacheId())) {
@@ -90,70 +100,8 @@ public void onResponse(EhcacheEntityResponse.AllInvalidationDone response) {
}
}
});
- entity.addResponseListener(EhcacheEntityResponse.ServerInvalidateHash.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ServerInvalidateHash>() {
- @Override
- public void onResponse(EhcacheEntityResponse.ServerInvalidateHash response) {
- if (response.getCacheId().equals(messageFactory.getCacheId())) {
- long key = response.getKey();
- LOGGER.debug("CLIENT: on cache {}, server requesting hash {} to be invalidated", messageFactory.getCacheId(), key);
- for (InvalidationListener listener : invalidationListeners) {
- listener.onInvalidateHash(key);
- }
- } else {
- LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
- }
- }
- });
- entity.addResponseListener(EhcacheEntityResponse.ClientInvalidateHash.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ClientInvalidateHash>() {
- @Override
- public void onResponse(EhcacheEntityResponse.ClientInvalidateHash response) {
- final String cacheId = response.getCacheId();
- final long key = response.getKey();
- final int invalidationId = response.getInvalidationId();
- if (cacheId.equals(messageFactory.getCacheId())) {
- LOGGER.debug("CLIENT: doing work to invalidate hash {} from cache {} (ID {})", key, cacheId, invalidationId);
- for (InvalidationListener listener : invalidationListeners) {
- listener.onInvalidateHash(key);
- }
-
- try {
- LOGGER.debug("CLIENT: ack'ing invalidation of hash {} from cache {} (ID {})", key, cacheId, invalidationId);
- entity.invokeAsync(messageFactory.clientInvalidationAck(invalidationId), true);
- } catch (Exception e) {
- //TODO: what should be done here?
- LOGGER.error("error acking client invalidation of hash {} on cache {}", key, cacheId, e);
- }
- } else {
- LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
- }
- }
- });
- entity.addResponseListener(EhcacheEntityResponse.ClientInvalidateAll.class, new EhcacheClientEntity.ResponseListener<EhcacheEntityResponse.ClientInvalidateAll>() {
- @Override
- public void onResponse(EhcacheEntityResponse.ClientInvalidateAll response) {
- final String cacheId = response.getCacheId();
- final int invalidationId = response.getInvalidationId();
-
- if (cacheId.equals(messageFactory.getCacheId())) {
- LOGGER.debug("CLIENT: doing work to invalidate all from cache {} (ID {})", cacheId, invalidationId);
- for (InvalidationListener listener : invalidationListeners) {
- listener.onInvalidateAll();
- }
-
- try {
- LOGGER.debug("CLIENT: ack'ing invalidation of all from cache {} (ID {})", cacheId, invalidationId);
- entity.invokeAsync(messageFactory.clientInvalidationAck(invalidationId), true);
- } catch (Exception e) {
- //TODO: what should be done here?
- LOGGER.error("error acking client invalidation of all on cache {}", cacheId, e);
- }
- } else {
- LOGGER.debug("CLIENT: on cache {}, ignoring invalidation on unrelated cache : {}", messageFactory.getCacheId(), response.getCacheId());
- }
- }
- });
- entity.addDisconnectionListener(new EhcacheClientEntity.DisconnectionListener() {
+ this.disconnectionListener = new EhcacheClientEntity.DisconnectionListener() {
@Override
public void onDisconnection() {
for (Map.Entry<Long, CountDownLatch> entry : hashInvalidationsInProgress.entrySet()) {
@@ -170,7 +118,8 @@ public void onDisconnection() {
invalidateAllLock.unlock();
}
}
- });
+ };
+ entity.addDisconnectionListener(disconnectionListener);
}
private <T> T performWaitingForHashInvalidation(long key, NullaryFunction<T> c) throws InterruptedException, TimeoutException {
@@ -188,6 +137,7 @@ private <T> T performWaitingForHashInvalidation(long key, NullaryFunction<T> c)
try {
T result = c.apply();
+ LOGGER.debug("CLIENT: Waiting for invalidations on key {}", key);
awaitOnLatch(latch);
LOGGER.debug("CLIENT: key {} invalidated on all clients, unblocking call", key);
return result;
@@ -266,12 +216,19 @@ public String getCacheId() {
@Override
public void addInvalidationListener(InvalidationListener listener) {
- invalidationListeners.add(listener);
+ delegate.addInvalidationListener(listener);
}
@Override
public boolean removeInvalidationListener(InvalidationListener listener) {
- return invalidationListeners.remove(listener);
+ return delegate.removeInvalidationListener(listener);
+ }
+
+ @Override
+ public void close() {
+ this.entity.removeDisconnectionListener(this.disconnectionListener);
+ this.entity.removeReconnectListener(this.reconnectListener);
+ delegate.close();
}
@Override
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/BaseKeyValueOperation.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/BaseKeyValueOperation.java
index 3059e109af..b2ecae09eb 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/BaseKeyValueOperation.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/BaseKeyValueOperation.java
@@ -46,10 +46,11 @@ abstract class BaseKeyValueOperation<K, V> implements Operation<K, V> {
}
this.timeStamp = buffer.getLong();
int keySize = buffer.getInt();
+ int maxLimit = buffer.limit();
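+ // Capture the original limit: the incoming buffer may be a slice, so restoring limit(capacity()) as before could expose bytes past the payload.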
buffer.limit(buffer.position() + keySize);
ByteBuffer keyBlob = buffer.slice();
buffer.position(buffer.limit());
- buffer.limit(buffer.capacity());
+ buffer.limit(maxLimit);
try {
this.key = keySerializer.read(keyBlob);
} catch (ClassNotFoundException e) {
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ChainResolver.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ChainResolver.java
index c705f9b4ff..66a8966609 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ChainResolver.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ChainResolver.java
@@ -122,7 +122,7 @@ public V value() {
}
}
} else {
- payload.flip();
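+ // rewind() resets only the position; the previous flip() also set the limit to the current position, which truncates a payload already positioned at 0.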
+ payload.rewind();
chainBuilder = chainBuilder.add(payload);
}
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ConditionalReplaceOperation.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ConditionalReplaceOperation.java
index afa68fb3e7..70e1f0532f 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ConditionalReplaceOperation.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/ConditionalReplaceOperation.java
@@ -53,16 +53,17 @@ public ConditionalReplaceOperation(final K key, final V oldValue, final V newVal
}
this.timeStamp = buffer.getLong();
int keySize = buffer.getInt();
+ int maxLimit = buffer.limit();
buffer.limit(buffer.position() + keySize);
ByteBuffer keyBlob = buffer.slice();
buffer.position(buffer.limit());
- buffer.limit(buffer.capacity());
+ buffer.limit(maxLimit);
int oldValueSize = buffer.getInt();
buffer.limit(buffer.position() + oldValueSize);
ByteBuffer oldValueBlob = buffer.slice();
buffer.position(buffer.limit());
- buffer.limit(buffer.capacity());
+ buffer.limit(maxLimit);
ByteBuffer valueBlob = buffer.slice();
@@ -146,7 +147,8 @@ public boolean equals(final Object obj) {
return false;
}
- ConditionalReplaceOperation other = (ConditionalReplaceOperation)obj;
+ @SuppressWarnings("unchecked")
+ ConditionalReplaceOperation<K, V> other = (ConditionalReplaceOperation<K, V>) obj;
if(this.getOpCode() != other.getOpCode()) {
return false;
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/RemoveOperation.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/RemoveOperation.java
index 5e0de52354..63385b7829 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/RemoveOperation.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/RemoveOperation.java
@@ -96,7 +96,8 @@ public boolean equals(final Object obj) {
return false;
}
- RemoveOperation other = (RemoveOperation)obj;
+ @SuppressWarnings("unchecked")
+ RemoveOperation<K, V> other = (RemoveOperation<K, V>) obj;
if(this.getOpCode() != other.getOpCode()) {
return false;
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/codecs/OperationsCodec.java b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/codecs/OperationsCodec.java
index c6868312b6..16bbf347f9 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/codecs/OperationsCodec.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/internal/store/operations/codecs/OperationsCodec.java
@@ -42,4 +42,12 @@ public Operation<K, V> decode(ByteBuffer buffer) {
buffer.rewind();
return opCode.decode(buffer, keySerializer, valueSerializer);
}
+
+ public Serializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ public Serializer<V> getValueSerializer() {
+ return valueSerializer;
+ }
}
diff --git a/clustered/client/src/main/java/org/ehcache/clustered/client/service/ClusteringService.java b/clustered/client/src/main/java/org/ehcache/clustered/client/service/ClusteringService.java
index bb89c24456..b0cc5ed2f6 100644
--- a/clustered/client/src/main/java/org/ehcache/clustered/client/service/ClusteringService.java
+++ b/clustered/client/src/main/java/org/ehcache/clustered/client/service/ClusteringService.java
@@ -31,6 +31,11 @@ public interface ClusteringService extends PersistableResourceService {
ClusteringServiceConfiguration getConfiguration();
+ /**
+ * @return true if a connection to a cluster exists
+ */
+ boolean isConnected();
+
/**
* Gets a {@link ServerStoreProxy} though which a server-resident {@code ServerStore} is accessed.
*
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/ClusteredResourcePoolUpdationTest.java b/clustered/client/src/test/java/org/ehcache/clustered/ClusteredResourcePoolUpdationTest.java
new file mode 100644
index 0000000000..6a80761325
--- /dev/null
+++ b/clustered/client/src/test/java/org/ehcache/clustered/ClusteredResourcePoolUpdationTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.ehcache.clustered;
+
+import org.ehcache.Cache;
+import org.ehcache.PersistentCacheManager;
+import org.ehcache.clustered.client.config.builders.ClusteredResourcePoolBuilder;
+import org.ehcache.clustered.client.config.builders.ClusteringServiceConfigurationBuilder;
+import org.ehcache.clustered.client.internal.UnitTestConnectionService;
+import org.ehcache.config.builders.CacheConfigurationBuilder;
+import org.ehcache.config.builders.CacheManagerBuilder;
+import org.ehcache.config.builders.ResourcePoolsBuilder;
+import org.ehcache.config.units.MemoryUnit;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.net.URI;
+
+public class ClusteredResourcePoolUpdationTest {
+
+ private static final URI CLUSTER_URI = URI.create("terracotta://example.com:9540/my-application");
+
+ private static PersistentCacheManager cacheManager;
+ private static Cache<Long, String> dedicatedCache;
+ private static Cache<Long, String> sharedCache;
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UnitTestConnectionService.add(CLUSTER_URI,
+ new UnitTestConnectionService.PassthroughServerBuilder()
+ .resource("primary-server-resource", 8, MemoryUnit.MB)
+ .resource("secondary-server-resource", 8, MemoryUnit.MB)
+ .build());
+
+ cacheManager = CacheManagerBuilder.newCacheManagerBuilder()
+ .with(ClusteringServiceConfigurationBuilder.cluster(CLUSTER_URI).autoCreate()
+ .defaultServerResource("primary-server-resource")
+ .resourcePool("resource-pool-a", 2, MemoryUnit.MB, "secondary-server-resource")
+ .resourcePool("resource-pool-b", 4, MemoryUnit.MB))
+ .withCache("dedicated-cache", CacheConfigurationBuilder.newCacheConfigurationBuilder(Long.class, String.class,
+ ResourcePoolsBuilder.newResourcePoolsBuilder()
+ .with(ClusteredResourcePoolBuilder.clusteredDedicated("primary-server-resource", 4, MemoryUnit.MB))))
+ .withCache("shared-cache", CacheConfigurationBuilder.newCacheConfigurationBuilder(Long.class, String.class,
+ ResourcePoolsBuilder.newResourcePoolsBuilder()
+ .with(ClusteredResourcePoolBuilder.clusteredShared("resource-pool-a"))))
+ .build();
+ cacheManager.init();
+
+ dedicatedCache = cacheManager.getCache("dedicated-cache", Long.class, String.class);
+ sharedCache = cacheManager.getCache("shared-cache", Long.class, String.class);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cacheManager.close();
+ UnitTestConnectionService.remove(CLUSTER_URI);
+ }
+
+ @Test
+ public void testClusteredDedicatedResourcePoolUpdation() throws Exception {
+ expectedException.expect(UnsupportedOperationException.class);
+ expectedException.expectMessage("Updating CLUSTERED resource is not supported");
+ dedicatedCache.getRuntimeConfiguration().updateResourcePools(
+ ResourcePoolsBuilder.newResourcePoolsBuilder()
+ .with(ClusteredResourcePoolBuilder.clusteredDedicated("primary-server-resource", 32, MemoryUnit.MB))
+ .build()
+ );
+ }
+
+ @Test
+ public void testClusteredSharedResourcePoolUpdation() throws Exception {
+ expectedException.expect(UnsupportedOperationException.class);
+ expectedException.expectMessage("Updating CLUSTERED resource is not supported");
+ sharedCache.getRuntimeConfiguration().updateResourcePools(
+ ResourcePoolsBuilder.newResourcePoolsBuilder()
+ .with(ClusteredResourcePoolBuilder.clusteredShared("resource-pool-a"))
+ .build()
+ );
+ }
+}
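These two tests pin down that the CLUSTERED resource type rejects runtime updates. By contrast, resizing a heap pool goes through the same updateResourcePools call and is supported; a hedged sketch, assuming a running Cache<Long, String> backed by a heap pool:

    import org.ehcache.Cache;
    import org.ehcache.config.ResourcePools;
    import org.ehcache.config.builders.ResourcePoolsBuilder;
    import org.ehcache.config.units.EntryUnit;

    final class HeapResize {
      // Resizing a heap pool at runtime uses the same API the tests above exercise.
      static void resizeHeap(Cache<Long, String> heapCache) {
        ResourcePools updated = ResourcePoolsBuilder.newResourcePoolsBuilder()
            .heap(1000, EntryUnit.ENTRIES)  // the new heap size
            .build();
        heapCache.getRuntimeConfiguration().updateResourcePools(updated);
      }
    }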
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/ClusteredCacheDestroyTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/ClusteredCacheDestroyTest.java
index 8e6b78909f..c191427c81 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/ClusteredCacheDestroyTest.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/ClusteredCacheDestroyTest.java
@@ -46,7 +46,6 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -186,7 +185,17 @@ public void testDestroyCacheWithCacheManagerStopped() throws CachePersistenceExc
}
@Test
- public void testDestroyCacheWithCacheManagerStopped_whenUsedExclusively() throws CachePersistenceException {
+ public void testDestroyCacheWithTwoCacheManagerOnSameCache_forbiddenWhenInUse() throws CachePersistenceException {
+ PersistentCacheManager persistentCacheManager1 = clusteredCacheManagerBuilder.build(true);
+ PersistentCacheManager persistentCacheManager2 = clusteredCacheManagerBuilder.build(true);
+
+ expectedException.expect(CachePersistenceException.class);
+ expectedException.expectMessage("Cannot destroy clustered tier 'clustered-cache': in use by 1 other client(s) (on terracotta://example.com:9540)");
+ persistentCacheManager1.destroyCache(CLUSTERED_CACHE);
+ }
+
+ @Test
+ public void testDestroyCacheWithTwoCacheManagerOnSameCache_firstRemovesSecondDestroy() throws CachePersistenceException {
PersistentCacheManager persistentCacheManager1 = clusteredCacheManagerBuilder.build(true);
PersistentCacheManager persistentCacheManager2 = clusteredCacheManagerBuilder.build(true);
@@ -196,12 +205,12 @@ public void testDestroyCacheWithCacheManagerStopped_whenUsedExclusively() throws
}
@Test
- public void testDestroyCacheWithCacheManagerStopped_forbiddenWhenInUse() throws CachePersistenceException {
- PersistentCacheManager persistentCacheManager1 = clusteredCacheManagerBuilder.build(true);
+ public void testDestroyCacheWithTwoCacheManagerOnSameCache_secondDoesntHaveTheCacheButPreventExclusiveAccessToCluster() throws CachePersistenceException {
+ PersistentCacheManager persistentCacheManager1 = clusteredCacheManagerBuilder.build(false);
PersistentCacheManager persistentCacheManager2 = clusteredCacheManagerBuilder.build(true);
- expectedException.expect(CachePersistenceException.class);
- expectedException.expectMessage("Cannot destroy clustered tier 'clustered-cache': in use by 1 other client(s) (on terracotta://example.com:9540)");
+ persistentCacheManager2.removeCache(CLUSTERED_CACHE);
+
persistentCacheManager1.destroyCache(CLUSTERED_CACHE);
}
}
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/ClusteredConcurrencyTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/ClusteredConcurrencyTest.java
new file mode 100644
index 0000000000..f9625c9c9f
--- /dev/null
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/ClusteredConcurrencyTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.ehcache.clustered.client;
+
+import org.ehcache.PersistentCacheManager;
+import org.ehcache.clustered.client.config.ClusteredStoreConfiguration;
+import org.ehcache.clustered.client.config.builders.ClusteredResourcePoolBuilder;
+import org.ehcache.clustered.client.config.builders.ClusteringServiceConfigurationBuilder;
+import org.ehcache.clustered.client.internal.UnitTestConnectionService;
+import org.ehcache.clustered.common.Consistency;
+import org.ehcache.config.builders.CacheConfigurationBuilder;
+import org.ehcache.config.builders.CacheManagerBuilder;
+import org.ehcache.config.builders.ResourcePoolsBuilder;
+import org.ehcache.config.units.MemoryUnit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * This test makes sure a clustered cache can be opened concurrently from many client instances. As usual with
+ * concurrency tests, a success doesn't prove the code is race-free, and a failure might not reproduce reliably.
+ * However, it puts together all the conditions needed to make the test fail if a race condition exists.
+ *
+ * @author Henri Tremblay
+ */
+public class ClusteredConcurrencyTest {
+
+ private static final URI CLUSTER_URI = URI.create("terracotta://example.com:9540/my-application");
+ private static final String CACHE_NAME = "clustered-cache";
+
+ private AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
+
+ @Before
+ public void definePassthroughServer() throws Exception {
+ UnitTestConnectionService.add(CLUSTER_URI,
+ new UnitTestConnectionService.PassthroughServerBuilder()
+ .resource("primary-server-resource", 64, MemoryUnit.MB)
+ .resource("secondary-server-resource", 64, MemoryUnit.MB)
+ .build());
+ }
+
+ @After
+ public void removePassthroughServer() throws Exception {
+ UnitTestConnectionService.remove(CLUSTER_URI);
+ }
+
+ @Test
+ public void test() throws Throwable {
+ final int THREAD_NUM = 50;
+
+ final CountDownLatch latch = new CountDownLatch(THREAD_NUM + 1);
+
+ List<Thread> threads = new ArrayList<Thread>(THREAD_NUM);
+ for (int i = 0; i < THREAD_NUM; i++) {
+ Thread t1 = new Thread(content(latch));
+ t1.start();
+ threads.add(t1);
+ }
+
+ latch.countDown();
+ latch.await();
+
+ for(Thread t : threads) {
+ t.join();
+ }
+
+ Throwable throwable = exception.get();
+ if(throwable != null) {
+ throw throwable;
+ }
+ }
+
+ private Runnable content(final CountDownLatch latch) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ CacheManagerBuilder<PersistentCacheManager> clusteredCacheManagerBuilder = CacheManagerBuilder.newCacheManagerBuilder()
+ .with(ClusteringServiceConfigurationBuilder.cluster(CLUSTER_URI).autoCreate()
+ .defaultServerResource("primary-server-resource")
+ .resourcePool("resource-pool-a", 32, MemoryUnit.MB)
+ .resourcePool("resource-pool-b", 32, MemoryUnit.MB, "secondary-server-resource"))
+ .withCache(CACHE_NAME, CacheConfigurationBuilder.newCacheConfigurationBuilder(Long.class, String.class,
+ ResourcePoolsBuilder.newResourcePoolsBuilder()
+ .with(ClusteredResourcePoolBuilder.clusteredDedicated("primary-server-resource", 32, MemoryUnit.MB)))
+ .add(new ClusteredStoreConfiguration(Consistency.STRONG)));
+
+ latch.countDown();
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ // continue
+ }
+
+ clusteredCacheManagerBuilder.build(true);
+ } catch (Throwable t) {
+ exception.compareAndSet(null, t); // only keep the first exception
+ }
+ }
+ };
+ }
+}
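The latch is sized THREAD_NUM + 1 so that no worker calls build(true) until all fifty workers have finished constructing their builders and the main thread has also counted down; the cache manager creations then race against each other as tightly as possible. The same start-barrier idiom in isolation (JDK only):

    import java.util.concurrent.CountDownLatch;

    public class StartBarrier {
      public static void main(String[] args) throws InterruptedException {
        final int THREADS = 4;
        final CountDownLatch latch = new CountDownLatch(THREADS + 1);
        for (int i = 0; i < THREADS; i++) {
          new Thread(new Runnable() {
            @Override
            public void run() {
              latch.countDown();              // this worker is ready
              try {
                latch.await();                // wait for every worker plus the main thread
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              // the racy section starts here on all threads at once
            }
          }).start();
        }
        latch.countDown();                    // main thread releases the barrier
        latch.await();
      }
    }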
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/UnSupportedCombinationsWIthClusteredCacheTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/UnSupportedCombinationsWIthClusteredCacheTest.java
index 2af0633f05..4a0f36fcfe 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/UnSupportedCombinationsWIthClusteredCacheTest.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/UnSupportedCombinationsWIthClusteredCacheTest.java
@@ -184,7 +184,7 @@ public void deleteAll(Iterable<? extends Long> keys) throws BulkCacheWritingExce
private static class TestEventListener implements CacheEventListener<Long, String> {
@Override
- public void onEvent(CacheEvent event) {
+ public void onEvent(CacheEvent<? extends Long, ? extends String> event) {
}
}
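The widened wildcard signature mirrors the CacheEventListener contract this PR targets, which declares onEvent over CacheEvent<? extends K, ? extends V>. One practical consequence is that a listener written against supertypes can observe caches of subtypes; a small sketch (LoggingListener is an illustrative name):

    import org.ehcache.event.CacheEvent;
    import org.ehcache.event.CacheEventListener;

    // A listener written against supertypes...
    class LoggingListener implements CacheEventListener<Number, CharSequence> {
      @Override
      public void onEvent(CacheEvent<? extends Number, ? extends CharSequence> event) {
        System.out.println(event.getType() + " key=" + event.getKey());
      }
    }
    // ...can receive events from a Cache<Long, String>, because a CacheEvent<Long, String>
    // is a CacheEvent<? extends Number, ? extends CharSequence>.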
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/config/ClusteringServiceConfigurationTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/config/ClusteringServiceConfigurationTest.java
index 858e85f2ae..d6cfee67a2 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/config/ClusteringServiceConfigurationTest.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/config/ClusteringServiceConfigurationTest.java
@@ -56,7 +56,7 @@ public void testGetReadOperationTimeout() throws Exception {
@Test
public void testDefaultReadOperationTimeout() throws Exception {
- assertThat(new ClusteringServiceConfiguration(DEFAULT_URI).getReadOperationTimeout(), is(TimeoutDuration.of(5, TimeUnit.SECONDS)));
+ assertThat(new ClusteringServiceConfiguration(DEFAULT_URI).getReadOperationTimeout(), is(TimeoutDuration.of(20, TimeUnit.SECONDS)));
}
@Test(expected = NullPointerException.class)
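The default read timeout change (5 s to 20 s) only affects configurations that never set one explicitly. Anyone depending on the old value can still pin it in the builder; a hedged sketch, assuming the readOperationTimeout(long, TimeUnit) builder method present in this codebase:

    import java.net.URI;
    import java.util.concurrent.TimeUnit;
    import org.ehcache.clustered.client.config.builders.ClusteringServiceConfigurationBuilder;

    final class PinnedTimeout {
      // Pins the read timeout back to the former 5 s default instead of the new 20 s one.
      static ClusteringServiceConfigurationBuilder clustering() {
        return ClusteringServiceConfigurationBuilder
            .cluster(URI.create("terracotta://example.com:9540/my-application"))
            .readOperationTimeout(5, TimeUnit.SECONDS);
      }
    }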
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/config/TimeoutDurationTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/config/TimeoutDurationTest.java
index 042478aa02..dfd7b37bb0 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/config/TimeoutDurationTest.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/config/TimeoutDurationTest.java
@@ -41,6 +41,7 @@ public class TimeoutDurationTest {
@Test
public void testEquals() throws Exception {
+ @SuppressWarnings("unchecked")
List<Pair<TimeoutDuration>> equalPairs = Arrays.asList(
Pair.of(TimeoutDuration.of(1, NANOSECONDS), TimeoutDuration.of(1, NANOSECONDS)),
Pair.of(TimeoutDuration.of(1, MICROSECONDS), TimeoutDuration.of(1000, NANOSECONDS)),
@@ -70,7 +71,7 @@ public void testEquals() throws Exception {
Pair.of(TimeoutDuration.of(1, DAYS), TimeoutDuration.of(24L, HOURS)),
- Pair.of(TimeoutDuration.of(7, NANOSECONDS), TimeoutDuration.of(1 * 7, NANOSECONDS)),
+ Pair.of(TimeoutDuration.of(7, NANOSECONDS), TimeoutDuration.of(7, NANOSECONDS)),
Pair.of(TimeoutDuration.of(7, MICROSECONDS), TimeoutDuration.of(1000 * 7, NANOSECONDS)),
Pair.of(TimeoutDuration.of(7, MILLISECONDS), TimeoutDuration.of(1000000 * 7, NANOSECONDS)),
Pair.of(TimeoutDuration.of(7, SECONDS), TimeoutDuration.of(1000000000L * 7, NANOSECONDS)),
@@ -128,6 +129,7 @@ public void testEquals() throws Exception {
assertThat(pair.getFirst().hashCode(), is(equalTo(pair.getSecond().hashCode())));
}
+ @SuppressWarnings("unchecked")
List<Pair<TimeoutDuration>> unEqualPairs = Arrays.asList(
Pair.of(TimeoutDuration.of(Long.MAX_VALUE, DAYS), TimeoutDuration.of(Long.MAX_VALUE, HOURS)),
Pair.of(TimeoutDuration.of(Long.MAX_VALUE, DAYS), TimeoutDuration.of(Long.MAX_VALUE, MINUTES)),
@@ -238,4 +240,4 @@ public T getSecond() {
return this.second;
}
}
-}
\ No newline at end of file
+}
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactoryTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactoryTest.java
index 765816809d..fa853b800a 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactoryTest.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/EhcacheClientEntityFactoryTest.java
@@ -22,7 +22,10 @@
import org.ehcache.clustered.common.ServerSideConfiguration;
import org.ehcache.clustered.common.internal.lock.LockMessaging.HoldType;
import org.ehcache.clustered.client.internal.lock.VoltronReadWriteLockClient;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.terracotta.connection.Connection;
import static org.hamcrest.core.Is.is;
@@ -44,12 +47,21 @@
public class EhcacheClientEntityFactoryTest {
+ @Mock
+ private EntityRef entityRef;
+ @Mock
+ private EhcacheClientEntity entity;
+ @Mock
+ private Connection connection;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
@Test
public void testCreate() throws Exception {
- EhcacheClientEntity entity = mock(EhcacheClientEntity.class);
- EntityRef entityRef = mock(EntityRef.class);
when(entityRef.fetchEntity()).thenReturn(entity);
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -63,11 +75,8 @@ public void testCreate() throws Exception {
@Test
public void testCreateBadConfig() throws Exception {
- EhcacheClientEntity entity = mock(EhcacheClientEntity.class);
- EntityRef entityRef = mock(EntityRef.class);
when(entityRef.fetchEntity()).thenReturn(entity);
doThrow(ClusteredTierManagerConfigurationException.class).when(entity).configure(any(ServerSideConfiguration.class));
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -87,9 +96,7 @@ public void testCreateBadConfig() throws Exception {
@Test
public void testCreateWhenExisting() throws Exception {
- EntityRef entityRef = mock(EntityRef.class);
doThrow(EntityAlreadyExistsException.class).when(entityRef).create(any());
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -105,10 +112,7 @@ public void testCreateWhenExisting() throws Exception {
@Test
public void testRetrieve() throws Exception {
- EhcacheClientEntity entity = mock(EhcacheClientEntity.class);
- EntityRef entityRef = mock(EntityRef.class);
when(entityRef.fetchEntity()).thenReturn(entity);
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -121,11 +125,8 @@ public void testRetrieve() throws Exception {
@Test
public void testRetrieveFailedValidate() throws Exception {
- EhcacheClientEntity entity = mock(EhcacheClientEntity.class);
- EntityRef entityRef = mock(EntityRef.class);
when(entityRef.fetchEntity()).thenReturn(entity);
doThrow(IllegalArgumentException.class).when(entity).validate(any(ServerSideConfiguration.class));
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -142,11 +143,10 @@ public void testRetrieveFailedValidate() throws Exception {
}
@Test
+ @SuppressWarnings("unchecked")
public void testRetrieveWhenNotExisting() throws Exception {
- EntityRef entityRef = mock(EntityRef.class);
when(entityRef.fetchEntity()).thenThrow(EntityNotFoundException.class);
doThrow(EntityAlreadyExistsException.class).when(entityRef).create(any());
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -162,9 +162,7 @@ public void testRetrieveWhenNotExisting() throws Exception {
@Test
public void testDestroy() throws Exception {
- EntityRef entityRef = mock(EntityRef.class);
doReturn(Boolean.TRUE).when(entityRef).destroy();
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -176,9 +174,7 @@ public void testDestroy() throws Exception {
@Test
public void testDestroyWhenNotExisting() throws Exception {
- EntityRef entityRef = mock(EntityRef.class);
doThrow(EntityNotFoundException.class).when(entityRef).destroy();
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(eq(EhcacheClientEntity.class), anyInt(), anyString())).thenReturn(entityRef);
addMockUnlockedLock(connection, "VoltronReadWriteLock-EhcacheClientEntityFactory-AccessLock-test");
@@ -199,6 +195,7 @@ private static void addMockUnlockedLock(Connection connection, String lockname)
private static void addMockLock(Connection connection, String lockname, boolean result, Boolean ... results) throws Exception {
VoltronReadWriteLockClient lock = mock(VoltronReadWriteLockClient.class);
when(lock.tryLock(any(HoldType.class))).thenReturn(result, results);
+ @SuppressWarnings("unchecked")
EntityRef interlockRef = mock(EntityRef.class);
when(connection.getEntityRef(eq(VoltronReadWriteLockClient.class), anyInt(), eq(lockname))).thenReturn(interlockRef);
when(interlockRef.fetchEntity()).thenReturn(lock);
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/UnitTestConnectionService.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/UnitTestConnectionService.java
index 23cfd4b976..6c47a30e6c 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/UnitTestConnectionService.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/UnitTestConnectionService.java
@@ -32,6 +32,7 @@
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import org.ehcache.clustered.client.internal.lock.VoltronReadWriteLockEntityClientService;
@@ -44,20 +45,28 @@
import org.terracotta.connection.ConnectionException;
import org.terracotta.connection.ConnectionPropertyNames;
import org.terracotta.connection.ConnectionService;
+import org.terracotta.connection.entity.Entity;
+import org.terracotta.connection.entity.EntityRef;
import org.terracotta.entity.EntityClientService;
import org.terracotta.entity.EntityMessage;
import org.terracotta.entity.EntityResponse;
import org.terracotta.entity.EntityServerService;
import org.terracotta.entity.ServiceProvider;
import org.terracotta.entity.ServiceProviderConfiguration;
-import org.terracotta.offheapresource.OffHeapResourcesConfiguration;
+import org.terracotta.exception.EntityNotFoundException;
+import org.terracotta.exception.EntityNotProvidedException;
+import org.terracotta.exception.PermanentEntityException;
import org.terracotta.offheapresource.OffHeapResourcesProvider;
import org.terracotta.offheapresource.config.MemoryUnit;
import org.terracotta.offheapresource.config.OffheapResourcesType;
import org.terracotta.offheapresource.config.ResourceType;
+import org.terracotta.passthrough.IAsynchronousServerCrasher;
+import org.terracotta.passthrough.PassthroughConnection;
import org.terracotta.passthrough.PassthroughServer;
import org.terracotta.passthrough.PassthroughServerRegistry;
+import static org.mockito.Mockito.mock;
+
/**
* A {@link ConnectionService} implementation used to simulate Voltron server connections for unit testing purposes.
@@ -137,6 +146,8 @@ public static void add(URI uri, PassthroughServer server) {
}
SERVERS.put(keyURI, new ServerDescriptor(server));
+ // TODO rework this properly
+ server.registerAsynchronousServerCrasher(mock(IAsynchronousServerCrasher.class));
server.start(true, false);
LOGGER.info("Started PassthroughServer at {}", keyURI);
}
@@ -207,9 +218,6 @@ public static PassthroughServer remove(URI uri) {
URI keyURI = createKey(uri);
ServerDescriptor serverDescriptor = SERVERS.remove(keyURI);
if (serverDescriptor != null) {
- serverDescriptor.server.stop();
- LOGGER.info("Stopped PassthroughServer at {}", keyURI);
-
for (Connection connection : serverDescriptor.getConnections().keySet()) {
try {
LOGGER.warn("Force close {}", formatConnectionId(connection));
@@ -220,6 +228,31 @@ public static PassthroughServer remove(URI uri) {
// Ignored
}
}
+
+ // Open a dedicated destroy connection. It must not have any entities associated with it.
+ PassthroughConnection connection = serverDescriptor.server.connectNewClient("destroy-connection");
+
+ for(Entry entry : serverDescriptor.knownEntities.entrySet()) {
+ @SuppressWarnings("unchecked")
+ Class<? extends Entity> type = (Class<? extends Entity>) entry.getKey();
+ List<Object> args = (List<Object>) entry.getValue();
+ Long version = (Long)args.get(0);
+ String stringArg = (String)args.get(1);
+
+ try {
+ EntityRef entityRef = connection.getEntityRef(type, version, stringArg);
+ entityRef.destroy();
+ } catch (EntityNotProvidedException ex) {
+ LOGGER.error("Entity destroy failed: ", ex);
+ } catch (EntityNotFoundException ex) {
+ LOGGER.error("Entity destroy failed: ", ex);
+ } catch (PermanentEntityException ex) {
+ LOGGER.error("Entity destroy failed (permanent???): ", ex);
+ }
+ }
+
+ serverDescriptor.server.stop();
+ LOGGER.info("Stopped PassthroughServer at {}", keyURI);
return serverDescriptor.server;
} else {
return null;
@@ -332,7 +365,7 @@ public PassthroughServer build() {
}
if (!this.resources.getResource().isEmpty()) {
- newServer.registerServiceProvider(new OffHeapResourcesProvider(), new OffHeapResourcesConfiguration(this.resources));
+ newServer.registerExtendedConfiguration(new OffHeapResourcesProvider(this.resources));
}
for (Map.Entry entry : serviceProviders.entrySet()) {
@@ -465,6 +498,7 @@ synchronized void removeConnections() {
private static final class ServerDescriptor {
private final PassthroughServer server;
private final Map connections = new IdentityHashMap();
+ private final Map<Class<? extends Entity>, List<Object>> knownEntities = new HashMap<Class<? extends Entity>, List<Object>>();
ServerDescriptor(PassthroughServer server) {
this.server = server;
@@ -481,6 +515,13 @@ synchronized void add(Connection connection, Properties properties) {
synchronized void remove(Connection connection) {
this.connections.remove(connection);
}
+
+ public void addKnownEntity(Class<? extends Entity> arg, Object arg1, Object arg2) {
+ List<Object> set = new ArrayList<Object>();
+ set.add(arg1);
+ set.add(arg2);
+ knownEntities.put(arg, set);
+ }
}
/**
@@ -498,11 +539,16 @@ private static final class ConnectionInvocationHandler implements InvocationHand
}
@Override
+ @SuppressWarnings("unchecked")
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals("close")) {
serverDescriptor.remove(connection);
LOGGER.info("Client closed {}", formatConnectionId(connection));
}
+
+ if (method.getName().equals("getEntityRef")) {
+ serverDescriptor.addKnownEntity((Class<? extends Entity>) args[0], args[1], args[2]);
+ }
try {
return method.invoke(connection, args);
} catch (InvocationTargetException e) {
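The handler change above piggybacks on the existing java.lang.reflect.Proxy wrapper: every getEntityRef call is recorded so the harness can destroy those entities before stopping the passthrough server. The interception idiom, reduced to its essentials (JDK only; Registry is a hypothetical stand-in for Connection):

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.ArrayList;
    import java.util.List;

    public class RecordingProxy {
      interface Registry {
        void register(String name);
      }

      public static void main(String[] args) {
        final Registry real = new Registry() {
          @Override
          public void register(String name) { /* real work happens here */ }
        };
        final List<String> seen = new ArrayList<String>();
        Registry proxied = (Registry) Proxy.newProxyInstance(
            Registry.class.getClassLoader(),
            new Class<?>[] { Registry.class },
            new InvocationHandler() {
              @Override
              public Object invoke(Object proxy, Method method, Object[] margs) throws Throwable {
                if (method.getName().equals("register")) {
                  seen.add((String) margs[0]);    // record the call, like addKnownEntity does
                }
                return method.invoke(real, margs); // then delegate to the real object
              }
            });
        proxied.register("clustered-cache");
        System.out.println(seen);                  // [clustered-cache]
      }
    }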
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/ClusteredResourcePoolImplTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/ClusteredResourcePoolImplTest.java
new file mode 100644
index 0000000000..12fd179431
--- /dev/null
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/ClusteredResourcePoolImplTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.ehcache.clustered.client.internal.config;
+
+import org.ehcache.config.ResourcePool;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class ClusteredResourcePoolImplTest {
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void validateUpdate() throws Exception {
+ ClusteredResourcePoolImpl resourcePool = new ClusteredResourcePoolImpl();
+ resourcePool.validateUpdate(mock(ResourcePool.class));
+ }
+
+}
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/DedicatedClusteredResourcePoolImplTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/DedicatedClusteredResourcePoolImplTest.java
new file mode 100644
index 0000000000..99daba303a
--- /dev/null
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/DedicatedClusteredResourcePoolImplTest.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.ehcache.clustered.client.internal.config;
+
+import org.ehcache.config.ResourcePool;
+import org.ehcache.config.units.MemoryUnit;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class DedicatedClusteredResourcePoolImplTest {
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void validateUpdate() throws Exception {
+ DedicatedClusteredResourcePoolImpl resourcePool = new DedicatedClusteredResourcePoolImpl("foo", 3, MemoryUnit.MB);
+ resourcePool.validateUpdate(mock(ResourcePool.class));
+ }
+
+}
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/SharedClusteredResourcePoolImplTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/SharedClusteredResourcePoolImplTest.java
new file mode 100644
index 0000000000..e4f0dea3d6
--- /dev/null
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/SharedClusteredResourcePoolImplTest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright Terracotta, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.ehcache.clustered.client.internal.config;
+
+import org.ehcache.config.ResourcePool;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+public class SharedClusteredResourcePoolImplTest {
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void validateUpdate() throws Exception {
+ SharedClusteredResourcePoolImpl resourcePool = new SharedClusteredResourcePoolImpl("foo");
+ resourcePool.validateUpdate(mock(ResourcePool.class));
+ }
+
+}
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/xml/ClusteringServiceConfigurationParserTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/xml/ClusteringServiceConfigurationParserTest.java
index 000df77bce..8b72a046ca 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/xml/ClusteringServiceConfigurationParserTest.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/config/xml/ClusteringServiceConfigurationParserTest.java
@@ -170,7 +170,7 @@ public void testGetTimeoutNone() throws Exception {
ServiceLocator.findSingletonAmongst(ClusteringServiceConfiguration.class, serviceCreationConfigurations);
assertThat(clusteringServiceConfiguration, is(notNullValue()));
- assertThat(clusteringServiceConfiguration.getReadOperationTimeout(), is(TimeoutDuration.of(5, TimeUnit.SECONDS)));
+ assertThat(clusteringServiceConfiguration.getReadOperationTimeout(), is(TimeoutDuration.of(20, TimeUnit.SECONDS)));
}
@Test
diff --git a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockTest.java b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockTest.java
index 72c174e981..762a5fc320 100644
--- a/clustered/client/src/test/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockTest.java
+++ b/clustered/client/src/test/java/org/ehcache/clustered/client/internal/lock/VoltronReadWriteLockTest.java
@@ -16,7 +16,11 @@
package org.ehcache.clustered.client.internal.lock;
+import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
import org.terracotta.connection.Connection;
import org.terracotta.connection.entity.EntityRef;
@@ -27,21 +31,30 @@
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.terracotta.exception.EntityAlreadyExistsException;
public class VoltronReadWriteLockTest {
+ @Mock
+ private VoltronReadWriteLockClient client;
+
+ @Mock
+ private EntityRef entityRef;
+
+ @Mock
+ private Connection connection;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
@Test
public void testCreateLockEntityWhenNotExisting() throws Exception {
- VoltronReadWriteLockClient client = mock(VoltronReadWriteLockClient.class);
-
- EntityRef entityRef = mock(EntityRef.class);
when(entityRef.fetchEntity()).thenReturn(client);
- Connection connection = mock(Connection.class);
when(connection.getEntityRef(VoltronReadWriteLockClient.class, 1, "VoltronReadWriteLock-TestLock")).thenReturn(entityRef);
VoltronReadWriteLock lock = new VoltronReadWriteLock(connection, "TestLock");
@@ -52,13 +65,9 @@ public void testCreateLockEntityWhenNotExisting() throws Exception {
@Test
public void testFetchExistingLockEntityWhenExists() throws Exception {
- VoltronReadWriteLockClient client = mock(VoltronReadWriteLockClient.class);
-
- EntityRef