import typing as tp
from itertools import chain
from itertools import zip_longest
import numpy as np
from static_frame.core.container import ContainerBase
from static_frame.core.container_util import index_from_optional_constructor
from static_frame.core.container_util import iter_component_signature_bytes
from static_frame.core.display import Display
from static_frame.core.display import DisplayActive
from static_frame.core.display import DisplayHeader
from static_frame.core.display_config import DisplayConfig
from static_frame.core.doc_str import doc_inject
from static_frame.core.exception import ErrorInitBus
from static_frame.core.exception import ErrorInitIndexNonUnique
from static_frame.core.frame import Frame
from static_frame.core.index import Index
from static_frame.core.index_auto import IndexAutoFactoryType
from static_frame.core.index_auto import RelabelInput
from static_frame.core.index_base import IndexBase
from static_frame.core.node_iter import IterNodeApplyType
from static_frame.core.node_iter import IterNodeNoArg
from static_frame.core.node_iter import IterNodeType
from static_frame.core.node_selector import InterfaceGetItem
from static_frame.core.node_selector import InterfaceSelectTrio
from static_frame.core.series import Series
from static_frame.core.store import Store
from static_frame.core.store_client_mixin import StoreClientMixin
from static_frame.core.store_config import StoreConfigMap
from static_frame.core.store_config import StoreConfigMapInitializer
from static_frame.core.store_hdf5 import StoreHDF5
from static_frame.core.store_sqlite import StoreSQLite
from static_frame.core.store_xlsx import StoreXLSX
from static_frame.core.store_zip import StoreZipCSV
from static_frame.core.store_zip import StoreZipNPY
from static_frame.core.store_zip import StoreZipNPZ
from static_frame.core.store_zip import StoreZipParquet
from static_frame.core.store_zip import StoreZipPickle
from static_frame.core.store_zip import StoreZipTSV
from static_frame.core.style_config import StyleConfig
from static_frame.core.util import DEFAULT_SORT_KIND
from static_frame.core.util import DTYPE_BOOL
from static_frame.core.util import DTYPE_FLOAT_DEFAULT
from static_frame.core.util import DTYPE_OBJECT
from static_frame.core.util import INT_TYPES
from static_frame.core.util import NAME_DEFAULT
from static_frame.core.util import NULL_SLICE
from static_frame.core.util import ZIP_LONGEST_DEFAULT
from static_frame.core.util import BoolOrBools
from static_frame.core.util import GetItemKeyType
from static_frame.core.util import IndexConstructor
from static_frame.core.util import IndexConstructors
from static_frame.core.util import IndexInitializer
from static_frame.core.util import NameType
from static_frame.core.util import PathSpecifier
#-------------------------------------------------------------------------------
class FrameDeferredMeta(type):
    '''Metaclass that gives classes created from it a compact ``repr`` of the form ``<ClassName>``.'''

    def __repr__(cls) -> str:
        # applies to the class object itself, not to instances
        return '<' + cls.__name__ + '>'
class FrameDeferred(metaclass=FrameDeferredMeta):
    '''
    Token placeholder for :obj:`Frame` not yet loaded.

    The class object itself (not an instance) is stored in a :obj:`Bus` and is recognized by identity (``value is FrameDeferred``).
    '''
# Iterable of (label, value) pairs, where each value is either a loaded Frame or the FrameDeferred class used as a placeholder.
BusItemsType = tp.Iterable[tp.Tuple[
        tp.Hashable, tp.Union[Frame, tp.Type[FrameDeferred]]]]

# Iterator that yields Frame instances, such as those read from a Store.
FrameIterType = tp.Iterator[Frame]
#-------------------------------------------------------------------------------
[docs]class Bus(ContainerBase, StoreClientMixin): # not a ContainerOperand
'''
A randomly-accessible container of :obj:`Frame`. When created from a multi-table storage format (such as a zip-pickle or XLSX), a Bus will lazily read in components as they are accessed. When combined with the ``max_persist`` parameter, a Bus will not hold on to more than ``max_persist`` references, permitting low-memory reading of collections of :obj:`Frame`.
'''
__slots__ = (
        '_loaded', # boolean array, one flag per position; True where a Frame (not FrameDeferred) is held
        '_loaded_all', # cached result of ``_loaded.all()``
        '_values_mutable', # object array of Frame and/or FrameDeferred, updated in place as Frames are loaded or released
        '_index',
        '_name',
        '_store',
        '_config',
        '_last_accessed', # dict used as an ordered set of labels; only assigned when ``max_persist`` is not None
        '_max_persist',
        )

_values_mutable: np.ndarray
_index: IndexBase
_store: tp.Optional[Store]
_config: StoreConfigMap
_name: NameType

# NOTE(review): presumably flags that a Bus is not an immutable container; confirm against ContainerBase
STATIC = False
# dimensionality reported by the ``ndim`` property
_NDIM: int = 1
@classmethod
def from_items(cls,
        pairs: tp.Iterable[tp.Tuple[tp.Hashable, Frame]],
        *,
        config: StoreConfigMapInitializer = None,
        name: NameType = None,
        index_constructor: tp.Optional[tp.Callable[..., IndexBase]] = None
        ) -> 'Bus':
    '''Build a :obj:`Bus` from an iterable of (label, :obj:`Frame`) pairs.

    Args:
        pairs: iterable of two-element tuples; it is iterated exactly once, so a generator is acceptable.
        config: forwarded to the default constructor.
        name: name of the resulting :obj:`Bus`.
        index_constructor: optional constructor applied to the collected labels.

    Returns:
        :obj:`Bus`
    '''
    labels = []
    values = []
    # single pass: ``pairs`` may be a one-shot generator
    for label, frame in pairs:
        labels.append(label)
        values.append(frame)

    return cls(values,
            name=name,
            config=config,
            index=labels,
            index_constructor=index_constructor,
            )
@classmethod
def from_frames(cls,
        frames: tp.Iterable[Frame],
        *,
        index_constructor: IndexConstructor = None,
        config: StoreConfigMapInitializer = None,
        name: NameType = None,
        ) -> 'Bus':
    '''Build a :obj:`Bus` from an iterable of :obj:`Frame`, using each :obj:`Frame.name` as its label.

    Raises:
        ErrorInitIndexNonUnique: re-raised with a Frame-specific message when the underlying constructor raises it.
    '''
    try:
        labeled = ((frame.name, frame) for frame in frames)
        return cls.from_items(labeled,
                name=name,
                config=config,
                index_constructor=index_constructor,
                )
    except ErrorInitIndexNonUnique:
        # replace the generic index error with one that points at the Frame names
        raise ErrorInitIndexNonUnique('Frames do not have unique names.') from None
@classmethod
def from_dict(cls,
        mapping: tp.Dict[tp.Hashable, Frame],
        *,
        name: NameType = None,
        index_constructor: tp.Optional[tp.Callable[..., IndexBase]] = None
        ) -> 'Bus':
    '''Build a :obj:`Bus` from a mapping of labels to :obj:`Frame`.

    Args:
        mapping: a dictionary or similar object providing ``keys()`` and ``values()``.
        name: name of the resulting :obj:`Bus`.
        index_constructor: optional constructor applied to the mapping keys.

    Returns:
        :obj:`Bus`
    '''
    labels = mapping.keys()
    values = mapping.values()
    return cls(frames=values,
            index=labels,
            name=name,
            index_constructor=index_constructor,
            )
@classmethod
def from_series(cls,
        series: Series,
        *,
        store: tp.Optional[Store] = None,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        own_data: bool = False,
        ) -> 'Bus':
    '''
    Build a :obj:`Bus` from a :obj:`Series` whose values are :obj:`Frame`.

    The values array, index and name are taken from ``series``; the index is handed to the constructor with ``own_index=True``, so it is used as-is.
    '''
    # NOTE: this interface is for 0.9, after which the default Bus constructor no longer accepts a Series
    values = series.values
    return cls(values,
            index=series.index,
            name=series.name,
            store=store,
            config=config,
            max_persist=max_persist,
            own_index=True,
            own_data=own_data,
            )
@classmethod
def from_concat(cls,
        containers: tp.Iterable['Bus'],
        *,
        index: tp.Optional[tp.Union[IndexInitializer, IndexAutoFactoryType]] = None,
        name: NameType = NAME_DEFAULT,
        ) -> 'Bus':
    '''
    Concatenate multiple :obj:`Bus` into a new :obj:`Bus`. All :obj:`Bus` will load all :obj:`Frame` into memory if any are deferred.
    '''
    # Series.from_concat draws .values and .index from each Bus, which loads from the Store as needed.
    # NOTE: a Series is used here as it handles aligned names, IndexAutoFactory, etc.
    combined = Series.from_concat(containers, name=name, index=index)
    return cls.from_series(combined, own_data=True)
#---------------------------------------------------------------------------
# constructors by data format
@classmethod
def _from_store(cls,
        store: Store,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''Shared implementation for the per-format constructors: the index comes from the labels reported by ``store``, and no frames are supplied.
    '''
    labels = store.labels(config=config)
    # passing None for frames makes the constructor fill every position with FrameDeferred
    return cls(None,
            index=labels,
            index_constructor=index_constructor,
            store=store,
            config=config,
            max_persist=max_persist,
            own_data=True,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_tsv(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the zipped TSV store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipTSV(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_csv(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the zipped CSV store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipCSV(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_pickle(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the zipped pickle store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipPickle(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_npz(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the zipped NPZ store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipNPZ(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_npy(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the zipped NPY store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipNPY(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_zip_parquet(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the zipped parquet store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreZipParquet(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_xlsx(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the XLSX store found at the file path ``fp``.

    {args}
    '''
    # open question from the original author: how to pass configuration for multiple sheets?
    return cls._from_store(StoreXLSX(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_sqlite(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the SQLite store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreSQLite(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
@classmethod
@doc_inject(selector='bus_constructor')
def from_hdf5(cls,
        fp: PathSpecifier,
        *,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    Return a :obj:`Bus` backed by the HDF5 store found at the file path ``fp``.

    {args}
    '''
    return cls._from_store(StoreHDF5(fp),
            index_constructor=index_constructor,
            max_persist=max_persist,
            config=config,
            )
#---------------------------------------------------------------------------
def __init__(self,
        frames: tp.Optional[tp.Iterable[tp.Union[Frame, tp.Type[FrameDeferred]]]],
        *,
        index: IndexInitializer,
        index_constructor: IndexConstructor = None,
        name: NameType = NAME_DEFAULT,
        store: tp.Optional[Store] = None,
        config: StoreConfigMapInitializer = None,
        max_persist: tp.Optional[int] = None,
        own_index: bool = False,
        own_data: bool = False,
        ):
    '''
    Default Bus constructor.

    Args:
        frames: An iterable, or a 1D object ``np.ndarray``, of :obj:`Frame` and/or :obj:`FrameDeferred`. If None, every position starts as :obj:`FrameDeferred` and ``store`` is required.
        index: Labels, one per element of ``frames``.
        index_constructor: Optional explicit constructor applied to ``index``; not used when ``own_index`` is True.
        name: Name of the :obj:`Bus`; ``NAME_DEFAULT`` is stored as None.
        store: Optional :obj:`Store` from which deferred :obj:`Frame` are read.
        config: A value accepted by ``StoreConfigMap.from_initializer``.
        max_persist: If not None, the maximum number of :obj:`Frame` kept loaded at one time.
        own_index: If True, ``index`` is assigned directly without conversion.
        own_data: If True, a supplied ``np.ndarray`` is used directly rather than copied; only valid when ``frames`` is an ``np.ndarray``.

    Raises:
        ErrorInitBus: if neither ``frames`` nor ``store`` is given; if an array is not of object dtype; if ``own_data`` is used without an array; if ``frames`` and ``index`` differ in length; if an element is neither :obj:`Frame` nor :obj:`FrameDeferred`; or if ``max_persist`` is smaller than the number of already-loaded :obj:`Frame`.
    '''
    if max_persist is not None:
        # a dictionary (insertion-ordered) serves as an ordered set; every key points to None
        self._last_accessed: tp.Dict[tp.Hashable, None] = {}

    if own_index:
        self._index = index #type: ignore
    else:
        self._index = index_from_optional_constructor(index,
                default_constructor=Index,
                explicit_constructor=index_constructor
                )
    count = len(self._index)
    frames_array: np.ndarray

    if frames is None:
        if store is None:
            raise ErrorInitBus('Cannot initialize a :obj:`Bus` with neither `frames` nor `store`.')
        self._values_mutable = np.full(count, FrameDeferred, dtype=DTYPE_OBJECT)
        self._loaded = np.full(count, False, dtype=DTYPE_BOOL)
        self._loaded_all = False
    else:
        if frames.__class__ is np.ndarray:
            if frames.dtype != DTYPE_OBJECT: #type: ignore
                raise ErrorInitBus(
                        f'Series passed to initializer must have dtype object, not {frames.dtype}') #type: ignore
            frames_array = frames
            load_array = False
        else:
            if own_data:
                raise ErrorInitBus('Cannot use `own_data` when not supplying an array.')
            frames_array = np.empty(count, dtype=DTYPE_OBJECT)
            load_array = True

        self._loaded = np.empty(count, dtype=DTYPE_BOOL)
        # One-time iteration over labels and frames. Labels are drawn from the constructed
        # ``self._index`` rather than the raw ``index`` argument: the raw argument may be a
        # one-shot iterator already consumed above, and the LRU keys recorded here must match
        # the labels later drawn from ``self._index`` when the cache is updated.
        for i, (label, value) in enumerate(zip_longest(
                self._index,
                frames,
                fillvalue=ZIP_LONGEST_DEFAULT,
                )):
            if label is ZIP_LONGEST_DEFAULT or value is ZIP_LONGEST_DEFAULT:
                raise ErrorInitBus('frames and index are not of equal length')

            if load_array:
                frames_array[i] = value

            if value is FrameDeferred:
                self._loaded[i] = False
            elif isinstance(value, Frame): # permit FrameGO?
                if max_persist is not None:
                    self._last_accessed[label] = None
                self._loaded[i] = True
            else:
                raise ErrorInitBus(f'supplied {value.__class__} is not a Frame or FrameDeferred.')

        self._loaded_all = self._loaded.all()

        if own_data or load_array:
            # the array is either owned by the caller's request or was created above
            self._values_mutable = frames_array
        else:
            self._values_mutable = frames_array.copy()
        self._values_mutable.flags.writeable = True

    self._name = None if name is NAME_DEFAULT else name
    self._store = store

    # Not handling cases of max_persist being greater than the length of the index (might floor to length)
    if max_persist is not None and max_persist < self._loaded.sum():
        raise ErrorInitBus('max_persist cannot be less than the number of already loaded Frames')
    self._max_persist = max_persist

    # providing None will result in default; providing a StoreConfig or StoreConfigMap will return an appropriate map
    self._config = StoreConfigMap.from_initializer(config)
#---------------------------------------------------------------------------
def _derive_from_series(self,
        series: Series,
        *,
        own_data: bool = False,
        ) -> 'Bus':
    '''Create a derived Bus from ``series``, carrying over this Bus's ``Store``, configuration and ``max_persist``. Appropriate when ``series`` is a subset or re-ordering of this Bus; if the index has been transformed this method should not be used, as, if there is a Store, the labels are no longer found in that Store.
    '''
    # NOTE: there may be a more efficient path than using a Series
    bus_cls = self.__class__
    return bus_cls.from_series(series,
            own_data=own_data,
            max_persist=self._max_persist,
            config=self._config,
            store=self._store,
            )
# ---------------------------------------------------------------------------
def __reversed__(self) -> tp.Iterator[tp.Hashable]:
    '''
    Return a reverse iterator over the labels of the :obj:`Bus` index.

    Returns:
        :obj:`Iterator[Hashable]`
    '''
    index = self._index
    return reversed(index)
# def __copy__(self) -> 'Bus':
# '''
# Return a new Bus, holding new references to Frames as well as a link to the a new Store instance.
# '''
# return self.__class__(series,
# store=self._store.__copy__(),
# config=self._config,
# max_persiste=self._max_persist,
# )
#---------------------------------------------------------------------------
# name interface
# NOTE: the docstring below is a positional placeholder filled in by ``doc_inject``; it must remain exactly ``{}``
@property #type: ignore
@doc_inject()
def name(self) -> NameType:
    '''{}'''
    return self._name
def rename(self, name: NameType) -> 'Bus':
    '''
    Return a new :obj:`Bus` that differs from this one only in its name attribute.
    '''
    # NOTE: .values is avoided as it would force loading all Frames; _values_mutable is passed instead, and with own_data=False the constructor makes its own copy
    bus_cls = self.__class__
    return bus_cls(self._values_mutable,
            index=self._index,
            own_index=True,
            own_data=False,
            name=name,
            store=self._store,
            config=self._config,
            max_persist=self._max_persist,
            )
#---------------------------------------------------------------------------
# interfaces
@property
def loc(self) -> InterfaceGetItem['Bus']:
    '''Interface for label-based selection; selections are routed to ``_extract_loc``.
    '''
    return InterfaceGetItem(self._extract_loc)
@property
def iloc(self) -> InterfaceGetItem['Bus']:
    '''Interface for integer-position selection; selections are routed to ``_extract_iloc``.
    '''
    return InterfaceGetItem(self._extract_iloc)
@property
def drop(self) -> InterfaceSelectTrio['Bus']:
    '''
    Interface for dropping elements from :obj:`static_frame.Bus`.
    '''
    # both the loc and the getitem entry points interpret keys as labels
    return InterfaceSelectTrio( #type: ignore
            func_iloc=self._drop_iloc,
            func_loc=self._drop_loc,
            func_getitem=self._drop_loc
            )
#---------------------------------------------------------------------------
@property
def iter_element(self) -> IterNodeNoArg['Bus']:
    '''
    Iterator of elements.
    '''
    # identical to ``iter_element_items`` except for ``yield_type``
    return IterNodeNoArg(
            container=self,
            function_items=self._axis_element_items,
            function_values=self._axis_element,
            yield_type=IterNodeType.VALUES,
            apply_type=IterNodeApplyType.SERIES_VALUES,
            )
@property
def iter_element_items(self) -> IterNodeNoArg['Bus']:
    '''
    Iterator of label, element pairs.
    '''
    # identical to ``iter_element`` except for ``yield_type``
    return IterNodeNoArg(
            container=self,
            function_items=self._axis_element_items,
            function_values=self._axis_element,
            yield_type=IterNodeType.ITEMS,
            apply_type=IterNodeApplyType.SERIES_VALUES,
            )
#---------------------------------------------------------------------------
# index manipulation
# NOTE: must return a new Bus with fully-realized Frames, as we cannot guarantee usage of a Store after labels have been changed.
@doc_inject(selector='reindex', class_name='Bus')
def reindex(self,
        index: IndexInitializer,
        *,
        fill_value: tp.Any,
        own_index: bool = False,
        check_equals: bool = True
        ) -> 'Bus':
    '''
    {doc}

    Args:
        index: {index_initializer}
        {fill_value}
        {own_index}
    '''
    reindexed = self.to_series().reindex(
            index,
            check_equals=check_equals,
            own_index=own_index,
            fill_value=fill_value,
            )
    # NOTE: the store is deliberately not propagated after reindex
    bus_cls = self.__class__
    return bus_cls.from_series(reindexed, config=self._config)
@doc_inject(selector='relabel', class_name='Bus')
def relabel(self,
        index: tp.Optional[RelabelInput],
        *,
        index_constructor: IndexConstructor = None,
        ) -> 'Bus':
    '''
    {doc}

    Args:
        index: {relabel_input}
    '''
    # NOTE: this could be done without going through a Series
    relabeled = self.to_series().relabel(
            index,
            index_constructor=index_constructor,
            )
    # NOTE: the store is deliberately not propagated after relabel
    bus_cls = self.__class__
    return bus_cls.from_series(relabeled, config=self._config)
@doc_inject(selector='relabel_flat', class_name='Bus')
def relabel_flat(self) -> 'Bus':
    '''
    {doc}
    '''
    flattened = self.to_series().relabel_flat()
    # as with other relabeling, only the config (not the store) is carried over
    bus_cls = self.__class__
    return bus_cls.from_series(flattened, config=self._config)
@doc_inject(selector='relabel_level_add', class_name='Bus')
def relabel_level_add(self,
        level: tp.Hashable
        ) -> 'Bus':
    '''
    {doc}

    Args:
        level: {level}
    '''
    extended = self.to_series().relabel_level_add(level)
    # only the config (not the store) is carried over
    bus_cls = self.__class__
    return bus_cls.from_series(extended, config=self._config)
@doc_inject(selector='relabel_level_drop', class_name='Bus')
def relabel_level_drop(self,
        count: int = 1
        ) -> 'Bus':
    '''
    {doc}

    Args:
        count: {count}
    '''
    reduced = self.to_series().relabel_level_drop(count)
    # only the config (not the store) is carried over
    bus_cls = self.__class__
    return bus_cls.from_series(reduced, config=self._config)
def rehierarch(self,
        depth_map: tp.Sequence[int],
        *,
        index_constructors: IndexConstructors = None,
        ) -> 'Bus':
    '''
    Return a new :obj:`Bus` with a new hierarchy based on the supplied ``depth_map``.
    '''
    as_series = self.to_series()
    rearranged = as_series.rehierarch(depth_map, index_constructors=index_constructors)
    # only the config (not the store) is carried over
    bus_cls = self.__class__
    return bus_cls.from_series(rearranged, config=self._config)
#---------------------------------------------------------------------------
# na / falsy handling
# NOTE: not implemented, as a Bus must contain only Frame or FrameDeferred
#---------------------------------------------------------------------------
# cache management
@staticmethod
def _store_reader(
        store: Store,
        config: StoreConfigMap,
        labels: tp.Iterator[tp.Hashable],
        max_persist: tp.Optional[int],
        ) -> FrameIterType:
    '''
    Yield one Frame per label, reading from the Store in batches that are as large as permitted. With no max_persist all labels go to a single Store.read_many call; with max_persist greater than 1, labels are read in groups of at most max_persist; otherwise each label is read individually. Using Store.read_many is shown to have significant performance benefits on large collections of Frame.
    '''
    if max_persist is None:
        yield from store.read_many(labels, config=config)
        return

    if max_persist > 1:
        batch = []
        for label in labels:
            batch.append(label)
            if len(batch) != max_persist:
                continue
            # a full bundle has been gathered: read it in one call, then start over
            yield from store.read_many(batch, config=config)
            batch.clear()
        if batch: # fewer than max_persist labels remain
            yield from store.read_many(batch, config=config)
        return

    # max_persist is 1: read one Frame at a time
    for label in labels:
        yield store.read(label, config=config[label])
def _update_series_cache_iloc(self, key: GetItemKeyType) -> None:
    '''
    Ensure the positions selected by ``key`` hold loaded Frames, reading from the Store where a FrameDeferred is found, and, when ``max_persist`` is set, update the least-recently-used ordering and release the oldest loaded Frames so that the limit is not exceeded.

    Args:
        key: always an iloc key.

    Raises:
        RuntimeError: if loading is required but no Store is defined.
    '''
    max_persist_active = self._max_persist is not None

    # loading is needed only if at least one selected position is not yet loaded
    load = False if self._loaded_all else not self._loaded[key].all()
    if not load and not max_persist_active:
        return

    index = self._index
    if not load and max_persist_active: # must update LRU position
        labels = (index.iloc[key],) if isinstance(key, INT_TYPES) else index.iloc[key].values
        for label in labels: # update LRU position
            # pop and re-insert moves the label to the end of the insertion-ordered dict
            self._last_accessed[label] = self._last_accessed.pop(label, None)
        return

    if self._store is None: # there has to be a Store defined if we are partially loaded
        raise RuntimeError('no store defined')
    if max_persist_active:
        # running count of loaded Frames, maintained through the loop below
        loaded_count = self._loaded.sum()

    array = self._values_mutable
    target_values = array[key]
    target_labels = self._index.iloc[key]
    # targets = self._series.iloc[key] # key is iloc key

    store_reader: FrameIterType
    targets_items: BusItemsType

    if not isinstance(target_values, np.ndarray):
        # a single element was selected: target_labels is one label, target_values one value
        targets_items = ((target_labels, target_values),) # present element as items
        # a one-item generator, so the read only happens if the loop below calls next()
        store_reader = (self._store.read(target_labels,
                config=self._config[target_labels]) for _ in range(1))
    else: # more than one Frame
        # only labels currently holding FrameDeferred are requested from the Store
        store_reader = self._store_reader(
                store=self._store,
                config=self._config,
                labels=(label for label, f in zip(target_labels, target_values)
                        if f is FrameDeferred),
                max_persist=self._max_persist,
                )
        targets_items = zip(target_labels, target_values)

    # Iterate over items that have been selected; there must be at least 1 FrameDeferred among this selection
    for label, frame in targets_items:
        idx = index._loc_to_iloc(label)

        if max_persist_active: # update LRU position
            self._last_accessed[label] = self._last_accessed.pop(label, None)

        if frame is FrameDeferred:
            frame = next(store_reader)

        if not self._loaded[idx]:
            # as we are iterating from `targets`, we might be holding on to references of Frames that we already removed in `array`; in this case we do not need to `read`, but we still need to update the new array
            array[idx] = frame
            self._loaded[idx] = True # update loaded status
            if max_persist_active:
                loaded_count += 1

        if max_persist_active and loaded_count > self._max_persist:
            # the first key of the insertion-ordered dict is the least recently accessed label
            label_remove = next(iter(self._last_accessed))
            del self._last_accessed[label_remove]
            idx_remove = index._loc_to_iloc(label_remove)
            self._loaded[idx_remove] = False
            array[idx_remove] = FrameDeferred
            loaded_count -= 1

    self._loaded_all = self._loaded.all()
def unpersist(self) -> None:
    '''Replace loaded :obj:`Frame` with :obj:`FrameDeferred`.

    If no Store is defined this is a no-op, as released Frames could not be read back.
    '''
    if self._store is None:
        # have this be a no-op so that Yarn or Quilt can call regardless of Store
        return

    if self._max_persist is None:
        # Without LRU bookkeeping every position is reset, so fill the whole arrays directly
        # rather than building a dict of all labels and resolving each label to a position.
        self._loaded.fill(False)
        self._values_mutable.fill(FrameDeferred)
    else:
        # the LRU record holds the labels of the currently loaded Frames
        last_accessed = self._last_accessed
        index = self._index
        array = self._values_mutable
        for label_remove in last_accessed:
            idx_remove = index._loc_to_iloc(label_remove)
            self._loaded[idx_remove] = False
            array[idx_remove] = FrameDeferred
        last_accessed.clear()

    self._loaded_all = False
#---------------------------------------------------------------------------
# extraction
def _extract_iloc(self, key: GetItemKeyType) -> 'Bus':
    '''
    Select by integer position, loading from the Store as needed.

    Returns:
        Bus or, if an element is selected, a Frame
    '''
    self._update_series_cache_iloc(key=key)

    # iterable selection is delegated to NumPy
    selected = self._values_mutable[key]

    # NOTE: a Bus only stores Frame and FrameDeferred, so anything other than an array is a single element
    if selected.__class__ is not np.ndarray:
        return selected #type: ignore

    bus_cls = self.__class__
    return bus_cls(selected,
            index=self._index.iloc[key],
            own_index=True,
            own_data=False, # force immutable copy
            name=self._name,
            store=self._store,
            config=self._config,
            max_persist=self._max_persist,
            )
def _extract_loc(self, key: GetItemKeyType) -> 'Bus':
    '''Select by label: the key is translated to integer positions by the index, then handled by ``_extract_iloc``.
    '''
    return self._extract_iloc(self._index._loc_to_iloc(key))
@doc_inject(selector='selector')
def __getitem__(self, key: GetItemKeyType) -> 'Bus':
    '''Selector of values by label.

    Args:
        key: {key_loc}
    '''
    selection = self._extract_loc(key)
    return selection
#---------------------------------------------------------------------------
# utilities for alternate extraction: drop
def _drop_iloc(self, key: GetItemKeyType) -> 'Bus':
    '''Return a new Bus without the positions selected by the iloc ``key``; Store, config and max_persist are carried over.
    '''
    remaining = self._to_series_state()._drop_iloc(key)
    return self._derive_from_series(remaining, own_data=True)
def _drop_loc(self, key: GetItemKeyType) -> 'Bus':
    '''Return a new Bus without the labels selected by ``key``; the key is translated to integer positions first.
    '''
    iloc_key = self._index._loc_to_iloc(key)
    return self._drop_iloc(iloc_key)
#---------------------------------------------------------------------------
# axis functions
def _axis_element_items(self,
        ) -> tp.Iterator[tp.Tuple[tp.Hashable, Frame]]:
    '''Generator of (label, Frame) pairs, delegating to ``items()``. Exists so that this container offers the same axis-function signature as others.
    '''
    yield from self.items()
def _axis_element(self,
        ) -> tp.Iterator[tp.Any]:
    '''Generator of each contained Frame in index order, loading from the Store in a manner that observes ``max_persist``.
    '''
    if self._loaded_all:
        yield from self._values_mutable
        return

    persist = self._max_persist
    if persist is None: # load all at once if possible
        self._update_series_cache_iloc(key=NULL_SLICE)
        yield from self._values_mutable
        return

    if persist > 1:
        i_max = len(self._index.values)
        for start in range(0, i_max, persist):
            key = slice(start, min(start + persist, i_max))
            # update with a slice to force usage of read_many in _store_reader
            self._update_series_cache_iloc(key=key)
            for pos in range(key.start, key.stop):
                yield self._values_mutable[pos]
        return

    # max_persist is 1: load and yield a single Frame at a time
    for pos in range(self.__len__()):
        self._update_series_cache_iloc(key=pos)
        yield self._values_mutable[pos]
#---------------------------------------------------------------------------
# dictionary-like interface; these will force loading contained Frame
def items(self) -> tp.Iterator[tp.Tuple[tp.Hashable, Frame]]:
    '''Iterator of pairs of :obj:`Bus` label and contained :obj:`Frame`; Frames are loaded from the Store as needed, observing ``max_persist``.
    '''
    if self._loaded_all:
        yield from zip(self._index, self._values_mutable)
        return

    persist = self._max_persist
    if persist is None: # load all at once if possible
        self._update_series_cache_iloc(key=NULL_SLICE)
        yield from zip(self._index, self._values_mutable)
        return

    label_array = self._index.values
    if persist > 1:
        i_max = len(label_array)
        for start in range(0, i_max, persist):
            key = slice(start, min(start + persist, i_max))
            labels_select = label_array[key] # may over select
            # update with a slice to force usage of read_many in _store_reader
            self._update_series_cache_iloc(key=key)
            yield from zip(labels_select, self._values_mutable[key])
        return

    # max_persist is 1: load and yield a single Frame at a time
    for pos, label in enumerate(label_array):
        self._update_series_cache_iloc(key=pos)
        yield label, self._values_mutable[pos]

_items_store = items
@property
def values(self) -> np.ndarray:
    '''A 1D object array of all :obj:`Frame` contained in the :obj:`Bus`. The returned ``np.ndarray`` is a new, non-writeable array holding ``Frame``; it never contains ``FrameDeferred``, and ``max_persist`` is observed while reading from the Store.
    '''
    persist = self._max_persist

    # NOTE: when self._values_mutable is fully loaded, it could become immutable and avoid a copy
    if self._loaded_all or persist is None:
        if not self._loaded_all: # load all at once if possible
            self._update_series_cache_iloc(key=NULL_SLICE)
        snapshot = self._values_mutable.copy()
        snapshot.flags.writeable = False
        return snapshot

    # fill a new array incrementally, as the internal array can never hold more than max_persist Frames
    snapshot = np.empty(self.__len__(), dtype=object)

    if persist > 1:
        i_max = len(self._index.values)
        for start in range(0, i_max, persist):
            key = slice(start, min(start + persist, i_max))
            # update with a slice to force usage of read_many in _store_reader
            self._update_series_cache_iloc(key=key)
            snapshot[key] = self._values_mutable[key]
    else: # max_persist is 1
        for pos in range(self.__len__()):
            self._update_series_cache_iloc(key=pos)
            snapshot[pos] = self._values_mutable[pos]

    snapshot.flags.writeable = False
    return snapshot
#---------------------------------------------------------------------------
def __len__(self) -> int:
    '''Number of elements, taken from the length of the index.
    '''
    return len(self._index)
@doc_inject()
def display(self,
        config: tp.Optional[DisplayConfig] = None,
        *,
        style_config: tp.Optional[StyleConfig] = None,
        ) -> Display:
    '''{doc}

    Args:
        {config}
    '''
    # NOTE: the key difference from a Series display is that the Bus is given as the displayed class
    config = config or DisplayActive.get()
    header = DisplayHeader(self.__class__, self._name)
    display_cls = Display.from_values((), header=header, config=config)

    state = self._to_series_state()
    return state._display(config,
            style_config=style_config,
            display_cls=display_cls,
            )
#---------------------------------------------------------------------------
# extended descriptors; in general, these do not force loading Frame
@property
def mloc(self) -> Series:
    '''Returns a :obj:`Series` showing a tuple of memory locations within each loaded Frame; positions holding :obj:`FrameDeferred` are given None.
    '''
    if not self._loaded.any():
        return Series.from_element(None, index=self._index)

    pairs = (
            (label, None if f is FrameDeferred else tuple(f.mloc))
            for label, f in zip(self._index, self._values_mutable)
            )
    return Series.from_items(pairs)
@property
def dtypes(self) -> Frame:
    '''Returns a :obj:`Frame` of dtype per column for all loaded Frames.
    '''
    if not self._loaded.any():
        return Frame(index=self._index)

    loaded_dtypes = (
            frame.dtypes
            for frame in self._values_mutable
            if frame is not FrameDeferred
            )
    combined = Frame.from_concat(frames=loaded_dtypes, fill_value=None)
    # reindex against the full Bus index, filling with None
    aligned = combined.reindex(index=self._index, fill_value=None)
    return tp.cast(Frame, aligned)
@property
def shapes(self) -> Series:
    '''A :obj:`Series` describing the shape of each loaded :obj:`Frame`. Unloaded :obj:`Frame` will have a shape of None.

    Returns:
        :obj:`Series`
    '''
    shape_iter = (
            None if f is FrameDeferred else f.shape
            for f in self._values_mutable
            )
    return Series(shape_iter, index=self._index, dtype=object, name='shape')
@property
def nbytes(self) -> int:
    '''Total bytes of data currently loaded in the Bus; positions holding :obj:`FrameDeferred` count as zero.
    '''
    return sum(0 if f is FrameDeferred else f.nbytes for f in self._values_mutable)
@property
def status(self) -> Frame:
    '''
    Return a :obj:`Frame` indicating loaded status, size, bytes, and shape of all loaded :obj:`Frame`.
    '''
    # (attribute name, column dtype, value used for positions not loaded)
    fields = (
            ('size', DTYPE_FLOAT_DEFAULT, np.nan),
            ('nbytes', DTYPE_FLOAT_DEFAULT, np.nan),
            ('shape', DTYPE_OBJECT, None)
            )

    def columns() -> tp.Iterator[Series]:
        yield Series(self._loaded,
                index=self._index,
                dtype=DTYPE_BOOL,
                name='loaded')

        for attr, dtype, missing in fields:
            values = (missing if f is FrameDeferred else getattr(f, attr)
                    for f in self._values_mutable)
            yield Series(values, index=self._index, dtype=dtype, name=attr)

    return tp.cast(Frame, Frame.from_concat(columns(), axis=1))
#---------------------------------------------------------------------------
# common attributes from the numpy array
@property
def dtype(self) -> np.dtype:
    '''
    Return the dtype of the underlying NumPy array.

    Returns:
        :obj:`numpy.dtype`
    '''
    # a constant is returned rather than inspecting the array
    return DTYPE_OBJECT
@property
def shape(self) -> tp.Tuple[int]:
    '''
    Return a tuple describing the shape of the underlying NumPy array.

    Returns:
        :obj:`Tuple[int]`
    '''
    # read from the internal array, so no Frame is loaded
    return self._values_mutable.shape #type: ignore
@property
def ndim(self) -> int:
    '''
    Return the number of dimensions, which for a :obj:`Bus` is always 1.

    Returns:
        :obj:`int`
    '''
    # class-level constant
    return self._NDIM
@property
def size(self) -> int:
    '''
    Return the number of elements in the underlying NumPy array.

    Returns:
        :obj:`int`
    '''
    values = self._values_mutable
    return values.size #type: ignore
#---------------------------------------------------------------------------
@property
def index(self) -> IndexBase:
    '''
    Return the index instance assigned to this container.

    Returns:
        :obj:`IndexBase`
    '''
    labels = self._index
    return labels
# @property
# def _index(self) -> IndexBase:
# return self._series._index
#---------------------------------------------------------------------------
# dictionary-like interface
def keys(self) -> IndexBase:
    '''
    Return the index of this container, which provides the labels in a dictionary-like fashion.

    Returns:
        :obj:`IndexBase`
    '''
    labels = self._index
    return labels
def __iter__(self) -> tp.Iterator[tp.Hashable]:
    '''
    Return an iterator over the index labels.

    Returns:
        :obj:`Iterator[Hashable]`
    '''
    return iter(self._index)
def __contains__(self, value: tp.Hashable) -> bool:
    '''
    Return True if the value is found among the index labels.

    Returns:
        :obj:`bool`
    '''
    return value in self._index
def get(self,
        key: tp.Hashable,
        default: tp.Any = None,
        ) -> tp.Any:
    '''
    Return the value found at the index key, or the default if the key is not in the index.

    Args:
        key: Label to look up in the index.
        default: Value returned when the label is not present.

    Returns:
        :obj:`Any`
    '''
    if key in self._index:
        # membership is confirmed, so the loc extraction yields a single element
        return self._extract_loc(key=key)
    return default
#---------------------------------------------------------------------------
@doc_inject()
def equals(self,
        other: tp.Any,
        *,
        compare_name: bool = False,
        compare_dtype: bool = False,
        compare_class: bool = False,
        skipna: bool = True,
        ) -> bool:
    '''
    {doc}

    Note: this will attempt to load and compare all Frame managed by the Bus.

    Args:
        {compare_name}
        {compare_dtype}
        {compare_class}
        {skipna}
    '''
    # identity implies equality
    if other is self:
        return True

    if compare_class and self.__class__ != other.__class__:
        return False
    if not isinstance(other, Bus):
        return False

    # NOTE: no dtype comparison of the containers themselves is made here; only index and Frame are compared
    if len(self._index) != len(other._index):
        return False

    if compare_name and self._name != other._name:
        return False

    # the same comparison options are forwarded to the index and to every Frame
    options = dict(
            compare_name=compare_name,
            compare_dtype=compare_dtype,
            compare_class=compare_class,
            skipna=skipna,
            )

    if not self._index.equals(other._index, **options):
        return False

    # lengths already match, so pairing by position is complete; items() is iterated rather than .values, as .values would force all Frame into memory at once
    paired = zip(self.items(), other.items())
    return all(
            frame_self.equals(frame_other, **options)
            for (_, frame_self), (_, frame_other) in paired
            )
#---------------------------------------------------------------------------
# transformations resulting in changed dimensionality
@doc_inject(selector='head', class_name='Bus')
def head(self, count: int = 5) -> 'Bus':
    '''{doc}

    Args:
        {count}

    Returns:
        :obj:`Bus`
    '''
    # positional selection of everything before ``count``
    leading = slice(None, count)
    return self.iloc[leading]
@doc_inject(selector='tail', class_name='Bus')
def tail(self, count: int = 5) -> 'Bus':
    '''{doc}

    Args:
        {count}

    Returns:
        :obj:`Bus`
    '''
    # NOTE: ``-0`` is ``0``, so ``iloc[-0:]`` would select every element rather than none; a zero count must be routed to an explicitly empty slice.
    if count == 0:
        return self.iloc[:0]
    return self.iloc[-count:]
#---------------------------------------------------------------------------
# transformations resulting in the same dimensionality
@doc_inject(selector='sort')
def sort_index(self,
        *,
        ascending: BoolOrBools = True,
        kind: str = DEFAULT_SORT_KIND,
        key: tp.Optional[tp.Callable[[IndexBase], tp.Union[np.ndarray, IndexBase]]] = None,
        ) -> 'Bus':
    '''
    Return a new Bus ordered by the sorted Index.

    Args:
        *
        {ascendings}
        {kind}
        {key}

    Returns:
        :obj:`Bus`
    '''
    # sorting is delegated to a Series built from the current (mutable) values, then wrapped back into a Bus
    state = self._to_series_state()
    ordered = state.sort_index(ascending=ascending, kind=kind, key=key)
    return self._derive_from_series(ordered, own_data=True)
@doc_inject(selector='sort')
def sort_values(self,
        *,
        ascending: bool = True,
        kind: str = DEFAULT_SORT_KIND,
        key: tp.Callable[['Series'], tp.Union[np.ndarray, 'Series']],
        ) -> 'Bus':
    '''
    Return a new Bus ordered by the sorted values. Note that as a Bus contains Frames, a `key` argument must be provided to extract a sortable value, and this key function will process a :obj:`Series` of :obj:`Frame`.

    Args:
        *
        {ascending}
        {kind}
        {key}

    Returns:
        :obj:`Bus`
    '''
    # the key function needs real Frame, so every Frame is loaded via to_series()
    realized = self.to_series()
    ordered = realized.sort_values(ascending=ascending, kind=kind, key=key)
    return self._derive_from_series(ordered, own_data=True)
def roll(self,
        shift: int,
        *,
        include_index: bool = False,
        ) -> 'Bus':
    '''Return a :obj:`Bus` with values rotated and wrapped around the index: forward with a positive shift, backward with a negative shift.

    Args:
        shift: Positive or negative integer shift.
        include_index: Determine if the Index is shifted with the underlying data.

    Returns:
        :obj:`Bus`
    '''
    # rotation is delegated to a Series built from the current values
    state = self._to_series_state()
    rotated = state.roll(shift=shift, include_index=include_index)
    return self._derive_from_series(rotated, own_data=True)
def shift(self,
        shift: int,
        *,
        fill_value: tp.Any,
        ) -> 'Bus':
    '''Return a :obj:`Bus` with values moved along the index without wrapping: forward with a positive shift, backward with a negative shift.

    Args:
        shift: Positive or negative integer shift.
        fill_value: Value to be used to fill data missing after the shift.

    Returns:
        :obj:`Bus`
    '''
    # shifting is delegated to a Series built from the current values
    state = self._to_series_state()
    moved = state.shift(shift=shift, fill_value=fill_value)
    return self._derive_from_series(moved, own_data=True)
#---------------------------------------------------------------------------
# exporter
def _to_series_state(self) -> Series:
    '''Return a :obj:`Series` built from the current mutable values array, sharing this container's index and name.
    '''
    # Series construction copies the mutable array, so the result does not alias it
    return Series(
            self._values_mutable,
            index=self._index,
            own_index=True,
            name=self._name,
            )
def to_series(self) -> Series:
    '''Return a :obj:`Series` with the :obj:`Frame` contained in this :obj:`Bus`. If the :obj:`Bus` is associated with a :obj:`Store`, all :obj:`Frame` will be loaded into memory and the returned :obj:`Series` will not be associated with the :obj:`Store`.

    Returns:
        :obj:`Series`
    '''
    # .values provides an immutable array and fully realizes all Frame from the Store
    realized = self.values
    return Series(
            realized,
            index=self._index,
            own_index=True,
            name=self._name,
            )
def _to_signature_bytes(self,
        include_name: bool = True,
        include_class: bool = True,
        encoding: str = 'utf-8',
        ) -> bytes:
    '''Return the bytes signature of this container: the container's own component bytes, followed by the index signature, followed by the signature of each element.

    Args:
        include_name: Forwarded to every component signature call.
        include_class: Forwarded to every component signature call.
        encoding: Forwarded to every component signature call.

    Returns:
        :obj:`bytes`
    '''
    # identical options are forwarded to the container, index, and element signatures
    options = dict(
            include_name=include_name,
            include_class=include_class,
            encoding=encoding,
            )
    element_parts = (
            element._to_signature_bytes(**options)
            for element in self._axis_element()
            )
    container_parts = iter_component_signature_bytes(self, **options)
    index_part = self._index._to_signature_bytes(**options)
    return b''.join(chain(container_parts, (index_part,), element_parts))