# sling-pipelines
## Install command

```sh
npx skills add https://github.com/slingdata-io/slingdata-ai --skill sling-pipelines
```
## Pipelines
Pipelines are multi-step workflows that execute a sequence of tasks with control flow.
### When to Use Pipelines
- Run tasks before/after replications
- Chain multiple replications
- Implement data validation
- Send notifications
- Manage file operations
Basic Structure
env:
MY_VAR: "value"
steps:
- type: log
message: "Starting pipeline"
- type: replication
path: /path/to/replication.yaml
id: main_repl
- type: query
connection: MY_DB
query: "SELECT COUNT(*) FROM table"
if: state.main_repl.status == "success"
### MCP Operations

Parse:

```json
{"action": "parse", "input": {"file_path": "/path/to/pipeline.yaml"}}
```

Run:

```json
{
  "action": "run",
  "input": {
    "file_path": "/path/to/pipeline.yaml",
    "env": {"MY_VAR": "override"}
  }
}
```
### Step Types

| Type | Description |
|---|---|
| `log` | Output messages |
| `run` | Simple data transfer |
| `replication` | Run replication file |
| `query` | Execute SQL |
| `http` | HTTP requests |
| `command` | Shell commands |
| `check` | Validate conditions |
| `copy` | Copy files |
| `delete` | Delete files |
| `write` | Write to files |
| `read` | Read file contents |
| `list` | List files |
| `store` | Store values |
| `group` | Group steps, enable looping |
### Common Steps

#### log

```yaml
- type: log
  level: info  # debug, info, warn, error
  message: "Processing {env.MY_VAR}"
```

#### run (simple transfer)

```yaml
- type: run
  source: "MY_POSTGRES.public.users"
  target: "file://users.csv"
  mode: "full-refresh"
```

#### replication

```yaml
- type: replication
  path: /path/to/replication.yaml
  select_streams: ["users", "orders"]
  mode: "incremental"
  id: my_repl
```

#### query

```yaml
- type: query
  connection: MY_POSTGRES
  query: |
    SELECT COUNT(*) as cnt FROM users
    WHERE created_at > NOW() - INTERVAL '1 day'
  into: result
  id: count_query
```

#### http

```yaml
- type: http
  url: "https://api.example.com/webhook"
  method: POST
  headers:
    Authorization: "Bearer {env.API_TOKEN}"
  payload: |
    {"status": "{state.my_repl.status}"}
  into: response
```

#### command

```yaml
- type: command
  command: "python validate.py {store.file_path}"
  working_dir: "/scripts"
  into: output
```

#### check

```yaml
- type: check
  check: "state.count_query.result[0].cnt > 0"
  failure_message: "No records found"
  on_failure: abort
```

#### copy

```yaml
- type: copy
  from: "file://data/today/"
  to: "s3://bucket/archive/{YYYY}/{MM}/{DD}/"
  recursive: true
```

#### store

```yaml
- type: store
  key: my_value
  value: "something"
  # Later: {store.my_value}
```

#### group (with loop)

```yaml
- type: group
  loop: ["users", "products", "orders"]
  steps:
    - type: log
      message: "Processing: {loop.value}"
    - type: run
      source: "POSTGRES.public.{loop.value}"
      target: "SNOWFLAKE.staging.{loop.value}"
```
### Control Flow

#### Conditional Execution

```yaml
- type: replication
  path: main.yaml
  id: main_job

- type: http
  url: "https://slack.com/webhook"
  payload: '{"text": "Success!"}'
  if: state.main_job.status == "success"

- type: http
  url: "https://slack.com/webhook"
  payload: '{"text": "Failed!"}'
  if: state.main_job.status == "error"
```

#### Looping

```yaml
- type: list
  location: "s3://bucket/data/"
  id: files

- type: group
  loop: state.files.result
  steps:
    - type: log
      message: "Processing: {loop.value.name}"
    - type: run
      source: "s3://bucket/{loop.value.path}"
      target: "POSTGRES.staging.imported"
```
### Error Handling

| Option | Behavior |
|---|---|
| `abort` | Stop pipeline (default) |
| `warn` | Log warning, continue |
| `quiet` | Silent, continue |
| `skip` | Skip step, continue |
| `break` | Exit current group only |

```yaml
- type: delete
  location: "file:///tmp/old/"
  on_failure: warn  # don't fail if files don't exist
```
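As a sketch of how `break` differs from `abort` (the connection names, `id`, and table list here are illustrative placeholders, not from the source): a failing step inside a group with `on_failure: break` exits only that group, so any top-level steps after the group still run.

```yaml
- type: group
  loop: ["users", "orders"]
  steps:
    - type: run
      source: "POSTGRES.public.{loop.value}"
      target: "SNOWFLAKE.staging.{loop.value}"
      on_failure: break  # a failed transfer exits this group only

- type: log
  message: "Still runs even if a transfer above failed"
```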
### Variables

| Variable | Description |
|---|---|
| `{env.VAR}` | Environment variables |
| `{store.key}` | Stored values |
| `{state.id.*}` | Step results by ID |
| `{timestamp.date}` | Current date |
| `{loop.value}` | Current loop item |
| `{loop.index}` | Loop iteration index |
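A minimal sketch combining several of these variables in one pipeline (the `MY_DB` connection and `events` table are hypothetical; the `state.<id>.result[0]` access pattern follows the `check` example above):

```yaml
- type: store
  key: run_date
  value: "{timestamp.date}"

- type: query
  connection: MY_DB
  query: "SELECT COUNT(*) AS cnt FROM events WHERE event_date = '{store.run_date}'"
  id: daily_count

- type: log
  message: "Rows for {store.run_date}: {state.daily_count.result[0].cnt}"
```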
### Complete Example

```yaml
env:
  SLACK_WEBHOOK: "${SLACK_WEBHOOK_URL}"

steps:
  - type: log
    message: "Starting data pipeline"

  - type: replication
    path: replications/main.yaml
    id: main_sync

  - type: check
    check: state.main_sync.status == "success"
    on_failure: break

  - type: query
    connection: TARGET_DB
    query: "CALL refresh_materialized_views()"

  - type: http
    url: "{env.SLACK_WEBHOOK}"
    method: POST
    payload: |
      {"text": "Pipeline completed: {state.main_sync.total_rows} rows"}

  - type: log
    message: "Pipeline finished"
```
### Full Documentation

See https://docs.slingdata.io/concepts/pipeline.md for the complete reference.