# Source code for static_frame.core.store_config

from __future__ import annotations

import typing_extensions as tp

from static_frame.core.exception import ErrorInitStoreConfig
from static_frame.core.frame import Frame
from static_frame.core.interface_meta import InterfaceMeta
from static_frame.core.store_filter import STORE_FILTER_DEFAULT, StoreFilter
from static_frame.core.util import (
    DTYPE_STR_KINDS,
    TDepthLevel,
    TDtypesSpecifier,
    TIndexCtorSpecifiers,
    TLabel,
    TMpContext,
)

# Alias for a ``Frame`` with every type parameter left as ``Any``; used below to annotate Frame arguments.
TFrameAny = Frame[tp.Any, tp.Any, tp.Unpack[tuple[tp.Any, ...]]]


def label_encode_tuple(source: tuple[tp.Any, ...]) -> str:
    """Encode a tuple as a string that ``literal_eval`` can re-evaluate.

    Each element is rendered individually and the results are joined into a tuple display. Elements exposing a truthy ``dtype`` attribute are treated as NumPy scalars and rendered with ``str()`` (quoted when the dtype kind is in ``DTYPE_STR_KINDS``); Python strings are rendered with ``repr()``; everything else with ``str()``.

    Args:
        source: The tuple of labels to encode.

    Returns:
        A parenthesized, comma-separated string. A one-element tuple is written with a trailing comma, matching ``str()`` of a tuple, so that it evaluates back to a tuple rather than to a parenthesized scalar.
    """
    parts = []
    for obj in source:
        if dt := getattr(obj, 'dtype', None):  # a NumPy scalar
            if dt.kind in DTYPE_STR_KINDS:
                parts.append(f"'{obj}'")
            else:  # str, not repr, must be used
                parts.append(str(obj))
        elif isinstance(obj, str):
            parts.append(repr(obj))
        else:
            parts.append(str(obj))

    if len(parts) == 1:
        # without the trailing comma, "('a')" evaluates to the scalar 'a', not to ('a',)
        return f'({parts[0]},)'
    return f'({", ".join(parts)})'


# -------------------------------------------------------------------------------


class StoreConfigHE(metaclass=InterfaceMeta):
    """
    A read-only, hashable container used by :obj:`Store` subclasses for reading from and writing to multi-table storage formats.

    Equality and hashing are both derived from the public attributes named in ``StoreConfigHE.__slots__``. The hash is computed on first use and cached in ``_hash``, so attributes should not be reassigned after construction.
    """

    index_depth: int
    index_name_depth_level: TDepthLevel | None
    index_constructors: TIndexCtorSpecifiers
    columns_depth: int
    columns_name_depth_level: TDepthLevel | None
    columns_constructors: TIndexCtorSpecifiers
    columns_select: tp.Iterable[str] | None
    dtypes: TDtypesSpecifier
    consolidate_blocks: bool
    skip_header: int
    skip_footer: int
    trim_nadir: bool
    include_index: bool
    include_index_name: bool
    include_columns: bool
    include_columns_name: bool
    merge_hierarchical_labels: bool
    read_max_workers: int | None
    read_chunksize: int
    write_max_workers: int | None
    write_chunksize: int
    mp_context: TMpContext
    _hash: int | None

    __slots__ = (
        'index_depth',
        'index_name_depth_level',
        'index_constructors',
        'columns_depth',
        'columns_name_depth_level',
        'columns_constructors',
        'columns_select',
        'dtypes',
        'consolidate_blocks',
        'skip_header',
        'skip_footer',
        'trim_nadir',
        'include_index',
        'include_index_name',
        'include_columns',
        'include_columns_name',
        'merge_hierarchical_labels',
        'read_max_workers',
        'read_chunksize',
        'write_max_workers',
        'write_chunksize',
        'mp_context',
        '_hash',
    )

    def __init__(
        self,
        *,
        # constructors
        index_depth: int = 0,  # this default does not permit round trip
        index_name_depth_level: TDepthLevel | None = None,
        index_constructors: TIndexCtorSpecifiers = None,
        columns_depth: int = 1,
        columns_name_depth_level: TDepthLevel | None = None,
        columns_constructors: TIndexCtorSpecifiers = None,
        columns_select: tp.Iterable[str] | None = None,
        dtypes: TDtypesSpecifier = None,
        consolidate_blocks: bool = False,
        # not used by all constructors
        skip_header: int = 0,
        skip_footer: int = 0,
        trim_nadir: bool = False,
        # exporters
        include_index: bool = True,
        include_index_name: bool = True,
        include_columns: bool = True,
        include_columns_name: bool = False,
        # not used by all exporters
        merge_hierarchical_labels: bool = True,
        # multiprocessing configuration
        read_max_workers: int | None = None,
        read_chunksize: int = 1,
        write_max_workers: int | None = None,
        write_chunksize: int = 1,
        mp_context: TMpContext = None,
    ) -> None:
        """
        All arguments are keyword-only and are stored unchanged on the instance; no validation is performed here.

        Args:
            index_depth, index_name_depth_level, index_constructors, columns_depth, columns_name_depth_level, columns_constructors, columns_select, dtypes, consolidate_blocks: Constructor parameters. The default ``index_depth`` of 0 does not permit a round trip.
            skip_header, skip_footer, trim_nadir: Constructor parameters not used by all constructors.
            include_index: Boolean to determine if the ``index`` is included in output.
            include_columns: Boolean to determine if the ``columns`` is included in output.
            include_index_name, include_columns_name: Exporter parameters.
            merge_hierarchical_labels: Exporter parameter not used by all exporters.
            read_max_workers, read_chunksize, write_max_workers, write_chunksize, mp_context: Multiprocessing configuration.
        """
        # constructor
        self.index_depth = index_depth
        self.index_name_depth_level = index_name_depth_level
        self.index_constructors = index_constructors
        self.columns_depth = columns_depth
        self.columns_name_depth_level = columns_name_depth_level
        self.columns_constructors = columns_constructors
        self.columns_select = columns_select
        self.dtypes = dtypes
        self.consolidate_blocks = consolidate_blocks
        self.skip_header = skip_header
        self.skip_footer = skip_footer
        self.trim_nadir = trim_nadir

        # exporter
        self.include_index = include_index
        self.include_index_name = include_index_name
        self.include_columns = include_columns
        self.include_columns_name = include_columns_name
        self.merge_hierarchical_labels = merge_hierarchical_labels

        # multiprocessing
        self.read_max_workers = read_max_workers
        self.read_chunksize = read_chunksize
        self.write_max_workers = write_max_workers
        self.write_chunksize = write_chunksize
        self.mp_context = mp_context

        # populated lazily by __hash__
        self._hash = None

    def __eq__(self, other: tp.Any) -> bool:
        """Return True if ``other`` is a ``StoreConfigHE`` whose public attributes all compare equal to this instance's."""
        if not isinstance(other, StoreConfigHE):
            return False

        # Name the class explicitly: on a subclass instance ``self.__slots__`` resolves to the subclass's own ``__slots__`` and would skip every attribute defined here.
        for attr in StoreConfigHE.__slots__:
            if attr.startswith('_'):  # private state (the cached hash) is not part of identity
                continue
            if getattr(self, attr) != getattr(other, attr):
                return False

        return True

    def __ne__(self, other: tp.Any) -> bool:
        """Return the negation of ``__eq__``."""
        return not self.__eq__(other)

    @staticmethod
    def _hash_depth_specifier(depth_specifier: TDepthLevel | None) -> TLabel:
        """Return a hashable form of a depth-level specifier: ``None`` and ``int`` pass through, anything else is converted to a tuple."""
        if depth_specifier is None or isinstance(depth_specifier, int):
            return depth_specifier
        return tuple(depth_specifier)

    @staticmethod
    def _hash_dtypes_specifier(dtypes_specifier: TDtypesSpecifier) -> TLabel:
        """Return a hashable form of a dtypes specifier.

        A ``dict`` becomes a ``frozenset`` of its items, a ``list`` becomes a tuple, and any other value is returned unchanged.
        """
        if dtypes_specifier is None:
            return dtypes_specifier
        if isinstance(dtypes_specifier, dict):
            # Order-insensitive: dicts compare equal regardless of insertion order, so the hash must not depend on it either.
            return frozenset(dtypes_specifier.items())
        if isinstance(dtypes_specifier, list):
            return tuple(dtypes_specifier)
        return dtypes_specifier  # type: ignore [return-value]

    def __hash__(self) -> int:
        """Return a hash of all public attributes, computing it on first call and caching it in ``_hash``."""
        if self._hash is None:
            self._hash = hash(
                (
                    self.index_depth,  # int
                    self._hash_depth_specifier(self.index_name_depth_level),
                    self.index_constructors,  # class or callable
                    self.columns_depth,  # int
                    self._hash_depth_specifier(self.columns_name_depth_level),
                    self.columns_constructors,  # class or callable
                    self.columns_select
                    if self.columns_select is None
                    else tuple(self.columns_select),
                    self._hash_dtypes_specifier(self.dtypes),
                    self.consolidate_blocks,  # bool
                    self.skip_header,  # int
                    self.skip_footer,  # int
                    self.trim_nadir,  # bool
                    self.include_index,  # bool
                    self.include_index_name,  # bool
                    self.include_columns,  # bool
                    self.include_columns_name,  # bool
                    self.merge_hierarchical_labels,  # bool
                    self.read_max_workers,  # Optional[int]
                    self.read_chunksize,  # int
                    self.write_max_workers,  # Optional[int]
                    self.write_chunksize,  # int
                    self.mp_context,
                )
            )
        return self._hash


class StoreConfig(StoreConfigHE):
    """
    A read-only container of the parameters :obj:`Store` subclasses use when reading from and writing to multi-table storage formats.

    Adds ``label_encoder``, ``label_decoder``, ``read_frame_filter`` and ``store_filter`` to the attributes of :obj:`StoreConfigHE`. Instances compare equal only to themselves, and calling ``hash()`` on one raises ``NotImplementedError``.
    """

    label_encoder: tp.Callable[[TLabel], str] | None
    label_decoder: tp.Callable[[str], TLabel] | None
    read_frame_filter: tp.Callable[[TLabel, Frame], Frame] | None
    store_filter: StoreFilter | None

    __slots__ = (
        'label_encoder',
        'label_decoder',
        'read_frame_filter',
        'store_filter',
    )

    @classmethod
    def from_frame(cls, frame: TFrameAny) -> 'StoreConfig':
        """Derive a config from a Frame.

        An axis is included when its depth is greater than one or when its ``_map`` attribute is not None; an excluded axis is given a depth of zero.
        """
        include_index = frame.index.depth > 1 or frame.index._map is not None  # type: ignore
        include_columns = frame.columns.depth > 1 or frame.columns._map is not None  # type: ignore

        index_depth = frame.index.depth if include_index else 0
        columns_depth = frame.columns.depth if include_columns else 0

        return cls(
            index_depth=index_depth,
            columns_depth=columns_depth,
            include_index=include_index,
            include_columns=include_columns,
        )

    def __init__(
        self,
        *,
        index_depth: int = 0,
        index_name_depth_level: TDepthLevel | None = None,
        index_constructors: TIndexCtorSpecifiers = None,
        columns_depth: int = 1,
        columns_name_depth_level: TDepthLevel | None = None,
        columns_constructors: TIndexCtorSpecifiers = None,
        columns_select: tp.Iterable[str] | None = None,
        dtypes: TDtypesSpecifier = None,
        consolidate_blocks: bool = False,
        skip_header: int = 0,
        skip_footer: int = 0,
        trim_nadir: bool = False,
        include_index: bool = True,
        include_index_name: bool = True,
        include_columns: bool = True,
        include_columns_name: bool = False,
        merge_hierarchical_labels: bool = True,
        label_encoder: tp.Callable[[TLabel], str] | None = None,
        label_decoder: tp.Callable[[str], TLabel] | None = None,
        read_frame_filter: tp.Callable[[TLabel, Frame], Frame] | None = None,
        read_max_workers: int | None = None,
        read_chunksize: int = 1,
        write_max_workers: int | None = None,
        write_chunksize: int = 1,
        mp_context: TMpContext = None,
        store_filter: StoreFilter | None = STORE_FILTER_DEFAULT,
    ):
        """Store the four attributes defined on this class and forward every other argument, unchanged, to ``StoreConfigHE.__init__``."""
        # attributes defined by this class
        self.label_encoder = label_encoder
        self.label_decoder = label_decoder
        self.read_frame_filter = read_frame_filter
        self.store_filter = store_filter

        # attributes defined by the base class
        StoreConfigHE.__init__(
            self,
            index_depth=index_depth,
            index_name_depth_level=index_name_depth_level,
            index_constructors=index_constructors,
            columns_depth=columns_depth,
            columns_name_depth_level=columns_name_depth_level,
            columns_constructors=columns_constructors,
            columns_select=columns_select,
            dtypes=dtypes,
            consolidate_blocks=consolidate_blocks,
            skip_header=skip_header,
            skip_footer=skip_footer,
            trim_nadir=trim_nadir,
            include_index=include_index,
            include_index_name=include_index_name,
            include_columns=include_columns,
            include_columns_name=include_columns_name,
            merge_hierarchical_labels=merge_hierarchical_labels,
            read_max_workers=read_max_workers,
            read_chunksize=read_chunksize,
            write_max_workers=write_max_workers,
            write_chunksize=write_chunksize,
            mp_context=mp_context,
        )

    def label_encode(self, label: TLabel) -> str:
        """Return ``label`` as a string, applying ``label_encoder`` when one is set.

        Raises:
            RuntimeError: if the result is not a ``str``.
        """
        encoder = self.label_encoder
        if encoder is str and isinstance(label, tuple):
            # with NumPy2, str() of a tuple of NumPy scalars returns (np.str_('a'), np.int64(1)), not ('a', 1)
            label = label_encode_tuple(label)
        elif encoder:
            label = encoder(label)

        if isinstance(label, str):
            return label
        raise RuntimeError(
            f'Store label {label!r} is not a string; provide a label_encoder to StoreConfig'
        )

    def label_decode(self, label: str) -> TLabel:
        """Return the result of ``label_decoder`` applied to ``label``, or ``label`` itself when no decoder is set."""
        decoder = self.label_decoder
        return decoder(label) if decoder else label

    def to_store_config_he(self) -> 'StoreConfigHE':
        """
        Return a ``StoreConfigHE`` version of this StoreConfig.
        """
        kwargs = {}
        for attr in StoreConfigHE.__slots__:
            if attr.startswith('_'):  # private slots are not constructor arguments
                continue
            kwargs[attr] = getattr(self, attr)
        return StoreConfigHE(**kwargs)

    def __eq__(self, other: tp.Any) -> bool:
        """Return True only when ``other`` is this very instance."""
        return isinstance(other, StoreConfig) and self is other

    def __hash__(self) -> int:
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError()
SCMMapType: tp.TypeAlias = tp.Mapping[TLabel, StoreConfig]
SCMMapInitializer: tp.TypeAlias = tp.Optional[SCMMapType]
StoreConfigMapInitializer: tp.TypeAlias = tp.Union[
    StoreConfig, SCMMapInitializer, 'StoreConfigMap'
]


class StoreConfigMap:
    """
    Container of one or more StoreConfig, with the optional specification of a default StoreConfig. Assumed immutable over the life of the instance.
    """

    __slots__ = ('_map', '_default')

    _DEFAULT: StoreConfig = StoreConfig()

    # These attrs (when set) must align with default
    _ALIGN_WITH_DEFAULT_ATTRS = (
        'label_encoder',
        'label_decoder',
        'read_frame_filter',
        'read_max_workers',
        'read_chunksize',
        'write_max_workers',
        'write_chunksize',
    )

    @classmethod
    def from_frames(cls, frames: tp.Iterable[TFrameAny]) -> 'StoreConfigMap':
        """
        Derive a config map from an iterable of Frames
        """
        by_name = {}
        for frame in frames:
            by_name[frame.name] = StoreConfig.from_frame(frame)
        # the dict was built here, so the instance may hold it directly
        return cls(by_name, own_config_map=True)

    @classmethod
    def from_config(cls, config: StoreConfig) -> 'StoreConfigMap':
        """Return a map with no per-label entries and ``config`` as its default."""
        return cls(default=config)

    @classmethod
    def from_initializer(cls, initializer: StoreConfigMapInitializer) -> 'StoreConfigMap':
        """Return a ``StoreConfigMap`` from a ``StoreConfig``, an existing map, ``None``, or a ``dict`` of configs."""
        if initializer is None:
            # will get default configuration
            return cls()
        if isinstance(initializer, StoreConfig):
            return cls.from_config(initializer)
        if isinstance(initializer, cls):
            # return same instance
            return initializer
        assert isinstance(initializer, dict)
        return cls(initializer)

    def __init__(
        self,
        config_map: SCMMapInitializer = None,
        *,
        default: StoreConfig | None = None,
        own_config_map: bool = False,
    ):
        """
        Args:
            config_map: Optional mapping of label to ``StoreConfig``.
            default: The ``StoreConfig`` returned for labels not in the map; the class-level ``_DEFAULT`` is used when None.
            own_config_map: If True and ``config_map`` is not None, ``config_map`` is stored directly, without copying or validating it.

        Raises:
            ErrorInitStoreConfig: if ``default`` or a mapped value is not a ``StoreConfig``, or if a mapped config differs from the default in any attribute named in ``_ALIGN_WITH_DEFAULT_ATTRS``.
        """
        if default is not None and not isinstance(default, StoreConfig):
            raise ErrorInitStoreConfig(
                f'unspported class {default}, must be {StoreConfig}'
            )
        self._default = self._DEFAULT if default is None else default

        if own_config_map and config_map is not None:
            self._map: SCMMapType = config_map
            return

        # initialize new dict and transfer to support checking Config classes
        self._map = {}
        if not config_map:
            return

        config_cls = self._DEFAULT.__class__
        for label, config in config_map.items():
            if not isinstance(config, config_cls):
                raise ErrorInitStoreConfig(
                    f'unspported class {config}, must be {config_cls}'
                )
            for attr in self._ALIGN_WITH_DEFAULT_ATTRS:
                if getattr(config, attr) != getattr(self._default, attr):
                    raise ErrorInitStoreConfig(
                        f'config {label!r} has {attr} inconsistent with default; align values and/or pass a default StoreConfig.'
                    )
            self._map[label] = config

    def __getitem__(self, key: TLabel | None) -> StoreConfig:
        """Return the config stored for ``key``, or the default config when ``key`` is not in the map."""
        return self._map.get(key, self._default)

    @property
    def default(self) -> StoreConfig:
        """The default ``StoreConfig`` of this map."""
        return self._default