sling-pipelines

📁 slingdata-io/slingdata-ai 📅 9 days ago
Total installs: 1
Weekly installs: 1
Site-wide rank: #51435

Install command
npx skills add https://github.com/slingdata-io/slingdata-ai --skill sling-pipelines

Install distribution by agent

opencode 1
codex 1
github-copilot 1
claude-code 1

Skill documentation

Pipelines

Pipelines are multi-step workflows that execute a sequence of tasks with control flow.

When to Use Pipelines

  • Run tasks before/after replications
  • Chain multiple replications
  • Implement data validation
  • Send notifications
  • Manage file operations

Basic Structure

env:
  MY_VAR: "value"

steps:
  - type: log
    message: "Starting pipeline"

  - type: replication
    path: /path/to/replication.yaml
    id: main_repl

  - type: query
    connection: MY_DB
    query: "SELECT COUNT(*) FROM table"
    if: state.main_repl.status == "success"

MCP Operations

Parse

{"action": "parse", "input": {"file_path": "/path/to/pipeline.yaml"}}

Run

{
  "action": "run",
  "input": {
    "file_path": "/path/to/pipeline.yaml",
    "env": {"MY_VAR": "override"}
  }
}

Step Types

Type         Description
log          Output messages
run          Simple data transfer
replication  Run replication file
query        Execute SQL
http         HTTP requests
command      Shell commands
check        Validate conditions
copy         Copy files
delete       Delete files
write        Write to files
read         Read file contents
list         List files
store        Store values
group        Group steps, enable looping

Common Steps

log

- type: log
  level: info  # debug, info, warn, error
  message: "Processing {env.MY_VAR}"

run (simple transfer)

- type: run
  source: "MY_POSTGRES.public.users"
  target: "file://users.csv"
  mode: "full-refresh"

replication

- type: replication
  path: /path/to/replication.yaml
  select_streams: ["users", "orders"]
  mode: "incremental"
  id: my_repl

query

- type: query
  connection: MY_POSTGRES
  query: |
    SELECT COUNT(*) as cnt FROM users
    WHERE created_at > NOW() - INTERVAL '1 day'
  into: result
  id: count_query

http

- type: http
  url: "https://api.example.com/webhook"
  method: POST
  headers:
    Authorization: "Bearer {env.API_TOKEN}"
  payload: |
    {"status": "{state.my_repl.status}"}
  into: response

command

- type: command
  command: "python validate.py {store.file_path}"
  working_dir: "/scripts"
  into: output

check

- type: check
  check: "state.count_query.result[0].cnt > 0"
  failure_message: "No records found"
  on_failure: abort

copy

- type: copy
  from: "file://data/today/"
  to: "s3://bucket/archive/{YYYY}/{MM}/{DD}/"
  recursive: true

store

- type: store
  key: my_value
  value: "something"
# Later: {store.my_value}

group (with loop)

- type: group
  loop: ["users", "products", "orders"]
  steps:
    - type: log
      message: "Processing: {loop.value}"
    - type: run
      source: "POSTGRES.public.{loop.value}"
      target: "SNOWFLAKE.staging.{loop.value}"

Control Flow

Conditional Execution

- type: replication
  path: main.yaml
  id: main_job

- type: http
  url: "https://slack.com/webhook"
  payload: '{"text": "Success!"}'
  if: state.main_job.status == "success"

- type: http
  url: "https://slack.com/webhook"
  payload: '{"text": "Failed!"}'
  if: state.main_job.status == "error"

Looping

- type: list
  location: "s3://bucket/data/"
  id: files

- type: group
  loop: state.files.result
  steps:
    - type: log
      message: "Processing: {loop.value.name}"
    - type: run
      source: "s3://bucket/{loop.value.path}"
      target: "POSTGRES.staging.imported"
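
Besides `{loop.value}`, each iteration also exposes an iteration counter, `{loop.index}` (see the Variables table below). A minimal sketch, assuming illustrative table names:

```yaml
- type: group
  loop: ["users", "orders", "products"]
  steps:
    - type: log
      message: "Step {loop.index}: exporting {loop.value}"
    - type: run
      source: "POSTGRES.public.{loop.value}"
      target: "file://export/{loop.value}.csv"
```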

Error Handling

Option  Behavior
abort   Stop pipeline (default)
warn    Log warning, continue
quiet   Silent, continue
skip    Skip step, continue
break   Exit current group only

- type: delete
  location: "file:///tmp/old/"
  on_failure: warn  # Don't fail if files don't exist

Variables

Variable          Description
{env.VAR}         Environment variables
{store.key}       Stored values
{state.id.*}      Step results by ID
{timestamp.date}  Current date
{loop.value}      Current loop item
{loop.index}      Loop iteration index
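
Variables can be mixed freely inside any string field. A small illustration (the step id `main_repl` and the stored key `file_path` are borrowed from earlier examples on this page):

```yaml
- type: log
  message: "{timestamp.date}: processed {store.file_path}, status {state.main_repl.status}"
```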

Complete Example

env:
  SLACK_WEBHOOK: "${SLACK_WEBHOOK_URL}"

steps:
  - type: log
    message: "Starting data pipeline"

  - type: replication
    path: replications/main.yaml
    id: main_sync

  - type: check
    check: state.main_sync.status == "success"
    on_failure: break

  - type: query
    connection: TARGET_DB
    query: "CALL refresh_materialized_views()"

  - type: http
    url: "{env.SLACK_WEBHOOK}"
    method: POST
    payload: |
      {"text": "Pipeline completed: {state.main_sync.total_rows} rows"}

  - type: log
    message: "Pipeline finished"

Full Documentation

See https://docs.slingdata.io/concepts/pipeline.md for complete reference.