Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,12 @@ public NativeReader<?> create(

@Override
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new PubsubReaderIterator(context.getWorkItem());
return new PubsubReaderIterator();
}

class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
protected PubsubReaderIterator(Windmill.WorkItem work) {
super(work, skipUndecodableElements);
}

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
return false;
}
return super.advance();
protected PubsubReaderIterator() {
super(context, skipUndecodableElements);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
Expand Down Expand Up @@ -157,6 +158,9 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
*/
private @Nullable UnboundedReader<?> activeReader;

private @Nullable WorkExecutor workExecutor;
Comment thread
arunpandianp marked this conversation as resolved.
private boolean finishKeyCalled = false;

public StreamingModeExecutionContext(
CounterFactory counterFactory,
String computationId,
Expand Down Expand Up @@ -240,9 +244,12 @@ public void start(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
Windmill.WorkItemCommitRequest.Builder outputBuilder,
WorkExecutor workExecutor) {
this.key = key;
this.work = work;
this.workExecutor = workExecutor;
Comment thread
arunpandianp marked this conversation as resolved.
this.finishKeyCalled = false;
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
StreamingGlobalConfig config = globalConfigHandle.getConfig();
Expand Down Expand Up @@ -270,6 +277,17 @@ public void start(
}
}

public void finishKey() {
checkState(!finishKeyCalled, "finishKey was already called");
checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
try {
workExecutor.finishKey();
} catch (Exception e) {
throw new RuntimeException(e);
}
this.finishKeyCalled = true;
}

/**
* Ensure that the processing time is greater than any fired processing time timers. Otherwise, a
* trigger could ignore the timer and orphan the window.
Expand Down Expand Up @@ -451,6 +469,7 @@ public void invalidateCache() {
}

public Map<Long, Pair<Instant, Runnable>> flushState() {
checkState(finishKeyCalled, "finishKey must be called before flushState");
Map<Long, Pair<Instant, Runnable>> callbacks = new HashMap<>();

for (StepContext stepContext : getAllStepContexts()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,12 @@ public NativeReader<?> create(

@Override
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new UngroupedWindmillReaderIterator(context.getWorkItem());
return new UngroupedWindmillReaderIterator();
}

class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
super(work, skipUndecodableElements);
}

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
return false;
}
return super.advance();
UngroupedWindmillReaderIterator() {
super(context, skipUndecodableElements);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/
public abstract class WindmillReaderIteratorBase<T>
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
private final StreamingModeExecutionContext context;
private final Windmill.WorkItem work;
private int bundleIndex = 0;
private int messageIndex = -1;
Expand All @@ -42,9 +43,10 @@ public abstract class WindmillReaderIteratorBase<T>
private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class);

protected WindmillReaderIteratorBase(
Windmill.WorkItem work, ValueProvider<Boolean> skipUndecodableElements) {
StreamingModeExecutionContext context, ValueProvider<Boolean> skipUndecodableElements) {
this.context = context;
this.skipUndecodableElements = skipUndecodableElements;
this.work = work;
this.work = context.getWorkItem();
}

@Override
Expand All @@ -54,9 +56,14 @@ public boolean start() throws IOException {

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
}

while (true) {
if (bundleIndex >= work.getMessageBundlesCount()) {
current = null;
context.finishKey();
return false;
}
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ public NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>> iterator() throw
return new NativeReaderIterator<WindowedValue<KeyedWorkItem<K, T>>>() {
@Override
public boolean start() throws IOException {
context.finishKey();
return false;
}

Expand All @@ -182,6 +183,7 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
Comment on lines 183 to 184
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The anonymous iterator in WindowingWindmillReader should also check context.workIsFailed() and throw WorkItemCancelledException, consistent with the changes in WindmillReaderIteratorBase. This ensures that cancelled work items are handled promptly.

        @Override
        public boolean advance() throws IOException {
          if (context.workIsFailed()) {
            throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
          }

current = null;
context.finishKey();
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public void execute() throws Exception {
LOG.debug("Source operation execution complete");
}

@Override
public void finishKey() throws Exception {}

@Override
public SourceOperationResponse getResponse() {
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ public boolean start() throws IOException {
}
try {
if (!reader.start()) {
context.finishKey();
return false;
}
} catch (Exception e) {
Expand All @@ -841,10 +842,13 @@ public boolean advance() throws IOException {
// that there are regular checkpoints and that state does not become too large.
BackOff backoff = backoffFactory.backoff();
while (true) {
if (context.workIsFailed()) {
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
}
if (elemsRead >= maxElems
|| Instant.now().isAfter(endTime)
|| context.isSinkFullHintSet()
|| context.workIsFailed()) {
|| context.isSinkFullHintSet()) {
context.finishKey();
return false;
}
try {
Expand All @@ -857,6 +861,7 @@ public boolean advance() throws IOException {
}
long nextBackoff = backoff.nextBackOffMillis();
if (nextBackoff == BackOff.STOP) {
context.finishKey();
return false;
}
Uninterruptibles.sleepUninterruptibly(nextBackoff, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public final void executeWork(
SideInputStateFetcher sideInputStateFetcher,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
throws Exception {
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder);
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor());
workExecutor().execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public void process(Object elem) throws Exception {
}
}

@Override
public void finishKey() throws Exception {}

@Override
public boolean supportsRestart() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public void execute() throws Exception {
// TODO: support for success / failure ports?
}

@Override
public void finishKey() throws Exception {
for (Operation op : operations) {
op.finishKey();
}
}

@Override
public NativeReader.Progress getWorkerProgress() throws Exception {
return getReadOperation().getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public void finish() throws Exception {
}
}

/** Called when all elements for a specific key have been processed. */
public abstract void finishKey() throws Exception;

/** Aborts this Operation's execution. */
public void abort() throws Exception {
synchronized (initializationStateLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ public void process(Object elem) throws Exception {
}

@Override
public void finish() throws Exception {
// Batch mode does not use this method and instead relies on BatchModeUngroupingParDoFn
// to process timers per key.
public void finishKey() throws Exception {
try (Closeable scope = context.enterProcessTimers()) {
checkStarted();
fn.processTimers();
}
}

@Override
public void finish() throws Exception {
try (Closeable scope = context.enterFinish()) {
fn.finishBundle();
super.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ public void finish() throws Exception {
}
}

@Override
public void finishKey() throws Exception {}

@Override
public void abort() throws Exception {
if (readerIterator != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public interface WorkExecutor extends AutoCloseable {
/** Executes the task. */
public abstract void execute() throws Exception;

/** Called when all elements for a specific key have been processed. */
void finishKey() throws Exception;

/**
* Returns the worker's current progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public void finish() throws Exception {
}
}

@Override
public void finishKey() throws Exception {}

@Override
public void abort() throws Exception {
if (writer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public void abort() throws Exception {
aborted = true;
super.abort();
}

@Override
public void finishKey() throws Exception {}
}

// A mock ReadOperation fed to a MapTaskExecutor in test.
Expand Down Expand Up @@ -312,6 +315,9 @@ public void start() throws Exception {
Metrics.counter("TestMetric", "MetricCounter").inc(1L);
}
}

@Override
public void finishKey() throws Exception {}
},
new Operation(new OutputReceiver[] {}, context2) {
@Override
Expand All @@ -321,6 +327,9 @@ public void start() throws Exception {
Metrics.counter("TestMetric", "MetricCounter").inc(2L);
}
}

@Override
public void finishKey() throws Exception {}
},
new Operation(new OutputReceiver[] {}, context3) {
@Override
Expand All @@ -330,6 +339,9 @@ public void start() throws Exception {
Metrics.counter("TestMetric", "MetricCounter").inc(3L);
}
}

@Override
public void finishKey() throws Exception {}
});

try (IntrinsicMapTaskExecutor executor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3657,8 +3657,8 @@ public void testActiveWorkFailure() throws Exception {
server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5));
assertEquals(1, commits.size());

assertEquals(0, BlockingFn.teardownCounter.get());
assertEquals(1, BlockingFn.setupCounter.get());
assertEquals(1, BlockingFn.teardownCounter.get());
assertEquals(2, BlockingFn.setupCounter.get());

worker.stop();
}
Expand Down
Loading
Loading