elixir-ecto
npx skills add https://github.com/vircung/opencode-config --skill elixir-ecto
Agent 安装分布
Skill 文档
Elixir Ecto Skill
Schema Design
Basic Schema Definition
Schema Structure:
defmodule MyApp.Accounts.User do
use Ecto.Schema
import Ecto.Changeset
@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
schema "users" do
field :email, :string
field :name, :string
field :age, :integer
field :is_active, :boolean, default: true
field :password, :string, virtual: true
field :password_hash, :string
field :profile_data, :map
has_many :posts, MyApp.Blog.Post
has_one :profile, MyApp.Accounts.Profile
many_to_many :roles, MyApp.Accounts.Role, join_through: "user_roles"
timestamps()
end
def changeset(user, attrs) do
user
|> cast(attrs, [:email, :name, :age, :password])
|> validate_required([:email, :name])
|> validate_format(:email, ~r/^[^\s]+@[^\s]+$/, message: "must have the @ sign and no spaces")
|> validate_length(:name, min: 2, max: 100)
|> validate_number(:age, greater_than: 0, less_than: 150)
|> unique_constraint(:email)
|> put_password_hash()
end
defp put_password_hash(%{valid?: true, changes: %{password: password}} = changeset) do
put_change(changeset, :password_hash, Argon2.hash_pwd_salt(password))
end
defp put_password_hash(changeset), do: changeset
end
Virtual Fields and Computed Values
Virtual Fields for Input Processing:
defmodule MyApp.Accounts.Profile do
use Ecto.Schema
import Ecto.Changeset
schema "profiles" do
field :first_name, :string
field :last_name, :string
field :full_name, :string, virtual: true
field :birth_date, :date
field :age, :integer, virtual: true
belongs_to :user, MyApp.Accounts.User
timestamps()
end
def changeset(profile, attrs) do
profile
|> cast(attrs, [:first_name, :last_name, :birth_date, :full_name])
|> validate_required([:first_name, :last_name])
|> put_full_name()
|> put_age()
end
defp put_full_name(%{changes: %{first_name: first, last_name: last}} = changeset) do
put_change(changeset, :full_name, "#{first} #{last}")
end
defp put_full_name(changeset), do: changeset
defp put_age(%{changes: %{birth_date: birth_date}} = changeset) do
age = Date.diff(Date.utc_today(), birth_date) |> div(365)
put_change(changeset, :age, age)
end
defp put_age(changeset), do: changeset
end
Embedded Schemas
For JSON/Map Fields:
defmodule MyApp.Accounts.Address do
use Ecto.Schema
import Ecto.Changeset
@primary_key false
embedded_schema do
field :street, :string
field :city, :string
field :state, :string
field :zip_code, :string
field :country, :string, default: "US"
end
def changeset(address, attrs) do
address
|> cast(attrs, [:street, :city, :state, :zip_code, :country])
|> validate_required([:street, :city, :state, :zip_code])
|> validate_length(:zip_code, is: 5)
end
end
# Usage in parent schema
defmodule MyApp.Accounts.User do
use Ecto.Schema
import Ecto.Changeset
schema "users" do
field :name, :string
embeds_one :address, MyApp.Accounts.Address
embeds_many :previous_addresses, MyApp.Accounts.Address
timestamps()
end
def changeset(user, attrs) do
user
|> cast(attrs, [:name])
|> cast_embed(:address)
|> cast_embed(:previous_addresses)
end
end
Associations
Association Patterns:
# One-to-many with dependent deletion
defmodule MyApp.Blog.Post do
use Ecto.Schema
schema "posts" do
field :title, :string
field :body, :text
field :published_at, :utc_datetime
belongs_to :author, MyApp.Accounts.User
has_many :comments, MyApp.Blog.Comment, on_delete: :delete_all
has_many :post_tags, MyApp.Blog.PostTag, on_delete: :delete_all
has_many :tags, through: [:post_tags, :tag]
timestamps()
end
end
# Many-to-many with join schema
defmodule MyApp.Blog.PostTag do
use Ecto.Schema
schema "post_tags" do
belongs_to :post, MyApp.Blog.Post
belongs_to :tag, MyApp.Blog.Tag
field :created_by_id, :id
timestamps()
end
end
# Polymorphic associations
defmodule MyApp.Comments.Comment do
use Ecto.Schema
schema "comments" do
field :body, :text
field :commentable_type, :string
field :commentable_id, :id
belongs_to :user, MyApp.Accounts.User
timestamps()
end
def for_post(query \\ __MODULE__) do
from c in query, where: c.commentable_type == "post"
end
def for_user(query \\ __MODULE__) do
from c in query, where: c.commentable_type == "user"
end
end
Changesets & Validation
Advanced Changeset Patterns
Conditional Validations:
defmodule MyApp.Accounts.User do
import Ecto.Changeset
def changeset(user, attrs) do
user
|> cast(attrs, [:email, :role, :company_id, :freelance_rate])
|> validate_required([:email, :role])
|> validate_role_specific_fields()
|> validate_business_rules()
end
defp validate_role_specific_fields(changeset) do
case get_field(changeset, :role) do
:employee ->
changeset
|> validate_required([:company_id])
|> validate_exclusion(:freelance_rate, [nil], message: "must be empty for employees")
:freelancer ->
changeset
|> validate_required([:freelance_rate])
|> validate_number(:freelance_rate, greater_than: 0)
|> validate_exclusion(:company_id, [nil], message: "must be empty for freelancers")
_ -> changeset
end
end
defp validate_business_rules(changeset) do
changeset
|> validate_email_domain()
|> validate_unique_email_per_company()
end
defp validate_email_domain(changeset) do
email = get_change(changeset, :email)
if email && String.ends_with?(email, "@competitor.com") do
add_error(changeset, :email, "competitor email addresses not allowed")
else
changeset
end
end
end
Custom Validators:
defmodule MyApp.Validators do
import Ecto.Changeset
def validate_password_strength(changeset, field) do
validate_change changeset, field, fn _, password ->
cond do
String.length(password) < 8 ->
[{field, "must be at least 8 characters"}]
not Regex.match?(~r/[A-Z]/, password) ->
[{field, "must contain uppercase letter"}]
not Regex.match?(~r/[a-z]/, password) ->
[{field, "must contain lowercase letter"}]
not Regex.match?(~r/[0-9]/, password) ->
[{field, "must contain number"}]
true -> []
end
end
end
def validate_phone_number(changeset, field) do
validate_change changeset, field, fn _, phone ->
# Remove all non-digits
digits = Regex.replace(~r/\D/, phone, "")
case String.length(digits) do
10 -> []
11 when String.starts_with?(digits, "1") -> []
_ -> [{field, "must be a valid phone number"}]
end
end
end
def validate_future_date(changeset, field) do
validate_change changeset, field, fn _, date ->
if Date.compare(date, Date.utc_today()) == :gt do
[]
else
[{field, "must be in the future"}]
end
end
end
end
Nested Changesets
Complex Form Handling:
defmodule MyApp.Orders.Order do
use Ecto.Schema
import Ecto.Changeset
schema "orders" do
field :total_amount, :decimal
field :status, :string, default: "pending"
belongs_to :customer, MyApp.Accounts.User
has_many :line_items, MyApp.Orders.LineItem, on_delete: :delete_all
timestamps()
end
def changeset(order, attrs) do
order
|> cast(attrs, [:customer_id])
|> cast_assoc(:line_items, required: true)
|> validate_required([:customer_id])
|> calculate_total()
|> validate_number(:total_amount, greater_than: 0)
end
defp calculate_total(changeset) do
case get_change(changeset, :line_items) do
nil -> changeset
line_items_changesets ->
total =
line_items_changesets
|> Enum.map(&get_field(&1, :amount))
|> Enum.sum()
put_change(changeset, :total_amount, total)
end
end
end
defmodule MyApp.Orders.LineItem do
use Ecto.Schema
import Ecto.Changeset
schema "line_items" do
field :quantity, :integer
field :unit_price, :decimal
field :amount, :decimal
belongs_to :order, MyApp.Orders.Order
belongs_to :product, MyApp.Catalog.Product
timestamps()
end
def changeset(line_item, attrs) do
line_item
|> cast(attrs, [:product_id, :quantity, :unit_price])
|> validate_required([:product_id, :quantity, :unit_price])
|> validate_number(:quantity, greater_than: 0)
|> validate_number(:unit_price, greater_than: 0)
|> calculate_amount()
end
defp calculate_amount(changeset) do
case {get_field(changeset, :quantity), get_field(changeset, :unit_price)} do
{qty, price} when is_integer(qty) and not is_nil(price) ->
amount = Decimal.mult(price, qty)
put_change(changeset, :amount, amount)
_ -> changeset
end
end
end
Querying Patterns
Basic Query Composition
Query Building:
defmodule MyApp.Blog do
import Ecto.Query
alias MyApp.Repo
alias MyApp.Blog.Post
def list_posts(opts \\ []) do
Post
|> filter_by_status(opts[:status])
|> filter_by_author(opts[:author_id])
|> search_by_title(opts[:search])
|> order_by_date(opts[:order])
|> paginate(opts[:page], opts[:per_page])
|> Repo.all()
|> preload_associations()
end
defp filter_by_status(query, nil), do: query
defp filter_by_status(query, status) do
from p in query, where: p.status == ^status
end
defp filter_by_author(query, nil), do: query
defp filter_by_author(query, author_id) do
from p in query, where: p.author_id == ^author_id
end
defp search_by_title(query, nil), do: query
defp search_by_title(query, search_term) do
like_pattern = "%#{search_term}%"
from p in query, where: ilike(p.title, ^like_pattern)
end
defp order_by_date(query, :asc) do
from p in query, order_by: [asc: p.inserted_at]
end
defp order_by_date(query, _) do
from p in query, order_by: [desc: p.inserted_at]
end
defp paginate(query, nil, _), do: query
defp paginate(query, page, per_page) do
offset = (page - 1) * per_page
from p in query, limit: ^per_page, offset: ^offset
end
defp preload_associations(posts) do
Repo.preload(posts, [:author, :tags, comments: :user])
end
end
Advanced Query Patterns
Subqueries and Aggregations:
defmodule MyApp.Analytics do
import Ecto.Query
alias MyApp.Repo
def popular_posts_with_stats do
comment_counts = from c in Comment,
group_by: c.post_id,
select: %{post_id: c.post_id, count: count(c.id)}
from p in Post,
left_join: cc in subquery(comment_counts), on: p.id == cc.post_id,
select: %{
id: p.id,
title: p.title,
comment_count: coalesce(cc.count, 0),
view_count: p.view_count
},
where: p.status == "published",
order_by: [desc: coalesce(cc.count, 0)]
end
def user_activity_summary(user_id) do
posts_query = from p in Post,
where: p.author_id == ^user_id,
select: count(p.id)
comments_query = from c in Comment,
where: c.user_id == ^user_id,
select: count(c.id)
%{
posts_count: Repo.one(posts_query),
comments_count: Repo.one(comments_query),
last_activity: last_activity(user_id)
}
end
defp last_activity(user_id) do
last_post = from p in Post,
where: p.author_id == ^user_id,
select: p.inserted_at,
order_by: [desc: p.inserted_at],
limit: 1
last_comment = from c in Comment,
where: c.user_id == ^user_id,
select: c.inserted_at,
order_by: [desc: c.inserted_at],
limit: 1
[Repo.one(last_post), Repo.one(last_comment)]
|> Enum.reject(&is_nil/1)
|> Enum.max(Date, fn -> nil end)
end
end
Preloading Strategies
Efficient Data Loading:
defmodule MyApp.Blog do
# Basic preload
def get_post_with_author(id) do
Post
|> Repo.get!(id)
|> Repo.preload(:author)
end
# Selective preload based on query
def get_post_with_comments(id, include_author: include_author) do
preloads = if include_author do
[author: [], comments: :user]
else
[comments: :user]
end
Post
|> Repo.get!(id)
|> Repo.preload(preloads)
end
# Custom preload query
def get_post_with_recent_comments(id) do
recent_comments = from c in Comment,
where: c.inserted_at > ago(7, "day"),
order_by: [desc: c.inserted_at],
preload: :user
Post
|> Repo.get!(id)
|> Repo.preload([author: [], comments: recent_comments])
end
# Prevent N+1 with join preload for aggregation
def list_posts_with_comment_counts do
from p in Post,
left_join: c in assoc(p, :comments),
group_by: p.id,
preload: [author: []],
select: %{post: p, comment_count: count(c.id)}
end
end
Dynamic Queries
Runtime Query Building:
defmodule MyApp.Search do
import Ecto.Query
def build_search_query(base_query, filters) do
Enum.reduce(filters, base_query, &apply_filter/2)
end
defp apply_filter({:status, status}, query) when status != "" do
from q in query, where: q.status == ^status
end
defp apply_filter({:date_range, %{start: start_date, end: end_date}}, query) do
from q in query,
where: q.inserted_at >= ^start_date and q.inserted_at <= ^end_date
end
defp apply_filter({:tags, tag_ids}, query) when is_list(tag_ids) and tag_ids != [] do
from q in query,
join: t in assoc(q, :tags),
where: t.id in ^tag_ids,
distinct: true
end
defp apply_filter({:search, term}, query) when is_binary(term) and term != "" do
like_term = "%#{term}%"
from q in query,
where: ilike(q.title, ^like_term) or ilike(q.body, ^like_term)
end
defp apply_filter({:author_name, name}, query) when is_binary(name) and name != "" do
from q in query,
join: a in assoc(q, :author),
where: ilike(a.name, ^"%#{name}%")
end
# Ignore unknown or empty filters
defp apply_filter(_, query), do: query
end
Migrations & Schema Evolution
Migration Patterns
Safe Migration Strategies:
defmodule MyApp.Repo.Migrations.CreateUsersTable do
use Ecto.Migration
def change do
create table(:users, primary_key: false) do
add :id, :binary_id, primary_key: true
add :email, :string, null: false
add :name, :string, null: false
add :age, :integer
add :is_active, :boolean, default: true, null: false
timestamps()
end
create unique_index(:users, [:email])
create index(:users, [:is_active])
# Add constraint for data integrity
create constraint(:users, :age_must_be_positive, check: "age > 0")
end
end
# Adding columns safely
defmodule MyApp.Repo.Migrations.AddUserProfile do
use Ecto.Migration
def change do
alter table(:users) do
add :profile_data, :map, default: "{}"
add :last_login_at, :utc_datetime
add :login_count, :integer, default: 0
end
create index(:users, [:last_login_at])
end
end
# Data migration
defmodule MyApp.Repo.Migrations.MigrateUserNames do
use Ecto.Migration
import Ecto.Query
def up do
# First add new columns
alter table(:users) do
add :first_name, :string
add :last_name, :string
end
flush()
# Migrate data
from(u in "users", select: [:id, :name])
|> MyApp.Repo.all()
|> Enum.each(fn user ->
names = String.split(user.name, " ", parts: 2)
first_name = Enum.at(names, 0)
last_name = Enum.at(names, 1, "")
from(u in "users", where: u.id == ^user.id)
|> MyApp.Repo.update_all(set: [first_name: first_name, last_name: last_name])
end)
end
def down do
alter table(:users) do
remove :first_name
remove :last_name
end
end
end
Zero-Downtime Migrations
Safe Schema Changes:
# Step 1: Add new nullable column
defmodule MyApp.Repo.Migrations.AddNewEmailColumn do
use Ecto.Migration
def change do
alter table(:users) do
add :new_email, :string
end
create index(:users, [:new_email])
end
end
# Step 2: Deploy code that writes to both columns
# Step 3: Backfill data
defmodule MyApp.Repo.Migrations.BackfillNewEmail do
use Ecto.Migration
import Ecto.Query
def up do
from(u in "users", where: is_nil(u.new_email))
|> MyApp.Repo.update_all(set: [new_email: fragment("email")])
end
def down, do: :ok
end
# Step 4: Add not null constraint
defmodule MyApp.Repo.Migrations.MakeNewEmailRequired do
use Ecto.Migration
def change do
alter table(:users) do
modify :new_email, :string, null: false
end
create unique_index(:users, [:new_email])
end
end
# Step 5: Deploy code that only uses new column
# Step 6: Remove old column
defmodule MyApp.Repo.Migrations.RemoveOldEmailColumn do
use Ecto.Migration
def change do
drop index(:users, [:email])
alter table(:users) do
remove :email
end
rename table(:users), :new_email, to: :email
end
end
Performance & Optimization
Query Optimization
N+1 Query Prevention:
# Bad: N+1 queries
def list_posts_bad do
posts = Repo.all(Post)
Enum.map(posts, fn post ->
# This generates one query per post!
author = Repo.get(User, post.author_id)
%{post | author: author}
end)
end
# Good: Use preload
def list_posts_good do
Post
|> Repo.all()
|> Repo.preload(:author)
end
# Good: Use join when you don't need associated data
def list_post_summaries do
from p in Post,
join: u in User, on: p.author_id == u.id,
select: %{id: p.id, title: p.title, author_name: u.name}
end
Batch Loading:
defmodule MyApp.Accounts do
def get_users_with_post_counts(user_ids) do
# Get users
users = from(u in User, where: u.id in ^user_ids) |> Repo.all()
# Get post counts in single query
post_counts =
from(p in Post,
where: p.author_id in ^user_ids,
group_by: p.author_id,
select: {p.author_id, count(p.id)})
|> Repo.all()
|> Map.new()
# Combine results
Enum.map(users, fn user ->
post_count = Map.get(post_counts, user.id, 0)
%{user | post_count: post_count}
end)
end
end
Database Indexes:
defmodule MyApp.Repo.Migrations.AddPerformanceIndexes do
use Ecto.Migration
def change do
# Composite index for common query pattern
create index(:posts, [:author_id, :status, :inserted_at])
# Partial index for active records only
create index(:users, [:email], where: "is_active = true")
# Functional index for case-insensitive searches
create index(:users, ["lower(email)"])
# GIN index for JSON column searches
create index(:posts, [:metadata], using: "GIN")
end
end
Connection Pool Management
Pool Configuration:
# config/config.exs
config :my_app, MyApp.Repo,
pool_size: String.to_integer(System.get_env("DB_POOL_SIZE") || "10"),
queue_target: 5000,
queue_interval: 5000,
timeout: 15_000,
pool_timeout: 5_000
# For read replicas
config :my_app, MyApp.ReadRepo,
pool_size: 5,
priv: "priv/repo" # Share migration directory
Transaction Management:
defmodule MyApp.Orders do
alias MyApp.Repo
alias Ecto.Multi
def create_order_with_payment(order_attrs, payment_attrs) do
Multi.new()
|> Multi.insert(:order, Order.changeset(%Order{}, order_attrs))
|> Multi.run(:payment, fn repo, %{order: order} ->
payment_attrs = Map.put(payment_attrs, :order_id, order.id)
Payment.changeset(%Payment{}, payment_attrs)
|> repo.insert()
end)
|> Multi.run(:inventory, fn _repo, %{order: order} ->
# Complex inventory update logic
MyApp.Inventory.reserve_items(order.line_items)
end)
|> Repo.transaction()
|> case do
{:ok, %{order: order, payment: payment}} -> {:ok, order}
{:error, :order, changeset, _} -> {:error, changeset}
{:error, :payment, changeset, _} -> {:error, changeset}
{:error, :inventory, reason, _} -> {:error, reason}
end
end
end
Cross-References
Phoenix Integration
For Phoenix-specific Ecto patterns, schema generation with generators, and web-layer database integration,
see the elixir-phoenix-framework skill.
System Architecture
For designing data layer architecture, context boundaries with Ecto schemas, and database scaling patterns,
see the elixir-architecture skill.
Performance Review
For database query optimization, N+1 detection, and Ecto performance monitoring,
see the elixir-review skill.
Best Practices Summary
Schema Design
- Use UUIDs for distributed systems – avoid integer ID conflicts
- Validate at the database level – use constraints and indexes
- Keep schemas focused – single responsibility principle
- Use virtual fields judiciously – for computed values and form handling
Changeset Patterns
- Validate early and often – fail fast with clear messages
- Separate update types – different changesets for different operations
- Use custom validators – for complex business rules
- Handle associations properly – use cast_assoc and cast_embed
Query Optimization
- Prevent N+1 queries – use preload and joins appropriately
- Use indexes strategically – for common query patterns
- Batch operations – reduce round trips to database
- Profile queries – measure before optimizing
Migration Safety
- Add before remove – maintain compatibility during deployment
- Use constraints – enforce data integrity
- Test rollbacks – ensure migrations are reversible
- Batch large changes – avoid blocking operations
This skill provides comprehensive guidance for building efficient, maintainable database layers with Ecto in Elixir applications.