下面是一个简单的示例代码,演示如何使用C# .NET Core 7.0从消息队列中读取消息,并将其记录到Azure SQL中:

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Data.SqlClient;

public class Program
{
    const string ServiceBusConnectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<your-policy-name>;SharedAccessKey=<your-policy-key>";
    const string QueueName = "myqueue";
    static IQueueClient queueClient;
    static SqlConnection sqlConnection;

    public static async Task Main(string[] args)
    {
        queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
        sqlConnection = new SqlConnection("<your-connection-string>");

        // Register the queue message handler and receive messages in a loop
        RegisterOnMessageHandlerAndReceiveMessages();

        Console.ReadKey();

        await queueClient.CloseAsync();
        await sqlConnection.CloseAsync();
    }

    static void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1,
            AutoComplete = false
        };

        queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }

    static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        var messageBody = Encoding.UTF8.GetString(message.Body);
        Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{messageBody}");

        try
        {
            // Insert the message into the database
            await sqlConnection.OpenAsync();

            using (var command = new SqlCommand("INSERT INTO Messages (MessageBody) VALUES (@messageBody)", sqlConnection))
            {
                command.Parameters.AddWithValue("@messageBody", messageBody);
                await command.ExecuteNonQueryAsync();
            }

            await sqlConnection.CloseAsync();

            // Complete the message so that it is not received again.
            // This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is the default).
            await queueClient.CompleteAsync(message.SystemProperties.LockToken);
        }
        catch (Exception ex)
        {
            // Handle any exceptions that occur while processing the message
            Console.WriteLine($"Error while processing message: {ex.Message}");

            // Abandon the message so that it can be received again.
            await queueClient.AbandonAsync(message.SystemProperties.LockToken);
        }
    }

    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine("Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }
}

此示例代码使用Azure Service Bus作为消息队列,使用Azure SQL作为持久化存储。它首先将消息队列的连接字符串和队列名称设置为常量,然后创建一个队列客户端和一个SQL连接。然后,它注册了一个消息处理程序,该处理程序使用RegisterOnMessageHandlerAndReceiveMessages()方法。此方法将消息处理程序选项设置为每次处理一个消息,并设置一个异常处理程序。消息处理程序将消息插入到Azure SQL数据库中,并使用CompleteAsync()方法标记消息已完成。如果出现任何异常,则消息将被放弃,并可以重新接收。

请注意,此示例仅用于演示目的。在生产代码中,您应该处理更多的异常情况,并使用更好的错误处理和日志记录实践。

c#.net core7.0实现从消息队列种读取消息,并且记录到Azure sql中简单示例

原文地址: https://www.cveoy.top/t/topic/tFz 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录