来一个 Kafka 和 Elasticsearch 的全局异常处理示例代码
Kafka全局异常处理代码:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Minimal Kafka consumer example: subscribes to {@code test-topic} and prints the offset, key
 * and value of every record it receives.
 *
 * <p>The consumer is configured with String key/value deserializers and auto-commit enabled
 * (interval 1000 ms). The poll loop has no exit condition, so the program runs until an
 * exception is thrown; the exception is reported on stderr and the consumer is closed.
 */
public class KafkaConsumerExample {

    /**
     * Builds the consumer configuration, subscribes to the topic and polls forever.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String topicName = "test-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources: the consumer is closed on every exit path, including a failure
        // in subscribe(), which previously happened outside the try/finally and leaked it.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            // Top-level boundary of the example: report with context instead of a bare trace.
            System.err.println("Kafka consumer for topic '" + topicName + "' stopped: " + e);
            e.printStackTrace();
        }
    }
}
Elasticsearch全局异常处理代码:
import java.io.IOException;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
public class ElasticsearchExample {
public static void main(String[] args) {
RestHighLevelClient client = new RestHighLevelClient();
// index
try {
IndexRequest request = new IndexRequest("test-index");
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("name", "John");
builder.field("age", 30);
builder.field("city", "New York");
}
builder.endObject();
request.source(builder);
IndexResponse response = client.index(request);
System.out.println(response.getId());
} catch (IOException e) {
e.printStackTrace();
}
// search
try {
SearchRequest request = new SearchRequest("test-index");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());
request.source(builder);
SearchResponse response = client.search(request);
for (SearchHit hit : response.getHits()) {
System.out.println(hit.getSourceAsString());
}
} catch (IOException e) {
e.printStackTrace();
}
// update
try {
UpdateRequest request = new UpdateRequest("test-index", "1");
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("age", 31);
}
builder.endObject();
request.doc(builder);
UpdateResponse response = client.update(request);
System.out.println(response.getId());
} catch (IOException e) {
e.printStackTrace();
}
// delete
try {
DeleteRequest request = new DeleteRequest("test-index", "1");
DeleteResponse response = client.delete(request);
System.out.println(response.getId());
} catch (IOException e) {
e.printStackTrace();
}
client.close();
}
}
原文地址: https://www.cveoy.top/t/topic/fnOc 著作权归作者所有。请勿转载和采集!