I have some issues and questions about Ignite ContinuousQuery.
First, can we please get an event type for the CQ that is relative to the query result set? I mean: if a record has just entered the initialQuery result set, the notification is NEW; if it was already in the set and is updated, UPDATE; and if it is removed, REMOVED. (I am not talking about the getEventType() method.)

Second, when I try ContinuousQuery I am getting events for records I am not subscribed to. I know that I can write the conditions again in the RemoteFilterFactory, but that duplicates the SQL query, which I don't want to repeat in a closure.

Entity class:

    public class Profile implements Serializable {
        @QuerySqlField(index = true)
        private String number;

        @QuerySqlField(index = true)
        private Double dataUsage;
    }

Update profile method:

    void updateAnyProfile(Double newDataUsage) {
        // Pick any profile whose dataUsage is below 30 and overwrite its dataUsage.
        SqlQuery qry = new SqlQuery(Profile.class, "select * from Profile where dataUsage < 30");
        List<CacheEntryImpl<String, Profile>> res = profileCache.query(qry).getAll();
        Profile profile = res.iterator().next().getValue();
        profile.setDataUsage(newDataUsage);
        profileCache.put(profile.getNumber(), profile);
    }

Query method:

    QueryCursor<Entry<String, BinaryObject>> detectChangesNoRemoteFilter(double start, double end) throws Exception {
        ContinuousQuery<String, BinaryObject> qry = new ContinuousQuery<>();

        // Initial query: profiles whose dataUsage lies between start and end.
        SqlQuery sqlQry = new SqlQuery(Profile.class, "dataUsage > ? and dataUsage < ? ");
        sqlQry.setArgs(start, end);
        qry.setInitialQuery(sqlQry);

        qry.setLocalListener((evts) ->
            evts.forEach(e -> {
                System.out.println("Inserting/Updating profiles with more than %" + start
                    + " key=" + e.getKey() + ", dataUsage=" + e.getValue().field("dataUsage"));
            }));

        qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<String, BinaryObject>>() {
            public CacheEntryEventFilter<String, BinaryObject> create() {
                return new CacheEntryEventFilter<String, BinaryObject>() {
                    public boolean evaluate(CacheEntryEvent<? extends String, ? extends BinaryObject> e)
                        throws CacheEntryListenerException {
                        return true;
                    }
                };
            }
        });

        IgniteCache<BinaryObject, BinaryObject> cache =
            Ignition.ignite().cache("profileCache").withKeepBinary();
        QueryCursor<Entry<String, BinaryObject>> cursor = cache.query(qry);
        return cursor;
    }

And registering the ContinuousQuery as below:

    QueryCursor<Entry<String, BinaryObject>> cursor30 = profileService.detectChangesNoRemoteFilter(30, 33);

I then update a profile whose dataUsage is less than 30 to 51 and still get notifications, which I should not, since the data change is irrelevant to my initial query. If that is the case, then ContinuousQueries are not working as described in the documentation.

    profileService.updateAnyProfile(51D);

My last question: if none of the above will be supported, I guess I will need to listen to all the events for my cache and use closures with RemoteListeners. Please let me know if there is a better workaround.

Thanks

From the official Ignite documentation, https://apacheignite.readme.io/docs/continuous-queries:

"Continuous queries enable you to listen to data modifications occurring on Ignite caches. Once a continuous query is started, you will get notified of all the data changes that fall into your query filter if any."
Hi Fatih.
The documentation for the initial query says: "an initial query that will be executed before the continuous query gets registered in the cluster and before you start to receive the updates". So it is executed only once. To filter the events that follow, you must implement the condition in the remote filter logic. Your filter implementation always returns true.

On Mon, May 8, 2017 at 12:01 AM, fatih <[hidden email]> wrote:
> When I try ContinuousQuery I am getting events for the records I am not
> subscribed to.

--
Alper Tekinalp
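For illustration, the filter logic mentioned in the reply above could be sketched roughly as follows. This is plain Java with no Ignite dependency; `DataUsageRange` is a hypothetical helper name, not an Ignite class, and the wiring into `evaluate(...)` shown in the comment is an assumption about how it would be called from the remote filter in the original post.

```java
import java.io.Serializable;
import java.util.function.Predicate;

/**
 * Hypothetical helper (not part of Ignite): the "dataUsage > ? and dataUsage < ?"
 * condition from the SQL query, written once as a serializable predicate.
 */
final class DataUsageRange implements Predicate<Double>, Serializable {
    private static final long serialVersionUID = 1L;

    private final double start;
    private final double end;

    DataUsageRange(double start, double end) {
        this.start = start;
        this.end = end;
    }

    /** True only when the value lies strictly between start and end. */
    @Override
    public boolean test(Double dataUsage) {
        return dataUsage != null && dataUsage > start && dataUsage < end;
    }

    public static void main(String[] args) {
        DataUsageRange range = new DataUsageRange(30, 33);

        // Assumed wiring inside evaluate(CacheEntryEvent e) of the remote filter:
        //   return range.test(e.getValue().<Double>field("dataUsage"));
        // A null check on e.getValue() may be needed for remove events.
        System.out.println(range.test(31.5)); // a value inside the subscribed range
        System.out.println(range.test(51.0)); // the update from the thread, outside the range
    }
}
```

With a check like this in place of `return true`, the update to 51 would be rejected by the filter and no notification would reach the local listener. The condition is still stated separately from the SQL text, but only in one small class.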
I have already mentioned below that I can use a RemoteFilterFactory with more conditions, but then this is basically duplication of the query.

Second, when I try ContinuousQuery I am getting events for records I am not subscribed to. I know that I can write the conditions again in the RemoteFilterFactory, but that duplicates the SQL query, which I don't want to repeat in a closure.
Continuous queries are predicate-based. The initial query is a separate, optional feature that lets you fetch the data that existed before the listener was registered; it does not affect subsequent update notifications in any way. Having said that, the behavior you observe is correct.

-Val

On Mon, May 8, 2017 at 12:12 PM, fatih <[hidden email]> wrote:
> I have already mentioned below that i can use remotefilterfactory with more
> conditions but then this is basically the replication.
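Since notifications are driven only by the predicate, the NEW / UPDATE / REMOVED distinction asked about in the first post could in principle be derived on the listener or filter side by applying the same predicate to the old and the new value. Below is a minimal plain-Java sketch of that idea. `SetTransitions` and `Transition` are hypothetical names, and it assumes the old and new `dataUsage` values are available (for a JCache `CacheEntryEvent` these would come from `getOldValue()` and `getValue()`), with `null` standing for "no value".

```java
import java.util.function.Predicate;

/** Hypothetical sketch: classify an update relative to a predicate-defined set. */
final class SetTransitions {
    enum Transition { ENTERED, UPDATED, LEFT, IRRELEVANT }

    /**
     * @param oldValue value before the change, or null if the entry did not exist
     * @param newValue value after the change, or null if the entry was removed
     * @param inSet    membership condition of the subscribed set
     */
    static Transition classify(Double oldValue, Double newValue, Predicate<Double> inSet) {
        boolean wasInSet = oldValue != null && inSet.test(oldValue);
        boolean isInSet = newValue != null && inSet.test(newValue);

        if (!wasInSet && isInSet) return Transition.ENTERED;   // "NEW" for the subscriber
        if (wasInSet && isInSet) return Transition.UPDATED;    // changed but still in the set
        if (wasInSet) return Transition.LEFT;                  // "REMOVED" for the subscriber
        return Transition.IRRELEVANT;                          // never touched the set
    }

    public static void main(String[] args) {
        Predicate<Double> range = v -> v > 30 && v < 33;

        System.out.println(classify(25.0, 51.0, range)); // the update from the thread
        System.out.println(classify(25.0, 31.0, range)); // value moves into the range
        System.out.println(classify(31.0, null, range)); // entry in the range is removed
    }
}
```

A remote filter built on this idea would pass only events whose classification is not IRRELEVANT, and the local listener could report the transition instead of the raw cache event type.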