.net core 实现web api 将数据传入service bus,再读取service bus 数据写入数据库
- 创建 .NET Core Web API 项目
首先,我们需要创建一个 .NET Core Web API 项目。在 Visual Studio 中选择“新建项目”,选择“ASP.NET Core Web 应用程序”模板,命名为“WebApiServiceBusDemo”。
- 安装 Microsoft.Azure.ServiceBus 包
在项目中右键点击“依赖项”,选择“管理 NuGet 包”,搜索“Microsoft.Azure.ServiceBus”,点击安装。
- 创建 Service Bus 实例
在 Azure 门户中创建一个 Service Bus 命名空间,然后创建一个名为“orders”的队列。
- 创建配置文件
在项目根目录下创建一个名为“appsettings.json”的文件,添加以下配置:
{
"ServiceBusConnection": "<your-service-bus-connection-string>",
"QueueName": "orders"
}
将 <your-service-bus-connection-string> 替换为你的 Service Bus 连接字符串。
- 创建 Service Bus 客户端
在项目根目录下创建一个名为“ServiceBusClient.cs”的文件,添加以下代码:
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using System.Text;
using System.Threading.Tasks;
namespace WebApiServiceBusDemo.Services
{
public class ServiceBusClient : IServiceBusClient
{
private readonly string _serviceBusConnection;
private readonly string _queueName;
private readonly QueueClient _queueClient;
public ServiceBusClient(IConfiguration configuration)
{
_serviceBusConnection = configuration["ServiceBusConnection"];
_queueName = configuration["QueueName"];
_queueClient = new QueueClient(_serviceBusConnection, _queueName);
}
public async Task SendMessageAsync(string message)
{
var messageBody = new Message(Encoding.UTF8.GetBytes(message));
await _queueClient.SendAsync(messageBody);
}
public void Dispose()
{
_queueClient.CloseAsync().GetAwaiter().GetResult();
}
}
public interface IServiceBusClient
{
Task SendMessageAsync(string message);
}
}
- 创建服务
在项目根目录下创建一个名为“OrderService.cs”的文件,添加以下代码:
using System.Threading.Tasks;
using WebApiServiceBusDemo.Services;
namespace WebApiServiceBusDemo.Models
{
public class OrderService : IOrderService
{
private readonly IServiceBusClient _serviceBusClient;
public OrderService(IServiceBusClient serviceBusClient)
{
_serviceBusClient = serviceBusClient;
}
public async Task CreateOrderAsync(Order order)
{
// 将订单转化为 JSON 字符串,并发送到 Service Bus 队列
var orderJson = Newtonsoft.Json.JsonConvert.SerializeObject(order);
await _serviceBusClient.SendMessageAsync(orderJson);
}
}
public interface IOrderService
{
Task CreateOrderAsync(Order order);
}
}
- 创建 Order 控制器
在项目根目录下创建一个名为“OrderController.cs”的文件,添加以下代码:
using Microsoft.AspNetCore.Mvc;
using System.Threading.Tasks;
using WebApiServiceBusDemo.Models;
namespace WebApiServiceBusDemo.Controllers
{
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly IOrderService _orderService;
public OrderController(IOrderService orderService)
{
_orderService = orderService;
}
[HttpPost]
public async Task<IActionResult> CreateOrderAsync([FromBody] Order order)
{
await _orderService.CreateOrderAsync(order);
return Ok();
}
}
}
- 创建 Order 类
在项目根目录下创建一个名为“Order.cs”的文件,添加以下代码:
namespace WebApiServiceBusDemo.Models
{
public class Order
{
public int Id { get; set; }
public string ProductName { get; set; }
public decimal Price { get; set; }
}
}
- 创建 Service Bus 消费者
在项目根目录下创建一个名为“ServiceBusConsumer.cs”的文件,添加以下代码:
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace WebApiServiceBusDemo.Services
{
public class ServiceBusConsumer : IServiceBusConsumer
{
private readonly string _serviceBusConnection;
private readonly string _queueName;
private readonly IOrderRepository _orderRepository;
private QueueClient _queueClient;
private CancellationTokenSource _cancellationTokenSource;
public ServiceBusConsumer(IConfiguration configuration, IOrderRepository orderRepository)
{
_serviceBusConnection = configuration["ServiceBusConnection"];
_queueName = configuration["QueueName"];
_orderRepository = orderRepository;
}
public void Start()
{
_queueClient = new QueueClient(_serviceBusConnection, _queueName);
_cancellationTokenSource = new CancellationTokenSource();
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
_queueClient.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
}
public async Task StopAsync()
{
await _queueClient.CloseAsync();
_cancellationTokenSource.Cancel();
}
private async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken)
{
var messageBody = Encoding.UTF8.GetString(message.Body);
// 将收到的订单数据写入数据库
var order = Newtonsoft.Json.JsonConvert.DeserializeObject<Order>(messageBody);
await _orderRepository.CreateOrderAsync(order);
await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}
private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs args)
{
Console.WriteLine(args.Exception);
return Task.CompletedTask;
}
}
public interface IServiceBusConsumer
{
void Start();
Task StopAsync();
}
}
- 创建 Order 数据库仓储
在项目根目录下创建一个名为“OrderRepository.cs”的文件,添加以下代码:
using System.Threading.Tasks;
using WebApiServiceBusDemo.Models;
namespace WebApiServiceBusDemo.Repositories
{
public class OrderRepository : IOrderRepository
{
private readonly AppDbContext _dbContext;
public OrderRepository(AppDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task CreateOrderAsync(Order order)
{
await _dbContext.Orders.AddAsync(order);
await _dbContext.SaveChangesAsync();
}
}
public interface IOrderRepository
{
Task CreateOrderAsync(Order order);
}
}
- 创建 AppDbContext
在项目根目录下创建一个名为“AppDbContext.cs”的文件,添加以下代码:
using Microsoft.EntityFrameworkCore;
namespace WebApiServiceBusDemo.Models
{
public class AppDbContext : DbContext
{
public AppDbContext(DbContextOptions<AppDbContext> options) : base(options)
{
}
public DbSet<Order> Orders { get; set; }
}
}
- 注册服务
在 Startup.cs 文件中添加以下代码:
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using WebApiServiceBusDemo.Models;
using WebApiServiceBusDemo.Repositories;
using WebApiServiceBusDemo.Services;
namespace WebApiServiceBusDemo
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddDbContext<AppDbContext>(options =>
{
options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
});
services.AddScoped<IOrderRepository, OrderRepository>();
services.AddSingleton<IServiceBusClient, ServiceBusClient>();
services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
services.AddScoped<IOrderService, OrderService>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceBusConsumer serviceBusConsumer)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
serviceBusConsumer.Start();
}
}
}
- 运行程序
现在我们可以运行程序了。在 Visual Studio 中按 F5 启动程序,然后使用 Postman 或其他工具向 https://localhost:5001/order 发送 POST 请求,请求体为以下 JSON:
{
"productName": "Apple",
"price": 3.5
}
然后我们可以通过查询数据库来验证订单是否成功写入。
- 停止服务
为了停止 Service Bus 消费者,我们需要在程序结束时调用 StopAsync 方法。在 Startup.cs 文件中添加以下代码:
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using WebApiServiceBusDemo.Models;
using WebApiServiceBusDemo.Repositories;
using WebApiServiceBusDemo.Services;
namespace WebApiServiceBusDemo
{
public class Startup
{
private IServiceBusConsumer _serviceBusConsumer;
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddDbContext<AppDbContext>(options =>
{
options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
});
services.AddScoped<IOrderRepository, OrderRepository>();
services.AddSingleton<IServiceBusClient, ServiceBusClient>();
services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
services.AddScoped<IOrderService, OrderService>();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceBusConsumer serviceBusConsumer)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
_serviceBusConsumer = serviceBusConsumer;
_serviceBusConsumer.Start();
appLifetime.ApplicationStopping.Register(OnShutdown);
}
private void OnShutdown()
{
_serviceBusConsumer.StopAsync().GetAwaiter().GetResult();
}
}
}
现在当程序结束时,会自动调用 OnShutdown 方法来停止 Service Bus 消费者。
原文地址: http://www.cveoy.top/t/topic/tRX 著作权归作者所有。请勿转载和采集!