Source code for neuroconv.tools.spikeinterface.spikeinterface

import uuid
import warnings
from collections import defaultdict
from typing import Any, Literal, Optional, Union

import numpy as np
import psutil
import pynwb
from hdmf.data_utils import AbstractDataChunkIterator
from pydantic import FilePath
from spikeinterface import BaseRecording, BaseSorting, SortingAnalyzer

from .spikeinterfacerecordingdatachunkiterator import (
    SpikeInterfaceRecordingDataChunkIterator,
)
from ..nwb_helpers import get_module, make_or_load_nwbfile
from ...utils import (
    DeepDict,
    calculate_regular_series_rate,
    dict_deep_update,
)
from ...utils.str_utils import human_readable_size


def _get_nwb_metadata(recording: BaseRecording, metadata: dict = None):
    """
    Return default metadata for all recording fields.

    Parameters
    ----------
    recording: spikeinterface.BaseRecording
    metadata: dict
        metadata info for constructing the nwb file (optional).
    """
    metadata = dict(
        NWBFile=dict(
            session_description="Auto-generated by NwbRecordingExtractor without description.",
            identifier=str(uuid.uuid4()),
        ),
        Ecephys=dict(
            Device=[dict(name="Device", description="Ecephys probe. Automatically generated.")],
            ElectrodeGroup=[
                dict(name=str(group_name), description="no description", location="unknown", device="Device")
                for group_name in np.unique(recording.get_channel_groups())
            ],
        ),
    )
    return metadata


[docs]def add_devices_to_nwbfile(nwbfile: pynwb.NWBFile, metadata: Optional[DeepDict] = None): """ Add device information to nwbfile object. Will always ensure nwbfile has at least one device, but multiple devices within the metadata list will also be created. Parameters ---------- nwbfile: NWBFile nwb file to which the recording information is to be added metadata: DeepDict metadata info for constructing the nwb file (optional). Should be of the format:: metadata['Ecephys']['Device'] = [ { 'name': my_name, 'description': my_description }, ... ] Missing keys in an element of metadata['Ecephys']['Device'] will be auto-populated with defaults. """ if nwbfile is not None: assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile" # Default Device metadata defaults = dict(name="Device", description="Ecephys probe. Automatically generated.") if metadata is None: metadata = dict() if "Ecephys" not in metadata: metadata["Ecephys"] = dict() if "Device" not in metadata["Ecephys"]: metadata["Ecephys"]["Device"] = [defaults] for device_metadata in metadata["Ecephys"]["Device"]: if device_metadata.get("name", defaults["name"]) not in nwbfile.devices: device_kwargs = dict(defaults, **device_metadata) nwbfile.create_device(**device_kwargs)
[docs]def add_electrode_groups_to_nwbfile(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: Optional[dict] = None): """ Add electrode group information to nwbfile object. Will always ensure nwbfile has at least one electrode group. Will auto-generate a linked device if the specified name does not exist in the nwbfile. Parameters ---------- recording: spikeinterface.BaseRecording nwbfile: pynwb.NWBFile nwb file to which the recording information is to be added metadata: dict metadata info for constructing the nwb file (optional). Should be of the format:: metadata['Ecephys']['ElectrodeGroup'] = [ { 'name': my_name, 'description': my_description, 'location': electrode_location, 'device': my_device_name }, ... ] Missing keys in an element of ``metadata['Ecephys']['ElectrodeGroup']`` will be auto-populated with defaults. Group names set by RecordingExtractor channel properties will also be included with passed metadata, but will only use default description and location. """ assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile" if metadata is None: metadata = dict() if "Ecephys" not in metadata: metadata["Ecephys"] = dict() add_devices_to_nwbfile(nwbfile=nwbfile, metadata=metadata) group_names = _get_group_name(recording=recording) defaults = [ dict( name=group_name, description="no description", location="unknown", device=[i.name for i in nwbfile.devices.values()][0], ) for group_name in group_names ] if "ElectrodeGroup" not in metadata["Ecephys"]: metadata["Ecephys"]["ElectrodeGroup"] = defaults assert all( [isinstance(x, dict) for x in metadata["Ecephys"]["ElectrodeGroup"]] ), "Expected metadata['Ecephys']['ElectrodeGroup'] to be a list of dictionaries!" for group_metadata in metadata["Ecephys"]["ElectrodeGroup"]: if group_metadata.get("name", defaults[0]["name"]) not in nwbfile.electrode_groups: device_name = group_metadata.get("device", defaults[0]["device"]) if device_name not in nwbfile.devices: new_device_metadata = dict(Ecephys=dict(Device=[dict(name=device_name)])) add_devices_to_nwbfile(nwbfile=nwbfile, metadata=new_device_metadata) warnings.warn( f"Device '{device_name}' not detected in " "attempted link to electrode group! Automatically generating." ) electrode_group_kwargs = dict(defaults[0], **group_metadata) electrode_group_kwargs.update(device=nwbfile.devices[device_name]) nwbfile.create_electrode_group(**electrode_group_kwargs) # TODO: Check this, probably not necessary if not nwbfile.electrode_groups: device_name = list(nwbfile.devices.keys())[0] device = nwbfile.devices[device_name] if len(nwbfile.devices) > 1: warnings.warn( "More than one device found when adding electrode group " f"via channel properties: using device '{device_name}'. To use a " "different device, indicate it the metadata argument." ) electrode_group_kwargs = dict(defaults[0]) electrode_group_kwargs.update(device=device) for group_name in np.unique(recording.get_channel_groups()).tolist(): electrode_group_kwargs.update(name=str(group_name)) nwbfile.create_electrode_group(**electrode_group_kwargs)
def _get_channel_name(recording: BaseRecording) -> np.ndarray: """ Extract the canonical `channel_name` from the recording, which will be written in the electrodes table. Parameters ---------- recording : BaseRecording The recording object from which to extract the channel names. Returns ------- np.ndarray An array containing the channel names. If the `channel_name` property is not available, the channel IDs as strings will be returned. """ # That uses either the `channel_name` property or the channel ids as string otherwise. channel_names = recording.get_property("channel_name") if channel_names is None: channel_names = recording.get_channel_ids().astype("str", copy=False) return channel_names def _get_group_name(recording: BaseRecording) -> np.ndarray: """ Extract the canonical `group_name` from the recording, which will be written in the electrodes table. Parameters ---------- recording : BaseRecording The recording object from which to extract the group names. Returns ------- np.ndarray An array containing the group names. If the `group_name` property is not available, the channel groups will be returned. If the group names are empty, a default value 'ElectrodeGroup' will be used. Raises ------ ValueError If the number of unique group names doesn't match the number of unique groups, or if the mapping between group names and group numbers is inconsistent. """ default_value = "ElectrodeGroup" group_names = recording.get_property("group_name") groups = recording.get_channel_groups() if group_names is None: group_names = groups if group_names is None: group_names = np.full(recording.get_num_channels(), fill_value=default_value) # Always ensure group_names are strings group_names = group_names.astype("str", copy=False) # If for any reason the group names are empty, fill them with the default group_names[group_names == ""] = default_value # Validate group names against groups if groups is not None: unique_groups = set(groups) unique_names = set(group_names) if len(unique_names) != len(unique_groups): raise ValueError("The number of group names must match the number of groups") # Check consistency of group name to group number mapping group_to_name_map = {} for group, name in zip(groups, group_names): if group in group_to_name_map: if group_to_name_map[group] != name: raise ValueError("Inconsistent mapping between group numbers and group names") else: group_to_name_map[group] = name return group_names def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> list[str]: """ Generate a list of global identifiers for channels in the electrode table of an NWB file. These identifiers are used to map electrodes across writing operations. Parameters ---------- nwbfile : pynwb.NWBFile The NWB file from which to extract the electrode table information. Returns ------- list[str] A list of unique keys, each representing a combination of channel name and group name from the electrodes table. If the electrodes table or the necessary columns are not present, an empty list is returned. """ if nwbfile.electrodes is None: return [] if "channel_name" not in nwbfile.electrodes.colnames or "group_name" not in nwbfile.electrodes.colnames: return [] channel_names = nwbfile.electrodes["channel_name"][:] group_names = nwbfile.electrodes["group_name"][:] unique_keys = [f"{ch_name}_{gr_name}" for ch_name, gr_name in zip(channel_names, group_names)] return unique_keys def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile: pynwb.NWBFile) -> list[int]: """ Get the indices of the electrodes in the NWBFile that correspond to the channels in the recording. This function matches the `channel_name` and `group_name` from the recording to the global identifiers in the NWBFile's electrodes table, returning the indices of these matching electrodes. Parameters ---------- recording : BaseRecording The recording object from which to extract channel and group names. nwbfile : pynwb.NWBFile The NWBFile containing the electrodes table to search for matches. Returns ------- list[int] A list of indices corresponding to the positions in the NWBFile's electrodes table that match the channels in the recording. """ channel_names = _get_channel_name(recording=recording) group_names = _get_group_name(recording=recording) channel_global_ids = [f"{ch_name}_{gr_name}" for ch_name, gr_name in zip(channel_names, group_names)] table_global_ids = _get_electrodes_table_global_ids(nwbfile=nwbfile) electrode_table_indices = [table_global_ids.index(ch_id) for ch_id in channel_global_ids] return electrode_table_indices def _get_null_value_for_property(property: str, sample_data: Any, null_values_for_properties: dict[str, Any]) -> Any: """ Retrieve the null value for a given property based on its data type or a provided mapping. Also performs type checking to ensure the default value matches the type of the existing data. Parameters ---------- sample_data : Any The sample data for which the default value is being determined. This can be of any data type. null_values_for_properties : dict of str to Any A dictionary mapping properties to their respective default values. If a property is not found in this dictionary, a sensible default value based on the type of `sample_data` will be used. Returns ------- Any The default value for the specified property. The type of the default value will match the type of `sample_data` or the type specified in `null_values_for_properties`. Raises ------ ValueError If a sensible default value cannot be determined for the given property and data type, or if the type of the provided default value does not match the type of the existing data. """ type_to_default_value = {list: [], np.ndarray: np.array(np.nan), str: "", float: np.nan, complex: np.nan} # Check for numpy scalar types sample_data = sample_data.item() if isinstance(sample_data, np.generic) else sample_data default_value = null_values_for_properties.get(property, None) if default_value is None: sample_data_type = type(sample_data) default_value = type_to_default_value.get(sample_data_type, None) if default_value is None: error_msg = ( f"Could not find a sensible default value for property '{property}' of type {sample_data_type} \n" "This can be fixed by by modifying the recording property or setting a sensible default value " "using the `add_electrodes` function argument `null_values_for_properties` as in: \n" "null_values_for_properties = {{property}': default_value}" ) raise ValueError(error_msg) if type(default_value) != sample_data_type: error_msg = ( f"Default value for property '{property}' in null_values_for_properties dict has a " f"different type {type(default_value)} than the currently existing data type {sample_data_type}. \n" "Modify the recording property or the default value to match" ) raise ValueError(error_msg) return default_value
[docs]def add_electrodes_to_nwbfile( recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: Optional[dict] = None, exclude: tuple = (), null_values_for_properties: Optional[dict] = None, ): """ Build an electrode_table from the recording information and it to the nwbfile object. Parameters ---------- recording: spikeinterface.BaseRecording nwbfile: NWBFile nwb file to which the recording information is to be added metadata: dict metadata info for constructing the nwb file (optional). Should be of the format:: metadata['Ecephys']['Electrodes'] = [ { 'name': my_name, 'description': my_description }, ... ] Note that data intended to be added to the electrodes table of the NWBFile should be set as channel properties in the RecordingExtractor object. Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults whenever possible. If 'my_name' is set to one of the required fields for nwbfile electrodes (id, x, y, z, imp, location, filtering, group_name), then the metadata will override their default values. Setting 'my_name' to metadata field 'group' is not supported as the linking to nwbfile.electrode_groups is handled automatically; please specify the string 'group_name' in this case. If no group information is passed via metadata, automatic linking to existing electrode groups, possibly including the default, will occur. exclude: tuple An iterable containing the string names of channel properties in the RecordingExtractor object to ignore when writing to the NWBFile. """ assert isinstance( nwbfile, pynwb.NWBFile ), f"'nwbfile' should be of type pynwb.NWBFile but is of type {type(nwbfile)}" null_values_for_properties = dict() if null_values_for_properties is None else null_values_for_properties # Test that metadata has the expected structure electrodes_metadata = list() if metadata is not None: electrodes_metadata = metadata.get("Ecephys", dict()).get("Electrodes", list()) required_keys = {"name", "description"} assert all( [isinstance(property, dict) and set(property.keys()) == required_keys for property in electrodes_metadata] ), ( "Expected metadata['Ecephys']['Electrodes'] to be a list of dictionaries, " "containing the keys 'name' and 'description'" ) assert all( [property["name"] != "group" for property in electrodes_metadata] ), "The recording property 'group' is not allowed; please use 'group_name' instead!" # Transform to a dict that maps property name to its description property_descriptions = dict() for property in electrodes_metadata: property_descriptions[property["name"]] = property["description"] # 1. Build columns details from extractor properties: dict(name: dict(description='',data=data, index=False)) data_to_add = dict() recording_properties = recording.get_property_keys() spikeinterface_special_cases = [ "offset_to_uV", # Written in the ElectricalSeries "gain_to_uV", # Written in the ElectricalSeries "contact_vector", # Structured array representing the probe "channel_name", # We handle this here with _get_channel_name "channel_names", # Some formats from neo also have this property, skip it "group_name", # We handle this here _get_group_name "group", # We handle this here with _get_group_name ] excluded_properties = list(exclude) + spikeinterface_special_cases properties_to_extract = [property for property in recording_properties if property not in excluded_properties] for property in properties_to_extract: data = np.asarray(recording.get_property(property)).copy() # Do not modify properties of the recording index = isinstance(data[0], (list, np.ndarray, tuple)) if index and isinstance(data[0], np.ndarray): index = data[0].ndim # Fill with provided custom descriptions description = property_descriptions.get(property, "no description") data_to_add[property] = dict(description=description, data=data, index=index) # Special cases properties channel_names = _get_channel_name(recording=recording) data_to_add["channel_name"] = dict(description="unique channel reference", data=channel_names, index=False) group_names = _get_group_name(recording=recording) data_to_add["group_name"] = dict(description="group_name", data=group_names, index=False) # Location in spikeinterface is equivalent to rel_x, rel_y, rel_z in the nwb standard if "location" in data_to_add: data = data_to_add["location"]["data"] column_number_to_property = {0: "rel_x", 1: "rel_y", 2: "rel_z"} for column_number in range(data.shape[1]): property = column_number_to_property[column_number] data_to_add[property] = dict(description=property, data=data[:, column_number], index=False) data_to_add.pop("location") # In the electrode table location is the brain area of spikeinterface if "brain_area" in data_to_add: data_to_add["location"] = data_to_add["brain_area"] data_to_add["location"].update(description="location") data_to_add.pop("brain_area") else: # This is a required property and needs a default value data = np.full(recording.get_num_channels(), fill_value="unknown") data_to_add["location"] = dict(description="location", data=data, index=False) # Add missing groups to the nwb file groupless_names = [group_name for group_name in group_names if group_name not in nwbfile.electrode_groups] if len(groupless_names) > 0: electrode_group_list = [dict(name=group_name) for group_name in groupless_names] missing_group_metadata = dict(Ecephys=dict(ElectrodeGroup=electrode_group_list)) add_electrode_groups_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=missing_group_metadata) group_list = [nwbfile.electrode_groups[group_name] for group_name in group_names] data_to_add["group"] = dict(description="the ElectrodeGroup object", data=group_list, index=False) schema_properties = {"group", "group_name", "location"} properties_to_add = set(data_to_add) electrode_table_previous_properties = set(nwbfile.electrodes.colnames) if nwbfile.electrodes else set() # The schema properties are always added by rows because they are required properties_to_add_by_rows = schema_properties.union(electrode_table_previous_properties) properties_to_add_by_columns = properties_to_add.difference(properties_to_add_by_rows) # Properties that were added before require null values to add by rows if data is missing properties_requiring_null_values = electrode_table_previous_properties.difference(properties_to_add) nul_values_for_rows = dict() for property in properties_requiring_null_values: sample_data = nwbfile.electrodes[property][:][0] null_value = _get_null_value_for_property( property=property, sample_data=sample_data, null_values_for_properties=null_values_for_properties, ) nul_values_for_rows[property] = null_value # We only add new electrodes to the table existing_global_ids = _get_electrodes_table_global_ids(nwbfile=nwbfile) channel_global_ids = [f"{ch_name}_{gr_name}" for ch_name, gr_name in zip(channel_names, group_names)] channel_indices_to_add = [index for index, key in enumerate(channel_global_ids) if key not in existing_global_ids] properties_with_data = properties_to_add_by_rows.intersection(data_to_add) for channel_index in channel_indices_to_add: electrode_kwargs = nul_values_for_rows data_dict = {property: data_to_add[property]["data"][channel_index] for property in properties_with_data} electrode_kwargs.update(**data_dict) nwbfile.add_electrode(**electrode_kwargs, enforce_unique_id=True) # The channel_name column as we use channel_name, group_name as a unique identifier # We fill previously inexistent values with the electrode table ids electrode_table_size = len(nwbfile.electrodes.id[:]) previous_table_size = electrode_table_size - recording.get_num_channels() if "channel_name" in properties_to_add_by_columns: cols_args = data_to_add["channel_name"] data = cols_args["data"] previous_ids = nwbfile.electrodes.id[:previous_table_size] default_value = np.array(previous_ids).astype("str") extended_data = np.hstack([default_value, data]) cols_args["data"] = extended_data nwbfile.add_electrode_column("channel_name", **cols_args) all_indices = np.arange(electrode_table_size) indices_for_new_data = _get_electrode_table_indices_for_recording(recording=recording, nwbfile=nwbfile) indices_for_null_values = [index for index in all_indices if index not in indices_for_new_data] extending_column = len(indices_for_null_values) > 0 # Add properties as columns for property in properties_to_add_by_columns - {"channel_name"}: cols_args = data_to_add[property] data = cols_args["data"] # This is the simple case, early return if not extending_column: nwbfile.add_electrode_column(property, **cols_args) continue adding_ragged_array = cols_args["index"] if not adding_ragged_array: sample_data = data[0] dtype = data.dtype extended_data = np.empty(shape=electrode_table_size, dtype=dtype) extended_data[indices_for_new_data] = data null_value = _get_null_value_for_property( property=property, sample_data=sample_data, null_values_for_properties=null_values_for_properties, ) extended_data[indices_for_null_values] = null_value else: dtype = np.ndarray extended_data = np.empty(shape=electrode_table_size, dtype=dtype) for index, value in enumerate(data): index_in_extended_data = indices_for_new_data[index] index_in_extended_data = indices_for_new_data[index] extended_data[index_in_extended_data] = value.tolist() for index in indices_for_null_values: null_value = [] extended_data[index] = null_value cols_args["data"] = extended_data nwbfile.add_electrode_column(property, **cols_args)
[docs]def check_if_recording_traces_fit_into_memory(recording: BaseRecording, segment_index: int = 0) -> None: """ Raises an error if the full traces of a recording extractor are larger than psutil.virtual_memory().available. Parameters ---------- recording : spikeinterface.BaseRecording A recording extractor object from spikeinterface. segment_index : int, optional The segment index of the recording extractor object, by default 0 Raises ------ MemoryError """ element_size_in_bytes = recording.get_dtype().itemsize num_channels = recording.get_num_channels() num_frames = recording.get_num_frames(segment_index=segment_index) traces_size_in_bytes = element_size_in_bytes * num_channels * num_frames available_memory_in_bytes = psutil.virtual_memory().available if traces_size_in_bytes > available_memory_in_bytes: message = ( f"Memory error, full electrical series is {human_readable_size(traces_size_in_bytes, binary=True)} but only" f" {human_readable_size(available_memory_in_bytes, binary=True)} are available. Use iterator_type='V2'" ) raise MemoryError(message)
def _recording_traces_to_hdmf_iterator( recording: BaseRecording, segment_index: int = None, return_scaled: bool = False, iterator_type: Optional[str] = "v2", iterator_opts: dict = None, ) -> AbstractDataChunkIterator: """Function to wrap traces of spikeinterface recording into an AbstractDataChunkIterator. Parameters ---------- recording : spikeinterface.BaseRecording A recording extractor from spikeinterface segment_index : int, optional The recording segment to add to the NWBFile. return_scaled : bool, defaults to False When True recording extractor objects from spikeinterface return their traces in microvolts. iterator_type: {"v2", None}, default: 'v2' The type of DataChunkIterator to use. 'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking. None: write the TimeSeries with no memory chunking. iterator_opts: dict, optional Dictionary of options for the iterator. See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator for the full list of options. Returns ------- traces_as_iterator: AbstractDataChunkIterator The traces of the recording extractor wrapped in an iterator object. Raises ------ ValueError If the iterator_type is not 'v2' or None. """ supported_iterator_types = ["v2", None] if iterator_type not in supported_iterator_types: message = f"iterator_type {iterator_type} should be either 'v1', 'v2' (recommended) or None" raise ValueError(message) iterator_opts = dict() if iterator_opts is None else iterator_opts if iterator_type is None: check_if_recording_traces_fit_into_memory(recording=recording, segment_index=segment_index) traces_as_iterator = recording.get_traces(return_scaled=return_scaled, segment_index=segment_index) elif iterator_type == "v2": traces_as_iterator = SpikeInterfaceRecordingDataChunkIterator( recording=recording, segment_index=segment_index, return_scaled=return_scaled, **iterator_opts, ) else: raise ValueError("iterator_type must be None or 'v2'.") return traces_as_iterator def _report_variable_offset(channel_offsets, channel_ids): """ Helper function to report variable offsets per channel IDs. Groups the different available offsets per channel IDs and raises a ValueError. """ # Group the different offsets per channel IDs offset_to_channel_ids = {} for offset, channel_id in zip(channel_offsets, channel_ids): offset = offset.item() if isinstance(offset, np.generic) else offset channel_id = channel_id.item() if isinstance(channel_id, np.generic) else channel_id if offset not in offset_to_channel_ids: offset_to_channel_ids[offset] = [] offset_to_channel_ids[offset].append(channel_id) # Create a user-friendly message message_lines = ["Recording extractors with heterogeneous offsets are not supported."] message_lines.append("Multiple offsets were found per channel IDs:") for offset, ids in offset_to_channel_ids.items(): message_lines.append(f" Offset {offset}: Channel IDs {ids}") message = "\n".join(message_lines) raise ValueError(message)
[docs]def add_electrical_series_to_nwbfile( recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None, segment_index: int = 0, starting_time: Optional[float] = None, write_as: Literal["raw", "processed", "lfp"] = "raw", es_key: str = None, write_scaled: bool = False, iterator_type: Optional[str] = "v2", iterator_opts: Optional[dict] = None, always_write_timestamps: bool = False, ): """ Adds traces from recording object as ElectricalSeries to an NWBFile object. Parameters ---------- recording : SpikeInterfaceRecording A recording extractor from spikeinterface nwbfile : NWBFile nwb file to which the recording information is to be added metadata : dict, optional metadata info for constructing the nwb file. Should be of the format:: metadata['Ecephys']['ElectricalSeries'] = dict( name=my_name, description=my_description ) segment_index : int, default: 0 The recording segment to add to the NWBFile. starting_time : float, optional Sets the starting time of the ElectricalSeries to a manually set value. write_as : {'raw', 'processed', 'lfp'} How to save the traces data in the nwb file. Options: - 'raw': save it in acquisition - 'processed': save it as FilteredEphys, in a processing module - 'lfp': save it as LFP, in a processing module es_key : str, optional Key in metadata dictionary containing metadata info for the specific electrical series write_scaled : bool, default: False If True, writes the traces in uV with the right conversion. If False , the data is stored as it is and the right conversions factors are added to the nwbfile. iterator_type: {"v2", None}, default: 'v2' The type of DataChunkIterator to use. 'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking. None: write the TimeSeries with no memory chunking. iterator_opts: dict, optional Dictionary of options for the iterator. See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator for the full list of options. always_write_timestamps : bool, default: False Set to True to always write timestamps. By default (False), the function checks if the timestamps are uniformly sampled, and if so, stores the data using a regular sampling rate instead of explicit timestamps. If set to True, timestamps will be written explicitly, regardless of whether the sampling rate is uniform. Notes ----- Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults whenever possible. """ if starting_time is not None: warnings.warn( "The 'starting_time' parameter is deprecated and will be removed in June 2025. " "Use the time alignment methods or set the recording times directlyfor modifying the starting time or timestamps " "of the data if needed: " "https://neuroconv.readthedocs.io/en/main/user_guide/temporal_alignment.html", DeprecationWarning, stacklevel=2, ) assert write_as in [ "raw", "processed", "lfp", ], f"'write_as' should be 'raw', 'processed' or 'lfp', but instead received value {write_as}" modality_signature = write_as.upper() if write_as == "lfp" else write_as.capitalize() default_name = f"ElectricalSeries{modality_signature}" default_description = dict(raw="Raw acquired data", lfp="Processed data - LFP", processed="Processed data") eseries_kwargs = dict(name=default_name, description=default_description[write_as]) # Select and/or create module if lfp or processed data is to be stored. if write_as in ["lfp", "processed"]: ecephys_mod = get_module( nwbfile=nwbfile, name="ecephys", description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.", ) if write_as == "lfp" and "LFP" not in ecephys_mod.data_interfaces: ecephys_mod.add(pynwb.ecephys.LFP(name="LFP")) if write_as == "processed" and "Processed" not in ecephys_mod.data_interfaces: ecephys_mod.add(pynwb.ecephys.FilteredEphys(name="Processed")) if metadata is not None and "Ecephys" in metadata and es_key is not None: assert es_key in metadata["Ecephys"], f"metadata['Ecephys'] dictionary does not contain key '{es_key}'" eseries_kwargs.update(metadata["Ecephys"][es_key]) # If the recording extractor has more than 1 segment, append numbers to the names so that the names are unique. # 0-pad these names based on the number of segments. # If there are 10 segments use 2 digits, if there are 100 segments use 3 digits, etc. if recording.get_num_segments() > 1: width = int(np.ceil(np.log10((recording.get_num_segments())))) eseries_kwargs["name"] += f"{segment_index:0{width}}" # The add_electrodes adds a column with channel name to the electrode table. add_electrodes_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata) # Create a region for the electrodes table electrode_table_indices = _get_electrode_table_indices_for_recording(recording=recording, nwbfile=nwbfile) electrode_table_region = nwbfile.create_electrode_table_region( region=electrode_table_indices, description="electrode_table_region", ) eseries_kwargs.update(electrodes=electrode_table_region) # Spikeinterface guarantees data in micro volts when return_scaled=True. This multiplies by gain and adds offsets # In nwb to get traces in Volts we take data*channel_conversion*conversion + offset channel_conversion = recording.get_channel_gains() channel_offsets = recording.get_channel_offsets() unique_channel_conversion = np.unique(channel_conversion) unique_channel_conversion = unique_channel_conversion[0] if len(unique_channel_conversion) == 1 else None unique_offset = np.unique(channel_offsets) if unique_offset.size > 1: channel_ids = recording.get_channel_ids() # This prints a user friendly error where the user is provided with a map from offset to channels _report_variable_offset(channel_offsets, channel_ids) unique_offset = unique_offset[0] if unique_offset[0] is not None else 0 micro_to_volts_conversion_factor = 1e-6 if not write_scaled and unique_channel_conversion is None: eseries_kwargs.update(conversion=micro_to_volts_conversion_factor) eseries_kwargs.update(channel_conversion=channel_conversion) elif not write_scaled and unique_channel_conversion is not None: eseries_kwargs.update(conversion=unique_channel_conversion * micro_to_volts_conversion_factor) if not write_scaled: eseries_kwargs.update(offset=unique_offset * micro_to_volts_conversion_factor) # Iterator ephys_data_iterator = _recording_traces_to_hdmf_iterator( recording=recording, segment_index=segment_index, iterator_type=iterator_type, iterator_opts=iterator_opts, ) eseries_kwargs.update(data=ephys_data_iterator) starting_time = starting_time if starting_time is not None else 0 if always_write_timestamps: timestamps = recording.get_times(segment_index=segment_index) shifted_timestamps = starting_time + timestamps eseries_kwargs.update(timestamps=shifted_timestamps) else: # By default we write the rate if the timestamps are regular recording_has_timestamps = recording.has_time_vector(segment_index=segment_index) if recording_has_timestamps: timestamps = recording.get_times(segment_index=segment_index) rate = calculate_regular_series_rate(series=timestamps) # Returns None if it is not regular recording_t_start = timestamps[0] else: rate = recording.get_sampling_frequency() recording_t_start = recording._recording_segments[segment_index].t_start or 0 # Shift timestamps if starting_time is set if rate: starting_time = float(starting_time + recording_t_start) # Note that we call the sampling frequency again because the estimated rate might be different from the # sampling frequency of the recording extractor by some epsilon. eseries_kwargs.update(starting_time=starting_time, rate=recording.get_sampling_frequency()) else: shifted_timestamps = starting_time + timestamps eseries_kwargs.update(timestamps=shifted_timestamps) # Create ElectricalSeries object and add it to nwbfile es = pynwb.ecephys.ElectricalSeries(**eseries_kwargs) if write_as == "raw": nwbfile.add_acquisition(es) elif write_as == "processed": ecephys_mod.data_interfaces["Processed"].add_electrical_series(es) elif write_as == "lfp": ecephys_mod.data_interfaces["LFP"].add_electrical_series(es)
[docs]def add_electrodes_info_to_nwbfile(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None): """ Add device, electrode_groups, and electrodes info to the nwbfile. Parameters ---------- recording : SpikeInterfaceRecording nwbfile : NWBFile NWB file to which the recording information is to be added metadata : dict, optional metadata info for constructing the nwb file. Should be of the format:: metadata['Ecephys']['Electrodes'] = [ { 'name': my_name, 'description': my_description }, ... ] Note that data intended to be added to the electrodes table of the ``NWBFile`` should be set as channel properties in the ``RecordingExtractor`` object. Missing keys in an element of ``metadata['Ecephys']['ElectrodeGroup']`` will be auto-populated with defaults whenever possible. If ``'my_name'`` is set to one of the required fields for nwbfile electrodes (id, x, y, z, imp, location, filtering, group_name), then the metadata will override their default values. Setting ``'my_name'`` to metadata field ``'group'`` is not supported as the linking to ``nwbfile.electrode_groups`` is handled automatically; please specify the string ``'group_name'`` in this case. If no group information is passed via metadata, automatic linking to existing electrode groups, possibly including the default, will occur. """ add_devices_to_nwbfile(nwbfile=nwbfile, metadata=metadata) add_electrode_groups_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata) add_electrodes_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata)
[docs]def add_recording_to_nwbfile( recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: Optional[dict] = None, starting_time: Optional[float] = None, write_as: Literal["raw", "processed", "lfp"] = "raw", es_key: Optional[str] = None, write_electrical_series: bool = True, write_scaled: bool = False, iterator_type: str = "v2", iterator_opts: Optional[dict] = None, always_write_timestamps: bool = False, ): """ Add traces from a recording object as an ElectricalSeries to an NWBFile object. Also adds device, electrode_groups, and electrodes to the NWBFile. Parameters ---------- recording : BaseRecording A recording extractor from SpikeInterface. nwbfile : pynwb.NWBFile The NWBFile object to which the recording information is to be added. metadata : dict, optional Metadata information for constructing the NWBFile. This should include: - metadata['Ecephys']['ElectricalSeries'] : dict Dictionary with metadata for the ElectricalSeries, such as: - name : str Name of the ElectricalSeries. - description : str Description of the ElectricalSeries. starting_time : float, optional Manually set the starting time of the ElectricalSeries. If not provided, the starting time is taken from the recording extractor. write_as : {'raw', 'processed', 'lfp'}, default='raw' Specifies how to save the trace data in the NWB file. Options are: - 'raw': Save the data in the acquisition group. - 'processed': Save the data as FilteredEphys in a processing module. - 'lfp': Save the data as LFP in a processing module. es_key : str, optional Key in the metadata dictionary containing metadata information for the specific ElectricalSeries. write_electrical_series : bool, default=True If True, writes the ElectricalSeries to the NWBFile. If False, no ElectricalSeries is written. write_scaled : bool, default=False If True, writes the traces in microvolts (uV) with the appropriate conversion. If False, the data is stored as-is, and the correct conversion factors are added to the NWBFile. iterator_type : {'v2', None}, default='v2' The type of DataChunkIterator to use when writing data in chunks. Options are: - 'v2': The SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking. - None: Write the TimeSeries with no memory chunking. iterator_opts : dict, optional Dictionary of options for the iterator. Refer to the documentation at https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator for a full list of available options. always_write_timestamps : bool, default: False Set to True to always write timestamps. By default (False), the function checks if the timestamps are uniformly sampled, and if so, stores the data using a regular sampling rate instead of explicit timestamps. If set to True, timestamps will be written explicitly, regardless of whether the sampling rate is uniform. Notes ----- Missing keys in an element of `metadata['Ecephys']['ElectrodeGroup']` will be auto-populated with defaults whenever possible. Ensure that the provided metadata dictionary is correctly structured to avoid unintended behavior. """ if hasattr(recording, "nwb_metadata"): metadata = dict_deep_update(recording.nwb_metadata, metadata) elif metadata is None: metadata = _get_nwb_metadata(recording=recording) add_electrodes_info_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata) if write_electrical_series: number_of_segments = recording.get_num_segments() for segment_index in range(number_of_segments): add_electrical_series_to_nwbfile( recording=recording, nwbfile=nwbfile, segment_index=segment_index, starting_time=starting_time, metadata=metadata, write_as=write_as, es_key=es_key, write_scaled=write_scaled, iterator_type=iterator_type, iterator_opts=iterator_opts, always_write_timestamps=always_write_timestamps, )
[docs]def write_recording_to_nwbfile( recording: BaseRecording, nwbfile_path: Optional[FilePath] = None, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = False, starting_time: Optional[float] = None, write_as: Optional[str] = "raw", es_key: Optional[str] = None, write_electrical_series: bool = True, write_scaled: bool = False, iterator_type: Optional[str] = "v2", iterator_opts: Optional[dict] = None, ) -> pynwb.NWBFile: """ Primary method for writing a RecordingExtractor object to an NWBFile. Parameters ---------- recording : spikeinterface.BaseRecording nwbfile_path : FilePath, optional Path for where to write or load (if overwrite=False) the NWBFile. If specified, the context will always write to this location. nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling:: write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional metadata info for constructing the nwb file (optional). Should be of the format:: metadata['Ecephys'] = { 'Device': [ { 'name': my_name, 'description': my_description }, ... ] 'ElectrodeGroup': [ { 'name': my_name, 'description': my_description, 'location': electrode_location, 'device': my_device_name }, ... ] 'Electrodes': [ { 'name': my_name, 'description': my_description }, ... ] 'ElectricalSeries' = { 'name': my_name, 'description': my_description } Note that data intended to be added to the electrodes table of the NWBFile should be set as channel properties in the RecordingExtractor object. overwrite : bool, default: False Whether to overwrite the NWBFile if one exists at the nwbfile_path. verbose : bool, default: False If 'nwbfile_path' is specified, informs user after a successful write operation. starting_time : float, optional Sets the starting time of the ElectricalSeries to a manually set value. write_as: {'raw', 'processed', 'lfp'}, optional How to save the traces data in the nwb file. - 'raw' will save it in acquisition - 'processed' will save it as FilteredEphys, in a processing module - 'lfp' will save it as LFP, in a processing module es_key: str, optional Key in metadata dictionary containing metadata info for the specific electrical series write_electrical_series: bool, default: True If True, electrical series are written in acquisition. If False, only device, electrode_groups, and electrodes are written to NWB. write_scaled: bool, default: True If True, writes the scaled traces (return_scaled=True) iterator_type: {"v2", None} The type of DataChunkIterator to use. 'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking. None: write the TimeSeries with no memory chunking. iterator_opts: dict, optional Dictionary of options for the RecordingExtractorDataChunkIterator (iterator_type='v2'). Valid options are: * buffer_gb : float, default: 1.0 In units of GB. Recommended to be as much free RAM as available. Automatically calculates suitable buffer shape. * buffer_shape : tuple, optional Manual specification of buffer shape to return on each iteration. Must be a multiple of chunk_shape along each axis. Cannot be set if `buffer_gb` is specified. * chunk_mb : float. default: 1.0 Should be below 1 MB. Automatically calculates suitable chunk shape. * chunk_shape : tuple, optional Manual specification of the internal chunk shape for the HDF5 dataset. Cannot be set if `chunk_mb` is also specified. * display_progress : bool, default: False Display a progress bar with iteration rate and estimated completion time. * progress_bar_options : dict, optional Dictionary of keyword arguments to be passed directly to tqdm. See https://github.com/tqdm/tqdm#parameters for options. """ with make_or_load_nwbfile( nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose ) as nwbfile_out: add_recording_to_nwbfile( recording=recording, nwbfile=nwbfile_out, starting_time=starting_time, metadata=metadata, write_as=write_as, es_key=es_key, write_electrical_series=write_electrical_series, write_scaled=write_scaled, iterator_type=iterator_type, iterator_opts=iterator_opts, ) return nwbfile_out
[docs]def add_units_table_to_nwbfile( sorting: BaseSorting, nwbfile: pynwb.NWBFile, unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, skip_properties: Optional[list[str]] = None, units_table_name: str = "units", unit_table_description: Optional[str] = None, write_in_processing_module: bool = False, waveform_means: Optional[np.ndarray] = None, waveform_sds: Optional[np.ndarray] = None, unit_electrode_indices: Optional[list[list[int]]] = None, null_values_for_properties: Optional[dict] = None, ): """ Add sorting data to a NWBFile object as a Units table. This function extracts unit properties from a SortingExtractor object and writes them to an NWBFile Units table, either in the primary units interface or the processing module (for intermediate/historical data). It handles unit selection, property customization, waveform data, and electrode mapping. Parameters ---------- sorting : spikeinterface.BaseSorting The SortingExtractor object containing unit data. nwbfile : pynwb.NWBFile The NWBFile object to write the unit data into. unit_ids : list of int or str, optional The specific unit IDs to write. If None, all units are written. property_descriptions : dict, optional Custom descriptions for unit properties. Keys should match property names in `sorting`, and values will be used as descriptions in the Units table. skip_properties : list of str, optional Unit properties to exclude from writing. units_table_name : str, default: 'units' Name of the Units table. Must be 'units' if `write_in_processing_module` is False. unit_table_description : str, optional Description for the Units table (e.g., sorting method, curation details). write_in_processing_module : bool, default: False If True, write to the processing module (intermediate data). If False, write to the primary NWBFile.units table. waveform_means : np.ndarray, optional Waveform mean (template) for each unit. Shape: (num_units, num_samples, num_channels). waveform_sds : np.ndarray, optional Waveform standard deviation for each unit. Shape: (num_units, num_samples, num_channels). unit_electrode_indices : list of lists of int, optional For each unit, a list of electrode indices corresponding to waveform data. unit_electrode_indices : list of lists of int, optional A list of lists of integers indicating the indices of the electrodes that each unit is associated with. The length of the list must match the number of units in the sorting extractor. """ unit_table_description = unit_table_description or "Autogenerated by neuroconv." assert isinstance( nwbfile, pynwb.NWBFile ), f"'nwbfile' should be of type pynwb.NWBFile but is of type {type(nwbfile)}" if unit_electrode_indices is not None: electrodes_table = nwbfile.electrodes if electrodes_table is None: raise ValueError( "Electrodes table is required to map units to electrodes. Add an electrode table to the NWBFile first." ) null_values_for_properties = dict() if null_values_for_properties is None else null_values_for_properties if not write_in_processing_module and units_table_name != "units": raise ValueError("When writing to the nwbfile.units table, the name of the table must be 'units'!") if write_in_processing_module: ecephys_mod = get_module( nwbfile=nwbfile, name="ecephys", description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.", ) write_table_first_time = units_table_name not in ecephys_mod.data_interfaces if write_table_first_time: units_table = pynwb.misc.Units(name=units_table_name, description=unit_table_description) ecephys_mod.add(units_table) units_table = ecephys_mod[units_table_name] else: write_table_first_time = nwbfile.units is None if write_table_first_time: nwbfile.units = pynwb.misc.Units(name="units", description=unit_table_description) units_table = nwbfile.units default_descriptions = dict( isi_violation="Quality metric that measures the ISI violation ratio as a proxy for the purity of the unit.", firing_rate="Number of spikes per unit of time.", template="The extracellular average waveform.", max_channel="The recording channel id with the largest amplitude.", halfwidth="The full-width half maximum of the negative peak computed on the maximum channel.", peak_to_valley="The duration between the negative and the positive peaks computed on the maximum channel.", snr="The signal-to-noise ratio of the unit.", quality="Quality of the unit as defined by phy (good, mua, noise).", spike_amplitude="Average amplitude of peaks detected on the channel.", spike_rate="Average rate of peaks detected on the channel.", unit_name="Unique reference for each unit.", ) if property_descriptions is None: property_descriptions = dict() if skip_properties is None: skip_properties = list() property_descriptions = dict(default_descriptions, **property_descriptions) data_to_add = defaultdict(dict) sorting_properties = sorting.get_property_keys() excluded_properties = list(skip_properties) + ["contact_vector"] properties_to_extract = [property for property in sorting_properties if property not in excluded_properties] if unit_ids is not None: sorting = sorting.select_units(unit_ids=unit_ids) if unit_electrode_indices is not None: unit_electrode_indices = np.array(unit_electrode_indices)[sorting.ids_to_indices(unit_ids)] unit_ids = sorting.unit_ids # Extract properties for property in properties_to_extract: data = sorting.get_property(property) index = isinstance(data[0], (list, np.ndarray, tuple)) if index and isinstance(data[0], np.ndarray): index = data[0].ndim description = property_descriptions.get(property, "No description.") data_to_add[property].update(description=description, data=data, index=index) if property in ["max_channel", "max_electrode"] and nwbfile.electrodes is not None: data_to_add[property].update(table=nwbfile.electrodes) # Unit name logic if "unit_name" in data_to_add: # if 'unit_name' is set as a property, it is used to override default unit_ids (and "id") unit_name_array = data_to_add["unit_name"]["data"] else: unit_name_array = unit_ids.astype("str", copy=False) data_to_add["unit_name"].update(description="Unique reference for each unit.", data=unit_name_array) units_table_previous_properties = set(units_table.colnames).difference({"spike_times"}) properties_to_add = set(data_to_add) properties_to_add_by_rows = units_table_previous_properties.union({"id"}) properties_to_add_by_columns = properties_to_add - properties_to_add_by_rows # Properties that were added before require null values to add by rows if data is missing properties_requiring_null_values = units_table_previous_properties.difference(properties_to_add) null_values_for_row = {} for property in properties_requiring_null_values - {"electrodes"}: # TODO, fix electrodes sample_data = units_table[property][:][0] null_value = _get_null_value_for_property( property=property, sample_data=sample_data, null_values_for_properties=null_values_for_properties, ) null_values_for_row[property] = null_value # Special case null_values_for_row["id"] = None # Add data by rows excluding the rows with previously added unit names unit_names_used_previously = [] if "unit_name" in units_table_previous_properties: unit_names_used_previously = units_table["unit_name"].data has_electrodes_column = "electrodes" in units_table.colnames properties_with_data = {property for property in properties_to_add_by_rows if "data" in data_to_add[property]} rows_in_data = [index for index in range(sorting.get_num_units())] if not has_electrodes_column: rows_to_add = [index for index in rows_in_data if unit_name_array[index] not in unit_names_used_previously] else: rows_to_add = [] for index in rows_in_data: if unit_name_array[index] not in unit_names_used_previously: rows_to_add.append(index) else: unit_name = unit_name_array[index] previous_electrodes = units_table[np.where(units_table["unit_name"][:] == unit_name)[0]].electrodes if list(previous_electrodes.values[0]) != list(unit_electrode_indices[index]): rows_to_add.append(index) for row in rows_to_add: unit_kwargs = null_values_for_row for property in properties_with_data: unit_kwargs[property] = data_to_add[property]["data"][row] spike_times = [] # Extract and concatenate the spike times from multiple segments for segment_index in range(sorting.get_num_segments()): segment_spike_times = sorting.get_unit_spike_train( unit_id=unit_ids[row], segment_index=segment_index, return_times=True ) spike_times.append(segment_spike_times) spike_times = np.concatenate(spike_times) if waveform_means is not None: unit_kwargs["waveform_mean"] = waveform_means[row] if waveform_sds is not None: unit_kwargs["waveform_sd"] = waveform_sds[row] if unit_electrode_indices is not None: unit_kwargs["electrodes"] = unit_electrode_indices[row] units_table.add_unit(spike_times=spike_times, **unit_kwargs, enforce_unique_id=True) # Add unit_name as a column and fill previously existing rows with unit_name equal to str(ids) unit_table_size = len(units_table.id[:]) previous_table_size = len(units_table.id[:]) - len(unit_name_array) if "unit_name" in properties_to_add_by_columns: cols_args = data_to_add["unit_name"] data = cols_args["data"] previous_ids = units_table.id[:previous_table_size] default_value = np.array(previous_ids).astype("str") extended_data = np.hstack([default_value, data]) cols_args["data"] = extended_data units_table.add_column("unit_name", **cols_args) # Build a channel name to electrode table index map table_df = units_table.to_dataframe().reset_index() unit_name_to_electrode_index = { unit_name: table_df.query(f"unit_name=='{unit_name}'").index[0] for unit_name in unit_name_array } indices_for_new_data = [unit_name_to_electrode_index[unit_name] for unit_name in unit_name_array] indices_for_null_values = table_df.index.difference(indices_for_new_data).values extending_column = len(indices_for_null_values) > 0 # Add properties as columns for property in properties_to_add_by_columns - {"unit_name"}: cols_args = data_to_add[property] data = cols_args["data"] # This is the simple case, early return if not extending_column: units_table.add_column(property, **cols_args) continue # Extending the columns is done differently for ragged arrays adding_ragged_array = cols_args["index"] if not adding_ragged_array: sample_data = data[0] dtype = data.dtype extended_data = np.empty(shape=unit_table_size, dtype=dtype) extended_data[indices_for_new_data] = data null_value = _get_null_value_for_property( property=property, sample_data=sample_data, null_values_for_properties=null_values_for_properties, ) extended_data[indices_for_null_values] = null_value else: dtype = np.ndarray extended_data = np.empty(shape=unit_table_size, dtype=dtype) for index, value in enumerate(data): index_in_extended_data = indices_for_new_data[index] extended_data[index_in_extended_data] = value.tolist() for index in indices_for_null_values: null_value = [] extended_data[index] = null_value # Add the data cols_args["data"] = extended_data units_table.add_column(property, **cols_args)
[docs]def add_sorting_to_nwbfile( sorting: BaseSorting, nwbfile: Optional[pynwb.NWBFile] = None, unit_ids: Optional[Union[list[str], list[int]]] = None, property_descriptions: Optional[dict] = None, skip_properties: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", waveform_means: Optional[np.ndarray] = None, waveform_sds: Optional[np.ndarray] = None, unit_electrode_indices: Optional[list[list[int]]] = None, ): """Add sorting data (units and their properties) to an NWBFile. This function serves as a convenient wrapper around `add_units_table` to match Spikeinterface's `SortingExtractor` Parameters ---------- sorting : BaseSorting The SortingExtractor object containing unit data. nwbfile : pynwb.NWBFile, optional The NWBFile object to write the unit data into. unit_ids : list of int or str, optional The specific unit IDs to write. If None, all units are written. property_descriptions : dict, optional Custom descriptions for unit properties. Keys should match property names in `sorting`, and values will be used as descriptions in the Units table. skip_properties : list of str, optional Unit properties to exclude from writing. write_as : {'units', 'processing'}, default: 'units' Where to write the unit data: - 'units': Write to the primary NWBFile.units table. - 'processing': Write to the processing module (intermediate data). units_name : str, default: 'units' Name of the Units table. Must be 'units' if `write_as` is 'units'. units_description : str, optional Description for the Units table (e.g., sorting method, curation details). waveform_means : np.ndarray, optional Waveform mean (template) for each unit. Shape: (num_units, num_samples, num_channels). waveform_sds : np.ndarray, optional Waveform standard deviation for each unit. Shape: (num_units, num_samples, num_channels). unit_electrode_indices : list of lists of int, optional A list of lists of integers indicating the indices of the electrodes that each unit is associated with. The length of the list must match the number of units in the sorting extractor. """ assert write_as in [ "units", "processing", ], f"Argument write_as ({write_as}) should be one of 'units' or 'processing'!" write_in_processing_module = False if write_as == "units" else True add_units_table_to_nwbfile( sorting=sorting, unit_ids=unit_ids, nwbfile=nwbfile, property_descriptions=property_descriptions, skip_properties=skip_properties, write_in_processing_module=write_in_processing_module, units_table_name=units_name, unit_table_description=units_description, waveform_means=waveform_means, waveform_sds=waveform_sds, unit_electrode_indices=unit_electrode_indices, )
[docs]def write_sorting_to_nwbfile( sorting: BaseSorting, nwbfile_path: Optional[FilePath] = None, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, verbose: bool = False, unit_ids: Optional[list[Union[str, int]]] = None, property_descriptions: Optional[dict] = None, skip_properties: Optional[list[str]] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", waveform_means: Optional[np.ndarray] = None, waveform_sds: Optional[np.ndarray] = None, unit_electrode_indices=None, ): """ Primary method for writing a SortingExtractor object to an NWBFile. Parameters ---------- sorting : spikeinterface.BaseSorting nwbfile_path : FilePath, optional Path for where to write or load (if overwrite=False) the NWBFile. If specified, the context will always write to this location. nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling:: write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True. overwrite : bool, default: False Whether to overwrite the NWBFile if one exists at the nwbfile_path. The default is False (append mode). verbose : bool, default: False If 'nwbfile_path' is specified, informs user after a successful write operation. unit_ids : list, optional Controls the unit_ids that will be written to the nwb file. If None (default), all units are written. property_descriptions : dict, optional For each key in this dictionary which matches the name of a unit property in sorting, adds the value as a description to that custom unit column. skip_properties : list of str, optional Each string in this list that matches a unit property will not be written to the NWBFile. write_as : {'units', 'processing'} How to save the units table in the nwb file. Options: - 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data. - 'processing' will save it to the processing module to serve as a historical provenance for the official table. units_name : str, default: 'units' The name of the units table. If write_as=='units', then units_name must also be 'units'. units_description : str, default: 'Autogenerated by neuroconv.' waveform_means : np.ndarray, optional Waveform mean (template) for each unit. Shape: (num_units, num_samples, num_channels). waveform_sds : np.ndarray, optional Waveform standard deviation for each unit. Shape: (num_units, num_samples, num_channels). unit_electrode_indices : list of lists of int, optional For each unit, a list of electrode indices corresponding to waveform data. """ with make_or_load_nwbfile( nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose ) as nwbfile_out: add_sorting_to_nwbfile( sorting=sorting, nwbfile=nwbfile_out, unit_ids=unit_ids, property_descriptions=property_descriptions, skip_properties=skip_properties, write_as=write_as, units_name=units_name, units_description=units_description, waveform_means=waveform_means, waveform_sds=waveform_sds, unit_electrode_indices=unit_electrode_indices, )
[docs]def add_sorting_analyzer_to_nwbfile( sorting_analyzer: SortingAnalyzer, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, recording: Optional[BaseRecording] = None, unit_ids: Optional[Union[list[str], list[int]]] = None, skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", ): """ Convenience function to write directly a sorting analyzer object to an nwbfile. The function adds the data of the recording and the sorting plus the following information from the sorting analyzer: - quality metrics - template mean and std - template metrics Parameters ---------- sorting_analyzer : spikeinterface.SortingAnalyzer The sorting analyzer object to be written to the NWBFile. nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling:: write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True. The "Ecephys" section of metadata is also used to create electrodes and electrical series fields. recording : BaseRecording, optional If the sorting_analyzer is 'recordingless', this argument needs to be passed to save electrode info. Otherwise, electrodes info is not added to the nwb file. unit_ids : list, optional Controls the unit_ids that will be written to the nwb file. If None (default), all units are written. property_descriptions : dict, optional For each key in this dictionary which matches the name of a unit property in sorting, adds the value as a description to that custom unit column. skip_properties : list of str, optional Each string in this list that matches a unit property will not be written to the NWBFile. write_as : {'units', 'processing'} How to save the units table in the nwb file. Options: - 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data. - 'processing' will save it to the processing module to serve as a historical provenance for the official table. units_name : str, optional, default: 'units' The name of the units table. If write_as=='units', then units_name must also be 'units'. units_description : str, default: 'Autogenerated by neuroconv.' """ # TODO: move into add_units assert write_as in [ "units", "processing", ], f"Argument write_as ({write_as}) should be one of 'units' or 'processing'!" if write_as == "units": assert units_name == "units", "When writing to the nwbfile.units table, the name of the table must be 'units'!" write_in_processing_module = False if write_as == "units" else True # retrieve templates and stds template_extension = sorting_analyzer.get_extension("templates") if template_extension is None: raise ValueError("No templates found in the sorting analyzer.") template_means = template_extension.get_templates() template_stds = template_extension.get_templates(operator="std") sorting = sorting_analyzer.sorting if unit_ids is not None: unit_indices = sorting.ids_to_indices(unit_ids) template_means = template_means[unit_indices] template_stds = template_stds[unit_indices] # metrics properties (quality, template) are added as properties to the sorting copy sorting_copy = sorting.select_units(unit_ids=sorting.unit_ids) if sorting_analyzer.has_extension("quality_metrics"): qm = sorting_analyzer.get_extension("quality_metrics").get_data() for prop in qm.columns: if prop not in sorting_copy.get_property_keys(): sorting_copy.set_property(prop, qm[prop]) if sorting_analyzer.has_extension("template_metrics"): tm = sorting_analyzer.get_extension("template_metrics").get_data() for prop in tm.columns: if prop not in sorting_copy.get_property_keys(): sorting_copy.set_property(prop, tm[prop]) add_electrodes_info_to_nwbfile(recording, nwbfile=nwbfile, metadata=metadata) electrode_group_indices = _get_electrode_group_indices(recording, nwbfile=nwbfile) unit_electrode_indices = [electrode_group_indices] * len(sorting.unit_ids) add_units_table_to_nwbfile( sorting=sorting_copy, nwbfile=nwbfile, unit_ids=unit_ids, property_descriptions=property_descriptions, skip_properties=skip_properties, write_in_processing_module=write_in_processing_module, units_table_name=units_name, unit_table_description=units_description, waveform_means=template_means, waveform_sds=template_stds, unit_electrode_indices=unit_electrode_indices, )
[docs]def write_sorting_analyzer_to_nwbfile( sorting_analyzer: SortingAnalyzer, nwbfile_path: Optional[FilePath] = None, nwbfile: Optional[pynwb.NWBFile] = None, metadata: Optional[dict] = None, overwrite: bool = False, recording: Optional[BaseRecording] = None, verbose: bool = False, unit_ids: Optional[Union[list[str], list[int]]] = None, write_electrical_series: bool = False, add_electrical_series_kwargs: Optional[dict] = None, skip_properties: Optional[list[str]] = None, property_descriptions: Optional[dict] = None, write_as: Literal["units", "processing"] = "units", units_name: str = "units", units_description: str = "Autogenerated by neuroconv.", ): """ Convenience function to write directly a sorting analyzer object to an nwbfile. The function adds the data of the recording and the sorting plus the following information from the sorting analyzer: - quality metrics - template mean and std - template metrics Parameters ---------- sorting_analyzer : spikeinterface.SortingAnalyzer The sorting analyzer object to be written to the NWBFile. nwbfile_path : FilePath Path for where to write or load (if overwrite=False) the NWBFile. If specified, the context will always write to this location. nwbfile : NWBFile, optional If passed, this function will fill the relevant fields within the NWBFile object. E.g., calling:: write_recording(recording=my_recording_extractor, nwbfile=my_nwbfile) will result in the appropriate changes to the my_nwbfile object. If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated and returned by the function. metadata : dict, optional Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True. The "Ecephys" section of metadata is also used to create electrodes and electrical series fields. overwrite : bool, default: False Whether to overwrite the NWBFile if one exists at the nwbfile_path. recording : BaseRecording, optional If the sorting_analyzer is 'recordingless', this argument needs to be passed to save electrode info. Otherwise, electrodes info is not added to the nwb file. verbose : bool, default: False If 'nwbfile_path' is specified, informs user after a successful write operation. unit_ids : list, optional Controls the unit_ids that will be written to the nwb file. If None (default), all units are written. write_electrical_series : bool, default: False If True, the recording object associated to the analyzer is written as an electrical series. add_electrical_series_kwargs: dict, optional Keyword arguments to control the `add_electrical_series()` function in case write_electrical_series=True property_descriptions: dict, optional For each key in this dictionary which matches the name of a unit property in sorting, adds the value as a description to that custom unit column. skip_properties: list of str, optional Each string in this list that matches a unit property will not be written to the NWBFile. write_as: {'units', 'processing'} How to save the units table in the nwb file. Options: - 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data. - 'processing' will save it to the processing module to serve as a historical provenance for the official table. units_name : str, default: 'units' The name of the units table. If write_as=='units', then units_name must also be 'units'. units_description : str, default: 'Autogenerated by neuroconv.' """ metadata = metadata if metadata is not None else dict() if sorting_analyzer.has_recording(): recording = sorting_analyzer.recording assert recording is not None, ( "recording not found. To add the electrode table, the sorting_analyzer " "needs to have a recording attached or the 'recording' argument needs to be used." ) # try: with make_or_load_nwbfile( nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose ) as nwbfile_out: if write_electrical_series: add_electrical_series_kwargs = add_electrical_series_kwargs or dict() add_electrical_series_to_nwbfile( recording=recording, nwbfile=nwbfile_out, metadata=metadata, **add_electrical_series_kwargs ) add_sorting_analyzer_to_nwbfile( sorting_analyzer=sorting_analyzer, nwbfile=nwbfile_out, metadata=metadata, recording=recording, unit_ids=unit_ids, skip_properties=skip_properties, property_descriptions=property_descriptions, write_as=write_as, units_name=units_name, units_description=units_description, )
def _get_electrode_group_indices(recording, nwbfile): """ """ if "group_name" in recording.get_property_keys(): group_names = np.unique(recording.get_property("group_name")) elif "group" in recording.get_property_keys(): group_names = np.unique(recording.get_property("group")) else: group_names = None if group_names is None: electrode_group_indices = None else: group_names = [str(group_name) for group_name in group_names] electrode_group_indices = nwbfile.electrodes.to_dataframe().query(f"group_name in {group_names}").index.values return electrode_group_indices