Embedded mode ignite on spark for cache data lost issues


Embedded mode ignite on spark for cache data lost issues

percent620
I will describe my issue in detail below. I run the same code in two scenarios, but the first one gives the correct result and the second one does not.

I've been studying Ignite for the past few days but can't get the correct result on this. Hopefully someone can help me.

==============================
1. Run Ignite with spark-shell
1)./spark-shell --jars /u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-core-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-spark/ignite-spark-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/cache-api-1.0.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/ignite-log4j-1.6.0.jar,/u01/xxx/apache-ignite-hadoop-1.6.0-bin/libs/ignite-log4j/log4j-1.2.17.jar --packages org.apache.ignite:ignite-spark:1.6.0,org.apache.ignite:ignite-spring:1.6.0

2) Run the following code in spark-shell:
import org.apache.ignite.spark._
import org.apache.ignite.configuration._

val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)

//sharedRDD.saveValues(initalRDD.map(line => line._1))
sharedRDD.savePairs(initalRDD, true) // overwrite cache on Ignite
println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)

3) Result:
scala> import org.apache.ignite.spark._
import org.apache.ignite.spark._

scala> import org.apache.ignite.configuration._
import org.apache.ignite.configuration._

scala> val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(),false)
ic: org.apache.ignite.spark.IgniteContext[Int,Int] = org.apache.ignite.spark.IgniteContext@74e72ff4

scala>     val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
sharedRDD: org.apache.ignite.spark.IgniteRDD[Int,Int] = IgniteRDD[1] at RDD at IgniteAbstractRDD.scala:31

scala>     val initalRDD = sc.parallelize(1 to 100000,10).map(i => (i, i))
initalRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[3] at map at <console>:33

scala>     println("initalRDD.counter=/. " + initalRDD.count() +"\t partitionCounter=> " + initalRDD.partitions.size)
initalRDD.counter=/. 100000 partitionCounter=> 10

scala> sharedRDD.savePairs(initalRDD, true)//override cache on ignite
                                                                               
scala>     println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
=====>totalIgniteEmbedCounter100000      igniteParitionCounter => 1024          

scala>     println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
=====>totalIgniteFilterConditionEmbedCounter50000    

totalIgniteEmbedCounter is 100000, which is correct.
totalIgniteFilterConditionEmbedCounter is 50000, which is correct.
==============================
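As a sanity check, the expected numbers follow directly from the shape of the data. This is plain Scala with no Spark or Ignite involved; it just mirrors the pairs that initalRDD produces:

```scala
// Same data shape as initalRDD: pairs (i, i) for i = 1 to 100000.
val pairs = (1 to 100000).map(i => (i, i))

// Every pair is written to the cache, so the expected total is 100000.
val total = pairs.size

// Values equal keys, so values > 50000 are exactly i = 50001..100000.
val filtered = pairs.count(_._2 > 50000)

println(s"total=$total filtered=$filtered") // total=100000 filtered=50000
```

So any run that reports fewer than 100000 total entries has actually lost cache data, rather than miscounting.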


2. IDEA project
1) Create a Maven project in IDEA
2) Add the Ignite Maven dependencies as above [1]:
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-visor-console</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spark</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-yarn</artifactId>
    <version>1.6.0</version>
</dependency>
3) Code for the IDEA project:
package com

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.{SparkConf, SparkContext}

object TestIgniteEmbedCache {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TestIgniteEmbedCache")
    val sc = new SparkContext(conf)

    //val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration().setIncludeEventTypes(EventType.EVT_TASK_FAILED), false)
    val ic = new IgniteContext[Int, Int](sc, () => new IgniteConfiguration(), false)
    val sharedRDD = ic.fromCache("sharedBaselineCacheRDD")
    val initalRDD = sc.parallelize(1 to 100000, 10).map(i => (i, i))
    println("initalRDD.counter=/. " + initalRDD.count() + "\t partitionCounter=> " + initalRDD.partitions.size)

    //sharedRDD.saveValues(initalRDD.map(line => line._1))
    sharedRDD.savePairs(initalRDD, true) // overwrite cache on Ignite
    println("=====>totalIgniteEmbedCounter" + sharedRDD.count + "\t igniteParitionCounter => " + sharedRDD.partitions.size)
    println("=====>totalIgniteFilterConditionEmbedCounter" + sharedRDD.filter(_._2 > 50000).count)
  }
}
4) Run mvn clean assembly:assembly to build sparkignitedemo.jar
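For step 4 to produce a single runnable jar, the project presumably uses the maven-assembly-plugin. A minimal sketch of such a configuration (the jar-with-dependencies descriptor and the finalName here are assumptions, not taken from the original pom):

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <!-- Bundle all dependencies into one fat jar -->
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <finalName>sparkignitedemo</finalName>
                <appendAssemblyId>false</appendAssemblyId>
            </configuration>
        </plugin>
    </plugins>
</build>
```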

5) Upload the jar to our Linux driver machine and submit it to the YARN cluster with spark-submit:

/u01/spark-1.6.0-hive/bin/spark-submit --driver-memory 8G --class com.TestIgniteEmbedCache --master yarn --executor-cores 5 --executor-memory 1000m --num-executors 10 --conf spark.rdd.compress=false --conf spark.shuffle.compress=false --conf spark.broadcast.compress=false /home/sparkignitedemo.jar


6) Result: this is where the issue appears
totalIgniteEmbedCounter is 40000 or 3000 (seemingly random)
totalIgniteFilterConditionEmbedCounter is 10000 or 2000 (random)
==========================

This result confuses me: why does the same code produce two different results? Can anyone help me with this? I've been blocked on this issue for several days.

Thanks!!!

Re: Embedded mode ignite on spark for cache data lost issues

Valentin Kulichenko
There is already a discussion about this on the user list [1]. Let's
continue there, as this is not a topic for the dev list. I will try to
respond there shortly.

[1]
http://apache-ignite-users.70518.x6.nabble.com/Embedded-mode-ignite-on-spark-td6942.html

-Val

On Sun, Aug 14, 2016 at 6:24 PM, percent620 <[hidden email]> wrote:
