@@ -199,33 +199,69 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
199199 _to : BlockNumber ,
200200 _filter : & TriggerFilter ,
201201 ) -> Result < Vec < BlockWithTriggers < Chain > > , Error > {
202- // FIXME (NEAR): Scanning triggers makes little sense in Firehose approach, let's see
203- Ok ( vec ! [ ] )
202+ panic ! ( "Should never be called since not used by FirehoseBlockStream" )
204203 }
205204
206205 async fn triggers_in_block (
207206 & self ,
208207 _logger : & Logger ,
209- _block : codec:: Block ,
208+ block : codec:: Block ,
210209 _filter : & TriggerFilter ,
211210 ) -> Result < BlockWithTriggers < Chain > , Error > {
212- // FIXME (NEAR): Share implementation with FirehoseMapper::firehose_triggers_in_block version.
213- // This is currently unreachable since Near does not yet support dynamic data sources.
214- todo ! ( )
211+ // TODO: Find the best place to introduce an `Arc` and avoid this clone.
212+ let shared_block = Arc :: new ( block. clone ( ) ) ;
213+
214+ // Filter non-successful or non-action receipts.
215+ let receipts = block. shards . iter ( ) . flat_map ( |shard| {
216+ shard
217+ . receipt_execution_outcomes
218+ . iter ( )
219+ . filter_map ( |outcome| {
220+ if !outcome
221+ . execution_outcome
222+ . as_ref ( ) ?
223+ . outcome
224+ . as_ref ( ) ?
225+ . status
226+ . as_ref ( ) ?
227+ . is_success ( )
228+ {
229+ return None ;
230+ }
231+ if !matches ! (
232+ outcome. receipt. as_ref( ) ?. receipt,
233+ Some ( codec:: receipt:: Receipt :: Action ( _) )
234+ ) {
235+ return None ;
236+ }
237+
238+ Some ( trigger:: ReceiptWithOutcome {
239+ outcome : outcome. execution_outcome . as_ref ( ) ?. clone ( ) ,
240+ receipt : outcome. receipt . as_ref ( ) ?. clone ( ) ,
241+ block : shared_block. cheap_clone ( ) ,
242+ } )
243+ } )
244+ } ) ;
245+
246+ let mut trigger_data: Vec < _ > = receipts
247+ . map ( |r| NearTrigger :: Receipt ( Arc :: new ( r) ) )
248+ . collect ( ) ;
249+
250+ trigger_data. push ( NearTrigger :: Block ( shared_block. cheap_clone ( ) ) ) ;
251+
252+ Ok ( BlockWithTriggers :: new ( block, trigger_data) )
215253 }
216254
217255 async fn is_on_main_chain ( & self , _ptr : BlockPtr ) -> Result < bool , Error > {
218- // FIXME (NEAR): Might not be necessary for NEAR support for now
219- Ok ( true )
256+ panic ! ( "Should never be called since not used by FirehoseBlockStream" )
220257 }
221258
222259 fn ancestor_block (
223260 & self ,
224261 _ptr : BlockPtr ,
225262 _offset : BlockNumber ,
226263 ) -> Result < Option < codec:: Block > , Error > {
227- // FIXME (NEAR): Might not be necessary for NEAR support for now
228- Ok ( None )
264+ panic ! ( "Should never be called since FirehoseBlockStream cannot resolve it" )
229265 }
230266
231267 /// Panics if `block` is genesis.
@@ -241,12 +277,13 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
241277
242278pub struct FirehoseMapper { }
243279
280+ #[ async_trait]
244281impl FirehoseMapperTrait < Chain > for FirehoseMapper {
245- fn to_block_stream_event (
282+ async fn to_block_stream_event (
246283 & self ,
247- _logger : & Logger ,
284+ logger : & Logger ,
248285 response : & bstream:: BlockResponseV2 ,
249- _adapter : & TriggersAdapter ,
286+ adapter : & TriggersAdapter ,
250287 filter : & TriggerFilter ,
251288 ) -> Result < BlockStreamEvent < Chain > , FirehoseError > {
252289 let step = bstream:: ForkStep :: from_i32 ( response. step ) . unwrap_or_else ( || {
@@ -271,7 +308,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
271308
272309 match step {
273310 bstream:: ForkStep :: StepNew => Ok ( BlockStreamEvent :: ProcessBlock (
274- self . firehose_triggers_in_block ( & block, filter) ?,
311+ adapter . triggers_in_block ( logger , block, filter) . await ?,
275312 Some ( response. cursor . clone ( ) ) ,
276313 ) ) ,
277314
@@ -299,62 +336,6 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
299336 }
300337}
301338
302- impl FirehoseMapper {
303- // FIXME: This should be replaced by using the `TriggersAdapter` struct directly. However, the TriggersAdapter trait
304- // is async. It's actual async usage is done inside a manual `poll` implementation in `firehose_block_stream#poll_next`
305- // value. An upcoming improvement will be to remove this `poll_next`. Once the refactor occurs, this should be
306- // removed and TriggersAdapter::triggers_in_block should be use straight.
307- fn firehose_triggers_in_block (
308- & self ,
309- block : & codec:: Block ,
310- _filter : & TriggerFilter ,
311- ) -> Result < BlockWithTriggers < Chain > , FirehoseError > {
312- // TODO: Find the best place to introduce an `Arc` and avoid this clone.
313- let block = Arc :: new ( block. clone ( ) ) ;
314-
315- // Filter non-successful or non-action receipts.
316- let receipts = block. shards . iter ( ) . flat_map ( |shard| {
317- shard
318- . receipt_execution_outcomes
319- . iter ( )
320- . filter_map ( |outcome| {
321- if !outcome
322- . execution_outcome
323- . as_ref ( ) ?
324- . outcome
325- . as_ref ( ) ?
326- . status
327- . as_ref ( ) ?
328- . is_success ( )
329- {
330- return None ;
331- }
332- if !matches ! (
333- outcome. receipt. as_ref( ) ?. receipt,
334- Some ( codec:: receipt:: Receipt :: Action ( _) )
335- ) {
336- return None ;
337- }
338-
339- Some ( trigger:: ReceiptWithOutcome {
340- outcome : outcome. execution_outcome . as_ref ( ) ?. clone ( ) ,
341- receipt : outcome. receipt . as_ref ( ) ?. clone ( ) ,
342- block : block. cheap_clone ( ) ,
343- } )
344- } )
345- } ) ;
346-
347- let mut trigger_data: Vec < _ > = receipts
348- . map ( |r| NearTrigger :: Receipt ( Arc :: new ( r) ) )
349- . collect ( ) ;
350-
351- trigger_data. push ( NearTrigger :: Block ( block. cheap_clone ( ) ) ) ;
352-
353- // TODO: `block` should probably be an `Arc` in `BlockWithTriggers` to avoid this clone.
354- Ok ( BlockWithTriggers :: new ( block. as_ref ( ) . clone ( ) , trigger_data) )
355- }
356- }
357-
358339pub struct IngestorAdapter {
359340 logger : Logger ,
360341}
0 commit comments