import uuid
import warnings
from collections import defaultdict
from typing import Any, Literal, Optional, Union
import numpy as np
import psutil
import pynwb
from hdmf.data_utils import AbstractDataChunkIterator
from pydantic import FilePath
from spikeinterface import BaseRecording, BaseSorting, SortingAnalyzer
from .spikeinterfacerecordingdatachunkiterator import (
SpikeInterfaceRecordingDataChunkIterator,
)
from ..nwb_helpers import get_module, make_or_load_nwbfile
from ...utils import (
DeepDict,
calculate_regular_series_rate,
dict_deep_update,
)
from ...utils.str_utils import human_readable_size
def _get_nwb_metadata(recording: BaseRecording, metadata: dict = None):
"""
Return default metadata for all recording fields.
Parameters
----------
recording: spikeinterface.BaseRecording
metadata: dict
metadata info for constructing the nwb file (optional).
"""
metadata = dict(
NWBFile=dict(
session_description="Auto-generated by NwbRecordingExtractor without description.",
identifier=str(uuid.uuid4()),
),
Ecephys=dict(
Device=[dict(name="Device", description="Ecephys probe. Automatically generated.")],
ElectrodeGroup=[
dict(name=str(group_name), description="no description", location="unknown", device="Device")
for group_name in np.unique(recording.get_channel_groups())
],
),
)
return metadata
def add_devices_to_nwbfile(nwbfile: pynwb.NWBFile, metadata: Optional[DeepDict] = None):
"""
Add device information to nwbfile object.
Will always ensure nwbfile has at least one device, but multiple
devices within the metadata list will also be created.
Parameters
----------
nwbfile: NWBFile
nwb file to which the recording information is to be added
metadata: DeepDict
metadata info for constructing the nwb file (optional).
Should be of the format::
metadata['Ecephys']['Device'] = [
{
'name': my_name,
'description': my_description
},
...
]
Missing keys in an element of metadata['Ecephys']['Device'] will be auto-populated with defaults.
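    Examples
    --------
    A minimal usage sketch (illustrative only, not from the package's test suite); the
    device name "Probe A" is a made-up example:
    >>> from datetime import datetime
    >>> from uuid import uuid4
    >>> import pynwb
    >>> nwbfile = pynwb.NWBFile(
    ...     session_description="example", identifier=str(uuid4()), session_start_time=datetime.now().astimezone()
    ... )
    >>> metadata = dict(Ecephys=dict(Device=[dict(name="Probe A", description="A 32-channel probe.")]))
    >>> add_devices_to_nwbfile(nwbfile=nwbfile, metadata=metadata)
    >>> "Probe A" in nwbfile.devices
    True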
"""
if nwbfile is not None:
assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile"
# Default Device metadata
defaults = dict(name="Device", description="Ecephys probe. Automatically generated.")
if metadata is None:
metadata = dict()
if "Ecephys" not in metadata:
metadata["Ecephys"] = dict()
if "Device" not in metadata["Ecephys"]:
metadata["Ecephys"]["Device"] = [defaults]
for device_metadata in metadata["Ecephys"]["Device"]:
if device_metadata.get("name", defaults["name"]) not in nwbfile.devices:
device_kwargs = dict(defaults, **device_metadata)
nwbfile.create_device(**device_kwargs)
def add_electrode_groups_to_nwbfile(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: Optional[dict] = None):
"""
Add electrode group information to nwbfile object.
Will always ensure nwbfile has at least one electrode group.
Will auto-generate a linked device if the specified name does not exist in the nwbfile.
Parameters
----------
recording: spikeinterface.BaseRecording
nwbfile: pynwb.NWBFile
nwb file to which the recording information is to be added
metadata: dict
metadata info for constructing the nwb file (optional).
Should be of the format::
metadata['Ecephys']['ElectrodeGroup'] = [
{
'name': my_name,
'description': my_description,
'location': electrode_location,
'device': my_device_name
},
...
]
Missing keys in an element of ``metadata['Ecephys']['ElectrodeGroup']`` will be auto-populated with defaults.
Group names set by RecordingExtractor channel properties will also be included with passed metadata,
but will only use default description and location.
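    Examples
    --------
    A minimal sketch (illustrative only). Assumes ``nwbfile`` was created as in the
    ``add_devices_to_nwbfile`` example and that ``spikeinterface.generate_recording``
    is available for synthetic data; a missing device is auto-generated with a warning:
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> metadata = dict(
    ...     Ecephys=dict(ElectrodeGroup=[dict(name="0", description="Shank 0", location="CA1", device="Device")])
    ... )
    >>> add_electrode_groups_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata)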
"""
assert isinstance(nwbfile, pynwb.NWBFile), "'nwbfile' should be of type pynwb.NWBFile"
if metadata is None:
metadata = dict()
if "Ecephys" not in metadata:
metadata["Ecephys"] = dict()
add_devices_to_nwbfile(nwbfile=nwbfile, metadata=metadata)
group_names = _get_group_name(recording=recording)
defaults = [
dict(
name=group_name,
description="no description",
location="unknown",
device=[i.name for i in nwbfile.devices.values()][0],
)
for group_name in group_names
]
if "ElectrodeGroup" not in metadata["Ecephys"]:
metadata["Ecephys"]["ElectrodeGroup"] = defaults
assert all(
[isinstance(x, dict) for x in metadata["Ecephys"]["ElectrodeGroup"]]
), "Expected metadata['Ecephys']['ElectrodeGroup'] to be a list of dictionaries!"
for group_metadata in metadata["Ecephys"]["ElectrodeGroup"]:
if group_metadata.get("name", defaults[0]["name"]) not in nwbfile.electrode_groups:
device_name = group_metadata.get("device", defaults[0]["device"])
if device_name not in nwbfile.devices:
new_device_metadata = dict(Ecephys=dict(Device=[dict(name=device_name)]))
add_devices_to_nwbfile(nwbfile=nwbfile, metadata=new_device_metadata)
warnings.warn(
f"Device '{device_name}' not detected in "
"attempted link to electrode group! Automatically generating."
)
electrode_group_kwargs = dict(defaults[0], **group_metadata)
electrode_group_kwargs.update(device=nwbfile.devices[device_name])
nwbfile.create_electrode_group(**electrode_group_kwargs)
# TODO: Check this, probably not necessary
if not nwbfile.electrode_groups:
device_name = list(nwbfile.devices.keys())[0]
device = nwbfile.devices[device_name]
if len(nwbfile.devices) > 1:
warnings.warn(
"More than one device found when adding electrode group "
f"via channel properties: using device '{device_name}'. To use a "
"different device, indicate it the metadata argument."
)
electrode_group_kwargs = dict(defaults[0])
electrode_group_kwargs.update(device=device)
for group_name in np.unique(recording.get_channel_groups()).tolist():
electrode_group_kwargs.update(name=str(group_name))
nwbfile.create_electrode_group(**electrode_group_kwargs)
def _get_channel_name(recording: BaseRecording) -> np.ndarray:
"""
Extract the canonical `channel_name` from the recording, which will be written
in the electrodes table.
Parameters
----------
recording : BaseRecording
The recording object from which to extract the channel names.
Returns
-------
np.ndarray
An array containing the channel names. If the `channel_name` property is not
available, the channel IDs as strings will be returned.
"""
    # Use the `channel_name` property if available; otherwise fall back to the channel ids as strings.
channel_names = recording.get_property("channel_name")
if channel_names is None:
channel_names = recording.get_channel_ids().astype("str", copy=False)
return channel_names
def _get_group_name(recording: BaseRecording) -> np.ndarray:
"""
Extract the canonical `group_name` from the recording, which will be written
in the electrodes table.
Parameters
----------
recording : BaseRecording
The recording object from which to extract the group names.
Returns
-------
np.ndarray
An array containing the group names. If the `group_name` property is not
available, the channel groups will be returned. If the group names are
empty, a default value 'ElectrodeGroup' will be used.
Raises
------
ValueError
If the number of unique group names doesn't match the number of unique groups,
or if the mapping between group names and group numbers is inconsistent.
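    Examples
    --------
    A sketch with a synthetic recording (assumes ``spikeinterface.generate_recording``
    is available; group and name values are made up):
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> recording.set_channel_groups([0, 0, 1, 1])
    >>> recording.set_property("group_name", ["shank0", "shank0", "shank1", "shank1"])
    >>> _get_group_name(recording).tolist()
    ['shank0', 'shank0', 'shank1', 'shank1']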
"""
default_value = "ElectrodeGroup"
group_names = recording.get_property("group_name")
groups = recording.get_channel_groups()
if group_names is None:
group_names = groups
if group_names is None:
group_names = np.full(recording.get_num_channels(), fill_value=default_value)
# Always ensure group_names are strings
group_names = group_names.astype("str", copy=False)
# If for any reason the group names are empty, fill them with the default
group_names[group_names == ""] = default_value
# Validate group names against groups
if groups is not None:
unique_groups = set(groups)
unique_names = set(group_names)
if len(unique_names) != len(unique_groups):
raise ValueError("The number of group names must match the number of groups")
# Check consistency of group name to group number mapping
group_to_name_map = {}
for group, name in zip(groups, group_names):
if group in group_to_name_map:
if group_to_name_map[group] != name:
raise ValueError("Inconsistent mapping between group numbers and group names")
else:
group_to_name_map[group] = name
return group_names
def _get_electrodes_table_global_ids(nwbfile: pynwb.NWBFile) -> list[str]:
"""
Generate a list of global identifiers for channels in the electrode table of an NWB file.
These identifiers are used to map electrodes across writing operations.
Parameters
----------
nwbfile : pynwb.NWBFile
The NWB file from which to extract the electrode table information.
Returns
-------
list[str]
A list of unique keys, each representing a combination of channel name and
group name from the electrodes table. If the electrodes table or the
necessary columns are not present, an empty list is returned.
"""
if nwbfile.electrodes is None:
return []
if "channel_name" not in nwbfile.electrodes.colnames or "group_name" not in nwbfile.electrodes.colnames:
return []
channel_names = nwbfile.electrodes["channel_name"][:]
group_names = nwbfile.electrodes["group_name"][:]
unique_keys = [f"{ch_name}_{gr_name}" for ch_name, gr_name in zip(channel_names, group_names)]
return unique_keys
def _get_electrode_table_indices_for_recording(recording: BaseRecording, nwbfile: pynwb.NWBFile) -> list[int]:
"""
Get the indices of the electrodes in the NWBFile that correspond to the channels
in the recording.
This function matches the `channel_name` and `group_name` from the recording to
the global identifiers in the NWBFile's electrodes table, returning the indices
of these matching electrodes.
Parameters
----------
recording : BaseRecording
The recording object from which to extract channel and group names.
nwbfile : pynwb.NWBFile
The NWBFile containing the electrodes table to search for matches.
Returns
-------
list[int]
A list of indices corresponding to the positions in the NWBFile's electrodes
table that match the channels in the recording.
"""
channel_names = _get_channel_name(recording=recording)
group_names = _get_group_name(recording=recording)
channel_global_ids = [f"{ch_name}_{gr_name}" for ch_name, gr_name in zip(channel_names, group_names)]
table_global_ids = _get_electrodes_table_global_ids(nwbfile=nwbfile)
electrode_table_indices = [table_global_ids.index(ch_id) for ch_id in channel_global_ids]
return electrode_table_indices
def _get_null_value_for_property(property: str, sample_data: Any, null_values_for_properties: dict[str, Any]) -> Any:
"""
Retrieve the null value for a given property based on its data type or a provided mapping.
Also performs type checking to ensure the default value matches the type of the existing data.
    Parameters
    ----------
    property : str
        The name of the property for which a null value is being determined.
    sample_data : Any
        The sample data for which the default value is being determined. This can be of any data type.
null_values_for_properties : dict of str to Any
A dictionary mapping properties to their respective default values. If a property is not found in this
dictionary, a sensible default value based on the type of `sample_data` will be used.
Returns
-------
Any
The default value for the specified property. The type of the default value will match the type of `sample_data`
or the type specified in `null_values_for_properties`.
Raises
------
ValueError
If a sensible default value cannot be determined for the given property and data type, or if the type of the
provided default value does not match the type of the existing data.
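    Examples
    --------
    A sketch of the fallback behavior (the property names are hypothetical):
    >>> _get_null_value_for_property(property="quality", sample_data="good", null_values_for_properties={})
    ''
    >>> _get_null_value_for_property(property="depth", sample_data=42.0, null_values_for_properties={"depth": 0.0})
    0.0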
"""
type_to_default_value = {list: [], np.ndarray: np.array(np.nan), str: "", float: np.nan, complex: np.nan}
# Check for numpy scalar types
sample_data = sample_data.item() if isinstance(sample_data, np.generic) else sample_data
    # Determine the sample's type up front; it is also needed for the type check below
    sample_data_type = type(sample_data)
    default_value = null_values_for_properties.get(property, None)
    if default_value is None:
        default_value = type_to_default_value.get(sample_data_type, None)
if default_value is None:
        error_msg = (
            f"Could not find a sensible default value for property '{property}' of type {sample_data_type}. \n"
            "This can be fixed by modifying the recording property or by setting a sensible default value "
            "using the `add_electrodes_to_nwbfile` function argument `null_values_for_properties`, as in: \n"
            f"null_values_for_properties = {{'{property}': default_value}}"
        )
raise ValueError(error_msg)
if type(default_value) != sample_data_type:
error_msg = (
f"Default value for property '{property}' in null_values_for_properties dict has a "
f"different type {type(default_value)} than the currently existing data type {sample_data_type}. \n"
"Modify the recording property or the default value to match"
)
raise ValueError(error_msg)
return default_value
def add_electrodes_to_nwbfile(
recording: BaseRecording,
nwbfile: pynwb.NWBFile,
metadata: Optional[dict] = None,
exclude: tuple = (),
null_values_for_properties: Optional[dict] = None,
):
"""
    Build an electrode_table from the recording information and add it to the nwbfile object.
Parameters
----------
recording: spikeinterface.BaseRecording
nwbfile: NWBFile
nwb file to which the recording information is to be added
metadata: dict
metadata info for constructing the nwb file (optional).
Should be of the format::
metadata['Ecephys']['Electrodes'] = [
{
'name': my_name,
'description': my_description
},
...
]
Note that data intended to be added to the electrodes table of the NWBFile should be set as channel
properties in the RecordingExtractor object.
Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults
whenever possible.
If 'my_name' is set to one of the required fields for nwbfile
electrodes (id, x, y, z, imp, location, filtering, group_name),
then the metadata will override their default values.
Setting 'my_name' to metadata field 'group' is not supported as the linking to
nwbfile.electrode_groups is handled automatically; please specify the string 'group_name' in this case.
If no group information is passed via metadata, automatic linking to existing electrode groups,
possibly including the default, will occur.
    exclude: tuple
        An iterable containing the string names of channel properties in the RecordingExtractor
        object to ignore when writing to the NWBFile.
    null_values_for_properties: dict, optional
        A dictionary mapping property names to the null value used to fill that property for
        electrodes that lack it. When a property is not in this dictionary, a sensible default
        based on the data type of the existing values is used.
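    Examples
    --------
    A minimal sketch (illustrative; assumes ``nwbfile`` already exists and
    ``spikeinterface.generate_recording`` is available; the property name is made up):
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> recording.set_property("custom_quality", ["good", "good", "noisy", "good"])
    >>> add_electrodes_to_nwbfile(recording=recording, nwbfile=nwbfile)
    >>> "custom_quality" in nwbfile.electrodes.colnames
    True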
"""
assert isinstance(
nwbfile, pynwb.NWBFile
), f"'nwbfile' should be of type pynwb.NWBFile but is of type {type(nwbfile)}"
null_values_for_properties = dict() if null_values_for_properties is None else null_values_for_properties
# Test that metadata has the expected structure
electrodes_metadata = list()
if metadata is not None:
electrodes_metadata = metadata.get("Ecephys", dict()).get("Electrodes", list())
required_keys = {"name", "description"}
assert all(
[isinstance(property, dict) and set(property.keys()) == required_keys for property in electrodes_metadata]
), (
"Expected metadata['Ecephys']['Electrodes'] to be a list of dictionaries, "
"containing the keys 'name' and 'description'"
)
assert all(
[property["name"] != "group" for property in electrodes_metadata]
), "The recording property 'group' is not allowed; please use 'group_name' instead!"
# Transform to a dict that maps property name to its description
property_descriptions = dict()
for property in electrodes_metadata:
property_descriptions[property["name"]] = property["description"]
# 1. Build columns details from extractor properties: dict(name: dict(description='',data=data, index=False))
data_to_add = dict()
recording_properties = recording.get_property_keys()
spikeinterface_special_cases = [
"offset_to_uV", # Written in the ElectricalSeries
"gain_to_uV", # Written in the ElectricalSeries
"contact_vector", # Structured array representing the probe
"channel_name", # We handle this here with _get_channel_name
"channel_names", # Some formats from neo also have this property, skip it
"group_name", # We handle this here _get_group_name
"group", # We handle this here with _get_group_name
]
excluded_properties = list(exclude) + spikeinterface_special_cases
properties_to_extract = [property for property in recording_properties if property not in excluded_properties]
for property in properties_to_extract:
data = np.asarray(recording.get_property(property)).copy() # Do not modify properties of the recording
index = isinstance(data[0], (list, np.ndarray, tuple))
if index and isinstance(data[0], np.ndarray):
index = data[0].ndim
# Fill with provided custom descriptions
description = property_descriptions.get(property, "no description")
data_to_add[property] = dict(description=description, data=data, index=index)
# Special cases properties
channel_names = _get_channel_name(recording=recording)
data_to_add["channel_name"] = dict(description="unique channel reference", data=channel_names, index=False)
group_names = _get_group_name(recording=recording)
data_to_add["group_name"] = dict(description="group_name", data=group_names, index=False)
# Location in spikeinterface is equivalent to rel_x, rel_y, rel_z in the nwb standard
if "location" in data_to_add:
data = data_to_add["location"]["data"]
column_number_to_property = {0: "rel_x", 1: "rel_y", 2: "rel_z"}
for column_number in range(data.shape[1]):
property = column_number_to_property[column_number]
data_to_add[property] = dict(description=property, data=data[:, column_number], index=False)
data_to_add.pop("location")
# In the electrode table location is the brain area of spikeinterface
if "brain_area" in data_to_add:
data_to_add["location"] = data_to_add["brain_area"]
data_to_add["location"].update(description="location")
data_to_add.pop("brain_area")
else:
# This is a required property and needs a default value
data = np.full(recording.get_num_channels(), fill_value="unknown")
data_to_add["location"] = dict(description="location", data=data, index=False)
# Add missing groups to the nwb file
groupless_names = [group_name for group_name in group_names if group_name not in nwbfile.electrode_groups]
if len(groupless_names) > 0:
electrode_group_list = [dict(name=group_name) for group_name in groupless_names]
missing_group_metadata = dict(Ecephys=dict(ElectrodeGroup=electrode_group_list))
add_electrode_groups_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=missing_group_metadata)
group_list = [nwbfile.electrode_groups[group_name] for group_name in group_names]
data_to_add["group"] = dict(description="the ElectrodeGroup object", data=group_list, index=False)
schema_properties = {"group", "group_name", "location"}
properties_to_add = set(data_to_add)
electrode_table_previous_properties = set(nwbfile.electrodes.colnames) if nwbfile.electrodes else set()
# The schema properties are always added by rows because they are required
properties_to_add_by_rows = schema_properties.union(electrode_table_previous_properties)
properties_to_add_by_columns = properties_to_add.difference(properties_to_add_by_rows)
# Properties that were added before require null values to add by rows if data is missing
properties_requiring_null_values = electrode_table_previous_properties.difference(properties_to_add)
    null_values_for_rows = dict()
for property in properties_requiring_null_values:
sample_data = nwbfile.electrodes[property][:][0]
null_value = _get_null_value_for_property(
property=property,
sample_data=sample_data,
null_values_for_properties=null_values_for_properties,
)
        null_values_for_rows[property] = null_value
# We only add new electrodes to the table
existing_global_ids = _get_electrodes_table_global_ids(nwbfile=nwbfile)
channel_global_ids = [f"{ch_name}_{gr_name}" for ch_name, gr_name in zip(channel_names, group_names)]
channel_indices_to_add = [index for index, key in enumerate(channel_global_ids) if key not in existing_global_ids]
properties_with_data = properties_to_add_by_rows.intersection(data_to_add)
for channel_index in channel_indices_to_add:
        electrode_kwargs = null_values_for_rows
data_dict = {property: data_to_add[property]["data"][channel_index] for property in properties_with_data}
electrode_kwargs.update(**data_dict)
nwbfile.add_electrode(**electrode_kwargs, enforce_unique_id=True)
    # Add the channel_name column; the (channel_name, group_name) pair is used as a unique identifier.
    # Previously existing rows are filled with their electrode table ids as strings.
electrode_table_size = len(nwbfile.electrodes.id[:])
previous_table_size = electrode_table_size - recording.get_num_channels()
if "channel_name" in properties_to_add_by_columns:
cols_args = data_to_add["channel_name"]
data = cols_args["data"]
previous_ids = nwbfile.electrodes.id[:previous_table_size]
default_value = np.array(previous_ids).astype("str")
extended_data = np.hstack([default_value, data])
cols_args["data"] = extended_data
nwbfile.add_electrode_column("channel_name", **cols_args)
all_indices = np.arange(electrode_table_size)
indices_for_new_data = _get_electrode_table_indices_for_recording(recording=recording, nwbfile=nwbfile)
indices_for_null_values = [index for index in all_indices if index not in indices_for_new_data]
extending_column = len(indices_for_null_values) > 0
# Add properties as columns
for property in properties_to_add_by_columns - {"channel_name"}:
cols_args = data_to_add[property]
data = cols_args["data"]
        # Simple case: the new column spans the whole table, so add it directly
if not extending_column:
nwbfile.add_electrode_column(property, **cols_args)
continue
adding_ragged_array = cols_args["index"]
if not adding_ragged_array:
sample_data = data[0]
dtype = data.dtype
extended_data = np.empty(shape=electrode_table_size, dtype=dtype)
extended_data[indices_for_new_data] = data
null_value = _get_null_value_for_property(
property=property,
sample_data=sample_data,
null_values_for_properties=null_values_for_properties,
)
extended_data[indices_for_null_values] = null_value
else:
dtype = np.ndarray
extended_data = np.empty(shape=electrode_table_size, dtype=dtype)
for index, value in enumerate(data):
                index_in_extended_data = indices_for_new_data[index]
extended_data[index_in_extended_data] = value.tolist()
for index in indices_for_null_values:
null_value = []
extended_data[index] = null_value
cols_args["data"] = extended_data
nwbfile.add_electrode_column(property, **cols_args)
def check_if_recording_traces_fit_into_memory(recording: BaseRecording, segment_index: int = 0) -> None:
"""
Raises an error if the full traces of a recording extractor are larger than psutil.virtual_memory().available.
Parameters
----------
recording : spikeinterface.BaseRecording
A recording extractor object from spikeinterface.
segment_index : int, optional
The segment index of the recording extractor object, by default 0
Raises
------
MemoryError
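    Examples
    --------
    A sketch (illustrative; assumes ``spikeinterface.generate_recording``). With a small
    synthetic recording this is a no-op; it raises only when the traces exceed available memory:
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> check_if_recording_traces_fit_into_memory(recording=recording)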
"""
element_size_in_bytes = recording.get_dtype().itemsize
num_channels = recording.get_num_channels()
num_frames = recording.get_num_frames(segment_index=segment_index)
traces_size_in_bytes = element_size_in_bytes * num_channels * num_frames
available_memory_in_bytes = psutil.virtual_memory().available
if traces_size_in_bytes > available_memory_in_bytes:
message = (
f"Memory error, full electrical series is {human_readable_size(traces_size_in_bytes, binary=True)} but only"
f" {human_readable_size(available_memory_in_bytes, binary=True)} are available. Use iterator_type='V2'"
)
raise MemoryError(message)
def _recording_traces_to_hdmf_iterator(
recording: BaseRecording,
segment_index: int = None,
return_scaled: bool = False,
iterator_type: Optional[str] = "v2",
iterator_opts: dict = None,
) -> AbstractDataChunkIterator:
"""Function to wrap traces of spikeinterface recording into an AbstractDataChunkIterator.
Parameters
----------
recording : spikeinterface.BaseRecording
A recording extractor from spikeinterface
segment_index : int, optional
The recording segment to add to the NWBFile.
return_scaled : bool, defaults to False
        When True, recording extractor objects from spikeinterface return their traces in microvolts.
iterator_type: {"v2", None}, default: 'v2'
The type of DataChunkIterator to use.
'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking.
None: write the TimeSeries with no memory chunking.
iterator_opts: dict, optional
Dictionary of options for the iterator.
See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator
for the full list of options.
Returns
-------
traces_as_iterator: AbstractDataChunkIterator
The traces of the recording extractor wrapped in an iterator object.
Raises
------
ValueError
If the iterator_type is not 'v2' or None.
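    Examples
    --------
    A sketch selecting the chunked iterator with a custom buffer size (``buffer_gb`` is one of
    the iterator options documented at the hdmf link above; the values are illustrative):
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> iterator = _recording_traces_to_hdmf_iterator(
    ...     recording=recording, segment_index=0, iterator_type="v2", iterator_opts=dict(buffer_gb=0.5)
    ... )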
"""
supported_iterator_types = ["v2", None]
if iterator_type not in supported_iterator_types:
message = f"iterator_type {iterator_type} should be either 'v1', 'v2' (recommended) or None"
raise ValueError(message)
iterator_opts = dict() if iterator_opts is None else iterator_opts
if iterator_type is None:
check_if_recording_traces_fit_into_memory(recording=recording, segment_index=segment_index)
traces_as_iterator = recording.get_traces(return_scaled=return_scaled, segment_index=segment_index)
elif iterator_type == "v2":
traces_as_iterator = SpikeInterfaceRecordingDataChunkIterator(
recording=recording,
segment_index=segment_index,
return_scaled=return_scaled,
**iterator_opts,
)
else:
raise ValueError("iterator_type must be None or 'v2'.")
return traces_as_iterator
def _report_variable_offset(channel_offsets, channel_ids):
"""
Helper function to report variable offsets per channel IDs.
Groups the different available offsets per channel IDs and raises a ValueError.
"""
# Group the different offsets per channel IDs
offset_to_channel_ids = {}
for offset, channel_id in zip(channel_offsets, channel_ids):
offset = offset.item() if isinstance(offset, np.generic) else offset
channel_id = channel_id.item() if isinstance(channel_id, np.generic) else channel_id
if offset not in offset_to_channel_ids:
offset_to_channel_ids[offset] = []
offset_to_channel_ids[offset].append(channel_id)
# Create a user-friendly message
message_lines = ["Recording extractors with heterogeneous offsets are not supported."]
message_lines.append("Multiple offsets were found per channel IDs:")
for offset, ids in offset_to_channel_ids.items():
message_lines.append(f" Offset {offset}: Channel IDs {ids}")
message = "\n".join(message_lines)
raise ValueError(message)
def add_electrical_series_to_nwbfile(
recording: BaseRecording,
nwbfile: pynwb.NWBFile,
metadata: dict = None,
segment_index: int = 0,
starting_time: Optional[float] = None,
write_as: Literal["raw", "processed", "lfp"] = "raw",
es_key: str = None,
write_scaled: bool = False,
iterator_type: Optional[str] = "v2",
iterator_opts: Optional[dict] = None,
always_write_timestamps: bool = False,
):
"""
Adds traces from recording object as ElectricalSeries to an NWBFile object.
Parameters
----------
recording : SpikeInterfaceRecording
A recording extractor from spikeinterface
nwbfile : NWBFile
nwb file to which the recording information is to be added
metadata : dict, optional
metadata info for constructing the nwb file.
Should be of the format::
metadata['Ecephys']['ElectricalSeries'] = dict(
name=my_name,
description=my_description
)
segment_index : int, default: 0
The recording segment to add to the NWBFile.
starting_time : float, optional
Sets the starting time of the ElectricalSeries to a manually set value.
write_as : {'raw', 'processed', 'lfp'}
How to save the traces data in the nwb file. Options:
- 'raw': save it in acquisition
- 'processed': save it as FilteredEphys, in a processing module
- 'lfp': save it as LFP, in a processing module
es_key : str, optional
Key in metadata dictionary containing metadata info for the specific electrical series
write_scaled : bool, default: False
If True, writes the traces in uV with the right conversion.
        If False, the data is stored as-is and the right conversion factors are added to the nwbfile.
iterator_type: {"v2", None}, default: 'v2'
The type of DataChunkIterator to use.
'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking.
None: write the TimeSeries with no memory chunking.
iterator_opts: dict, optional
Dictionary of options for the iterator.
See https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator
for the full list of options.
always_write_timestamps : bool, default: False
Set to True to always write timestamps.
By default (False), the function checks if the timestamps are uniformly sampled, and if so, stores the data
using a regular sampling rate instead of explicit timestamps. If set to True, timestamps will be written
explicitly, regardless of whether the sampling rate is uniform.
Notes
-----
Missing keys in an element of metadata['Ecephys']['ElectrodeGroup'] will be auto-populated with defaults
whenever possible.
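    Examples
    --------
    A minimal sketch writing traces as LFP (illustrative; assumes ``nwbfile`` exists and
    ``spikeinterface.generate_recording`` is available; the es_key and names are made up):
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> metadata = dict(Ecephys=dict(ElectricalSeriesLFP=dict(name="ElectricalSeriesLFP", description="LFP traces")))
    >>> add_electrical_series_to_nwbfile(
    ...     recording=recording, nwbfile=nwbfile, metadata=metadata, write_as="lfp", es_key="ElectricalSeriesLFP"
    ... )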
"""
if starting_time is not None:
warnings.warn(
"The 'starting_time' parameter is deprecated and will be removed in June 2025. "
"Use the time alignment methods or set the recording times directlyfor modifying the starting time or timestamps "
"of the data if needed: "
"https://neuroconv.readthedocs.io/en/main/user_guide/temporal_alignment.html",
DeprecationWarning,
stacklevel=2,
)
assert write_as in [
"raw",
"processed",
"lfp",
], f"'write_as' should be 'raw', 'processed' or 'lfp', but instead received value {write_as}"
modality_signature = write_as.upper() if write_as == "lfp" else write_as.capitalize()
default_name = f"ElectricalSeries{modality_signature}"
default_description = dict(raw="Raw acquired data", lfp="Processed data - LFP", processed="Processed data")
eseries_kwargs = dict(name=default_name, description=default_description[write_as])
# Select and/or create module if lfp or processed data is to be stored.
if write_as in ["lfp", "processed"]:
ecephys_mod = get_module(
nwbfile=nwbfile,
name="ecephys",
description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.",
)
if write_as == "lfp" and "LFP" not in ecephys_mod.data_interfaces:
ecephys_mod.add(pynwb.ecephys.LFP(name="LFP"))
if write_as == "processed" and "Processed" not in ecephys_mod.data_interfaces:
ecephys_mod.add(pynwb.ecephys.FilteredEphys(name="Processed"))
if metadata is not None and "Ecephys" in metadata and es_key is not None:
assert es_key in metadata["Ecephys"], f"metadata['Ecephys'] dictionary does not contain key '{es_key}'"
eseries_kwargs.update(metadata["Ecephys"][es_key])
# If the recording extractor has more than 1 segment, append numbers to the names so that the names are unique.
# 0-pad these names based on the number of segments.
    # E.g., 100 segments get 2-digit suffixes (00-99), 1000 segments get 3 digits, etc.
    if recording.get_num_segments() > 1:
        width = int(np.ceil(np.log10(recording.get_num_segments())))
eseries_kwargs["name"] += f"{segment_index:0{width}}"
# The add_electrodes adds a column with channel name to the electrode table.
add_electrodes_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata)
# Create a region for the electrodes table
electrode_table_indices = _get_electrode_table_indices_for_recording(recording=recording, nwbfile=nwbfile)
electrode_table_region = nwbfile.create_electrode_table_region(
region=electrode_table_indices,
description="electrode_table_region",
)
eseries_kwargs.update(electrodes=electrode_table_region)
    # SpikeInterface guarantees traces in microvolts when return_scaled=True (it multiplies by the gain and adds the offset).
    # In NWB, traces in volts are recovered as data * channel_conversion * conversion + offset.
channel_conversion = recording.get_channel_gains()
channel_offsets = recording.get_channel_offsets()
unique_channel_conversion = np.unique(channel_conversion)
unique_channel_conversion = unique_channel_conversion[0] if len(unique_channel_conversion) == 1 else None
unique_offset = np.unique(channel_offsets)
if unique_offset.size > 1:
channel_ids = recording.get_channel_ids()
        # This raises a user-friendly error that maps each offset to its channel ids
_report_variable_offset(channel_offsets, channel_ids)
unique_offset = unique_offset[0] if unique_offset[0] is not None else 0
micro_to_volts_conversion_factor = 1e-6
if not write_scaled and unique_channel_conversion is None:
eseries_kwargs.update(conversion=micro_to_volts_conversion_factor)
eseries_kwargs.update(channel_conversion=channel_conversion)
elif not write_scaled and unique_channel_conversion is not None:
eseries_kwargs.update(conversion=unique_channel_conversion * micro_to_volts_conversion_factor)
if not write_scaled:
eseries_kwargs.update(offset=unique_offset * micro_to_volts_conversion_factor)
# Iterator
ephys_data_iterator = _recording_traces_to_hdmf_iterator(
recording=recording,
segment_index=segment_index,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
)
eseries_kwargs.update(data=ephys_data_iterator)
starting_time = starting_time if starting_time is not None else 0
if always_write_timestamps:
timestamps = recording.get_times(segment_index=segment_index)
shifted_timestamps = starting_time + timestamps
eseries_kwargs.update(timestamps=shifted_timestamps)
else:
# By default we write the rate if the timestamps are regular
recording_has_timestamps = recording.has_time_vector(segment_index=segment_index)
if recording_has_timestamps:
timestamps = recording.get_times(segment_index=segment_index)
rate = calculate_regular_series_rate(series=timestamps) # Returns None if it is not regular
recording_t_start = timestamps[0]
else:
rate = recording.get_sampling_frequency()
recording_t_start = recording._recording_segments[segment_index].t_start or 0
# Shift timestamps if starting_time is set
if rate:
starting_time = float(starting_time + recording_t_start)
# Note that we call the sampling frequency again because the estimated rate might be different from the
# sampling frequency of the recording extractor by some epsilon.
eseries_kwargs.update(starting_time=starting_time, rate=recording.get_sampling_frequency())
else:
shifted_timestamps = starting_time + timestamps
eseries_kwargs.update(timestamps=shifted_timestamps)
# Create ElectricalSeries object and add it to nwbfile
es = pynwb.ecephys.ElectricalSeries(**eseries_kwargs)
if write_as == "raw":
nwbfile.add_acquisition(es)
elif write_as == "processed":
ecephys_mod.data_interfaces["Processed"].add_electrical_series(es)
elif write_as == "lfp":
ecephys_mod.data_interfaces["LFP"].add_electrical_series(es)
def add_electrodes_info_to_nwbfile(recording: BaseRecording, nwbfile: pynwb.NWBFile, metadata: dict = None):
"""
Add device, electrode_groups, and electrodes info to the nwbfile.
Parameters
----------
recording : SpikeInterfaceRecording
nwbfile : NWBFile
NWB file to which the recording information is to be added
metadata : dict, optional
metadata info for constructing the nwb file.
Should be of the format::
metadata['Ecephys']['Electrodes'] = [
{
'name': my_name,
'description': my_description
},
...
]
Note that data intended to be added to the electrodes table of the ``NWBFile`` should be set as channel
properties in the ``RecordingExtractor`` object.
Missing keys in an element of ``metadata['Ecephys']['ElectrodeGroup']`` will be auto-populated with defaults
whenever possible.
If ``'my_name'`` is set to one of the required fields for nwbfile
electrodes (id, x, y, z, imp, location, filtering, group_name),
then the metadata will override their default values.
Setting ``'my_name'`` to metadata field ``'group'`` is not supported as the linking to
``nwbfile.electrode_groups`` is handled automatically; please specify the string ``'group_name'`` in this case.
If no group information is passed via metadata, automatic linking to existing electrode groups,
possibly including the default, will occur.
"""
add_devices_to_nwbfile(nwbfile=nwbfile, metadata=metadata)
add_electrode_groups_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata)
add_electrodes_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata)
def add_recording_to_nwbfile(
recording: BaseRecording,
nwbfile: pynwb.NWBFile,
metadata: Optional[dict] = None,
starting_time: Optional[float] = None,
write_as: Literal["raw", "processed", "lfp"] = "raw",
es_key: Optional[str] = None,
write_electrical_series: bool = True,
write_scaled: bool = False,
iterator_type: str = "v2",
iterator_opts: Optional[dict] = None,
always_write_timestamps: bool = False,
):
"""
Add traces from a recording object as an ElectricalSeries to an NWBFile object.
Also adds device, electrode_groups, and electrodes to the NWBFile.
Parameters
----------
recording : BaseRecording
A recording extractor from SpikeInterface.
nwbfile : pynwb.NWBFile
The NWBFile object to which the recording information is to be added.
metadata : dict, optional
Metadata information for constructing the NWBFile. This should include:
- metadata['Ecephys']['ElectricalSeries'] : dict
Dictionary with metadata for the ElectricalSeries, such as:
- name : str
Name of the ElectricalSeries.
- description : str
Description of the ElectricalSeries.
starting_time : float, optional
Manually set the starting time of the ElectricalSeries. If not provided,
the starting time is taken from the recording extractor.
write_as : {'raw', 'processed', 'lfp'}, default='raw'
Specifies how to save the trace data in the NWB file. Options are:
- 'raw': Save the data in the acquisition group.
- 'processed': Save the data as FilteredEphys in a processing module.
- 'lfp': Save the data as LFP in a processing module.
es_key : str, optional
Key in the metadata dictionary containing metadata information for the specific ElectricalSeries.
write_electrical_series : bool, default=True
If True, writes the ElectricalSeries to the NWBFile. If False, no ElectricalSeries is written.
write_scaled : bool, default=False
If True, writes the traces in microvolts (uV) with the appropriate conversion.
If False, the data is stored as-is, and the correct conversion factors are added to the NWBFile.
iterator_type : {'v2', None}, default='v2'
The type of DataChunkIterator to use when writing data in chunks. Options are:
- 'v2': The SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking.
- None: Write the TimeSeries with no memory chunking.
iterator_opts : dict, optional
Dictionary of options for the iterator. Refer to the documentation at
https://hdmf.readthedocs.io/en/stable/hdmf.data_utils.html#hdmf.data_utils.GenericDataChunkIterator
for a full list of available options.
always_write_timestamps : bool, default: False
Set to True to always write timestamps.
By default (False), the function checks if the timestamps are uniformly sampled, and if so, stores the data
using a regular sampling rate instead of explicit timestamps. If set to True, timestamps will be written
explicitly, regardless of whether the sampling rate is uniform.
Notes
-----
Missing keys in an element of `metadata['Ecephys']['ElectrodeGroup']` will be auto-populated with defaults
whenever possible. Ensure that the provided metadata dictionary is correctly structured to avoid
unintended behavior.
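    Examples
    --------
    A minimal sketch (illustrative; assumes ``nwbfile`` exists and
    ``spikeinterface.generate_recording`` is available):
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> add_recording_to_nwbfile(recording=recording, nwbfile=nwbfile, write_as="raw")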
"""
if hasattr(recording, "nwb_metadata"):
metadata = dict_deep_update(recording.nwb_metadata, metadata)
elif metadata is None:
metadata = _get_nwb_metadata(recording=recording)
add_electrodes_info_to_nwbfile(recording=recording, nwbfile=nwbfile, metadata=metadata)
if write_electrical_series:
number_of_segments = recording.get_num_segments()
for segment_index in range(number_of_segments):
add_electrical_series_to_nwbfile(
recording=recording,
nwbfile=nwbfile,
segment_index=segment_index,
starting_time=starting_time,
metadata=metadata,
write_as=write_as,
es_key=es_key,
write_scaled=write_scaled,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
always_write_timestamps=always_write_timestamps,
)
def write_recording_to_nwbfile(
recording: BaseRecording,
nwbfile_path: Optional[FilePath] = None,
nwbfile: Optional[pynwb.NWBFile] = None,
metadata: Optional[dict] = None,
overwrite: bool = False,
verbose: bool = False,
starting_time: Optional[float] = None,
write_as: Optional[str] = "raw",
es_key: Optional[str] = None,
write_electrical_series: bool = True,
write_scaled: bool = False,
iterator_type: Optional[str] = "v2",
iterator_opts: Optional[dict] = None,
) -> pynwb.NWBFile:
"""
Primary method for writing a RecordingExtractor object to an NWBFile.
Parameters
----------
recording : spikeinterface.BaseRecording
nwbfile_path : FilePath, optional
Path for where to write or load (if overwrite=False) the NWBFile.
If specified, the context will always write to this location.
nwbfile : NWBFile, optional
If passed, this function will fill the relevant fields within the NWBFile object.
E.g., calling::
            write_recording_to_nwbfile(recording=my_recording_extractor, nwbfile=my_nwbfile)
will result in the appropriate changes to the my_nwbfile object.
If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated
and returned by the function.
metadata : dict, optional
metadata info for constructing the nwb file (optional). Should be
of the format::
metadata['Ecephys'] = {
'Device': [
{
'name': my_name,
'description': my_description
},
...
]
'ElectrodeGroup': [
{
'name': my_name,
'description': my_description,
'location': electrode_location,
'device': my_device_name
},
...
]
'Electrodes': [
{
'name': my_name,
'description': my_description
},
...
]
                'ElectricalSeries': {
'name': my_name,
'description': my_description
}
Note that data intended to be added to the electrodes table of the NWBFile should be set as channel
properties in the RecordingExtractor object.
overwrite : bool, default: False
Whether to overwrite the NWBFile if one exists at the nwbfile_path.
verbose : bool, default: False
If 'nwbfile_path' is specified, informs user after a successful write operation.
starting_time : float, optional
Sets the starting time of the ElectricalSeries to a manually set value.
write_as: {'raw', 'processed', 'lfp'}, optional
How to save the traces data in the nwb file.
- 'raw' will save it in acquisition
- 'processed' will save it as FilteredEphys, in a processing module
- 'lfp' will save it as LFP, in a processing module
es_key: str, optional
Key in metadata dictionary containing metadata info for the specific electrical series
write_electrical_series: bool, default: True
If True, electrical series are written in acquisition. If False, only device, electrode_groups,
and electrodes are written to NWB.
    write_scaled: bool, default: False
        If True, writes the scaled traces (return_scaled=True).
iterator_type: {"v2", None}
The type of DataChunkIterator to use.
'v2' is the locally developed SpikeInterfaceRecordingDataChunkIterator, which offers full control over chunking.
None: write the TimeSeries with no memory chunking.
iterator_opts: dict, optional
Dictionary of options for the RecordingExtractorDataChunkIterator (iterator_type='v2').
Valid options are:
* buffer_gb : float, default: 1.0
In units of GB. Recommended to be as much free RAM as available. Automatically calculates suitable
buffer shape.
* buffer_shape : tuple, optional
Manual specification of buffer shape to return on each iteration.
Must be a multiple of chunk_shape along each axis.
Cannot be set if `buffer_gb` is specified.
        * chunk_mb : float, default: 1.0
Should be below 1 MB. Automatically calculates suitable chunk shape.
* chunk_shape : tuple, optional
Manual specification of the internal chunk shape for the HDF5 dataset.
Cannot be set if `chunk_mb` is also specified.
* display_progress : bool, default: False
Display a progress bar with iteration rate and estimated completion time.
* progress_bar_options : dict, optional
Dictionary of keyword arguments to be passed directly to tqdm.
See https://github.com/tqdm/tqdm#parameters for options.
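    Examples
    --------
    A minimal end-to-end sketch (illustrative; the file path is made up, and a
    session_start_time is supplied so a fresh NWBFile can be created):
    >>> from datetime import datetime
    >>> import spikeinterface as si
    >>> recording = si.generate_recording(num_channels=4, durations=[1.0])
    >>> metadata = dict(NWBFile=dict(session_start_time=datetime.now().astimezone()))
    >>> nwbfile = write_recording_to_nwbfile(
    ...     recording=recording, nwbfile_path="stub_recording.nwb", metadata=metadata, overwrite=True
    ... )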
"""
with make_or_load_nwbfile(
nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose
) as nwbfile_out:
add_recording_to_nwbfile(
recording=recording,
nwbfile=nwbfile_out,
starting_time=starting_time,
metadata=metadata,
write_as=write_as,
es_key=es_key,
write_electrical_series=write_electrical_series,
write_scaled=write_scaled,
iterator_type=iterator_type,
iterator_opts=iterator_opts,
)
return nwbfile_out
def add_units_table_to_nwbfile(
sorting: BaseSorting,
nwbfile: pynwb.NWBFile,
unit_ids: Optional[list[Union[str, int]]] = None,
property_descriptions: Optional[dict] = None,
skip_properties: Optional[list[str]] = None,
units_table_name: str = "units",
unit_table_description: Optional[str] = None,
write_in_processing_module: bool = False,
waveform_means: Optional[np.ndarray] = None,
waveform_sds: Optional[np.ndarray] = None,
unit_electrode_indices: Optional[list[list[int]]] = None,
null_values_for_properties: Optional[dict] = None,
):
"""
Add sorting data to a NWBFile object as a Units table.
This function extracts unit properties from a SortingExtractor object and writes them
to an NWBFile Units table, either in the primary units interface or the processing
module (for intermediate/historical data). It handles unit selection, property customization,
waveform data, and electrode mapping.
Parameters
----------
sorting : spikeinterface.BaseSorting
The SortingExtractor object containing unit data.
nwbfile : pynwb.NWBFile
The NWBFile object to write the unit data into.
unit_ids : list of int or str, optional
The specific unit IDs to write. If None, all units are written.
property_descriptions : dict, optional
Custom descriptions for unit properties. Keys should match property names in `sorting`,
and values will be used as descriptions in the Units table.
skip_properties : list of str, optional
Unit properties to exclude from writing.
units_table_name : str, default: 'units'
Name of the Units table. Must be 'units' if `write_in_processing_module` is False.
unit_table_description : str, optional
Description for the Units table (e.g., sorting method, curation details).
write_in_processing_module : bool, default: False
If True, write to the processing module (intermediate data). If False, write to
the primary NWBFile.units table.
waveform_means : np.ndarray, optional
Waveform mean (template) for each unit. Shape: (num_units, num_samples, num_channels).
waveform_sds : np.ndarray, optional
Waveform standard deviation for each unit. Shape: (num_units, num_samples, num_channels).
    unit_electrode_indices : list of lists of int, optional
        A list of lists of integers indicating the indices of the electrodes that each unit is associated with
        (e.g., the electrodes corresponding to the waveform data). The length of the list must match the number
        of units in the sorting extractor.
    null_values_for_properties : dict, optional
        A dictionary mapping property names to the null value used to fill that property for units that lack it.
"""
unit_table_description = unit_table_description or "Autogenerated by neuroconv."
assert isinstance(
nwbfile, pynwb.NWBFile
), f"'nwbfile' should be of type pynwb.NWBFile but is of type {type(nwbfile)}"
if unit_electrode_indices is not None:
electrodes_table = nwbfile.electrodes
if electrodes_table is None:
raise ValueError(
"Electrodes table is required to map units to electrodes. Add an electrode table to the NWBFile first."
)
null_values_for_properties = dict() if null_values_for_properties is None else null_values_for_properties
if not write_in_processing_module and units_table_name != "units":
raise ValueError("When writing to the nwbfile.units table, the name of the table must be 'units'!")
if write_in_processing_module:
ecephys_mod = get_module(
nwbfile=nwbfile,
name="ecephys",
description="Intermediate data from extracellular electrophysiology recordings, e.g., LFP.",
)
write_table_first_time = units_table_name not in ecephys_mod.data_interfaces
if write_table_first_time:
units_table = pynwb.misc.Units(name=units_table_name, description=unit_table_description)
ecephys_mod.add(units_table)
units_table = ecephys_mod[units_table_name]
else:
write_table_first_time = nwbfile.units is None
if write_table_first_time:
nwbfile.units = pynwb.misc.Units(name="units", description=unit_table_description)
units_table = nwbfile.units
default_descriptions = dict(
isi_violation="Quality metric that measures the ISI violation ratio as a proxy for the purity of the unit.",
firing_rate="Number of spikes per unit of time.",
template="The extracellular average waveform.",
max_channel="The recording channel id with the largest amplitude.",
halfwidth="The full-width half maximum of the negative peak computed on the maximum channel.",
peak_to_valley="The duration between the negative and the positive peaks computed on the maximum channel.",
snr="The signal-to-noise ratio of the unit.",
quality="Quality of the unit as defined by phy (good, mua, noise).",
spike_amplitude="Average amplitude of peaks detected on the channel.",
spike_rate="Average rate of peaks detected on the channel.",
unit_name="Unique reference for each unit.",
)
if property_descriptions is None:
property_descriptions = dict()
if skip_properties is None:
skip_properties = list()
property_descriptions = dict(default_descriptions, **property_descriptions)
data_to_add = defaultdict(dict)
sorting_properties = sorting.get_property_keys()
excluded_properties = list(skip_properties) + ["contact_vector"]
properties_to_extract = [property for property in sorting_properties if property not in excluded_properties]
if unit_ids is not None:
sorting = sorting.select_units(unit_ids=unit_ids)
if unit_electrode_indices is not None:
unit_electrode_indices = np.array(unit_electrode_indices)[sorting.ids_to_indices(unit_ids)]
unit_ids = sorting.unit_ids
# Extract properties
for property in properties_to_extract:
data = sorting.get_property(property)
index = isinstance(data[0], (list, np.ndarray, tuple))
if index and isinstance(data[0], np.ndarray):
index = data[0].ndim
description = property_descriptions.get(property, "No description.")
data_to_add[property].update(description=description, data=data, index=index)
if property in ["max_channel", "max_electrode"] and nwbfile.electrodes is not None:
data_to_add[property].update(table=nwbfile.electrodes)
# Unit name logic
if "unit_name" in data_to_add:
# if 'unit_name' is set as a property, it is used to override default unit_ids (and "id")
unit_name_array = data_to_add["unit_name"]["data"]
else:
unit_name_array = unit_ids.astype("str", copy=False)
data_to_add["unit_name"].update(description="Unique reference for each unit.", data=unit_name_array)
units_table_previous_properties = set(units_table.colnames).difference({"spike_times"})
properties_to_add = set(data_to_add)
properties_to_add_by_rows = units_table_previous_properties.union({"id"})
properties_to_add_by_columns = properties_to_add - properties_to_add_by_rows
# Properties that were added before require null values to add by rows if data is missing
properties_requiring_null_values = units_table_previous_properties.difference(properties_to_add)
null_values_for_row = {}
for property in properties_requiring_null_values - {"electrodes"}: # TODO, fix electrodes
sample_data = units_table[property][:][0]
null_value = _get_null_value_for_property(
property=property,
sample_data=sample_data,
null_values_for_properties=null_values_for_properties,
)
null_values_for_row[property] = null_value
# Special case
null_values_for_row["id"] = None
# Add data by rows excluding the rows with previously added unit names
unit_names_used_previously = []
if "unit_name" in units_table_previous_properties:
unit_names_used_previously = units_table["unit_name"].data
has_electrodes_column = "electrodes" in units_table.colnames
properties_with_data = {property for property in properties_to_add_by_rows if "data" in data_to_add[property]}
rows_in_data = [index for index in range(sorting.get_num_units())]
if not has_electrodes_column:
rows_to_add = [index for index in rows_in_data if unit_name_array[index] not in unit_names_used_previously]
else:
rows_to_add = []
for index in rows_in_data:
if unit_name_array[index] not in unit_names_used_previously:
rows_to_add.append(index)
else:
unit_name = unit_name_array[index]
previous_electrodes = units_table[np.where(units_table["unit_name"][:] == unit_name)[0]].electrodes
if list(previous_electrodes.values[0]) != list(unit_electrode_indices[index]):
rows_to_add.append(index)
for row in rows_to_add:
unit_kwargs = null_values_for_row
for property in properties_with_data:
unit_kwargs[property] = data_to_add[property]["data"][row]
spike_times = []
# Extract and concatenate the spike times from multiple segments
for segment_index in range(sorting.get_num_segments()):
segment_spike_times = sorting.get_unit_spike_train(
unit_id=unit_ids[row], segment_index=segment_index, return_times=True
)
spike_times.append(segment_spike_times)
spike_times = np.concatenate(spike_times)
if waveform_means is not None:
unit_kwargs["waveform_mean"] = waveform_means[row]
if waveform_sds is not None:
unit_kwargs["waveform_sd"] = waveform_sds[row]
if unit_electrode_indices is not None:
unit_kwargs["electrodes"] = unit_electrode_indices[row]
units_table.add_unit(spike_times=spike_times, **unit_kwargs, enforce_unique_id=True)
# Add unit_name as a column and fill previously existing rows with unit_name equal to str(ids)
unit_table_size = len(units_table.id[:])
previous_table_size = len(units_table.id[:]) - len(unit_name_array)
if "unit_name" in properties_to_add_by_columns:
cols_args = data_to_add["unit_name"]
data = cols_args["data"]
previous_ids = units_table.id[:previous_table_size]
default_value = np.array(previous_ids).astype("str")
extended_data = np.hstack([default_value, data])
cols_args["data"] = extended_data
units_table.add_column("unit_name", **cols_args)
    # Build a unit_name to units table row index map
table_df = units_table.to_dataframe().reset_index()
unit_name_to_electrode_index = {
unit_name: table_df.query(f"unit_name=='{unit_name}'").index[0] for unit_name in unit_name_array
}
indices_for_new_data = [unit_name_to_electrode_index[unit_name] for unit_name in unit_name_array]
indices_for_null_values = table_df.index.difference(indices_for_new_data).values
extending_column = len(indices_for_null_values) > 0
# Add properties as columns
for property in properties_to_add_by_columns - {"unit_name"}:
cols_args = data_to_add[property]
data = cols_args["data"]
        # Simple case: the new column spans the whole table, so add it directly
if not extending_column:
units_table.add_column(property, **cols_args)
continue
# Extending the columns is done differently for ragged arrays
adding_ragged_array = cols_args["index"]
if not adding_ragged_array:
sample_data = data[0]
dtype = data.dtype
extended_data = np.empty(shape=unit_table_size, dtype=dtype)
extended_data[indices_for_new_data] = data
null_value = _get_null_value_for_property(
property=property,
sample_data=sample_data,
null_values_for_properties=null_values_for_properties,
)
extended_data[indices_for_null_values] = null_value
else:
dtype = np.ndarray
extended_data = np.empty(shape=unit_table_size, dtype=dtype)
for index, value in enumerate(data):
index_in_extended_data = indices_for_new_data[index]
extended_data[index_in_extended_data] = value.tolist()
for index in indices_for_null_values:
null_value = []
extended_data[index] = null_value
# Add the data
cols_args["data"] = extended_data
units_table.add_column(property, **cols_args)
def add_sorting_to_nwbfile(
sorting: BaseSorting,
nwbfile: Optional[pynwb.NWBFile] = None,
unit_ids: Optional[Union[list[str], list[int]]] = None,
property_descriptions: Optional[dict] = None,
skip_properties: Optional[list[str]] = None,
write_as: Literal["units", "processing"] = "units",
units_name: str = "units",
units_description: str = "Autogenerated by neuroconv.",
waveform_means: Optional[np.ndarray] = None,
waveform_sds: Optional[np.ndarray] = None,
unit_electrode_indices: Optional[list[list[int]]] = None,
):
"""Add sorting data (units and their properties) to an NWBFile.
    This function serves as a convenient wrapper around `add_units_table_to_nwbfile` to match
    SpikeInterface's `SortingExtractor` API.
Parameters
----------
sorting : BaseSorting
The SortingExtractor object containing unit data.
nwbfile : pynwb.NWBFile, optional
The NWBFile object to write the unit data into.
unit_ids : list of int or str, optional
The specific unit IDs to write. If None, all units are written.
property_descriptions : dict, optional
Custom descriptions for unit properties. Keys should match property names in `sorting`,
and values will be used as descriptions in the Units table.
skip_properties : list of str, optional
Unit properties to exclude from writing.
write_as : {'units', 'processing'}, default: 'units'
Where to write the unit data:
- 'units': Write to the primary NWBFile.units table.
- 'processing': Write to the processing module (intermediate data).
units_name : str, default: 'units'
Name of the Units table. Must be 'units' if `write_as` is 'units'.
units_description : str, optional
Description for the Units table (e.g., sorting method, curation details).
waveform_means : np.ndarray, optional
Waveform mean (template) for each unit. Shape: (num_units, num_samples, num_channels).
waveform_sds : np.ndarray, optional
Waveform standard deviation for each unit. Shape: (num_units, num_samples, num_channels).
unit_electrode_indices : list of lists of int, optional
A list of lists of integers indicating the indices of the electrodes that each unit is associated with.
The length of the list must match the number of units in the sorting extractor.
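
Examples
--------
A minimal sketch using toy helpers: `generate_sorting` is from spikeinterface.core
and `mock_NWBFile` is from pynwb's testing utilities; exact import paths may vary
across versions.

>>> from spikeinterface.core import generate_sorting
>>> from pynwb.testing.mock.file import mock_NWBFile
>>> sorting = generate_sorting(num_units=5, durations=[10.0])
>>> nwbfile = mock_NWBFile()
>>> add_sorting_to_nwbfile(sorting=sorting, nwbfile=nwbfile)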
"""
assert write_as in [
"units",
"processing",
], f"Argument write_as ({write_as}) should be one of 'units' or 'processing'!"
write_in_processing_module = write_as == "processing"
add_units_table_to_nwbfile(
sorting=sorting,
unit_ids=unit_ids,
nwbfile=nwbfile,
property_descriptions=property_descriptions,
skip_properties=skip_properties,
write_in_processing_module=write_in_processing_module,
units_table_name=units_name,
unit_table_description=units_description,
waveform_means=waveform_means,
waveform_sds=waveform_sds,
unit_electrode_indices=unit_electrode_indices,
)

def write_sorting_to_nwbfile(
sorting: BaseSorting,
nwbfile_path: Optional[FilePath] = None,
nwbfile: Optional[pynwb.NWBFile] = None,
metadata: Optional[dict] = None,
overwrite: bool = False,
verbose: bool = False,
unit_ids: Optional[list[Union[str, int]]] = None,
property_descriptions: Optional[dict] = None,
skip_properties: Optional[list[str]] = None,
write_as: Literal["units", "processing"] = "units",
units_name: str = "units",
units_description: str = "Autogenerated by neuroconv.",
waveform_means: Optional[np.ndarray] = None,
waveform_sds: Optional[np.ndarray] = None,
unit_electrode_indices: Optional[list[list[int]]] = None,
):
"""
Primary method for writing a SortingExtractor object to an NWBFile.

Parameters
----------
sorting : spikeinterface.BaseSorting
nwbfile_path : FilePath, optional
Path for where to write or load (if overwrite=False) the NWBFile.
If specified, the context will always write to this location.
nwbfile : NWBFile, optional
If passed, this function will fill the relevant fields within the NWBFile object.
E.g., calling::
write_sorting_to_nwbfile(sorting=my_sorting_extractor, nwbfile=my_nwbfile)
will result in the appropriate changes to the my_nwbfile object.
If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated
and returned by the function.
metadata : dict, optional
Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True.
overwrite : bool, default: False
Whether to overwrite the NWBFile if one exists at the nwbfile_path.
The default is False (append mode).
verbose : bool, default: False
If 'nwbfile_path' is specified, informs user after a successful write operation.
unit_ids : list, optional
Controls the unit_ids that will be written to the nwb file. If None (default), all
units are written.
property_descriptions : dict, optional
For each key in this dictionary which matches the name of a unit
property in sorting, adds the value as a description to that
custom unit column.
skip_properties : list of str, optional
Each string in this list that matches a unit property will not be written to the NWBFile.
write_as : {'units', 'processing'}
How to save the units table in the nwb file. Options:
- 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data.
- 'processing' will save it to the processing module to serve as a historical provenance for the official table.
units_name : str, default: 'units'
The name of the units table. If write_as=='units', then units_name must also be 'units'.
units_description : str, default: 'Autogenerated by neuroconv.'
waveform_means : np.ndarray, optional
Waveform mean (template) for each unit. Shape: (num_units, num_samples, num_channels).
waveform_sds : np.ndarray, optional
Waveform standard deviation for each unit. Shape: (num_units, num_samples, num_channels).
unit_electrode_indices : list of lists of int, optional
For each unit, a list of electrode indices corresponding to waveform data.
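
Examples
--------
A minimal sketch; the output path is a placeholder, and the metadata is assumed to
supply the session_start_time needed to create a fresh NWBFile:

>>> from datetime import datetime
>>> from spikeinterface.core import generate_sorting
>>> sorting = generate_sorting(num_units=5, durations=[10.0])
>>> metadata = dict(NWBFile=dict(session_start_time=datetime(2024, 1, 1)))
>>> write_sorting_to_nwbfile(sorting=sorting, nwbfile_path="sorting.nwb", metadata=metadata, overwrite=True)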
"""
with make_or_load_nwbfile(
nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose
) as nwbfile_out:
add_sorting_to_nwbfile(
sorting=sorting,
nwbfile=nwbfile_out,
unit_ids=unit_ids,
property_descriptions=property_descriptions,
skip_properties=skip_properties,
write_as=write_as,
units_name=units_name,
units_description=units_description,
waveform_means=waveform_means,
waveform_sds=waveform_sds,
unit_electrode_indices=unit_electrode_indices,
)

def add_sorting_analyzer_to_nwbfile(
sorting_analyzer: SortingAnalyzer,
nwbfile: Optional[pynwb.NWBFile] = None,
metadata: Optional[dict] = None,
recording: Optional[BaseRecording] = None,
unit_ids: Optional[Union[list[str], list[int]]] = None,
skip_properties: Optional[list[str]] = None,
property_descriptions: Optional[dict] = None,
write_as: Literal["units", "processing"] = "units",
units_name: str = "units",
units_description: str = "Autogenerated by neuroconv.",
):
"""
Convenience function to add a sorting analyzer object directly to an nwbfile.

The function adds the electrode information of the recording and the sorting data,
plus the following information from the sorting analyzer:

- quality metrics
- template mean and std
- template metrics

Parameters
----------
sorting_analyzer : spikeinterface.SortingAnalyzer
The sorting analyzer object to be written to the NWBFile.
nwbfile : NWBFile, optional
If passed, this function will fill the relevant fields within the NWBFile object.
E.g., calling::
add_sorting_analyzer_to_nwbfile(sorting_analyzer=my_sorting_analyzer, nwbfile=my_nwbfile)
will result in the appropriate changes to the my_nwbfile object.
metadata : dict, optional
Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True.
The "Ecephys" section of metadata is also used to create electrodes and electrical series fields.
recording : BaseRecording, optional
If the sorting_analyzer is 'recordingless', this argument must be provided to save electrode info;
otherwise, electrode info is not added to the nwb file.
unit_ids : list, optional
Controls the unit_ids that will be written to the nwb file. If None (default), all
units are written.
skip_properties : list of str, optional
Each string in this list that matches a unit property will not be written to the NWBFile.
property_descriptions : dict, optional
For each key in this dictionary which matches the name of a unit
property in sorting, adds the value as a description to that
custom unit column.
write_as : {'units', 'processing'}
How to save the units table in the nwb file. Options:
- 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data.
- 'processing' will save it to the processing module to serve as a historical provenance for the official table.
units_name : str, default: 'units'
The name of the units table. If write_as=='units', then units_name must also be 'units'.
units_description : str, default: 'Autogenerated by neuroconv.'
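
Examples
--------
A minimal sketch; the toy generators are from spikeinterface.core, and the
'templates' extension (which itself requires 'random_spikes') is assumed to be
computed beforehand:

>>> from spikeinterface.core import generate_ground_truth_recording, create_sorting_analyzer
>>> from pynwb.testing.mock.file import mock_NWBFile
>>> recording, sorting = generate_ground_truth_recording(durations=[10.0], num_units=5)
>>> analyzer = create_sorting_analyzer(sorting=sorting, recording=recording)
>>> _ = analyzer.compute(["random_spikes", "templates"])
>>> nwbfile = mock_NWBFile()
>>> add_sorting_analyzer_to_nwbfile(sorting_analyzer=analyzer, nwbfile=nwbfile, recording=recording)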
"""
# TODO: move into add_units
assert write_as in [
"units",
"processing",
], f"Argument write_as ({write_as}) should be one of 'units' or 'processing'!"
if write_as == "units":
assert units_name == "units", "When writing to the nwbfile.units table, the name of the table must be 'units'!"
write_in_processing_module = write_as == "processing"
# retrieve templates and stds
template_extension = sorting_analyzer.get_extension("templates")
if template_extension is None:
raise ValueError("No templates found in the sorting analyzer.")
template_means = template_extension.get_templates()
template_stds = template_extension.get_templates(operator="std")
sorting = sorting_analyzer.sorting
if unit_ids is not None:
unit_indices = sorting.ids_to_indices(unit_ids)
template_means = template_means[unit_indices]
template_stds = template_stds[unit_indices]
# metrics properties (quality, template) are added as properties to the sorting copy
sorting_copy = sorting.select_units(unit_ids=sorting.unit_ids)
if sorting_analyzer.has_extension("quality_metrics"):
qm = sorting_analyzer.get_extension("quality_metrics").get_data()
for prop in qm.columns:
if prop not in sorting_copy.get_property_keys():
sorting_copy.set_property(prop, qm[prop])
if sorting_analyzer.has_extension("template_metrics"):
tm = sorting_analyzer.get_extension("template_metrics").get_data()
for prop in tm.columns:
if prop not in sorting_copy.get_property_keys():
sorting_copy.set_property(prop, tm[prop])
add_electrodes_info_to_nwbfile(recording, nwbfile=nwbfile, metadata=metadata)
electrode_group_indices = _get_electrode_group_indices(recording, nwbfile=nwbfile)
unit_electrode_indices = [electrode_group_indices] * len(sorting.unit_ids)
add_units_table_to_nwbfile(
sorting=sorting_copy,
nwbfile=nwbfile,
unit_ids=unit_ids,
property_descriptions=property_descriptions,
skip_properties=skip_properties,
write_in_processing_module=write_in_processing_module,
units_table_name=units_name,
unit_table_description=units_description,
waveform_means=template_means,
waveform_sds=template_stds,
unit_electrode_indices=unit_electrode_indices,
)

def write_sorting_analyzer_to_nwbfile(
sorting_analyzer: SortingAnalyzer,
nwbfile_path: Optional[FilePath] = None,
nwbfile: Optional[pynwb.NWBFile] = None,
metadata: Optional[dict] = None,
overwrite: bool = False,
recording: Optional[BaseRecording] = None,
verbose: bool = False,
unit_ids: Optional[Union[list[str], list[int]]] = None,
write_electrical_series: bool = False,
add_electrical_series_kwargs: Optional[dict] = None,
skip_properties: Optional[list[str]] = None,
property_descriptions: Optional[dict] = None,
write_as: Literal["units", "processing"] = "units",
units_name: str = "units",
units_description: str = "Autogenerated by neuroconv.",
):
"""
Convenience function to write a sorting analyzer object directly to an nwbfile.

The function adds the data of the recording and the sorting,
plus the following information from the sorting analyzer:

- quality metrics
- template mean and std
- template metrics

Parameters
----------
sorting_analyzer : spikeinterface.SortingAnalyzer
The sorting analyzer object to be written to the NWBFile.
nwbfile_path : FilePath, optional
Path for where to write or load (if overwrite=False) the NWBFile.
If specified, the context will always write to this location.
nwbfile : NWBFile, optional
If passed, this function will fill the relevant fields within the NWBFile object.
E.g., calling::
write_sorting_analyzer_to_nwbfile(sorting_analyzer=my_sorting_analyzer, nwbfile=my_nwbfile)
will result in the appropriate changes to the my_nwbfile object.
If neither 'nwbfile_path' nor 'nwbfile' are specified, an NWBFile object will be automatically generated
and returned by the function.
metadata : dict, optional
Metadata dictionary with information used to create the NWBFile when one does not exist or overwrite=True.
The "Ecephys" section of metadata is also used to create electrodes and electrical series fields.
overwrite : bool, default: False
Whether to overwrite the NWBFile if one exists at the nwbfile_path.
recording : BaseRecording, optional
If the sorting_analyzer is 'recordingless', this argument needs to be passed to save electrode info.
Otherwise, electrodes info is not added to the nwb file.
verbose : bool, default: False
If 'nwbfile_path' is specified, informs user after a successful write operation.
unit_ids : list, optional
Controls the unit_ids that will be written to the nwb file. If None (default), all
units are written.
write_electrical_series : bool, default: False
If True, the recording object associated to the analyzer is written as an electrical series.
add_electrical_series_kwargs : dict, optional
Keyword arguments to control the `add_electrical_series_to_nwbfile()` function in case write_electrical_series=True.
skip_properties : list of str, optional
Each string in this list that matches a unit property will not be written to the NWBFile.
property_descriptions : dict, optional
For each key in this dictionary which matches the name of a unit
property in sorting, adds the value as a description to that
custom unit column.
write_as : {'units', 'processing'}
How to save the units table in the nwb file. Options:
- 'units' will save it to the official NWBFile.Units position; recommended only for the final form of the data.
- 'processing' will save it to the processing module to serve as a historical provenance for the official table.
units_name : str, default: 'units'
The name of the units table. If write_as=='units', then units_name must also be 'units'.
units_description : str, default: 'Autogenerated by neuroconv.'
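
Examples
--------
A minimal sketch; the output path is a placeholder and the toy generators are from
spikeinterface.core:

>>> from datetime import datetime
>>> from spikeinterface.core import generate_ground_truth_recording, create_sorting_analyzer
>>> recording, sorting = generate_ground_truth_recording(durations=[10.0], num_units=5)
>>> analyzer = create_sorting_analyzer(sorting=sorting, recording=recording)
>>> _ = analyzer.compute(["random_spikes", "templates"])
>>> metadata = dict(NWBFile=dict(session_start_time=datetime(2024, 1, 1)))
>>> write_sorting_analyzer_to_nwbfile(
...     sorting_analyzer=analyzer, nwbfile_path="analyzer.nwb", metadata=metadata, overwrite=True
... )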
"""
metadata = metadata if metadata is not None else dict()
if sorting_analyzer.has_recording():
recording = sorting_analyzer.recording
assert recording is not None, (
"recording not found. To add the electrode table, the sorting_analyzer "
"needs to have a recording attached or the 'recording' argument needs to be used."
)
with make_or_load_nwbfile(
nwbfile_path=nwbfile_path, nwbfile=nwbfile, metadata=metadata, overwrite=overwrite, verbose=verbose
) as nwbfile_out:
if write_electrical_series:
add_electrical_series_kwargs = add_electrical_series_kwargs or dict()
add_electrical_series_to_nwbfile(
recording=recording, nwbfile=nwbfile_out, metadata=metadata, **add_electrical_series_kwargs
)
add_sorting_analyzer_to_nwbfile(
sorting_analyzer=sorting_analyzer,
nwbfile=nwbfile_out,
metadata=metadata,
recording=recording,
unit_ids=unit_ids,
skip_properties=skip_properties,
property_descriptions=property_descriptions,
write_as=write_as,
units_name=units_name,
units_description=units_description,
)

def _get_electrode_group_indices(recording, nwbfile):
"""Return the indices of the nwbfile electrode table rows whose group name matches one of the recording's groups."""
if "group_name" in recording.get_property_keys():
group_names = np.unique(recording.get_property("group_name"))
elif "group" in recording.get_property_keys():
group_names = np.unique(recording.get_property("group"))
else:
group_names = None
if group_names is None:
electrode_group_indices = None
else:
group_names = [str(group_name) for group_name in group_names]
electrode_group_indices = nwbfile.electrodes.to_dataframe().query(f"group_name in {group_names}").index.values
return electrode_group_indices