ai-querying-databases
```shell
npx skills add https://github.com/lebsral/dspy-programming-not-prompting-lms-skills --skill ai-querying-databases
```
# Build AI That Answers Questions About Your Database
Guide the user through building an AI that takes plain English questions and returns answers from a SQL database. The pattern: understand the schema, generate SQL, validate it, run it, and explain the results.
## When you need this
- Sales reps asking “how many deals closed last month?” without writing SQL
- Executives asking revenue questions in plain English
- Support agents looking up customer records by description
- Internal data assistants for non-technical staff
- Any “chat with your database” feature
## How it’s different from document search

| | Document search (/ai-searching-docs) | Database querying (this skill) |
|---|---|---|
| Data type | Unstructured text (PDFs, articles, docs) | Structured data (tables, rows, columns) |
| How it works | Embed + retrieve passages | Understand schema + generate SQL |
| Output | Text answer grounded in passages | Data from query results + interpretation |
| Key challenge | Finding relevant passages | Writing correct, safe SQL |
## Step 1: Understand the setup
Ask the user:
- What database? (Postgres, MySQL, SQLite, Snowflake, BigQuery, etc.)
- What tables matter? (all of them, or a subset?)
- Who asks questions? (technical users, business users, customers?)
- Read-only access? (this should always be yes for AI-generated SQL)
## Step 2: Connect to your database

Use SQLAlchemy for provider-agnostic database access:

```python
from sqlalchemy import create_engine, inspect, text

# PostgreSQL
engine = create_engine("postgresql://user:pass@host:5432/mydb")

# MySQL
engine = create_engine("mysql+pymysql://user:pass@host:3306/mydb")

# SQLite (for development)
engine = create_engine("sqlite:///local.db")

# Snowflake
engine = create_engine("snowflake://user:pass@account/db/schema")

# BigQuery
engine = create_engine("bigquery://project/dataset")
```
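In production, keep credentials out of source code (see the security checklist in Step 8). One minimal sketch is assembling the URL from environment variables; the variable names below are illustrative, not a convention this skill requires:

```python
import os

# Hypothetical variable names; adapt to your secrets manager
user = os.environ.get("DB_USER", "app")
password = os.environ.get("DB_PASSWORD", "")
host = os.environ.get("DB_HOST", "localhost")
name = os.environ.get("DB_NAME", "mydb")

db_url = f"postgresql://{user}:{password}@{host}:5432/{name}"
# engine = create_engine(db_url)
```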
### Build schema descriptions for the AI

The AI needs to understand your tables to write correct SQL:

```python
def get_schema_description(engine, tables=None):
    """Build a text description of the database schema for the AI."""
    inspector = inspect(engine)
    tables = tables or inspector.get_table_names()
    descriptions = []
    for table in tables:
        columns = inspector.get_columns(table)
        col_descs = []
        for col in columns:
            col_descs.append(f"  - {col['name']} ({col['type']})")
        pk = inspector.get_pk_constraint(table)
        pk_cols = pk["constrained_columns"] if pk else []
        desc = f"Table: {table}\n"
        if pk_cols:
            desc += f"  Primary key: {', '.join(pk_cols)}\n"
        desc += "  Columns:\n" + "\n".join(col_descs)
        descriptions.append(desc)
    return "\n\n".join(descriptions)

schema = get_schema_description(engine)
print(schema)
```
### Add business context (optional but helpful)

Raw column names like `cust_ltv_90d` don’t mean much to the AI. Add descriptions:

```python
TABLE_DESCRIPTIONS = {
    "orders": "Customer orders with amounts, dates, and status",
    "customers": "Customer profiles with contact info and signup date",
    "products": "Product catalog with names, prices, and categories",
}

COLUMN_DESCRIPTIONS = {
    "orders.cust_ltv_90d": "Customer lifetime value over the last 90 days in USD",
    "orders.gmv": "Gross merchandise value (total order amount before discounts)",
}

def get_enriched_schema(engine, table_descs=None, col_descs=None):
    """Schema description with business context."""
    inspector = inspect(engine)
    table_descs = table_descs or {}
    col_descs = col_descs or {}
    descriptions = []
    for table in inspector.get_table_names():
        desc = f"Table: {table}"
        if table in table_descs:
            desc += f" -- {table_descs[table]}"
        desc += "\n  Columns:\n"
        for col in inspector.get_columns(table):
            col_key = f"{table}.{col['name']}"
            col_desc = f"  - {col['name']} ({col['type']})"
            if col_key in col_descs:
                col_desc += f" -- {col_descs[col_key]}"
            desc += col_desc + "\n"
        descriptions.append(desc)
    return "\n".join(descriptions)
```
## Step 3: Build the text-to-SQL pipeline

Two-stage approach: first pick the relevant tables, then generate SQL.

### Stage 1: Table selection (for databases with many tables)
```python
import dspy

class SelectTables(dspy.Signature):
    """Given a database schema and a user question, select which tables
    are needed to answer the question."""
    schema: str = dspy.InputField(desc="Database schema description")
    question: str = dspy.InputField(desc="User's question in plain English")
    tables: list[str] = dspy.OutputField(desc="List of table names needed")
    reasoning: str = dspy.OutputField(desc="Why these tables are needed")
```
### Stage 2: SQL generation

```python
class GenerateSQL(dspy.Signature):
    """Write a SQL SELECT query to answer the user's question.
    Only use tables and columns that exist in the schema."""
    schema: str = dspy.InputField(desc="Database schema for relevant tables")
    question: str = dspy.InputField(desc="User's question in plain English")
    sql: str = dspy.OutputField(desc="SQL SELECT query (read-only, no mutations)")
```
### The full pipeline

```python
class DatabaseQA(dspy.Module):
    def __init__(self, engine, schema, use_table_selection=False):
        super().__init__()
        self.engine = engine
        self.full_schema = schema
        self.use_table_selection = use_table_selection
        if use_table_selection:
            self.select_tables = dspy.ChainOfThought(SelectTables)
        self.generate_sql = dspy.ChainOfThought(GenerateSQL)
        self.interpret = dspy.ChainOfThought(InterpretResults)

    def forward(self, question):
        # Pick relevant tables (for large schemas)
        if self.use_table_selection:
            selected = self.select_tables(
                schema=self.full_schema, question=question
            )
            schema = filter_schema(self.full_schema, selected.tables)
        else:
            schema = self.full_schema

        # Generate SQL
        result = self.generate_sql(schema=schema, question=question)
        sql = result.sql.strip().rstrip(";")

        # Validate (see Step 4)
        validate_sql(sql)

        # Execute
        rows = execute_query(self.engine, sql)

        # Interpret results
        interpretation = self.interpret(
            question=question, sql=sql, results=str(rows[:20])
        )
        return dspy.Prediction(
            sql=sql, rows=rows, answer=interpretation.answer
        )
```
### Helper: filter schema to selected tables

```python
def filter_schema(full_schema, table_names):
    """Keep only the schema sections for selected tables."""
    sections = full_schema.split("\n\n")
    filtered = []
    for section in sections:
        for table in table_names:
            if section.startswith(f"Table: {table}"):
                filtered.append(section)
                break
    return "\n\n".join(filtered)
```
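For example, applied to a two-table schema string (the function is repeated here so the sketch runs standalone):

```python
def filter_schema(full_schema, table_names):
    """Keep only the schema sections for selected tables."""
    sections = full_schema.split("\n\n")
    filtered = []
    for section in sections:
        for table in table_names:
            if section.startswith(f"Table: {table}"):
                filtered.append(section)
                break
    return "\n\n".join(filtered)

full_schema = (
    "Table: orders\n  Columns:\n  - id (INTEGER)\n\n"
    "Table: customers\n  Columns:\n  - id (INTEGER)"
)
# Keeps only the "orders" section; "customers" is dropped
print(filter_schema(full_schema, ["orders"]))
```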
## Step 4: Validate SQL before execution

Never run AI-generated SQL without validation. Use `dspy.Assert` for hard constraints and `dspy.Suggest` for style guidance:

```python
import re

import sqlparse

def validate_sql(sql):
    """Validate that AI-generated SQL is safe and well-formed."""
    sql_upper = sql.strip().upper()

    # Hard safety constraints
    dspy.Assert(
        sql_upper.startswith("SELECT"),
        "Only SELECT queries are allowed. Do not generate INSERT, UPDATE, DELETE, DROP, or ALTER."
    )
    dangerous = ["INSERT", "UPDATE", "DELETE", "DROP", "ALTER", "TRUNCATE", "EXEC"]
    for keyword in dangerous:
        # Match whole words so column names like updated_at don't trigger
        dspy.Assert(
            not re.search(rf"\b{keyword}\b", sql_upper),
            f"Query contains forbidden keyword: {keyword}"
        )

    # Syntax check
    parsed = sqlparse.parse(sql)
    dspy.Assert(len(parsed) == 1, "Generate exactly one SQL statement")
    dspy.Assert(
        parsed[0].get_type() == "SELECT",
        "Only SELECT statements are allowed"
    )

    # Style suggestions (soft constraints: the AI will retry if possible)
    dspy.Suggest(
        "JOIN" not in sql_upper or "ON" in sql_upper,
        "Use explicit JOIN ... ON syntax instead of implicit joins in WHERE"
    )
```
### Execute with safety limits

```python
from sqlalchemy import text

def execute_query(engine, sql, row_limit=100, timeout_seconds=30):
    """Execute validated SQL with safety limits."""
    # Add a row limit if one isn't already present
    if "LIMIT" not in sql.upper():
        sql = f"{sql} LIMIT {row_limit}"
    with engine.connect() as conn:
        conn = conn.execution_options(timeout=timeout_seconds)
        result = conn.execute(text(sql))
        columns = list(result.keys())
        rows = [dict(zip(columns, row)) for row in result.fetchall()]
    return rows
```
## Step 5: Interpret results

Convert raw query results back to a natural language answer:

```python
class InterpretResults(dspy.Signature):
    """Convert SQL query results into a clear, natural language answer
    to the user's original question."""
    question: str = dspy.InputField(desc="The user's original question")
    sql: str = dspy.InputField(desc="The SQL query that was run")
    results: str = dspy.InputField(desc="Query results as a string")
    answer: str = dspy.OutputField(desc="Natural language answer to the question")
```
## Step 6: Handle large schemas
For databases with 50+ tables, sending the full schema to the AI is expensive and confusing. Use embedding-based schema retrieval:
```python
import chromadb

def build_schema_index(engine, table_descriptions=None):
    """Build a searchable index of table schemas."""
    client = chromadb.PersistentClient(path="./schema_index")
    collection = client.get_or_create_collection("table_schemas")
    inspector = inspect(engine)
    table_descriptions = table_descriptions or {}
    for table in inspector.get_table_names():
        columns = inspector.get_columns(table)
        col_names = [c["name"] for c in columns]
        # Searchable description
        desc = table_descriptions.get(table, table)
        searchable = f"{table}: {desc}. Columns: {', '.join(col_names)}"
        collection.upsert(
            documents=[searchable],
            ids=[table],
            metadatas=[{"table": table}],
        )
    return collection

class SchemaRetriever(dspy.Retrieve):
    """Retrieve relevant table schemas based on the question."""
    def __init__(self, collection, engine, k=5):
        super().__init__(k=k)
        self.collection = collection
        self.engine = engine

    def forward(self, query, k=None):
        k = k or self.k
        results = self.collection.query(query_texts=[query], n_results=k)
        # Get the full schema for the matched tables
        tables = [m["table"] for m in results["metadatas"][0]]
        schema = get_schema_description(self.engine, tables=tables)
        return dspy.Prediction(passages=[schema])
```
Then use it in your pipeline:
```python
class LargeSchemaQA(dspy.Module):
    def __init__(self, engine, schema_collection):
        super().__init__()
        self.engine = engine
        self.schema_retriever = SchemaRetriever(schema_collection, engine, k=5)
        self.generate_sql = dspy.ChainOfThought(GenerateSQL)
        self.interpret = dspy.ChainOfThought(InterpretResults)

    def forward(self, question):
        schema = self.schema_retriever(question).passages[0]
        result = self.generate_sql(schema=schema, question=question)
        sql = result.sql.strip().rstrip(";")
        validate_sql(sql)
        rows = execute_query(self.engine, sql)
        interpretation = self.interpret(
            question=question, sql=sql, results=str(rows[:20])
        )
        return dspy.Prediction(sql=sql, rows=rows, answer=interpretation.answer)
```
## Step 7: Test and optimize

### SQL execution accuracy metric

```python
def sql_accuracy(example, prediction, trace=None):
    """Check if the generated SQL returns the correct answer."""
    try:
        # Compare results, not SQL text: many valid SQL queries per question
        expected = set(str(r) for r in example.expected_rows)
        actual = set(str(r) for r in prediction.rows)
        return float(expected == actual)
    except Exception:
        return 0.0
```
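The metric can be exercised without a database by stubbing the example and prediction objects; here `SimpleNamespace` stands in for `dspy.Example` / `dspy.Prediction`, and the function is repeated so the sketch runs standalone:

```python
from types import SimpleNamespace

def sql_accuracy(example, prediction, trace=None):
    """Check if the generated SQL returns the correct answer."""
    try:
        # Compare results, not SQL text: many valid SQL queries per question
        expected = set(str(r) for r in example.expected_rows)
        actual = set(str(r) for r in prediction.rows)
        return float(expected == actual)
    except Exception:
        return 0.0

example = SimpleNamespace(expected_rows=[{"count": 1247}])
good = SimpleNamespace(rows=[{"count": 1247}])
bad = SimpleNamespace(rows=[{"count": 99}])
print(sql_accuracy(example, good))  # 1.0
print(sql_accuracy(example, bad))   # 0.0
```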
```python
def answer_quality(example, prediction, trace=None):
    """Check if the natural language answer is correct."""
    judge = dspy.Predict("question, expected_answer, predicted_answer -> is_correct: bool")
    result = judge(
        question=example.question,
        expected_answer=example.answer,
        predicted_answer=prediction.answer,
    )
    return float(result.is_correct)
```
### Build training data

```python
trainset = [
    dspy.Example(
        question="How many orders were placed last month?",
        answer="There were 1,247 orders placed last month.",
        expected_sql="SELECT COUNT(*) FROM orders WHERE created_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')",
    ).with_inputs("question"),
    # Add 20-50 question/answer pairs covering your common queries
]
```
### Optimize

```python
optimizer = dspy.MIPROv2(metric=answer_quality, auto="medium")
optimized = optimizer.compile(DatabaseQA(engine, schema), trainset=trainset)
optimized.save("optimized_db_qa.json")
```
## Step 8: Security and production

### Security checklist

| Control | How |
|---|---|
| Read-only database user | `GRANT SELECT ON ALL TABLES TO ai_reader` |
| Query timeout | `execution_options(timeout=30)` in SQLAlchemy |
| Row limit | Always append `LIMIT` to queries |
| Table allowlist | Only include permitted tables in the schema |
| SQL validation | `dspy.Assert` for SELECT-only, no dangerous keywords |
| Audit logging | Log every question, generated SQL, and results |
| No raw credentials | Use environment variables or a secrets manager |
### Audit logging

```python
import json
from datetime import datetime

def log_query(question, sql, row_count, user_id=None):
    entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "question": question,
        "sql": sql,
        "row_count": row_count,
    }
    with open("query_audit.jsonl", "a") as f:
        f.write(json.dumps(entry) + "\n")
```
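For example, writing one entry and reading it back; this variant adds a `path` parameter (not in the version above) so the log location can be controlled in tests:

```python
import json
import os
import tempfile
from datetime import datetime

def log_query(question, sql, row_count, user_id=None, path="query_audit.jsonl"):
    # Same shape as above, with the log path parameterized for testing
    entry = {
        "timestamp": datetime.now().isoformat(),
        "user_id": user_id,
        "question": question,
        "sql": sql,
        "row_count": row_count,
    }
    with open(path, "a") as f:
        f.write(json.dumps(entry) + "\n")

# Log a query to a temp file, then read the JSONL back
log_path = os.path.join(tempfile.mkdtemp(), "audit.jsonl")
log_query("How many orders?", "SELECT COUNT(*) FROM orders", 1, path=log_path)
with open(log_path) as f:
    entries = [json.loads(line) for line in f]
print(entries[0]["sql"])  # SELECT COUNT(*) FROM orders
```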
### Table allowlist

```python
ALLOWED_TABLES = {"orders", "products", "customers", "categories"}

def get_safe_schema(engine, allowed=ALLOWED_TABLES):
    inspector = inspect(engine)
    all_tables = set(inspector.get_table_names())
    tables = list(all_tables & allowed)
    return get_schema_description(engine, tables=tables)
```
## Key patterns
- Two-stage pipeline: table selection + SQL generation works better than one giant prompt
- Validate before executing: never run AI-generated SQL without safety checks
- Compare results, not SQL: many valid SQL queries produce the same answer
- Business context matters: column descriptions improve accuracy more than extra examples
- Start with a small table allowlist: expand as you build confidence
- Read-only, always: the AI database user should never have write permissions
## Additional resources

- For worked examples, see examples.md
- Use /ai-serving-apis to put your database assistant behind a REST API
- Use /ai-building-pipelines for complex multi-step query workflows
- Use /ai-checking-outputs for additional SQL validation patterns
- Use /ai-following-rules to enforce query policies (e.g., no queries on PII columns)
- Use /ai-improving-accuracy to measure and optimize query quality
- Use /ai-tracing-requests to debug individual query failures