Source code for static_frame.core.memory_measure

from __future__ import annotations

from collections import abc
from itertools import chain
from sys import getsizeof
from typing import NamedTuple

import numpy as np
import typing_extensions as tp

from static_frame.core.display_config import DisplayConfig
from static_frame.core.doc_str import doc_inject
from static_frame.core.util import DTYPE_OBJECT_KIND, EMPTY_ARRAY, bytes_to_size_label

if tp.TYPE_CHECKING:
    from static_frame.core.frame import Frame

    TNDArrayAny = np.ndarray[tp.Any, tp.Any]
    TFrameAny = Frame[tp.Any, tp.Any, tp.Unpack[tp.Tuple[tp.Any, ...]]]


class MFConfig(NamedTuple):
    local_only: bool  # only data locally owned by arrays, or all referenced data
    materialized: bool  # measure byte payload nbytes (regardless of sharing)
    data_only: bool  # only array byte payloads, or all objects


class MeasureFormatMeta(type):
    def __iter__(cls) -> tp.Iterator[MFConfig]:
        return (v for (k, v) in vars(cls).items() if not k.startswith('_'))


class MeasureFormat(metaclass=MeasureFormatMeta):
    LOCAL = MFConfig(
        local_only=True,
        materialized=False,
        data_only=False,
    )
    LOCAL_MATERIALIZED = MFConfig(
        local_only=True,
        materialized=True,
        data_only=False,
    )
    LOCAL_MATERIALIZED_DATA = MFConfig(
        local_only=True,
        materialized=True,
        data_only=True,
    )
    # NOTE MFConfig(local_only=True, materialized=False, data_only=True) is invalid

    REFERENCED = MFConfig(
        local_only=False,
        materialized=False,
        data_only=False,
    )
    REFERENCED_MATERIALIZED = MFConfig(
        local_only=False,
        materialized=True,
        data_only=False,
    )
    REFERENCED_MATERIALIZED_DATA = MFConfig(
        local_only=False,
        materialized=True,
        data_only=True,
    )
    # NOTE MFConfig(local_only=False, materialized=False, data_only=True) is invalid


FORMAT_TO_DISPLAY = {
    MeasureFormat.LOCAL: 'L',
    MeasureFormat.LOCAL_MATERIALIZED: 'LM',
    MeasureFormat.LOCAL_MATERIALIZED_DATA: 'LMD',
    MeasureFormat.REFERENCED: 'R',
    MeasureFormat.REFERENCED_MATERIALIZED: 'RM',
    MeasureFormat.REFERENCED_MATERIALIZED_DATA: 'RMD',
}


class MaterializedArray:
    """Wrapper of array that delivers the sizeof as the fully realized size, ignoring any potential sharing of memory."""

    __slots__ = (
        '_array',
        '_format',
    )
    BASE_ARRAY_BYTES = getsizeof(EMPTY_ARRAY)

    def __init__(
        self,
        array: TNDArrayAny,
        format: MFConfig = MeasureFormat.LOCAL,
    ):
        self._array = array
        self._format = format

    def __sizeof__(self) -> int:
        size = 0
        if self._format.local_only and self._array.base is not None:
            pass  # all data referenced externally
        else:
            size += self._array.nbytes

        if not self._format.data_only:
            size += self.BASE_ARRAY_BYTES

        return size


class MemoryMeasure:
    @staticmethod
    def _iter_iterable(obj: tp.Any) -> tp.Iterator[tp.Any]:
        """
        Generates the iterable children that have not been counted by a getsizeof call on the parent object
        """
        if hasattr(obj, '__iter__') and not isinstance(obj, str):
            if obj.__class__ is np.ndarray and obj.dtype.kind == DTYPE_OBJECT_KIND:
                # NOTE: iter(obj) would return slices for multi-dimensional arrays
                yield from (obj[loc] for loc in np.ndindex(obj.shape))
            elif isinstance(obj, (abc.Sequence, abc.Set)):
                yield from obj
            elif isinstance(obj, dict):
                yield from chain.from_iterable(obj.items())
            else:
                # The full size of the object is included in its getsizeof call
                # e.g. SF Containers, FrozenAutoMap, integer numpy arrays, int, float, etc.
                pass

    @staticmethod
    def _iter_slots(obj: tp.Any) -> tp.Iterator[tp.Any]:
        """
        Generates an iterable of the values of all slot-based attributes in an object, including the slots contained in the object's parent classes based on the MRO
        """
        # NOTE: This does NOT support 'single-string' slots (i.e. __slots__ = 'foo')
        slots = chain.from_iterable(
            cls.__slots__ for cls in obj.__class__.__mro__ if hasattr(cls, '__slots__')
        )
        attrs = (
            getattr(obj, slot)
            for slot in slots
            if slot != '__weakref__' and hasattr(obj, slot)
        )
        yield from attrs

    @classmethod
    def nested_sizable_elements(
        cls,
        obj: tp.Any,
        *,
        format: MFConfig = MeasureFormat.REFERENCED,
        seen: tp.Set[int],
    ) -> tp.Iterator[tp.Any]:
        """
        Generates an iterable of all objects the parent object has references to, including nested references. This function considers both the iterable unsized children (based on _iter_iterable) and the sizable
        attributes listed in its slots. The resulting generator is in pre-order and includes the parent object at the end.
        """
        from static_frame.core.container import ContainerBase

        if id(obj) in seen:
            return
        seen.add(id(obj))

        if obj.__class__ is np.ndarray:
            if format.materialized:
                obj = MaterializedArray(obj, format=format)
            else:  # non-object arrays report included elements
                if obj.dtype.kind == DTYPE_OBJECT_KIND:
                    for el in cls._iter_iterable(obj):
                        yield from cls.nested_sizable_elements(
                            el, seen=seen, format=format
                        )
                if not format.local_only and obj.base is not None:
                    # include the base array for numpy slices / views only if that base has not been seen
                    yield from cls.nested_sizable_elements(
                        obj.base, seen=seen, format=format
                    )
            yield obj

        elif obj.__class__ is MaterializedArray:
            yield obj

        else:
            for el in cls._iter_iterable(obj):  # will not yield if no __iter__
                yield from cls.nested_sizable_elements(el, seen=seen, format=format)
            # arrays do not have slots
            for el in cls._iter_slots(obj):
                yield from cls.nested_sizable_elements(el, seen=seen, format=format)
            yield obj


def memory_total(
    obj: tp.Any,
    *,
    format: MFConfig = MeasureFormat.REFERENCED,
    seen: tp.Union[None, tp.Set[tp.Any]] = None,
) -> int:
    """
    Returns the total size of the object and its references, including nested refrences
    """
    seen = set() if seen is None else seen

    def gen() -> tp.Iterator[int]:
        for component in MemoryMeasure.nested_sizable_elements(
            obj,
            seen=seen,
            format=format,
        ):
            if format.data_only and component.__class__ is MaterializedArray:
                yield component.__sizeof__()  # call directly to avoid gc ovehead addition
            else:
                yield getsizeof(component)

    return sum(gen())


[docs] @doc_inject(selector='memory') class MemoryDisplay: """{}""" __slots__ = ( '_frame', '_repr', )
[docs] @classmethod def from_any( cls, obj: tp.Any, label_component_pairs: tp.Iterable[tp.Tuple[str, tp.Any]] = (), ) -> 'MemoryDisplay': """Given any slotted object, return a :obj:`MemoryDisplay` instance.""" from static_frame.core.frame import Frame parts = chain(label_component_pairs, (('Total', obj),)) def gen() -> tp.Iterator[tp.Tuple[tp.Tuple[str, ...], tp.List[int]]]: for label, part in parts: sizes = [] for format in MeasureFormat: # NOTE: not sharing seen accross evaluations sizes.append(memory_total(part, format=format)) yield label, sizes # pyright: ignore if hasattr(obj, 'name') and obj.name is not None: name = f'<{obj.__class__.__name__}: {obj.name}>' else: name = f'<{obj.__class__.__name__}>' f: TFrameAny = Frame.from_records_items( gen(), columns=(FORMAT_TO_DISPLAY[mf] for mf in MeasureFormat), name=name, ) return cls(f)
[docs] def __init__(self, frame: TFrameAny): """Initialize an instance with a ``Frame`` of byte counts.""" from static_frame.core.frame import Frame self._frame = frame f_size = self._frame.iter_element().apply(bytes_to_size_label) def gen() -> tp.Iterator[tp.Sequence[str]]: for row_old in f_size.iter_tuple(axis=1): row_new = [] for e in row_old: row_new.extend(e.split(' ')) yield row_new f: TFrameAny = Frame.from_records(gen(), index=f_size.index) columns = [ f_size.columns[i // 2] if i % 2 == 0 else f'{f_size.columns[i // 2]}u'.ljust(5) for i in range(len(f.columns)) ] f = f.relabel(columns=columns) dc = DisplayConfig(type_show=False) self._repr: str = f.display(dc).__repr__()
[docs] def __repr__(self) -> str: return self._repr
[docs] def to_frame(self) -> TFrameAny: """Return a Frame of byte counts.""" return self._frame