data-provenance
npx skills add https://github.com/sunnypatneedi/claude-starter-kit --skill data-provenance
Data Provenance & Lineage
Track where data comes from, how it is transformed, and where it goes: essential for trust, compliance, and debugging.
When to Use
Use this skill when:
- Auditing data for compliance (GDPR, HIPAA, SOX, CCPA)
- Debugging data quality issues (“Where did this bad data come from?”)
- Understanding impact of schema changes (“What breaks if I change this field?”)
- Building data catalogs or governance systems
- Tracking sensitive data (PII, PHI) through systems
- Responding to data deletion requests (GDPR “right to be forgotten”)
What is Data Provenance?
Provenance: The complete history and lineage of a data element
Question: "Where does the revenue number in this dashboard come from?"
Answer (with provenance):
Dashboard.revenue (computed 2026-01-21 08:00)
  ↑ warehouse.daily_sales.total (aggregated 2026-01-21 02:00)
    ↑ etl_pipeline.transform_sales (ran 2026-01-21 01:30)
      ↑ production_db.orders.amount (order #12345, created 2026-01-20 15:23)
        ↑ stripe_api.charge (charge_id: ch_abc123, processed 2026-01-20 15:23)
          ↑ user input (customer: cust_xyz, card ending 4242)
Key questions provenance answers:
- Where did this data come from? (source)
- When was it created/updated? (timestamp)
- How was it transformed? (logic, code version)
- Who created/modified it? (user, system, process)
- Why does it have this value? (business logic)
- What depends on it? (downstream consumers)
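These questions map naturally onto a single provenance record per data element. A minimal sketch (the field names here are illustrative, not a standard schema):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class ProvenanceRecord:
    """One provenance entry answering where/when/how/who for a data element."""
    source: str                 # where the value came from (table, API, user input)
    created_at: datetime        # when it was created or updated
    transformation: str         # how it was derived (logic, code version)
    created_by: str             # who or what produced it (user, system, process)
    downstream: list = field(default_factory=list)  # known downstream consumers

record = ProvenanceRecord(
    source='stripe_api.charge:ch_abc123',
    created_at=datetime(2026, 1, 20, 15, 23, tzinfo=timezone.utc),
    transformation='copied verbatim into orders.amount',
    created_by='payments_webhook',
)
record.downstream.append('daily_sales.total_revenue')
```

Real systems store these records in a database (as shown in the levels below) rather than in memory, but the shape of the answer is the same.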
Levels of Provenance Tracking
Level 1: Table-Level Lineage
What: Track which tables feed into other tables
┌─────────────┐
│   orders    │───┐
└─────────────┘   │
                  ├──▶ ┌──────────────┐
┌─────────────┐   │    │ daily_sales  │
│  customers  │───┘    └──────────────┘
└─────────────┘
Implementation: Metadata table
CREATE TABLE table_lineage (
downstream_table VARCHAR(255),
upstream_table VARCHAR(255),
relationship_type VARCHAR(50), -- 'direct_copy', 'join', 'aggregate'
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (downstream_table, upstream_table)
);
INSERT INTO table_lineage VALUES
('daily_sales', 'orders', 'aggregate'),
('daily_sales', 'customers', 'join');
Query: “What tables does daily_sales depend on?”
SELECT upstream_table
FROM table_lineage
WHERE downstream_table = 'daily_sales';
-- Result: orders, customers
Query: “What tables depend on orders?”
SELECT downstream_table
FROM table_lineage
WHERE upstream_table = 'orders';
-- Result: daily_sales, weekly_report, customer_lifetime_value
Level 2: Column-Level Lineage
What: Track which columns feed into which columns
orders.amount   ───┐
orders.tax      ───┼──▶ daily_sales.total_revenue
orders.shipping ───┘
Implementation:
CREATE TABLE column_lineage (
downstream_table VARCHAR(255),
downstream_column VARCHAR(255),
upstream_table VARCHAR(255),
upstream_column VARCHAR(255),
transformation TEXT, -- SQL or description
created_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (downstream_table, downstream_column, upstream_table, upstream_column)
);
INSERT INTO column_lineage VALUES
('daily_sales', 'total_revenue', 'orders', 'amount', 'SUM(amount + tax + shipping)'),
('daily_sales', 'total_revenue', 'orders', 'tax', 'SUM(amount + tax + shipping)'),
('daily_sales', 'total_revenue', 'orders', 'shipping', 'SUM(amount + tax + shipping)'),
('daily_sales', 'order_count', 'orders', 'id', 'COUNT(id)'),
('daily_sales', 'customer_name', 'customers', 'name', 'LEFT JOIN on customer_id');
Query: “Where does daily_sales.total_revenue come from?”
SELECT
upstream_table,
upstream_column,
transformation
FROM column_lineage
WHERE downstream_table = 'daily_sales'
AND downstream_column = 'total_revenue';
Level 3: Row-Level Lineage
What: Track individual record transformations
orders.id=12345 (amount=$100) ──┬──▶ daily_sales.id=67 (date=2026-01-20, total=$150)
orders.id=12346 (amount=$50)  ──┘
Implementation: Lineage table
CREATE TABLE row_lineage (
id BIGSERIAL PRIMARY KEY,
downstream_table VARCHAR(255),
downstream_pk BIGINT,
upstream_table VARCHAR(255),
upstream_pk BIGINT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- After ETL run
INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk)
SELECT
'daily_sales',
ds.id,
'orders',
o.id
FROM daily_sales ds
JOIN orders o ON DATE(o.created_at) = ds.sale_date;
Query: “What orders contributed to daily_sales row 67?”
SELECT o.*
FROM row_lineage rl
JOIN orders o ON rl.upstream_pk = o.id
WHERE rl.downstream_table = 'daily_sales'
AND rl.downstream_pk = 67;
Level 4: Value-Level Lineage (Finest)
What: Track transformations at the field value level
order.amount   = $100
order.tax      = $10
order.shipping = $5
        ↓ (SUM transformation)
daily_sales.total_revenue = $115
Implementation: Event log
CREATE TABLE value_lineage (
id BIGSERIAL PRIMARY KEY,
entity_type VARCHAR(50),
entity_id BIGINT,
field_name VARCHAR(100),
old_value TEXT,
new_value TEXT,
transformation TEXT,
source_values JSONB, -- Array of source values
created_at TIMESTAMPTZ DEFAULT NOW(),
created_by VARCHAR(255) -- User or process
);
-- Example: Revenue calculation
INSERT INTO value_lineage VALUES (
DEFAULT,
'daily_sales',
67,
'total_revenue',
NULL,
'115.00',
'SUM(orders.amount + orders.tax + orders.shipping) WHERE date = 2026-01-20',
'[{"table": "orders", "id": 12345, "amount": 100, "tax": 10, "shipping": 5}]',
NOW(),
'etl_pipeline_v1.2.3'
);
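In ETL code, the value-lineage event is easiest to emit at the moment the value is computed. A minimal sketch as a pure function (the surrounding `db.insert` call is left out; the event dict mirrors the `value_lineage` columns above):

```python
import json

def compute_with_lineage(orders, created_by='etl_pipeline'):
    """Compute total revenue and return a value-lineage event describing how.

    `orders` is a list of dicts with amount/tax/shipping keys; the returned
    event can be inserted into the value_lineage table defined above.
    """
    total = sum(o['amount'] + o['tax'] + o['shipping'] for o in orders)
    event = {
        'entity_type': 'daily_sales',
        'field_name': 'total_revenue',
        'old_value': None,
        'new_value': f'{total:.2f}',
        'transformation': 'SUM(orders.amount + orders.tax + orders.shipping)',
        'source_values': json.dumps(orders),  # serialized for the JSONB column
        'created_by': created_by,
    }
    return total, event

total, event = compute_with_lineage(
    [{'table': 'orders', 'id': 12345, 'amount': 100, 'tax': 10, 'shipping': 5}]
)
# total == 115, event['new_value'] == '115.00'
```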
Provenance Capture Methods
Method 1: Code Instrumentation
Manual tracking in ETL code:
from datetime import date, timedelta

def etl_orders_to_daily_sales():
    yesterday = date.today() - timedelta(days=1)

    # Extract
    orders = db.query("SELECT * FROM orders WHERE DATE(created_at) = ?", yesterday)

    # Transform
    daily_sales = {}
    for order in orders:
        day = order['created_at'].date()
        if day not in daily_sales:
            daily_sales[day] = {'total': 0, 'count': 0, 'order_ids': []}
        daily_sales[day]['total'] += order['amount']
        daily_sales[day]['count'] += 1
        daily_sales[day]['order_ids'].append(order['id'])

    # Load with lineage tracking
    for day, metrics in daily_sales.items():
        ds_id = db.insert(
            "INSERT INTO daily_sales (date, total, count) VALUES (?, ?, ?)",
            day, metrics['total'], metrics['count']
        )
        # Track lineage
        for order_id in metrics['order_ids']:
            db.insert(
                "INSERT INTO row_lineage (downstream_table, downstream_pk, upstream_table, upstream_pk) VALUES (?, ?, ?, ?)",
                'daily_sales', ds_id, 'orders', order_id
            )
Method 2: SQL Parsing
Automatically extract lineage from SQL queries:
from sqllineage.runner import LineageRunner
sql = """
INSERT INTO daily_sales (date, total_revenue, order_count)
SELECT
DATE(created_at) as date,
SUM(amount + tax + shipping) as total_revenue,
COUNT(*) as order_count
FROM orders
LEFT JOIN customers ON orders.customer_id = customers.id
WHERE created_at >= '2026-01-20'
GROUP BY DATE(created_at)
"""
# Parse lineage
runner = LineageRunner(sql)
print("Source tables:", runner.source_tables())
# orders, customers
print("Target tables:", runner.target_tables())
# daily_sales

# Store in lineage table
for source in runner.source_tables():
    db.insert(
        "INSERT INTO table_lineage (downstream_table, upstream_table) VALUES (?, ?)",
        'daily_sales', str(source)
    )
Method 3: Database Triggers
Capture changes automatically:
-- Audit trail for all changes
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
table_name VARCHAR(255),
record_id BIGINT,
operation VARCHAR(10), -- INSERT, UPDATE, DELETE
old_values JSONB,
new_values JSONB,
changed_by VARCHAR(255),
changed_at TIMESTAMPTZ DEFAULT NOW()
);
-- Trigger on orders table
CREATE OR REPLACE FUNCTION audit_orders()
RETURNS TRIGGER AS $$
BEGIN
INSERT INTO audit_log (table_name, record_id, operation, old_values, new_values, changed_by)
VALUES (
'orders',
COALESCE(NEW.id, OLD.id),
TG_OP,
row_to_json(OLD),
row_to_json(NEW),
current_user
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER orders_audit
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION audit_orders();
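Once the trigger is in place, a record's full history can be reconstructed by replaying its audit rows in order. A minimal sketch (rows here are plain dicts shaped like `audit_log` columns; in practice they would come from a query):

```python
def replay_history(audit_rows):
    """Reconstruct a record's successive states from audit_log rows.

    Each row is a dict with 'operation', 'new_values', and 'changed_at',
    as captured by the trigger above. Returns the state after each change;
    a DELETE yields None.
    """
    states = []
    current = None
    for row in sorted(audit_rows, key=lambda r: r['changed_at']):
        if row['operation'] == 'DELETE':
            current = None
        else:  # INSERT or UPDATE
            current = row['new_values']
        states.append(current)
    return states

history = replay_history([
    {'operation': 'INSERT', 'new_values': {'id': 1, 'amount': 100}, 'changed_at': 1},
    {'operation': 'UPDATE', 'new_values': {'id': 1, 'amount': 115}, 'changed_at': 2},
])
# history[-1]['amount'] == 115
```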
Method 4: CDC (Change Data Capture)
Stream database changes:
# Using Debezium or similar CDC tool
from kafka import KafkaConsumer
consumer = KafkaConsumer('postgres.public.orders')
for message in consumer:
change_event = json.loads(message.value)
# Store in lineage system
db.insert(
"INSERT INTO change_log (table_name, operation, before, after, timestamp) VALUES (?, ?, ?, ?, ?)",
change_event['source']['table'],
change_event['op'], # 'c' (create), 'u' (update), 'd' (delete)
change_event.get('before'),
change_event.get('after'),
change_event['ts_ms']
)
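Translating the CDC envelope into the audit-log shape used earlier is a pure mapping step, which keeps the consumer loop thin. A sketch assuming a Debezium-style envelope (`op`, `before`, `after`, `source.table`, `ts_ms`; adjust the field names for your CDC tool):

```python
def cdc_to_audit_row(change_event):
    """Map a Debezium-style change event onto an audit_log-shaped dict."""
    op_map = {'c': 'INSERT', 'u': 'UPDATE', 'd': 'DELETE', 'r': 'SNAPSHOT'}
    return {
        'table_name': change_event['source']['table'],
        'operation': op_map.get(change_event['op'], change_event['op']),
        'old_values': change_event.get('before'),
        'new_values': change_event.get('after'),
        'changed_at': change_event['ts_ms'],
    }

row = cdc_to_audit_row({
    'op': 'u',
    'before': {'id': 1, 'amount': 100},
    'after': {'id': 1, 'amount': 115},
    'source': {'table': 'orders'},
    'ts_ms': 1737400000000,
})
# row['operation'] == 'UPDATE'
```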
Impact Analysis
Downstream Impact
Question: “If I change orders.amount, what breaks?”
-- Find all downstream dependencies
WITH RECURSIVE dependencies AS (
-- Base: Direct dependencies
SELECT
downstream_table,
downstream_column,
1 as depth
FROM column_lineage
WHERE upstream_table = 'orders'
AND upstream_column = 'amount'
UNION ALL
-- Recursive: Dependencies of dependencies
SELECT
cl.downstream_table,
cl.downstream_column,
d.depth + 1
FROM column_lineage cl
JOIN dependencies d
ON cl.upstream_table = d.downstream_table
AND cl.upstream_column = d.downstream_column
WHERE d.depth < 10 -- Prevent infinite loops
)
SELECT DISTINCT
downstream_table,
downstream_column,
depth
FROM dependencies
ORDER BY depth, downstream_table, downstream_column;
Result:
| downstream_table | downstream_column | depth |
|------------------------|--------------------|-------|
| daily_sales | total_revenue | 1 |
| monthly_revenue | total | 2 |
| executive_dashboard | ytd_revenue | 3 |
| investor_report | arr | 4 |
Interpretation: Changing orders.amount affects 4 layers of downstream tables!
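The recursive CTE is equivalent to a breadth-first walk over the lineage graph, which can also be done in application code when the lineage lives outside the database. A sketch with an in-memory adjacency map (the lineage dict below is sample data):

```python
from collections import deque

def downstream_impact(lineage, start, max_depth=10):
    """Breadth-first walk of column lineage; mirrors the recursive CTE above.

    `lineage` maps (table, column) -> set of downstream (table, column) pairs.
    Returns {(table, column): depth} for every affected column, where depth
    is the shortest path from the changed column.
    """
    impacted = {}
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth >= max_depth:  # guard against cycles, like the CTE's depth cap
            continue
        for child in lineage.get(node, set()):
            if child not in impacted:  # first visit = shortest depth
                impacted[child] = depth + 1
                queue.append((child, depth + 1))
    return impacted

lineage = {
    ('orders', 'amount'): {('daily_sales', 'total_revenue')},
    ('daily_sales', 'total_revenue'): {('monthly_revenue', 'total')},
    ('monthly_revenue', 'total'): {('executive_dashboard', 'ytd_revenue')},
}
impact = downstream_impact(lineage, ('orders', 'amount'))
# {('daily_sales', 'total_revenue'): 1, ('monthly_revenue', 'total'): 2,
#  ('executive_dashboard', 'ytd_revenue'): 3}
```

Tracing upstream sources works the same way with the edge direction reversed.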
Upstream Impact
Question: “What source data feeds into this dashboard metric?”
-- Trace backwards to original sources
WITH RECURSIVE sources AS (
-- Base: Direct sources
SELECT
upstream_table,
upstream_column,
1 as depth
FROM column_lineage
WHERE downstream_table = 'executive_dashboard'
AND downstream_column = 'ytd_revenue'
UNION ALL
-- Recursive: Sources of sources
SELECT
cl.upstream_table,
cl.upstream_column,
s.depth + 1
FROM column_lineage cl
JOIN sources s
ON cl.downstream_table = s.upstream_table
AND cl.downstream_column = s.upstream_column
WHERE s.depth < 10
)
SELECT DISTINCT
upstream_table,
upstream_column,
depth
FROM sources
WHERE upstream_table NOT IN (
SELECT DISTINCT downstream_table FROM column_lineage
) -- Only leaf nodes (true sources)
ORDER BY upstream_table, upstream_column;
Result: Original sources for dashboard metric
| upstream_table | upstream_column | depth |
|----------------|-----------------|-------|
| orders | amount | 4 |
| orders | tax | 4 |
| orders | shipping | 4 |
| stripe_events | charge_amount | 5 |
Data Catalog
Schema Registry
Track all datasets and their metadata:
CREATE TABLE data_catalog (
id BIGSERIAL PRIMARY KEY,
dataset_name VARCHAR(255) UNIQUE NOT NULL,
dataset_type VARCHAR(50), -- 'table', 'view', 'api', 'file'
description TEXT,
owner VARCHAR(255),
steward VARCHAR(255), -- Data steward (responsible for quality)
sensitivity VARCHAR(50), -- 'public', 'internal', 'confidential', 'restricted'
contains_pii BOOLEAN DEFAULT FALSE,
retention_days INT, -- How long to keep data
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE data_catalog_columns (
dataset_id BIGINT REFERENCES data_catalog(id),
column_name VARCHAR(255),
data_type VARCHAR(50),
description TEXT,
is_nullable BOOLEAN,
is_pii BOOLEAN DEFAULT FALSE,
pii_type VARCHAR(50), -- 'email', 'ssn', 'phone', 'name', etc.
sample_values TEXT[],
PRIMARY KEY (dataset_id, column_name)
);
-- Example: Register orders table
INSERT INTO data_catalog VALUES (
DEFAULT,
'orders',
'table',
'Customer orders from e-commerce platform',
'engineering@company.com',
'data-team@company.com',
'internal',
TRUE, -- Contains PII
2555, -- 7 years retention
NOW(),
NOW()
);
INSERT INTO data_catalog_columns VALUES
(1, 'id', 'BIGINT', 'Unique order ID', FALSE, FALSE, NULL, NULL),
(1, 'customer_email', 'VARCHAR(255)', 'Customer email address', FALSE, TRUE, 'email', NULL),
(1, 'amount', 'DECIMAL(10,2)', 'Order total in USD', FALSE, FALSE, NULL, '{10.99, 25.50, 100.00}');
Searchable Catalog
Find datasets by keyword:
-- Full-text search
CREATE INDEX idx_catalog_search ON data_catalog
USING GIN(to_tsvector('english', dataset_name || ' ' || description));
-- Search for "revenue"
SELECT
dataset_name,
dataset_type,
description,
owner
FROM data_catalog
WHERE to_tsvector('english', dataset_name || ' ' || description)
@@ to_tsquery('english', 'revenue')
ORDER BY dataset_name;
Compliance & Data Privacy
GDPR: Right to be Forgotten
Track all PII to enable deletion:
-- Find all PII for a user
SELECT
dc.dataset_name,
dcc.column_name,
dcc.pii_type
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.is_pii = TRUE;
-- Result: Tables/columns containing PII
| dataset_name | column_name | pii_type |
|-----------------|----------------|----------|
| orders | customer_email | email |
| users | email | email |
| users | name | name |
| support_tickets | email | email |
| analytics_events| user_id | user_id |
-- Generate deletion script (:user_email is a bind parameter for the data subject's email)
SELECT
'DELETE FROM ' || dataset_name || ' WHERE ' || column_name || ' = ''' || :user_email || ''';'
FROM (
SELECT DISTINCT
dc.dataset_name,
dcc.column_name
FROM data_catalog dc
JOIN data_catalog_columns dcc ON dc.id = dcc.dataset_id
WHERE dcc.pii_type = 'email'
) subq;
-- Output:
-- DELETE FROM orders WHERE customer_email = 'user@example.com';
-- DELETE FROM users WHERE email = 'user@example.com';
-- DELETE FROM support_tickets WHERE email = 'user@example.com';
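Concatenating an email address into SQL text is injection-prone; a safer pattern builds parameterized statements from the catalog and binds the email at execution time. A sketch (the `%s` placeholder style and the `pii_delete_statements` helper name are assumptions, not part of this skill's API):

```python
def pii_delete_statements(targets, user_email):
    """Build parameterized DELETE statements for catalog-tagged email PII.

    `targets` is a list of (table, column) pairs from the data catalog.
    Identifiers can't be bound, so they must come from the trusted catalog;
    the email itself is always a bound parameter, never concatenated in.
    """
    return [
        (f'DELETE FROM "{table}" WHERE "{column}" = %s', (user_email,))
        for table, column in targets
    ]

stmts = pii_delete_statements(
    [('orders', 'customer_email'), ('users', 'email')],
    'user@example.com',
)
# Each (sql, params) pair is then passed to the database driver's execute().
```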
PII Tracking in Data Flow
Tag PII as it flows through pipeline:
def track_pii_flow(source_table, dest_table, pii_fields):
    """Track movement of PII between tables"""
    for field in pii_fields:
        db.insert(
            """
            INSERT INTO pii_lineage (source_table, source_column, dest_table, dest_column, tracked_at)
            VALUES (?, ?, ?, ?, NOW())
            """,
            source_table, field, dest_table, field
        )
# Usage
track_pii_flow('users', 'orders', ['email'])
track_pii_flow('orders', 'daily_sales_with_emails', ['email'])
# Query: "Where has this user's email propagated?"
db.query("""
WITH RECURSIVE pii_flow AS (
SELECT dest_table, dest_column, 1 as depth
FROM pii_lineage
WHERE source_table = 'users' AND source_column = 'email'
UNION ALL
SELECT pl.dest_table, pl.dest_column, pf.depth + 1
FROM pii_lineage pl
JOIN pii_flow pf ON pl.source_table = pf.dest_table AND pl.source_column = pf.dest_column
WHERE pf.depth < 10
)
SELECT DISTINCT dest_table, dest_column FROM pii_flow;
""")
Visualization & Tools
Lineage Graph
Generate visual lineage:
import graphviz

def visualize_lineage(table_name):
    # Fetch lineage edges touching this table
    lineage = db.query("""
        SELECT upstream_table, downstream_table
        FROM table_lineage
        WHERE upstream_table = ? OR downstream_table = ?
    """, table_name, table_name)
    # Create graph
    dot = graphviz.Digraph()
    for row in lineage:
        dot.edge(row['upstream_table'], row['downstream_table'])
    dot.render('lineage_graph', format='png', view=True)

visualize_lineage('orders')
Output:
stripe_api ──▶ orders ────┬──▶ daily_sales ──▶ monthly_revenue
                          │
customers ────────────────┘
Commercial Tools
| Tool | Use Case | Features |
|---|---|---|
| Apache Atlas | Open-source data governance | Metadata management, lineage, search |
| Collibra | Enterprise data governance | Catalog, lineage, policies, workflows |
| Alation | Data catalog | Metadata search, collaboration, lineage |
| Amundsen (Lyft) | Open-source data discovery | Search, lineage, usage analytics |
| DataHub (LinkedIn) | Open-source metadata platform | Lineage, discovery, governance |
| dbt | Analytics engineering | SQL lineage, documentation, tests |
Implementation Checklist
Minimal (Start Here)
[ ] Table-level lineage tracking
[ ] Audit logs for critical tables
[ ] Data catalog for major datasets
[ ] Documentation of ETL processes
Standard
[ ] Column-level lineage
[ ] Automated lineage extraction from SQL
[ ] PII tagging and tracking
[ ] Impact analysis queries
[ ] Change notifications for downstream consumers
Advanced
[ ] Row-level lineage
[ ] Real-time lineage from CDC
[ ] Searchable data catalog
[ ] Automated GDPR compliance tools
[ ] Data quality metrics tied to lineage
[ ] Machine learning for anomaly detection
Output Format
When helping with data provenance:
## Provenance Strategy
### Lineage Level
- [ ] Table-level
- [ ] Column-level
- [ ] Row-level
- [ ] Value-level
### Capture Method
- [ ] Code instrumentation
- [ ] SQL parsing
- [ ] Database triggers
- [ ] CDC (Change Data Capture)
### Data Catalog Schema
[SQL DDL for catalog tables]
### Impact Analysis Queries
[SQL queries for upstream/downstream impact]
### PII Tracking
Tables with PII:
- [Table 1]: [Columns]
- [Table 2]: [Columns]
Deletion strategy:
[Step-by-step process]
### Visualization
[Lineage graph representation]
### Compliance Requirements
- [ ] GDPR
- [ ] CCPA
- [ ] HIPAA
- [ ] SOX
- [ ] Other: [specify]
### Tooling
- Lineage tracking: [Tool/Custom]
- Data catalog: [Tool/Custom]
- Visualization: [Tool/Custom]
Integration
Works with:
- scalable-data-schema – Track schema evolution over time
- data-infrastructure-at-scale – Lineage for pipelines and ETL
- multi-source-data-conflation – Track source of merged data
- systems-decompose – Plan lineage as part of feature design