Source code for static_frame.core.bus

from __future__ import annotations

from itertools import chain
from itertools import zip_longest

import numpy as np
import typing_extensions as tp

from static_frame.core.container import ContainerBase
from static_frame.core.container_util import index_from_optional_constructor
from static_frame.core.container_util import iter_component_signature_bytes
from static_frame.core.display import Display
from static_frame.core.display import DisplayActive
from static_frame.core.display import DisplayHeader
from static_frame.core.display_config import DisplayConfig
from static_frame.core.doc_str import doc_inject
from static_frame.core.exception import ErrorInitBus
from static_frame.core.exception import ErrorInitIndexNonUnique
from static_frame.core.frame import Frame
from static_frame.core.index import Index
from static_frame.core.index_auto import TIndexAutoFactory
from static_frame.core.index_auto import TRelabelInput
from static_frame.core.index_base import IndexBase
from static_frame.core.node_iter import IterNodeApplyType
from static_frame.core.node_iter import IterNodeNoArg
from static_frame.core.node_iter import IterNodeType
from static_frame.core.node_selector import InterfaceSelectTrio
from static_frame.core.node_selector import InterGetItemILocReduces
from static_frame.core.node_selector import InterGetItemLocReduces
from static_frame.core.series import Series
from static_frame.core.store import Store
from static_frame.core.store_client_mixin import StoreClientMixin
from static_frame.core.store_config import StoreConfigMap
from static_frame.core.store_config import StoreConfigMapInitializer
from static_frame.core.store_hdf5 import StoreHDF5
from static_frame.core.store_sqlite import StoreSQLite
from static_frame.core.store_xlsx import StoreXLSX
from static_frame.core.store_zip import StoreZipCSV
from static_frame.core.store_zip import StoreZipNPY
from static_frame.core.store_zip import StoreZipNPZ
from static_frame.core.store_zip import StoreZipParquet
from static_frame.core.store_zip import StoreZipPickle
from static_frame.core.store_zip import StoreZipTSV
from static_frame.core.style_config import StyleConfig
from static_frame.core.util import DEFAULT_SORT_KIND
from static_frame.core.util import DTYPE_BOOL
from static_frame.core.util import DTYPE_FLOAT_DEFAULT
from static_frame.core.util import DTYPE_OBJECT
from static_frame.core.util import INT_TYPES
from static_frame.core.util import NAME_DEFAULT
from static_frame.core.util import NULL_SLICE
from static_frame.core.util import ZIP_LONGEST_DEFAULT
from static_frame.core.util import TBoolOrBools
from static_frame.core.util import TILocSelector
from static_frame.core.util import TIndexCtorSpecifier
from static_frame.core.util import TIndexCtorSpecifiers
from static_frame.core.util import TIndexInitializer
from static_frame.core.util import TLabel
from static_frame.core.util import TLocSelector
from static_frame.core.util import TName
from static_frame.core.util import TPathSpecifier
from static_frame.core.util import TSortKinds


#-------------------------------------------------------------------------------
class FrameDeferredMeta(type):
    def __repr__(cls) -> str:
        return f'<{cls.__name__}>'

class FrameDeferred(metaclass=FrameDeferredMeta):
    '''
    Token placeholder for :obj:`Frame` not yet loaded.
    '''
#-------------------------------------------------------------------------------

if tp.TYPE_CHECKING:
    from static_frame.core.generic_aliases import TFrameAny  # pragma: no cover
    from static_frame.core.generic_aliases import TIndexHierarchyAny  # pragma: no cover
    from static_frame.core.generic_aliases import TSeriesAny  # pragma: no cover

    TNDArrayAny = np.ndarray[tp.Any, tp.Any] #pragma: no cover
    TDtypeAny = np.dtype[tp.Any] #pragma: no cover
    TDtypeObject = np.dtype[np.object_] #pragma: no cover
    TSeriesObject = Series[tp.Any, np.object_] #pragma: no cover

    TBusItems = tp.Iterable[tp.Tuple[ #pragma: no cover
            TLabel, tp.Union[TFrameAny, tp.Type[FrameDeferred]]]] #pragma: no cover

    TIterFrame = tp.Iterator[TFrameAny] #pragma: no cover

#-------------------------------------------------------------------------------
TVIndex = tp.TypeVar('TVIndex', bound=IndexBase, default=tp.Any)

[docs] class Bus(ContainerBase, StoreClientMixin, tp.Generic[TVIndex]): # not a ContainerOperand ''' A randomly-accessible container of :obj:`Frame`. When created from a multi-table storage format (such as a zip-pickle or XLSX), a Bus will lazily read in components as they are accessed. When combined with the ``max_persist`` parameter, a Bus will not hold on to more than ``max_persist`` references, permitting low-memory reading of collections of :obj:`Frame`. ''' __slots__ = ( '_loaded', '_loaded_all', '_values_mutable', '_index', '_name', '_store', '_config', '_last_accessed', '_max_persist', ) _values_mutable: TNDArrayAny _index: IndexBase _store: tp.Optional[Store] _config: StoreConfigMap _name: TName STATIC = False _NDIM: int = 1
[docs] @classmethod def from_items(cls, pairs: tp.Iterable[tp.Tuple[TLabel, TFrameAny]], *, config: StoreConfigMapInitializer = None, name: TName = None, index_constructor: tp.Optional[tp.Callable[..., IndexBase]] = None ) -> tp.Self: '''Return a :obj:`Bus` from an iterable of pairs of label, :obj:`Frame`. Returns: :obj:`Bus` ''' frames = [] index = [] for i, f in pairs: # might be a generator index.append(i) frames.append(f) return cls(frames, index=index, index_constructor=index_constructor, name=name, config=config, )
[docs] @classmethod def from_frames(cls, frames: tp.Iterable[TFrameAny], *, index_constructor: TIndexCtorSpecifier = None, config: StoreConfigMapInitializer = None, name: TName = None, ) -> tp.Self: '''Return a :obj:`Bus` from an iterable of :obj:`Frame`; labels will be drawn from :obj:`Frame.name`. ''' try: return cls.from_items(((f.name, f) for f in frames), index_constructor=index_constructor, config=config, name=name, ) except ErrorInitIndexNonUnique: raise ErrorInitIndexNonUnique('Frames do not have unique names.') from None
[docs] @classmethod def from_dict(cls, mapping: tp.Dict[TLabel, TFrameAny], *, name: TName = None, index_constructor: tp.Optional[tp.Callable[..., IndexBase]] = None ) -> tp.Self: '''Bus construction from a mapping of labels and :obj:`Frame`. Args: mapping: a dictionary or similar mapping interface. Returns: :obj:`Bus` ''' return cls(frames=mapping.values(), index=mapping.keys(), index_constructor=index_constructor, name=name, )
[docs] @classmethod def from_series(cls, series: TSeriesAny, *, store: tp.Optional[Store] = None, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, own_data: bool = False, ) -> tp.Self: ''' Create a :obj:`Bus` from a :obj:`Series` of :obj:`Frame`. ''' # NOTE: this interface is for 0.9 after the default Bus no longer accepts a Series return cls(series.values, index=series.index, store=store, config=config, max_persist=max_persist, own_data=own_data, own_index=True, name=series.name, )
[docs] @classmethod def from_concat(cls, containers: tp.Iterable[TBusAny], *, index: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None, name: TName = NAME_DEFAULT, ) -> tp.Self: ''' Concatenate multiple :obj:`Bus` into a new :obj:`Bus`. All :obj:`Bus` will load all :obj:`Frame` into memory if any are deferred. ''' # will extract .values, .index from Bus, which will correct load from Store as needed # NOTE: useful to use Series here as it handles aligned names, IndexAutoFactory, etc. series: TSeriesObject = Series.from_concat(containers, index=index, name=name) return cls.from_series(series, own_data=True)
#--------------------------------------------------------------------------- # constructors by data format @classmethod def _from_store(cls, store: Store, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: return cls(None, # will generate FrameDeferred array index=store.labels(config=config), index_constructor=index_constructor, store=store, config=config, max_persist=max_persist, own_data=True, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_zip_tsv(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to zipped TSV :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreZipTSV(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_zip_csv(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to zipped CSV :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreZipCSV(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_zip_pickle(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to zipped pickle :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreZipPickle(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_zip_npz(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to zipped NPZ :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreZipNPZ(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_zip_npy(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to zipped NPY :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreZipNPY(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_zip_parquet(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to zipped parquet :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreZipParquet(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_xlsx(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to an XLSX :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' # how to pass configuration for multiple sheets? store = StoreXLSX(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_sqlite(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to an SQLite :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreSQLite(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
[docs] @classmethod @doc_inject(selector='bus_constructor') def from_hdf5(cls, fp: TPathSpecifier, *, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' Given a file path to a HDF5 :obj:`Bus` store, return a :obj:`Bus` instance. {args} ''' store = StoreHDF5(fp) return cls._from_store(store, config=config, max_persist=max_persist, index_constructor=index_constructor, )
#---------------------------------------------------------------------------
[docs] def __init__(self, frames: TNDArrayAny | tp.Iterable[TFrameAny | tp.Type[FrameDeferred]] | None, *, index: TIndexInitializer, index_constructor: TIndexCtorSpecifier = None, name: TName = NAME_DEFAULT, store: tp.Optional[Store] = None, config: StoreConfigMapInitializer = None, max_persist: tp.Optional[int] = None, own_index: bool = False, own_data: bool = False, ): ''' Default Bus constructor. {args} ''' if max_persist is not None: # use an (ordered) dictionary to give use an ordered set, simply pointing to None for all keys self._last_accessed: tp.Dict[TLabel, None] = {} if own_index: self._index = index #type: ignore else: self._index = index_from_optional_constructor(index, default_constructor=Index, explicit_constructor=index_constructor ) count = len(self._index) # pyright: ignore frames_array: TNDArrayAny self._loaded: TNDArrayAny load_array: bool | np.bool_ self._loaded_all: bool | np.bool_ if frames is None: if store is None: raise ErrorInitBus('Cannot initialize a :obj:`Bus` with neither `frames` nor `store`.') self._values_mutable = np.full(count, FrameDeferred, dtype=DTYPE_OBJECT) self._loaded = np.full(count, False, dtype=DTYPE_BOOL) self._loaded_all = False else: if frames.__class__ is np.ndarray: if frames.dtype != DTYPE_OBJECT: #type: ignore raise ErrorInitBus( f'Series passed to initializer must have dtype object, not {frames.dtype}') #type: ignore frames_array = frames # type: ignore load_array = False else: if own_data: raise ErrorInitBus('Cannot use `own_data` when not supplying an array.') frames_array = np.empty(count, dtype=DTYPE_OBJECT) load_array = True self._loaded = np.empty(count, dtype=DTYPE_BOOL) # do a one time iteration of series for i, (label, value) in enumerate(zip_longest( index, frames, fillvalue=ZIP_LONGEST_DEFAULT, )): if label is ZIP_LONGEST_DEFAULT or value is ZIP_LONGEST_DEFAULT: raise ErrorInitBus('frames and index are not of equal length') if load_array: frames_array[i] = value if value is FrameDeferred: self._loaded[i] = False elif isinstance(value, Frame): # permit FrameGO? if max_persist is not None: self._last_accessed[label] = None self._loaded[i] = True else: raise ErrorInitBus(f'supplied {value.__class__} is not a Frame or FrameDeferred.') self._loaded_all = self._loaded.all() if own_data or load_array: self._values_mutable = frames_array else: self._values_mutable = frames_array.copy() self._values_mutable.flags.writeable = True # self._index = index self._name = None if name is NAME_DEFAULT else name self._store = store # Not handling cases of max_persist being greater than the length of the Series (might floor to length) if max_persist is not None and max_persist < self._loaded.sum(): raise ErrorInitBus('max_persist cannot be less than the number of already loaded Frames') self._max_persist = max_persist # providing None will result in default; providing a StoreConfig or StoreConfigMap will return an appropriate map self._config = StoreConfigMap.from_initializer(config)
#--------------------------------------------------------------------------- def _derive_from_series(self, series: TSeriesObject, *, own_data: bool = False, ) -> tp.Self: '''Utility for creating a derived Bus, propagating the associated ``Store`` and configuration. This can be used if the passed `series` is a subset or re-ordering of self._series; however, if the index has been transformed, this method should not be used, as, if there is a Store, the labels are no longer found in that Store. ''' # NOTE: there may be a more efficient path than using a Series return self.__class__.from_series(series, store=self._store, config=self._config, max_persist=self._max_persist, own_data=own_data, ) # ---------------------------------------------------------------------------
[docs] def __reversed__(self) -> tp.Iterator[TLabel]: ''' Returns a reverse iterator on the :obj:`Bus` index. Returns: :obj:`Index` ''' return reversed(self._index)
# def __copy__(self) -> tp.Self: # ''' # Return a new Bus, holding new references to Frames as well as a link to the a new Store instance. # ''' # return self.__class__(series, # store=self._store.__copy__(), # config=self._config, # max_persiste=self._max_persist, # ) #--------------------------------------------------------------------------- # name interface @property @doc_inject() def name(self) -> TName: '''{}''' return self._name
[docs] def rename(self, name: TName) -> tp.Self: ''' Return a new :obj:`Bus` with an updated name attribute. ''' # NOTE: do not want to use .values as this will force loading all Frames; use _values_mutable and let a copy be made by constructor return self.__class__(self._values_mutable, index=self._index, name=name, store=self._store, config=self._config, max_persist=self._max_persist, own_index=True, own_data=False, )
#--------------------------------------------------------------------------- # interfaces @property def loc(self) -> InterGetItemLocReduces[TBusAny, np.object_]: return InterGetItemLocReduces(self._extract_loc) @property def iloc(self) -> InterGetItemILocReduces[TBusAny, np.object_]: return InterGetItemILocReduces(self._extract_iloc) @property def drop(self) -> InterfaceSelectTrio[TBusAny]: ''' Interface for dropping elements from :obj:`static_frame.Bus`. ''' return InterfaceSelectTrio( #type: ignore func_iloc=self._drop_iloc, func_loc=self._drop_loc, func_getitem=self._drop_loc ) #--------------------------------------------------------------------------- @property def iter_element(self) -> IterNodeNoArg[TBusAny]: ''' Iterator of elements. ''' return IterNodeNoArg( container=self, function_items=self._axis_element_items, function_values=self._axis_element, yield_type=IterNodeType.VALUES, apply_type=IterNodeApplyType.SERIES_VALUES, ) @property def iter_element_items(self) -> IterNodeNoArg[TBusAny]: ''' Iterator of label, element pairs. ''' return IterNodeNoArg( container=self, function_items=self._axis_element_items, function_values=self._axis_element, yield_type=IterNodeType.ITEMS, apply_type=IterNodeApplyType.SERIES_VALUES, ) #--------------------------------------------------------------------------- # index manipulation # NOTE: must return a new Bus with fully-realized Frames, as cannot gaurantee usage of a Store after labels have been changed.
[docs] @doc_inject(selector='reindex', class_name='Bus') def reindex(self, index: TIndexInitializer, *, fill_value: tp.Any, own_index: bool = False, check_equals: bool = True ) -> tp.Self: ''' {doc} Args: index: {index_initializer} columns: {index_initializer} {fill_value} {own_index} ''' series = self.to_series().reindex(index, fill_value=fill_value, own_index=own_index, check_equals=check_equals, ) # NOTE: do not propagate store after reindex return self.__class__.from_series(series, config=self._config)
[docs] @doc_inject(selector='relabel', class_name='Bus') def relabel(self, index: tp.Optional[TRelabelInput], *, index_constructor: TIndexCtorSpecifier = None, ) -> tp.Self: ''' {doc} Args: index: {relabel_input_index} ''' # NOTE: can be done without going trhough a series series = self.to_series().relabel(index, index_constructor=index_constructor) # NOTE: do not propagate store after relabel return self.__class__.from_series(series, config=self._config)
[docs] @doc_inject(selector='relabel_flat', class_name='Bus') def relabel_flat(self) -> tp.Self: ''' {doc} ''' series = self.to_series().relabel_flat() return self.__class__.from_series(series, config=self._config)
[docs] @doc_inject(selector='relabel_level_add', class_name='Bus') def relabel_level_add(self, level: TLabel ) -> tp.Self: ''' {doc} Args: level: {level} ''' series = self.to_series().relabel_level_add(level) return self.__class__.from_series(series, config=self._config)
[docs] @doc_inject(selector='relabel_level_drop', class_name='Bus') def relabel_level_drop(self, count: int = 1 ) -> tp.Self: ''' {doc} Args: count: {count} ''' series = self.to_series().relabel_level_drop(count) return self.__class__.from_series(series, config=self._config)
[docs] def rehierarch(self, depth_map: tp.Sequence[int], *, index_constructors: TIndexCtorSpecifiers = None, ) -> tp.Self: ''' Return a new :obj:`Bus` with new a hierarchy based on the supplied ``depth_map``. ''' series = self.to_series().rehierarch( depth_map, index_constructors=index_constructors, ) return self.__class__.from_series(series, config=self._config)
#--------------------------------------------------------------------------- # na / falsy handling # NOTE: not implemented, as a Bus must contain only Frame or FrameDeferred #--------------------------------------------------------------------------- # cache management def _update_values_mutable_iloc(self, key: TILocSelector) -> None: ''' Update _values_mutable with the key specified, where key can be any iloc. Args: key: always an iloc key. ''' max_persist = self._max_persist max_persist_active = max_persist is not None target_loaded = self._loaded[key] target_loaded_count = target_loaded.sum() load = False if self._loaded_all else not target_loaded.all() if not load and not max_persist_active: return index = self._index label: TLabel key_is_element = isinstance(key, INT_TYPES) if not load and max_persist_active: # must update LRU position labels = (index.iloc[key],) if key_is_element else index.iloc[key].values for label in labels: # update LRU position self._last_accessed[label] = self._last_accessed.pop(label, None) return if self._store is None: # Store must be defined if we are partially loaded raise RuntimeError('no store defined') array = self._values_mutable # selection might result in an element so types here are not precise target_values: TNDArrayAny = array[key] target_labels: TNDArrayAny | TIndexHierarchyAny if self._index._NDIM == 2: # if an IndexHierarchy, using .values results in a 2D array that might coerce types; thus, must keep an index target_labels = self._index[key] # type: ignore[assignment] else: # if an 1D index, we can immediately reduce to an array target_labels = self._index.values[key] target_count = 1 if key_is_element else len(target_labels) if max_persist_active: loaded_count = self._loaded.sum() loaded_available = max_persist - loaded_count loaded_needed = target_count - target_loaded_count store_reader: TIterFrame targets_items: TBusItems # NOTE: prepare iterable of pairs of label, Frame / FrameDeferred; ensure that for every FrameDeferred, the appropriate Frame is loaded and yielded from the store_reader in order. We must ensure within the target of requested Frame we do not delete any previously-loaded Frame. If max_persist is less than the target, reduce the target to max_persist. if key_is_element: store_reader = iter((self._store.read(target_labels, config=self._config[target_labels]),)) # type: ignore targets_items = ((target_labels, target_values),) # type: ignore # more than one Frame elif (not max_persist_active or max_persist == 1 or loaded_needed <= loaded_available # pyright: ignore ): # only read-in labels that are deferred; as loaded_needed is less than loaded_available, no Frame will be removed if target_loaded_count: labels_to_read = target_labels[~target_loaded] else: # no targets are loaded labels_to_read = target_labels store_reader = self._store.read_many(labels_to_read, config=self._config) targets_items = zip(target_labels, target_values) # max_persist_active, must delete some Frame else: if loaded_needed <= max_persist: # loaded_needed is less than _max_persist but greater than loaded_available, meaning that some Frame have to be deleted. we must ensure we do not delete a Frame we already have loaded within the target region, so move them to the back of the LRU if target_loaded_count: # update LRU position to ensure we do not delete in target for label in target_labels[target_loaded]: # update LRU position self._last_accessed[label] = self._last_accessed.pop(label, None) labels_to_read = target_labels[~target_loaded] else: # no targets are loaded labels_to_read = target_labels else: # loaded_needed > max_persist: # Need to load more than max_persist, so limit to last max_persist-length components. All other Frame, if loaded, will be deleted # assert max_persist < len(target_labels) target_labels = target_labels[-max_persist:] # type: ignore # pylint: disable=E1130 target_values = target_values[-max_persist:] # type: ignore # pylint: disable=E1130 target_loaded = target_loaded[-max_persist:] # type: ignore # pylint: disable=E1130 if target_loaded.any(): # update LRU position to ensure we do not delete in target for label in target_labels[target_loaded]: self._last_accessed[label] = self._last_accessed.pop(label, None) labels_to_read = target_labels[~target_loaded] else: # no targets are loaded, will only load a subset of targets of size equal to max_persist; can unpersist everything else labels_to_read = target_labels array[self._loaded] = FrameDeferred self._loaded[NULL_SLICE] = False self._last_accessed.clear() store_reader = self._store.read_many(labels_to_read, config=self._config) targets_items = zip(target_labels, target_values) # Iterate over items that have been selected; there must be at least 1 FrameDeferred among this selection. Note that we iterate over all Frame in the target, not just those form the store, as we need to update LRU positions for all values in the target for label, frame in targets_items: # pyright: ignore idx = index._loc_to_iloc(label) if frame is FrameDeferred: frame = next(store_reader) array[idx] = frame self._loaded[idx] = True # update loaded status if max_persist_active: loaded_count += 1 if max_persist_active: # update LRU position self._last_accessed[label] = self._last_accessed.pop(label, None) if loaded_count > max_persist: # pyright: ignore label_remove = next(iter(self._last_accessed)) del self._last_accessed[label_remove] idx_remove = index._loc_to_iloc(label_remove) self._loaded[idx_remove] = False array[idx_remove] = FrameDeferred loaded_count -= 1 self._loaded_all = self._loaded.all()
[docs] def unpersist(self) -> None: '''Replace all loaded :obj:`Frame` with :obj:`FrameDeferred`. ''' if self._store is None: # no-op so Yarn or Quilt can call regardless of Store return self._values_mutable[self._loaded] = FrameDeferred self._loaded[NULL_SLICE] = False self._loaded_all = False if self._max_persist is not None: self._last_accessed.clear()
#--------------------------------------------------------------------------- # extraction def _extract_iloc(self, key: TILocSelector) -> tp.Self: ''' Returns: Bus or, if an element is selected, a Frame ''' self._update_values_mutable_iloc(key=key) # iterable selection should be handled by NP values: tp.Any = self._values_mutable[key] # NOTE: Bus only stores Frame and FrameDeferred, can rely on check with values if not values.__class__ is np.ndarray: # if we have a single element return values #type: ignore return self.__class__(values, index=self._index.iloc[key], name=self._name, store=self._store, config=self._config, max_persist=self._max_persist, own_index=True, own_data=False, # force immutable copy ) def _extract_loc(self, key: TLocSelector) -> tp.Self: iloc_key = self._index._loc_to_iloc(key) return self._extract_iloc(iloc_key)
[docs] @doc_inject(selector='selector') def __getitem__(self, key: TLocSelector) -> tp.Self: '''Selector of values by label. Args: key: {key_loc} ''' return self._extract_loc(key)
#--------------------------------------------------------------------------- # utilities for alternate extraction: drop def _drop_iloc(self, key: TILocSelector) -> tp.Self: series = self._to_series_state()._drop_iloc(key) return self._derive_from_series(series, own_data=True) def _drop_loc(self, key: TLocSelector) -> tp.Self: return self._drop_iloc(self._index._loc_to_iloc(key)) #--------------------------------------------------------------------------- # axis functions def _axis_element_items(self, ) -> tp.Iterator[tp.Tuple[TLabel, TFrameAny]]: '''Generator of index, value pairs, equivalent to Series.items(). Repeated to have a common signature as other axis functions. ''' yield from self.items() def _axis_element(self, ) -> tp.Iterator[tp.Any]: if self._loaded_all: yield from self._values_mutable elif self._max_persist is None: # load all at once if possible if not self._loaded_all: self._update_values_mutable_iloc(key=NULL_SLICE) yield from self._values_mutable elif self._max_persist > 1: i = 0 i_max = len(self._index.values) while i < i_max: # draw values up to size of max_persist key = slice(i, min(i + self._max_persist, i_max)) self._update_values_mutable_iloc(key=key) for j in range(key.start, key.stop): yield self._values_mutable[j] i += self._max_persist else: # max_persist is 1 for i in range(self.__len__()): self._update_values_mutable_iloc(key=i) yield self._values_mutable[i] #--------------------------------------------------------------------------- # dictionary-like interface; these will force loading contained Frame
[docs] def items(self) -> tp.Iterator[tp.Tuple[TLabel, TFrameAny]]: '''Iterator of pairs of :obj:`Bus` label and contained :obj:`Frame`. ''' if self._loaded_all: yield from zip(self._index, self._values_mutable) elif self._max_persist is None: # load all at once if possible if not self._loaded_all: self._update_values_mutable_iloc(key=NULL_SLICE) yield from zip(self._index, self._values_mutable) elif self._max_persist > 1: # if _max_persist is greater than 1, load as many Frame as possible (up to the max persist) at a time; this optimizes read operations from the Store labels = self._index.values i = 0 i_max = len(labels) while i < i_max: key = slice(i, min(i + self._max_persist, i_max)) labels_select = labels[key] # may over select self._update_values_mutable_iloc(key=key) yield from zip(labels_select, self._values_mutable[key]) i += self._max_persist else: # max_persist is 1 for i, label in enumerate(self._index.values): self._update_values_mutable_iloc(key=i) yield label, self._values_mutable[i]
_items_store = items @property def values(self) -> TNDArrayAny: '''A 1D object array of all :obj:`Frame` contained in the :obj:`Bus`. The returned ``np.ndarray`` will have ``Frame``; this will never return an array with ``FrameDeferred``, but ``max_persist`` will be observed in reading from the Store. ''' # NOTE: when self._values_mutable is fully loaded, it could become immutable and avoid a copy. However, with unpersist(), we might unload all Frame if self._loaded_all: post = self._values_mutable.copy() post.flags.writeable = False return post if self._max_persist is None: # load all at once if possible # b._loaded_all must be False self._update_values_mutable_iloc(key=NULL_SLICE) post = self._values_mutable.copy() post.flags.writeable = False return post # return a new array; force new iteration to account for max_persist post = np.empty(self.__len__(), dtype=object) if self._max_persist > 1: i = 0 i_max = len(self._index.values) while i < i_max: key = slice(i, min(i + self._max_persist, i_max)) # draw values to force usage of read_many in _store_reader self._update_values_mutable_iloc(key=key) post[key] = self._values_mutable[key] i += self._max_persist else: # max_persist is 1 for i in range(self.__len__()): self._update_values_mutable_iloc(key=i) post[i] = self._values_mutable[i] post.flags.writeable = False return post #---------------------------------------------------------------------------
[docs] def __len__(self) -> int: '''Length of values. ''' return self._index.__len__()
[docs] @doc_inject() def display(self, config: tp.Optional[DisplayConfig] = None, *, style_config: tp.Optional[StyleConfig] = None, ) -> Display: '''{doc} Args: {config} ''' # NOTE: the key change over serires is providing the Bus as the displayed class config = config or DisplayActive.get() display_cls = Display.from_values((), header=DisplayHeader(self.__class__, self._name), config=config) return self._to_series_state()._display(config, display_cls=display_cls, style_config=style_config, )
#--------------------------------------------------------------------------- # extended discriptors; in general, these do not force loading Frame @property def mloc(self) -> TSeriesObject: '''Returns a :obj:`Series` showing a tuple of memory locations within each loaded Frame. ''' if not self._loaded.any(): return Series.from_element(None, index=self._index) def gen() -> tp.Iterator[tp.Tuple[TLabel, tp.Optional[tp.Tuple[int, ...]]]]: for label, f in zip(self._index, self._values_mutable): if f is FrameDeferred: yield label, None else: yield label, tuple(f.mloc) return Series.from_items(gen()) @property def dtypes(self) -> TFrameAny: '''Returns a :obj:`Frame` of dtype per column for all loaded Frames. ''' if not self._loaded.any(): return Frame(index=self._index) f: TFrameAny = Frame.from_concat( frames=(f.dtypes for f in self._values_mutable if f is not FrameDeferred), fill_value=None, ).reindex(index=self._index, fill_value=None) return f @property def shapes(self) -> TSeriesObject: '''A :obj:`Series` describing the shape of each loaded :obj:`Frame`. Unloaded :obj:`Frame` will have a shape of None. Returns: :obj:`Series` ''' values = (f.shape if f is not FrameDeferred else None for f in self._values_mutable) return Series(values, index=self._index, dtype=DTYPE_OBJECT, name='shape') @property def nbytes(self) -> int: '''Total bytes of data currently loaded in the Bus. ''' return sum(f.nbytes if f is not FrameDeferred else 0 for f in self._values_mutable) @property def status(self) -> TFrameAny: ''' Return a :obj:`Frame` indicating loaded status, size, bytes, and shape of all loaded :obj:`Frame`. ''' def gen() -> tp.Iterator[TSeriesAny]: yield Series(self._loaded, index=self._index, dtype=DTYPE_BOOL, name='loaded') for attr, dtype, missing in ( ('size', DTYPE_FLOAT_DEFAULT, np.nan), ('nbytes', DTYPE_FLOAT_DEFAULT, np.nan), ('shape', DTYPE_OBJECT, None) ): values = (getattr(f, attr) if f is not FrameDeferred else missing for f in self._values_mutable) yield Series(values, index=self._index, dtype=dtype, name=attr) return Frame.from_concat(gen(), axis=1) #--------------------------------------------------------------------------- # common attributes from the numpy array @property def dtype(self) -> TDtypeObject: ''' Return the dtype of the underlying NumPy array. Returns: :obj:`numpy.dtype` ''' return DTYPE_OBJECT @property def shape(self) -> tp.Tuple[int]: ''' Return a tuple describing the shape of the underlying NumPy array. Returns: :obj:`Tuple[int]` ''' return self._values_mutable.shape #type: ignore @property def ndim(self) -> int: ''' Return the number of dimensions, which for a :obj:`Bus` is always 1. Returns: :obj:`int` ''' return self._NDIM @property def size(self) -> int: ''' Return the size of the underlying NumPy array. Returns: :obj:`int` ''' return self._values_mutable.size #--------------------------------------------------------------------------- @property def index(self) -> IndexBase: ''' The index instance assigned to this container. Returns: :obj:`Index` ''' return self._index #--------------------------------------------------------------------------- # dictionary-like interface
[docs] def keys(self) -> IndexBase: ''' Iterator of index labels. Returns: :obj:`Iterator[Hashable]` ''' return self._index
[docs] def __iter__(self) -> tp.Iterator[TLabel]: ''' Iterator of index labels, same as :obj:`static_frame.Series.keys`. Returns: :obj:`Iterator[Hashasble]` ''' return self._index.__iter__()
[docs] def __contains__(self, value: TLabel) -> bool: ''' Inclusion of value in index labels. Returns: :obj:`bool` ''' return self._index.__contains__(value)
[docs] def get(self, key: TLabel, default: tp.Any = None, ) -> tp.Any: ''' Return the value found at the index key, else the default if the key is not found. Returns: :obj:`Any` ''' if key not in self._index: return default # will always return an element return self._extract_loc(key=key)
#---------------------------------------------------------------------------
[docs] @doc_inject() def equals(self, other: tp.Any, *, compare_name: bool = False, compare_dtype: bool = False, compare_class: bool = False, skipna: bool = True, ) -> bool: ''' {doc} Note: this will attempt to load and compare all Frame managed by the Bus. Args: {compare_name} {compare_dtype} {compare_class} {skipna} ''' if id(other) == id(self): return True if compare_class and self.__class__ != other.__class__: return False elif not isinstance(other, Bus): return False # NOTE: dtype self._series is always object if len(self._index) != len(other._index): return False if compare_name and self._name != other._name: return False if not self._index.equals( other._index, compare_name=compare_name, compare_dtype=compare_dtype, compare_class=compare_class, skipna=skipna, ): return False # can zip because length of Series already match # using .values will force loading all Frame into memory; better to use items() to permit collection for (_, frame_self), (_, frame_other) in zip(self.items(), other.items()): if not frame_self.equals(frame_other, compare_name=compare_name, compare_dtype=compare_dtype, compare_class=compare_class, skipna=skipna, ): return False return True
#--------------------------------------------------------------------------- # transformations resulting in changed dimensionality
[docs] @doc_inject(selector='head', class_name='Bus') def head(self, count: int = 5) -> TBusAny: '''{doc} Args: {count} Returns: :obj:`Bus` ''' return self.iloc[:count]
[docs] @doc_inject(selector='tail', class_name='Bus') def tail(self, count: int = 5) -> TBusAny: '''{doc}s Args: {count} Returns: :obj:`Bus` ''' return self.iloc[-count:]
#--------------------------------------------------------------------------- # transformations resulting in the same dimensionality
[docs] @doc_inject(selector='sort') def sort_index(self, *, ascending: TBoolOrBools = True, kind: TSortKinds = DEFAULT_SORT_KIND, key: tp.Optional[tp.Callable[[IndexBase], tp.Union[TNDArrayAny, IndexBase]]] = None, ) -> tp.Self: ''' Return a new Bus ordered by the sorted Index. Args: * {ascendings} {kind} {key} Returns: :obj:`Bus` ''' series = self._to_series_state().sort_index( ascending=ascending, kind=kind, key=key, ) return self._derive_from_series(series, own_data=True)
[docs] @doc_inject(selector='sort') def sort_values(self, *, ascending: bool = True, kind: TSortKinds = DEFAULT_SORT_KIND, key: tp.Callable[[TSeriesAny], tp.Union[TNDArrayAny, TSeriesAny]], ) -> tp.Self: ''' Return a new Bus ordered by the sorted values. Note that as a Bus contains Frames, a `key` argument must be provided to extract a sortable value, and this key function will process a :obj:`Series` of :obj:`Frame`. Args: * {ascending} {kind} {key} Returns: :obj:`Bus` ''' cfs = self.to_series() # force loading all Frame series = cfs.sort_values( ascending=ascending, kind=kind, key=key, ) return self._derive_from_series(series, own_data=True)
[docs] def roll(self, shift: int, *, include_index: bool = False, ) -> tp.Self: '''Return a Bus with values rotated forward and wrapped around the index (with a positive shift) or backward and wrapped around the index (with a negative shift). Args: shift: Positive or negative integer shift. include_index: Determine if the Index is shifted with the underlying data. Returns: :obj:`Bus` ''' series = self._to_series_state().roll(shift=shift, include_index=include_index) return self._derive_from_series(series, own_data=True)
[docs] def shift(self, shift: int, *, fill_value: tp.Any, ) -> tp.Self: '''Return a :obj:`Bus` with values shifted forward on the index (with a positive shift) or backward on the index (with a negative shift). Args: shift: Positive or negative integer shift. fill_value: Value to be used to fill data missing after the shift. Returns: :obj:`Bus` ''' series = self._to_series_state().shift(shift=shift, fill_value=fill_value) return self._derive_from_series(series, own_data=True)
#--------------------------------------------------------------------------- # exporter def _to_series_state(self) -> TSeriesObject: # the mutable array will be copied in the Series construction return Series(self._values_mutable, index=self._index, own_index=True, name=self._name, )
[docs] def to_series(self) -> TSeriesObject: '''Return a :obj:`Series` with the :obj:`Frame` contained in this :obj:`Bus`. If the :obj:`Bus` is associated with a :obj:`Store`, all :obj:`Frame` will be loaded into memory and the returned :obj:`Bus` will no longer be associated with the :obj:`Store`. ''' # values returns an immutable array and will fully realize from Store return Series(self.values, index=self._index, own_index=True, name=self._name, )
def _to_signature_bytes(self, include_name: bool = True, include_class: bool = True, encoding: str = 'utf-8', ) -> bytes: v = (f._to_signature_bytes( include_name=include_name, include_class=include_class, encoding=encoding, ) for f in self._axis_element()) return b''.join(chain( iter_component_signature_bytes(self, include_name=include_name, include_class=include_class, encoding=encoding), (self._index._to_signature_bytes( include_name=include_name, include_class=include_class, encoding=encoding),), v))
TBusAny = Bus[tp.Any]