Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1551,7 +1551,6 @@ class BeamModulePlugin implements Plugin<Project> {
"ThreadPriorityCheck",
"UndefinedEquals",
"UnescapedEntity",
"UnnecessaryParentheses",
"UnrecognisedJavadocTag",
// errorprone 3.2.0+ checks
"DirectInvocationOnMock",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public void testWriteAndRead() throws IOException {
.withAvroFormatFunction(
new AvroFormatFn(
configuration.numColumns,
!("STORAGE_WRITE_API".equalsIgnoreCase(configuration.writeMethod))));
!"STORAGE_WRITE_API".equalsIgnoreCase(configuration.writeMethod)));
break;
case JSON:
writeIO =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
@Override
public <OutputT> void output(TupleTag<OutputT> tag, WindowedValue<OutputT> output) {
checkArgument(bundles.containsKey(tag), "Unknown output tag %s", tag);
(bundles.get(tag)).add((WindowedValue) output);
bundles.get(tag).add((WindowedValue) output);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void start() {
pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval();
if (watermarkInterval == null) {
watermarkInterval =
(pipelineOptions.as(FlinkPipelineOptions.class).getMaxBundleTimeMills()) / 5L;
pipelineOptions.as(FlinkPipelineOptions.class).getMaxBundleTimeMills() / 5L;
watermarkInterval =
(watermarkInterval > MIN_WATERMARK_EMIT_INTERVAL_MS)
? watermarkInterval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ private void testTriggerCombinerLiftingDisabled(Trigger trigger) throws Exceptio

@Test
public void testRepeatedCountTriggerDisablesCombinerLifting() throws IOException, Exception {
testTriggerCombinerLiftingDisabled(Repeatedly.forever((AfterPane.elementCountAtLeast(1))));
testTriggerCombinerLiftingDisabled(Repeatedly.forever(AfterPane.elementCountAtLeast(1)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ private synchronized void recordActiveMessageInProcessingTimesMap() {
return;
}
int processingTime =
(int) (this.activeMessageMetadata.stopwatch().elapsed(TimeUnit.MILLISECONDS));
(int) this.activeMessageMetadata.stopwatch().elapsed(TimeUnit.MILLISECONDS);
this.processingTimesByStep.compute(
this.activeMessageMetadata.userStepName(),
(k, v) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,10 @@ public boolean equals(@Nullable Object o) {
}
if (o instanceof ShuffleEntry) {
ShuffleEntry that = (ShuffleEntry) o;
return (Objects.equals(this.position, that.position))
&& (Objects.equals(this.key, that.key))
&& (Objects.equals(this.secondaryKey, that.secondaryKey))
&& (Objects.equals(this.value, that.value));
return Objects.equals(this.position, that.position)
&& Objects.equals(this.key, that.key)
&& Objects.equals(this.secondaryKey, that.secondaryKey)
&& Objects.equals(this.value, that.value);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void issueReadBatch(ReadBatch batch) {
} catch (InterruptedException e) {
// We don't expect this thread to be interrupted. To simplify handling, we just fall through
// to issuing the call.
assert (false);
assert false;
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// startRead is a SettableFuture so this should never occur.
Expand Down Expand Up @@ -185,7 +185,7 @@ private void issueReadBatch(ReadBatch batch) {
synchronized (this) {
ReadBatch batch;
if (activeReadThreads < MAX_ACTIVE_READS) {
assert (pendingReadBatches.isEmpty());
assert pendingReadBatches.isEmpty();
activeReadThreads += 1;
// fall through to below synchronized block
} else if (pendingReadBatches.isEmpty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ private Map<Object, Triple<K, Boolean, ConcatIterables<V>>> mergedCachedEntries(
keyStateMap
.entrySet()
.removeIf(
(entry -> {
entry -> {
Object structuralKey = entry.getKey();
KeyState keyState = entry.getValue();
if (complete && keyState.existence == KeyExistence.KNOWN_EXIST) {
Expand All @@ -574,7 +574,7 @@ private Map<Object, Triple<K, Boolean, ConcatIterables<V>>> mergedCachedEntries(
return (keyState.existence == KeyExistence.KNOWN_NONEXISTENT
&& !keyState.removedLocally)
|| keyState.existence == KeyExistence.UNKNOWN_EXISTENCE;
}));
});
return cachedEntries;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private <K, InputT, AccumT, OutputT> ParDoFn createCombineValuesFn(
public void testCombineValuesFnAll() throws Exception {
TestReceiver receiver = new TestReceiver();

Combine.CombineFn<Integer, CountSum, String> combiner = (new MeanInts());
Combine.CombineFn<Integer, CountSum, String> combiner = new MeanInts();

ParDoFn combineParDoFn =
createCombineValuesFn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3540,7 +3540,7 @@ public void testQueuedWorkFailure() throws Exception {
// Release the blocked calls.
BlockingFn.blocker().countDown();
Map<Long, Windmill.WorkItemCommitRequest> commits =
server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds((5)));
server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5));
assertEquals(1, commits.size());

assertEquals(0, BlockingFn.teardownCounter.get());
Expand Down Expand Up @@ -3594,7 +3594,7 @@ public void testActiveWorkFailure() throws Exception {
// Release the blocked call, there should not be a commit and the dofn should be invalidated.
BlockingFn.blocker().countDown();
Map<Long, Windmill.WorkItemCommitRequest> commits =
server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds((5)));
server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5));
assertEquals(1, commits.size());

assertEquals(0, BlockingFn.teardownCounter.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ private List<KV<String, KV<String, String>>> readShuffleEntries(Reiterator<Shuff
ShuffleEntry entry = iter.next();
actual.add(
KV.of(
(entry.getKey().toStringUtf8()),
KV.of((entry.getSecondaryKey().toStringUtf8()), (entry.getValue().toStringUtf8()))));
entry.getKey().toStringUtf8(),
KV.of(entry.getSecondaryKey().toStringUtf8(), entry.getValue().toStringUtf8())));
}
return actual;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ static void encodeUnrolled(long v, OutputStream stream) throws IOException {
}
stream.write((byte) (v | 0x80));
v >>>= 7;
stream.write((byte) (v));
stream.write((byte) v);
}

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ private boolean readDefaultLine() throws IOException {
// Search for the newline
for (; bufferPosn < bufferLength; ++bufferPosn) {
if (buffer[bufferPosn] == LF) {
newlineLength = (prevCharCR) ? 2 : 1;
newlineLength = prevCharCR ? 2 : 1;
++bufferPosn; // at next invocation proceed from following byte
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public ByteKey interpolateKey(double fraction) {
BigInteger interpolatedOffset =
new BigDecimal(range).multiply(BigDecimal.valueOf(fraction)).toBigInteger();

int outputKeyLength = endKey.isEmpty() ? (paddedKeyLength - 1) : paddedKeyLength;
int outputKeyLength = endKey.isEmpty() ? paddedKeyLength - 1 : paddedKeyLength;
return ByteKey.copyFrom(
fixupHeadZeros(rangeStartInt.add(interpolatedOffset).toByteArray(), outputKeyLength));
}
Expand Down Expand Up @@ -364,7 +364,7 @@ private static BigInteger paddedPositiveInt(byte[] bytes, int length) {
checkArgument(
bytePaddingNeeded >= 0, "Required bytes.length %s < length %s", bytes.length, length);
BigInteger ret = new BigInteger(1, bytes);
return (bytePaddingNeeded == 0) ? ret : ret.shiftLeft(8 * bytePaddingNeeded);
return bytePaddingNeeded == 0 ? ret : ret.shiftLeft(8 * bytePaddingNeeded);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,7 +1112,7 @@ static ProcessElementMethod create(
extraParameters().stream()
.filter(Predicates.instanceOf(OutputReceiverParameter.class)::apply)
.findFirst();
return parameter.isPresent() ? ((OutputReceiverParameter) parameter.get()) : null;
return parameter.isPresent() ? (OutputReceiverParameter) parameter.get() : null;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ private static String getUrn(ProtocolMessageEnum value) {
checkState(CPU_COUNT_URN.equals(getUrn(StandardResourceHints.Enum.CPU_COUNT)));
checkState(
MAX_ACTIVE_BUNDLES_PER_WORKER.equals(
(getUrn(StandardResourceHints.Enum.MAX_ACTIVE_BUNDLES_PER_WORKER))));
getUrn(StandardResourceHints.Enum.MAX_ACTIVE_BUNDLES_PER_WORKER)));
}

private static ImmutableMap<String, String> hintNameToUrn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ private int getBucketIndexZeroScale(int value) {
}

private int getBucketIndexNegativeScale(int value) {
return getBucketIndexZeroScale(value) >> (-getScale());
return getBucketIndexZeroScale(value) >> -getScale();
}

// This method is valid for all 'scale' values but we fallback to more efficient methods for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static void encode(long v, OutputStream stream) throws IOException {
}
stream.write((byte) (v | 0x80));
v >>>= 7;
stream.write((byte) (v));
stream.write((byte) v);
}

/** Decodes an integer value from the given stream. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public Object match(
processedValue =
cases.processBytes(
rowPosition,
(byte[]) ((value instanceof ByteBuffer) ? ((ByteBuffer) value).array() : value),
(byte[]) (value instanceof ByteBuffer ? ((ByteBuffer) value).array() : value),
this);
break;
case INT16:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,13 +282,13 @@ public void testRunWithAdditionalArgsEffective() {
pipeline.apply(Create.of("")).apply(new ValidateTempLocation<>());
PipelineResult.State result =
pipeline.runWithAdditionalOptionArgs(pipelineArgs).waitUntilFinish();
assert (result == PipelineResult.State.DONE);
assertEquals(PipelineResult.State.DONE, result);
}

static class ValidateTempLocation<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public void validate(PipelineOptions pipelineOptions) {
assert (!Strings.isNullOrEmpty(pipelineOptions.getTempLocation()));
assert !Strings.isNullOrEmpty(pipelineOptions.getTempLocation());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public PTransform getTransform(FunctionSpec spec, PipelineOptions options) {
}

Schema configSchemaFromRequest =
SchemaTranslation.schemaFromProto((payload.getConfigurationSchema()));
SchemaTranslation.schemaFromProto(payload.getConfigurationSchema());
Schema configSchemaFromProvider = provider.configurationSchema();

if (!configSchemaFromRequest.assignableTo(configSchemaFromProvider)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ public String getLogicalTypeName() {

@Override
public Instant fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
long epochSeconds = microsFromEpoch / (1_000_000L);
long nanoAdjustment = (microsFromEpoch % (1_000_000L)) * 1_000L;
long epochSeconds = microsFromEpoch / 1_000_000L;
long nanoAdjustment = (microsFromEpoch % 1_000_000L) * 1_000L;

return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private static int getColumnIndex(RexNode rexNode) {
*/
public static PCollection.IsBounded getBoundednessOfRelNode(RelNode relNode) {
if (relNode instanceof BeamRelNode) {
return (((BeamRelNode) relNode).isBounded());
return ((BeamRelNode) relNode).isBounded();
}
List<PCollection.IsBounded> boundednessOfInputs = new ArrayList<>();
for (RelNode inputRel : relNode.getInputs()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,14 @@ public BeamSideInputLookupJoinRule() {
@Override
public boolean matches(RelOptRuleCall call) {
RelNode joinRel = call.rel(0);
boolean matches = BeamJoinRel.containsSeekableInput(joinRel);
return (matches);
return BeamJoinRel.containsSeekableInput(joinRel);
}

@Override
public RelNode convert(RelNode rel) {
Join join = (Join) rel;

return (new BeamSideInputLookupJoinRel(
return new BeamSideInputLookupJoinRel(
join.getCluster(),
join.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
convert(
Expand All @@ -64,6 +63,6 @@ public RelNode convert(RelNode rel) {
join.getRight(), join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
join.getCondition(),
join.getVariablesSet(),
join.getJoinType()));
join.getJoinType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1850,7 +1850,7 @@ public void reset() {
response =
handler.progress(
BeamFnApi.InstructionRequest.newBuilder()
.setInstructionId("thread-" + threadId + "-" + (++requestCount))
.setInstructionId("thread-" + threadId + "-" + ++requestCount)
.setProcessBundleProgress(
ProcessBundleProgressRequest.newBuilder()
.setInstructionId("999L")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public class SerializationTestUtil {

private static final ObjectMapper MAPPER =
(new ObjectMapper())
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

public static <T> T serializeDeserialize(Class<T> clazz, T obj) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,13 @@ static Long convertTime(Object value, boolean micros) {
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (time).");
return CivilTimeEncoder.encodePacked64TimeMicros(
java.time.LocalTime.ofNanoOfDay((TimeUnit.MICROSECONDS.toNanos((long) value))));
java.time.LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos((long) value)));
} else {
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (time).");
return CivilTimeEncoder.encodePacked64TimeMicros(
java.time.LocalTime.ofNanoOfDay(
(TimeUnit.MILLISECONDS).toNanos(((Integer) value).longValue())));
TimeUnit.MILLISECONDS.toNanos(((Integer) value).longValue())));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private static ByteString toProtoByteString(Object o) {
CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value))
.put(
SqlTypes.TIMESTAMP.getIdentifier(),
(logicalType, value) -> (ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value)))
(logicalType, value) -> ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value))
.put(
EnumerationType.IDENTIFIER,
(logicalType, value) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv
case LONG:
// TODO: Use LogicalTypes.TimestampNanos once avro version is updated.
if (useAvroLogicalTypes
&& (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) {
&& TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType"))) {
return fieldSchema.setType("TIMESTAMP").setTimestampPrecision(12L);
}
if (logicalType instanceof LogicalTypes.TimeMicros) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ public static DynamicMessage messageFromMap(
TableRow nestedUnknown = new TableRow();
if (fieldDescriptor.isRepeated()) {
((List<TableRow>)
(unknownFields.computeIfAbsent(key, k -> new ArrayList<TableRow>())))
unknownFields.computeIfAbsent(key, k -> new ArrayList<TableRow>()))
.add(nestedUnknown);
return nestedUnknown;
}
Expand Down Expand Up @@ -895,7 +895,7 @@ public static DynamicMessage messageFromTableRow(
}
TableRow localUnknownFields = Preconditions.checkStateNotNull(unknownFields);
@Nullable
TableRow nested = (TableRow) (localUnknownFields.getF().get(finalIndex).getV());
TableRow nested = (TableRow) localUnknownFields.getF().get(finalIndex).getV();
if (nested == null) {
nested = new TableRow();
localUnknownFields.getF().set(finalIndex, new TableCell().setV(nested));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ private static BigtableDataSettings.Builder buildBigtableDataSettings(
.stubSettings()
.setCredentialsProvider(
FixedCredentialsProvider.create(
(pipelineOptions.as(GcpOptions.class)).getGcpCredential()));
pipelineOptions.as(GcpOptions.class).getGcpCredential()));
}

if (config.getCredentialFactory() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public PCollection<KV<ByteString, Iterable<Mutation>>> changeMutationInput(
throw new RuntimeException(
String.format(
"Unexpected mutation type [%s]: Key value is %s",
((input.getString("type"))),
input.getString("type"),
Arrays.toString(input.getBytes("key"))));
}
return KV.of(key, bigtableMutation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void processElement(ProcessContext c) throws Exception {
serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString());
LOG.error(
"Error while reading partition for operation: {}", op.getReadOperation().toString(), e);
throw (e);
throw e;
}
serviceCallMetric.call("ok");
// Report Lineage metrics
Expand Down
Loading
Loading