Skip to content

Commit

Permalink
IGNITE-3653 Add concurrent operations test.
Browse files Browse the repository at this point in the history
  • Loading branch information
dmekhanikov committed May 31, 2019
1 parent 81390e0 commit 930452b
Showing 1 changed file with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
Expand All @@ -38,6 +39,7 @@
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

Expand Down Expand Up @@ -212,6 +214,7 @@ public void testMvccTx() throws Exception {

testContinuousQuery(ccfg, false);
}

/**
* @throws Exception If failed.
*/
Expand Down Expand Up @@ -251,6 +254,62 @@ public void testMvccTxReplicatedClient() throws Exception {
testContinuousQuery(ccfg, true);
}

/**
* @throws Exception If failed.
*/
@Test
public void testMultithreadedUpdatesNodeJoin() throws Exception {
Ignite client = startGrid("client");

CacheConfiguration<Object, Object> cacheCfg = cacheConfiguration(PARTITIONED,
0,
ATOMIC
);
IgniteCache<Object, Object> cache = client.createCache(cacheCfg);

int iterations = 50;
int keysNum = 100;
int threadsNum = Runtime.getRuntime().availableProcessors();

CountDownLatch updatesLatch = new CountDownLatch(iterations * keysNum * threadsNum / 2);

ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();

final Class<Factory<CacheEntryEventFilter>> evtFilterFactoryCls =
(Class<Factory<CacheEntryEventFilter>>)getExternalClassLoader().
loadClass("org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory");
qry.setRemoteFilterFactory(
(Factory<? extends CacheEntryEventFilter<Object, Object>>)(Object)evtFilterFactoryCls.newInstance());

qry.setLocalListener((evts) -> {
for (CacheEntryEvent<?, ?> ignored : evts)
updatesLatch.countDown();
});

cache.query(qry);

for (int t = 0; t < threadsNum; t++) {
int threadId = t;

GridTestUtils.runAsync(() -> {
for (int i = 0; i < iterations; i++) {
log.info("Iteration #" + (i + 1));

for (int k = 0; k < keysNum; k++) {
int key = keysNum * threadId + k;

cache.put(key, key);
}
}
}, "cache-writer-thread-" + threadId);
}

startGrid(NODES);

assertTrue("Failed to wait for all cache updates invocations. Latch: " + updatesLatch,
updatesLatch.await(30, TimeUnit.SECONDS));
}

/**
* @param ccfg Cache configuration.
* @param isClient Client.
Expand Down

0 comments on commit 930452b

Please sign in to comment.