diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java index 9727048e47aa..90375ad445ae 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java @@ -249,6 +249,16 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions { void setIsWindmillServiceDirectPathEnabled(boolean isWindmillServiceDirectPathEnabled); + /** + * The maximum size of cached entries in bytes. Entries (eg: values, bags) larger than this limit + * will not be cached by the windmill state cache + */ + @Description("The maximum size of cached entries in bytes.") + @Default.Long(Long.MAX_VALUE) + Long getMaxWindmillStateCacheEntryBytes(); + + void setMaxWindmillStateCacheEntryBytes(Long value); + /** * Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for * backwards compatibility. diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index f5e5adab1556..4d070da995b3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -50,6 +50,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore; import org.apache.beam.runners.dataflow.worker.streaming.WorkHeartbeatResponseProcessor; import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig; +import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig.Fetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.FixedGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingApplianceComputationConfigFetcher; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher; @@ -113,6 +114,7 @@ import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.util.construction.CoderTranslation; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.auth.MoreCallCredentials; @@ -633,6 +635,10 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) .setSupportMapViaMultimap(options.isEnableStreamingEngine()) + .setMaxCachedEntryBytes(options.getMaxWindmillStateCacheEntryBytes()) + .setEnableHistogram( + !ExperimentalOptions.hasExperiment( + options, "disable_windmill_user_state_cache_histogram")) .build(); GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = @@ -651,6 +657,15 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o windmillStateCache::forComputation, ID_GENERATOR)); + Fetcher configFetcher = configFetcherComputationStateCacheAndWindmillClient.configFetcher(); + configFetcher + .getGlobalConfigHandle() + .registerConfigObserver( + config -> { + windmillStateCache.setMaxCachedEntryBytesOverride( + config.userWorkerJobSettings().getMaxCachedEntryBytes()); + }); + ComputationStateCache computationStateCache = configFetcherComputationStateCacheAndWindmillClient.computationStateCache(); WindmillServerStub windmillServer = @@ -689,7 +704,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o return new StreamingDataflowWorker( windmillServer, clientId, - configFetcherComputationStateCacheAndWindmillClient.configFetcher(), + configFetcher, computationStateCache, windmillStateCache, workExecutor, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java new file mode 100644 index 000000000000..6b90ca8df9ef --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogram.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +/** A simple histogram to track byte sizes. */ +public class SimpleByteHistogram { + private final long[] buckets = new long[7]; + + public void add(long weight) { + buckets[getBucket(weight)]++; + } + + private int getBucket(long weight) { + if (weight < 128) return 0; + if (weight < 256) return 1; + if (weight < 512) return 2; + if (weight < 1024) return 3; + if (weight < 10 * 1024) return 4; + if (weight < 1024 * 1024) return 5; + return 6; + } + + public String format() { + return String.format( + "[<128B:%d, <256B:%d, <512B:%d, <1KB:%d, <10KB:%d, <1MB:%d, >=1MB:%d]", + buckets[0], buckets[1], buckets[2], buckets[3], buckets[4], buckets[5], buckets[6]); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index 07c9599c866a..7515db000852 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet; import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; +import org.apache.beam.runners.dataflow.worker.util.SimpleByteHistogram; import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.util.Weighted; @@ -75,9 +76,18 @@ public class WindmillStateCache implements StatusDataProvider { private final ConcurrentMap keyIndex; private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. private final boolean supportMapViaMultimap; - - WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) { + private final long defaultMaxCachedEntryBytes; + private final boolean enableHistogram; + private volatile long maxCachedEntryBytesOverride = -1L; + + WindmillStateCache( + long sizeMb, + boolean supportMapViaMultimap, + long maxCachedEntryBytes, + boolean enableHistogram) { this.workerCacheBytes = sizeMb * MEGABYTES; + this.defaultMaxCachedEntryBytes = maxCachedEntryBytes; + this.enableHistogram = enableHistogram; int stateCacheConcurrencyLevel = Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors()); this.stateCache = @@ -99,11 +109,27 @@ public interface Builder { Builder setSupportMapViaMultimap(boolean supportMapViaMultimap); + Builder setMaxCachedEntryBytes(long maxCachedEntryBytes); + + Builder setEnableHistogram(boolean enableHistogram); + WindmillStateCache build(); } public static Builder builder() { - return new AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false); + return new AutoBuilder_WindmillStateCache_Builder() + .setSupportMapViaMultimap(false) + .setMaxCachedEntryBytes(Long.MAX_VALUE) + .setEnableHistogram(true); + } + + public void setMaxCachedEntryBytesOverride(long limit) { + this.maxCachedEntryBytesOverride = limit; + } + + private long getMaxCachedEntryBytesLimit() { + long override = maxCachedEntryBytesOverride; + return override >= 0 ? override : defaultMaxCachedEntryBytes; } private EntryStats calculateEntryStats() { @@ -111,10 +137,20 @@ private EntryStats calculateEntryStats() { BiConsumer consumer = (stateId, stateCacheEntry) -> { stats.entries++; - stats.idWeight += stateId.getWeight(); - stats.entryWeight += stateCacheEntry.getWeight(); + long idWeight = stateId.getWeight(); + stats.idWeight += idWeight; + long entryWeight = stateCacheEntry.getWeight(); + stats.entryWeight += entryWeight; stats.entryValues += stateCacheEntry.values.size(); stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size()); + if (enableHistogram) { + stats.addKeyWeight(idWeight); + stats.addEntryWeight(entryWeight); + stateCacheEntry.values.forEach( + (encodedAddress, weightedValue) -> { + stats.addValueWeight(weightedValue.weight); + }); + } }; stateCache.asMap().forEach(consumer); return stats; @@ -142,23 +178,44 @@ public ForComputation forComputation(String computation) { @Override public void appendSummaryHtml(PrintWriter response) { response.println("Cache Stats:
"); - response.println( - "" - + "" - + "" - + ""); CacheStats cacheStats = stateCache.stats(); EntryStats entryStats = calculateEntryStats(); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println(""); - response.println("
Hit RatioEvictionsEntriesEntry ValuesMax Entry ValuesId WeightEntry WeightMax WeightKeys
" + cacheStats.hitRate() + "" + cacheStats.evictionCount() + "" + entryStats.entries + "(" + stateCache.size() + " inc. weak) " + entryStats.entryValues + "" + entryStats.maxEntryValues + "" + entryStats.idWeight / MEGABYTES + "MB" + entryStats.entryWeight / MEGABYTES + "MB" + getMaxWeight() / MEGABYTES + "MB" + keyIndex.size() + "

"); + + response.println("Hit Ratio" + cacheStats.hitRate() + ""); + response.println("Evictions" + cacheStats.evictionCount() + ""); + response.println( + "Entries" + + entryStats.entries + + " (" + + stateCache.size() + + " inc. weak)"); + response.println("Entry Values" + entryStats.entryValues + ""); + response.println( + "Max Entry Values" + entryStats.maxEntryValues + ""); + response.println( + "Id Weight" + entryStats.idWeight / MEGABYTES + "MB"); + response.println( + "Entry Weight" + entryStats.entryWeight / MEGABYTES + "MB"); + response.println("Max Weight" + getMaxWeight() / MEGABYTES + "MB"); + response.println("Keys" + keyIndex.size() + ""); + response.println( + "Entry Size Limit" + getMaxCachedEntryBytesLimit() + " bytes"); + if (enableHistogram) { + response.println( + "Entry Weight Dist" + + entryStats.entryWeightHistogram.format() + + ""); + response.println( + "Value Weight Dist" + + entryStats.valueWeightHistogram.format() + + ""); + response.println( + "Key Weight Dist" + + entryStats.keyWeightHistogram.format() + + ""); + } + + response.println("
"); } public BaseStatusServlet statusServlet() { @@ -180,6 +237,21 @@ private static class EntryStats { long entryWeight; long entryValues; long maxEntryValues; + SimpleByteHistogram entryWeightHistogram = new SimpleByteHistogram(); + SimpleByteHistogram valueWeightHistogram = new SimpleByteHistogram(); + SimpleByteHistogram keyWeightHistogram = new SimpleByteHistogram(); + + void addEntryWeight(long weight) { + entryWeightHistogram.add(weight); + } + + void addValueWeight(long weight) { + valueWeightHistogram.add(weight); + } + + void addKeyWeight(long weight) { + keyWeightHistogram.add(weight); + } } /** @@ -413,7 +485,15 @@ public void put( } public void persist() { - localCache.forEach(stateCache::put); + long limit = WindmillStateCache.this.getMaxCachedEntryBytesLimit(); + localCache.forEach( + (id, entry) -> { + if (entry.getWeight() <= limit) { + stateCache.put(id, entry); + } else { + stateCache.invalidate(id); + } + }); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java new file mode 100644 index 000000000000..252300a19550 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/SimpleByteHistogramTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.util; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link SimpleByteHistogram}. */ +@RunWith(JUnit4.class) +public class SimpleByteHistogramTest { + + @Test + public void testHistogram() { + SimpleByteHistogram histogram = new SimpleByteHistogram(); + histogram.add(10); // <128B + histogram.add(127); // <128B + histogram.add(128); // <256B + histogram.add(255); // <256B + histogram.add(256); // <512B + histogram.add(511); // <512B + histogram.add(512); // <1KB + histogram.add(1023); // <1KB + histogram.add(1024); // <10KB + histogram.add(10240 - 1); // <10KB + histogram.add(10240); // <1MB + histogram.add(1048576 - 1); // <1MB + histogram.add(1048576); // >=1MB + histogram.add(2000000); // >=1MB + + String expected = "[<128B:2, <256B:2, <512B:2, <1KB:2, <10KB:2, <1MB:2, >=1MB:2]"; + assertEquals(expected, histogram.format()); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index bbb8e4c93c07..2d3d9b5ccff2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -270,6 +270,68 @@ public void testMaxWeight() throws Exception { assertEquals(400 * MEGABYTES, cache.getMaxWeight()); } + @Test + public void testMaxCachedEntryBytes() throws Exception { + cache.setMaxCachedEntryBytesOverride( + 100); // Set limit to 100 bytes, per cache entry overhead is 136. + + WindmillStateCache.ForKeyAndFamily keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 1L).forFamily(STATE_FAMILY); + + TestStateTag tag1 = new TestStateTag("tag1"); + TestStateTag tag2 = new TestStateTag("tag2"); + + putInCache(keyCache, StateNamespaces.global(), tag1, new TestState("g1"), 10); + keyCache.persist(); + + // It should not be in global cache because it's too large. + keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 2L).forFamily(STATE_FAMILY); + assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag1)); + + // Now set limit larger. + cache.setMaxCachedEntryBytesOverride(1000); + + putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2"), 10); + keyCache.persist(); + + // It should be in global cache. + keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 3L).forFamily(STATE_FAMILY); + assertEquals( + Optional.of(new TestState("g2")), getFromCache(keyCache, StateNamespaces.global(), tag2)); + + // Now update it to be larger than limit. + putInCache(keyCache, StateNamespaces.global(), tag2, new TestState("g2_large"), 2000); + keyCache.persist(); + + // It should be removed from global cache. + keyCache = + cache.forComputation(COMPUTATION).forKey(COMPUTATION_KEY, 0L, 4L).forFamily(STATE_FAMILY); + assertEquals(Optional.empty(), getFromCache(keyCache, StateNamespaces.global(), tag2)); + } + + @Test + public void testDisableHistogram() throws Exception { + WindmillStateCache noHistogramCache = + WindmillStateCache.builder().setSizeMb(400).setEnableHistogram(false).build(); + WindmillStateCache.ForKeyAndFamily keyCache = + noHistogramCache + .forComputation(COMPUTATION) + .forKey(COMPUTATION_KEY, 0L, 1L) + .forFamily(STATE_FAMILY); + + putInCache( + keyCache, StateNamespaces.global(), new TestStateTag("tag1"), new TestState("g1"), 2); + keyCache.persist(); + + java.io.StringWriter writer = new java.io.StringWriter(); + noHistogramCache.appendSummaryHtml(new java.io.PrintWriter(writer)); + String summary = writer.toString(); + + org.junit.Assert.assertFalse(summary.contains("Entry Weight Dist")); + } + /** Verifies that values are cached in the appropriate namespaces. */ @Test public void testInvalidation() throws Exception { diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto index b7579cbacb8e..58e4f7df3c34 100644 --- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto +++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto @@ -986,6 +986,8 @@ message UserWorkerRunnerV1Settings { optional ConnectivityType connectivity_type = 4 [default = CONNECTIVITY_TYPE_DEFAULT]; + optional int64 max_cached_entry_bytes = 5 [default = -1]; + reserved 1, 2; }