Pipeline API
Pipeline Class
Usage Examples
Creating a Pipeline
from stagecoachml import Pipeline
# Create an empty pipeline
pipeline = Pipeline(name="my_pipeline")
# Create with configuration
pipeline = Pipeline(
    name="configured_pipeline",
    config={
        "max_retries": 3,
        "timeout": 300,
    },
)
Adding Stages
from stagecoachml.stage import FunctionStage
def my_function(context):
    return {"result": "success"}
stage = FunctionStage(name="my_stage", func=my_function)
pipeline.add_stage(stage)
Defining Dependencies
# Add a dependency between stages
pipeline.add_dependency("stage1", "stage2")
# This ensures stage1 runs before stage2
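To make the ordering guarantee concrete, here is a minimal standalone sketch of how an execution order can be derived from dependency pairs with a topological sort. It uses only the Python standard library (`graphlib`) and does not call StagecoachML; the helper name `execution_order` is hypothetical, and the library's own implementation may differ.

```python
from graphlib import TopologicalSorter


def execution_order(stages, dependencies):
    """Order stage names so each upstream stage precedes its downstream stage.

    Hypothetical helper for illustration only, not part of StagecoachML.
    stages: iterable of stage names.
    dependencies: iterable of (upstream, downstream) name pairs.
    """
    # Start with every stage having no predecessors.
    sorter = TopologicalSorter({name: set() for name in stages})
    for upstream, downstream in dependencies:
        # TopologicalSorter.add(node, *predecessors)
        sorter.add(downstream, upstream)
    return list(sorter.static_order())


order = execution_order(
    ["stage1", "stage2", "stage3"],
    [("stage1", "stage2"), ("stage2", "stage3")],
)
print(order)  # ['stage1', 'stage2', 'stage3']
```

For a linear chain like this one the order is fully determined. When stages have no dependency between them, a topological sort may place them in any relative order.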
Running the Pipeline
# Run with default context
results = pipeline.run()
# Run with custom context
results = pipeline.run({"custom_param": "value"})
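This page does not specify what `run()` returns or how the context reaches each stage. As a rough mental model only, the sketch below assumes that every stage function receives a shared context dict, that its returned dict is merged back into that context, and that results are collected per stage name. `run_stages`, `load`, and `summarize` are hypothetical names, not StagecoachML APIs.

```python
def run_stages(stages, context=None):
    """Run (name, func) pairs in order, sharing one context dict.

    Hypothetical illustration; the real Pipeline.run() may behave differently.
    """
    context = dict(context or {})  # copy so the caller's dict is not mutated
    results = {}
    for name, func in stages:
        output = func(context)
        results[name] = output
        context.update(output)  # later stages can read earlier outputs
    return results


def load(context):
    # Reads a value supplied in the initial context.
    rows = [1, 2, 3] if context.get("custom_param") == "value" else []
    return {"rows": rows}


def summarize(context):
    # Reads the output that the earlier stage merged into the context.
    return {"total": sum(context["rows"])}


results = run_stages(
    [("load", load), ("summarize", summarize)],
    {"custom_param": "value"},
)
print(results)  # {'load': {'rows': [1, 2, 3]}, 'summarize': {'total': 6}}
```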
Validation
# Validate pipeline structure
is_valid = pipeline.validate()
# Get execution order
order = pipeline.get_execution_order()
print(f"Stages will execute in order: {order}")
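The checks performed by `validate()` are not listed on this page. Two structural problems that a dependency-based pipeline typically has to reject are dependencies on unknown stages and circular dependencies. The sketch below shows one way to detect both using only the standard library; `check_dependencies` is a hypothetical helper, not a StagecoachML function.

```python
from graphlib import CycleError, TopologicalSorter


def check_dependencies(stages, dependencies):
    """Return a list of problem descriptions; an empty list means no issues.

    Hypothetical helper for illustration only, not part of StagecoachML.
    """
    known = set(stages)
    problems = []

    # 1. Every name used in a dependency must refer to a known stage.
    for upstream, downstream in dependencies:
        for name in (upstream, downstream):
            if name not in known:
                problems.append(f"unknown stage: {name}")

    # 2. The dependency graph must not contain a cycle.
    graph = {name: set() for name in known}
    for upstream, downstream in dependencies:
        graph.setdefault(downstream, set()).add(upstream)
    try:
        TopologicalSorter(graph).prepare()  # raises CycleError on a cycle
    except CycleError as exc:
        cycle = exc.args[1]  # list of nodes forming the cycle
        problems.append("cycle: " + " -> ".join(cycle))

    return problems


print(check_dependencies(["a", "b"], [("a", "b")]))  # []
print(check_dependencies(["a", "b"], [("a", "b"), ("b", "a")]))
```

The second call reports a cycle between `a` and `b`, which is the situation in which no valid execution order exists.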