RE: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Stanislav Lukyanov
Hi guys,

Thanks for helping with the fix!

As this is now a development topic rather than a usage one, I’m BCC’ing the user list and replacing it with the dev list.
Please continue the discussion there.

Andrey, Dmitry, please help with the review.

Thanks,
Stan

From: Saikat Maitra
Sent: 22 July 2018, 8:28
To: [hidden email]; [hidden email]
Subject: Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Hi Ray, Andrew

As discussed, I have fixed the issue with IgniteSink when running in cluster mode.

Please review the PR below and share your feedback.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat




On Mon, Jul 16, 2018 at 10:47 PM, Saikat Maitra <[hidden email]> wrote:
Hi Ray,

Thank you for validating the changes. When I check IgniteSink in cluster mode, it works as desired. In standalone mode, however, we get the exception class org.apache.ignite.IgniteException: Default Ignite instance has already been started.

Please take a look at this sample application, which I used to run it with Flink in cluster mode: https://github.com/samaitra/streamers

I am considering changing IgniteSink to run in client mode, similar to the way the Flink connectors for Redis and Flume are implemented in Apache Bahir:

https://github.com/apache/bahir-flink
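A minimal sketch of the client-mode idea, assuming Flink's RichSinkFunction and Ignite 2.x are on the classpath. ClientModeIgniteSink is a hypothetical name, not the class in the PR or in Bahir; the per-subtask instance name, the 10 ms flush interval, and reliance on default discovery (no ignite.xml; an Ignite server node must already be reachable) are illustrative assumptions. Giving each parallel subtask its own named client node means no JVM ever tries to start a second default (unnamed) instance, which is what the "Default Ignite instance has already been started" error reports when all subtasks run in one process.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.ignite.{Ignite, IgniteDataStreamer, Ignition}
import org.apache.ignite.configuration.IgniteConfiguration

// Hypothetical client-mode sink: a sketch, not the IgniteSink from the PR.
// Assumes an Ignite server node is already running and discoverable with the
// default discovery settings (no ignite.xml is loaded here).
class ClientModeIgniteSink[K, V](cacheName: String, allowOverwrite: Boolean = true)
    extends RichSinkFunction[java.util.Map[K, V]] {

  // Ignite handles are not serializable, so they are created in open()
  // on the task manager rather than when the job graph is built.
  @transient private var ignite: Ignite = _
  @transient private var streamer: IgniteDataStreamer[K, V] = _

  override def open(parameters: Configuration): Unit = {
    // One named client node per parallel subtask, so no second "default"
    // (unnamed) instance is ever started in the same JVM.
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val cfg = new IgniteConfiguration()
      .setIgniteInstanceName(s"flink-sink-$cacheName-$subtask")
      .setClientMode(true)
    ignite = Ignition.getOrStart(cfg)

    streamer = ignite.dataStreamer[K, V](cacheName)
    streamer.allowOverwrite(allowOverwrite) // true: newer running totals replace older ones
    streamer.autoFlushFrequency(10)          // flush buffered entries every 10 ms (illustrative)
  }

  override def invoke(value: java.util.Map[K, V]): Unit = {
    val it = value.entrySet().iterator()
    while (it.hasNext) {
      val entry = it.next()
      streamer.addData(entry.getKey, entry.getValue)
    }
  }

  override def close(): Unit = {
    if (streamer != null) streamer.close() // flushes any remaining buffered entries
    if (ignite != null) ignite.close()     // stops this subtask's client node only
  }
}
```

In a pipeline like the WordCount sample, it would be attached with .addSink(new ClientModeIgniteSink[String, Int]("aaa")), leaving open() and close() to Flink. Two sinks writing the same cache with the same subtask indexes would share instance names under this naming scheme, so a real implementation would need a more distinctive name.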

I will share an update soon.

Regards,
Saikat



On Sun, Jul 15, 2018 at 10:07 PM, Ray <[hidden email]> wrote:
Hello Saikat,

I tried your newest code and wrote a simple word count application to test
the sink. It appears there are still problems.
Here's my code:



import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.configuration.Configuration
import org.apache.ignite.Ignition
import org.apache.ignite.configuration.CacheConfiguration

import scala.collection.JavaConverters._


object WordCount {

        def main(args: Array[String]) {

                val ignite = Ignition.start("ignite.xml")
                val cacheConfig = new CacheConfiguration[Any, Any]()
                ignite.destroyCache("aaa")
                cacheConfig.setName("aaa")
                cacheConfig.setSqlSchema("PUBLIC")
                ignite.createCache(cacheConfig)
                ignite.close()


                // set up the execution environment
                val env = StreamExecutionEnvironment.getExecutionEnvironment

                val igniteSink = new IgniteSink[java.util.Map[String, Int]]("aaa", "ignite.xml")

                igniteSink.setAllowOverwrite(false)
                igniteSink.setAutoFlushFrequency(1)

                igniteSink.open(new Configuration)


                // get input data
                val text = env.fromElements(
                        "To be, or not to be,--that is the question:--",
                        "Whether 'tis nobler in the mind to suffer",
                        "The slings and arrows of outrageous fortune",
                        "Or to take arms against a sea of troubles,")


                val counts = text
                        // split up the lines in pairs (2-tuples) containing: (word,1)
                        .flatMap(_.toLowerCase.split("\\W+"))
                        .filter(_.nonEmpty)
                        .map((_, 1))
                        // group by the tuple field "0" and sum up tuple field "1"
                        .keyBy(0)
                        .sum(1)
                        // Convert to key/value format before ingesting to Ignite
                        .mapWith { case (k: String, v: Int) => Map(k -> v).asJava }
                        .addSink(igniteSink)

                try
                        env.execute("Streaming WordCount1")
                catch {
                        case e: Exception =>

                        // Exception handling.
                } finally igniteSink.close()

        }
}

I ran this application in IntelliJ IDEA, and the error log snippet is as
follows:

07/16/2018 11:05:30     aggregation -> Map -> Sink: Unnamed(4/8) switched to FAILED
class org.apache.ignite.IgniteException: Default Ignite instance has already been started.
        at org.apache.ignite.internal.util.IgniteUtils.convertException(IgniteUtils.java:990)
        at org.apache.ignite.Ignition.start(Ignition.java:355)
        at IgniteSink.open(IgniteSink.java:135)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started.
        at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1134)
        at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1069)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:955)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:854)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:724)
        at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:693)
        at org.apache.ignite.Ignition.start(Ignition.java:352)
        ... 7 more

07/16/2018 11:05:30     Job execution switched to status FAILING.



--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/




Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Saikat Maitra
Hi Stan

Thank you, we will continue the discussion on the dev list.

Ray, thank you for validating the changes. I will take a look at the
WordCount application. My understanding is that IgniteSink need not do any
reduction itself: the output collector reduces the results, which are then
written to Ignite.

Regards,
Saikat

On Sun, Jul 22, 2018 at 5:07 PM, Stanislav Lukyanov <[hidden email]>
wrote:


Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Ray
Hi Saikat,

The results Flink calculates before sending them to the sink are correct, but
the results in Ignite are not.
You can remove the sink and print the stream content to validate my point.
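Ray's suggested check, sketched as a standalone Scala program: the same pipeline as the WordCount sample in this thread, with the Ignite sink replaced by print(). The object name WordCountPrint is illustrative. Because sum(1) on a keyed stream is a rolling aggregation, the printed stream should show one record per incoming word, with an increasing count for repeated words such as "to"; which of those values ends up in Ignite then depends on how the sink treats keys it has already written.

```scala
import org.apache.flink.streaming.api.scala._

// Same pipeline as the WordCount sample, but the stream is printed to stdout
// instead of being written to Ignite, so Flink's own results can be inspected.
object WordCountPrint {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "To be, or not to be,--that is the question:--",
      "Whether 'tis nobler in the mind to suffer",
      "The slings and arrows of outrageous fortune",
      "Or to take arms against a sea of troubles,")

    text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .sum(1)  // emits an updated running total each time a word is seen again
      .print() // in place of .addSink(igniteSink)

    env.execute("Streaming WordCount (print)")
  }
}
```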



--
Sent from: http://apache-ignite-developers.2346864.n4.nabble.com/

Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Saikat Maitra
Hi Ray,

We need to set igniteSink.setAllowOverwrite(true) so that the latest
computed values are stored in the cache. Also, there is no need to call
igniteSink.open(new Configuration) yourself; Flink opens the sink when the
job starts.

Please take a look at the modified WordCount sample below:

https://github.com/samaitra/flink-fn/blob/master/flink-fn/src/main/scala/com/samaitra/WordCount.scala
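A simplified, stdlib-only Scala model of why the overwrite flag matters (hypothetical names; no Flink or Ignite classes are used). It assumes, as a simplification, that with allowOverwrite(false) a key already present in the cache keeps its first value, while with allowOverwrite(true) the last value written wins. Since keyBy(0).sum(1) emits a new running total every time a word reappears, the non-overwriting cache is left holding the first partial count for every repeated word.

```scala
import scala.collection.mutable

// Hypothetical model: no Flink or Ignite classes are involved.
object OverwriteModel {

  // Mimics a keyed running sum: one (word, totalSoFar) record per input word.
  def runningCounts(words: Seq[String]): Seq[(String, Int)] = {
    val totals = mutable.Map.empty[String, Int]
    words.map { w =>
      val n = totals.getOrElse(w, 0) + 1
      totals(w) = n
      (w, n)
    }
  }

  // allowOverwrite = false: the first value per key is kept, later ones ignored.
  // allowOverwrite = true: the last value per key is kept.
  def sink(records: Seq[(String, Int)], allowOverwrite: Boolean): Map[String, Int] = {
    val cache = mutable.Map.empty[String, Int]
    records.foreach { case (k, v) =>
      if (allowOverwrite || !cache.contains(k)) cache(k) = v
    }
    cache.toMap
  }

  def main(args: Array[String]): Unit = {
    val records = runningCounts("to be or not to be".split("\\W+").toSeq)
    println(sink(records, allowOverwrite = false)) // "to" and "be" stay at 1
    println(sink(records, allowOverwrite = true))  // "to" and "be" reach 2
  }
}
```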

Please review and share your feedback.

Regards
Saikat

On Thu, Jul 26, 2018 at 1:16 AM, Ray <[hidden email]> wrote:


Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Saikat Maitra
Hi Andrew,

As we discussed, I have updated the PR; please take a look. If it looks good,
I can go ahead and merge the changes.

PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat

On Thu, Jul 26, 2018 at 11:25 PM, Saikat Maitra <[hidden email]>
wrote:


Re: Apache Flink Sink + Ignite: Ouch! Argument is invalid

Saikat Maitra
Hi,

I have submitted the PR below; please review it and share your feedback.


Jira: https://issues.apache.org/jira/browse/IGNITE-8697
PR : https://github.com/apache/ignite/pull/4398
Review : https://reviews.ignite.apache.org/ignite/review/IGNT-CR-695

Regards,
Saikat


On Thu, Jul 26, 2018 at 11:26 PM, Saikat Maitra <[hidden email]>
wrote:
