diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
index 28a01e3eee17..ac44c841725a 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Flink_Python.groovy
@@ -47,10 +47,10 @@ def scenarios = { datasetName ->
             '"num_hot_keys": 1,' +
             '"hot_key_fraction": 1}\'',
         co_input_options : '\'{' +
-            '"num_records": 20000000,' +
+            '"num_records": 2000000,' +
             '"key_size": 10,' +
             '"value_size": 90,' +
-            '"num_hot_keys": 1,' +
+            '"num_hot_keys": 1000,' +
             '"hot_key_fraction": 1}\'',
         iterations : 1,
         parallelism : 5,
@@ -77,10 +77,10 @@ def scenarios = { datasetName ->
             '"num_hot_keys": 5,' +
             '"hot_key_fraction": 1}\'',
         co_input_options : '\'{' +
-            '"num_records": 20000000,' +
+            '"num_records": 2000000,' +
             '"key_size": 10,' +
             '"value_size": 90,' +
-            '"num_hot_keys": 5,' +
+            '"num_hot_keys": 1000,' +
             '"hot_key_fraction": 1}\'',
         iterations : 1,
         parallelism : 5,
@@ -107,10 +107,10 @@ def scenarios = { datasetName ->
             '"num_hot_keys": 200000,' +
             '"hot_key_fraction": 1}\'',
         co_input_options : '\'{' +
-            '"num_records": 20000000,' +
+            '"num_records": 2000000,' +
             '"key_size": 10,' +
             '"value_size": 90,' +
-            '"num_hot_keys": 200000,' +
+            '"num_hot_keys": 1000,' +
             '"hot_key_fraction": 1}\'',
         iterations : 4,
         parallelism : 5,
diff --git a/.test-infra/jenkins/job_LoadTests_coGBK_Python.groovy b/.test-infra/jenkins/job_LoadTests_coGBK_Python.groovy
index 20b446cd1f8f..1efb93e58536 100644
--- a/.test-infra/jenkins/job_LoadTests_coGBK_Python.groovy
+++ b/.test-infra/jenkins/job_LoadTests_coGBK_Python.groovy
@@ -25,7 +25,7 @@ import InfluxDBCredentialsHelper
 
 def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
 
-def loadTestConfigurations = { datasetName ->
+def loadTestConfigurations = { mode, datasetName ->
   [
     [
       title : 'CoGroupByKey Python Load test: 2GB of 100B records with a single key',
@@ -34,12 +34,12 @@ def loadTestConfigurations = { datasetName ->
       pipelineOptions: [
         project : 'apache-beam-testing',
         region : 'us-central1',
-        job_name : 'load-tests-python-dataflow-batch-cogbk-1-' + now,
+        job_name : "load-tests-python-dataflow-${mode}-cogbk-1-${now}",
         temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
         publish_to_big_query : true,
         metrics_dataset : datasetName,
-        metrics_table : "python_dataflow_batch_cogbk_1",
-        influx_measurement : 'python_batch_cogbk_1',
+        metrics_table : "python_dataflow_${mode}_cogbk_1",
+        influx_measurement : "python_${mode}_cogbk_1",
         input_options : '\'{' +
             '"num_records": 20000000,' +
             '"key_size": 10,' +
@@ -47,10 +47,10 @@ def loadTestConfigurations = { datasetName ->
             '"num_hot_keys": 1,' +
             '"hot_key_fraction": 1}\'',
         co_input_options : '\'{' +
-            '"num_records": 20000000,' +
+            '"num_records": 2000000,' +
             '"key_size": 10,' +
             '"value_size": 90,' +
-            '"num_hot_keys": 1,' +
+            '"num_hot_keys": 1000,' +
             '"hot_key_fraction": 1}\'',
         iterations : 1,
         num_workers : 5,
@@ -64,12 +64,12 @@ def loadTestConfigurations = { datasetName ->
       pipelineOptions: [
         project : 'apache-beam-testing',
         region : 'us-central1',
-        job_name : 'load-tests-python-dataflow-batch-cogbk-2-' + now,
+        job_name : "load-tests-python-dataflow-${mode}-cogbk-2-${now}",
         temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
         publish_to_big_query : true,
         metrics_dataset : datasetName,
-        metrics_table : 'python_dataflow_batch_cogbk_2',
-        influx_measurement : 'python_batch_cogbk_2',
+        metrics_table : "python_dataflow_${mode}_cogbk_2",
+        influx_measurement : "python_${mode}_cogbk_2",
"python_${mode}_cogbk_2", input_options : '\'{' + '"num_records": 20000000,' + '"key_size": 10,' + @@ -77,10 +77,10 @@ def loadTestConfigurations = { datasetName -> '"num_hot_keys": 5,' + '"hot_key_fraction": 1}\'', co_input_options : '\'{' + - '"num_records": 20000000,' + + '"num_records": 2000000,' + '"key_size": 10,' + '"value_size": 90,' + - '"num_hot_keys": 5,' + + '"num_hot_keys": 1000,' + '"hot_key_fraction": 1}\'', iterations : 1, num_workers : 5, @@ -94,12 +94,12 @@ def loadTestConfigurations = { datasetName -> pipelineOptions: [ project : 'apache-beam-testing', region : 'us-central1', - job_name : 'load-tests-python-dataflow-batch-cogbk-3-' + now, + job_name : "load-tests-python-dataflow-${mode}-cogbk-3-${now}", temp_location : 'gs://temp-storage-for-perf-tests/loadtests', publish_to_big_query : true, metrics_dataset : datasetName, - metrics_table : "python_dataflow_batch_cogbk_3", - influx_measurement : 'python_batch_cogbk_3', + metrics_table : "python_dataflow_${mode}_cogbk_3", + influx_measurement : "python_${mode}_cogbk_3", input_options : '\'{' + '"num_records": 20000000,' + '"key_size": 10,' + @@ -107,10 +107,10 @@ def loadTestConfigurations = { datasetName -> '"num_hot_keys": 200000,' + '"hot_key_fraction": 1}\'', co_input_options : '\'{' + - '"num_records": 20000000,' + + '"num_records": 2000000,' + '"key_size": 10,' + '"value_size": 90,' + - '"num_hot_keys": 200000,' + + '"num_hot_keys": 1000,' + '"hot_key_fraction": 1}\'', iterations : 4, num_workers : 5, @@ -124,12 +124,12 @@ def loadTestConfigurations = { datasetName -> pipelineOptions: [ project : 'apache-beam-testing', region : 'us-central1', - job_name : 'load-tests-python-dataflow-batch-cogbk-4-' + now, + job_name : "load-tests-python-dataflow-${mode}-cogbk-4-${now}", temp_location : 'gs://temp-storage-for-perf-tests/loadtests', publish_to_big_query : true, metrics_dataset : datasetName, - metrics_table : 'python_dataflow_batch_cogbk_4', - influx_measurement : 'python_batch_cogbk_4', + metrics_table : "python_dataflow_${mode}_cogbk_4", + influx_measurement : "python_${mode}_cogbk_4", input_options : '\'{' + '"num_records": 20000000,' + '"key_size": 10,' + @@ -137,7 +137,7 @@ def loadTestConfigurations = { datasetName -> '"num_hot_keys": 1000,' + '"hot_key_fraction": 1}\'', co_input_options : '\'{' + - '"num_records": 20000000,' + + '"num_records": 2000000,' + '"key_size": 10,' + '"value_size": 90,' + '"num_hot_keys": 1000,' + @@ -147,17 +147,22 @@ def loadTestConfigurations = { datasetName -> autoscaling_algorithm: 'NONE' ] ], - ].each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) } + ] + .each { test -> test.pipelineOptions.putAll(additionalPipelineArgs) } + .each { test -> (mode != 'streaming') ?: addStreamingOptions(test) } } -def batchLoadTestJob = { scope, triggeringContext -> - scope.description('Runs Python CoGBK load tests on Dataflow runner in batch mode') - commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240) +def addStreamingOptions(test) { + // Use highmem workers to prevent out of memory issues. 
+  test.pipelineOptions << [streaming: null,
+                           worker_machine_type: 'n1-highmem-4'
+  ]
+}
 
+def loadTestJob = { scope, triggeringContext, mode ->
   def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)
-  for (testConfiguration in loadTestConfigurations(datasetName)) {
-    loadTestsBuilder.loadTest(scope, testConfiguration.title, testConfiguration.runner, CommonTestProperties.SDK.PYTHON_37, testConfiguration.pipelineOptions, testConfiguration.test)
-  }
+  loadTestsBuilder.loadTests(scope, CommonTestProperties.SDK.PYTHON_37,
+      loadTestConfigurations(mode, datasetName), 'CoGBK', mode)
 }
 
 CronJobBuilder.cronJob('beam_LoadTests_Python_CoGBK_Dataflow_Batch', 'H 16 * * *', this) {
@@ -165,7 +170,7 @@ CronJobBuilder.cronJob('beam_LoadTests_Python_CoGBK_Dataflow_Batch', 'H 16 * * *
     influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
     influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostname,
   ]
-  batchLoadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT)
+  loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'batch')
 }
 
 PhraseTriggeringPostCommitBuilder.postCommitJob(
@@ -175,5 +180,23 @@ PhraseTriggeringPostCommitBuilder.postCommitJob(
     this
     ) {
       additionalPipelineArgs = [:]
-      batchLoadTestJob(delegate, CommonTestProperties.TriggeringContext.PR)
+      loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'batch')
+    }
+
+CronJobBuilder.cronJob('beam_LoadTests_Python_CoGBK_Dataflow_Streaming', 'H 16 * * *', this) {
+  additionalPipelineArgs = [
+    influx_db_name: InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influx_hostname: InfluxDBCredentialsHelper.InfluxDBHostname,
+  ]
+  loadTestJob(delegate, CommonTestProperties.TriggeringContext.POST_COMMIT, 'streaming')
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+  'beam_LoadTests_Python_CoGBK_Dataflow_Streaming',
+  'Run Load Tests Python CoGBK Dataflow Streaming',
+  'Load Tests Python CoGBK Dataflow Streaming suite',
+  this
+  ) {
+    additionalPipelineArgs = [:]
+    loadTestJob(delegate, CommonTestProperties.TriggeringContext.PR, 'streaming')
 }
diff --git a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
index 000e3938f416..165fd920c689 100644
--- a/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
+++ b/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
@@ -120,7 +120,6 @@ def test(self):
         | 'Read ' + self.INPUT_TAG >> beam.io.Read(
             synthetic_pipeline.SyntheticSource(
                 self.parse_synthetic_source_options()))
-        | 'Make ' + self.INPUT_TAG + ' iterable' >> beam.Map(lambda x: (x, x))
         | 'Measure time: Start pc1' >> beam.ParDo(
             MeasureTime(self.metrics_namespace)))
 
@@ -129,8 +128,6 @@ def test(self):
         | 'Read ' + self.CO_INPUT_TAG >> beam.io.Read(
             synthetic_pipeline.SyntheticSource(
                 self.parse_synthetic_source_options(self.co_input_options)))
-        |
-        'Make ' + self.CO_INPUT_TAG + ' iterable' >> beam.Map(lambda x: (x, x))
         | 'Measure time: Start pc2' >> beam.ParDo(
             MeasureTime(self.metrics_namespace)))
     # pylint: disable=expression-not-assigned
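
For context on the Groovy idioms used in `addStreamingOptions` and the `.each { ... (mode != 'streaming') ?: ... }` guard above, here is a minimal standalone sketch. The `test` map, `mode` value, and assertions are illustrative only and are not part of the Jenkins DSL or the Beam build.

```groovy
// Hypothetical, self-contained Groovy script illustrating the idiom from the job file.
def addStreamingOptions(test) {
  // Groovy's '<<' on a Map merges the right-hand map's entries into pipelineOptions.
  // 'streaming: null' adds the key with no value; how the load-test builder renders
  // a null-valued entry into a command-line flag is outside this sketch.
  test.pipelineOptions << [streaming: null,
                           worker_machine_type: 'n1-highmem-4'
  ]
}

def test = [pipelineOptions: [job_name: 'example-streaming-cogbk-job']]  // illustrative only
def mode = 'streaming'

// Same guard as in loadTestConfigurations: apply the extra options only in streaming mode.
(mode != 'streaming') ?: addStreamingOptions(test)

assert test.pipelineOptions.containsKey('streaming')
assert test.pipelineOptions.worker_machine_type == 'n1-highmem-4'
```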