2323import java .util .ArrayList ;
2424import java .util .List ;
2525
26+ import org .apache .usergrid .corepersistence .util .CpNamingUtils ;
27+ import org .apache .usergrid .persistence .Schema ;
28+ import org .apache .usergrid .persistence .core .scope .ApplicationScope ;
29+ import org .apache .usergrid .persistence .graph .*;
30+ import org .apache .usergrid .persistence .graph .impl .SimpleSearchByEdge ;
2631import org .slf4j .Logger ;
2732import org .slf4j .LoggerFactory ;
2833
@@ -52,20 +57,27 @@ public class EntityLoadVerifyFilter extends AbstractFilter<FilterResult<Id>, Fil
5257 private static final Logger logger = LoggerFactory .getLogger ( EntityLoadVerifyFilter .class );
5358
5459 private final EntityCollectionManagerFactory entityCollectionManagerFactory ;
60+ private final GraphManagerFactory graphManagerFactory ;
61+ private final ReadRepairFig readRepairFig ;
5562
5663
5764 @ Inject
58- public EntityLoadVerifyFilter ( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
65+ public EntityLoadVerifyFilter ( final EntityCollectionManagerFactory entityCollectionManagerFactory ,
66+ final GraphManagerFactory graphManagerFactory ,
67+ final ReadRepairFig readRepairFig ) {
5968 this .entityCollectionManagerFactory = entityCollectionManagerFactory ;
69+ this .graphManagerFactory = graphManagerFactory ;
70+ this .readRepairFig = readRepairFig ;
6071 }
6172
6273
6374 @ Override
6475 public Observable <FilterResult <Entity >> call ( final Observable <FilterResult <Id >> filterResultObservable ) {
6576
6677
78+ final ApplicationScope applicationScope = pipelineContext .getApplicationScope ();
6779 final EntityCollectionManager entityCollectionManager =
68- entityCollectionManagerFactory .createCollectionManager ( pipelineContext . getApplicationScope () );
80+ entityCollectionManagerFactory .createCollectionManager ( applicationScope );
6981
7082 //it's more efficient to make 1 network hop to get everything, then drop our results if required
7183 final Observable <FilterResult <Entity >> entityObservable =
@@ -80,9 +92,10 @@ public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>>
8092 .flatMap ( ids -> entityCollectionManager .load ( ids ) );
8193
8294
83- //now we have a collection, validate our canidate set is correct.
84-
85- return entitySetObservable .map ( entitySet -> new EntityVerifier ( entitySet , bufferedIds ) )
95+ //now we have a collection, validate our candidate set is correct.
96+ GraphManager graphManager = graphManagerFactory .createEdgeManager (applicationScope );
97+ return entitySetObservable .map ( entitySet -> new EntityVerifier ( applicationScope , graphManager ,
98+ entitySet , bufferedIds , readRepairFig ) )
8699 .doOnNext ( entityCollector -> entityCollector .merge () ).flatMap (
87100 entityCollector -> Observable .from ( entityCollector .getResults () ) );
88101 } );
@@ -102,12 +115,20 @@ private static final class EntityVerifier {
102115
103116 private final List <FilterResult <Id >> candidateResults ;
104117 private final EntitySet entitySet ;
118+ private final GraphManager graphManager ;
119+ private final ApplicationScope applicationScope ;
120+ private final ReadRepairFig readRepairFig ;
105121
106122
107- public EntityVerifier ( final EntitySet entitySet , final List <FilterResult <Id >> candidateResults ) {
123+ public EntityVerifier ( final ApplicationScope applicationScope , final GraphManager graphManager ,
124+ final EntitySet entitySet , final List <FilterResult <Id >> candidateResults ,
125+ final ReadRepairFig readRepairFig ) {
126+ this .applicationScope = applicationScope ;
127+ this .graphManager = graphManager ;
108128 this .entitySet = entitySet ;
109129 this .candidateResults = candidateResults ;
110130 this .results = new ArrayList <>( entitySet .size () );
131+ this .readRepairFig = readRepairFig ;
111132 }
112133
113134
@@ -137,11 +158,42 @@ private void validate( final FilterResult<Id> filterResult ) {
137158
138159 //doesn't exist warn and drop
139160 if ( entity == null || !entity .getEntity ().isPresent () ) {
140- logger .warn ( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
141- + " Ignoring since this could be a region sync issue" , candidateId );
161+
162+ // look for orphaned edges
163+ String edgeTypeName = CpNamingUtils .getEdgeTypeFromCollectionName (Schema .defaultCollectionName (candidateId .getType ()));
164+ final SearchByEdge searchByEdge =
165+ new SimpleSearchByEdge ( applicationScope .getApplication (), edgeTypeName , candidateId , Long .MAX_VALUE , SearchByEdgeType .Order .DESCENDING ,
166+ Optional .absent () );
167+
168+ int edgesDeleted = 0 ;
169+ List <MarkedEdge > edgeList = graphManager .loadEdgeVersions (searchByEdge ).toList ().toBlocking ().last ();
170+ if (edgeList .size () > 0 ) {
171+ MarkedEdge firstEdge = edgeList .get (0 );
172+ long currentTimestamp = CpNamingUtils .createGraphOperationTimestamp ();
173+ long edgeTimestamp = firstEdge .getTimestamp ();
174+ long timestampDiff = currentTimestamp - edgeTimestamp ;
175+ long orphanDelaySecs = readRepairFig .getEdgeOrphanDelaySecs ();
176+ // timestamps are in 100 nanoseconds, convert from seconds
177+ long allowedDiff = orphanDelaySecs * 1000L * 1000L * 10L ;
178+ if (timestampDiff > allowedDiff ) {
179+ // edges must be orphans, delete edges
180+ for (MarkedEdge edge : edgeList ) {
181+ graphManager .markEdge (edge ).toBlocking ().lastOrDefault (null );
182+ edgesDeleted ++;
183+ }
184+ graphManager .deleteEdge (firstEdge ).toBlocking ().lastOrDefault (null );
185+ }
186+ }
187+
188+ if (edgesDeleted > 0 ) {
189+ logger .warn ("Read graph edge and received candidate with entityId {} (application {}), yet was not found in cassandra."
190+ + " Deleted at least {} edges." , candidateId , applicationScope .getApplication ().getUuid ().toString (), edgesDeleted );
191+ } else {
192+ logger .warn ("Read graph edge and received candidate with entityId {} (application {}), yet was not found in cassandra."
193+ + " Ignoring since this could be a region sync issue" , candidateId , applicationScope .getApplication ().getUuid ().toString ());
194+ }
142195
143196
144- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
145197
146198 return ;
147199 }
0 commit comments