Message re-sequencing is another integration pattern, used to rearrange messages received out-of-order back into their original sequence. You may ask yourself why this pattern is so important in today's world of messaging systems.
As the integration world moves towards services hosted as Microservices in the cloud, for their scalability and independently deployable components, it may be necessary to break messages up into smaller logical segments, or de-batch them, so the services can manage the data more efficiently. This pattern makes it possible to put the smaller segments back together again in the correct order after processing.
A typical use case is a Purchase Order with multiple line items, where each line item is processed asynchronously by a Microservice. This service may calculate the taxes, check stock levels and add the total line price. Some line items may take longer to process than others before the updated line item is returned.
The approach would be to de-batch the PO at the line level and send each line item to the Microservice. The response message from the Microservice would then go through a Message Re-Sequencer component to put the order lines back in the correct sequence again before returning the updated PO response.
Another use case would be to re-sequence messages after a scatter gather pattern (https://connectedcircuits.wordpress.com/2017/10/08/integration-scatter-gather-pattern-using-azure-logic-apps-and-service-bus/) before forwarding them on.
For this re-sequencer pattern to work, three attributes must be passed with each message to the message re-sequencer service: the sequence number of the message, the total number of messages to expect, and a batch number.
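In this solution those three attributes travel as custom HTTP headers alongside the message body. The header names are the ones used later in the workflow; the values here are purely illustrative:

```
x-MsgReseqBatchId: PO1000        (batch number, e.g. the PO order number)
x-MsgReseqNumber: 3              (sequence number of this message)
x-MsgReseqTotalCount: 5          (total number of messages to expect)
```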
A storage repository is also required to temporarily hold the out-of-sequence messages. The repository will be based on Azure Table Storage, as it provides everything we need: a NoSQL data structure, a Partition key to group the related messages together, and a Row key to identify each message by its sequence number. The combination of these two keys lets us quickly find and retrieve a message from the store.
An Azure Function is used to manage the saving and retrieving of messages from Table storage and is designed to be agnostic of the message types. This function is actually the core component of this solution.
The Logic App is primarily there to receive the message, de-batch the line items and pass them on to the function, then forward the message on once all de-batched line items have been received.
There are two options for when to forward the re-ordered messages out:
- On arrival – as each message arrives, we check whether any messages in the repository can be sent out before this one.
- Last message arrival – where we send all the messages out in one hit after the final message has been received.
This design will focus on the last option, where we will send all messages out in one hit. My next blog will cover how to send it out using the On Arrival method.
Creating the Azure Table Storage
First we need to add an Azure Storage account to our resource group to store the incoming messages.
Building the Azure Function
Now add a new C# class to the project called MsgEntity, with the properties and overloaded constructor shown below. The class needs to inherit from the TableEntity base class in the “Microsoft.WindowsAzure.Storage.Table” assembly.
This class will be used as a container to hold the message contents and sequence numbering attributes.
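The original class listing is not reproduced here, but a minimal sketch would look something like the following. The property names are assumptions based on the sequencing attributes described above; only the TableEntity inheritance and the Partition/Row key usage are taken from the text:

```csharp
using Microsoft.WindowsAzure.Storage.Table;

// Container for the message contents and its sequencing attributes.
public class MsgEntity : TableEntity
{
    // Parameterless constructor required by the Table Storage SDK.
    public MsgEntity() { }

    // PartitionKey groups messages by batch number,
    // RowKey identifies each message by its sequence number.
    public MsgEntity(string batchId, string sequenceNumber)
    {
        PartitionKey = batchId;
        RowKey = sequenceNumber;
    }

    public string MsgContent { get; set; }   // serialized message body
    public int SequenceNumber { get; set; }  // position within the batch
    public int TotalMsgCount { get; set; }   // expected number of messages
}
```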
Next we need to add the following three function methods to the project:
- AzureTableStoreMsg – Stores the message in Azure Table Storage and returns the total number of messages received. If the final message in the sequence has been previously received, then the date/time it was received is returned also.
- AzureTableRetrieveMsg – Retrieves all messages from the table storage using the batch number or an individual message using the message sequence number.
- AzureTableRemoveMsg – Deletes all the messages from table storage related to the batch number.
To add a function method to the project, right click on the project and select New Item. From the Add New Item popup window, select Azure Function.
Add the following code to the new “AzureTableStoreMsg” method.
The current message sequence number, total number of messages and the batch number are passed in as custom header values, while the message itself is passed in the body payload. The function creates a table called “msgsequence” if it does not already exist, populates the custom object with the message properties and contents, and stores the object in the Azure Table. The response from this function returns the total number of messages received so far and, if the last message in the sequence has arrived, the date/time it was received.
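The original function body is not shown here, so the following is only a sketch of the behaviour just described, written against the classic WindowsAzure.Storage SDK and Functions v1-era APIs used at the time. The header names match those used later in the Logic App; the response property names follow the fields referenced later in the post:

```csharp
[FunctionName("AzureTableStoreMsg")]
public static async Task<HttpResponseMessage> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestMessage req,
    TraceWriter log)
{
    // Sequencing attributes arrive as custom HTTP headers.
    var batchId = req.Headers.GetValues("x-MsgReseqBatchId").First();
    var seqNumber = int.Parse(req.Headers.GetValues("x-MsgReseqNumber").First());
    var totalCount = int.Parse(req.Headers.GetValues("x-MsgReseqTotalCount").First());
    var msgBody = await req.Content.ReadAsStringAsync();

    // Connect to Table storage and create the table on first use.
    var account = CloudStorageAccount.Parse(
        ConfigurationManager.AppSettings["StorageConnectionString"]);
    var table = account.CreateCloudTableClient().GetTableReference("msgsequence");
    await table.CreateIfNotExistsAsync();

    // Store the message keyed by batch (partition) and sequence number (row).
    // Zero-padding the RowKey keeps lexical ordering in line with numeric order.
    var entity = new MsgEntity(batchId, seqNumber.ToString("D6"))
    {
        MsgContent = msgBody,
        SequenceNumber = seqNumber,
        TotalMsgCount = totalCount
    };
    await table.ExecuteAsync(TableOperation.InsertOrReplace(entity));

    // Count how many messages of this batch have arrived so far.
    var query = new TableQuery<MsgEntity>().Where(
        TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchId));
    var received = table.ExecuteQuery(query).ToList();

    // Check whether the final message in the sequence has been received yet.
    var eomMsg = received.FirstOrDefault(e => e.SequenceNumber == e.TotalMsgCount);

    return req.CreateResponse(HttpStatusCode.OK, new
    {
        EOMIndex = eomMsg != null ? eomMsg.TotalMsgCount : 0,
        EOMReceived = eomMsg != null ? (DateTimeOffset?)eomMsg.Timestamp : null,
        TotalMessagesReceived = received.Count
    });
}
```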
Add another Azure Function as before called “AzureTableRetrieveMsg” for retrieving messages from the Table store and add the following code below.
This function will return either all records for the batch number or a single message matching the passed-in sequence number. A sample request message is shown below; it returns all records matching the batch number when MsgSequenceNumber is left blank.
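The original sample is not reproduced here, but based on the property names mentioned above the request body would look something like this (batch number value is illustrative):

```json
{
  "MsgBatchId": "PO1000",
  "MsgSequenceNumber": ""
}
```

Leaving MsgSequenceNumber blank returns every stored message for the batch; supplying a value returns just that message.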
The last Azure Function method to add is “AzureTableRemoveMsg”.
This deletes all the messages in Table storage for a batch number. A sample request is shown below.
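Following the same request shape as the retrieve method, the delete request presumably needs only the batch number (value illustrative):

```json
{
  "MsgBatchId": "PO1000"
}
```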
Next we need to add the Azure Table storage connection string to the local.settings.json file in the project.
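The key name StorageConnectionString matches the application setting added in the Azure portal later; the surrounding structure is the standard local.settings.json layout, and the connection string value here is a placeholder:

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "AzureWebJobsDashboard": "UseDevelopmentStorage=true",
    "StorageConnectionString": "DefaultEndpointsProtocol=https;AccountName=<youraccount>;AccountKey=<yourkey>"
  }
}
```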
Before publishing the function to Azure, be sure to create a Unit Test project and check its functionality.
Publishing the Function to Azure
Right-click on the project and click Publish. This opens the publishing wizard. Choose “Azure Function App” and “Create New”, then click the Publish button.
Once the function has been published, log into the Azure portal and add the Storage Account connection string to the Application settings. Double-click on the Azure Function app in the Azure portal.
In the Application settings section, add a new key called “StorageConnectionString” and add the connection string that was used in the local.settings.json in the Visual Studio Function project. Then click the “Save” button at the top of the screen.
Logic App for message de-batching and re-sequencing
Next we will create the Logic App to de-batch the individual line items in parallel and send each one to another Logic App, acting as a Microservice, for processing. The response is then passed to the function we created above. Once all the messages have been received back from the Microservice, we call another function method to return all the stored responses in the correct order before returning the result to the consumer.
The message will be based on the PO (Purchase Order) shown below. We will use the OrderNumber as the batch number and the LineNumbers for re-sequencing the line items later.
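The original PO sample is not reproduced here, but based on the fields referenced throughout the post (OrderNumber, Lines, LineNumber, Qty, UnitPrice) it would be shaped something like this; the item descriptions and values are invented for illustration:

```json
{
  "OrderNumber": "PO1000",
  "Lines": [
    { "LineNumber": 1, "Item": "Widget", "Qty": 2, "UnitPrice": 10.50 },
    { "LineNumber": 2, "Item": "Gadget", "Qty": 1, "UnitPrice": 99.00 },
    { "LineNumber": 3, "Item": "Gizmo",  "Qty": 5, "UnitPrice": 3.25 }
  ]
}
```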
Below is the collapsed view of the Logic App. It is triggered by a PO (Purchase Order) being posted to the HTTP endpoint, with a schema based on the PO message above.
Two variables are required for holding the total number of line items and the PO lines from the Message Re-sequencer function.
The code view for the Initialize variable MaxLineItems is shown below, where we just take the length of the Lines collection.
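The original code view is not reproduced here, but assuming the trigger payload has the Lines collection shown earlier, the variable initialisation would look something like:

```
"MaxLineItems": "@length(triggerBody()?['Lines'])"
```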
The POValidator action calls another Logic App passing the line item which simulates a Microservice to calculate the total line amount and could possibly check stock levels.
The response is then passed onto the AzureTableStoreMsg function method which was published to Azure in the previous steps.
The body and header properties are set to the following values.
- Request Body – output body of the POValidator Logic App (Microservice)
- x-MsgReseqBatchId – the OrderNumber of the trigger message
- x-MsgReseqNumber – the line number of the item in the loop
- x-MsgReseqTotalCount – the value of the MaxLineItems variable initialised at the start of the workflow.
The next step in the workflow is to check whether we have received all the messages. This is done by checking the response message from the function method AzureTableStoreMsg to see if the two properties EOMIndex and TotalReceived are equal.
The syntax for the condition action “Check msg count” is as follows:
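The original expression is not reproduced here, but assuming the response fields are named EOMIndex and TotalReceived as described above, the condition would be along these lines:

```
@equals(body('AzureTableStoreMsg')?['EOMIndex'], body('AzureTableStoreMsg')?['TotalReceived'])
```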
If the condition is true, then the next step is to retrieve all the messages in the correct order from the Table Storage using the function method “AzureTableRetrieveMsg”. Follow the steps as before to add an action to call this function method. The only property to send in the request body is the MsgBatchId which is set to the PO Order number.
When the “For each” shape completes, we then compose the response message by replacing the existing line items with the items stored in the LineItem variable, using the “setProperty” expression.
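Assuming the PO structure and variable name described above, the Compose expression would look something like the following sketch:

```
@setProperty(triggerBody(), 'Lines', variables('LineItem'))
```

This takes the original trigger payload and swaps its Lines collection for the re-sequenced line items collected in the variable.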
The final step is to set the Response body to the output of the Compose shape above.
That’s the development done for this integration pattern. Next we will create the POValidator Logic App to simulate calling a Microservice. This Logic App simply calculates the line total by multiplying the UnitPrice and Qty to create a new property, TotalPrice. I have also added a random Delay shape to simulate some line items taking longer to process than others.
Below is the workflow process for this Logic App. I won’t go into the detail of each action as this Logic App is just there to test the message re-sequencing.
To test this solution we are going to use one of my favourite tools, Postman. The following PO message will be sent to the MessageResequencer Logic App. Notice the absence of the line item total; the POValidator Logic App will calculate this for us.
As you can see below, each line item has been sent to the POValidator Logic App and took different times to complete due to the delay shape.
Here is the response message with the total price appended to each line item and in the correct sequence.
Now, to prove that the line items do get re-ordered into the correct sequence, I will swap the line numbers around in the PO request message and use a different PO number.
Here is the response with the lines re-sequenced in the correct order.
Using Azure Functions and Table Storage makes it fairly easy to develop this integration pattern. To provide a more resilient repository for temporarily storing the messages, you may want to consider using Azure Cosmos DB.
The HTTP connectors can also be swapped out for Service Bus connectors when communicating with services that are slow to respond.
You will have noticed that when you call the function method “AzureTableStoreMsg”, it returns a response message like the one below.
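The original response sample is not reproduced here, but based on the property names referenced in this post (EOMIndex, EOMReceived, TotalMessagesReceived) its shape would be roughly the following, with illustrative values:

```json
{
  "EOMIndex": 5,
  "EOMReceived": "2018-05-12T09:35:05Z",
  "TotalMessagesReceived": 5
}
```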
You can use the EOMReceived and TotalMessagesReceived values to determine whether an error has occurred or the Microservice is taking too long to process the de-batched items. Under ideal conditions, the EOMIndex would be set on the last de-batched message and TotalMessagesReceived would equal the EOMIndex value.
Keep a watch out for my next blog, where I will show you how to re-sequence messages as they arrive.