3838import com .google .common .collect .Iterables ;
3939import com .google .common .collect .Streams ;
4040import io .quarkus .test .junit .QuarkusMock ;
41+ import io .smallrye .common .annotation .Identifier ;
4142import jakarta .annotation .Nonnull ;
43+ import jakarta .enterprise .inject .Any ;
44+ import jakarta .enterprise .inject .Instance ;
4245import jakarta .inject .Inject ;
4346import java .io .IOException ;
4447import java .io .UncheckedIOException ;
143146import org .apache .polaris .service .events .PolarisEventDispatcher ;
144147import org .apache .polaris .service .events .PolarisEventMetadataFactory ;
145148import org .apache .polaris .service .events .PolarisEventType ;
146- import org .apache .polaris .service .events .listeners .TestPolarisEventDispatcher ;
149+ import org .apache .polaris .service .events .listeners .PolarisEventListener ;
150+ import org .apache .polaris .service .events .listeners .TestPolarisEventListener ;
147151import org .apache .polaris .service .exception .FakeAzureHttpResponse ;
148152import org .apache .polaris .service .exception .IcebergExceptionMapper ;
149153import org .apache .polaris .service .storage .PolarisStorageIntegrationProviderImpl ;
@@ -203,6 +207,7 @@ public Map<String, String> getConfigOverrides() {
203207 .put ("polaris.features.\" LIST_PAGINATION_ENABLED\" " , "true" )
204208 .put ("polaris.behavior-changes.\" ALLOW_NAMESPACE_CUSTOM_LOCATION\" " , "true" )
205209 .put ("polaris.test.rootAugmentor.enabled" , "true" )
210+ .put ("polaris.event-listener.types" , "test" )
206211 .build ();
207212 }
208213 }
@@ -233,7 +238,7 @@ public Map<String, String> getConfigOverrides() {
233238 @ Inject PolarisStorageIntegrationProvider storageIntegrationProvider ;
234239 @ Inject ServiceIdentityProvider serviceIdentityProvider ;
235240 @ Inject PolarisDiagnostics diagServices ;
236- @ Inject PolarisEventDispatcher polarisEventDispatcher ;
241+ @ Inject @ Any Instance < PolarisEventListener > polarisEventListener ;
237242 @ Inject PolarisEventMetadataFactory eventMetadataFactory ;
238243 @ Inject PolarisMetaStoreManager metaStoreManager ;
239244 @ Inject CallContext callContext ;
@@ -246,14 +251,15 @@ public Map<String, String> getConfigOverrides() {
246251 @ Inject PolarisPrincipal authenticatedRoot ;
247252 @ Inject PolarisAdminService adminService ;
248253 @ Inject ResolverFactory resolverFactory ;
254+ @ Inject PolarisEventDispatcher polarisEventDispatcher ;
249255
250256 private IcebergCatalog catalog ;
251257 private String realmName ;
252258 private PolarisCallContext polarisContext ;
253259 private InMemoryFileIO fileIO ;
254260 private PolarisEntity catalogEntity ;
255261
256- private TestPolarisEventDispatcher testPolarisEventDispatcher ;
262+ private TestPolarisEventListener testPolarisEventListener ;
257263
258264 @ BeforeAll
259265 public static void setUpMocks () {
@@ -326,9 +332,9 @@ public void before(TestInfo testInfo) {
326332 .thenReturn ((PolarisStorageIntegration ) storageIntegration );
327333
328334 this .catalog = initCatalog ("my-catalog" , ImmutableMap .of ());
329-
330- testPolarisEventDispatcher = ( TestPolarisEventDispatcher ) polarisEventDispatcher ;
331- testPolarisEventDispatcher .clear ();
335+ testPolarisEventListener =
336+ ( TestPolarisEventListener ) polarisEventListener . select ( Identifier . Literal . of ( "test" )). get () ;
337+ testPolarisEventListener .clear ();
332338 }
333339
334340 @ AfterEach
@@ -2477,13 +2483,13 @@ public void testEventsAreEmitted() {
24772483 table .updateProperties ().set (key , valNew ).commit ();
24782484
24792485 PolarisEvent beforeRefreshEvent =
2480- testPolarisEventDispatcher .getLatest (PolarisEventType .BEFORE_REFRESH_TABLE );
2486+ testPolarisEventListener .getLatest (PolarisEventType .BEFORE_REFRESH_TABLE );
24812487 Assertions .assertThat (
24822488 beforeRefreshEvent .attributes ().getRequired (EventAttributes .TABLE_IDENTIFIER ))
24832489 .isEqualTo (TestData .TABLE );
24842490
24852491 PolarisEvent afterRefreshEvent =
2486- testPolarisEventDispatcher .getLatest (PolarisEventType .AFTER_REFRESH_TABLE );
2492+ testPolarisEventListener .getLatest (PolarisEventType .AFTER_REFRESH_TABLE );
24872493 Assertions .assertThat (
24882494 afterRefreshEvent .attributes ().getRequired (EventAttributes .TABLE_IDENTIFIER ))
24892495 .isEqualTo (TestData .TABLE );
0 commit comments