Skip to content

Commit c9445ef

Browse files
committed
Aded portable runner to python and go runners
1 parent efe4e94 commit c9445ef

3 files changed

Lines changed: 46 additions & 2 deletions

File tree

sdks/go/pkg/beam/runners/dataflow/dataflow.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
335335
experiments := jobopts.GetExperiments()
336336
// Ensure that we enable the same set of experiments across all SDKs
337337
// for runner v2.
338-
var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
338+
var fnApiSet, v2set, uwSet, portableRunnerSet, portaSubmission, seSet, wsSet bool
339339
for _, e := range experiments {
340340
if strings.Contains(e, "beam_fn_api") {
341341
fnApiSet = true
@@ -349,7 +349,10 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
349349
if strings.Contains(e, "use_portable_job_submission") {
350350
portaSubmission = true
351351
}
352-
if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") {
352+
if strings.Contains(e, "enable_portable_runner") {
353+
portableRunnerSet = true
354+
}
355+
if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") || strings.Contains(e, "disable_portable_runner") || strings.Contains(e, "enable_streaming_java_runner") {
353356
return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
354357
}
355358
}
@@ -366,6 +369,9 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
366369
if !portaSubmission {
367370
experiments = append(experiments, "use_portable_job_submission")
368371
}
372+
if !portableRunnerSet {
373+
// As this option is not documented, we do not set it by default. This behavior will be fixed in later versions.
374+
}
369375

370376
// Ensure that streaming specific experiments are set for streaming pipelines
371377
// since runner v2 only supports using streaming engine.

sdks/go/pkg/beam/runners/dataflow/dataflow_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,40 @@ func TestGetJobOptions_DisableRunnerV2ExperimentsSet(t *testing.T) {
244244
}
245245
}
246246

247+
func TestGetJobOptions_DisablePortableRunnerExperimentsSet(t *testing.T) {
248+
resetGlobals()
249+
*stagingLocation = "gs://testStagingLocation"
250+
*gcpopts.Project = "testProject"
251+
*gcpopts.Region = "testRegion"
252+
*jobopts.Experiments = "disable_portable_runner"
253+
254+
opts, err := getJobOptions(context.Background(), false)
255+
256+
if err == nil {
257+
t.Error("getJobOptions() returned error nil, want an error")
258+
}
259+
if opts != nil {
260+
t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
261+
}
262+
}
263+
264+
func TestGetJobOptions_EnableStreamingJavaRunnerExperimentsSet(t *testing.T) {
265+
resetGlobals()
266+
*stagingLocation = "gs://testStagingLocation"
267+
*gcpopts.Project = "testProject"
268+
*gcpopts.Region = "testRegion"
269+
*jobopts.Experiments = "enable_streaming_java_runner"
270+
271+
opts, err := getJobOptions(context.Background(), false)
272+
273+
if err == nil {
274+
t.Error("getJobOptions() returned error nil, want an error")
275+
}
276+
if opts != nil {
277+
t.Errorf("getJobOptions() returned JobOptions when it should not have, got %#v, want nil", opts)
278+
}
279+
}
280+
247281
func TestGetJobOptions_NoStagingLocation(t *testing.T) {
248282
resetGlobals()
249283
*stagingLocation = ""

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,8 @@ def _add_runner_v2_missing_options(options):
591591
debug_options.add_experiment('use_unified_worker')
592592
debug_options.add_experiment('use_runner_v2')
593593
debug_options.add_experiment('use_portable_job_submission')
594+
# enable_portable_runner is not added by default as it is not documented.
595+
# This behavior will be fixed in later versions.
594596

595597

596598
def _check_and_add_missing_options(options):
@@ -662,6 +664,8 @@ def _is_runner_v2_disabled(options):
662664
"""Returns true if runner v2 is disabled."""
663665
debug_options = options.view_as(DebugOptions)
664666
return (
667+
debug_options.lookup_experiment('disable_portable_runner') or
668+
debug_options.lookup_experiment('enable_streaming_java_runner') or
665669
debug_options.lookup_experiment('disable_runner_v2') or
666670
debug_options.lookup_experiment('disable_runner_v2_until_2023') or
667671
debug_options.lookup_experiment('disable_runner_v2_until_v2.50') or

0 commit comments

Comments
 (0)