Source code for neuroconv.datainterfaces.ecephys.neuralynx.neuralynxdatainterface

import json
from typing import Optional

import numpy as np
from pydantic import DirectoryPath

from ..baserecordingextractorinterface import BaseRecordingExtractorInterface
from ..basesortingextractorinterface import BaseSortingExtractorInterface
from ....utils import dict_deep_update


[docs]class NeuralynxRecordingInterface(BaseRecordingExtractorInterface): """Primary data interface for converting Neuralynx data. Uses :py:class:`~spikeinterface.extractors.NeuralynxRecordingExtractor`.""" display_name = "Neuralynx Recording" associated_suffixes = (".ncs", ".nse", ".ntt", ".nse", ".nev") info = "Interface for Neuralynx recording data."
[docs] @classmethod def get_stream_names(cls, folder_path: DirectoryPath) -> list[str]: from spikeinterface.extractors import NeuralynxRecordingExtractor stream_names, _ = NeuralynxRecordingExtractor.get_streams(folder_path=folder_path) return stream_names
[docs] @classmethod def get_source_schema(cls) -> dict: source_schema = super().get_source_schema() source_schema["properties"]["folder_path"][ "description" ] = 'Path to Neuralynx directory containing ".ncs", ".nse", ".ntt", ".nse", or ".nev" files.' return source_schema
def _source_data_to_extractor_kwargs(self, source_data: dict) -> dict: extractor_kwargs = source_data.copy() extractor_kwargs["all_annotations"] = True return extractor_kwargs def __init__( self, folder_path: DirectoryPath, stream_name: Optional[str] = None, verbose: bool = False, es_key: str = "ElectricalSeries", ): """ Initialize reading of OpenEphys binary recording. Parameters ---------- folder_path: FolderPathType Path to Neuralynx directory. stream_name : str, optional The name of the recording stream to load; only required if there is more than one stream detected. Call `NeuralynxRecordingInterface.get_stream_names(folder_path=...)` to see what streams are available. verbose : bool, default: False es_key : str, default: "ElectricalSeries" """ super().__init__( folder_path=folder_path, stream_name=stream_name, verbose=verbose, es_key=es_key, ) # convert properties of object dtype (e.g. datetime) and bool as these are not supported by nwb for key in self.recording_extractor.get_property_keys(): value = self.recording_extractor.get_property(key) if value.dtype == object or value.dtype == np.bool_: self.recording_extractor.set_property(key, np.asarray(value, dtype=str))
[docs] def get_metadata(self) -> dict: neo_metadata = extract_neo_header_metadata(self.recording_extractor.neo_reader) # remove filter related entries already covered by `add_recording_extractor_properties` neo_metadata = {k: v for k, v in neo_metadata.items() if not k.lower().startswith("dsp")} # map Neuralynx metadata to NWB nwb_metadata = {"NWBFile": {}, "Ecephys": {"Device": []}} neuralynx_device = None if "SessionUUID" in neo_metadata: # note: SessionUUID can not be used as 'identifier' as this requires uuid4 nwb_metadata["NWBFile"]["session_id"] = neo_metadata.pop("SessionUUID") if "recording_opened" in neo_metadata: nwb_metadata["NWBFile"]["session_start_time"] = neo_metadata.pop("recording_opened") if "AcquisitionSystem" in neo_metadata: neuralynx_device = {"name": neo_metadata.pop("AcquisitionSystem")} elif "HardwareSubSystemType" in neo_metadata: neuralynx_device = {"name": neo_metadata.pop("HardwareSubSystemType")} if neuralynx_device is not None: if "ApplicationName" in neo_metadata or "ApplicationVersion" in neo_metadata: name = neo_metadata.pop("ApplicationName", "") version = str(neo_metadata.pop("ApplicationVersion", "")) neuralynx_device["description"] = f"{name} {version}" nwb_metadata["Ecephys"]["Device"].append(neuralynx_device) neo_metadata = {k: str(v) for k, v in neo_metadata.items()} nwb_metadata["NWBFile"]["notes"] = json.dumps(neo_metadata, ensure_ascii=True) return dict_deep_update(super().get_metadata(), nwb_metadata)
[docs]class NeuralynxSortingInterface(BaseSortingExtractorInterface): """ Primary data interface for converting Neuralynx sorting data. Uses :py:class:`~spikeinterface.extractors.NeuralynxSortingExtractor`. """ display_name = "Neuralynx Sorting" associated_suffixes = (".nse", ".ntt", ".nse", ".nev") info = "Interface for Neuralynx sorting data." def __init__( self, folder_path: DirectoryPath, sampling_frequency: Optional[float] = None, verbose: bool = False, stream_id: Optional[str] = None, ): """_summary_ Parameters ---------- folder_path : str, Path The path to the folder/directory containing the data files for the session (nse, ntt, nse, nev) sampling_frequency : float, optional If a specific sampling_frequency is desired it can be set with this argument. verbose : bool, default: False Enables verbosity stream_id: str, optional Used by Spikeinterface and neo to calculate the t_start, if not provided and the stream is unique it will be chosen automatically """ super().__init__( folder_path=folder_path, sampling_frequency=sampling_frequency, stream_id=stream_id, verbose=verbose )
[docs]def extract_neo_header_metadata(neo_reader) -> dict: """ Extract the session metadata from a NeuralynxRawIO object Parameters ---------- neo_reader: NeuralynxRawIO object Neo IO to extract the metadata from Returns ------- dict: dictionary containing the session metadata across channels Uses the mu character, which may cause problems for downstream things that expect ASCII. """ # check if neuralynx file header objects are present and use these metadata extraction if hasattr(neo_reader, "file_headers"): # use only ncs files as only continuous signals are extracted # note that in the neo io the order of file headers is the same as for channels headers = [header for filename, header in neo_reader.file_headers.items() if filename.lower().endswith(".ncs")] # use metadata provided as array_annotations for each channel (neo version <=0.11.0) else: # TODO: Remove else after dependency update to neo >=0.12 headers = [] neo_annotations = neo_reader.raw_annotations for stream_annotations in neo_annotations["blocks"][0]["segments"][0]["signals"]: for chan_idx in range(len(stream_annotations["__array_annotations__"]["channel_names"])): headers.append({k: v[chan_idx] for k, v in stream_annotations["__array_annotations__"].items()}) # extract common attributes across channels by preserving only shared header keys and values common_header = _dict_intersection(headers) # reintroduce recording times as these are typically not exactly identical # use minimal recording_opened and maximal recording_closed value if "recording_opened" not in common_header and all(["recording_opened" in h for h in headers]): common_header["recording_opened"] = min([h["recording_opened"] for h in headers]) if "recording_closed" not in common_header and all(["recording_closed" in h for h in headers]): common_header["recording_closed"] = max([h["recording_closed"] for h in headers]) return common_header
def _dict_intersection(dict_list: list[dict]) -> dict: """ Intersect dict_list and return only common keys and values Parameters ---------- dict_list: list of dictionaries each representing a header Returns ------- dict: Dictionary containing key-value pairs common to all input dictionary_list """ # Collect keys appearing in all dictionaries common_keys = list(set.intersection(*[set(h.keys()) for h in dict_list])) # Add values for common keys if the value is identical across all headers (dict_list) first_dict = dict_list[0] all_dicts_have_same_value_for = lambda key: all([first_dict[key] == dict[key] for dict in dict_list]) common_header = {key: first_dict[key] for key in common_keys if all_dicts_have_same_value_for(key)} return common_header