1919
2020import static org .junit .jupiter .api .Assertions .assertEquals ;
2121import static org .mockito .ArgumentMatchers .any ;
22+ import static org .mockito .ArgumentMatchers .eq ;
2223import static org .mockito .Mockito .doAnswer ;
2324import static org .mockito .Mockito .doNothing ;
25+ import static org .mockito .Mockito .doThrow ;
2426import static org .mockito .Mockito .mock ;
2527import static org .mockito .Mockito .never ;
2628import static org .mockito .Mockito .times ;
5052import org .apache .hadoop .hdds .security .token .OzoneBlockTokenIdentifier ;
5153import org .apache .hadoop .security .token .Token ;
5254import org .apache .ratis .thirdparty .com .google .protobuf .ByteString ;
55+ import org .apache .ratis .thirdparty .io .grpc .stub .ClientCallStreamObserver ;
5356import org .junit .jupiter .api .Test ;
5457
5558/**
@@ -97,7 +100,8 @@ public void testReleasesStreamPermitAtBlockEof() throws Exception {
97100 byte [] data = new byte [] {1 , 2 , 3 , 4 };
98101 long length = data .length ;
99102 Pipeline pipeline = mockStandalonePipeline ();
100- XceiverClientGrpc xceiverClient = mockStreamingReadClient (data );
103+ ClientCallStreamObserver <ContainerCommandRequestProto > requestObserver = mock (ClientCallStreamObserver .class );
104+ XceiverClientGrpc xceiverClient = mockStreamingReadClient (data , requestObserver );
101105 XceiverClientFactory xceiverClientFactory = mock (XceiverClientFactory .class );
102106 when (xceiverClientFactory .acquireClientForReadData (any (Pipeline .class )))
103107 .thenReturn (xceiverClient );
@@ -115,13 +119,73 @@ public void testReleasesStreamPermitAtBlockEof() throws Exception {
115119 assertEquals (data [(int ) length - 1 ] & 0xFF , last );
116120 assertEquals (length , sbis .getPos ());
117121 verify (xceiverClient , times (1 )).completeStreamRead ();
122+ verify (requestObserver , times (1 )).onCompleted ();
118123
119124 // Subsequent reads should return EOF and must not trigger duplicate permit release.
120125 assertEquals (-1 , sbis .read ());
121126 assertEquals (-1 , sbis .read ());
122127 }
123128
124129 verify (xceiverClient , times (1 )).completeStreamRead ();
130+ verify (requestObserver , times (1 )).onCompleted ();
131+ verify (requestObserver , never ()).cancel (any (), any ());
132+ }
133+
134+ @ Test
135+ public void testCancelsRequestStreamWhenOnCompletedThrows () throws Exception {
136+ OzoneClientConfig clientConfig = newStreamReadConfig ();
137+ BlockID blockID = new BlockID (1L , 3L );
138+ byte [] data = new byte [] {1 , 2 , 3 , 4 };
139+ Pipeline pipeline = mockStandalonePipeline ();
140+ ClientCallStreamObserver <ContainerCommandRequestProto > requestObserver = mock (ClientCallStreamObserver .class );
141+ RuntimeException closeFailure = new RuntimeException ("close failed" );
142+ doThrow (closeFailure ).when (requestObserver ).onCompleted ();
143+
144+ XceiverClientGrpc xceiverClient = mockStreamingReadClient (data , requestObserver );
145+ XceiverClientFactory xceiverClientFactory = mock (XceiverClientFactory .class );
146+ when (xceiverClientFactory .acquireClientForReadData (any (Pipeline .class ))).thenReturn (xceiverClient );
147+
148+ try (StreamBlockInputStream sbis = new StreamBlockInputStream (
149+ blockID , data .length , pipeline , null , xceiverClientFactory , NO_REFRESH , clientConfig )) {
150+ ByteBuffer all = ByteBuffer .allocate (data .length );
151+ assertEquals (data .length , sbis .read (all ));
152+ assertEquals (data .length , sbis .getPos ());
153+ assertEquals (-1 , sbis .read ());
154+ }
155+
156+ verify (requestObserver , times (1 )).onCompleted ();
157+ verify (requestObserver , times (1 )).cancel (eq ("StreamBlockInputStream closed" ), eq (closeFailure ));
158+ verify (xceiverClient , times (1 )).completeStreamRead ();
159+ }
160+
161+ @ Test
162+ public void testCloseDoesNotFailWhenOnCompletedAndCancelThrow () throws Exception {
163+ OzoneClientConfig clientConfig = newStreamReadConfig ();
164+ BlockID blockID = new BlockID (1L , 4L );
165+ byte [] data = new byte [] {1 , 2 , 3 , 4 };
166+ Pipeline pipeline = mockStandalonePipeline ();
167+ ClientCallStreamObserver <ContainerCommandRequestProto > requestObserver = mock (ClientCallStreamObserver .class );
168+ RuntimeException closeFailure = new RuntimeException ("close failed" );
169+ RuntimeException cancelFailure = new RuntimeException ("cancel failed" );
170+ doThrow (closeFailure ).when (requestObserver ).onCompleted ();
171+ doThrow (cancelFailure ).when (requestObserver )
172+ .cancel (eq ("StreamBlockInputStream closed" ), eq (closeFailure ));
173+
174+ XceiverClientGrpc xceiverClient = mockStreamingReadClient (data , requestObserver );
175+ XceiverClientFactory xceiverClientFactory = mock (XceiverClientFactory .class );
176+ when (xceiverClientFactory .acquireClientForReadData (any (Pipeline .class ))).thenReturn (xceiverClient );
177+
178+ try (StreamBlockInputStream sbis = new StreamBlockInputStream (
179+ blockID , data .length , pipeline , null , xceiverClientFactory , NO_REFRESH , clientConfig )) {
180+ ByteBuffer all = ByteBuffer .allocate (data .length );
181+ assertEquals (data .length , sbis .read (all ));
182+ assertEquals (data .length , sbis .getPos ());
183+ assertEquals (-1 , sbis .read ());
184+ }
185+
186+ verify (requestObserver , times (1 )).onCompleted ();
187+ verify (requestObserver , times (1 )).cancel (eq ("StreamBlockInputStream closed" ), eq (closeFailure ));
188+ verify (xceiverClient , times (1 )).completeStreamRead ();
125189 }
126190
127191 private OzoneClientConfig newStreamReadConfig () {
@@ -149,10 +213,12 @@ private Pipeline mockStandalonePipeline() throws Exception {
149213 return pipeline ;
150214 }
151215
152- private XceiverClientGrpc mockStreamingReadClient (byte [] data ) throws Exception {
216+ private XceiverClientGrpc mockStreamingReadClient (byte [] data ,
217+ ClientCallStreamObserver <ContainerCommandRequestProto > requestObserver ) throws Exception {
153218 XceiverClientGrpc xceiverClient = mock (XceiverClientGrpc .class );
154219 StreamingReadResponse streamingReadResponse = mock (StreamingReadResponse .class );
155220 ReadBlockResponseProto readBlock = buildReadBlockResponse (data );
221+ when (streamingReadResponse .getRequestObserver ()).thenReturn (requestObserver );
156222
157223 doNothing ().when (xceiverClient )
158224 .streamRead (any (ContainerCommandRequestProto .class ),
0 commit comments