jeado ko created IGNITE-10861:
--------------------------------- Summary: Using multiple Ignite Sink got Ignite instance has already been started Error Key: IGNITE-10861 URL: https://issues.apache.org/jira/browse/IGNITE-10861 Project: Ignite Issue Type: Bug Components: streaming Affects Versions: 2.7 Reporter: jeado ko I got the following error when I created multiple sinks in Flink 1.7.0: {code:java} Caused by: class org.apache.ignite.IgniteCheckedException: Default Ignite instance has already been started. at org.apache.ignite.internal.IgnitionEx.start0(IgnitionEx.java:1141) at org.apache.ignite.internal.IgnitionEx.startConfigurations(IgnitionEx.java:1076) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:962) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:861) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:731) at org.apache.ignite.internal.IgnitionEx.start(IgnitionEx.java:700) at org.apache.ignite.Ignition.start(Ignition.java:348) {code} And this is my Flink job code: {code:java} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.ignite.sink.flink.IgniteSink import scala.collection.JavaConverters._ object IgniteSinkTestJob extends App { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val igniteSink = new IgniteSink[java.util.Map[String, String]]("testCache", "ignite-test.xml") igniteSink.setAllowOverwrite(true) igniteSink.setAutoFlushFrequency(10) // igniteSink.open(new Configuration) val igniteSink2 = new IgniteSink[java.util.Map[String, String]]("testCache2", "ignite-test.xml") igniteSink2.setAllowOverwrite(true) igniteSink2.setAutoFlushFrequency(10) // igniteSink2.open(new Configuration) val igniteSink3 = new IgniteSink[java.util.Map[String, String]]("testCache3", "ignite-test.xml") igniteSink3.setAllowOverwrite(true) igniteSink3.setAutoFlushFrequency(10) // 
igniteSink3.open(new Configuration) val source = env.fromCollection( Array( Map("key1" -> "hello1"), Map("key1" -> "hello11"), Map("key1" -> "hello144"), Map("key1" -> "hello1155"), Map("key2" -> "hello2"), Map("key2" -> "hello3"), Map("key3" -> "hello23"), Map("key3" -> "hello25") ).map(_.asJava) ) source .filter(v => v.containsKey("key1")) .setParallelism(2) .addSink(igniteSink) .name("sink1") .setParallelism(1) source.filter(v => v.containsKey("key2")) .setParallelism(2) .addSink(igniteSink2) .name("sink2") .setParallelism(1) source.filter(v => v.containsKey("key3")) .setParallelism(2) .addSink(igniteSink3) .name("sink3") .setParallelism(1) env.execute("test ignite sink") } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |