from __future__ import annotations
from collections.abc import Set, Sized
from functools import partial
from itertools import chain
import numpy as np
import typing_extensions as tp
from static_frame.core.archive_npy import ArchiveManifest
from static_frame.core.axis_map import buses_to_iloc_hierarchy, buses_to_loc_hierarchy
from static_frame.core.bus import FrameDeferred
from static_frame.core.container import ContainerBase
from static_frame.core.container_util import (
index_from_optional_constructor,
index_many_concat,
iter_component_signature_bytes,
rehierarch_from_index_hierarchy,
sort_index_from_params,
)
from static_frame.core.display import Display, DisplayActive, DisplayHeader
from static_frame.core.doc_str import doc_inject
from static_frame.core.exception import (
ErrorInitYarn,
RelabelInvalid,
immutable_type_error_factory,
)
from static_frame.core.frame import Frame
from static_frame.core.generic_aliases import (
TBusAny,
TFrameAny,
TIndexAny,
TIndexIntDefault,
TSeriesAny,
TSeriesObject,
)
from static_frame.core.index import Index
from static_frame.core.index_auto import (
IndexAutoConstructorFactory,
IndexAutoFactory,
TIndexAutoFactory,
TRelabelInput,
)
from static_frame.core.index_base import IndexBase
from static_frame.core.index_correspondence import IndexCorrespondence
from static_frame.core.index_hierarchy import IndexHierarchy
from static_frame.core.mfc_mapping import YarnMapping
from static_frame.core.node_iter import IterNodeApplyType, IterNodeNoArg
from static_frame.core.node_selector import (
InterfacePersist,
InterfaceSelectTrio,
InterGetItemILocReduces,
InterGetItemLocReduces,
)
from static_frame.core.series import Series
from static_frame.core.store_client_mixin import StoreClientMixin
from static_frame.core.util import (
BOOL_TYPES,
DEFAULT_SORT_KIND,
DTYPE_INT_DEFAULT,
DTYPE_OBJECT,
EMPTY_SLICE,
INT_TYPES,
NAME_DEFAULT,
REVERSE_SLICE,
IterNodeType,
PositionsAllocator,
SortStatus,
TBoolOrBools,
TDtypeObject,
TILocSelector,
TILocSelectorMany,
TILocSelectorOne,
TIndexCtorSpecifier,
TIndexCtorSpecifiers,
TIndexInitializer,
TLabel,
TLocSelector,
TLocSelectorMany,
TName,
TNDArrayAny,
TNDArrayIntDefault,
TNDArrayObject,
TPathSpecifier,
TSortKinds,
array_shift,
is_callable_or_mapping,
iterable_to_array_1d,
)
if tp.TYPE_CHECKING:
from static_frame.core.display_config import DisplayConfig
from static_frame.core.style_config import StyleConfig
# -------------------------------------------------------------------------------
TIHInternal = IndexHierarchy[TIndexIntDefault, TIndexAny]
TVIndex = tp.TypeVar('TVIndex', bound=IndexBase, default=tp.Any)
[docs]
class Yarn(ContainerBase, StoreClientMixin, tp.Generic[TVIndex]):
"""
A :obj:`Series`-like container made of an ordered collection of :obj:`Bus`. :obj:`Yarn` can be indexed independently of the contained :obj:`Bus`, permitting independent labels per contained :obj:`Frame`.
"""
__slots__ = (
'_values',
'_hierarchy',
'_index',
'_indexer',
'_name',
'_deepcopy_from_bus',
)
_values: TNDArrayObject # an array of Bus
_hierarchy: TIHInternal
_index: IndexBase
_indexer: TNDArrayIntDefault
_name: TName
_deepcopy_from_bus: bool
_NDIM: int = 1
[docs]
@classmethod
def from_buses(
cls,
buses: tp.Iterable[TBusAny],
/,
*,
name: TName = None,
retain_labels: bool,
deepcopy_from_bus: bool = False,
) -> tp.Self:
"""Return a :obj:`Yarn` from an iterable of :obj:`Bus`; labels will be drawn from :obj:`Bus.name`."""
values, _ = iterable_to_array_1d(buses, dtype=DTYPE_OBJECT)
hierarchy = buses_to_iloc_hierarchy(
values,
deepcopy_from_bus=deepcopy_from_bus,
init_exception_cls=ErrorInitYarn,
)
if retain_labels:
index = buses_to_loc_hierarchy(
values,
deepcopy_from_bus=deepcopy_from_bus,
init_exception_cls=ErrorInitYarn,
)
else:
index = hierarchy.level_drop(1) # type: ignore
return cls(
values,
hierarchy=hierarchy,
index=index,
name=name,
deepcopy_from_bus=deepcopy_from_bus,
)
[docs]
@classmethod
def from_concat(
cls,
containers: tp.Iterable[TYarnAny],
/,
*,
index: tp.Optional[tp.Union[TIndexInitializer, TIndexAutoFactory]] = None,
name: TName = NAME_DEFAULT,
deepcopy_from_bus: bool = False,
) -> tp.Self:
"""
Concatenate multiple :obj:`Yarn` into a new :obj:`Yarn`. Loaded status of :obj:`Frame` within each :obj:`Bus` will not be altered.
Args:
containers:
index: Optionally provide new labels for the result of the concatenation.
name:
deepcopy_from_bus:
"""
values_components: tp.List[TNDArrayObject] = []
indexer_components: tp.List[TNDArrayIntDefault] = []
index_components: tp.Optional[tp.List[IndexBase]] = (
None if index is not None else []
)
labels = [] # for new hierarchy
bus_count = 0
hierarchy_count = 0
for y in containers:
if not isinstance(y, Yarn):
raise NotImplementedError(f'Cannot concatenate from {type(y)}')
b_pos: int
for b_pos, frame_label in y._hierarchy: # type: ignore[assignment]
labels.append((b_pos + bus_count, frame_label))
values_components.append(y._values)
indexer_components.append(y._indexer + hierarchy_count)
bus_count += len(y._values)
hierarchy_count += len(y._hierarchy)
if index_components is not None: # only accumulate if index not provided
index_components.append(y.index)
values = np.concatenate(values_components, dtype=DTYPE_OBJECT)
indexer = np.concatenate(indexer_components, dtype=DTYPE_INT_DEFAULT)
ctor: tp.Callable[..., IndexBase] = partial(Index, dtype=DTYPE_INT_DEFAULT)
ctors: TIndexCtorSpecifiers = [ctor, IndexAutoConstructorFactory] # type: ignore[list-item]
hierarchy: TIHInternal = IndexHierarchy.from_labels(
labels,
index_constructors=ctors,
)
if index_components is not None:
index = index_many_concat(index_components, Index)
own_index = True
else: # provided index must be evaluated
own_index = False
return cls(
values,
index=index,
deepcopy_from_bus=deepcopy_from_bus,
indexer=indexer,
hierarchy=hierarchy,
name=name if name is not NAME_DEFAULT else None,
own_index=own_index,
)
# ---------------------------------------------------------------------------
[docs]
def __init__(
self,
series: tp.Union[TSeriesObject, tp.Iterable[TBusAny]], # rename: values
/,
*,
index: TIndexInitializer | TIndexAutoFactory | None = None,
index_constructor: tp.Optional[TIndexCtorSpecifier] = None,
deepcopy_from_bus: bool = False,
indexer: tp.Optional[TNDArrayIntDefault] = None,
hierarchy: tp.Optional[TIHInternal] = None,
name: TName = None,
own_index: bool = False,
) -> None:
"""
Args:
series: An iterable (or :obj:`Series`) of :obj:`Bus`. The length of this container may not be the same as ``index``, if provided.
index: Optionally provide an index for the :obj:`Frame` contained in all :obj:`Bus`.
index_constructor:
deepcopy_from_bus:
hierarchy: Optionally provide a depth-two `IndexHierarchy` constructed from `Bus` integer positions on the outer level, and contained `Frame` labels on the inner level.
indexer: For each `Frame` referenced by the index, provide the location within the internal `IndexHierarchy`.
name:
own_index:
"""
if isinstance(series, Series):
if series.dtype != DTYPE_OBJECT:
raise ErrorInitYarn(
f'Series passed to initializer must have dtype object, not {series.dtype}'
)
self._values = series.values
else:
try:
self._values, _ = iterable_to_array_1d(series, dtype=DTYPE_OBJECT)
except RuntimeError as e:
raise ErrorInitYarn(e) from None
self._name = name
self._deepcopy_from_bus = deepcopy_from_bus
if hierarchy is None:
self._hierarchy = buses_to_iloc_hierarchy(
self._values,
deepcopy_from_bus=self._deepcopy_from_bus,
init_exception_cls=ErrorInitYarn,
)
else: # NOTE: we assume this hierarchy is well-formed
self._hierarchy = hierarchy
self._index: IndexBase
if own_index:
self._index = index # type: ignore
elif index is None or index is IndexAutoFactory:
self._index = IndexAutoFactory.from_optional_constructor(
len(self._hierarchy),
default_constructor=Index,
explicit_constructor=index_constructor,
)
else: # an iterable of labels or an Index
self._index = index_from_optional_constructor(
index, default_constructor=Index, explicit_constructor=index_constructor
)
if len(self._index) > len(self._hierarchy): # pyright: ignore
raise ErrorInitYarn(
f'Length of supplied index ({len(self._index)}) not of sufficient size ({len(self._hierarchy)}).'
) # pyright: ignore
self._indexer: TNDArrayIntDefault
if indexer is None:
self._indexer = PositionsAllocator.get(len(self._index)) # pyright: ignore
else:
self._indexer = indexer
if len(self._indexer) != len(self._index): # pyright: ignore
raise ErrorInitYarn(
f'Length of supplied indexer ({len(self._indexer)}) not of sufficient size ({len(self._index)}).'
) # pyright: ignore
# ---------------------------------------------------------------------------
[docs]
def unpersist(self) -> None:
"""For the :obj:`Bus` contained in this object, replace all loaded :obj:`Frame` with :obj:`FrameDeferred`."""
for b in self._values:
if b is not None:
b.unpersist()
# ---------------------------------------------------------------------------
def _persist_iloc(self, key: TILocSelector) -> None:
indexer: tp.Union[TNDArrayIntDefault, int] = self._indexer[key]
sel_hierarchy = self._hierarchy._extract_iloc(indexer)
if isinstance(indexer, INT_TYPES):
b_pos, frame_label = sel_hierarchy # always two-item tuple
self._values[b_pos]._persist_loc(frame_label) # pyright: ignore
return
for b_pos, frame_label in sel_hierarchy:
self._values[b_pos]._persist_loc(frame_label) # pyright: ignore
def _persist_loc(self, key: TLocSelector) -> None:
return self._persist_iloc(self._index._loc_to_iloc(key))
@property
def persist(self) -> InterfacePersist[TBusAny]:
"""
Interface for selectively (or completely) pre-load `Frame` from a store to optimize subsequent single `Frame` extraction.
"""
return InterfacePersist(
func_iloc=self._persist_iloc,
func_loc=self._persist_loc,
func_getitem=self._persist_loc,
)
# ---------------------------------------------------------------------------
[docs]
def __reversed__(self) -> tp.Iterator[TLabel]:
"""
Returns a reverse iterator on the :obj:`Yarn` index.
Returns:
:obj:`Index`
"""
return reversed(self._index)
[docs]
def __copy__(self) -> tp.Self:
"""
Return a shallow copy of this :obj:`Yarn`.
"""
values = np.empty(len(self._values), dtype=DTYPE_OBJECT)
for i, b in enumerate(self._values):
values[i] = b.__copy__()
return self.__class__(
values,
index=self._index,
deepcopy_from_bus=self._deepcopy_from_bus,
indexer=self._indexer,
hierarchy=self._hierarchy,
name=self._name,
own_index=True,
)
[docs]
def copy(self) -> tp.Self:
"""
Return a shallow copy of this :obj:`Yarn`.
"""
return self.__copy__()
# ---------------------------------------------------------------------------
# name interface
@property
@doc_inject()
def name(self) -> TName:
"""{}"""
return self._name
[docs]
def rename(
self,
name: TName,
/,
) -> tp.Self:
"""
Return a new :obj:`Yarn` with an updated name attribute.
Args:
name
"""
# NOTE: do not need to call _update_index_labels; can continue to defer
return self.__class__(
self._values,
index=self._index,
hierarchy=self._hierarchy,
indexer=self._indexer,
name=name,
deepcopy_from_bus=self._deepcopy_from_bus,
own_index=True,
)
# ---------------------------------------------------------------------------
# interfaces
@property
def loc(self) -> InterGetItemLocReduces[TYarnAny, np.object_]:
return InterGetItemLocReduces(self._extract_loc) # type: ignore
@property
def iloc(self) -> InterGetItemILocReduces[TYarnAny, np.object_]:
return InterGetItemILocReduces(self._extract_iloc)
@property
def drop(self) -> InterfaceSelectTrio[TYarnAny]:
"""
Interface for dropping elements from :obj:`Yarn`.
"""
return InterfaceSelectTrio( # type: ignore
func_iloc=self._drop_iloc,
func_loc=self._drop_loc,
func_getitem=self._drop_loc,
)
# ---------------------------------------------------------------------------
@property
def iter_element(self) -> IterNodeNoArg[TYarnAny]:
"""
Iterator of elements.
"""
return IterNodeNoArg(
container=self,
function_items=self._axis_element_items,
function_values=self._axis_element,
yield_type=IterNodeType.VALUES,
apply_type=IterNodeApplyType.SERIES_VALUES,
)
@property
def iter_element_items(self) -> IterNodeNoArg[TYarnAny]:
"""
Iterator of label, element pairs.
"""
return IterNodeNoArg(
container=self,
function_items=self._axis_element_items,
function_values=self._axis_element,
yield_type=IterNodeType.ITEMS,
apply_type=IterNodeApplyType.SERIES_VALUES,
)
# ---------------------------------------------------------------------------
# extraction
@tp.overload
def _extract_iloc(self, key: TILocSelectorOne) -> Frame: ...
@tp.overload
def _extract_iloc(self, key: TILocSelectorMany) -> tp.Self: ...
def _extract_iloc(self, key: TILocSelector) -> tp.Self | TFrameAny:
"""
Returns:
Yarn or, if an element is selected, a Frame
"""
indexer: TNDArrayIntDefault | int = self._indexer[key]
sel_hierarchy = self._hierarchy._extract_iloc(indexer)
if isinstance(indexer, INT_TYPES):
# got a single element, return a Frame
b_pos, frame_label = sel_hierarchy # always two-item tuple
f: Frame = self._values[b_pos]._extract_loc(frame_label) # pyright: ignore
return f
# NOTE: identify Bus that are no longer needed, and remove them from the values such that they can be GCd if necessary; for now, we leave the hierarchy (and the position numbers) unchanged
bus_pos = self._hierarchy.index_at_depth(0)
sel_bus_pos = sel_hierarchy.index_at_depth(0)
if len(sel_bus_pos) < len(bus_pos):
values = self._values.copy() # becomes mutable
for pos in bus_pos.difference(sel_bus_pos):
values[pos] = None # type: ignore
values.flags.writeable = False
else:
values = self._values
return self.__class__(
values,
index=self._index.iloc[key],
deepcopy_from_bus=self._deepcopy_from_bus,
hierarchy=self._hierarchy,
indexer=indexer,
name=self._name,
own_index=True,
)
@tp.overload
def _extract_loc(self, key: TLabel) -> Frame: ...
@tp.overload
def _extract_loc(self, key: TLocSelectorMany) -> tp.Self: ...
def _extract_loc(self, key: TLocSelector) -> TYarnAny | TFrameAny:
# use the index active for this Yarn
key_iloc = self._index._loc_to_iloc(key)
return self._extract_iloc(key_iloc)
@tp.overload
def __getitem__(self, key: TLabel) -> Frame: ...
@tp.overload
def __getitem__(self, key: TLocSelectorMany) -> tp.Self: ...
[docs]
@doc_inject(selector='selector')
def __getitem__(self, key: TLocSelector) -> TYarnAny | TFrameAny:
"""Selector of values by label.
Args:
key: {key_loc}
"""
return self._extract_loc(key)
def __setitem__(self, key: TLabel, value: tp.Any) -> None:
raise immutable_type_error_factory(self.__class__, '', key, value)
# ---------------------------------------------------------------------------
# utilities for alternate extraction: drop
def _drop_iloc(self, key: TILocSelector) -> tp.Self:
invalid = np.full(len(self._index), True)
invalid[key] = False
return self._extract_iloc(invalid)
def _drop_loc(self, key: TLocSelector) -> tp.Self:
return self._drop_iloc(self._index._loc_to_iloc(key))
# ---------------------------------------------------------------------------
# axis functions
def _axis_element_items(
self,
) -> tp.Iterator[tp.Tuple[TLabel, tp.Any]]:
"""Generator of index, value pairs, equivalent to Series.items(). Repeated to have a common signature as other axis functions."""
yield from self.items()
def _axis_element(
self,
) -> tp.Iterator[TFrameAny]:
for b_pos, frame_label in self._hierarchy._extract_iloc(self._indexer):
yield self._values[b_pos]._extract_loc(frame_label) # pyright: ignore
# ---------------------------------------------------------------------------
# index manipulation
[docs]
@doc_inject(selector='reindex', class_name='Bus')
def reindex(
self,
index: TIndexInitializer,
*,
fill_value: tp.Any = None,
own_index: bool = False,
check_equals: bool = True,
) -> tp.Self:
"""
{doc}
Args:
index: {index_initializer}
columns: {index_initializer}
{fill_value}
{own_index}
"""
index_owned: IndexBase
if own_index:
index_owned = index # type: ignore
else:
index_owned = index_from_optional_constructor(
index, default_constructor=Index
)
if check_equals and self._index.equals(index_owned):
# if labels are equal (even if a different Index subclass), we can simply use the new Index
return self.__class__(
self._values,
index=index_owned,
hierarchy=self._hierarchy,
indexer=self._indexer,
name=self._name,
deepcopy_from_bus=self._deepcopy_from_bus,
own_index=True,
)
ic = IndexCorrespondence.from_correspondence(self._index, index_owned)
if not ic.size:
return self._extract_iloc(EMPTY_SLICE)
if ic.is_subset: # must have some common
indexer = self._indexer[ic.iloc_src]
indexer.flags.writeable = False
return self.__class__(
self._values,
index=index_owned,
hierarchy=self._hierarchy,
indexer=indexer,
name=self._name,
deepcopy_from_bus=self._deepcopy_from_bus,
own_index=True,
)
raise NotImplementedError(
'Reindex operations that are not strict subsets are not supported by `Yarn`'
)
[docs]
@doc_inject(selector='relabel', class_name='Yarn')
def relabel(self, index: tp.Optional[TRelabelInput]) -> tp.Self:
"""
{doc}
Args:
index: {relabel_input_index}
"""
# NOTE: we name the parameter index for alignment with the corresponding Frame method
own_index = False
if index is IndexAutoFactory:
index_init = None
elif index is None:
index_init = self._index
elif is_callable_or_mapping(index):
index_init = self._index.relabel(index)
own_index = True
elif isinstance(index, Set):
raise RelabelInvalid()
else:
index_init = index # type: ignore
return self.__class__(
self._values, # no change to Buses
index=index_init, # pyright: ignore
deepcopy_from_bus=self._deepcopy_from_bus,
hierarchy=self._hierarchy, # no change
indexer=self._indexer,
own_index=own_index,
)
[docs]
@doc_inject(selector='relabel_flat', class_name='Yarn')
def relabel_flat(self) -> tp.Self:
"""
{doc}
"""
if not isinstance(self._index, IndexHierarchy):
raise RuntimeError('cannot flatten an Index that is not an IndexHierarchy')
return self.__class__(
self._values, # no change to Buses
index=self._index.flat(),
deepcopy_from_bus=self._deepcopy_from_bus,
hierarchy=self._hierarchy, # no change
indexer=self._indexer,
own_index=True,
)
[docs]
@doc_inject(selector='relabel_level_add', class_name='Yarn')
def relabel_level_add(
self,
level: TLabel,
/,
) -> tp.Self:
"""
{doc}
Args:
level: {level}
"""
return self.__class__(
self._values, # no change to Buses
index=self._index.level_add(level),
deepcopy_from_bus=self._deepcopy_from_bus,
hierarchy=self._hierarchy, # no change
indexer=self._indexer,
own_index=True,
)
[docs]
@doc_inject(selector='relabel_level_drop', class_name='Yarn')
def relabel_level_drop(
self,
count: int = 1,
/,
) -> tp.Self:
"""
{doc}
Args:
count: {count}
"""
if not isinstance(self._index, IndexHierarchy):
raise RuntimeError(
'cannot drop level of an Index that is not an IndexHierarchy'
)
return self.__class__(
self._values, # no change to Buses
index=self._index.level_drop(count),
deepcopy_from_bus=self._deepcopy_from_bus,
hierarchy=self._hierarchy, # no change
indexer=self._indexer,
own_index=True,
)
[docs]
def rehierarch(
self,
depth_map: tp.Sequence[int],
/,
*,
index_constructors: TIndexCtorSpecifiers = None,
) -> tp.Self:
"""
Return a new :obj:`Series` with new a hierarchy based on the supplied ``depth_map``.
"""
if self.index.depth == 1:
raise RuntimeError('cannot rehierarch when there is no hierarchy')
index, iloc_map = rehierarch_from_index_hierarchy(
labels=self._index, # type: ignore
depth_map=depth_map,
index_constructors=index_constructors,
name=self._index.name,
)
return self._extract_iloc(iloc_map).relabel(index)
# ---------------------------------------------------------------------------
[docs]
def items(self) -> tp.Iterator[tp.Tuple[TLabel, TFrameAny]]:
"""Iterator of pairs of :obj:`Yarn` label and contained :obj:`Frame`."""
labels = iter(self._index)
for b_pos, frame_label in self._hierarchy._extract_iloc(self._indexer):
# NOTE: missing optimization to read multiple Frame from Bus in one extraction
yield next(labels), self._values[b_pos]._extract_loc(frame_label) # pyright: ignore
_items_store = items
@property
def values(self) -> TNDArrayObject:
"""A 1D object array of all :obj:`Frame` contained in all contained :obj:`Bus`."""
array = np.empty(shape=len(self._index), dtype=DTYPE_OBJECT)
for i, (b_pos, frame_label) in enumerate(
self._hierarchy._extract_iloc(self._indexer)
):
array[i] = self._values[b_pos]._extract_loc(frame_label) # pyright: ignore
array.flags.writeable = False
return array
# ---------------------------------------------------------------------------
[docs]
def __len__(self) -> int:
"""Length of values."""
return self._index.__len__()
[docs]
@doc_inject()
def display(
self,
config: tp.Optional[DisplayConfig] = None,
/,
*,
style_config: tp.Optional[StyleConfig] = None,
) -> Display:
"""{doc}
Args:
{config}
"""
# NOTE: the key change over serires is providing the Bus as the displayed class
config = config or DisplayActive.get()
display_cls = Display.from_values(
(), header=DisplayHeader(self.__class__, self._name), config=config
)
array = np.empty(shape=len(self._index), dtype=DTYPE_OBJECT)
for i, (b_pos, frame_label) in enumerate(
self._hierarchy._extract_iloc(self._indexer)
):
b = self._values[b_pos]
# NOTE: do not load FrameDeferred
array[i] = b._values_mutable[b.index.loc_to_iloc(frame_label)] # pyright: ignore
array.flags.writeable = False
# create temporary series just for display
series: TSeriesObject = Series(array, index=self._index, own_index=True)
return series._display(
config,
display_cls=display_cls,
style_config=style_config,
)
# ---------------------------------------------------------------------------
# extended discriptors; in general, these do not force loading Frame
@property
def mloc(self) -> TSeriesObject:
"""Returns a :obj:`Series` showing a tuple of memory locations within each loaded Frame."""
mlocs = [(b.mloc if b is not None else None) for b in self._values]
array = np.empty(shape=len(self._index), dtype=DTYPE_OBJECT)
for i, (b_pos, frame_label) in enumerate(
self._hierarchy._extract_iloc(self._indexer)
):
array[i] = mlocs[b_pos]._extract_loc(frame_label)
array.flags.writeable = False
return Series(array, index=self._index, own_index=True, name='mloc')
@property
def dtypes(self) -> TFrameAny:
"""Returns a Frame of dtypes for all loaded Frames."""
deferred_dtypes = Series((None,))
def gen() -> tp.Iterator[TSeriesObject]:
for b_pos, frame_label in self._hierarchy._extract_iloc(self._indexer):
b = self._values[b_pos]
f = b._values_mutable[b.index.loc_to_iloc(frame_label)] # pyright: ignore
if f is FrameDeferred:
yield deferred_dtypes
else:
yield f.dtypes
return Frame.from_concat(gen(), index=self._index, fill_value=None)
@property
def shapes(self) -> TSeriesObject:
"""A :obj:`Series` describing the shape of each loaded :obj:`Frame`. Unloaded :obj:`Frame` will have a shape of None.
Returns:
:obj:`tp.Series`
"""
# collect shape Series
shapes = [(b.shapes if b is not None else None) for b in self._values]
array = np.empty(shape=len(self._index), dtype=DTYPE_OBJECT)
for i, (b_pos, frame_label) in enumerate(
self._hierarchy._extract_iloc(self._indexer)
):
array[i] = shapes[b_pos][frame_label] # pyright: ignore
array.flags.writeable = False
return Series(array, index=self._index, own_index=True, name='shape')
@property
def nbytes(self) -> int:
"""Total bytes of data currently loaded in :obj:`Frame` contained in this :obj:`Yarn`."""
post = 0
for b_pos, frame_label in self._hierarchy._extract_iloc(self._indexer):
b = self._values[b_pos]
f = b._values_mutable[b.index.loc_to_iloc(frame_label)] # pyright: ignore
if f is not FrameDeferred:
post += f.nbytes
return post
@property
def status(self) -> TFrameAny:
"""
Return a :obj:`Frame` indicating loaded status, size, bytes, and shape of all loaded :obj:`Frame` in :obj:`Bus` contined in this :obj:`Yarn`.
"""
# collect status Frame
status = [(b.status if b is not None else None) for b in self._values]
def gen() -> tp.Iterator[TNDArrayObject]:
for b_pos, frame_label in self._hierarchy._extract_iloc(self._indexer):
f = status[b_pos]
yield f._extract_array(f.index.loc_to_iloc(frame_label))
return Frame.from_records(
gen(), index=self._index, columns=('loaded', 'size', 'nbytes', 'shape')
)
@property
def inventory(self) -> TFrameAny:
"""Return a :obj:`Frame` indicating file_path, last-modified time, and size of underlying disk-based data stores if used for this :obj:`Yarn`."""
frames = []
index: tp.Dict[TLabel, None] = {} # ordered set
ih = self._hierarchy._extract_iloc(self._indexer)
for pos in ih.unique(0, order_by_occurrence=True):
b = self._values[pos]
frames.append(b.inventory) # pyright: ignore
index[b._name] = None # pyright: ignore
if len(index) == len(frames):
return Frame.from_concat(frames)
return Frame.from_concat(frames, index=IndexAutoFactory)
# ---------------------------------------------------------------------------
# common attributes from the numpy array
@property
def dtype(self) -> TDtypeObject:
"""
Return the dtype of the realized NumPy array.
Returns:
:obj:`numpy.dtype`
"""
return DTYPE_OBJECT # always dtype object
@property
def shape(self) -> tp.Tuple[int]:
"""
Return a tuple describing the shape of the realized NumPy array.
Returns:
:obj:`Tuple[int]`
"""
return (self._index.__len__(),)
@property
def ndim(self) -> int:
"""
Return the number of dimensions, which for a :obj:`Yarn` is always 1.
Returns:
:obj:`int`
"""
return self._NDIM
@property
def size(self) -> int:
"""
Return the size.
Returns:
:obj:`int`
"""
return self._index.__len__()
# ---------------------------------------------------------------------------
@property
def index(self) -> IndexBase:
"""
The index instance assigned to this container.
Returns:
:obj:`Index`
"""
return self._index
# ---------------------------------------------------------------------------
# dictionary-like interface
[docs]
def keys(self) -> IndexBase:
"""
Iterator of index labels.
Returns:
:obj:`Iterator[Hashable]`
"""
return self._index
[docs]
def __iter__(self) -> tp.Iterator[TLabel]:
"""
Iterator of index labels, same as :obj:`static_frame.Series.keys`.
Returns:
:obj:`Iterator[Hashasble]`
"""
return self._index.__iter__()
[docs]
def __contains__(
self,
value: TLabel,
/,
) -> bool:
"""
Inclusion of value in index labels.
Returns:
:obj:`bool`
"""
return self._index.__contains__(value)
[docs]
def get(
self,
key: TLabel,
default: tp.Any = None,
) -> tp.Any:
"""
Return the value found at the index key, else the default if the key is not found.
Returns:
:obj:`Any`
"""
if key not in self._index:
return default
return self.__getitem__(key)
@property
def via_mapping(self) -> YarnMapping[tp.Any]:
"""
Return a wrapper around :obj:`Yarn` data that fully implements the Python Mapping interface.
"""
return YarnMapping(self)
# ---------------------------------------------------------------------------
[docs]
@doc_inject()
def equals(
self,
other: tp.Any,
/,
*,
compare_name: bool = False,
compare_dtype: bool = False,
compare_class: bool = False,
skipna: bool = True,
) -> bool:
"""
{doc}
Note: this will attempt to load and compare all Frame managed by the Bus.
Args:
{compare_name}
{compare_dtype}
{compare_class}
{skipna}
"""
if id(other) == id(self):
return True
if compare_class and self.__class__ != other.__class__:
return False
elif not isinstance(other, Yarn):
return False
if compare_name and self._name != other._name:
return False
# length of series in Yarn might be different but may still have the same frames, so look at realized length
if len(self) != len(other):
return False
if not self._index.equals(
other.index, # call property to force index creation
compare_name=compare_name,
compare_dtype=compare_dtype,
compare_class=compare_class,
skipna=skipna,
):
return False
# can zip because length of Series already match
# using .values will force loading all Frame into memory; better to use items() to permit collection
for (_, frame_self), (_, frame_other) in zip(self.items(), other.items()):
if not frame_self.equals(
frame_other,
compare_name=compare_name,
compare_dtype=compare_dtype,
compare_class=compare_class,
skipna=skipna,
):
return False
return True
# ---------------------------------------------------------------------------
# transformations resulting in changed dimensionality
[docs]
@doc_inject(selector='head', class_name='Yarn')
def head(
self,
count: int = 5,
/,
) -> TYarnAny:
"""{doc}
Args:
{count}
Returns:
:obj:`Yarn`
"""
return self.iloc[:count]
[docs]
@doc_inject(selector='tail', class_name='Yarn')
def tail(
self,
count: int = 5,
/,
) -> TYarnAny:
"""{doc}s
Args:
{count}
Returns:
:obj:`Yarn`
"""
return self.iloc[-count:]
# ---------------------------------------------------------------------------
# transformations resulting in the same dimensionality
def _reverse(self, axis: int = 0) -> tp.Self:
"""
Return a reversed copy of this container, with no data copied.
"""
return self._extract_iloc(REVERSE_SLICE)
def _apply_ordering(
self,
order: TNDArrayIntDefault,
sort_status: SortStatus,
axis: int = 0,
) -> tp.Self:
"""
Return a copy of this container with the specified ordering applied along the index of axis
"""
yarn = self._extract_iloc(order)
yarn._index._sort_status = sort_status
return yarn
[docs]
@doc_inject(selector='sort')
def sort_index(
self,
*,
ascending: TBoolOrBools = True,
kind: TSortKinds = DEFAULT_SORT_KIND,
key: tp.Optional[
tp.Callable[[IndexBase], tp.Union[TNDArrayAny, IndexBase]]
] = None,
) -> tp.Self:
"""
Return a new Yarn ordered by the sorted Index.
Args:
*
{ascendings}
{kind}
{key}
Returns:
:obj:`Yarn`
"""
return sort_index_from_params(
self._index,
ascending=ascending,
key=key,
kind=kind,
container=self,
)
[docs]
@doc_inject(selector='sort')
def sort_values(
self,
*,
ascending: bool = True,
kind: TSortKinds = DEFAULT_SORT_KIND,
key: tp.Callable[[TYarnAny], tp.Union[TNDArrayAny, TSeriesAny]],
) -> tp.Self:
"""
Return a new Yarn ordered by the sorted values. Note that as a Yarn contains Frames, a `key` argument must be provided to extract a sortable value, and this key function will process a :obj:`Series` of :obj:`Frame`.
Args:
*
{ascending}
{kind}
{key}
Returns:
:obj:`Yarn`
"""
cfs = key(self)
cfs_values: TNDArrayAny = cfs if cfs.__class__ is np.ndarray else cfs.values # type: ignore
asc_is_element = isinstance(ascending, BOOL_TYPES)
if not asc_is_element:
raise RuntimeError('Multiple ascending values not permitted.')
# argsort lets us do the sort once and reuse the results
order = np.argsort(cfs_values, kind=kind)
if not ascending:
order = order[::-1]
return self._extract_iloc(order)
[docs]
def roll(
self,
shift: int,
/,
*,
include_index: bool = False,
) -> tp.Self:
"""Return a Yarn with values rotated forward and wrapped around the index (with a positive shift) or backward and wrapped around the index (with a negative shift).
Args:
shift: Positive or negative integer shift.
include_index: Determine if the Index is shifted with the underlying data.
Returns:
:obj:`Yarn`
"""
if shift % len(self._indexer):
indexer = array_shift(array=self._indexer, shift=shift, axis=0, wrap=True)
indexer.flags.writeable = False
else:
indexer = self._indexer
if include_index:
index = self._index.roll(shift=shift)
own_index = True
else:
index = self._index
own_index = False
return self.__class__(
self._values,
index=index,
own_index=own_index,
indexer=indexer,
hierarchy=self._hierarchy,
name=self._name,
deepcopy_from_bus=self._deepcopy_from_bus,
)
[docs]
def shift(
self,
shift: int,
/,
*,
fill_value: tp.Any,
) -> tp.Self:
"""Return a :obj:`Yarn` with values shifted forward on the index (with a positive shift) or backward on the index (with a negative shift).
Args:
shift: Positive or negative integer shift.
fill_value: Value to be used to fill data missing after the shift.
Returns:
:obj:`Yarn`
"""
raise NotImplementedError(
'A `Yarn` cannot be shifted as newly created missing values cannot be filled without replacing stored `Bus`.'
)
# ---------------------------------------------------------------------------
# exporter
[docs]
def to_series(self) -> TSeriesObject: # can get generic Bus index
"""Return a :obj:`Series` with the :obj:`Frame` contained in all contained :obj:`Bus`."""
# NOTE: this will load all deferred Frame
return Series(
self.values,
index=self._index,
own_index=True,
name=self._name,
)
[docs]
def to_manifest(
self,
fp: TPathSpecifier,
/,
*,
label_encoder: tp.Callable[[TLabel], str] | None = None,
) -> None:
"""Write each contained :obj:`Frame` as an NPY directory within the directory given by ``fp``. Each :obj:`Frame` is stored as a subdirectory named by its label. Frames from each underlying :obj:`Bus` are extracted, with zip NPY and zip NPZ stores extracting directly without full :obj:`Frame` materialization.
Args:
fp: directory path in which to write the manifest.
label_encoder: callable to convert non-string labels to strings for use as directory names. Required when labels are not strings.
"""
ArchiveManifest.to_manifest(fp, self, label_encoder=label_encoder)
def _to_signature_bytes(
self,
include_name: bool = True,
include_class: bool = True,
encoding: str = 'utf-8',
) -> bytes:
# For a Yarn, the signature bytes need only contain the signature of the associated Frame and the index; all else are internal implementation mechanisms
v = (
f._to_signature_bytes(
include_name=include_name,
include_class=include_class,
encoding=encoding,
)
for f in self._axis_element()
)
return b''.join(
chain(
iter_component_signature_bytes(
self,
include_name=include_name,
include_class=include_class,
encoding=encoding,
),
(
self._index._to_signature_bytes(
include_name=include_name,
include_class=include_class,
encoding=encoding,
),
),
v,
)
)
TYarnAny = Yarn[tp.Any]