Hi,
As you know, the CacheStore API is commonly used for read/write-through integration of in-memory data with persistent storage.

There is also the IgniteCache.loadCache method for hot-loading a cache on startup. Invoking this method causes CacheStore.loadCache to be executed on all nodes that store the cache's partitions. Because no keys are passed to CacheStore.loadCache, the underlying implementation is forced to read all the data from the persistent storage, even though only part of that data will be kept on each node.

So the current implementation has two general drawbacks:

1. The persistent storage is forced to execute as many identical queries as there are nodes in the cluster. Each query may involve a lot of additional computation on the storage server.

2. The network is forced to transfer far more data than necessary, which is an obvious disadvantage on large systems.

The partition-aware data loading approach, described in https://apacheignite.readme.io/docs/data-loading#section-partition-aware-data-loading , is not an option: it requires persisting volatile data that depends on the affinity function implementation and settings.

I propose using something like IgniteDataStreamer inside the IgniteCache.loadCache implementation.

--
Thanks,
Alexandr Kuramshin
Alexandr,
Could you describe your proposal in more detail? Especially the case with several nodes.

--
Alexey Kuznetsov
All right,
Let's assume a simple scenario. When IgniteCache.loadCache is invoked, we check whether the cache is non-local, and if so, we initiate the new loading logic.

First, we pick a "streamer" node. This could be done by utilizing LoadBalancingSpi, or it may be configured statically, for example so that the streamer node runs on the same host as the persistent storage provider.

After that we start the loading task on the streamer node, which creates an IgniteDataStreamer and loads the cache with CacheStore.loadCache. Every call to IgniteBiInClosure.apply simply invokes IgniteDataStreamer.addData.

This implementation completely removes the extra load on the persistent storage provider. Network overhead also decreases for partitioned caches. For two nodes we transfer 1 1/2 times the data over the network (1 part is transferred from the persistent storage to the streamer, and then 1/2 from the streamer node to the other node). For three nodes it is 1 2/3, and so on, up to twice the amount of data on large clusters.

I'd like to propose an additional optimization here. If the streamer node runs on the same machine as the persistent storage provider, we remove that part of the network overhead entirely. It could be a special daemon node dedicated to cache loading and assigned in the cache configuration, or an ordinary server node.

Certainly, these calculations assume an evenly partitioned cache with only primary copies (no backups). With one backup (the most frequent case, I think), we transfer 2 times the data over the network on two nodes, 2 1/3 on three, 2 1/2 on four, and so on, up to three times the amount of data on large clusters. Hence it is still better than the current implementation. In the worst case, a fully replicated cache, we transfer N+1 times the data over the network (where N is the number of nodes in the cluster). But that is not a problem in small clusters and only a small overhead in large ones, and we still gain the persistent storage optimization.

Now let's take a more complex scenario. To achieve some level of parallelism, we could split the cluster into several groups. This could be a parameter of the IgniteCache.loadCache method or a cache configuration option. The number of groups could be a fixed value, or it could be derived from a maximum number of nodes per group.

After splitting the whole cluster into groups, we pick a streamer node in each group and submit a loading task similar to the single-streamer scenario, except that only the keys belonging to the streamer node's cluster group are passed to IgniteDataStreamer.addData.

In this case the overhead grows with the level of parallelism rather than with the total number of nodes in the cluster.

--
Thanks,
Alexandr Kuramshin
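To make the idea more concrete, here is a rough hand-written sketch of the single-streamer flow, roughly what the proposal would fold into IgniteCache.loadCache. The cache name, config path and MyJdbcPersonStore are placeholders (the store is a hypothetical JDBC-backed implementation, values simplified to String), and picking the oldest server node stands in for a LoadBalancingSpi-based choice:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.store.CacheStore;

public class SingleStreamerLoad {
    public static void main(String[] args) {
        Ignite ignite = Ignition.start("ignite-config.xml"); // placeholder config

        // Pick one "streamer" node and run the whole load there.
        ignite.compute(ignite.cluster().forServers().forOldest()).run(() -> {
            Ignite local = Ignition.localIgnite();

            // Hypothetical JDBC-backed store; in the proposal this would be the cache's configured store.
            CacheStore<Long, String> store = new MyJdbcPersonStore();

            try (IgniteDataStreamer<Long, String> streamer = local.dataStreamer("personCache")) {
                // Every entry produced by the store goes straight to the streamer,
                // which batches entries and routes each one to its owning nodes.
                store.loadCache(streamer::addData);
            }
        });
    }
}

The persistent storage is read exactly once (on the streamer node), and the streamer takes care of distributing the entries across the cluster.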
Looks good to me.
But I would suggest considering one more use case: if the user knows their data, they could split the loading manually.

For example, the Persons table contains 10M rows. The user could provide something like:

cache.loadCache(null,
    "Person", "select * from Person where id < 1_000_000",
    "Person", "select * from Person where id >= 1_000_000 and id < 2_000_000",
    ....
    "Person", "select * from Person where id >= 9_000_000 and id < 10_000_000"
);

or maybe it could be some descriptor object like:

{
    sql: "select * from Person where id >= ? and id < ?",
    range: 0...10_000_000
}

In this case the provided queries would be sent to as many nodes as there are queries, the data would be loaded in parallel, and for keys that are not local a data streamer should be used (as in Alexandr's description).

I think it is a good issue for Ignite 2.0.

Vova, Val - what do you think?

--
Alexey Kuznetsov
GridGain Systems
www.gridgain.com
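As a sketch of the mechanism this builds on: a custom CacheStore can already interpret whatever the caller passes to IgniteCache.loadCache, for example as (type name, SQL) pairs. Note that with the current behaviour every node holding cache partitions would still receive and run all of the queries; the suggestion above is precisely to hand out one query per node. The JDBC URL, table and column names are placeholders, and values are simplified to String:

import java.sql.*;
import javax.cache.integration.CacheLoaderException;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lang.IgniteBiInClosure;

public class PersonSqlStore extends CacheStoreAdapter<Long, String> {
    /** Interprets args as pairs of (type name, SQL query) and feeds each result set into the cache. */
    @Override public void loadCache(IgniteBiInClosure<Long, String> clo, Object... args) {
        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:persons")) { // placeholder URL
            for (int i = 0; i < args.length; i += 2) {
                String type = args[i].toString();    // e.g. "Person"; ignored in this sketch
                String sql = args[i + 1].toString(); // e.g. "select * from Person where id < 1_000_000"

                try (PreparedStatement st = conn.prepareStatement(sql); ResultSet rs = st.executeQuery()) {
                    while (rs.next())
                        clo.apply(rs.getLong("id"), rs.getString("name"));
                }
            }
        }
        catch (SQLException e) {
            throw new CacheLoaderException(e);
        }
    }

    // Single-entry operations omitted for brevity.
    @Override public String load(Long key) { return null; }
    @Override public void write(javax.cache.Cache.Entry<? extends Long, ? extends String> e) { /* no-op */ }
    @Override public void delete(Object key) { /* no-op */ }
}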
Alexandr, Alexey,
While I agree with you that the current cache loading logic is far from ideal, it would be cool to see API drafts based on your suggestions to get a better understanding of your ideas. How exactly are users going to use them?

My main concern is that initial load is not a trivial task in the general case. Some users have centralized RDBMS systems, some have NoSQL, others work with distributed persistent stores (e.g. HDFS). Sometimes we have Ignite nodes "near" the persistent data, sometimes we don't. Sharding, affinity, co-location, etc. If we try to support all (or many) cases out of the box, we may end up with a very messy and difficult API. So we should carefully balance simplicity, usability and richness of features here.

Personally, I think that if a user is not satisfied with the "loadCache()" API, he just writes a simple closure with blackjack, a streamer and queries, and sends it to whatever node he finds convenient. Not a big deal. Only very common cases should be added to the Ignite API.

Vladimir.
Hi,
I just want to clarify a couple of API details from the original email to make sure that we are making the right assumptions here.

*"Because no keys are passed to CacheStore.loadCache, the underlying implementation is forced to read all the data from the persistent storage"*

According to the javadoc, the loadCache(...) method receives optional arguments from the user. You can pass anything you like, including a list of keys, an SQL where clause, etc.

*"The partition-aware data loading approach is not an option. It requires persisting volatile data that depends on the affinity function implementation and settings."*

This is only partially true. While Ignite allows plugging in custom affinity functions, the affinity function is not something that changes dynamically, and it should always return the same partition for the same key. So the partition assignments are not volatile at all. If, in some very rare case, the partition assignment logic needs to change, then you could also update the partition assignments that you have persisted elsewhere, e.g. in the database.

D.
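To illustrate this, a rough sketch of partition-aware loading, assuming the backing table has a partition_id column that was filled with ignite.affinity("personCache").partition(key) when the rows were written; the cache name, JDBC URL, table and column names are placeholders, and values are simplified to String:

import java.sql.*;
import javax.cache.integration.CacheLoaderException;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lang.IgniteBiInClosure;

public class PartitionAwarePersonStore extends CacheStoreAdapter<Long, String> {
    @Override public void loadCache(IgniteBiInClosure<Long, String> clo, Object... args) {
        Ignite ignite = Ignition.localIgnite();

        // Partitions owned by this node (primary and backup copies) - only these rows are loaded here.
        int[] localParts = ignite.affinity("personCache").allPartitions(ignite.cluster().localNode());

        try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:persons"); // placeholder URL
             PreparedStatement st = conn.prepareStatement("select id, name from Person where partition_id = ?")) {
            for (int part : localParts) {
                st.setInt(1, part);

                try (ResultSet rs = st.executeQuery()) {
                    while (rs.next())
                        clo.apply(rs.getLong("id"), rs.getString("name"));
                }
            }
        }
        catch (SQLException e) {
            throw new CacheLoaderException(e);
        }
    }

    // Single-entry operations omitted for brevity.
    @Override public String load(Long key) { return null; }
    @Override public void write(javax.cache.Cache.Entry<? extends Long, ? extends String> e) { /* no-op */ }
    @Override public void delete(Object key) { /* no-op */ }
}

The cost, as noted earlier in the thread, is that the partition_id column has to be maintained (and usually indexed) in the database.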
Hi Aleksandr,
The data streamer is already outlined as one of the possible approaches for loading the data [1]. Basically, you start a designated client node or choose a leader among the server nodes [2] and then use the IgniteDataStreamer API to load the data. With this approach there is no need to have a CacheStore implementation at all. Can you please elaborate on what additional value you are trying to add here?

[1] https://apacheignite.readme.io/docs/data-loading#ignitedatastreamer
[2] https://apacheignite.readme.io/docs/leader-election

-Val
> > > "Person", "select * from Person where id >= 9_000_000 and id < > > 10_000_000", > > > ); > > > > > > or may be it could be some descriptor object like > > > > > > { > > > sql: select * from Person where id >= ? and id < ?" > > > range: 0...10_000_000 > > > } > > > > > > In this case provided queries will be send to mach nodes as number of > > > queries. > > > And data will be loaded in parallel and for keys that a not local - > data > > > streamer > > > should be used (as described Alexandr description). > > > > > > I think it is a good issue for Ignite 2.0 > > > > > > Vova, Val - what do you think? > > > > > > > > > On Mon, Nov 14, 2016 at 4:01 PM, Alexandr Kuramshin < > > [hidden email]> > > > wrote: > > > > > >> All right, > > >> > > >> Let's assume a simple scenario. When the IgniteCache.loadCache is > > invoked, > > >> we check whether the cache is not local, and if so, then we'll > initiate > > >> the > > >> new loading logic. > > >> > > >> First, we take a "streamer" node, it could be done by > > >> utilizing LoadBalancingSpi, or it may be configured statically, for > the > > >> reason that the streamer node is running on the same host as the > > >> persistence storage provider. > > >> > > >> After that we start the loading task on the streamer node which > > >> creates IgniteDataStreamer and loads the cache with > > CacheStore.loadCache. > > >> Every call to IgniteBiInClosure.apply simply > > >> invokes IgniteDataStreamer.addData. > > >> > > >> This implementation will completely relieve overhead on the > persistence > > >> storage provider. Network overhead is also decreased in the case of > > >> partitioned caches. For two nodes we get 1-1/2 amount of data > > transferred > > >> by the network (1 part well be transferred from the persistence > storage > > to > > >> the streamer, and then 1/2 from the streamer node to the another > node). > > >> For > > >> three nodes it will be 1-2/3 and so on, up to the two times amount of > > data > > >> on the big clusters. > > >> > > >> I'd like to propose some additional optimization at this place. If we > > have > > >> the streamer node on the same machine as the persistence storage > > provider, > > >> then we completely relieve the network overhead as well. It could be a > > >> some > > >> special daemon node for the cache loading assigned in the cache > > >> configuration, or an ordinary sever node as well. > > >> > > >> Certainly this calculations have been done in assumption that we have > > even > > >> partitioned cache with only primary nodes (without backups). In the > case > > >> of > > >> one backup (the most frequent case I think), we get 2 amount of data > > >> transferred by the network on two nodes, 2-1/3 on three, 2-1/2 on > four, > > >> and > > >> so on up to the three times amount of data on the big clusters. Hence > > it's > > >> still better than the current implementation. In the worst case with a > > >> fully replicated cache we take N+1 amount of data transferred by the > > >> network (where N is the number of nodes in the cluster). But it's not > a > > >> problem in small clusters, and a little overhead in big clusters. And > we > > >> still gain the persistence storage provider optimization. > > >> > > >> Now let's take more complex scenario. To achieve some level of > > >> parallelism, > > >> we could split our cluster on several groups. It could be a parameter > of > > >> the IgniteCache.loadCache method or a cache configuration option. 
The > > >> number of groups could be a fixed value, or it could be calculated > > >> dynamically by the maximum number of nodes in the group. > > >> > > >> After splitting the whole cluster on groups we will take the streamer > > node > > >> in the each group and submit the task for loading the cache similar to > > the > > >> single streamer scenario, except as the only keys will be passed to > > >> the IgniteDataStreamer.addData method those correspond to the cluster > > >> group > > >> where is the streamer node running. > > >> > > >> In this case we get equal level of overhead as the parallelism, but > not > > so > > >> surplus as how many nodes in whole the cluster. > > >> > > >> 2016-11-11 15:37 GMT+03:00 Alexey Kuznetsov <[hidden email]>: > > >> > > >> > Alexandr, > > >> > > > >> > Could you describe your proposal in more details? > > >> > Especially in case with several nodes. > > >> > > > >> > On Fri, Nov 11, 2016 at 6:34 PM, Alexandr Kuramshin < > > >> [hidden email]> > > >> > wrote: > > >> > > > >> > > Hi, > > >> > > > > >> > > You know CacheStore API that is commonly used for > read/write-through > > >> > > relationship of the in-memory data with the persistence storage. > > >> > > > > >> > > There is also IgniteCache.loadCache method for hot-loading the > cache > > >> on > > >> > > startup. Invocation of this method causes execution of > > >> > CacheStore.loadCache > > >> > > on the all nodes storing the cache partitions. Because of none > keys > > >> are > > >> > > passed to the CacheStore.loadCache methods, the underlying > > >> implementation > > >> > > is forced to read all the data from the persistence storage, but > > only > > >> > part > > >> > > of the data will be stored on each node. > > >> > > > > >> > > So, the current implementation have two general drawbacks: > > >> > > > > >> > > 1. Persistence storage is forced to perform as many identical > > queries > > >> as > > >> > > many nodes on the cluster. Each query may involve much additional > > >> > > computation on the persistence storage server. > > >> > > > > >> > > 2. Network is forced to transfer much more data, so obviously the > > big > > >> > > disadvantage on large systems. > > >> > > > > >> > > The partition-aware data loading approach, described in > > >> > > https://apacheignite.readme.io/docs/data-loading#section- > > >> > > partition-aware-data-loading > > >> > > , is not a choice. It requires persistence of the volatile data > > >> depended > > >> > on > > >> > > affinity function implementation and settings. > > >> > > > > >> > > I propose using something like IgniteDataStreamer inside > > >> > > IgniteCache.loadCache implementation. > > >> > > > > >> > > > > >> > > -- > > >> > > Thanks, > > >> > > Alexandr Kuramshin > > >> > > > > >> > > > >> > > > >> > > > >> > -- > > >> > Alexey Kuznetsov > > >> > > > >> > > >> > > >> > > >> -- > > >> Thanks, > > >> Alexandr Kuramshin > > >> > > > > > > > > > > > > -- > > > Alexey Kuznetsov > > > GridGain Systems > > > www.gridgain.com > > > > > > |
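For reference, a minimal sketch of that documented approach, assuming a client-mode loader node, a placeholder config path, JDBC URL and query, and values simplified to String:

import java.sql.*;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class ClientNodeLoader {
    public static void main(String[] args) throws SQLException {
        Ignition.setClientMode(true); // join the cluster as a client node dedicated to loading

        try (Ignite ignite = Ignition.start("ignite-config.xml"); // placeholder config
             IgniteDataStreamer<Long, String> streamer = ignite.dataStreamer("personCache");
             Connection conn = DriverManager.getConnection("jdbc:h2:mem:persons");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select id, name from Person")) {

            // The streamer batches entries and routes each one to its owning node(s),
            // so the database is read exactly once.
            while (rs.next())
                streamer.addData(rs.getLong("id"), rs.getString("name"));
        }
    }
}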
As far as I can understand, Alex is trying to avoid the scenario where a user needs to bring a 1Tb dataset to each node of a 50-node cluster and then discard 49/50 of the loaded data. To me this seems like a very good catch.

However, I agree with Val that this may be implemented apart from the store, and the user can continue using the store for read/write-through; there is probably no need to alter any API. Maybe we need to outline Val's suggestion in the documentation and describe it as one of the possible scenarios.

Thoughts?

--Yakov
Hi all,
I think the discussion is going in the wrong direction. Certainly it is not a big deal to implement some custom user logic to load data into caches. But the Ignite framework gives the user reusable code built on top of the basic system.

So the main question is: why should the convenient way to load caches be a totally non-optimal solution?

We could talk at length about different persistent storage types, but whenever the loading is initiated with IgniteCache.loadCache, the current implementation imposes a lot of overhead on the network.

Partition-aware data loading may be used in some scenarios to avoid this network overhead, but users are compelled to take additional steps to achieve that optimization: adding a column to the tables, adding compound indices that include the added column, writing a piece of repetitive code to load the data into different caches in a fault-tolerant fashion, etc.

Let's give the user reusable code which is convenient, reliable and fast.

--
Thanks,
Alexandr Kuramshin
Yakov, I agree that such a scenario should be avoided. I also think that the loadCache(...) method, as it is right now, provides a way to avoid it. The DataStreamer also seems like an option here, but in that case the loadCache(...) method should not be used at all, to my understanding.
Hi Alex,
>>> Let's give the user reusable code which is convenient, reliable and fast.

Convenience is exactly why I asked for an example of how the API could look and how users are going to use it.

Vladimir.
|
Hi Vladimir,
I don't propose any changes to the API. The usage scenario is the same as described in https://apacheignite.readme.io/docs/persistent-store#section-loadcache-

The preload logic invokes IgniteCache.loadCache() with some additional arguments, depending on the CacheStore implementation, and then the loading occurs in the way I've already described.
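For illustration only, a minimal sketch of such an invocation. The cache name, key/value types and the Person table are assumptions; the (type, SQL) pair convention is the one used by CacheJdbcPojoStore, and a custom CacheStore is free to interpret the varargs differently:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;

class PreloadExample {
    // Cache name and the Person table are assumed for the example.
    static void preload(Ignite ignite) {
        IgniteCache<Long, Object> personCache = ignite.cache("PersonCache");

        // Existing API: everything after the predicate is handed unchanged to
        // CacheStore.loadCache(clo, args); the store decides how to use it.
        personCache.loadCache(null,
            "org.example.Person", "select * from Person where id < 1000000");
    }
}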
-- Thanks, Alexandr Kuramshin |
Hi, All!
I think we do not need to change the API at all.

public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException;

We could pass any args to loadCache(). So we could create a class

IgniteCacheLoadDescriptor {
    some fields that describe how to load
}

and modify the POJO store to detect and use such arguments.

All we need is to implement this and write good documentation and examples.

Thoughts?
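As a rough sketch only: the IgniteCacheLoadDescriptor class and its fields below are hypothetical and do not exist in Ignite; only the loadCache(IgniteBiPredicate, Object...) signature quoted above is existing API.

import java.io.Serializable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;

// Hypothetical descriptor: a parametrized query plus a key range that a
// modified POJO store could split into per-range queries.
class IgniteCacheLoadDescriptor implements Serializable {
    final String sql;   // e.g. "select * from Person where id >= ? and id < ?"
    final long from;
    final long to;
    final long step;    // width of each sub-range executed as a separate query

    IgniteCacheLoadDescriptor(String sql, long from, long to, long step) {
        this.sql = sql;
        this.from = from;
        this.to = to;
        this.step = step;
    }
}

class DescriptorLoadExample {
    // Cache name, key type and the Person table are assumed for the example.
    static void preload(Ignite ignite) {
        IgniteCache<Long, Object> cache = ignite.cache("PersonCache");

        // The descriptor travels through the existing Object... varargs;
        // a store unaware of it would simply ignore or reject the argument.
        cache.loadCache(null,
            new IgniteCacheLoadDescriptor(
                "select * from Person where id >= ? and id < ?",
                0, 10_000_000, 1_000_000));
    }
}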
-- Alexey Kuznetsov |
It sounds like Aleksandr is basically proposing to support automatic persistence [1] for loading through a data streamer, and we really don't have this. However, I think I have a more generic solution in mind.

What if we add one more IgniteCache.loadCache overload like this:

loadCache(@Nullable IgniteBiPredicate<K, V> p, IgniteBiInClosure<K, V> clo, @Nullable Object... args)

It's the same as the existing one, but with a key-value closure provided as a parameter. This closure will be passed to CacheStore.loadCache along with the arguments and will allow overriding the logic that actually saves the loaded entry in the cache (currently this logic is always provided by the cache itself and the user can't control it).

We can then provide an implementation of this closure that creates a data streamer and calls addData() within its apply() method.

I see the following advantages:

- Any existing CacheStore implementation can be reused to load through the streamer (our JDBC and Cassandra stores, or anything else the user has).
- Loading code is always part of the CacheStore implementation, so it's very easy to switch between different ways of loading.
- The user is not limited to the two approaches we provide out of the box; they can always implement a new one.

Thoughts?

[1] https://apacheignite.readme.io/docs/automatic-persistence

-Val
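A minimal sketch of the streamer-backed closure this proposal describes; the proposed three-argument loadCache overload does not exist yet, so only the closure itself is shown, and the cache name passed to it is an assumption:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.lang.IgniteBiInClosure;

// Sketch: a key-value closure that routes every loaded entry through a data
// streamer instead of a direct cache put, so entries end up on their
// primary/backup nodes without each node re-reading the store.
class StreamerLoadClosure<K, V> implements IgniteBiInClosure<K, V>, AutoCloseable {
    private final IgniteDataStreamer<K, V> streamer;

    StreamerLoadClosure(Ignite ignite, String cacheName) {
        streamer = ignite.dataStreamer(cacheName);
    }

    @Override public void apply(K key, V val) {
        streamer.addData(key, val);   // batched and routed by affinity
    }

    @Override public void close() {
        streamer.close();             // flush the remaining batches
    }
}

Under the proposal, CacheStore.loadCache would receive such a closure in place of the cache's internal one, so an existing store implementation could start streaming without modification.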
|
How would your proposal resolve the main point Aleksandr is trying to convey, which is the extensive network utilization?

As I see it, the loadCache method will still be triggered on every node, and as before all the nodes will pre-load the whole data set from the database. That was Aleksandr's reasonable concern.

If we come up with a way to call loadCache on a specific node only and implement some fault-tolerant mechanism, then your suggestion should work perfectly fine.

— Denis
You can use the localLoadCache method for this (it should be overloaded as well, of course). Basically, if you provide a closure based on IgniteDataStreamer and call localLoadCache on one of the nodes (client or server), it's the same approach as described in [1], but with the possibility to reuse the existing persistence code. Makes sense?
[1] https://apacheignite.readme.io/docs/data-loading#ignitedatastreamer
-Val
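To make the idea above more concrete, here is a minimal sketch of such a closure. It assumes the proposed overload localLoadCache(p, clo, args), which does not exist in the current API; the class name and the commented-out call are illustrative only.

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.lang.IgniteBiInClosure;

public class StreamerLoadSketch {
    /** Builds a closure that redirects loaded entries into a data streamer. */
    public static void loadViaStreamer(Ignite ignite, IgniteCache<Integer, String> cache) {
        try (IgniteDataStreamer<Integer, String> streamer = ignite.dataStreamer(cache.getName())) {
            // Every entry produced by CacheStore.loadCache goes to the streamer
            // instead of being put into the cache by the loading node itself.
            IgniteBiInClosure<Integer, String> clo = streamer::addData;

            // Hypothetical overload discussed in this thread (not in the current API):
            // cache.localLoadCache(null, clo, "select * from Person");
        }
    }
}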
|
Well, that’s clear. However, with localLoadCache the user still has to take care of fault tolerance if the node that loads the data goes down. What if we provide an overloaded version of loadCache that accepts the number of nodes on which the closure has to be executed? If that number decreases, the engine will re-execute the closure on a node that is still alive.
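A purely hypothetical sketch of what such an overload could look like; neither the interface nor the nodeCnt parameter exists in the Ignite API, they only illustrate the proposal above.

import javax.cache.CacheException;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.jetbrains.annotations.Nullable;

public interface FaultTolerantLoading<K, V> {
    /**
     * Hypothetical overload: runs the load closure on nodeCnt nodes and
     * re-executes it on another node if one of them leaves the cluster.
     */
    void loadCache(@Nullable IgniteBiPredicate<K, V> p,
                   IgniteBiInClosure<K, V> clo,
                   int nodeCnt,
                   @Nullable Object... args) throws CacheException;
}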
— Denis |
Denis,
The loading will most likely be initiated by the application anyway, even if you call localLoadCache on one of the server nodes. I.e. the flow is the following:
1. The client sends a closure to a server node (e.g. the oldest or a random one).
2. The closure calls the localLoadCache method.
3. If this server node fails (or if the loading process fails), the client gets an exception and retries if needed.
I would not complicate the API and implementation even more. We have the compute grid API that already allows to handle the things you're describing. It's very flexible and easy to use.
-Val
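A minimal sketch of that flow using the existing compute API; the retry policy, cache name handling and node selection are illustrative assumptions, not a definitive implementation.

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;

public class LoadCacheFlowSketch {
    /** Runs localLoadCache on a single server node and retries on failure. */
    public static void loadWithRetry(Ignite ignite, String cacheName, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            // 1. Pick one server node (random here; could be the oldest as well).
            ClusterGroup loaderNode = ignite.cluster().forServers().forRandom();

            try {
                // 2. Send a closure to that node; the closure calls localLoadCache,
                //    so only this node reads from the persistence store.
                ignite.compute(loaderNode).run(() ->
                    Ignition.localIgnite().cache(cacheName).localLoadCache(null));

                return; // Loading succeeded.
            }
            catch (IgniteException e) {
                // 3. The node (or the loading) failed: retry on another node.
                if (attempt >= maxAttempts)
                    throw e;
            }
        }
    }
}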
Network overhead is also decreased in the > >>>>> case > >>>>>>> of > >>>>>>>>>>>> partitioned caches. For two nodes we get 1-1/2 amount of > >>>> data > >>>>>>>>>> transferred > >>>>>>>>>>>> by the network (1 part well be transferred from the > >>>>> persistence > >>>>>>>>> storage > >>>>>>>>>> to > >>>>>>>>>>>> the streamer, and then 1/2 from the streamer node to the > >>>>> another > >>>>>>>>> node). > >>>>>>>>>>>> For > >>>>>>>>>>>> three nodes it will be 1-2/3 and so on, up to the two times > >>>>>> amount > >>>>>>>> of > >>>>>>>>>> data > >>>>>>>>>>>> on the big clusters. > >>>>>>>>>>>> > >>>>>>>>>>>> I'd like to propose some additional optimization at this > >>>>> place. > >>>>>> If > >>>>>>>> we > >>>>>>>>>> have > >>>>>>>>>>>> the streamer node on the same machine as the persistence > >>>>> storage > >>>>>>>>>> provider, > >>>>>>>>>>>> then we completely relieve the network overhead as well. It > >>>>>> could > >>>>>>>> be a > >>>>>>>>>>>> some > >>>>>>>>>>>> special daemon node for the cache loading assigned in the > >>>>> cache > >>>>>>>>>>>> configuration, or an ordinary sever node as well. > >>>>>>>>>>>> > >>>>>>>>>>>> Certainly this calculations have been done in assumption > >>>> that > >>>>> we > >>>>>>>> have > >>>>>>>>>> even > >>>>>>>>>>>> partitioned cache with only primary nodes (without backups). > >>>>> In > >>>>>>> the > >>>>>>>>> case > >>>>>>>>>>>> of > >>>>>>>>>>>> one backup (the most frequent case I think), we get 2 amount > >>>>> of > >>>>>>> data > >>>>>>>>>>>> transferred by the network on two nodes, 2-1/3 on three, > >>>> 2-1/2 > >>>>>> on > >>>>>>>>> four, > >>>>>>>>>>>> and > >>>>>>>>>>>> so on up to the three times amount of data on the big > >>>>> clusters. > >>>>>>>> Hence > >>>>>>>>>> it's > >>>>>>>>>>>> still better than the current implementation. In the worst > >>>>> case > >>>>>>>> with a > >>>>>>>>>>>> fully replicated cache we take N+1 amount of data > >>>> transferred > >>>>> by > >>>>>>> the > >>>>>>>>>>>> network (where N is the number of nodes in the cluster). But > >>>>>> it's > >>>>>>>> not > >>>>>>>>> a > >>>>>>>>>>>> problem in small clusters, and a little overhead in big > >>>>>> clusters. > >>>>>>>> And > >>>>>>>>> we > >>>>>>>>>>>> still gain the persistence storage provider optimization. > >>>>>>>>>>>> > >>>>>>>>>>>> Now let's take more complex scenario. To achieve some level > >>>> of > >>>>>>>>>>>> parallelism, > >>>>>>>>>>>> we could split our cluster on several groups. It could be a > >>>>>>>> parameter > >>>>>>>>> of > >>>>>>>>>>>> the IgniteCache.loadCache method or a cache configuration > >>>>>> option. > >>>>>>>> The > >>>>>>>>>>>> number of groups could be a fixed value, or it could be > >>>>>> calculated > >>>>>>>>>>>> dynamically by the maximum number of nodes in the group. > >>>>>>>>>>>> > >>>>>>>>>>>> After splitting the whole cluster on groups we will take the > >>>>>>>> streamer > >>>>>>>>>> node > >>>>>>>>>>>> in the each group and submit the task for loading the cache > >>>>>>> similar > >>>>>>>> to > >>>>>>>>>> the > >>>>>>>>>>>> single streamer scenario, except as the only keys will be > >>>>> passed > >>>>>>> to > >>>>>>>>>>>> the IgniteDataStreamer.addData method those correspond to > >>>> the > >>>>>>>> cluster > >>>>>>>>>>>> group > >>>>>>>>>>>> where is the streamer node running. > >>>>>>>>>>>> > >>>>>>>>>>>> In this case we get equal level of overhead as the > >>>>> parallelism, > >>>>>>> but > >>>>>>>>> not > >>>>>>>>>> so > >>>>>>>>>>>> surplus as how many nodes in whole the cluster. 
> >>>>>>>>>>>> > >>>>>>>>>>>> 2016-11-11 15:37 GMT+03:00 Alexey Kuznetsov < > >>>>>>> [hidden email] > >>>>>>>>> : > >>>>>>>>>>>> > >>>>>>>>>>>>> Alexandr, > >>>>>>>>>>>>> > >>>>>>>>>>>>> Could you describe your proposal in more details? > >>>>>>>>>>>>> Especially in case with several nodes. > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Fri, Nov 11, 2016 at 6:34 PM, Alexandr Kuramshin < > >>>>>>>>>>>> [hidden email]> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> You know CacheStore API that is commonly used for > >>>>>>>>> read/write-through > >>>>>>>>>>>>>> relationship of the in-memory data with the persistence > >>>>>>> storage. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> There is also IgniteCache.loadCache method for > >>>> hot-loading > >>>>>> the > >>>>>>>>> cache > >>>>>>>>>>>> on > >>>>>>>>>>>>>> startup. Invocation of this method causes execution of > >>>>>>>>>>>>> CacheStore.loadCache > >>>>>>>>>>>>>> on the all nodes storing the cache partitions. Because > >>>> of > >>>>>> none > >>>>>>>>> keys > >>>>>>>>>>>> are > >>>>>>>>>>>>>> passed to the CacheStore.loadCache methods, the > >>>> underlying > >>>>>>>>>>>> implementation > >>>>>>>>>>>>>> is forced to read all the data from the persistence > >>>>> storage, > >>>>>>> but > >>>>>>>>>> only > >>>>>>>>>>>>> part > >>>>>>>>>>>>>> of the data will be stored on each node. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> So, the current implementation have two general > >>>> drawbacks: > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 1. Persistence storage is forced to perform as many > >>>>>> identical > >>>>>>>>>> queries > >>>>>>>>>>>> as > >>>>>>>>>>>>>> many nodes on the cluster. Each query may involve much > >>>>>>>> additional > >>>>>>>>>>>>>> computation on the persistence storage server. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> 2. Network is forced to transfer much more data, so > >>>>>> obviously > >>>>>>>> the > >>>>>>>>>> big > >>>>>>>>>>>>>> disadvantage on large systems. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> The partition-aware data loading approach, described in > >>>>>>>>>>>>>> https://apacheignite.readme. > >>>> io/docs/data-loading#section- > >>>>>>>>>>>>>> partition-aware-data-loading > >>>>>>>>>>>>>> , is not a choice. It requires persistence of the > >>>> volatile > >>>>>>> data > >>>>>>>>>>>> depended > >>>>>>>>>>>>> on > >>>>>>>>>>>>>> affinity function implementation and settings. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I propose using something like IgniteDataStreamer inside > >>>>>>>>>>>>>> IgniteCache.loadCache implementation. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> -- > >>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>> Alexandr Kuramshin > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -- > >>>>>>>>>>>>> Alexey Kuznetsov > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> -- > >>>>>>>>>>>> Thanks, > >>>>>>>>>>>> Alexandr Kuramshin > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> -- > >>>>>>>>>>> Alexey Kuznetsov > >>>>>>>>>>> GridGain Systems > >>>>>>>>>>> www.gridgain.com > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> Thanks, > >>>>>>> Alexandr Kuramshin > >>>>>>> > >>>>>> > >>>>> > >>>>> > >>>>> > >>>>> -- > >>>>> Thanks, > >>>>> Alexandr Kuramshin > >>>>> > >>>> > >>>> > >>>> > >>>> -- > >>>> Alexey Kuznetsov > >>>> > >> > >> > > |
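For context, a rough sketch of the streamer-backed closure from Val's proposal quoted above; the loadCache overload that would accept it is not part of the current API, and the cache name is an illustrative assumption:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.lang.IgniteBiInClosure;

/** Sketch of a closure that redirects loaded entries into a data streamer
 *  instead of putting them into the cache directly; it would be handed to the
 *  proposed loadCache(p, clo, args) overload by the loading task. */
class StreamerLoadClosure<K, V> implements IgniteBiInClosure<K, V> {
    private final IgniteDataStreamer<K, V> streamer;

    StreamerLoadClosure(Ignite ignite, String cacheName) {
        streamer = ignite.dataStreamer(cacheName); // e.g. "personCache"
    }

    /** Invoked by CacheStore.loadCache for every entry read from the store. */
    @Override public void apply(K key, V val) {
        streamer.addData(key, val);
    }

    /** Flushes the remaining buffered entries once loading is finished. */
    void finish() {
        streamer.close();
    }
}

With such a closure any existing CacheStore.loadCache implementation could feed a data streamer without knowing about it, which is the reuse advantage listed in the quoted proposal.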
Val,
Then I would create a blog post on how to use the new API you proposed to accomplish the scenario described by Alexandr. Are you willing to write the post once the API is implemented?

Alexandr, do you think the API proposed by Val will resolve your case when it’s used as listed below? If so, are you interested in taking over the implementation and contributing to Apache Ignite?

—
Denis
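For reference, a minimal sketch of the flow Val describes in the quoted message below; the cache name, configuration file and retry policy are illustrative assumptions, not part of any proposed API:

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cluster.ClusterGroup;

public class LoadCacheFlowExample {
    public static void main(String[] args) {
        // Client node that only coordinates the loading (config path is illustrative).
        try (Ignite client = Ignition.start("client-config.xml")) {
            int maxRetries = 3; // illustrative retry policy

            for (int attempt = 1; attempt <= maxRetries; attempt++) {
                // 1. Pick a single server node, e.g. the oldest one in the cluster.
                ClusterGroup node = client.cluster().forServers().forOldest();

                try {
                    // 2. Send a closure that calls localLoadCache on that node only.
                    //    (Val's proposal would overload it to also accept a
                    //    streamer-backed closure; the existing method is used here.)
                    client.compute(node).run(() ->
                        Ignition.localIgnite().cache("personCache").localLoadCache(null));

                    break; // loading finished
                }
                catch (IgniteException e) {
                    // 3. The node (or the loading) failed: the client gets the
                    //    exception and retries on the next iteration.
                }
            }
        }
    }
}

If the chosen node leaves the cluster, the next iteration resolves forOldest() to a different node, which is the retry behavior described in step 3.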
> On Nov 15, 2016, at 2:30 PM, Valentin Kulichenko <[hidden email]> wrote:
>
> Denis,
>
> The loading will be most likely initiated by the application anyway, even
> if you call localLoadCache on one of the server nodes. I.e. the flow is the
> following:
>
> 1. Client sends a closure to a server node (e.g. oldest or random).
> 2. The closure calls localLoadCache method.
> 3. If this server node fails (or if the loading process fails), client
> gets an exception and retries if needed.
>
> I would not complicate the API and implementation even more. We have
> compute grid API that already allows to handle things you're describing.
> It's very flexible and easy to use.
>
> -Val
Hi all,
Denis, thank you for the explanation; your understanding of the question is the closest to mine.

Extending IgniteCache.loadCache with an IgniteClosure is a handy feature that may be useful in some cases, but it does not address the problem of extensive network utilization. Actually, I vote against that extension: usages of that method will still have the same network overhead.

IgniteCache.localLoadCache, as its name suggests, should only load entries for the local cache partitions, and such filtering should be done before invoking the predicate, to minimize unnecessary processing of entries that will not be stored in the cache. So extending the method with an IgniteClosure does not resolve the problem, because the IgniteClosure would be called after the IgnitePredicate has done its filtering.

The last argument is that extending the API does not help existing usages of the non-optimized IgniteCache.loadCache. My intention is to re-implement IgniteCache.loadCache itself. Once the re-implementation is done, we can extend the API with additional arguments like an IgniteClosure to make cache store operations customizable.
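To illustrate the ordering meant here, a rough per-entry pipeline sketch; the class and field names are made up for illustration and are not existing Ignite internals:

import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiPredicate;

/** Illustrative order of checks for one entry produced by CacheStore.loadCache. */
class LocalLoadPipeline<K, V> {
    private final Affinity<K> aff;
    private final ClusterNode locNode;
    private final IgniteBiPredicate<K, V> pred; // optional user predicate
    private final IgniteBiInClosure<K, V> clo;  // default: put into cache; proposed: streamer.addData

    LocalLoadPipeline(Affinity<K> aff, ClusterNode locNode,
        IgniteBiPredicate<K, V> pred, IgniteBiInClosure<K, V> clo) {
        this.aff = aff;
        this.locNode = locNode;
        this.pred = pred;
        this.clo = clo;
    }

    /** Called for every key-value pair read from the persistence store. */
    void onEntryLoaded(K key, V val) {
        // 1. Affinity filter first: drop entries that do not belong to this node.
        if (!aff.isPrimaryOrBackup(locNode, key))
            return;

        // 2. Then the optional user predicate, applied only to locally relevant entries.
        if (pred != null && !pred.apply(key, val))
            return;

        // 3. Only now the closure that actually stores the entry.
        clo.apply(key, val);
    }
}

Filtering by affinity first keeps the predicate and the closure from ever seeing entries that the node would discard anyway.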
2016-11-16 3:51 GMT+03:00 Denis Magda <[hidden email]>:

> Val,
>
> Then I would create a blog post on how to use the new API you proposed to
> accomplish the scenario described by Alexandr. Are you willing to write the
> post once the API is implemented?
>
> Alexandr, do you think the API proposed by Val will resolve your case when
> it’s used as listed below? If so, are you interested in taking over the
> implementation and contributing to Apache Ignite?
>
> —
> Denis

--
Thanks,
Alexandr Kuramshin