ai-building-pipelines
npx skills add https://github.com/lebsral/dspy-programming-not-prompting-lms-skills --skill ai-building-pipelines
Agent 安装分布
Skill 文档
Build a Multi-Step AI Pipeline
Guide the user through breaking a complex AI task into multiple steps that feed into each other. One prompt can’t do everything â compound AI systems dramatically outperform single calls by decomposing problems.
Step 1: Understand the pipeline
Ask the user:
- What’s the end-to-end task? (e.g., “read a support ticket, classify it, draft a response”)
- What are the natural stages? (classification, retrieval, generation, verification?)
- Does any step need special tools? (search, database, calculator?)
- Does data flow linearly, or do steps branch/loop?
Step 2: Design the stages
The core pattern â compose DSPy modules
Every stage is a DSPy module. Wire them together in forward():
import dspy
class SupportPipeline(dspy.Module):
def __init__(self):
self.classify = dspy.ChainOfThought(ClassifyTicket)
self.retrieve = dspy.Retrieve(k=3)
self.draft = dspy.ChainOfThought(DraftResponse)
def forward(self, ticket):
# Stage 1: Classify
classification = self.classify(ticket=ticket)
# Stage 2: Retrieve relevant docs
docs = self.retrieve(classification.category + " " + ticket).passages
# Stage 3: Draft response using classification + docs
return self.draft(
ticket=ticket,
category=classification.category,
context=docs,
)
Each stage has its own signature:
from typing import Literal
CATEGORIES = ["billing", "technical", "account", "general"]
class ClassifyTicket(dspy.Signature):
"""Classify the support ticket."""
ticket: str = dspy.InputField()
category: Literal[tuple(CATEGORIES)] = dspy.OutputField()
class DraftResponse(dspy.Signature):
"""Draft a helpful response to the support ticket."""
ticket: str = dspy.InputField()
category: str = dspy.InputField()
context: list[str] = dspy.InputField(desc="Relevant help articles")
response: str = dspy.OutputField(desc="Professional support response")
Step 3: Common pipeline patterns
Classify â Route â Specialize
Different categories get different handling:
class RoutedPipeline(dspy.Module):
def __init__(self):
self.classify = dspy.ChainOfThought(ClassifyInput)
self.handlers = {
"simple": dspy.Predict(SimpleAnswer),
"complex": dspy.ChainOfThought(DetailedAnswer),
"research": dspy.ChainOfThought(ResearchAnswer),
}
def forward(self, question):
category = self.classify(question=question).category
handler = self.handlers.get(category, self.handlers["simple"])
return handler(question=question)
Generate â Verify â Refine
Generate a first draft, check it, then improve:
class GenerateAndRefine(dspy.Module):
def __init__(self):
self.generate = dspy.ChainOfThought(GenerateDraft)
self.verify = dspy.ChainOfThought(CheckQuality)
self.refine = dspy.ChainOfThought(ImproveDraft)
def forward(self, task):
# Stage 1: Generate
draft = self.generate(task=task)
# Stage 2: Verify
check = self.verify(task=task, draft=draft.output)
# Stage 3: Refine if needed
if not check.is_good:
refined = self.refine(
task=task,
draft=draft.output,
feedback=check.feedback,
)
return refined
return draft
Ensemble â ask multiple times, pick the best
Generate several candidates and select the best one (the pattern behind AlphaCode and Medprompt):
class EnsemblePipeline(dspy.Module):
def __init__(self, num_candidates=5):
self.generators = [dspy.ChainOfThought(GenerateAnswer) for _ in range(num_candidates)]
self.judge = dspy.ChainOfThought(PickBestAnswer)
def forward(self, question):
# Stage 1: Generate multiple candidates
candidates = []
for gen in self.generators:
result = gen(question=question)
candidates.append(result.answer)
# Stage 2: Pick the best
return self.judge(
question=question,
candidates=candidates,
)
class PickBestAnswer(dspy.Signature):
"""Pick the best answer from the candidates."""
question: str = dspy.InputField()
candidates: list[str] = dspy.InputField(desc="Multiple answer candidates")
best_answer: str = dspy.OutputField(desc="The most accurate and complete answer")
reasoning: str = dspy.OutputField(desc="Why this answer was chosen")
Parallel fan-out â merge
Process different aspects independently, then combine:
class ParallelAnalysis(dspy.Module):
def __init__(self):
self.sentiment = dspy.ChainOfThought(AnalyzeSentiment)
self.topics = dspy.ChainOfThought(ExtractTopics)
self.entities = dspy.ChainOfThought(ExtractEntities)
self.summarize = dspy.ChainOfThought(CombineAnalysis)
def forward(self, text):
# Fan out â run in parallel (DSPy can parallelize these)
sent = self.sentiment(text=text)
topics = self.topics(text=text)
entities = self.entities(text=text)
# Merge results
return self.summarize(
text=text,
sentiment=sent.sentiment,
topics=topics.topics,
entities=entities.entities,
)
Loop â iterative refinement
Keep improving until a condition is met:
class IterativeRefiner(dspy.Module):
def __init__(self, max_iterations=3):
self.generate = dspy.ChainOfThought(GenerateDraft)
self.evaluate = dspy.ChainOfThought(EvaluateDraft)
self.improve = dspy.ChainOfThought(ImproveDraft)
self.max_iterations = max_iterations
def forward(self, task):
draft = self.generate(task=task)
for i in range(self.max_iterations):
evaluation = self.evaluate(task=task, draft=draft.output)
if evaluation.score >= 0.9:
break
draft = self.improve(
task=task,
draft=draft.output,
feedback=evaluation.feedback,
)
return draft
Step 4: Use different models per stage
Not every stage needs the same model. Use cheap models for simple steps:
expensive_lm = dspy.LM("openai/gpt-4o")
cheap_lm = dspy.LM("openai/gpt-4o-mini")
pipeline = SupportPipeline()
# Cheap model for classification (simple task)
pipeline.classify.lm = cheap_lm
# Expensive model for drafting (needs quality)
pipeline.draft.lm = expensive_lm
See /ai-cutting-costs for more cost optimization strategies.
Step 5: Test and optimize the full pipeline
The beauty of DSPy pipelines: you optimize the whole thing end-to-end, not each step separately.
def pipeline_metric(example, prediction, trace=None):
# Score the final output quality
return prediction.response.lower().strip() == example.response.lower().strip()
# Optimizes prompts for ALL stages together
optimizer = dspy.MIPROv2(metric=pipeline_metric, auto="medium")
optimized = optimizer.compile(pipeline, trainset=trainset)
Key patterns
- Decompose the problem â if a task has distinct phases (understand, retrieve, generate, verify), make each one a module
- Each stage gets its own signature â clear inputs and outputs make the pipeline debuggable
- Wire in
forward()â theforwardmethod is your orchestration logic - Optimize end-to-end â DSPy optimizers tune all stages together to maximize the final metric
- Debug stage by stage â use
dspy.inspect_history()to see what each step did - Assign models per stage â cheap models for simple tasks, expensive for complex ones
When to use LangGraph instead
DSPy pipelines are great for stateless, linear-ish flows. But some problems need more:
| If your pipeline… | Use |
|---|---|
| Steps run in a fixed order | DSPy pipeline (this skill) |
| Steps branch based on results | DSPy pipeline with if/else in forward() |
| Needs cycles (retry loops, agent loops) | LangGraph StateGraph with DSPy modules as nodes |
| Needs persistent state across calls | LangGraph with checkpointing |
| Needs human approval mid-pipeline | LangGraph interrupt_before |
| Coordinates multiple independent agents | LangGraph supervisor pattern |
Quick example: DSPy module as a LangGraph node
import dspy
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class PipelineState(TypedDict):
input_text: str
category: str
output: str
# DSPy modules
classifier = dspy.ChainOfThought("text -> category")
generator = dspy.ChainOfThought("text, category -> output")
# Wrap as LangGraph nodes
def classify_node(state: PipelineState) -> dict:
result = classifier(text=state["input_text"])
return {"category": result.category}
def generate_node(state: PipelineState) -> dict:
result = generator(text=state["input_text"], category=state["category"])
return {"output": result.output}
# Build graph
graph = StateGraph(PipelineState)
graph.add_node("classify", classify_node)
graph.add_node("generate", generate_node)
graph.add_edge(START, "classify")
graph.add_edge("classify", "generate")
graph.add_edge("generate", END)
app = graph.compile()
This gives you LangGraph’s state management and routing with DSPy’s optimizable prompts. For more, see /ai-building-chatbots (stateful conversations) and /ai-coordinating-agents (multi-agent systems). For the full LangGraph API reference, see docs/langchain-langgraph-reference.md.
Additional resources
- Use
/ai-checking-outputsto add verification and guardrails between stages - Use
/ai-cutting-coststo assign different models per stage - Not sure what stages your pipeline needs? Use
/ai-decomposing-tasksto identify where to split - For content generation pipelines, see
/ai-writing-content. For complex reasoning, see/ai-reasoning - Next:
/ai-improving-accuracyto measure and improve your pipeline