Part 1: Mule Batch Processing - Introduction


Mule Enterprise Edition can process messages in batches. This is useful for processing a large number of records or a stream of data. It provides many components that are specific to batch processing and can be used to implement business logic. In this blog series, we will talk about batch processing in Mule and unit testing it with MUnit.

In this first part of the series, I will introduce you to Mule batch processing and the terminology related to it. At the end, we will also see a simple Mule batch job.


Let’s begin with some random quote -

You have to be Odd to be Number One!
— Dr. Seuss

1. Introduction to Batch

Batch processing in Mule is divided into four phases: Input, Load and Dispatch, Process, and On Complete.

1.1 Mule Batch Phases

Figure 1.A: Mule Batch Phases [Source: Mule Docs]

Input Phase: This is an optional part of the batch job that can be used to retrieve the source data using any inbound connector. It also allows you to chain multiple message processors to transform the source data before it is ready for processing.

Load and Dispatch: This is an implicit phase that the Mule runtime takes care of. In this phase, the payload generated in the Input phase, or provided to the batch by the caller flow, is turned into a collection of records. Mule also creates a job instance for processing the records. The collection is then sent through the collection-splitter to queue individual records for processing.

Process: This is the required phase, where the actual processing of every record occurs asynchronously.

  1. Each record from the input queue is processed through the first step and sent back to the queue once that step completes.

  2. Records that were processed in the first step are then passed through the second step and sent back to the queue once that step completes.

  3. Mule continues this until all records have passed through every step.

At the step level, you can also specify which records a step should accept. This is configured using accept-expression on the step definition. Records that satisfy a step's accept-expression are processed by that step; all others move on to the next eligible step. You can also define accept-policy to filter records based on their processing status.
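
As a rough illustration of both attributes together, here is a minimal sketch. The step names and logger messages are made up for this example, and the fragment assumes the batch namespace is declared on the enclosing mule element. The accept-policy values shown (NO_FAILURES, which is the default, and ONLY_FAILURES) are the Mule 3 ones; there is also ALL.

```xml
<batch:process-records>
    <!-- Hypothetical step: only records whose status is 'Ready' enter it.
         With the default accept-policy (NO_FAILURES), records that already
         failed in an earlier step are skipped as well. -->
    <batch:step name="Process_Ready_Orders" accept-expression="#[payload.status == 'Ready']">
        <logger message="#['Processing order: ' + payload.id]" level="INFO" doc:name="Logger"/>
    </batch:step>

    <!-- Hypothetical step: accepts only records that failed in a previous step -->
    <batch:step name="Report_Failed_Orders" accept-policy="ONLY_FAILURES">
        <logger message="#['Order failed: ' + payload.id]" level="WARN" doc:name="Logger"/>
    </batch:step>
</batch:process-records>
```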

Processing of records through the next step DOES NOT wait for the previous step to finish processing all records. Mule manages the state of each record while it moves back and forth between the queue and the steps.

On Complete: In this final but optional phase, a summary of the batch execution is made available, for example to generate reports or other statistics. The payload in this phase is an instance of BatchJobResult. It holds information such as the number of records loaded, processed, failed, and succeeded. It can also provide details of any exceptions that occurred in the steps.
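
For instance, the summary could be used to log at a different level when some records failed. This is only a sketch: the choice router and the messages are my own illustration, and it reads just the loadedRecords, successfulRecords and failedRecords properties of the BatchJobResult payload.

```xml
<batch:on-complete>
    <choice doc:name="Any failed records?">
        <!-- payload is the BatchJobResult for this job instance -->
        <when expression="#[payload.failedRecords > 0]">
            <logger message="#['Batch finished with failures: ' + payload.failedRecords + ' of ' + payload.loadedRecords + ' records failed']" level="WARN" doc:name="Failure Logger"/>
        </when>
        <otherwise>
            <logger message="#['Batch finished cleanly: ' + payload.successfulRecords + ' records succeeded']" level="INFO" doc:name="Success Logger"/>
        </otherwise>
    </choice>
</batch:on-complete>
```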

1.2 What are Record Variables?

In general, Mule provides Flow Variables, Session Variables and Outbound Properties to store information at different scope levels. When records are processed through the steps, each record becomes the message payload for the step.

If you want to store information at the individual record level, the existing scopes do not work. That is where a record variable is needed. It is scoped to the Process phase only, and every record gets its own copy of record variables, which are serialized and carried with the record through all the steps.

Record variables can be set using <batch:set-record-variable /> element, similar to <set-variable />.

Record variables can only exist and be used during the Batch Process phase when a record is being processed.
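
A small sketch of how this might look across two steps. The step names and the originalStatus variable are hypothetical; recordVars is the MEL variable used to read record variables back.

```xml
<batch:process-records>
    <!-- Hypothetical step: remember the status each record arrived with -->
    <batch:step name="Remember_Original_Status">
        <batch:set-record-variable variableName="originalStatus" value="#[payload.status]" doc:name="Record Variable"/>
    </batch:step>

    <!-- Hypothetical later step: the variable travels with the same record -->
    <batch:step name="Log_Original_Status">
        <logger message="#['Original status was: ' + recordVars.originalStatus]" level="INFO" doc:name="Logger"/>
    </batch:step>
</batch:process-records>
```

Each record sees only its own originalStatus value, even though many records pass through these steps concurrently.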

1.3 Initializing a Batch Job

Mule batch processing can be initiated in two ways:

A. Triggering via Input Phase: One-way message sources can be polled in the input phase to retrieve data for processing.

Listing 1.3.A: Triggering a Batch Job
  <batch:input>
      <poll>
        <db:select config-ref="MySQL_Configuration" doc:name="Database">
          <db:parameterized-query><![CDATA[select * from employees where status = 'REHIRE']]></db:parameterized-query>
        </db:select>
      </poll>
  </batch:input>

B. Invoking Batch Job: Just like invoking a flow or sub-flow using the flow-ref component, it is possible to invoke a batch job from an existing flow using the <batch:execute /> component.

Listing 1.3.B: Invoking Batch job
  <flow name="mule-configFlow">
      <poll doc:name="Poll">
          <fixed-frequency-scheduler frequency="1" timeUnit="HOURS"/>
          <db:select config-ref="MySQL_Configuration" doc:name="Database">
              <db:parameterized-query><![CDATA[select * from employees where status = 'REHIRE']]></db:parameterized-query>
          </db:select>
      </poll>
      <batch:execute name="simple_batch_job" doc:name="simple_batch_job"/>
  </flow>
Even if you invoke a batch job by referencing it from another flow, it still executes asynchronously and you will NOT get the batch results back in the calling flow. Any processing of batch results should be done in the On Complete phase only.

2. Simple Batch Job Application

We now have enough basic information about Mule batch jobs. Let’s look at a simple batch job (we will use this in our testing demo).

We expect to invoke this batch from another flow that generates the source data and feeds it to the batch. Our batch expects a list of coffee orders, each with a status. The general business logic will be:

  1. Records with status 'MessedUp' will be filtered out in the input phase and never loaded.

  2. Records with status 'Ready' will be marked as 'Processed'.

  3. Records with status 'Not-Ready' will be marked as 'Failed'.

Also, our steps have accept-expression defined to demonstrate record filtering.
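
To make the expected input concrete, a calling flow could look roughly like the sketch below. The flow name and the three sample orders are made up for illustration; only the id and status fields are assumed, since those are what the batch job reads.

```xml
<!-- Hypothetical caller flow: builds sample coffee orders and hands them to the batch job -->
<flow name="coffee-orders-caller">
    <dw:transform-message doc:name="Build Sample Orders">
        <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
[
  {id: 1, status: 'Ready'},
  {id: 2, status: 'Not-Ready'},
  {id: 3, status: 'MessedUp'}
]
]]></dw:set-payload>
    </dw:transform-message>
    <!-- The list payload becomes the input of the batch job defined in Listing 2.A -->
    <batch:execute name="mule-simple-batch" doc:name="mule-simple-batch"/>
</flow>
```

With this input, order 3 should be dropped in the input phase, order 1 should end up as 'Processed' and order 2 as 'Failed'.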

Listing 2.A: Simple Batch Job
<batch:job name="mule-simple-batch">
    <batch:input> <!-- (1) -->
        <dw:transform-message doc:name="Transform Message">
            <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload filter $.status != 'MessedUp']]></dw:set-payload>
        </dw:transform-message>
    </batch:input>

    <!-- (2) Implicit Load and Dispatch phase -->

    <batch:process-records> <!-- (3) -->
        <batch:step name="Batch_Step_1">
            <flow-ref name="batch-step-1-process" doc:name="Flow batch-step-1-process"/>
        </batch:step>
        <batch:step name="Batch_Step_2" accept-expression="#[payload.status == 'Processing']">
            <flow-ref name="batch-step-2-process" doc:name="Flow batch-step-2-process"/>
        </batch:step>
        <batch:step name="Batch_Step_3" accept-expression="#[payload.status == 'Not-Processing']">
            <flow-ref name="batch-step-3-process" doc:name="Flow batch-step-3-process"/>
        </batch:step>
    </batch:process-records>

    <batch:on-complete> <!-- (4) -->
        <logger message="#['Batch Processing Result: Loaded:'+ payload.loadedRecords + ', successful: '+ payload.successfulRecords + ', failed: '+ payload.failedRecords]" level="INFO" doc:name="EndLogger"/>
    </batch:on-complete>
</batch:job>
1 Batch Input phase: Filters out the MessedUp records
2 Implicit Load and Dispatch phase will run at this point.
3 Processing Phase: Process records through steps to mark as Processed or Failed.
4 On Complete phase: Log the processing statistics.

Each of our batch steps calls a sub-flow that implements the logic to update the status and process the record.

Listing 2.B: Batch Step 1 Sub-flow
<sub-flow name="batch-step-1-process">
    <logger message="#['Processing Step 1 for Id:' + payload.id + ' with status: ' + payload.status.trim()]" level="INFO" doc:name="Logger"/>
    <batch:set-record-variable variableName="id" value="#[payload.id]" doc:name="Record Variable"/>

    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
(payload ++ (status: 'Processing' when payload.status == 'Ready' otherwise 'Not-Processing'))]]></dw:set-payload>
    </dw:transform-message>
</sub-flow>
Listing 2.C: Batch Step 2 Sub-flow
<sub-flow name="batch-step-2-process">
    <logger message="#['Processing Step 2 for Id:' + recordVars.id + ' with status: ' + payload.status]" level="INFO" doc:name="Logger"/>
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
(payload ++ (status: 'Processed'))]]></dw:set-payload>
    </dw:transform-message>
</sub-flow>
Listing 2.D: Batch Step 3 Sub-flow
<sub-flow name="batch-step-3-process">
    <logger message="#['Processing Step 3 for Id:' + recordVars.id + ' with status: ' + payload.status]" level="INFO" doc:name="Logger"/>
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
(payload ++ (status: 'Failed'))]]></dw:set-payload>
    </dw:transform-message>
</sub-flow>

This is a very simple batch application that does not call any other systems, but that does not mean you cannot do so. You can replace the logic in the sub-flows with any implementation.
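
As one possible variation, the step 2 sub-flow could also write the new status to a database. This is a sketch under assumptions: the coffee_orders table and its columns are invented for the example, the sub-flow name is hypothetical, and MySQL_Configuration is simply reused from the earlier listings.

```xml
<!-- Hypothetical variant of the step 2 sub-flow that also persists the new status -->
<sub-flow name="batch-step-2-process-with-db">
    <dw:transform-message doc:name="Transform Message">
        <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
(payload ++ (status: 'Processed'))]]></dw:set-payload>
    </dw:transform-message>
    <!-- db:update replaces the payload with the affected row count, so it is placed last here -->
    <db:update config-ref="MySQL_Configuration" doc:name="Update Order Status">
        <db:parameterized-query><![CDATA[
            update coffee_orders set status = #[payload.status] where id = #[recordVars.id]
        ]]></db:parameterized-query>
    </db:update>
</sub-flow>
```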

The only message processors that you CANNOT use in batch steps are request-response inbound connectors.

3. Conclusion

In this part, we learned what a Mule batch job is and the different phases of batch processing. We also saw how to manage record-level variables and how to trigger batch processing. At the end, we saw a simple batch application that processes coffee orders.

4. Next: Part 2

In the next part, we will enable MUnit on our batch job and learn how to unit test the Input and On Complete phases.
