Source code for static_frame.core.reduce

from __future__ import annotations

from itertools import repeat

import numpy as np
import typing_extensions as tp
from arraykit import resolve_dtype

from static_frame.core.frame import Frame
from static_frame.core.generic_aliases import TFrameAny
from static_frame.core.index_auto import IndexAutoFactory
from static_frame.core.index_auto import TIndexAutoFactory
from static_frame.core.index_base import IndexBase
from static_frame.core.node_selector import Interface
from static_frame.core.node_selector import InterfaceBatch
from static_frame.core.series import Series
from static_frame.core.type_blocks import TypeBlocks
from static_frame.core.util import DTYPE_OBJECT
from static_frame.core.util import EMPTY_ARRAY
from static_frame.core.util import FRAME_INITIALIZER_DEFAULT
from static_frame.core.util import NULL_SLICE
from static_frame.core.util import IterNodeType
from static_frame.core.util import TCallableAny
from static_frame.core.util import TILocSelectorOne
from static_frame.core.util import TIndexCtorSpecifier
from static_frame.core.util import TIndexInitializer
from static_frame.core.util import TLabel
from static_frame.core.util import TName
from static_frame.core.util import TUFunc
from static_frame.core.util import concat_resolved
from static_frame.core.util import iterable_to_array_1d
from static_frame.core.util import ufunc_dtype_to_dtype

if tp.TYPE_CHECKING:
    from static_frame.core.batch import Batch  # pylint: disable=W0611,C0412 #pragma: no cover

# Generic NumPy aliases used in annotations throughout this module.
TNDArrayAny = np.ndarray[tp.Any, tp.Any]
TDtypeAny = np.dtype[tp.Any]

# Either of the two labelled containers.
TFrameOrSeries = tp.Union[Frame, Series]
# A single component to be reduced: a `Frame` or a NumPy array.
TFrameOrArray = tp.Union[Frame, TNDArrayAny]
# An iterable of (label, component) pairs, as consumed by the `Reduce` classes.
TIterableFrameItems = tp.Iterable[tp.Tuple[TLabel, TFrameOrArray]]
# A pair of integers describing a two-dimensional shape.
TShape2D = tp.Tuple[int, int]

# A list of (integer-position selector, function) pairs.
TListILocToFunc = tp.List[tp.Tuple[TILocSelectorOne, TUFunc]]
# A list of (label, function) pairs.
TListLabelToFunc = tp.List[tp.Tuple[TLabel, TUFunc]]

#-------------------------------------------------------------------------------
class Reduce:
    '''Base interface for reductions: functions are applied to one or more components (`Frame` or array) and the results are exposed through a dictionary-like interface or collected into a new `Frame`. Instances are configured via the constructors on `ReduceDispatch`; subclasses supply `_prepare_items`, `_get_iter`, and `to_frame`.
    '''

    _INTERFACE: tp.Tuple[str, ...] = (
        'keys',
        '__iter__',
        'items',
        'values',
        'to_frame',
        )

    # Attributes read by the dictionary-like interface; assigned by subclasses.
    _items: TIterableFrameItems
    _axis: int

    #---------------------------------------------------------------------------
    # hooks to be implemented by subclasses

    def _prepare_items(self,
            axis: int,
            items: TIterableFrameItems,
            ) -> tp.Tuple[tp.Sequence[TLabel], tp.Sequence[TFrameOrArray], TShape2D]:
        '''Return a triple of labels, components, and a shape, derived from ``items``.
        '''
        raise NotImplementedError() # pragma: no cover

    def _get_iter(self,
            components: tp.Sequence[TFrameOrArray],
            labels: tp.Sequence[TLabel],
            shape: TShape2D,
            sample: TFrameOrArray,
            is_array: bool,
            ) -> tp.Iterator[Series | TFrameAny | TNDArrayAny]:
        '''Return an iterator of one reduction result per component.
        '''
        raise NotImplementedError() # pragma: no cover

    #---------------------------------------------------------------------------
    # dictionary-like interface

    def keys(self) -> tp.Iterator[TLabel]:
        '''Iterate over the labels of the components.
        '''
        labels, _components, _shape = self._prepare_items(
                self._axis,
                self._items,
                )
        for label in labels:
            yield label

    def __iter__(self) -> tp.Iterator[TLabel]:
        '''Iterate over the labels of the components; same as `keys()`.
        '''
        for label in self.keys():
            yield label

    def items(self) -> tp.Iterator[tp.Tuple[TLabel, Series | TFrameAny | TNDArrayAny]]:
        '''Return an iterator of pairs of label and reduction result.
        '''
        labels, components, shape = self._prepare_items(
                self._axis,
                self._items,
                )
        # with no components (an empty iterator) an empty array stands in as the sample
        sample = components[0] if components else EMPTY_ARRAY
        is_array = sample.__class__ is np.ndarray

        results = self._get_iter(
                components=components,
                labels=labels,
                shape=shape,
                sample=sample,
                is_array=is_array,
                )
        return zip(labels, results)

    def values(self) -> tp.Iterator[Series | TFrameAny | TNDArrayAny]:
        '''Iterate over the reduction results, without labels.
        '''
        for _, value in self.items():
            yield value

    def to_frame(self, *,
                index: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
                columns: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
                index_constructor: TIndexCtorSpecifier = None,
                columns_constructor: TIndexCtorSpecifier = None,
                name: TName = None,
                consolidate_blocks: bool = False
            ) -> TFrameAny:
        '''
        Return a ``Frame`` after processing column reduction functions.
        '''
        # TODO: add `retain_labels`
        raise NotImplementedError() # pragma: no cover


class ReduceComponent(Reduce):
    '''`ReduceComponent` applies one function to each whole component (an array or `Frame`) and collects whatever that function returns. When the yield type is not `IterNodeType.VALUES`, the function is called with two arguments: the label and the component.
    '''

    __slots__ = (
        '_items',
        '_func',
        '_yield_type',
        '_axis',
        '_fill_value',
    )

    def __init__(self,
            items: TIterableFrameItems,
            func: TUFunc,
            yield_type: IterNodeType,
            axis: int = 1,
            fill_value: tp.Any = np.nan,
            ):
        '''
        Args:
            items: Iterable of pairs of label and component.
            func: Function applied to each component (or to each label and component).
            yield_type: Selects whether ``func`` receives only the component (`IterNodeType.VALUES`) or the label and the component.
            axis: Axis of reduction; only 1 is implemented by `_get_iter`.
            fill_value: Forwarded as ``fill_value`` to `Frame.from_concat` in `to_frame`.
        '''
        self._axis = axis
        self._yield_type = yield_type
        self._fill_value = fill_value
        self._func = func
        self._items = items

    def _prepare_items(self,
            axis: int,
            items: TIterableFrameItems,
            ) -> tp.Tuple[tp.Sequence[TLabel], tp.Sequence[TFrameOrArray], TShape2D]:
        '''Exhaust ``items`` into parallel lists of labels and components. The shape is not known here, so (-1, -1) is returned in its place.
        '''
        pairs = list(items)
        labels: tp.List[TLabel] = [label for label, _ in pairs]
        components: tp.List[TFrameOrArray] = [component for _, component in pairs]
        return labels, components, (-1, -1)

    def _get_iter(self,
            components: tp.Sequence[TFrameOrArray],
            labels: tp.Sequence[TLabel],
            shape: TShape2D,
            sample: TFrameOrArray,
            is_array: bool,
            ) -> tp.Iterator[Series | TFrameAny | TNDArrayAny]:
        '''
        Return an iterator of the results of calling the stored function on each component.
        '''
        if self._axis != 1:
            # each component reduces to a column
            raise NotImplementedError() # pragma: no cover

        # each component reduces to a row
        func = self._func
        if self._yield_type == IterNodeType.VALUES:
            yield from map(func, components)
        else:
            yield from map(func, labels, components)

    #---------------------------------------------------------------------------
    def to_frame(self, *,
            index: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
            columns: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
            index_constructor: TIndexCtorSpecifier = None,
            columns_constructor: TIndexCtorSpecifier = None,
            name: TName = None,
            consolidate_blocks: bool = False
        ) -> TFrameAny:
        '''
        Return a ``Frame`` built from the per-component results: results from `Frame` components are concatenated with `Frame.from_concat`; results from array components are concatenated with `concat_resolved` and wrapped in a new ``Frame``.
        '''
        labels, components, _ = self._prepare_items(
                self._axis,
                self._items,
                )
        sample = components[0] if components else EMPTY_ARRAY
        is_array = sample.__class__ is np.ndarray

        results = self._get_iter(
                components=components,
                labels=labels,
                shape=(-1, -1),
                sample=sample,
                is_array=is_array,
                )

        if not is_array:
            return Frame.from_concat(
                    results, # type: ignore
                    axis=0,
                    union=True,
                    index=index,
                    index_constructor=index_constructor,
                    columns=columns,
                    columns_constructor=columns_constructor,
                    name=name,
                    consolidate_blocks=consolidate_blocks,
                    fill_value=self._fill_value,
                    )

        arrays: tp.Iterable[TNDArrayAny] = list(results) # type: ignore
        if arrays:
            block = concat_resolved(arrays, 0) # immutable
        else:
            block = FRAME_INITIALIZER_DEFAULT

        # arrays carry no labels: fall back to auto indices on both axes
        if columns is None:
            columns = IndexAutoFactory
        if index is None:
            index = IndexAutoFactory

        return Frame(block, # type: ignore
                index=index,
                index_constructor=index_constructor,
                columns=columns,
                columns_constructor=columns_constructor,
                name=name,
                )



class ReduceAxis(Reduce):
    '''`ReduceAxis` reduces along an axis (i.e., columns) by applying one or more functions on each column to return a 1D Series (or array) for each component.
    '''
    __slots__ = (
            '_axis',
            '_items',
            '_axis_labels',
            '_axis_len',
            '_yield_type',
            )

    _INTERFACE: tp.Tuple[str, ...] = (
            '__iter__',
            'keys',
            'values',
            'items',
            'to_frame',
            )

    _items: TIterableFrameItems
    _axis_labels: IndexBase | tp.Sequence[TLabel] | None
    _axis: int
    _axis_len: int
    _yield_type: IterNodeType

    @staticmethod
    def _derive_row_dtype_array(
            sample: TNDArrayAny,
            iloc_to_func: TListILocToFunc,
            ) -> TDtypeAny | None:
        '''Resolve a single dtype for a row made of the results of every function in ``iloc_to_func``, given the dtype of the array ``sample``.

        Returns:
            The resolved dtype; ``None`` if `ufunc_dtype_to_dtype` returns ``None`` for any function, or if ``iloc_to_func`` is empty.
        '''
        dt_src = sample.dtype # an array has a single dtype
        dtype: TDtypeAny | None = None
        for _, func in iloc_to_func:
            dt = ufunc_dtype_to_dtype(func, dt_src)
            # NOTE: test for None explicitly rather than relying on the truth value of a dtype instance
            if dt is None:
                return None
            dtype = resolve_dtype(dt if dtype is None else dtype, dt)
            if dtype == DTYPE_OBJECT:
                # no further resolution can change an object dtype
                return dtype
        return dtype

    @staticmethod
    def _derive_row_dtype_frame(
            sample: Frame,
            iloc_to_func: TListILocToFunc,
            ) -> TDtypeAny | None:
        '''Resolve a single dtype for a row made of the results of every function in ``iloc_to_func``, given the per-column dtypes of the `Frame` ``sample``.

        Returns:
            The resolved dtype; ``None`` if `ufunc_dtype_to_dtype` returns ``None`` for any function, or if ``iloc_to_func`` is empty.
        '''
        dt_src = sample._blocks.dtypes # indexed per column below
        dtype: TDtypeAny | None = None
        for iloc, func in iloc_to_func:
            dt = ufunc_dtype_to_dtype(func, dt_src[iloc])
            # NOTE: test for None explicitly rather than relying on the truth value of a dtype instance
            if dt is None:
                return None
            dtype = resolve_dtype(dt if dtype is None else dtype, dt)
            if dtype == DTYPE_OBJECT:
                # no further resolution can change an object dtype
                return dtype
        return dtype

    def _get_blocks(self,
            components: tp.Sequence[TFrameOrArray],
            labels: tp.Sequence[TLabel],
            shape: TShape2D,
            sample: TFrameOrArray,
            is_array: bool,
            ) -> tp.Sequence[TNDArrayAny]:
        '''Return the arrays from which `to_frame` builds a `TypeBlocks`; implemented by subclasses.
        '''
        raise NotImplementedError() # pragma: no cover

    def _prepare_items(self,
            axis: int,
            items: TIterableFrameItems,
            ) -> tp.Tuple[tp.Sequence[TLabel], tp.Sequence[TFrameOrArray], TShape2D]:
        '''Exhaust ``items`` into parallel lists of labels and components, and return them with a shape of (number of components, number of functions).
        '''
        labels: tp.List[TLabel] = []
        components: tp.List[TFrameOrArray] = []
        for label, component in items:
            labels.append(label)
            # NOTE: could assert uniformity of shape / labels here
            components.append(component)

        shape = (len(labels), self._axis_len) # axis == 1
        # shape = (self._axis_len, len(labels))

        return labels, components, shape

    #---------------------------------------------------------------------------
    def to_frame(self, *,
            index: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
            columns: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
            index_constructor: TIndexCtorSpecifier = None,
            columns_constructor: TIndexCtorSpecifier = None,
            name: TName = None,
            consolidate_blocks: bool = False
        ) -> TFrameAny:
        '''
        Return a ``Frame`` after processing column reduction functions.

        Args:
            index: Optional index; if ``None`` and the axis is 1, the component labels are used.
            columns: Optional columns; if ``None``, derived from the stored axis labels.
            index_constructor: Forwarded to the ``Frame`` initializer.
            columns_constructor: Forwarded to the ``Frame`` initializer.
            name: Forwarded to the ``Frame`` initializer.
            consolidate_blocks: If True, consolidate the resulting `TypeBlocks`.

        Raises:
            NotImplementedError: if there are no components.
        '''
        labels, components, shape = self._prepare_items(
                self._axis,
                self._items,
                )
        if components:
            sample = components[0]
        else: # return a zero-row Frame
            raise NotImplementedError() # pragma: no cover

        is_array = sample.__class__ is np.ndarray
        blocks = self._get_blocks(components, labels, shape, sample, is_array)

        own_columns = False
        if columns is None:
            if isinstance(self._axis_labels, IndexBase):
                # NOTE: this is implicitly only ReduceAligned, as only that class defines `_iloc_to_func`
                columns = self._axis_labels[[pair[0] for pair in self._iloc_to_func]] # type: ignore
                own_columns = True
            else:
                columns = self._axis_labels

        tb = TypeBlocks.from_blocks(blocks)
        if consolidate_blocks:
            tb = tb.consolidate()

        if self._axis == 1:
            if index is None:
                index = labels

        return Frame(tb,
                index=index,
                columns=columns,
                own_columns=own_columns,
                name=name,
                own_data=True,
                index_constructor=index_constructor,
                columns_constructor=columns_constructor,
                )


class ReduceAligned(ReduceAxis):
    '''Utilities for Reducing a `Frame` (or many `Frame`) by applying functions to columns. Columns are selected by integer position, so the same position is read from every component.
    '''
    # Axis 1 will reduce components into rows (labels are the index, ilocs refer to column positions); axis 0 will reduce components into columns (labels are the column labels, ilocs refer to index positions).

    __slots__ = (
            '_iloc_to_func',
            )

    def __init__(self,
            items: TIterableFrameItems,
            iloc_to_func: TListILocToFunc,
            axis_labels: IndexBase | tp.Sequence[TLabel],
            yield_type: IterNodeType,
            axis: int = 1,
            /,
            ):
        '''
        Args:
            items: Iterable of pairs of label and component.
            iloc_to_func: Pairs of column position and the function to apply to that column.
            axis_labels: Index on the axis used to label reductions.
            yield_type: Selects whether functions receive only the column (`IterNodeType.VALUES`) or the label and the column.
            axis: Axis of reduction; only 1 is implemented.
        '''
        self._items = items
        self._iloc_to_func = iloc_to_func
        self._axis_labels = axis_labels
        self._yield_type = yield_type
        self._axis = axis
        # one result per function
        self._axis_len = len(self._iloc_to_func)

    def _get_blocks(self,
            components: tp.Sequence[TFrameOrArray],
            labels: tp.Sequence[TLabel],
            shape: TShape2D,
            sample: TFrameOrArray,
            is_array: bool,
            ) -> tp.Sequence[TNDArrayAny]:
        '''
        Return one immutable 1D array per (iloc, function) pair, each of length ``shape[0]``, holding the result of applying that function to that column of every component.
        '''

        blocks: tp.List[TNDArrayAny] = []
        v: TNDArrayAny | tp.List[tp.Any]

        if is_array:
            dtype = sample.dtype # type: ignore
            if self._axis == 1: # each component reduces to a row
                size = shape[0]
                for iloc, func in self._iloc_to_func:
                    # if a result dtype is available, fill a typed array directly; otherwise collect in a list and convert after
                    post_dt = ufunc_dtype_to_dtype(func, dtype)
                    if post_dt is not None:
                        v = np.empty(size, dtype=post_dt)
                    else:
                        v = [None] * size
                    if self._yield_type == IterNodeType.VALUES:
                        for i, array in enumerate(components):
                            v[i] = func(array[NULL_SLICE, iloc])
                    else:
                        # items: the function receives the label and the column
                        for i, (label, array) in enumerate(zip(labels, components)):
                            v[i] = func(label, array[NULL_SLICE, iloc])

                    if v.__class__ is not np.ndarray:
                        v, _ = iterable_to_array_1d(v, count=size)
                    v.flags.writeable = False # type: ignore
                    blocks.append(v) # type: ignore
            else:  # each component reduces to a column
                raise NotImplementedError() # pragma: no cover

        else: # component is a Frame
            dtypes = sample._blocks.dtypes # type: ignore
            if self._axis == 1:
                # each component reduces to a row
                size = shape[0]
                for iloc, func in self._iloc_to_func:
                    # the dtype of the selected column in the sample is used for every component
                    post_dt = ufunc_dtype_to_dtype(func, dtypes[iloc])
                    if post_dt is not None:
                        v = np.empty(size, dtype=post_dt)
                    else:
                        v = [None] * size
                    if self._yield_type == IterNodeType.VALUES:
                        for i, frame in enumerate(components):
                            v[i] = func(frame._blocks._extract_array_column(iloc)) # type: ignore
                    else:
                        # items: the function receives the label and the column
                        for i, (label, frame) in enumerate(zip(labels, components)):
                            v[i] = func(label, frame._blocks._extract_array_column(iloc)) # type: ignore
                    if v.__class__ is not np.ndarray:
                        v, _ = iterable_to_array_1d(v, count=size)
                    v.flags.writeable = False # type: ignore
                    blocks.append(v) # type: ignore
            else: # each component reduces to a column
                raise NotImplementedError() # pragma: no cover
        return blocks

    def _get_iter(self,
            components: tp.Sequence[TFrameOrArray],
            labels: tp.Sequence[TLabel],
            shape: TShape2D,
            sample: TFrameOrArray,
            is_array: bool,
            ) -> tp.Iterator[Series | TFrameAny | TNDArrayAny]:
        '''
        Return an iterator of one row per component after processing column reduction functions: a 1D array for array components, a ``Series`` (named by the component label) for `Frame` components.
        '''
        index: IndexBase | tp.Sequence[TLabel]

        if isinstance(self._axis_labels, IndexBase):
            # TODO: handle static
            # select only the labels of the columns that have a function
            index = self._axis_labels[[pair[0] for pair in self._iloc_to_func]] # pyright: ignore
            own_index = True
        elif self._axis_labels is not None:
            # a sequence of labels to be used
            index = self._axis_labels
            own_index = False
        else:
            raise NotImplementedError() # pragma: no cover

        # We are yielding rows that result from each columnar function application; using the dtype of sample, the dtype expected from func, across all funcs, we can determine the resultant array dtype and not use a list, below
        assert self._axis == 1
        v: TNDArrayAny | tp.List[tp.Any]
        size = shape[1]

        if is_array:
            if self._yield_type == IterNodeType.VALUES:
                # this only works if IterNodeType.VALUES, as we cannot identify the function if it takes a pair
                dtype = self._derive_row_dtype_array(sample, self._iloc_to_func) # type: ignore
                if dtype is not None:
                    # a row dtype is available: fill a typed array directly
                    for array in components:
                        v = np.empty(size, dtype=dtype)
                        for i, (iloc, func) in enumerate(self._iloc_to_func):
                            v[i] = func(array[NULL_SLICE, iloc])
                        v.flags.writeable = False
                        yield v
                else:
                    # no row dtype: collect in a list and convert to an array
                    for array in components:
                        v = [None] * size
                        for i, (iloc, func) in enumerate(self._iloc_to_func):
                            v[i] = func(array[NULL_SLICE, iloc])
                        v, _ = iterable_to_array_1d(v, count=size)
                        yield v
            else: # items
                for label, array in zip(labels, components):
                    v = [None] * size
                    for i, (iloc, func) in enumerate(self._iloc_to_func):
                        v[i] = func(label, array[NULL_SLICE, iloc])
                    v, _ = iterable_to_array_1d(v, count=size)
                    yield v

        else: # component is a Frame
            if self._yield_type == IterNodeType.VALUES:
                dtype = self._derive_row_dtype_frame(sample, self._iloc_to_func) # type: ignore
                if dtype is not None:
                    # a row dtype is available: fill a typed array directly
                    for label, f in zip(labels, components):
                        v = np.empty(size, dtype=dtype)
                        for i, (iloc, func) in enumerate(self._iloc_to_func):
                            v[i] = func(f._extract(NULL_SLICE, iloc)) # type: ignore
                        v.flags.writeable = False
                        yield Series(v, index=index, name=label, own_index=own_index)
                else:
                    # no row dtype: collect in a list and convert to an array
                    for label, f in zip(labels, components):
                        v = [None] * size
                        for i, (iloc, func) in enumerate(self._iloc_to_func):
                            v[i] = func(f._extract(NULL_SLICE, iloc)) # type: ignore
                        v, _ = iterable_to_array_1d(v, count=size)
                        yield Series(v, index=index, name=label, own_index=own_index)
            else: # items
                for label, f in zip(labels, components):
                    v = [None] * size
                    for i, (iloc, func) in enumerate(self._iloc_to_func):
                        v[i] = func(label, f._extract(NULL_SLICE, iloc)) # type: ignore
                    v, _ = iterable_to_array_1d(v, count=size)
                    yield Series(v, index=index, name=label, own_index=own_index)


class ReduceUnaligned(ReduceAxis):
    '''Utilities for reducing many `Frame` by applying functions to columns selected by label. A component that lacks a requested column contributes the fill value in place of a function result.
    '''
    __slots__ = (
            '_loc_to_func',
            '_fill_value',
            )

    def __init__(self,
            items: TIterableFrameItems,
            loc_to_func: TListLabelToFunc,
            axis_labels: tp.Sequence[TLabel] | None,
            yield_type: IterNodeType,
            axis: int = 1,
            fill_value: tp.Any = np.nan,
            /,
            ):
        '''
        Args:
            items: Iterable of pairs of label and component.
            loc_to_func: Pairs of column label and the function to apply to that column.
            axis_labels: Labels on the axis used to label reductions.
            yield_type: Selects whether functions receive only the column (`IterNodeType.VALUES`) or the label and the column.
            axis: Axis of reduction; only 1 is supported.
            fill_value: Value used where a component does not have the requested column.
        '''
        self._items = items
        self._axis = axis
        self._axis_labels = axis_labels
        self._yield_type = yield_type
        self._fill_value = fill_value
        self._loc_to_func = loc_to_func
        self._axis_len = len(self._loc_to_func)

    @staticmethod
    def _column_position(frame: TFrameOrArray, loc: TLabel) -> int:
        '''Return the column position of ``loc`` in ``frame``, or -1 if the lookup raises a `KeyError`.
        '''
        try:
            return frame.columns.loc_to_iloc(loc) # type: ignore
        except KeyError:
            return -1

    def _get_blocks(self,
            components: tp.Sequence[TFrameOrArray],
            labels: tp.Sequence[TLabel],
            shape: TShape2D,
            sample: TFrameOrArray,
            is_array: bool,
            ) -> tp.Sequence[TNDArrayAny]:
        '''
        Return one immutable 1D array per (label, function) pair, each of length ``shape[0]``, holding the function result (or the fill value) for every component.
        '''
        assert not is_array # arrays cannot be supported for unaligned reduce
        assert self._axis == 1 # each component reduces to a row

        size = shape[0]
        fill = self._fill_value
        find = self._column_position
        by_values = self._yield_type == IterNodeType.VALUES
        blocks: tp.List[TNDArrayAny] = []

        for loc, func in self._loc_to_func:
            # NOTE: we cannot easily predict array type as the sample may not be representative
            cells: tp.List[tp.Any] = [None] * size
            if by_values:
                for pos, frame in enumerate(components):
                    iloc = find(frame, loc)
                    cells[pos] = (
                            func(frame._blocks._extract_array_column(iloc)) # type: ignore
                            if iloc >= 0 else fill
                            )
            else:
                for pos, (label, frame) in enumerate(zip(labels, components)):
                    iloc = find(frame, loc)
                    cells[pos] = (
                            func(label, frame._blocks._extract_array_column(iloc)) # type: ignore
                            if iloc >= 0 else fill
                            )

            array, _ = iterable_to_array_1d(cells, count=size)
            array.flags.writeable = False
            blocks.append(array)

        return blocks

    def _get_iter(self,
            components: tp.Sequence[TFrameOrArray],
            labels: tp.Sequence[TLabel],
            shape: TShape2D,
            sample: TFrameOrArray,
            is_array: bool,
            ) -> tp.Iterator[Series | TFrameAny | TNDArrayAny]:
        '''
        Return an iterator of one ``Series`` per component, named by the component label, holding the function result (or the fill value) for every (label, function) pair.
        '''
        assert not is_array # arrays cannot be supported for unaligned reduce
        assert self._axis == 1 # each component reduces to a row

        size = shape[1]
        fill = self._fill_value
        find = self._column_position

        # NOTE: we cannot easily predict array type as we do not have a representative sample of the contained frame
        for label, frame in zip(labels, components):
            cells: tp.List[tp.Any] = [None] * size
            if self._yield_type == IterNodeType.VALUES:
                for pos, (loc, func) in enumerate(self._loc_to_func):
                    iloc = find(frame, loc)
                    cells[pos] = (
                            func(frame._extract(NULL_SLICE, iloc)) # type: ignore
                            if iloc >= 0 else fill
                            )
            else:
                for pos, (loc, func) in enumerate(self._loc_to_func):
                    iloc = find(frame, loc)
                    cells[pos] = (
                            func(label, frame._extract(NULL_SLICE, iloc)) # type: ignore
                            if iloc >= 0 else fill
                            )

            array, _ = iterable_to_array_1d(cells, count=size)
            yield Series(array, index=self._axis_labels, name=label)

#-------------------------------------------------------------------------------

# Names of the constructor methods shared by the reduce dispatch interfaces; assigned as `_INTERFACE` on the dispatch classes below.
INTERFACE_REDUCE_DISPATCH: tp.Tuple[str, ...] = (
        'from_func',
        'from_map_func',
        'from_label_map',
        'from_label_pair_map',
        )

class ReduceDispatch(Interface):
    '''Interface for exposing `Reduce` constructors.
    '''

    __slots__ = (
        '_items',
        '_yield_type',
        '_axis',
        )

    CLS_DELEGATE = Reduce
    _INTERFACE = INTERFACE_REDUCE_DISPATCH

    _items: TIterableFrameItems
    _yield_type: IterNodeType
    _axis: int

    def from_func(self,
            func: TUFunc,
            *,
            fill_value: tp.Any = np.nan,
            ) -> ReduceComponent:
        '''For each `Frame`, and given a function `func` that returns either a `Series` or a `Frame`, call that function on each `Frame`.

        Args:
            func: The function to call on each component.
            fill_value: Stored on the returned `ReduceComponent`.
        '''
        return ReduceComponent(self._items,
                func,
                yield_type=self._yield_type,
                axis=self._axis,
                fill_value=fill_value,
                )

    def from_map_func(self,
            func: TUFunc,
            *,
            fill_value: tp.Any = np.nan,
            ) -> Reduce:
        '''Not implemented here; subclasses provide the implementation.
        '''
        raise NotImplementedError() # pragma: no cover

    def from_label_map(self,
            func_map: tp.Mapping[TLabel, TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> Reduce:
        '''Not implemented here; subclasses provide the implementation.
        '''
        raise NotImplementedError() # pragma: no cover

    def from_label_pair_map(self,
            func_map: tp.Mapping[tp.Tuple[TLabel, TLabel], TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> Reduce:
        '''Not implemented here; subclasses provide the implementation.
        '''
        raise NotImplementedError() # pragma: no cover

class ReduceDispatchAligned(ReduceDispatch):
    '''Interface for creating reductions from uniform collections of Frames.
    '''

    __slots__ = (
        '_axis_labels',
        )

    def __init__(self,
            items: TIterableFrameItems,
            axis_labels: IndexBase, # always an index
            *,
            yield_type: IterNodeType,
            axis: int = 1,
            ) -> None:
        '''
        Args:
            items: Iterable of pairs of label and component.
            axis_labels: Index on the axis used to label reductions.
            yield_type: Forwarded to the `ReduceAligned` instances created by the constructors.
            axis: Forwarded to the `ReduceAligned` instances created by the constructors.
        '''
        self._items = items
        self._axis_labels = axis_labels
        self._yield_type = yield_type
        self._axis = axis

    def from_map_func(self,
            func: TUFunc,
            *,
            fill_value: tp.Any = np.nan,
            ) -> ReduceAligned:
        '''
        For each `Frame`, reduce by applying the same function to every column, where that function reduces to (0-dimensional) elements. Column labels are retained.

        Args:
            func: The function to apply to every column.
            fill_value: Accepted for interface compatibility; not used here.
        '''
        # pair every column position with the same function
        iloc_to_func: TListILocToFunc = list(zip(
                range(len(self._axis_labels)),
                repeat(func),
                ))
        return ReduceAligned(self._items,
                iloc_to_func,
                self._axis_labels,
                self._yield_type,
                self._axis,
                )

    def from_label_map(self,
            func_map: tp.Mapping[TLabel, TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> ReduceAligned:
        '''
        For `Frame`, reduce by applying a function to each column, where the column label and function are given as a mapping. Column labels are retained.

        Args:
            func_map: a mapping of column labels to functions.
            fill_value: Accepted for interface compatibility; not used here.
        '''
        loc_to_iloc = self._axis_labels.loc_to_iloc
        iloc_to_func: TListILocToFunc = [
                (loc_to_iloc(label), func)
                for label, func in func_map.items()
                ]
        return ReduceAligned(self._items,
                iloc_to_func,
                self._axis_labels,
                self._yield_type,
                self._axis,
                )

    def from_label_pair_map(self,
            func_map: tp.Mapping[tp.Tuple[TLabel, TLabel], TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> ReduceAligned:
        '''
        For `Frame`, reduce by applying a function to a column and assigning the result a new label. Functions are provided as values in a mapping, where the key is tuple of source label, destination label.

        Args:
            func_map: a mapping of pairs of source label, destination label, to a function.
            fill_value: Accepted for interface compatibility; not used here.
        '''
        loc_to_iloc = self._axis_labels.loc_to_iloc

        iloc_to_func: TListILocToFunc = []
        axis_labels = []
        for (loc_src, label_dst), func in func_map.items():
            axis_labels.append(label_dst)
            iloc_to_func.append((loc_to_iloc(loc_src), func))
        # NOTE: ignore self._axis_labels; the destination labels are used instead
        return ReduceAligned(self._items,
                iloc_to_func,
                axis_labels,
                self._yield_type,
                self._axis,
                )

#-------------------------------------------------------------------------------
class ReduceDispatchUnaligned(ReduceDispatch):
    '''Delegate interface for creating reductions from collections of Frames that need not share the same columns.
    '''
    _INTERFACE = INTERFACE_REDUCE_DISPATCH

    def __init__(self,
            items: TIterableFrameItems,
            *,
            axis: int = 1,
            yield_type: IterNodeType,
            ) -> None:
        '''
        Args:
            items: Iterable of pairs of label and component.
            axis: Forwarded to the reductions created by the constructors.
            yield_type: Forwarded to the reductions created by the constructors.
        '''
        self._items = items
        self._axis = axis
        self._yield_type = yield_type

    def from_map_func(self,
            func: TUFunc,
            *,
            fill_value: tp.Any = np.nan,
            ) -> ReduceComponent:
        '''
        For each `Frame`, reduce by applying the same function to every column of that `Frame`.

        Args:
            func: The function to apply to every column.
            fill_value: Stored on the returned `ReduceComponent`.
        '''
        # NOTE(review): `func_derived` takes a single argument, while `ReduceComponent` calls its function with (label, component) when the yield type is not `IterNodeType.VALUES`; confirm that combination is never used here.
        def func_derived(f: Frame) -> Series:
            # get a ReduceDispatchAligned
            return next(iter(f.reduce.from_map_func(func).values())) # type: ignore

        return ReduceComponent(self._items,
                func_derived, # type: ignore
                yield_type=self._yield_type,
                axis=self._axis,
                fill_value=fill_value,
                )

    def from_label_map(self,
            func_map: tp.Mapping[TLabel, TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> ReduceUnaligned:
        '''
        For `Frame`, reduce by applying a function to each column, where the column label and function are given as a mapping. Column labels are retained.

        Args:
            func_map: a mapping of column labels to functions.
            fill_value: Value used where a `Frame` does not have the requested column.
        '''
        loc_to_func: TListLabelToFunc = list(func_map.items())
        axis_labels: tp.List[TLabel] = [loc for loc, _ in loc_to_func]

        return ReduceUnaligned(self._items,
                loc_to_func,
                axis_labels,
                self._yield_type,
                self._axis,
                fill_value,
                )

    def from_label_pair_map(self,
            func_map: tp.Mapping[tp.Tuple[TLabel, TLabel], TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> ReduceUnaligned:
        '''
        For `Frame`, reduce by applying a function to a column and assigning the result a new label. Functions are provided as values in a mapping, where the key is tuple of source label, destination label.

        Args:
            func_map: a mapping of pairs of source label, destination label, to a function.
            fill_value: Value used where a `Frame` does not have the requested column.
        '''
        loc_to_func: TListLabelToFunc = []
        axis_labels = []
        for (loc, label), func in func_map.items():
            axis_labels.append(label)
            loc_to_func.append((loc, func))
        # NOTE: the destination labels, not the source labels, label the results
        return ReduceUnaligned(self._items,
                loc_to_func,
                axis_labels,
                self._yield_type,
                self._axis,
                fill_value,
                )


#-------------------------------------------------------------------------------
class InterfaceBatchReduceDispatch(InterfaceBatch):
    '''Alternate string interface specialized for the :obj:`Batch`. Each constructor builds the corresponding reduction on every `Frame` and converts it to a `Frame` with ``to_frame()``.
    '''

    __slots__ = (
        '_batch_apply',
        )
    _INTERFACE = INTERFACE_REDUCE_DISPATCH

    def __init__(self,
            batch_apply: tp.Callable[[TCallableAny], 'Batch'],
            ) -> None:
        '''
        Args:
            batch_apply: Callable that takes a function and returns a `Batch`.
        '''
        self._batch_apply = batch_apply

    #---------------------------------------------------------------------------
    def from_func(self,
            func: TUFunc,
            *,
            fill_value: tp.Any = np.nan,
            ) -> 'Batch':
        '''Return a `Batch` of the result of ``reduce.from_func(...).to_frame()`` on each `Frame`.
        '''
        return self._batch_apply(lambda f: f.reduce.from_func(
                func,
                fill_value=fill_value,
                ).to_frame())

    def from_map_func(self,
            func: TUFunc,
            *,
            fill_value: tp.Any = np.nan,
            ) -> 'Batch':
        '''Return a `Batch` of the result of ``reduce.from_map_func(...).to_frame()`` on each `Frame`.
        '''
        return self._batch_apply(lambda f: f.reduce.from_map_func(
                func,
                fill_value=fill_value,
                ).to_frame())

    def from_label_map(self,
            func_map: tp.Mapping[TLabel, TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> 'Batch':
        '''Return a `Batch` of the result of ``reduce.from_label_map(...).to_frame()`` on each `Frame`.
        '''
        return self._batch_apply(lambda f: f.reduce.from_label_map(
                func_map,
                fill_value=fill_value,
                ).to_frame())

    def from_label_pair_map(self,
            func_map: tp.Mapping[tp.Tuple[TLabel, TLabel], TUFunc],
            *,
            fill_value: tp.Any = np.nan,
            ) -> 'Batch':
        '''Return a `Batch` of the result of ``reduce.from_label_pair_map(...).to_frame()`` on each `Frame`.
        '''
        return self._batch_apply(lambda f: f.reduce.from_label_pair_map(
                func_map,
                fill_value=fill_value,
                ).to_frame())