Commit aa5797f

Add pipeline hash (#38357)
* Add Pipeline Options
* Add Pipeline Hash along with URL
* fix go lang staging
* Fix formatting
* backmerge master
* remove cache
* fix test
* fix proto hash creation
* fix lint
* move pipeline hash to sdkharness
* fix spotless
1 parent dc7b5b0 commit aa5797f

8 files changed: 125 additions & 12 deletions


runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 3 additions & 0 deletions
@@ -1368,6 +1368,9 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
     dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

+    String pipelineProtoHash = Hashing.sha256().hashBytes(serializedProtoPipeline).toString();
+    options.as(SdkHarnessOptions.class).setPipelineProtoHash(pipelineProtoHash);
+
     if (useUnifiedWorker(options)) {
       LOG.info("Skipping v1 transform replacements since job will run on v2.");
     } else {

sdks/go/pkg/beam/runners/dataflow/dataflow.go

Lines changed: 7 additions & 1 deletion
@@ -24,6 +24,8 @@ package dataflow

 import (
     "context"
+    "crypto/sha256"
+    "encoding/hex"
     "encoding/json"
     "flag"
     "fmt"
@@ -40,6 +42,7 @@ import (
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/pipelinex"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/hooks"
+    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
@@ -239,7 +242,10 @@ func Execute(ctx context.Context, p *beam.Pipeline) (beam.PipelineResult, error)
         log.Info(ctx, "Dry-run: not submitting job!")

         log.Info(ctx, model.String())
-        job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL)
+        modelBytes := protox.MustEncode(model)
+        hash := sha256.Sum256(modelBytes)
+        pipelineProtoHash := hex.EncodeToString(hash[:])
+        job, err := dataflowlib.Translate(ctx, model, opts, workerURL, modelURL, pipelineProtoHash)
         if err != nil {
             return nil, err
         }
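
For reference, the digest computed above is the SHA-256 of the serialized pipeline proto, hex-encoded. For the same input bytes this matches what the Java runner gets from Guava's Hashing.sha256() and what the Python SDK gets from hashlib.sha256(...).hexdigest(); all three produce lowercase hex. A minimal standalone sketch of the computation, where the payload is only a stand-in for real protox.MustEncode(model) output:

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// hashPipelineProto mirrors the hashing added in this commit: it
// hex-encodes the SHA-256 digest of the serialized pipeline proto bytes.
func hashPipelineProto(modelBytes []byte) string {
    sum := sha256.Sum256(modelBytes)
    return hex.EncodeToString(sum[:])
}

func main() {
    // Stand-in bytes; in the runner this is protox.MustEncode(model).
    payload := []byte("serialized-pipeline-proto")
    fmt.Println(hashPipelineProto(payload)) // prints 64 hex characters
}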

sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go

Lines changed: 7 additions & 2 deletions
@@ -19,6 +19,8 @@ package dataflowlib

 import (
     "context"
+    "crypto/sha256"
+    "encoding/hex"
     "encoding/json"
     "os"
     "strings"
@@ -83,14 +85,17 @@ func Execute(ctx context.Context, raw *pipepb.Pipeline, opts *JobOptions, worker
     // (2) Upload model to GCS
     log.Info(ctx, raw.String())

-    if err := StageModel(ctx, opts.Project, modelURL, protox.MustEncode(raw)); err != nil {
+    modelBytes := protox.MustEncode(raw)
+    modelHash := sha256.Sum256(modelBytes)
+    pipelineProtoHash := hex.EncodeToString(modelHash[:])
+    if err := StageModel(ctx, opts.Project, modelURL, modelBytes); err != nil {
         return presult, err
     }
     log.Infof(ctx, "Staged model pipeline: %v", modelURL)

     // (3) Translate to v1b3 and submit

-    job, err := Translate(ctx, raw, opts, workerURL, modelURL)
+    job, err := Translate(ctx, raw, opts, workerURL, modelURL, pipelineProtoHash)
     if err != nil {
         return presult, err
     }
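
Note that the refactor above hashes modelBytes, the exact byte string handed to StageModel, so the pipelineProtoHash passed to Translate always corresponds to the artifact staged at modelURL. The commit adds no consumer-side check; a hypothetical verifier would recompute the digest along these lines:

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// modelMatchesHash is a hypothetical consumer-side check (not part of
// this commit): recompute the digest of the downloaded model bytes and
// compare it with the pipelineProtoHash carried in the job options.
func modelMatchesHash(downloaded []byte, pipelineProtoHash string) bool {
    sum := sha256.Sum256(downloaded)
    return hex.EncodeToString(sum[:]) == pipelineProtoHash
}

func main() {
    model := []byte("staged-model-bytes")
    sum := sha256.Sum256(model)
    fmt.Println(modelMatchesHash(model, hex.EncodeToString(sum[:]))) // true
}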

sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go

Lines changed: 7 additions & 5 deletions
@@ -117,7 +117,7 @@ func containerImages(p *pipepb.Pipeline) ([]*df.SdkHarnessContainerImage, []stri
 }

 // Translate translates a pipeline to a Dataflow job.
-func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string) (*df.Job, error) {
+func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, workerURL, modelURL string, pipelineProtoHash string) (*df.Job, error) {
     // (1) Translate pipeline to v1b3 speak.

     jobType := "JOB_TYPE_BATCH"
@@ -181,10 +181,11 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
         SdkPipelineOptions: newMsg(pipelineOptions{
             DisplayData: printOptions(opts, images),
             Options: dataflowOptions{
-                PipelineURL:  modelURL,
-                Region:       opts.Region,
-                Experiments:  opts.Experiments,
-                TempLocation: opts.TempLocation,
+                PipelineURL:       modelURL,
+                PipelineProtoHash: pipelineProtoHash,
+                Region:            opts.Region,
+                Experiments:       opts.Experiments,
+                TempLocation:      opts.TempLocation,
             },
             GoOptions: opts.Options,
         }),
@@ -359,6 +360,7 @@ func GetMetrics(ctx context.Context, client *df.Service, project, region, jobID
 type dataflowOptions struct {
     Experiments         []string `json:"experiments,omitempty"`
     PipelineURL         string   `json:"pipelineUrl"`
+    PipelineProtoHash   string   `json:"pipelineProtoHash,omitempty"`
     Region              string   `json:"region"`
     TempLocation        string   `json:"tempLocation"`
     DiskProvisionedIops int64    `json:"diskProvisionedIops"`
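
Because the new field is tagged omitempty, jobs submitted without a hash serialize exactly as before; the pipelineProtoHash key only appears when the value is non-empty. A small sketch of that behavior, with the struct trimmed to the two relevant fields:

package main

import (
    "encoding/json"
    "fmt"
)

// Trimmed-down copy of the dataflowOptions struct above, keeping only
// the fields relevant to the new hash.
type dataflowOptions struct {
    PipelineURL       string `json:"pipelineUrl"`
    PipelineProtoHash string `json:"pipelineProtoHash,omitempty"`
}

func main() {
    with, _ := json.Marshal(dataflowOptions{
        PipelineURL:       "gs://bucket/model",
        PipelineProtoHash: "dummy-hash-12345",
    })
    without, _ := json.Marshal(dataflowOptions{PipelineURL: "gs://bucket/model"})
    fmt.Println(string(with))    // {"pipelineUrl":"gs://bucket/model","pipelineProtoHash":"dummy-hash-12345"}
    fmt.Println(string(without)) // {"pipelineUrl":"gs://bucket/model"}; omitempty drops the empty hash
}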

sdks/go/pkg/beam/runners/dataflow/dataflowlib/job_test.go

Lines changed: 50 additions & 1 deletion
@@ -21,6 +21,9 @@ import (
     "reflect"
     "testing"

+    "encoding/json"
+
+    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx"
     "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/protox"
     pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
@@ -293,7 +296,7 @@ func TestTranslate(t *testing.T) {
     workerURL := "gs://any-location/temp"
     modelURL := "gs://any-location/temp"

-    job, err := Translate(ctx, p, opts, workerURL, modelURL)
+    job, err := Translate(ctx, p, opts, workerURL, modelURL, "dummy-hash-12345")
     if err != nil {
         t.Fatalf("Translate(...) error = %v, want nil", err)
     }
@@ -310,3 +313,49 @@ func TestTranslate(t *testing.T) {
         t.Errorf("DiskProvisionedThroughputMibps = %v, want 200", wp.DiskProvisionedThroughputMibps)
     }
 }
+
+func TestTranslateWithPipelineHash(t *testing.T) {
+    p := &pipepb.Pipeline{
+        Components: &pipepb.Components{
+            Environments: map[string]*pipepb.Environment{
+                "env1": {
+                    Payload: protox.MustEncode(&pipepb.DockerPayload{
+                        ContainerImage: "dummy_image",
+                    }),
+                },
+            },
+        },
+    }
+    opts := &JobOptions{
+        Name:    "test-job",
+        Project: "test-project",
+        Region:  "test-region",
+        Options: runtime.RawOptions{
+            Options: make(map[string]string),
+        },
+    }
+
+    expectedHashStr := "dummy-hash-12345"
+
+    job, err := Translate(context.Background(), p, opts, "worker-url", "model-url", expectedHashStr)
+    if err != nil {
+        t.Fatalf("Translate failed: %v", err)
+    }
+
+    // Verify PipelineProtoHash
+    var recoveredOptions struct {
+        Options struct {
+            PipelineURL       string `json:"pipelineUrl"`
+            PipelineProtoHash string `json:"pipelineProtoHash"`
+        } `json:"options"`
+    }
+
+    rawOpts := job.Environment.SdkPipelineOptions
+    if err := json.Unmarshal(rawOpts, &recoveredOptions); err != nil {
+        t.Fatalf("Failed to unmarshal SdkPipelineOptions: %v", err)
+    }
+
+    if recoveredOptions.Options.PipelineProtoHash != expectedHashStr {
+        t.Errorf("Expected PipelineProtoHash %v, got %v", expectedHashStr, recoveredOptions.Options.PipelineProtoHash)
+    }
+}

sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java

Lines changed: 6 additions & 0 deletions
@@ -481,4 +481,10 @@ public OpenTelemetry create(PipelineOptions options) {
       return GlobalOpenTelemetry.get();
     }
   }
+
+  /** The hex-encoded SHA256 hash of the staged portable pipeline proto. */
+  @Description("The hex-encoded SHA256 hash of the staged portable pipeline proto")
+  String getPipelineProtoHash();
+
+  void setPipelineProtoHash(String hash);
 }

sdks/python/apache_beam/runners/dataflow/internal/apiclient.py

Lines changed: 10 additions & 3 deletions
@@ -97,7 +97,8 @@ def __init__(
       options,
       environment_version,
       proto_pipeline_staged_url,
-      proto_pipeline=None):
+      proto_pipeline=None,
+      pipeline_proto_hash=None):
     self.standard_options = options.view_as(StandardOptions)
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
     self.worker_options = options.view_as(WorkerOptions)
@@ -279,6 +280,8 @@ def __init__(
         for k, v in sdk_pipeline_options.items() if v is not None
     }
     options_dict["pipelineUrl"] = proto_pipeline_staged_url
+    if pipeline_proto_hash:
+      options_dict["pipelineProtoHash"] = pipeline_proto_hash
     # Don't pass impersonate_service_account through to the harness.
     # Though impersonation should start a job, the workers should
     # not try to modify their credentials.
@@ -831,10 +834,13 @@ def create_job_description(self, job):
     resources = self._stage_resources(job.proto_pipeline, job.options)

     # Stage proto pipeline.
+    serialized_pipeline = job.proto_pipeline.SerializeToString()
+    pipeline_proto_hash = hashlib.sha256(serialized_pipeline).hexdigest()
+
     self.stage_file_with_retry(
         job.google_cloud_options.staging_location,
         shared_names.STAGED_PIPELINE_FILENAME,
-        io.BytesIO(job.proto_pipeline.SerializeToString()))
+        io.BytesIO(serialized_pipeline))

     job.proto.environment = Environment(
         proto_pipeline_staged_url=FileSystems.join(
@@ -843,7 +849,8 @@ def create_job_description(self, job):
         packages=resources,
         options=job.options,
         environment_version=self.environment_version,
-        proto_pipeline=job.proto_pipeline).proto
+        proto_pipeline=job.proto_pipeline,
+        pipeline_proto_hash=pipeline_proto_hash).proto
     _LOGGER.debug('JOB: %s', job)

   @retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)

sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py

Lines changed: 35 additions & 0 deletions
@@ -19,6 +19,7 @@

 # pytype: skip-file

+import hashlib
 import io
 import itertools
 import json
@@ -97,6 +98,40 @@ def test_pipeline_url(self):

     self.assertEqual(pipeline_url.string_value, FAKE_PIPELINE_URL)

+  def test_pipeline_proto_hash(self):
+    pipeline_options = PipelineOptions(
+        ['--temp_location', 'gs://any-location/temp'])
+    proto_pipeline = beam_runner_api_pb2.Pipeline()
+    proto_pipeline.components.transforms['dummy'].unique_name = 'dummy'
+
+    expected_hash = hashlib.sha256(
+        proto_pipeline.SerializeToString()).hexdigest()
+
+    env = apiclient.Environment([],
+                                pipeline_options,
+                                '2.0.0',
+                                FAKE_PIPELINE_URL,
+                                proto_pipeline,
+                                pipeline_proto_hash=expected_hash)
+
+    recovered_options = None
+    for additionalProperty in env.proto.sdkPipelineOptions.additionalProperties:
+      if additionalProperty.key == 'options':
+        recovered_options = additionalProperty.value
+        break
+    else:
+      self.fail('No pipeline options found')
+
+    pipeline_proto_hash = None
+    for property in recovered_options.object_value.properties:
+      if property.key == 'pipelineProtoHash':
+        pipeline_proto_hash = property.value
+        break
+    else:
+      self.fail('No pipelineProtoHash found')
+
+    self.assertEqual(pipeline_proto_hash.string_value, expected_hash)
+
   def test_set_network(self):
     pipeline_options = PipelineOptions([
         '--network',
