使用 spring+kafka+springMvc 并不使用KafkaListener注解 和xml配置方式 demo
首先,我们需要在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.3.8</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>5.3.8</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.3.8</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
接下来,我们需要在application.properties文件中添加以下配置:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group1
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
然后,我们需要创建一个KafkaProducerConfig类来配置生产者:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
然后,我们需要创建一个KafkaConsumerConfig类来配置消费者:
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Object.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
接下来,我们需要创建一个KafkaController类来处理HTTP请求和Kafka消息:
@RestController
@RequestMapping("/kafka")
public class KafkaController {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@PostMapping("/publish")
public void sendMessage(@RequestBody String message) {
kafkaTemplate.send("topic1", message);
}
@GetMapping("/consume")
public List<Object> consumeMessages() {
List<Object> messages = new ArrayList<>();
ConsumerRecords<String, Object> records = kafkaTemplate.execute(new KafkaOperations.ProducerCallback<String, Object, ConsumerRecords<String, Object>>() {
@Override
public ConsumerRecords<String, Object> doInKafka(Producer<String, Object> producer) {
ConsumerRecords<String, Object> records = null;
try {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, "metadata"));
producer.sendOffsetsToTransaction(offsets, "group1");
producer.beginTransaction();
records = producer.poll(Duration.ofSeconds(10));
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
return records;
}
});
for (ConsumerRecord<String, Object> record : records) {
messages.add(record.value());
}
return messages;
}
}
最后,我们需要在web.xml文件中添加以下配置:
<servlet>
<servlet-name>dispatcherServlet</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/springmvc-servlet.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>dispatcherServlet</servlet-name>
<url-pattern>/</url-pattern>
</servlet-mapping>
然后,我们需要在WEB-INF目录下创建一个springmvc-servlet.xml文件,配置Spring MVC:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="com.example.demo.controller"/>
<mvc:annotation-driven />
<bean id="viewResolver"
class="org.springframework.web.servlet.view.InternalResourceViewResolver">
<property name="prefix" value="/WEB-INF/views/" />
<property name="suffix" value=".jsp" />
</bean>
</beans>
现在我们可以在浏览器中访问http://localhost:8080/kafka/consume来消费Kafka消息,也可以在Postman中发送POST请求http://localhost:8080/kafka/publish来发送Kafka消息。
原文地址: https://www.cveoy.top/t/topic/bZbS 著作权归作者所有。请勿转载和采集!