.net core7.0实现从service bus读取消息,并且记录到Azure sql中演示
对于这个问题,我们需要使用以下步骤:
-
创建一个.net core7.0的控制台应用程序
-
添加Azure Service Bus NuGet包,以便我们可以从服务总线中读取消息
-
添加Azure SQL数据库NuGet包,以便我们可以将消息记录到Azure SQL中
-
配置服务总线连接字符串和Azure SQL连接字符串
-
编写代码从服务总线中读取消息,并将其记录到Azure SQL中
下面是具体步骤:
- 创建控制台应用程序
打开Visual Studio,选择“新建项目”,然后选择“控制台应用程序”模板。将项目命名为“ServiceBusToSqlDemo”。
- 添加Azure Service Bus NuGet包
在Visual Studio中,右键单击项目,选择“管理NuGet包”。在NuGet包管理器中搜索Microsoft.Azure.ServiceBus,然后安装它。
- 添加Azure SQL数据库NuGet包
在NuGet包管理器中搜索Microsoft.Data.SqlClient,然后安装它。
- 配置连接字符串
在appsettings.json文件中添加以下配置:
{
"ConnectionStrings": {
"ServiceBus": "<your-service-bus-connection-string>",
"AzureSql": "<your-azure-sql-connection-string>"
}
}
将
- 编写代码
在Program.cs文件中添加以下代码:
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Data.SqlClient;
namespace ServiceBusToSqlDemo
{
class Program
{
static async Task Main(string[] args)
{
var builder = new HostBuilder()
.ConfigureAppConfiguration((hostingContext, config) =>
{
config.AddJsonFile("appsettings.json", optional: true);
})
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<IQueueClient>(x =>
{
var connectionString = hostContext.Configuration.GetConnectionString("ServiceBus");
return new QueueClient(connectionString, "myqueue");
});
services.AddSingleton<SqlConnection>(x =>
{
var connectionString = hostContext.Configuration.GetConnectionString("AzureSql");
return new SqlConnection(connectionString);
});
services.AddSingleton<MessageHandler>();
services.AddHostedService<Worker>();
})
.ConfigureLogging((hostingContext, logging) =>
{
logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"));
logging.AddConsole();
});
await builder.RunConsoleAsync();
}
}
public class Worker : BackgroundService
{
private readonly IQueueClient _queueClient;
private readonly MessageHandler _messageHandler;
private readonly ILogger<Worker> _logger;
public Worker(IQueueClient queueClient, MessageHandler messageHandler, ILogger<Worker> logger)
{
_queueClient = queueClient;
_messageHandler = messageHandler;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Starting worker...");
stoppingToken.Register(() =>
{
_logger.LogInformation("Stopping worker...");
_queueClient.CloseAsync().GetAwaiter().GetResult();
});
_queueClient.RegisterMessageHandler(_messageHandler.HandleMessageAsync, new MessageHandlerOptions(_messageHandler.ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
});
await Task.Delay(Timeout.Infinite, stoppingToken);
}
}
public class MessageHandler
{
private readonly SqlConnection _sqlConnection;
private readonly ILogger<MessageHandler> _logger;
public MessageHandler(SqlConnection sqlConnection, ILogger<MessageHandler> logger)
{
_sqlConnection = sqlConnection;
_logger = logger;
}
public async Task HandleMessageAsync(Message message, CancellationToken cancellationToken)
{
_logger.LogInformation($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
try
{
var sql = "INSERT INTO Messages (MessageBody) VALUES (@messageBody)";
var cmd = new SqlCommand(sql, _sqlConnection);
cmd.Parameters.AddWithValue("@messageBody", Encoding.UTF8.GetString(message.Body));
await _sqlConnection.OpenAsync(cancellationToken);
await cmd.ExecuteNonQueryAsync(cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error storing message in database");
}
finally
{
await _sqlConnection.CloseAsync();
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
}
public Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler error");
return Task.CompletedTask;
}
}
}
这个例子中,我们将从服务总线中读取消息,并将它们记录到Azure SQL数据库中。在Worker类中,我们使用IQueueClient来连接到服务总线并注册MessageHandler来处理消息。MessageHandler类中,我们使用SqlConnection来连接到Azure SQL数据库,并将消息插入到Messages表中。
注意,我们在finally块中关闭SqlConnection并调用IQueueClient.CompleteAsync来释放消息锁定。
- 运行应用程序
现在我们可以运行应用程序并开始将消息记录到Azure SQL数据库中。在Visual Studio中按F5键启动应用程序,然后将一些消息发送到“myqueue”队列。你应该能够在Azure SQL数据库中看到插入的消息。
原文地址: https://www.cveoy.top/t/topic/tHZ 著作权归作者所有。请勿转载和采集!