diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml index 61de2b54e730..cf0c7d22899a 100644 --- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml +++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml @@ -55,7 +55,7 @@ jobs: (github.event_name == 'schedule' && github.repository == 'apache/beam') || github.event.comment.body == 'Run Inference Benchmarks' runs-on: [self-hosted, ubuntu-24.04, main] - timeout-minutes: 1000 + timeout-minutes: 2000 name: ${{ matrix.job_name }} (${{ matrix.job_phrase }}) strategy: matrix: @@ -94,6 +94,9 @@ jobs: ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Batch.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Table_Row_Inference_Stream.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt # The env variables are created and populated in the test-arguments-action as "_test_arguments_" - name: get current time run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV @@ -191,7 +194,7 @@ jobs: -Prunner=DataflowRunner \ -PpythonVersion=3.10 \ -PloadTest.requirementsTxtFile=apache_beam/ml/inference/torch_tests_requirements.txt \ - '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} 
--job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt' + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt' \ - name: run Table Row Inference Sklearn Batch uses: ./.github/actions/gradle-command-self-hosted-action timeout-minutes: 180 @@ -202,7 +205,7 @@ jobs: -Prunner=DataflowRunner \ -PpythonVersion=3.10 \ -PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \ - '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=NONE --metrics_table=result_table_row_inference_batch --influx_measurement=result_table_row_inference_batch --mode=batch --input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl --input_expand_factor=100 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs --job_name=benchmark-tests-table-row-inference-batch-${{env.NOW_UTC}}' + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --autoscaling_algorithm=NONE --metrics_table=result_table_row_inference_batch --influx_measurement=result_table_row_inference_batch --mode=batch --input_file=gs://apache-beam-ml/testing/inputs/table_rows_100k_benchmark.jsonl --input_expand_factor=100 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_batch_outputs --job_name=benchmark-tests-table-row-inference-batch-${{env.NOW_UTC}}' \ - name: run Table Row Inference Sklearn Stream uses: ./.github/actions/gradle-command-self-hosted-action timeout-minutes: 180 @@ -213,4 +216,136 @@ jobs: -Prunner=DataflowRunner \ -PpythonVersion=3.10 \ 
-PloadTest.requirementsTxtFile=apache_beam/ml/inference/table_row_inference_requirements.txt \ - '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_10 }} --autoscaling_algorithm=THROUGHPUT_BASED --max_num_workers=20 --metrics_table=result_table_row_inference_stream --influx_measurement=result_table_row_inference_stream --mode=streaming --input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark --window_size_sec=60 --trigger_interval_sec=30 --timeout_ms=900000 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs --job_name=benchmark-tests-table-row-inference-stream-${{env.NOW_UTC}}' + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_10 }} --autoscaling_algorithm=THROUGHPUT_BASED --max_num_workers=20 --metrics_table=result_table_row_inference_stream --influx_measurement=result_table_row_inference_stream --mode=streaming --input_subscription=projects/apache-beam-testing/subscriptions/table_row_inference_benchmark --window_size_sec=60 --trigger_interval_sec=30 --timeout_ms=900000 --output_table=apache-beam-testing:beam_run_inference.result_table_row_inference_stream_outputs --job_name=benchmark-tests-table-row-inference-stream-${{env.NOW_UTC}}' \ + - name: run PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch CPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_object_detection_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_12 }} --device=CPU --mode=batch 
--job_name=benchmark-tests-pytorch-image-object-detection-batch-cpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_object_detection_batch_cpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_object_detection_batch_cpu' \ + - name: run PyTorch Image Captioning BLIP + CLIP Batch CPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_captioning_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_13 }} --device=CPU --mode=batch --job_name=benchmark-tests-pytorch-image-captioning-batch-cpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_captioning_batch_cpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_captioning_batch_cpu' + - name: run PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch GPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_object_detection_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_12 }} --device=GPU --worker_accelerator=type=nvidia-tesla-t4,count=1,install-nvidia-driver=true --mode=batch --job_name=benchmark-tests-pytorch-image-object-detection-batch-gpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_object_detection_batch_gpu 
--output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_object_detection_batch_gpu' \ + - name: run PyTorch Image Captioning BLIP + CLIP Batch GPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_captioning_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_13 }} --device=GPU --worker_accelerator=type=nvidia-tesla-t4,count=1,install-nvidia-driver=true --mode=batch --job_name=benchmark-tests-pytorch-image-captioning-batch-gpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_captioning_batch_gpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_captioning_batch_gpu' \ + - name: run PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming CPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_object_detection_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_12 }} --device=CPU --mode=streaming --job_name=benchmark-tests-pytorch-image-object-detection-streaming-cpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_object_detection_streaming_cpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_object_detection_streaming_cpu' \ + - name: run PyTorch Image Captioning BLIP + 
CLIP Streaming CPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_captioning_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_13 }} --device=CPU --mode=streaming --job_name=benchmark-tests-pytorch-image-captioning-streaming-cpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_captioning_streaming_cpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_captioning_streaming_cpu' + - name: run PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming GPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_object_detection_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_12 }} --device=GPU --worker_accelerator=type=nvidia-tesla-t4,count=1,install-nvidia-driver=true --mode=streaming --job_name=benchmark-tests-pytorch-image-object-detection-streaming-gpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_object_detection_streaming_gpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_object_detection_streaming_gpu' \ + - name: run PyTorch Image Captioning BLIP + CLIP Streaming GPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: 
:sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_image_captioning_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_13 }} --device=GPU --worker_accelerator=type=nvidia-tesla-t4,count=1,install-nvidia-driver=true --mode=streaming --job_name=benchmark-tests-pytorch-image-captioning-streaming-gpu-${{env.NOW_UTC}} --metrics_table=torch_inference_image_captioning_streaming_gpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_image_captioning_streaming_gpu' \ + - name: run PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) CPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_imagenet_rightfit_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_11 }} --device=CPU --experiments=enable_streaming_rightfitting --mode=streaming --job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-cpu-${{env.NOW_UTC}} --metrics_table=torch_inference_imagenet_stream_rightfit_cpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_cpu' \ + - name: run PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting Exactly-once) CPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + 
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_imagenet_rightfit_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_11 }} --device=CPU --experiments=enable_streaming_rightfitting --mode=streaming --job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-once-cpu-${{env.NOW_UTC}} --metrics_table=torch_inference_imagenet_stream_rightfit_once_cpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_once_cpu' \ + - name: run PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) GPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_imagenet_rightfit_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_11 }} --device=GPU --experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx --mode=streaming --job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-gpu-${{env.NOW_UTC}} --metrics_table=torch_inference_imagenet_stream_rightfit_gpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_gpu' \ + - name: run PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting Exactly-once) GPU + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + 
-PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_imagenet_rightfit_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_11 }} --device=GPU --experiments=enable_streaming_rightfitting,worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx --mode=streaming --job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-once-gpu-${{env.NOW_UTC}} --metrics_table=torch_inference_imagenet_stream_rightfit_once_gpu --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit_once_gpu' diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt new file mode 100644 index 000000000000..c58707383ef9 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Captioning.txt @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--region=us-central1 +--worker_machine_type=n1-standard-4 +--num_workers=50 +--disk_size_gb=50 +--autoscaling_algorithm=NONE +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--requirements_file=apache_beam/ml/inference/pytorch_image_captioning_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--input_options={} +--influx_measurement=result_torch_inference_image_captioning +--input=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt +--blip_model_name=Salesforce/blip-image-captioning-base +--blip_batch_size=4 +--num_captions=5 +--max_new_tokens=30 +--num_beams=5 +--clip_model_name=openai/clip-vit-base-patch32 +--clip_batch_size=8 +--clip_score_normalize=false +--runner=DataflowRunner +--experiments=use_runner_v2 diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt new file mode 100644 index 000000000000..7bc43055edd6 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--region=us-central1 +--machine_type=n1-standard-4 +--num_workers=50 +--disk_size_gb=50 +--autoscaling_algorithm=NONE +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--requirements_file=apache_beam/ml/inference/pytorch_rightfit_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--influx_measurement=torch_inference_imagenet_stream_rightfit +--pretrained_model_name=efficientnet_b0 +--input_file=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt +--runner=DataflowRunner +--mode=streaming +--input_mode=gcs_uris +--input_options={} +--pubsub_topic=projects/apache-beam-testing/topics/images_topic +--pubsub_subscription=projects/apache-beam-testing/subscriptions/images_subscription +--model_state_dict_path=gs://apache-beam-ml/models/efficientnet_b0_state_dict.pth +--image_size=224 +--top_k=5 +--inference_batch_size=auto +--window_sec=60 +--trigger_proc_time_sec=30 diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt new file mode 100644 index 000000000000..18f017ad0109 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Object_Detection.txt @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--region=us-central1 +--worker_machine_type=n1-standard-4 +--num_workers=50 +--disk_size_gb=50 +--autoscaling_algorithm=NONE +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--requirements_file=apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--input_options={} +--influx_measurement=result_torch_inference_image_object_detection_batch +--pretrained_model_name=fasterrcnn_resnet50_fpn +--inference_batch_size=8 +--resize_shorter_side=800 +--score_threshold=0.5 +--max_detections=50 +--input=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt +--model_state_dict_path=gs://apache-beam-ml/models/torchvision.detection.fasterrcnn_resnet50_fpn.pth +--runner=DataflowRunner +--experiments=use_runner_v2 diff --git a/.test-infra/tools/refresh_looker_metrics.py b/.test-infra/tools/refresh_looker_metrics.py index afd8ffa6f861..89612d289439 100644 --- a/.test-infra/tools/refresh_looker_metrics.py +++ b/.test-infra/tools/refresh_looker_metrics.py @@ -43,6 +43,25 @@ ("82", ["263", "264", "265", "266", "267"]), # PyTorch Sentiment Streaming DistilBERT base uncased ("85", ["268", "269", "270", "271", "272"]), # PyTorch Sentiment Batch 
DistilBERT base uncased ("86", ["284", "285", "286", "287", "288"]), # VLLM Batch Gemma + + # PyTorch Image Classification EfficientNet-B0 Streaming (Right-fit) CPU + ("92", ["289", "290", "291", "292", "293"]), + ("97", ["306", "307", "308", "309", "310"]), + ("98", ["311", "312", "313", "314", "315"]), + ("99", ["316", "317", "318", "319", "320"]), + + # PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch CPU + ("93", ["294", "295", "296", "298", "299"]), + ("100", ["321", "322", "323", "324", "325"]), + ("101", ["326", "327", "328", "329", "330"]), + ("102", ["331", "332", "333", "334", "335"]), + + # PyTorch Image Captioning BLIP + CLIP Batch CPU + ("94", ["297", "300", "301", "302", "303"]), + ("103", ["336", "337", "338", "339", "340"]), + ("104", ["341", "342", "343", "344", "345"]), + ("105", ["346", "347", "348", "349", "350"]), + ("96", ["270", "304", "305", "353", "354"]), # Table Row Inference Sklearn Batch ("106", ["355", "356", "357", "358", "359"]) # Table Row Inference Sklearn Streaming ] diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py new file mode 100644 index 000000000000..61a3c226b51f --- /dev/null +++ b/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py @@ -0,0 +1,637 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""This pipeline performs image captioning using a multi-model approach: +BLIP generates candidate captions, CLIP ranks them by image-text similarity. + +The pipeline reads image URIs from a GCS input file, decodes images, runs BLIP +caption generation in batches on the configured device (CPU or GPU), then runs +CLIP ranking in batches on the same device. Results are written to BigQuery. +""" + +import argparse +import io +import json +import logging +import threading +import time +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Tuple + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import ModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult +from apache_beam.transforms import window + +from google.api_core.exceptions import NotFound +from google.cloud import pubsub_v1 +import torch +import PIL.Image as PILImage + +# ============ Utility ============ + + +def now_millis() -> int: + return int(time.time() * 1000) + + +def decode_pil(image_bytes: bytes) -> PILImage.Image: + with PILImage.open(io.BytesIO(image_bytes)) as img: + img = img.convert("RGB") + img.load() + return img + 
+ +# ============ DoFns ============ + + +class MakeKeyDoFn(beam.DoFn): + """Produce (uri, uri) so the URI is used as the stable key.""" + def process(self, element: str): + uri = element + yield uri, uri + + +class ReadImageBytesDoFn(beam.DoFn): + """Turn (uri, uri) -> (uri, dict(image_bytes)).""" + def process(self, kv: Tuple[str, str]): + uri, _ = kv + try: + with FileSystems.open(uri) as f: + image_bytes = f.read() + yield uri, {"image_bytes": image_bytes} + except Exception as e: + logging.warning("Failed to read image %s: %s", uri, e) + return + + +class DecodeImageDoFn(beam.DoFn): + """Turn (uri, dict(image_bytes)) -> (uri, dict(image)).""" + def process(self, kv: Tuple[str, Dict[str, Any]]): + uri, value = kv + image_bytes = value["image_bytes"] + + try: + image = decode_pil(image_bytes) + except Exception as e: + logging.warning("Failed to decode image %s: %s", uri, e) + image = PILImage.new("RGB", (224, 224), color=(0, 0, 0)) + + yield uri, {"image": image} + + +class PostProcessDoFn(beam.DoFn): + """Final PredictionResult -> row for BigQuery.""" + def __init__(self, blip_name: str, clip_name: str): + self.blip_name = blip_name + self.clip_name = clip_name + + def process(self, kv: Tuple[str, PredictionResult]): + uri, pred = kv + if hasattr(pred, "inference"): + inf = pred.inference or {} + else: + inf = pred + # Expected inference fields from CLIP handler: + # best_caption, best_score, candidates, scores, blip_ms, clip_ms, total_ms + best_caption = inf.get("best_caption", "") + best_score = inf.get("best_score", None) + candidates = inf.get("candidates", []) + scores = inf.get("scores", []) + blip_ms = inf.get("blip_ms", None) + clip_ms = inf.get("clip_ms", None) + total_ms = inf.get("total_ms", None) + + yield { + "image_id": uri, + "blip_model": self.blip_name, + "clip_model": self.clip_name, + "best_caption": best_caption, + "best_score": float(best_score) if best_score is not None else None, + "candidates": json.dumps(candidates), + "scores": 
json.dumps(scores), + "blip_ms": int(blip_ms) if blip_ms is not None else None, + "clip_ms": int(clip_ms) if clip_ms is not None else None, + "total_ms": int(total_ms) if total_ms is not None else None, + "infer_ms": now_millis(), + } + + +# ============ Model Handlers ============ + + +class BlipCaptionModelHandler(ModelHandler): + def __init__( + self, + model_name: str, + device: str, + batch_size: int, + num_captions: int, + max_new_tokens: int, + num_beams: int): + self.model_name = model_name + self.device = device + self.batch_size = batch_size + self.num_captions = num_captions + self.max_new_tokens = max_new_tokens + self.num_beams = num_beams + + def load_model(self): + from transformers import BlipForConditionalGeneration, BlipProcessor + processor = BlipProcessor.from_pretrained(self.model_name) + model = BlipForConditionalGeneration.from_pretrained(self.model_name) + return (model, processor) + + def batch_elements_kwargs(self): + return {"max_batch_size": self.batch_size} + + def run_inference( + self, batch: List[Dict[str, Any]], model_bundle, inference_args=None): + + model, processor = model_bundle + model.to(self.device) + model.eval() + start = now_millis() + + images = [x["image"] for x in batch] + + # Processor makes pixel_values + inputs = processor(images=images, return_tensors="pt") + pixel_values = inputs["pixel_values"].to(self.device) + + # Generate captions + # We use num_return_sequences to generate multiple candidates per image. + # Note: this will produce (B * num_captions) sequences. 
+ with torch.no_grad(): + generated_ids = model.generate( + pixel_values=pixel_values, + max_new_tokens=self.max_new_tokens, + num_beams=max(self.num_beams, self.num_captions), + num_return_sequences=self.num_captions, + do_sample=False, + ) + + captions_all = processor.batch_decode( + generated_ids, skip_special_tokens=True) + + # Group candidates per image + candidates_per_image = [] + idx = 0 + for _ in range(len(batch)): + candidates_per_image.append(captions_all[idx:idx + self.num_captions]) + idx += self.num_captions + + blip_ms = now_millis() - start + + results = [] + for i in range(len(batch)): + results.append({ + "image": images[i], + "candidates": candidates_per_image[i], + "blip_ms": blip_ms, + }) + return results + + def get_metrics_namespace(self) -> str: + return "blip_captioning" + + +class ClipRankModelHandler(ModelHandler): + def __init__( + self, + model_name: str, + device: str, + batch_size: int, + score_normalize: bool): + self.model_name = model_name + self.device = device + self.batch_size = batch_size + self.score_normalize = score_normalize + + def load_model(self): + from transformers import CLIPModel, CLIPProcessor + processor = CLIPProcessor.from_pretrained(self.model_name) + model = CLIPModel.from_pretrained(self.model_name) + return (model, processor) + + def batch_elements_kwargs(self): + return {"max_batch_size": self.batch_size} + + def run_inference( + self, batch: List[Dict[str, Any]], model_bundle, inference_args=None): + + model, processor = model_bundle + model.to(self.device) + model.eval() + start_batch = now_millis() + + # Flat lists for a single batched CLIP forward pass + images: List[PILImage.Image] = [] + texts: List[str] = [] + offsets: List[Tuple[int, int]] = [] + # per element -> [start, end) in flat arrays + candidates_list: List[List[str]] = [] + blip_ms_list: List[Optional[int]] = [] + + for x in batch: + img = x["image"] + candidates = [str(c) for c in (x.get("candidates", []) or [])] + 
candidates_list.append(candidates) + blip_ms_list.append(x.get("blip_ms", None)) + + start_i = len(texts) + for c in candidates: + images.append(img) + texts.append(c) + end_i = len(texts) + offsets.append((start_i, end_i)) + + results: List[Dict[str, Any]] = [] + + # Fast path: no candidates at all + if not texts: + for blip_ms in blip_ms_list: + total_ms = int(blip_ms) if blip_ms is not None else None + results.append({ + "best_caption": "", + "best_score": None, + "candidates": [], + "scores": [], + "blip_ms": blip_ms, + "clip_ms": 0, + "total_ms": total_ms, + }) + return results + + with torch.no_grad(): + inputs = processor( + text=texts, + images=images, + return_tensors="pt", + padding=True, + truncation=True, + ) + inputs = { + k: (v.to(self.device) if torch.is_tensor(v) else v) + for k, v in inputs.items() + } + + # avoid NxN logits inside CLIPModel.forward() + img = model.get_image_features( + pixel_values=inputs["pixel_values"]) # [N, D] + txt = model.get_text_features( + input_ids=inputs["input_ids"], + attention_mask=inputs.get("attention_mask"), + ) # [N, D] + + img = img / img.norm(dim=-1, keepdim=True) + txt = txt / txt.norm(dim=-1, keepdim=True) + + logit_scale = model.logit_scale.exp() # scalar tensor + pair_scores = (img * txt).sum(dim=-1) * logit_scale # [N] + pair_scores_cpu = pair_scores.detach().cpu().tolist() + + batch_ms = now_millis() - start_batch + total_pairs = len(texts) + + items = zip(offsets, candidates_list, blip_ms_list) + for (start_i, end_i), candidates, blip_ms in items: + if start_i == end_i: + total_ms = int(blip_ms) if blip_ms is not None else None + results.append({ + "best_caption": "", + "best_score": None, + "candidates": [], + "scores": [], + "blip_ms": blip_ms, + "clip_ms": 0, + "total_ms": total_ms, + }) + continue + + scores = [float(pair_scores_cpu[j]) for j in range(start_i, end_i)] + + if self.score_normalize: + scores_t = torch.tensor(scores, dtype=torch.float32) + scores = torch.softmax(scores_t, dim=0).tolist() 
+ + best_idx = max(range(len(scores)), key=lambda i, s=scores: s[i]) + + pairs = end_i - start_i + clip_ms_elem = int(batch_ms * (pairs / max(1, total_pairs))) + if pairs > 0: + clip_ms_elem = max(1, clip_ms_elem) + + total_ms = int(blip_ms) + clip_ms_elem if blip_ms is not None else None + results.append({ + "best_caption": candidates[best_idx], + "best_score": float(scores[best_idx]), + "candidates": candidates, + "scores": scores, + "blip_ms": blip_ms, + "clip_ms": clip_ms_elem, + "total_ms": total_ms, + }) + + return results + + def get_metrics_namespace(self) -> str: + return "clip_ranking" + + +# ============ Args & Helpers ============ + + +def parse_known_args(argv): + parser = argparse.ArgumentParser() + + # I/O & runtime + parser.add_argument( + '--mode', default='streaming', choices=['streaming', 'batch']) + parser.add_argument( + '--project', default='apache-beam-testing', help='GCP project ID') + parser.add_argument( + '--input', required=True, help='GCS path to file with image URIs') + parser.add_argument( + '--pubsub_topic', + default='projects/apache-beam-testing/topics/images_topic') + parser.add_argument( + '--pubsub_subscription', + default='projects/apache-beam-testing/subscriptions/images_subscription') + parser.add_argument( + '--output_table', + required=True, + help='BigQuery output table: dataset.table') + parser.add_argument( + '--publish_to_big_query', default='true', choices=['true', 'false']) + parser.add_argument( + '--feeder_start_delay_sec', + type=int, + default=900, + help=( + 'Delay before starting the feeder pipeline that reads URIs from GCS ' + 'and publishes them to Pub/Sub. 
This delay allows the main streaming ' + 'pipeline workers to start and scale before data ingestion begins.'), + ) + + # Device + parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU']) + + # BLIP + parser.add_argument( + '--blip_model_name', default='Salesforce/blip-image-captioning-base') + parser.add_argument('--blip_batch_size', type=int, default=4) + parser.add_argument('--num_captions', type=int, default=5) + parser.add_argument('--max_new_tokens', type=int, default=30) + parser.add_argument('--num_beams', type=int, default=5) + + # CLIP + parser.add_argument( + '--clip_model_name', default='openai/clip-vit-base-patch32') + parser.add_argument('--clip_batch_size', type=int, default=8) + parser.add_argument( + '--clip_score_normalize', default='false', choices=['true', 'false']) + + # Windows + parser.add_argument('--window_sec', type=int, default=60) + parser.add_argument('--trigger_proc_time_sec', type=int, default=30) + + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def ensure_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + publisher.get_topic(request={"topic": full_topic_path}) + except NotFound: + publisher.create_topic(name=full_topic_path) + + try: + subscriber.get_subscription( + request={"subscription": full_subscription_path}) + except NotFound: + subscriber.create_subscription( + name=full_subscription_path, topic=full_topic_path) + + +def cleanup_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + 
topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + subscriber.delete_subscription( + request={"subscription": full_subscription_path}) + print(f"Deleted subscription: {subscription_name}") + except NotFound: + print(f"Subscription already deleted: {subscription_name}") + + try: + publisher.delete_topic(request={"topic": full_topic_path}) + print(f"Deleted topic: {topic_name}") + except NotFound: + print(f"Topic already deleted: {topic_name}") + + +def override_or_add(args, flag, value): + if flag in args: + idx = args.index(flag) + args[idx + 1] = str(value) + else: + args.extend([flag, str(value)]) + + +# ============ Load pipeline ============ + + +def run_load_pipeline(known_args, pipeline_args): + """Reads GCS file with URIs and publishes them to Pub/Sub (for streaming).""" + # enforce smaller/CPU-only defaults for feeder + override_or_add(pipeline_args, '--device', 'CPU') + override_or_add(pipeline_args, '--num_workers', '5') + override_or_add(pipeline_args, '--max_num_workers', '10') + override_or_add( + pipeline_args, '--job_name', f"images-load-pubsub-{int(time.time())}") + override_or_add(pipeline_args, '--project', known_args.project) + pipeline_args = [ + arg for arg in pipeline_args if not arg.startswith("--experiments") + ] + + pipeline_options = PipelineOptions(pipeline_args) + pipeline = beam.Pipeline(options=pipeline_options) + + _ = ( + pipeline + | 'ReadGCSFile' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmpty' >> beam.Filter(lambda line: line.strip()) + | 'ToBytes' >> beam.Map(lambda line: line.encode('utf-8')) + | 'ToPubSub' >> beam.io.WriteToPubSub(topic=known_args.pubsub_topic)) + return pipeline.run() + + +# ============ Main pipeline ============ + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> 
PipelineResult: + known_args, pipeline_args = parse_known_args(argv) + + if known_args.mode == 'streaming': + ensure_pubsub_resources( + project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + # Start feeder thread that reads URIs from GCS and fills Pub/Sub. + # Delay is used to allow the main streaming pipeline workers to start + # and autoscale before the feeder pipeline begins publishing messages. + threading.Thread( + target=lambda: ( + time.sleep(known_args.feeder_start_delay_sec), run_load_pipeline( + known_args, pipeline_args)), + daemon=True).start() + + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + pipeline_options.view_as(StandardOptions).streaming = ( + known_args.mode == 'streaming') + + device = 'cuda' if known_args.device.upper() == 'GPU' else 'cpu' + clip_score_normalize = (known_args.clip_score_normalize == 'true') + + blip_handler = BlipCaptionModelHandler( + model_name=known_args.blip_model_name, + device=device, + batch_size=int(known_args.blip_batch_size), + num_captions=int(known_args.num_captions), + max_new_tokens=int(known_args.max_new_tokens), + num_beams=int(known_args.num_beams), + ) + + clip_handler = ClipRankModelHandler( + model_name=known_args.clip_model_name, + device=device, + batch_size=int(known_args.clip_batch_size), + score_normalize=clip_score_normalize, + ) + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + + if known_args.mode == 'batch': + pcoll = ( + pipeline + | 'ReadURIsBatch' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip())) + else: + pcoll = ( + pipeline + | 'ReadFromPubSub' >> + beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription) + | 'DecodeUTF8' >> beam.Map(lambda x: x.decode('utf-8')) + | 'Window' >> beam.WindowInto( + window.FixedWindows(known_args.window_sec), + 
trigger=beam.trigger.AfterProcessingTime( + known_args.trigger_proc_time_sec), + accumulation_mode=beam.trigger.AccumulationMode.DISCARDING, + allowed_lateness=0)) + + keyed = (pcoll | 'MakeKey' >> beam.ParDo(MakeKeyDoFn())) + image_bytes = (keyed | 'ReadImageBytes' >> beam.ParDo(ReadImageBytesDoFn())) + images = (image_bytes | 'DecodeImage' >> beam.ParDo(DecodeImageDoFn())) + + # Stage 1: BLIP candidate generation + blip_out = ( + images + | 'RunInferenceBLIP' >> RunInference(KeyedModelHandler(blip_handler))) + + # Stage 2: CLIP ranking over candidates + clip_out = ( + blip_out + | 'RunInferenceCLIP' >> RunInference(KeyedModelHandler(clip_handler))) + + results = ( + clip_out + | 'PostProcess' >> beam.ParDo( + PostProcessDoFn( + blip_name=known_args.blip_model_name, + clip_name=known_args.clip_model_name))) + + method = ( + beam.io.WriteToBigQuery.Method.FILE_LOADS if known_args.mode == 'batch' + else beam.io.WriteToBigQuery.Method.STREAMING_INSERTS) + + if known_args.publish_to_big_query == 'true': + _ = ( + results + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( + known_args.output_table, + schema=( + 'image_id:STRING, blip_model:STRING, clip_model:STRING, ' + 'best_caption:STRING, best_score:FLOAT, ' + 'candidates:STRING, scores:STRING, ' + 'blip_ms:INT64, clip_ms:INT64, total_ms:INT64, infer_ms:INT64'), + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + method=method)) + + result = pipeline.run() + result.wait_until_finish(duration=1800000) # 30 min + try: + result.cancel() + except Exception: + pass + + if known_args.mode == 'streaming': + cleanup_pubsub_resources( + project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py 
b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py new file mode 100644 index 000000000000..21552ae5662e --- /dev/null +++ b/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py @@ -0,0 +1,535 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""This pipeline performs object detection using an open-source PyTorch +TorchVision detection model (e.g., Faster R-CNN ResNet50 FPN) on GPU. + +It reads image URIs from a GCS input file, decodes and preprocesses images, +runs batched GPU inference via RunInference, post-processes detection outputs, +and writes results to BigQuery. + +The pipeline targets stable and reproducible performance measurements for +GPU inference workloads (no right-fitting; fixed batch size). 
+""" + +import argparse +import io +import json +import logging +import threading +import time +from typing import Any +from typing import Dict +from typing import List +from typing import Optional +from typing import Sequence +from typing import Tuple + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult +from apache_beam.transforms import window + +from google.api_core.exceptions import NotFound +from google.cloud import pubsub_v1 +import torch +import PIL.Image as PILImage + +# ============ Utility & Preprocessing ============ + + +def now_millis() -> int: + return int(time.time() * 1000) + + +def decode_to_tens( + image_bytes: bytes, + resize_shorter_side: Optional[int] = None) -> torch.Tensor: + """Decode bytes -> RGB PIL -> optional resize -> float tensor [0..1], CHW. + + Note: TorchVision detection models apply their own normalization internally. + """ + with PILImage.open(io.BytesIO(image_bytes)) as img: + img = img.convert("RGB") + + if resize_shorter_side and resize_shorter_side > 0: + w, h = img.size + # Resize so that shorter side == resize_shorter_side, keep aspect ratio. 
+ if w < h: + new_w = resize_shorter_side + new_h = int(h * (resize_shorter_side / float(w))) + else: + new_h = resize_shorter_side + new_w = int(w * (resize_shorter_side / float(h))) + img = img.resize((new_w, new_h)) + + import numpy as np + arr = np.asarray(img).astype("float32") / 255.0 # H,W,3 in [0..1] + arr = np.transpose(arr, (2, 0, 1)) # CHW + return torch.from_numpy(arr) + + +# ============ DoFns ============ + + +class MakeKeyDoFn(beam.DoFn): + """Produce (uri, uri) where the URI is used as the stable key.""" + def process(self, element: str): + uri = element + yield uri, uri + + +class DecodePreprocessDoFn(beam.DoFn): + """Turn (uri, uri) -> (uri, tensor).""" + def __init__(self, resize_shorter_side: Optional[int] = None): + self.resize_shorter_side = resize_shorter_side + + def process(self, kv: Tuple[str, str]): + uri, _ = kv + start = now_millis() + try: + with FileSystems.open(uri) as f: + image_bytes = f.read() + tensor = decode_to_tens( + image_bytes, resize_shorter_side=self.resize_shorter_side) + preprocess_ms = now_millis() - start + yield uri, {"tensor": tensor, "preprocess_ms": preprocess_ms} + except Exception as e: + logging.warning("Decode failed for %s: %s", uri, e) + return + + +def _torchvision_detection_inference_fn( + batch: Sequence[torch.Tensor], + model: torch.nn.Module, + device: torch.device, + inference_args: Optional[dict[str, Any]] = None, + model_id: Optional[str] = None, +) -> List[Dict[str, Any]]: + """Inference function for TorchVision detection models. + + TorchVision detection models expect List[Tensor] where each tensor is: + - shape: [3, H, W] + - dtype: float32 + - values: [0..1] + """ + del inference_args + del model_id + + with torch.no_grad(): + # Ensure tensors are on device + inputs = [] + for t in batch: + if isinstance(t, torch.Tensor): + inputs.append(t.to(device)) + else: + # Defensive: if somehow non-tensor slips through. 
+ inputs.append(torch.as_tensor(t).to(device)) + outputs = model(inputs) # List[Dict[str, Tensor]] + return outputs + + +class PostProcessDoFn(beam.DoFn): + """PredictionResult -> dict row for BQ.""" + def __init__( + self, model_name: str, score_threshold: float, max_detections: int): + self.model_name = model_name + self.score_threshold = score_threshold + self.max_detections = max_detections + + def _extract_detection(self, inference_obj: Any) -> Dict[str, Any]: + """Extract detection fields from torchvision output dict.""" + # Expect: {'boxes': Tensor[N,4], 'labels': Tensor[N], 'scores': Tensor[N]} + boxes = inference_obj.get("boxes") + labels = inference_obj.get("labels") + scores = inference_obj.get("scores") + + # Convert to CPU lists + if isinstance(scores, torch.Tensor): + scores_list = scores.detach().cpu().tolist() + else: + scores_list = list(scores) if scores is not None else [] + + if isinstance(labels, torch.Tensor): + labels_list = labels.detach().cpu().tolist() + else: + labels_list = list(labels) if labels is not None else [] + + if isinstance(boxes, torch.Tensor): + boxes_list = boxes.detach().cpu().tolist() + else: + boxes_list = list(boxes) if boxes is not None else [] + + # Filter by threshold and trim to max_detections + dets = [] + for i in range(min(len(scores_list), len(labels_list), len(boxes_list))): + score = float(scores_list[i]) + if score < self.score_threshold: + continue + box = boxes_list[i] # [x1,y1,x2,y2] + dets.append({ + "label_id": int(labels_list[i]), + "score": score, + "box": [float(box[0]), float(box[1]), float(box[2]), float(box[3])], + }) + if len(dets) >= self.max_detections: + break + + return { + "detections": dets, + "num_detections": len(dets), + } + + def process(self, kv: Tuple[str, PredictionResult]): + image_uri, pred = kv + + # pred can be PredictionResult OR raw torchvision dict. 
+ if hasattr(pred, "inference"): + inference_obj = pred.inference + else: + inference_obj = pred + + if isinstance(inference_obj, list) and len(inference_obj) == 1: + inference_obj = inference_obj[0] + + if not isinstance(inference_obj, dict): + logging.warning( + "Unexpected inf-ce type for %s: %s", image_uri, type(inference_obj)) + yield { + "image_id": image_uri, + "model_name": self.model_name, + "detections": json.dumps([]), + "num_detections": 0, + "infer_ms": now_millis(), + } + return + + extracted = self._extract_detection(inference_obj) + + yield { + "image_id": image_uri, + "model_name": self.model_name, + "detections": json.dumps(extracted["detections"]), + "num_detections": int(extracted["num_detections"]), + "infer_ms": now_millis(), + } + + +# ============ Args & Helpers ============ + + +def parse_known_args(argv): + parser = argparse.ArgumentParser() + + # I/O & runtime + parser.add_argument( + '--project', default='apache-beam-testing', help='GCP project ID') + parser.add_argument( + '--mode', default='streaming', choices=['streaming', 'batch']) + parser.add_argument( + '--output_table', + required=True, + help='BigQuery output table: dataset.table') + parser.add_argument( + '--publish_to_big_query', default='true', choices=['true', 'false']) + parser.add_argument( + '--input', required=True, help='GCS path to file with image URIs') + parser.add_argument( + '--pubsub_topic', + default='projects/apache-beam-testing/topics/images_topic') + parser.add_argument( + '--pubsub_subscription', + default='projects/apache-beam-testing/subscriptions/images_subscription') + parser.add_argument( + '--feeder_start_delay_sec', + type=int, + default=900, + help=( + 'Delay before starting the feeder pipeline that reads URIs from GCS ' + 'and publishes them to Pub/Sub. 
This delay allows the main streaming ' + 'pipeline workers to start and scale before data ingestion begins.'), + ) + + # Model & inference + parser.add_argument( + '--pretrained_model_name', + default='fasterrcnn_resnet50_fpn', + help=( + 'TorchVision detection model name ' + '(e.g., fasterrcnn_resnet50_fpn)')) + parser.add_argument( + '--model_state_dict_path', + required=True, + help='GCS path to a state_dict .pth for the chosen model') + parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU']) + + # Batch sizing (no right-fitting) + parser.add_argument('--inference_batch_size', type=int, default=8) + + # Preprocess + parser.add_argument('--resize_shorter_side', type=int, default=0) + + # Postprocess + parser.add_argument('--score_threshold', type=float, default=0.5) + parser.add_argument('--max_detections', type=int, default=50) + + # Windows + parser.add_argument('--window_sec', type=int, default=60) + parser.add_argument('--trigger_proc_time_sec', type=int, default=30) + + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def ensure_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + publisher.get_topic(request={"topic": full_topic_path}) + except NotFound: + publisher.create_topic(name=full_topic_path) + + try: + subscriber.get_subscription( + request={"subscription": full_subscription_path}) + except NotFound: + subscriber.create_subscription( + name=full_subscription_path, topic=full_topic_path) + + +def cleanup_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + 
subscriber = pubsub_v1.SubscriberClient() + + topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + subscriber.delete_subscription( + request={"subscription": full_subscription_path}) + print(f"Deleted subscription: {subscription_name}") + except NotFound: + print(f"Subscription already deleted: {subscription_name}") + + try: + publisher.delete_topic(request={"topic": full_topic_path}) + print(f"Deleted topic: {topic_name}") + except NotFound: + print(f"Topic already deleted: {topic_name}") + + +def override_or_add(args, flag, value): + if flag in args: + idx = args.index(flag) + args[idx + 1] = str(value) + else: + args.extend([flag, str(value)]) + + +def create_torchvision_detection_model(model_name: str): + """Creates a TorchVision detection model instance. + + Note: We will load weights via state_dict_path (required by Beam handler when + model_class is provided). 
+ """ + import torchvision + + name = model_name.strip() + + if name == "fasterrcnn_resnet50_fpn": + model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights=None) + elif name == "retinanet_resnet50_fpn": + model = torchvision.models.detection.retinanet_resnet50_fpn(weights=None) + else: + raise ValueError(f"Unsupported detection model: {model_name}") + + model.eval() + return model + + +# ============ Load pipeline ============ + + +def run_load_pipeline(known_args, pipeline_args): + """Reads GCS file with URIs and publishes them to Pub/Sub (for streaming).""" + # enforce smaller/CPU-only defaults for feeder + override_or_add(pipeline_args, '--device', 'CPU') + override_or_add(pipeline_args, '--num_workers', '5') + override_or_add(pipeline_args, '--max_num_workers', '10') + override_or_add( + pipeline_args, '--job_name', f"images-load-pubsub-{int(time.time())}") + override_or_add(pipeline_args, '--project', known_args.project) + pipeline_args = [ + arg for arg in pipeline_args if not arg.startswith("--experiments") + ] + + pipeline_options = PipelineOptions(pipeline_args) + pipeline = beam.Pipeline(options=pipeline_options) + + _ = ( + pipeline + | 'ReadGCSFile' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmpty' >> beam.Filter(lambda line: line.strip()) + | 'ToBytes' >> beam.Map(lambda line: line.encode('utf-8')) + | 'ToPubSub' >> beam.io.WriteToPubSub(topic=known_args.pubsub_topic)) + return pipeline.run() + + +# ============ Main pipeline ============ + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + known_args, pipeline_args = parse_known_args(argv) + + if known_args.mode == 'streaming': + ensure_pubsub_resources( + project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + # Start feeder thread that reads URIs from GCS and fills Pub/Sub. 
+ # Delay is used to allow the main streaming pipeline workers to start + # and autoscale before the feeder pipeline begins publishing messages. + threading.Thread( + target=lambda: ( + time.sleep(known_args.feeder_start_delay_sec), run_load_pipeline( + known_args, pipeline_args)), + daemon=True).start() + + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + pipeline_options.view_as(StandardOptions).streaming = ( + known_args.mode == 'streaming') + + device = 'GPU' if known_args.device.upper() == 'GPU' else 'CPU' + resize_shorter_side = ( + known_args.resize_shorter_side + ) if known_args.resize_shorter_side > 0 else None + + # Fixed batch size (no right-fitting) + batch_size = int(known_args.inference_batch_size) + + model_handler = PytorchModelHandlerTensor( + model_class=lambda: create_torchvision_detection_model( + known_args.pretrained_model_name), + model_params={}, + state_dict_path=known_args.model_state_dict_path, + device=device, + inference_batch_size=batch_size, + inference_fn=_torchvision_detection_inference_fn, + ) + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + + if known_args.mode == 'batch': + pcoll = ( + pipeline + | 'ReadURIsBatch' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip())) + else: + pcoll = ( + pipeline + | 'ReadFromPubSub' >> + beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription) + | 'DecodeUTF8' >> beam.Map(lambda x: x.decode('utf-8')) + | 'Window' >> beam.WindowInto( + window.FixedWindows(known_args.window_sec), + trigger=beam.trigger.AfterProcessingTime( + known_args.trigger_proc_time_sec), + accumulation_mode=beam.trigger.AccumulationMode.DISCARDING, + allowed_lateness=0)) + + keyed = (pcoll | 'MakeKey' >> beam.ParDo(MakeKeyDoFn())) + + preprocessed = ( + keyed + | 'DecodePreprocess' >> beam.ParDo( + DecodePreprocessDoFn(resize_shorter_side=resize_shorter_side))) 
+ + to_infer = ( + preprocessed + | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"]))) + + predictions = ( + to_infer + | 'RunInference' >> RunInference(KeyedModelHandler(model_handler))) + + results = ( + predictions + | 'PostProcess' >> beam.ParDo( + PostProcessDoFn( + model_name=known_args.pretrained_model_name, + score_threshold=known_args.score_threshold, + max_detections=known_args.max_detections))) + + method = ( + beam.io.WriteToBigQuery.Method.FILE_LOADS if known_args.mode == 'batch' + else beam.io.WriteToBigQuery.Method.STREAMING_INSERTS) + + if known_args.publish_to_big_query == 'true': + _ = ( + results + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( + known_args.output_table, + schema=( + 'image_id:STRING, model_name:STRING, ' + 'detections:STRING, num_detections:INT64, infer_ms:INT64'), + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + method=method)) + + result = pipeline.run() + result.wait_until_finish(duration=1800000) # 30 min + try: + result.cancel() + except Exception: + pass + + if known_args.mode == 'streaming': + cleanup_pubsub_resources( + project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py new file mode 100644 index 000000000000..05acebc590e8 --- /dev/null +++ b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py @@ -0,0 +1,522 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This pipeline performs image classification using an open-source +PyTorch EfficientNet-B0 model optimized for T4 GPUs. +It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, +and runs inference with adaptive batch sizing for optimal GPU utilization. +The pipeline targets stable and reproducible performance measurements under +continuous load. +Cleanup of resources such as the Pub/Sub topic and subscription is handled +programmatically. 
+""" + +import argparse +import io +import json +import logging +import threading +import time +from typing import Optional +from typing import Tuple + +import torch +import torch.nn.functional as F + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult +from apache_beam.transforms import window + +from google.api_core.exceptions import NotFound +from google.cloud import pubsub_v1 +import PIL.Image as PILImage + +# ============ Utility & Preprocessing ============ + +IMAGENET_MEAN = [0.485, 0.456, 0.406] +IMAGENET_STD = [0.229, 0.224, 0.225] + + +def now_millis() -> int: + return int(time.time() * 1000) + + +def load_image_from_uri(uri: str) -> bytes: + with FileSystems.open(uri) as f: + return f.read() + + +def decode_and_preprocess(image_bytes: bytes, size: int = 224) -> torch.Tensor: + """Decode bytes->RGB PIL->resize/crop->tensor->normalize.""" + with PILImage.open(io.BytesIO(image_bytes)) as img: + img = img.convert("RGB") + img.thumbnail((256, 256)) + w, h = img.size + left = (w - size) // 2 + top = (h - size) // 2 + img = img.crop( + (max(0, left), max(0, top), min(w, left + size), min(h, top + size))) + + # To tensor [0..1] + import numpy as np + mean = np.array(IMAGENET_MEAN, dtype=np.float32) + std = np.array(IMAGENET_STD, dtype=np.float32) + arr = np.asarray(img).astype("float32") / 255.0 # H,W,3 + # Normalize + arr = (arr - mean) / std + # HWC -> CHW + arr = np.transpose(arr, (2, 0, 1)).astype("float32") + return 
torch.from_numpy(arr).float() # float32, shape (3,224,224) + + +class MakeKeyDoFn(beam.DoFn): + """Produce (image_id, payload) stable for dedup & BQ insertId.""" + def __init__(self, input_mode: str): + self.input_mode = input_mode + + def process(self, element: str | bytes): + # Input can be raw bytes from Pub/Sub or a GCS URI string, depends on mode + if self.input_mode == "bytes": + # element is bytes message, assume it includes + # {"image_id": "...", "bytes": base64?} or just raw bytes. + import hashlib + b = element if isinstance(element, + (bytes, + bytearray)) else element.encode('utf-8') + image_id = hashlib.sha1(b).hexdigest() + yield image_id, b + else: + # gcs_uris: element is uri string; image_id = sha1(uri) + import hashlib + uri = element.decode("utf-8") if isinstance( + element, (bytes, bytearray)) else str(element) + image_id = hashlib.sha1(uri.encode("utf-8")).hexdigest() + yield image_id, uri + + +class DecodePreprocessDoFn(beam.DoFn): + """Turn (image_id, bytes|uri) -> (image_id, torch.Tensor)""" + def __init__( + self, input_mode: str, image_size: int = 224, decode_threads: int = 4): + self.input_mode = input_mode + self.image_size = image_size + self.decode_threads = decode_threads + + def process(self, kv: Tuple[str, object]): + image_id, payload = kv + start = now_millis() + + try: + if self.input_mode == "bytes": + b = payload if isinstance(payload, + (bytes, bytearray)) else bytes(payload) + else: + uri = payload if isinstance(payload, str) else payload.decode("utf-8") + b = load_image_from_uri(uri) + + tensor = decode_and_preprocess(b, self.image_size) + preprocess_ms = now_millis() - start + yield image_id, {"tensor": tensor, "preprocess_ms": preprocess_ms} + except Exception as e: + logging.warning("Decode failed for %s: %s", image_id, e) + return + + +class PostProcessDoFn(beam.DoFn): + """PredictionResult -> dict row for BQ.""" + def __init__(self, top_k: int, model_name: str): + self.top_k = top_k + self.model_name = model_name + + 
def process(self, kv: Tuple[str, PredictionResult]): + image_id, pred = kv + + # pred can be PredictionResult OR raw inference object. + inference_obj = pred.inference if hasattr(pred, "inference") else pred + + # inference_obj can be dict {'logits': tensor} OR tensor directly. + if isinstance(inference_obj, dict): + logits = inference_obj.get("logits", None) + if logits is None: + # fallback: try first value if dict shape differs + try: + logits = next(iter(inference_obj.values())) + logging.warning( + 'Could not find key in model output.' + 'Falling back to first value in dict.') + except Exception: + logging.warning('Could not find key in dict.') + else: + logits = inference_obj + + if not isinstance(logits, torch.Tensor): + logging.warning( + "Unexpected logits type for %s: %s", image_id, type(logits)) + return + + # Ensure shape [1, C] + if logits.ndim == 1: + logits = logits.unsqueeze(0) + + probs = F.softmax(logits, dim=-1) # [B, C] + values, indices = torch.topk( + probs, k=min(self.top_k, probs.shape[-1]), dim=-1 + ) + + topk = [{ + "class_id": int(idx.item()), "score": float(val.item()) + } for idx, val in zip(indices[0], values[0])] + + yield { + "image_id": image_id, + "model_name": self.model_name, + "topk": json.dumps(topk), + "infer_ms": now_millis(), + } + + +# ============ Args & Helpers ============ + + +def parse_known_args(argv): + parser = argparse.ArgumentParser() + # I/O & runtime + parser.add_argument( + '--project', default='apache-beam-testing', help='GCP project ID') + parser.add_argument( + '--mode', default='streaming', choices=['streaming', 'batch']) + parser.add_argument( + '--output_table', + required=True, + help='BigQuery output table: dataset.table') + parser.add_argument( + '--publish_to_big_query', default='true', choices=['true', 'false']) + parser.add_argument( + '--input_mode', default='gcs_uris', choices=['gcs_uris', 'bytes']) + parser.add_argument( + '--input', + required=True, + help='GCS path to file with URIs (for load) 
OR unused for bytes') + parser.add_argument( + '--pubsub_topic', + default='projects/apache-beam-testing/topics/images_topic') + parser.add_argument( + '--pubsub_subscription', + default='projects/apache-beam-testing/subscriptions/images_subscription') + parser.add_argument( + '--feeder_start_delay_sec', + type=int, + default=900, + help=( + 'Delay before starting the feeder pipeline that reads URIs from GCS ' + 'and publishes them to Pub/Sub. This delay allows the main streaming ' + 'pipeline workers to start and scale before data ingestion begins.'), + ) + + # Model & inference + parser.add_argument( + '--pretrained_model_name', + default='efficientnet_b0', + help='OSS model name (e.g., efficientnet_b0|mobilenetv3_large_100)') + parser.add_argument( + '--model_state_dict_path', + default=None, + help='Optional state_dict to load') + parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU']) + parser.add_argument('--image_size', type=int, default=224) + parser.add_argument('--top_k', type=int, default=5) + parser.add_argument( + '--inference_batch_size', + default='auto', + help='int or "auto"; auto tries 64→32→16') + + # Windows + parser.add_argument('--window_sec', type=int, default=60) + parser.add_argument('--trigger_proc_time_sec', type=int, default=30) + + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def ensure_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + publisher.get_topic(request={"topic": full_topic_path}) + except NotFound: + publisher.create_topic(name=full_topic_path) + + try: + subscriber.get_subscription( + 
request={"subscription": full_subscription_path}) + except NotFound: + subscriber.create_subscription( + name=full_subscription_path, topic=full_topic_path) + + +def cleanup_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + subscriber.delete_subscription( + request={"subscription": full_subscription_path}) + print(f"Deleted subscription: {subscription_name}") + except NotFound: + print(f"Subscription already deleted: {subscription_name}") + + try: + publisher.delete_topic(request={"topic": full_topic_path}) + print(f"Deleted topic: {topic_name}") + except NotFound: + print(f"Topic already deleted: {topic_name}") + + +def override_or_add(args, flag, value): + if flag in args: + idx = args.index(flag) + args[idx + 1] = str(value) + else: + args.extend([flag, str(value)]) + + +# ============ Model factory (timm) ============ + + +def create_timm_m(model_name: str, num_classes: int = 1000): + import timm + model = timm.create_model( + model_name, pretrained=True, num_classes=num_classes) + model.eval() + return model + + +def pick_batch_size(arg: str) -> Optional[int]: + if isinstance(arg, str) and arg.lower() == 'auto': + return None + try: + return int(arg) + except Exception: + return None + + +# ============ Load pipeline ============ + + +def run_load_pipeline(known_args, pipeline_args): + """Reads GCS file with URIs and publishes them to Pub/Sub (for streaming).""" + # enforce smaller/CPU-only defaults for feeder + override_or_add(pipeline_args, '--device', 'CPU') + override_or_add(pipeline_args, '--num_workers', '5') + override_or_add(pipeline_args, '--max_num_workers', '10') + override_or_add( + 
pipeline_args, '--job_name', f"images-load-pubsub-{int(time.time())}") + override_or_add(pipeline_args, '--project', known_args.project) + pipeline_args = [ + arg for arg in pipeline_args if not arg.startswith("--experiments") + ] + + pipeline_options = PipelineOptions(pipeline_args) + pipeline = beam.Pipeline(options=pipeline_options) + + _ = ( + pipeline + | 'ReadGCSFile' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmpty' >> beam.Filter(lambda line: line.strip()) + | 'ToBytes' >> beam.Map(lambda line: line.encode('utf-8')) + | 'ToPubSub' >> beam.io.WriteToPubSub(topic=known_args.pubsub_topic)) + return pipeline.run() + + +# ============ Main pipeline ============ + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + known_args, pipeline_args = parse_known_args(argv) + + if known_args.mode == 'streaming': + ensure_pubsub_resources( + project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + # Start feeder thread that reads URIs from GCS and fills Pub/Sub. + # Delay is used to allow the main streaming pipeline workers to start + # and autoscale before the feeder pipeline begins publishing messages. 
+ threading.Thread( + target=lambda: ( + time.sleep(known_args.feeder_start_delay_sec), run_load_pipeline( + known_args, pipeline_args)), + daemon=True).start() + + # StandardOptions + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + pipeline_options.view_as(StandardOptions).streaming = ( + known_args.mode == 'streaming') + + # Build model handler with right-fitting batch size + desired_batch = pick_batch_size(known_args.inference_batch_size) + tried = [64, 32, 16] if desired_batch is None else [desired_batch] + + # Device + device = 'GPU' if known_args.device.upper() == 'GPU' else 'CPU' + + bs_ok = None + last_err = None + for bs in tried: + try: + model_handler = PytorchModelHandlerTensor( + model_class=lambda: create_timm_m(known_args.pretrained_model_name), + model_params={}, + state_dict_path=known_args.model_state_dict_path, + device=device, + inference_batch_size=bs + if bs is not None else 64, # start guess for warmup + ) + # quick warmup to validate memory (single dummy tensor) + dummy = torch.zeros((3, known_args.image_size, known_args.image_size), + dtype=torch.float32) + _ = model_handler.load_model() # ensures weights on device + with torch.no_grad(): + mdl = model_handler._model + mdl(torch.unsqueeze(dummy, 0)) + bs_ok = bs if bs is not None else 64 + break + except RuntimeError as e: + last_err = e + logging.warning("Batch size %s failed during warmup: %s", bs, e) + continue + + if bs_ok is None: + logging.warning( + "Falling back to batch_size=8 due to previous errors: %s", last_err) + bs_ok = 8 + model_handler = PytorchModelHandlerTensor( + model_class=lambda: create_timm_m(known_args.pretrained_model_name), + model_params={}, + state_dict_path=known_args.model_state_dict_path, + device=device, + inference_batch_size=bs_ok, + ) + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + + if known_args.mode == 'batch': + pcoll = ( + pipeline + | 
'ReadURIsBatch' >> beam.io.ReadFromText(known_args.input) + | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip())) + else: + pcoll = ( + pipeline + | 'ReadFromPubSub' >> + beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription) + | 'DecodeUTF8' >> beam.Map(lambda x: x.decode('utf-8')) + | 'Window' >> beam.WindowInto( + window.FixedWindows(known_args.window_sec), + trigger=beam.trigger.AfterProcessingTime( + known_args.trigger_proc_time_sec), + accumulation_mode=beam.trigger.AccumulationMode.DISCARDING, + allowed_lateness=0)) + + keyed = ( + pcoll + | 'MakeKey' >> beam.ParDo(MakeKeyDoFn(input_mode=known_args.input_mode))) + + preprocessed = ( + keyed + | 'DecodePreprocess' >> beam.ParDo( + DecodePreprocessDoFn( + input_mode=known_args.input_mode, + image_size=known_args.image_size))) + + to_infer = ( + preprocessed + | + 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"].float()))) + + predictions = ( + to_infer + | 'RunInference' >> RunInference( + KeyedModelHandler(model_handler)).with_resource_hints( + accelerator="type:nvidia-tesla-t4;count:1;install-nvidia-driver")) + + results = ( + predictions + | 'PostProcess' >> beam.ParDo( + PostProcessDoFn( + top_k=known_args.top_k, + model_name=known_args.pretrained_model_name))) + + method = ( + beam.io.WriteToBigQuery.Method.FILE_LOADS if known_args.mode == 'batch' + else beam.io.WriteToBigQuery.Method.STREAMING_INSERTS) + + if known_args.publish_to_big_query == 'true': + _ = ( + results + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( + known_args.output_table, + schema= + 'image_id:STRING, model_name:STRING, topk:STRING, infer_ms:INT64', + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + method=method)) + + result = pipeline.run() + result.wait_until_finish(duration=1800000) # 30 min + try: + result.cancel() + except Exception: + pass + + if known_args.mode == 'streaming': + cleanup_pubsub_resources( + 
project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/ml/inference/pytorch_image_captioning_requirements.txt b/sdks/python/apache_beam/ml/inference/pytorch_image_captioning_requirements.txt new file mode 100644 index 000000000000..726cc147a0aa --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/pytorch_image_captioning_requirements.txt @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +torch>=2.6.0,<2.8.0 +Pillow>=10.0.0 +numpy>=1.25.0 +transformers>=4.41.0,<5.0.0 +accelerate>=0.30.0 +tokenizers>=0.19.0 +safetensors>=0.4.3 +protobuf>=4.25.1 +requests>=2.31.0 +google-cloud-monitoring>=2.27.0 diff --git a/sdks/python/apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt b/sdks/python/apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt new file mode 100644 index 000000000000..c3ce392d1a4f --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/pytorch_image_object_detection_requirements.txt @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +torch>=2.2.0,<2.8.0 +torchvision>=0.17.0,<0.21.0 +Pillow>=10.0.0 +numpy>=1.25.0 +google-cloud-monitoring>=2.27.0 +protobuf>=4.25.1 +requests>=2.31.0 diff --git a/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt b/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt new file mode 100644 index 000000000000..2b2916c577ea --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +torch>=2.2.0,<2.8.0 +torchvision>=0.17.0,<0.21.0 +timm>=1.0.7 +Pillow>=10.0.0 +numpy>=1.25.0 +google-cloud-pubsub>=2.15.0 +google-cloud-monitoring>=2.27.0 +protobuf>=4.25.1 +requests>=2.31.0 diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_captioning_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_captioning_benchmarks.py new file mode 100644 index 000000000000..2960bfb74689 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_captioning_benchmarks.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +import logging + +from apache_beam.examples.inference import pytorch_image_captioning +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark + + +class PytorchImageCaptioningBenchmarkTest(DataflowCostBenchmark): + def __init__(self): + self.metrics_namespace = 'BeamML_PyTorch' + super().__init__( + metrics_namespace=self.metrics_namespace, + pcollection='PostProcess.out0') + + def test(self): + extra_opts = {} + extra_opts['input'] = self.pipeline.get_option('input_file') + self.result = pytorch_image_captioning.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + PytorchImageCaptioningBenchmarkTest().run() diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py new file mode 100644 index 000000000000..3bfd5daec187 --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_image_object_detection_benchmarks.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +import logging + +from apache_beam.examples.inference import pytorch_image_object_detection +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark + + +class PytorchImageObjectDetectionBenchmarkTest(DataflowCostBenchmark): + def __init__(self): + self.metrics_namespace = 'BeamML_PyTorch' + super().__init__( + metrics_namespace=self.metrics_namespace, + pcollection='PostProcess.out0') + + def test(self): + extra_opts = {} + extra_opts['input'] = self.pipeline.get_option('input_file') + self.result = pytorch_image_object_detection.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + PytorchImageObjectDetectionBenchmarkTest().run() diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py new file mode 100644 index 000000000000..1528711a801b --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +import logging + +from apache_beam.examples.inference import pytorch_imagenet_rightfit +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark + + +class PytorchImagenetRightfitBenchmarkTest(DataflowCostBenchmark): + def __init__(self): + self.metrics_namespace = 'BeamML_PyTorch' + super().__init__( + metrics_namespace=self.metrics_namespace, + pcollection='PostProcess.out0') + + def test(self): + extra_opts = {} + extra_opts['input'] = self.pipeline.get_option('input_file') + self.result = pytorch_imagenet_rightfit.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + PytorchImagenetRightfitBenchmarkTest().run() diff --git a/website/www/site/content/en/performance/_index.md b/website/www/site/content/en/performance/_index.md index 1624c58efe2e..133dcadc483a 100644 --- a/website/www/site/content/en/performance/_index.md +++ b/website/www/site/content/en/performance/_index.md @@ -46,6 +46,14 @@ See the following pages for performance measures recorded when running various B ## Streaming - [PyTorch Sentiment Analysis Streaming DistilBERT base](/performance/pytorchbertsentimentstreaming) +- [PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) CPU](/performance/pytorchimagenetrightfitcpu) +- [PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting Exactly-once) CPU](/performance/pytorchimagenetrightfitoncecpu) +- [PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) GPU](/performance/pytorchimagenetrightfitgpu) +- [PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting Exactly-once)](/performance/pytorchimagenetrightfitoncegpu) +- [PyTorch Image Captioning BLIP + CLIP Streaming 
CPU](/performance/pytorchimagecaptioningstreamingcpu) +- [PyTorch Image Captioning BLIP + CLIP Streaming GPU](/performance/pytorchimagecaptioningstreaminggpu) +- [PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming CPU](/performance/pytorchimageobjectdetectionstreamingcpu) +- [PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming GPU](/performance/pytorchimageobjectdetectionstreaminggpu) - [Table Row Inference Sklearn Streaming](/performance/tablerowinferencestreaming) ## Batch @@ -59,3 +67,7 @@ See the following pages for performance measures recorded when running various B - [TensorFlow MNIST Image Classification](/performance/tensorflowmnist) - [VLLM Gemma Batch Completion Tesla T4 GPU](/performance/vllmgemmabatchtesla) - [Table Row Inference Sklearn Batch](/performance/tablerowinference) +- [PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch CPU](/performance/pytorchimageobjectdetectionbatchcpu) +- [PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch GPU](/performance/pytorchimageobjectdetectionbatchgpu) +- [PyTorch Image Captioning BLIP + CLIP Batch CPU](/performance/pytorchimagecaptioningbatchcpu) +- [PyTorch Image Captioning BLIP + CLIP Batch GPU](/performance/pytorchimagecaptioningbatchgpu) diff --git a/website/www/site/content/en/performance/pytorchimagecaptioningbatchcpu/_index.md b/website/www/site/content/en/performance/pytorchimagecaptioningbatchcpu/_index.md new file mode 100644 index 000000000000..d5d5f094e553 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagecaptioningbatchcpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Captioning BLIP + CLIP Batch CPU Performance" +--- + + + +# PyTorch Image Captioning BLIP + CLIP Batch CPU + +**Model**: PyTorch Image Captioning — BLIP (candidate generation) + CLIP (ranking) +**Accelerator**: CPU only +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This batch pipeline performs image captioning using a multi-model open-source PyTorch 
approach. +It first generates multiple candidate captions per image using a BLIP model, then ranks these candidates with a CLIP model based on image-text similarity. + +The following graphs show various metrics when running PyTorch Image Captioning BLIP + CLIP Batch CPU pipeline. +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimagecaptioningbatchcpu" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagecaptioningbatchcpu" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagecaptioningbatchcpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimagecaptioningbatchgpu/_index.md b/website/www/site/content/en/performance/pytorchimagecaptioningbatchgpu/_index.md new file mode 100644 index 000000000000..d0d12bf3f540 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagecaptioningbatchgpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Captioning BLIP + CLIP Batch GPU Performance" +--- + + + +# PyTorch Image Captioning BLIP + CLIP Batch GPU + +**Model**: PyTorch Image Captioning — BLIP (candidate generation) + CLIP (ranking) +**Accelerator**: Tesla T4 GPU +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This batch pipeline performs image captioning using a multi-model open-source PyTorch approach. +It first generates multiple candidate captions per image using a BLIP model, then ranks these candidates with a CLIP model based on image-text similarity. 
+ +The following graphs show various metrics when running PyTorch Image Captioning BLIP + CLIP Batch GPU pipeline. +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimagecaptioningbatchgpu" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagecaptioningbatchgpu" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagecaptioningbatchgpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimagecaptioningstreamingcpu/_index.md b/website/www/site/content/en/performance/pytorchimagecaptioningstreamingcpu/_index.md new file mode 100644 index 000000000000..41223f1bf0b0 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagecaptioningstreamingcpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Captioning BLIP + CLIP Streaming CPU Performance" +--- + + + +# PyTorch Image Captioning BLIP + CLIP Streaming CPU + +**Model**: PyTorch Image Captioning — BLIP (candidate generation) + CLIP (ranking) +**Accelerator**: CPU only +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs image captioning using a multi-model open-source PyTorch approach. +It first generates multiple candidate captions per image using a BLIP model, then ranks these candidates with a CLIP model based on image-text similarity. + +The following graphs show various metrics when running PyTorch Image Captioning BLIP + CLIP Streaming CPU pipeline. +See the [glossary](/performance/glossary) for definitions. 
+ +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimagecaptioningstreamingcpu" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagecaptioningstreamingcpu" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagecaptioningstreamingcpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimagecaptioningstreaminggpu/_index.md b/website/www/site/content/en/performance/pytorchimagecaptioningstreaminggpu/_index.md new file mode 100644 index 000000000000..b7694baa82e2 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagecaptioningstreaminggpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Captioning BLIP + CLIP Streaming GPU Performance" +--- + + + +# PyTorch Image Captioning BLIP + CLIP Streaming GPU + +**Model**: PyTorch Image Captioning — BLIP (candidate generation) + CLIP (ranking) +**Accelerator**: Tesla T4 GPU +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs image captioning using a multi-model open-source PyTorch approach. +It first generates multiple candidate captions per image using a BLIP model, then ranks these candidates with a CLIP model based on image-text similarity. + +The following graphs show various metrics when running PyTorch Image Captioning BLIP + CLIP Streaming GPU pipeline. +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_captioning.py). 
+ +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimagecaptioningstreaminggpu" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagecaptioningstreaminggpu" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagecaptioningstreaminggpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimagenetrightfitcpu/_index.md b/website/www/site/content/en/performance/pytorchimagenetrightfitcpu/_index.md new file mode 100644 index 000000000000..9b7b26d9b3ef --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagenetrightfitcpu/_index.md @@ -0,0 +1,43 @@ +--- +title: "PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) CPU Performance" +--- + + + +# PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) + +**Model**: PyTorch Image Classification — EfficientNet-B0 (pretrained on ImageNet) +**Accelerator**: CPU only +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs image classification using an open-source PyTorch EfficientNet-B0 model. +It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, and runs inference with adaptive batch sizing. + +The following graphs show various metrics when running the PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) CPU pipeline. +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py). + +## What is the estimated cost to run the pipeline? 
+ +{{< performance_looks io="pytorchimagenetrightfitcpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagenetrightfitcpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagenetrightfitcpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimagenetrightfitgpu/_index.md b/website/www/site/content/en/performance/pytorchimagenetrightfitgpu/_index.md new file mode 100644 index 000000000000..fefd149b8416 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagenetrightfitgpu/_index.md @@ -0,0 +1,43 @@ +--- +title: "PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) GPU Performance" +--- + + + +# PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) GPU + +**Model**: PyTorch Image Classification — EfficientNet-B0 (pretrained on ImageNet) +**Accelerator**: Tesla T4 GPU (right-fitted batch size 64 → 32 → 16 → 8) +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs image classification using an open-source PyTorch EfficientNet-B0 model optimized for T4 GPUs. +It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, and runs inference with adaptive batch sizing for optimal GPU utilization. + +The following graphs show various metrics when running the PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) GPU pipeline. +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py). + +## What is the estimated cost to run the pipeline? 
+ +{{< performance_looks io="pytorchimagenetrightfitgpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagenetrightfitgpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagenetrightfitgpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimagenetrightfitoncecpu/_index.md b/website/www/site/content/en/performance/pytorchimagenetrightfitoncecpu/_index.md new file mode 100644 index 000000000000..8e0cf82c349e --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagenetrightfitoncecpu/_index.md @@ -0,0 +1,44 @@ +--- +title: "PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting, Exactly-once) CPU Performance" +--- + + + +# PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting, Exactly-once) CPU + +**Model**: PyTorch Image Classification — EfficientNet-B0 (pretrained on ImageNet) +**Accelerator**: CPU only +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs image classification using an open-source PyTorch EfficientNet-B0 model. +It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, and runs inference with adaptive batch sizing. +The pipeline ensures exactly-once semantics via stateful deduplication and idempotent BigQuery writes, allowing stable and reproducible performance measurements under continuous load. + +The following graphs show various metrics when running the PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting, Exactly-once) CPU pipeline. +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py). 
+ +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimagenetrightfitoncecpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagenetrightfitoncecpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagenetrightfitoncecpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimagenetrightfitoncegpu/_index.md b/website/www/site/content/en/performance/pytorchimagenetrightfitoncegpu/_index.md new file mode 100644 index 000000000000..85a55b27955b --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagenetrightfitoncegpu/_index.md @@ -0,0 +1,44 @@ +--- +title: "PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting, Exactly-once) GPU Performance" +--- + + + +# PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting, Exactly-once) GPU + +**Model**: PyTorch Image Classification — EfficientNet-B0 (pretrained on ImageNet) +**Accelerator**: Tesla T4 GPU (right-fitted batch size 64 → 32 → 16 → 8) +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs image classification using an open-source PyTorch EfficientNet-B0 model optimized for T4 GPUs. +It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, and runs inference with adaptive batch sizing for optimal GPU utilization. +The pipeline ensures exactly-once semantics via stateful deduplication and idempotent BigQuery writes, allowing stable and reproducible performance measurements under continuous load. + +The following graphs show various metrics when running the PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting, Exactly-once) GPU pipeline. +See the [glossary](/performance/glossary) for definitions. 
+ +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimagenetrightfitoncegpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagenetrightfitoncegpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagenetrightfitoncegpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimageobjectdetectionbatchcpu/_index.md b/website/www/site/content/en/performance/pytorchimageobjectdetectionbatchcpu/_index.md new file mode 100644 index 000000000000..a2ef39a2cf32 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimageobjectdetectionbatchcpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch CPU Performance" +--- + + + +# PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch CPU + +**Model**: PyTorch Image Object Detection — Faster R-CNN ResNet-50 FPN (pretrained on COCO) +**Accelerator**: CPU only +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This batch pipeline performs object detection using an open-source PyTorch Faster R-CNN ResNet-50 FPN model on CPU. +It reads image URIs from GCS, decodes and preprocesses images, and runs batched inference with a fixed batch size. + +The following graphs show various metrics when running the PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch CPU pipeline. +See the [glossary](/performance/glossary) for definitions. 
+ +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionbatchcpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimageobjectdetectionbatchcpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionbatchcpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimageobjectdetectionbatchgpu/_index.md b/website/www/site/content/en/performance/pytorchimageobjectdetectionbatchgpu/_index.md new file mode 100644 index 000000000000..f6d6107d19f9 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimageobjectdetectionbatchgpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch GPU Performance" +--- + + + +# PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch GPU + +**Model**: PyTorch Image Object Detection — Faster R-CNN ResNet-50 FPN (pretrained on COCO) +**Accelerator**: Tesla T4 GPU (fixed batch size) +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This batch pipeline performs object detection using an open-source PyTorch Faster R-CNN ResNet-50 FPN model on GPU. +It reads image URIs from GCS, decodes and preprocesses images, and runs batched inference with a fixed batch size to measure stable GPU performance. + +The following graphs show various metrics when running the PyTorch Image Object Detection Faster R-CNN ResNet-50 Batch GPU pipeline. +See the [glossary](/performance/glossary) for definitions. 
+ +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionbatchgpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimageobjectdetectionbatchgpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionbatchgpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimageobjectdetectionstreamingcpu/_index.md b/website/www/site/content/en/performance/pytorchimageobjectdetectionstreamingcpu/_index.md new file mode 100644 index 000000000000..b866d2893cd7 --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimageobjectdetectionstreamingcpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming CPU Performance" +--- + + + +# PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming CPU + +**Model**: PyTorch Image Object Detection — Faster R-CNN ResNet-50 FPN (pretrained on COCO) +**Accelerator**: CPU only +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs object detection using an open-source PyTorch Faster R-CNN ResNet-50 FPN model on CPU. +It reads image URIs from GCS, decodes and preprocesses images, and runs inference. + +The following graphs show various metrics when running the PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming CPU pipeline. +See the [glossary](/performance/glossary) for definitions. 
+ +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionstreamingcpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimageobjectdetectionstreamingcpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionstreamingcpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/content/en/performance/pytorchimageobjectdetectionstreaminggpu/_index.md b/website/www/site/content/en/performance/pytorchimageobjectdetectionstreaminggpu/_index.md new file mode 100644 index 000000000000..443f5e99e76f --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimageobjectdetectionstreaminggpu/_index.md @@ -0,0 +1,41 @@ +--- +title: "PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming GPU Performance" +--- + + + +# PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming GPU + +**Model**: PyTorch Image Object Detection — Faster R-CNN ResNet-50 FPN (pretrained on COCO) +**Accelerator**: Tesla T4 GPU +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs object detection using an open-source PyTorch Faster R-CNN ResNet-50 FPN model on GPU. +It reads image URIs from GCS, decodes and preprocesses images, and runs inference. + +The following graphs show various metrics when running the PyTorch Image Object Detection Faster R-CNN ResNet-50 Streaming GPU pipeline. +See the [glossary](/performance/glossary) for definitions. 
+ +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_image_object_detection.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionstreaminggpu" read_or_write="write" section="cost" >}} + +## How have various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimageobjectdetectionstreaminggpu" read_or_write="write" section="version" >}} + +## How have various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimageobjectdetectionstreaminggpu" read_or_write="write" section="date" >}} diff --git a/website/www/site/data/performance.yaml b/website/www/site/data/performance.yaml index 8841bbb58ecb..4b6d527e01c0 100644 --- a/website/www/site/data/performance.yaml +++ b/website/www/site/data/performance.yaml @@ -283,3 +283,195 @@ looks: title: AvgThroughputBytesPerSec by Version - id: P7wKZy6tQFWbbDfm4HzfCJnsQrVgfGsJ title: AvgThroughputElementsPerSec by Version + pytorchimagenetrightfitcpu: + write: + folder: 92 + cost: + - id: zJhxrMmxJ3zVHH5WZnDQqcBHdFDrBhxK + title: RunTime and EstimatedCost + date: + - id: RybzxZdkXJg6PzQZkBcfxTJkByTf3ZV5 + title: AvgThroughputBytesPerSec by Date + - id: xTVYPytQVH7zXYz7SvphnRV4nQcxCddp + title: AvgThroughputElementsPerSec by Date + version: + - id: dGN6Zr6rh7DfnRtTDCN6GHcNfhSkrbCq + title: AvgThroughputBytesPerSec by Version + - id: VJMWrZh3jXk2mqCZk4NQn3tBrHgGWqnC + title: AvgThroughputElementsPerSec by Version + pytorchimagenetrightfitoncecpu: + write: + folder: 97 + cost: + - id: DhsvrRxpby2F25jbj8NPVR6FhzKgsHBP + title: RunTime and EstimatedCost + date: + - id: sntwpmZjMVT57kvG8KM8rzDjBKsrRTF7 + title: AvgThroughputBytesPerSec by Date + - id: 5GJHCVH65y2zmnrZ3QqmYqk8vKfPD4D6 + title: AvgThroughputElementsPerSec by Date + version: + - id: rz24FQ24rz3HtFmpzK2rKD4zFT5kpG46 + title: 
AvgThroughputBytesPerSec by Version + - id: jZkcSdHbvmGXy8f37RShNvgGv6HVN7Dy + title: AvgThroughputElementsPerSec by Version + pytorchimagenetrightfitgpu: + write: + folder: 98 + cost: + - id: sMgzq4N8kJfjSjhSzVDzmXSHbXKwCy4r + title: RunTime and EstimatedCost + date: + - id: t3D6hqq7BywmvjBmxtWCvRMsFFMgPBg7 + title: AvgThroughputBytesPerSec by Date + - id: MRptKCFsnTKcxPgVKKVvs7c8XVTf6MSN + title: AvgThroughputElementsPerSec by Date + version: + - id: v6jHqBc8WvCPspgdxQdjSZ6bCgqqPvZ3 + title: AvgThroughputBytesPerSec by Version + - id: DhGNQDmRDhYSCDgXDsx6BtCcDfwKCWhb + title: AvgThroughputElementsPerSec by Version + pytorchimagenetrightfitoncegpu: + write: + folder: 99 + cost: + - id: YbwBRmMVBCxPzjm22vWvJ2fDG5mmM3BR + title: RunTime and EstimatedCost + date: + - id: WtCZn2CrRrDdygbxQhfVhNM8F7NRRpsW + title: AvgThroughputBytesPerSec by Date + - id: RBMNkqksbcN3WcwCxz4B5mxVq5Y8yk2x + title: AvgThroughputElementsPerSec by Date + version: + - id: 48tzWvydG73Y3QwPJwQzYCkzdDnpF2T7 + title: AvgThroughputBytesPerSec by Version + - id: 3VDCrNY6q8FyS28YRrxjJtMBYkxKRdBw + title: AvgThroughputElementsPerSec by Version + pytorchimageobjectdetectionbatchgpu: + write: + folder: 93 + cost: + - id: XGPVSYhVbZGJHQCtMPW4nRGynxXpdzdh + title: RunTime and EstimatedCost + date: + - id: ZW48KBPBxShGgWx53vjfvgqp6cVmHCNB + title: AvgThroughputBytesPerSec by Date + - id: BjMWg26F3gNHQyZSsg8HjhTg3mCh6jFJ + title: AvgThroughputElementsPerSec by Date + version: + - id: kbF7Gnqjsjnvh3MKvYszSgWFbgDYYTWR + title: AvgThroughputBytesPerSec by Version + - id: cTxrpY3KGrCb35dq7fnjvdsDnd7t85pJ + title: AvgThroughputElementsPerSec by Version + pytorchimageobjectdetectionbatchcpu: + write: + folder: 100 + cost: + - id: QbMVTdnxzmZzfymSrKsGTvyqFTMpwqy7 + title: RunTime and EstimatedCost + date: + - id: v6H8sqn7X66j5g6qpgC6B57z624cCfDM + title: AvgThroughputBytesPerSec by Date + - id: KBMSpWzB5RgdYsTdZxGCXDYbWpskVqGS + title: AvgThroughputElementsPerSec by Date + version: + - id: 
GWz5VDrMTxSCrFqQwk2MyPHMwBHC253c + title: AvgThroughputBytesPerSec by Version + - id: NBWHNG8GmGHp8Bmg25wqg3tP8jWdP5PW + title: AvgThroughputElementsPerSec by Version + pytorchimageobjectdetectionstreaminggpu: + write: + folder: 101 + cost: + - id: SfgfDsbRVk67XMnKZShq3xVBFghN7GSv + title: RunTime and EstimatedCost + date: + - id: MtMJZTTrkgXZz24zbQbpzwYNjWwcfhqw + title: AvgThroughputBytesPerSec by Date + - id: kKncXDm5MKvfwVzFyXqQ4TpfR5VdT5z7 + title: AvgThroughputElementsPerSec by Date + version: + - id: ZWQ7vnsXYb5PDr5yDFjM3QP8gkkmdgdQ + title: AvgThroughputBytesPerSec by Version + - id: cTmDCbdCQyrsK3fS333vmR3BwCwrGZgp + title: AvgThroughputElementsPerSec by Version + pytorchimageobjectdetectionstreamingcpu: + write: + folder: 102 + cost: + - id: FHgY6JfxRc23jJ6THkb7sPVw2HSw4kgs + title: RunTime and EstimatedCost + date: + - id: G5DNH6vXpGVnYP8ZGMwppdGvgvndM8kn + title: AvgThroughputBytesPerSec by Date + - id: CGR43NrksWDscGNWCKsVGCwc3RxmvGhD + title: AvgThroughputElementsPerSec by Date + version: + - id: wGGVRGxBjXxQf3yvynSFtcDCVCqKzByy + title: AvgThroughputBytesPerSec by Version + - id: M5nWYF76fWPnbTH6QkzWqzRV8fSWxtWD + title: AvgThroughputElementsPerSec by Version + pytorchimagecaptioningbatchgpu: + write: + folder: 94 + cost: + - id: nhYggnCYJzfgDyTXZRGJwpSyYKvBpfhV + title: RunTime and EstimatedCost + date: + - id: qzjkYQJCrxjc8GPcBF7dFJSFdTXQKxmR + title: AvgThroughputBytesPerSec by Date + - id: kgkdbBCbkhDHpb2rHNGQDxcwGqzykwqD + title: AvgThroughputElementsPerSec by Date + version: + - id: 4p7b4HWVMRC8HXDZYMXbVYf2VPzYQ2kz + title: AvgThroughputBytesPerSec by Version + - id: xQZRDYJRvmV7qMhb3j2XTrPmC8TP4DTr + title: AvgThroughputElementsPerSec by Version + pytorchimagecaptioningbatchcpu: + write: + folder: 103 + cost: + - id: M4vTdbrK6BxJXdXcnR7WPwW7VF4Fmhbb + title: RunTime and EstimatedCost + date: + - id: T23Fty8X6dF6DFhxSy32ymSwqz64fPXT + title: AvgThroughputBytesPerSec by Date + - id: Wm2qdmKgHbzfJbW7vNVfrGtQWV4xGCx3 + title: 
AvgThroughputElementsPerSec by Date + version: + - id: wnQqxHgwJstnV4fPCkNpvMxJpyPG4326 + title: AvgThroughputBytesPerSec by Version + - id: kPMV7d8Htpp6CT8q5hC6NF5DbTRwBQcJ + title: AvgThroughputElementsPerSec by Version + pytorchimagecaptioningstreaminggpu: + write: + folder: 104 + cost: + - id: rgrPw4ZdZjCD8cvKqdvdPGGtPJgWfNK5 + title: RunTime and EstimatedCost + date: + - id: jwpm4S36mGXPrY2nfpWrCvDHN6Nsrnpq + title: AvgThroughputBytesPerSec by Date + - id: QmY5QVTpndNTBJdqWjMxxD4zQkZqn6Xs + title: AvgThroughputElementsPerSec by Date + version: + - id: htD3YwWcTPYcRk4wqtqHshnt7hrgyrfk + title: AvgThroughputBytesPerSec by Version + - id: rv3QbsbxBV68MWG8hdjhnH84zJsPgJQh + title: AvgThroughputElementsPerSec by Version + pytorchimagecaptioningstreamingcpu: + write: + folder: 105 + cost: + - id: YkFKPzV2Rvgy5hmmx2KPyJCHhbxhSjzj + title: RunTime and EstimatedCost + date: + - id: 5WStCthVw8gtCdKrY5mhQf8T6N45GGqH + title: AvgThroughputBytesPerSec by Date + - id: ZnJj4QFrd3dwmsmv3xdNKytPfbW7j88C + title: AvgThroughputElementsPerSec by Date + version: + - id: VHqjNCVNScFqHF8d65khMDGxfSPrBrMh + title: AvgThroughputBytesPerSec by Version + - id: ykZJhrhHDzyPBSTxyNcdYG7yw5qs3dYj + title: AvgThroughputElementsPerSec by Version