Showing posts with label apache camel. Show all posts
Showing posts with label apache camel. Show all posts

Thursday, August 8, 2019

Apache Camel DSL Sample That Does A Few Things

Below piece does a few things. It's useful if you are using Camel DSL like me. The explanation is in the form of comment below. 

<!-- Bean instantiation of Sql Server driver which will be referenced by below DSL -->
    <bean id="sqlServerDS"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="${mssql.driverClassName}" />
        <property name="url" value="${mssql.url}" />
        <property name="username" value="${mssql.username}" />
        <property name="password" value="${mssql.password}" />
    </bean>

<!-- Setting up camel endpoint for XSLT which will be used to transform the received XML into another XML -->
<camel:endpoint id="someXSLT" uri="xslt:file://${xslt.location}/someXSLT.xslt"/>

<!-- Setting up File endpoint to output the generated output flat file -->
<camel:endpoint id="InformationFile" uri="file://?fileName=${output.location}/$simple{file:name}&amp;fileExist=Append"/>

<!-- Camel Route starts -->
<camel:route id="LalaLand">

                        <!-- Listening to someQueue which has been setup in ActiveMq -->
                        <camel:from uri="tcpActivemq:queue:someQueue"/>

                        <!-- transform the Xml picked up from someQueue using someXSLT -->
<camel:to ref="someXSLT"/>
                        <!-- define file name for the output file and stores it in Header -->
                        <camel:setHeader headerName="CamelFileName">
<camel:xpath resultType="java.lang.String">concat('somefile_', //code, '.txt')     </camel:xpath>
</camel:setHeader>
                        
                        <!-- logging -->
<camel:log message="SPLIT BEGINS" loggingLevel="DEBUG" logName="lalaland" />
                       <!-- Split the Xml transformed by someXSLT into lines -->
<camel:split>
<camel:xpath>//FileRecord</camel:xpath>
<camel:setHeader headerName="ctcRef">
<camel:xpath resultType="java.lang.String">//contactRef</camel:xpath>
</camel:setHeader>
                                <!- The payload will lose in the next section so needs to store the original message -->
<camel:setProperty propertyName="oriMessage">
<camel:simple>${body}</camel:simple>
</camel:setProperty>

                                <!-- if header variable ctcRef is not empty, Camel DSL will call 3 SQL to finally get the Date Of Birth, which is the requirement of this. Why 3 Sql? Because over here it does not support JOIN query -->
<camel:choice>
<camel:when>
<camel:simple>${header.ctcRef} != ''</camel:simple>
<camel:to uri="sql:SELECT ID FROM CONTACTS WHERE CONTACTREF = :#ctcRef;?dataSource=sqlServerDS" />
<camel:log message="body: ${body[0].get('ID')} " loggingLevel="DEBUG" logName="com.experian" />
<camel:setHeader headerName="contactId">
<camel:simple>${body[0].get('ID')}</camel:simple>
</camel:setHeader>
<camel:log message="contactId: ${header.contactId} " loggingLevel="DEBUG" logName="com.experian" />
<camel:to uri="sql:SELECT ACCOUNTID FROM ACCOUNTCONTACTS WHERE CONTACTTYPEID = 102 AND CONTACTID = :#contactId;?dataSource=sqlServerDS" />
<camel:setHeader headerName="accountId">
<camel:simple>${body[0].get('ACCOUNTID')}</camel:simple>
</camel:setHeader>
<camel:log message="accountId: ${header.accountId} " loggingLevel="DEBUG" logName="com.experian" />
<camel:to uri="sql:SELECT CONVERT(varchar,DATE_OF_BIRTH,23) AS DOB FROM CS_PERSON WHERE ACCOUNTS1 = :#accountId;?dataSource=sqlServerDS" />
                                                <!-- Store Date Of Birth in Header -->
<camel:setProperty propertyName="dob">
<camel:simple>${body[0].get('DOB')}</camel:simple>
</camel:setProperty>
<camel:log message="dob: ${header.dob} " loggingLevel="DEBUG" logName="com.experian" />
</camel:when>
</camel:choice>
                               <!-- Set the Body back with the original payload stored above -->
<camel:setBody>
<camel:simple>${property.oriMessage}</camel:simple>
</camel:setBody>
                                
                                <!-- Retrieves field value using xpath, add the dob from Header, and print in one line -->
<camel:transform>
<camel:xpath resultType="java.lang.String">concat(//accountRef, '|', //contactRef, '|', //title, '|', //givenName, '|', //middleName, '|', //familyName, '|', ${header.dob}, '&#xD;&#xA;')</camel:xpath>
</camel:transform>
                                
                               <!- The below will be executed after last line by checking based on CamelSplitComplete from Header -->
<camel:when>
<camel:simple>${header.CamelSplitComplete} == true</camel:simple>
                                        <!-- Below will add below wordings and number of records based on CamelSplitSize from Header to the output flat file -->
<camel:transform>
<camel:simple>adding new line here ${header.CamelSplitSize} </camel:simple>
</camel:transform>
</camel:when>
                                <!- content will be output to a physical file -->
<camel:to ref="someFile"/>
</camel:split>
</camel:route>

Thursday, January 31, 2019

Named Parameters for Apache Camel SQL (Spring DSL) that involves more than 1 table

I was working on this Apache Camel solution where I need to pick up XML from Apache MQ, then transform the XML payload into text. During the XML payload transformation, I have to enrich the content by querying the DB based on a field read from the same payload.

As in the official documentation version 2.12.4, named parameters can be used in the Endpoint URI , represented by the symbol :# .

However, in much recent versions, Apache Camel user can just directly access Header or Property in the Endpoint URI using :#${property.xxx} or :#{header.xxx}. But in my case, my Apache Camel version is 2.12.4, so I've to stick with the 2.12 way of doing things.

Anyway, the named parameters way works, but only to SQL query that only reads from 1 table. It will not work if the SQL query involves many tables such as joined query. I was getting error message "Invalid Column Name" , but it was never about the Column Name was wrong, it was about Camel SQL Component failed to recognize the :# symbol to replace the value.

So, what happened to the joined table query then? I just broke the SQL query into multiple pieces of Camel SQL calls, with each of them calling only one single table based on the value retrieved from the previous SQL call.

It's worth highlighting that the columns from result set will always be stored in Message Body as Java Map in an Java ArrayList. It looks like this upon printing on log [{ID=12345}]. 

Monday, January 22, 2018

Camel-Context(DSL) only, No Coding! How to Log? How to Handle Error? How to Configure Retry? How to work out the Property Placeholder?

I'm excited to have found new way in Apache Camel to build specific feature without writing any single code, which has always been my way due to time constraint and flexibility (to make everything configurable and every exception handled properly). It was a long winding process with many many rounds of trial and error.

What I'm trying to achieve here is very simple, just read file and SFTP to the other side and Backup the original Source File! If this would have been done in a Java/Groovy and registered as a Spring Bean, then it shall be very fast. However I chose to do it the Camel-way!


  1. In your Camel-Context.xml (Spring DSL), declare a Route.  
  2. The <From> should be the File EIP. Remember to set a "Delay" for the file polling interval. 
  3. Next, use the SFTP EIP for the <To>


Done. So simple! However...

I want to LOG the entire process, such as:  

  1. What Are the Files Being Picked Up? 
  2. Whether the SFTP is Successful or Failed?  
  3. Was the SFTPed file moved to a Backup folder? 
For my own knowledge, I was using 2 ways to log:
  1. Camel's Log EIP - To log Route process + Error to the Application-specific log. 
  2. Camel's Log - To log Error to the General Error log. 
**For Error, will be written to both logs. 

Camel's Log EIP 
<camel:log message="[MyTimer2] SFTP Begins ${file:name}" loggingLevel="DEBUG" logName="myTimer2Logger" />
**${file:name} here is a kind of Reserved Keyword inside Camel's. It's called the File Expression language and this specific one will output the name of the file picked up in the log.  

Camel's Log
<camel:to uri="log:error-rollingFileAppender?level=ERROR&amp;showCaughtException=true&amp;showStackTrace=true" />
** First Parameter (in this case - error-rollingFileAppender) refers to the Appender setup in my Application's LogBack.xml


Question, where should I place my LOG Error Block? I should only LOG Error upon Exception right? 

First, I tried the <DoTry> - <DoCatch> way, I put my LOG Error function within <DoCatch>. However, there is a flaw here! Upon SFTP Exception, the File:// will still regard the SFTP as successful and continued to move file to the Backup folder. 

Finally I found the right way, use <OnException>! So just put the Log Error block inside your <OnException>, which is inside the <Route>! 


<camel:exception>java.lang.Exception</camel:exception>
<camel:to uri="log:error-rollingFileAppender?  level=ERROR&amp;showCaughtException=true&amp;showStackTrace=true" />
<camel:log message="[MyTimer2] File Transmission Error ${exception.stacktrace}" loggingLevel="ERROR" logName="myTimer2Logger" />
</camel:onException> 


Is that all? Nope.

I wanted to Retry for a fixed number of times upon any kind of Exception! 

Easy. Just declare the below:

<camel:redeliveryPolicyProfile id="myRedelivery" retryAttemptedLogLevel="ERROR" maximumRedeliveries="${H2MaxRetry}" redeliveryDelay="${H2RetryDelay}" />

Then add this as the "redeliveryPolicyRef" to your Route.

Last but not least, I wanted to set all the properties, such as the Input Directory, the SFTP host, username, password and etc in a single properties file

Here's the trick:
Use <Endpoint> for your File:// and SFTP:// 
Somehow only Endpoint allows Spring Property Placeholder! 

<camel:endpoint id="send_source" uri="file://${app.dir}/filewatcher/anz_output?delay=${myTimer2}&amp;move=${app.dir}/filewatcher/anz_backup/$simple{file:name}.done&amp;moveFailed=${app.dir}/filewatcher/anz_bad"/>
<camel:endpoint id="send_remote" uri="sftp://${H2hostname}${H2Path}?username=${H2user}&amp;password=${H2password}&amp;soTimeout=${H2SocketTimeout}&amp;binary=${H2Binary}"/>

**These properties placeholder are stored in the properties file configured at the PlaceholderConfigurer