c#.net core7.0实现从消息队列种读取消息,并且记录到Azure sql中简单示例
下面是一个简单的示例代码,演示如何使用C# .NET Core 7.0从消息队列中读取消息,并将其记录到Azure SQL中:
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Data.SqlClient;
public class Program
{
const string ServiceBusConnectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<your-policy-name>;SharedAccessKey=<your-policy-key>";
const string QueueName = "myqueue";
static IQueueClient queueClient;
static SqlConnection sqlConnection;
public static async Task Main(string[] args)
{
queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
sqlConnection = new SqlConnection("<your-connection-string>");
// Register the queue message handler and receive messages in a loop
RegisterOnMessageHandlerAndReceiveMessages();
Console.ReadKey();
await queueClient.CloseAsync();
await sqlConnection.CloseAsync();
}
static void RegisterOnMessageHandlerAndReceiveMessages()
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
}
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
var messageBody = Encoding.UTF8.GetString(message.Body);
Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{messageBody}");
try
{
// Insert the message into the database
await sqlConnection.OpenAsync();
using (var command = new SqlCommand("INSERT INTO Messages (MessageBody) VALUES (@messageBody)", sqlConnection))
{
command.Parameters.AddWithValue("@messageBody", messageBody);
await command.ExecuteNonQueryAsync();
}
await sqlConnection.CloseAsync();
// Complete the message so that it is not received again.
// This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is the default).
await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
catch (Exception ex)
{
// Handle any exceptions that occur while processing the message
Console.WriteLine($"Error while processing message: {ex.Message}");
// Abandon the message so that it can be received again.
await queueClient.AbandonAsync(message.SystemProperties.LockToken);
}
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
此示例代码使用Azure Service Bus作为消息队列,使用Azure SQL作为持久化存储。它首先将消息队列的连接字符串和队列名称设置为常量,然后创建一个队列客户端和一个SQL连接。然后,它注册了一个消息处理程序,该处理程序使用RegisterOnMessageHandlerAndReceiveMessages()方法。此方法将消息处理程序选项设置为每次处理一个消息,并设置一个异常处理程序。消息处理程序将消息插入到Azure SQL数据库中,并使用CompleteAsync()方法标记消息已完成。如果出现任何异常,则消息将被放弃,并可以重新接收。
请注意,此示例仅用于演示目的。在生产代码中,您应该处理更多的异常情况,并使用更好的错误处理和日志记录实践。
原文地址: https://www.cveoy.top/t/topic/tFz 著作权归作者所有。请勿转载和采集!