|
45 | 45 | import java.io.ObjectInputStream; |
46 | 46 | import java.io.ObjectOutputStream; |
47 | 47 | import java.util.Collections; |
| 48 | +import java.util.concurrent.CountDownLatch; |
48 | 49 | import java.util.concurrent.atomic.AtomicLong; |
49 | 50 | import org.junit.jupiter.api.AfterEach; |
50 | 51 | import org.junit.jupiter.api.BeforeEach; |
@@ -224,6 +225,57 @@ public void testManagerTriggersRefreshInGracePeriod() throws InterruptedExceptio |
224 | 225 | assertEquals(newerEncoded, resultRab.getEncodedLocations()); |
225 | 226 | } |
226 | 227 |
|
| 228 | + @Test |
| 229 | + public void testExecutorQueueCapacityLimit() throws Exception { |
| 230 | + final String url = "https://example.com/rab"; |
| 231 | + final AccessToken token = new AccessToken("token", new java.util.Date(System.currentTimeMillis() + 3600000L)); |
| 232 | + RegionalAccessBoundaryProvider provider = () -> url; |
| 233 | + |
| 234 | + int poolSize = 5; |
| 235 | + int queueCapacity = 100; |
| 236 | + int totalCapacity = poolSize + queueCapacity; |
| 237 | + |
| 238 | + CountDownLatch latch = new CountDownLatch(1); |
| 239 | + |
| 240 | + java.io.InputStream blockingStream = new java.io.InputStream() { |
| 241 | + private final java.io.InputStream delegate = new ByteArrayInputStream("{\"encodedLocations\": \"encoded\", \"locations\": [\"loc\"]}".getBytes()); |
| 242 | + private boolean blocked = false; |
| 243 | + |
| 244 | + @Override |
| 245 | + public int read() throws java.io.IOException { |
| 246 | + if (!blocked) { |
| 247 | + try { |
| 248 | + latch.await(); |
| 249 | + } catch (InterruptedException e) { |
| 250 | + Thread.currentThread().interrupt(); |
| 251 | + } |
| 252 | + blocked = true; |
| 253 | + } |
| 254 | + return delegate.read(); |
| 255 | + } |
| 256 | + }; |
| 257 | + |
| 258 | + MockHttpTransport transport = new MockHttpTransport.Builder() |
| 259 | + .setLowLevelHttpResponse(new MockLowLevelHttpResponse().setContent(blockingStream).setContentType("application/json")) |
| 260 | + .build(); |
| 261 | + HttpTransportFactory transportFactory = () -> transport; |
| 262 | + |
| 263 | + RegionalAccessBoundaryManager[] managers = new RegionalAccessBoundaryManager[totalCapacity]; |
| 264 | + for (int i = 0; i < totalCapacity; i++) { |
| 265 | + managers[i] = new RegionalAccessBoundaryManager(testClock); |
| 266 | + managers[i].triggerAsyncRefresh(transportFactory, provider, token); |
| 267 | + } |
| 268 | + |
| 269 | + RegionalAccessBoundaryManager extraManager = new RegionalAccessBoundaryManager(testClock); |
| 270 | + assertFalse(extraManager.isCooldownActive()); |
| 271 | + |
| 272 | + extraManager.triggerAsyncRefresh(transportFactory, provider, token); |
| 273 | + |
| 274 | + assertTrue(extraManager.isCooldownActive(), "106th task should have been rejected and entered cooldown"); |
| 275 | + |
| 276 | + latch.countDown(); |
| 277 | + } |
| 278 | + |
227 | 279 | private static class TestClock implements Clock { |
228 | 280 | private final AtomicLong currentTime = new AtomicLong(System.currentTimeMillis()); |
229 | 281 |
|
|
0 commit comments