LongRecordingOrganizer#
- class neurodent.core.LongRecordingOrganizer(item: str | Path | list[str] | tuple[str] | DiscoveredFile, mode: Literal['si', 'mne', None] = 'si', truncate: bool | int = False, cache_policy: Literal['auto', 'always', 'force_regenerate'] = 'auto', multiprocess_mode: Literal['dask', 'serial'] = 'serial', extract_func: Callable[[...], si.BaseRecording] | Callable[[...], Raw] | str = None, manual_datetimes: datetime | list[datetime] = None, datetimes_are_start: bool = True, n_jobs: int = 1, recording: si.BaseRecording = None, **kwargs)[source]#
Bases: object

Construct a long recording from various file formats or an existing recording object.
- Parameters:
item (str | Path | list[str] | DiscoveredFile | None) – Input data specification.
- str/Path: Single file or directory path
- list[str]: Multiple files to concatenate
- DiscoveredFile: File(s) discovered by FileDiscoverer (single or multi-file)
- None: Used when initializing from an existing recording object
mode (Literal['si', 'mne', None], optional) – Data loading mode. Defaults to 'si'.
- 'si': Use SpikeInterface extractors
- 'mne': Use MNE-Python extractors (creates an intermediate file)
- None: No data loading (item must be None; recording must be provided)
truncate (bool | int, optional) – If True, truncate to the first 10 files. If an integer, truncate to the first n files. Defaults to False.
cache_policy (Literal['auto', 'always', 'force_regenerate'], optional) – Cache policy for intermediate files. Defaults to 'auto'.
multiprocess_mode (Literal['dask', 'serial'], optional) – Processing mode for parallel operations when loading multiple files. Defaults to 'serial'.
extract_func (Callable | str, optional) – Function used to extract data.
- If str: name of a SpikeInterface or MNE extractor (e.g. 'read_intan', 'read_raw_edf')
- If Callable: custom extraction function
- If None: defaults to si.load_extractor for SI mode
manual_datetimes (datetime | list[datetime], optional) – Manually provided timestamps.
datetimes_are_start (bool, optional) – If True (default), manual_datetimes are start times.
n_jobs (int, optional) – Number of parallel jobs for MNE resampling. Defaults to 1.
recording (si.BaseRecording, optional) – Existing SpikeInterface recording object for in-memory initialization. Use this when creating LRO wrappers around split recordings.
**kwargs – Additional arguments passed to the data loading functions.
- Variables:
LongRecording (si.BaseRecording) – The SpikeInterface recording object.
meta (RecordingMetadata) – Technical metadata (sampling rate, channels, etc.).
channel_names (list[str]) – List of channel names.
file_durations (list[float]) – Duration of each individual file in seconds.
cumulative_file_durations (list[float]) – Cumulative duration timestamps for file boundaries.
temppaths (list[str]) – Paths to temporary files created during processing.
bad_channel_names (list[str]) – List of channels identified as bad/noisy.
_is_in_memory (bool) – True if this LRO was created from an in-memory recording (via split()).
- Raises:
ValueError – If no data files are found, if the folder contains mixed file types, or if manual time parameters are invalid.
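As a sketch of typical construction (the data directory, extractor name, and start time below are hypothetical):

Example
>>> from datetime import datetime
>>> from neurodent.core import LongRecordingOrganizer
>>> lro = LongRecordingOrganizer(
...     "/path/to/recordings",
...     mode="si",
...     extract_func="read_intan",
...     manual_datetimes=datetime(2022, 1, 21, 9, 0),
...     datetimes_are_start=True,
... )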
- __init__(item: str | Path | list[str] | tuple[str] | DiscoveredFile, mode: Literal['si', 'mne', None] = 'si', truncate: bool | int = False, cache_policy: Literal['auto', 'always', 'force_regenerate'] = 'auto', multiprocess_mode: Literal['dask', 'serial'] = 'serial', extract_func: Callable[[...], si.BaseRecording] | Callable[[...], Raw] | str = None, manual_datetimes: datetime | list[datetime] = None, datetimes_are_start: bool = True, n_jobs: int = 1, recording: si.BaseRecording = None, **kwargs)[source]#
- property display_name: str#
Short display name for logging, derived from the item.
- detect_and_load_data(mode: Literal['si', 'mne', None] = 'si', cache_policy: Literal['auto', 'always', 'force_regenerate'] = 'auto', multiprocess_mode: Literal['dask', 'serial'] = 'serial', extract_func: Callable[[...], BaseRecording] | Callable[[...], Raw] | str | None = None, **kwargs)[source]#
Load the recording using the specified backend mode.
Parameters#
- mode : {"si", "mne", None}
Backend to use for loading recordings.
- cache_policy : {"auto", "always", "force_regenerate"}
Caching strategy for loaded recordings.
- multiprocess_mode : {"dask", "serial"}
Parallelism strategy.
- extract_func : callable or str, optional
Function (or reference to one) used to load each discovered file into a recording object. When a string, it is resolved in this order:
1. Short name: looked up in spikeinterface.extractors / spikeinterface (for mode="si") or mne.io (for mode="mne"). Example: "read_intan".
2. File path (contains :): loads a function directly from a Python file. The .py extension is required. Example: "tests/integration/readers.py:read_bin_csv_pair" or "/absolute/path/to/readers.py:my_func".
- **kwargs
Forwarded to the backend loading method.
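To illustrate the two string forms of extract_func described above (the short-name call is illustrative; the file-path form reuses the example path from the resolution rules):

Example
>>> lro.detect_and_load_data(mode="si", extract_func="read_intan")
>>> lro.detect_and_load_data(mode="si", extract_func="tests/integration/readers.py:read_bin_csv_pair")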
- convert_file_with_si_to_recording(extract_func: Callable[[...], BaseRecording], cache_policy: Literal['auto', 'always', 'force_regenerate'] = 'auto', multiprocess_mode: Literal['dask', 'serial'] = 'serial', **kwargs)[source]#
- convert_file_with_mne_to_recording(extract_func: Callable[[...], Raw], intermediate: Literal['edf', 'bin'] = 'edf', intermediate_name=None, intermediate_dir=None, cache_policy: Literal['auto', 'always', 'force_regenerate'] = 'auto', multiprocess_mode: Literal['dask', 'serial'] = 'serial', n_jobs: int | None = None, **kwargs)[source]#
- split(groups: dict[str, list[str]]) dict[str, LongRecordingOrganizer][source]#
Split the current recording into multiple in-memory LongRecordingOrganizer objects.
This creates lightweight LRO wrappers around channel-sliced views of the recording. No disk I/O is performed. Use persist() on individual results to save them to disk if needed.
- Parameters:
groups (dict[str, list[str]]) – Dictionary mapping group names (e.g. 'AnimalA') to lists of channel names.
- Returns:
Dictionary mapping group names to new LRO instances.
- Return type:
dict[str, LongRecordingOrganizer]
- Raises:
ValueError – If requested channels are not found in the recording.
ImportError – If SpikeInterface is not available.
Example
>>> lro = LongRecordingOrganizer("/path/to/data", mode="bin")
>>> splits = lro.split({"AnimalA": ["Ch1", "Ch2"], "AnimalB": ["Ch3", "Ch4"]})
>>> splits["AnimalA"].persist("/output/AnimalA", format="zarr")
- persist(output_dir: str | Path, format: Literal['zarr', 'binary'] = 'zarr', n_jobs: int = 1, chunk_duration: str = '1s', progress_bar: bool = True, **kwargs) Path[source]#
Save this LRO’s recording to disk.
Delegates to SpikeInterface’s save() function.
- Parameters:
output_dir (Union[str, Path]) – Directory to save the recording.
format (Literal["zarr", "binary"], optional) – Save format. Defaults to "zarr".
n_jobs (int, optional) – Number of parallel jobs. Defaults to 1.
chunk_duration (str, optional) – Chunk duration for processing. Defaults to "1s".
progress_bar (bool, optional) – Show progress bar. Defaults to True.
**kwargs – Additional arguments passed to SI's save().
- Returns:
The output directory where the recording was saved.
- Return type:
Path
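For example (output directory and job count hypothetical):

Example
>>> saved_dir = lro.persist("/output/recording", format="zarr", n_jobs=4)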
- get_datetime_fragment(fragment_len_s, fragment_idx)[source]#
Get the datetime for a specific fragment using the timestamp mapper.
- Parameters:
fragment_len_s (float) – Length of each fragment in seconds.
fragment_idx (int) – Index of the fragment to get the datetime for.
- Returns:
The datetime corresponding to the start of the fragment
- Return type:
datetime
- Raises:
ValueError – If timestamp mapper is not initialized (only available in ‘bin’ mode)
- convert_to_mne() RawArray[source]#
Convert this LongRecording object to an MNE RawArray.
- Returns:
The converted MNE RawArray
- Return type:
mne.io.RawArray
- save_to_edf(filename: str | Path, overwrite: bool = False)[source]#
Save the recording to an EDF file via MNE.
- Parameters:
filename (str | Path) – Path to save the EDF file to.
overwrite (bool) – Whether to overwrite if the file exists.
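A brief sketch of the MNE interop using convert_to_mne() and save_to_edf() together (output path hypothetical):

Example
>>> raw = lro.convert_to_mne()
>>> lro.save_to_edf("/output/recording.edf", overwrite=True)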
- compute_bad_channels(lof_threshold: float | None = None, force_recompute: bool = False, lof_chunk_duration_s: float = 60)[source]#
Compute bad channels using LOF analysis with unified score storage.
- Parameters:
lof_threshold (float, optional) – Threshold for determining bad channels from LOF scores. If None, only computes/loads scores without setting bad_channel_names.
force_recompute (bool) – Whether to recompute LOF scores even if they exist.
lof_chunk_duration_s (float) – Duration in seconds of each chunk used for the pairwise-distance computation in LOF. Defaults to 60.
- apply_lof_threshold(lof_threshold: float)[source]#
Apply threshold to existing LOF scores to determine bad channels.
- Parameters:
lof_threshold (float) – Threshold for determining bad channels.
- get_lof_scores() dict[source]#
Get LOF scores with channel names.
- Returns:
Dictionary mapping channel names to LOF scores.
- Return type:
dict
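A possible bad-channel workflow combining compute_bad_channels(), get_lof_scores(), and apply_lof_threshold() (the threshold value is illustrative, not a recommended default):

Example
>>> lro.compute_bad_channels()  # compute or load LOF scores only
>>> scores = lro.get_lof_scores()
>>> lro.apply_lof_threshold(lof_threshold=1.5)
>>> lro.bad_channel_names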
- finalize_file_timestamps()[source]#
Finalize file timestamps using manual times if provided, otherwise validate CSV times.
- get_date_string() str[source]#
Get the string representation of the recording date (Start Time).
- Returns:
Date string in format “%b-%d-%Y” (e.g. “Jan-21-2022”).
- Return type:
str
- Raises:
ValueError – If no timestamps are available in the recording.
- merge(other_lro)[source]#
Merge another LRO into this one using si.concatenate_recordings.
This creates a new concatenated recording from this LRO and the other LRO. The other LRO should represent a later time period to maintain temporal order.
- Parameters:
other_lro (LongRecordingOrganizer) – The LRO to merge into this one.
- Raises:
ValueError – If LROs are incompatible (different channels, sampling rates, etc.)
ImportError – If SpikeInterface is not available
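For instance, concatenating two sessions recorded back to back (paths hypothetical; the second LRO must cover the later time period):

Example
>>> morning = LongRecordingOrganizer("/data/session_am", mode="si")
>>> afternoon = LongRecordingOrganizer("/data/session_pm", mode="si")
>>> morning.merge(afternoon)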