diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 10aa127ed806..0c1327cd06c3 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -1551,7 +1551,6 @@ class BeamModulePlugin implements Plugin { "ThreadPriorityCheck", "UndefinedEquals", "UnescapedEntity", - "UnnecessaryParentheses", "UnrecognisedJavadocTag", // errorprone 3.2.0+ checks "DirectInvocationOnMock", diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java index 17ae820f1c13..337f14d4ea09 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigquery/BigQueryIOLT.java @@ -244,7 +244,7 @@ public void testWriteAndRead() throws IOException { .withAvroFormatFunction( new AvroFormatFn( configuration.numColumns, - !("STORAGE_WRITE_API".equalsIgnoreCase(configuration.writeMethod)))); + !"STORAGE_WRITE_API".equalsIgnoreCase(configuration.writeMethod))); break; case JSON: writeIO = diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index e42ae91d0973..6124f4a5413d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -302,7 +302,7 @@ private BundleOutputManager(Map, UncommittedBundle> bundles) { @Override public void output(TupleTag tag, WindowedValue output) { checkArgument(bundles.containsKey(tag), "Unknown output tag %s", tag); - (bundles.get(tag)).add((WindowedValue) output); + bundles.get(tag).add((WindowedValue) output); } } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java index 8837f3b9ec4e..1d1cb9fa5463 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/source/unbounded/FlinkUnboundedSourceReader.java @@ -128,7 +128,7 @@ public void start() { pipelineOptions.as(FlinkPipelineOptions.class).getAutoWatermarkInterval(); if (watermarkInterval == null) { watermarkInterval = - (pipelineOptions.as(FlinkPipelineOptions.class).getMaxBundleTimeMills()) / 5L; + pipelineOptions.as(FlinkPipelineOptions.class).getMaxBundleTimeMills() / 5L; watermarkInterval = (watermarkInterval > MIN_WATERMARK_EMIT_INTERVAL_MS) ? watermarkInterval diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8e5a91c0441b..b0ab553d237c 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -287,7 +287,7 @@ private void testTriggerCombinerLiftingDisabled(Trigger trigger) throws Exceptio @Test public void testRepeatedCountTriggerDisablesCombinerLifting() throws IOException, Exception { - testTriggerCombinerLiftingDisabled(Repeatedly.forever((AfterPane.elementCountAtLeast(1)))); + testTriggerCombinerLiftingDisabled(Repeatedly.forever(AfterPane.elementCountAtLeast(1))); } @Test diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 3dc3293aa26c..6ff05b4b4452 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -478,7 +478,7 @@ private synchronized void recordActiveMessageInProcessingTimesMap() { return; } int processingTime = - (int) (this.activeMessageMetadata.stopwatch().elapsed(TimeUnit.MILLISECONDS)); + (int) this.activeMessageMetadata.stopwatch().elapsed(TimeUnit.MILLISECONDS); this.processingTimesByStep.compute( this.activeMessageMetadata.userStepName(), (k, v) -> { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleEntry.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleEntry.java index d12cc2a593cb..2a0f6ab9eeb7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleEntry.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/ShuffleEntry.java @@ -96,10 +96,10 @@ public boolean equals(@Nullable Object o) { } if (o instanceof ShuffleEntry) { ShuffleEntry that = (ShuffleEntry) o; - return (Objects.equals(this.position, that.position)) - && (Objects.equals(this.key, that.key)) - && (Objects.equals(this.secondaryKey, that.secondaryKey)) - && (Objects.equals(this.value, that.value)); + return Objects.equals(this.position, that.position) + && Objects.equals(this.key, that.key) + && Objects.equals(this.secondaryKey, that.secondaryKey) + && Objects.equals(this.value, that.value); } return false; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java index 48916f3eeed5..1c747edc1dea 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/getdata/ApplianceGetDataClient.java @@ -104,7 +104,7 @@ private void issueReadBatch(ReadBatch batch) { } catch (InterruptedException e) { // We don't expect this thread to be interrupted. To simplify handling, we just fall through // to issuing the call. - assert (false); + assert false; Thread.currentThread().interrupt(); } catch (ExecutionException e) { // startRead is a SettableFuture so this should never occur. @@ -185,7 +185,7 @@ private void issueReadBatch(ReadBatch batch) { synchronized (this) { ReadBatch batch; if (activeReadThreads < MAX_ACTIVE_READS) { - assert (pendingReadBatches.isEmpty()); + assert pendingReadBatches.isEmpty(); activeReadThreads += 1; // fall through to below synchronized block } else if (pendingReadBatches.isEmpty() diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMultimap.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMultimap.java index 7fa2e94a1bac..4b4fd84c2d57 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMultimap.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillMultimap.java @@ -548,7 +548,7 @@ private Map>> mergedCachedEntries( keyStateMap .entrySet() .removeIf( - (entry -> { + entry -> { Object structuralKey = entry.getKey(); KeyState keyState = entry.getValue(); if (complete && keyState.existence == KeyExistence.KNOWN_EXIST) { @@ -574,7 +574,7 @@ private Map>> mergedCachedEntries( return (keyState.existence == KeyExistence.KNOWN_NONEXISTENT && !keyState.removedLocally) || keyState.existence == KeyExistence.UNKNOWN_EXISTENCE; - })); + }); return cachedEntries; } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/CombineValuesFnFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/CombineValuesFnFactoryTest.java index 4ca632acc66d..da86b2bf9f38 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/CombineValuesFnFactoryTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/CombineValuesFnFactoryTest.java @@ -222,7 +222,7 @@ private ParDoFn createCombineValuesFn( public void testCombineValuesFnAll() throws Exception { TestReceiver receiver = new TestReceiver(); - Combine.CombineFn combiner = (new MeanInts()); + Combine.CombineFn combiner = new MeanInts(); ParDoFn combineParDoFn = createCombineValuesFn( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 95374d46010b..e9df101793cb 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -3540,7 +3540,7 @@ public void testQueuedWorkFailure() throws Exception { // Release the blocked calls. BlockingFn.blocker().countDown(); Map commits = - server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds((5))); + server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5)); assertEquals(1, commits.size()); assertEquals(0, BlockingFn.teardownCounter.get()); @@ -3594,7 +3594,7 @@ public void testActiveWorkFailure() throws Exception { // Release the blocked call, there should not be a commit and the dofn should be invalidated. BlockingFn.blocker().countDown(); Map commits = - server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds((5))); + server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5)); assertEquals(1, commits.size()); assertEquals(0, BlockingFn.teardownCounter.get()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestShuffleReaderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestShuffleReaderTest.java index 88530afa35aa..a4f36fe9598d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestShuffleReaderTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/TestShuffleReaderTest.java @@ -115,8 +115,8 @@ private List>> readShuffleEntries(Reiterator>>= 7; - stream.write((byte) (v)); + stream.write((byte) v); } @Benchmark diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java index 0b4db9f4f034..7a63f432906b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java @@ -349,7 +349,7 @@ private boolean readDefaultLine() throws IOException { // Search for the newline for (; bufferPosn < bufferLength; ++bufferPosn) { if (buffer[bufferPosn] == LF) { - newlineLength = (prevCharCR) ? 2 : 1; + newlineLength = prevCharCR ? 2 : 1; ++bufferPosn; // at next invocation proceed from following byte break; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRange.java index be808dfea9a5..0f3d6db6b8c8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRange.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/range/ByteKeyRange.java @@ -255,7 +255,7 @@ public ByteKey interpolateKey(double fraction) { BigInteger interpolatedOffset = new BigDecimal(range).multiply(BigDecimal.valueOf(fraction)).toBigInteger(); - int outputKeyLength = endKey.isEmpty() ? (paddedKeyLength - 1) : paddedKeyLength; + int outputKeyLength = endKey.isEmpty() ? paddedKeyLength - 1 : paddedKeyLength; return ByteKey.copyFrom( fixupHeadZeros(rangeStartInt.add(interpolatedOffset).toByteArray(), outputKeyLength)); } @@ -364,7 +364,7 @@ private static BigInteger paddedPositiveInt(byte[] bytes, int length) { checkArgument( bytePaddingNeeded >= 0, "Required bytes.length %s < length %s", bytes.length, length); BigInteger ret = new BigInteger(1, bytes); - return (bytePaddingNeeded == 0) ? ret : ret.shiftLeft(8 * bytePaddingNeeded); + return bytePaddingNeeded == 0 ? ret : ret.shiftLeft(8 * bytePaddingNeeded); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index af0353c902a6..c8315e140b50 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -1112,7 +1112,7 @@ static ProcessElementMethod create( extraParameters().stream() .filter(Predicates.instanceOf(OutputReceiverParameter.class)::apply) .findFirst(); - return parameter.isPresent() ? ((OutputReceiverParameter) parameter.get()) : null; + return parameter.isPresent() ? (OutputReceiverParameter) parameter.get() : null; } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java index 193326da0870..68d0a8b81757 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java @@ -63,7 +63,7 @@ private static String getUrn(ProtocolMessageEnum value) { checkState(CPU_COUNT_URN.equals(getUrn(StandardResourceHints.Enum.CPU_COUNT))); checkState( MAX_ACTIVE_BUNDLES_PER_WORKER.equals( - (getUrn(StandardResourceHints.Enum.MAX_ACTIVE_BUNDLES_PER_WORKER)))); + getUrn(StandardResourceHints.Enum.MAX_ACTIVE_BUNDLES_PER_WORKER))); } private static ImmutableMap hintNameToUrn = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java index 6422a58fa782..6e1d05491a7b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/HistogramData.java @@ -588,7 +588,7 @@ private int getBucketIndexZeroScale(int value) { } private int getBucketIndexNegativeScale(int value) { - return getBucketIndexZeroScale(value) >> (-getScale()); + return getBucketIndexZeroScale(value) >> -getScale(); } // This method is valid for all 'scale' values but we fallback to more efficient methods for diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java index 3eb9b5a64905..e1937b878743 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java @@ -97,7 +97,7 @@ public static void encode(long v, OutputStream stream) throws IOException { } stream.write((byte) (v | 0x80)); v >>>= 7; - stream.write((byte) (v)); + stream.write((byte) v); } /** Decodes an integer value from the given stream. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowUtils.java index 8a312f1ff0b8..92f60113620a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/RowUtils.java @@ -176,7 +176,7 @@ public Object match( processedValue = cases.processBytes( rowPosition, - (byte[]) ((value instanceof ByteBuffer) ? ((ByteBuffer) value).array() : value), + (byte[]) (value instanceof ByteBuffer ? ((ByteBuffer) value).array() : value), this); break; case INT16: diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java index 5001d2f4dd46..8df2e0abf355 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java @@ -282,13 +282,13 @@ public void testRunWithAdditionalArgsEffective() { pipeline.apply(Create.of("")).apply(new ValidateTempLocation<>()); PipelineResult.State result = pipeline.runWithAdditionalOptionArgs(pipelineArgs).waitUntilFinish(); - assert (result == PipelineResult.State.DONE); + assertEquals(PipelineResult.State.DONE, result); } static class ValidateTempLocation extends PTransform, PCollection> { @Override public void validate(PipelineOptions pipelineOptions) { - assert (!Strings.isNullOrEmpty(pipelineOptions.getTempLocation())); + assert !Strings.isNullOrEmpty(pipelineOptions.getTempLocation()); } @Override diff --git a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java index 2460a38e4843..5999b44e7e6b 100644 --- a/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java +++ b/sdks/java/expansion-service/src/main/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProvider.java @@ -115,7 +115,7 @@ public PTransform getTransform(FunctionSpec spec, PipelineOptions options) { } Schema configSchemaFromRequest = - SchemaTranslation.schemaFromProto((payload.getConfigurationSchema())); + SchemaTranslation.schemaFromProto(payload.getConfigurationSchema()); Schema configSchemaFromProvider = provider.configurationSchema(); if (!configSchemaFromRequest.assignableTo(configSchemaFromProvider)) { diff --git a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJavaTimeConversions.java b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJavaTimeConversions.java index 8f01e325c0e5..4291818e0480 100644 --- a/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJavaTimeConversions.java +++ b/sdks/java/extensions/avro/src/main/java/org/apache/beam/sdk/extensions/avro/schemas/utils/AvroJavaTimeConversions.java @@ -193,8 +193,8 @@ public String getLogicalTypeName() { @Override public Instant fromLong(Long microsFromEpoch, Schema schema, LogicalType type) { - long epochSeconds = microsFromEpoch / (1_000_000L); - long nanoAdjustment = (microsFromEpoch % (1_000_000L)) * 1_000L; + long epochSeconds = microsFromEpoch / 1_000_000L; + long nanoAdjustment = (microsFromEpoch % 1_000_000L) * 1_000L; return Instant.ofEpochSecond(epochSeconds, nanoAdjustment); } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java index 7b5a8941f25d..c4b685e27d19 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java @@ -241,7 +241,7 @@ private static int getColumnIndex(RexNode rexNode) { */ public static PCollection.IsBounded getBoundednessOfRelNode(RelNode relNode) { if (relNode instanceof BeamRelNode) { - return (((BeamRelNode) relNode).isBounded()); + return ((BeamRelNode) relNode).isBounded(); } List boundednessOfInputs = new ArrayList<>(); for (RelNode inputRel : relNode.getInputs()) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java index b95ec6df0d7f..b86ef8dd7876 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamSideInputLookupJoinRule.java @@ -47,15 +47,14 @@ public BeamSideInputLookupJoinRule() { @Override public boolean matches(RelOptRuleCall call) { RelNode joinRel = call.rel(0); - boolean matches = BeamJoinRel.containsSeekableInput(joinRel); - return (matches); + return BeamJoinRel.containsSeekableInput(joinRel); } @Override public RelNode convert(RelNode rel) { Join join = (Join) rel; - return (new BeamSideInputLookupJoinRel( + return new BeamSideInputLookupJoinRel( join.getCluster(), join.getTraitSet().replace(BeamLogicalConvention.INSTANCE), convert( @@ -64,6 +63,6 @@ public RelNode convert(RelNode rel) { join.getRight(), join.getRight().getTraitSet().replace(BeamLogicalConvention.INSTANCE)), join.getCondition(), join.getVariablesSet(), - join.getJoinType())); + join.getJoinType()); } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index d86ef653dcab..47f85178b0a1 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -1850,7 +1850,7 @@ public void reset() { response = handler.progress( BeamFnApi.InstructionRequest.newBuilder() - .setInstructionId("thread-" + threadId + "-" + (++requestCount)) + .setInstructionId("thread-" + threadId + "-" + ++requestCount) .setProcessBundleProgress( ProcessBundleProgressRequest.newBuilder() .setInstructionId("999L") diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java index 6cf79c958090..d962be2f8aee 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java @@ -24,7 +24,7 @@ public class SerializationTestUtil { private static final ObjectMapper MAPPER = - (new ObjectMapper()) + new ObjectMapper() .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); public static T serializeDeserialize(Class clazz, T obj) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index 4cf8f9c73bab..2691f82eebef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -189,13 +189,13 @@ static Long convertTime(Object value, boolean micros) { Preconditions.checkArgument( value instanceof Long, "Expecting a value as Long type (time)."); return CivilTimeEncoder.encodePacked64TimeMicros( - java.time.LocalTime.ofNanoOfDay((TimeUnit.MICROSECONDS.toNanos((long) value)))); + java.time.LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos((long) value))); } else { Preconditions.checkArgument( value instanceof Integer, "Expecting a value as Integer type (time)."); return CivilTimeEncoder.encodePacked64TimeMicros( java.time.LocalTime.ofNanoOfDay( - (TimeUnit.MILLISECONDS).toNanos(((Integer) value).longValue()))); + TimeUnit.MILLISECONDS.toNanos(((Integer) value).longValue()))); } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java index 36e1d77b67bf..674329fc6847 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProto.java @@ -138,7 +138,7 @@ private static ByteString toProtoByteString(Object o) { CivilTimeEncoder.encodePacked64DatetimeMicros((LocalDateTime) value)) .put( SqlTypes.TIMESTAMP.getIdentifier(), - (logicalType, value) -> (ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value))) + (logicalType, value) -> ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value)) .put( EnumerationType.IDENTIFIER, (logicalType, value) -> diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java index 058b64f91532..2dbc4316b883 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java @@ -648,7 +648,7 @@ private static TableFieldSchema typedTableFieldSchema(Schema type, Boolean useAv case LONG: // TODO: Use LogicalTypes.TimestampNanos once avro version is updated. if (useAvroLogicalTypes - && (TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType")))) { + && TIMESTAMP_NANOS_LOGICAL_TYPE.equals(type.getProp("logicalType"))) { return fieldSchema.setType("TIMESTAMP").setTimestampPrecision(12L); } if (logicalType instanceof LogicalTypes.TimeMicros) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java index 646dfdd873fc..a3776b595d85 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java @@ -756,7 +756,7 @@ public static DynamicMessage messageFromMap( TableRow nestedUnknown = new TableRow(); if (fieldDescriptor.isRepeated()) { ((List) - (unknownFields.computeIfAbsent(key, k -> new ArrayList()))) + unknownFields.computeIfAbsent(key, k -> new ArrayList())) .add(nestedUnknown); return nestedUnknown; } @@ -895,7 +895,7 @@ public static DynamicMessage messageFromTableRow( } TableRow localUnknownFields = Preconditions.checkStateNotNull(unknownFields); @Nullable - TableRow nested = (TableRow) (localUnknownFields.getF().get(finalIndex).getV()); + TableRow nested = (TableRow) localUnknownFields.getF().get(finalIndex).getV(); if (nested == null) { nested = new TableRow(); localUnknownFields.getF().set(finalIndex, new TableCell().setV(nested)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java index 9fa68bdd0181..78fef141515f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableConfigTranslator.java @@ -137,7 +137,7 @@ private static BigtableDataSettings.Builder buildBigtableDataSettings( .stubSettings() .setCredentialsProvider( FixedCredentialsProvider.create( - (pipelineOptions.as(GcpOptions.class)).getGcpCredential())); + pipelineOptions.as(GcpOptions.class).getGcpCredential())); } if (config.getCredentialFactory() != null) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java index 2b1be006df45..1d5c78c6699e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteSchemaTransformProvider.java @@ -355,7 +355,7 @@ public PCollection>> changeMutationInput( throw new RuntimeException( String.format( "Unexpected mutation type [%s]: Key value is %s", - ((input.getString("type"))), + input.getString("type"), Arrays.toString(input.getBytes("key")))); } return KV.of(key, bigtableMutation); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java index d1c6f894dd7c..9f9cfbdb1fef 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/BatchSpannerRead.java @@ -256,7 +256,7 @@ public void processElement(ProcessContext c) throws Exception { serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); LOG.error( "Error while reading partition for operation: {}", op.getReadOperation().toString(), e); - throw (e); + throw e; } serviceCallMetric.call("ok"); // Report Lineage metrics diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java index 2cc32c44a625..f30407a86dcc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/MutationUtils.java @@ -63,7 +63,7 @@ public static boolean isPointDelete(Mutation m) { */ public static SerializableFunction beamRowToMutationFn( Mutation.Op operation, String table) { - return (row -> { + return row -> { switch (operation) { case INSERT: return MutationUtils.createMutationFromBeamRows(Mutation.newInsertBuilder(table), row); @@ -80,7 +80,7 @@ public static SerializableFunction beamRowToMutationFn( throw new IllegalArgumentException( String.format("Unknown mutation operation type: %s", operation)); } - }); + }; } private static Key createKeyFromBeamRow(Row row) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java index feb8a1e4cc05..ad5423f942c8 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerRead.java @@ -122,7 +122,7 @@ public void processElement(ProcessContext c) throws Exception { } catch (SpannerException e) { serviceCallMetric.call(e.getErrorCode().getGrpcStatusCode().toString()); LOG.error("Error while reading operation: {}", op, e); - throw (e); + throw e; } serviceCallMetric.call("ok"); // Report Lineage metrics diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java index 3e5f370434c2..8d263a286f3e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java @@ -703,7 +703,7 @@ public ApiFuture appendRows(long offset, ProtoRows rows) if (this.updatedSchema == null) { this.updatedSchema = newSchema; this.schemaInformation = - TableRowToStorageApiProto.SchemaInformation.fromTableSchema((this.updatedSchema)); + TableRowToStorageApiProto.SchemaInformation.fromTableSchema(this.updatedSchema); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java index b4f2fa6aa241..2e6b8b76ef30 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BeamRowToStorageApiProtoTest.java @@ -345,7 +345,7 @@ public void testDescriptorFromSchema() { public void testNestedFromSchema() { DescriptorProto descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema( - BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema((NESTED_SCHEMA)), true, false); + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(NESTED_SCHEMA), true, false); Map expectedBaseTypes = BASE_SCHEMA_PROTO.getFieldList().stream() .collect( @@ -429,7 +429,7 @@ public void testParticularMapsFromSchemas() { DescriptorProto descriptor = TableRowToStorageApiProto.descriptorSchemaFromTableSchema( - BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema((nestedMapSchemaVariations)), + BeamRowToStorageApiProto.protoTableSchemaFromBeamSchema(nestedMapSchemaVariations), true, false); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 3a467de38049..e5ad761993e7 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -3502,7 +3502,7 @@ public void testStorageApiErrorsWriteGenericRecord() throws Exception { String fieldName = "number"; Function shouldFailRow = (Function & Serializable) - tr -> (Long.valueOf((String) tr.get(fieldName))) >= failFrom; + tr -> Long.valueOf((String) tr.get(fieldName)) >= failFrom; fakeDatasetService.setShouldFailRow(shouldFailRow); SerializableFunction formatRecordOnFailureFunction = diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java index 43419b7147f6..59bf2e0bcd74 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/ReadChangeStreamPartitionActionTest.java @@ -248,7 +248,7 @@ public void testCloseStreamTerminateOKStatus() throws IOException { // Should terminate before reaching processing stream partition responses. verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any()); // Should not try claim any restriction when processing CloseStream - verify(tracker, (never())).tryClaim(any()); + verify(tracker, never()).tryClaim(any()); // Should decrement the metric on termination. verify(metrics).decPartitionStreamCount(); // Should not try to write any new partition to the metadata table. @@ -274,7 +274,7 @@ public void testCloseStreamTerminateNotOutOfRangeStatus() throws IOException { // Should terminate before reaching processing stream partition responses. verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any()); // Should not try claim any restriction when processing CloseStream - verify(tracker, (never())).tryClaim(any()); + verify(tracker, never()).tryClaim(any()); // Should decrement the metric on termination. verify(metrics).decPartitionStreamCount(); // Should not try to write any new partition to the metadata table. @@ -309,7 +309,7 @@ public void testCloseStreamWritesContinuationTokens() throws IOException { // Should terminate before reaching processing stream partition responses. verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any()); // Should not try claim any restriction when processing CloseStream - verify(tracker, (never())).tryClaim(any()); + verify(tracker, never()).tryClaim(any()); // Should decrement the metric on termination. verify(metrics).decPartitionStreamCount(); // Write the new partitions. @@ -361,7 +361,7 @@ public void testCloseStreamNewPartitionMerge() throws IOException { // Should terminate before reaching processing stream partition responses. verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any()); // Should not try claim any restriction when processing CloseStream - verify(tracker, (never())).tryClaim(any()); + verify(tracker, never()).tryClaim(any()); // Should decrement the metric on termination. verify(metrics).decPartitionStreamCount(); // Write the new partitions. @@ -407,7 +407,7 @@ public void testCloseStreamMergeWithoutNewPartitionsField() throws IOException { // Should terminate before reaching processing stream partition responses. verify(changeStreamAction, never()).run(any(), any(), any(), any(), any(), any()); // Should not try claim any restriction when processing CloseStream - verify(tracker, (never())).tryClaim(any()); + verify(tracker, never()).tryClaim(any()); // Should decrement the metric on termination. verify(metrics).decPartitionStreamCount(); // We have to correct the partition in the tokens if we don't have new_partitions field. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index 49b7f88f9e25..599115faee7b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -181,7 +181,7 @@ public void sendOneMessageWithoutAttributes() throws IOException { RecordIdMethod.DETERMINISTIC, null); p.apply(Create.of(ImmutableList.of(DATA))) - .apply(ParDo.of((new Stamp((@Nullable Map) null /* attributes */)))) + .apply(ParDo.of(new Stamp((@Nullable Map) null /* attributes */))) .apply(sink); p.run(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java index aa60f9d3f46e..332aa067b0eb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/storage/GcsMatchIT.java @@ -189,7 +189,7 @@ public static void tearDown() throws IOException { MatchResult matchResult = FileSystems.match(watchPath.resolve("*").toString()); ImmutableList resourceIdList = FluentIterable.from(matchResult.metadata()) - .transform(metadata -> (metadata.resourceId())) + .transform(metadata -> metadata.resourceId()) .toList(); // delete temporary files FileSystems.delete(resourceIdList); diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java index b70bfcc3ad8b..bc575b50af54 100644 --- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java +++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java @@ -949,7 +949,7 @@ public void processElement(ProcessContext c) throws IOException { recordsWritten++; } catch (IOException e) { throw new RuntimeException( - (String.join( + String.join( " ", "Table", tableId, @@ -963,7 +963,7 @@ public void processElement(ProcessContext c) throws IOException { connection.getAdmin().isTableEnabled(TableName.valueOf(tableId))), "\nConnection Closed/Aborted/Locks:", Boolean.toString(connection.isClosed()), - Boolean.toString(connection.isAborted())))); + Boolean.toString(connection.isAborted()))); } } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java index da75c9baaa45..6069924711c4 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcReadSchemaTransformProvider.java @@ -186,8 +186,8 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { if (Strings.isNullOrEmpty(driverClassName)) { driverClassName = JDBC_DRIVER_MAP.get( - (Objects.requireNonNull( - !Strings.isNullOrEmpty(jdbcType) ? jdbcType : config.getJdbcType())) + Objects.requireNonNull( + !Strings.isNullOrEmpty(jdbcType) ? jdbcType : config.getJdbcType()) .toLowerCase()); } diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java index 3386fbe090d0..47742da3548e 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcWriteSchemaTransformProvider.java @@ -190,8 +190,8 @@ protected JdbcIO.DataSourceConfiguration dataSourceConfiguration() { if (Strings.isNullOrEmpty(driverClassName)) { driverClassName = JDBC_DRIVER_MAP.get( - (Objects.requireNonNull( - !Strings.isNullOrEmpty(jdbcType) ? jdbcType : config.getJdbcType())) + Objects.requireNonNull( + !Strings.isNullOrEmpty(jdbcType) ? jdbcType : config.getJdbcType()) .toLowerCase()); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index d247886cc674..d972c2a45fa2 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -2069,8 +2069,7 @@ public void processElement( String uniqueId = null; if (element != null) { offset = element.getOffset(); - uniqueId = - (String.format("%s-%d-%d", element.getTopic(), element.getPartition(), offset)); + uniqueId = String.format("%s-%d-%d", element.getTopic(), element.getPartition(), offset); } outputReceiver.builder(element).setRecordId(uniqueId).setRecordOffset(offset).output(); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index a8d290237b3b..27af3b67930f 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -712,7 +712,7 @@ private boolean topicPartitionExists( TopicPartition topicPartition, List partitionInfos) { // Check if the current TopicPartition still exists. return partitionInfos.stream() - .anyMatch(partitionInfo -> partitionInfo.partition() == (topicPartition.partition())); + .anyMatch(partitionInfo -> partitionInfo.partition() == topicPartition.partition()); } private TimestampPolicyContext updateWatermarkManually( diff --git a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java index 754c88f0c6a4..71e24a6859c1 100644 --- a/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java +++ b/sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java @@ -329,19 +329,19 @@ public void testReadCheckpoint() { preparer.add(messages.get(4), Instant.ofEpochMilli(50)); MqttIO.MqttCheckpointMark checkpointB = preparer.newCheckpoint(); assertTrue( - Arrays.stream(messages.toArray()).allMatch((m -> ((FakeMessage) m).getAckCount() == 0))); + Arrays.stream(messages.toArray()).allMatch(m -> ((FakeMessage) m).getAckCount() == 0)); checkpointA.finalizeCheckpoint(); // only messages in finalized checkpoint acked assertTrue( Arrays.stream(messages.subList(0, 3).toArray()) - .allMatch((m -> ((FakeMessage) m).getAckCount() == 1))); + .allMatch(m -> ((FakeMessage) m).getAckCount() == 1)); assertTrue( Arrays.stream(messages.subList(3, 5).toArray()) - .allMatch((m -> ((FakeMessage) m).getAckCount() == 0))); + .allMatch(m -> ((FakeMessage) m).getAckCount() == 0)); checkpointB.finalizeCheckpoint(); // all messaged acked once assertTrue( - Arrays.stream(messages.toArray()).allMatch((m -> ((FakeMessage) m).getAckCount() == 1))); + Arrays.stream(messages.toArray()).allMatch(m -> ((FakeMessage) m).getAckCount() == 1)); } @Test diff --git a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java index 37f48672b739..c33e97ab4ad1 100644 --- a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java +++ b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/PulsarIOTest.java @@ -54,9 +54,7 @@ public void testRead() { PCollection pcoll = pipeline .apply( - PulsarIO.read() - .withTopic(TEST_TOPIC) - .withPulsarClient((ignored -> newFakeClient()))) + PulsarIO.read().withTopic(TEST_TOPIC).withPulsarClient(ignored -> newFakeClient())) .apply( MapElements.into(TypeDescriptor.of(Integer.class)) .via(m -> (int) m.getMessageId()[1])); @@ -75,7 +73,7 @@ public void testRead() { @Test public void testExpandReadFailUnserializableType() { pipeline.apply( - PulsarIO.read(t -> t).withTopic(TEST_TOPIC).withPulsarClient((ignored -> newFakeClient()))); + PulsarIO.read(t -> t).withTopic(TEST_TOPIC).withPulsarClient(ignored -> newFakeClient())); IllegalStateException exception = Assert.assertThrows(IllegalStateException.class, pipeline::run); String errorMsg = exception.getMessage();