Saturday, October 8, 2016

Database Polling Listener and Watermark Configuration in MuleSoft Anypoint Studio

Often, newly inserted database records need to be retrieved for synchronization or for other business processes.

MuleSoft makes this straightforward by combining the Poll and Database components with Batch processing.

Let us see how to design the Mule Flow - 

Step - 1 Drag and drop the 'Batch' object onto the Anypoint Studio canvas.
Step - 2 Drag and drop the 'Poll' object into the Input section of the batch job.
Step - 3 Place the business process objects as appropriate under the Batch_Step section.
Step - 4 Place the objects as required in the 'On Complete' section.

Now, you can see the template as below - 
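
In XML terms, the empty template corresponds roughly to the skeleton below. This is only a sketch derived from the complete flow given at the end of this post; the placeholder comments are illustrative, and the fragment is not deployable until the Poll and the batch step are filled in.

```xml
<batch:job name="Process_Employee_Records">
    <batch:input>
        <poll doc:name="Poll">
            <fixed-frequency-scheduler frequency="1" timeUnit="MINUTES"/>
            <!-- the message source to be polled (the Database object) goes here -->
        </poll>
    </batch:input>
    <batch:process-records>
        <batch:step name="Batch_Step">
            <!-- business process objects applied to each record -->
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <!-- objects to run once the whole batch has finished -->
    </batch:on-complete>
</batch:job>
```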



The next step is to configure the Database object placed inside the Poll -

In the Connector configuration, provide the database connection details as below -
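
For reference, the connector configuration used in the complete flow at the end of this post looks like this in XML. The host, port, credentials and database name are those of a local test setup and would need to be replaced for any other environment.

```xml
<!-- Global MySQL configuration referenced by the Database object via config-ref -->
<db:mysql-config name="MySQL_Configuration"
                 host="localhost" port="3306"
                 user="root" password="root"
                 database="test"
                 doc:name="MySQL Configuration"/>
```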



Please note that the database driver JAR file must be added under Required Dependencies to enable communication with the database.
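
If the project happens to be Maven-based (an assumption, since this post adds the JAR through the Studio dialog), the MySQL driver could instead be declared in pom.xml roughly as below. The version number is only an example; use whichever driver version matches your MySQL server.

```xml
<!-- Hypothetical pom.xml entry; the version shown is an assumed example -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>
```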

Next, configure the DB query as below -



Note that the query has a WHERE clause so that only the most recently added or updated records are fetched. For this to work, we need to introduce a Mule flow variable that the query can compare against. It could be based on the ID, a date or any other value that identifies the records that arrived since the last poll.

We will now see how MuleSoft automatically updates this variable, known as the watermark, so that records that have already been processed are excluded.
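
As a sketch, this is the query as it appears in the complete flow at the end of this post. The flow variable `id` holds the highest ID processed so far, so only rows with a larger ID are returned.

```xml
<db:select config-ref="MySQL_Configuration" doc:name="Employee Repository">
    <!-- #[flowVars.id] is evaluated on every poll and bound as a query parameter -->
    <db:parameterized-query><![CDATA[SELECT ID, NAME, UPDATED_TIME FROM EMPLOYEE WHERE ID > #[flowVars.id]]]></db:parameterized-query>
</db:select>
```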


Based on the query, MuleSoft automatically updates the watermark variable, and it processes records only when at least one satisfies the criteria.
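
The watermark settings from the complete flow are repeated here on their own to show what each attribute does. The comments are my reading of the configuration rather than anything shown in the Studio dialog.

```xml
<poll doc:name="Poll">
    <fixed-frequency-scheduler frequency="1" timeUnit="MINUTES"/>
    <!--
        variable            : name of the flow variable that holds the watermark (flowVars.id)
        default-expression  : value used on the first run, when nothing has been stored yet
        selector            : which record of the result set supplies the new value (LAST = final row)
        selector-expression : evaluated against the selected record to obtain the new watermark
    -->
    <watermark variable="id" default-expression="0" selector="LAST" selector-expression="#[payload.id]"/>
    <!-- the db:select that uses #[flowVars.id] in its WHERE clause goes here -->
</poll>
```

Because LAST takes the final row of the result set, it relies on the rows coming back in ascending ID order. Adding ORDER BY ID to the query, or using the MAX selector instead, may be a safer choice; that is a suggestion and not part of the flow in this post.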

The complete Mule flow is given below for your reference -

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd">
    <file:connector name="File" outputPattern="Outfile.xml"  autoDelete="true" streaming="true" validateConnections="true" doc:name="File" workFileNamePattern="*.xm" writeToDirectory="C:\FileCopySourceLocation"/>
    <file:connector name="File1"   autoDelete="true" streaming="true" validateConnections="true" doc:name="File" readFromDirectory="C:\FileCopySourceLocation" workFileNamePattern="Myfile.xml"/>
    <db:mysql-config name="MySQL_Configuration" host="localhost" port="3306" user="root" password="root" database="test" doc:name="MySQL Configuration"/>
    <batch:job name="Process_Employee_Records">
        <batch:input>
            <!-- Run the enclosed query once every minute -->
            <poll doc:name="Poll">
                <fixed-frequency-scheduler frequency="1" timeUnit="MINUTES"/>
                <!-- Keep the ID of the last row returned in flowVars.id (0 on the first run) -->
                <watermark variable="id" default-expression="0" selector="LAST" selector-expression="#[payload.id]"/>
                <db:select config-ref="MySQL_Configuration" doc:name="Employee Repository">
                    <!-- Fetch only the rows whose ID is above the stored watermark -->
                    <db:parameterized-query><![CDATA[SELECT ID, NAME, UPDATED_TIME FROM EMPLOYEE WHERE ID > #[flowVars.id]]]></db:parameterized-query>
                </db:select>
            </poll>
        </batch:input>
        <batch:process-records>
            <batch:step name="Batch_Step">
                <logger message="#[payload]" level="INFO" doc:name="Log Retrieval Status"/>
                <mulexml:object-to-xml-transformer doc:name="Convert to XML"/>
                <file:outbound-endpoint path="C:\FileCopySourceLocation" outputPattern="Myfile.xml" connector-ref="File" responseTimeout="10000" doc:name="Persist Payload"/>
            </batch:step>
        </batch:process-records>
        <batch:on-complete>
            <logger message="Completed Successfully" level="INFO" doc:name="Record Completion"/>
        </batch:on-complete>
    </batch:job>
</mule>

I hope you enjoyed reading this blog. For any queries, please feel free to write to me at sivakumar.th@gmail.com.








