Servicebus“ing” like a champ

Over the years I have built a few integrations that write to and read from servicebus queues and topics. Here I will share the different ways I have done it.

The classic way in most cases

If you wonder where the connection string is: set AzureWebJobsServiceBus__fullyQualifiedNamespace in your settings and you won't need one, because the binding then authenticates with an identity instead.
Wrapping a name in %...% tells the binding to look up the value under that key in your settings.
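
As a rough illustration, the settings behind the snippets in this post could look like the local.settings.json below. The setting names are the ones used in the bindings; every value (namespace, entity names, schedule) is a made-up placeholder, and the runtime entries your project template generates are left out.

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "AzureWebJobsServiceBus__fullyQualifiedNamespace": "my-namespace.servicebus.windows.net",
    "sbTopic": "my-topic",
    "sbSubscription": "my-subscription",
    "sbQueue": "my-queue",
    "TimerSchedule": "0 */5 * * * *"
  }
}
```

With this in place %sbTopic% resolves to the value of sbTopic, and no connection string is stored anywhere. The identity running the function needs a servicebus data role (receiver and/or sender) on the namespace instead.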

//A servicebus trigger (on new messages)
public async Task Run(
    [ServiceBusTrigger("%sbTopic%", "%sbSubscription%")] ServiceBusReceivedMessage message
    )
{
}

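//A timer trigger that writes to the servicebus
public async Task Run(
    [TimerTrigger("%TimerSchedule%")] TimerInfo myTimer,
    [ServiceBus("%sbQueue%")] ServiceBusSender queue
    )
{
    //send a message using the bound sender
    await queue.SendMessageAsync(new ServiceBusMessage("Hello from the timer"));
}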

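//A valid alternative for writing to the servicebus is to use the return attribute
[FunctionName("Function1")]
[return: ServiceBus("%queue%")]
public async Task<ServiceBusMessage> Run(
    [TimerTrigger("%TimerSchedule%")] TimerInfo myTimer
    )
{
    //await your own work here; the returned message is written to the queue
    return new ServiceBusMessage("Hello from the timer");
}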

For a couple of years I used a custom client, “sbClient.cs”, instead of the binding when writing to the servicebus.

public class SBClient
{
    private readonly AppSettings _appSettings;
    private readonly ServiceBusClient _client;

    public SBClient(IOptions<AppSettings> appSettings, IOptions<AzureWebJobsServiceBusSettings> azureWebJobsServiceBusSettings)
    {
        _appSettings = appSettings.Value;
        var credential = new DefaultAzureCredential();
        _client = new ServiceBusClient(azureWebJobsServiceBusSettings.Value.FullyQualifiedNamespace, credential);
    }


    public async Task SendMessageToSB(MyObject myObject)
    {
        try
        {
            ServiceBusSender topic = _client.CreateSender(_appSettings.SbTopic);

            var body = JsonConvert.SerializeObject(myObject, Formatting.Indented);

            var msg = new ServiceBusMessage(Encoding.UTF8.GetBytes(body)) //create a servicebus message to be able to set metadata
            {
                ContentType = "application/json",
                //an identifier of your choice, e.g. the sending system
                //(with duplicate detection enabled, a reused MessageId is dropped as a duplicate)
                MessageId = "myMessageType"
            };

            await topic.SendMessageAsync(msg);
        }
        catch (Exception ex)
        {
            throw new Exception("Error writing to servicebus topic", ex);
        }
    }
}

In my handler.cs

await _serviceBusClient.SendMessageToSB(result); 

In August 2023 I discovered that Microsoft had made it possible to register the servicebus client through dependency injection (this also applies to storage accounts).
https://learn.microsoft.com/en-us/dotnet/azure/sdk/dependency-injection?tabs=web-app-builder

This led to the solution I use now. Everything is set up in program.cs and the extension class HostBuilderExtension.cs.

public static IServiceCollection RegisterAzureClients(this IServiceCollection services)
{
    services.AddAzureClients(clientBuilder =>
    {
        clientBuilder.UseCredential(CredentialUtil.DefaultAzureCredential);


        //DI: service bus
        clientBuilder.AddServiceBusClientWithNamespace(
            Environment.GetEnvironmentVariable("AzureWebJobsServiceBus__fullyQualifiedNamespace"));

        var topic = Environment.GetEnvironmentVariable("ServiceBusTopic");
        clientBuilder.AddClient<ServiceBusSender, ServiceBusClientOptions>((_, _, provider) =>
            provider
                .GetService<ServiceBusClient>()
                .CreateSender(topic)
            )
            .WithName("send-queue");
    });

    return services;
}

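In the handler.cs (using an employee handler as an example here), only two lines of code are needed: the one in the constructor that creates the sender from the factory, and the call to SendMessageAsync.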

public class EmployeeHandler
{
    private readonly ILogger<EmployeeHandler> _logger;
    private readonly ServiceBusSender _servicebusSender;

    public EmployeeHandler(ILogger<EmployeeHandler> logger, IAzureClientFactory<ServiceBusSender> serviceBusSenderFactory)
    {
        _logger = logger;
        _servicebusSender = serviceBusSenderFactory.CreateClient("send-queue");
    }
    public async Task HandleMessage(EmployeeCollection employeeCollection, string modifiedSince)
    {
        foreach (var employee in employeeCollection.employees)
        {
            using (_logger.BeginScope("{EmployeeId}", employee.employeeId))
            {
                var msg = new ServiceBusMessage(JsonSerializer.Serialize(employee))
                {
                    MessageId = employee.guid.ToString(),
                    ApplicationProperties =
                     {
                         { "modifiedSince", modifiedSince },
                     }
                };
                await _servicebusSender.SendMessageAsync(msg);

                _logger.LogInformation("outputMessage\r\n{prop__outputMessage}", JsonSerializer.Serialize(employee));
                _logger.LogInformation("MessageId\r\n{prop__MessageId}", msg.MessageId);
            }
        }
    }
}
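
The program.cs side of this setup is not shown above. A minimal sketch, assuming the isolated worker model and assuming the handler is resolved from dependency injection, could look like this. The EmployeeHandler registration and its transient lifetime are my assumptions for illustration, not taken from the original project.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

var host = new HostBuilder()
    .ConfigureFunctionsWorkerDefaults()
    .ConfigureServices(services =>
    {
        //registers ServiceBusClient and the named "send-queue" sender
        //(the RegisterAzureClients extension method from HostBuilderExtension.cs)
        services.RegisterAzureClients();

        //assumption: the function class takes EmployeeHandler as a constructor parameter
        services.AddTransient<EmployeeHandler>();
    })
    .Build();

host.Run();
```

If the extension class lives in its own namespace, program.cs also needs a using for it.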

The last part, then: APIManagement with an API that passes the body and any custom headers on to the servicebus.
This comes in handy when the sender takes no responsibility for handling your response, so you have to take control of the message as soon as it arrives.

To ensure that the message is stored and can be resent, we can add a few lines to the API policy and set the backend to the servicebus namespace.

<policies>
    <inbound>
        <base />
        <!--Azure Service Bus-->
        <!-- Set the backend URL to Service Bus Namespace-->
        <set-backend-service base-url="https://sb-int0000-4mxknr4zdsvlc.servicebus.windows.net" />
        <rewrite-uri template="sbt-api-mx/messages" />
        <set-header name="BrokerProperties" exists-action="override">
            <value>@{
                    var json = new JObject();
                    json.Add("MessageId", "demovalue");
                    json.Add("Label", "demodata");
                    return json.ToString(Newtonsoft.Json.Formatting.None);
                }</value>
        </set-header>
        <!-- Custom headers are sent as 'Custom Properties' on servicebus messages.
        In this case the X-Original-URL value is set on a new custom property named originUrl -->
        <set-header name="originUrl" exists-action="override">
            <value>@(context.Request.Headers.GetValueOrDefault("X-Original-URL"))</value>
        </set-header>
        <authentication-managed-identity resource="https://servicebus.azure.net/" />
    </inbound>
</policies>

Your backend is the servicebus namespace.
Note the /messages suffix after your topic/queue name.
BrokerProperties: override these to set custom values for MessageId and Label.

Authentication is managed using RBAC.
The managed identity of APIManagement has the role Azure Service Bus Data Sender on the servicebus topic.

Example from Postman

testkey and x-original-url end up as custom properties on the servicebus message.

The body is automatically added to the servicebus message.
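
For readers without Postman, roughly the same call can be sketched with HttpClient. The gateway URL, API path, header values and body below are hypothetical placeholders; only the header names testkey and X-Original-URL come from the example above.

```csharp
using System;
using System.Net.Http;
using System.Text;

using var http = new HttpClient();

//assumption: hypothetical gateway URL and API path, replace with your own
using var request = new HttpRequestMessage(
    HttpMethod.Post, "https://my-apim.azure-api.net/my-api")
{
    //the body is passed on as the body of the servicebus message
    Content = new StringContent("{\"employeeId\":\"1234\"}", Encoding.UTF8, "application/json")
};

//custom headers (values are made up)
request.Headers.Add("testkey", "testvalue");
request.Headers.Add("X-Original-URL", "/employees/events/employee");

var response = await http.SendAsync(request);
Console.WriteLine($"Status: {(int)response.StatusCode}");
```

If the API requires a subscription key, it would be sent as one more header (Ocp-Apim-Subscription-Key).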

The message will look like this.
The combination of the values set in the policy and the headers sent from Postman is reflected on the servicebus message.

The BrokerProperties, a.k.a. message properties, can be used in filters (note: they have to be prefixed with sys.), and so can the custom properties.

A servicebus filter is written as a SQL expression like this

Note the use of the sys. prefix to access the message properties.
The other rule (originUri_eq_yourcalleruri) looks like this: originUrl = '/employees/events/employee'. No prefix is needed!
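
To make the two kinds of filters concrete, here is a small sketch that creates both rules from code with the administration client. The namespace, the subscription name and the rule name label_eq_demodata are hypothetical, and the sys.Label expression is my guess at a filter matching the Label set in the policy, not necessarily the one used in the original setup.

```csharp
using Azure.Identity;
using Azure.Messaging.ServiceBus.Administration;

//assumption: hypothetical namespace and subscription, the topic name is taken from the policy
var admin = new ServiceBusAdministrationClient(
    "my-namespace.servicebus.windows.net", new DefaultAzureCredential());

const string topic = "sbt-api-mx";
const string subscription = "my-subscription";

//message (broker) property: needs the sys. prefix
await admin.CreateRuleAsync(topic, subscription,
    new CreateRuleOptions("label_eq_demodata", new SqlRuleFilter("sys.Label = 'demodata'")));

//custom property: no prefix
await admin.CreateRuleAsync(topic, subscription,
    new CreateRuleOptions("originUri_eq_yourcalleruri",
        new SqlRuleFilter("originUrl = '/employees/events/employee'")));
```

Managing rules requires more than the sender role (for example Azure Service Bus Data Owner). Also keep in mind that a subscription created without a rule starts with a default rule that matches everything, so that one has to be removed before the filters have any effect.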
