-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Dataflow Streaming] Add a job setting to limit value size in windmill state cache #38458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2ec6939
59a7c3d
af26fb1
e551790
10e0042
5097df1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
/**
 * A simple fixed-bucket histogram for tracking the distribution of byte sizes.
 *
 * <p>Samples are counted in seven buckets: {@code <128B}, {@code <256B}, {@code <512B}, {@code
 * <1KB}, {@code <10KB}, {@code <1MB} and {@code >=1MB}. A sample lands in the first bucket whose
 * exclusive upper bound is greater than the sample, so negative values are counted in the first
 * bucket and anything of 1 MiB or more is counted in the last one.
 *
 * <p>This class performs no synchronization; callers sharing an instance across threads must
 * coordinate access themselves.
 */
public class SimpleByteHistogram {
  /** Exclusive upper bounds, in bytes, of every bucket except the final open-ended one. */
  private static final long[] UPPER_BOUNDS = {128, 256, 512, 1024, 10 * 1024, 1024 * 1024};

  /**
   * Display label of each bucket, in bucket order. Holds exactly one more entry than {@link
   * #UPPER_BOUNDS}; the extra trailing entry labels the overflow bucket.
   */
  private static final String[] LABELS = {
    "<128B", "<256B", "<512B", "<1KB", "<10KB", "<1MB", ">=1MB"
  };

  /** Sample count per bucket, indexed in the same order as {@link #LABELS}. */
  private final long[] buckets = new long[LABELS.length];

  /**
   * Records one sample.
   *
   * @param weight the size of the sample in bytes
   */
  public void add(long weight) {
    buckets[getBucket(weight)]++;
  }

  /**
   * Returns the index of the bucket that {@code weight} belongs to.
   *
   * @param weight the size of the sample in bytes
   * @return the index of the first bucket whose upper bound exceeds {@code weight}, or the index of
   *     the overflow bucket when no bound does
   */
  private static int getBucket(long weight) {
    for (int i = 0; i < UPPER_BOUNDS.length; i++) {
      if (weight < UPPER_BOUNDS[i]) {
        return i;
      }
    }
    return UPPER_BOUNDS.length;
  }

  /**
   * Renders the bucket counts as a single line of the form {@code [<128B:n, <256B:n, <512B:n,
   * <1KB:n, <10KB:n, <1MB:n, >=1MB:n]}.
   *
   * <p>The text is assembled with {@link StringBuilder} rather than {@code String.format} so the
   * counts are always written with ASCII digits, independent of the JVM's default locale.
   *
   * @return the formatted histogram
   */
  public String format() {
    StringBuilder out = new StringBuilder("[");
    for (int i = 0; i < buckets.length; i++) {
      if (i > 0) {
        out.append(", ");
      }
      out.append(LABELS[i]).append(':').append(buckets[i]);
    }
    return out.append(']').toString();
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,6 +32,7 @@ | |
| import org.apache.beam.runners.dataflow.worker.status.BaseStatusServlet; | ||
| import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider; | ||
| import org.apache.beam.runners.dataflow.worker.streaming.ShardedKey; | ||
| import org.apache.beam.runners.dataflow.worker.util.SimpleByteHistogram; | ||
| import org.apache.beam.runners.dataflow.worker.util.common.worker.InternedByteString; | ||
| import org.apache.beam.sdk.state.State; | ||
| import org.apache.beam.sdk.util.Weighted; | ||
|
|
@@ -75,9 +76,18 @@ public class WindmillStateCache implements StatusDataProvider { | |
| private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex; | ||
| private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. | ||
| private final boolean supportMapViaMultimap; | ||
|
|
||
| WindmillStateCache(long sizeMb, boolean supportMapViaMultimap) { | ||
| private final long defaultMaxCachedEntryBytes; | ||
| private final boolean enableHistogram; | ||
| private volatile long maxCachedEntryBytesOverride = -1L; | ||
|
|
||
| WindmillStateCache( | ||
| long sizeMb, | ||
| boolean supportMapViaMultimap, | ||
| long maxCachedEntryBytes, | ||
| boolean enableHistogram) { | ||
| this.workerCacheBytes = sizeMb * MEGABYTES; | ||
| this.defaultMaxCachedEntryBytes = maxCachedEntryBytes; | ||
| this.enableHistogram = enableHistogram; | ||
| int stateCacheConcurrencyLevel = | ||
| Math.max(STATE_CACHE_CONCURRENCY_LEVEL, Runtime.getRuntime().availableProcessors()); | ||
| this.stateCache = | ||
|
|
@@ -99,22 +109,48 @@ public interface Builder { | |
|
|
||
| Builder setSupportMapViaMultimap(boolean supportMapViaMultimap); | ||
|
|
||
| Builder setMaxCachedEntryBytes(long maxCachedEntryBytes); | ||
|
|
||
| Builder setEnableHistogram(boolean enableHistogram); | ||
|
|
||
| WindmillStateCache build(); | ||
| } | ||
|
|
||
| public static Builder builder() { | ||
| return new AutoBuilder_WindmillStateCache_Builder().setSupportMapViaMultimap(false); | ||
| return new AutoBuilder_WindmillStateCache_Builder() | ||
| .setSupportMapViaMultimap(false) | ||
| .setMaxCachedEntryBytes(Long.MAX_VALUE) | ||
| .setEnableHistogram(true); | ||
| } | ||
|
|
||
| public void setMaxCachedEntryBytesOverride(long limit) { | ||
| this.maxCachedEntryBytesOverride = limit; | ||
| } | ||
|
|
||
| private long getMaxCachedEntryBytesLimit() { | ||
| long override = maxCachedEntryBytesOverride; | ||
| return override >= 0 ? override : defaultMaxCachedEntryBytes; | ||
| } | ||
|
|
||
| private EntryStats calculateEntryStats() { | ||
| EntryStats stats = new EntryStats(); | ||
| BiConsumer<StateId, StateCacheEntry> consumer = | ||
| (stateId, stateCacheEntry) -> { | ||
| stats.entries++; | ||
| stats.idWeight += stateId.getWeight(); | ||
| stats.entryWeight += stateCacheEntry.getWeight(); | ||
| long idWeight = stateId.getWeight(); | ||
| stats.idWeight += idWeight; | ||
| long entryWeight = stateCacheEntry.getWeight(); | ||
| stats.entryWeight += entryWeight; | ||
| stats.entryValues += stateCacheEntry.values.size(); | ||
| stats.maxEntryValues = Math.max(stats.maxEntryValues, stateCacheEntry.values.size()); | ||
| if (enableHistogram) { | ||
| stats.addKeyWeight(idWeight); | ||
| stats.addEntryWeight(entryWeight); | ||
| stateCacheEntry.values.forEach( | ||
| (encodedAddress, weightedValue) -> { | ||
|
arunpandianp marked this conversation as resolved.
|
||
| stats.addValueWeight(weightedValue.weight); | ||
| }); | ||
|
Comment on lines
+149
to
+152
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Iterating over all values in every cache entry to calculate the weight distribution increases the complexity of |
||
| } | ||
| }; | ||
| stateCache.asMap().forEach(consumer); | ||
| return stats; | ||
|
|
@@ -142,23 +178,44 @@ public ForComputation forComputation(String computation) { | |
| @Override | ||
| public void appendSummaryHtml(PrintWriter response) { | ||
| response.println("Cache Stats: <br><table>"); | ||
| response.println( | ||
| "<tr><th>Hit Ratio</th><th>Evictions</th><th>Entries</th>" | ||
| + "<th>Entry Values</th><th>Max Entry Values</th>" | ||
| + "<th>Id Weight</th><th>Entry Weight</th><th>Max Weight</th><th>Keys</th>" | ||
| + "</tr><tr>"); | ||
| CacheStats cacheStats = stateCache.stats(); | ||
| EntryStats entryStats = calculateEntryStats(); | ||
| response.println("<td>" + cacheStats.hitRate() + "</td>"); | ||
| response.println("<td>" + cacheStats.evictionCount() + "</td>"); | ||
| response.println("<td>" + entryStats.entries + "(" + stateCache.size() + " inc. weak) </td>"); | ||
| response.println("<td>" + entryStats.entryValues + "</td>"); | ||
| response.println("<td>" + entryStats.maxEntryValues + "</td>"); | ||
| response.println("<td>" + entryStats.idWeight / MEGABYTES + "MB</td>"); | ||
| response.println("<td>" + entryStats.entryWeight / MEGABYTES + "MB</td>"); | ||
| response.println("<td>" + getMaxWeight() / MEGABYTES + "MB</td>"); | ||
| response.println("<td>" + keyIndex.size() + "</td>"); | ||
| response.println("</tr></table><br>"); | ||
|
|
||
| response.println("<tr><th>Hit Ratio</th><td>" + cacheStats.hitRate() + "</td></tr>"); | ||
| response.println("<tr><th>Evictions</th><td>" + cacheStats.evictionCount() + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Entries</th><td>" | ||
| + entryStats.entries | ||
| + " (" | ||
| + stateCache.size() | ||
| + " inc. weak)</td></tr>"); | ||
| response.println("<tr><th>Entry Values</th><td>" + entryStats.entryValues + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Max Entry Values</th><td>" + entryStats.maxEntryValues + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Id Weight</th><td>" + entryStats.idWeight / MEGABYTES + "MB</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Entry Weight</th><td>" + entryStats.entryWeight / MEGABYTES + "MB</td></tr>"); | ||
| response.println("<tr><th>Max Weight</th><td>" + getMaxWeight() / MEGABYTES + "MB</td></tr>"); | ||
| response.println("<tr><th>Keys</th><td>" + keyIndex.size() + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Entry Size Limit</th><td>" + getMaxCachedEntryBytesLimit() + " bytes</td></tr>"); | ||
| if (enableHistogram) { | ||
| response.println( | ||
| "<tr><th>Entry Weight Dist</th><td>" | ||
| + entryStats.entryWeightHistogram.format() | ||
| + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Value Weight Dist</th><td>" | ||
| + entryStats.valueWeightHistogram.format() | ||
| + "</td></tr>"); | ||
| response.println( | ||
| "<tr><th>Key Weight Dist</th><td>" | ||
| + entryStats.keyWeightHistogram.format() | ||
| + "</td></tr>"); | ||
| } | ||
|
|
||
| response.println("</table><br>"); | ||
| } | ||
|
|
||
| public BaseStatusServlet statusServlet() { | ||
|
|
@@ -180,6 +237,21 @@ private static class EntryStats { | |
| long entryWeight; | ||
| long entryValues; | ||
| long maxEntryValues; | ||
| SimpleByteHistogram entryWeightHistogram = new SimpleByteHistogram(); | ||
| SimpleByteHistogram valueWeightHistogram = new SimpleByteHistogram(); | ||
| SimpleByteHistogram keyWeightHistogram = new SimpleByteHistogram(); | ||
|
|
||
| void addEntryWeight(long weight) { | ||
| entryWeightHistogram.add(weight); | ||
| } | ||
|
|
||
| void addValueWeight(long weight) { | ||
| valueWeightHistogram.add(weight); | ||
| } | ||
|
|
||
| void addKeyWeight(long weight) { | ||
| keyWeightHistogram.add(weight); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -413,7 +485,15 @@ public <T extends State> void put( | |
| } | ||
|
|
||
| public void persist() { | ||
| localCache.forEach(stateCache::put); | ||
| long limit = WindmillStateCache.this.getMaxCachedEntryBytesLimit(); | ||
| localCache.forEach( | ||
| (id, entry) -> { | ||
| if (entry.getWeight() <= limit) { | ||
| stateCache.put(id, entry); | ||
|
Comment on lines
+491
to
+492
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The limit is compared against |
||
| } else { | ||
| stateCache.invalidate(id); | ||
| } | ||
| }); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.beam.runners.dataflow.worker.util; | ||
|
|
||
| import static org.junit.Assert.assertEquals; | ||
|
|
||
| import org.junit.Test; | ||
| import org.junit.runner.RunWith; | ||
| import org.junit.runners.JUnit4; | ||
|
|
||
| /** Tests for {@link SimpleByteHistogram}. */ | ||
| @RunWith(JUnit4.class) | ||
| public class SimpleByteHistogramTest { | ||
|
|
||
| @Test | ||
| public void testHistogram() { | ||
| SimpleByteHistogram histogram = new SimpleByteHistogram(); | ||
| histogram.add(10); // <128B | ||
| histogram.add(127); // <128B | ||
| histogram.add(128); // <256B | ||
| histogram.add(255); // <256B | ||
| histogram.add(256); // <512B | ||
| histogram.add(511); // <512B | ||
| histogram.add(512); // <1KB | ||
| histogram.add(1023); // <1KB | ||
| histogram.add(1024); // <10KB | ||
| histogram.add(10240 - 1); // <10KB | ||
| histogram.add(10240); // <1MB | ||
| histogram.add(1048576 - 1); // <1MB | ||
| histogram.add(1048576); // >=1MB | ||
| histogram.add(2000000); // >=1MB | ||
|
|
||
| String expected = "[<128B:2, <256B:2, <512B:2, <1KB:2, <10KB:2, <1MB:2, >=1MB:2]"; | ||
| assertEquals(expected, histogram.format()); | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.