ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Hello, Igniters.

I'm working on IGNITE-425 [1] issue.

Text of issue:

===
Currently if updated entry passes the filter, it is sent to node
initiated the query entirely. It would be good to provide user with the
ability to transform entry and, for example, select only fields that are
important. This may bring huge economy to traffic and lower GC pressure
as well.
===

My pull request [2] is ready.
Anton Vinogradov is OK with it.
Nikolay Tikhonov reviewed my changes and wants to discuss the changes
related to the public API with the community.

1. I introduced a new query class - ContinuousQueryWithTransformer [3].
Reasons:
* ContinuousQuery is final, so users can't extend it. I don't want to
change that.
* ContinuousQuery contains some deprecated methods (setRemoteFilter), so
with a new class we can get rid of them.
* This public API design makes it a compile-time error to use the
existing localEventListener together with the new
transformedEventListener.

Thoughts?

2. What behavior is expected if the transformer throws an exception for
some event? I see the following options:

* Pass `null` to the listener (the pull request implementation).
* Skip the event. Don't call the listener.
* Introduce a special callback. onTransformError?

Thoughts?
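
The three options above can be compared in a small stand-alone model. This is only a sketch: `TransformErrorPolicies`, its method names and the `BiConsumer` callback are hypothetical and are not part of Ignite or of the pull request; a plain `java.util.function.Function` stands in for the remote transformer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Stand-alone model of the three options; nothing here is Ignite API. */
final class TransformErrorPolicies {
    /** Option 1: the listener receives null in place of the failed result. */
    static <E, T> List<T> passNull(List<E> evts, Function<E, T> transformer) {
        List<T> out = new ArrayList<>();

        for (E evt : evts) {
            try {
                out.add(transformer.apply(evt));
            }
            catch (RuntimeException e) {
                out.add(null);
            }
        }

        return out;
    }

    /** Option 2: the failed event is dropped silently. */
    static <E, T> List<T> skip(List<E> evts, Function<E, T> transformer) {
        return callback(evts, transformer, (evt, err) -> { /* Ignore the failure. */ });
    }

    /** Option 3: the failed event is dropped and reported to a hypothetical onTransformError callback. */
    static <E, T> List<T> callback(List<E> evts, Function<E, T> transformer,
        BiConsumer<E, RuntimeException> onTransformError) {
        List<T> out = new ArrayList<>();

        for (E evt : evts) {
            try {
                out.add(transformer.apply(evt));
            }
            catch (RuntimeException e) {
                onTransformError.accept(evt, e);
            }
        }

        return out;
    }
}
```

With option 1 the listener has to null-check every element; with option 2 a failure is invisible to the client; option 3 keeps the listener's input clean while still surfacing the failure.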

```
public final class ContinuousQueryWithTransformer<K, V, T> extends Query<Cache.Entry<K, V>> {
    //...

    private Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory;

    private Factory<? extends IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, T>> rmtTransFactory;

    private EventListener<T> locLsnr;

    //...

    public interface EventListener<T> {
        void onUpdated(Iterable<? extends T> events);
    }
}
```
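
To illustrate the idea behind the class above (filter and transform where the update happens, deliver only `T` to the listener), here is a toy model that runs without Ignite. It is an assumption-laden sketch: `TransformerQuerySketch`, `Employee` and `notifyListener` are made-up names, `Predicate` and `Function` stand in for the remote filter and transformer, and only the shape of `EventListener<T>` is taken from the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Toy model of a node applying the filter and then the transformer; not Ignite API. */
final class TransformerQuerySketch {
    /** Large value object; only the name should travel to the listener. */
    static final class Employee {
        final String name;
        final byte[] photo;

        Employee(String name, byte[] photo) {
            this.name = name;
            this.photo = photo;
        }
    }

    /** Mirrors the shape of the proposed EventListener<T>. */
    interface EventListener<T> {
        void onUpdated(Iterable<? extends T> events);
    }

    /** Filters and transforms the updates, then hands only the transformed values to the listener. */
    static <V, T> void notifyListener(List<V> updates, Predicate<V> rmtFilter,
        Function<V, T> rmtTransformer, EventListener<T> locLsnr) {
        List<T> transformed = new ArrayList<>();

        for (V upd : updates) {
            if (rmtFilter.test(upd))
                transformed.add(rmtTransformer.apply(upd));
        }

        if (!transformed.isEmpty())
            locLsnr.onUpdated(transformed);
    }

    /** Runs the model on three updates and returns what the listener received. */
    static List<String> demo() {
        List<Employee> updates = new ArrayList<>();
        updates.add(new Employee("Alice", new byte[1024]));
        updates.add(new Employee("", new byte[1024]));
        updates.add(new Employee("Bob", new byte[1024]));

        List<String> received = new ArrayList<>();

        // The listener is typed by T (String), so it never sees the photo bytes.
        EventListener<String> lsnr = names -> {
            for (String n : names)
                received.add(n);
        };

        notifyListener(updates, e -> !e.name.isEmpty(), e -> e.name, lsnr);

        return received;
    }
}
```

Because the listener is parameterized by `T` rather than by the cache entry type, passing a listener that expects full entries would not compile, which is the compile-time separation mentioned in the reasons above.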

Previous discussion - [4]

[1] https://issues.apache.org/jira/browse/IGNITE-425
[2] https://github.com/apache/ignite/pull/2372
[3]
https://github.com/apache/ignite/pull/2372/files#diff-22cc0cf0bc428b32a39e6cc0b22b0e3e
[4]
http://apache-ignite-developers.2346864.n4.nabble.com/ContinuousQueryWithTransformer-implementation-questions-td20078.html

Re: ContinuousQueryWithTransformer implementation questions - 2

Anton Vinogradov
Nikolay,

> 2. What behavior is expected if transformer throws exception for some
event? I see following options:

Client should be notified, I vote for
> Introduce special callback. onTransformError?


On Tue, Aug 29, 2017 at 1:36 PM, Nikolay Izhikov <[hidden email]>
wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

yzhdanov
In reply to this post by Nikolay Izhikov
I don't like the idea of having a separate class, but it seems to be the
only way, as there are too many API and generics differences.

Nikolay, I would also suggest you extract some super class for continuous
query. It will help to avoid code duplicates.

As for remote transformer failure - we should react in the same way as
we react now to filter failure. I don't think there should be a
difference. What is the reaction now?

--Yakov

Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Hi, Yakov.

If the filter throws an exception, the entry is passed to the listener.

 > Nikolay, I would also suggest you extract some super class for continuous
 > query. It will help to avoid code duplicates.

Yes, I will do this after reaching consensus on API changes.

29.08.2017 14:04, Yakov Zhdanov wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

yzhdanov
> If filter throws exception entry would be passed to listener.

This is strange. Imagine a filter that very rarely throws some runtime
exception due to external or environmental reasons, but in normal
execution evaluates to false. In case of an error the entry is passed to a
local listener, which can lead to serious consequences and
inconsistencies in business logic. We probably need to send the entry with
a notification that there was an error on the server.

--Yakov
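
The concern above can be reproduced in a few lines without Ignite. In this sketch a `Predicate` stands in for the remote filter, and `FilterFailureSemantics`, `deliver` and `flakyReject` are hypothetical names; the `onError` flag selects whether an exception counts as `true` (the behavior described in this thread) or as `false`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Stand-alone model of how a filter failure can be interpreted; not Ignite API. */
final class FilterFailureSemantics {
    /** Returns the events that reach the listener; onError is the verdict used when the filter throws. */
    static <E> List<E> deliver(List<E> evts, Predicate<E> filter, boolean onError) {
        List<E> delivered = new ArrayList<>();

        for (E evt : evts) {
            boolean pass;

            try {
                pass = filter.test(evt);
            }
            catch (RuntimeException e) {
                pass = onError;
            }

            if (pass)
                delivered.add(evt);
        }

        return delivered;
    }

    /** Filter that rejects everything but fails on one input, simulating an environmental error. */
    static boolean flakyReject(Integer evt) {
        if (evt == 13)
            throw new IllegalStateException("Simulated environmental failure.");

        return false;
    }
}
```

With `onError == true` the listener receives exactly the event the filter failed on, although the filter never returned `true` for anything; with `onError == false` it receives nothing.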

Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Yakov.

I think treating an exception as `true` is the intended behavior.

Filter evaluation implementation from master - [1]
Test from master that checks a filter exception (without explicit asserts
on listener calls) - [2]

Here is my quick test with asserts on the listener call after a filter
exception:

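```
package org.apache.ignite.internal.processors.cache.query.continuous;

// Imports reconstructed from the types used below.
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;

import static java.util.concurrent.TimeUnit.SECONDS;

public class GridCacheContinuousQueryFilterExceptionTest extends GridCacheContinuousQueryAbstractSelfTest
    implements Serializable {
    /**
     * @throws Exception If failed.
     */
    public void testListenerAfterFilterException() throws Exception {
        IgniteCache<Integer, Integer> cache = grid(0).cache(DEFAULT_CACHE_NAME);

        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();

        final CountDownLatch latch = new CountDownLatch(100);

        // The listener counts down once per delivered event.
        qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() {
            @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts)
                    latch.countDown();
            }
        });

        // The filter always throws, so every delivered event was passed despite the exception.
        qry.setRemoteFilter(new CacheEntryEventSerializableFilter<Integer, Integer>() {
            @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) {
                throw new RuntimeException("Test error.");
            }
        });

        try (QueryCursor<Cache.Entry<Integer, Integer>> ignored = cache.query(qry)) {
            for (int i = 0; i < 100; i++)
                cache.put(i, i);

            assertTrue(latch.await(10, SECONDS));
        }
    }

    @Override protected CacheMode cacheMode() {
        return CacheMode.REPLICATED;
    }

    @Override protected int gridCount() {
        return 1;
    }
}
```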

[1]
https://github.com/apache/ignite/blob/master/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java#L791

[2]
https://github.com/apache/ignite/blob/master/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java#L359


29.08.2017 14:46, Yakov Zhdanov wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

yzhdanov
Igniters,

Does anyone else see potential issues on the user side with the current
approach?

Sam, is this a JCache requirement?

--Yakov

2017-08-29 15:16 GMT+03:00 Nikolay Izhikov <[hidden email]>:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Yakov.

I found the following description in the JCache javadoc [1]:

===
Returns:
    true if the evaluation passes, otherwise false.
    *The effect of returning true is that listener will be invoked*
===

The JSR doesn't specify how a filter exception has to be handled.
As far as I understand, the *only* way to pass the filter is to return
true from `evaluate`.

I think we have to change the current behavior.

Should I file a ticket?

[1]
https://static.javadoc.io/javax.cache/cache-api/1.0.0/javax/cache/event/CacheEntryEventFilter.html#evaluate(javax.cache.event.CacheEntryEvent)
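
Read this way ("only an explicit `true` passes"), a failing filter should behave as if it had returned `false`. A minimal sketch of that rule as a wrapper follows; `FailClosed.wrap` is a hypothetical helper and `java.util.function.Predicate` stands in for `CacheEntryEventFilter`, so this is not the change proposed for Ignite itself.

```java
import java.util.function.Predicate;

/** Hypothetical helper: only an explicit true from the delegate lets an event pass. */
final class FailClosed {
    private FailClosed() {
        // No instances.
    }

    /** Wraps a filter so that any runtime exception is treated as a rejection. */
    static <E> Predicate<E> wrap(Predicate<E> delegate) {
        return evt -> {
            try {
                return delegate.test(evt);
            }
            catch (RuntimeException e) {
                return false;
            }
        };
    }
}
```

A real remote filter would presumably also have to be serializable to be deployed to other nodes, which the lambda returned here is not; the sketch only shows the evaluation rule.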


29.08.2017 17:09, Yakov Zhdanov wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

Semyon Boikov
Hi,

I was under the impression that the current behavior is required by JCache,
but now I cannot find anything about this either in the spec or in the
JCache TCK tests. So I think we can change the current behavior.

Thanks

On Tue, Aug 29, 2017 at 9:48 PM, Nikolay Izhikov <[hidden email]>
wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Semyon, Yakov,

Here is the ticket - https://issues.apache.org/jira/browse/IGNITE-6221

Thank you.

Can we now return to my questions about IGNITE-425?

1. I introduced a new query class - ContinuousQueryWithTransformer [3].
Reasons:
* ContinuousQuery is final, so users can't extend it. I don't want to
change that.
* ContinuousQuery contains some deprecated methods (setRemoteFilter), so
with a new class we can get rid of them.
* This public API design makes it a compile-time error to use the
existing localEventListener together with the new
transformedEventListener.

Thoughts?

2. What behavior is expected if the transformer throws an exception for
some event? I see the following options:

* Pass `null` to the listener (the pull request implementation).
* Skip the event. Don't call the listener.
* Introduce a special callback. onTransformError?

Thoughts?


30.08.2017 10:38, Semyon Boikov wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

yzhdanov
I think I have already agreed on a separate class, since it seems to be the
only option due to the generics issue. Should we extract a super class?

We can set a hard requirement that the filter and transformer must not
throw exceptions (same as the cache interceptor). If an exception is thrown,
we cancel the query globally and unregister all the listeners. This may
sound like too much, but inconsistencies brought by listener notifications
may be terrible for the app.

--Yakov
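
The "cancel on first failure" rule proposed above can be modelled as follows. This is a sketch under stated assumptions: `CancelOnFailureQuery`, `onEvent` and in particular `isCancelled()` are hypothetical and do not correspond to any Ignite API; a `Function` stands in for the transformer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Stand-alone model of a query that cancels itself on the first transformer failure. */
final class CancelOnFailureQuery<E, T> {
    private final Function<E, T> transformer;

    private final List<T> delivered = new ArrayList<>();

    private boolean cancelled;

    CancelOnFailureQuery(Function<E, T> transformer) {
        this.transformer = transformer;
    }

    /** Processes one event; once a failure has happened, every later event is ignored. */
    void onEvent(E evt) {
        if (cancelled)
            return;

        try {
            delivered.add(transformer.apply(evt));
        }
        catch (RuntimeException e) {
            // Hard requirement violated: stop notifying instead of delivering doubtful data.
            cancelled = true;
        }
    }

    /** Hypothetical way for the client to find out that the query is no longer active. */
    boolean isCancelled() {
        return cancelled;
    }

    /** Values that reached the listener before the cancellation. */
    List<T> delivered() {
        return delivered;
    }
}
```

The trade-off is visible in the model: valid events that arrive after the failure are lost as well, so the client needs some way to notice the cancellation.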

Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Hello, Yakov.

The new class is OK - got it. Thanks!

 > Should we extract a super class?

Yes, we should.
I have already done it.

See my last commit in PR -
https://github.com/apache/ignite/pull/2372/commits/af1ed2e4dbef4ba5999f8566198cb75ad922f93b

 > We can put hard requirement that filter and transformer cannot throw
 > exception (same as cache interceptor).

I think cancelling the whole query on a transformer exception is too much.
After this discussion, I like the idea of skipping the event if the
transformer throws an exception, since that is "like regular filter"
behavior.

Thoughts?


30.08.2017 16:03, Yakov Zhdanov wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

Anton Vinogradov
Nikolay,

You can also use:

1) Global Exception Registry
ctx.kernalContext().exceptionRegistry().onException(shortMsg, ex);

2) Statistic
final boolean statsEnabled = cctx.config().isStatisticsEnabled();
if (statsEnabled)
   metrics.incrementOnFails();

P.S. Please make sure the code is still valid; I found this in 1.6.
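
Independently of whether the two internal calls above still exist, the combination they describe (keep recent exceptions for inspection, count failures as a metric) can be sketched with the standard library alone. `FailureTracker` and its methods are hypothetical names and are not the Ignite exception registry or metrics API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for an exception registry plus a failure counter. */
final class FailureTracker {
    private final int maxKept;

    private final Deque<Throwable> recent = new ArrayDeque<>();

    private final AtomicLong fails = new AtomicLong();

    FailureTracker(int maxKept) {
        if (maxKept < 1)
            throw new IllegalArgumentException("maxKept must be positive: " + maxKept);

        this.maxKept = maxKept;
    }

    /** Records a failure: bumps the counter and keeps only the newest maxKept exceptions. */
    synchronized void onException(Throwable ex) {
        fails.incrementAndGet();

        if (recent.size() == maxKept)
            recent.removeFirst();

        recent.addLast(ex);
    }

    /** Total number of failures seen so far. */
    long failCount() {
        return fails.get();
    }

    /** Snapshot of the retained exceptions, oldest first. */
    synchronized List<Throwable> recent() {
        return new ArrayList<>(recent);
    }
}
```

A transformer wrapper could call `onException` in its catch block, so that failures stay observable on the node even if the event itself is skipped.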

On Wed, Aug 30, 2017 at 4:24 PM, Nikolay Izhikov <[hidden email]>
wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions - 2

Yakov Zhdanov-2
In reply to this post by Nikolay Izhikov
I would postpone the review until we come to a clear decision on what
should be done if the filter or transformer fails. I don't think cancelling
the query is too much. From my standpoint this is a kind of heuristic
exception and may break some sensitive logic.

Thanks!
--
Yakov Zhdanov, Director R&D
*GridGain Systems*
www.gridgain.com


Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Hello, Yakov.

I did some investigation into your proposal for handling filter and
transformer exceptions:

1. If we cancel a continuous query from a remote node, the user has no way
to know about it.
    There is no public API to check "Is the continuous query still alive?".
    The only visible consequence of cancelling the query is that the
listener is no longer called on the local node.

2. If we change the behavior of filter exception handling, we break
backward compatibility. Is that OK?

3. If we implement query cancellation only for transformer exceptions, the
behavior would differ between filter and transformer.

I think the changes needed for consistent exception handling require
additional discussion.
I will start that discussion in another thread, since it does not seem
related to the current issue: it also touches the existing ContinuousQuery
implementation.

Can we stay with the current behavior for the current task (IGNITE-425)?

* a filter exception is treated as true
* a transformer exception is treated as null
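
For reference, the two rules above can be sketched in plain Java. This is only
an illustration under assumptions: `Predicate` and `Function` from
java.util.function stand in for the Ignite filter and transformer interfaces,
and the class and method names are hypothetical, not code from the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

public class ExceptionHandlingSketch {
    /** A filter that throws is treated as if it returned true. */
    static <E> boolean passes(Predicate<E> filter, E evt) {
        try {
            return filter.test(evt);
        }
        catch (RuntimeException e) {
            return true;
        }
    }

    /** A transformer that throws is treated as if it returned null. */
    static <E, T> T transform(Function<E, T> transformer, E evt) {
        try {
            return transformer.apply(evt);
        }
        catch (RuntimeException e) {
            return null;
        }
    }

    /** Values the local listener would receive for a batch of events. */
    static <E, T> List<T> deliver(List<E> events, Predicate<E> filter, Function<E, T> transformer) {
        List<T> out = new ArrayList<>();

        for (E evt : events) {
            if (passes(filter, evt))
                out.add(transform(transformer, evt));
        }

        return out;
    }

    public static void main(String[] args) {
        // Both the filter and the transformer fail on "x" (NumberFormatException).
        Predicate<String> filter = s -> Integer.parseInt(s) > 1;
        Function<String, Integer> transformer = s -> Integer.parseInt(s) * 10;

        // "1" is filtered out, "x" passes the filter and becomes null, "3" becomes 30.
        System.out.println(deliver(Arrays.asList("1", "x", "3"), filter, transformer)); // prints [null, 30]
    }
}
```

Under these rules the listener has to be prepared to receive null for an event
whose transformer failed.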





--
Nikolay Izhikov
[hidden email]

Re: ContinuousQueryWithTransformer implementation questions - 2

Anton Vinogradov-2
+1 to keeping the current behavior and starting a new thread to solve the
notification issue.


Re: ContinuousQueryWithTransformer implementation questions - 2

yzhdanov
Well, let's leave it as is for now, since I don't see any easy way to
consistently solve this.

--Yakov

Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Yakov, Anton, thank you!




--
Nikolay Izhikov
[hidden email]

Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolai Tikhonov
Guys,

This API looks very complicated. We already have similar functionality for
ScanQuery; let's reuse it. Please have a look at the
IgniteCache#query(org.apache.ignite.cache.query.Query<T>,
org.apache.ignite.lang.IgniteClosure<T,R>) method. It looks cleaner to
me. Any objections?

Thanks,
Nikolay


Re: ContinuousQueryWithTransformer implementation questions - 2

Nikolay Izhikov
Hello, Nikolay.

I don't think we can use this method.

We need to set:
* the transformer
* the *local listener*, which differs from the regular ContinuousQuery listener
* the other continuous query parameters.

So at the very least we would have to add a local listener parameter.

If we pass a ContinuousQuery instance as the first parameter to carry all the
ContinuousQuery parameters (timeout, pageSize, filter, etc.), then we have to
ignore the local listener inside it.
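
A rough sketch of the typing problem, using plain Java stand-ins rather than
the real Ignite classes. `EntryQuery`, its fields, and this four-argument
`query` method are hypothetical names made up for the illustration; the last
argument simply simulates incoming updates.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

public class TransformerOverloadSketch {
    /** Hypothetical stand-in for ContinuousQuery: parameters plus a listener typed to whole entries. */
    static class EntryQuery<K, V> {
        int pageSize = 1024; // one of the "other params" (illustrative value)

        Consumer<List<Map.Entry<K, V>>> entryListener; // accepts entries, cannot accept List<R>
    }

    /** Hypothetical query(qry, transformer) variant: it needs an extra listener typed to R. */
    static <K, V, R> void query(EntryQuery<K, V> qry,
        Function<Map.Entry<K, V>, R> transformer,
        Consumer<List<R>> transformedListener,
        List<Map.Entry<K, V>> updates) {
        List<R> batch = new ArrayList<>();

        for (Map.Entry<K, V> e : updates)
            batch.add(transformer.apply(e));

        // qry.entryListener is deliberately ignored: only transformed values are delivered.
        transformedListener.accept(batch);
    }

    public static void main(String[] args) {
        EntryQuery<String, String> qry = new EntryQuery<>();
        List<String> entryKeys = new ArrayList<>();
        qry.entryListener = batch -> batch.forEach(e -> entryKeys.add(e.getKey()));

        List<Map.Entry<String, String>> updates = new ArrayList<>();
        updates.add(new SimpleEntry<>("a", "xx"));
        updates.add(new SimpleEntry<>("b", "yyy"));

        List<Integer> lengths = new ArrayList<>();
        Function<Map.Entry<String, String>, Integer> valueLength = e -> e.getValue().length();
        Consumer<List<Integer>> transformedListener = lengths::addAll;

        query(qry, valueLength, transformedListener, updates);

        System.out.println(lengths);   // prints [2, 3]
        System.out.println(entryKeys); // prints [] because the entry listener was never called
    }
}
```

The listener set on the query object is silently unused here, which is the
part a separate query class makes impossible at compile time.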

What do you think?





--
Nikolay Izhikov
[hidden email]