# Source code for static_frame.core.yarn

import typing as tp
from collections.abc import Set
from itertools import chain

import numpy as np

from static_frame.core.axis_map import buses_to_hierarchy
from static_frame.core.bus import Bus
from static_frame.core.container import ContainerBase
from static_frame.core.container_util import index_from_optional_constructor
from static_frame.core.container_util import index_many_concat
from static_frame.core.container_util import iter_component_signature_bytes
from static_frame.core.container_util import rehierarch_from_index_hierarchy
from static_frame.core.display import Display
from static_frame.core.display import DisplayActive
from static_frame.core.display import DisplayHeader
from static_frame.core.display_config import DisplayConfig
from static_frame.core.doc_str import doc_inject
from static_frame.core.exception import ErrorInitYarn
from static_frame.core.exception import RelabelInvalid
from static_frame.core.frame import Frame
from static_frame.core.index import Index
from static_frame.core.index_auto import IndexAutoFactory
from static_frame.core.index_auto import IndexAutoFactoryType
from static_frame.core.index_auto import RelabelInput
from static_frame.core.index_base import IndexBase
from static_frame.core.index_hierarchy import IndexHierarchy
from static_frame.core.node_iter import IterNodeApplyType
from static_frame.core.node_iter import IterNodeNoArg
from static_frame.core.node_iter import IterNodeType
from static_frame.core.node_selector import InterfaceGetItem
from static_frame.core.node_selector import InterfaceSelectTrio
from static_frame.core.series import Series
from static_frame.core.store_client_mixin import StoreClientMixin
from static_frame.core.style_config import StyleConfig
from static_frame.core.util import DTYPE_OBJECT
from static_frame.core.util import NAME_DEFAULT
from static_frame.core.util import GetItemKeyType
from static_frame.core.util import IndexConstructor
from static_frame.core.util import IndexConstructors
from static_frame.core.util import IndexInitializer
from static_frame.core.util import NameType
from static_frame.core.util import is_callable_or_mapping


class Yarn(ContainerBase, StoreClientMixin):
    '''
    A :obj:`Series`-like container made of an ordered collection of :obj:`Bus`. :obj:`Yarn` can be indexed independently of the contained :obj:`Bus`, permitting independent labels per contained :obj:`Frame`.
    '''

    __slots__ = (
            '_series',
            '_hierarchy',
            '_index',
            '_deepcopy_from_bus',
            )

    # Series of Bus, labelled per Bus.
    _series: Series
    # Two-level hierarchy of (Bus label, Frame label); one entry per contained Frame.
    _hierarchy: IndexHierarchy
    # The public index of this Yarn; one label per contained Frame.
    _index: IndexBase
    # Forwarded to ``buses_to_hierarchy`` and to every derived Yarn.
    _deepcopy_from_bus: bool

    _NDIM: int = 1

    @classmethod
    def from_buses(cls,
            buses: tp.Iterable[Bus],
            *,
            name: NameType = None,
            retain_labels: bool,
            deepcopy_from_bus: bool = False,
            ) -> 'Yarn':
        '''Return a :obj:`Yarn` from an iterable of :obj:`Bus`; labels will be drawn from :obj:`Bus.name`.

        Args:
            buses: An iterable of :obj:`Bus`.
            name: Name of the resulting :obj:`Yarn`.
            retain_labels: If True, the index of the :obj:`Yarn` is the full hierarchy of :obj:`Bus` label and :obj:`Frame` label; if False, the outer (:obj:`Bus`) level is dropped.
            deepcopy_from_bus: Forwarded to the hierarchy construction and stored on the :obj:`Yarn`.
        '''
        series = Series.from_items(
                ((b.name, b) for b in buses),
                dtype=DTYPE_OBJECT,
                name=name,
                )

        hierarchy = buses_to_hierarchy(
                series.values,
                series.index,
                deepcopy_from_bus=deepcopy_from_bus,
                init_exception_cls=ErrorInitYarn,
                )

        if retain_labels:
            index = hierarchy
        else:
            index = hierarchy.level_drop(1) #type: ignore

        return cls(series,
                hierarchy=hierarchy,
                index=index,
                deepcopy_from_bus=deepcopy_from_bus,
                )

    @classmethod
    def from_concat(cls,
            containers: tp.Iterable['Yarn'],
            *,
            index: tp.Optional[tp.Union[IndexInitializer, IndexAutoFactoryType]] = None,
            name: NameType = NAME_DEFAULT,
            deepcopy_from_bus: bool = False,
            ) -> 'Yarn':
        '''
        Concatenate multiple :obj:`Yarn` into a new :obj:`Yarn`. Loaded status of :obj:`Frame` within each :obj:`Bus` will not be altered.

        Args:
            containers: An iterable of :obj:`Yarn`.
            index: Optionally provide new labels for the result of the concatenation. If not provided, the indices of the given :obj:`Yarn` are concatenated.
            name: Name of the resulting :obj:`Yarn`.
            deepcopy_from_bus: Forwarded to the initializer.

        Raises:
            NotImplementedError: if an element of ``containers`` is not a :obj:`Yarn`.
        '''
        bus_components = []
        # Only collect the component indices if no replacement index was given.
        index_components: tp.Optional[tp.List[IndexBase]] = None if index is not None else []

        for element in containers:
            if isinstance(element, Yarn):
                bus_components.extend(element._series.values)
                if index_components is not None:
                    index_components.append(element.index)
            else:
                raise NotImplementedError(f'cannot instantiate from {type(element)}')

        # Fill element by element so that each Bus is stored as a single object.
        array = np.empty(len(bus_components), dtype=DTYPE_OBJECT)
        for i, bus in enumerate(bus_components):
            array[i] = bus
        array.flags.writeable = False

        if index_components is not None:
            index = index_many_concat(index_components, Index)

        series = Series(array, name=name)
        return cls(series,
                deepcopy_from_bus=deepcopy_from_bus,
                index=index,
                )

    #---------------------------------------------------------------------------
    def __init__(self,
            series: tp.Union[Series, tp.Iterable[Bus]],
            *,
            index: tp.Optional[tp.Union[IndexBase, IndexAutoFactoryType]] = None,
            index_constructor: tp.Optional[IndexConstructor] = None,
            deepcopy_from_bus: bool = False,
            hierarchy: tp.Optional[IndexHierarchy] = None,
            own_index: bool = False,
            ) -> None:
        '''
        Args:
            series: An iterable (or :obj:`Series`) of :obj:`Bus`. The length of this container is not the same as ``index``, if provided.
            index: Optionally provide an index for the :obj:`Frame` contained in all :obj:`Bus`.
            index_constructor: Optional constructor applied when creating the index.
            deepcopy_from_bus: Forwarded to the hierarchy construction and stored on the :obj:`Yarn`.
            hierarchy: Optionally provide an already-built hierarchy of :obj:`Bus` label and :obj:`Frame` label; if not provided, it is derived from ``series``.
            own_index: If True, ``index`` is stored as given, without passing through an index constructor.

        Raises:
            ErrorInitYarn: if a :obj:`Series` is given that does not have an object dtype, or if the length of the index does not match the length of the hierarchy.
        '''
        if isinstance(series, Series):
            if series.dtype != DTYPE_OBJECT:
                raise ErrorInitYarn(
                        f'Series passed to initializer must have dtype object, not {series.dtype}')
            self._series = series # Bus by Bus label
        else:
            self._series = Series(series, dtype=DTYPE_OBJECT) # get a default index

        self._deepcopy_from_bus = deepcopy_from_bus

        # _hierarchy might be None while we still need to set self._index
        if hierarchy is None:
            self._hierarchy = buses_to_hierarchy(
                    self._series.values,
                    self._series.index,
                    deepcopy_from_bus=self._deepcopy_from_bus,
                    init_exception_cls=ErrorInitYarn,
                    )
        else:
            self._hierarchy = hierarchy

        if own_index:
            self._index = index #type: ignore
        elif index is None or index is IndexAutoFactory:
            self._index = IndexAutoFactory.from_optional_constructor(
                    len(self._hierarchy),
                    default_constructor=Index,
                    explicit_constructor=index_constructor
                    )
        else: # an iterable of labels or an Index
            self._index = index_from_optional_constructor(index,
                    default_constructor=Index,
                    explicit_constructor=index_constructor
                    )

        if len(self._index) != len(self._hierarchy):
            raise ErrorInitYarn(f'Length of supplied index ({len(self._index)}) not of sufficient size ({len(self._hierarchy)}).')

    #---------------------------------------------------------------------------
    # deferred loading of axis info

    def unpersist(self) -> None:
        '''For the :obj:`Bus` contained in this object, replace all loaded :obj:`Frame` with :obj:`FrameDeferred`.
        '''
        for b in self._series.values:
            b.unpersist()

    #---------------------------------------------------------------------------
    def __reversed__(self) -> tp.Iterator[tp.Hashable]:
        '''
        Returns a reverse iterator on the :obj:`Yarn` index.

        Returns:
            :obj:`Iterator[Hashable]`
        '''
        return reversed(self._index)

    #---------------------------------------------------------------------------
    # name interface

    @property #type: ignore
    @doc_inject()
    def name(self) -> NameType:
        '''{}'''
        return self._series._name

    def rename(self, name: NameType) -> 'Yarn':
        '''
        Return a new :obj:`Yarn` with an updated name attribute.

        Args:
            name
        '''
        # Only the name carried by the Series of Bus changes; the index and hierarchy are reused.
        series = self._series.rename(name)
        return self.__class__(series,
                index=self._index,
                hierarchy=self._hierarchy,
                deepcopy_from_bus=self._deepcopy_from_bus,
                )

    #---------------------------------------------------------------------------
    # interfaces

    @property
    def loc(self) -> InterfaceGetItem['Yarn']:
        '''Interface for label-based selection.
        '''
        return InterfaceGetItem(self._extract_loc)

    @property
    def iloc(self) -> InterfaceGetItem['Yarn']:
        '''Interface for position-based selection.
        '''
        return InterfaceGetItem(self._extract_iloc)

    @property
    def drop(self) -> InterfaceSelectTrio['Yarn']:
        '''
        Interface for dropping elements from :obj:`Yarn`.
        '''
        return InterfaceSelectTrio( #type: ignore
                func_iloc=self._drop_iloc,
                func_loc=self._drop_loc,
                func_getitem=self._drop_loc
                )

    #---------------------------------------------------------------------------
    @property
    def iter_element(self) -> IterNodeNoArg['Yarn']:
        '''
        Iterator of elements.
        '''
        return IterNodeNoArg(
                container=self,
                function_items=self._axis_element_items,
                function_values=self._axis_element,
                yield_type=IterNodeType.VALUES,
                apply_type=IterNodeApplyType.SERIES_VALUES,
                )

    @property
    def iter_element_items(self) -> IterNodeNoArg['Yarn']:
        '''
        Iterator of label, element pairs.
        '''
        return IterNodeNoArg(
                container=self,
                function_items=self._axis_element_items,
                function_values=self._axis_element,
                yield_type=IterNodeType.ITEMS,
                apply_type=IterNodeApplyType.SERIES_VALUES,
                )

    #---------------------------------------------------------------------------
    # common attributes from the numpy array

    @property
    def dtype(self) -> np.dtype:
        '''
        Return the dtype of the realized NumPy array.

        Returns:
            :obj:`numpy.dtype`
        '''
        return DTYPE_OBJECT # always dtype object

    @property
    def shape(self) -> tp.Tuple[int]:
        '''
        Return a tuple describing the shape of the realized NumPy array.

        Returns:
            :obj:`Tuple[int]`
        '''
        return (self._hierarchy.shape[0],)

    @property
    def ndim(self) -> int:
        '''
        Return the number of dimensions, which for a :obj:`Yarn` is always 1.

        Returns:
            :obj:`int`
        '''
        return self._NDIM

    @property
    def size(self) -> int:
        '''
        Return the size of the underlying NumPy array.

        Returns:
            :obj:`int`
        '''
        return self._hierarchy.shape[0]

    #---------------------------------------------------------------------------

    @property
    def index(self) -> IndexBase:
        '''
        The index instance assigned to this container.

        Returns:
            :obj:`Index`
        '''
        return self._index

    #---------------------------------------------------------------------------
    # dictionary-like interface

    def keys(self) -> IndexBase:
        '''
        The index labels; the index instance itself is returned.

        Returns:
            :obj:`Index`
        '''
        return self._index

    def __iter__(self) -> tp.Iterator[tp.Hashable]:
        '''
        Iterator of index labels, same as :obj:`static_frame.Series.keys`.

        Returns:
            :obj:`Iterator[Hashable]`
        '''
        return self._index.__iter__()

    def __contains__(self, value: tp.Hashable) -> bool:
        '''
        Inclusion of value in index labels.

        Returns:
            :obj:`bool`
        '''
        return self._index.__contains__(value)

    def get(self,
            key: tp.Hashable,
            default: tp.Any = None,
            ) -> tp.Any:
        '''
        Return the value found at the index key, else the default if the key is not found.

        Returns:
            :obj:`Any`
        '''
        if key not in self._index:
            return default
        return self.__getitem__(key)

    def items(self) -> tp.Iterator[tp.Tuple[tp.Hashable, Frame]]:
        '''Iterator of pairs of :obj:`Yarn` label and contained :obj:`Frame`.
        '''
        labels = iter(self._index)
        for bus in self._series.values:
            # NOTE: cannot use Bus.items() as it may not have the same index representation as the Yarn; Bus._axis_element is optimized for handling max_persist > 1 loading
            for f in bus._axis_element():
                yield next(labels), f

    _items_store = items

    @property
    def values(self) -> np.ndarray:
        '''A 1D object array of all :obj:`Frame` contained in all contained :obj:`Bus`.
        '''
        array = np.empty(shape=len(self._index), dtype=DTYPE_OBJECT)
        buses = self._series.values
        # np.concatenate raises on an empty sequence; an empty Yarn has nothing to copy.
        if len(buses):
            np.concatenate([b.values for b in buses], out=array)
        array.flags.writeable = False
        return array

    #---------------------------------------------------------------------------
    @doc_inject()
    def equals(self,
            other: tp.Any,
            *,
            compare_name: bool = False,
            compare_dtype: bool = False,
            compare_class: bool = False,
            skipna: bool = True,
            ) -> bool:
        '''
        {doc}

        Note: this will attempt to load and compare all Frame managed by the contained Bus.

        Args:
            {compare_name}
            {compare_dtype}
            {compare_class}
            {skipna}
        '''
        if id(other) == id(self):
            return True

        if compare_class and self.__class__ != other.__class__:
            return False
        elif not isinstance(other, Yarn):
            return False

        if compare_name and self._series._name != other._series._name:
            return False

        # length of series in Yarn might be different but may still have the same frames, so look at realized length
        if len(self) != len(other):
            return False

        if not self._index.equals(
                other.index, # call property to force index creation
                compare_name=compare_name,
                compare_dtype=compare_dtype,
                compare_class=compare_class,
                skipna=skipna,
                ):
            return False

        # can zip because the realized lengths already match
        # using .values will force loading all Frame into memory; better to use items() to permit collection
        for (_, frame_self), (_, frame_other) in zip(self.items(), other.items()):
            if not frame_self.equals(frame_other,
                    compare_name=compare_name,
                    compare_dtype=compare_dtype,
                    compare_class=compare_class,
                    skipna=skipna,
                    ):
                return False

        return True

    #---------------------------------------------------------------------------
    # transformations resulting in changed dimensionality

    @doc_inject(selector='head', class_name='Yarn')
    def head(self, count: int = 5) -> 'Yarn':
        '''{doc}

        Args:
            {count}

        Returns:
            :obj:`Yarn`
        '''
        return self.iloc[:count]

    @doc_inject(selector='tail', class_name='Yarn')
    def tail(self, count: int = 5) -> 'Yarn':
        '''{doc}

        Args:
            {count}

        Returns:
            :obj:`Yarn`
        '''
        if count == 0:
            # -0 is 0, so a slice of [-0:] would select everything rather than nothing.
            return self.iloc[:0]
        return self.iloc[-count:]

    #---------------------------------------------------------------------------
    # extraction

    def _extract_iloc(self, key: GetItemKeyType) -> 'Yarn':
        '''
        Select by integer position over the realized (per-Frame) length.

        Returns:
            Yarn or, if an element is selected, a Frame
        '''
        target_hierarchy = self._hierarchy._extract_iloc(key)
        if isinstance(target_hierarchy, tuple):
            # got a single element, return a Frame
            return self._series[target_hierarchy[0]][target_hierarchy[1]] #type: ignore

        # get the outer-most index of the hierarchical index
        target_bus_index = target_hierarchy.unique(depth_level=0, order_by_occurrence=True)
        target_bus_index = next(iter(target_hierarchy._index_constructors))(target_bus_index)

        # create a Boolean array equal to the entire realized length
        # NOTE(review): a Boolean mask does not retain the order (or repetition) of an integer-sequence key, while index and target_hierarchy do; confirm that callers passing reordering keys get aligned results.
        valid = np.full(len(self._index), False)
        valid[key] = True
        index = self._index.iloc[key]

        buses = np.empty(len(target_bus_index), dtype=DTYPE_OBJECT)
        # pos tracks the start of the current Bus within the realized length
        pos = 0
        for bus_label, width in self._hierarchy.label_widths_at_depth(0):
            if bus_label not in target_bus_index:
                pos += width
                continue
            extract_per_bus = valid[pos: pos+width]
            pos += width

            idx = target_bus_index.loc_to_iloc(bus_label)
            buses[idx] = self._series[bus_label]._extract_iloc(extract_per_bus)

        buses.flags.writeable = False
        target_series = Series(buses,
                index=target_bus_index,
                own_index=True,
                name=self._series._name,
                )

        return self.__class__(target_series,
                index=index,
                hierarchy=target_hierarchy,
                deepcopy_from_bus=self._deepcopy_from_bus,
                own_index=True,
                )

    def _extract_loc(self, key: GetItemKeyType) -> 'Yarn':
        '''Select by label, translating the key to positions and delegating to ``_extract_iloc``.
        '''
        # use the index active for this Yarn
        key_iloc = self._index._loc_to_iloc(key)
        return self._extract_iloc(key_iloc)

    @doc_inject(selector='selector')
    def __getitem__(self, key: GetItemKeyType) -> 'Yarn':
        '''Selector of values by label.

        Args:
            key: {key_loc}
        '''
        return self._extract_loc(key)

    #---------------------------------------------------------------------------
    # utilities for alternate extraction: drop

    def _drop_iloc(self, key: GetItemKeyType) -> 'Yarn':
        '''Return a :obj:`Yarn` without the positions selected by ``key``.
        '''
        # Boolean mask that is True for every position to keep
        invalid = np.full(len(self._index), True)
        invalid[key] = False
        return self._extract_iloc(invalid)

    def _drop_loc(self, key: GetItemKeyType) -> 'Yarn':
        '''Return a :obj:`Yarn` without the labels selected by ``key``.
        '''
        return self._drop_iloc(self._index._loc_to_iloc(key))

    #---------------------------------------------------------------------------
    # axis functions

    def _axis_element_items(self,
            ) -> tp.Iterator[tp.Tuple[tp.Hashable, tp.Any]]:
        '''Generator of index, value pairs, equivalent to Series.items(). Repeated to have a common signature as other axis functions.
        '''
        yield from self.items()

    def _axis_element(self,
            ) -> tp.Iterator[tp.Any]:
        '''Generator of the elements of every contained :obj:`Bus`, in order.
        '''
        for bus in self._series.values:
            yield from bus._axis_element()

    #---------------------------------------------------------------------------
    def __len__(self) -> int:
        '''Length of values.
        '''
        return self._index.__len__()

    @doc_inject()
    def display(self,
            config: tp.Optional[DisplayConfig] = None,
            *,
            style_config: tp.Optional[StyleConfig] = None,
            ) -> Display:
        '''{doc}

        Args:
            {config}
        '''
        # NOTE: the key change over Series is providing this class as the displayed class
        config = config or DisplayActive.get()
        display_cls = Display.from_values((),
                header=DisplayHeader(self.__class__, self._series._name),
                config=config)

        # NOTE: do not load FrameDeferred, so concatenate contained Series's values directly
        array = np.empty(shape=len(self._index), dtype=DTYPE_OBJECT)
        buses = self._series.values
        # np.concatenate raises on an empty sequence; an empty Yarn has nothing to copy.
        if len(buses):
            np.concatenate(
                    [b._values_mutable for b in buses],
                    out=array)
        array.flags.writeable = False
        series = Series(array, index=self._index, own_index=True)

        return series._display(config,
                display_cls=display_cls,
                style_config=style_config,
                )

    #---------------------------------------------------------------------------
    # extended descriptors; in general, these do not force loading Frame

    @property
    def mloc(self) -> Series:
        '''Returns a :obj:`Series` showing a tuple of memory locations within each loaded Frame.
        '''
        return Series.from_concat((b.mloc for b in self._series.values), index=self._index)

    @property
    def dtypes(self) -> Frame:
        '''Returns a Frame of dtypes for all loaded Frames.
        '''
        f = Frame.from_concat(
                frames=(b.dtypes for b in self._series.values),
                fill_value=None,
                ).relabel(index=self._index)
        return tp.cast(Frame, f)

    @property
    def shapes(self) -> Series:
        '''A :obj:`Series` describing the shape of each loaded :obj:`Frame`. Unloaded :obj:`Frame` will have a shape of None.

        Returns:
            :obj:`tp.Series`
        '''
        return Series.from_concat((b.shapes for b in self._series.values), index=self._index)

    @property
    def nbytes(self) -> int:
        '''Total bytes of data currently loaded in :obj:`Bus` contained in this :obj:`Yarn`.
        '''
        return sum(b.nbytes for b in self._series.values)

    @property
    def status(self) -> Frame:
        '''
        Return a :obj:`Frame` indicating loaded status, size, bytes, and shape of all loaded :obj:`Frame` in :obj:`Bus` contained in this :obj:`Yarn`.
        '''
        f = Frame.from_concat(
                (b.status for b in self._series.values),
                index=IndexAutoFactory)
        return f.relabel(index=self._index) # type: ignore

    #---------------------------------------------------------------------------
    # exporter

    def to_series(self) -> Series:
        '''Return a :obj:`Series` with the :obj:`Frame` contained in all contained :obj:`Bus`.
        '''
        # NOTE: this should load all deferred Frame
        return Series(self.values, index=self._index, own_index=True)

    def _to_signature_bytes(self,
            include_name: bool = True,
            include_class: bool = True,
            encoding: str = 'utf-8',
            ) -> bytes:
        '''Return the joined signature bytes of this container's components, its index, its hierarchy, and every contained element.
        '''
        v = (f._to_signature_bytes(
                include_name=include_name,
                include_class=include_class,
                encoding=encoding,
                ) for f in self._axis_element())

        return b''.join(chain(
                iter_component_signature_bytes(self,
                        include_name=include_name,
                        include_class=include_class,
                        encoding=encoding),
                (self._index._to_signature_bytes(
                        include_name=include_name,
                        include_class=include_class,
                        encoding=encoding),
                self._hierarchy._to_signature_bytes(
                        include_name=include_name,
                        include_class=include_class,
                        encoding=encoding),),
                v))

    #---------------------------------------------------------------------------
    # index manipulation

    @doc_inject(selector='relabel', class_name='Yarn')
    def relabel(self,
            index: tp.Optional[RelabelInput]
            ) -> 'Yarn':
        '''
        {doc}

        Args:
            index: {relabel_input}
        '''
        #NOTE: we name the parameter index for alignment with the corresponding Frame method
        own_index = False
        if index is IndexAutoFactory:
            index_init = None
        elif index is None:
            index_init = self._index
        elif is_callable_or_mapping(index): #type: ignore
            index_init = self._index.relabel(index)
            own_index = True
        elif isinstance(index, Set):
            raise RelabelInvalid()
        else:
            index_init = index #type: ignore

        return self.__class__(self._series, # no change to Buses
                index=index_init,
                deepcopy_from_bus=self._deepcopy_from_bus,
                hierarchy=self._hierarchy, # no change
                own_index=own_index,
                )

    @doc_inject(selector='relabel_flat', class_name='Yarn')
    def relabel_flat(self) -> 'Yarn':
        '''
        {doc}
        '''
        if not isinstance(self._index, IndexHierarchy):
            raise RuntimeError('cannot flatten an Index that is not an IndexHierarchy')

        return self.__class__(self._series, # no change to Buses
                index=self._index.flat(),
                deepcopy_from_bus=self._deepcopy_from_bus,
                hierarchy=self._hierarchy, # no change
                own_index=True,
                )

    @doc_inject(selector='relabel_level_add', class_name='Yarn')
    def relabel_level_add(self,
            level: tp.Hashable
            ) -> 'Yarn':
        '''
        {doc}

        Args:
            level: {level}
        '''
        return self.__class__(self._series, # no change to Buses
                index=self._index.level_add(level),
                deepcopy_from_bus=self._deepcopy_from_bus,
                hierarchy=self._hierarchy, # no change
                own_index=True,
                )

    @doc_inject(selector='relabel_level_drop', class_name='Yarn')
    def relabel_level_drop(self,
            count: int = 1
            ) -> 'Yarn':
        '''
        {doc}

        Args:
            count: {count}
        '''
        if not isinstance(self._index, IndexHierarchy):
            raise RuntimeError('cannot drop level of an Index that is not an IndexHierarchy')

        return self.__class__(self._series, # no change to Buses
                index=self._index.level_drop(count),
                deepcopy_from_bus=self._deepcopy_from_bus,
                hierarchy=self._hierarchy, # no change
                own_index=True,
                )

    def rehierarch(self,
            depth_map: tp.Sequence[int],
            *,
            index_constructors: IndexConstructors = None,
            ) -> 'Yarn':
        '''
        Return a new :obj:`Yarn` with a new hierarchy based on the supplied ``depth_map``.

        Raises:
            RuntimeError: if the index of this :obj:`Yarn` has a depth of 1.
        '''
        if self.index.depth == 1:
            raise RuntimeError('cannot rehierarch when there is no hierarchy')

        index, iloc_map = rehierarch_from_index_hierarchy(
                labels=self._index, #type: ignore
                depth_map=depth_map,
                index_constructors=index_constructors,
                name=self._index.name,
                )
        return self._extract_iloc(iloc_map).relabel(index) #type: ignore