@@ -4,7 +4,8 @@ import { Deferred, Effect, Layer, Stream } from "effect"
44import z from "zod"
55import { Bus } from "../../src/bus"
66import { BusEvent } from "../../src/bus/bus-event"
7- import { provideTmpdirInstance } from "../fixture/fixture"
7+ import { Instance } from "../../src/project/instance"
8+ import { provideInstance , provideTmpdirInstance , tmpdirScoped } from "../fixture/fixture"
89import { testEffect } from "../lib/effect"
910
1011const TestEvent = {
@@ -127,4 +128,37 @@ describe("Bus (Effect-native)", () => {
127128 } ) ,
128129 ) ,
129130 )
131+
132+ it . effect ( "subscribeAll stream sees InstanceDisposed on disposal" , ( ) =>
133+ Effect . gen ( function * ( ) {
134+ const dir = yield * tmpdirScoped ( )
135+ const types : string [ ] = [ ]
136+ const seen = yield * Deferred . make < void > ( )
137+ const disposed = yield * Deferred . make < void > ( )
138+
139+ // Set up subscriber inside the instance
140+ yield * Effect . gen ( function * ( ) {
141+ const bus = yield * Bus . Service
142+
143+ yield * Stream . runForEach ( bus . subscribeAll ( ) , ( evt ) =>
144+ Effect . sync ( ( ) => {
145+ types . push ( evt . type )
146+ if ( evt . type === TestEvent . Ping . type ) Deferred . doneUnsafe ( seen , Effect . void )
147+ if ( evt . type === Bus . InstanceDisposed . type ) Deferred . doneUnsafe ( disposed , Effect . void )
148+ } ) ,
149+ ) . pipe ( Effect . forkScoped )
150+
151+ yield * Effect . sleep ( "10 millis" )
152+ yield * bus . publish ( TestEvent . Ping , { value : 1 } )
153+ yield * Deferred . await ( seen )
154+ } ) . pipe ( provideInstance ( dir ) )
155+
156+ // Dispose from OUTSIDE the instance scope
157+ yield * Effect . promise ( ( ) => Instance . disposeAll ( ) )
158+ yield * Deferred . await ( disposed ) . pipe ( Effect . timeout ( "2 seconds" ) )
159+
160+ expect ( types ) . toContain ( "test.effect.ping" )
161+ expect ( types ) . toContain ( Bus . InstanceDisposed . type )
162+ } ) ,
163+ )
130164} )
0 commit comments