Skip to content

Commit

Permalink
GG-19275 [IGNITE-3653] P2P doesn't work for remote filter and filter …
Browse files Browse the repository at this point in the history
…factory - Fixes apache#4566.

Signed-off-by: Dmitriy Govorukhin <dmitriy.govorukhin@gmail.com>
  • Loading branch information
dmekhanikov committed Oct 25, 2019
1 parent 11f8766 commit b1260bc
Show file tree
Hide file tree
Showing 19 changed files with 914 additions and 335 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@
/**
* API for configuring continuous cache queries.
* <p>
* Continuous queries allow to register a remote filter and a local listener
* Continuous queries allow registering a remote filter and a local listener
* for cache updates. If an update event passes the filter, it will be sent to
* the node that executed the query and local listener will be notified.
* the node that executed the query, and local listener will be notified.
* <p>
* Additionally, you can execute initial query to get currently existing data.
* Additionally, you can execute an initial query to get currently existing data.
* Query can be of any type (SQL, TEXT or SCAN) and can be set via {@link #setInitialQuery(Query)}
* method.
* <p>
* Query can be executed either on all nodes in topology using {@link IgniteCache#query(Query)}
* method, or only on the local node, if {@link Query#setLocal(boolean)} parameter is set to {@code true}.
* Note that in case query is distributed and a new node joins, it will get the remote
* filter for the query during discovery process before it actually joins topology,
* Note that if the query is distributed and a new node joins, it will get the remote
* filter for the query during discovery process before it actually joins a topology,
* so no updates will be missed.
* <h1 class="header">Example</h1>
* As an example, suppose we have cache with {@code 'Person'} objects and we need
* to query all persons with salary above 1000.
* As an example, suppose we have a cache with {@code 'Person'} objects and we need
* to query for all people with salary above 1000.
* <p>
* Here is the {@code Person} class:
* <pre name="code" class="java">
Expand All @@ -60,17 +60,17 @@
* }
* </pre>
* <p>
* You can create and execute continuous query like so:
* You can create and execute a continuous query like so:
* <pre name="code" class="java">
* // Create new continuous query.
* // Create a new continuous query.
* ContinuousQuery&lt;Long, Person&gt; qry = new ContinuousQuery&lt;&gt;();
*
* // Initial iteration query will return all persons with salary above 1000.
* // Initial iteration query will return all people with salary above 1000.
* qry.setInitialQuery(new ScanQuery&lt;&gt;((id, p) -> p.getSalary() &gt; 1000));
*
*
* // Callback that is called locally when update notifications are received.
* // It simply prints out information about all created persons.
* // It simply prints out information about all created or modified records.
* qry.setLocalListener((evts) -> {
* for (CacheEntryEvent&lt;? extends Long, ? extends Person&gt; e : evts) {
* Person p = e.getValue();
Expand All @@ -79,29 +79,29 @@
* }
* });
*
* // Continuous listener will be notified for persons with salary above 1000.
* // The continuous listener will be notified for people with salary above 1000.
* qry.setRemoteFilter(evt -> evt.getValue().getSalary() &gt; 1000);
*
* // Execute query and get cursor that iterates through initial data.
* // Execute the query and get a cursor that iterates through the initial data.
* QueryCursor&lt;Cache.Entry&lt;Long, Person&gt;&gt; cur = cache.query(qry);
* </pre>
* This will execute query on all nodes that have cache you are working with and
* listener will start to receive notifications for cache updates.
* This will execute query on all nodes that have the cache you are working with and
* listener will start receiving notifications for cache updates.
* <p>
* To stop receiving updates call {@link QueryCursor#close()} method:
* <pre name="code" class="java">
* cur.close();
* </pre>
* Note that this works even if you didn't provide initial query. Cursor will
* Note that this works even if you didn't provide the initial query. Cursor will
* be empty in this case, but it will still unregister listeners when {@link QueryCursor#close()}
* is called.
* <p>
* {@link IgniteAsyncCallback} annotation is supported for {@link CacheEntryEventFilter}
* (see {@link #setRemoteFilterFactory(Factory)}) and {@link CacheEntryUpdatedListener}
* (see {@link #setLocalListener(CacheEntryUpdatedListener)}).
* If filter and/or listener are annotated with {@link IgniteAsyncCallback} then annotated callback
* is executed in async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
* and notification order is kept the same as update order for given cache key.
* If a filter and/or listener are annotated with {@link IgniteAsyncCallback} then the annotated callback
* is executed in an async callback pool (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()})
* and a notification order is kept the same as an update order for a given cache key.
*
* @see ContinuousQueryWithTransformer
* @see IgniteAsyncCallback
Expand Down Expand Up @@ -130,18 +130,18 @@ public ContinuousQuery<K, V> setInitialQuery(Query<Cache.Entry<K, V>> initQry) {
}

/**
* Sets local callback. This callback is called only in local node when new updates are received.
* Sets a local callback. This callback is called only on local node when new updates are received.
* <p>
* The callback predicate accepts ID of the node from where updates are received and collection
* of received entries. Note that for removed entries value will be {@code null}.
* The callback predicate accepts ID of the node from where updates are received and a collection
* of the received entries. Note that for removed entries values will be {@code null}.
* <p>
* If the predicate returns {@code false}, query execution will be cancelled.
* <p>
* <b>WARNING:</b> all operations that involve any kind of JVM-local or distributed locking (e.g.,
* synchronization or transactional cache operations), should be executed asynchronously without
* blocking the thread that called the callback. Otherwise, you can get deadlocks.
* <p>
* If local listener are annotated with {@link IgniteAsyncCallback} then it is executed in async callback pool
* If local listener are annotated with {@link IgniteAsyncCallback} then it is executed in an async callback pool
* (see {@link IgniteConfiguration#getAsyncCallbackPoolSize()}) that allow to perform a cache operations.
*
* @param locLsnr Local callback.
Expand All @@ -157,8 +157,6 @@ public ContinuousQuery<K, V> setLocalListener(CacheEntryUpdatedListener<K, V> lo
}

/**
* Gets local listener.
*
* @return Local listener.
*/
public CacheEntryUpdatedListener<K, V> getLocalListener() {
Expand Down Expand Up @@ -214,7 +212,7 @@ public ContinuousQuery<K, V> setAutoUnsubscribe(boolean autoUnsubscribe) {
}

/**
* Sets whether this query should be executed on local node only.
* Sets whether this query should be executed on a local node only.
*
* Note: backup event queues are not kept for local continuous queries. It may lead to loss of notifications in case
* of node failures. Use {@link ContinuousQuery#setRemoteFilterFactory(Factory)} to register cache event listeners
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.T2;
Expand Down Expand Up @@ -92,6 +94,9 @@ class GridEventConsumeHandler implements GridContinuousHandler {
/** Listener. */
private GridLocalEventListener lsnr;

/** P2P unmarshalling future. */
private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();

/**
* Required by {@link Externalizable}.
*/
Expand Down Expand Up @@ -142,6 +147,21 @@ public GridEventConsumeHandler() {
// No-op.
}

/**
* Performs remote filter initialization.
*
* @param filter Remote filter.
* @param ctx Kernal context.
* @throws IgniteCheckedException In case if initialization failed.
*/
private void initFilter(IgnitePredicate<Event> filter, GridKernalContext ctx) throws IgniteCheckedException {
if (filter != null)
ctx.resource().injectGeneric(filter);

if (filter instanceof PlatformEventFilterListener)
((PlatformEventFilterListener)filter).initialize(ctx);
}

/** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
Expand All @@ -152,12 +172,6 @@ public GridEventConsumeHandler() {
if (cb != null)
ctx.resource().injectGeneric(cb);

if (filter != null)
ctx.resource().injectGeneric(filter);

if (filter instanceof PlatformEventFilterListener)
((PlatformEventFilterListener)filter).initialize(ctx);

final boolean loc = nodeId.equals(ctx.localNodeId());

lsnr = new GridLocalEventListener() {
Expand Down Expand Up @@ -257,7 +271,18 @@ public GridEventConsumeHandler() {
if (F.isEmpty(types))
types = EVTS_ALL;

ctx.event().addLocalEventListener(lsnr, types);
p2pUnmarshalFut.listen((fut) -> {
if (fut.error() == null) {
try {
initFilter(filter, ctx);
}
catch (IgniteCheckedException e) {
throw F.wrap(e);
}

ctx.event().addLocalEventListener(lsnr, types);
}
});

return RegisterStatus.REGISTERED;
}
Expand Down Expand Up @@ -382,13 +407,22 @@ public GridEventConsumeHandler() {
assert ctx.config().isPeerClassLoadingEnabled();

if (filterBytes != null) {
GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
try {
GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);

if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);

filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));

filter = U.unmarshal(ctx, filterBytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
((GridFutureAdapter)p2pUnmarshalFut).onDone();
}
catch (IgniteCheckedException e) {
((GridFutureAdapter)p2pUnmarshalFut).onDone(e);

throw e;
}
}
}

Expand Down Expand Up @@ -449,6 +483,7 @@ public GridEventConsumeHandler() {
boolean b = in.readBoolean();

if (b) {
p2pUnmarshalFut = new GridFutureAdapter<>();
filterBytes = U.readByteArray(in);
clsName = U.readString(in);
depInfo = (GridDeploymentInfo)in.readObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.S;
Expand Down Expand Up @@ -66,6 +69,9 @@ public class GridMessageListenHandler implements GridContinuousHandler {
/** */
private boolean depEnabled;

/** P2P unmarshalling future. */
private IgniteInternalFuture<Void> p2pUnmarshalFut = new GridFinishedFuture<>();

/**
* Required by {@link Externalizable}.
*/
Expand All @@ -84,22 +90,6 @@ public GridMessageListenHandler(@Nullable Object topic, IgniteBiPredicate<UUID,
this.pred = pred;
}

/**
*
* @param orig Handler to be copied.
*/
public GridMessageListenHandler(GridMessageListenHandler orig) {
assert orig != null;

this.clsName = orig.clsName;
this.depInfo = orig.depInfo;
this.pred = orig.pred;
this.predBytes = orig.predBytes;
this.topic = orig.topic;
this.topicBytes = orig.topicBytes;
this.depEnabled = false;
}

/** {@inheritDoc} */
@Override public boolean isEvents() {
return false;
Expand Down Expand Up @@ -132,9 +122,11 @@ public GridMessageListenHandler(GridMessageListenHandler orig) {
}

/** {@inheritDoc} */
@Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
ctx.io().addUserMessageListener(topic, pred, nodeId);
@Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) {
p2pUnmarshalFut.listen((fut) -> {
if (fut.error() == null)
ctx.io().addUserMessageListener(topic, pred, nodeId);
});

return RegisterStatus.REGISTERED;
}
Expand Down Expand Up @@ -180,18 +172,27 @@ public GridMessageListenHandler(GridMessageListenHandler orig) {
assert ctx != null;
assert ctx.config().isPeerClassLoadingEnabled();

GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
try {
GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);

if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
if (dep == null)
throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);

ClassLoader ldr = dep.classLoader();

ClassLoader ldr = dep.classLoader();
if (topicBytes != null)
topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.config()));

if (topicBytes != null)
topic = U.unmarshal(ctx, topicBytes, U.resolveClassLoader(ldr, ctx.config()));
pred = U.unmarshal(ctx, predBytes, U.resolveClassLoader(ldr, ctx.config()));
}
catch (IgniteCheckedException | IgniteException e) {
((GridFutureAdapter)p2pUnmarshalFut).onDone(e);

throw e;
}

pred = U.unmarshal(ctx, predBytes, U.resolveClassLoader(ldr, ctx.config()));
((GridFutureAdapter)p2pUnmarshalFut).onDone();
}

/** {@inheritDoc} */
Expand Down Expand Up @@ -250,6 +251,7 @@ public GridMessageListenHandler(GridMessageListenHandler orig) {
depEnabled = in.readBoolean();

if (depEnabled) {
p2pUnmarshalFut = new GridFutureAdapter<>();
topicBytes = U.readByteArray(in);
predBytes = U.readByteArray(in);
clsName = U.readString(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ public IgniteEventsImpl(GridKernalContext ctx, ClusterGroupAdapter prj, boolean
autoUnsubscribe,
prj.predicate()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ private void send0(@Nullable Object topic, Collection<?> msgs, boolean async) th
false,
prj.predicate()));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
finally {
unguard();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
Expand All @@ -38,6 +40,7 @@ class CacheContinuousQueryDeployableObject implements Externalizable {
private static final long serialVersionUID = 0L;

/** Serialized object. */
@GridToStringExclude
private byte[] bytes;

/** Deployment class name. */
Expand Down Expand Up @@ -107,4 +110,9 @@ <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedExceptio
clsName = U.readString(in);
depInfo = (GridDeploymentInfo)in.readObject();
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheContinuousQueryDeployableObject.class, this);
}
}
Loading

0 comments on commit b1260bc

Please sign in to comment.