Add ability to enable and disable rebalancing per-node

Sasha Belyak
Hi,
I have a JIRA ticket on this subject,
https://issues.apache.org/jira/browse/IGNITE-5061, and now I'm trying to
understand the partition exchange mechanism.
As far as I understand, the point of the task is this: it is needed when we
have to grow the cluster (start a few new nodes without partition exchange
and enable exchange only after we have reached the desired topology).
All nodes know the actual partition topology, so partitions can stay
unbalanced for a long time.
How I see the solution:
1) Add a flag, enablePartitionExchange, to an Ignite MBean.
2) Check it in GridDhtPartitionDemander.requestPartitions and complete the
RebalanceFuture if partition exchange is disabled.
3) Somehow trigger a forced rebalance when the flag changes to true (a rough
sketch of the MBean follows).
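
A minimal sketch of what such an MBean could look like (the interface and
method names are illustrative only, not taken from the existing Ignite MBeans):

import javax.management.MXBean;

// Illustrative only: names are not taken from the Ignite codebase.
@MXBean
public interface RebalanceControlMXBean {
    // True if this node is allowed to request partitions from other nodes.
    boolean isRebalanceEnabled();

    // Disabling completes the RebalanceFuture without demanding partitions;
    // enabling should fire a forced rebalance (step 3 above).
    void setRebalanceEnabled(boolean enabled);
}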
Does anybody see problems with this solution? Maybe we need to add
per-cache granularity, or let system caches rebalance even if
enablePartitionExchange=false? Do we have any large changes in progress
that could conflict with this?

Re: Add ability to enable and disable rebalancing per-node

npordash
I can outline a use-case I have which may help define requirements for this task. For context, I was originally going to address the use-case below by disabling automatic rebalancing on a per-cache basis and using a cluster-wide task to orchestrate manual rebalancing; however, this issue sounds like it may provide a better approach.

I have caches set up for the sole purpose of routing data to nodes via a Data Streamer. The logic in the streamer is simply to access a plugin on the data node which exposes a processing pipeline and runs the received cache entries through it. The data in this case is monitoring related and there is one cache (or logical stream) per data type (e.g. logs, events, metrics).
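
For illustration, the streamer side looks roughly like this (the cache name
and the inline receiver are placeholders, not my exact code):

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;
import org.apache.ignite.stream.StreamReceiver;

// Rough sketch of the client side: records are pushed into a "logs" stream
// cache, and a StreamReceiver on the data node hands them to the local pipeline.
public class StreamClient {
    public static void main(String[] args) {
        try (Ignite ignite = Ignition.start()) {
            ignite.getOrCreateCache("logs");

            try (IgniteDataStreamer<String, byte[]> streamer = ignite.dataStreamer("logs")) {
                streamer.receiver((StreamReceiver<String, byte[]>) (cache, entries) -> {
                    // Placeholder: my actual receiver looks up the pipeline plugin
                    // and runs the entries through it instead of writing them
                    // into the cache.
                });
                streamer.addData("record-1", new byte[] {1, 2, 3});
            }
        }
    }
}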

The pipeline is composed of N services which are deployed as node singletons and have a service filter which targets a particular cache. These services can be deployed and un-deployed as processing requirements change or bugs are fixed without requiring clients to know or care about it.
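
A rough sketch of that deployment model (the service name, the filter
attribute and the stage body are placeholders):

import org.apache.ignite.Ignite;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceContext;

// Sketch: deploy a pipeline stage as a node singleton, limited to the nodes
// that host the "logs" stream cache via a node filter on a user attribute.
public class DeployStage {
    public static void deploy(Ignite ignite) {
        ServiceConfiguration cfg = new ServiceConfiguration();

        cfg.setName("logs-enrichment-stage");   // placeholder name
        cfg.setMaxPerNodeCount(1);              // node singleton
        cfg.setNodeFilter(n -> "logs".equals(n.attribute("stream.type")));
        cfg.setService(new Service() {
            @Override public void init(ServiceContext ctx) { /* register the stage with the local pipeline */ }
            @Override public void execute(ServiceContext ctx) { /* nothing to do: the stage is callback-driven */ }
            @Override public void cancel(ServiceContext ctx) { /* remove the stage from the pipeline */ }
        });

        ignite.services().deploy(cfg);
    }
}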

The catch here is that when nodes are added I don't want partitions to rebalance to a new node until I know all of the necessary services are running; otherwise we may have a small window where data is processed through a pipeline that isn't completely initialized yet, which would result in a data quality issue. Alternatively, I could have the pipeline raise an error which would cause the streamer to retry, but I'd like this to be handled more gracefully, if possible.

In addition, it will probably be the case where these caches eventually have node filters so that we can isolate resources for these streams across different computes. This means that, for example, if we add a node only for metrics then deferring rebalancing should ideally only impact caches that would get assigned to that node.

Going even further... so far we've talked about one cache which is used just for streaming, but at least one of the services would create its own set of caches as an in-memory storage layer which maintains an inverted index and time series data for elements coming through the stream. The storage caches in this case would only exist on nodes where the stream cache is and most of the write activity to these caches would be local since they would use the same affinity as the stream cache (if most writes were remote this wouldn't scale well). So... these caches would need to rebalance at the same time in order to minimize the possibility of additional network calls.
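
As a sketch, the shared placement and affinity could look like this (the
cache names, the node attribute and the partition count are made up):

import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.lang.IgnitePredicate;

// Sketch: the stream cache and its storage cache share the same node filter
// and identically configured affinity functions (identical configuration gives
// identical partition mappings), so the pipeline's writes stay local.
public class MetricsCaches {
    private static final IgnitePredicate<ClusterNode> METRICS_NODES =
        n -> "metrics".equals(n.attribute("stream.type"));   // hypothetical node attribute

    public static CacheConfiguration<?, ?>[] configs() {
        CacheConfiguration<String, byte[]> stream = new CacheConfiguration<>("metrics-stream");
        stream.setNodeFilter(METRICS_NODES);
        stream.setAffinity(new RendezvousAffinityFunction(false, 1024));

        CacheConfiguration<String, byte[]> storage = new CacheConfiguration<>("metrics-storage");
        storage.setNodeFilter(METRICS_NODES);
        storage.setAffinity(new RendezvousAffinityFunction(false, 1024));

        return new CacheConfiguration<?, ?>[] {stream, storage};
    }
}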

The main concern I have is how to avoid the race condition of another node joining the topology _after_ it has been determined rebalancing should happen, but _before_ rebalancing is triggered. If this is controlled on a per-node (+cache) basis - as the ticket describes - it's probably a non-issue, but it's definitely an issue if it's only on a per-cache basis.

-Nick

Re: Add ability to enable and disable rebalancing per-node

Alexandr Kuramshin
to Nick,

could you please describe in more detail how you use the DataStreamer (do
you use a StreamReceiver)?

It seems that you are unnecessarily worried about synchronizing service
startup with cache rebalancing. A service should start quickly after the
node has joined the topology, and it will then process all the data that
was collected by the local partitions in the moments before. You may use a
rebalance delay to minimize the amount of data collected before the service
has started.
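
For example (the cache name and the 60-second value are arbitrary):

import org.apache.ignite.configuration.CacheConfiguration;

// Sketch: delay rebalancing of the "logs" cache for 60 seconds after a
// topology change, which gives the node singleton services time to start
// before partitions begin to move.
public class RebalanceDelayExample {
    public static CacheConfiguration<String, byte[]> logsCache() {
        CacheConfiguration<String, byte[]> cfg = new CacheConfiguration<>("logs");
        cfg.setRebalanceDelay(60_000L);   // milliseconds
        return cfg;
    }
}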

But if your service depends on external resources (another service),
managing rebalancing won't help you, because an external resource may
become unavailable even after your service has started and rebalancing has
occurred. You can't un-rebalance partitions in that case. In addition, if
some event cache has to be accompanied by other caches (storing additional
data for service processing), there is always a gap between rebalancing a
partition of the first cache and of the last cache containing collocated
data. I think you should not worry about additional network calls while
rebalancing is in progress.

to Sasha,

I think we need a configuration property, enablePartitionExchange (in
addition to the MBean flag), so that partition exchange can be disabled at
node startup.
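
Something like this (the setter does not exist today; it only sketches the
proposal):

import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;

// Hypothetical: IgniteConfiguration has no such property today; this only
// illustrates the proposed startup-time switch.
public class StartWithExchangeDisabled {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();
        // cfg.setPartitionExchangeEnabled(false);   // proposed property, not a real API
        Ignition.start(cfg);
    }
}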

--
Thanks,
Alexandr Kuramshin

Re: Add ability to enable and disable rebalancing per-node

yzhdanov
As far as I can see from the ticket title, the reporter wants to disable
rebalancing on a per-node basis. So, some nodes would not load partitions
from other nodes, while others would load normally. Disabling partition map
exchange would stop rebalancing all over the grid (however, I find that a
pretty useful feature in some cases, and it probably deserves a separate
ticket).

I would ask Alexey Goncharuk to elaborate a little bit and add some
background for the ticket he filed.

--Yakov

Re: Add ability to enable and disable rebalancing per-node

npordash
In reply to this post by Alexandr Kuramshin
Hi Alexandr,

Yes, I'm using a StreamReceiver since I'm mostly using the cache as a router. The implementation is more-or-less like the following:

// Inside a StreamReceiver<Object, Object> implementation; the plugin name is elided.
@Override
public void receive(IgniteCache<Object, Object> channel, Collection<Map.Entry<Object, Object>> entries) {
  // Look up the locally deployed plugin that owns the processing pipeline for this cache.
  final StreamPipeline pipeline = ((Plugin) Ignition.localIgnite().plugin(...)).pipeline(channel.getName());
  // Run the received entries through the pipeline instead of writing them to the cache.
  pipeline.receive(channel, entries);
}

The StreamPipeline is managed by the data node; it contains the stages that make up the pipeline and knows which service is responsible for each stage (the service exposes a handle method used for side effects or for modifying entries as they go through the pipeline). When a service is started or cancelled, it pokes the pipeline so it can add or remove that stage. The pipeline is also aware of whether all required services have started.

I didn't mention it initially here, but the classes for these services are _not_ available to Ignite when the node starts. Instead, the node actually starts a generic service which fetches the required jar files from IGFS and then starts the real service using a URLClassLoader (once that is done, the pipeline poke I mentioned above occurs). I'm doing this to get around existing service grid limitations with regard to dynamic code deployment.
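
Roughly, the bootstrap does something like the following (the IGFS name, the
jar path and the class name are placeholders):

import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsPath;

// Sketch: copy a service jar out of IGFS to local disk, then instantiate the
// real service class through a URLClassLoader. All names are illustrative.
public class ServiceBootstrap {
    public static Object startService(Ignite ignite) throws Exception {
        IgniteFileSystem igfs = ignite.fileSystem("igfs");   // assumed IGFS name
        Path localJar = Files.createTempFile("pipeline-stage", ".jar");

        try (InputStream in = igfs.open(new IgfsPath("/services/pipeline-stage.jar"))) {
            Files.copy(in, localJar, StandardCopyOption.REPLACE_EXISTING);
        }

        URLClassLoader ldr = new URLClassLoader(new URL[] {localJar.toUri().toURL()},
            ServiceBootstrap.class.getClassLoader());

        // Hypothetical service class contained in the jar.
        Class<?> cls = Class.forName("com.example.EnrichmentStageService", true, ldr);
        return cls.getDeclaredConstructor().newInstance();
    }
}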

Because of the above, I wouldn't assume service startup would be "fast" when compared to map partitions being rebalanced to the new node.

Can you please elaborate on the following?

"Service should start quickly after node has joined the topology, and will process all the data has been collected by local partitions the moments before."

It sounds like you're implying that data is being buffered somewhere, but I'm not quite following.

Thanks!
-Nick

Re: Add ability to enable and disable rebalancing per-node

Alexey Goncharuk
Sasha,

The idea behind the ticket was as follows: currently, the rebalanceDelay
property is set in CacheConfiguration, which is not very flexible. In
certain circumstances, a user might expect a particularly large load on
some segments of the cluster and want to disable rebalancing for those
(existing) nodes only in the case of a topology change, to reduce the CPU
and network activity related to rebalancing. Thus, I do not think that you
should involve partition map exchange, as it is a heavy cluster-wide
operation.
At first glance, pausing and resuming rebalancing can be implemented in a
local-only manner.

Disabling exchanges to grow a cluster is indeed a great idea, but it has
nothing to do with the original ticket.
