Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
7a6a288
refactor: remove temp dataset
arjunsridhar12345 Jun 1, 2026
24e6b17
feat: add acquisition processor module
arjunsridhar12345 Jun 1, 2026
d1a01ea
refactor: use import from data contract instead of temp file
arjunsridhar12345 Jun 1, 2026
99c6adf
test: add tests for acquisition builder
arjunsridhar12345 Jun 1, 2026
9d6656d
feat: add acquisition builder example notebook on how to use
arjunsridhar12345 Jun 1, 2026
f5b7f6f
refactor: simplify description of unit
arjunsridhar12345 Jun 1, 2026
fcf311d
fix: rename to match names in current NWB file
arjunsridhar12345 Jun 2, 2026
282b726
test: update failing tests
arjunsridhar12345 Jun 2, 2026
0fedb01
refactor: remove nwb from name to try to make it more clear
arjunsridhar12345 Jun 2, 2026
8d246a2
test: update tests
arjunsridhar12345 Jun 2, 2026
f2645b0
refactor: remove intermediate acquisition object and all references
arjunsridhar12345 Jun 4, 2026
43905fc
feat: add nwb acquisition builder
arjunsridhar12345 Jun 4, 2026
8c21d20
test: update tests
arjunsridhar12345 Jun 4, 2026
eb846ed
feat: start adding logic for getting reward info to acquisition
arjunsridhar12345 Jun 4, 2026
f8ffdbc
test: add tests
arjunsridhar12345 Jun 4, 2026
e96da5b
Merge remote-tracking branch 'origin/dev' into 7-create-nwb-acquisiti…
arjunsridhar12345 Jun 24, 2026
7e1f346
feat: try to add manual rewards
arjunsridhar12345 Jun 24, 2026
bceaeb8
test: update tests
arjunsridhar12345 Jun 24, 2026
cfb73ca
feat: add lick times to acquisition
arjunsridhar12345 Jun 24, 2026
229166b
refactor: add more comments to try to clarify logic
arjunsridhar12345 Jun 25, 2026
ca082a8
Merge remote-tracking branch 'origin/dev' into 7-create-nwb-acquisiti…
arjunsridhar12345 Jun 25, 2026
9b34d15
refactor: disable getting all streams for now
arjunsridhar12345 Jun 25, 2026
7df8226
fic: disable more stuff for now
arjunsridhar12345 Jun 25, 2026
1d5ad6b
build: update lock file
arjunsridhar12345 Jun 30, 2026
38fecfc
refactor: PR feedback. add stream name to licks and simplify manual r…
arjunsridhar12345 Jun 30, 2026
d3498fe
test: update tests
arjunsridhar12345 Jun 30, 2026
f3e4fc3
build: upgrade lock
arjunsridhar12345 Jun 30, 2026
91df8e9
refactor: try to make flexible lick series generation
arjunsridhar12345 Jun 30, 2026
a3e222a
test: update tests
arjunsridhar12345 Jun 30, 2026
5617914
refactor: minor update to passing in lick data
arjunsridhar12345 Jun 30, 2026
351e980
build: update dependecies
arjunsridhar12345 Jun 30, 2026
5f7d7b2
feat: add utils to make data types nwb compatible
arjunsridhar12345 Jun 30, 2026
f53c6d0
test: update tests
arjunsridhar12345 Jun 30, 2026
23006d1
chore: lint
arjunsridhar12345 Jun 30, 2026
7038493
fix: adapt to clean dict for nwb
arjunsridhar12345 Jul 1, 2026
9360d7e
fix: allow for none type descriptions
arjunsridhar12345 Jul 1, 2026
1176632
fix: convert pydantic model to nwb com
arjunsridhar12345 Jul 1, 2026
81c759a
test: fix interrogate
arjunsridhar12345 Jul 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ readme = "README.md"
version = "0.0.0"

dependencies = [
"aind-behavior-dynamic-foraging[data] @ git+https://github.com/AllenNeuralDynamics/Aind.Behavior.DynamicForaging.git@v0.0.2",
"aind-behavior-dynamic-foraging[data] @ git+https://github.com/AllenNeuralDynamics/Aind.Behavior.DynamicForaging.git@f517d14ff965763a8f713b8a1be5dfb26b9312e1",
"ipykernel",
]

Expand Down
13 changes: 13 additions & 0 deletions src/dynamic_foraging_processing/nwb/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""NWB modules for dynamic foraging datasets."""

# Re-export the acquisition builder and its models from the ``acquisition``
# subpackage so they can be imported from ``dynamic_foraging_processing.nwb``.
from dynamic_foraging_processing.nwb.acquisition import (
AcquisitionBuilder,
AcquisitionSeries,
AcquisitionTable,
)

# Names exposed by ``from dynamic_foraging_processing.nwb import *``; kept in
# step with the import list above.
__all__ = [
"AcquisitionBuilder",
"AcquisitionSeries",
"AcquisitionTable",
]
13 changes: 13 additions & 0 deletions src/dynamic_foraging_processing/nwb/acquisition/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
"""NWB acquisition models."""

# The builder lives in ``acquisition_builder`` and the entry models in
# ``models``; both are re-exported here so callers can import them from
# ``dynamic_foraging_processing.nwb.acquisition``.
from dynamic_foraging_processing.nwb.acquisition.acquisition_builder import AcquisitionBuilder
from dynamic_foraging_processing.nwb.acquisition.models import (
AcquisitionSeries,
AcquisitionTable,
)

# Names exposed by ``from dynamic_foraging_processing.nwb.acquisition import *``.
__all__ = [
"AcquisitionBuilder",
"AcquisitionSeries",
"AcquisitionTable",
]
309 changes: 309 additions & 0 deletions src/dynamic_foraging_processing/nwb/acquisition/acquisition_builder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
"""Acquisition builder for NWB acquisition module."""

import typing as t

import numpy as np
import pandas as pd

from dynamic_foraging_processing.nwb.acquisition.models import (
AcquisitionSeries,
AcquisitionTable,
)
from dynamic_foraging_processing.nwb.utils import clean_dataframe_for_nwb
from dynamic_foraging_processing.raw_data_loader import RawDataLoader
from dynamic_foraging_processing.utils.rewards import get_annotated_rewards


class LickSource(t.NamedTuple):
    """Address of a single lick port's digital signal within the raw dataset.

    The three fields are positional, in the order ``(device, stream, port)``,
    and name the device node, the stream on that device, and the column in
    that stream that carries the lick signal.

    Attributes
    ----------
    device : str
        Harp device node under ``Behavior``. ``"HarpBehavior"`` is the
        standard behavior board; the lickometer board uses
        ``"HarpLickometerLeft"`` / ``"HarpLickometerRight"``.
    stream : str
        Digital-input stream on that device: ``"DigitalInputState"`` on the
        behavior board, ``"LickState"`` on the lickometer board.
    port : str
        Column within the stream: ``"DIPort0"`` / ``"DIPort1"`` on the
        behavior board, ``"Channel0"`` on the lickometer board.
    """

    device: str
    stream: str
    port: str


class AcquisitionBuilder:
    """Builds the NWB acquisition module from raw dynamic foraging data.

    Streams are read through ``loader.dataset`` and wrapped as
    ``AcquisitionSeries`` / ``AcquisitionTable`` entries. Use
    :meth:`build_acquisition` to obtain the full list of entries.
    """

    def __init__(self, loader: RawDataLoader):
        """Initialize the acquisition builder.

        Parameters
        ----------
        loader : RawDataLoader
            Loader providing access to the dynamic foraging dataset.
        """
        self.loader = loader

    def get_reward_delivery(self) -> pd.DataFrame:
        """Get the reward delivery stream from the dataset.

        Returns
        -------
        pandas.DataFrame
            Rows of the ``OutputSet`` stream under ``Behavior/HarpBehavior``
            whose ``MessageType`` is ``"WRITE"``.
        """
        behavior = self.loader.dataset.at("Behavior")
        output_set = behavior.at("HarpBehavior").at("OutputSet").load().data
        return output_set[output_set["MessageType"] == "WRITE"]

    def get_trial_outcomes(self) -> pd.DataFrame:
        """Get the ``TrialOutcome`` software-event stream.

        Returns
        -------
        pandas.DataFrame
            The ``TrialOutcome`` stream under ``Behavior/SoftwareEvents``,
            indexed by trial timestamp with a ``data`` payload column.
        """
        software_events = self.loader.dataset.at("Behavior").at("SoftwareEvents")
        return software_events.at("TrialOutcome").load().data

    def get_manual_water_times(self) -> pd.DataFrame:
        """Get the manual-water software-event stream.

        Returns
        -------
        pandas.DataFrame
            The ``GiveManualWaterRight`` stream under ``Behavior/SoftwareEvents``,
            indexed by event timestamp with a ``data`` column that is ``True``
            for right-port manual water and ``False`` for left-port manual water.
            An empty frame (with a ``data`` column) is returned when the stream
            is absent.
        """
        try:
            return (
                self.loader.dataset.at("Behavior")
                .at("SoftwareEvents")
                .at("GiveManualWaterRight")
                .load()
                .data
            )
        except (KeyError, FileNotFoundError):
            # A missing stream is treated as "no manual water"; the empty
            # frame keeps the ``data`` column so callers can still filter on it.
            return pd.DataFrame({"data": []})

    def get_lick_times(self, device: str, stream_name: str, port: str) -> np.ndarray:
        """Get the lick times for one lick port from a Harp digital-input stream.

        On the standard behavior board licks are read from
        ``HarpBehavior``/``DigitalInputState``, with left licks on ``DIPort0``
        and right licks on ``DIPort1``. The lickometer board exposes each side
        as its own device (``HarpLickometerLeft`` / ``HarpLickometerRight``)
        with a ``LickState`` stream and a ``Channel0`` column. A lick time is a
        timestamp at which the selected column's digital input is high.

        Parameters
        ----------
        device : str
            The Harp device node under ``Behavior`` (e.g. ``"HarpBehavior"`` for
            the standard behavior board).
        stream_name : str
            The digital-input stream to read licks from (e.g.
            ``"DigitalInputState"`` for the standard behavior board).
        port : str
            The column to read licks from (``"DIPort1"`` for the right lick
            port, ``"DIPort0"`` for the left lick port on the behavior board).

        Returns
        -------
        numpy.ndarray
            Lick timestamps for the requested port in ascending order, or an
            empty array when the stream is absent.

        Notes
        -----
        Only a failure to load the stream is treated as "absent". A missing
        ``port`` column in a stream that does load is not caught here and
        propagates from the column lookup.
        """
        try:
            data = self.loader.dataset.at("Behavior").at(device).at(stream_name).load().data
        except (KeyError, FileNotFoundError):
            return np.array([])
        # Missing samples count as "not licking".
        is_high = data[port].fillna(False).astype(bool).to_numpy()
        # Sort explicitly so the documented ascending order holds even if the
        # stream's index is not already monotonic.
        return np.sort(data.index[is_high].to_numpy())

    def _lick_time_series(
        self, *, source: LickSource, name: str, side_label: str
    ) -> AcquisitionSeries:
        """Build one lick port's lick-time series from a Harp digital-input stream.

        Parameters
        ----------
        source : LickSource
            The device, stream, and column locating this lick port's signal
            (e.g. ``LickSource("HarpBehavior", "DigitalInputState", "DIPort0")``
            for the standard behavior board's left port).
        name : str
            Acquisition series name.
        side_label : str
            Human-readable side label used in the description.

        Returns
        -------
        AcquisitionSeries
            The lick-time series for this lick port. The ``data`` array marks
            each timestamp as a detected lick (``True``).
        """
        lick_times = self.get_lick_times(source.device, source.stream, source.port)
        return AcquisitionSeries(
            name=name,
            # One ``True`` per timestamp: every entry is a detected lick.
            data=np.ones(lick_times.shape[0], dtype=bool),
            timestamps=lick_times,
            unit="second",
            description=(
                f"The lick times of the {side_label} lick port ({source.port} on {source.device})."
            ),
        )

    def _reward_delivery_series(
        self,
        writes: pd.DataFrame,
        trial_outcomes: pd.DataFrame,
        manual_water: pd.DataFrame,
        *,
        port_column: str,
        is_right: bool,
        name: str,
        side_label: str,
    ) -> AcquisitionSeries:
        """Build one lick port's reward-delivery series with reward annotations.

        Only valve-open events (``port_column`` is truthy) are reward
        deliveries; the ``data`` field annotates each as earned, manual, or
        automatic via :func:`get_annotated_rewards`.

        Parameters
        ----------
        writes : pandas.DataFrame
            ``OutputSet`` ``WRITE`` messages indexed by timestamp.
        trial_outcomes : pandas.DataFrame
            The ``TrialOutcome`` stream, indexed by trial timestamp.
        manual_water : pandas.DataFrame
            The ``GiveManualWaterRight`` stream; the ``data`` column selects the
            side (``True`` right, ``False`` left).
        port_column : str
            Supply-port column for this side (``"SupplyPort0"`` left,
            ``"SupplyPort1"`` right).
        is_right : bool
            ``True`` for the right lick port, ``False`` for the left.
        name : str
            Acquisition series name.
        side_label : str
            Human-readable side label used in the description.

        Returns
        -------
        AcquisitionSeries
            The reward-delivery series for this lick port.
        """
        # Missing samples in the port column count as "valve not opened".
        valve_open = writes[port_column].fillna(False).astype(bool)
        delivery_times = writes[valve_open].index.to_numpy()
        # Keep only the manual-water events recorded for this side.
        manual_water_times = manual_water.index[manual_water["data"] == is_right].to_numpy()
        annotations = get_annotated_rewards(
            delivery_times,
            trial_outcomes,
            manual_water_times,
        )
        # ``annotations`` and ``delivery_times`` are passed as parallel arrays,
        # so the timestamps are deliberately left in stream order here.
        return AcquisitionSeries(
            name=name,
            data=annotations,
            timestamps=delivery_times,
            unit="second",
            description=(
                f"The reward delivery time of the {side_label} lick port. The data field "
                "annotates whether the reward was earned, manual, or automatic"
            ),
        )

    def build_acquisition(
        self,
        left_lick: LickSource = LickSource("HarpBehavior", "DigitalInputState", "DIPort0"),
        right_lick: LickSource = LickSource("HarpBehavior", "DigitalInputState", "DIPort1"),
    ) -> t.List[t.Union[AcquisitionSeries, AcquisitionTable]]:
        """Build the NWB acquisition entries.

        The result lists one ``AcquisitionTable`` per raw stream returned by
        ``loader.get_all_raw_data()``, followed by the left and right
        reward-delivery series and then the left and right lick-time series.

        Parameters
        ----------
        left_lick, right_lick : LickSource, optional
            Where to read each side's lick times. Defaults to the standard
            behavior board (``HarpBehavior``/``DigitalInputState`` on
            ``DIPort0``/``DIPort1``). For the lickometer board pass, e.g.,
            ``LickSource("HarpLickometerLeft", "LickState", "Channel0")`` and
            ``LickSource("HarpLickometerRight", "LickState", "Channel0")``.

        Returns
        -------
        list of AcquisitionSeries or AcquisitionTable
            Acquisition entries to write to the NWB acquisition module.
        """
        rewards = self.get_reward_delivery()
        trial_outcomes = self.get_trial_outcomes()
        manual_water = self.get_manual_water_times()

        raw_streams = self.loader.get_all_raw_data()
        stream_descriptions = self.loader.raw_data_stream_descriptions

        acquisition_entries: t.List[t.Union[AcquisitionSeries, AcquisitionTable]] = []

        for stream_name, stream_data in raw_streams.items():
            description = stream_descriptions.get(stream_name)
            acquisition_entries.append(
                AcquisitionTable(
                    name=stream_name,
                    data=clean_dataframe_for_nwb(stream_data),
                    # The lookup yields None for a stream without a
                    # description; store an empty string in that case.
                    description="" if description is None else description,
                )
            )

        # (series name, supply-port column, is_right, side label), left first.
        reward_ports = (
            ("left_reward_delivery_time", "SupplyPort0", False, "left"),
            ("right_reward_delivery_time", "SupplyPort1", True, "right"),
        )
        for series_name, port_column, is_right, side_label in reward_ports:
            acquisition_entries.append(
                self._reward_delivery_series(
                    rewards,
                    trial_outcomes,
                    manual_water,
                    port_column=port_column,
                    is_right=is_right,
                    name=series_name,
                    side_label=side_label,
                )
            )

        # (series name, lick source, side label), left first.
        lick_ports = (
            ("left_lick_time", left_lick, "left"),
            ("right_lick_time", right_lick, "right"),
        )
        for series_name, source, side_label in lick_ports:
            acquisition_entries.append(
                self._lick_time_series(
                    source=source,
                    name=series_name,
                    side_label=side_label,
                )
            )

        return acquisition_entries
Loading
Loading