elixir-otp-concurrency

📁 vinnie357/claude-skills 📅 6 days ago
3
总安装量
3
周安装量
#61281
全站排名
安装命令
npx skills add https://github.com/vinnie357/claude-skills --skill elixir-otp-concurrency

Agent 安装分布

opencode 3
gemini-cli 3
claude-code 3
github-copilot 3
codex 3
kimi-cli 3

Skill 文档

Elixir OTP and Concurrency

This skill activates when working with OTP behaviors, building concurrent systems, managing processes, or implementing fault-tolerant architectures in Elixir.

When to Use This Skill

Activate when:

  • Implementing GenServer, GenStage, Supervisor, or other OTP behaviors
  • Designing supervision trees and fault-tolerance strategies
  • Working with Tasks, Agents, or process management
  • Building concurrent or distributed systems
  • Managing application state
  • Troubleshooting process-related issues

OTP Behaviors

GenServer – Generic Server

Use GenServer for stateful processes:

defmodule MyApp.Counter do
  use GenServer

  # Client API

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def increment do
    GenServer.call(__MODULE__, :increment)
  end

  def get_value do
    GenServer.call(__MODULE__, :get)
  end

  # Server Callbacks

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:increment, _from, state) do
    {:reply, state + 1, state + 1}
  end

  @impl true
  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end
end

GenServer Best Practices

  • Use call for synchronous requests that need a response
  • Use cast for asynchronous fire-and-forget messages
  • Use handle_info for receiving regular messages
  • Keep server callbacks fast – delegate heavy work to Tasks
  • Name processes with via tuples or Registry for dynamic naming
  • Implement timeouts to prevent client processes from hanging

GenServer Patterns

Background Work:

def init(state) do
  schedule_work()
  {:ok, state}
end

def handle_info(:work, state) do
  do_work(state)
  schedule_work()
  {:noreply, state}
end

defp schedule_work do
  Process.send_after(self(), :work, 5000)
end

State Timeouts:

def handle_call(:get, _from, state) do
  {:reply, state, state, {:state_timeout, 30_000, :cleanup}}
end

def handle_state_timeout(:cleanup, state) do
  {:stop, :normal, state}
end

Supervisor – Process Supervision

Build supervision trees for fault tolerance:

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Database connection pool
      {MyApp.Repo, []},

      # PubSub system
      {Phoenix.PubSub, name: MyApp.PubSub},

      # Custom supervisor
      {MyApp.WorkerSupervisor, []},

      # Individual workers
      {MyApp.Cache, []},
      {MyApp.RateLimiter, []},

      # Web endpoint
      MyAppWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Supervision Strategies

:one_for_one – If a child dies, only that child is restarted

Supervisor.start_link(children, strategy: :one_for_one)

:one_for_all – If any child dies, all children are terminated and restarted

Supervisor.start_link(children, strategy: :one_for_all)

:rest_for_one – If a child dies, it and all children started after it are restarted

Supervisor.start_link(children, strategy: :rest_for_one)

Dynamic Supervisors

For dynamically creating processes:

defmodule MyApp.WorkerSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_worker(args) do
    spec = {MyApp.Worker, args}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

Restart Strategies

Configure child restart behavior:

children = [
  # Always restart (default)
  {MyApp.CriticalWorker, restart: :permanent},

  # Never restart
  {MyApp.OneTimeTask, restart: :temporary},

  # Only restart on abnormal exit
  {MyApp.OptionalWorker, restart: :transient}
]

Task – Concurrent Work

Fire-and-forget Tasks

For concurrent work without needing results:

Task.start(fn ->
  send_email(user, "Welcome!")
end)

Awaited Tasks

For concurrent work with results:

task = Task.async(fn ->
  expensive_computation()
end)

# Do other work...

result = Task.await(task, 5000)  # 5 second timeout

Supervised Tasks

For long-running tasks under supervision:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

# Use the supervised task
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  long_running_operation()
end)

Concurrent Map

Process collections concurrently:

# Sequential
results = Enum.map(urls, &fetch_url/1)

# Concurrent
results = Task.async_stream(urls, &fetch_url/1, max_concurrency: 10)
         |> Enum.to_list()

Agent – Simple State Management

Use Agent for simple state:

{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)

# Get state
value = Agent.get(MyApp.Cache, fn state -> Map.get(state, :key) end)

# Update state
Agent.update(MyApp.Cache, fn state -> Map.put(state, :key, value) end)

# Get and update atomically
Agent.get_and_update(MyApp.Cache, fn state ->
  {Map.get(state, :key), Map.delete(state, :key)}
end)

When to use Agent vs GenServer:

  • Use Agent for simple key-value state
  • Use GenServer when you need complex logic, callbacks, or process lifecycle management

Process Communication

send/receive

Basic message passing:

# Send message
send(pid, {:hello, "world"})

# Receive message
receive do
  {:hello, msg} -> IO.puts(msg)
after
  5000 -> IO.puts("Timeout")
end

Process Registration

Register processes by name:

# Local registration
Process.register(self(), :my_process)
send(:my_process, :hello)

# Via Registry
{:ok, _} = Registry.start_link(keys: :unique, name: MyApp.Registry)

{:ok, pid} = GenServer.start_link(MyWorker, nil,
  name: {:via, Registry, {MyApp.Registry, "worker_1"}}
)

# Look up process
[{pid, _}] = Registry.lookup(MyApp.Registry, "worker_1")

Process Links and Monitors

Links – Bidirectional, propagate exits:

# Link processes
Process.link(pid)

# Spawn linked
spawn_link(fn -> do_work() end)

Monitors – Unidirectional, receive DOWN messages:

ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    IO.puts("Process died: #{inspect(reason)}")
end

Concurrency Patterns

Pipeline Pattern

Chain operations with concurrency:

defmodule Pipeline do
  def process(data) do
    data
    |> async(&step1/1)
    |> async(&step2/1)
    |> async(&step3/1)
    |> await_all()
  end

  defp async(input, fun) do
    Task.async(fn -> fun.(input) end)
  end

  defp await_all(tasks) when is_list(tasks) do
    Enum.map(tasks, &Task.await/1)
  end
end

Worker Pool

Implement a worker pool:

defmodule MyApp.WorkerPool do
  use GenServer

  def start_link(opts) do
    pool_size = Keyword.get(opts, :size, 10)
    GenServer.start_link(__MODULE__, pool_size, name: __MODULE__)
  end

  def execute(fun) do
    GenServer.call(__MODULE__, {:execute, fun})
  end

  @impl true
  def init(pool_size) do
    workers = for _ <- 1..pool_size do
      {:ok, pid} = Task.Supervisor.start_link()
      pid
    end

    {:ok, %{workers: workers, index: 0}}
  end

  @impl true
  def handle_call({:execute, fun}, _from, state) do
    worker = Enum.at(state.workers, state.index)
    task = Task.Supervisor.async_nolink(worker, fun)

    new_index = rem(state.index + 1, length(state.workers))
    {:reply, task, %{state | index: new_index}}
  end
end

Backpressure with GenStage

For producer-consumer pipelines:

defmodule Producer do
  use GenStage

  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(initial) do
    {:producer, initial}
  end

  def handle_demand(demand, state) do
    events = Enum.to_list(state..state + demand - 1)
    {:noreply, events, state + demand}
  end
end

defmodule Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, :ok}
  end

  def handle_events(events, _from, state) do
    Enum.each(events, &process_event/1)
    {:noreply, [], state}
  end
end

ETS – Erlang Term Storage

In-memory key-value storage:

# Create table
:ets.new(:my_table, [:named_table, :public, read_concurrency: true])

# Insert
:ets.insert(:my_table, {:key, "value"})

# Lookup
[{:key, value}] = :ets.lookup(:my_table, :key)

# Delete
:ets.delete(:my_table, :key)

# Match patterns
:ets.match(:my_table, {:"$1", "value"})

# Iterate
:ets.foldl(fn {k, v}, acc -> [{k, v} | acc] end, [], :my_table)

ETS Best Practices

  • Use read_concurrency: true for read-heavy workloads
  • Use write_concurrency: true for write-heavy workloads
  • Prefer :set (default) for unique keys
  • Use :bag or :duplicate_bag for multiple values per key
  • Always own ETS tables in a GenServer or Supervisor to prevent data loss

Error Handling and Fault Tolerance

Let It Crash Philosophy

Design for failure:

# Don't do defensive programming
def process_order(order_id) do
  # Let it crash if order doesn't exist
  order = Repo.get!(Order, order_id)

  # Let it crash if validation fails
  {:ok, processed} = process(order)

  processed
end

Proper Error Handling

When to handle errors vs let crash:

# Handle expected errors
def fetch_user(id) do
  case HTTPoison.get("#{@api_url}/users/#{id}") do
    {:ok, %{status_code: 200, body: body}} ->
      Jason.decode(body)

    {:ok, %{status_code: 404}} ->
      {:error, :not_found}

    {:ok, %{status_code: status}} ->
      {:error, {:unexpected_status, status}}

    {:error, reason} ->
      {:error, {:network_error, reason}}
  end
end

# Let unexpected errors crash
def update_user!(id, params) do
  user = Repo.get!(User, id)  # Crash if not found

  user
  |> User.changeset(params)
  |> Repo.update!()  # Crash if invalid
end

Circuit Breaker

Prevent cascading failures:

defmodule CircuitBreaker do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{status: :closed, failures: 0}, name: __MODULE__)
  end

  def call(fun) do
    case GenServer.call(__MODULE__, :status) do
      :open -> {:error, :circuit_open}
      :closed -> execute(fun)
    end
  end

  defp execute(fun) do
    try do
      result = fun.()
      GenServer.cast(__MODULE__, :success)
      {:ok, result}
    rescue
      e ->
        GenServer.cast(__MODULE__, :failure)
        {:error, e}
    end
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, state.status, state}
  end

  @impl true
  def handle_cast(:success, state) do
    {:noreply, %{state | failures: 0, status: :closed}}
  end

  @impl true
  def handle_cast(:failure, state) do
    new_failures = state.failures + 1

    if new_failures >= 5 do
      Process.send_after(self(), :half_open, 30_000)
      {:noreply, %{state | failures: new_failures, status: :open}}
    else
      {:noreply, %{state | failures: new_failures}}
    end
  end

  @impl true
  def handle_info(:half_open, state) do
    {:noreply, %{state | status: :closed, failures: 0}}
  end
end

Testing Concurrent Systems

Testing GenServers

defmodule MyApp.CounterTest do
  use ExUnit.Case, async: true

  test "increments counter" do
    {:ok, pid} = MyApp.Counter.start_link(0)

    assert MyApp.Counter.increment(pid) == 1
    assert MyApp.Counter.increment(pid) == 2
    assert MyApp.Counter.get_value(pid) == 2
  end
end

Testing Asynchronous Processes

test "process receives message" do
  parent = self()

  spawn(fn ->
    receive do
      :ping -> send(parent, :pong)
    end
  end)

  send(pid, :ping)

  assert_receive :pong, 1000
end

Testing Supervision

test "supervisor restarts crashed worker" do
  {:ok, sup} = Supervisor.start_link([MyApp.Worker], strategy: :one_for_one)

  [{_, worker_pid, _, _}] = Supervisor.which_children(sup)

  # Crash the worker
  Process.exit(worker_pid, :kill)

  # Wait for restart
  Process.sleep(100)

  # Verify new worker started
  [{_, new_pid, _, _}] = Supervisor.which_children(sup)
  assert new_pid != worker_pid
  assert Process.alive?(new_pid)
end

Debugging Concurrent Systems

Observer

Launch Observer for visual process inspection:

:observer.start()

Process Info

Inspect running processes:

# List all processes
Process.list()

# Process information
Process.info(pid)

# Message queue length
{:message_queue_len, count} = Process.info(pid, :message_queue_len)

# Current function
{:current_function, {mod, fun, arity}} = Process.info(pid, :current_function)

Tracing

Use :sys module for debugging:

# Enable tracing
:sys.trace(pid, true)

# Get state
:sys.get_state(pid)

# Get status
:sys.get_status(pid)

Performance Considerations

Process Spawning

  • Processes are lightweight (< 2KB overhead)
  • Spawning thousands/millions of processes is normal
  • Use process pools when spawn rate is very high

Message Passing

  • Messages are copied between processes
  • Large messages are expensive – consider ETS or persistent_term
  • Use binary for efficient large data transfer

Bottlenecks

  • Single GenServer can become bottleneck
  • Solution: shard state across multiple processes
  • Use ETS with read_concurrency for read-heavy workloads

Key Principles

  • Embrace concurrency: Use processes liberally, they’re cheap
  • Let it crash: Don’t write defensive code, use supervision
  • Isolate failures: Design supervision trees to contain failures
  • Communicate via messages: Avoid shared state between processes
  • Use the right tool: GenServer for state, Task for work, Agent for simple state
  • Test at boundaries: Test process APIs, not internal implementation
  • Monitor and observe: Use Observer and logging to understand system behavior