-
Notifications
You must be signed in to change notification settings - Fork 820
SOLR-18189: Skip indexing duplicate docs via a content hash URP #4263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
15 commits
Select commit
Hold shift + click to select a range
cd914b7
SOLR-18189: Improve tracking of duplicate Solr docs with content hash
fhuaulme-sfdc 5277652
* lint fixes
fhuaulme-sfdc cd3b510
* java lint fixes
fhuaulme-sfdc f4fe9f2
(review)
fhuaulme-sfdc 566f109
(review)
fhuaulme-sfdc d3a4235
Use BinaryField; remove base64 / StringField usage
dsmiley b248795
(review)
fhuaulme-sfdc 823315d
Rename replace addDocWithResponse with simply addDoc, modified to ret…
dsmiley 0bcd04f
rename hashFieldName -> hashField
dsmiley dbcff83
changelog
dsmiley cb328d8
(minor) rename "hashFieldName" to "hashField" in test configurations
fhuaulme-sfdc 80aa16b
Update SOLR-18189.yml
fhuaulme 4223cc5
@Override
dsmiley 9aab04c
precommit
dsmiley 03e5326
@Override
dsmiley File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
196 changes: 196 additions & 0 deletions
196
solr/core/src/java/org/apache/solr/update/processor/ContentHashVersionProcessor.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,196 @@ | ||
| package org.apache.solr.update.processor; | ||
|
|
||
| import org.apache.lucene.util.BytesRef; | ||
| import org.apache.solr.common.SolrException; | ||
| import org.apache.solr.common.SolrInputDocument; | ||
| import org.apache.solr.core.SolrCore; | ||
| import org.apache.solr.handler.component.RealTimeGetComponent; | ||
| import org.apache.solr.handler.component.RealTimeGetComponent.Resolution; | ||
| import org.apache.solr.request.SolrQueryRequest; | ||
| import org.apache.solr.response.SolrQueryResponse; | ||
| import org.apache.solr.schema.SchemaField; | ||
| import org.apache.solr.schema.TextField; | ||
| import org.apache.solr.update.AddUpdateCommand; | ||
| import org.apache.solr.update.UpdateCommand; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.io.IOException; | ||
| import java.lang.invoke.MethodHandles; | ||
| import java.util.Base64; | ||
| import java.util.Collection; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import java.util.function.Predicate; | ||
|
|
||
| /** | ||
| * An implementation of {@link UpdateRequestProcessor} which computes a hash of selected doc values, and uses this hash | ||
| * value to reject/accept doc updates. | ||
| * <ul> | ||
| * <li>When no corresponding doc with same id exists (create), computed hash is added to the document.</li> | ||
| * <li>When a previous doc exists (update), a new hash is computed using new version values and compared with old hash.</li> | ||
| * </ul> | ||
| * Depending on {#discardSameDocuments} value, this processor may reject or accept doc update. | ||
| * This implementation can be used for monitoring or rejecting no-op updates (updates that do not change Solr document). | ||
| * <p> | ||
| * Note: hash is computed using {@link Lookup3Signature}. | ||
| * </p> | ||
| * @see Lookup3Signature | ||
| */ | ||
| public class ContentHashVersionProcessor extends UpdateRequestProcessor { | ||
| private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); | ||
| private final SchemaField hashField; | ||
| private final SolrQueryResponse rsp; | ||
| private final SolrCore core; | ||
| private final Predicate<String> includedFields; // Matcher for included fields in hash | ||
| private final Predicate<String> excludedFields; // Matcher for excluded fields from hash | ||
| private OldDocProvider oldDocProvider = new DefaultDocProvider(); | ||
| private boolean discardSameDocuments; | ||
| private int sameCount = 0; | ||
| private int differentCount = 0; | ||
| private int unknownCount = 0; | ||
|
|
||
| public ContentHashVersionProcessor(Predicate<String> hashIncludedFields, Predicate<String> hashExcludedFields, String hashFieldName, SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { | ||
| super(next); | ||
| this.core = req.getCore(); | ||
| this.hashField = new SchemaField(hashFieldName, new TextField()); | ||
| this.rsp = rsp; | ||
| this.includedFields = hashIncludedFields; | ||
| this.excludedFields = hashExcludedFields; | ||
| } | ||
|
|
||
| public void processAdd(AddUpdateCommand cmd) throws IOException { | ||
| SolrInputDocument newDoc = cmd.getSolrInputDocument(); | ||
| String newHash = computeDocHash(newDoc); | ||
| newDoc.setField(hashField.getName(), newHash); | ||
| int i = 0; | ||
|
|
||
| if (!validateHash(cmd.getIndexedId(), newHash)) { | ||
| return; | ||
| } | ||
|
|
||
| while (true) { | ||
| logOverlyFailedRetries(i, cmd); | ||
| try { | ||
| super.processAdd(cmd); | ||
| return; | ||
| } catch (SolrException e) { | ||
| if (e.code() != 409) { | ||
| throw e; | ||
| } | ||
| ++i; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void finish() throws IOException { | ||
| try { | ||
| super.finish(); | ||
| } finally { | ||
| rsp.addToLog("numAddsExisting", sameCount + differentCount + unknownCount); | ||
| rsp.addToLog("numAddsExistingWithIdentical", sameCount); | ||
| rsp.addToLog("numAddsExistingUnknown", unknownCount); | ||
| } | ||
|
dsmiley marked this conversation as resolved.
|
||
| } | ||
|
|
||
| private static void logOverlyFailedRetries(int i, UpdateCommand cmd) { | ||
| if ((i & 255) == 255) { | ||
| log.warn("Unusual number of optimistic concurrency retries: retries={} cmd={}", i, cmd); | ||
| } | ||
| } | ||
|
|
||
| void setOldDocProvider(OldDocProvider oldDocProvider) { | ||
| this.oldDocProvider = oldDocProvider; | ||
| } | ||
|
|
||
| void setDiscardSameDocuments(boolean discardSameDocuments) { | ||
| this.discardSameDocuments = discardSameDocuments; | ||
| } | ||
|
|
||
| private boolean validateHash(BytesRef indexedDocId, String newHash) throws IOException { | ||
| assert null != indexedDocId; | ||
|
|
||
| var docFoundAndOldUserVersions = getOldUserVersionsFromStored(indexedDocId); | ||
| if (docFoundAndOldUserVersions.found) { | ||
| String oldHash = docFoundAndOldUserVersions.oldHash; // No hash: might want to keep track of these too | ||
| if (oldHash == null) { | ||
| unknownCount++; | ||
| return true; | ||
| } else if (Objects.equals(newHash, oldHash)) { | ||
| sameCount++; | ||
| return !discardSameDocuments; | ||
| } else { | ||
| differentCount++; | ||
| return true; | ||
| } | ||
| } | ||
| return true; // Doc not found | ||
| } | ||
|
|
||
| private DocFoundAndOldUserAndSolrVersions getOldUserVersionsFromStored(BytesRef indexedDocId) throws IOException { | ||
| SolrInputDocument oldDoc = oldDocProvider.getDocument(core, hashField.getName(), indexedDocId); | ||
| return null == oldDoc ? DocFoundAndOldUserAndSolrVersions.NOT_FOUND : getUserVersionAndSolrVersionFromDocument(oldDoc); | ||
| } | ||
|
|
||
| private DocFoundAndOldUserAndSolrVersions getUserVersionAndSolrVersionFromDocument(SolrInputDocument oldDoc) { | ||
| Object o = oldDoc.getFieldValue(hashField.getName()); | ||
| if (o != null) { | ||
| return new DocFoundAndOldUserAndSolrVersions(o.toString()); | ||
| } | ||
| return new DocFoundAndOldUserAndSolrVersions(); | ||
| } | ||
|
|
||
| public String computeDocHash(SolrInputDocument doc) { | ||
| List<String> docIncludedFieldNames = doc.getFieldNames().stream() | ||
| .filter(includedFields) // Keep fields that match 'included fields' matcher... | ||
| .filter(excludedFields.negate()) // ...and exclude fields that match 'excluded fields' matcher | ||
| .sorted() // Sort to ensure consistent field order across different doc field orders | ||
| .toList(); | ||
|
|
||
| final Signature sig = new Lookup3Signature(); | ||
| for (String fieldName : docIncludedFieldNames) { | ||
| sig.add(fieldName); | ||
| Object o = doc.getFieldValue(fieldName); | ||
| if (o instanceof Collection) { | ||
| for (Object oo : (Collection<?>) o) { | ||
| sig.add(String.valueOf(oo)); | ||
| } | ||
| } else { | ||
| sig.add(String.valueOf(o)); | ||
| } | ||
| } | ||
|
|
||
| // Signature, depending on implementation, may return 8-byte or 16-byte value | ||
| byte[] signature = sig.getSignature(); | ||
| return Base64.getEncoder().encodeToString(signature); // Makes a base64 hash out of signature value | ||
| } | ||
|
|
||
| interface OldDocProvider { | ||
| SolrInputDocument getDocument(SolrCore core, String hashField, BytesRef indexedDocId) throws IOException; | ||
| } | ||
|
|
||
| private static class DefaultDocProvider implements OldDocProvider { | ||
| @Override | ||
| public SolrInputDocument getDocument(SolrCore core, String hashField, BytesRef indexedDocId) throws IOException { | ||
| return RealTimeGetComponent.getInputDocument(core, indexedDocId, indexedDocId, null, Set.of(hashField), Resolution.PARTIAL); | ||
| } | ||
| } | ||
|
|
||
| private static class DocFoundAndOldUserAndSolrVersions { | ||
| private static final DocFoundAndOldUserAndSolrVersions NOT_FOUND = new DocFoundAndOldUserAndSolrVersions(); | ||
| private final boolean found; | ||
|
|
||
| public String oldHash; | ||
|
|
||
| private DocFoundAndOldUserAndSolrVersions() { | ||
| this.found = false; | ||
| } | ||
| private DocFoundAndOldUserAndSolrVersions(String oldHash) { | ||
| this.found = true; | ||
| this.oldHash = oldHash; | ||
| } | ||
|
|
||
| } | ||
| } | ||
144 changes: 144 additions & 0 deletions
144
solr/core/src/java/org/apache/solr/update/processor/ContentHashVersionProcessorFactory.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,144 @@ | ||
| package org.apache.solr.update.processor; | ||
|
|
||
| import org.apache.solr.common.SolrException; | ||
| import org.apache.solr.common.util.NamedList; | ||
| import org.apache.solr.common.util.StrUtils; | ||
| import org.apache.solr.core.SolrCore; | ||
| import org.apache.solr.request.SolrQueryRequest; | ||
| import org.apache.solr.response.SolrQueryResponse; | ||
| import org.apache.solr.util.plugin.SolrCoreAware; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.function.Predicate; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| /** | ||
| * Factory for {@link ContentHashVersionProcessor} instances. | ||
| */ | ||
| public class ContentHashVersionProcessorFactory extends UpdateRequestProcessorFactory implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways { | ||
| private static final char SEPARATOR = ','; // Separator for included/excluded fields | ||
| static final String CONTENT_HASH_ENABLED_PARAM = "contentHashEnabled"; | ||
| private List<String> includeFields = Collections.singletonList("*"); // Included fields defaults to 'all' | ||
| private List<String> excludeFields = new ArrayList<>(); // No excluded field by default, yet hashFieldName is excluded by default | ||
| private String hashFieldName = "content_hash"; // Field name to store computed hash on create/update operations | ||
| private boolean discardSameDocuments = true; | ||
|
|
||
| public ContentHashVersionProcessorFactory() { | ||
| } | ||
|
|
||
| public void init(NamedList<?> args) { | ||
| Object tmp = args.remove("includeFields"); | ||
| if (tmp != null) { | ||
| if (!(tmp instanceof String)) { | ||
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, | ||
| "'includeFields' must be configured as a <str>"); | ||
| } | ||
| // Include fields support comma separated list of fields (e.g. "field1,field2,field3"). | ||
| // Also supports "*" to include all fields | ||
| this.includeFields = StrUtils.splitSmart((String) tmp, SEPARATOR) | ||
| .stream() | ||
| .map(String::trim) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| tmp = args.remove("hashFieldName"); | ||
| if (tmp != null) { | ||
| if (!(tmp instanceof String)) { | ||
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'hashFieldName' must be configured as a <str>"); | ||
| } | ||
| this.hashFieldName = (String) tmp; | ||
| } | ||
|
|
||
| tmp = args.remove("excludeFields"); | ||
| if (tmp != null) { | ||
| if (!(tmp instanceof String)) { | ||
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'excludeFields' must be configured as a <str>"); | ||
| } | ||
| if ("*".equals(((String) tmp).trim())) { | ||
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'excludeFields' can't exclude all fields."); | ||
| } | ||
| // Exclude fields support comma separated list of fields (e.g. "excluded_field1,excluded_field2"). | ||
| // Also supports "*" to exclude all fields | ||
| this.excludeFields = StrUtils.splitSmart((String) tmp, SEPARATOR) | ||
| .stream() | ||
| .map(String::trim) | ||
| .collect(Collectors.toList()); | ||
| } | ||
| excludeFields.add(hashFieldName); // Hash field name is excluded from hash computation | ||
|
|
||
| tmp = args.remove("hashCompareStrategy"); | ||
| if (tmp != null) { | ||
| if (!(tmp instanceof String)) { | ||
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'hashCompareStrategy' must be configured as a <str>"); | ||
| } | ||
| String value = ((String) tmp).toLowerCase(); | ||
| if ("discard".equalsIgnoreCase(value)) { | ||
| discardSameDocuments = true; | ||
| } else if ("log".equalsIgnoreCase(value)) { | ||
| discardSameDocuments = false; | ||
| } else { | ||
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Value '" + value + "' is unsupported for 'hashCompareStrategy', only 'discard' and 'log' are supported."); | ||
| } | ||
| } | ||
|
|
||
| super.init(args); | ||
| } | ||
|
|
||
| public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { | ||
| if (!req.getParams().getBool(CONTENT_HASH_ENABLED_PARAM, false)) { | ||
| return next; | ||
| } | ||
|
|
||
| ContentHashVersionProcessor processor = new ContentHashVersionProcessor( | ||
| buildFieldMatcher(includeFields), | ||
| buildFieldMatcher(excludeFields), | ||
| hashFieldName, | ||
| req, | ||
| rsp, | ||
| next); | ||
| processor.setDiscardSameDocuments(discardSameDocuments); | ||
| return processor; | ||
| } | ||
|
|
||
| public void inform(SolrCore core) { | ||
| if (core.getLatestSchema().getUniqueKeyField() == null) { | ||
| throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "schema must have uniqueKey defined."); | ||
| } | ||
| } | ||
|
|
||
| public String getHashFieldName() { | ||
| return hashFieldName; | ||
| } | ||
|
|
||
| public List<String> getIncludeFields() { | ||
| return includeFields; | ||
| } | ||
|
|
||
| public List<String> getExcludeFields() { | ||
| return excludeFields; | ||
| } | ||
|
|
||
| public boolean discardSameDocuments() { | ||
| return discardSameDocuments; | ||
| } | ||
|
|
||
| static Predicate<String> buildFieldMatcher(List<String> fieldNames) { | ||
| return fieldName -> { | ||
| for (String currentFieldName : fieldNames) { | ||
| if ("*".equals(currentFieldName)) { | ||
| return true; | ||
| } | ||
| if (fieldName.equals(currentFieldName)) { | ||
| return true; | ||
| } | ||
| if (currentFieldName.length() > 1 | ||
| && currentFieldName.endsWith("*") | ||
| && fieldName.startsWith(currentFieldName.substring(0, currentFieldName.length() - 1))) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| }; | ||
| } | ||
| } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.