Source code for neuroconv.datainterfaces.ecephys.spikeglx.spikeglxconverter
from pathlib import Path
from typing import Optional
from pydantic import DirectoryPath, validate_call
from .spikeglxdatainterface import SpikeGLXRecordingInterface
from .spikeglxnidqinterface import SpikeGLXNIDQInterface
from ....nwbconverter import ConverterPipe
from ....utils import get_json_schema_from_method_signature
[docs]class SpikeGLXConverterPipe(ConverterPipe):
"""
The simplest, easiest to use class for converting all SpikeGLX data in a folder.
Primary conversion class for handling multiple SpikeGLX data streams.
"""
display_name = "SpikeGLX Converter"
keywords = SpikeGLXRecordingInterface.keywords + SpikeGLXNIDQInterface.keywords
associated_suffixes = SpikeGLXRecordingInterface.associated_suffixes + SpikeGLXNIDQInterface.associated_suffixes
info = "Converter for multi-stream SpikeGLX recording data."
[docs] @classmethod
def get_source_schema(cls) -> dict:
"""
Get the schema for the source arguments.
Returns
-------
dict
The schema dictionary containing input parameters and descriptions
for initializing the SpikeGLX converter.
"""
source_schema = get_json_schema_from_method_signature(method=cls.__init__, exclude=["streams"])
source_schema["properties"]["folder_path"]["description"] = "Path to the folder containing SpikeGLX streams."
return source_schema
[docs] @classmethod
def get_streams(cls, folder_path: DirectoryPath) -> list[str]:
"""
Get the stream IDs available in the folder.
Parameters
----------
folder_path : DirectoryPath
Path to the folder containing SpikeGLX streams.
Returns
-------
list of str
The IDs of all available streams in the folder.
"""
from spikeinterface.extractors import SpikeGLXRecordingExtractor
# The first entry is the stream ids the second is the stream names
return SpikeGLXRecordingExtractor.get_streams(folder_path=folder_path)[0]
@validate_call
def __init__(
self,
folder_path: DirectoryPath,
streams: Optional[list[str]] = None,
verbose: bool = False,
):
"""
Read all data from multiple streams stored in the SpikeGLX format.
This can include...
(a) single-probe but multi-band such as AP+LF streams
(b) multi-probe with multi-band
(c) with or without the associated NIDQ channels
Parameters
----------
folder_path : DirectoryPath
Path to folder containing the NIDQ stream and subfolders containing each IMEC stream.
streams : list of strings, optional
A specific list of streams you wish to load.
To see which streams are available, run `SpikeGLXConverter.get_streams(folder_path="path/to/spikeglx")`.
By default, all available streams are loaded.
verbose : bool, default: False
Whether to output verbose text.
"""
folder_path = Path(folder_path)
streams_ids = streams or self.get_streams(folder_path=folder_path)
data_interfaces = dict()
nidq_streams = [stream_id for stream_id in streams_ids if stream_id == "nidq"]
electrical_streams = [stream_id for stream_id in streams_ids if stream_id not in nidq_streams]
for stream_id in electrical_streams:
data_interfaces[stream_id] = SpikeGLXRecordingInterface(folder_path=folder_path, stream_id=stream_id)
for stream_id in nidq_streams:
data_interfaces[stream_id] = SpikeGLXNIDQInterface(folder_path=folder_path)
super().__init__(data_interfaces=data_interfaces, verbose=verbose)
[docs] def get_conversion_options_schema(self) -> dict:
conversion_options_schema = super().get_conversion_options_schema()
conversion_options_schema["properties"].update(
{name: interface.get_conversion_options_schema() for name, interface in self.data_interface_objects.items()}
)
return conversion_options_schema