java操作rabbitmq 通过ConnectionFactory连接工厂移除监听队列
要移除监听队列,你需要使用Channel对象的basicCancel方法来取消消费者的订阅。
首先,你需要创建一个Connection对象和一个Channel对象:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
然后,你需要创建一个消费者并订阅队列:
String queueName = "your_queue_name";
boolean autoAck = true;
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, autoAck, consumer);
最后,当你想要移除监听队列时,调用Channel对象的basicCancel方法:
channel.basicCancel(consumerTag);
这将取消消费者的订阅。注意,你需要传递消费者的标签(consumerTag),这是在basicConsume方法返回的。
完整的代码示例:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queueName = "your_queue_name";
boolean autoAck = true;
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
};
channel.basicConsume(queueName, autoAck, consumer);
// Wait for some time...
channel.basicCancel(consumerTag);
channel.close();
connection.close();
}
}
确保替换队列名称(queueName)为你要监听的队列名称
原文地址: https://www.cveoy.top/t/topic/h1MV 著作权归作者所有。请勿转载和采集!