Ignite Dev Community,
I’m working with the Ignite 2.4+ Spark SQL DataFrame functionality and have run into what I believe to be a bug where Spark partition information is incorrect for non-trivial sizes of Ignite clusters.

The partition array returned to Spark via org.apache.ignite.spark.impl.calcPartitions() needs to be in the order of the Spark partition numbers, but the function doesn’t make that guarantee and consistently fails for anything but very small Ignite clusters. Without the correct partition sequencing, Spark will throw errors such as:

java.lang.IllegalArgumentException: requirement failed: partitions(0).partition == 3, but it should equal 0
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2$$anonfun$apply$3.apply(RDD.scala:255)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2$$anonfun$apply$3.apply(RDD.scala:254)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:254)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
    at org.apache.ignite.spark.IgniteSQLDataFrameSpec$$anonfun$1$$anonfun$apply$mcV$sp$11.apply$mcV$sp(IgniteSQLDataFrameSpec.scala:145)

I’ve forked and committed a change which demonstrates this by increasing the number of servers in the Spark tests from 3 to 4, which causes the IgniteSQLDataFrameSpec test to start failing per above. This commit also demonstrates the fix, which is to just sequence the Ignite node map before zipping:

https://github.com/stuartmacd/ignite/commit/c9e7294c71de9e7b2bddfae671605a71260b80b3

Can anyone help confirm this behaviour? Happy to create a JIRA and pull request for the proposed change.

I believe this might also be related to another earlier report:
http://apache-ignite-users.70518.x6.nabble.com/Getting-an-exception-when-listing-partitions-of-IgniteDataFrame-td22434.html

Thanks,
Stuart.
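
For anyone following the thread, here is a minimal sketch of the ordering requirement described above, in plain Scala with no Spark or Ignite dependency. It is not the Ignite code: PartitionOrderSketch, SketchPartition, isValidForSpark, buildPartitions and reorder are hypothetical names made up for illustration, and nodes are represented as plain strings. The only ideas taken from the report are that the element at array position i must carry index i, and that sequencing the node map before zipping is one way to guarantee it.

```scala
object PartitionOrderSketch {
  // Hypothetical stand-in for a Spark partition backed by one Ignite node.
  final case class SketchPartition(index: Int, node: String, ignitePartitions: List[Int])

  // Same shape as the requirement that fails in the stack trace above:
  // the element at array position i must report index i.
  def isValidForSpark(parts: Array[SketchPartition]): Boolean =
    parts.zipWithIndex.forall { case (p, i) => p.index == i }

  // Fix the iteration order first (toIndexedSeq), then zip, so that each
  // partition's index is also its position in the resulting array.
  // A general Scala Map does not promise any particular iteration order.
  def buildPartitions(nodesToParts: Map[String, List[Int]]): Array[SketchPartition] =
    nodesToParts.toIndexedSeq.zipWithIndex.map {
      case ((node, parts), i) => SketchPartition(i, node, parts)
    }.toArray

  // Defensive alternative: restore positional order on an existing array.
  def reorder(parts: Array[SketchPartition]): Array[SketchPartition] =
    parts.sortBy(_.index)
}
```

With this sketch, the array returned by PartitionOrderSketch.buildPartitions passes isValidForSpark for any number of nodes, because the index is assigned only after the order has been fixed. An array whose first element reports index 3, as in the exception message, fails the check until it is passed through reorder.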
Hello, Stuart.
I will investigate this issue and get back to you in a couple of days.

On Fri, 20 Jul 2018 at 17:59, Stuart Macdonald <[hidden email]> wrote:
> Ignite Dev Community,
> [...]
Hello, Stuart.
I'm able to reproduce your issue, so I've created a ticket for it: https://issues.apache.org/jira/browse/IGNITE-9063

Do you want to provide a fix?

On Fri, 20/07/2018 at 19:37 +0300, Nikolay Izhikov wrote:
> Hello, Stuart.
> [...]
Thanks Nikolay, yes I’ll provide a fix.
Stuart.

On 24 Jul 2018, at 10:18, Nikolay Izhikov <[hidden email]> wrote:
> Hello, Stuart.
> [...]