Pavel Pereslegin created IGNITE-12349:
----------------------------------------- Summary: File transmission can cause the cluster to freeze. Key: IGNITE-12349 URL: https://issues.apache.org/jira/browse/IGNITE-12349 Project: Ignite Issue Type: Bug Affects Versions: 2.8 Reporter: Pavel Pereslegin Assignee: Maxim Muzafarov When we initiate a file transmission, a timeout object with a mutable endTime is added to the timeout processor "queue" (see TcpCommunicationSpi#openChannel). Because endTime is mutable, the timeout for this object never fires. Moreover, once this object reaches the head of the "queue", the TimeoutProcessor stops processing any timeout objects at all, including ones added later. Reproducer: {code:java} public class FileTransmissionTimeoutProcessorTest extends GridCommonAbstractTest { @After public void after() throws Exception { cleanPersistenceDir(); stopAllGrids(); } /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { return super.getConfiguration(igniteInstanceName) .setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setPersistenceEnabled(true) .setMaxSize(500L * 1024 * 1024))) .setCacheConfiguration(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)); } @Test public void testChannelTimeoutObject() throws Exception { IgniteEx snd = startGrid(0); IgniteEx rcv = startGrid(1); // Do some transfer between nodes. initiateFileTransfer(snd, rcv); GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>(); // Add new timeout object after file transmission timeout object. snd.context().timeout().addTimeoutObject(new GridTimeoutObjectAdapter(DFLT_CONN_TIMEOUT + 1_000) { @Override public void onTimeout() { fut.onDone(true); } }); // The timeout processor will hang on the file transfer timeout object and will never complete the remaining tasks. 
boolean success = fut.get(DFLT_CONN_TIMEOUT + 30_000); assertTrue(success); } /** */ private void initiateFileTransfer(IgniteEx snd, IgniteEx rcv) throws IOException, IgniteCheckedException, InterruptedException { snd.cluster().active(true); awaitPartitionMapExchange(); try (IgniteDataStreamer<Integer, Integer> dataStreamer = snd.dataStreamer(DEFAULT_CACHE_NAME)) { dataStreamer.allowOverwrite(true); for (int i = 0; i < 10_000; i++) dataStreamer.addData(i, i + DEFAULT_CACHE_NAME.hashCode()); } Map<String, Long> fileSizes = new HashMap<>(); Map<String, Integer> fileCrcs = new HashMap<>(); Map<String, Serializable> fileParams = new HashMap<>(); assertTrue(snd.context().io().fileTransmissionSupported(rcv.localNode())); File tempStore = U.resolveWorkDirectory(U.defaultWorkDirectory(), "ctmp", true); rcv.context().io().addTransmissionHandler(GridTopic.TOPIC_CACHE.topic("test", 0), new TransmissionHandler() { @Override public void onException(UUID nodeId, Throwable err) { // No-op. } @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) { return new File(tempStore, fileMeta.name()).getAbsolutePath(); } @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) { return null; } @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) { return new Consumer<File>() { @Override public void accept(File file) { assertTrue(fileSizes.containsKey(file.getName())); // Save all params. 
fileParams.putAll(initMeta.params()); } }; } }); IgniteInternalCache<Object, Object> defCache = snd.cachex(DEFAULT_CACHE_NAME); File cacheDirIg0 = ((FilePageStoreManager)(defCache).context() .shared() .pageStore()).cacheWorkDir(defCache.configuration()); File[] cacheParts = cacheDirIg0.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { return name.endsWith(FILE_SUFFIX); } }); for (File file : cacheParts) { fileSizes.put(file.getName(), file.length()); fileCrcs.put(file.getName(), FastCrc.calcCrc(file)); } try (GridIoManager.TransmissionSender sender = snd.context() .io() .openTransmissionSender(rcv.localNode().id(), GridTopic.TOPIC_CACHE.topic("test", 0))) { // Iterate over cache partition cacheParts. for (File file : cacheParts) { Map<String, Serializable> params = new HashMap<>(); params.put(file.getName(), file.hashCode()); sender.send(file, params, TransmissionPolicy.FILE); } } } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |