Source code for static_frame.core.node_values

from __future__ import annotations

import numpy as np
import typing_extensions as tp
from arraykit import column_2d_filter

from static_frame.core.node_selector import Interface
from static_frame.core.node_selector import InterfaceBatch
from static_frame.core.type_blocks import TypeBlocks
from static_frame.core.util import TCallableAny
from static_frame.core.util import TUFunc
from static_frame.core.util import blocks_to_array_2d

if tp.TYPE_CHECKING:
    from static_frame.core.batch import Batch  # pragma: no cover
    from static_frame.core.frame import Frame  # pylint: disable=W0611 #pragma: no cover
    from static_frame.core.index import Index  # pylint: disable=W0611 #pragma: no cover
    from static_frame.core.index_hierarchy import IndexHierarchy  # pylint: disable=W0611 #pragma: no cover
    from static_frame.core.series import Series  # pylint: disable=W0611 #pragma: no cover
    TNDArrayAny = np.ndarray[tp.Any, tp.Any] #pragma: no cover
    TDtypeAny = np.dtype[tp.Any] #pragma: no cover

# Type variable for the container wrapped by ``InterfaceValues``. It is constrained to the four container types named below (given as strings because those classes are only imported under ``TYPE_CHECKING``) and is declared covariant.
TVContainer_co = tp.TypeVar('TVContainer_co',
        'Frame[tp.Any, tp.Any, tp.Unpack[tp.Tuple[tp.Any, ...]]]', # type: ignore[type-arg]
        'IndexHierarchy',
        'Series[tp.Any, tp.Any]',
        'Index[tp.Any]',
        covariant=True,
        )

# Names of the entry points exposed by the values interfaces; assigned to ``_INTERFACE`` on the interface classes in this module.
INTERFACE_VALUES = ('apply', '__array_ufunc__', '__call__')

# The ufunc invocation methods accepted by ``__array_ufunc__``; any other method (e.g. 'reduce', 'accumulate') results in ``NotImplemented``.
VALID_UFUNC_ARRAY_METHODS = frozenset({'__call__'})


class InterfaceValues(Interface, tp.Generic[TVContainer_co]):
    '''
    Interface for applying a ufunc (or ufunc-like callable) to the arrays of a container, returning a new container of the same class.

    If a user wants to call a ufunc and get back an array of variable dimensionality, they have to call that ufunc on one consolidated array via .values; any attempt at block-level manipulation will have to, under some scenarios, figure out how to combine the per-block results (and an appropriate type) into an array. This is undesirable. Instead, all applications of this interface must use UFuncs that retain dimensionality.
    '''
    __slots__ = (
            '_container',
            '_consolidate_blocks',
            '_unify_blocks',
            '_dtype',
            )
    _INTERFACE = INTERFACE_VALUES

    def __init__(self,
            container: TVContainer_co,
            *,
            consolidate_blocks: bool = False,
            unify_blocks: bool = False,
            dtype: TDtypeAny | None = None,
            ) -> None:
        '''
        Args:
            container: the container whose arrays will be passed to the applied function.
            consolidate_blocks: Group adjacent same-typed arrays into 2D arrays.
            unify_blocks: Group all arrays into single array, re-typing to an appropriate dtype.
            dtype: specify a dtype to be used in conversion before consolidation or unification, and before function application.
        '''
        self._container: TVContainer_co = container
        self._consolidate_blocks = consolidate_blocks
        self._unify_blocks = unify_blocks
        self._dtype: TDtypeAny | None = dtype

    def __call__(self,
            *,
            consolidate_blocks: bool = False,
            unify_blocks: bool = False,
            dtype: TDtypeAny | None = None,
            ) -> 'InterfaceValues[TVContainer_co]':
        '''
        Return a new interface on the same container, configured with the given options.

        Args:
            consolidate_blocks: Group adjacent same-typed arrays into 2D arrays.
            unify_blocks: Group all arrays into single array, re-typing to an appropriate dtype.
            dtype: specify a dtype to be used in conversion before consolidation or unification, and before function application.
        '''
        return self.__class__(self._container,
                consolidate_blocks=consolidate_blocks,
                unify_blocks=unify_blocks,
                dtype=dtype,
                )

    def __array_ufunc__(self,
            ufunc: TUFunc,
            method: str,
            *args: tp.Any,
            **kwargs: tp.Any,
            ) -> TVContainer_co:
        '''Support for applying NumPy functions directly on containers.

        Args:
            ufunc: the function to call on each array; it must return an array, as the result is marked immutable.
            method: the ufunc method requested by NumPy; only those in ``VALID_UFUNC_ARRAY_METHODS`` are supported.
            *args: positional operands. Any operand that is this interface (or an ``InterfaceBatchValues``) is replaced, in position, by the array being processed; if no such operand is present, the array is supplied as the first operand.
            **kwargs: keyword arguments forwarded to ``ufunc``.

        Returns:
            A new container of the same class as the wrapped container, or ``NotImplemented`` if ``method`` is not supported.
        '''
        from static_frame.core.frame import Frame
        from static_frame.core.series import Series

        if method not in VALID_UFUNC_ARRAY_METHODS:
            return NotImplemented #pragma: no cover

        def func(block: TNDArrayAny, normalize_2d: bool = True) -> TNDArrayAny:
            if normalize_2d:
                block = column_2d_filter(block)
            # NOTE: NumPy passes this interface among the positional inputs (for Batch usage, an InterfaceBatchValues arrives instead). Substituting the block in place keeps operand order, so np.subtract(2, x) is 2 - block, and lets the same interface appear more than once. When called through apply() no interface is among the arguments, so the block becomes the first operand.
            args_final: tp.List[tp.Any] = []
            substituted = False
            for arg in args:
                if arg is self or isinstance(arg, InterfaceBatchValues):
                    args_final.append(block)
                    substituted = True
                else:
                    args_final.append(arg)
            if not substituted:
                args_final.insert(0, block)

            array = ufunc(*args_final, **kwargs)
            array.flags.writeable = False
            return array

        if self._container._NDIM == 2:
            blocks: tp.Iterable[TNDArrayAny] = self._container._blocks._blocks #type: ignore

            if self._unify_blocks:
                # one 2D array for the whole container, using the explicit dtype if given
                dtype = self._container._blocks._index.dtype if self._dtype is None else self._dtype #type: ignore
                tb = TypeBlocks.from_blocks(func(blocks_to_array_2d(
                        blocks=blocks,
                        shape=self._container.shape, # type: ignore
                        dtype=dtype,
                        )))
            elif self._consolidate_blocks:
                # convert dtype before consolidation so that newly same-typed neighbors can be grouped
                if self._dtype is not None:
                    blocks = (b.astype(self._dtype) for b in blocks)
                tb = TypeBlocks.from_blocks(
                        func(b) for b in TypeBlocks.consolidate_blocks(blocks)
                        )
            else:
                # block-by-block application
                if self._dtype is not None:
                    blocks = (func(b.astype(self._dtype)) for b in blocks)
                else:
                    blocks = (func(b) for b in blocks)
                tb = TypeBlocks.from_blocks(blocks)

            if isinstance(self._container, Frame):
                return self._container.__class__(
                        tb,
                        index=self._container.index,
                        columns=self._container.columns,
                        name=self._container.name,
                        own_index=True,
                        own_data=True,
                        own_columns=self._container.STATIC,
                        )
            #IndexHierarchy
            return self._container._from_type_blocks( #type: ignore
                    tb,
                    index_constructors=self._container._index_constructors, # type: ignore
                    name=self._container._name,
                    own_blocks=True,
                    )

        # all 1D containers
        if self._dtype is not None:
            values = func(self._container.values.astype(self._dtype),
                    normalize_2d=False,
                    )
        else:
            values = func(self._container.values,
                    normalize_2d=False,
                    )

        if isinstance(self._container, Series):
            return self._container.__class__(values,
                    index=self._container.index,
                    name=self._container.name,
                    own_index=True,
                    )
        # else, Index
        return self._container.__class__(values, # type: ignore
                name=self._container.name,
                )

    def apply(self,
            func: TUFunc,
            *args: tp.Any,
            **kwargs: tp.Any,
            ) -> TVContainer_co:
        '''
        Apply ``func`` to the arrays of the container, supplying each array as the first positional argument followed by ``args`` and ``kwargs``.

        Args:
            func: a function that takes an array and returns an array of the same dimensionality.
            *args: additional positional arguments passed to ``func`` after the array.
            **kwargs: keyword arguments passed to ``func``.
        '''
        return self.__array_ufunc__(
                func,
                '__call__',
                *args,
                **kwargs,
                )

class InterfaceBatchValues(InterfaceBatch):
    '''
    Values interface for ``Batch``: each operation is wrapped in a function that is handed to the stored ``batch_apply`` callable, which is annotated as returning a ``Batch``.
    '''
    __slots__ = (
            '_batch_apply',
            '_consolidate_blocks',
            '_unify_blocks',
            '_dtype',
            )
    _INTERFACE = INTERFACE_VALUES

    def __init__(self,
            batch_apply: tp.Callable[[TCallableAny], 'Batch'],
            *,
            consolidate_blocks: bool = False,
            unify_blocks: bool = False,
            dtype: TDtypeAny | None = None,
            ) -> None:
        '''
        Args:
            batch_apply: callable that accepts a per-container function and returns a ``Batch``.
            consolidate_blocks: Group adjacent same-typed arrays into 2D arrays.
            unify_blocks: Group all arrays into single array, re-typing to an appropriate dtype.
            dtype: specify a dtype to be used in conversion before consolidation or unification, and before function application.
        '''
        self._batch_apply = batch_apply
        self._consolidate_blocks = consolidate_blocks
        self._unify_blocks = unify_blocks
        self._dtype = dtype

    #---------------------------------------------------------------------------
    def apply(self,
            func: TUFunc,
            *args: tp.Any,
            **kwargs: tp.Any,
            ) -> 'Batch':
        '''
        Apply ``func`` to the values of each container handled by the Batch, by way of each container's ``via_values`` interface configured with this instance's options.
        '''
        def per_container(container: tp.Any) -> tp.Any:
            # options are read from self when the function runs, not when it is created
            values_interface = container.via_values(
                    consolidate_blocks=self._consolidate_blocks,
                    unify_blocks=self._unify_blocks,
                    dtype=self._dtype,
                    )
            return values_interface.apply(func, *args, **kwargs)

        return self._batch_apply(per_container)

    def __call__(self,
            *,
            consolidate_blocks: bool = False,
            unify_blocks: bool = False,
            dtype: TDtypeAny | None = None,
            ) -> 'InterfaceBatchValues':
        '''
        Return a new interface sharing the same ``batch_apply``, configured with the given options.

        Args:
            consolidate_blocks: Group adjacent same-typed arrays into 2D arrays.
            unify_blocks: Group all arrays into single array, re-typing to an appropriate dtype.
            dtype: specify a dtype to be used in conversion before consolidation or unification, and before function application.
        '''
        cls = self.__class__
        return cls(self._batch_apply,
                consolidate_blocks=consolidate_blocks,
                unify_blocks=unify_blocks,
                dtype=dtype,
                )

    def __array_ufunc__(self,
            ufunc: TUFunc,
            method: str,
            *args: tp.Any,
            **kwargs: tp.Any,
            ) -> 'Batch':
        '''Support for applying NumPy functions directly on a Batch values interface; the result is a ``Batch``, or ``NotImplemented`` if ``method`` is not supported.
        '''
        # NOTE: an unsupported method must be rejected here, at call time, rather than later when the Batch executes the wrapped function
        if method not in VALID_UFUNC_ARRAY_METHODS:
            return NotImplemented #pragma: no cover

        return self._batch_apply(
                lambda c: c.via_values(
                        consolidate_blocks=self._consolidate_blocks,
                        unify_blocks=self._unify_blocks,
                        dtype=self._dtype,
                        ).__array_ufunc__(ufunc, method, *args, **kwargs)
                )