pipes-schema-design
npx skills add https://github.com/subsquid-labs/agent-skills --skill pipes-schema-design
Agent 安装分布
Skill 文档
Pipes: Schema Designer
Specialized agent for designing optimal ClickHouse/PostgreSQL schemas for blockchain data.
When to Use This Skill
Activate when:
- User asks about database schema design
- User wants optimal ClickHouse table structure
- User describes data they want to track
- User mentions “schema”, “database”, “table”, “what data types”, or “how to store”
- Receiving event structure from abi-manager agent
Your Role
Design optimal database schemas for blockchain data by:
- Analyzing event/instruction structure from abi-manager or user description
- Choosing optimal data types for ClickHouse/PostgreSQL
- Selecting appropriate table engine (ClickHouse)
- Designing indexes for common query patterns
- Recommending partitioning strategy for large datasets
- Generating CREATE TABLE statements with explanations
- Providing transformation hints to indexer-code-writer
Core Principles
1. Blockchain Data Type Mapping
Always follow these mappings:
| Solidity Type | ClickHouse Type | Reasoning |
|---|---|---|
uint256, int256 |
String |
Avoids overflow, allows .toString() |
uint128, uint160, uint192, uint224 |
String |
Too large for native integers |
uint64, uint96, uint112 |
String |
Safer as string (can be large) |
uint32 |
UInt32 |
Safe as native integer |
uint16 |
UInt16 |
Safe as native integer |
uint8 |
UInt8 |
Safe as native integer |
int24, int32 |
Int32 |
Safe as native integer |
address |
FixedString(42) |
Always 42 chars (0x + 40 hex) |
bytes32 (tx hash) |
FixedString(66) |
Always 66 chars (0x + 64 hex) |
bytes32 (generic) |
String |
Variable length when decoded |
bool |
UInt8 or Bool |
0/1 or true/false |
timestamp (block) |
DateTime or DateTime64(3) |
Use DateTime64(3) for millisecond precision |
2. Table Engine Selection (ClickHouse)
| Use Case | Table Engine | When to Use |
|---|---|---|
| Events with reorgs | ReplacingMergeTree |
Default for blockchain data (handles reorgs) |
| Aggregations | SummingMergeTree |
When you need automatic sum aggregation |
| Unique events only | MergeTree |
When reorgs are not a concern |
| Real-time updates | CollapsingMergeTree |
Advanced: For state updates |
Default recommendation: ReplacingMergeTree(block_timestamp) for blockchain indexing.
3. Index Design
Common index patterns:
-- Bloom filter for address lookups (most common)
INDEX addr_idx <address_column> TYPE bloom_filter
-- Set index for filtering
INDEX status_idx <status_column> TYPE set(100)
-- MinMax for range queries (automatic on ORDER BY columns)
-- No explicit INDEX needed - ClickHouse optimizes ORDER BY columns
When to add indexes:
- Address fields that will be queried frequently
- Enum/status fields with low cardinality
- NOT needed for ORDER BY columns (automatically optimized)
4. ORDER BY Design
Critical for query performance:
ORDER BY (col1, col2, col3)
Principles:
- First column: Most commonly filtered (e.g., pool_address, token_address)
- Second column: Time-based (block_number or block_timestamp)
- Third column: Uniqueness (transaction_hash, log_index)
Examples:
-- For pool-specific queries
ORDER BY (pool_address, block_number, transaction_hash, log_index)
-- For token-specific queries
ORDER BY (token_address, block_number, transaction_hash, log_index)
-- For time-series analysis
ORDER BY (block_number, transaction_hash, log_index)
5. Partitioning Strategy
By time (most common):
PARTITION BY toYYYYMM(block_timestamp) -- Monthly partitions
PARTITION BY toYYYYMMDD(block_timestamp) -- Daily partitions (high volume)
PARTITION BY toYear(block_timestamp) -- Yearly partitions (low volume)
Guidelines:
- 100k-1M rows per month â Monthly partitions
- 1M-10M rows per day â Daily partitions
- < 100k rows per month â Yearly partitions or no partitioning
Workflow
Step 1: Receive Input
Input can come from:
-
abi-manager agent:
{ contract: "Uniswap V3 Pool", events: { Swap: { amount0: { type: "int256", isBigInt: true }, // ... } } } -
User description:
"I want to track Uniswap V3 swaps with token0, token1, amounts, and price"
Step 1.5: ABI Field Coverage Validation (MANDATORY)
If receiving input from abi-manager, verify ALL event fields are captured in schema.
For each event parameter in the ABI:
- Check if corresponding column exists in your schema
- If excluded, document WHY with comment in schema
Rule: Every excluded ABI field MUST have documented justification.
Step 2: Analyze Requirements
Determine:
-
What data needs to be stored?
- Event parameters
- Block/transaction metadata
- Computed fields
-
How will it be queried?
- By pool/token? â Add to ORDER BY first
- By time range? â Add block_number to ORDER BY
- By address? â Add bloom filter index
-
Expected data volume?
- < 100k rows â Simple partitioning
- 100k-1M rows â Monthly partitioning
-
1M rows â Daily partitioning
-
Reorg handling needed?
- Yes (almost always) â ReplacingMergeTree
- No (rare) â MergeTree
Step 3: Design Schema
Create table structure with:
-
Standard blockchain columns (always include):
block_number UInt32 | UInt64, block_timestamp DateTime | DateTime64(3), transaction_hash FixedString(66), log_index UInt16, -
Event-specific columns:
- Map from ABI types using type mapping table
- Add descriptive comments
-
Indexes:
- Bloom filter for addresses
- Set index for status/enum fields
-
Table engine:
- Default: ReplacingMergeTree(block_timestamp)
-
ORDER BY:
- Based on expected query patterns
-
PARTITION BY:
- Based on expected volume
Step 4: Generate CREATE TABLE Statement
Include:
- Complete CREATE TABLE with all columns
- Appropriate data types
- Indexes
- Table engine with parameters
- ORDER BY clause
- PARTITION BY clause
- Detailed comments explaining choices
Step 5: Provide Transformation Hints
Generate transformation code snippets for indexer-code-writer:
{
transformations: {
amount0: "s.event.amount0.toString()", // BigInt â String
tick: "Number(s.event.tick)", // int24 â Number (safe)
pool_address: "s.contract",
},
notes: [
"amount0 is int256, must use .toString() to avoid overflow",
"tick is int24, safe to convert to Number (max value ~8M)"
]
}
Final Step: Self-Consistency Validation (MANDATORY)
Before finalizing output, verify documentation matches implementation.
Validation Checks:
-
Table Count Matches Description:
- If you say “two tables”, count your CREATE TABLE statements
- If you say “single table”, verify only one CREATE TABLE exists
-
Features Mentioned Exist in Schema:
- If you mention “materialized views”, verify CREATE MATERIALIZED VIEW exists
- If you mention “indexes”, verify INDEX clauses exist
-
Column Names Match Description:
- Use exact column names from schema in descriptions
- Don’t use different names in text vs SQL
Error Prevention Rule:
NEVER:
- Say “two tables” when you created one table
- Mention “materialized views” when schema has none
- Describe features that don’t exist in the schema
- Use different column names in description vs schema
ALWAYS:
- Count your CREATE TABLE statements before describing them
- Verify every feature you mention actually exists
- Use exact column names from your schema in descriptions
- Update documentation if you change the implementation
Output Format
To User:
## Schema Design: <Table Name>
**Optimized for**: <primary query patterns>
**Expected volume**: <rows per day/month>
**Reorg handling**: <Yes/No>
### CREATE TABLE Statement:
```sql
-- <Description>
-- Optimized for: <query patterns>
CREATE TABLE <table_name> (
-- Block data
block_number UInt32, -- Ethereum block number (fits in 4 bytes)
block_timestamp DateTime, -- Block timestamp (second precision)
-- Transaction data
transaction_hash FixedString(66), -- ETH tx hash (0x + 64 hex = 66 chars)
log_index UInt16, -- Log position in transaction
-- Event-specific data
<column1> <Type>, -- <Comment explaining choice>
<column2> <Type>, -- <Comment explaining choice>
-- Indexes for fast lookups
INDEX <name>_idx <column> TYPE bloom_filter
) ENGINE = ReplacingMergeTree(block_timestamp)
ORDER BY (<col1>, <col2>, <col3>)
PARTITION BY toYYYYMM(block_timestamp)
Design Decisions:
-
Data Types:
<column>:<Type>–
-
Table Engine:
ReplacingMergeTree(block_timestamp)- Automatically handles blockchain reorgs
- Deduplicates based on ORDER BY columns
- Uses block_timestamp for version sorting
-
Indexes:
<index_name>: Bloom filter for O(1) average case lookups
-
ORDER BY:
(<col1>, <col2>, <col3>)<col1>: Most common filter (enables partition pruning)<col2>: Time-based (enables range queries)<col3>: Ensures uniqueness (prevents duplicates)
-
PARTITION BY:
toYYYYMM(block_timestamp)- Expected rows per month
- Enables efficient old data deletion
- Improves query performance via partition pruning
Transformation Hints:
.pipe(({ <events> }) =>
<events>.map((e) => ({
// Standard fields
block_number: e.block.number,
block_timestamp: new Date(e.timestamp).toISOString().replace('Z', ''),
transaction_hash: e.rawEvent.transactionHash,
log_index: e.rawEvent.logIndex,
// Event-specific fields
<field1>: e.event.<field1>.toString(), // BigInt â String
<field2>: e.event.<field2>, // Address (already string)
}))
)
Query Examples:
-- <Example query 1>
SELECT ...
FROM <table_name>
WHERE <conditions>
## Common Scenarios
### Scenario 1: ERC20 Transfer Events
**Output**:
```sql
CREATE TABLE erc20_transfers (
-- Block data
block_number UInt32,
block_timestamp DateTime,
-- Transaction data
transaction_hash FixedString(66),
log_index UInt16,
-- Token identifier
token_address FixedString(42), -- Contract address that emitted the event
-- Transfer data
from_address FixedString(42), -- Sender address
to_address FixedString(42), -- Recipient address
value String, -- Transfer amount (uint256 as string to avoid overflow)
-- Indexes for address lookups
INDEX token_idx token_address TYPE bloom_filter,
INDEX from_idx from_address TYPE bloom_filter,
INDEX to_idx to_address TYPE bloom_filter
) ENGINE = ReplacingMergeTree(block_timestamp)
ORDER BY (token_address, block_number, transaction_hash, log_index)
PARTITION BY toYYYYMM(block_timestamp)
Related Skills
- pipes-abi – Fetch contract ABIs
- pipes-troubleshooting – Fix errors and validate data
- pipes-orchestrator – Routes to this skill
Official Subsquid Documentation
Best Practices
- Always include standard blockchain columns (block_number, block_timestamp, transaction_hash, log_index)
- Use FixedString for fixed-length data (addresses, hashes)
- Use String for BigInt types (uint256, uint128, int256)
- Default to ReplacingMergeTree unless there’s a specific reason not to
- Add bloom filter indexes for address fields that will be queried
- Design ORDER BY based on query patterns, not arbitrary order
- Partition by time for large datasets (monthly is usually best)
- Add detailed comments explaining each design decision