Recently I have taken a role as the Integration Architect and tech lead for a project to integrate messaging between MS Dynamics 365 AX/CRM and 3rd party systems. This was a huge solution that took over 10 months to design and develop. It consisted of developing over 60 Logic Apps.
With a large integration project like this were multiple entities are required to be updated across multiple systems in a transactional manner, I required a process to correlate and aggregate all the responses into one composite message before proceeding onto the next task in the workflow. The scatter-gatherer pattern was chosen as a good candidate for this type of scenario.
The next problem was how to implement this pattern using Logic Apps. Using what I had learnt from my previous blog on enforcing ordered delivery of messages https://connectedcircuits.wordpress.com/2017/08/26/enforcing-ordered-delivery-using-azure-logic-apps-and-service-bus/ I was able to design the solution below.
It is based on a single Logic App to publish the message onto a service bus topic and to retrieve all the responses from another service bus queue. I also wanted the solution to be flexible to decide which services the message should be scattered to by passing in a routing list which determines who to send the request message to.
The main component of the solution is the (Scatter/Gather) logic app which sends the request message to the the service bus topic after setting subscription property values and a unique batch Id. In this example I have 3 sub-processor logic apps which will process the message in some form or another and then write a response message onto the service bus queue. Because sessions have been enabled on this queue and I am setting the session Id with the same BatchId that was sent with the message, we are able to correlate all the responses from to sub-processors within the same logic app instance that initially published the message onto the service bus topic.
After the Scatter/Gather logic app sends the message onto the service bus topic, it will go into a loop until all the response messages from the Process logic apps are received. It will then return the aggregated responses from the sub-process logic apps.
The whole solution can be broken down into 3 sections, setting up the Azure Service Bus, developing the Scatter-Gatherer Logic App workflow and the Sub-process Logic Apps subscribing to the topic.
Setting up the service bus
Lets start by setting up the Azure Service Bus. A Topic is provisioned to publish the request message with rules created for each subscriber. In this scenario I will have 3 subscriptions with default rules set to a property called MsgProcessor = ‘1’, MsgProcessor = ‘2’ and MsgProcessor = ‘3’ on each subscription respectively.
Next a Service Bus Queue is provisioned to receive all the response messages from the subscribers. Sessions must be enabled on this queue to allow a single consumer to process all messages with the same sessionId which in our case will be the Scatter/Gather Logic App.
Developing the Scatter/Gather Logic App
The next part to develop is the scatter-gather Logic App which is the real smarts of the solution. Below is the full workflow with all the actions collapsed. We will go through and expand each of the shapes.
This workflow is triggered by a HTTP request, but can also be triggered by some other connectors.
Next is a series of variables required by the workflow listed below:
- BatchId – This is used to group all the messages together and is used as the SessionId and is initialised with a random Guid.
- CompositeMsg – This holds the aggregated response messages from the Processor logic apps.
- IntemediateMsg – Used in the loop the temporary store the aggregated message.
- ScatterCount – The number of sub-processors the request body was published to.
- ResponseCount – Holds the current number of responses received from the Processing logic apps.
Once the variables have been setup, a parallel branch is used to scatter the HTTP Request body contents to the subscribing systems. For this demo I have an option to send the request body content to 3 different sub-processes.
To determine which sub-process to send the request body to, I pass a list of the process names to send the request body to as one of the HTTP Request Header properties as shown here:
Below is the “Send message processor 1” service bus connector expanded to show the properties. The two important pieces of information are custom properties BatchId and MsgProcessor. The BatchId will be used to set the sessionId latter and the MsgProcessor is used by the subscription filter on the service bus Topic.
Then we check if a message was found using the Condition action with this filter: @equals(length(body(‘Get_messages_from_a_queue_(peek-lock)’)), 1)
If a message was found on the service bus queue then the “If true” branch is executed and if no message was found the “If false” branch is executed setting a delay of a few seconds before iterating again.
First we check if this is the first message received off the queue using the following filter: @equals(variables(‘ResponseCount’), 0)
As we are using the Service Bus Connector that is capable of returning multiple messages, we need to use an indexer to get to the first message. Below is the code view of the above action. Note when the sub-processor logic app puts a message onto the service bus, we base64 encode it, therefore we need to decode it back to string value.
First we need to copy the contents of the CompositeMsg variable into the “IntermediateMsg” variable. Then concatenate the message received of the service bus and the contents of the “IntemediateMsg” variable, and then add them to the “CompositeMsg” variable. The syntax of the value for the Append intermediate to composite is shown below:
After the message from the queue has been added or appended to the CompositeMsg variable, it is completed to remove it from the queue and the ResponseCount variable is incremented.
Once all the response messages have been received from the sub-process logic apps or the “Until loop” times out, we need to close the service bus queue session using the BatchId and return the composite response message.
Here I am just sending the composite response message to a RequestBin endpoint. Ideally you would place this onto another service bus and perhaps with the initial HTTP request body to tie everything together.
Sub-Processor Logic Apps
The last part of this solution is the sub-processor logic apps which subscribe to the service bus Topic and processes the message in some form or another before returning a response message onto the service bus queue. Using a separate logic app the manage the post processing of the initial request message provides scalability and separation from the messaging orchestration components.
Next the delay task is there just to simulate processing of the message received from the topic. This where you would normally call another API endpoint or process the message in some other fashion. Remember the maximum lock duration is 5 minutes and if your process will take longer than this, you will need to renew the lock.
After the message has been processed, you will need to compose a response message to put back onto the service bus queue. The schema of the response message should be generic across all the sub-process logic apps to make it easier to parse latter on. For the demo a relatively simple schema will be used consisting of the received BatchId and the sub-processor logic app name.
Once the response message has been composed, it is placed onto the service bus queue ensuring the session Id is set to the BatchId that was sent with the message. Remember this queue has been provisioned with “Sessions Enabled”
Testing the solution
We can use Postman to send the following request to the scatter-gather logic app. Also I am setting the scatter list header property to publish the message to all 3 sub-processor logic apps.
“OrderId” : “12”,
“Message” : “Hello1”
Using RequestBin we can see the aggregated response messages from all 3 sub-processors.
Further enhancements can be made by adding flexibility for the timeout of the Until loop as some responses may take hours/days to send a response back. When waiting for very long periods for a response to return, you may need to extend the delay period in the “Until loop” to avoid reaching the 5000 iteration limit.
Keep a watch for my next blog where I will show you to implement the Itinerary based routing integration pattern using Azure Logic Apps and Service Bus.