Source code for static_frame.core.node_values

import typing as tp

import numpy as np
from arraykit import column_2d_filter

from static_frame.core.node_selector import Interface
from static_frame.core.node_selector import InterfaceBatch
from static_frame.core.type_blocks import TypeBlocks
from static_frame.core.util import AnyCallable
from static_frame.core.util import DtypeSpecifier
from static_frame.core.util import UFunc
from static_frame.core.util import blocks_to_array_2d

if tp.TYPE_CHECKING:
    from static_frame.core.batch import Batch  # pylint: disable = W0611 #pragma: no cover
    from static_frame.core.frame import Frame  # pylint: disable = W0611 #pragma: no cover
    from static_frame.core.index import Index  # pylint: disable = W0611 #pragma: no cover
    from static_frame.core.index_hierarchy import IndexHierarchy  # pylint: disable = W0611 #pragma: no cover
    from static_frame.core.series import Series  # pylint: disable = W0611 #pragma: no cover

TContainer = tp.TypeVar('TContainer',
        'Frame',
        'IndexHierarchy',
        'Series',
        'Index',
        )

INTERFACE_VALUES = (
        'apply',
        '__array_ufunc__',
        '__call__',
        )

VALID_UFUNC_ARRAY_METHODS = frozenset(('__call__',))


class InterfaceValues(Interface[TContainer]):
    '''
    If a user wants to call a ufunc and get back an array of variable dimensionality, they have to call that ufunc on one consolidated array via .values; any attempt at block-level manipulation will have to, under some scenarios, figure out how to combine the per-block results (and an appropriate type) into an array. This is undesirable. Instead, all applications of this interface must use UFuncs that retain dimensionality.
    '''
    __slots__ = (
            '_container',
            '_consolidate_blocks',
            '_unify_blocks',
            '_dtype',
            )
    INTERFACE = INTERFACE_VALUES

    def __init__(self,
            container: TContainer,
            *,
            consolidate_blocks: bool = False,
            unify_blocks: bool = False,
            dtype: DtypeSpecifier = None,
            ) -> None:
        self._container: TContainer = container
        self._consolidate_blocks = consolidate_blocks
        self._unify_blocks = unify_blocks
        self._dtype = dtype

[docs] def __call__(self, *, consolidate_blocks: bool = False, unify_blocks: bool = False, dtype: DtypeSpecifier = None, ) -> 'InterfaceValues[TContainer]': ''' Args: consolidate_blocks: Group adjacent same-typed arrays into 2D arrays. unify_blocks: Group all arrays into single array, re-typing to an appropriate dtype. dtype: specify a dtype to be used in conversion before consolidation or unification, and before function application. ''' return self.__class__(self._container, consolidate_blocks=consolidate_blocks, unify_blocks=unify_blocks, dtype=dtype, )
[docs] def __array_ufunc__(self, ufunc: UFunc, method: str, *args: tp.Any, **kwargs: tp.Any, ) -> TContainer: '''Support for applying NumPy functions directly on containers, returning NumPy arrays. ''' from static_frame.core.frame import Frame from static_frame.core.series import Series if method not in VALID_UFUNC_ARRAY_METHODS: return NotImplemented #pragma: no cover def func(block: np.ndarray, normalize_2d: bool = True) -> np.ndarray: if normalize_2d: block = column_2d_filter(block) # NOTE: we assume that our target array (the passed in block) should alwasy be the first argument; then, we filter out arguments that are either this object or an InterfaceBatchValues instance args_final = [block] for arg in args: if arg is self or isinstance(arg, InterfaceBatchValues): continue args_final.append(arg) # [(arg if arg is not self else block) for arg in args] return ufunc(*args_final, **kwargs) if self._container._NDIM == 2: blocks: tp.Iterable[np.ndarray] = self._container._blocks._blocks #type: ignore if self._unify_blocks: dtype = self._container._blocks._row_dtype if self._dtype is None else self._dtype #type: ignore tb = TypeBlocks.from_blocks(func(blocks_to_array_2d( blocks=blocks, shape=self._container.shape, dtype=dtype, ))) elif self._consolidate_blocks: if self._dtype is not None: blocks = (b.astype(self._dtype) for b in blocks) tb = TypeBlocks.from_blocks( func(b) for b in TypeBlocks.consolidate_blocks(blocks) ) else: if self._dtype is not None: blocks = (func(b.astype(self._dtype)) for b in blocks) else: blocks = (func(b) for b in blocks) tb = TypeBlocks.from_blocks(blocks) if isinstance(self._container, Frame): return self._container.__class__( tb, index=self._container.index, columns=self._container.columns, name=self._container.name, own_index=True, own_data=True, own_columns=self._container.STATIC, ) #IndexHierarchy return self._container._from_type_blocks( #type: ignore tb, index_constructors=self._container._index_constructors, # type: ignore name=self._container._name, own_blocks=True, ) # all 1D containers if self._dtype is not None: values = func(self._container.values.astype(self._dtype), normalize_2d=False, ) else: values = func(self._container.values, normalize_2d=False, ) if isinstance(self._container, Series): return self._container.__class__(values, index=self._container.index, name=self._container.name, own_index=True, ) # else, Index return self._container.__class__(values, name=self._container.name, )
[docs] def apply(self, func: UFunc, *args: tp.Any, **kwargs: tp.Any, ) -> TContainer: return self.__array_ufunc__( func, '__call__', *args, **kwargs, )
class InterfaceBatchValues(InterfaceBatch): __slots__ = ( '_batch_apply', '_consolidate_blocks', '_unify_blocks', '_dtype', ) INTERFACE = INTERFACE_VALUES def __init__(self, batch_apply: tp.Callable[[AnyCallable], 'Batch'], *, consolidate_blocks: bool = False, unify_blocks: bool = False, dtype: DtypeSpecifier = None, ) -> None: self._batch_apply = batch_apply self._consolidate_blocks = consolidate_blocks self._unify_blocks = unify_blocks self._dtype = dtype #---------------------------------------------------------------------------
[docs] def apply(self, func: UFunc, *args: tp.Any, **kwargs: tp.Any, ) -> 'Batch': ''' Interface for using binary operators and methods with a pre-defined fill value. ''' return self._batch_apply(lambda c: c.via_values( consolidate_blocks=self._consolidate_blocks, unify_blocks=self._unify_blocks, dtype=self._dtype, ).apply(func, *args, **kwargs))
[docs] def __call__(self, *, consolidate_blocks: bool = False, unify_blocks: bool = False, dtype: DtypeSpecifier = None, ) -> 'InterfaceBatchValues': ''' Args: consolidate_blocks: Group adjacent same-typed arrays into 2D arrays. unify_blocks: Group all arrays into single array, re-typing to an appropriate dtype. dtype: specify a dtype to be used in conversion before consolidation or unification, and before function application. ''' return self.__class__(self._batch_apply, consolidate_blocks=consolidate_blocks, unify_blocks=unify_blocks, dtype=dtype, )
[docs] def __array_ufunc__(self, ufunc: UFunc, method: str, *args: tp.Any, **kwargs: tp.Any, ) -> 'Batch': '''Support for applying NumPy functions directly on containers, returning NumPy arrays. ''' # NOTE: want to fail method is not supported at call time of this function, not the deferred execution via Batch if method not in VALID_UFUNC_ARRAY_METHODS: return NotImplemented #pragma: no cover def func(c: TContainer) -> np.ndarray: return c.via_values( consolidate_blocks=self._consolidate_blocks, unify_blocks=self._unify_blocks, dtype=self._dtype, ).__array_ufunc__(ufunc, method, *args, **kwargs, ) return self._batch_apply(func)