Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5615ecf
feat(core): add SLF4J API dependency for OutboxPurger logging
endrju19 Mar 25, 2026
70e75a5
feat(core): change OutboxStore.removeDeliveredBefore to accept limit …
endrju19 Mar 25, 2026
9df3848
feat(postgres): implement batched delete with FOR UPDATE SKIP LOCKED
endrju19 Mar 25, 2026
564af20
feat(db): add index on (status, last_attempt) for efficient purge que…
endrju19 Mar 25, 2026
24865e7
feat(core): rewrite OutboxPurger with batched delete, error handling,…
endrju19 Mar 25, 2026
58f4b51
feat(spring): add OkapiPurgerProperties with @ConfigurationProperties…
endrju19 Mar 25, 2026
62150a5
feat(spring): migrate OutboxPurgerScheduler to SmartLifecycle, bind f…
endrju19 Mar 25, 2026
66b4696
test(spring): add OutboxPurgerAutoConfigurationTest for properties bi…
endrju19 Mar 25, 2026
34404e2
style: apply ktlint formatting
endrju19 Mar 25, 2026
126d2fc
refactor(stores): use Exposed exec API instead of raw JDBC connection…
endrju19 Mar 25, 2026
dcbd7c1
refactor(spring): remove unused enabled field from OkapiPurgerProperties
endrju19 Mar 25, 2026
522c91d
fix: address PR review findings — restart guard, stop callback safety…
endrju19 Mar 26, 2026
41cfa25
refactor(spring): rename OkapiPurgerProperties to OutboxPurgerPropert…
endrju19 Mar 26, 2026
4681164
docs: fix KDoc accuracy — deliveryType routing, sync stop, batch-size…
endrju19 Mar 26, 2026
c85d18e
feat(core): add OutboxSchedulerConfig value object with validation
endrju19 Mar 26, 2026
9d8e21b
feat(core): rewrite OutboxScheduler with error handling, guards, logging
endrju19 Mar 26, 2026
1713ff3
feat(spring): add OutboxProcessorProperties with @ConfigurationProper…
endrju19 Mar 26, 2026
b7f2045
feat(spring): migrate OutboxProcessorScheduler to SmartLifecycle
endrju19 Mar 26, 2026
6161168
feat(spring): bind OutboxProcessorProperties in OutboxAutoConfiguration
endrju19 Mar 26, 2026
dd9dcbe
test(spring): add OutboxProcessorAutoConfigurationTest
endrju19 Mar 26, 2026
1b7c44c
feat(db): add index (status, created_at) for processor claimPending q…
endrju19 Mar 26, 2026
dc624d3
feat(spring): add processor properties to spring-configuration-metada…
endrju19 Mar 26, 2026
9035b97
merge: resolve conflicts with origin/main
endrju19 Apr 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,51 +1,64 @@
package com.softwaremill.okapi.core

import org.slf4j.LoggerFactory
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

/**
* Standalone scheduler that periodically calls [OutboxProcessor.processNext].
*
* Each tick is optionally wrapped in a transaction via [transactionRunner].
* Runs on a single daemon thread and provides explicit [start]/[stop] lifecycle.
* Runs on a single daemon thread with explicit [start]/[stop] lifecycle.
* [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown.
* [AtomicBoolean] guards against accidental double-start, not restart.
*
* Framework-specific modules hook into their own lifecycle events:
* - `okapi-spring`: `SmartInitializingSingleton` / `DisposableBean`
* - `okapi-spring-boot`: `SmartLifecycle`
* - `okapi-ktor`: `ApplicationStarted` / `ApplicationStopped`
*/
class OutboxScheduler(
private val outboxProcessor: OutboxProcessor,
private val transactionRunner: TransactionRunner? = null,
private val intervalMs: Long = 1_000L,
private val batchSize: Int = 10,
private val config: OutboxSchedulerConfig = OutboxSchedulerConfig(),
) {
private val running = AtomicBoolean(false)

private val scheduler: ScheduledExecutorService =
Executors.newSingleThreadScheduledExecutor { r ->
Thread(r, "outbox-processor").apply { isDaemon = true }
}

fun start() {
scheduler.scheduleWithFixedDelay(
::tick,
0L,
intervalMs,
TimeUnit.MILLISECONDS,
)
check(!scheduler.isShutdown) { "OutboxScheduler cannot be restarted after stop()" }
if (!running.compareAndSet(false, true)) return
logger.info("Outbox processor started [interval={}ms, batchSize={}]", config.intervalMs, config.batchSize)
scheduler.scheduleWithFixedDelay(::tick, 0L, config.intervalMs, TimeUnit.MILLISECONDS)
}

fun stop() {
if (!running.compareAndSet(true, false)) return
scheduler.shutdown()
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow()
}
logger.info("Outbox processor stopped")
}

fun isRunning(): Boolean = running.get()

private fun tick() {
if (transactionRunner != null) {
transactionRunner.runInTransaction { outboxProcessor.processNext(batchSize) }
} else {
outboxProcessor.processNext(batchSize)
try {
transactionRunner?.runInTransaction { outboxProcessor.processNext(config.batchSize) }
?: outboxProcessor.processNext(config.batchSize)
logger.debug("Outbox processor tick completed [batchSize={}]", config.batchSize)
} catch (e: Exception) {
logger.error("Outbox processor tick failed, will retry at next scheduled interval", e)
}
}

companion object {
private val logger = LoggerFactory.getLogger(OutboxScheduler::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.softwaremill.okapi.core

data class OutboxSchedulerConfig(
val intervalMs: Long = 1_000L,
val batchSize: Int = 10,
) {
init {
require(intervalMs > 0) { "intervalMs must be positive, got: $intervalMs" }
require(batchSize > 0) { "batchSize must be positive, got: $batchSize" }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.softwaremill.okapi.core

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe

class OutboxSchedulerConfigTest : FunSpec({

test("default config has valid values") {
val config = OutboxSchedulerConfig()
config.intervalMs shouldBe 1_000L
config.batchSize shouldBe 10
}

test("accepts custom valid values") {
val config = OutboxSchedulerConfig(intervalMs = 500, batchSize = 50)
config.intervalMs shouldBe 500L
config.batchSize shouldBe 50
}

test("rejects zero intervalMs") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(intervalMs = 0)
}
}

test("rejects negative intervalMs") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(intervalMs = -1)
}
}

test("rejects zero batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(batchSize = 0)
}
}

test("rejects negative batchSize") {
shouldThrow<IllegalArgumentException> {
OutboxSchedulerConfig(batchSize = -5)
}
}
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package com.softwaremill.okapi.core

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger

class OutboxSchedulerTest : FunSpec({

test("tick calls processNext with configured batchSize") {
var capturedLimit: Int? = null
val latch = CountDownLatch(1)
val processor = stubProcessor { limit ->
capturedLimit = limit
latch.countDown()
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 50, batchSize = 25),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

capturedLimit shouldBe 25
}

test("exception in tick does not kill scheduler") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(2)
val processor = stubProcessor { _ ->
val count = callCount.incrementAndGet()
latch.countDown()
if (count == 1) throw RuntimeException("db connection lost")
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

callCount.get() shouldBe 2
}

test("double start is ignored") {
val callCount = AtomicInteger(0)
val latch = CountDownLatch(1)
val processor = stubProcessor { _ ->
callCount.incrementAndGet()
latch.countDown()
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

scheduler.isRunning() shouldBe false
}

test("isRunning transitions") {
val processor = stubProcessor { _ -> }
val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 60_000),
)

scheduler.isRunning() shouldBe false
scheduler.start()
scheduler.isRunning() shouldBe true
scheduler.stop()
scheduler.isRunning() shouldBe false
}

test("start after stop throws") {
val processor = stubProcessor { _ -> }
val scheduler = OutboxScheduler(
outboxProcessor = processor,
config = OutboxSchedulerConfig(intervalMs = 60_000),
)

scheduler.start()
scheduler.stop()

shouldThrow<IllegalStateException> {
scheduler.start()
}.message shouldBe "OutboxScheduler cannot be restarted after stop()"
}

test("transactionRunner wraps tick when provided") {
val txInvoked = AtomicBoolean(false)
val latch = CountDownLatch(1)
val processor = stubProcessor { _ -> latch.countDown() }
val txRunner = object : TransactionRunner {
override fun <T> runInTransaction(block: () -> T): T {
txInvoked.set(true)
return block()
}
}

val scheduler = OutboxScheduler(
outboxProcessor = processor,
transactionRunner = txRunner,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()

txInvoked.get() shouldBe true
}

test("tick runs without transactionRunner") {
val latch = CountDownLatch(1)
val processor = stubProcessor { _ -> latch.countDown() }

val scheduler = OutboxScheduler(
outboxProcessor = processor,
transactionRunner = null,
config = OutboxSchedulerConfig(intervalMs = 50),
)

scheduler.start()
latch.await(2, TimeUnit.SECONDS) shouldBe true
scheduler.stop()
}
})

private fun stubProcessor(onProcessNext: (Int) -> Unit): OutboxProcessor {
val store = object : OutboxStore {
override fun persist(entry: OutboxEntry) = entry
override fun claimPending(limit: Int): List<OutboxEntry> {
onProcessNext(limit)
return emptyList()
}
override fun updateAfterProcessing(entry: OutboxEntry) = entry
override fun removeDeliveredBefore(time: java.time.Instant, limit: Int) = 0
override fun findOldestCreatedAt(statuses: Set<OutboxStatus>) = emptyMap<OutboxStatus, java.time.Instant>()
override fun countByStatuses() = emptyMap<OutboxStatus, Long>()
}
val entryProcessor = OutboxEntryProcessor(
deliverer = object : MessageDeliverer {
override val type = "stub"
override fun deliver(entry: OutboxEntry) = DeliveryResult.Success
},
retryPolicy = RetryPolicy(maxRetries = 3),
clock = java.time.Clock.systemUTC(),
)
return OutboxProcessor(store = store, entryProcessor = entryProcessor)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.softwaremill.okapi.core.MessageDeliverer
import com.softwaremill.okapi.core.OutboxEntryProcessor
import com.softwaremill.okapi.core.OutboxProcessor
import com.softwaremill.okapi.core.OutboxPublisher
import com.softwaremill.okapi.core.OutboxSchedulerConfig
import com.softwaremill.okapi.core.OutboxStore
import com.softwaremill.okapi.core.RetryPolicy
import com.softwaremill.okapi.mysql.MysqlOutboxStore
Expand Down Expand Up @@ -42,7 +43,7 @@ import javax.sql.DataSource
* - [PlatformTransactionManager] — if absent, each store call runs in its own transaction
*/
@AutoConfiguration
@EnableConfigurationProperties(OutboxPurgerProperties::class)
@EnableConfigurationProperties(OutboxPurgerProperties::class, OutboxProcessorProperties::class)
class OutboxAutoConfiguration {
@Bean
@ConditionalOnMissingBean
Expand Down Expand Up @@ -82,13 +83,19 @@ class OutboxAutoConfiguration {

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "okapi.processor", name = ["enabled"], havingValue = "true", matchIfMissing = true)
fun outboxProcessorScheduler(
props: OutboxProcessorProperties,
outboxProcessor: OutboxProcessor,
transactionManager: ObjectProvider<PlatformTransactionManager>,
): OutboxProcessorScheduler {
return OutboxProcessorScheduler(
outboxProcessor = outboxProcessor,
transactionTemplate = transactionManager.getIfAvailable()?.let { TransactionTemplate(it) },
config = OutboxSchedulerConfig(
intervalMs = props.intervalMs,
batchSize = props.batchSize,
),
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.softwaremill.okapi.springboot

import jakarta.validation.constraints.Min
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.validation.annotation.Validated

@ConfigurationProperties(prefix = "okapi.processor")
@Validated
data class OutboxProcessorProperties(
@field:Min(1) val intervalMs: Long = 1_000,
@field:Min(1) val batchSize: Int = 10,
)
Loading