1. 创建 .NET Core Web API 项目

首先,我们需要创建一个 .NET Core Web API 项目。在 Visual Studio 中选择“新建项目”,选择“ASP.NET Core Web 应用程序”模板,命名为“WebApiServiceBusDemo”。

  1. 安装 Microsoft.Azure.ServiceBus 包

在项目中右键点击“依赖项”,选择“管理 NuGet 包”,搜索“Microsoft.Azure.ServiceBus”,点击安装。

  1. 创建 Service Bus 实例

在 Azure 门户中创建一个 Service Bus 命名空间,然后创建一个名为“orders”的队列。

  1. 创建配置文件

在项目根目录下创建一个名为“appsettings.json”的文件,添加以下配置:

{
  "ServiceBusConnection": "<your-service-bus-connection-string>",
  "QueueName": "orders"
}

<your-service-bus-connection-string> 替换为你的 Service Bus 连接字符串。

  1. 创建 Service Bus 客户端

在项目根目录下创建一个名为“ServiceBusClient.cs”的文件,添加以下代码:

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using System.Text;
using System.Threading.Tasks;

namespace WebApiServiceBusDemo.Services
{
    public class ServiceBusClient : IServiceBusClient
    {
        private readonly string _serviceBusConnection;
        private readonly string _queueName;
        private readonly QueueClient _queueClient;

        public ServiceBusClient(IConfiguration configuration)
        {
            _serviceBusConnection = configuration["ServiceBusConnection"];
            _queueName = configuration["QueueName"];
            _queueClient = new QueueClient(_serviceBusConnection, _queueName);
        }

        public async Task SendMessageAsync(string message)
        {
            var messageBody = new Message(Encoding.UTF8.GetBytes(message));
            await _queueClient.SendAsync(messageBody);
        }

        public void Dispose()
        {
            _queueClient.CloseAsync().GetAwaiter().GetResult();
        }
    }

    public interface IServiceBusClient
    {
        Task SendMessageAsync(string message);
    }
}
  1. 创建服务

在项目根目录下创建一个名为“OrderService.cs”的文件,添加以下代码:

using System.Threading.Tasks;
using WebApiServiceBusDemo.Services;

namespace WebApiServiceBusDemo.Models
{
    public class OrderService : IOrderService
    {
        private readonly IServiceBusClient _serviceBusClient;

        public OrderService(IServiceBusClient serviceBusClient)
        {
            _serviceBusClient = serviceBusClient;
        }

        public async Task CreateOrderAsync(Order order)
        {
            // 将订单转化为 JSON 字符串,并发送到 Service Bus 队列
            var orderJson = Newtonsoft.Json.JsonConvert.SerializeObject(order);
            await _serviceBusClient.SendMessageAsync(orderJson);
        }
    }

    public interface IOrderService
    {
        Task CreateOrderAsync(Order order);
    }
}
  1. 创建 Order 控制器

在项目根目录下创建一个名为“OrderController.cs”的文件,添加以下代码:

using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
using WebApiServiceBusDemo.Models;

namespace WebApiServiceBusDemo.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class OrderController : ControllerBase
    {
        private readonly IOrderService _orderService;

        public OrderController(IOrderService orderService)
        {
            _orderService = orderService;
        }

        [HttpPost]
        public async Task<IActionResult> CreateOrderAsync([FromBody] Order order)
        {
            await _orderService.CreateOrderAsync(order);
            return Ok();
        }
    }
}
  1. 创建 Order 类

在项目根目录下创建一个名为“Order.cs”的文件,添加以下代码:

namespace WebApiServiceBusDemo.Models
{
    public class Order
    {
        public int Id { get; set; }
        public string ProductName { get; set; }
        public decimal Price { get; set; }
    }
}
  1. 创建 Service Bus 消费者

在项目根目录下创建一个名为“ServiceBusConsumer.cs”的文件,添加以下代码:

using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace WebApiServiceBusDemo.Services
{
    public class ServiceBusConsumer : IServiceBusConsumer
    {
        private readonly string _serviceBusConnection;
        private readonly string _queueName;
        private readonly IOrderRepository _orderRepository;
        private QueueClient _queueClient;
        private CancellationTokenSource _cancellationTokenSource;

        public ServiceBusConsumer(IConfiguration configuration, IOrderRepository orderRepository)
        {
            _serviceBusConnection = configuration["ServiceBusConnection"];
            _queueName = configuration["QueueName"];
            _orderRepository = orderRepository;
        }

        public void Start()
        {
            _queueClient = new QueueClient(_serviceBusConnection, _queueName);
            _cancellationTokenSource = new CancellationTokenSource();

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                MaxConcurrentCalls = 1,
                AutoComplete = false
            };

            _queueClient.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
        }

        public async Task StopAsync()
        {
            await _queueClient.CloseAsync();
            _cancellationTokenSource.Cancel();
        }

        private async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken)
        {
            var messageBody = Encoding.UTF8.GetString(message.Body);

            // 将收到的订单数据写入数据库
            var order = Newtonsoft.Json.JsonConvert.DeserializeObject<Order>(messageBody);
            await _orderRepository.CreateOrderAsync(order);

            await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
        {
            Console.WriteLine(args.Exception);
            return Task.CompletedTask;
        }
    }

    public interface IServiceBusConsumer
    {
        void Start();
        Task StopAsync();
    }
}
  1. 创建 Order 数据库仓储

在项目根目录下创建一个名为“OrderRepository.cs”的文件,添加以下代码:

using System.Threading.Tasks;
using WebApiServiceBusDemo.Models;

namespace WebApiServiceBusDemo.Repositories
{
    public class OrderRepository : IOrderRepository
    {
        private readonly AppDbContext _dbContext;

        public OrderRepository(AppDbContext dbContext)
        {
            _dbContext = dbContext;
        }

        public async Task CreateOrderAsync(Order order)
        {
            await _dbContext.Orders.AddAsync(order);
            await _dbContext.SaveChangesAsync();
        }
    }

    public interface IOrderRepository
    {
        Task CreateOrderAsync(Order order);
    }
}
  1. 创建 AppDbContext

在项目根目录下创建一个名为“AppDbContext.cs”的文件,添加以下代码:

using Microsoft.EntityFrameworkCore;

namespace WebApiServiceBusDemo.Models
{
    public class AppDbContext : DbContext
    {
        public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
        {
        }

        public DbSet<Order> Orders { get; set; }
    }
}
  1. 注册服务

在 Startup.cs 文件中添加以下代码:

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using WebApiServiceBusDemo.Models;
using WebApiServiceBusDemo.Repositories;
using WebApiServiceBusDemo.Services;

namespace WebApiServiceBusDemo
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();

            services.AddDbContext<AppDbContext>(options =>
            {
                options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
            });

            services.AddScoped<IOrderRepository, OrderRepository>();

            services.AddSingleton<IServiceBusClient, ServiceBusClient>();
            services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();

            services.AddScoped<IOrderService, OrderService>();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceBusConsumer serviceBusConsumer)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            serviceBusConsumer.Start();
        }
    }
}
  1. 运行程序

现在我们可以运行程序了。在 Visual Studio 中按 F5 启动程序,然后使用 Postman 或其他工具向 https://localhost:5001/order 发送 POST 请求,请求体为以下 JSON:

{
    "productName": "Apple",
    "price": 3.5
}

然后我们可以通过查询数据库来验证订单是否成功写入。

  1. 停止服务

为了停止 Service Bus 消费者,我们需要在程序结束时调用 StopAsync 方法。在 Startup.cs 文件中添加以下代码:

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using WebApiServiceBusDemo.Models;
using WebApiServiceBusDemo.Repositories;
using WebApiServiceBusDemo.Services;

namespace WebApiServiceBusDemo
{
    public class Startup
    {
        private IServiceBusConsumer _serviceBusConsumer;

        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();

            services.AddDbContext<AppDbContext>(options =>
            {
                options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
            });

            services.AddScoped<IOrderRepository, OrderRepository>();

            services.AddSingleton<IServiceBusClient, ServiceBusClient>();
            services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();

            services.AddScoped<IOrderService, OrderService>();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceBusConsumer serviceBusConsumer)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            _serviceBusConsumer = serviceBusConsumer;
            _serviceBusConsumer.Start();

            appLifetime.ApplicationStopping.Register(OnShutdown);
        }

        private void OnShutdown()
        {
            _serviceBusConsumer.StopAsync().GetAwaiter().GetResult();
        }
    }
}

现在当程序结束时,会自动调用 OnShutdown 方法来停止 Service Bus 消费者。

.net core 实现web api 将数据传入service bus,再读取service bus 数据写入数据库

原文地址: http://www.cveoy.top/t/topic/tRX 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录