Service Bus triggered Azure Functions in Peek-Lock mode

Read messages in Peek-Lock mode and mark complete/dead-lettered based on processing outcome.

A Service Bus triggered Azure Function runs when a new message arrives in a queue or topic subscription. The Functions runtime already receives the message in Peek-Lock mode, but by default it also settles the message for you: the message is completed when the function returns successfully and abandoned when the function throws, so your code never decides the outcome explicitly.

However, you may need to wait until processing has finished and then decide yourself whether the message should be marked as complete (success) or dead-lettered (failure).

To achieve this, we need to turn off automatic completion and settle the message ourselves while it is held under the peek lock. In .NET Azure Functions this can be done as follows.

When we create a Service Bus triggered function, the default template looks something like this:

public class ServiceBusTopicTrigger1
{
    [FunctionName("ServiceBusTopicTrigger1")]
    public void Run([ServiceBusTrigger("mytopic", "mysubscription", Connection = "SB_Conn_String")]string mySbMsg, ILogger log)
    {
        log.LogInformation($"C# ServiceBus topic trigger function processed message: {mySbMsg}");
    }
}

With this signature the runtime settles the message automatically: it is completed as soon as the function returns and abandoned if the function throws.

Below is the modified function trigger, which turns off automatic completion so that the function can settle the message itself under Peek-Lock.

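public class ServiceBusTopicTrigger1
{
    [FunctionName("ServiceBusTopicTrigger1")]
    public static async Task Run(
        [ServiceBusTrigger("mytopic", "mysubscription", Connection = "SB_Conn_String", AutoCompleteMessages = false)] ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions, ILogger log)
    {
        // message.Body is BinaryData; string interpolation calls ToString(), which decodes it as UTF-8 text.
        log.LogInformation($"C# ServiceBus topic trigger function processed message: {message.Body}");

        // With AutoCompleteMessages = false, the function must settle the message itself.
        await messageActions.CompleteMessageAsync(message);
    }
}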

The changes are as follows:

  1. Set AutoCompleteMessages = false on the ServiceBusTrigger attribute. The runtime then no longer completes the message when the function returns; the message stays locked until the function settles it or the lock expires.

  2. Receive the message as a ServiceBusReceivedMessage object rather than as a string, which is the default when creating Service Bus triggered functions. This object is required to mark the message as complete or dead-lettered.

  3. Add another parameter to the Run method: ServiceBusMessageActions messageActions. This class provides the methods that mark a message as complete or dead-lettered.
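
The same switch can also be set once for the whole function app instead of per trigger. The fragment below is a minimal host.json sketch, assuming version 5.x or later of the Service Bus extension (older versions use a different setting name). When the attribute sets AutoCompleteMessages explicitly, that value should apply to that function; otherwise the host.json value is used.

```json
{
  "version": "2.0",
  "extensions": {
    "serviceBus": {
      "autoCompleteMessages": false
    }
  }
}
```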

The sample Azure Function below shows the full context, including how messages are marked as complete or dead-lettered.

using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Logging;

namespace FunctionApp
{

    public static class ServiceBusTopicTrigger1
    {
        [FunctionName("ServiceBusTopicTrigger1")]
        public static async Task Run([ServiceBusTrigger("mytopic", "mysubscription", Connection = "SB_Connection_String", AutoCompleteMessages = false)] ServiceBusReceivedMessage message,
        ServiceBusMessageActions messageActions, ILogger log)
        {
            var correlationId = message.CorrelationId;
            log.LogInformation("Function ServiceBusTopicTrigger1 started processing message. CorrelationId: {CorrelationId}", correlationId);
            try
            {
                // Call another HTTP Azure function, 'ProcessMessage', to process the message.
                // Based on the HTTP response, the message is marked complete or dead-lettered.
                var httpRequest = new DefaultHttpContext().Request;
                httpRequest.Body = new MemoryStream(Encoding.UTF8.GetBytes(message.Body.ToString()));
                httpRequest.Headers.Add("CorrelationId", correlationId);
                HttpResponseMessage response = await ProcessMessage.Run(httpRequest, log);

                if (response.StatusCode == System.Net.HttpStatusCode.OK)
                {
                    await messageActions.CompleteMessageAsync(message);
                    log.LogInformation("Message processed successfully. CorrelationId: {CorrelationId}", correlationId);
                }
                else if (response.StatusCode == System.Net.HttpStatusCode.InternalServerError)
                {
                    await messageActions.DeadLetterMessageAsync(message, "InternalServerError", "Message dead-lettered due to unexpected transient failure");
                    log.LogInformation("Message processing failed. CorrelationId: {CorrelationId}", correlationId);
                }
                else
                {
                    await messageActions.DeadLetterMessageAsync(message);
                    log.LogInformation("Message processing failed. CorrelationId: {CorrelationId}", correlationId);
                }
            }
            catch (Exception ex)
            {
                await messageActions.DeadLetterMessageAsync(message, "TransientFailure", ex.Message);
                log.LogError(ex, "Exception occurred while processing message. CorrelationId: {CorrelationId}", correlationId);
            }
        }
    }
}
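
The branching on the HTTP status in the sample above can be pulled out into a small pure function, so that the decision can be unit tested without a Service Bus connection. This is only a sketch: SettleAction and SettlementPolicy are hypothetical names introduced here, not part of any Azure SDK. Unlike the sample, it always returns a dead-letter reason (the status name) for every non-OK status.

```csharp
using System.Net;

// Hypothetical helper types for illustration only; they are not part of any Azure SDK.
public enum SettleAction { Complete, DeadLetter }

public static class SettlementPolicy
{
    // Maps the downstream HTTP status to a settlement decision.
    // OK completes the message; any other status dead-letters it,
    // using the status name (for example "InternalServerError") as the reason.
    public static (SettleAction Action, string Reason) Decide(HttpStatusCode status)
    {
        if (status == HttpStatusCode.OK)
        {
            return (SettleAction.Complete, string.Empty);
        }

        return (SettleAction.DeadLetter, status.ToString());
    }
}
```

Inside the function, the result of SettlementPolicy.Decide(response.StatusCode) would then select between messageActions.CompleteMessageAsync(message) and messageActions.DeadLetterMessageAsync(message, reason).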

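Note: the parameters supplied to DeadLetterMessageAsync are ServiceBusMessageActions.DeadLetterMessageAsync(ServiceBusReceivedMessage message, string deadLetterReason, [string deadLetterErrorDescription = null]). deadLetterReason and deadLetterErrorDescription can be omitted, as in the final else branch of the sample. When they are supplied, they are stored with the dead-lettered message and make it easier to diagnose later why the message failed.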

Hope you find this blog helpful. Thanks for reading!
