1. 创建一个 .NET Core 7.0 项目,并添加以下 NuGet 包:
  • Microsoft.Azure.ServiceBus
  • Microsoft.Extensions.Configuration
  • Microsoft.Extensions.Configuration.Json
  • Microsoft.Extensions.DependencyInjection
  • Microsoft.Extensions.Logging
  1. 在 appsettings.json 文件中添加以下配置:
{
  "ServiceBus": {
    "ConnectionString": "Endpoint=sb://your-service-bus-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-shared-access-key",
    "QueueName": "your-queue-name"
  },
  "AzureSql": {
    "ConnectionString": "Server=tcp:your-sql-server.database.windows.net,1433;Initial Catalog=your-database-name;Persist Security Info=False;User ID=your-user-name;Password=your-password;MultipleActiveResultSets=False;Encrypt=True;TrustServerCertificate=False;Connection Timeout=30;"
  }
}

该配置文件中包含了 Service Bus 和 Azure SQL 数据库的连接字符串和队列名称。

  1. 创建一个名为 ServiceBusReceiver 的类,并实现 IHostedService 接口。在 ServiceBusReceiver 类的构造函数中注入 IQueueClientILogger<ServiceBusReceiver>
public class ServiceBusReceiver : IHostedService
{
    private readonly IQueueClient _queueClient;
    private readonly ILogger<ServiceBusReceiver> _logger;

    public ServiceBusReceiver(IQueueClient queueClient, ILogger<ServiceBusReceiver> logger)
    {
        _queueClient = queueClient;
        _logger = logger;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1,
            AutoComplete = false
        };

        _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return _queueClient.CloseAsync();
    }

    private async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
    {
        var messageBody = Encoding.UTF8.GetString(message.Body);
        _logger.LogInformation($"Received message: {messageBody}");

        // TODO: 将消息记录到 Azure SQL 数据库中

        await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
    }

    private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");

        return Task.CompletedTask;
    }
}

ProcessMessagesAsync 方法中,我们可以将从 Service Bus 接收到的消息记录到 Azure SQL 数据库中,这里暂时用 TODO 占位。

  1. Program.cs 中添加以下代码:
public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .ConfigureServices((hostContext, services) =>
        {
            var serviceBusConnectionString = hostContext.Configuration.GetSection("ServiceBus:ConnectionString").Value;
            var queueName = hostContext.Configuration.GetSection("ServiceBus:QueueName").Value;
            var azureSqlConnectionString = hostContext.Configuration.GetSection("AzureSql:ConnectionString").Value;

            services.AddSingleton(new ServiceBusConnectionStringBuilder(serviceBusConnectionString));
            services.AddSingleton(new QueueClient(serviceBusConnectionString, queueName));
            services.AddDbContext<MyDbContext>(options => options.UseSqlServer(azureSqlConnectionString));
            services.AddHostedService<ServiceBusReceiver>();
        })
        .ConfigureLogging((hostingContext, logging) =>
        {
            logging.AddConsole();
        });

在这里,我们注入了 ServiceBusConnectionStringBuilderQueueClientMyDbContext,同时将 ServiceBusReceiver 注册为 hosted service。

  1. MyDbContext 中创建一个名为 MessagesDbSet
public class MyDbContext : DbContext
{
    public MyDbContext(DbContextOptions<MyDbContext> options) : base(options)
    {
    }

    public DbSet<Message> Messages { get; set; }
}
  1. ServiceBusReceiverProcessMessagesAsync 方法中,使用 MyDbContext 将消息记录到数据库中:
private async Task ProcessMessagesAsync(Message message, CancellationToken cancellationToken)
{
    var messageBody = Encoding.UTF8.GetString(message.Body);
    _logger.LogInformation($"Received message: {messageBody}");

    using (var dbContext = new MyDbContext(_dbContextOptions))
    {
        dbContext.Messages.Add(new Message { Text = messageBody });
        await dbContext.SaveChangesAsync();
    }

    await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}

这样,当从 Service Bus 接收到消息时,就会将消息记录到 Azure SQL 数据库中。

.net core7.0实现从service bus读取消息,并且记录到Azure sql中执行步骤

原文地址: https://www.cveoy.top/t/topic/tH7 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录