Idempotent Consumer EIP
We use the Idempotent Consumer EIP to filter out duplicate messages.
References:
- Enterprise Integration Patterns: https://www.enterpriseintegrationpatterns.com/patterns/messaging/IdempotentReceiver.html
- Camel Documentation: https://camel.apache.org/components/next/eips/idempotentConsumer-eip.html
A number of Camel components support an Idempotent Consumer directly within the component. For example, the file component can use one to skip files that have already been processed.
Some components that support the Idempotent Consumer directly:
- file
- sftp
- ftps
The Idempotent Consumer EIP can equally be used in routes built on other components, such as p6rest.
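Conceptually, an Idempotent Consumer checks each message id against a repository of ids it has already seen and drops the message if the id is already there. The plain Groovy sketch below shows only that filtering logic, applied to a list of file names. `SeenIdStore` and `filterDuplicates` are hypothetical names used for illustration; the in-memory set stands in for a real repository and is not how Camel or Platform 6 implement one.

```groovy
// Hypothetical in-memory stand-in for an idempotent repository.
class SeenIdStore {
    private final Set<String> ids = new HashSet<>()

    // Returns true only the first time an id is added;
    // a second add of the same id returns false (a duplicate).
    boolean add(String id) {
        return ids.add(id)
    }

    boolean contains(String id) {
        return ids.contains(id)
    }

    // Forget every id recorded so far.
    void clear() {
        ids.clear()
    }
}

// Keep only the file names that have not been seen before.
List<String> filterDuplicates(List<String> fileNames, SeenIdStore store) {
    fileNames.findAll { store.add(it) }
}

def store = new SeenIdStore()
def processed = filterDuplicates(['a.pdf', 'b.pdf', 'a.pdf', 'c.pdf', 'b.pdf'], store)
println processed  // [a.pdf, b.pdf, c.pdf]
```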
Platform 6 provides an implementation of the Idempotent Repository, P6IdempotentRepository, which the Camel Idempotent Consumer can use to filter out duplicate messages.
It is designed for clustered environments, as it is implemented on top of the Platform 6 In Memory Data Grid. However, it works equally well in a standalone deployment.
Note
Imagine a three-node Platform 6 cluster in which each node runs a file consumer route monitoring the same folder for dropped files. By using an Idempotent Consumer with a repository of a well-known name, the file consumers can work together to ensure each file is processed only once.
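The three-node scenario can be sketched in plain Groovy, with threads standing in for cluster nodes and a static map of concurrent sets standing in for the In Memory Data Grid. `NamedRepositories`, the file names and the thread-per-node setup are assumptions for illustration only, not the P6IdempotentRepository internals. What it shows is that when every node looks the repository up by the same name and the add operation is atomic, exactly one node wins for each file name.

```groovy
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for the data grid: one shared key set per repository name.
class NamedRepositories {
    private static final Map<String, Set<String>> GRID = new ConcurrentHashMap<>()

    static Set<String> byName(String name) {
        GRID.computeIfAbsent(name) { ConcurrentHashMap.newKeySet() }
    }
}

def dropped = ['inv-1.pdf', 'inv-2.pdf', 'inv-3.pdf', 'inv-4.pdf']
def processedBy = new CopyOnWriteArrayList<String>()

// Each "node" sees every dropped file, but checks the repository
// with the well-known name before processing it.
def nodes = (1..3).collect { nodeId ->
    Thread.start {
        def repo = NamedRepositories.byName('pdf_filesin')
        dropped.each { fileName ->
            // add() on a concurrent key set is atomic: only one node gets true.
            if (repo.add(fileName)) {
                processedBy << "node${nodeId}:${fileName}".toString()
            }
        }
    }
}
nodes*.join()

println "${processedBy.size()} of ${dropped.size()} files processed"  // 4 of 4 files processed
```

Which node processes which file varies from run to run; only the total is deterministic.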
Example
Consider a scenario where each file must be processed only once: if the same file is dropped more than once, the repeat drops should be skipped.
Here we use the file name (the CamelFileName header) as the id stored in the Idempotent Repository to keep track of the files processed.
import io.platform6.core.impl.application.camel.p6imdg.idempotent.P6IdempotentRepository
import org.apache.camel.Exchange
import org.apache.camel.Processor
import org.apache.camel.builder.RouteBuilder

p6.camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        // Skip any file whose name is already in the 'pdf_filesin' repository
        from('file:' + P6_DATA + '/pdfs/in')
            .idempotentConsumer(header('CamelFileName'), new P6IdempotentRepository('pdf_filesin'))
            .process(new Processor() {
                void process(Exchange exchange) throws Exception {
                    println('This file is being processed the first time -- '
                        + exchange.getIn().getHeader('CamelFileName'))
                }
            })
            .to('file:' + P6_DATA + '/pdfs/processed')
            .routeId('IdempotentFile')
    }
})
Clearing an Idempotent Repository
An Idempotent Repository keeps growing: every new id is added to its content.
Consider its use with the file component above: over time, the name of every file ever dropped in the /pdfs/in folder is stored in the repository. If the repository is never cleared, its resource usage may eventually cause problems such as out-of-memory errors or a performance slowdown.
It is therefore good practice to call the repository's clear() method from time to time. Bear in mind that once the repository is cleared, a file whose name was seen before is processed again if it is dropped again.
Note
The P6IdempotentRepository is empty when Platform 6 is first started. This is not the case in a clustered deployment, however, where the repository content is maintained by all nodes in the cluster.
When used in a single-node deployment, a simple clear() call in the route deployment script is often enough. In the following example, the repository is cleared whenever a file named stop is dropped:
import io.platform6.core.impl.application.camel.p6imdg.idempotent.P6IdempotentRepository
import org.apache.camel.Exchange
import org.apache.camel.Processor
import org.apache.camel.builder.RouteBuilder

p6.camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        def idempotentRepo = new P6IdempotentRepository('pdf_filesin')
        from('file:' + P6_DATA + '/pdfs/in')
            .idempotentConsumer(header('CamelFileName'), idempotentRepo)
            .process(new Processor() {
                void process(Exchange exchange) {
                    def fileName = exchange.getIn().getHeader('CamelFileName')
                    println('This file is being processed the first time -- ' + fileName)
                    // Dropping a file named 'stop' clears every file name recorded so far
                    if (fileName == 'stop') {
                        idempotentRepo.clear()
                    }
                }
            })
            .to('file:' + P6_DATA + '/pdfs/processed')
            .routeId('idempotentFile')
    }
})
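To make the effect of the stop-file route concrete, the plain Groovy sketch below replays a made-up sequence of file drops against a HashSet standing in for P6IdempotentRepository('pdf_filesin'). It assumes the Idempotent Consumer records each file name before the processor runs, which is Camel's default (eager) behavior. Note that the stop file itself is recorded and then cleared along with everything else, so a.pdf is processed a second time after the clear.

```groovy
// Hypothetical simulation: a plain set stands in for P6IdempotentRepository('pdf_filesin').
def repo = new HashSet<String>()
def processed = []

['a.pdf', 'b.pdf', 'a.pdf', 'stop', 'a.pdf', 'stop'].each { fileName ->
    // The id is recorded first (eager mode); a duplicate never reaches the processor.
    if (!repo.add(fileName)) {
        return  // skip this drop
    }
    processed << fileName
    if (fileName == 'stop') {
        repo.clear()  // forget every name seen so far, including 'stop'
    }
}

println processed  // [a.pdf, b.pdf, stop, a.pdf, stop]
```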
Note
Each repository is identified by its name, so the same repository can be referenced by multiple routes and scripts.
In a clustered Platform 6 deployment, clearing is better scheduled to run only on the master node:
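import io.platform6.core.impl.application.camel.p6imdg.idempotent.P6IdempotentRepository
import org.apache.camel.Exchange
import org.apache.camel.Processor
import org.apache.camel.builder.RouteBuilder

p6.camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        // 'master:' makes only the master node consume from this endpoint.
        // Quartz cron (seconds minutes hours day-of-month month day-of-week): 4am on Sunday
        from('master:p6:quartz://housekeeping/idempotent?cron=0+0+4+?+*+SUN')
            .process(new Processor() {
                void process(Exchange exchange) {
                    // Same name as the file route, so this clears the same repository
                    new P6IdempotentRepository('pdf_filesin').clear()
                }
            })
            .routeId('idempotentHousekeeper')
    }
})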
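Quartz cron expressions start with a seconds field, which is easy to miss. The sketch below uses a hypothetical helper, `describe`, that splits a cron value as it is written in a Camel endpoint URI (where `+` stands for the spaces between fields) and labels each field with its Quartz name. It compares `0+4+*+?+*+SUN` with `0+0+4+?+*+SUN`.

```groovy
// Quartz cron field order (the optional seventh field, year, is not used here).
def fieldNames = ['seconds', 'minutes', 'hours', 'day-of-month', 'month', 'day-of-week']

// Pair each '+'-separated value from the URI with its field name.
def describe = { String uriCron ->
    [fieldNames, uriCron.split('\\+') as List].transpose().collectEntries { it }
}

def hourly = describe('0+4+*+?+*+SUN')
def fourAm = describe('0+0+4+?+*+SUN')
println "hours field: ${hourly['hours']} vs ${fourAm['hours']}"  // hours field: * vs 4
```

With `*` in the hours field, `0+4+*+?+*+SUN` fires at minute 4 of every hour on Sundays, whereas `0+0+4+?+*+SUN` fires once, at 04:00:00 every Sunday.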
Note
In a single-node deployment, the single node is the master by default, so this technique can also be used there.