Kafka 消费者示例代码（poll 循环带异常处理；注意：这是局部 try/catch 示例，并非框架级的全局异常处理）:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {

    /**
     * Minimal Kafka consumer example: subscribes to a single topic and prints
     * every record received, with exception handling around the poll loop.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topicName = "test-topic";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        // Offsets are auto-committed every second. Note: with auto-commit,
        // offsets may be committed before records are fully processed, giving
        // at-most-once semantics if processing fails.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // try-with-resources guarantees the consumer is closed (leaving the
        // consumer group cleanly) even if subscribe() or poll() throws —
        // the original only closed it if the poll loop itself was reached.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topicName));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        } catch (Exception e) {
            // Demo-level handling: print and fall through to close. In
            // production, distinguish WakeupException (orderly shutdown
            // signal) from genuinely fatal errors, and use a logger.
            e.printStackTrace();
        }
    }
}

Elasticsearch 增删改查示例代码（每个操作各自带异常处理；同样为局部 try/catch，并非全局异常处理）:

import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;

public class ElasticsearchExample {

    /**
     * Elasticsearch high-level REST client CRUD example — index, search,
     * update, delete — each operation with its own try/catch so one failure
     * does not prevent the others from running.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // RestHighLevelClient has no no-arg constructor; it must be built on
        // top of a low-level RestClient pointing at the cluster.
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));

        try {
            // ---- index: create a document from a JSON body ----
            try {
                IndexRequest request = new IndexRequest("test-index");
                XContentBuilder builder = XContentFactory.jsonBuilder();
                builder.startObject();
                {
                    builder.field("name", "John");
                    builder.field("age", 30);
                    builder.field("city", "New York");
                }
                builder.endObject();
                request.source(builder);
                // The (request) overload without RequestOptions is deprecated
                // and removed in 7.x; pass RequestOptions.DEFAULT explicitly.
                IndexResponse response = client.index(request, RequestOptions.DEFAULT);
                System.out.println(response.getId());
            } catch (IOException e) {
                e.printStackTrace();
            }

            // ---- search: match-all query, print each hit's source ----
            try {
                SearchRequest request = new SearchRequest("test-index");
                SearchSourceBuilder builder = new SearchSourceBuilder();
                builder.query(QueryBuilders.matchAllQuery());
                request.source(builder);
                SearchResponse response = client.search(request, RequestOptions.DEFAULT);
                for (SearchHit hit : response.getHits()) {
                    System.out.println(hit.getSourceAsString());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

            // ---- update: partial-document update of doc id "1" ----
            try {
                UpdateRequest request = new UpdateRequest("test-index", "1");
                XContentBuilder builder = XContentFactory.jsonBuilder();
                builder.startObject();
                {
                    builder.field("age", 31);
                }
                builder.endObject();
                request.doc(builder);
                UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
                System.out.println(response.getId());
            } catch (IOException e) {
                e.printStackTrace();
            }

            // ---- delete: remove doc id "1" ----
            try {
                DeleteRequest request = new DeleteRequest("test-index", "1");
                DeleteResponse response = client.delete(request, RequestOptions.DEFAULT);
                System.out.println(response.getId());
            } catch (IOException e) {
                e.printStackTrace();
            }
        } finally {
            // close() declares IOException, which the original code neither
            // caught nor declared (a compile error). Closing in finally also
            // releases the connection pool even if a request throws unchecked.
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
```
原始提问：请给出 Kafka 与 Elasticsearch 的全局异常处理示例代码。

原文地址: https://www.cveoy.top/t/topic/fnOc 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录