在 Flink 中,可以使用 log4j 或者 slf4j 来记录和打印运行日志。

首先,需要在 Flink 项目的依赖中添加 log4j 或者 slf4j 相关的库。例如,如果选择 slf4j,可以添加以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>${slf4j.version}</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
</dependency>

然后,在 Flink 作业的代码中,可以使用 org.slf4j.Logger 来记录日志。例如:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkJob {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJob.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flink 作业逻辑
        env.fromElements("Hello", "Flink")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        LOG.info("Processing value: {}", value);
                        return value.toUpperCase();
                    }
                })
                .print();

        env.execute("Flink Job");
    }
}

在上面的例子中,使用 LOG.info() 方法来记录日志。日志消息中可以使用占位符 {} 来引用变量。

最后,需要在 log4j.properties 或者 log4j.xml 配置文件中,指定日志的输出格式和目标。例如:

# log4j.properties

# 输出日志到控制台
log4j.rootLogger=INFO, Console

# 定义控制台输出
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n

上述配置将日志输出到控制台,并设置了日志的格式。

当运行 Flink 作业时,会在控制台输出日志信息

flink打印运行日志

原文地址: https://www.cveoy.top/t/topic/hYR6 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录