Apache Beam Runners
Overview
Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.
Available Runners
| Runner | Location | Description |
|---|---|---|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Samza | runners/samza/ | Apache Samza |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |
Direct Runner
For local development and testing.
Java
```java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
```
Python
```python
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
```
Command Line
```
--runner=DirectRunner
```
Dataflow Runner
Prerequisites
- GCP project with Dataflow API enabled
- Service account with Dataflow Admin role
- GCS bucket for staging
Java Usage
```java
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
```
Python Usage
```python
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp',
])
```
Runner v2
```
--experiments=use_runner_v2
```
Custom SDK Container
```
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
```
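The same configuration can be expressed from the Python SDK, where the flags use snake_case spellings. A minimal sketch (project, bucket, and image values are placeholders):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Runner v2 plus a custom SDK container, using the Python SDK's
# snake_case flag spellings; all values below are placeholders.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp',
    '--experiments=use_runner_v2',
    '--sdk_container_image=gcr.io/my-project/beam_python_sdk:custom',
])
```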
Flink Runner
Embedded Mode
```java
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
```
Cluster Mode
```java
options.setFlinkMaster("host:port");
```
Portable Mode (Python)
```python
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK',  # or DOCKER, EXTERNAL
])
```
Spark Runner
Java
```java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // or spark://host:port
```
Python (Portable)
```python
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]',
])
```
Testing with Runners
ValidatesRunner Tests
Tests that validate runner correctness:
```bash
# Direct Runner
./gradlew :runners:direct-java:validatesRunner

# Flink Runner
./gradlew :runners:flink:1.18:validatesRunner

# Spark Runner
./gradlew :runners:spark:3:validatesRunner

# Dataflow Runner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
```
TestPipeline with Runners
```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```
Set the runner via a system property:
```
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
```
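The Python SDK has an analogous `TestPipeline` plus `assert_that`/`equal_to` helpers in `apache_beam.testing`; a minimal sketch (runs on the Direct Runner unless other options are supplied):

```python
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

# Runs on the Direct Runner by default; other runners can be selected
# by passing pipeline options to TestPipeline.
with TestPipeline() as p:
    doubled = (p
               | beam.Create([1, 2, 3])
               | beam.Map(lambda x: x * 2))
    assert_that(doubled, equal_to([2, 4, 6]))
```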
Portable Runners
Concept
- SDK-independent execution via Fn API
- SDK runs in container, communicates via gRPC
Environment Types
- `DOCKER` – SDK in Docker container
- `LOOPBACK` – SDK in same process (testing)
- `EXTERNAL` – SDK at specified address
- `PROCESS` – SDK in subprocess
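For example, the environment type is selected through pipeline options; the container image tag below is a placeholder (omitting `--environment_config` falls back to the default released SDK image):

```python
from apache_beam.options.pipeline_options import PipelineOptions

# DOCKER environment: the SDK harness runs inside the named container
# image. The image tag is a placeholder for illustration.
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=DOCKER',
    '--environment_config=apache/beam_python3.10_sdk:2.x.y',
])
```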
Job Server
Start Flink job server:
```bash
./gradlew :runners:flink:1.18:job-server:runShadow
```
Start Spark job server:
```bash
./gradlew :runners:spark:3:job-server:runShadow
```
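With a job server running, a pipeline can be submitted to it through the generic `PortableRunner`. A minimal sketch, assuming the job server listens on its default `localhost:8099`:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumes a Flink or Spark job server started as above and listening
# on localhost:8099 (the default port).
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=LOOPBACK',
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(['hello', 'beam'])
     | beam.Map(print))
```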
Runner-Specific Options
Dataflow
| Option | Description |
|---|---|
| `--project` | GCP project |
| `--region` | GCP region |
| `--tempLocation` | GCS temp location |
| `--stagingLocation` | GCS staging location |
| `--numWorkers` | Initial workers |
| `--maxNumWorkers` | Max workers |
| `--workerMachineType` | Worker VM type |
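The table lists the Java (camelCase) spellings; the Python SDK accepts snake_case equivalents. A sketch with placeholder values:

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Dataflow options spelled in the Python SDK's snake_case form;
# project, bucket, and machine type values are placeholders.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp',
    '--staging_location=gs://my-bucket/staging',
    '--num_workers=2',
    '--max_num_workers=10',
    '--worker_machine_type=n1-standard-4',
])
```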
Flink
| Option | Description |
|---|---|
| `--flinkMaster` | Flink master address |
| `--parallelism` | Default parallelism |
| `--checkpointingInterval` | Checkpoint interval |
Spark
| Option | Description |
|---|---|
| `--sparkMaster` | Spark master URL |
| `--sparkConf` | Additional Spark config |
Building Runner Artifacts
Dataflow Worker Jar
```bash
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
```
Flink Job Server
```bash
./gradlew :runners:flink:1.18:job-server:shadowJar
```
Spark Job Server
```bash
./gradlew :runners:spark:3:job-server:shadowJar
```
Debugging
Direct Runner
- Enable logging: `-Dorg.slf4j.simpleLogger.defaultLogLevel=debug`
- Use `--targetParallelism=1` for deterministic execution
Dataflow
- Check Dataflow UI: console.cloud.google.com/dataflow
- Use `--experiments=upload_graph` for graph debugging
- Worker logs in Cloud Logging
Portable Runners
- Enable debug logging on job server
- Check SDK harness logs in worker containers