ContinuousQueryWithTransformer implementation questions

Nikolay Izhikov
Hello, Igniters.

I'm working on issue IGNITE-425 [1].
I made a couple of changes in my branch [2], and I want to confirm those
changes with the community before moving forward.

Text of the issue:

```
Currently if updated entry passes the filter, it is sent to node initiated
the query entirely.
It would be good to provide user with the ability to transform entry and,
for example,
select only fields that are important. This may bring huge economy to
traffic and lower GC pressure as well.
```
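To make the idea in the issue text concrete, here is a minimal plain-Java sketch with no Ignite dependency. Everything in it (`TransformSketch`, `Person`, `remoteSide`) is a hypothetical stand-in, not Ignite API. It only illustrates that the filter and the transformer both run where the entry lives, so that only the small transformed object has to travel to the node that initiated the query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;

/** Plain-Java illustration only; none of these names are Ignite API. */
final class TransformSketch {
    /** Hypothetical cache value with one small field and one large one. */
    static final class Person {
        final String name;
        final byte[] photo;

        Person(String name, byte[] photo) {
            this.name = name;
            this.photo = photo;
        }
    }

    /**
     * Simulates the "remote" side: applies the filter, then the transformer,
     * and returns only the transformed objects that would be sent to the
     * node that initiated the query.
     */
    static <K, V, T> List<T> remoteSide(List<K> keys, List<V> vals,
        BiPredicate<K, V> filter, BiFunction<K, V, T> transformer) {
        List<T> toSend = new ArrayList<>();

        for (int i = 0; i < keys.size(); i++) {
            if (filter.test(keys.get(i), vals.get(i)))
                toSend.add(transformer.apply(keys.get(i), vals.get(i)));
        }

        return toSend;
    }

    public static void main(String[] args) {
        List<Integer> keys = Arrays.asList(1, 2, 3);
        List<Person> vals = Arrays.asList(
            new Person("Ann", new byte[1024]),
            new Person("Bob", new byte[1024]),
            new Person("Eve", new byte[1024]));

        // Keep odd keys only, and ship just the name instead of the whole Person.
        List<String> sent = remoteSide(keys, vals, (k, v) -> k % 2 == 1, (k, v) -> v.name);

        System.out.println(sent); // prints [Ann, Eve]
    }
}
```

In this sketch the 1 KB `photo` array never leaves `remoteSide`, which is the traffic and GC saving the issue text describes.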

1. I created a new class, ContinuousQueryWithTransformer, which extends Query:

Reasons to create an entirely new class rather than extend ContinuousQuery:

    a. ContinuousQuery is final, so users can't extend it. I don't want to
change that.
    b. ContinuousQuery contains some deprecated methods (setRemoteFilter), so
with a new class we can get rid of them.
    c. This public API design prevents, at compile time, using the existing
localEventListener together with the new transformedEventListener.

```
    public final class ContinuousQueryWithTransformer<K, V, T> extends Query<Cache.Entry<K, V>> {
        public ContinuousQueryWithTransformer<K, V, T> setRemoteFilterFactory(
            Factory<? extends CacheEntryEventFilter<K, V>> rmtFilterFactory) { /**/ }

        public ContinuousQueryWithTransformer<K, V, T> setRemoteTransformerFactory(
            Factory<? extends IgniteBiClosure<K, V, T>> factory) { /**/ }

        public ContinuousQueryWithTransformer<K, V, T> setLocalTransformedEventListener(
            TransformedEventListener<T> locTransEvtLsnr) { /**/ }

        public interface TransformedEventListener<T> {
            void onUpdated(Iterable<? extends T> events) throws CacheEntryListenerException;
        }
    }
```
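For readers without the branch at hand, the following is a rough, self-contained sketch of how a fluent API of this shape could be used. It is not the code from the branch: `TransformerQuerySketch`, `TransformedListener` and `fireUpdate` are hypothetical stand-ins, and `java.util.function.Supplier` and `BiFunction` are used here in place of `Factory` and `IgniteBiClosure`. The point it illustrates is reason (c): the listener is typed by `T` alone, so it can only ever be handed transformed objects, never full cache events.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Stand-in for the proposed query class; not the Ignite implementation. */
final class TransformerQuerySketch<K, V, T> {
    /** Mirrors the shape of the proposed listener: it sees only T. */
    interface TransformedListener<T> {
        void onUpdated(Iterable<? extends T> events);
    }

    private Supplier<? extends BiFunction<K, V, T>> transformerFactory;
    private TransformedListener<T> lsnr;

    TransformerQuerySketch<K, V, T> setRemoteTransformerFactory(
        Supplier<? extends BiFunction<K, V, T>> factory) {
        this.transformerFactory = factory;
        return this;
    }

    TransformerQuerySketch<K, V, T> setLocalTransformedEventListener(TransformedListener<T> lsnr) {
        this.lsnr = lsnr;
        return this;
    }

    /** Hypothetical helper: simulates one cache update reaching the query. */
    void fireUpdate(K key, V val) {
        T transformed = transformerFactory.get().apply(key, val);

        lsnr.onUpdated(Collections.singletonList(transformed));
    }

    public static void main(String[] args) {
        List<Integer> lengths = new ArrayList<>();

        TransformerQuerySketch<Integer, String, Integer> qry =
            new TransformerQuerySketch<Integer, String, Integer>()
                // Transformer: keep only the length of the value.
                .setRemoteTransformerFactory(() -> (k, v) -> v.length())
                // Listener: receives Integer lengths, not the String values.
                .setLocalTransformedEventListener(evts -> {
                    for (Integer len : evts)
                        lengths.add(len);
                });

        qry.fireUpdate(1, "hello");
        qry.fireUpdate(2, "hi");

        System.out.println(lengths); // prints [5, 2]
    }
}
```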

2. I want to edit all tests in the package
`core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/`
to ensure my implementation fully supports the existing tests.
I want each test to work for both the regular ContinuousQuery and
ContinuousQueryWithTransformer:

Existing test:

```
        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();

        qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() {
            @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) {
                for (CacheEntryEvent evt : evts) {
                    if ((Integer)evt.getValue() >= 0)
                        evtCnt.incrementAndGet();
                }
            }
        });
```

To be:

```
        Query qry = createContinuousQuery();

        setLocalListener(qry, new CI1<T2<Object, Object>>() {
            @Override public void apply(T2<Object, Object> e) {
                if ((Integer)e.getValue() >= 0)
                    evtCnt.incrementAndGet();
            }
        });
```

Base class to support setLocalListener:

```
    protected <K, V> void setLocalListener(Query q, CI1<T2<K, V>> lsnrClsr) {
        if (isContinuousWithTransformer()) {
            ((ContinuousQueryWithTransformer)q)
                .setLocalTransformedEventListener(new TransformedEventListenerImpl<>(lsnrClsr));
        }
        else
            ((ContinuousQuery)q).setLocalListener(new CacheInvokeListener<>(lsnrClsr));
    }

    /** Adapts the closure to the regular continuous query listener. */
    protected static class CacheInvokeListener<K, V> implements CacheEntryUpdatedListener<K, V> {
        private final CI1<T2<K, V>> clsr;

        CacheInvokeListener(CI1<T2<K, V>> clsr) {
            this.clsr = clsr;
        }

        @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> events)
            throws CacheEntryListenerException {
            for (CacheEntryEvent<? extends K, ? extends V> e : events)
                clsr.apply(new T2<K, V>(e.getKey(), e.getValue()));
        }
    }

    /** Adapts the same closure to the listener for transformed events. */
    protected static class TransformedEventListenerImpl<K, V> implements TransformedEventListener<T2<K, V>> {
        private final CI1<T2<K, V>> clsr;

        TransformedEventListenerImpl(CI1<T2<K, V>> clsr) {
            this.clsr = clsr;
        }

        @Override public void onUpdated(Iterable<? extends T2<K, V>> evts) throws CacheEntryListenerException {
            for (T2<K, V> e : evts)
                clsr.apply(e);
        }
    }
```
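The adapter idea behind this base class can also be shown without any Ignite types. The sketch below is only an illustration under stated assumptions: `EntryListener` and `TransformedListener` are hypothetical stand-ins for the two listener interfaces, and `Map.Entry` plays the role of `T2`. A test writes its check once as a closure over key/value pairs, and a small adapter wraps that closure into whichever listener shape the query under test requires.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Plain-Java illustration of the adapter idea; the interfaces are stand-ins. */
final class ListenerAdapterSketch {
    /** Stand-in for a regular listener that receives full key/value events. */
    interface EntryListener<K, V> {
        void onUpdated(Iterable<Map.Entry<K, V>> events);
    }

    /** Stand-in for a listener that receives already transformed objects. */
    interface TransformedListener<T> {
        void onUpdated(Iterable<? extends T> events);
    }

    /** Wraps the check for a query that delivers full events. */
    static <K, V> EntryListener<K, V> asEntryListener(Consumer<Map.Entry<K, V>> check) {
        return events -> events.forEach(check);
    }

    /** Wraps the same check for a query that delivers transformed pairs. */
    static <K, V> TransformedListener<Map.Entry<K, V>> asTransformedListener(
        Consumer<Map.Entry<K, V>> check) {
        return events -> events.forEach(check);
    }

    public static void main(String[] args) {
        List<Integer> nonNegative = new ArrayList<>();

        // The test logic is written once, independent of the listener type.
        Consumer<Map.Entry<String, Integer>> check = e -> {
            if (e.getValue() >= 0)
                nonNegative.add(e.getValue());
        };

        List<Map.Entry<String, Integer>> events = new ArrayList<>();
        events.add(new SimpleEntry<>("a", 1));
        events.add(new SimpleEntry<>("b", -1));

        asEntryListener(check).onUpdated(events);
        asTransformedListener(check).onUpdated(events);

        System.out.println(nonNegative); // prints [1, 1]
    }
}
```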

Thoughts?

[1] https://issues.apache.org/jira/browse/IGNITE-425
[2] https://github.com/nizhikov/ignite/pull/9/files

--
Nikolay Izhikov
[hidden email]

Re: ContinuousQueryWithTransformer implementation questions

Valentin Kulichenko
Nikolay,

We already have the following method for queries with a transformer. It
currently throws an exception for ContinuousQuery.

<T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer)

Would it be possible to use it instead of creating a new API?

-Val

On Wed, Jul 26, 2017 at 5:26 AM, Николай Ижиков <[hidden email]>
wrote:

> [...]

Re: ContinuousQueryWithTransformer implementation questions

Valentin Kulichenko
Yeah, unfortunately the current ContinuousQuery object can't be used for
querying with a transformer. That's actually not good, because adding
transformers to continuous queries and scan queries will be very inconsistent.

AFAIK, there are plans to completely rework the query API, since we have added
a lot of functionality that the current API is not sufficient for (DML, DDL,
etc.). It probably makes sense to consider transformers in the new API as well.

-Val

On Wed, Jul 26, 2017 at 1:32 PM, Nikolay Izhikov <[hidden email]>
wrote:

> Hello, Valentin.
>
> As far as I understand, `query(Query<T> qry, IgniteClosure<T, R>
> transformer)` is slightly different from what I need to implement.
>
>
> I need to pass two parameters to ContinuousQuery instead of localListener:
>
> - Remote Transformer
> - Local Listener for transformed events
>
> and the method you mention accepts only a transformer.
>
> Moreover, I think I should somehow "extend" ContinuousQuery (my proposal is
> a new class with a similar name), because the issue is about the possibility
> of optimizing the continuous query mechanism.
>
> Thoughts?
>
>
> On 26.07.2017 20:56, Valentin Kulichenko wrote:
>
>> Nikolay,
>>
>> We already have the following method for queries with transformer. It
>> currently throws exception for ContinuousQuery.
>>
>> <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer)
>>
>> Would it be possible to utilize it instead of creating new API?
>>
>> -Val