package com.veon.dmp.event.kafka;

import org.apache.ignite.Ignite;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.stream.Collectors;

/**
 * Stream receiver that extends from {@link AbstractTransactionalStreamReceiver} and
 * only updates the cache if the update for that field is the latest update.
 *
 * <p>Each data field {@code foo} may be accompanied by a companion field
 * {@code foo__timestamp} (see {@link #TIMESTAMP_COLUMN_SUFFIX}). A received field value
 * replaces the cached one unless both the received and the cached timestamp are
 * {@link Long}s and the received timestamp is not strictly greater than the cached one.
 *
 * @param <T> Type of the results list in the callback
 * @param <K> Type of the key of the cache
 */
public class TimestampBasedUpdateStreamReceiver<T, K> extends AbstractTransactionalStreamReceiver<T, K> {

    private static final Logger LOG = LoggerFactory.getLogger(TimestampBasedUpdateStreamReceiver.class);

    private static final long serialVersionUID = 1L;

    /**
     * String to append to each field of the binary object to track when last update occurred
     */
    public static final String TIMESTAMP_COLUMN_SUFFIX = "__timestamp";

    /** Field names that are never copied from a received object into the cached object. */
    private final Set<String> fieldsToBeIgnored;

    /**
     * Fully parameterised constructor
     *
     * @param ignite            Ignite instance
     * @param doBatch           True if use batch transactions, false for single
     * @param retryLimit        Number of times to retry the update transaction before giving up and moving on
     * @param fieldsToBeIgnored Set of fields to be ignored when checking the timestamp;
     *                          {@code null} is treated as an empty set
     */
    public TimestampBasedUpdateStreamReceiver(final Ignite ignite,
                                              final Boolean doBatch,
                                              final int retryLimit,
                                              final Set<String> fieldsToBeIgnored) {
        super(ignite, doBatch, retryLimit);
        LOG.info("Creating TimestampBasedUpdateStreamReceiver with fieldsToBeIgnored {}", fieldsToBeIgnored);
        this.fieldsToBeIgnored = nullToEmpty(fieldsToBeIgnored);
    }

    /**
     * Minimally parameterised constructor
     *
     * @param ignite            Ignite instance
     * @param fieldsToBeIgnored Set of fields to be ignored when checking the timestamp;
     *                          {@code null} is treated as an empty set
     */
    public TimestampBasedUpdateStreamReceiver(final Ignite ignite, final Set<String> fieldsToBeIgnored) {
        super(ignite);
        LOG.info("Creating TimestampBasedUpdateStreamReceiver with fieldsToBeIgnored {}", fieldsToBeIgnored);
        this.fieldsToBeIgnored = nullToEmpty(fieldsToBeIgnored);
    }

    /**
     * Check if key of the entry is a valid field to update
     *
     * @param entryField Entry to apply filter to
     * @return True if the entry key does not end with the timestamp column suffix and is not in the
     *         fieldsToBeIgnored set
     */
    protected boolean applyFilter(final Map.Entry<String, Object> entryField) {
        LOG.trace("Applying filter to entry {}", entryField);
        final String fieldName = entryField.getKey();
        return !fieldName.endsWith(TIMESTAMP_COLUMN_SUFFIX) && !fieldsToBeIgnored.contains(fieldName);
    }

    /**
     * Process each objectToUpdate by checking the cacheObject and checking whether the timestamps of
     * its fields are more recent than what is in the cache already.
     *
     * <p>If there is no cached object, the received object is returned unchanged. Otherwise every
     * non-null, non-filtered field of the received object that passes the timestamp check is copied
     * onto a builder created from the cached object, together with whatever value the received object
     * holds under the matching timestamp key ({@code null} when it holds none).
     * {@link #postProcess} is invoked only when something is going to be written.
     *
     * @param results           list of results to be added to if required
     * @param key               the key of the cache object
     * @param objectToUpdate    the object that has been received
     * @param cacheObjectOption the current object in the cache
     * @return An optional containing the binary object to put into the cache if should update. Otherwise empty
     */
    @Override
    protected final Optional<BinaryObject> process(final List<T> results,
                                                   final K key,
                                                   final BinaryObject objectToUpdate,
                                                   final Optional<BinaryObject> cacheObjectOption) {
        LOG.trace("Processing with results {}, key {}, objectToUpdate {} and cacheObjectOption {}",
                results, key, objectToUpdate, cacheObjectOption);

        // Nothing cached yet: there is nothing to compare against, so the received object wins as a whole.
        if (!cacheObjectOption.isPresent()) {
            postProcess(results, key, objectToUpdate, cacheObjectOption);
            return Optional.of(objectToUpdate);
        }

        final BinaryObject cacheObject = cacheObjectOption.get();
        final BinaryObjectBuilder cacheObjectBuilder = cacheObject.toBuilder();

        final Map<String, Object> nameToFieldObjectMap = extractNonNullFields(objectToUpdate);
        LOG.trace("Name to field object map: {}", nameToFieldObjectMap);

        final List<Map.Entry<String, Object>> candidateFields = nameToFieldObjectMap.entrySet().stream()
                .filter(this::applyFilter)
                .collect(Collectors.toList());

        boolean isUpdateOccurred = false;
        for (final Map.Entry<String, Object> fieldEntry : candidateFields) {
            LOG.trace("Processing fieldEntry {}", fieldEntry);
            final String timeStampKey = fieldEntry.getKey() + TIMESTAMP_COLUMN_SUFFIX;
            if (doUpdateFromTimestamp(cacheObject, nameToFieldObjectMap, timeStampKey)) {
                LOG.trace("Updating the cacheObjectBuilder {} of {}", cacheObjectBuilder, fieldEntry);
                // The received timestamp value is copied as-is; it is null when the received object has none.
                cacheObjectBuilder.setField(timeStampKey, nameToFieldObjectMap.get(timeStampKey));
                cacheObjectBuilder.setField(fieldEntry.getKey(), fieldEntry.getValue());
                isUpdateOccurred = true;
            }
        }

        // if there is more recent data in the cache we do not call postProcess
        if (!isUpdateOccurred) {
            LOG.trace("No update occurred.");
            return Optional.empty();
        }

        final BinaryObject updatedObject = cacheObjectBuilder.build();
        postProcess(results, key, updatedObject, cacheObjectOption);
        LOG.trace("Returning updated object {}", updatedObject);
        return Optional.of(updatedObject);
    }

    /**
     * Check if need to update the cache object from the update timestamps
     *
     * @param cacheObject          Binary object from the cache
     * @param nameToFieldObjectMap Map of fields to objects for the received binary object from the streamer
     * @param timestampKey         Key of where to find the timestamp
     * @return False only if both the received and the cached timestamp are {@link Long}s and the received
     *         one is less than or equal to the cached one; true in every other case
     */
    private boolean doUpdateFromTimestamp(final BinaryObject cacheObject,
                                          final Map<String, Object> nameToFieldObjectMap,
                                          final String timestampKey) {
        // A missing key yields null, which is not an instance of Long, so one lookup covers both checks.
        final Object receivedTimestamp = nameToFieldObjectMap.get(timestampKey);
        if (!(receivedTimestamp instanceof Long)) {
            LOG.trace("nameToFieldObjectMap {} does not contain timestamp key {} or not of type long",
                    nameToFieldObjectMap, timestampKey);
            return true;
        }

        final Long newTimestamp = (Long) receivedTimestamp;
        final Object cachedTimestamp = cacheObject.hasField(timestampKey) ? cacheObject.field(timestampKey) : null;

        if (cachedTimestamp instanceof Long && newTimestamp <= (Long) cachedTimestamp) {
            LOG.trace("Cache object {} already has latest version of field. New timestamp is {} of {}",
                    cacheObject, newTimestamp, timestampKey);
            return false;
        }

        LOG.trace("Cache object {} either does not have the field {}, the field is not of type long or the "
                + "newTimestamp {} is more recent and so should be updated.", cacheObject, timestampKey, newTimestamp);
        return true;
    }

    /**
     * post processing is required with binaryObjects
     * The only thing that can be done here is to put elements to the results.
     * This implementation only logs its arguments at debug level.
     *
     * @param results       results to be added to
     * @param key           the key of the cache object
     * @param updatedObject the object that got updated in the cache
     * @param cacheObject   the object retrieved from the cache initially
     */
    protected void postProcess(final List<T> results,
                               K key,
                               final BinaryObject updatedObject,
                               final Optional<BinaryObject> cacheObject) {
        LOG.debug("Post processing with results {}, key {}, updatedObject {} and cache object {}",
                results, key, updatedObject, cacheObject);
    }

    /**
     * Collects every field of the given object's type whose value on that object is non-null.
     *
     * @param binaryObject object to read the fields from
     * @return mutable map from field name to its non-null value
     */
    private static Map<String, Object> extractNonNullFields(final BinaryObject binaryObject) {
        final Map<String, Object> fields = new HashMap<>();
        for (final String fieldName : binaryObject.type().fieldNames()) {
            // Read each field once; the value is reused for both the null check and the put.
            final Object value = binaryObject.field(fieldName);
            if (value != null) {
                fields.put(fieldName, value);
            }
        }
        return fields;
    }

    /**
     * Replaces a null set with an empty one so that filtering never dereferences null.
     *
     * @param fields set supplied by the caller, possibly null
     * @return the same set instance when non-null, otherwise an empty set
     */
    private static Set<String> nullToEmpty(final Set<String> fields) {
        return fields == null ? Collections.<String>emptySet() : fields;
    }
}