Source code for voxelops.schemas.qsirecon
"""QSIRecon schemas: inputs, outputs, and defaults."""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
[docs]
@dataclass
class QSIReconInputs:
"""Required inputs for QSIRecon diffusion reconstruction.
Parameters
----------
qsiprep_dir : Path
QSIPrep output directory.
participant : str
Participant label (without 'sub-' prefix).
session : str, optional
Session label (without 'ses-' prefix), by default None.
When provided, ``--session-id`` is passed to qsirecon so that
only data from that specific session is processed. This enables
per-session reconstruction without re-running the full dataset.
Leave ``None`` for datasets that have no session structure.
output_dir : Optional[Path], optional
Output directory, by default None.
If None, defaults to qsiprep_dir/../qsirecon.
work_dir : Optional[Path], optional
Working directory, by default None.
If None, defaults to output_dir/../work/qsirecon.
recon_spec : Optional[Path], optional
Path to reconstruction spec YAML file, by default None.
datasets : Optional[dict[str, Path]], optional
Dictionary of dataset names and paths, by default None.
atlases : Optional[List[str]], optional
List of atlases for connectivity, by default None.
recon_spec_aux_files : Optional[Path], optional
Directory with auxiliary files referenced by the recon spec
(e.g. response functions for MRtrix3). The directory is mounted
into the container using its own basename as the mount point
(e.g. a local path ending in ``responses/`` becomes
``/responses`` inside the container). By default None.
"""
qsiprep_dir: Path
participant: str
session: str | None = None
atlases: list[str] = field(
default_factory=lambda: [
"4S156Parcels",
"4S256Parcels",
"4S356Parcels",
"4S456Parcels",
"4S556Parcels",
"4S656Parcels",
"4S756Parcels",
"4S856Parcels",
"4S956Parcels",
"4S1056Parcels",
"AICHA384Ext",
"Brainnetome246Ext",
"AAL116",
"Gordon333Ext",
]
)
output_dir: Path | None = None
work_dir: Path | None = None
recon_spec: Path | None = None
datasets: dict[str, Path] | None = None
recon_spec_aux_files: Path | None = None
force: bool = False
[docs]
def __post_init__(self):
"""Ensure paths are Path objects."""
self.qsiprep_dir = Path(self.qsiprep_dir)
if self.output_dir:
self.output_dir = Path(self.output_dir)
if self.work_dir:
self.work_dir = Path(self.work_dir)
if self.recon_spec:
self.recon_spec = Path(self.recon_spec)
if self.recon_spec_aux_files:
self.recon_spec_aux_files = Path(self.recon_spec_aux_files)
if self.datasets:
self.datasets = {k: Path(v) for k, v in self.datasets.items()}
[docs]
@dataclass
class QSIReconOutputs:
"""Expected outputs from QSIRecon.
Parameters
----------
qsirecon_dir : Path
QSIRecon output directory.
participant_dir : Path
Participant-specific directory.
workflow_reports : Dict[str, Dict[str, Path]]
Nested dictionary of HTML reports: {workflow_name: {session_id: html_path}}.
For datasets without sessions, session_id will be None.
work_dir : Path
Working directory.
"""
qsirecon_dir: Path
participant_dir: Path
workflow_reports: dict[str, dict[str | None, Path]]
work_dir: Path
[docs]
def exist(self) -> bool:
"""Check if key outputs exist.
Returns
-------
bool
True if all expected workflow HTML reports exist.
"""
# Check if at least the main output directory exists
if not self.qsirecon_dir.exists():
return False
# Check if all workflow reports exist
for workflow_reports in self.workflow_reports.values():
for html_path in workflow_reports.values():
if not html_path.exists():
return False
return True
[docs]
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for JSON serialization.
Returns
-------
Dict[str, any]
Dictionary with Path objects converted to strings.
"""
return {
"qsirecon_dir": str(self.qsirecon_dir),
"participant_dir": str(self.participant_dir),
"workflow_reports": {
workflow: {session: str(path) for session, path in sessions.items()}
for workflow, sessions in self.workflow_reports.items()
},
"work_dir": str(self.work_dir),
}
[docs]
@classmethod
def from_inputs(cls, inputs: QSIReconInputs, output_dir: Path, work_dir: Path):
"""Generate expected output paths from inputs.
Parameters
----------
inputs : QSIReconInputs
QSIReconInputs instance.
output_dir : Path
Resolved output directory (the qsirecon output directory).
work_dir : Path
Resolved work directory.
Returns
-------
QSIReconOutputs
QSIReconOutputs with expected paths.
"""
# output_dir is already the qsirecon output directory
qsirecon_dir = output_dir
participant_dir = qsirecon_dir / f"sub-{inputs.participant}"
# Discover sessions from qsiprep output
sessions = _discover_sessions(inputs.qsiprep_dir, inputs.participant)
# Extract workflow names from recon spec
workflows = (
_extract_workflows(inputs.recon_spec) if inputs.recon_spec else ["default"]
)
# Generate expected HTML reports for each workflow × session combination
workflow_reports = {}
for workflow_name in workflows:
workflow_reports[workflow_name] = {}
if sessions:
# Multi-session dataset
for session in sessions:
html_path = (
qsirecon_dir
/ "derivatives"
/ f"qsirecon-{workflow_name}"
/ f"sub-{inputs.participant}_ses-{session}.html"
)
workflow_reports[workflow_name][session] = html_path
else:
# Single-session dataset (no session subdirectories)
html_path = (
qsirecon_dir
/ "derivatives"
/ f"qsirecon-{workflow_name}"
/ f"sub-{inputs.participant}.html"
)
workflow_reports[workflow_name][None] = html_path
return cls(
qsirecon_dir=qsirecon_dir,
participant_dir=participant_dir,
workflow_reports=workflow_reports,
work_dir=work_dir,
)
[docs]
@dataclass
class QSIReconDefaults:
"""Default configuration for QSIRecon (brain bank standards).
Parameters
----------
nprocs : int, optional
Number of parallel processes, by default 8.
mem_gb : int, optional
Memory limit in GB, by default 16000.
atlases : List[str], optional
List of atlases for connectivity, by default a long list of atlases.
fs_subjects_dir : Optional[Path], optional
FreeSurfer subjects directory, by default None.
fs_license : Optional[Path], optional
Path to FreeSurfer license file, by default None.
docker_image : str, optional
Docker image to use, by default "pennlinc/qsirecon:latest".
force : bool, optional
Force re-run even if outputs exist, by default False.
"""
nprocs: int = 8
mem_mb: int = 16000
fs_subjects_dir: Path | None = None
fs_license: Path | None = None
docker_image: str = "pennlinc/qsirecon:1.2.0"
force: bool = False
[docs]
def __post_init__(self):
"""Ensure paths are Path objects if provided."""
if self.fs_subjects_dir:
self.fs_subjects_dir = Path(self.fs_subjects_dir)
if self.fs_license:
self.fs_license = Path(self.fs_license)
def _discover_sessions(qsiprep_dir: Path, participant: str) -> list[str]:
"""Discover session IDs from QSIPrep output directory.
Parameters
----------
qsiprep_dir : Path
QSIPrep output directory.
participant : str
Participant label (without 'sub-' prefix).
Returns
-------
List[str]
List of session IDs (without 'ses-' prefix), or empty list if no sessions.
"""
participant_dir = qsiprep_dir / f"sub-{participant}"
if not participant_dir.exists():
return []
# Look for session subdirectories
session_dirs = [
d for d in participant_dir.iterdir() if d.is_dir() and d.name.startswith("ses-")
]
if session_dirs:
# Multi-session dataset
return sorted([d.name.replace("ses-", "") for d in session_dirs])
else:
# Single-session dataset (no session subdirectories)
return []
def _extract_workflows(recon_spec_path: Path) -> list[str]:
"""Extract workflow suffixes from QSIRecon reconstruction spec YAML.
QSIRecon creates derivative directories based on the qsirecon_suffix
of each node, not the workflow name.
Parameters
----------
recon_spec_path : Path
Path to reconstruction spec YAML file.
Returns
-------
List[str]
List of qsirecon_suffix values from nodes in the spec.
"""
if not recon_spec_path or not recon_spec_path.exists():
return ["default"]
try:
with open(recon_spec_path) as f:
spec = yaml.safe_load(f)
suffixes = []
# Single workflow spec
if isinstance(spec, dict) and "nodes" in spec:
for node in spec["nodes"]:
if "qsirecon_suffix" in node:
suffixes.append(node["qsirecon_suffix"])
# Multiple workflows
elif isinstance(spec, list):
for workflow in spec:
if "nodes" in workflow:
for node in workflow["nodes"]:
if "qsirecon_suffix" in node:
suffixes.append(node["qsirecon_suffix"])
return suffixes if suffixes else ["default"]
except Exception:
# If we can't parse the spec, use default
return ["default"]