Pavel Pereslegin created IGNITE-12370:
----------------------------------------- Summary: WAL history reservation may fail due to an incorrect determination of the availability of the WAL segment index (under race condition). Key: IGNITE-12370 URL: https://issues.apache.org/jira/browse/IGNITE-12370 Project: Ignite Issue Type: Bug Reporter: Pavel Pereslegin For now, {{FileWriteAheadLogManager#hasIndex}} firstly determines that the WAL segment exists in an archive ({{File.exists}}) and then determines that index was in the archive (using {{Files.list}}). If the archive file was created between these operations {{hasIndex}} will return the false-negative result and the partition map exchange will fail on this node. Reproducer: {code:java} public class IgniteWalHistoryReservationsWithLoadTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); cfg.setConsistentId("NODE$" + gridName.charAt(gridName.length() - 1)); DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration() .setMaxSize(200L * 1024 * 1024) .setPersistenceEnabled(true)) .setWalMode(WALMode.LOG_ONLY) .setWalSegmentSize(512 * 1024) .setCheckpointFrequency(500); cfg.setDataStorageConfiguration(memCfg); CacheConfiguration ccfg1 = new CacheConfiguration(); ccfg1.setName("cache1"); ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32)); cfg.setCacheConfiguration(ccfg1); return cfg; } @Test public void testReservationWithConstantLoad() throws Exception { final IgniteEx node = startGrid(0); node.cluster().active(true); AtomicLong cntr = new AtomicLong(100_000); ConstantLoader ldr = new ConstantLoader(node.cache("cache1"), cntr); IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(ldr, 1, "loader"); U.sleep(500); forceCheckpoint(node); // Reserve history from the beginning. node.context().cache().context().database().reserveHistoryForExchange(); long endTime = U.currentTimeMillis() + 60_000; GridCacheContext ctx = node.cachex("cache1").context(); int grpId = ctx.groupId(); int parts = ctx.topology().partitions(); try { while (U.currentTimeMillis() < endTime && !Thread.currentThread().isInterrupted()) { try { for (int p = 0; p < parts; p++) { boolean reserved = node.context().cache().context().database().reserveHistoryForPreloading(grpId, p, cntr.get()); assertTrue("Unable to reserve history [p=" + p + ", cntr=" + cntr.get() + "]", reserved); } } finally { node.context().cache().context().database().releaseHistoryForPreloading(); } } } finally { node.context().cache().context().database().releaseHistoryForExchange(); ldr.stop(); } fut.get(10_000); } static class ConstantLoader implements Callable<Void> { private final IgniteCache cache; private final AtomicLong cntr; private volatile boolean stop; ConstantLoader(IgniteCache cache, AtomicLong cntr) { this.cache = cache; this.cntr = cntr; } @Override public Void call() throws Exception { while (!stop && !Thread.currentThread().isInterrupted()) { long n = cntr.getAndIncrement(); cache.put(n, n); if (n % 100_000 == 0) log.info("Loaded " + n); } return null; } public void stop() { stop = true; } } /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { stopAllGrids(); cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); cleanPersistenceDir(); } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |