.net core7.0实现从service bus读取消息,并且记录到Azure sql中执行步骤
- 创建一个 .NET Core 7.0 项目,并添加以下 NuGet 包:
- Microsoft.Azure.ServiceBus
- Microsoft.Extensions.Configuration
- Microsoft.Extensions.Configuration.Json
- Microsoft.Extensions.DependencyInjection
- Microsoft.Extensions.Logging
- 在 appsettings.json 文件中添加以下配置:
{
"ServiceBus": {
"ConnectionString": "Endpoint=sb://your-service-bus-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-shared-access-key",
"QueueName": "your-queue-name"
},
"AzureSql": {
"ConnectionString": "Server=tcp:your-sql-server.database.windows.net,1433;Initial Catalog=your-database-name;Persist Security Info=False;User ID=your-user-name;Password=your-password;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"
}
}
该配置文件中包含了 Service Bus 和 Azure SQL 数据库的连接字符串和队列名称。
- 创建一个名为
ServiceBusReceiver的类,并实现IHostedService接口。在ServiceBusReceiver类的构造函数中注入IQueueClient和ILogger<ServiceBusReceiver>。
public class ServiceBusReceiver : IHostedService
{
private readonly IQueueClient _queueClient;
private readonly ILogger<ServiceBusReceiver> _logger;
public ServiceBusReceiver(IQueueClient queueClient, ILogger<ServiceBusReceiver> logger)
{
_queueClient = queueClient;
_logger = logger;
}
public Task StartAsync(CancellationToken cancellationToken)
{
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
_queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return _queueClient.CloseAsync();
}
private async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
{
var messageBody = Encoding.UTF8.GetString(message.Body);
_logger.LogInformation($"Received message: {messageBody}");
// TODO: 将消息记录到 Azure SQL 数据库中
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
return Task.CompletedTask;
}
}
在 ProcessMessagesAsync 方法中,我们可以将从 Service Bus 接收到的消息记录到 Azure SQL 数据库中,这里暂时用 TODO 占位。
- 在
Program.cs中添加以下代码:
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
var serviceBusConnectionString = hostContext.Configuration.GetSection("ServiceBus:ConnectionString").Value;
var queueName = hostContext.Configuration.GetSection("ServiceBus:QueueName").Value;
var azureSqlConnectionString = hostContext.Configuration.GetSection("AzureSql:ConnectionString").Value;
services.AddSingleton(new ServiceBusConnectionStringBuilder(serviceBusConnectionString));
services.AddSingleton(new QueueClient(serviceBusConnectionString, queueName));
services.AddDbContext<MyDbContext>(options => options.UseSqlServer(azureSqlConnectionString));
services.AddHostedService<ServiceBusReceiver>();
})
.ConfigureLogging((hostingContext, logging) =>
{
logging.AddConsole();
});
在这里,我们注入了 ServiceBusConnectionStringBuilder、QueueClient 和 MyDbContext,同时将 ServiceBusReceiver 注册为 hosted service。
- 在
MyDbContext中创建一个名为Messages的DbSet:
public class MyDbContext : DbContext
{
public MyDbContext(DbContextOptions<MyDbContext> options) : base(options)
{
}
public DbSet<Message> Messages { get; set; }
}
- 在
ServiceBusReceiver的ProcessMessagesAsync方法中,使用MyDbContext将消息记录到数据库中:
private async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
{
var messageBody = Encoding.UTF8.GetString(message.Body);
_logger.LogInformation($"Received message: {messageBody}");
using (var dbContext = new MyDbContext(_dbContextOptions))
{
dbContext.Messages.Add(new Message { Text = messageBody });
await dbContext.SaveChangesAsync();
}
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
这样,当从 Service Bus 接收到消息时,就会将消息记录到 Azure SQL 数据库中。
原文地址: https://www.cveoy.top/t/topic/tH7 著作权归作者所有。请勿转载和采集!