Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package com.softwaremill.okapi.core

import org.slf4j.LoggerFactory
import java.time.Clock
import java.time.Duration
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

/**
* Periodically removes DELIVERED outbox entries older than [retentionDuration].
* Periodically removes DELIVERED outbox entries older than [OutboxPurgerConfig.retention].
*
* Runs on a single daemon thread with explicit [start]/[stop] lifecycle.
* [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown.
Expand All @@ -19,17 +18,9 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
class OutboxPurger(
private val outboxStore: OutboxStore,
private val retentionDuration: Duration = Duration.ofDays(7),
private val intervalMs: Long = 3_600_000L,
private val batchSize: Int = 100,
private val config: OutboxPurgerConfig = OutboxPurgerConfig(),
private val clock: Clock = Clock.systemUTC(),
) {
init {
require(retentionDuration > Duration.ZERO) { "retentionDuration must be positive, got: $retentionDuration" }
require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" }
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
}

private val running = AtomicBoolean(false)

private val scheduler: ScheduledExecutorService =
Expand All @@ -41,11 +32,12 @@ class OutboxPurger(
check(!scheduler.isShutdown) { "OutboxPurger cannot be restarted after stop()" }
if (!running.compareAndSet(false, true)) return
logger.info(
"Outbox purger started [retention={}, interval={}ms, batchSize={}]",
retentionDuration,
intervalMs,
batchSize,
"Outbox purger started [retention={}, interval={}, batchSize={}]",
config.retention,
config.interval,
config.batchSize,
)
val intervalMs = config.interval.toMillis()
scheduler.scheduleWithFixedDelay(::tick, intervalMs, intervalMs, TimeUnit.MILLISECONDS)
}

Expand All @@ -62,14 +54,14 @@ class OutboxPurger(

private fun tick() {
try {
val cutoff = clock.instant().minus(retentionDuration)
val cutoff = clock.instant().minus(config.retention)
var totalDeleted = 0
var batches = 0
do {
val deleted = outboxStore.removeDeliveredBefore(cutoff, batchSize)
val deleted = outboxStore.removeDeliveredBefore(cutoff, config.batchSize)
totalDeleted += deleted
batches++
} while (deleted == batchSize && batches < MAX_BATCHES_PER_TICK)
} while (deleted == config.batchSize && batches < MAX_BATCHES_PER_TICK)

if (totalDeleted > 0) {
logger.debug("Purged {} delivered entries in {} batches", totalDeleted, batches)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.softwaremill.okapi.core

import java.time.Duration

data class OutboxPurgerConfig(
val retention: Duration = Duration.ofDays(7),
val interval: Duration = Duration.ofHours(1),
val batchSize: Int = 100,
) {
init {
require(!retention.isZero && !retention.isNegative) { "retention must be positive, got: $retention" }
require(!interval.isNegative && interval.toMillis() > 0) { "interval must be at least 1ms, got: $interval" }
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ class OutboxScheduler(
fun start() {
check(!scheduler.isShutdown) { "OutboxScheduler cannot be restarted after stop()" }
if (!running.compareAndSet(false, true)) return
logger.info("Outbox processor started [interval={}ms, batchSize={}]", config.intervalMs, config.batchSize)
scheduler.scheduleWithFixedDelay(::tick, 0L, config.intervalMs, TimeUnit.MILLISECONDS)
logger.info("Outbox processor started [interval={}, batchSize={}]", config.interval, config.batchSize)
scheduler.scheduleWithFixedDelay(::tick, 0L, config.interval.toMillis(), TimeUnit.MILLISECONDS)
}

fun stop() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.softwaremill.okapi.core

import java.time.Duration

data class OutboxSchedulerConfig(
val intervalMs: Long = 1_000L,
val interval: Duration = Duration.ofSeconds(1),
val batchSize: Int = 10,
) {
init {
require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" }
require(!interval.isNegative && interval.toMillis() > 0) { "interval must be at least 1ms, got: $interval" }
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.softwaremill.okapi.core

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.time.Duration
import java.time.Duration.ofDays
import java.time.Duration.ofHours
import java.time.Duration.ofMinutes
import java.time.Duration.ofNanos
import java.time.Duration.ofSeconds

class OutboxPurgerConfigTest : FunSpec({

test("default config has valid values") {
val config = OutboxPurgerConfig()
config.retention shouldBe ofDays(7)
config.interval shouldBe ofHours(1)
config.batchSize shouldBe 100
}

test("accepts custom valid values") {
val config = OutboxPurgerConfig(
retention = ofHours(12),
interval = ofSeconds(30),
batchSize = 50,
)
config.retention shouldBe ofHours(12)
config.interval shouldBe ofSeconds(30)
config.batchSize shouldBe 50
}

test("rejects zero retention") {
shouldThrow<IllegalArgumentException> {
OutboxPurgerConfig(retention = Duration.ZERO)
}
}

test("rejects negative retention") {
shouldThrow<IllegalArgumentException> {
OutboxPurgerConfig(retention = ofDays(-1))
}
}

test("rejects zero interval") {
shouldThrow<IllegalArgumentException> {
OutboxPurgerConfig(interval = Duration.ZERO)
}
}

test("rejects negative interval") {
shouldThrow<IllegalArgumentException> {
OutboxPurgerConfig(interval = ofMinutes(-5))
}
}

test("rejects zero batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxPurgerConfig(batchSize = 0)
}
}

test("rejects negative batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxPurgerConfig(batchSize = -10)
}
}

test("rejects sub-millisecond interval") {
shouldThrow<IllegalArgumentException> {
OutboxPurgerConfig(interval = ofNanos(1))
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.time.Clock
import java.time.Duration
import java.time.Duration.ofDays
import java.time.Duration.ofMillis
import java.time.Duration.ofMinutes
import java.time.Instant
import java.time.ZoneOffset
import java.util.concurrent.CountDownLatch
Expand All @@ -29,17 +31,19 @@ class OutboxPurgerTest : FunSpec({

val purger = OutboxPurger(
outboxStore = store,
retentionDuration = Duration.ofDays(7),
intervalMs = 50,
batchSize = 100,
config = OutboxPurgerConfig(
retention = ofDays(7),
interval = ofMillis(50),
batchSize = 100,
),
clock = fixedClock,
)

purger.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()

capturedCutoff shouldBe fixedNow.minus(Duration.ofDays(7))
capturedCutoff shouldBe fixedNow.minus(ofDays(7))
capturedLimit shouldBe 100
}

Expand All @@ -49,14 +53,18 @@ class OutboxPurgerTest : FunSpec({
val store = stubStore(onRemove = { _, _ ->
val count = callCount.incrementAndGet()
if (count == 1) {
100 // first batch: full
100
} else {
latch.countDown()
42 // second batch: partial, loop stops
42
}
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
val purger = OutboxPurger(
outboxStore = store,
config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100),
clock = fixedClock,
)
purger.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()
Expand All @@ -70,10 +78,14 @@ class OutboxPurgerTest : FunSpec({
val store = stubStore(onRemove = { _, _ ->
val count = callCount.incrementAndGet()
if (count >= 10) latch.countDown()
100 // always full, would loop forever without guard
100
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
val purger = OutboxPurger(
outboxStore = store,
config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100),
clock = fixedClock,
)
purger.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()
Expand All @@ -91,7 +103,11 @@ class OutboxPurgerTest : FunSpec({
0
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
val purger = OutboxPurger(
outboxStore = store,
config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100),
clock = fixedClock,
)
purger.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()
Expand All @@ -108,9 +124,13 @@ class OutboxPurgerTest : FunSpec({
0
})

val purger = OutboxPurger(store, intervalMs = 50, batchSize = 100, clock = fixedClock)
val purger = OutboxPurger(
outboxStore = store,
config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100),
clock = fixedClock,
)
purger.start()
purger.start()
purger.start() // second start should be ignored
latch.await(2, TimeUnit.SECONDS) shouldBe true
purger.stop()

Expand All @@ -119,7 +139,11 @@ class OutboxPurgerTest : FunSpec({

test("isRunning transitions") {
val store = stubStore(onRemove = { _, _ -> 0 })
val purger = OutboxPurger(store, intervalMs = 60_000, batchSize = 100, clock = fixedClock)
val purger = OutboxPurger(
outboxStore = store,
config = OutboxPurgerConfig(interval = ofMinutes(1), batchSize = 100),
clock = fixedClock,
)

purger.isRunning() shouldBe false
purger.start()
Expand All @@ -128,26 +152,12 @@ class OutboxPurgerTest : FunSpec({
purger.isRunning() shouldBe false
}

test("constructor rejects invalid batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxPurger(stubStore(), batchSize = 0, clock = fixedClock)
}
}

test("constructor rejects zero retentionDuration") {
shouldThrow<IllegalArgumentException> {
OutboxPurger(stubStore(), retentionDuration = Duration.ZERO, clock = fixedClock)
}
}

test("constructor rejects negative intervalMs") {
shouldThrow<IllegalArgumentException> {
OutboxPurger(stubStore(), intervalMs = -1, clock = fixedClock)
}
}

test("start after stop throws") {
val purger = OutboxPurger(stubStore(), intervalMs = 60_000, batchSize = 100, clock = fixedClock)
val purger = OutboxPurger(
outboxStore = stubStore(),
config = OutboxPurgerConfig(interval = ofMinutes(1), batchSize = 100),
clock = fixedClock,
)

purger.start()
purger.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,34 @@ package com.softwaremill.okapi.core
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.time.Duration
import java.time.Duration.ofMillis
import java.time.Duration.ofNanos
import java.time.Duration.ofSeconds

class OutboxSchedulerConfigTest : FunSpec({

test("default config has valid values") {
val config = OutboxSchedulerConfig()
config.intervalMs shouldBe 1_000L
config.interval shouldBe ofSeconds(1)
config.batchSize shouldBe 10
}

test("accepts custom valid values") {
val config = OutboxSchedulerConfig(intervalMs = 500, batchSize = 50)
config.intervalMs shouldBe 500L
val config = OutboxSchedulerConfig(interval = ofMillis(500), batchSize = 50)
config.interval shouldBe ofMillis(500)
config.batchSize shouldBe 50
}

test("rejects zero intervalMs") {
test("rejects zero interval") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(intervalMs = 0)
OutboxSchedulerConfig(interval = Duration.ZERO)
}
}

test("rejects negative intervalMs") {
test("rejects negative interval") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(intervalMs = -1)
OutboxSchedulerConfig(interval = ofMillis(-1))
}
}

Expand All @@ -41,4 +45,10 @@ class OutboxSchedulerConfigTest : FunSpec({
OutboxSchedulerConfig(batchSize = -5)
}
}

test("rejects sub-millisecond interval") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(interval = ofNanos(1))
}
}
})
Loading