1212import java .time .Duration ;
1313import java .time .Instant ;
1414import java .time .LocalDateTime ;
15+ import java .time .temporal .ChronoUnit ;
1516import java .util .*;
1617import java .util .concurrent .Executors ;
1718import java .util .concurrent .Future ;
19+ import java .util .stream .Stream ;
1820
1921public class EventSQLBenchmarksRunner {
2022
@@ -26,6 +28,7 @@ public class EventSQLBenchmarksRunner {
2628 static final EventSQLDialect SQL_DIALECT ;
2729 static final int RUNNER_INSTANCES = envIntValueOrDefault ("RUNNER_INSTANCES" , 1 );
2830 static final int EVENTS_TO_PUBLISH = envIntValueOrDefault ("EVENTS_TO_PUBLISH" , 60_000 );
31+ static final int EVENTS_BATCH_SIZE = envIntValueOrDefault ("EVENTS_BATCH_SIZE" , 1 );
2932 static final int EVENTS_RATE = envIntValueOrDefault ("EVENTS_RATE" , 1_000 );
3033 static final String TEST_TOPIC = envValueOrDefault ("TEST_TOPIC" , "account_created" );
3134 static final String TEST_CONSUMER = envValueOrDefault ("TEST_CONSUMER" , "benchmarks-consumer" );
@@ -182,15 +185,27 @@ static <T> T executeQuery(DataSource source, String query, ResultSetMapper<T> re
182185 static void publishEvents (EventSQLPublisher publisher ) throws Exception {
183186 var futures = new LinkedList <Future <?>>();
184187
188+ String eventsWerePublishedMessage ;
189+ if (EVENTS_BATCH_SIZE > 1 ) {
190+ eventsWerePublishedMessage = "events were published - in batches of " + EVENTS_BATCH_SIZE ;
191+ } else {
192+ eventsWerePublishedMessage = "events were published" ;
193+ }
194+
195+ var publishRate = EVENTS_RATE / EVENTS_BATCH_SIZE ;
196+ var batchesToPublish = EVENTS_TO_PUBLISH / EVENTS_BATCH_SIZE ;
185197 try (var executor = Executors .newVirtualThreadPerTaskExecutor ()) {
186- for (var i = 0 ; i < EVENTS_TO_PUBLISH ; i ++) {
187- var result = executor .submit (() -> publishRandomEvent (publisher ));
198+ for (var i = 0 ; i < batchesToPublish ; i ++) {
199+ var result = executor .submit (() -> publishRandomEventOrEventsBatch (publisher ));
188200 futures .add (result );
189201
190202 var publications = i + 1 ;
191- if (futures .size () >= EVENTS_RATE && publications < EVENTS_TO_PUBLISH ) {
192- System .out .printf ("%s, %d/%d events were published, waiting 1s before next publications...%n" ,
193- LocalDateTime .now (), publications , EVENTS_TO_PUBLISH );
203+ if (futures .size () >= publishRate && publications < batchesToPublish ) {
204+ System .out .printf ("%s, %d/%d %s, waiting 1s before next publications...%n" ,
205+ LocalDateTime .now ().truncatedTo (ChronoUnit .MILLIS ),
206+ publications * EVENTS_BATCH_SIZE ,
207+ EVENTS_TO_PUBLISH ,
208+ eventsWerePublishedMessage );
194209 Thread .sleep (1000 );
195210 awaitForFutures (futures );
196211 futures .clear ();
@@ -204,11 +219,18 @@ static void publishEvents(EventSQLPublisher publisher) throws Exception {
204219 }
205220 }
206221
207- static void publishRandomEvent (EventSQLPublisher publisher ) {
222+ static void publishRandomEventOrEventsBatch (EventSQLPublisher publisher ) {
208223 try {
209224 // make publication more evenly distributed in time
210225 Thread .sleep (RANDOM .nextInt (1000 ));
211- publisher .publish (nextEvent ());
226+ if (EVENTS_BATCH_SIZE > 1 ) {
227+ var batch = Stream .generate (EventSQLBenchmarksRunner ::nextEvent )
228+ .limit (EVENTS_BATCH_SIZE )
229+ .toList ();
230+ publisher .publishAll (batch );
231+ } else {
232+ publisher .publish (nextEvent ());
233+ }
212234 } catch (Exception e ) {
213235 e .printStackTrace ();
214236 }
0 commit comments