forked from tensorflow/transform
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: analysis_graph_builder_test.py
More file actions
597 lines (555 loc) · 46.1 KB
/
analysis_graph_builder_test.py
File metadata and controls
597 lines (555 loc) · 46.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for tensorflow_transform.analysis_graph_builder."""
import os
import sys
import pytest
import tensorflow as tf
import tensorflow_transform as tft
from tensorflow_transform import analyzer_nodes
from tensorflow_transform import impl_helper
from tensorflow_transform import nodes
from tensorflow_transform import tf2_utils
from tensorflow_transform.beam import analysis_graph_builder
from tensorflow_transform.beam import analyzer_cache
from tensorflow_transform.beam import tft_unit
# TODO(b/243513856): Switch to `collections.namedtuple` or `typing.NamedTuple`
# once the Spark issue is resolved.
from tfx_bsl.types import tfx_namedtuple
# Alias for the mock library exposed via TensorFlow's v1 test utilities.
mock = tf.compat.v1.test.mock
def _preprocessing_fn_with_no_analyzers(inputs):
x = inputs['x']
x_plus_1 = x + 1
return {'x_plus_1': x_plus_1}
# Test case: no analyzers -> the analysis graph is a single CreateSavedModel
# node with no analysis phases.
# NOTE(review): the *_py312 and *_tf2 variants differ only in how the
# OrderedDict output signature is rendered in the dot label — presumably a
# Python/TF version difference in repr; confirm against the graph writer.
_NO_ANALYZERS_CASE = dict(
    testcase_name='with_no_analyzers',
    feature_spec={'x': tf.io.FixedLenFeature([], tf.float32)},
    preprocessing_fn=_preprocessing_fn_with_no_analyzers,
    expected_dot_graph_str_py312=r"""digraph G {
directed=True;
node [shape=Mrecord];
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x_plus_1': \"Tensor\<shape: [None], \<dtype: 'float32'\>\>\"\})|label: CreateSavedModel}"];
}
""",
    expected_dot_graph_str_tf2=r"""digraph G {
directed=True;
node [shape=Mrecord];
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{\'x_plus_1\': \"Tensor\\<shape: [None], \\<dtype: 'float32'\\>\\>\"})|label: CreateSavedModel}"];
}
""",
)
def _preprocessing_fn_with_one_analyzer(inputs):
  """Preprocessing fn with exactly one analyzer (tft.mean): one phase.

  NOTE(review): the op names produced by tracing this function (e.g.
  'x/mean_and_var/...') are referenced verbatim by the expected dot-graph
  strings below; do not restructure without regenerating those strings.
  """

  @tf.function
  def _plus_one(x):
    # Wrapped in tf.function — presumably to exercise tracing through a
    # nested concrete function; confirm intent before simplifying.
    return x + 1

  x = _plus_one(inputs['x'])
  # Single analyzer -> the builder emits a single analysis phase (Phase0).
  x_mean = tft.mean(x, name='x')
  x_centered = x - x_mean
  return {'x_centered': x_centered}
# Test case: one tft.mean analyzer -> a single phase: build the phase-0
# analyzer-inputs SavedModel, apply it to the flattened dataset, run the
# combiner (accumulate -> merge -> extract the two mean/var outputs), and
# bind both outputs back into the final SavedModel as tensor bindings.
_ONE_ANALYZER_CASE = dict(
    testcase_name='with_one_analyzer',
    feature_spec={'x': tf.io.FixedLenFeature([], tf.float32)},
    preprocessing_fn=_preprocessing_fn_with_one_analyzer,
    expected_dot_graph_str_py312=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x/mean_and_var/Cast_1': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x/mean_and_var/div_no_nan': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x/mean_and_var/div_no_nan_1': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x/mean_and_var/zeros': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"\})|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x#mean_and_var]" [label="{ExtractFromDict|keys: ('x/mean_and_var/Cast_1', 'x/mean_and_var/div_no_nan', 'x/mean_and_var/div_no_nan_1', 'x/mean_and_var/zeros')|label: TensorSource[x#mean_and_var]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x#mean_and_var]";
"CacheableCombineAccumulate[x#mean_and_var]" [label="{CacheableCombineAccumulate|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineAccumulate[x#mean_and_var]|partitionable: True}"];
"TensorSource[x#mean_and_var]" -> "CacheableCombineAccumulate[x#mean_and_var]";
"CacheableCombineMerge[x#mean_and_var]" [label="{CacheableCombineMerge|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineMerge[x#mean_and_var]}"];
"CacheableCombineAccumulate[x#mean_and_var]" -> "CacheableCombineMerge[x#mean_and_var]";
"ExtractCombineMergeOutputs[x#mean_and_var]" [label="{ExtractCombineMergeOutputs|output_tensor_info_list: [TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None), TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None)]|label: ExtractCombineMergeOutputs[x#mean_and_var]|{<0>0|<1>1}}"];
"CacheableCombineMerge[x#mean_and_var]" -> "ExtractCombineMergeOutputs[x#mean_and_var]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":0 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output_1/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":1 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x_centered': \"Tensor\<shape: [None], \<dtype: 'float32'\>\>\"\})|label: CreateSavedModel}"];
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> CreateSavedModel;
}
""",
    expected_dot_graph_str_tf2=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('x/mean_and_var/Cast_1', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x/mean_and_var/div_no_nan', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x/mean_and_var/div_no_nan_1', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x/mean_and_var/zeros', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\")])|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x#mean_and_var]" [label="{ExtractFromDict|keys: ('x/mean_and_var/Cast_1', 'x/mean_and_var/div_no_nan', 'x/mean_and_var/div_no_nan_1', 'x/mean_and_var/zeros')|label: TensorSource[x#mean_and_var]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x#mean_and_var]";
"CacheableCombineAccumulate[x#mean_and_var]" [label="{CacheableCombineAccumulate|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineAccumulate[x#mean_and_var]|partitionable: True}"];
"TensorSource[x#mean_and_var]" -> "CacheableCombineAccumulate[x#mean_and_var]";
"CacheableCombineMerge[x#mean_and_var]" [label="{CacheableCombineMerge|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineMerge[x#mean_and_var]}"];
"CacheableCombineAccumulate[x#mean_and_var]" -> "CacheableCombineMerge[x#mean_and_var]";
"ExtractCombineMergeOutputs[x#mean_and_var]" [label="{ExtractCombineMergeOutputs|output_tensor_info_list: [TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None), TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None)]|label: ExtractCombineMergeOutputs[x#mean_and_var]|{<0>0|<1>1}}"];
"CacheableCombineMerge[x#mean_and_var]" -> "ExtractCombineMergeOutputs[x#mean_and_var]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":0 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output_1/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":1 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('x_centered', \"Tensor\<shape: [None], \<dtype: 'float32'\>\>\")])|label: CreateSavedModel}"];
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> CreateSavedModel;
}
""",
)
def _preprocessing_fn_with_table(inputs):
  """Preprocessing fn whose transform graph contains a lookup table.

  tft.vocabulary writes a vocab file whose path initializes the hash table,
  so the final SavedModel carries one table initializer (the expected graphs
  below assert `table_initializers: 1`).
  """
  x = inputs['x']
  x_vocab = tft.vocabulary(x, name='x')
  # Map each whole vocab-file line to its line number (string -> int64 id).
  initializer = tf.lookup.TextFileInitializer(
      x_vocab,
      key_dtype=tf.string,
      key_index=tf.lookup.TextFileIndex.WHOLE_LINE,
      value_dtype=tf.int64,
      value_index=tf.lookup.TextFileIndex.LINE_NUMBER)
  # Out-of-vocabulary strings map to -1.
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  x_integerized = table.lookup(x)
  return {'x_integerized': x_integerized}
# Test case: vocabulary analyzer feeding a lookup table -> one analysis phase
# (accumulate/merge the vocabulary, count it unfiltered and filtered, prune,
# then order-and-write the vocab file), three tensor bindings (two vocab
# sizes and the asset filepath), and `table_initializers: 1` in the final
# SavedModel.
_WITH_TABLE_CASE = dict(
    testcase_name='with_table',
    feature_spec={'x': tf.io.FixedLenFeature([], tf.string)},
    preprocessing_fn=_preprocessing_fn_with_table,
    expected_dot_graph_str_py312=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x/boolean_mask/GatherV2': \"Tensor\<shape: [None], \<dtype: 'string'\>\>\"\})|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x]" [label="{ExtractFromDict|keys: ('x/boolean_mask/GatherV2',)|label: TensorSource[x]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x]";
"VocabularyAccumulate[x]" [label="{VocabularyAccumulate|vocab_ordering_type: 1|input_dtype: string|label: VocabularyAccumulate[x]|partitionable: True}"];
"TensorSource[x]" -> "VocabularyAccumulate[x]";
"VocabularyMerge[x]" [label="{VocabularyMerge|vocab_ordering_type: 1|use_adjusted_mutual_info: False|min_diff_from_avg: None|label: VocabularyMerge[x]}"];
"VocabularyAccumulate[x]" -> "VocabularyMerge[x]";
"VocabularyCountUnfiltered[x]" [label="{VocabularyCount|label: VocabularyCountUnfiltered[x]}"];
"VocabularyMerge[x]" -> "VocabularyCountUnfiltered[x]";
"CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output/vocab_x_unpruned_vocab_size:0|dtype_enum: 9|is_asset_filepath: False|label: CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]}"];
"VocabularyCountUnfiltered[x]" -> "CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]";
"VocabularyPrune[x]" [label="{VocabularyPrune|top_k: None|frequency_threshold: 0|informativeness_threshold: -inf|coverage_top_k: None|coverage_frequency_threshold: 0|coverage_informativeness_threshold: -inf|key_fn: None|input_dtype: string|label: VocabularyPrune[x]}"];
"VocabularyMerge[x]" -> "VocabularyPrune[x]";
"VocabularyCountFiltered[x]" [label="{VocabularyCount|label: VocabularyCountFiltered[x]}"];
"VocabularyPrune[x]" -> "VocabularyCountFiltered[x]";
"CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output_1/vocab_x_pruned_vocab_size:0|dtype_enum: 9|is_asset_filepath: False|label: CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]}"];
"VocabularyCountFiltered[x]" -> "CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]";
"VocabularyOrderAndWrite[x]" [label="{VocabularyOrderAndWrite|vocab_filename: vocab_x|store_frequency: False|input_dtype: string|label: VocabularyOrderAndWrite[x]|fingerprint_shuffle: False|file_format: text|input_is_sorted: False}"];
"VocabularyPrune[x]" -> "VocabularyOrderAndWrite[x]";
"CreateTensorBinding[x#temporary_analyzer_output_2#Const]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output_2/Const:0|dtype_enum: 7|is_asset_filepath: True|label: CreateTensorBinding[x#temporary_analyzer_output_2#Const]}"];
"VocabularyOrderAndWrite[x]" -> "CreateTensorBinding[x#temporary_analyzer_output_2#Const]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 1|output_signature: OrderedDict(\{'x_integerized': \"Tensor\<shape: [None], \<dtype: 'int64'\>\>\"\})|label: CreateSavedModel}"];
"CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]" -> CreateSavedModel;
"CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]" -> CreateSavedModel;
"CreateTensorBinding[x#temporary_analyzer_output_2#Const]" -> CreateSavedModel;
}
""",
    expected_dot_graph_str_tf2=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('x/boolean_mask/GatherV2', \"Tensor\<shape: [None], \<dtype: 'string'\>\>\")])|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x]" [label="{ExtractFromDict|keys: ('x/boolean_mask/GatherV2',)|label: TensorSource[x]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x]";
"VocabularyAccumulate[x]" [label="{VocabularyAccumulate|vocab_ordering_type: 1|input_dtype: string|label: VocabularyAccumulate[x]|partitionable: True}"];
"TensorSource[x]" -> "VocabularyAccumulate[x]";
"VocabularyMerge[x]" [label="{VocabularyMerge|vocab_ordering_type: 1|use_adjusted_mutual_info: False|min_diff_from_avg: None|label: VocabularyMerge[x]}"];
"VocabularyAccumulate[x]" -> "VocabularyMerge[x]";
"VocabularyCountUnfiltered[x]" [label="{VocabularyCount|label: VocabularyCountUnfiltered[x]}"];
"VocabularyMerge[x]" -> "VocabularyCountUnfiltered[x]";
"CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output/vocab_x_unpruned_vocab_size:0|dtype_enum: 9|is_asset_filepath: False|label: CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]}"];
"VocabularyCountUnfiltered[x]" -> "CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]";
"VocabularyPrune[x]" [label="{VocabularyPrune|top_k: None|frequency_threshold: 0|informativeness_threshold: -inf|coverage_top_k: None|coverage_frequency_threshold: 0|coverage_informativeness_threshold: -inf|key_fn: None|input_dtype: string|label: VocabularyPrune[x]}"];
"VocabularyMerge[x]" -> "VocabularyPrune[x]";
"VocabularyCountFiltered[x]" [label="{VocabularyCount|label: VocabularyCountFiltered[x]}"];
"VocabularyPrune[x]" -> "VocabularyCountFiltered[x]";
"CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output_1/vocab_x_pruned_vocab_size:0|dtype_enum: 9|is_asset_filepath: False|label: CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]}"];
"VocabularyCountFiltered[x]" -> "CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]";
"VocabularyOrderAndWrite[x]" [label="{VocabularyOrderAndWrite|vocab_filename: vocab_x|store_frequency: False|input_dtype: string|label: VocabularyOrderAndWrite[x]|fingerprint_shuffle: False|file_format: text|input_is_sorted: False}"];
"VocabularyPrune[x]" -> "VocabularyOrderAndWrite[x]";
"CreateTensorBinding[x#temporary_analyzer_output_2#Const]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output_2/Const:0|dtype_enum: 7|is_asset_filepath: True|label: CreateTensorBinding[x#temporary_analyzer_output_2#Const]}"];
"VocabularyOrderAndWrite[x]" -> "CreateTensorBinding[x#temporary_analyzer_output_2#Const]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 1|output_signature: OrderedDict([('x_integerized', \"Tensor\<shape: [None], \<dtype: 'int64'\>\>\")])|label: CreateSavedModel}"];
"CreateTensorBinding[x#temporary_analyzer_output#vocab_x_unpruned_vocab_size]" -> CreateSavedModel;
"CreateTensorBinding[x#temporary_analyzer_output_1#vocab_x_pruned_vocab_size]" -> CreateSavedModel;
"CreateTensorBinding[x#temporary_analyzer_output_2#Const]" -> CreateSavedModel;
}
""",
)
def _preprocessing_fn_with_two_phases(inputs):
  """Preprocessing fn that requires two analysis phases.

  The second tft.mean consumes x_square_deviations, which depends on the
  output of the first tft.mean — so the analyzers cannot all run in a
  single pass over the data.
  """
  x = inputs['x']
  x_mean = tft.mean(x, name='x')  # Phase 0 analyzer.
  x_square_deviations = tf.square(x - x_mean)
  # Phase 1 analyzer: its input depends on the phase-0 analyzer's result.
  x_var = tft.mean(x_square_deviations, name='x_square_deviations')
  x_normalized = (x - x_mean) / tf.sqrt(x_var)
  return {'x_normalized': x_normalized}
_TWO_PHASES_CASE = dict(
testcase_name='with_two_phases',
feature_spec={'x': tf.io.FixedLenFeature([], tf.float32)},
preprocessing_fn=_preprocessing_fn_with_two_phases,
expected_dot_graph_str_py312=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x/mean_and_var/Cast_1': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x/mean_and_var/div_no_nan': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x/mean_and_var/div_no_nan_1': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x/mean_and_var/zeros': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"\})|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x#mean_and_var]" [label="{ExtractFromDict|keys: ('x/mean_and_var/Cast_1', 'x/mean_and_var/div_no_nan', 'x/mean_and_var/div_no_nan_1', 'x/mean_and_var/zeros')|label: TensorSource[x#mean_and_var]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x#mean_and_var]";
"CacheableCombineAccumulate[x#mean_and_var]" [label="{CacheableCombineAccumulate|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineAccumulate[x#mean_and_var]|partitionable: True}"];
"TensorSource[x#mean_and_var]" -> "CacheableCombineAccumulate[x#mean_and_var]";
"CacheableCombineMerge[x#mean_and_var]" [label="{CacheableCombineMerge|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineMerge[x#mean_and_var]}"];
"CacheableCombineAccumulate[x#mean_and_var]" -> "CacheableCombineMerge[x#mean_and_var]";
"ExtractCombineMergeOutputs[x#mean_and_var]" [label="{ExtractCombineMergeOutputs|output_tensor_info_list: [TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None), TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None)]|label: ExtractCombineMergeOutputs[x#mean_and_var]|{<0>0|<1>1}}"];
"CacheableCombineMerge[x#mean_and_var]" -> "ExtractCombineMergeOutputs[x#mean_and_var]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":0 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output_1/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":1 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]";
"CreateSavedModelForAnalyzerInputs[Phase1]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x_square_deviations/mean_and_var/Cast_1': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x_square_deviations/mean_and_var/div_no_nan': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x_square_deviations/mean_and_var/div_no_nan_1': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\", 'x_square_deviations/mean_and_var/zeros': \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"\})|label: CreateSavedModelForAnalyzerInputs[Phase1]}"];
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> "CreateSavedModelForAnalyzerInputs[Phase1]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> "CreateSavedModelForAnalyzerInputs[Phase1]";
"ApplySavedModel[Phase1]" [label="{ApplySavedModel|phase: 1|label: ApplySavedModel[Phase1]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase1]" -> "ApplySavedModel[Phase1]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase1]";
"TensorSource[x_square_deviations#mean_and_var]" [label="{ExtractFromDict|keys: ('x_square_deviations/mean_and_var/Cast_1', 'x_square_deviations/mean_and_var/div_no_nan', 'x_square_deviations/mean_and_var/div_no_nan_1', 'x_square_deviations/mean_and_var/zeros')|label: TensorSource[x_square_deviations#mean_and_var]|partitionable: True}"];
"ApplySavedModel[Phase1]" -> "TensorSource[x_square_deviations#mean_and_var]";
"CacheableCombineAccumulate[x_square_deviations#mean_and_var]" [label="{CacheableCombineAccumulate|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineAccumulate[x_square_deviations#mean_and_var]|partitionable: True}"];
"TensorSource[x_square_deviations#mean_and_var]" -> "CacheableCombineAccumulate[x_square_deviations#mean_and_var]";
"CacheableCombineMerge[x_square_deviations#mean_and_var]" [label="{CacheableCombineMerge|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineMerge[x_square_deviations#mean_and_var]}"];
"CacheableCombineAccumulate[x_square_deviations#mean_and_var]" -> "CacheableCombineMerge[x_square_deviations#mean_and_var]";
"ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]" [label="{ExtractCombineMergeOutputs|output_tensor_info_list: [TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None), TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None)]|label: ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]|{<0>0|<1>1}}"];
"CacheableCombineMerge[x_square_deviations#mean_and_var]" -> "ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]";
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x_square_deviations/mean_and_var/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]":0 -> "CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]";
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x_square_deviations/mean_and_var/temporary_analyzer_output_1/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]":1 -> "CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x_normalized': \"Tensor\<shape: [None], \<dtype: 'float32'\>\>\"\})|label: CreateSavedModel}"];
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> CreateSavedModel;
}
""",
expected_dot_graph_str_tf2=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('x/mean_and_var/Cast_1', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x/mean_and_var/div_no_nan', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x/mean_and_var/div_no_nan_1', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x/mean_and_var/zeros', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\")])|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x#mean_and_var]" [label="{ExtractFromDict|keys: ('x/mean_and_var/Cast_1', 'x/mean_and_var/div_no_nan', 'x/mean_and_var/div_no_nan_1', 'x/mean_and_var/zeros')|label: TensorSource[x#mean_and_var]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x#mean_and_var]";
"CacheableCombineAccumulate[x#mean_and_var]" [label="{CacheableCombineAccumulate|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineAccumulate[x#mean_and_var]|partitionable: True}"];
"TensorSource[x#mean_and_var]" -> "CacheableCombineAccumulate[x#mean_and_var]";
"CacheableCombineMerge[x#mean_and_var]" [label="{CacheableCombineMerge|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineMerge[x#mean_and_var]}"];
"CacheableCombineAccumulate[x#mean_and_var]" -> "CacheableCombineMerge[x#mean_and_var]";
"ExtractCombineMergeOutputs[x#mean_and_var]" [label="{ExtractCombineMergeOutputs|output_tensor_info_list: [TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None), TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None)]|label: ExtractCombineMergeOutputs[x#mean_and_var]|{<0>0|<1>1}}"];
"CacheableCombineMerge[x#mean_and_var]" -> "ExtractCombineMergeOutputs[x#mean_and_var]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":0 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/mean_and_var/temporary_analyzer_output_1/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x#mean_and_var]":1 -> "CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]";
"CreateSavedModelForAnalyzerInputs[Phase1]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('x_square_deviations/mean_and_var/Cast_1', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x_square_deviations/mean_and_var/div_no_nan', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x_square_deviations/mean_and_var/div_no_nan_1', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\"), ('x_square_deviations/mean_and_var/zeros', \"Tensor\<shape: [], \<dtype: 'float32'\>\>\")])|label: CreateSavedModelForAnalyzerInputs[Phase1]}"];
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> "CreateSavedModelForAnalyzerInputs[Phase1]";
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> "CreateSavedModelForAnalyzerInputs[Phase1]";
"ApplySavedModel[Phase1]" [label="{ApplySavedModel|phase: 1|label: ApplySavedModel[Phase1]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase1]" -> "ApplySavedModel[Phase1]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase1]";
"TensorSource[x_square_deviations#mean_and_var]" [label="{ExtractFromDict|keys: ('x_square_deviations/mean_and_var/Cast_1', 'x_square_deviations/mean_and_var/div_no_nan', 'x_square_deviations/mean_and_var/div_no_nan_1', 'x_square_deviations/mean_and_var/zeros')|label: TensorSource[x_square_deviations#mean_and_var]|partitionable: True}"];
"ApplySavedModel[Phase1]" -> "TensorSource[x_square_deviations#mean_and_var]";
"CacheableCombineAccumulate[x_square_deviations#mean_and_var]" [label="{CacheableCombineAccumulate|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineAccumulate[x_square_deviations#mean_and_var]|partitionable: True}"];
"TensorSource[x_square_deviations#mean_and_var]" -> "CacheableCombineAccumulate[x_square_deviations#mean_and_var]";
"CacheableCombineMerge[x_square_deviations#mean_and_var]" [label="{CacheableCombineMerge|combiner: \<WeightedMeanAndVarCombiner\>|label: CacheableCombineMerge[x_square_deviations#mean_and_var]}"];
"CacheableCombineAccumulate[x_square_deviations#mean_and_var]" -> "CacheableCombineMerge[x_square_deviations#mean_and_var]";
"ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]" [label="{ExtractCombineMergeOutputs|output_tensor_info_list: [TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None), TensorInfo(dtype=tf.float32, shape=(), temporary_asset_info=None)]|label: ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]|{<0>0|<1>1}}"];
"CacheableCombineMerge[x_square_deviations#mean_and_var]" -> "ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]";
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x_square_deviations/mean_and_var/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]":0 -> "CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]";
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x_square_deviations/mean_and_var/temporary_analyzer_output_1/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]}"];
"ExtractCombineMergeOutputs[x_square_deviations#mean_and_var]":1 -> "CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('x_normalized', \"Tensor\<shape: [None], \<dtype: 'float32'\>\>\")])|label: CreateSavedModel}"];
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
"CreateTensorBinding[x_square_deviations#mean_and_var#temporary_analyzer_output_1#PlaceholderWithDefault]" -> CreateSavedModel;
}
""",
)
def _preprocessing_fn_with_chained_ptransforms(inputs):
  """Preprocessing fn whose analyzer output is built via chained custom ops.

  Applies two chained `FakeChainable` operations under nested name scopes and
  binds the final node as a float32 tensor of shape (17, 27), so tests can
  check that scoped labels and tensor bindings flow through the graph.
  """

  class FakeChainable(
      tfx_namedtuple.namedtuple('FakeChainable', ['label']),
      nodes.OperationDef):
    """Fake OperationDef whose label records the name scope it was built in."""

    def __new__(cls):
      # The label embeds the current graph scope, e.g. "FakeChainable[x/ptransform1]".
      current_scope = tf.compat.v1.get_default_graph().get_name_scope()
      return super(FakeChainable, cls).__new__(
          cls, label='{}[{}]'.format(cls.__name__, current_scope))

  with tf.compat.v1.name_scope('x'):
    tensor_source = nodes.apply_operation(
        analyzer_nodes.TensorSource, tensors=[inputs['x']])
    with tf.compat.v1.name_scope('ptransform1'):
      intermediate = nodes.apply_operation(FakeChainable, tensor_source)
    with tf.compat.v1.name_scope('ptransform2'):
      chained = nodes.apply_operation(FakeChainable, intermediate)
    x_chained = analyzer_nodes.bind_future_as_tensor(
        chained, analyzer_nodes.TensorInfo(tf.float32, (17, 27), None))
    return {'x_chained': x_chained}
# Golden test case for a preprocessing_fn built from chained custom
# PTransform-like operations. The two expected dot strings differ only in
# how OrderedDict reprs are rendered (dict-literal style on Python 3.12+ vs
# list-of-pairs style on earlier versions); the graph topology is identical.
_CHAINED_PTRANSFORMS_CASE = dict(
    testcase_name='with_chained_ptransforms',
    feature_spec={'x': tf.io.FixedLenFeature([], tf.int64)},
    preprocessing_fn=_preprocessing_fn_with_chained_ptransforms,
    expected_dot_graph_str_py312=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'inputs_copy': \"Tensor\<shape: [None], \<dtype: 'int64'\>\>\"\})|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x]" [label="{ExtractFromDict|keys: ('inputs_copy',)|label: TensorSource[x]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x]";
"FakeChainable[x/ptransform1]" [label="{FakeChainable|label: FakeChainable[x/ptransform1]}"];
"TensorSource[x]" -> "FakeChainable[x/ptransform1]";
"FakeChainable[x/ptransform2]" [label="{FakeChainable|label: FakeChainable[x/ptransform2]}"];
"FakeChainable[x/ptransform1]" -> "FakeChainable[x/ptransform2]";
"CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]}"];
"FakeChainable[x/ptransform2]" -> "CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict(\{'x_chained': \"Tensor\<shape: [17, 27], \<dtype: 'float32'\>\>\"\})|label: CreateSavedModel}"];
"CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
}
""",
    expected_dot_graph_str_tf2=r"""digraph G {
directed=True;
node [shape=Mrecord];
"CreateSavedModelForAnalyzerInputs[Phase0]" [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('inputs_copy', \"Tensor\<shape: [None], \<dtype: 'int64'\>\>\")])|label: CreateSavedModelForAnalyzerInputs[Phase0]}"];
"ExtractInputForSavedModel[FlattenedDataset]" [label="{ExtractInputForSavedModel|dataset_key: DatasetKey(key='FlattenedDataset', is_cached=True)|label: ExtractInputForSavedModel[FlattenedDataset]}"];
"ApplySavedModel[Phase0]" [label="{ApplySavedModel|phase: 0|label: ApplySavedModel[Phase0]|partitionable: True}"];
"CreateSavedModelForAnalyzerInputs[Phase0]" -> "ApplySavedModel[Phase0]";
"ExtractInputForSavedModel[FlattenedDataset]" -> "ApplySavedModel[Phase0]";
"TensorSource[x]" [label="{ExtractFromDict|keys: ('inputs_copy',)|label: TensorSource[x]|partitionable: True}"];
"ApplySavedModel[Phase0]" -> "TensorSource[x]";
"FakeChainable[x/ptransform1]" [label="{FakeChainable|label: FakeChainable[x/ptransform1]}"];
"TensorSource[x]" -> "FakeChainable[x/ptransform1]";
"FakeChainable[x/ptransform2]" [label="{FakeChainable|label: FakeChainable[x/ptransform2]}"];
"FakeChainable[x/ptransform1]" -> "FakeChainable[x/ptransform2]";
"CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]" [label="{CreateTensorBinding|tensor_name: x/temporary_analyzer_output/PlaceholderWithDefault:0|dtype_enum: 1|is_asset_filepath: False|label: CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]}"];
"FakeChainable[x/ptransform2]" -> "CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]";
CreateSavedModel [label="{CreateSavedModel|table_initializers: 0|output_signature: OrderedDict([('x_chained', \"Tensor\<shape: [17, 27], \<dtype: 'float32'\>\>\")])|label: CreateSavedModel}"];
"CreateTensorBinding[x#temporary_analyzer_output#PlaceholderWithDefault]" -> CreateSavedModel;
}
""",
)
# All golden analysis-graph cases that AnalysisGraphBuilderTest.test_build is
# parameterized over; order determines the generated testcase names.
_ANALYZE_TEST_CASES = [
    _NO_ANALYZERS_CASE,
    _ONE_ANALYZER_CASE,
    _WITH_TABLE_CASE,
    _TWO_PHASES_CASE,
    _CHAINED_PTRANSFORMS_CASE,
]
class AnalysisGraphBuilderTest(tft_unit.TransformTestCase):
@tft_unit.named_parameters(
*tft_unit.cross_named_parameters(
_ANALYZE_TEST_CASES,
[
dict(testcase_name='tf2', use_tf_compat_v1=False),
],
)
)
@pytest.mark.xfail(run=False, reason="PR 315 This class contains tests that fail and needs to be fixed. "
"If all tests pass, please remove this mark.")
def test_build(
self,
feature_spec,
preprocessing_fn,
expected_dot_graph_str_py312,
expected_dot_graph_str_tf2,
use_tf_compat_v1,
):
if not use_tf_compat_v1:
tft_unit.skip_if_not_tf2('Tensorflow 2.x required')
specs = (
feature_spec if use_tf_compat_v1 else
impl_helper.get_type_specs_from_feature_specs(feature_spec))
graph, structured_inputs, structured_outputs = (
impl_helper.trace_preprocessing_function(
preprocessing_fn,
specs,
use_tf_compat_v1=use_tf_compat_v1,
base_temp_dir=os.path.join(self.get_temp_dir(),
self._testMethodName)))
(transform_fn_future, unused_cache,
unused_sideeffects) = analysis_graph_builder.build(graph,
structured_inputs,
structured_outputs)
dot_string = nodes.get_dot_graph([transform_fn_future]).to_string()
self.WriteRenderedDotFile(dot_string)
self.assertMultiLineEqual(
msg='Result dot graph is:\n{}'.format(dot_string),
first=dot_string,
second=(
expected_dot_graph_str_py312
if sys.version_info >= (3, 12)
else expected_dot_graph_str_tf2
),
)
@tft_unit.named_parameters(
*tft_unit.cross_named_parameters(
[
dict(
testcase_name='one_dataset_cached_single_phase',
preprocessing_fn=_preprocessing_fn_with_one_analyzer,
full_dataset_keys=['a', 'b'],
cached_dataset_keys=['a'],
expected_dataset_keys=['b'],
),
dict(
testcase_name='all_datasets_cached_single_phase',
preprocessing_fn=_preprocessing_fn_with_one_analyzer,
full_dataset_keys=['a', 'b'],
cached_dataset_keys=['a', 'b'],
expected_dataset_keys=[],
),
dict(
testcase_name='mixed_single_phase',
preprocessing_fn=lambda d: dict( # pylint: disable=g-long-lambda
list(
_preprocessing_fn_with_chained_ptransforms(d).items()
)
+ list(_preprocessing_fn_with_one_analyzer(d).items())
),
full_dataset_keys=['a', 'b'],
cached_dataset_keys=['a', 'b'],
expected_dataset_keys=['a', 'b'],
),
dict(
testcase_name='multi_phase',
preprocessing_fn=_preprocessing_fn_with_two_phases,
full_dataset_keys=['a', 'b'],
cached_dataset_keys=['a', 'b'],
expected_dataset_keys=['a', 'b'],
),
],
[
dict(testcase_name='tf2', use_tf_compat_v1=False),
],
)
)
def test_get_analysis_dataset_keys(self, preprocessing_fn, full_dataset_keys,
cached_dataset_keys, expected_dataset_keys,
use_tf_compat_v1):
if not use_tf_compat_v1:
tft_unit.skip_if_not_tf2('Tensorflow 2.x required')
full_dataset_keys = list(
map(analyzer_cache.DatasetKey, full_dataset_keys))
cached_dataset_keys = map(analyzer_cache.DatasetKey, cached_dataset_keys)
expected_dataset_keys = map(
analyzer_cache.DatasetKey, expected_dataset_keys)
# We force all dataset keys with entries in the cache dict will have a cache
# hit.
mocked_cache_entry_key = b'M'
input_cache = {
key: analyzer_cache.DatasetCache({mocked_cache_entry_key: 'C'}, None)
for key in cached_dataset_keys
}
feature_spec = {'x': tf.io.FixedLenFeature([], tf.float32)}
specs = (
feature_spec if use_tf_compat_v1 else
impl_helper.get_type_specs_from_feature_specs(feature_spec))
with mock.patch(
'tensorflow_transform.beam.analysis_graph_builder.'
'analyzer_cache.make_cache_entry_key',
return_value=mocked_cache_entry_key):
dataset_keys = (
analysis_graph_builder.get_analysis_dataset_keys(
preprocessing_fn,
specs,
full_dataset_keys,
input_cache,
force_tf_compat_v1=use_tf_compat_v1))
self.DebugPublishLatestsRenderedTFTGraph()
self.assertCountEqual(expected_dataset_keys, dataset_keys)
@tft_unit.named_parameters(
dict(testcase_name='tf2', use_tf_compat_v1=False),
)
def test_get_analysis_cache_entry_keys(self, use_tf_compat_v1):
if not use_tf_compat_v1:
tft_unit.skip_if_not_tf2('Tensorflow 2.x required')
full_dataset_keys = map(analyzer_cache.DatasetKey, ['a', 'b'])
def preprocessing_fn(inputs):
return {'x': tft.scale_to_0_1(inputs['x'])}
mocked_cache_entry_key = 'A'
def mocked_make_cache_entry_key(_):
return mocked_cache_entry_key
feature_spec = {'x': tf.io.FixedLenFeature([], tf.float32)}
specs = (
feature_spec if use_tf_compat_v1 else
impl_helper.get_type_specs_from_feature_specs(feature_spec))
with mock.patch(
'tensorflow_transform.beam.analysis_graph_builder.'
'analyzer_cache.make_cache_entry_key',
side_effect=mocked_make_cache_entry_key):
cache_entry_keys = (
analysis_graph_builder.get_analysis_cache_entry_keys(
preprocessing_fn,
specs,
full_dataset_keys,
force_tf_compat_v1=use_tf_compat_v1))
self.DebugPublishLatestsRenderedTFTGraph()
self.assertCountEqual(cache_entry_keys, [mocked_cache_entry_key])
def test_duplicate_label_error(self):
def _preprocessing_fn(inputs):
class _Analyzer(
tfx_namedtuple.namedtuple('_Analyzer', ['label']),
nodes.OperationDef):
pass
input_values_node = nodes.apply_operation(
analyzer_nodes.TensorSource, tensors=[inputs['x']])
intermediate_value_node = nodes.apply_operation(
_Analyzer, input_values_node, label='SameLabel')
output_value_node = nodes.apply_operation(
_Analyzer, intermediate_value_node, label='SameLabel')
x_chained = analyzer_nodes.bind_future_as_tensor(
output_value_node,
analyzer_nodes.TensorInfo(tf.float32, (17, 27), None))
return {'x_chained': x_chained}
feature_spec = {'x': tf.io.FixedLenFeature([], tf.float32)}
use_tf_compat_v1 = tf2_utils.use_tf_compat_v1(False)
specs = (
feature_spec if use_tf_compat_v1 else
impl_helper.get_type_specs_from_feature_specs(feature_spec))
graph, structured_inputs, structured_outputs = (
impl_helper.trace_preprocessing_function(
_preprocessing_fn,
specs,
use_tf_compat_v1=use_tf_compat_v1,
base_temp_dir=os.path.join(self.get_temp_dir(),
self._testMethodName)))
with self.assertRaisesRegex(AssertionError, 'SameLabel'):
_ = analysis_graph_builder.build(graph, structured_inputs,
structured_outputs)