Skip to content

Commit

Permalink
GG-20879 .NET: specify expiry policy when creating cache using thin c…
Browse files Browse the repository at this point in the history
…lient

Added ability to create dynamic caches for thin clients with expiry policies. 

- Added expiry policy property  to thin client cache configuration
- Added withExpiryPolicy method to the client cache interface
  • Loading branch information
ashapkin authored Nov 7, 2019
1 parent c12cda4 commit b2b12eb
Show file tree
Hide file tree
Showing 17 changed files with 353 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ public class ClientConnectionContext extends ClientListenerAbstractConnectionCon
/** Version 1.4.0. Added: Affinity Awareness, IEP-23. */
public static final ClientListenerProtocolVersion VER_1_4_0 = ClientListenerProtocolVersion.create(1, 4, 0);

/** Version 1.5.0. Added: Expiration Policy configuration. */
public static final ClientListenerProtocolVersion VER_1_5_0 = ClientListenerProtocolVersion.create(1, 5, 0);

/** Default version. */
public static final ClientListenerProtocolVersion DEFAULT_VER = VER_1_4_0;
public static final ClientListenerProtocolVersion DEFAULT_VER = VER_1_5_0;

/** Supported versions. */
private static final Collection<ClientListenerProtocolVersion> SUPPORTED_VERS = Arrays.asList(
VER_1_5_0,
VER_1_4_0,
VER_1_3_0,
VER_1_2_0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import java.util.ArrayList;
import java.util.Collection;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils;

import static org.apache.ignite.internal.processors.platform.client.ClientConnectionContext.VER_1_5_0;
import static org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils.readQueryEntity;
import static org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils.writeEnumInt;
import static org.apache.ignite.internal.processors.platform.utils.PlatformConfigurationUtils.writeQueryEntity;
Expand Down Expand Up @@ -129,6 +131,9 @@ public class ClientCacheConfigurationSerializer {
/** */
private static final short STATISTICS_ENABLED = 406;

/** */
private static final short EXPIRY_POLICY = 407;


/**
* Writes the cache configuration.
Expand Down Expand Up @@ -196,6 +201,9 @@ static void write(BinaryRawWriterEx writer, CacheConfiguration cfg, ClientListen
} else
writer.writeInt(0);

if (ver.compareTo(VER_1_5_0) >= 0)
PlatformConfigurationUtils.writeExpiryPolicyFactory(writer, cfg.getExpiryPolicyFactory());

// Write length (so that part of the config can be skipped).
writer.writeInt(pos, writer.out().position() - pos - 4);
}
Expand Down Expand Up @@ -242,6 +250,10 @@ static CacheConfiguration read(BinaryRawReader reader, ClientListenerProtocolVer
cfg.setEagerTtl(reader.readBoolean());
break;

case EXPIRY_POLICY:
cfg.setExpiryPolicyFactory(PlatformConfigurationUtils.readExpiryPolicyFactory(reader));
break;

case STATISTICS_ENABLED:
cfg.setStatisticsEnabled(reader.readBoolean());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,33 @@
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.processors.platform.client.IgniteClientException;

import javax.cache.expiry.ExpiryPolicy;

/**
* Cache get request.
*/
class ClientCacheRequest extends ClientRequest {
/** Flag: keep binary. */
private static final byte FLAG_KEEP_BINARY = 1;

/** Flag: with expiry policy. */
private static final byte FLAG_WITH_EXPIRY_POLICY = 2;

/** Cache ID. */
private final int cacheId;

/** Flags. */
private final byte flags;

/** Expiry policy. */
private final ExpiryPolicy expiryPolicy;

/**
* Constructor.
*
Expand All @@ -48,6 +57,10 @@ class ClientCacheRequest extends ClientRequest {
cacheId = reader.readInt();

flags = reader.readByte();

expiryPolicy = withExpiryPolicy()
? new PlatformExpiryPolicy(reader.readLong(), reader.readLong(), reader.readLong())
: null;
}

/**
Expand All @@ -69,6 +82,15 @@ protected boolean isKeepBinary() {
return (flags & FLAG_KEEP_BINARY) == FLAG_KEEP_BINARY;
}

/**
* Gets a value indicating whether expiry policy is set in this request.
*
* @return expiry policy flag value.
*/
private boolean withExpiryPolicy() {
return (flags & FLAG_WITH_EXPIRY_POLICY) == FLAG_WITH_EXPIRY_POLICY;
}

/**
* Gets the cache for current cache id, ignoring any flags.
*
Expand All @@ -80,7 +102,11 @@ protected IgniteCache rawCache(ClientConnectionContext ctx) {

String cacheName = cacheDesc.cacheName();

return ctx.kernalContext().grid().cache(cacheName);
IgniteCache<Object, Object> cache = ctx.kernalContext().grid().cache(cacheName);
if (withExpiryPolicy())
cache = cache.withExpiryPolicy(expiryPolicy);

return cache;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public static CacheConfiguration readCacheConfiguration(BinaryRawReaderEx in, Cl
* @param in Reader.
* @return Expiry policy factory.
*/
private static Factory<? extends ExpiryPolicy> readExpiryPolicyFactory(BinaryRawReader in) {
public static Factory<? extends ExpiryPolicy> readExpiryPolicyFactory(BinaryRawReader in) {
if (!in.readBoolean())
return null;

Expand All @@ -301,7 +301,7 @@ private static Factory<? extends ExpiryPolicy> readExpiryPolicyFactory(BinaryRaw
*
* @param out Writer.
*/
private static void writeExpiryPolicyFactory(BinaryRawWriter out, Factory<? extends ExpiryPolicy> factory) {
public static void writeExpiryPolicyFactory(BinaryRawWriter out, Factory<? extends ExpiryPolicy> factory) {
if (!(factory instanceof PlatformExpiryPolicyFactory)) {
out.writeBoolean(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Expiry;
using Apache.Ignite.Core.Cache.Query;
using Apache.Ignite.Core.Client.Cache;

Expand Down Expand Up @@ -349,5 +350,11 @@ public ICacheClient<TK1, TV1> WithKeepBinary<TK1, TV1>()
{
return _cache.WithKeepBinary<TK1, TV1>();
}

/** <inheritDoc /> */
public ICacheClient<TK, TV> WithExpiryPolicy(IExpiryPolicy plc)
{
return _cache.WithExpiryPolicy(plc);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
using System.Threading;
using Apache.Ignite.Core.Binary;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Expiry;
using Apache.Ignite.Core.Client;
using NUnit.Framework;

Expand Down Expand Up @@ -674,7 +675,7 @@ public void TestRemoveKeyVal()
/// Tests the RemoveAll with a set of keys.
/// </summary>
[Test]
public void TestRemoveReys()
public void TestRemoveKeys()
{
var cache = GetClientCache<int?, int?>();
var keys = Enumerable.Range(1, 10).Cast<int?>().ToArray();
Expand Down Expand Up @@ -916,6 +917,169 @@ public void TestCacheNames()
}
}

/// <summary>
/// Test cache with expiry policy for Create action.
/// </summary>
[Test]
public void TestCacheWithExpiryPolicyOnCreate()
{
const int val = 3;
var expiryPolicy = new ExpiryPolicy(TimeSpan.FromMilliseconds(100), null, null);
var cache = GetClientCache<int>();
var cacheWithExpiryPolicy = cache.WithExpiryPolicy(expiryPolicy);

cacheWithExpiryPolicy.Put(val, val);

// Initially added value is the same.
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsTrue(cache.ContainsKey(val));

// Wait for an expiration.
Thread.Sleep(200);

// Expiry policies should be applied, no cache item exists.
Assert.IsFalse(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsFalse(cache.ContainsKey(val));
}

/// <summary>
/// Test cache with expiry policy for Update enabled.
/// </summary>
[Test]
public void TestCacheWithExpiryPolicyOnUpdate()
{
const int val = 4;
var expiryPolicy = new ExpiryPolicy(null, TimeSpan.FromMilliseconds(100), null);
var cache = GetClientCache<int>();
var cacheWithExpiryPolicy = cache.WithExpiryPolicy(expiryPolicy);

cacheWithExpiryPolicy.Put(val, val);
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsTrue(cache.ContainsKey(val));

Thread.Sleep(50);
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsTrue(cache.ContainsKey(val));

cacheWithExpiryPolicy.Put(val, val + 1);

Thread.Sleep(70);
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsTrue(cache.ContainsKey(val));

Thread.Sleep(50);

// Expiry policies should be applied, no cache item exists.
Assert.IsFalse(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsFalse(cache.ContainsKey(val));
}

/// <summary>
/// Test cache with expiry policy for Access enabled.
/// </summary>
[Test]
public void TestCacheWithExpiryPolicyOnAccess()
{
const int val = 6;
var expiryPolicy = new ExpiryPolicy(null, null, TimeSpan.FromMilliseconds(200));
var cache = GetClientCache<int>();
var cacheWithExpiryPolicy = cache.WithExpiryPolicy(expiryPolicy);

cacheWithExpiryPolicy.Put(val, val);
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsTrue(cache.ContainsKey(val));

Thread.Sleep(100);
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsTrue(cache.ContainsKey(val));

cacheWithExpiryPolicy.Get(val);

Thread.Sleep(150);
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsTrue(cache.ContainsKey(val));

Thread.Sleep(150);

// Expiry policies should be applied, no cache item exists.
Assert.IsFalse(cacheWithExpiryPolicy.ContainsKey(val));
Assert.IsFalse(cache.ContainsKey(val));
}

/// <summary>
/// Test cache with expiration does not affect original cache.
/// </summary>
[Test]
public void TestCacheWithExpirationHasIsolatedScope()
{
const int val = 7;
var expiryPolicy = new ExpiryPolicy(TimeSpan.FromMilliseconds(100), null, null);
var cache = GetClientCache<int>();
var cacheWithExpiryPolicy = cache.WithExpiryPolicy(expiryPolicy);

cache.Put(val, val);
cacheWithExpiryPolicy.Put(val + 1, val);

Thread.Sleep(200);

// Both caches contains the original value.
Assert.IsTrue(cache.ContainsKey(val));
Assert.IsTrue(cacheWithExpiryPolicy.ContainsKey(val));

// New value is being absent for both caches.
Assert.IsFalse(cache.ContainsKey(val + 1));
Assert.IsFalse(cacheWithExpiryPolicy.ContainsKey(val + 1));
}

/// <summary>
/// Test cache with expiration does not modify keepBinary flag.
/// </summary>
[Test]
public void TestCacheWithExpirationDoesNotAffectKeepBinarySettings()
{
const int key = 10;
var person = new Person(1);

var cache = GetClientCache<Person>();
cache.Put(key, person);

var cacheWithKeepBinary = cache.WithKeepBinary<int, IBinaryObject>();
AssertExtensions.ReflectionEqual(person, cacheWithKeepBinary.Get(key).Deserialize<Person>());

var expiryPolicy = new ExpiryPolicy(null, null, TimeSpan.FromMilliseconds(100));

var cacheWithExpiryPolicy = cacheWithKeepBinary.WithExpiryPolicy(expiryPolicy);
AssertExtensions.ReflectionEqual(person, cacheWithExpiryPolicy.Get(key).Deserialize<Person>());

Thread.Sleep(200);

Assert.IsFalse(cacheWithExpiryPolicy.ContainsKey(key));
Assert.IsFalse(cache.ContainsKey(key));
}

/// <summary>
/// Test cache with keepBinary does not modify expiry policy settings.
/// </summary>
[Test]
public void TestCacheWithKeepBinaryDoesNotAffectExpirationPolicy()
{
const int key = 11;
var person = new Person(1);

var expiryPolicy = new ExpiryPolicy(null, null, TimeSpan.FromMilliseconds(100));
var cacheWithExpiryPolicy = GetClientCache<Person>().WithExpiryPolicy(expiryPolicy);

cacheWithExpiryPolicy.Put(key, person);

var cacheWithKeepBinary = cacheWithExpiryPolicy.WithKeepBinary<int, IBinaryObject>();
AssertExtensions.ReflectionEqual(person, cacheWithKeepBinary.Get(key).Deserialize<Person>());

Thread.Sleep(200);

Assert.IsFalse(cacheWithKeepBinary.ContainsKey(key));
Assert.IsFalse(cacheWithExpiryPolicy.ContainsKey(key));
}

private class Container
{
public Container Inner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void TestRemoveKeyVal()
/// Tests the RemoveAll with a set of keys.
/// </summary>
[Test]
public void TestRemoveReys()
public void TestRemoveKeys()
{
var cache = GetBinaryKeyCache();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ public void TestSerializeDeserialize()
TestSerializeDeserializeUnspported(cfg, "EvictionPolicy");
cfg.EvictionPolicy = null;

TestSerializeDeserializeUnspported(cfg, "ExpiryPolicyFactory");
cfg.ExpiryPolicyFactory = null;

TestSerializeDeserializeUnspported(cfg, "PluginConfigurations");
cfg.PluginConfigurations = null;

Expand Down
Loading

0 comments on commit b2b12eb

Please sign in to comment.