Service bus triggered Azure functions in Peek-Lock mode
Read messages in Peek-Lock mode and mark complete/dead-lettered based on processing outcome.
Service bus triggered Azure functions runs on arrival of new messages in queue/topic-subscription. By default, it reads messages in Receive-and-Delete mode which means as soon as message is fetched by function trigger, it is marked as completed and removed from queue.
But, it could be necessary to wait until processing is completed in order to decide if message to be marked as complete (success) or dead-lettered (failure).
To achieve this, we need to read messages in Peek-Lock mode and it can be done in .NET Azure functions as follows.
When we create a service bus triggered function, default template looks like something below-
public class ServiceBusTopicTrigger1
{
[FunctionName("ServiceBusTopicTrigger1")]
public void Run([ServiceBusTrigger("mytopic", "mysubscription", Connection = "SB_Conn_String")]string mySbMsg, ILogger log)
{
log.LogInformation($"C# ServiceBus topic trigger function processed message: {mySbMsg}");
}
}
This code receives messages in Receive-and-Delete mode as default.
Below is modified function trigger in order to enable Peek-Lock mode.
public class ServiceBusTopicTrigger1
{
[FunctionName("ServiceBusTopicTrigger1")]
public static async Task Run([ServiceBusTrigger("mytopic", "mysubscription", Connection = "SB_Conn_String", AutoCompleteMessages = false)] ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions, ILogger log)
{
log.LogInformation($"C# ServiceBus topic trigger function processed message: {mySbMsg}");
}
}
Changes made are as follows-
Set ServiceBusTrigger Attribute
AutoCompleteMessages = false
. This will make trigger read messages in peek-lock mode and does not delete messages from active queue immediately.Receive message as object of class
ServiceBusReceivedMessage
and not asstring
which is default when creating service bus triggered functions. This object is required to mark messages complete/dead-lettered.Inject another parameter to Run method
ServiceBusMessageActions messageActions
. This class provides methods to mark messages complete/dead-lettered.
Use below sample azure function to get the full context of Azure function along with implementation of marking messages as complete/dead-lettered.
using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.ServiceBus;
using Microsoft.Extensions.Logging;
namespace FunctionApp
{
public static class ServiceBusTopicTrigger1
{
[FunctionName("ServiceBusTopicTrigger1")]
public static async Task Run([ServiceBusTrigger("mytopic", "mysubscription", Connection = "SB_Connection_String", AutoCompleteMessages = false)] ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions, ILogger log)
{
var correlationId = message.CorrelationId;
log.LogInformation("Function ServiceBusTopicTrigger1 started processing message. CorrelationId: {CorrelationId}", correlationId);
try
{
// calling another HTTP Azure function 'ProcessMessage' to process the message.
//based on HTTP response, message is marked complete/dead-lettered
var httpRequest = new DefaultHttpContext().Request;
httpRequest.Body = new MemoryStream(Encoding.UTF8.GetBytes(message.Body.ToString()));
httpRequest.Headers.Add("CorrelationId", correlationId);
HttpResponseMessage response= await ProcessMessage.Run(httpRequest, log);
if(response.StatusCode==System.Net.HttpStatusCode.OK)
{
await messageActions.CompleteMessageAsync(message);
log.LogInformation("Message processed successfully. CorrelationId: {CorrelationId}", correlationId);
}
else if(response.StatusCode==System.Net.HttpStatusCode.InternalServerError)
{
await messageActions.DeadLetterMessageAsync(message, "InternalServerError", "Message dead-lettered due to unexpected transient failure");
log.LogInformation("Message processing failed. CorrelationId: {CorrelationId}", correlationId);
}
else
{
await messageActions.DeadLetterMessageAsync(message);
log.LogInformation("Message processing failed. CorrelationId: {CorrelationId}", correlationId);
}
}
catch (Exception ex)
{
await messageActions.DeadLetterMessageAsync(message, "TransientFailure", ex.Message);
log.LogError(ex, "Exception occurred while processing message. CorrelationId: {CorrelationId}", correlationId);
}
}
}
}
Note- Parameters supplied to method DeadLetterMessageAsync are -ServiceBusMessageActions.DeadLetterMessageAsync(ServiceBusReceivedMessage message, string deadLetterReason, [string deadLetterErrorDescription = null])
. deadLetterReason
and deadLetterErrorDescription
are optional and can be omitted.
Hope you find this blog helpful. Thanks for reading !