When you are processing data in pipelines, you must drop duplicate messages or files; otherwise those duplicates will produce wrong results in your data analysis and lead to wrong business decisions. Any data analysis job or machine learning pipeline should detect duplicates and drop them before processing the data.

There may be solutions that fix this on the source system side, but it is always better to detect duplicates in the data before considering it for analysis. There are a number of ways to drop duplicates in data pipelines.

My solution for dropping duplicates is as follows. If you are using Apache NiFi in your project, there is a simple way to achieve this.

  1. If your source system writes data files to a Kafka topic, it is easy to read them with the NiFi ConsumeKafka processor and connect it to the DetectDuplicate processor to find the duplicates and drop them.
  2. But how are you going to identify that a file is a duplicate? This is where the NiFi flow comes in handy. If your file content or file name provides a unique ID such as a UUID or reference ID, you can store this ID in a distributed store like Redis or CouchDB.
  3. Whenever you receive a file, NiFi checks whether its reference ID is already in the store. If not, it stores the ID, routes the file to the "non-duplicate" relationship, and sends it to the topic from which downstream processing reads the file. Otherwise it routes the file to the "duplicate" relationship and drops it from processing.
  4. You can define an age-out time for the reference IDs stored in the DB, or delete IDs based on the size or number of entries stored.
  5. If you don't use NiFi, you can build a simple Java/Scala application that reads the Kafka topic and connects to Redis or CouchDB to store the reference IDs, as in the sketch after this list.
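
For step 5, here is a minimal sketch of such an application in Java, using Jedis as the Redis client. The broker address, Redis address, topic name, and the assumption that the reference ID arrives as the Kafka message key are all placeholders; adapt them to how your files actually carry their IDs:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.params.SetParams;

    public class KafkaDeduplicator {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder broker address
            props.put("group.id", "dedup-app");
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                 Jedis redis = new Jedis("localhost", 6379)) {  // placeholder Redis address
                consumer.subscribe(Collections.singletonList("incoming-files")); // placeholder topic

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    for (ConsumerRecord<String, String> record : records) {
                        // Assumption: the unique reference ID travels as the message key.
                        String referenceId = record.key();
                        // SET with NX succeeds only if the key is absent, so one round trip
                        // both checks for and records the ID; EX ages it out after 24 hours,
                        // mirroring the age-out in step 4 above.
                        String reply = redis.set("dedup:" + referenceId, "1",
                                SetParams.setParams().nx().ex(24 * 60 * 60));
                        if ("OK".equals(reply)) {
                            process(record.value());  // first occurrence: keep it
                        }
                        // A null reply means the ID was already cached: the duplicate is dropped.
                    }
                }
            }
        }

        private static void process(String payload) {
            // Placeholder for the real downstream work, e.g. publishing to a "clean" topic.
            System.out.println("Processing: " + payload);
        }
    }

Because the Redis SET NX/EX call is atomic, the check and the store happen in one step, so two instances of the application cannot both treat the same reference ID as new.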
As per the documentation, the Apache NiFi DetectDuplicate processor "Caches a value, computed from FlowFile attributes, for each incoming FlowFile and determines if the cached value has already been seen. If so, routes the FlowFile to 'duplicate' with an attribute named 'original.identifier' that specifies the original FlowFile's "description", which is specified in the <FlowFile Description> property. If the FlowFile is not determined to be a duplicate, the Processor routes the FlowFile to 'non-duplicate'".
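
Putting steps 1-3 together, the flow looks roughly like the sketch below; UpdateAttribute is just one way to pull the reference ID into a FlowFile attribute, and the exact ConsumeKafka/PublishKafka processor variants depend on your NiFi version:

    ConsumeKafka → UpdateAttribute (set reference.id) → DetectDuplicate
                                                          ├─ non-duplicate → PublishKafka (clean topic)
                                                          └─ duplicate → auto-terminate (dropped)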
Must read: How to deploy Machine Learning Models in Production?

There are some configurations that you need to set up in the processor, as sketched below:
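
As an illustration only, a typical DetectDuplicate setup might look like the following; the reference.id attribute name and the 24-hour age-off are assumptions carried over from the sketches above, not required values:

  * Cache Entry Identifier: ${reference.id}, the unique ID extracted upstream
  * FlowFile Description: ${filename}, surfaced as original.identifier on duplicates
  * Age Off Duration: 24 hours, implementing the age-out from step 4
  * Distributed Cache Service: a configured DistributedMapCacheClientService (a Redis-backed map cache client can back this if you keep the IDs in Redis)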
To know what each property means, look at the link: DetectDuplicate Documentation