streaming-patterns
1
总安装量
1
周安装量
#51956
全站排名
安装命令
npx skills add https://github.com/lookatitude/beluga-ai --skill streaming-patterns
Agent 安装分布
codex
1
Skill 文档
Streaming Patterns for Beluga AI v2
Primary Primitive: iter.Seq2[T, error]
Beluga uses Go 1.23+ range-over-func iterators, NOT channels, for public API streaming.
// Producing a stream
// Stream sends msgs to the underlying client and exposes the reply as a
// range-over-func iterator of chunks.
//
// Every failure (opening the stream, context cancellation, a receive error)
// is delivered as a single (zero chunk, err) pair, after which iteration
// ends. io.EOF from Recv ends the stream without yielding an error. The
// client stream is closed when the iterator returns, including when the
// consumer stops early.
func (m *Model) Stream(ctx context.Context, msgs []schema.Message) iter.Seq2[schema.StreamChunk, error] {
	return func(yield func(schema.StreamChunk, error) bool) {
		// Zero chunk that accompanies every yielded error.
		none := schema.StreamChunk{}

		stream, openErr := m.client.Stream(ctx, msgs)
		if openErr != nil {
			yield(none, openErr)
			return
		}
		defer stream.Close()

		for {
			// Non-blocking cancellation check before each receive.
			select {
			case <-ctx.Done():
				yield(none, ctx.Err())
				return
			default:
			}

			raw, recvErr := stream.Recv()
			switch {
			case recvErr == io.EOF:
				// Stream complete.
				return
			case recvErr != nil:
				yield(none, recvErr)
				return
			}

			// A false result means the consumer stopped iterating.
			if more := yield(convertChunk(raw), nil); !more {
				return
			}
		}
	}
}
// Consuming a stream
for chunk, err := range model.Stream(ctx, msgs) {
	// Happy path first: print the chunk text and move on to the next pair.
	if err == nil {
		fmt.Print(chunk.Text)
		continue
	}
	// Stop consuming at the first error.
	log.Error("stream error", "err", err)
	break
}
Event[T] Type
// EventType names the kind of an Event.
type EventType string

// Event kinds.
const (
	EventData       = EventType("data")
	EventToolCall   = EventType("tool_call")
	EventToolResult = EventType("tool_result")
	EventHandoff    = EventType("handoff")
	EventDone       = EventType("done")
	EventError      = EventType("error")
)

// Event is a generic envelope that carries a payload of type T together
// with its kind, an error slot and free-form metadata.
type Event[T any] struct {
	// Type is the kind of this event.
	Type EventType
	// Payload is the typed value carried by the event.
	Payload T
	// Err holds an error associated with the event.
	// NOTE(review): presumably set for EventError events — confirm.
	Err error
	// Meta holds arbitrary key/value metadata.
	Meta map[string]any
}
Stream Composition
Pipe (sequential)
// Pipe returns a stream that applies transform to every item of first.
//
// Items are transformed lazily, one at a time, as the consumer pulls them.
// An error is terminal: whether it comes from first or from transform, it is
// yielded once together with the zero value of B and the stream ends.
// Iteration also stops as soon as the consumer's yield returns false.
func Pipe[A, B any](
	first iter.Seq2[A, error],
	transform func(A) (B, error),
) iter.Seq2[B, error] {
	return func(yield func(B, error) bool) {
		var zero B
		for a, err := range first {
			if err != nil {
				yield(zero, err)
				return
			}
			b, err := transform(a)
			if err != nil {
				// Treat a transform failure exactly like an upstream failure:
				// never forward a partially built value next to an error, and
				// never keep pulling from the source after the stream failed.
				yield(zero, err)
				return
			}
			if !yield(b, nil) {
				return
			}
		}
	}
}
Fan-out (parallel)
// FanOut is a sketch of splitting one stream into n output streams that all
// read from the same source. The body is intentionally unfinished: only the
// conversion of the push-style source into a pull-style next/stop pair is
// shown.
func FanOut[T any](stream iter.Seq2[T, error], n int) []iter.Seq2[T, error] {
	// Use iter.Pull2 to get next/stop, then broadcast to n consumers
	next, stop := iter.Pull2(stream)
	// ... create n output iterators that share the source
	// NOTE(review): a finished implementation still has to consume next,
	// call stop, and return the n iterators.
}
Collect (stream to slice)
// Collect drains stream into a slice.
//
// It stops at the first error and returns that error together with the
// items gathered before it. When the stream ends without an error the
// returned error is nil; a stream that yields nothing produces a nil slice.
func Collect[T any](stream iter.Seq2[T, error]) ([]T, error) {
	var collected []T
	for value, streamErr := range stream {
		if streamErr == nil {
			collected = append(collected, value)
			continue
		}
		return collected, streamErr
	}
	return collected, nil
}
Invoke from Stream
Invoke() is always implemented as “stream, collect, return last”:
// Invoke runs the agent through Stream and returns the last event the stream
// produced. The first error yielded by the stream aborts the run and is
// returned with a nil result. A stream that yields nothing results in
// (nil, nil).
func (a *Agent) Invoke(ctx context.Context, input any, opts ...core.Option) (any, error) {
	var final any
	for ev, streamErr := range a.Stream(ctx, input, opts...) {
		if streamErr == nil {
			// Remember only the most recent event.
			final = ev
			continue
		}
		return nil, streamErr
	}
	return final, nil
}
Backpressure with BufferedStream
type BufferedStream[T any] struct {
source iter.Seq2[T, error]
buffer chan T
size int
}
func NewBufferedStream[T any](source iter.Seq2[T, error], bufferSize int) *BufferedStream[T] {
return &BufferedStream[T]{source: source, size: bufferSize}
}
Context Cancellation
ALWAYS check context in stream producers:
select {
case <-ctx.Done():
yield(zero, ctx.Err())
return
default:
}
iter.Pull2 for Pull Semantics
When a consumer needs explicit next/stop control:
// Convert the push-style stream into a pull-style next/stop pair.
next, stop := iter.Pull2(stream)
// Always call stop so the underlying iterator is released even if the
// stream is not drained.
defer stop()
// Each call to next returns the next value/error pair; ok reports whether
// a pair was available.
val, err, ok := next()
if !ok { /* stream exhausted */ }
Rules
- Public API streaming: `iter.Seq2[T, error]` — NEVER `<-chan`
- Internal goroutine communication: channels are fine
- Always handle context cancellation in producers
- `yield` returning false means the consumer stopped — respect it immediately
- Use `iter.Pull2` only when pull semantics are genuinely needed
- Collect/Invoke are convenience wrappers around streaming