Pipeline API

Pipeline Class

Usage Examples

Creating a Pipeline

from stagecoachml import Pipeline

# Create an empty pipeline
pipeline = Pipeline(name="my_pipeline")

# Create with configuration
pipeline = Pipeline(
    name="configured_pipeline",
    config={
        "max_retries": 3,
        "timeout": 300
    }
)

Adding Stages

from stagecoachml.stage import FunctionStage

# A stage function takes the pipeline context and returns a dict
def my_function(context):
    return {"result": "success"}

stage = FunctionStage(name="my_stage", func=my_function)
pipeline.add_stage(stage)

Defining Dependencies

# Add a dependency between stages so that stage1 runs before stage2
pipeline.add_dependency("stage1", "stage2")
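
A "runs before" guarantee of this kind is what a topological sort provides. The following is a minimal sketch of that idea using Python's standard-library graphlib module (Python 3.9+). It does not show how StagecoachML resolves dependencies internally, and the stage names and the `dependencies` list are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical (upstream, downstream) pairs, read the same way as
# add_dependency("stage1", "stage2"): upstream runs first.
dependencies = [
    ("stage1", "stage2"),
    ("stage2", "stage3"),
    ("stage1", "stage3"),
]

sorter = TopologicalSorter()
for upstream, downstream in dependencies:
    # TopologicalSorter.add(node, *predecessors): downstream waits on upstream.
    sorter.add(downstream, upstream)

# static_order() yields every node after all of its predecessors.
order = list(sorter.static_order())
print(order)  # ['stage1', 'stage2', 'stage3']
```

Each stage here depends on the one before it, so only one order is possible. With independent stages, several orders can satisfy the same dependencies.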

Running the Pipeline

# Run with default context
results = pipeline.run()

# Run with custom context
results = pipeline.run({"custom_param": "value"})

Validation

# Validate pipeline structure
is_valid = pipeline.validate()

# Get execution order
order = pipeline.get_execution_order()
print(f"Stages will execute in order: {order}")
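
As a rough illustration of what structural validation can involve, the sketch below checks a dependency list for references to unknown stages and for cycles. `check_structure` is a hypothetical helper written for this example, not part of StagecoachML, and the checks that `pipeline.validate()` actually performs may differ.

```python
from graphlib import CycleError, TopologicalSorter


def check_structure(stages, dependencies):
    """Return a list of problems; an empty list means no issue was found.

    Hypothetical helper: `stages` is a list of stage names and
    `dependencies` is a list of (upstream, downstream) name pairs.
    """
    problems = []
    known = set(stages)

    # Every name used in a dependency should refer to a registered stage.
    for upstream, downstream in dependencies:
        for name in (upstream, downstream):
            if name not in known:
                problems.append(f"unknown stage: {name}")

    # Build the graph; prepare() raises CycleError if it contains a cycle.
    sorter = TopologicalSorter({name: set() for name in stages})
    for upstream, downstream in dependencies:
        sorter.add(downstream, upstream)
    try:
        sorter.prepare()
    except CycleError:
        problems.append("dependency cycle detected")

    return problems


print(check_structure(["a", "b"], [("a", "b")]))              # []
print(check_structure(["a", "b"], [("a", "b"), ("b", "a")]))  # ['dependency cycle detected']
```

Returning a list of problems instead of a single boolean is a design choice for this sketch: it lets the caller report every issue at once.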