"""QSIRecon diffusion reconstruction runner."""
import os
from pathlib import Path
from typing import Any
from voxelops.runners._base import (
_get_default_log_dir,
run_docker,
validate_input_dir,
validate_participant,
)
from voxelops.schemas.qsirecon import (
QSIReconDefaults,
QSIReconInputs,
QSIReconOutputs,
)
[docs]
def run_qsirecon(
inputs: QSIReconInputs,
config: QSIReconDefaults | None = None,
log_dir: Path | None = None,
**overrides,
) -> dict[str, Any]:
"""Run QSIRecon diffusion reconstruction and connectivity.
Parameters
----------
inputs : QSIReconInputs
Required inputs (qsiprep_dir, participant, etc.).
config : Optional[QSIReconDefaults], optional
Configuration (uses brain bank defaults if not provided), by default None.
log_dir : Path, optional
Directory for audit logs. Defaults to inputs.output_dir/logs.
**overrides
Override any config parameter.
Returns
-------
Dict[str, Any]
Execution record with:
- tool: "qsirecon"
- participant: Participant label
- command: Full Docker command executed
- exit_code: Process exit code
- start_time, end_time: ISO format timestamps
- duration_seconds, duration_human: Execution duration
- success: Boolean success status
- log_file: Path to JSON log
- inputs: QSIReconInputs instance
- config: QSIReconDefaults instance
- expected_outputs: QSIReconOutputs instance
Raises
------
InputValidationError
If inputs are invalid.
ProcedureExecutionError
If reconstruction fails.
Examples
--------
>>> inputs = QSIReconInputs(
... qsiprep_dir=Path("/data/derivatives/qsiprep"),
... participant="01",
... )
>>> result = run_qsirecon(inputs, atlases=["schaefer100"])
>>> print(result['expected_outputs'].qsirecon_dir)
"""
log_dir = log_dir or _get_default_log_dir(inputs)
# Use brain bank defaults if config not provided
config = config or QSIReconDefaults()
# Apply overrides
for key, value in overrides.items():
if hasattr(config, key):
setattr(config, key, value)
# Validate inputs
validate_input_dir(inputs.qsiprep_dir, "QSIPrep")
validate_participant(inputs.qsiprep_dir, inputs.participant)
# Setup directories
output_dir = inputs.output_dir or (inputs.qsiprep_dir.parent)
work_dir = inputs.work_dir or (output_dir.parent / "work" / "qsirecon")
output_dir.mkdir(parents=True, exist_ok=True)
work_dir.mkdir(parents=True, exist_ok=True)
# Generate expected outputs
expected_outputs = QSIReconOutputs.from_inputs(inputs, output_dir, work_dir)
# Check if outputs already exist and skip if not forcing
if expected_outputs.exist() and not config.force:
print("\n" + "=" * 80)
print(f"✓ QSIRecon outputs already exist for participant {inputs.participant}")
print(f" QSIRecon dir: {expected_outputs.qsirecon_dir}")
print(" Workflow reports:")
for workflow, sessions in expected_outputs.workflow_reports.items():
for session, path in sessions.items():
session_label = f"ses-{session}" if session else "no-session"
print(f" - {workflow}/{session_label}: {path}")
print(" Use force=True to re-run")
print("=" * 80 + "\n")
return {
"tool": "qsirecon",
"participant": inputs.participant,
"skipped": True,
"reason": "outputs_exist",
"success": True,
"inputs": inputs,
"config": config,
"expected_outputs": expected_outputs,
}
# Get current user/group IDs for Docker
uid = os.getuid()
gid = os.getgid()
# Build Docker command
cmd = [
"docker",
"run",
"-it",
"--rm",
"--user",
f"{uid}:{gid}",
"-v",
f"{inputs.qsiprep_dir}:/data:ro",
"-v",
f"{output_dir}:/out",
"-v",
f"{work_dir}:/work",
]
# Mount recon spec only when provided
if inputs.recon_spec and inputs.recon_spec.exists():
cmd.extend(["-v", f"{inputs.recon_spec}:/recon_spec.yaml:ro"])
# Add optional mounts
if config.fs_license and config.fs_license.exists():
cmd.extend(["-v", f"{config.fs_license}:/license.txt:ro"])
# Aux files: use the directory basename as the container mount point
# so that e.g. a local ".../responses" dir becomes "/responses" in-container
if inputs.recon_spec_aux_files and inputs.recon_spec_aux_files.exists():
aux_name = inputs.recon_spec_aux_files.name
cmd.extend(["-v", f"{inputs.recon_spec_aux_files}:/{aux_name}:ro"])
# FreeSurfer subjects directory
if config.fs_subjects_dir and config.fs_subjects_dir.exists():
cmd.extend(["-v", f"{config.fs_subjects_dir}:/fs_subjects_dir:ro"])
if inputs.datasets:
for name, path in inputs.datasets.items():
cmd.extend(["-v", f"{path}:/datasets/{name}:ro"])
# Container image
cmd.append(config.docker_image)
# QSIRecon positional arguments
cmd.extend(
[
"/data",
"/out",
"participant",
"--participant-label",
inputs.participant,
]
)
# Per-session reconstruction
if inputs.session:
cmd.extend(["--session-id", inputs.session])
cmd.extend(
[
"--nprocs",
str(config.nprocs),
"--mem-mb",
str(config.mem_mb),
"--work-dir",
"/work",
]
)
# Datasets
if inputs.datasets:
cmd.extend(["--datasets"])
for name in inputs.datasets.keys():
cmd.extend([f"{name}=/datasets/{name}"])
# Atlases (optional - some recon specs may not require them)
if inputs.atlases:
cmd.extend(["--atlases", *inputs.atlases])
# Recon spec and aux files
if inputs.recon_spec and inputs.recon_spec.exists():
cmd.extend(["--recon-spec", "/recon_spec.yaml"])
if inputs.recon_spec_aux_files and inputs.recon_spec_aux_files.exists():
aux_name = inputs.recon_spec_aux_files.name
cmd.extend(["--recon-spec-aux-files", f"/{aux_name}"])
# FreeSurfer integration
if config.fs_subjects_dir and config.fs_subjects_dir.exists():
cmd.extend(["--fs-subjects-dir", "/fs_subjects_dir"])
if config.fs_license and config.fs_license.exists():
cmd.extend(["--fs-license-file", "/license.txt"])
# Execute
result = run_docker(
cmd=cmd,
tool_name="qsirecon",
participant=inputs.participant,
session=inputs.session,
log_dir=log_dir,
)
# Add inputs, config, and expected outputs to result
result["inputs"] = inputs
result["config"] = config
result["expected_outputs"] = expected_outputs
result["skipped"] = False
return result