这是一个基于Python的简单pipeline示例:

from typing import Union
from dagster import pipeline, solid, InputDefinition, OutputDefinition

@solid(config_schema={"project": str})
def process_data(context, input_data: str) -> Union[str, int]:
    project = context.solid_config["project"]
    if project == "project01":
        return input_data
    elif project == "project02":
        context.log.info("Creating variable B")
        B = 123
        context.log.info(f"B = {B}")
        return B
    else:
        raise ValueError(f"Invalid project name: {project}")

@pipeline
def my_pipeline():
    process_data()

在这个pipeline中,我们定义了一个solid process_data,它带有一个配置参数project,并且它需要一个输入参数input_data。根据project的值,solid将返回输入数据或创建一个变量B并返回它的值。我们还定义了一个my_pipeline,它只包含process_data solid。

我们可以通过以下方式运行这个pipeline:

from dagster import execute_pipeline

result = execute_pipeline(
    my_pipeline,
    {
        "solids": {
            "process_data": {
                "config": {"project": "project01"},
                "inputs": {"input_data": "Hello, world!"},
            }
        }
    },
)

print(result.output_value("process_data"))  # 输出 "Hello, world!"

如果我们将project设置为project02,我们将得到这样的输出:

Creating variable B
B = 123

我们也可以使用Dagster的CLI运行pipeline:

$ dagster pipeline execute -f my_pipeline.py -c '{"solids": {"process_data": {"config": {"project": "project01"}, "inputs": {"input_data": "Hello, world!"}}}}'
请帮我写一个pipeline,如果选项参数A选中project01,则输出A的值,如果选项参数A选中project02,则创建一个变量B=123,并输出B的值

原文地址: https://www.cveoy.top/t/topic/s62 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录