# Source code for static_frame.core.store_config

from __future__ import annotations

import typing_extensions as tp

from static_frame.core.exception import ErrorInitStoreConfig
from static_frame.core.frame import Frame
from static_frame.core.interface_meta import InterfaceMeta
from static_frame.core.util import TDepthLevel
from static_frame.core.util import TDtypesSpecifier
from static_frame.core.util import TIndexCtorSpecifiers
from static_frame.core.util import TLabel

# Alias for a ``Frame`` whose type arguments are all ``Any``; used where any Frame is accepted.
TFrameAny = Frame[tp.Any, tp.Any, tp.Unpack[tp.Tuple[tp.Any, ...]]] # type: ignore[type-arg]

#-------------------------------------------------------------------------------

class StoreConfigHE(metaclass=InterfaceMeta):
    '''
    A read-only, hashable container used by :obj:`Store` subclasses for reading from and writing to multi-table storage formats.

    Equality compares every public slot. The hash is derived from normalized, hashable stand-ins for the same slots and is cached in ``_hash`` after the first call.
    '''

    index_depth: int
    index_name_depth_level: tp.Optional[TDepthLevel]
    index_constructors: TIndexCtorSpecifiers
    columns_depth: int
    columns_name_depth_level: tp.Optional[TDepthLevel]
    columns_constructors: TIndexCtorSpecifiers
    columns_select: tp.Optional[tp.Iterable[str]]
    dtypes: TDtypesSpecifier
    consolidate_blocks: bool
    skip_header: int
    skip_footer: int
    trim_nadir: bool
    include_index: bool
    include_index_name: bool
    include_columns: bool
    include_columns_name: bool
    merge_hierarchical_labels: bool
    read_max_workers: tp.Optional[int]
    read_chunksize: int
    write_max_workers: tp.Optional[int]
    write_chunksize: int
    mp_context: tp.Optional[str]
    _hash: tp.Optional[int]

    __slots__ = (
            'index_depth',
            'index_name_depth_level',
            'index_constructors',
            'columns_depth',
            'columns_name_depth_level',
            'columns_constructors',
            'columns_select',
            'dtypes',
            'consolidate_blocks',
            'skip_header',
            'skip_footer',
            'trim_nadir',
            'include_index',
            'include_index_name',
            'include_columns',
            'include_columns_name',
            'merge_hierarchical_labels',
            'read_max_workers',
            'read_chunksize',
            'write_max_workers',
            'write_chunksize',
            'mp_context',
            '_hash'
            )

    def __init__(self, *,
            # constructors
            index_depth: int = 0, # this default does not permit round trip
            index_name_depth_level: tp.Optional[TDepthLevel] = None,
            index_constructors: TIndexCtorSpecifiers = None,
            columns_depth: int = 1,
            columns_name_depth_level: tp.Optional[TDepthLevel] = None,
            columns_constructors: TIndexCtorSpecifiers = None,
            columns_select: tp.Optional[tp.Iterable[str]] = None,
            dtypes: TDtypesSpecifier = None,
            consolidate_blocks: bool = False,
            # not used by all constructors
            skip_header: int = 0,
            skip_footer: int = 0,
            trim_nadir: bool = False,
            # exporters
            include_index: bool = True,
            include_index_name: bool = True,
            include_columns: bool = True,
            include_columns_name: bool = False,
            # not used by all exporters
            merge_hierarchical_labels: bool = True,
            # multiprocessing configuration
            read_max_workers: tp.Optional[int] = None,
            read_chunksize: int = 1,
            write_max_workers: tp.Optional[int] = None,
            write_chunksize: int = 1,
            mp_context: tp.Optional[str] = None,
            ):
        '''
        All parameters are keyword-only and are stored, unchanged, on the instance under the same name.

        Args:
            include_index: Boolean to determine if the ``index`` is included in output.
            include_columns: Boolean to determine if the ``columns`` is included in output.
        '''
        # constructor
        self.index_depth = index_depth
        self.index_name_depth_level = index_name_depth_level
        self.index_constructors = index_constructors
        self.columns_depth = columns_depth
        self.columns_name_depth_level = columns_name_depth_level
        self.columns_constructors = columns_constructors
        self.columns_select = columns_select
        self.dtypes = dtypes
        self.consolidate_blocks = consolidate_blocks
        self.skip_header = skip_header
        self.skip_footer = skip_footer
        self.trim_nadir = trim_nadir

        # exporter
        self.include_index = include_index
        self.include_index_name = include_index_name
        self.include_columns = include_columns
        self.include_columns_name = include_columns_name
        self.merge_hierarchical_labels = merge_hierarchical_labels

        # multiprocessing configuration
        self.read_max_workers = read_max_workers
        self.read_chunksize = read_chunksize
        self.write_max_workers = write_max_workers
        self.write_chunksize = write_chunksize
        self.mp_context = mp_context

        # populated lazily by __hash__
        self._hash = None

    def __eq__(self, other: tp.Any) -> bool:
        '''
        Return True if ``other`` is a ``StoreConfigHE`` and every public slot compares equal. Slots with a leading underscore (the hash cache) are not compared.
        '''
        if not isinstance(other, StoreConfigHE):
            return False

        for attr in self.__slots__:
            if attr.startswith('_'):
                continue
            if getattr(self, attr) != getattr(other, attr):
                return False

        return True

    def __ne__(self, other: tp.Any) -> bool:
        '''Return the negation of ``__eq__``.
        '''
        return not self.__eq__(other)

    @staticmethod
    def _hash_depth_specifier(depth_specifier: tp.Optional[TDepthLevel]) -> TLabel:
        '''
        Return a hashable stand-in for a depth-level specifier: ``None`` and ``int`` are returned unchanged; anything else is converted to a ``tuple``.
        '''
        if depth_specifier is None or isinstance(depth_specifier, int):
            return depth_specifier
        return tuple(depth_specifier)

    @staticmethod
    def _hash_dtypes_specifier(dtypes_specifier: TDtypesSpecifier) -> TLabel:
        '''
        Return a hashable stand-in for a dtypes specifier: a ``dict`` becomes a ``frozenset`` of its items, a ``list`` becomes a ``tuple``, and any other value is returned unchanged.
        '''
        if dtypes_specifier is None:
            return dtypes_specifier
        if isinstance(dtypes_specifier, dict):
            # dict equality (as used by __eq__) ignores insertion order, so the hashable form must too; a tuple of items would give equal configs different hashes
            return frozenset(dtypes_specifier.items()) # type: ignore [return-value]
        if isinstance(dtypes_specifier, list):
            return tuple(dtypes_specifier)
        return dtypes_specifier # type: ignore [return-value]

    def __hash__(self) -> int:
        '''
        Return a hash built from all public slots, computing it on first use and caching it in ``_hash``.
        '''
        if self._hash is None:
            self._hash = hash((
                    self.index_depth, # int
                    self._hash_depth_specifier(self.index_name_depth_level),
                    self.index_constructors, # class or callable
                    self.columns_depth, # int
                    self._hash_depth_specifier(self.columns_name_depth_level),
                    self.columns_constructors, # class or callable
                    self.columns_select if self.columns_select is None else tuple(self.columns_select),
                    self._hash_dtypes_specifier(self.dtypes),
                    self.consolidate_blocks, # bool
                    self.skip_header, # int
                    self.skip_footer, # int
                    self.trim_nadir, # bool
                    self.include_index, # bool
                    self.include_index_name, # bool
                    self.include_columns, # bool
                    self.include_columns_name, # bool
                    self.merge_hierarchical_labels, # bool
                    self.read_max_workers, # Optional[int]
                    self.read_chunksize, # int
                    self.write_max_workers, # Optional[int]
                    self.write_chunksize, # int
                    self.mp_context,
            ))
        return self._hash


class StoreConfig(StoreConfigHE):
    '''
    A read-only container of parameters used by :obj:`Store` subclasses for reading from and writing to multi-table storage formats.

    Unlike ``StoreConfigHE``, instances compare by identity and are not hashable.
    '''
    label_encoder: tp.Optional[tp.Callable[[TLabel], str]]
    label_decoder: tp.Optional[tp.Callable[[str], TLabel]]

    __slots__ = (
            'label_encoder',
            'label_decoder',
            )

    @classmethod
    def from_frame(cls, frame: TFrameAny) -> 'StoreConfig':
        '''Derive a config from a Frame.

        An axis is included when its depth is greater than one or its ``_map`` attribute is not None; an axis that is not included is given a depth of 0.
        '''
        include_index = frame.index.depth > 1 or frame.index._map is not None # type: ignore
        index_depth = 0 if not include_index else frame.index.depth

        include_columns = frame.columns.depth > 1 or frame.columns._map is not None # type: ignore
        columns_depth = 0 if not include_columns else frame.columns.depth

        return cls(
                index_depth=index_depth,
                columns_depth=columns_depth,
                include_index=include_index,
                include_columns=include_columns
                )

    def __init__(self, *,
            index_depth: int = 0,
            index_name_depth_level: tp.Optional[TDepthLevel] = None,
            index_constructors: TIndexCtorSpecifiers = None,
            columns_depth: int = 1,
            columns_name_depth_level: tp.Optional[TDepthLevel] = None,
            columns_constructors: TIndexCtorSpecifiers = None,
            columns_select: tp.Optional[tp.Iterable[str]] = None,
            dtypes: TDtypesSpecifier = None,
            consolidate_blocks: bool = False,
            skip_header: int = 0,
            skip_footer: int = 0,
            trim_nadir: bool = False,
            include_index: bool = True,
            include_index_name: bool = True,
            include_columns: bool = True,
            include_columns_name: bool = False,
            merge_hierarchical_labels: bool = True,
            label_encoder: tp.Optional[tp.Callable[[TLabel], str]] = None,
            label_decoder: tp.Optional[tp.Callable[[str], TLabel]] = None,
            read_max_workers: tp.Optional[int] = None,
            read_chunksize: int = 1,
            write_max_workers: tp.Optional[int] = None,
            write_chunksize: int = 1,
            mp_context: tp.Optional[str] = None,
            ):
        '''
        Forward every parameter except ``label_encoder`` and ``label_decoder`` to ``StoreConfigHE.__init__``, then store those two on the instance.

        Args:
            label_encoder: Optional callable applied to a label by ``label_encode``.
            label_decoder: Optional callable applied to a string by ``label_decode``.
        '''
        StoreConfigHE.__init__(self,
                index_depth=index_depth,
                index_name_depth_level=index_name_depth_level,
                index_constructors=index_constructors,
                columns_depth=columns_depth,
                columns_name_depth_level=columns_name_depth_level,
                columns_constructors=columns_constructors,
                columns_select=columns_select,
                dtypes=dtypes,
                consolidate_blocks=consolidate_blocks,
                skip_header=skip_header,
                skip_footer=skip_footer,
                trim_nadir=trim_nadir,
                include_index=include_index,
                include_index_name=include_index_name,
                include_columns=include_columns,
                include_columns_name=include_columns_name,
                merge_hierarchical_labels=merge_hierarchical_labels,
                read_max_workers=read_max_workers,
                read_chunksize=read_chunksize,
                write_max_workers=write_max_workers,
                write_chunksize=write_chunksize,
                mp_context=mp_context,
                )
        self.label_encoder = label_encoder
        self.label_decoder = label_decoder

    def label_encode(self, label: TLabel) -> str:
        '''
        Return ``label`` as a string, first passing it through ``label_encoder`` if one is set.

        Raises:
            RuntimeError: if the (possibly encoded) label is not a ``str``.
        '''
        if self.label_encoder:
            label = self.label_encoder(label)
        if not isinstance(label, str):
            raise RuntimeError(f'Store label {label!r} is not a string; provide a label_encoder to StoreConfig')
        return label

    def label_decode(self, label: str) -> TLabel:
        '''
        Return the result of ``label_decoder`` applied to ``label`` if a decoder is set; otherwise return ``label`` unchanged.
        '''
        if self.label_decoder:
            return self.label_decoder(label)
        return label

    def to_store_config_he(self) -> 'StoreConfigHE':
        '''
        Return a ``StoreConfigHE`` version of this StoreConfig.

        Only the public slots declared on ``StoreConfigHE`` are copied; ``label_encoder`` and ``label_decoder`` are not carried over.
        '''
        return StoreConfigHE(**{
                attr: getattr(self, attr)
                for attr in StoreConfigHE.__slots__
                if not attr.startswith('_')
                })

    def __eq__(self, other: tp.Any) -> bool:
        '''Return True only if ``other`` is a ``StoreConfig`` and is this same object.
        '''
        if not isinstance(other, StoreConfig):
            return False
        return id(self) == id(other)

    def __hash__(self) -> int:
        '''Not supported; always raises ``NotImplementedError``.
        '''
        raise NotImplementedError()

SCMMapType = tp.Mapping[tp.Any, StoreConfig]
SCMMapInitializer = tp.Optional[SCMMapType]

StoreConfigMapInitializer = tp.Union[
        StoreConfig,
        SCMMapInitializer,
        'StoreConfigMap'
        ]


class StoreConfigMap:
    '''
    Container of one or more StoreConfig, with the optional specification of a default StoreConfig. Assumed immutable over the life of the instance.

    Looking up a label that has no entry returns the default StoreConfig.
    '''
    __slots__ = (
            '_map',
            '_default'
            )

    # fallback default shared by every instance created without an explicit ``default``
    _DEFAULT: StoreConfig = StoreConfig()

    # These attrs (when set) must align with default
    _ALIGN_WITH_DEFAULT_ATTRS = (
            'label_encoder',
            'label_decoder',
            'read_max_workers',
            'read_chunksize',
            'write_max_workers',
            'write_chunksize',
            )

    @classmethod
    def from_frames(cls, frames: tp.Iterable[TFrameAny]) -> 'StoreConfigMap':
        '''
        Derive a config map from an iterable of Frames

        Each Frame's ``name`` is used as the label for the config derived from it. The mapping is handed over with ``own_config_map=True``, so it is stored as-is without per-entry validation.
        '''
        config_map = {f.name: StoreConfig.from_frame(f) for f in frames}
        return cls(config_map, own_config_map=True)

    @classmethod
    def from_config(cls, config: StoreConfig) -> 'StoreConfigMap':
        '''Return a map with no per-label entries and ``config`` as the default.
        '''
        return cls(default=config)

    @classmethod
    def from_initializer(
            cls,
            initializer: StoreConfigMapInitializer
            ) -> 'StoreConfigMap':
        '''
        Return a ``StoreConfigMap`` from a ``StoreConfig``, an existing ``StoreConfigMap`` (returned as the same instance), ``None`` (default configuration), or a ``dict`` of label to ``StoreConfig``.
        '''
        if isinstance(initializer, StoreConfig):
            return cls.from_config(initializer)
        if isinstance(initializer, cls):
            # return same instance
            return initializer
        if initializer is None: # will get default configuration
            return cls()
        # NOTE(review): this check is removed when Python runs with -O; kept as an assert so the exception type seen by callers is unchanged
        assert isinstance(initializer, dict)
        return cls(initializer)

    def __init__(self,
            config_map: SCMMapInitializer = None,
            *,
            default: tp.Optional[StoreConfig] = None,
            own_config_map: bool = False
            ):
        '''
        Args:
            config_map: Optional mapping of label to ``StoreConfig``.
            default: ``StoreConfig`` returned for labels not found in the map; if None, the class-level ``_DEFAULT`` is used.
            own_config_map: If True and ``config_map`` is not None, store ``config_map`` directly without copying or validating its entries.

        Raises:
            ErrorInitStoreConfig: if ``default`` or any validated entry is not a ``StoreConfig``, or if an entry differs from the default in any attribute named in ``_ALIGN_WITH_DEFAULT_ATTRS``.
        '''
        if default is None:
            self._default = self._DEFAULT
        elif not isinstance(default, StoreConfig):
            raise ErrorInitStoreConfig(
                f'unsupported class {default}, must be {StoreConfig}')
        else:
            self._default = default

        # initialize new dict and transfer to support checking Config classes
        self._map: SCMMapType = {}

        if own_config_map and config_map is not None:
            self._map = config_map
        elif config_map:
            for label, config in config_map.items():
                if not isinstance(config, self._DEFAULT.__class__):
                    raise ErrorInitStoreConfig(
                        f'unsupported class {config}, must be {self._DEFAULT.__class__}')

                for attr in self._ALIGN_WITH_DEFAULT_ATTRS:
                    if getattr(config, attr) != getattr(self._default, attr):
                        raise ErrorInitStoreConfig(f'config {label} has {attr} inconsistent with default; align values and/or pass a default StoreConfig.')

                self._map[label] = config

    def __getitem__(self, key: tp.Optional[TLabel]) -> StoreConfig:
        '''Return the config stored for ``key``, or the default config if ``key`` has no entry.
        '''
        return self._map.get(key, self._default)

    @property
    def default(self) -> StoreConfig:
        '''The default ``StoreConfig`` of this map.
        '''
        return self._default