Using Logic App Webhooks to execute long running SQL queries

On one of my projects, I had a requirement to send several large datasets as csv files to an FTP server using a Logic App.

I had thought about 2 options to implement this requirement. The first option was to use the Azure SQL Server API connector to extract the dataset from the database. But the amount of data to be serialised as JSON would certainly cause the SQL API connector to timeout or exceed the maximum payload size without some form of database paging and using paging would not meet the requirement of a single file.

The second option was to utilise the HTTP Webhook API connector available with Logic Apps to call an Azure Function passing the call-back URL and the database stored procedure to execute. The Azure Function would simply place the JSON message body onto an Azure Storage account queue and then return a HTTP response code 200 to put the logic app into a dehydrated state.

Another Azure Function would poll the queue for new messages. When a new message is received from the queue, it would query the database using the stored procedure name sent in the queue message.  A datareader is then used to read the returned recordset for speed and populate a DataTable. The rows in the DataTable are then converted into a csv file which is then written to a blob container in a streaming fashion. Once the function has completed creating the csv file, it would call-back to Logic App using the URL sent in the queue message which will be in a dehydrated state to continue with the workflow and send the file from the Blobstore to the FTP server.

Using the Webhook option meant the Logic App would go into a dehydrated state after calling the Azure Function and I would not be charged for any consumption time Smile  whilst in this state. This meant the Azure Function may take as long as required to create the csv file while executing under a consumption billing plan.

Below is a sequence diagram showing the activation of each process. The whole process is triggered by a scheduler inside a logic app.

image

Creating the Solution

I have broken the solution into 2 projects, one for the Azure Functions and the other for the Helper classes. I typically abstract all the business logic out from the Azure Functions and place them in a separate class library project were I can create unit tests for the business logic.

image

Defining the Helper Libraries

The first helper class is the AzureStorage which has one method defined to return a blob reference object and to create the blob container if it does not exist.

using Microsoft.WindowsAzure.Storage;

using Microsoft.WindowsAzure.Storage.Blob;

using System;

using System.Configuration;

using System.Threading.Tasks;

 

namespace Helpers

{

    public static class AzureStorage

    {

        public static async Task<CloudBlockBlob> CreateBlobAsync(string fileName, string containerName)

        {

            try

            {                             

                // Retrieve storage account from connection string.

                CloudStorageAccount storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["BlobStore"]);

                // Create the blob client.

                CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

 

                CloudBlobContainer container = blobClient.GetContainerReference(containerName.ToLower());

                //firstly, we need check the container if exists or not. And if not, we need to create one.

                var creaeContainer = await container.CreateIfNotExistsAsync();

 

                // Retrieve reference to a blob

                return  container.GetBlockBlobReference(fileName.ToLower());

            }

            catch (Exception ex)

            {

                throw ex;            

            }

        }      

    }   

}

 

The next class is the “DataAccess” for accessing the database using a datareader to load a DataTable which is then returned. Remember the default command timeout is 30 seconds, so this needs to be increased on the SQLCommand object.

Note this is just a simple implementation of reading the data. Ideally you would add transient fault handling code with retry logic built-in for production releases.

using System;

using System.Configuration;

using System.Data;

using System.Data.SqlClient;

 

namespace Helpers

{

    internal static class DataAccess

    {

        internal static DataTable CreateDataExtract(string sprocName)

        {

            var constr = ConfigurationManager.ConnectionStrings["SQLConnectionString"].ConnectionString;

            DataTable dataTable = null;

            try

            {

                dataTable = new DataTable();

                using (SqlConnection conn = new SqlConnection(constr))

                {

                    conn.Open();

                    SqlCommand cmd = new SqlCommand(sprocName, conn);

                    cmd.CommandType = CommandType.StoredProcedure;

                    cmd.CommandTimeout = 600;

                    dataTable.Load(cmd.ExecuteReader());

                }               

            }

            catch(SqlException sqlEx)

            {

                //capture SQL errors

                throw sqlEx;

            }

            catch (Exception genEx)

            {

                //capture general errors

                throw genEx;

            }

            return dataTable;

        }

    }

}

 

The last helper class “DataExtract” is the entry point for the Azure Function. This calls the other helper methods to read the database and then converts the DataTable into a csv file by writing the output in a streaming manner to the blob container called “csv”

using System;

using System.Data;

using System.IO;

using System.Text;

using System.Threading.Tasks;

 

namespace Helpers

{

    public static class DataExtracts

    {

              

        public static async Task<string> GenerateCSVDataExtractAsync(string storedProc, string columnDelimiter = ",")

        {

            string filename;

            try

            {

                //call data acess 

                var dataTable = DataAccess.CreateDataExtract(storedProc);

 

                //save table as CSV to blobstore

                filename = Guid.NewGuid().ToString("D") + ".csv";                                 

                await WriteDataTableToCSVAsync(dataTable, "csv", filename, columnDelimiter);

 

            }

            catch(Exception ex)

            {

                throw ex;

            }

 

            return filename;

        }

 

        private static async Task WriteDataTableToCSVAsync(DataTable dt, string containerName, string filename, string columnDelimiter)

        {                        

            if (dt != null)

            {             

                using (var ms = new MemoryStream())

                using (var sw = new StreamWriter(ms, Encoding.UTF8))

                {

                    //create the header row       

                    for (int i = 0; i < dt.Columns.Count; i++)

                    {

                        sw.Write(dt.Columns[i].ColumnName);

                        sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);

                    }

 

                    //append the data rows

                    foreach (DataRow row in dt.Rows)

                    {

                        for (int i = 0; i < dt.Columns.Count; i++)

                        {

                            sw.Write(EscapeCSV(row[i].ToString()));

                            sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);

                        }

                    }

                    sw.Flush();

                    ms.Position = 0;

 

                    //write to blobstore

                    var blob = await BlobStore.CreateBlobAsync(filename, containerName);

                    await blob.UploadFromStreamAsync(ms);

                }

            }

        }

      

        private static  string EscapeCSV(string colData)

        {

            string quote = "\"";

            string escapedQuote = "\"\""; 

            char[] quotedCharacters = new char[] { ',', '"', '\r', '\n', '\t' };

 

 

            if (colData == null) return "";

            if (colData.Contains(quote)) colData = colData.Replace(quote, escapedQuote);

            if (colData.IndexOfAny(quotedCharacters) > 1)

                colData = quote + colData + quote;

 

            return colData;

        }

 

    }

 

}

    

Defining the Azure Functions

The next project called “LongRunningJobs” contains the 2 Azure Functions required by the solution.

Below is the code for the function “SQLJobRequest” which is called by the Logic App. This simply puts the posted JSON request message onto an Azure Storage Queue using an output parameter of type Queue which simplifies the coding to a few lines of code.

using System.Net;

using System.Net.Http;

using System.Threading.Tasks;

using Helpers;

using Microsoft.Azure.WebJobs;

using Microsoft.Azure.WebJobs.Extensions.Http;

using Microsoft.Azure.WebJobs.Host;

 

namespace LongRunningJobs

{

   

    public static class SQLJobRequest

    {

        [FunctionName("SQLJobRequest")]      

        public static async Task<HttpResponseMessage> Run(

                                    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req,

                                    [Queue("sqljobs", Connection = "BlobStore")]IAsyncCollector<string> outputQueueItem, 

                                    TraceWriter log )

        {

            log.Info("C# HTTP trigger function processed a request.");

            var isParamsValid = false;

          

            // Get request body

            dynamic data = await req.Content.ReadAsAsync<object>();

            string sprocName = data?.SprocName;

            string callBackUrl = data?.CallBackUrl;            

            

            if (!string.IsNullOrEmpty(sprocName) && !string.IsNullOrEmpty(callBackUrl))

            {

                isParamsValid = true;        

                await outputQueueItem.AddAsync(data.ToString());

            }

                        

            

            return isParamsValid == false

                ? req.CreateResponse(HttpStatusCode.BadRequest, "Please pass the sproc name and callback url.")

                : req.CreateResponse(HttpStatusCode.OK);

        }

    }

}

The line [Queue(“<name-of-queue>“, Connection = “<blob-connection-string>“)]IAsyncCollector<string> outputQueueItem in the function parameter list, uses output attributes to define the name of the queue and the name of the blob connection string to use.

The other function “SQLJobExecute” polls the Storage Queue “sqljobs” for new messages. When a new message is received, it calls the helper class method “DataExtracts.GenerateCSVDataExtractAsync(sprocName)” to create the csv file. If the csv file was created successfully, the filename of the csv file is returned in the HTTP Call-back Post body.

using System;

using System.Net.Http;

using System.Net.Http.Headers;

using System.Text;

using System.Threading.Tasks;

using Helpers;

using Microsoft.Azure.WebJobs;

using Microsoft.Azure.WebJobs.Host;

using Newtonsoft.Json;

 

namespace LongRunningJobs

{

    public static class SQLJobExecute

    {

        [FunctionName("SQLJobExecute")]

        public static void Run([QueueTrigger("sqljobs", Connection = "BlobStore")]string queueItem, TraceWriter log)

        {

            log.Info($"C# Queue trigger function processed: {queueItem}");

 

            dynamic data = JsonConvert.DeserializeObject(queueItem);

 

            string sprocName = data.SprocName;

            string callBackUrl = data.CallBackUrl;

            string logicappRunId = data.RunId;

 

            //check if valid parameters were passed.

            if(string.IsNullOrEmpty(sprocName) || string.IsNullOrEmpty(callBackUrl) || string.IsNullOrEmpty(logicappRunId))

                log.Error("Null value parameters passed.");

            else

                Task.Run(async() => { await ExecuteQueryAsync(callBackUrl, sprocName, logicappRunId, log);});

            

        }

 

        private static async Task ExecuteQueryAsync(string callBackUrl, string sprocName, string logicAppRunId, TraceWriter log)

        {

            string blobFilename = string.Empty;

            try

            {

                //call the helper class to create the csv file.

                blobFilename = await DataExtracts.GenerateCSVDataExtractAsync(sprocName);

            }

            catch (Exception ex)

            {

                log.Error(ex.Message, ex);

            }

 

            try

            {

                //call-back to the Logic App Webhook using the URL passed in the message from the queue.

                using (var httpClient = new HttpClient())

                {

                    var postUrl = new Uri(callBackUrl);

                    httpClient.DefaultRequestHeaders.Accept.Clear();

                    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));

                    var content = new StringContent("{\"SprocName\":\"" + sprocName + "\",\"Filename\":\"" + blobFilename + "\"}", Encoding.UTF8);

                    var response = await httpClient.PostAsync(postUrl, content);

                    response.EnsureSuccessStatusCode();

                }

            }

            catch (Exception ex)

            {

                log.Error(string.Format("Error occurred when executing function for logic app runId:{0}\n{1}", logicAppRunId, ex.Message), ex);

            }

 

        }

    }

}

Below is the local.settings.json file which has the following values defined for the Blob Store and SQL Database.image

After the Azure Functions have been deployed to an Azure App Service, the Application Settings for the Blob Store and SQL Server settings highlighted below need to be added.image

Logic App Definition

Below are Azure resources used for this solution. The Azure Function are deployed to the “dalfunctions” App Service and “dataextracts” Storage account contains the Blob Store for the csv files and the storage queue “sqljobs”.image

The Logic App is a very simple workflow which is triggered by a scheduler.

image

The HTTP Webhook shape is what calls the function endpoint “SQLJobRequest” and then puts the Logic App into a dehydrated state.

image

The Subscribe-URI can be obtained from the App Service after the Azure Functions have been deployed and using the “Get function URL” link below.image

The Subscribe-Body property consists of a JSON payload.  This includes the call-back Url, Logic App run Id for debugging latter if required and the SQL stored procedure name to call in the database. Below is the code view for the HTTP Webhook action.

image

After the HTTP Webhook shape is the JSON Parser and a condition to check if a filename was returned in the webhook request body.

image

The schema for the JSON Parser checks for the FileName and the SprocName.image

The expression for the condition is below:image

The “True” condition gets the csv file from the Blob Store and sends it to the FTP server. Note we have to prefix the filename with the Blob container name. Once the file has been sent, it is deleted from the Blob Store.image

If no filename was received when the Logic App comes out of its dehydrated state, the workflow is terminated.

image

Sample runs

As you can see from the sample runs below, some are taking over 6 minutes to complete which would normally cause a HTTP Function action to timeout.image

Conclusion

The solution can easily be modified to pass a list of stored procedures to create several csv files in one call and then have the call-back function pass the list of filenames to send via FTP.

This pattern could also be used for other scenarios that require an Azure Function to execute some long running process which would cause the normal HTTP Function action to timeout while waiting for the response message.

Enjoy…

Advertisements
Posted in Azure, Functions, Logic Apps | Tagged , , , , , | 1 Comment

Using Azure APIM Policies to Route on HTTP Verbs & Resources

The policies available in APIM are indeed very powerful. It provides the ability to modify the backend flow of a request using rules based on the payload contents or the request context properties. I have yet to find a scenario where I could not resolve a requirement using the inbuilt statements.

One example is I had to send a request to different endpoints depending on the request verb (Post,Put,Delete) and the resource location.

image

A policy inspects the incoming request Url and the operation method to determine which Azure Logic App or Function to forward the request onto.

The inbound policy section for this scenario is shown below. It uses the Xslt Choose function to match on the Url path and the operation. If a match is found, it sets the variable “logicappEndpoint” to the name-value variable defined in the Azure APIM blade.

<inbound>

    <choose>

        <when condition="@(context.Request.Url.Path.Equals("inventory")  && context.Operation.Method.Equals("PUT"))">

            <set-variable name="logicappEndpoint" value="{{UAT-AXProductUpdateUrl}}" />

        </when>

        <when condition="@(context.Request.Url.Path.Equals("inventory")  && context.Operation.Method.Equals("POST"))">

            <set-variable name="logicappEndpoint" value="{{UAT-AXProductCreateUrl}}" />

        </when>

        <when condition="@(context.Request.Url.Path.Equals("inventory") && context.Operation.Method.Equals("DELETE"))">

            <set-variable name="logicappEndpoint" value="{{UAT-AXProductDeleteUrl}}" />

        </when>

        <when condition="@(context.Request.Url.Path.Equals("stockmovement") && context.Operation.Method.Equals("PUT"))">

            <set-variable name="logicappEndpoint" value="{{UAT-AXStockMovementUpdateUrl}}" />

        </when>

        <when condition="@(context.Request.Url.Path.Equals("stockmovement") && context.Operation.Method.Equals("POST"))">

            <set-variable name="logicappEndpoint" value="{{UAT-AXStockMovementCreateUrl}}" />

        </when>        

    </choose>

    <base />

</inbound>

To reference the Named-Value properties defined in the portal from within a policy, simply wrap the property name with double braces as shown here as an example:-  {{UAT-AXStockMovementCreateUrl}}

These Name-Values pairs are then created in APIM blade of the Azure Portal shown below.

image

Once the Inbound policy section has been created, we then need to create the rule for the Backend section shown below. Here I am also setting a retry condition to 3 times before sending back a status error code of 500.  Also I am always forwarding the request to the Logic App as a POST.

<backend>

    <retry condition="@(context.Response.StatusCode == 500)" count="3" interval="5" max-interval="10" delta="2" first-fast-retry="false">

        <send-request mode="copy" response-variable-name="tokenstate" timeout="120" ignore-error="true">

            <set-url>@(context.Variables.GetValueOrDefault<string>("logicappEndpoint"))</set-url>

            <set-method>POST</set-method>

            <set-header name="Ocp-Apim-Subscription-Key" exists-action="delete" />

        </send-request>

    </retry>

</backend>

 

I hope this short blog gives you a basic understanding of how to redirect a request based on the HTTP verb and a resource location. Enjoy…

Posted in APIM | Tagged | 2 Comments

Scatter-Gather pattern using an Azure Event Grid

This is another version of the scatter-gather integration pattern which I previously blogged about some time ago.  The pervious version was based on using  service bus topics and Logic Apps which polled for new messages. Whilst this was adequate for a messaging system that did not require immediate request/response times, it was not an ideal solution for another project I was working on. This project required a website to initiate the request to a scatter-gather pattern and to have low latency response times of a few seconds.

With the recently introduced Event Grid service in Azure https://azure.microsoft.com/en-us/services/event-grid/, I was able to achieve request/responses in seconds using the scatter-gather pattern. This is possible because Event Grid provides event based notifications, unlike a service bus architecture which requires regular polling for new messages on the bus.

image

Using the design above enables the client to define how long to wait after the initial request before retrieving the responses from the services via the gatherer Logic App.

Note if you require a durable message delivery, I would still recommend using a service bus which provides peek-lock, FIFO, dead-lettering, transactional support.

Scenario

A website is required to validate a PO and display the customers details, the current stock level and the price when requesting information about a product. The information is to be sourced from 3 different API services, the customer information from CRM, product stock levels from an Inventory service and the product price from a Pricing service.

Approach

The website will send a request to a HTTPS Event Grid endpoint. This will publish the message to the 3 Logic App subscriptions which are all triggered by a HTTP request action.

One of the properties sent with each subscription is a message Id which will be used as the session Id for the service bus when returning the API response messages.

Each Logic App will map the request message to the required API format and wait for the API response message to be placed onto the service bus.

When the website is ready to receive the aggregated responses from the service bus, it will make a HTTP Get request to another Logic App which will then read all the messages from the service bus matching the session Id key and combine them into a single composite message.

Design

API Proxy Logic Apps

To keep the demo simple, each of the proxy Logic Apps will compose a static response message using some of the properties found in the request message. In a practical solution you would map the request message to the correct format required by the API service after the Parse JSON action and add a HTTP Request/Response action after the map to call the API service.

image

The details of each action is shown below.

Take note of each HTTP POST URL address in the logic apps as this will be required when setting up the subscriptions in Event Grid. The Request Body JSON Schema can be left blank.

image

Note an indexer is required for the body contents as it is normally sent as a collection.image

Here we are composing a static  CRM response message to send back. The contents will be different for each proxy Logic App. image

The composed message is put onto the service bus using the EventGrid Id as the Session Id. This is to group the messages from the other proxy Logic Apps. A custom property is used to indicate the source of the message and is used latter when aggregating the other response messages from other proxy Logic Apps.image

The code view of the Send Message service bus connector shown below. Note the custom property “ServiceName”and the value “CRM”

image

The other 2 proxy Logic Apps for checking inventory and pricing are similar in design to this one. The only differences are the composing of the static response message and the value of the Service Name source property in the service bus connector.

Creating an Azure Event Grid

From the Azure Portal search for Event Grid and then click create.image

Add a suitable name and the resource group. image

Once the resource has been created, you will need to take a copy of the Access key and Topic Endpoint. This will be used latter when posting messages to this endpoint.image

After the resource has been created, subscriptions will need to be created for each of the proxy Logic Apps using the HTTP POST URL addresses for the web hook endpoints. This is done by clicking the Event Subscription icon at the top menu bar.image

We will need to create 3 separate subscriptions, one for each of the proxy Logic Apps. They all share the same Event Type and Prefix Filter, the only difference between them is the Name and the Subscriber Endpoint URL.

image

Once completed, you should have the following 3 subscriptions created.image

 

Service Bus Queue

Next a SB queue is required to place the response messages returned from the API services that where called in each of the Micro Service proxy Logic Apps. The Basic pricing tier may be used as we do not require any topics to be created.image

Once the Service Bus has been created, we need to create a queue with sessions enabled. Sessions are required to group all the messages with the same Id so they can processed together.

image

Message Gatherer Logic App

The purpose of this Logic App is to retrieve all the messages from the service bus matching a session Id and then to aggregate the messages into a single composite message. This Logic App is triggered by the website when it is ready to retrieve the composite responses from the services.

image

Details of each action shape is described below.

The “When a HTTP request is received” trigger is a GET method with the message {id} in the URL as a resource.image

This variable is used to append all the API response messages.image

Here we get all the messages from the service bus using the id from the query string of the HTTP Get as the Session Id.image

We then iterate through all the messages received from the service bus queue and append each message to the responseMsg variable before completing the message in the queue.image

Code view for the “Append to service response” is shown below. I am using the service bus custom property “ServiceName” that was set in the API Proxy Logic App for the message name.image

Code view for the “Complete the message in a queue” is shown below.image

Once all the messages have completed, we close the session in the queue using the Id from the query string of the HTTP Get.image

The last step is to return the aggregated message as a typed JSON message.image

Below is code view for the Response Action.image

Console Application

To keep things simple I will use a console application to mimic a website publishing a request to the scatter-gather service. The same code can be put into a helper project and used by a website.

When publishing events to the Event Grid, it must conform to the following schema. The message to send to the Event Grid is placed in the “data” element.
  {
    “topic”: string,
    “subject”: string,   
    “id”: string,
    “eventType”: string,
    “eventTime”: string,
    “data”:{
      object-unique-to-each-publisher
    },
    “dataVersion”: string,
    “metadataVersion”: string
  }
]

More information about the schema and properties can be found here:- https://docs.microsoft.com/en-us/azure/event-grid/event-schema

For the scatter-gather pattern I will be setting the “id” property to a Guid string value which will be used as the session key on the service bus. Both the eventType and subject properties will be used for the subscription filter.

I have created two custom objects to represent the Purchase Order and the EventGrid models.

Purchase Order Object
  1. public class PurchaseOrder
  2. {
  3.     public string OrderNumber { get; set; }
  4.     public string CustomerNumber { get; set; }
  5.     public string ProductCode { get; set; }
  6.  
  7. }

Event Grid Object
  1. public class GridEvent<T> where T : class
  2.     {
  3.         public string Id { get; set; }
  4.         public string Subject { get; set; }
  5.         public string EventType { get; set; }
  6.         public T Data { get; set; }
  7.         public DateTime EventTime { get; set; }
  8.     }

The console application consists of 2 functions, one to send the request to the Event Grid and another to request the aggregated message from the Message Gatherer Logic App.

The “SendMsgAsync” function below accepts a event grid object as an argument and returns the status of the post action. It adds the parameter into a collection and serialises the object before posting it to the EventGrid. Remember to update the sasKey variable with the value from your EventGrid resource.

Send Message Func
  1. private static async Task<string> SendMsgAsync(GridEvent<PurchaseOrder> gridEvent)
  2.         {
  3.             var payloadList = new List<GridEvent<PurchaseOrder>>();
  4.             payloadList.Add(gridEvent);
  5.  
  6.             string topicEndpoint = "https://scatter-eventgrid.westus2-1.eventgrid.azure.net/api/events&quot;;
  7.             //add the event grid access key
  8.             string sasKey = "<Event Gris sas key>";
  9.  
  10.             // Event grid expects event data as JSON
  11.             var json = JsonConvert.SerializeObject(payloadList);
  12.  
  13.             // Create request which will be sent to the topic
  14.             var content = new StringContent(json, Encoding.UTF8, "application/json");
  15.  
  16.             // Create a HTTP client which we will use to post to the Event Grid Topic
  17.             var httpClient = new HttpClient();
  18.  
  19.             // Add key in the request headers
  20.             httpClient.DefaultRequestHeaders.Add("aeg-sas-key", sasKey);
  21.  
  22.           
  23.             // Send request
  24.             Console.WriteLine("Sending event to Event Grid…");
  25.             var result = await httpClient.PostAsync(topicEndpoint, content);
  26.  
  27.             return result.ReasonPhrase;
  28.         }

 

The “ReadMsgAsync” function below accepts the batchId as an argument. It is used to make a HTTP Get to the API Gatherer Logic App to retrieve the composite message. It uses the batchId as a resource location in the call-back URL of the logic app.

You will need to add the {0} token after the …invoke/ resource location as highlighted. https://prod-27.australiasoutheast.logic.azure.com/workflows/16…/triggers/manual/paths/invoke/{0}?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=k-5w..

Read Message Func
  1. private static async Task<string> ReadMsgAsync( string batchId)
  2. {
  3.     var uri = string.Format("<API Gatherer Logic App Callback url>", batchId);
  4.     var httpClient = new HttpClient();
  5.     return await httpClient.GetStringAsync(uri);
  6. }

 

In the main function below, we create a sample PO and an EventGrid message object. We then call the SendMsgAsync function to post the message to the EventGrid in Azure. Then after waiting for a second we call the ReadMsgAsync function to read the composite message from the Logic App. Note we are using the batchId as the EventGrid object id to group all the response messages together from the API services.

Console Main Func
  1. static void Main(string[] args)
  2. {
  3.     //create id for for service bus sessions
  4.     var batchId = Guid.NewGuid().ToString();
  5.  
  6.     var payload = new GridEvent<PurchaseOrder>
  7.     {
  8.         Data = new PurchaseOrder { OrderNumber = "1234", CustomerNumber = "ABC100", ProductCode = "PRD2043" },
  9.         EventTime = DateTime.UtcNow,
  10.         EventType = "orders",
  11.         Id = batchId,
  12.         Subject = "po/validation"
  13.  
  14.     };
  15.    
  16.     var rc = SendMsgAsync(payload).Result;
  17.     Console.WriteLine($"Event sent with result:" + rc);
  18.  
  19.     Thread.Sleep(1000);
  20.     var msg = ReadMsgAsync(batchId).Result;
  21.     Console.WriteLine("Response Message…");
  22.     Console.WriteLine(msg);
  23.     Console.ReadLine();
  24. }

 

Testing

Executing the console application displays the successful post to the EventGrid and the results from the API Gatherer Logic App after a few seconds. image

Calling the ReadMsgAsync too quickly may not allow the API services to respond in time and some messages may be left out as shown here below.image

Conclusion

Using the Azure EventGrid is very useful for scenarios where you want to publish a message to several subscribers that are triggered by web-hooks. If the web-hook does not acknowledge receipt of the message, then EventGrid will try again several times using an exponential back off  period from 10 seconds to an hour.

If you request the composite response message from the API services too soon, you may miss some of the responses from the Micro Services. It is advisable to check the composite message for any missing responses before continuing to process the composite message

The monthly running cost of this solution is very minimal when compared to a service bus topic version of this pattern which requires polling at regulars intervals. Where each poll incorporates a trigger cost.

This same pattern can also be used to design an asynchronous request/response messaging system also.

Enjoy…

Posted in Azure, Logic Apps, Service bus | Tagged , , , , | 1 Comment

Message Re-sequencing using Azure Functions and Table storage

Message re-sequencing  is another integration pattern used to rearrange messages received out-of-order and back into the original sequence again. Well you may ask yourself why is this pattern is so important in todays world of messaging systems.

As the integration world is moving towards services hosted as Microservices in the cloud due to its scalability attributes and independent deployable components. This may require breaking messages up into smaller logical segments or de-batching the messages to allow the services to manage the data more efficiently. Using this pattern, makes it possible to add the smaller segments back together again after being broken up in its correct order. 

image 

Scenario

A typical use case is a Purchase Order with multiple line items which require processing asynchronously by a Microservice at an individual line level. This service may calculate the taxes, check stock levels and add the total line price. Some line items may take longer to process than others before returning the updated line item.

The approach would be to de-batch the PO at the line level and send each line item to the Microservice. The response message from the Microservice would then go through a Message Re-Sequencer component to put the order lines back in the correct sequence again before returning the updated PO response.

Another use case would be to re-sequence messages after a scatter gather pattern (https://connectedcircuits.wordpress.com/2017/10/08/integration-scatter-gather-pattern-using-azure-logic-apps-and-service-bus/) before forwarding them on.

Approach

For this re-sequencer pattern to work, we require 3 attributes to be passed with each message to the messaging re-sequencer service, the sequence number of the message, the total number of messages to expect and a batch number. 

A storage repository  is also required to temporary hold the out-of-sequence messages. The repository will be based on an Azure Table Storage as it provides all the requirements of storing NoSQL data structures, a Partition key to group the related messages together and a Row key to identity each message by the sequence number. By using the combination of these two keys we can quickly find and retrieve a message from the store.

An Azure Function is used to manage the saving and retrieving of messages from Table storage and is designed to be agnostic of the message types. This function is actually the core component of this solution.

The logic app is primarily there to receive the message, de-batch the line items and pass it onto the function.  Then forward on the message when all de-batched line items have been received.

There are two possibilities available as when to forward the re-ordered messages out.

  • On arrival – as a message arrives we check if any messages in the repository can be sent out before this one is sent.
  • Last message arrival –  where we send all the messages out I one hit after the final message  has been received.

This design will focus on the last option, where we will send all messages out in one hit. My next blog will cover how to send it out using the On Arrival method.

Design

Creating the Azure Table Storage

First we need to add an Azure Storage account to our resource group to store the incoming messages.

clip_image002

Take a copy of the connection string from Settings/Access keys of Storage account as we will require this latter.clip_image004

Building the Azure Function

Start by creating an Azure Functions project in Visual Studio 2017. clip_image006

Now add a new C# class to the project called MsgEntity with the following properties and an overloaded constructor shown below. Also we need to inherent the TableEntity base class from the “Microsoft.WindowsAzure.Storage.Table” assembly.

clip_image007

This class will be used as a container to hold the message contents and sequence numbering attributes.

Now we need to add the following three methods to the function described below.

  • AzureTableStoreMsg – Stores the message in Azure Table Storage and returns the total number of messages received. If the final message in the sequence has been previously received, then the date/time it was received is returned also.
  • AzureTableRetrieveMsg – Retrieves all messages from the table storage using the batch number or an individual message using the message sequence number.
  • AzureTableRemoveMsg – deletes all the messages from table storage related to the batch number.

To add a function method to the project, right click on the project and select New Item. From the Add New Item popup window, select Azure Function.clip_image009

Then select the “Generic WebHook” type. This will allow a Logic App to access the interface from the workflow designer using the Azure Functions action shape latter. clip_image010

Add the following code to this new method “AzureTableStoreMsg”

   1: [FunctionName("AzureTableStoreMsg")]

   2: public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)

   3: {

   4:     log.Info($"Webhook was triggered!");

   5:  

   6:     var lastMsgIndex = 0;

   7:     var msgCount = 0;

   8:     var eomDateTimeReceived = DateTime.MinValue;

   9:  

  10:     try

  11:     {

  12:         //read the request headers              

  13:         var sequencenumber = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqNumber").FirstOrDefault());

  14:         var totalMsgCount = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqTotalCount").FirstOrDefault());

  15:         var batchnumber = req.Headers.GetValues("x-MsgReseqBatchId").FirstOrDefault();

  16:         var isEndMsg = totalMsgCount == sequencenumber ? true : false;

  17:  

  18:         string jsonContent = await req.Content.ReadAsStringAsync();

  19:  

  20:         var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);

  21:         // Create the table client.

  22:         CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

  23:         // Retrieve a reference to the table.

  24:         CloudTable table = tableClient.GetTableReference("msgsequence");

  25:         // Create the table if it doesn't exist.

  26:         table.CreateIfNotExists();

  27:  

  28:         // Get request body and initialise a MsgEntity object.

  29:         var requestBody = await req.Content.ReadAsAsync<object>();

  30:         var msgEntity = new MsgEntity(batchnumber, sequencenumber, isEndMsg);

  31:         msgEntity.Message = requestBody.ToString();

  32:  

  33:         // Create the TableOperation object that inserts the message entity. Will rasie error if duplicate found.

  34:         TableOperation insertOperation = TableOperation.InsertOrReplace(msgEntity);

  35:  

  36:         // Execute the insert operation.

  37:         await table.ExecuteAsync(insertOperation);

  38:  

  39:         //iterate through all the messages for this partition check if the last message has been received and the number of messages

  40:         var queryMsgType = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))

  41:                            .Select(new string[] { "RowKey", "Timestamp", "IsLastMessage" });

  42:  

  43:         foreach (MsgEntity entity in table.ExecuteQuery(queryMsgType))

  44:         {

  45:             if (entity.IsLastMessage)

  46:             {

  47:                 lastMsgIndex = Convert.ToInt32(entity.RowKey);

  48:                 eomDateTimeReceived = entity.Timestamp.UtcDateTime;

  49:             }

  50:             msgCount++;

  51:         }

  52:  

  53:     }

  54:     catch (Exception ex)

  55:     {

  56:         return req.CreateResponse(HttpStatusCode.InternalServerError, ex.Message);

  57:     }

  58:  

  59:     // Return the number of messages when the completion message arrived.

  60:     return req.CreateResponse(HttpStatusCode.OK, new { EOMReceived = eomDateTimeReceived, EOMIndex = lastMsgIndex, TotalMessagesReceived = msgCount });

  61:  

  62: }

The current message sequence number, total number of messages and the batch number are passed in as custom header values while the message is passed in the body payload. This function will create a table called “msgsequence” if it does not exist and will populate the custom object with the message properties and the message before storing the object in the Azure Table. The response message from this function returns the total number of messages received and if the last message in the sequence has been received. Below is an example of the response message showing the number of messages it has received so far and when the last message sequence was received.

image

Add another Azure Function as before called “AzureTableRetrieveMsg” for retrieving messages from the Table store and add the following code below.

   1: [FunctionName("AzureTableRetreiveMsg")]

   2:        public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)

   3:        {

   4:            log.Info($"Webhook was triggered!");

   5:  

   6:            string jsonContent = await req.Content.ReadAsStringAsync();

   7:            Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string,string>>(jsonContent);

   8:  

   9:            var sequencenumber = data["MsgSequenceNUmber"];

  10:            var batchnumber = data["MsgBatchId"];

  11:           

  12:  

  13:            var storageAccount = Microsoft.WindowsAzure.Storage.CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);

  14:            // Create the table client.

  15:            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

  16:            // Retrieve a reference to the table.

  17:            CloudTable table = tableClient.GetTableReference("msgsequence");

  18:  

  19:            // Construct the query operation for all entities where PartitionKey equals msgtype.

  20:            TableQuery<MsgEntity> query = null;

  21:            if (string.IsNullOrEmpty(sequencenumber))

  22:            {

  23:                query = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber));

  24:            }

  25:            else

  26:            {

  27:                var sequenceFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, string.Format(sequencenumber,"0000000000"));

  28:                var msgTypeFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber);

  29:                query = new TableQuery<MsgEntity>().Where(TableQuery.CombineFilters(sequenceFilter, TableOperators.And, msgTypeFilter));

  30:            }

  31:  

  32:            var list = new List<dynamic>();

  33:            foreach (MsgEntity entity in table.ExecuteQuery(query).OrderBy(x => x.RowKey))

  34:            {

  35:                list.Add(JsonConvert.DeserializeObject(entity.Message));

  36:            }

  37:  

  38:            return req.CreateResponse(HttpStatusCode.OK, list);

  39:        }

This function will optionally return all records for the batch number or a single message matching the passed in sequence number. A sample request message is shown below which will return all records matching the batch number if the MsgSequenceNumber is left blank.

image

The last Azure Function method to add is “AzureTableRemoveMsg”.

   1: [FunctionName("AzureTableDeleteMsg")]

   2:        public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)

   3:        {

   4:            log.Info($"Webhook was triggered!");

   5:  

   6:            string jsonContent = await req.Content.ReadAsStringAsync();

   7:            Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent);

   8:  

   9:            var batchnumber = data["MsgBatchId"];

  10:  

  11:            var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);

  12:            // Create the table client.

  13:            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

  14:            // Retrieve a reference to the table.

  15:            CloudTable table = tableClient.GetTableReference("msgsequence");

  16:  

  17:            // Construct the query operation for all entities where PartitionKey equals batchnumber.            

  18:            if (!string.IsNullOrEmpty(batchnumber))

  19:            {

  20:                TableQuery<MsgEntity> query = new TableQuery<MsgEntity>()

  21:                            .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))

  22:                            .Select(new string[] { "PartitionKey", "RowKey" });

  23:  

  24:                if (table.ExecuteQuery(query).Count() == 0)

  25:                {

  26:                    return req.CreateResponse(HttpStatusCode.NotFound);

  27:                }

  28:                foreach (MsgEntity entity in table.ExecuteQuery(query).OrderBy(x => x.RowKey))

  29:                {

  30:                    TableOperation deleteOperation = TableOperation.Delete(entity);

  31:                    await table.ExecuteAsync(deleteOperation);

  32:                }

  33:                return req.CreateResponse(HttpStatusCode.OK);

  34:            }

  35:  

  36:  

  37:            return req.CreateResponse(HttpStatusCode.BadRequest);

  38:        }

This is to delete all the messages from the Table storage for a batch number. A sample request is shown below.

image

Next we need to add the Azure Table storage connection string to the local.settings.json file in the project.

clip_image012 

Before publishing the function to Azure, be sure to create a Unit Test project and check its functionality.

Publishing the Function to Azure

Right click on the project and click Publish. This will take you to the publishing wizard form. Choose “Azure Function App” and “Create New”, then click the publish button.

clip_image014

In the next screen add a function name, subscription and hosting plans etc. Then click the create button to provision any new resources and to publish the function. clip_image016

Once the function has been published, we need to log into the Azure portal and add the Storage Account connection string to the Application settings file. Double click on the azure function app in the Azure portal. clip_image018

Then click on the Application settings link. clip_image020

In the Application settings section, add a new key called “StorageConnectionString” and add the connection string that was used in the local.settings.json in the Visual Studio Function project. Then click the “Save” button at the top of the screen. clip_image022

 

Logic App for message de-batching and re-sequencing

Next we will create the Logic App to de-batch the individual line items in a parallel fashion to send to an other Logic App for processing which is acting as a Microservice.  The response is then passed to the function we created above. Once all the messages have been received from the Microservice, we call another function method to return all the stored responses in the correct order before returning the result back to the consumer.

The message will be based on a PO (Purchase Order) shown below. We will be using the OrderNumber as the batch number and the LineNumber’s for re-sequencing the line items later.

image

Below is the collapsed view of the Logic App. It is triggered by a PO (Purchase Order) being posted to the HTTP endpoint with the schema is based on the PO message above.  

image

Two variables are required for holding the total number of  line items and the PO lines from the Message Re-sequencer function.

image

The code view for the Initialize variable MaxLineItems is shown below, where we just take the length of the Lines collection.

 image

After the variables are initialised, is the For each loop shape where we iterate through the PO lines of the message.image

The POValidator action calls another Logic App passing the line item which simulates a Microservice to calculate the total line amount and could possibly check stock levels.

image

The response is then passed onto the AzureTableStoreMsg function method which was published to Azure in the previous steps.

Functions that have been created as generic webhooks can be selected from the Logic App designer by adding an action to the designer and choosing Azure Functions.image

After selecting Azure functions, you should be able to see all your published functions. Click on the Azure function that was published above. image

This will then display all the methods that we had created above. Make sure you select the AzureTableStoreMsg method from the list. image

After the function method has been added, setup the properties as shown.image

The body and header properties are set to the following values.

  • Request Body  – output body of the POValidator Logic App (Microservice)
  • x-MsgReseqBatchId  – the OrderNumber of the trigger message
  • x-MsgReseqNumber – the line number of the item in the loop
  • x-MsgReseqTotalCount – is the value of the MaxLineItems variable which was initiated at the start of the workflow.

And the code view for this action is shown here:image

The step in the workflow is to check if we have received all the messages. This is done by checking the response message from the function method AzureTableStoreMsg and the two properties EOMIndex and TotalReceived to see if they are equal.

image

The syntax for the condition action “Check msg count” is as follows:

@equals(body(‘AzureTableStoreMsg’)[‘EOMIndex’], body(‘AzureTableStoreMsg’)[‘TotalMessagesReceived’])

If the condition is true, then the next step is to retrieve all the messages in the correct order from the Table Storage using the function method “AzureTableRetrieveMsg”. Follow the steps as before to add an action to call this function method. The only property to send in the request body is the MsgBatchId which is set to the PO Order number.

image

The code view for this shape looks like this.image

We then set the variable LineItems to the output of the function method “AzureTableRetrieveMsg”. image

Once we set the variable, we can then clear the table storage of this data by calling the function method “AzureTableDeleteMsg” and passing the PO Order number in the body.image

When the “For each” shape completes we then compose the response message by replacing the existing line items with the items stored in the LineItem variable using the “SetProperty” expression.

  image

Here is the code view of the Compose shape.image

The final step is Response body to that of the Compose shape above.

image

That’s the development done for this integration pattern. Next we will create the POValidator logic app to simulate calling a Microservice. This Logic App just calculates the line total by multiplying the UnitPrice and Qty to crate a new property TotalPrice. I have also added a random Delay shape to simulate some line items taking longer to process.

Below is the workflow process for this Logic App. I won’t go into the detail of each action as this Logic App is just there to test the message re-sequencing.

image

Testing

To test this solution we are going to use one of my favourite tools PostMan. The following PO message will sent to the MessageResequencer Logic App. Notice the absence of the line item total, the POValidator Logic App will calculate this instead for us.

image

As you can see below, each line item has been sent to the POValidator Logic App and took different times to complete due  to the delay shape.

image

Here is the response message with the total price appended to each line item and in the correct sequence.

image

Now to prove that the line items do get re-ordered in the correct sequence, I will swap the line numbers around in the PO request message and with a different PO number.

image

Here is the response with the lines re-sequenced in the correct order.

image

Conclusion

Using Azure Functions and Table Storage makes it fairly easy to develop this integration pattern. To provide a more resilient repository for storing the messages temporary, you may want to consider using the Azure Cosmos Db.

Also the http connectors can also be swapped out for Service Bus connectors when communicating with services that are slow in responding.

You would have noticed that when you call the function method “AzureTableStoreMsg” it returns the following response message.

image

You can use the EOMReceived and TotalMessagesReceived to determine if an error has occurred or Microservice is taking too long to process the de-batched items. Under ideal conditions, the EOMIndex would be set on the last de-batched message and the TotalMessagesReceived equal the EOMIndex value.

Keep a watch out for my next blog where I will show you how re-sequence messages as they arrive.

Enjoy…    

Posted in Azure, Functions, Logic Apps | Tagged , , , , , | 3 Comments

Content based message routing using Azure Logic Apps, Function and Service Bus

Content Based Routing (CBR) is another pattern used in the integration world. The contents of the message determines the endpoint of the message.

This article will describe one option to develop this pattern using an Azure Service Bus, an Azure Function and a Logic App.

Basically the service bus will be used to route the message to the correct endpoint using topics and subscriptions. The Azure Function is used to host and execute the business rules to inspect the message contents and set the routing properties. A logic app is used to accept the message and call the function passing the received message as an argument. Once the function executes the business rules, it will return the routing properties in the response body. The routing information is then used to set the properties on the service bus API connector in the Logic App before publishing the message onto the bus.

image

Scenario

To demonstrate a typical use-case of this pattern we have 2 message types, Sales Orders (SO) and Purchase Orders (PO). For the SO I want to send the order to a priority queue if the total sales amount is over a particular value. And for a PO, it should be sent to a pre-approval queue if the order value is under a specified amount.

Here is an example of a SO message to be routed: image

And an example of a PO being sent:image

Solution

The real smarts of this solution is the function which will return a common JSON response message to set the values for the Topic name and the custom properties on the service bus connector. The fields of the response message are described below.

  • TopicName – the name of service bus topic to send the message to.
  • CBRFilter_1 – used by the subscription rule to filter the value on. Depending on your own requirements you may need more fields to filter more granular.
  • RuleSetVersion – used by the subscription rule to filter the value on. It’s a good idea to have this field as you may have several versions of this rule in play at any one time.

Let’s start with provisioning the service bus topics and subscriptions for the 2 types of messages. First create 2 topics called purchaseorder and salesorder.

image

Now add the following subscriptions and rules for each of the topics.

Topic Name Subscription Name Rule
purchaseorder Approved_V1.00 CBRFilter_1 = ‘Approved’ and RuleSetVersion = ‘1.00’
purchaseorder NotApproved_V1.00 CBRFilter_1 = ‘ApprovedNot’ and RuleSetVersion = ‘1.00’
salesorder HighPriority_V1.00 CBRFilter_1 = ‘PriorityHigh’ and RuleSetVersion = ‘1.00’
salesorder LowPriority_V1.00 CBRFilter_1 = ‘PriorityLow’ and RuleSetVersion = ‘1.00’

Next is the development of the Azure function. This is best developed in Visual Studio where you can include a Unit Test project to each of the rules. Add a new Azure Function project to your solution.image

After the project has been created, right click on the function and click Add -> New Item. Choose Azure Function, give it a name and select the Http trigger option.image

Below is code for the HTTP trigger function which includes the class definition for the RoutingProperties object. I am checking for specific elements SalesOrderNumber, PurchaseOrderNumber in the JSON message to determine the type of message and which determines what rule code block to execute. Each rule block code will first set the TopicName and RuleSetVersion properties.

public static class CBRRule

   {

       [FunctionName("CBRRule")]

       public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Function,  "post", Route = null)]HttpRequestMessage req, TraceWriter log)

       {

           log.Info("C# HTTP trigger function processed a request.");

           var routingProperties = new RoutingProperties();

          

           // Get request body

           JObject data = await req.Content.ReadAsAsync<JObject>();

 

           //Is this a  sales order message type           

           if (data != null && data["SalesOrderNumber"] != null)

           {

               routingProperties.CBRFilter_1 = "PriorityLow";

               routingProperties.RuleSetVersion = "1.00";

               routingProperties.TopicName = "SalesOrder";

 

               var lineItems = data["Lines"];                

               var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

 

               //if the total sales is greater than $1000 send the message to the high priority queue

               if (totalSaleAmount > 1000)

                   routingProperties.CBRFilter_1 = "PriorityHigh";

           }

 

           //Is this a purchase order message type           

           if (data != null && data["PurchaseOrderNumber"] != null)

           {

               routingProperties.CBRFilter_1 = "ApprovedNot";

               routingProperties.RuleSetVersion = "1.00";

               routingProperties.TopicName = "PurchaseOrder";

 

               var lineItems = data["Lines"];                

               var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

 

               //Approve PO if the total order price is less than $500

               if (totalSaleAmount < 500)

                   routingProperties.CBRFilter_1 = "Approved";

           }

          

           return req.CreateResponse(HttpStatusCode.OK, routingProperties);

       }

 

       /// <summary>

       /// Response message to set the custom routing properties of the service bus

       /// </summary>

       public class RoutingProperties

       {

           public string TopicName { get; set; }

           public string CBRFilter_1 { get; set; }

           public string RuleSetVersion { get; set; }

 

           public RoutingProperties()

           {

               this.CBRFilter_1 = "Unknown";

               this.RuleSetVersion = "Unknown";

               this.TopicName = "Unknown";

           }

       }

     

   }

The business rule for a SO aggregates all the line items and checks if the total amount is greater than 1000, if it is then set the property CBRFilter_1 to “PriorityHigh”.

The business rule for a PO also aggregates all the line items and checks if the total amount is less than 500, if it is then set the property CBRFilter to “Approved”.

With the following input message sent to the function:clip_image001

The output of the function should look similar to this below:clip_image001[6]

Now we need to publish the function from Visual Studio to your Azure resource group using the publishing wizard.clip_image002

The last component of this solution is the Logic App which is triggered by an HTTP Request API and then calls the Azure function created above. The basic flow looks like this below.clip_image004

The HTTP Request trigger has no request body JSON schema created. The trigger must accept any type of message.clip_image006

Add an Azure Function after the trigger action and select the method called “CBRRule” clip_image008

Set the Request Body to the trigger body content and the Method to “POST”clip_image010

Next add a Service Bus action and set the properties as shown. Both the Queue/Topic name and Properties are set from the function response message.clip_image012

Here is the code view of the Send Message action showing how the properties are set.clip_image014

Testing

Using PostMan we can send sample messages to the Logic App and then see them sitting on the service bus waiting to be processed by some other method.

At the moment the service bus should have no messages waiting to be processed as shown.clip_image015

Using PostMan to send the following PO, we should see this message end up in the purchaseorder/NotApproved subscription.clip_image016

All going well, the message will arrive at the correct subscription waiting to be processed as shown below using Service Bus Explorer.clip_image018

Sending the following SO will also route the message to the correct subscriber.clip_image019

clip_image021

Conclusion

CBR can be easily achievable using an Azure Function to execute your business rules on a message to set up the routing properties. Taking this a step further, I would abstract the business rules for each message type in its own class for manageability.

Also it is advisable to setup a Unit Test project for each of the classes holding the business rules to ensure 100% code testing coverage.

Enjoy…

Posted in Azure, Logic Apps, Service bus | Tagged , , , , , , | 2 Comments

Itinerary based message routing using Azure Logic Apps and Service Bus Actions

This is another integration pattern used quite extensively in the integration world. It is used when a message is required to be routed to several endpoints in a particular order using some form of routing list. Depending on your business requirements, the message being routed may be enriched or replaced before sending it to the next service endpoint in the list.

image

This is probably one of the easiest integration patterns to implement using Logic Apps and Service Bus Actions. There are probably other methods to implement this pattern, but I wanted to abstract the routing logic away from the Logic App itself and leave it to focus on the business process and not worry about setting up the next routing endpoint.

By using the Actions feature of the service bus I can defined the routing order of the next service endpoint, or in this case the next service bus subscription by setting properties to match the next subscriber filter condition. A service bus action is executed after the filter condition has been met and is used to set the value of either a system or custom property before the message is consumed from the service bus. It is set in a similar way to the T-SQL “Set” command where you set a field to a value.

For this scenario I have a sales order message that is required to be passed to several logic apps in a particular sequence. There is a sales order header logic app, sales order line logic app, sale order payments logic app and finally a sales order completion logic app. The sales order message will look similar to this below.

image

Provisioning Azure Service Bus

The method involves creating a service bus topic called “salesorder” with the following subscriptions.

servicebus

The filter and action rules are setup as in the following table. There are 2 custom properties “MsgType” and “ItineraryLeg” used for the subscriptions. The MsgType is just used to group the messages and the ItineraryLeg  defines the order of the subscribers.

The action is used to specify the next subscriber to send the message to after the current process by setting the ItineraryLeg property.

Seq Subscription Name Rule Filter Rule Action
1 SOHeader MsgType=’salesorder’ and ItineraryLeg = ” set  ItineraryLeg = ‘solines’
2 SOLines MsgType=’salesorder’ and ItineraryLeg = ‘solines’ set  ItineraryLeg = ‘sopayments’
3 SOPayments MsgType=’salesorder’ and ItineraryLeg = ‘sopayments’ set  ItineraryLeg = ‘socompletion’
4 SOCompletion MsgType=’salesorder’ and ItineraryLeg = ‘socompletion’  

A sample of the SOHeader rules are shown below:image

Creating the Logic Apps

Next we will provision the 4 logic apps to create the SO header, process the line items, apply the payments and complete the sales order. 

image

The logic apps (SalesOrderHeaderProcessor, SalesOrderLinesProcessor, SalesOrderPaymentsProcessor) are constructed in a similar manner as shown below and the only difference is the topic subscription names for the service bus trigger action.  The Delay action is where you would implement your business process logic on the received message.

image

Expanding the service bus trigger shape has the following properties set. The other logic apps will have different Topic subscription names set.image

The real smarts of these workflows are the “Republish message” service bus action shape. You will need to add this shape to every logic app that is involved in processing the message. Here we set the ItineraryLog and MsgType property values to the same values in the Trigger service bus action shape. Also I am adding another custom  property called “Tracking”. This is used to track the name of logic app the message was routed to in a chorological order and can be useful for debugging. The content property is just set the received message from the service bus trigger. Depending on your business requirements, you may be required to publish a totally different message.

image

Below is the code behind for the Republish message shape showing how to setup the properties section.

“body”: {
                       “ContentData”: “@{triggerBody()?[‘ContentData’]}”,
                       “Properties”: {
                           “ItineraryLeg”: “@triggerBody()?[‘Properties’][‘ItineraryLeg’]”,
                           “MsgType”: “@triggerBody()?[‘Properties’][‘MsgType’]”,
                           “Tracking”: “@concat(coalesce(triggerBody()?[‘Properties’]?[‘Tracking’],’Begin’),’,’,workflow()?[‘name’])”
                       }
                   },

The other logic app SalesOrderCompletionProcessor simply pulls the message from the last subscription and sends the Tracking property value to RequestBin.

image

Messages are placed onto the service bus using the MessagePublisher logic app which accepts a json message and initialises the service bus custom properties “ItineraryLeg” and “MsgType”. Here the ItineraryLeg is set to an empty string and the MsgType being set to “salesorder”. The body of the HTTP request trigger is used as the message content for the Send message action shape.

image

Testing

Using PostMan we can POST a message to the MessagePublisher logic app. If you set the delays long enough in the logic apps, you can see the message being published and consumed by the subscribers in the correct order as defined in the service bus actions.

Here is the result of the message being posted to the MessagePublisher logic app and the output from the SalesOrderCompletionProcessor  logic app which sends it to RequestBin.

Begin,SalesOrderHeaderProcessor,SalesOrderLinesProcessor,SalesOrderPaymentsProcessor,SalesOrderCompletionProcessor

Now if I wanted to change the order of the message processing, say I wanted to process the payments before the sales order lines. Here I would simply update the actions on the service bus subscriptions as follows:

Seq Subscription Name Rule Filter Rule Action
1 SOHeader MsgType=’salesorder’ and ItineraryLeg = ” set  ItineraryLeg = ‘sopayments’
3 SOPaymentsSOLines MsgType=’salesorder’ and ItineraryLeg = ‘solines’ set  ItineraryLeg = ‘socompletion’
2 SOPayments MsgType=’salesorder’ and ItineraryLeg = ‘sopayments’ set  ItineraryLeg = ‘solines’
4 SOCompletion MsgType=’salesorder’ and ItineraryLeg = ‘socompletion’  

Now the output of the SalesOrderCompletionProcessor looks like this below. You can clearly see the payments logic app being executed after the sales header process.

Begin,SalesOrderHeaderProcessor,SalesOrderPaymentsProcessor,SalesOrderLinesProcessor,SalesOrderCompletionProcessor

In Summary

Use service bus actions to manage the itinerary list to abstract the routing logic away from the normal business process of the logic apps.

Typically you would use Azure Resource templates to setup the service bus subscriptions and rules under TFS control.

Keep a watch out for my next article on Content Based Routing using Azure Logic Apps and Service Bus.

Enjoy…

Posted in Logic Apps, Service bus | Tagged , , , | 3 Comments

Integration Scatter-Gather pattern using Azure Logic Apps and Service Bus

Recently I have taken a role as the Integration Architect and tech lead for a project to integrate messaging between MS Dynamics 365  AX/CRM and 3rd party systems. This was a huge solution that took over 10 months to design and develop.  It consisted of developing over 60 Logic Apps.

With a large integration project like this were multiple entities are required to be updated across multiple systems in a transactional manner, I required a process to correlate and aggregate all the responses into one composite message before proceeding onto the next task in the workflow. The scatter-gatherer pattern was chosen as a  good candidate for this type of scenario.

The next problem was how to implement this pattern using Logic Apps. Using what I had learnt from my previous blog on enforcing ordered delivery of messages  https://connectedcircuits.wordpress.com/2017/08/26/enforcing-ordered-delivery-using-azure-logic-apps-and-service-bus/ I was able to design the solution below.

image

It is based on a single Logic App to publish the message onto a service bus topic and to retrieve all the responses from another service bus queue. I also wanted the solution to be flexible to decide which services the message should be scattered to by passing in a routing list which determines who to send the request message to.

The main component of the solution is the (Scatter/Gather)  logic app which sends the request message to the the service bus topic after setting subscription property values and a unique batch Id. In this example I have 3 sub-processor logic apps which will process the message in some form or another and then write a response message onto the service bus queue. Because sessions have been enabled on this queue and I am setting the session Id with the same BatchId that was sent with the message, we are able to correlate all the responses from to sub-processors within the same logic app instance that initially published the message onto the service bus topic.

After the Scatter/Gather logic app sends the message onto the service bus topic, it will go into a loop until all the response messages from the Process logic apps are received. It will then return the aggregated responses from the sub-process logic apps.

The whole solution can be broken down into 3 sections, setting up the Azure Service Bus, developing the Scatter-Gatherer Logic App workflow and the Sub-process Logic Apps subscribing to the topic.

Setting up the service bus

Lets start by setting up the Azure Service Bus. A Topic is provisioned to publish the request message with rules created for each subscriber. In this scenario I will have 3 subscriptions with default rules set to a property called MsgProcessor = ‘1’, MsgProcessor = ‘2’ and MsgProcessor = ‘3’ on each subscription respectively.

image

Next a Service Bus Queue is provisioned to receive all the response messages from the subscribers. Sessions must be enabled on this queue to allow a single consumer to process all messages with the same sessionId which in our case will be the Scatter/Gather Logic App.image

Developing the Scatter/Gather Logic App

The next part to develop is the scatter-gather Logic App which is the real smarts of the solution. Below is the full workflow with all the actions collapsed. We will go through and expand each of the shapes.

image

This  workflow is triggered by a HTTP request, but can also be triggered by some other connectors.

image

Next is a series of variables required by the workflow listed below:

  • BatchId – This is used to group all the messages together and is used as the SessionId and is initialised with a random Guid. 
  • CompositeMsg – This holds the aggregated response messages from the Processor logic apps.
  • IntemediateMsg – Used in the loop the temporary store the aggregated message.
  • ScatterCount – The number of sub-processors the request body was published to.
  • ResponseCount – Holds the current number of responses received from the Processing logic apps.

image

Once the variables have been setup, a parallel branch is used to scatter the HTTP Request body contents to the subscribing systems. For this demo I have an option to send the request body content to 3 different sub-processes.

image

To determine which sub-process to send the request body to, I pass a list of the process names to send the request body to as one of the HTTP Request Header properties as shown here:

“X-ScatterList”:”‘Process1′,’Process2’,’Process3′”

The filters on the parallel conditions are setup as below, where is checks if the list in the HTTP header contains the process name.image

Expanding one of the condition tasks shows an action to send the request onto the Service Bus  Topic and to increment the ScatterCount variable by one.image

Below is the “Send message processor 1”  service bus connector expanded to show the properties. The two important pieces of information are custom properties BatchId and MsgProcessor. The BatchId will be used to set the sessionId latter and the MsgProcessor is used by the subscription filter on the  service bus Topic.image

The next step is to increment to Scatter count. This is how we keep track of the number sub-processors that the message was scattered to. image

Once the request body content has been published to the service bus Topic, we cycle in a loop until we have received all the response messages from the service bus queue or the Until loop times out.image

The first step in the loop is to get one new message at a time from the queue using the BatchId variable as the session Id. This is to ensure we only get messages off the queue matching this same Id. image

Then we check if a message was found using the Condition action with this filter: @equals(length(body(‘Get_messages_from_a_queue_(peek-lock)’)), 1)

image

If a message was found on the service bus queue then the “If true” branch is executed and if no message was found the “If false” branch is executed setting a delay of a few seconds before iterating again.

Below are the actions inside of the “If true” branch.image

First we check if this is the first message received off the queue using the following filter: @equals(variables(‘ResponseCount’), 0)

image

If it is, then we just set the variable “CompositeMsg” to the service bus queue content data.image

As we are using the Service Bus Connector that is capable of returning multiple messages, we need to use an indexer to get to the first message. Below is the code view of the above action. Note when the sub-processor logic app puts a message onto the service bus, we base64 encode it, therefore we need to decode it back to string value.image

Now if this was not the first message received of the service bus, we need to append it the other messages received. This is done with the following actions in the “If false” branch shown below.image

First we need to copy the contents of the CompositeMsg variable into the “IntermediateMsg” variable. Then concatenate the message received of the service bus and the contents of the “IntemediateMsg” variable,  and then add them to the “CompositeMsg” variable. The syntax of the value for the Append intermediate to composite is shown below:

“value”: “@{concat(variables(‘IntermediateMsg’),’,’,base64ToString(body(‘Get_messages_from_a_queue_(peek-lock)’)?[0][‘ContentData’]))}”

After the message from the queue has been added or appended to the CompositeMsg variable, it is completed to remove it from the queue and the ResponseCount variable is incremented.

 image

Once all the response messages have been received from the sub-process logic apps or the “Until loop” times out, we need to close the service bus queue session using the BatchId and return the composite response message.

image

Here I am just sending the composite response message to a RequestBin endpoint. Ideally you would place this onto another service bus and perhaps with the initial HTTP request body to tie everything together.

Sub-Processor Logic Apps

The last part of this solution is the sub-processor logic apps which subscribe to the service bus Topic and processes the message in some form or another before returning a response message onto the service bus queue. Using a separate logic app the manage the post processing of the initial request message provides scalability and separation from the  messaging orchestration components.image

The properties of the trigger service bus connector is shown below which has a typical setup.image

Next the delay task is there just to simulate processing of the message received from the topic. This where you would normally call another API endpoint or process the message in some other fashion. Remember the maximum lock duration is 5 minutes and if your process will take longer than this, you will need to renew the lock.image

After the message has been processed, you will need to compose a response message to put back onto the service bus queue. The schema of the response message should be generic across all the sub-process logic apps to make it easier to parse latter on. For the demo a relatively simple schema will be used consisting of the received BatchId and the sub-processor logic app name.image 

Once the response message has been composed, it is placed onto the service bus queue ensuring the session Id is set to the BatchId that was sent with the message. Remember this queue has been provisioned with “Sessions Enabled”image

The code view for the connector looks like this:image

The last task of the workflow is to complete the topic subscription as shown here:image

Testing the solution

We can use Postman to send the following request to the scatter-gather logic app. Also I am setting the scatter list header property to publish the message to all 3 sub-processor logic apps.

POST /workflows/…/triggers/manual/paths/invoke?api-version=2016-10-01&amp;sp=%2Ftriggers%2FmanualHTTP/1.1
Host: prod-11.australiasoutheast.logic.azure.com:443
Content-Type: application/json
X-ScatterList: ‘Process1′,’Process2′,’Process3’
Cache-Control: no-cache
Postman-Token: 644c5cdb-fd60-4903-b3ec-2d3d6febfe7e

{
    “OrderId” : “12”,
    “Message” : “Hello1”
}

Using RequestBin we can see the aggregated response messages from all 3 sub-processors.

[{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process1"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process2"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process3"}]

What’s next

Further enhancements can be made by adding flexibility for the timeout of the Until loop as some responses may take hours/days  to send a response back.  When waiting for very long periods for a response to return, you may need to extend the delay period in the “Until loop” to avoid reaching the 5000 iteration limit.

Keep a watch for my next blog where I will show you to implement the Itinerary based routing integration pattern using Azure Logic Apps and  Service Bus.

Enjoy…

Posted in Azure, Logic Apps | Tagged , , , , | 2 Comments