请帮我写一个pipeline,如果选项参数A选中project01,则输出A的值,如果选项参数A选中project02,则创建一个变量B=123,并输出B的值
这是一个基于Python的简单pipeline示例:
from typing import Union
from dagster import pipeline, solid, InputDefinition, OutputDefinition
@solid(config_schema={"project": str})
def process_data(context, input_data: str) -> Union[str, int]:
project = context.solid_config["project"]
if project == "project01":
return input_data
elif project == "project02":
context.log.info("Creating variable B")
B = 123
context.log.info(f"B = {B}")
return B
else:
raise ValueError(f"Invalid project name: {project}")
@pipeline
def my_pipeline():
process_data()
在这个pipeline中,我们定义了一个solid process_data,它带有一个配置参数project,并且它需要一个输入参数input_data。根据project的值,solid将返回输入数据或创建一个变量B并返回它的值。我们还定义了一个my_pipeline,它只包含process_data solid。
我们可以通过以下方式运行这个pipeline:
from dagster import execute_pipeline
result = execute_pipeline(
my_pipeline,
{
"solids": {
"process_data": {
"config": {"project": "project01"},
"inputs": {"input_data": "Hello, world!"},
}
}
},
)
print(result.output_value("process_data")) # 输出 "Hello, world!"
如果我们将project设置为project02,我们将得到这样的输出:
Creating variable B
B = 123
我们也可以使用Dagster的CLI运行pipeline:
$ dagster pipeline execute -f my_pipeline.py -c '{"solids": {"process_data": {"config": {"project": "project01"}, "inputs": {"input_data": "Hello, world!"}}}}'
原文地址: https://www.cveoy.top/t/topic/s62 著作权归作者所有。请勿转载和采集!