from __future__ import annotations
from datetime import datetime
from datetime import timezone
from itertools import chain
from itertools import islice
from itertools import zip_longest
from pathlib import Path
import numpy as np
import typing_extensions as tp
from static_frame.core.container import ContainerBase
from static_frame.core.container_util import index_from_optional_constructor
from static_frame.core.container_util import iter_component_signature_bytes
from static_frame.core.display import Display
from static_frame.core.display import DisplayActive
from static_frame.core.display import DisplayHeader
from static_frame.core.doc_str import doc_inject
from static_frame.core.exception import ErrorInitBus
from static_frame.core.exception import ErrorInitIndexNonUnique
from static_frame.core.exception import immutable_type_error_factory
from static_frame.core.frame import Frame
from static_frame.core.index import Index
from static_frame.core.index_base import IndexBase
from static_frame.core.node_iter import IterNodeApplyType
from static_frame.core.node_iter import IterNodeNoArgReducible
from static_frame.core.node_selector import InterfacePersist
from static_frame.core.node_selector import InterfaceSelectTrio
from static_frame.core.node_selector import InterGetItemILocReduces
from static_frame.core.node_selector import InterGetItemLocReduces
from static_frame.core.series import Series
from static_frame.core.store_client_mixin import StoreClientMixin
from static_frame.core.store_config import StoreConfigMap
from static_frame.core.store_config import StoreConfigMapInitializer
from static_frame.core.store_sqlite import StoreSQLite
from static_frame.core.store_xlsx import StoreXLSX
from static_frame.core.store_zip import StoreZipCSV
from static_frame.core.store_zip import StoreZipNPY
from static_frame.core.store_zip import StoreZipNPZ
from static_frame.core.store_zip import StoreZipParquet
from static_frame.core.store_zip import StoreZipPickle
from static_frame.core.store_zip import StoreZipTSV
from static_frame.core.util import DEFAULT_SORT_KIND
from static_frame.core.util import DTYPE_BOOL
from static_frame.core.util import DTYPE_FLOAT_DEFAULT
from static_frame.core.util import DTYPE_OBJECT
from static_frame.core.util import INT_TYPES
from static_frame.core.util import NAME_DEFAULT
from static_frame.core.util import NULL_SLICE
from static_frame.core.util import ZIP_LONGEST_DEFAULT
from static_frame.core.util import IterNodeType
from static_frame.core.util import TBoolOrBools
from static_frame.core.util import TILocSelector
from static_frame.core.util import TIndexCtorSpecifier
from static_frame.core.util import TIndexCtorSpecifiers
from static_frame.core.util import TIndexInitializer
from static_frame.core.util import TLabel
from static_frame.core.util import TLocSelector
from static_frame.core.util import TName
from static_frame.core.util import TNDArrayObject
from static_frame.core.util import TPathSpecifier
from static_frame.core.util import TSortKinds
from static_frame.core.util import bytes_to_size_label
if tp.TYPE_CHECKING:
from collections.abc import Container # pragma: no cover
from static_frame.core.display_config import DisplayConfig # pragma: no cover
from static_frame.core.index_auto import TIndexAutoFactory # pragma: no cover
from static_frame.core.index_auto import TRelabelInput # pragma: no cover
from static_frame.core.store import Store # pragma: no cover
from static_frame.core.style_config import StyleConfig # pragma: no cover
#-------------------------------------------------------------------------------
class FrameDeferredMeta(type):
    '''Metaclass whose classes display as their bare name wrapped in angle brackets.'''

    def __repr__(cls) -> str:
        # e.g. a class named ``FrameDeferred`` is shown as ``<FrameDeferred>``
        return '<' + cls.__name__ + '>'
class FrameDeferred(metaclass=FrameDeferredMeta):
    '''
    Token placeholder for :obj:`Frame` not yet loaded.

    The class object itself (never an instance) is stored in a :obj:`Bus` at positions whose :obj:`Frame` has not been read from the Store.
    '''
#-------------------------------------------------------------------------------
if tp.TYPE_CHECKING:
    # Names below are only bound for static type checkers; they are used in annotations in this module.
    from static_frame.core.generic_aliases import TFrameAny # pragma: no cover
    from static_frame.core.generic_aliases import TSeriesAny # pragma: no cover
    TNDArrayAny = np.ndarray[tp.Any, tp.Any] #pragma: no cover
    TDtypeAny = np.dtype[tp.Any] #pragma: no cover
    TDtypeObject = np.dtype[np.object_] #pragma: no cover
    TSeriesObject = Series[tp.Any, np.object_] #pragma: no cover
    # iterable of (label, value) pairs where the value is a Frame or the FrameDeferred class itself
    TBusItems = tp.Iterable[tp.Tuple[ #pragma: no cover
            TLabel, tp.Union[TFrameAny, tp.Type[FrameDeferred]]]] #pragma: no cover
    TIterFrame = tp.Iterator[TFrameAny] #pragma: no cover
#-------------------------------------------------------------------------------
# Type variable for the index type of a generic ``Bus``; bound to IndexBase, defaulting to tp.Any.
TVIndex = tp.TypeVar('TVIndex', bound=IndexBase, default=tp.Any)
[docs]
class Bus(ContainerBase, StoreClientMixin, tp.Generic[TVIndex]): # not a ContainerOperand
'''
A randomly-accessible container of :obj:`Frame`. When created from a multi-table storage format (such as a zip-pickle or XLSX), a Bus will lazily read in components as they are accessed. When combined with the ``max_persist`` parameter, a Bus will not hold on to more than ``max_persist`` references, permitting low-memory reading of collections of :obj:`Frame`.
'''
__slots__ = (
'_loaded',
'_loaded_all',
'_values_mutable',
'_index',
'_name',
'_store',
'_config',
'_last_loaded',
'_max_persist',
)
_values_mutable: TNDArrayAny
_index: IndexBase
_store: tp.Optional[Store]
_config: StoreConfigMap
_name: TName
STATIC = False
_NDIM: int = 1
[docs]
@classmethod
def from_items(cls,
        pairs: tp.Iterable[tp.Tuple[TLabel, TFrameAny]],
        *,
        config: StoreConfigMapInitializer = None,
        name: TName = None,
        index_constructor: tp.Optional[tp.Callable[..., IndexBase]] = None
        ) -> tp.Self:
    '''Build a :obj:`Bus` from an iterable of ``(label, Frame)`` pairs.

    Args:
        pairs: iterable of two-element pairs; it is traversed exactly once, so a generator is acceptable.
        config: forwarded to the default constructor.
        name: forwarded to the default constructor.
        index_constructor: forwarded to the default constructor.

    Returns:
        :obj:`Bus`
    '''
    labels = []
    values = []
    # single pass: ``pairs`` may be a one-shot generator
    for label, frame in pairs:
        labels.append(label)
        values.append(frame)
    return cls(values,
            index=labels,
            name=name,
            config=config,
            index_constructor=index_constructor,
            )
[docs]
@classmethod
def from_frames(cls,
        frames: tp.Iterable[TFrameAny],
        *,
        index_constructor: TIndexCtorSpecifier = None,
        config: StoreConfigMapInitializer = None,
        name: TName = None,
        ) -> tp.Self:
    '''Build a :obj:`Bus` from an iterable of :obj:`Frame`, labelling each entry with its :obj:`Frame.name`.

    Raises:
        ErrorInitIndexNonUnique: re-raised with a Frame-specific message when construction fails with this exception.
    '''
    named = ((frame.name, frame) for frame in frames)
    try:
        return cls.from_items(named,
                name=name,
                config=config,
                index_constructor=index_constructor,
                )
    except ErrorInitIndexNonUnique:
        # replace the generic index error with one that names the actual cause
        raise ErrorInitIndexNonUnique('Frames do not have unique names.') from None
[docs]
@classmethod
def from_dict(cls,
        mapping: tp.Dict[TLabel, TFrameAny],
        *,
        name: TName = None,
        index_constructor: tp.Optional[tp.Callable[..., IndexBase]] = None
        ) -> tp.Self:
    '''Build a :obj:`Bus` from a mapping of labels to :obj:`Frame`.

    Args:
        mapping: a dictionary or similar mapping interface; its keys become the index and its values the frames.
        name: forwarded to the default constructor.
        index_constructor: forwarded to the default constructor.

    Returns:
        :obj:`Bus`
    '''
    labels = mapping.keys()
    frames = mapping.values()
    return cls(frames,
            index=labels,
            name=name,
            index_constructor=index_constructor,
            )
[docs]
@classmethod
def from_series(cls,
        series: TSeriesAny,
        *,
        store: tp.Optional[Store] = None,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        own_data: bool = False,
        ) -> tp.Self:
    '''
    Build a :obj:`Bus` from a :obj:`Series` whose values are :obj:`Frame`.

    The index and name of ``series`` are reused; the index is passed with ``own_index=True`` so it is not rebuilt.
    '''
    # NOTE: this interface is for 0.9 after the default Bus no longer accepts a Series
    values = series.values
    index = series.index
    return cls(values,
            index=index,
            name=series.name,
            store=store,
            config=config,
            max_persist=max_persist,
            own_index=True,
            own_data=own_data,
            )
[docs]
@classmethod
def from_concat(cls,
        containers: tp.Iterable[TBusAny],
        *,
        index: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
        name: TName = NAME_DEFAULT,
        ) -> tp.Self:
    '''
    Join several :obj:`Bus` end to end into one new :obj:`Bus`. Any deferred :obj:`Frame` in the inputs is loaded into memory as part of concatenation.
    '''
    # Series.from_concat reads .values and .index from each Bus (loading from the Store as needed)
    # and takes care of aligned names, IndexAutoFactory, etc.
    return cls.from_series(
            Series.from_concat(containers, index=index, name=name),
            own_data=True,
            )
#---------------------------------------------------------------------------
# constructors by data format
@classmethod
def _from_store(cls,
        store: Store,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''Shared implementation for the format-specific constructors: build a fully deferred :obj:`Bus` whose labels come from ``store``.
    '''
    labels = store.labels(config=config)
    # passing ``None`` for frames makes the constructor fill the values with FrameDeferred
    return cls(None,
            index=labels,
            index_constructor=index_constructor,
            store=store,
            config=config,
            max_persist=max_persist,
            own_data=True,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_tsv(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the zipped TSV store at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipTSV(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_csv(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the zipped CSV store at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipCSV(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_pickle(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the zipped pickle store at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipPickle(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_npz(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the zipped NPZ store at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipNPZ(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_npy(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the zipped NPY store at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipNPY(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_parquet(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the zipped parquet store at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipParquet(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_xlsx(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the XLSX store at the file path ``fp``.

    {args}
    '''
    # how to pass configuration for multiple sheets?
    return cls._from_store(StoreXLSX(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
[docs]
@classmethod
@doc_inject(selector='bus_constructor')
def from_sqlite(cls,
        fp: TPathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    Return a :obj:`Bus` backed by the SQLite store at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreSQLite(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
#---------------------------------------------------------------------------
[docs]
def __init__(self,
        frames: TNDArrayAny | tp.Iterable[TFrameAny | tp.Type[FrameDeferred]] | None,
        *,
        index: TIndexInitializer,
        index_constructor: TIndexCtorSpecifier = None,
        name: TName = NAME_DEFAULT,
        store: tp.Optional[Store] = None,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        own_index: bool = False,
        own_data: bool = False,
        ):
    '''
    Default Bus constructor.

    Args:
        frames: an object-dtype array or an iterable of :obj:`Frame` and/or the ``FrameDeferred`` class; ``None`` creates a fully deferred Bus and requires ``store``.
        index: labels for the frames; used as-is when ``own_index`` is True, otherwise passed through ``index_constructor`` (default ``Index``).
        index_constructor: optional explicit index constructor.
        name: name of the Bus; the ``NAME_DEFAULT`` sentinel is stored as ``None``.
        store: Store from which deferred frames are read.
        config: a StoreConfig, StoreConfigMap, or ``None`` for the default map.
        max_persist: maximum number of loaded frames to retain, or ``None`` to disable; must be at least 1 and not less than the number of frames already loaded.
        own_index: if True, ``index`` is stored without conversion.
        own_data: if True, a supplied array is stored without copying; only valid when ``frames`` is an array.

    Raises:
        ErrorInitBus: on an invalid ``max_persist``, when neither ``frames`` nor ``store`` is given, on a non-object array, on ``own_data`` without an array, on a length mismatch between frames and index, or on a value that is neither a Frame nor FrameDeferred.
    '''
    if max_persist is not None:
        if max_persist < 1:
            raise ErrorInitBus('Cannot initialize a :obj:`Bus` with `max_persist` less than 1; use `None` to disable `max_persist`.')
        # use a dict as an ordered set (all keys point to None); only created when max_persist is active
        self._last_loaded: tp.Dict[TLabel, None] = {}

    if own_index:
        self._index = index #type: ignore
    else:
        self._index = index_from_optional_constructor(index,
                default_constructor=Index,
                explicit_constructor=index_constructor
                )
    count = len(self._index) # pyright: ignore

    frames_array: TNDArrayAny
    self._loaded: TNDArrayAny
    load_array: bool | np.bool_
    self._loaded_all: bool | np.bool_

    if frames is None:
        if store is None:
            raise ErrorInitBus('Cannot initialize a :obj:`Bus` with neither `frames` nor `store`.')
        # nothing supplied: every position starts as the FrameDeferred placeholder
        self._values_mutable = np.full(count, FrameDeferred, dtype=DTYPE_OBJECT)
        self._loaded = np.full(count, False, dtype=DTYPE_BOOL)
        self._loaded_all = False
    else:
        if frames.__class__ is np.ndarray:
            if frames.dtype != DTYPE_OBJECT: #type: ignore
                raise ErrorInitBus(
                        f'Series passed to initializer must have dtype object, not {frames.dtype}') #type: ignore
            frames_array = frames # type: ignore
            load_array = False
        else:
            if own_data:
                raise ErrorInitBus('Cannot use `own_data` when not supplying an array.')
            frames_array = np.empty(count, dtype=DTYPE_OBJECT)
            load_array = True

        self._loaded = np.empty(count, dtype=DTYPE_BOOL)
        # One pass over labels and frames to validate values and record loaded status.
        # Iterate the realized ``self._index`` rather than the ``index`` argument: the argument
        # may be a one-shot iterator already consumed above, and labels recorded in
        # ``_last_loaded`` must be the same objects later produced by ``self._index``.
        for i, (label, value) in enumerate(zip_longest(
                self._index,
                frames,
                fillvalue=ZIP_LONGEST_DEFAULT,
                )):
            if label is ZIP_LONGEST_DEFAULT or value is ZIP_LONGEST_DEFAULT:
                raise ErrorInitBus('frames and index are not of equal length')

            if load_array:
                frames_array[i] = value

            if value is FrameDeferred:
                self._loaded[i] = False
            elif isinstance(value, Frame): # permit FrameGO?
                if max_persist is not None:
                    self._last_loaded[label] = None
                self._loaded[i] = True
            else:
                raise ErrorInitBus(f'supplied {value.__class__} is not a Frame or FrameDeferred.')

        self._loaded_all = self._loaded.all()

        if own_data or load_array:
            self._values_mutable = frames_array
        else:
            # a caller-supplied array that is not owned is copied before being made writeable
            self._values_mutable = frames_array.copy()
        self._values_mutable.flags.writeable = True

    self._name = None if name is NAME_DEFAULT else name
    self._store = store

    # Not handling cases of max_persist being greater than the length of the Series (might floor to length)
    if max_persist is not None and max_persist < self._loaded.sum():
        raise ErrorInitBus('max_persist cannot be less than the number of already loaded Frames')
    self._max_persist = max_persist

    # providing None will result in default; providing a StoreConfig or StoreConfigMap will return an appropriate map
    self._config = StoreConfigMap.from_initializer(config)
#---------------------------------------------------------------------------
def _derive_from_series(self,
        series: TSeriesObject,
        *,
        own_data: bool = False,
        ) -> tp.Self:
    '''Create a derived Bus from ``series`` that keeps this Bus's ``Store``, configuration and ``max_persist``.

    Appropriate only when ``series`` is a subset or re-ordering of this Bus's contents. If the index has been transformed this must not be used: with a Store present, the new labels would no longer be found in that Store.
    '''
    # NOTE: there may be a more efficient path than using a Series
    bus_cls = self.__class__
    return bus_cls.from_series(series,
            own_data=own_data,
            max_persist=self._max_persist,
            config=self._config,
            store=self._store,
            )
# ---------------------------------------------------------------------------
[docs]
def __reversed__(self) -> tp.Iterator[TLabel]:
    '''
    Iterate over the labels of the :obj:`Bus` index in reverse order.

    Returns:
        :obj:`Index`
    '''
    index = self._index
    return reversed(index)
#---------------------------------------------------------------------------
# name interface
@property
@doc_inject()
def name(self) -> TName:
    '''{}'''
    # NOTE: the docstring is a bare placeholder, presumably filled in by ``doc_inject``
    current = self._name
    return current
[docs]
def rename(self, name: TName) -> tp.Self:
    '''
    Return a new :obj:`Bus` that differs from this one only in its name attribute.
    '''
    # NOTE: ``.values`` would force loading every Frame; pass ``_values_mutable`` instead and,
    # with ``own_data=False``, let the constructor make its own copy
    bus_cls = self.__class__
    return bus_cls(self._values_mutable,
            index=self._index,
            own_index=True,
            own_data=False,
            name=name,
            store=self._store,
            config=self._config,
            max_persist=self._max_persist,
            )
#---------------------------------------------------------------------------
# interfaces
@property
def loc(self) -> InterGetItemLocReduces[TBusAny, np.object_]:
    '''Interface for label-based selection, delegating to ``_extract_loc``.'''
    extractor = self._extract_loc
    return InterGetItemLocReduces(extractor)
@property
def iloc(self) -> InterGetItemILocReduces[TBusAny, np.object_]:
    '''Interface for integer-position selection, delegating to ``_extract_iloc``.'''
    extractor = self._extract_iloc
    return InterGetItemILocReduces(extractor)
@property
def drop(self) -> InterfaceSelectTrio[TBusAny]:
    '''
    Interface for dropping elements from :obj:`static_frame.Bus`.
    '''
    # both ``loc`` and bare ``[]`` selection resolve labels via ``_drop_loc``
    by_label = self._drop_loc
    return InterfaceSelectTrio( #type: ignore
            func_iloc=self._drop_iloc,
            func_loc=by_label,
            func_getitem=by_label,
            )
@property
def persist(self) -> InterfacePersist[TBusAny]:
    '''
    Interface for selectively (or completely) pre-load `Frame` from a store to optimize subsequent single `Frame` extraction.
    '''
    # both ``loc`` and bare ``[]`` selection resolve labels via ``_persist_loc``
    by_label = self._persist_loc
    return InterfacePersist(
            func_iloc=self._persist_iloc,
            func_loc=by_label,
            func_getitem=by_label,
            )
#---------------------------------------------------------------------------
@property
def iter_element(self) -> IterNodeNoArgReducible[TBusAny]:
    '''
    Iterator of elements.
    '''
    # identical to ``iter_element_items`` except that values, not items, are yielded
    return IterNodeNoArgReducible(
            container=self,
            function_values=self._axis_element,
            function_items=self._axis_element_items,
            yield_type=IterNodeType.VALUES,
            apply_type=IterNodeApplyType.SERIES_VALUES,
            )
@property
def iter_element_items(self) -> IterNodeNoArgReducible[TBusAny]:
    '''
    Iterator of label, element pairs.
    '''
    # identical to ``iter_element`` except that items, not values, are yielded
    return IterNodeNoArgReducible(
            container=self,
            function_values=self._axis_element,
            function_items=self._axis_element_items,
            yield_type=IterNodeType.ITEMS,
            apply_type=IterNodeApplyType.SERIES_VALUES,
            )
#---------------------------------------------------------------------------
# index manipulation
# NOTE: must return a new Bus with fully-realized Frames, as cannot guarantee usage of a Store after labels have been changed.
[docs]
@doc_inject(selector='reindex', class_name='Bus')
def reindex(self,
        index: TIndexInitializer,
        *,
        fill_value: tp.Any,
        own_index: bool = False,
        check_equals: bool = True
        ) -> tp.Self:
    '''
    {doc}

    Args:
        index: {index_initializer}
        {fill_value}
        {own_index}
        check_equals: forwarded unchanged to the underlying ``Series.reindex`` call.
    '''
    realigned = self.to_series().reindex(index,
            check_equals=check_equals,
            own_index=own_index,
            fill_value=fill_value,
            )
    # NOTE: the Store is not propagated after reindex; only the config is carried over
    bus_cls = self.__class__
    return bus_cls.from_series(realigned, config=self._config)
[docs]
@doc_inject(selector='relabel', class_name='Bus')
def relabel(self,
        index: tp.Optional[TRelabelInput],
        *,
        index_constructor: TIndexCtorSpecifier = None,
        ) -> tp.Self:
    '''
    {doc}

    Args:
        index: {relabel_input_index}
    '''
    # NOTE: this could be done without going through a Series
    relabeled = self.to_series().relabel(index, index_constructor=index_constructor)
    # NOTE: the Store is not propagated after relabel; only the config is carried over
    bus_cls = self.__class__
    return bus_cls.from_series(relabeled, config=self._config)
[docs]
@doc_inject(selector='relabel_flat', class_name='Bus')
def relabel_flat(self) -> tp.Self:
    '''
    {doc}
    '''
    # only the config is carried over; the Store is not passed to the new Bus
    flattened = self.to_series().relabel_flat()
    bus_cls = self.__class__
    return bus_cls.from_series(flattened, config=self._config)
[docs]
@doc_inject(selector='relabel_level_add', class_name='Bus')
def relabel_level_add(self,
        level: TLabel
        ) -> tp.Self:
    '''
    {doc}

    Args:
        level: {level}
    '''
    # only the config is carried over; the Store is not passed to the new Bus
    extended = self.to_series().relabel_level_add(level)
    bus_cls = self.__class__
    return bus_cls.from_series(extended, config=self._config)
[docs]
@doc_inject(selector='relabel_level_drop', class_name='Bus')
def relabel_level_drop(self,
        count: int = 1
        ) -> tp.Self:
    '''
    {doc}

    Args:
        count: {count}
    '''
    # only the config is carried over; the Store is not passed to the new Bus
    reduced = self.to_series().relabel_level_drop(count)
    bus_cls = self.__class__
    return bus_cls.from_series(reduced, config=self._config)
[docs]
def rehierarch(self,
        depth_map: tp.Sequence[int],
        *,
        index_constructors: TIndexCtorSpecifiers = None,
        ) -> tp.Self:
    '''
    Return a new :obj:`Bus` whose index hierarchy is rearranged according to the supplied ``depth_map``.
    '''
    as_series = self.to_series()
    rearranged = as_series.rehierarch(depth_map, index_constructors=index_constructors)
    # only the config is carried over; the Store is not passed to the new Bus
    return self.__class__.from_series(rearranged, config=self._config)
#---------------------------------------------------------------------------
# na / falsy handling
# NOTE: these methods are not implemented, as a Bus must contain only Frame or FrameDeferred
#---------------------------------------------------------------------------
# cache management
[docs]
def unpersist(self) -> None:
    '''Reset every loaded :obj:`Frame` back to :obj:`FrameDeferred`.

    Does nothing when the Bus has no Store, so Yarn or Quilt can call this regardless of Store.
    '''
    if self._store is not None:
        loaded = self._loaded
        # overwrite the loaded positions first, while the mask still identifies them
        self._values_mutable[loaded] = FrameDeferred
        loaded[NULL_SLICE] = False
        self._loaded_all = False
        if self._max_persist is not None:
            # the load-order record only exists when max_persist is active
            self._last_loaded.clear()
#---------------------------------------------------------------------------
def _unpersist_next(self, labels_retain: Container[TLabel]) -> None:
    '''Remove the next available loaded Frame. This does not adjust self._loaded_all.

    Labels are examined in the iteration order of ``self._last_loaded``. The first label not found in ``labels_retain`` is evicted: it is deleted from ``_last_loaded``, its ``_loaded`` flag is cleared, and its value is reset to ``FrameDeferred``. Labels skipped on the way (because they are in ``labels_retain``) are re-inserted so that they move to the back of ``_last_loaded``.

    Args:
        labels_retain: container of labels that are in the assignment region that are not being loaded and need to be retained.
    '''
    # avoid mutating self._last_loaded while iterating; avoid creating a list if not needed
    restore: list[TLabel] | None = None
    for label_remove in self._last_loaded:
        if label_remove not in labels_retain:
            break
        if restore is None:
            restore = [label_remove]
        else:
            restore.append(label_remove)
    # NOTE(review): if every label is in ``labels_retain`` the loop ends without ``break`` and the
    # last label visited is the one evicted below; if ``_last_loaded`` is empty, ``label_remove``
    # is never bound. Callers appear to invoke this only after exceeding max_persist; confirm.

    if restore is not None:
        for label in restore: # move to back
            self._last_loaded[label] = self._last_loaded.pop(label)

    del self._last_loaded[label_remove]
    idx_remove = self._index._loc_to_iloc(label_remove)
    self._loaded[idx_remove] = False
    self._values_mutable[idx_remove] = FrameDeferred
def _persist_one(self,
        pos: int | np.integer[tp.Any],
        update_last_loaded: bool,
        ) -> Frame:
    '''Read the Frame at integer position ``pos`` from the Store, record it as loaded, and return it.

    Args:
        pos: integer position in the index.
        update_last_loaded: if True, also register the label in ``_last_loaded``.
    '''
    label = self._index[pos]
    frame: Frame = self._store.read(label, config=self._config) # type: ignore
    self._loaded[pos] = True # update loaded status
    self._values_mutable[pos] = frame
    if update_last_loaded:
        self._last_loaded[label] = None
    return frame
#---------------------------------------------------------------------------
def _update_mutable_persistent_one(self,
        key: TILocSelector,
        ) -> None:
    '''Load the single Frame at integer position ``key`` if it is not already loaded; any non-integer key is a no-op.
    '''
    if self._loaded_all:
        return
    if not isinstance(key, INT_TYPES):
        return
    if self._loaded[key]:
        return
    # max_persist is not in play here, so the load order is not recorded
    self._persist_one(key, False)
    self._loaded_all = self._loaded.all()
def _update_mutable_persistent_iter(self) -> tp.Iterator[Frame]:
    '''Iterate over all Frames in index order, reading any not-yet-loaded Frame from the Store and retaining it.

    When everything is already loaded the stored values are yielded directly and the Store is never touched.
    '''
    values_mutable = self._values_mutable
    if self._loaded_all:
        yield from values_mutable
        # stop here: without this return a resumed generator would go on to use the Store
        # (which may be None) and yield every value a second time
        return

    index = self._index
    loaded = self._loaded # Boolean array
    loaded_count = loaded.sum()
    size = len(loaded)

    labels_to_load: tp.Iterable[TLabel]
    if index._NDIM == 2: # if an IndexHierarchy avoid going to an array
        labels_to_load = index[~loaded]
    else:
        labels_to_load = index.values[~loaded]

    # a single reader for all missing labels; consumed in step with the positions below
    store_reader = self._store.read_many(labels_to_load, config=self._config) # type: ignore
    for idx in range(size): # iter over all values
        if not loaded[idx]:
            f = next(store_reader)
            values_mutable[idx] = f
            loaded[idx] = True
            loaded_count += 1
            # keep the flag current in case the consumer stops iterating early
            self._loaded_all = loaded_count == size
            yield f
        else:
            yield values_mutable[idx]
def _update_mutable_persistant_many(self,
        key: TILocSelector,
        ) -> None:
    '''Read from the Store every not-yet-loaded `Frame` selected by the integer-position ``key``.
    '''
    if self._loaded_all:
        return

    index = self._index
    loaded = self._loaded # Boolean array
    values_mutable = self._values_mutable

    if key.__class__ is slice and key == NULL_SLICE:
        # everything is targeted; no selection mask is needed
        pending = ~loaded
    else:
        selected = np.zeros(len(loaded), dtype=DTYPE_BOOL)
        selected[key] = True
        pending = ~loaded & selected

    if not pending.any():
        return

    labels_to_load: tp.Iterable[TLabel]
    # if an IndexHierarchy avoid going to an array
    labels_to_load = index[pending] if index._NDIM == 2 else index.values[pending]

    positions = index.positions[pending]
    frames = self._store.read_many(labels_to_load, config=self._config) # type: ignore[union-attr]
    for pos, frame in zip(positions, frames):
        values_mutable[pos] = frame

    loaded[pending] = True
    self._loaded_all = self._loaded.all()
#---------------------------------------------------------------------------
def _update_mutable_max_persist_one(self,
        key: TILocSelector,
        ) -> None:
    '''
    Load a single element under ``max_persist``, evicting one loaded Frame first if the limit would be exceeded. Keys beyond a single element are accepted but are a no-op.
    '''
    if self._loaded_all or not isinstance(key, INT_TYPES):
        return

    loaded = self._loaded # Boolean array
    if loaded[key]:
        return

    count_after = loaded.sum() + 1
    if count_after > self._max_persist:
        # nothing needs protecting from eviction for a single-element load
        self._unpersist_next(())
        count_after -= 1

    self._persist_one(key, True)
    self._loaded_all = count_after == len(loaded)
def _update_mutable_max_persist_iter(self) -> tp.Iterator[Frame]:
    '''Iterator of all values in the context of max_persist.

    Frames are yielded in index order. Missing Frames are read from the Store in windows of ``max_persist`` positions; whenever loading one more Frame would exceed ``max_persist``, a previously loaded Frame is evicted first. When everything is already loaded the stored values are yielded directly and the Store is never touched.
    '''
    values_mutable = self._values_mutable
    if self._loaded_all:
        yield from values_mutable
        # stop here: without this return a resumed generator would go on to use the Store
        # and yield every value a second time
        return

    max_persist: int = self._max_persist # type: ignore[assignment]
    index = self._index
    loaded = self._loaded # Boolean array
    last_loaded = self._last_loaded
    loaded_count = loaded.sum()
    size = len(loaded)

    if max_persist > 1:
        i = 0
        labels_to_load: tp.Iterable[TLabel]
        while i < size:
            # process one window of at most max_persist positions
            i_end = i + max_persist
            targets = np.zeros(size, dtype=DTYPE_BOOL)
            targets[i: i_end] = True
            labels_unloaded = ~loaded & targets

            if not labels_unloaded.any():
                # whole window already loaded
                yield from islice(values_mutable, i, i_end)
            else:
                if index._NDIM == 2: # if an IndexHierarchy avoid going to an array
                    labels_to_load = index[labels_unloaded]
                else:
                    labels_to_load = index.values[labels_unloaded]

                # labels in this window must not be evicted while the window is filled
                labels_to_keep = index[targets] # keep as index for lookup
                store_reader = self._store.read_many(labels_to_load, config=self._config) # type: ignore
                for idx in range(i, min(i_end, size)):
                    if loaded[idx]:
                        yield values_mutable[idx]
                    else:
                        loaded_count += 1
                        if loaded_count > max_persist:
                            self._unpersist_next(labels_to_keep)
                            loaded_count -= 1
                        f = next(store_reader)
                        values_mutable[idx] = f
                        loaded[idx] = True
                        last_loaded[index[idx]] = None
                        # keep the flag current in case the consumer stops iterating early
                        self._loaded_all = loaded_count == size
                        yield f
            i = i_end
    else: # max_persist is 1
        for i in range(size):
            if loaded[i]:
                yield values_mutable[i]
            else:
                loaded_count += 1
                if loaded_count > max_persist:
                    self._unpersist_next(())
                    loaded_count -= 1
                f = self._persist_one(i, True)
                self._loaded_all = loaded_count == size
                yield f
def _update_mutable_max_persist_many(self,
        key: TILocSelector,
        ) -> None:
    '''Load all `Frame` targeted by `key`.

    If the targeted, not-yet-loaded Frames fit within ``max_persist`` together with what is already loaded, they are all read. Otherwise only the last ``max_persist`` targeted positions are considered, and previously loaded Frames are evicted as needed while those are read.
    '''
    if self._loaded_all:
        return

    index = self._index
    loaded = self._loaded # Boolean array
    last_loaded = self._last_loaded
    values_mutable = self._values_mutable
    max_persist: int = self._max_persist # type: ignore[assignment]
    loaded_count = loaded.sum()
    size = len(loaded)

    if key.__class__ is slice and key == NULL_SLICE:
        targets = np.ones(size, dtype=DTYPE_BOOL)
        labels_unloaded = ~loaded
    else:
        # Boolean mask of the positions selected by key
        targets = np.zeros(size, dtype=DTYPE_BOOL)
        targets[key] = True
        labels_unloaded = ~loaded & targets

    if labels_unloaded.any():
        labels_to_load: tp.Iterable[TLabel]
        if index._NDIM == 2: # if an IndexHierarchy avoid going to an array
            labels_to_load = index[labels_unloaded]
        else:
            labels_to_load = index.values[labels_unloaded]

        to_load_count = len(labels_to_load) # type: ignore
        if to_load_count + loaded_count <= max_persist:
            # everything fits: no eviction required
            for label, idx, f in zip(
                    labels_to_load,
                    index.positions[labels_unloaded],
                    self._store.read_many(labels_to_load, config=self._config) # type: ignore[union-attr]
                    ):
                values_mutable[idx] = f
                last_loaded[label] = None

            loaded[labels_unloaded] = True
            self._loaded_all = to_load_count + loaded_count == size

        else: # load only max_persist count from targets
            # from original targets, find max_persist number of indices
            mp_key = np.nonzero(targets)[0][-max_persist:]
            # rebuild the mask so it covers only those trailing positions
            targets = np.zeros(size, dtype=DTYPE_BOOL)
            targets[mp_key] = True
            labels_unloaded = ~loaded & targets

            if labels_unloaded.any():
                if index._NDIM == 2: # if an IndexHierarchy avoid going to an array
                    labels_to_load = index[labels_unloaded]
                else:
                    labels_to_load = index.values[labels_unloaded]

                # labels in the reduced target region are passed to eviction as the set to retain
                labels_to_keep = index[targets] # an index for lookup
                store_reader = self._store.read_many(labels_to_load, config=self._config) # type: ignore[union-attr]
                for label, idx in zip(
                        labels_to_load,
                        index.positions[labels_unloaded],
                        ):
                    loaded_count += 1
                    if loaded_count > max_persist:
                        self._unpersist_next(labels_to_keep)
                        loaded_count -= 1

                    f = next(store_reader)
                    values_mutable[idx] = f
                    last_loaded[label] = None

                # flags are set once, after the loop; eviction above clears flags individually
                loaded[labels_unloaded] = True
                self._loaded_all = loaded_count == size
#---------------------------------------------------------------------------
def _persist_iloc(self, key: TILocSelector) -> None:
    '''Load the Frames selected by integer-position ``key``, using the loader that matches whether ``max_persist`` is set.'''
    loader = (self._update_mutable_persistant_many
            if self._max_persist is None
            else self._update_mutable_max_persist_many)
    loader(key)
def _persist_loc(self, key: TLocSelector) -> None:
    '''Load the Frames selected by label ``key`` after translating it to integer positions.'''
    iloc_key = self._index._loc_to_iloc(key)
    return self._persist_iloc(iloc_key)
#---------------------------------------------------------------------------
# extraction
def _extract_iloc(self, key: TILocSelector) -> tp.Self:
    '''
    Select by integer position. A single-element key loads that Frame if necessary; other keys load nothing here.

    Returns:
        Bus or, if an element is selected, a Frame
    '''
    loader = (self._update_mutable_persistent_one
            if self._max_persist is None
            else self._update_mutable_max_persist_one)
    loader(key)

    # iterable selection should be handled by NP
    selected: tp.Any = self._values_mutable[key]

    # NOTE: Bus only stores Frame and FrameDeferred, so an ndarray result always means a multi-element selection
    if selected.__class__ is np.ndarray:
        return self.__class__(selected,
                index=self._index.iloc[key],
                own_index=True,
                own_data=False, # force immutable copy
                name=self._name,
                store=self._store,
                config=self._config,
                max_persist=self._max_persist,
                )
    return selected #type: ignore
def _extract_loc(self, key: TLocSelector) -> tp.Self:
    '''Select by label: translate ``key`` to integer positions and delegate to ``_extract_iloc``.'''
    return self._extract_iloc(self._index._loc_to_iloc(key))
[docs]
@doc_inject(selector='selector')
def __getitem__(self, key: TLocSelector) -> tp.Self:
    '''Selector of values by label.

    Args:
        key: {key_loc}
    '''
    # bare ``[]`` selection is label-based, the same as ``.loc``
    result = self._extract_loc(key)
    return result
def __setitem__(self, key: TLabel, value: tp.Any) -> None:
    '''Item assignment is not supported; always raises the error built by ``immutable_type_error_factory``.'''
    error = immutable_type_error_factory(self.__class__, '', key, value)
    raise error
#---------------------------------------------------------------------------
# utilities for alternate extraction: drop
def _drop_iloc(self, key: TILocSelector) -> tp.Self:
    '''Return a new Bus without the entries at integer-position ``key``; Store, config and max_persist are propagated.'''
    # ``_to_series_state`` is used rather than ``to_series``; presumably it avoids loading deferred Frames
    remaining = self._to_series_state()._drop_iloc(key)
    return self._derive_from_series(remaining, own_data=True)
def _drop_loc(self, key: TLocSelector) -> tp.Self:
    '''Return a new Bus without the entries at label ``key``, by way of ``_drop_iloc``.'''
    iloc_key = self._index._loc_to_iloc(key)
    return self._drop_iloc(iloc_key)
#---------------------------------------------------------------------------
# axis functions
def _axis_element_items(self,
        ) -> tp.Iterator[tp.Tuple[TLabel, TFrameAny]]:
    '''Generator of label, Frame pairs, equivalent to Series.items(). Exists so that the signature matches the other axis functions.
    '''
    pairs = self.items()
    yield from pairs
def _axis_element(self,
        ) -> tp.Iterator[tp.Any]:
    '''Generator of all contained Frames, loading from the Store as needed.'''
    if self._loaded_all:
        # nothing to load: iterate the stored array directly
        source: tp.Iterable[tp.Any] = self._values_mutable
    elif self._max_persist is None:
        source = self._update_mutable_persistent_iter()
    else:
        source = self._update_mutable_max_persist_iter()
    yield from source
#---------------------------------------------------------------------------
# dictionary-like interface; these will force loading contained Frame
[docs]
def items(self) -> tp.Iterator[tp.Tuple[TLabel, TFrameAny]]:
    '''Iterator of pairs of :obj:`Bus` label and contained :obj:`Frame`.
    '''
    labels = self._index
    if self._max_persist is None:
        # no persistence limit: use the persistent update iterator
        frames = self._update_mutable_persistent_iter()
    else:
        frames = self._update_mutable_max_persist_iter()
    yield from zip(labels, frames)
_items_store = items
@property
def values(self) -> TNDArrayObject:
    '''A 1D object array of all :obj:`Frame` contained in the :obj:`Bus`. The returned ``np.ndarray`` will have ``Frame``; this will never return an array with ``FrameDeferred``, but ``max_persist`` will be observed in reading from the Store.
    '''
    # NOTE: when self._values_mutable is fully loaded, it could become immutable and avoid a copy. However, with unpersist(), we might unload all Frame
    if self._loaded_all:
        array = self._values_mutable.copy()
    else:
        if self._max_persist is None:
            frames = self._update_mutable_persistent_iter()
        else:
            frames = self._update_mutable_max_persist_iter()
        array = np.fromiter(frames, dtype=DTYPE_OBJECT, count=self.__len__())

    # the returned array is always flagged read-only
    array.flags.writeable = False
    return array
#---------------------------------------------------------------------------
[docs]
def __len__(self) -> int:
'''Length of values.
'''
return self._index.__len__()
[docs]
@doc_inject()
def display(self,
        config: tp.Optional[DisplayConfig] = None,
        *,
        style_config: tp.Optional[StyleConfig] = None,
        ) -> Display:
    '''{doc}

    Args:
        {config}
    '''
    # NOTE: the key change over Series is providing the Bus as the displayed class
    active_config = config if config else DisplayActive.get()
    header = DisplayHeader(self.__class__, self._name)
    display_cls = Display.from_values((), header=header, config=active_config)
    series = self._to_series_state()
    return series._display(
        active_config,
        display_cls=display_cls,
        style_config=style_config,
    )
#---------------------------------------------------------------------------
# extended descriptors; in general, these do not force loading Frame
@property
def mloc(self) -> TSeriesObject:
    '''Returns a :obj:`Series` showing a tuple of memory locations within each loaded Frame; entries for Frames that are not loaded are None.
    '''
    if not self._loaded.any():
        # nothing is loaded: every entry is None
        return Series.from_element(None, index=self._index)

    pairs = (
        (label, None if frame is FrameDeferred else tuple(frame.mloc))
        for label, frame in zip(self._index, self._values_mutable)
    )
    return Series.from_items(pairs)
@property
def dtypes(self) -> TFrameAny:
    '''Returns a :obj:`Frame` of dtype per column for all loaded Frames.
    '''
    if not self._loaded.any():
        return Frame(index=self._index)

    # only loaded Frames contribute; FrameDeferred placeholders are skipped
    loaded_dtypes = (
        frame.dtypes
        for frame in self._values_mutable
        if frame is not FrameDeferred
    )
    concatenated = Frame.from_concat(frames=loaded_dtypes, fill_value=None)
    # align the result to the full index, filling missing entries with None
    result: TFrameAny = concatenated.reindex(index=self._index, fill_value=None)
    return result
@property
def shapes(self) -> TSeriesObject:
    '''A :obj:`Series` describing the shape of each loaded :obj:`Frame`. Unloaded :obj:`Frame` will have a shape of None.

    Returns:
        :obj:`Series`
    '''
    per_frame = (
        None if frame is FrameDeferred else frame.shape
        for frame in self._values_mutable
    )
    return Series(per_frame, index=self._index, dtype=DTYPE_OBJECT, name='shape')
@property
def nbytes(self) -> int:
    '''Total bytes of data currently loaded in the Bus; entries that are ``FrameDeferred`` contribute nothing.
    '''
    return sum(
        frame.nbytes
        for frame in self._values_mutable
        if frame is not FrameDeferred
    )
@property
def status(self) -> TFrameAny:
    '''
    Return a :obj:`Frame` indicating loaded status, size, bytes, and shape of all loaded :obj:`Frame`.
    '''
    # (attribute name, column dtype, value used for entries that are not loaded)
    specs = (
        ('size', DTYPE_FLOAT_DEFAULT, np.nan),
        ('nbytes', DTYPE_FLOAT_DEFAULT, np.nan),
        ('shape', DTYPE_OBJECT, None),
    )

    def columns() -> tp.Iterator[TSeriesAny]:
        yield Series(self._loaded,
                index=self._index,
                dtype=DTYPE_BOOL,
                name='loaded')

        for attr, dtype, missing in specs:
            per_frame = (
                missing if frame is FrameDeferred else getattr(frame, attr)
                for frame in self._values_mutable
            )
            yield Series(per_frame, index=self._index, dtype=dtype, name=attr)

    return Frame.from_concat(columns(), axis=1)
@property
def inventory(self) -> TFrameAny:
    '''Return a :obj:`Frame` indicating file_path, last-modified time, and size of underlying disk-based data stores if used for this :obj:`Bus`.
    '''
    index = [self._name]
    store = self._store

    if store is None:
        # no store: a single row of empty strings
        record = ['', '', '']
    else:
        fp = Path(store._fp)
        size = bytes_to_size_label(fp.stat().st_size)
        modified = datetime.fromtimestamp(store._last_modified, timezone.utc)
        record = [str(fp), modified.isoformat(), size]

    return Frame.from_records([record],
            columns=('path', 'last_modified', 'size'),
            index=index,
            )
#---------------------------------------------------------------------------
# common attributes from the numpy array
@property
def dtype(self) -> TDtypeObject:
    '''
    The dtype reported for the underlying NumPy array; this is always ``DTYPE_OBJECT``.

    Returns:
        :obj:`numpy.dtype`
    '''
    object_dtype: TDtypeObject = DTYPE_OBJECT
    return object_dtype
@property
def shape(self) -> tp.Tuple[int]:
    '''
    The shape tuple of the underlying NumPy array.

    Returns:
        :obj:`Tuple[int]`
    '''
    array = self._values_mutable
    return array.shape #type: ignore
@property
def ndim(self) -> int:
    '''
    The number of dimensions, which for a :obj:`Bus` is always 1.

    Returns:
        :obj:`int`
    '''
    dimensions = self._NDIM
    return dimensions
@property
def size(self) -> int:
    '''
    The number of elements in the underlying NumPy array.

    Returns:
        :obj:`int`
    '''
    array = self._values_mutable
    return array.size
#---------------------------------------------------------------------------
@property
def index(self) -> IndexBase:
    '''
    The index instance held by this container.

    Returns:
        :obj:`Index`
    '''
    labels = self._index
    return labels
#---------------------------------------------------------------------------
# dictionary-like interface
[docs]
def keys(self) -> IndexBase:
    '''
    The labels of this container, returned as the index instance itself (iterating it yields the labels).

    Returns:
        :obj:`IndexBase`
    '''
    labels = self._index
    return labels
[docs]
def __iter__(self) -> tp.Iterator[TLabel]:
'''
Iterator of index labels, same as :obj:`static_frame.Series.keys`.
Returns:
:obj:`Iterator[Hashasble]`
'''
return self._index.__iter__()
[docs]
def __contains__(self, value: TLabel) -> bool:
'''
Inclusion of value in index labels.
Returns:
:obj:`bool`
'''
return self._index.__contains__(value)
[docs]
def get(self, key: TLabel,
        default: tp.Any = None,
        ) -> tp.Any:
    '''
    Return the value stored under the index label ``key``; when ``key`` is not a label of the index, return ``default`` instead.

    Returns:
        :obj:`Any`
    '''
    if key in self._index:
        # a single label is being selected, so this will always return an element
        return self._extract_loc(key=key)
    return default
#---------------------------------------------------------------------------
[docs]
@doc_inject()
def equals(self,
        other: tp.Any,
        *,
        compare_name: bool = False,
        compare_dtype: bool = False,
        compare_class: bool = False,
        skipna: bool = True,
        ) -> bool:
    '''
    {doc}

    Note: this will attempt to load and compare all Frame managed by the Bus.

    Args:
        {compare_name}
        {compare_dtype}
        {compare_class}
        {skipna}
    '''
    if other is self:
        return True

    if compare_class and self.__class__ != other.__class__:
        return False
    if not isinstance(other, Bus):
        return False

    # NOTE: the dtype is always object, so it is not compared here
    if len(self._index) != len(other._index):
        return False
    if compare_name and self._name != other._name:
        return False

    # the same comparison options are forwarded to the index and to every Frame
    options = dict(
        compare_name=compare_name,
        compare_dtype=compare_dtype,
        compare_class=compare_class,
        skipna=skipna,
    )
    if not self._index.equals(other._index, **options):
        return False

    # can zip because the lengths already match
    # using .values will force loading all Frame into memory; better to use items() to permit collection
    return all(
        frame_self.equals(frame_other, **options)
        for (_, frame_self), (_, frame_other) in zip(self.items(), other.items())
    )
#---------------------------------------------------------------------------
# transformations resulting in changed dimensionality
[docs]
@doc_inject(selector='head', class_name='Bus')
def head(self, count: int = 5) -> TBusAny:
    '''{doc}

    Args:
        {count}

    Returns:
        :obj:`Bus`
    '''
    leading = slice(None, count)
    return self.iloc[leading]
[docs]
@doc_inject(selector='tail', class_name='Bus')
def tail(self, count: int = 5) -> TBusAny:
    '''{doc}

    Args:
        {count}

    Returns:
        :obj:`Bus`
    '''
    if count == 0:
        # ``-0`` is ``0``, so ``iloc[-0:]`` would select every element rather than none; return an empty selection, as ``head(0)`` does
        return self.iloc[:0]
    return self.iloc[-count:]
#---------------------------------------------------------------------------
# transformations resulting in the same dimensionality
[docs]
@doc_inject(selector='sort')
def sort_index(self,
        *,
        ascending: TBoolOrBools = True,
        kind: TSortKinds = DEFAULT_SORT_KIND,
        key: tp.Optional[tp.Callable[[IndexBase], tp.Union[TNDArrayAny, IndexBase]]] = None,
        ) -> tp.Self:
    '''
    Return a new Bus with elements reordered so that the Index is sorted.

    Args:
        *
        {ascendings}
        {kind}
        {key}

    Returns:
        :obj:`Bus`
    '''
    # sorting is done on the Series view of the current state
    state = self._to_series_state()
    ordered = state.sort_index(ascending=ascending, kind=kind, key=key)
    return self._derive_from_series(ordered, own_data=True)
[docs]
@doc_inject(selector='sort')
def sort_values(self,
        *,
        ascending: bool = True,
        kind: TSortKinds = DEFAULT_SORT_KIND,
        key: tp.Callable[[TSeriesAny], tp.Union[TNDArrayAny, TSeriesAny]],
        ) -> tp.Self:
    '''
    Return a new Bus ordered by the sorted values. Note that as a Bus contains Frames, a `key` argument must be provided to extract a sortable value, and this key function will process a :obj:`Series` of :obj:`Frame`.

    Args:
        *
        {ascending}
        {kind}
        {key}

    Returns:
        :obj:`Bus`
    '''
    # to_series() forces loading all Frame, so the key function sees Frames
    loaded = self.to_series()
    ordered = loaded.sort_values(ascending=ascending, kind=kind, key=key)
    return self._derive_from_series(ordered, own_data=True)
[docs]
def roll(self,
        shift: int,
        *,
        include_index: bool = False,
        ) -> tp.Self:
    '''Return a Bus with values rotated forward and wrapped around the index (with a positive shift) or backward and wrapped around the index (with a negative shift).

    Args:
        shift: Positive or negative integer shift.
        include_index: Determine if the Index is shifted with the underlying data.

    Returns:
        :obj:`Bus`
    '''
    state = self._to_series_state()
    rolled = state.roll(shift=shift, include_index=include_index)
    return self._derive_from_series(rolled, own_data=True)
[docs]
def shift(self,
        shift: int,
        *,
        fill_value: tp.Any,
        ) -> tp.Self:
    '''Return a :obj:`Bus` with values shifted forward on the index (with a positive shift) or backward on the index (with a negative shift).

    Args:
        shift: Positive or negative integer shift.
        fill_value: Value to be used to fill data missing after the shift.

    Returns:
        :obj:`Bus`
    '''
    state = self._to_series_state()
    shifted = state.shift(shift=shift, fill_value=fill_value)
    return self._derive_from_series(shifted, own_data=True)
#---------------------------------------------------------------------------
# exporter
def _to_series_state(self) -> TSeriesObject:
    '''Return a :obj:`Series` built from the values array as currently stored, sharing this container's index and name.
    '''
    # the mutable array will be copied in the Series construction
    state = Series(
        self._values_mutable,
        index=self._index,
        own_index=True,
        name=self._name,
    )
    return state
[docs]
def to_series(self) -> TSeriesObject:
    '''Return a :obj:`Series` with the :obj:`Frame` contained in this :obj:`Bus`. If the :obj:`Bus` is associated with a :obj:`Store`, all :obj:`Frame` will be loaded into memory and the returned :obj:`Series` will not be associated with the :obj:`Store`.
    '''
    # values returns an immutable array and will fully realize from Store
    realized = self.values
    return Series(
        realized,
        index=self._index,
        own_index=True,
        name=self._name,
    )
def _to_signature_bytes(self,
        include_name: bool = True,
        include_class: bool = True,
        encoding: str = 'utf-8',
        ) -> bytes:
    '''Return the signature bytes of this container: the component signature bytes, followed by those of the index, followed by those of each contained value in order.

    Args:
        include_name: forwarded to every component signature call.
        include_class: forwarded to every component signature call.
        encoding: forwarded to every component signature call.
    '''
    # the same options are forwarded to every signature call
    options = dict(
        include_name=include_name,
        include_class=include_class,
        encoding=encoding,
    )
    frame_bytes = (
        frame._to_signature_bytes(**options)
        for frame in self._axis_element()
    )
    component_bytes = iter_component_signature_bytes(self, **options)
    index_bytes = self._index._to_signature_bytes(**options)

    return b''.join(chain(component_bytes, (index_bytes,), frame_bytes))
# Alias of Bus parametrized with tp.Any; used in annotations in this file, e.g. as the return type of head() and tail().
TBusAny = Bus[tp.Any]