Source code for static_frame.core.batch

from __future__ import annotations

import numpy as np
import typing_extensions as tp

from static_frame.core.bus import Bus
from static_frame.core.container import ContainerOperand
from static_frame.core.display import Display, DisplayActive, DisplayHeader
from static_frame.core.doc_str import doc_inject, doc_update
from static_frame.core.exception import BatchIterableInvalid, immutable_type_error_factory
from static_frame.core.frame import Frame
from static_frame.core.node_dt import InterfaceBatchDatetime
from static_frame.core.node_fill_value import InterfaceBatchFillValue
from static_frame.core.node_re import InterfaceBatchRe
from static_frame.core.node_selector import (
    InterfaceBatchAsType,
    InterfaceGetItemBLoc,
    InterfaceSelectTrio,
    InterGetItemILocCompound,
    InterGetItemLocCompound,
)
from static_frame.core.node_str import InterfaceBatchString
from static_frame.core.node_transpose import InterfaceBatchTranspose
from static_frame.core.node_values import InterfaceBatchValues
from static_frame.core.reduce import InterfaceBatchReduceDispatch
from static_frame.core.series import Series
from static_frame.core.store_client_mixin import StoreClientMixin
from static_frame.core.store_sqlite import StoreSQLite
from static_frame.core.store_xlsx import StoreXLSX
from static_frame.core.store_zip import (
    StoreZipCSV,
    StoreZipNPY,
    StoreZipNPZ,
    StoreZipParquet,
    StoreZipPickle,
    StoreZipTSV,
)
from static_frame.core.util import (
    DEFAULT_SORT_KIND,
    DTYPE_OBJECT,
    ELEMENT_TUPLE,
    NAME_DEFAULT,
    TBlocKey,
    TBoolOrBools,
    TCallableAny,
    TDtypeSpecifier,
    TILocSelectorCompound,
    TIndexCtorSpecifier,
    TIndexCtorSpecifiers,
    TIndexInitializer,
    TKeyOrKeys,
    TLabel,
    TLocSelector,
    TLocSelectorCompound,
    TMpContext,
    TName,
    TPathSpecifier,
    TSortKinds,
    TUFunc,
    get_concurrent_executor,
)

TFrameOrSeries = tp.Union[Frame, Series]
TIteratorFrameItems = tp.Iterator[tp.Tuple[TLabel, TFrameOrSeries]]
TGeneratorFrameItems = tp.Callable[..., TIteratorFrameItems]

if tp.TYPE_CHECKING:
    from static_frame.core.display_config import DisplayConfig
    from static_frame.core.index import Index
    from static_frame.core.index_auto import (
        TIndexAutoFactory,
        TRelabelInput,
    )
    from static_frame.core.store import Store
    from static_frame.core.store_config import StoreConfigMapInitializer
    from static_frame.core.style_config import StyleConfig

    TNDArrayAny = np.ndarray[tp.Any, tp.Any]
    TDtypeAny = np.dtype[tp.Any]

TSeriesAny = Series[tp.Any, tp.Any]
TFrameAny = Frame[tp.Any, tp.Any, tp.Unpack[tp.Tuple[tp.Any, ...]]]
TBusAny = Bus[tp.Any]

# -------------------------------------------------------------------------------
# family of executor functions normalized in signature (taking a single tuple of args) for usage in processor pool calls


def normalize_container(post: tp.Any) -> TFrameOrSeries:
    # post might be an element, promote to a Series to permit concatenation
    if post.__class__ is np.ndarray:
        if post.ndim == 1:
            return Series(post)
        elif post.ndim == 2:
            return Frame(post)
        # let ndim 0 pass
    if not isinstance(post, (Frame, Series)):
        # NOTE: do not set index as (container.name,), as this can lead to diagonal formations; will already be paired with stored labels
        return Series.from_element(post, index=ELEMENT_TUPLE)
    return post


def call_func(bundle: tp.Tuple[TFrameOrSeries, TCallableAny]) -> TFrameOrSeries:
    container, func = bundle
    return func(container)  # type: ignore


def call_func_items(
    bundle: tp.Tuple[TFrameOrSeries, TCallableAny, TLabel],
) -> TFrameOrSeries:
    container, func, label = bundle
    return func(label, container)  # type: ignore


def call_attr(bundle: tp.Tuple[TFrameOrSeries, str, tp.Any, tp.Any]) -> TFrameOrSeries:
    container, attr, args, kwargs = bundle
    func = getattr(container, attr)
    return func(*args, **kwargs)  # type: ignore


# -------------------------------------------------------------------------------
[docs] class Batch(ContainerOperand, StoreClientMixin): """ A lazy, sequentially evaluated container of :obj:`Frame` that broadcasts operations on contained :obj:`Frame` by return new :obj:`Batch` instances. Full evaluation of operations only occurs when iterating or calling an exporter, such as ``to_frame()`` or ``to_series()``. """ __slots__ = ( '_items', '_name', '_max_workers', '_chunksize', '_use_threads', '_mp_context', ) _mp_context: TMpContext
[docs] @classmethod def from_frames( cls, frames: tp.Iterable[TFrameAny], /, *, name: TName = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """Return a :obj:`Batch` from an iterable of :obj:`Frame`; labels will be drawn from :obj:`Frame.name`.""" return cls( ((f.name, f) for f in frames), name=name, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
# --------------------------------------------------------------------------- # constructors by data format @classmethod def _from_store( cls, store: Store, /, *, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': items = ((label, store.read(label)) for label in store.labels()) return cls( items, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_zip_tsv( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to zipped TSV :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreZipTSV(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_zip_csv( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to zipped CSV :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreZipCSV(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_zip_pickle( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to zipped pickle :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreZipPickle(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_zip_npz( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to zipped NPZ :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreZipNPZ(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_zip_npy( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to zipped NPY :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreZipNPY(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_zip_parquet( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to zipped parquet :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreZipParquet(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_xlsx( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to an XLSX :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreXLSX(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
[docs] @classmethod @doc_inject(selector='batch_constructor') def from_sqlite( cls, fp: TPathSpecifier, /, *, config: StoreConfigMapInitializer = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ) -> 'Batch': """ Given a file path to an SQLite :obj:`Batch` store, return a :obj:`Batch` instance. {args} """ store = StoreSQLite(fp, config=config) return cls._from_store( store, max_workers=max_workers, chunksize=chunksize, use_threads=use_threads, mp_context=mp_context, )
# ---------------------------------------------------------------------------
[docs] def __init__( self, items: TIteratorFrameItems, /, *, name: TName = None, max_workers: tp.Optional[int] = None, chunksize: int = 1, use_threads: bool = False, mp_context: TMpContext = None, ): """ Default constructor of a :obj:`Batch`. {args} """ self._items = items # might be a generator! self._name = name self._max_workers = max_workers self._chunksize = chunksize self._use_threads = use_threads self._mp_context = mp_context
# --------------------------------------------------------------------------- def _derive( self, gen: TGeneratorFrameItems, name: TName = None, ) -> 'Batch': """Utility for creating derived Batch""" return self.__class__( gen(), name=name if name is not None else self._name, max_workers=self._max_workers, chunksize=self._chunksize, use_threads=self._use_threads, ) @property def via_container(self) -> 'Batch': """ Return a new Batch with all values wrapped in either a :obj:`Frame` or :obj:`Series`. """ def gen() -> TIteratorFrameItems: for label, v in self._items: yield label, normalize_container(v) return self._derive(gen) # --------------------------------------------------------------------------- # name interface @property @doc_inject() def name(self) -> TName: """{}""" return self._name # --------------------------------------------------------------------------- @property def shapes(self) -> Series[Index[tp.Any], np.object_]: """A :obj:`Series` describing the shape of each iterated :obj:`Frame`. Returns: :obj:`tp.Tuple[int]` """ items = ((label, f.shape) for label, f in self._items) return Series.from_items(items, name='shape', dtype=DTYPE_OBJECT)
[docs] def display( self, config: tp.Optional[DisplayConfig] = None, /, *, style_config: tp.Optional[StyleConfig] = None, ) -> Display: """Provide a :obj:`Series`-style display of the :obj:`Batch`. Note that if the held iterator is a generator, this display will exhaust the generator.""" config = config or DisplayActive.get() items = ((label, f.__class__) for label, f in self._items) series: TSeriesAny = Series.from_items(items, name=self._name) display_cls = Display.from_values( (), header=DisplayHeader(self.__class__, self._name), config=config ) return series._display( config, display_cls=display_cls, style_config=style_config, )
[docs] def __repr__(self) -> str: """Provide a display of the :obj:`Batch` that does not exhaust the generator.""" if self._name: header = f'{self.__class__.__name__}: {self._name}' else: header = self.__class__.__name__ return f'<{header} max_workers={self._max_workers}>'
# --------------------------------------------------------------------------- # core function application routines def _iter_items(self) -> TIteratorFrameItems: """Iter pairs in items, providing helpful exception of a pair is not found. Thies is necessary as we cannot validate the items until we actually do an iteration, and the iterable might be an iterator.""" for pair in self._items: try: label, frame = pair except ValueError: raise BatchIterableInvalid() from None yield label, frame def _apply_pool( self, labels: tp.List[TLabel], arg_iter: tp.Iterator[tp.Tuple[tp.Any, ...]], caller: tp.Callable[..., TFrameOrSeries], ) -> 'Batch': pool_executor = get_concurrent_executor( use_threads=self._use_threads, max_workers=self._max_workers, mp_context=self._mp_context, ) def gen_pool() -> TIteratorFrameItems: with pool_executor() as executor: yield from zip( labels, executor.map(caller, arg_iter, chunksize=self._chunksize) ) return self._derive(gen_pool) def _apply_pool_except( self, labels: tp.List[TLabel], arg_iter: tp.Iterator[tp.Tuple[tp.Any, ...]], caller: tp.Callable[..., TFrameOrSeries], exception: tp.Type[Exception], ) -> 'Batch': if self._chunksize != 1: raise NotImplementedError( 'Cannot use apply_except idioms with chunksize other than 1' ) pool_executor = get_concurrent_executor( use_threads=self._use_threads, max_workers=self._max_workers, mp_context=self._mp_context, ) def gen_pool() -> TIteratorFrameItems: futures = [] with pool_executor() as executor: for args in arg_iter: futures.append(executor.submit(caller, args)) for label, future in zip(labels, futures): try: container = future.result() except exception: continue yield label, container return self._derive(gen_pool) def _apply_attr( self, *args: tp.Any, attr: str, **kwargs: tp.Any, ) -> 'Batch': """ Apply a method on a Frame given as an attr string. """ if self._max_workers is None: def gen() -> TIteratorFrameItems: for label, frame in self._iter_items(): yield label, call_attr((frame, attr, args, kwargs)) return self._derive(gen) labels = [] def arg_gen() -> tp.Iterator[tp.Tuple[TFrameOrSeries, str, tp.Any, tp.Any]]: for label, frame in self._iter_items(): labels.append(label) yield frame, attr, args, kwargs return self._apply_pool(labels, arg_gen(), call_attr)
[docs] def apply( self, func: TCallableAny, /, ) -> 'Batch': """ Apply a function to each :obj:`Frame` contained in this :obj:`Frame`, where a function is given the :obj:`Frame` as an argument. """ if self._max_workers is None: def gen() -> TIteratorFrameItems: for label, frame in self._iter_items(): yield label, call_func((frame, func)) return self._derive(gen) labels = [] def arg_gen() -> tp.Iterator[tp.Tuple[TFrameOrSeries, TCallableAny]]: for label, frame in self._iter_items(): labels.append(label) yield frame, func return self._apply_pool(labels, arg_gen(), call_func)
[docs] def apply_except( self, func: TCallableAny, exception: tp.Type[Exception], /, ) -> 'Batch': """ Apply a function to each :obj:`Frame` contained in this :obj:`Frame`, where a function is given the :obj:`Frame` as an argument. Exceptions raised that matching the `except` argument will be silenced. """ if self._max_workers is None: def gen() -> TIteratorFrameItems: for label, frame in self._iter_items(): try: yield label, call_func((frame, func)) except exception: pass return self._derive(gen) labels = [] def arg_gen() -> tp.Iterator[tp.Tuple[TFrameOrSeries, TCallableAny]]: for label, frame in self._iter_items(): labels.append(label) yield frame, func return self._apply_pool_except( labels, arg_gen(), call_func, exception, )
[docs] def apply_items( self, func: TCallableAny, /, ) -> 'Batch': """ Apply a function to each :obj:`Frame` contained in this :obj:`Frame`, where a function is given the pair of label, :obj:`Frame` as an argument. """ if self._max_workers is None: def gen() -> TIteratorFrameItems: for label, frame in self._iter_items(): yield label, call_func_items((frame, func, label)) return self._derive(gen) labels = [] def arg_gen() -> tp.Iterator[tp.Tuple[TFrameOrSeries, TCallableAny, TLabel]]: for label, frame in self._iter_items(): labels.append(label) yield frame, func, label return self._apply_pool(labels, arg_gen(), call_func_items)
[docs] def apply_items_except( self, func: TCallableAny, /, *, exception: tp.Type[Exception], ) -> 'Batch': """ Apply a function to each :obj:`Frame` contained in this :obj:`Frame`, where a function is given the pair of label, :obj:`Frame` as an argument. Exceptions raised that matching the `except` argument will be silenced. """ if self._max_workers is None: def gen() -> TIteratorFrameItems: for label, frame in self._iter_items(): try: yield label, call_func_items((frame, func, label)) except exception: pass return self._derive(gen) labels = [] def arg_gen() -> tp.Iterator[tp.Tuple[TFrameOrSeries, TCallableAny, TLabel]]: for label, frame in self._iter_items(): labels.append(label) yield frame, func, label return self._apply_pool_except( labels, arg_gen(), call_func_items, exception, )
# --------------------------------------------------------------------------- @property def reduce(self) -> InterfaceBatchReduceDispatch: """Return a ``ReduceAligned`` interface, permitting function application per column or on entire containers.""" return InterfaceBatchReduceDispatch(self.apply) # --------------------------------------------------------------------------- # extraction def _extract_iloc(self, key: TILocSelectorCompound) -> 'Batch': return self._apply_attr(attr='_extract_iloc', key=key) def _extract_loc(self, key: TLocSelectorCompound) -> 'Batch': return self._apply_attr(attr='_extract_loc', key=key) def _extract_bloc(self, key: TBlocKey) -> 'Batch': return self._apply_attr(attr='_extract_bloc', key=key)
[docs] def __getitem__(self, key: TLocSelector) -> 'Batch': "" return self._apply_attr(attr='__getitem__', key=key)
def __setitem__(self, key: TLabel, value: tp.Any) -> None: raise immutable_type_error_factory(self.__class__, '', key, value) # --------------------------------------------------------------------------- def _drop_iloc(self, key: TILocSelectorCompound) -> 'Batch': return self._apply_attr(attr='_drop_iloc', key=key) def _drop_loc(self, key: TLocSelectorCompound) -> 'Batch': return self._apply_attr(attr='_drop_loc', key=key) def _drop_getitem(self, key: TLocSelectorCompound) -> 'Batch': return self._apply_attr(attr='_drop_getitem', key=key) # --------------------------------------------------------------------------- # interfaces @property def loc(self) -> InterGetItemLocCompound['Batch']: return InterGetItemLocCompound(self._extract_loc) @property def iloc(self) -> InterGetItemILocCompound['Batch']: return InterGetItemILocCompound(self._extract_iloc) @property def bloc(self) -> InterfaceGetItemBLoc['Batch']: return InterfaceGetItemBLoc(self._extract_bloc) @property def drop(self) -> InterfaceSelectTrio['Batch']: return InterfaceSelectTrio( # type: ignore func_iloc=self._drop_iloc, func_loc=self._drop_loc, func_getitem=self._drop_getitem, ) # NOTE: note sure if assign interfaces would work in this context # --------------------------------------------------------------------------- # dictionary-like interface # these methods operate on the Batch itself, not the contained Frames
[docs] def keys(self) -> tp.Iterator[TLabel]: """ Iterator of :obj:`Frame` labels. """ for k, _ in self._iter_items(): yield k
[docs] def __iter__(self) -> tp.Iterator[TLabel]: """ Iterator of :obj:`Frame` labels, same as :obj:`Batch.keys`. """ yield from self.keys()
@property def values(self) -> tp.Iterator[TFrameOrSeries]: # type: ignore # NOTE: this violates the supertype """ Return an iterator of values (:obj:`Frame` or :obj:`Series`) stored in this :obj:`Batch`. """ return (v for _, v in self._iter_items())
[docs] def items(self) -> TIteratorFrameItems: """ Iterator of labels, :obj:`Frame`. """ return self._iter_items()
_items_store = items # --------------------------------------------------------------------------- # axis and shape ufunc methods def _ufunc_unary_operator(self, operator: TUFunc) -> 'Batch': return self._apply_attr(attr='_ufunc_unary_operator', operator=operator) def _ufunc_binary_operator( self, *, operator: TUFunc, other: tp.Any, fill_value: object = np.nan, ) -> 'Batch': return self._apply_attr( attr='_ufunc_binary_operator', operator=operator, other=other, ) def _ufunc_axis_skipna( self, *, axis: int, skipna: bool, ufunc: TUFunc, ufunc_skipna: TUFunc, composable: bool, dtypes: tp.Tuple[TDtypeAny, ...], size_one_unity: bool, ) -> 'Batch': return self._apply_attr( attr='_ufunc_axis_skipna', axis=axis, skipna=skipna, ufunc=ufunc, ufunc_skipna=ufunc_skipna, composable=composable, dtypes=dtypes, size_one_unity=size_one_unity, ) def _ufunc_shape_skipna( self, *, axis: int, skipna: bool, ufunc: TUFunc, ufunc_skipna: TUFunc, composable: bool, dtypes: tp.Tuple[TDtypeAny, ...], size_one_unity: bool, ) -> 'Batch': return self._apply_attr( attr='_ufunc_shape_skipna', axis=axis, skipna=skipna, ufunc=ufunc, ufunc_skipna=ufunc_skipna, composable=composable, dtypes=dtypes, size_one_unity=size_one_unity, ) # --------------------------------------------------------------------------- # via interfaces @property def via_values(self) -> InterfaceBatchValues: """ Interface for applying a function to values in this container. """ return InterfaceBatchValues(self.apply) @property def via_str(self) -> InterfaceBatchString: """ Interface for applying string methods to elements in this container. """ return InterfaceBatchString(self.apply) @property def via_dt(self) -> InterfaceBatchDatetime: """ Interface for applying datetime properties and methods to elements in this container. """ return InterfaceBatchDatetime(self.apply) @property def via_T(self) -> InterfaceBatchTranspose: """ Interface for using binary operators with one-dimensional sequences, where the opperand is applied column-wise. """ return InterfaceBatchTranspose(self.apply)
[docs] def via_fill_value( self, fill_value: object = np.nan, /, ) -> InterfaceBatchFillValue: """ Interface for using binary operators and methods with a pre-defined fill value. """ return InterfaceBatchFillValue( self.apply, fill_value=fill_value, )
[docs] def via_re( self, pattern: str, flags: int = 0, /, ) -> InterfaceBatchRe: """ Interface for applying regular expressions to elements in this container. """ return InterfaceBatchRe( self.apply, pattern=pattern, flags=flags, )
# --------------------------------------------------------------------------- # transformations resulting in the same dimensionality @property def astype(self) -> InterfaceBatchAsType['Batch']: """ Return a new Batch with astype transformed. """ return InterfaceBatchAsType(self.apply)
[docs] def rename( self, name: TName = NAME_DEFAULT, /, *, index: TName = NAME_DEFAULT, columns: TName = NAME_DEFAULT, ) -> 'Batch': """ Return a new Batch with an updated name attribute. """ return self._apply_attr( name, attr='rename', index=index, columns=columns, )
[docs] def sort_index( self, *, ascending: bool = True, kind: TSortKinds = DEFAULT_SORT_KIND ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj;`Frame` ordered by the sorted ``index``. """ return self._apply_attr( attr='sort_index', ascending=ascending, kind=kind, )
[docs] def sort_columns( self, *, ascending: bool = True, kind: TSortKinds = DEFAULT_SORT_KIND ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` ordered by the sorted ``columns``. """ return self._apply_attr( attr='sort_columns', ascending=ascending, kind=kind, )
[docs] def sort_values( self, label: TKeyOrKeys, /, *, ascending: bool = True, axis: int = 1, kind: TSortKinds = DEFAULT_SORT_KIND, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` ordered by the sorted values, where values are given by single column or iterable of columns. Args: label: a label or iterable of keys. """ return self._apply_attr( label, attr='sort_values', ascending=ascending, axis=axis, kind=kind, )
[docs] def isin( self, other: tp.Any, /, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` as a same-sized Boolean :obj:`Frame` that shows if the same-positioned element is in the passed iterable. """ return self._apply_attr( other, attr='isin', )
[docs] @doc_inject(class_name='Batch') def clip( self, *, lower: tp.Optional[tp.Union[float, TSeriesAny, TFrameAny]] = None, upper: tp.Optional[tp.Union[float, TSeriesAny, TFrameAny]] = None, axis: tp.Optional[int] = None, ) -> 'Batch': """{} Args: lower: value, :obj:`Series`, :obj:`Frame` upper: value, :obj:`Series`, :obj:`Frame` axis: required if ``lower`` or ``upper`` are given as a :obj:`Series`. """ return self._apply_attr( attr='clip', lower=lower, upper=upper, axis=axis, )
[docs] def transpose(self) -> 'Batch': """Transpose. Return a :obj:`Frame` with ``index`` as ``columns`` and vice versa.""" return self._apply_attr(attr='transpose')
@property def T(self) -> 'Batch': """Transpose. Return a :obj:`Frame` with ``index`` as ``columns`` and vice versa.""" return self._apply_attr(attr='transpose')
[docs] @doc_inject(selector='duplicated') def duplicated( self, *, axis: int = 0, exclude_first: bool = False, exclude_last: bool = False ) -> 'Batch': """ Return an axis-sized Boolean :obj:`Series` that shows True for all rows (axis 0) or columns (axis 1) duplicated. Args: {axis} {exclude_first} {exclude_last} """ return self._apply_attr( attr='duplicated', axis=axis, exclude_first=exclude_first, exclude_last=exclude_last, )
[docs] @doc_inject(selector='duplicated') def drop_duplicated( self, *, axis: int = 0, exclude_first: bool = False, exclude_last: bool = False ) -> 'Batch': """ Return a :obj:`Batch` with contained :obj:`Frame` with duplicated rows (axis 0) or columns (axis 1) removed. All values in the row or column are compared to determine duplication. Args: {axis} {exclude_first} {exclude_last} """ return self._apply_attr( attr='drop_duplicated', axis=axis, exclude_first=exclude_first, exclude_last=exclude_last, )
# as only useful on Frame, perhaps skip? # def set_index(self, # def set_index_hierarchy(self, # def unset_index(self, *,
[docs] def __round__( self, decimals: int = 0, /, ) -> 'Batch': """ Return a :obj:`Batch` with contained :obj:`Frame` rounded to the given decimals. Negative decimals round to the left of the decimal point. Args: decimals: number of decimals to round to. """ return self._apply_attr( decimals, attr='__round__', )
[docs] def roll( self, index: int = 0, columns: int = 0, *, include_index: bool = False, include_columns: bool = False, ) -> 'Batch': """ Roll columns and/or rows by positive or negative integer counts, where columns and/or rows roll around the axis. Args: include_index: Determine if index is included in index-wise rotation. include_columns: Determine if column index is included in index-wise rotation. """ return self._apply_attr( attr='roll', index=index, columns=columns, include_index=include_index, include_columns=include_columns, )
[docs] def shift( self, index: int = 0, columns: int = 0, fill_value: tp.Any = np.nan ) -> 'Batch': """ Shift columns and/or rows by positive or negative integer counts, where columns and/or rows fall of the axis and introduce missing values, filled by `fill_value`. """ return self._apply_attr( attr='shift', index=index, columns=columns, fill_value=fill_value, )
# --------------------------------------------------------------------------- # na handling
[docs] def isna(self) -> 'Batch': """ Return a :obj:`Batch` with contained, same-indexed :obj:`Frame` indicating True which values are NaN or None. """ return self._apply_attr(attr='isna')
[docs] def notna(self) -> 'Batch': """ Return a :obj:`Batch` with contained, same-indexed :obj:`Frame` indicating True which values are not NaN or None. """ return self._apply_attr(attr='notna')
[docs] def dropna( self, *, axis: int = 0, condition: tp.Callable[[TNDArrayAny], bool] = np.all, ) -> 'Batch': """ Return a :obj:`Batch` with contained :obj:`Frame` after removing rows (axis 0) or columns (axis 1) where any or all values are NA (NaN or None). The condition is determined by a NumPy ufunc that process the Boolean array returned by ``isna()``; the default is ``np.all``. Args: axis: condition: """ return self._apply_attr(attr='dropna', axis=axis, condition=condition)
# --------------------------------------------------------------------------- # falsy handling
[docs] def isfalsy(self) -> 'Batch': """ Return a :obj:`Batch` with contained, same-indexed :obj:`Frame` indicating True which values are Falsy. """ return self._apply_attr(attr='isfalsy')
[docs] def notfalsy(self) -> 'Batch': """ Return a :obj:`Batch` with contained, same-indexed :obj:`Frame` indicating True which values are not Falsy. """ return self._apply_attr(attr='notfalsy')
[docs] def dropfalsy( self, *, axis: int = 0, condition: tp.Callable[[TNDArrayAny], bool] = np.all, ) -> 'Batch': """ Return a :obj:`Batch` with contained :obj:`Frame` after removing rows (axis 0) or columns (axis 1) where any or all values are NA (NaN or None). The condition is determined by a NumPy ufunc that process the Boolean array returned by ``isna()``; the default is ``np.all``. Args: axis: condition: """ return self._apply_attr(attr='dropfalsy', axis=axis, condition=condition)
# --------------------------------------------------------------------------- # na filling
[docs] def fillna( self, value: tp.Any, /, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling null (NaN or None) with the provided ``value``. """ return self._apply_attr( value, attr='fillna', )
[docs] def fillna_leading(self, value: tp.Any, /, *, axis: int = 0) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling leading (and only leading) null (NaN or None) with the provided ``value``. Args: {value} {axis} """ return self._apply_attr(value, attr='fillna_leading', axis=axis)
[docs] def fillna_trailing( self, value: tp.Any, /, *, axis: int = 0, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling trailing (and only trailing) null (NaN or None) with the provided ``value``. Args: {value} {axis} """ return self._apply_attr(value, attr='fillna_trailing', axis=axis)
[docs] def fillna_forward( self, limit: int = 0, /, *, axis: int = 0, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling forward null (NaN or None) with the last observed value. Args: {limit} {axis} """ return self._apply_attr( limit, attr='fillna_forward', axis=axis, )
[docs] def fillna_backward( self, limit: int = 0, /, *, axis: int = 0, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling backward null (NaN or None) with the first observed value. Args: {limit} {axis} """ return self._apply_attr( limit, attr='fillna_backward', axis=axis, )
# --------------------------------------------------------------------------- # falsy filling
[docs] def fillfalsy( self, value: tp.Any, /, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling falsy values with the provided ``value``. """ return self._apply_attr( value, attr='fillfalsy', )
[docs] def fillfalsy_leading( self, value: tp.Any, /, *, axis: int = 0, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling leading (and only leading) falsy values with the provided ``value``. Args: {value} {axis} """ return self._apply_attr( value, attr='fillfalsy_leading', axis=axis, )
[docs] def fillfalsy_trailing( self, value: tp.Any, /, *, axis: int = 0, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling trailing (and only trailing) falsy values with the provided ``value``. Args: {value} {axis} """ return self._apply_attr( value, attr='fillfalsy_trailing', axis=axis, )
[docs] def fillfalsy_forward( self, limit: int = 0, /, *, axis: int = 0, ) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling forward falsy values with the last observed value. Args: {limit} {axis} """ return self._apply_attr( limit, attr='fillfalsy_forward', axis=axis, )
[docs] def fillfalsy_backward(self, limit: int = 0, /, *, axis: int = 0) -> 'Batch': """ Return a new :obj:`Batch` with contained :obj:`Frame` after filling backward falsy values with the first observed value. Args: {limit} {axis} """ return self._apply_attr( limit, attr='fillfalsy_backward', axis=axis, )
# --------------------------------------------------------------------------- # index and relabel
[docs] def relabel( self, index: tp.Optional[TRelabelInput] = None, columns: tp.Optional[TRelabelInput] = None, *, index_constructor: TIndexCtorSpecifier = None, columns_constructor: TIndexCtorSpecifier = None, ) -> 'Batch': return self._apply_attr( attr='relabel', index=index, columns=columns, index_constructor=index_constructor, columns_constructor=columns_constructor, )
[docs] def unset_index( self, *, names: tp.Iterable[TLabel] = (), consolidate_blocks: bool = False, columns_constructors: TIndexCtorSpecifiers = None, ) -> 'Batch': return self._apply_attr( attr='unset_index', names=names, consolidate_blocks=consolidate_blocks, columns_constructors=columns_constructors, )
[docs] def reindex( self, index: tp.Optional[TIndexInitializer] = None, columns: tp.Optional[TIndexInitializer] = None, *, fill_value: object = np.nan, own_index: bool = False, own_columns: bool = False, check_equals: bool = True, ) -> 'Batch': return self._apply_attr( attr='reindex', index=index, columns=columns, fill_value=fill_value, own_index=own_index, own_columns=own_columns, check_equals=check_equals, )
[docs] def relabel_flat( self, index: bool = False, columns: bool = False, ) -> 'Batch': return self._apply_attr(attr='relabel_flat', index=index, columns=columns)
[docs] def relabel_level_add( self, index: TLabel = None, columns: TLabel = None, *, index_constructor: TIndexCtorSpecifier = None, columns_constructor: TIndexCtorSpecifier = None, ) -> 'Batch': return self._apply_attr( attr='relabel_level_add', index=index, columns=columns, index_constructor=index_constructor, columns_constructor=columns_constructor, )
[docs] def relabel_level_drop(self, index: int = 0, columns: int = 0) -> 'Batch': return self._apply_attr(attr='relabel_level_drop', index=index, columns=columns)
[docs] def relabel_shift_in( self, key: TLocSelector, /, *, axis: int = 0, ) -> 'Batch': return self._apply_attr(key, attr='relabel_shift_in', axis=axis)
# --------------------------------------------------------------------------- # rank
[docs] def rank_ordinal( self, *, axis: int = 0, skipna: bool = True, ascending: TBoolOrBools = True, start: int = 0, fill_value: tp.Any = np.nan, ) -> 'Batch': return self._apply_attr( attr='rank_ordinal', axis=axis, skipna=skipna, ascending=ascending, start=start, fill_value=fill_value, )
[docs] def rank_dense( self, *, axis: int = 0, skipna: bool = True, ascending: TBoolOrBools = True, start: int = 0, fill_value: tp.Any = np.nan, ) -> 'Batch': return self._apply_attr( attr='rank_dense', axis=axis, skipna=skipna, ascending=ascending, start=start, fill_value=fill_value, )
[docs] def rank_min( self, *, axis: int = 0, skipna: bool = True, ascending: TBoolOrBools = True, start: int = 0, fill_value: tp.Any = np.nan, ) -> 'Batch': return self._apply_attr( attr='rank_min', axis=axis, skipna=skipna, ascending=ascending, start=start, fill_value=fill_value, )
[docs] def rank_max( self, *, axis: int = 0, skipna: bool = True, ascending: TBoolOrBools = True, start: int = 0, fill_value: tp.Any = np.nan, ) -> 'Batch': return self._apply_attr( attr='rank_max', axis=axis, skipna=skipna, ascending=ascending, start=start, fill_value=fill_value, )
[docs] def rank_mean( self, *, axis: int = 0, skipna: bool = True, ascending: TBoolOrBools = True, start: int = 0, fill_value: tp.Any = np.nan, ) -> 'Batch': return self._apply_attr( attr='rank_mean', axis=axis, skipna=skipna, ascending=ascending, start=start, fill_value=fill_value, )
# --------------------------------------------------------------------------- # transformations resulting in changed dimensionality
[docs] def count( self, *, skipna: bool = True, skipfalsy: bool = False, unique: bool = False, axis: int = 0, ) -> 'Batch': """Apply count on contained Frames.""" return self._apply_attr( attr='count', skipna=skipna, skipfalsy=skipfalsy, unique=unique, axis=axis, )
[docs] @doc_inject(selector='sample') def sample( self, index: tp.Optional[int] = None, columns: tp.Optional[int] = None, *, seed: tp.Optional[int] = None, ) -> 'Batch': """Apply sample on contained Frames. Args: {index} {columns} {seed} """ return self._apply_attr( attr='sample', index=index, columns=columns, seed=seed, )
[docs] @doc_inject(selector='head', class_name='Batch') def head( self, count: int = 5, /, ) -> 'Batch': """{doc} Args: {count} """ return self._apply_attr( count, attr='head', )
[docs] @doc_inject(selector='tail', class_name='Batch') def tail( self, count: int = 5, /, ) -> 'Batch': """{doc} Args: {count} """ return self._apply_attr( count, attr='tail', )
[docs] @doc_inject(selector='argminmax') def loc_min(self, *, skipna: bool = True, axis: int = 0) -> 'Batch': """ Return the labels corresponding to the minimum value found. Args: {skipna} {axis} """ return self._apply_attr( attr='loc_min', skipna=skipna, axis=axis, )
[docs] @doc_inject(selector='argminmax') def iloc_min(self, *, skipna: bool = True, axis: int = 0) -> 'Batch': """ Return the integer indices corresponding to the minimum values found. Args: {skipna} {axis} """ return self._apply_attr( attr='iloc_min', skipna=skipna, axis=axis, )
[docs] @doc_inject(selector='argminmax') def loc_max(self, *, skipna: bool = True, axis: int = 0) -> 'Batch': """ Return the labels corresponding to the maximum values found. Args: {skipna} {axis} """ return self._apply_attr( attr='loc_max', skipna=skipna, axis=axis, )
[docs] @doc_inject(selector='argminmax') def iloc_max(self, *, skipna: bool = True, axis: int = 0) -> 'Batch': """ Return the integer indices corresponding to the maximum values found. Args: {skipna} {axis} """ return self._apply_attr( attr='iloc_max', skipna=skipna, axis=axis, )
[docs] def cov( self, *, axis: int = 1, ddof: int = 1, ) -> 'Batch': """ Compute a covariance matrix. Args: axis: if 0, each row represents a variable, with observations as columns; if 1, each column represents a variable, with observations as rows. Defaults to 1. ddof: Delta degrees of freedom, defaults to 1. """ return self._apply_attr( attr='cov', axis=axis, ddof=ddof, )
[docs] def corr( self, *, axis: int = 1, ) -> 'Batch': """ Compute a correlation matrix. Args: axis: if 0, each row represents a variable, with observations as columns; if 1, each column represents a variable, with observations as rows. Defaults to 1. """ return self._apply_attr( attr='corr', axis=axis, )
# --------------------------------------------------------------------------- # utility function to numpy array
[docs] def unique( self, *, axis: tp.Optional[int] = None, ) -> 'Batch': """ Return a NumPy array of unqiue values. If the axis argument is provied, uniqueness is determined by columns or row. """ return self._apply_attr( attr='unique', axis=axis, )
# --------------------------------------------------------------------------- # exporter
[docs] def to_series( self, *, dtype: TDtypeSpecifier = None, name: TName = None, index_constructor: TIndexCtorSpecifier = None, ) -> TSeriesAny: """ Consolidate stored values into a new :obj:`Series` using the stored labels as the index. """ return Series.from_items( self._items, dtype=dtype, name=name, index_constructor=index_constructor, )
[docs] def to_frame( self, *, axis: int = 0, union: bool = True, index: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None, columns: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None, index_constructor: TIndexCtorSpecifier = None, columns_constructor: TIndexCtorSpecifier = None, name: TName = None, fill_value: object = np.nan, consolidate_blocks: bool = False, ) -> TFrameAny: """ Consolidate stored :obj:`Frame` into a new :obj:`Frame` using the stored labels as the index on the provided ``axis`` using :obj:`Frame.from_concat`. This assumes that that the contained :obj:`Frame` have been reduced to a single dimension along the provided `axis`. """ labels = [] containers: tp.List[TFrameOrSeries] = [] ndim1d = True for label, container in self._items: container = normalize_container(container) labels.append(label) ndim1d &= container.ndim == 1 containers.append(container) name = name if name is not None else self._name if ndim1d: if axis == 0 and index is None: index = labels if axis == 1 and columns is None: columns = labels return Frame.from_concat( containers, axis=axis, union=union, index=index, columns=columns, index_constructor=index_constructor, columns_constructor=columns_constructor, name=name, fill_value=fill_value, consolidate_blocks=consolidate_blocks, ) # produce a hierarchical index to return all Frames f: TFrameAny = Frame.from_concat_items( zip(labels, containers), axis=axis, union=union, name=name, fill_value=fill_value, consolidate_blocks=consolidate_blocks, index_constructor=index_constructor, columns_constructor=columns_constructor, ) if index is not None or columns is not None: # this relabels, as that is how Frame.from_concat works # NOTE: we need to apply index_constructor, columns_constructors if defined f = f.relabel(index=index, columns=columns) return f
[docs] def to_bus( self, *, index_constructor: TIndexCtorSpecifier = None, ) -> TBusAny: """Realize the :obj:`Batch` as an :obj:`Bus`. Note that, as a :obj:`Bus` must have all labels (even if :obj:`Frame` are loaded lazily), this :obj:`Batch` will be exhausted.""" frames: tp.List[TFrameAny] = [] index = [] for i, f in self.items(): index.append(i) if isinstance(f, Series): frames.append(f.to_frame()) else: frames.append(f) return Bus( frames, index=index, index_constructor=index_constructor, name=self._name, )
def _to_signature_bytes( self, include_name: bool = True, include_class: bool = True, encoding: str = 'utf-8', ) -> bytes: return self.to_bus()._to_signature_bytes( include_name=include_name, include_class=include_class, encoding=encoding, )
doc_update(Batch.__init__, selector='batch_init')