package com.veon.dmp.event.kafka;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.apache.ignite.transactions.TransactionOptimisticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Abstract stream receiver that processes entries using Ignite's transactions.
 *
 * <p>The class handles batches of entries using either one transaction for the whole batch
 * or one transaction per entry. Concurrent updates to the same cache objects are made safe
 * by using OPTIMISTIC/SERIALIZABLE Ignite transactions (deadlock-free transactions, see
 * https://apacheignite.readme.io/docs/transactions#deadlock-free-transactions); on a
 * {@link TransactionOptimisticException} the work is retried up to {@code retryLimit} times.
 *
 * <p>Note: the cache atomicity mode should be set to {@link CacheAtomicityMode#TRANSACTIONAL},
 * otherwise the optimistic conflict detection has no effect (a warning is logged).
 *
 * @param <R> Type of the results list passed to the callback
 * @param <K> Type of the key of the cache
 * @param <V> Type of the value of the cache
 */
public abstract class AbstractTransactionalStreamReceiver<R, K, V> implements StreamReceiver<K, V> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionalStreamReceiver.class);

    private final Ignite ignite;
    private final Boolean doBatch;
    private final int retryLimit;

    /**
     * The constructor.
     *
     * @param ignite Ignite instance
     * @param doBatch True to use one transaction per batch, false for one transaction per entry
     * @param retryLimit Number of times to retry the update transaction before giving up and moving on
     * @throws IllegalArgumentException if {@code retryLimit} is negative
     */
    public AbstractTransactionalStreamReceiver(final Ignite ignite, final Boolean doBatch,
                                               final int retryLimit) {
        LOG.info("Creating AbstractTransactionalStreamReceiver with doBatch {} and retryLimit {}",
                doBatch, retryLimit);
        // Validate before assigning any state, so the object is never half-constructed.
        if (retryLimit < 0) {
            LOG.error("retryLimit can not be lower than 0");
            throw new IllegalArgumentException("retryLimit can not be lower than 0");
        }
        this.ignite = ignite;
        this.doBatch = doBatch;
        this.retryLimit = retryLimit;
    }

    /**
     * Minimal constructor providing good defaults (single transactions, 10 retries).
     *
     * @param ignite Ignite instance
     */
    public AbstractTransactionalStreamReceiver(final Ignite ignite) {
        this(ignite, false, 10);
    }

    /**
     * Called when the streamer has flushed the data from the client.
     *
     * @param cache Ignite cache to operate with
     * @param entries Entries from the streamer to perform logic on
     * @throws IgniteException Exception from cache operations
     */
    @Override
    public void receive(final IgniteCache<K, V> cache, final Collection<Map.Entry<K, V>> entries)
            throws IgniteException {
        if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode()
                != CacheAtomicityMode.TRANSACTIONAL) {
            LOG.warn("Cache should use Transactional atomicity for this class to take effect. "
                    + "Please update the configuration Atomicity Mode of {} and start again.",
                    cache.getName());
        }
        LOG.debug("Receiving {} entries...", entries.size());
        if (doBatch) {
            processWithBatchTransactions(cache, entries);
        } else {
            processWithSingleTransactions(cache, entries);
        }
    }

    /**
     * Processes the whole set of entries inside one transaction, retrying the entire batch
     * on optimistic conflicts until {@code retryLimit} is exceeded.
     *
     * @param cache Ignite cache to operate with
     * @param entries Entries to process
     * @throws IgniteException Exception from cache operations
     */
    private void processWithBatchTransactions(final IgniteCache<K, V> cache,
                                              final Collection<Map.Entry<K, V>> entries)
            throws IgniteException {
        LOG.debug("Processing with batch transactions");
        final Map<K, V> update = new HashMap<>();
        int retry = 0;
        do {
            try (final Transaction transaction = ignite.transactions()
                    .txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
                // Bulk read of the current cache state for all keys in this batch.
                final Map<K, V> cacheObjects = cache.getAll(
                        entries.stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
                final LinkedList<R> results = new LinkedList<>();
                entries.forEach(entry -> {
                    final K key = entry.getKey();
                    final V objectToUpdate = entry.getValue();
                    // Prefer the value already updated earlier in this batch over the cache value,
                    // so several entries with the same key compose instead of overwriting each other.
                    final Optional<V> cacheObject = Optional.ofNullable(
                            Optional.ofNullable(update.get(key)).orElse(cacheObjects.get(key)));
                    final Optional<V> updatedObjectOption = process(results, key, objectToUpdate, cacheObject);
                    updatedObjectOption.ifPresent(updatedObject -> update.put(key, updatedObject));
                });
                cache.putAll(update);
                transaction.commit();
                callback(results);
                LOG.debug("Put {} new records successfully", update.size());
                return;
            } catch (final TransactionOptimisticException toe) {
                // Pass the exception as the last argument so SLF4J logs the cause/stack trace.
                LOG.error("TransactionOptimisticException - could not put all the objects for retry: {}. "
                        + "The entries will be retried again until {} times", retry, retryLimit, toe);
                // Discard the partial updates computed under the stale read snapshot.
                update.clear();
            }
            retry++;
        } while (retry <= retryLimit);
        LOG.error("Could not process {} entries after retried {} times", entries.size(), retry);
    }

    /**
     * Processes each entry in its own transaction, retrying only the conflicting entry
     * on optimistic conflicts until {@code retryLimit} is exceeded.
     *
     * @param cache Ignite cache to operate with
     * @param entries Entries to process
     * @throws IgniteException Exception from cache operations
     */
    private void processWithSingleTransactions(final IgniteCache<K, V> cache,
                                               final Collection<Map.Entry<K, V>> entries)
            throws IgniteException {
        LOG.debug("Processing with single transactions");
        final List<R> results = new LinkedList<>();
        final int[] failedCount = {0};
        entries.forEach(entry -> {
            // Explicit success flag: checking "retry >= retryLimit" after the loop would
            // falsely count a commit that succeeded on the final allowed attempt as a failure.
            boolean committed = false;
            int retry = 0;
            do {
                try (final Transaction transaction = ignite.transactions()
                        .txStart(TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE)) {
                    final Optional<V> cacheObject = Optional.ofNullable(cache.get(entry.getKey()));
                    final K key = entry.getKey();
                    final V objectToUpdate = entry.getValue();
                    final Optional<V> updatedObjectOption = process(results, key, objectToUpdate, cacheObject);
                    updatedObjectOption.ifPresent(updatedObject -> cache.put(key, updatedObject));
                    transaction.commit();
                    committed = true;
                    break;
                } catch (final TransactionOptimisticException toe) {
                    // Pass the exception as the last argument so SLF4J logs the cause/stack trace.
                    LOG.error("TransactionOptimisticException for object with key: {}. Retry number {}",
                            entry.getKey(), retry, toe);
                }
                retry++;
            } while (retry <= retryLimit);
            if (!committed) {
                failedCount[0]++;
                LOG.error("Could not update object with key: {}", entry.getKey());
            }
        });
        callback(results);
        LOG.debug("Processed the {} entries with single transactions and the number of "
                + "TransactionOptimisticException failures is {}.", entries.size(), failedCount[0]);
    }

    /**
     * Process the received object and the one from the cache into the one that will be saved into the cache.
     *
     * @param results results to be added to
     * @param key the key of the cache object
     * @param objectToUpdate the object that has been received
     * @param cacheObjectOption the current object in the cache if present, else empty optional
     * @return updatedObject if any update has been performed after processing, else empty optional
     */
    protected abstract Optional<V> process(final List<R> results, final K key, final V objectToUpdate,
                                           final Optional<V> cacheObjectOption);

    /**
     * This method gets called once the batch of entries is processed. Could be used for sending
     * the messages to the client nodes/local listeners.
     *
     * @param results to be sent
     */
    protected void callback(final List<R> results) {
        LOG.debug("Callback with results {}", results);
    }
}