Source code for static_frame.core.node_fill_value

from __future__ import annotations

import numpy as np
import typing_extensions as tp

from static_frame.core.node_selector import (
    Interface,
    InterfaceBatch,
    InterGetItemLocReduces,
)
from static_frame.core.util import (
    KEY_MULTIPLE_TYPES,
    NULL_SLICE,
    OPERATORS,
    TCallableAny,
    TLabel,
    TLocSelector,
    TLocSelectorCompound,
)

if tp.TYPE_CHECKING:
    from static_frame.core.batch import Batch
    from static_frame.core.frame import Frame
    from static_frame.core.hloc import HLoc
    from static_frame.core.index import ILoc
    from static_frame.core.index_base import IndexBase
    from static_frame.core.node_selector import TFrameOrSeries
    from static_frame.core.node_transpose import (
        InterfaceBatchTranspose,
        InterfaceTranspose,
    )
    from static_frame.core.series import Series

    TSeriesAny = Series[tp.Any, tp.Any]
    TFrameAny = Frame[tp.Any, tp.Any, tp.Unpack[tp.Tuple[tp.Any, ...]]]

TVContainer_co = tp.TypeVar(
    'TVContainer_co',
    'Frame[tp.Any, tp.Any, tp.Unpack[tp.Tuple[tp.Any, ...]]]',
    'Series[tp.Any, tp.Any]',
    covariant=True,
)
INTERFACE_FILL_VALUE = (
    'loc',
    '__getitem__',
    'via_T',
    '__add__',
    '__sub__',
    '__mul__',
    # '__matmul__',
    '__truediv__',
    '__floordiv__',
    '__mod__',
    '__pow__',
    '__lshift__',
    '__rshift__',
    '__and__',
    '__xor__',
    '__or__',
    '__lt__',
    '__le__',
    '__eq__',
    '__ne__',
    '__gt__',
    '__ge__',
    '__radd__',
    '__rsub__',
    '__rmul__',
    # '__rmatmul__',
    '__rtruediv__',
    '__rfloordiv__',
)


class InterfaceFillValue(Interface, tp.Generic[TVContainer_co]):
    __slots__ = (
        '_container',
        '_fill_value',
        '_axis',
    )

    _INTERFACE = INTERFACE_FILL_VALUE

    def __init__(
        self,
        container: TVContainer_co,
        *,
        fill_value: tp.Any = np.nan,
        axis: int = 0,
    ) -> None:
        self._container: TVContainer_co = container
        self._fill_value = fill_value
        self._axis = axis

    # ---------------------------------------------------------------------------
    @property
    def via_T(self) -> InterfaceTranspose[TFrameAny]:
        """
        Interface for using binary operators with one-dimensional sequences, where the opperand is applied column-wise.
        """
        from static_frame.core.frame import Frame
        from static_frame.core.node_transpose import InterfaceTranspose

        if not isinstance(self._container, Frame):
            raise NotImplementedError('via_T functionality only available on Frame')
        return InterfaceTranspose(
            container=self._container,
            fill_value=self._fill_value,
        )

    # ---------------------------------------------------------------------------
    @staticmethod
    def _extract_key_attrs(
        key: TLocSelector | ILoc | HLoc,
        index: 'IndexBase',
    ) -> tp.Tuple[TLocSelector, bool, bool]:
        """Given a loc-style key into the supplied `index`, return a selection (labels from that index) as well as Boolean attributes"""
        from static_frame.core.container_util import key_from_container_key
        from static_frame.core.hloc import HLoc

        key_is_null_slice: bool
        key_is_multiple: bool

        if key.__class__ is HLoc:
            labels = index._extract_loc(key)  # type: ignore
            key_is_multiple = any(isinstance(k, KEY_MULTIPLE_TYPES) for k in key)  # type: ignore
            key_is_null_slice = False
        elif key.__class__ is slice:
            labels = index._extract_loc(key)  # type: ignore
            key_is_multiple = True
            key_is_null_slice = key == NULL_SLICE  # type: ignore
        else:
            labels = key_from_container_key(index, key, expand_iloc=True)
            key_is_multiple = isinstance(labels, KEY_MULTIPLE_TYPES)
            key_is_null_slice = False

        return labels, key_is_multiple, key_is_null_slice

    def _extract_loc1d(
        self,
        key: TLocSelector = NULL_SLICE,
    ) -> TSeriesAny:
        """This is only called if container is 1D"""
        from static_frame.core.container_util import (
            get_col_fill_value_factory,
            index_from_index,
        )

        labels, is_multiple, _ = self._extract_key_attrs(
            key,
            self._container._index,
        )
        fill_value = self._fill_value
        container: Series = self._container  # type: ignore [assignment]

        if is_multiple:
            index = index_from_index(labels, container.index)
            return container.reindex(index, fill_value=fill_value, own_index=True)

        # if a single value, return it or the fill value
        fv = get_col_fill_value_factory(fill_value, None)(0, container.dtype)
        return container.get(labels, fv)  # type: ignore

    def _extract_loc2d(
        self,
        row_key: TLocSelector = NULL_SLICE,
        column_key: TLocSelector = NULL_SLICE,
    ) -> tp.Union[TFrameAny, TSeriesAny]:
        """
        NOTE: keys are loc keys; None is interpreted as selector, not a NULL_SLICE
        """
        from static_frame.core.container_util import (
            get_col_fill_value_factory,
            index_from_index,
        )
        from static_frame.core.series import Series

        fill_value = self._fill_value
        container: Frame = self._container  # type: ignore [assignment]

        row_labels, row_is_multiple, row_is_null_slice = self._extract_key_attrs(
            row_key,
            container._index,
        )
        column_labels, column_is_multiple, column_is_null_slice = self._extract_key_attrs(
            column_key,
            container._columns,
        )

        if row_is_multiple and column_is_multiple:
            # cannot reindex if loc keys are elements
            index = (
                index_from_index(row_labels, container.index)
                if not row_is_null_slice
                else None
            )
            columns = (
                index_from_index(column_labels, container.columns)
                if not column_is_null_slice
                else None
            )
            return container.reindex(
                index=index,
                columns=columns,
                fill_value=fill_value,
                own_index=index is not None,
                own_columns=columns is not None,
            )
        elif not row_is_multiple and not column_is_multiple:  # selecting an element
            try:
                return container.loc[row_labels, column_labels]
            except KeyError:
                fv = get_col_fill_value_factory(fill_value, None)(0, None)
                return fv  # type: ignore
        elif not row_is_multiple:
            # row is an element, return Series indexed by columns
            index = index_from_index(column_labels, container.columns)
            if row_labels in container._index:  # type: ignore
                # NOTE: as row_labels might be a tuple, force second argument
                s: TSeriesAny = container.loc[row_labels, NULL_SLICE]
                return s.reindex(index, fill_value=fill_value, own_index=True)

            fv = get_col_fill_value_factory(fill_value, None)(0, None)
            return Series.from_element(
                fv,
                index=index,
                name=row_labels,  # type: ignore
                own_index=True,
            )
        # columns is an element, return Series indexed by index
        if column_labels in container._columns:  # type: ignore
            index = index_from_index(row_labels, container.index)
            s = container[column_labels]  # type: ignore
            return s.reindex(index, fill_value=fill_value, own_index=True)

        index = index_from_index(row_labels, container.index)
        fv = get_col_fill_value_factory(fill_value, None)(0, None)
        return Series.from_element(
            fv,
            index=index,
            name=column_labels,  # type: ignore
            own_index=True,
        )

    def _extract_loc2d_compound(self, key: TLocSelectorCompound) -> TFrameOrSeries:
        if isinstance(key, tuple):
            row_key, column_key = key  # pyright: ignore
        else:
            row_key = key
            column_key = NULL_SLICE
        return self._extract_loc2d(row_key, column_key)

    # ---------------------------------------------------------------------------
    @property
    def loc(self) -> InterGetItemLocReduces[TFrameOrSeries, tp.Any]:
        """Label-based selection where labels not specified will define a new container containing those labels filled with the fill value."""
        if self._container._NDIM == 1:
            return InterGetItemLocReduces(self._extract_loc1d)
        return InterGetItemLocReduces(self._extract_loc2d_compound)

[docs] def __getitem__(self, key: TLocSelector) -> TFrameOrSeries: """Label-based selection where labels not specified will define a new container containing those labels filled with the fill value.""" if self._container._NDIM == 1: return self._extract_loc1d(key) return self._extract_loc2d(NULL_SLICE, key)
# ---------------------------------------------------------------------------
[docs] def __add__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__add__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __sub__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__sub__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __mul__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__mul__'], other=other, axis=self._axis, fill_value=self._fill_value, )
# def __matmul__(self, other: tp.Any) -> tp.Any: # return self._container._ufunc_binary_operator( # operator=OPERATORS['__matmul__'], # other=other, # axis=self._axis, # fill_value=self._fill_value, # )
[docs] def __truediv__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__truediv__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __floordiv__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__floordiv__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __mod__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__mod__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __pow__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__pow__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __lshift__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__lshift__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __rshift__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__rshift__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __and__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__and__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __xor__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__xor__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __or__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__or__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __lt__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__lt__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __le__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__le__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __eq__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__eq__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __ne__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__ne__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __gt__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__gt__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __ge__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__ge__'], other=other, axis=self._axis, fill_value=self._fill_value, )
# ---------------------------------------------------------------------------
[docs] def __radd__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__radd__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __rsub__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__rsub__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __rmul__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__rmul__'], other=other, axis=self._axis, fill_value=self._fill_value, )
# def __rmatmul__(self, other: tp.Any) -> tp.Any: # return self._container._ufunc_binary_operator( # operator=OPERATORS['__rmatmul__'], # other=other, # axis=self._axis, # fill_value=self._fill_value, # )
[docs] def __rtruediv__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__rtruediv__'], other=other, axis=self._axis, fill_value=self._fill_value, )
[docs] def __rfloordiv__(self, other: tp.Any) -> tp.Any: return self._container._ufunc_binary_operator( operator=OPERATORS['__rfloordiv__'], other=other, axis=self._axis, fill_value=self._fill_value, )
# --------------------------------------------------------------------------- class InterfaceFillValueGO(InterfaceFillValue[TVContainer_co]): # only type is FrameGO __slots__ = () _INTERFACE = InterfaceFillValue._INTERFACE + ( # type: ignore '__setitem__', ) def __setitem__( self, key: TLabel, value: tp.Any, ) -> None: self._container.__setitem__(key, value, self._fill_value) # type: ignore # --------------------------------------------------------------------------- class InterfaceBatchFillValue(InterfaceBatch): """Alternate string interface specialized for the :obj:`Batch`.""" _INTERFACE = INTERFACE_FILL_VALUE __slots__ = ( '_batch_apply', '_fill_value', '_axis', ) def __init__( self, batch_apply: tp.Callable[[TCallableAny], 'Batch'], fill_value: object = np.nan, axis: int = 0, ) -> None: self._batch_apply = batch_apply self._fill_value = fill_value self._axis = axis # --------------------------------------------------------------------------- @property def via_T(self) -> 'InterfaceBatchTranspose': """ Interface for using binary operators with one-dimensional sequences, where the opperand is applied column-wise. """ from static_frame.core.node_transpose import InterfaceBatchTranspose return InterfaceBatchTranspose( batch_apply=self._batch_apply, fill_value=self._fill_value, ) # --------------------------------------------------------------------------- @property def loc(self) -> InterGetItemLocReduces[Batch, tp.Any]: """Label-based selection where labels not specified will define a new container containing those labels filled with the fill value.""" def func(key: TLocSelector) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).loc[key] ) return InterGetItemLocReduces(func) def __getitem__(self, key: TLocSelector) -> 'Batch': """Label-based selection where labels not specified will define a new container containing those labels filled with the fill value.""" return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__getitem__(key) ) # --------------------------------------------------------------------------- def __add__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__add__(other) ) def __sub__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__sub__(other) ) def __mul__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__mul__(other) ) # def __matmul__(self, other: tp.Any) -> 'Batch': def __truediv__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__truediv__(other) ) def __floordiv__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__floordiv__(other) ) def __mod__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__mod__(other) ) def __pow__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__pow__(other) ) def __lshift__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__lshift__(other) ) def __rshift__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__rshift__(other) ) def __and__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__and__(other) ) def __xor__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__xor__(other) ) def __or__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__or__(other) ) def __lt__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__lt__(other) ) def __le__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__le__(other) ) def __eq__(self, other: tp.Any) -> 'Batch': # type: ignore return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__eq__(other) ) def __ne__(self, other: tp.Any) -> 'Batch': # type: ignore return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__ne__(other) ) def __gt__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__gt__(other) ) def __ge__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__ge__(other) ) # --------------------------------------------------------------------------- def __radd__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__radd__(other) ) def __rsub__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__rsub__(other) ) def __rmul__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__rmul__(other) ) # def __rmatmul__(self, other: tp.Any) -> 'Batch': def __rtruediv__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__rtruediv__(other) ) def __rfloordiv__(self, other: tp.Any) -> 'Batch': return self._batch_apply( lambda c: InterfaceFillValue( c, fill_value=self._fill_value, axis=self._axis ).__rfloordiv__(other) )