5656import org .apache .beam .sdk .transforms .Sum ;
5757import org .apache .beam .sdk .transforms .reflect .DoFnSignature .Parameter ;
5858import org .apache .beam .sdk .transforms .reflect .DoFnSignature .Parameter .BundleFinalizerParameter ;
59+ import org .apache .beam .sdk .transforms .reflect .DoFnSignature .Parameter .CausedByDrainParameter ;
5960import org .apache .beam .sdk .transforms .reflect .DoFnSignature .Parameter .ElementParameter ;
6061import org .apache .beam .sdk .transforms .reflect .DoFnSignature .Parameter .FinishBundleContextParameter ;
6162import org .apache .beam .sdk .transforms .reflect .DoFnSignature .Parameter .OutputReceiverParameter ;
7879import org .apache .beam .sdk .transforms .windowing .BoundedWindow ;
7980import org .apache .beam .sdk .transforms .windowing .PaneInfo ;
8081import org .apache .beam .sdk .transforms .windowing .TimestampCombiner ;
82+ import org .apache .beam .sdk .values .CausedByDrain ;
8183import org .apache .beam .sdk .values .KV ;
8284import org .apache .beam .sdk .values .Row ;
8385import org .apache .beam .sdk .values .TypeDescriptor ;
@@ -130,10 +132,11 @@ public void process(
130132 PipelineOptions options ,
131133 @ SideInput ("tag1" ) String input1 ,
132134 @ SideInput ("tag2" ) Integer input2 ,
133- BundleFinalizer bundleFinalizer ) {}
135+ BundleFinalizer bundleFinalizer ,
136+ CausedByDrain causedByDrain ) {}
134137 }.getClass ());
135138
136- assertThat (sig .processElement ().extraParameters ().size (), equalTo (9 ));
139+ assertThat (sig .processElement ().extraParameters ().size (), equalTo (10 ));
137140 assertThat (sig .processElement ().extraParameters ().get (0 ), instanceOf (ElementParameter .class ));
138141 assertThat (sig .processElement ().extraParameters ().get (1 ), instanceOf (TimestampParameter .class ));
139142 assertThat (sig .processElement ().extraParameters ().get (2 ), instanceOf (WindowParameter .class ));
@@ -146,6 +149,8 @@ public void process(
146149 assertThat (sig .processElement ().extraParameters ().get (7 ), instanceOf (SideInputParameter .class ));
147150 assertThat (
148151 sig .processElement ().extraParameters ().get (8 ), instanceOf (BundleFinalizerParameter .class ));
152+ assertThat (
153+ sig .processElement ().extraParameters ().get (9 ), instanceOf (CausedByDrainParameter .class ));
149154 }
150155
151156 @ Test
@@ -585,6 +590,31 @@ public void onTimer(BoundedWindow w) {}
585590 instanceOf (WindowParameter .class ));
586591 }
587592
593+ @ Test
594+ public void testCausedByDrainOnTimer () throws Exception {
595+ final String timerId = "some-timer-id" ;
596+ final String timerDeclarationId = TimerDeclaration .PREFIX + timerId ;
597+
598+ DoFnSignature sig =
599+ DoFnSignatures .getSignature (
600+ new DoFn <String , String >() {
601+
602+ @ TimerId (timerId )
603+ private final TimerSpec myfield1 = TimerSpecs .timer (TimeDomain .EVENT_TIME );
604+
605+ @ ProcessElement
606+ public void process (ProcessContext c ) {}
607+
608+ @ OnTimer (timerId )
609+ public void onTimer (CausedByDrain causedByDrain ) {}
610+ }.getClass ());
611+
612+ assertThat (sig .onTimerMethods ().get (timerDeclarationId ).extraParameters ().size (), equalTo (1 ));
613+ assertThat (
614+ sig .onTimerMethods ().get (timerDeclarationId ).extraParameters ().get (0 ),
615+ instanceOf (CausedByDrainParameter .class ));
616+ }
617+
588618 @ Test
589619 public void testAllParamsOnTimer () throws Exception {
590620 final String timerId = "some-timer-id" ;
0 commit comments