Source code for postprocessor.chainer

#!/usr/bin/env jupyter

import re
import typing as t
from copy import copy

import pandas as pd

from agora.io.signal import Signal
from agora.utils.association import validate_association
from agora.utils.kymograph import bidirectional_retainment_filter
from postprocessor.core.abc import get_parameters, get_process
from postprocessor.core.lineageprocess import LineageProcessParameters


class Chainer(Signal):
    """
    Extend Signal by applying post-processes and allowing composite signals.

    Composite signals combine basic signals (see ``_synonyms``). Chainer
    "chains" multiple processes upon fetching a dataset to produce the
    desired dataset: instead of reading the results of processes that were
    previously applied, it executes them when called.
    """

    # Alias -> (numerator path, denominator path) of a composite signal.
    # Each alias is exposed both as "<alias>" and as "<alias>_bgsub".
    _synonyms = {
        "m5m": ("extraction/GFP/max/max5px", "extraction/GFP/max/median")
    }

    def __init__(self, *args, **kwargs):
        """
        Initialise the underlying Signal and register the composite chains.

        All positional and keyword arguments are forwarded unchanged to
        ``Signal.__init__``.
        """
        super().__init__(*args, **kwargs)

        def replace_path(path: str, bgsub: bool = False) -> str:
            # Append "_bgsub" to the channel, i.e. the second component of
            # the path. Only that component is edited, so a channel name
            # recurring later in the path (or containing regex
            # metacharacters) cannot corrupt the result.
            if not bgsub:
                return path
            parts = path.split("/")
            parts[1] = f"{parts[1]}_bgsub"
            return "/".join(parts)

        def make_ratio(numerator: str, denominator: str, bgsub: bool):
            # Bind the paths and the flag through a factory: closures
            # created directly inside a comprehension are late-binding and
            # would all observe the values of the final iteration.
            def ratio(**kwargs):
                return self.get(
                    replace_path(numerator, bgsub), **kwargs
                ) / self.get(replace_path(denominator, bgsub), **kwargs)

            return ratio

        # Add chain with and without bgsub for composite statistics
        self.common_chains = {
            alias + suffix: make_ratio(numerator, denominator, bool(suffix))
            for alias, (numerator, denominator) in self._synonyms.items()
            for suffix in ("", "_bgsub")
        }

    def get(
        self,
        dataset: str,
        chain: t.Collection[str] = ("standard", "interpolate", "savgol"),
        in_minutes: bool = True,
        stages: bool = True,
        retain: t.Optional[float] = None,
        **kwargs,
    ):
        """
        Fetch a dataset and apply a chain of processes to it.

        Parameters
        ----------
        dataset : str
            Either a key of ``self.common_chains`` (a composite signal) or
            a dataset name passed to ``self.get_raw``.
        chain : t.Collection[str]
            Names of the processes to apply, in order. If empty, no process
            is applied.
        in_minutes : bool
            Forwarded to ``self.get_raw``. Not forwarded when `dataset` is
            a composite signal.
        stages : bool
            If True and the columns have no level named "stage", replace
            the columns with a ("stage", "time") MultiIndex built from
            ``self.stages_span_tp``.
        retain : t.Optional[float]
            If truthy, the data are passed through
            ``self.get_retained(data, retain)``.
        **kwargs
            Forwarded to the composite chain (if any) and to
            ``apply_chain``, where each process looks up its parameters
            under its own name.

        Returns
        -------
        The processed data.
        """
        if dataset in self.common_chains:
            # get dataset for composite chains
            data = self.common_chains[dataset](**kwargs)
        else:
            # use Signal's get_raw
            data = self.get_raw(dataset, in_minutes=in_minutes)

        if chain:
            data = self.apply_chain(data, chain, **kwargs)

        if retain:
            # keep data only from early time points
            data = self.get_retained(data, retain)

        if stages and "stage" not in data.columns.names:
            # return stages as additional column level:
            # each stage label is repeated once per time point it spans
            stages_index = [
                label
                for i, (name, span) in enumerate(self.stages_span_tp)
                for label in (f"{i} {name}",) * span
            ]
            data.columns = pd.MultiIndex.from_tuples(
                zip(stages_index, data.columns),
                names=("stage", "time"),
            )
        return data

    def apply_chain(
        self, input_data: pd.DataFrame, chain: t.Tuple[str, ...], **kwargs
    ):
        """
        Apply a series of processes to a data set.

        Like postprocessing, Chainer consecutively applies processes.
        Parameters can be passed as kwargs. Chainer does not support
        applying the same process multiple times with different
        parameters, because parameters are looked up by process name.

        The result of every step is also stored, in order, in
        ``self._intermediate_steps``; the list is reset on each call.

        Parameters
        ----------
        input_data : pd.DataFrame
            Input data to process. A copy is processed.
        chain : t.Tuple[str, ...]
            Tuple of strings with the names of the processes. The special
            name "standard" applies ``bidirectional_retainment_filter``;
            any other name is resolved with ``get_process``.
        **kwargs : kwargs
            Maps a process name to a dict of keyword arguments passed on
            to that process's ``as_function`` method to modify its
            parameters.

        Returns
        -------
        The data after the last process in `chain` has been applied.

        Raises
        ------
        NotImplementedError
            If `chain` contains the "merger" reshaper.
        """
        result = copy(input_data)
        self._intermediate_steps = []
        for process in chain:
            if process == "standard":
                result = bidirectional_retainment_filter(result)
            else:
                process_cls = get_process(process)
                # the parent package of the defining module gives the
                # process type, e.g. "reshapers"
                process_type = process_cls.__module__.split(".")[-2]
                if process_type == "reshapers" and process == "merger":
                    # reject before running the process so that no work is
                    # wasted on a step whose result cannot be used
                    raise NotImplementedError(
                        "The 'merger' reshaper is not supported by "
                        "Chainer.apply_chain"
                    )
                params = kwargs.get(process, {})
                result = process_cls.as_function(result, **params)
            # NOTE(review): assumed to record every step, including
            # "standard" - confirm against the original indentation.
            self._intermediate_steps.append(result)
        return result