Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
"xchem_collate = dlstbx.wrapper.xchem_collate:XChemCollateWrapper",
"xia2 = dlstbx.wrapper.xia2:Xia2Wrapper",
"xia2.multiplex = dlstbx.wrapper.xia2_multiplex:Xia2MultiplexWrapper",
"xia2.multiplex_filtering = dlstbx.wrapper.xia2_multiplex_filtering:Xia2MultiplexFilteringWrapper",
"xia2.overload = dlstbx.wrapper.xia2_overload:Xia2OverloadWrapper",
"xia2.strategy = dlstbx.wrapper.xia2_strategy:Xia2StrategyWrapper",
"xia2.to_shelxcde = dlstbx.wrapper.xia2_to_shelxcde:Xia2toShelxcdeWrapper",
Expand Down
27 changes: 26 additions & 1 deletion src/dlstbx/services/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ class MultiplexParameters(pydantic.BaseModel):
diffraction_plan_info: Optional[DiffractionPlanInfo] = None
recipe: Optional[str] = None
use_clustering: Optional[List[str]] = None
use_filtering: List[str] = []
filtering_group_size: Dict[str, int] = pydantic.Field(
default={"default": 50}, alias="filtering-group-size"
)
beamline: str
trigger_every_collection: bool

Expand Down Expand Up @@ -2168,6 +2172,7 @@ def trigger_multiplex(
job_parameters.append(("sample_id", str(group.sample_id)))
else:
job_parameters.append(("sample_group_id", str(group.sample_group_id)))

if parameters.spacegroup:
job_parameters.append(("spacegroup", parameters.spacegroup))
if (
Expand All @@ -2187,6 +2192,24 @@ def trigger_multiplex(
("clustering.output_clusters", "true"),
]
)

# See if beamline is in list of allowed ones for filtering
# If so, add filtering parameters to job_parameters
# This will set the xia2.multiplex wrapper to send the job for filtering after completed

if parameters.beamline in parameters.use_filtering:
group_size = parameters.filtering_group_size.get(
parameters.beamline, parameters.filtering_group_size["default"]
)

job_parameters.extend(
[
("filtering.method", "deltacchalf"),
("deltacchalf.stdcutoff", "3"),
("deltacchalf.mode", "image_group"),
("deltacchalf.group_size", str(group_size)),
]
)
for k, v in job_parameters:
jpp = self.ispyb.mx_processing.get_job_parameter_params()
jpp["job_id"] = jobid
Expand All @@ -2202,7 +2225,9 @@ def trigger_multiplex(
message = {"recipes": [], "parameters": {"ispyb_process": jobid}}
rw.transport.send("processing_recipe", message)

self.log.info(f"xia2.multiplex trigger: Processing job {jobid} triggered")
self.log.info(
f"xia2.multiplex trigger: Processing job {jobid} triggered"
)

return {"success": True, "return_value": jobids}

Expand Down
45 changes: 43 additions & 2 deletions src/dlstbx/wrapper/xia2_multiplex.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ class Xia2MultiplexWrapper(Wrapper):
name = "xia2.multiplex"

def send_results_to_ispyb(
self, z, xtriage_results=None, cluster_num=None, attachments=[]
self,
z,
xtriage_results=None,
cluster_num=None,
attachments=[],
multiplex_filtering=False,
):
ispyb_command_list = []

Expand Down Expand Up @@ -138,6 +143,14 @@ def send_results_to_ispyb(
self.log.debug("Sending %s", str(ispyb_command_list))
self.recwrap.send_to("ispyb", {"ispyb_command_list": ispyb_command_list})

# After xia2.multiplex, xia2.multiplex_filtering can be optionally run to improve data reduction
# Currently not supported for clusters
# Only triggered if filtering parameters exist -> logic handled in trigger_multiplex

if not cluster_num and multiplex_filtering:
self.log.info("Triggering xia2.multiplex filtering.")
self.recwrap.send_to("filtering", True)

def construct_commandline(self, params):
"""Construct xia2.multiplex command line.
Takes job parameter dictionary, returns array."""
Expand All @@ -150,7 +163,15 @@ def construct_commandline(self, params):
command.append(f)

if params.get("ispyb_parameters"):
ignore = {"sample_id", "sample_group_id"}
# ignore filtering parameters for xia2.multiplex_filtering
ignore = {
"sample_id",
"sample_group_id",
"filtering.method",
"deltacchalf.stdcutoff",
"deltacchalf.mode",
"deltacchalf.group_size",
}
Comment on lines +166 to +174
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment here is misleadingly placed as it implies that sample_id and sample_group_id are flitering parameters.

It is also a bit clunky having to explicitly list parameters to ignore, though I appreciate this is due to how the multiplex job parameters are passed via the trigger service and might be hard to avoid.

translation = {
"d_min": "resolution.d_min",
"spacegroup": "symmetry.space_group",
Expand Down Expand Up @@ -279,6 +300,7 @@ def is_final_result(final_file: pathlib.Path) -> bool:
]

allfiles = []
cluster_count = 0

if success:
with multiplex_json.open("r") as fh:
Expand All @@ -291,6 +313,7 @@ def is_final_result(final_file: pathlib.Path) -> bool:
cluster_prefix = ""
cluster_num = None
elif "coordinate cluster" in dataset_name:
cluster_count += 1
cluster_num = dataset_name.split(" ")[-1]
cluster_prefix = f"coordinate_cluster_{cluster_num}_"
base_dir = working_directory / f"coordinate_cluster_{cluster_num}"
Expand Down Expand Up @@ -425,14 +448,32 @@ def is_final_result(final_file: pathlib.Path) -> bool:
self.log.info(
f"Triggering downstream recipe steps for dataset: '{dataset_name}'"
)

# Check if filtering parameters present -> if so, trigger filtering when sending results to ispyb
# Whether or not these are present is currently handled by the trigger service

filtering = False

if params.get("ispyb_parameters"):
if ("filtering.method", ["deltacchalf"]) in params[
"ispyb_parameters"
].items():
self.log.info(
"Additional filtering with xia2.multiplex_filtering selected."
)
filtering = True

self.send_results_to_ispyb(
ispyb_d,
xtriage_results=xtriage_results,
cluster_num=cluster_num,
attachments=attachments,
multiplex_filtering=filtering,
)

self._success_counter.inc()
if cluster_count >= 2:
self.recwrap.send_to("email", True)
else:
self._failure_counter.inc()

Expand Down
Loading