.net core7.0实现从service bus读取消息,并且记录到Azure sql中简单例子
以下是一个简单的例子,演示如何使用.NET Core 7.0从Azure Service Bus读取消息,并将其记录到Azure SQL数据库中。
1.创建一个.NET Core 7.0控制台应用程序。
2.在“程序包管理器控制台”中,运行以下命令,以安装所需的NuGet包:
Install-Package Microsoft.Azure.ServiceBus
Install-Package Microsoft.EntityFrameworkCore.SqlServer
Install-Package Microsoft.EntityFrameworkCore.Design
Install-Package Microsoft.Extensions.Configuration
Install-Package Microsoft.Extensions.Configuration.Json
Install-Package Microsoft.Extensions.DependencyInjection
Install-Package Microsoft.Extensions.Logging
3.创建一个名为“appsettings.json”的配置文件,并添加以下内容:
{
"ConnectionStrings": {
"DefaultConnection": "Server=(localdb)\mssqllocaldb;Database=MessageLog;Trusted_Connection=True;MultipleActiveResultSets=true"
},
"ServiceBus": {
"ConnectionString": "Endpoint=sb://
替换
4.创建一个名为“Message.cs”的新类,以表示要记录的消息:
public class Message { public int Id { get; set; } public string Body { get; set; } public DateTime DateReceived { get; set; } }
5.创建一个名为“MessageDbContext.cs”的新类,以表示数据上下文:
public class MessageDbContext : DbContext
{
public DbSet
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
}
}
6.创建一个名为“Program.cs”的新类,并添加以下代码:
using Microsoft.Azure.ServiceBus; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System; using System.Text; using System.Threading; using System.Threading.Tasks;
namespace ServiceBusToSql
{
class Program
{
private static IQueueClient _queueClient;
private static ILogger
static async Task Main(string[] args)
{
var configuration = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", true, true)
.Build();
var serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.AddConsole();
})
.AddSingleton(configuration)
.AddDbContext<MessageDbContext>()
.BuildServiceProvider();
_logger = serviceProvider.GetService<ILogger<Program>>();
_logger.LogInformation("Starting message processing...");
_queueClient = new QueueClient(configuration.GetConnectionString("ServiceBus"), "myqueue");
_queueClient.RegisterMessageHandler(ProcessMessageAsync, new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
});
Console.ReadLine();
await _queueClient.CloseAsync();
_logger.LogInformation("Message processing stopped.");
}
private static async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken)
{
_logger.LogInformation($"Received message: {message.MessageId}");
var dbContext = _queueClient.ServiceProvider.GetService<MessageDbContext>();
dbContext.Messages.Add(new Message
{
Body = Encoding.UTF8.GetString(message.Body),
DateReceived = DateTime.UtcNow
});
await dbContext.SaveChangesAsync();
_logger.LogInformation($"Message saved to database: {message.MessageId}");
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError(exceptionReceivedEventArgs.Exception, $"Message handler encountered an exception.");
return Task.CompletedTask;
}
}
}
以上代码将:
a.从服务总线接收消息,并将其传递给ProcessMessageAsync方法进行处理。
b.在ProcessMessageAsync方法中,将消息记录到Azure SQL数据库。
c.使用CompleteAsync方法确认已处理消息,以便服务总线不会再次发送它。
7.运行应用程序,并确保已将消息发送到服务总线队列。
8.使用Azure SQL Server管理工具,检查已记录的消息。
9.完成!
请注意,此代码仅用于演示目的。在生产环境中,您需要考虑错误处理、重试策略、日志记录和其他安全性方面的问题。
原文地址: https://www.cveoy.top/t/topic/tIm 著作权归作者所有。请勿转载和采集!