Source code for static_frame.core.archive_npy

from __future__ import annotations

import io
import json
import mmap
import os
import shutil
import struct
from ast import literal_eval
from collections import defaultdict
from contextlib import contextmanager
from io import UnsupportedOperation
from zipfile import ZIP_STORED, ZipFile

import numpy as np
import typing_extensions as tp

from static_frame.core.archive_zip import ZipFilePartRO, ZipFileRO
from static_frame.core.container_util import (
    ContainerMap,
    index_many_concat,
    index_many_to_one,
)
from static_frame.core.exception import (
    AxisInvalid,
    ErrorInitIndexNonUnique,
    ErrorNPYDecode,
    ErrorNPYEncode,
)
from static_frame.core.index import Index
from static_frame.core.index_base import IndexBase
from static_frame.core.index_datetime import dtype_to_index_cls
from static_frame.core.interface_meta import InterfaceMeta
from static_frame.core.metadata import NPYLabel
from static_frame.core.util import (
    DTYPE_OBJECT_KIND,
    JSONTranslator,
    ManyToOneType,
    TLabel,
    TName,
    TNDArrayIntDefault,
    TPathSpecifier,
    TPathSpecifierOrBinaryIO,
    TPathSpecifierOrIO,
    concat_resolved,
)

if tp.TYPE_CHECKING:
    from types import TracebackType

    import pandas as pd

    from static_frame.core.bus import Bus
    from static_frame.core.frame import Frame
    from static_frame.core.generic_aliases import TFrameAny
    from static_frame.core.yarn import Yarn

    TNDArrayAny = np.ndarray[tp.Any, tp.Any]
    TDtypeAny = np.dtype[tp.Any]
    HeaderType = tp.Tuple[TDtypeAny, bool, tp.Tuple[int, ...]]
    HeaderDecodeCacheType = tp.Dict[bytes, HeaderType]

# -------------------------------------------------------------------------------


TNDIterFlags = tp.Sequence[tp.Literal['external_loop', 'buffered', 'zerosize_ok']]


class NPYConverter:
    """Optimized implementation based on numpy/lib/format.py"""

    MAGIC_PREFIX = b'\x93NUMPY' + bytes((1, 0))  # version 1.0
    MAGIC_LEN = len(MAGIC_PREFIX)
    ARRAY_ALIGN = 64
    STRUCT_FMT = '<H'  # version 1.0, unsigned short
    STRUCT_FMT_SIZE = struct.calcsize(STRUCT_FMT)  # 2 bytes
    MAGIC_AND_STRUCT_FMT_SIZE_LEN = MAGIC_LEN + STRUCT_FMT_SIZE
    ENCODING = 'latin1'  # version 1.0
    BUFFERSIZE_NUMERATOR = 16 * 1024**2  # ~16 MB
    NDITER_FLAGS: TNDIterFlags = ('external_loop', 'buffered', 'zerosize_ok')

    @classmethod
    def _header_encode(cls, header: str) -> bytes:
        """
        Takes a string header, and attaches the prefix and padding to it.
        This is hard-coded to only use Version 1.0
        """
        center = header.encode(cls.ENCODING)
        hlen = len(center) + 1

        padlen = cls.ARRAY_ALIGN - (
            (cls.MAGIC_AND_STRUCT_FMT_SIZE_LEN + hlen) % cls.ARRAY_ALIGN
        )
        prefix = cls.MAGIC_PREFIX + struct.pack(cls.STRUCT_FMT, hlen + padlen)
        postfix = b' ' * padlen + b'\n'
        return prefix + center + postfix

    @classmethod
    def to_npy(cls, file: tp.IO[bytes], array: TNDArrayAny) -> None:
        """Write an NPY 1.0 file to the open, writeable, binary file given by ``file``. NPY 1.0 is used as structured arrays are not supported."""
        dtype = array.dtype
        if dtype.kind == DTYPE_OBJECT_KIND:
            preview = repr(array)
            raise ErrorNPYEncode(
                f'No support for object dtypes: {preview[:40]}{"..." if len(preview) > 40 else ""}'
            )
        if dtype.names is not None:
            raise ErrorNPYEncode('No support for structured arrays')
        if array.ndim == 0 or array.ndim > 2:
            raise ErrorNPYEncode('No support for ndim other than 1 and 2.')

        flags = array.flags
        fortran_order = flags.f_contiguous

        # NOTE: do not know how to create array with itmesize 0, assume array.itemsize > 0
        # NOTE: derived numpy configuration
        buffersize = max(cls.BUFFERSIZE_NUMERATOR // array.itemsize, 1)

        header = f'{{"descr":"{dtype.str}","fortran_order":{fortran_order},"shape":{array.shape}}}'
        file.write(cls._header_encode(header))

        # NOTE: this works but forces copying everything in memory
        # if flags.f_contiguous and not flags.c_contiguous:
        #     file.write(array.T.tobytes())
        # else:
        #     file.write(array.tobytes())

        # NOTE: this approach works with normal open files (but not zip files) and is not materially faster than using buffering
        # if isinstance(file, io.BufferedWriter):
        #     if fortran_order and not flags.c_contiguous:
        #         array.T.tofile(file)
        #     else:
        #         array.tofile(file)

        # NOTE: this might be made more efficient by creating an ArrayKit function that extracts bytes directly, avoiding creating an array for each chunk.
        if fortran_order and not flags.c_contiguous:
            for chunk in np.nditer(
                array,
                flags=cls.NDITER_FLAGS,
                buffersize=buffersize,
                order='F',
            ):
                file.write(chunk.tobytes('C'))  # type: ignore
        else:
            for chunk in np.nditer(
                array,
                flags=cls.NDITER_FLAGS,
                buffersize=buffersize,
                order='C',
            ):
                file.write(chunk.tobytes('C'))  # type: ignore

    @classmethod
    def _header_decode(
        cls,
        file: tp.IO[bytes],
        header_decode_cache: HeaderDecodeCacheType,
    ) -> HeaderType:
        """Extract and decode the header."""
        length_size = file.read(cls.STRUCT_FMT_SIZE)
        # unpack tuple of one element
        (length_header,) = struct.unpack(cls.STRUCT_FMT, length_size)
        header = file.read(length_header)
        if header not in header_decode_cache:
            # eval dict and strip values, relying on order
            dtype_str, fortran_order, shape = literal_eval(
                header.decode(cls.ENCODING)
            ).values()
            header_decode_cache[header] = np.dtype(dtype_str), fortran_order, shape
        return header_decode_cache[header]

    @classmethod
    def header_from_npy(
        cls,
        file: tp.IO[bytes],
        header_decode_cache: HeaderDecodeCacheType,
    ) -> HeaderType:
        """Utility method to just read the header."""
        if cls.MAGIC_PREFIX != file.read(cls.MAGIC_LEN):
            raise ErrorNPYDecode('Invalid NPY header found.')  # COV_MISSING
        return cls._header_decode(file, header_decode_cache)

    @classmethod
    def from_npy(
        cls,
        file: tp.IO[bytes],
        header_decode_cache: HeaderDecodeCacheType,
        memory_map: bool = False,
    ) -> tp.Tuple[TNDArrayAny, tp.Optional[mmap.mmap]]:
        """Read an NPY 1.0 file."""
        if cls.MAGIC_PREFIX != file.read(cls.MAGIC_LEN):
            raise ErrorNPYDecode('Invalid NPY header found.')

        dtype, fortran_order, shape = cls._header_decode(file, header_decode_cache)
        dtype_kind = dtype.kind
        if dtype_kind == DTYPE_OBJECT_KIND:
            raise ErrorNPYDecode('no support for object dtypes')

        ndim = len(shape)
        if ndim == 1:
            size = shape[0]
        elif ndim == 2:
            size = shape[0] * shape[1]
        else:
            raise ErrorNPYDecode(f'No support for {ndim}-dimensional arrays')

        if memory_map:
            # offset calculations derived from numpy/core/memmap.py
            offset_header = file.tell()
            byte_count = offset_header + size * dtype.itemsize
            # ALLOCATIONGRANULARITY is 4096 on linux, if offset_header is 64 (or less than 4096), this will set offset_mmap to zero
            offset_mmap = offset_header - offset_header % mmap.ALLOCATIONGRANULARITY
            byte_count -= offset_mmap
            offset_array = offset_header - offset_mmap
            mm = mmap.mmap(
                file.fileno(),
                byte_count,
                access=mmap.ACCESS_READ,
                offset=offset_mmap,
            )
            # will always be immutable
            array: TNDArrayAny = np.ndarray(
                shape,
                dtype=dtype,
                buffer=mm,
                offset=offset_array,
                order='F' if fortran_order else 'C',
            )
            # assert not array.flags.writeable
            return array, mm

        if dtype_kind == 'M' or dtype_kind == 'm' or file.__class__ is not ZipFilePartRO:  # type: ignore
            # NOTE: produces a read-only view on the existing data
            array = np.frombuffer(file.read(), dtype=dtype)
        else:
            # NOTE: using readinto shown to be faster than frombuffer, particularly in the context of tall Frames
            array = np.empty(size, dtype=dtype)
            file.readinto(array.data)  # type: ignore
            array.flags.writeable = False

        if fortran_order and ndim == 2:
            array.shape = (shape[1], shape[0])
            array = array.transpose()
        else:
            array.shape = shape
        # assert not array.flags.writeable
        return array, None


# -------------------------------------------------------------------------------
class Archive:
    """Abstraction of a read/write archive, such as a directory or a zip archive. Holds state over the life of writing / reading a Frame."""

    FILE_META = '__meta__.json'

    __slots__ = (
        '_memory_map',
        '_archive',
        '_closable',
        '_header_decode_cache',
    )

    _memory_map: bool
    _header_decode_cache: HeaderDecodeCacheType
    _archive: tp.Any  # defined below tp.Union[ZipFile, ZipFileRO, TPathSpecifier]

    # set per subclass
    FUNC_REMOVE_FP: tp.Callable[..., None]

    def __init__(
        self,
        fp: TPathSpecifierOrIO,
        writeable: bool,
        memory_map: bool,
    ):
        raise NotImplementedError()  # pragma: no cover

    def __del__(self) -> None:
        pass

    def __contains__(
        self,
        name: str,
        /,
    ) -> bool:
        raise NotImplementedError()  # pragma: no cover

    def labels(self) -> tp.Iterator[str]:
        raise NotImplementedError()  # pragma: no cover

    def write_array(self, name: str, array: TNDArrayAny) -> None:
        raise NotImplementedError()  # pragma: no cover

    def read_array(self, name: str) -> TNDArrayAny:
        raise NotImplementedError()  # pragma: no cover

    def read_array_header(self, name: str) -> HeaderType:
        raise NotImplementedError()  # pragma: no cover

    def size_array(self, name: str) -> int:
        raise NotImplementedError()  # pragma: no cover

    def write_metadata(self, content: tp.Any) -> None:
        raise NotImplementedError()  # pragma: no cover

    def read_metadata(self) -> tp.Any:
        raise NotImplementedError()  # pragma: no cover

    def size_metadata(self) -> int:
        raise NotImplementedError()  # pragma: no cover

    def close(self) -> None:
        for f in getattr(self, '_closable', ()):
            f.close()


class ArchiveZip(Archive):
    """Archives based on a new ZipFile per Frame; ZipFile creation happens on __init__."""

    __slots__ = ()

    _archive: tp.Union[ZipFile, ZipFileRO]

    FUNC_REMOVE_FP = os.remove

    def __init__(
        self,
        fp: TPathSpecifierOrBinaryIO,
        writeable: bool,
        memory_map: bool,
    ):
        if writeable:
            self._archive = ZipFile(
                fp,
                mode='w',
                compression=ZIP_STORED,
                allowZip64=True,
            )
        else:
            self._archive = ZipFileRO(fp)
            self._header_decode_cache = {}

        if memory_map:
            raise RuntimeError(f'Cannot memory_map with {self}')

        self._memory_map = memory_map

    def __del__(self) -> None:
        # Note: If the fp we were given didn't exist, _archive also doesn't exist.
        archive = getattr(self, '_archive', None)
        if archive:
            archive.close()

    def __contains__(
        self,
        name: str,
        /,
    ) -> bool:
        try:
            self._archive.getinfo(name)
        except KeyError:
            return False
        return True

    def labels(self) -> tp.Iterator[str]:
        yield from self._archive.namelist()

    def write_array(self, name: str, array: TNDArrayAny) -> None:
        # NOTE: zip only has 'w' mode, not 'wb'
        # NOTE: force_zip64 required for large files
        f = self._archive.open(name, 'w', force_zip64=True)  # type: ignore
        try:
            NPYConverter.to_npy(f, array)
        finally:
            f.close()

    def read_array(self, name: str) -> TNDArrayAny:
        f = self._archive.open(name)
        try:
            array, _ = NPYConverter.from_npy(f, self._header_decode_cache)
        finally:
            f.close()
        array.flags.writeable = False
        return array

    def read_array_header(self, name: str) -> HeaderType:
        """Alternate reader for status displays."""
        f = self._archive.open(name)
        try:
            header = NPYConverter.header_from_npy(f, self._header_decode_cache)
        finally:
            f.close()
        return header

    def size_array(self, name: str) -> int:
        return self._archive.getinfo(name).file_size

    def write_metadata(self, content: tp.Any) -> None:
        # writestr is a method on the ZipFile
        self._archive.writestr(
            self.FILE_META,
            json.dumps(content),
        )

    def read_metadata(self) -> tp.Any:
        return json.loads(self._archive.read(self.FILE_META))

    def size_metadata(self) -> int:
        return self._archive.getinfo(self.FILE_META).file_size


class ArchiveDirectory(Archive):
    """Archive interface to a directory, where the directory is created on write and NPY files are authored into the file system."""

    __slots__ = ()

    _archive: TPathSpecifier
    FUNC_REMOVE_FP = shutil.rmtree

    def __init__(
        self,
        fp: TPathSpecifier,
        writeable: bool,
        memory_map: bool,
    ):
        if writeable:
            # because an error in writing will remove the entire directory, we requires the directory to be newly created
            if os.path.exists(fp):
                raise RuntimeError(f'Atttempting to write to an existant directory: {fp}')
            os.mkdir(fp)
        else:
            if not os.path.exists(fp):
                raise RuntimeError(
                    f'Atttempting to read from a non-existant directory: {fp}'
                )
            if not os.path.isdir(fp):
                raise RuntimeError(f'A directory must be provided, not {fp}')
            self._header_decode_cache = {}

        self._archive = fp
        self._memory_map = memory_map

    def labels(self) -> tp.Iterator[str]:
        # NOTE: should this filter?
        yield from (f.name for f in os.scandir(self._archive))  # type: ignore

    def __contains__(
        self,
        name: str,
        /,
    ) -> bool:
        fp = os.path.join(self._archive, name)
        return os.path.exists(fp)

    def write_array(self, name: str, array: TNDArrayAny) -> None:
        fp = os.path.join(self._archive, name)
        f = open(fp, 'wb')
        try:
            NPYConverter.to_npy(f, array)
        finally:
            f.close()

    def read_array(self, name: str) -> TNDArrayAny:
        fp = os.path.join(self._archive, name)
        if self._memory_map:
            if not hasattr(self, '_closable'):
                self._closable = []

            f = open(fp, 'rb')
            try:
                array, mm = NPYConverter.from_npy(
                    f,
                    self._header_decode_cache,
                    self._memory_map,
                )
            finally:
                f.close()  # NOTE: can close the file after creating memory map
            # self._closable.append(f)
            self._closable.append(mm)
            return array

        f = open(fp, 'rb')
        try:
            array, _ = NPYConverter.from_npy(
                f,
                self._header_decode_cache,
                self._memory_map,
            )
        finally:
            f.close()
        return array

    def read_array_header(self, name: str) -> HeaderType:
        """Alternate reader for status displays."""
        fp = os.path.join(self._archive, name)
        f = open(fp, 'rb')
        try:
            header = NPYConverter.header_from_npy(f, self._header_decode_cache)
        finally:
            f.close()
        return header

    def size_array(self, name: str) -> int:
        fp = os.path.join(self._archive, name)
        return os.path.getsize(fp)

    def write_metadata(self, content: tp.Any) -> None:
        fp = os.path.join(self._archive, self.FILE_META)
        f = open(fp, 'w', encoding='utf-8')
        try:
            f.write(json.dumps(content))
        finally:
            f.close()

    def read_metadata(self) -> tp.Any:
        fp = os.path.join(self._archive, self.FILE_META)
        f = open(fp, 'r', encoding='utf-8')
        try:
            post = json.loads(f.read())
        finally:
            f.close()
        return post

    def size_metadata(self) -> int:
        fp = os.path.join(self._archive, self.FILE_META)
        return os.path.getsize(fp)


class ArchiveZipWrapper(Archive):
    """Archive based on a shared (and already open/created) ZipFile."""

    __slots__ = ('prefix', '_delimiter')

    _archive: ZipFile

    def __init__(
        self,
        zf: ZipFile,
        writeable: bool,
        memory_map: bool,
        delimiter: str,
    ):
        self._archive = zf
        self.prefix = ''  # must be directly set by clients
        self._delimiter = delimiter

        if not writeable:
            self._header_decode_cache = {}
        if memory_map:
            raise RuntimeError(f'Cannot memory_map with {self}')
        self._memory_map = memory_map

    def labels(self) -> tp.Iterator[str]:
        """Only return unique outer-directory labels, not all contents (NPY) in the file. These labels are exclusively string (they are added post processing with label_encoding)."""
        dir_last = ''  # dir name cannot be an empty sting
        for name in self._archive.namelist():
            # split on the last observed separator
            if name.endswith(self._delimiter):
                continue  # pragma: no cover
            dir_current, _ = name.rsplit(self._delimiter, maxsplit=1)
            if dir_current != dir_last:
                dir_last = dir_current
                yield dir_current

    def __del__(self) -> None:
        # let the creator of the zip perform any cleanup
        pass

    def __contains__(
        self,
        name: str,
        /,
    ) -> bool:
        name = f'{self.prefix}{self._delimiter}{name}'
        try:
            self._archive.getinfo(name)
        except KeyError:
            return False
        return True

    def write_array(self, name: str, array: TNDArrayAny) -> None:
        # NOTE: force_zip64 required for large files
        name = f'{self.prefix}{self._delimiter}{name}'
        f = self._archive.open(name, 'w', force_zip64=True)
        try:
            NPYConverter.to_npy(f, array)
        finally:
            f.close()

    def read_array(self, name: str) -> TNDArrayAny:
        name = f'{self.prefix}{self._delimiter}{name}'
        f = self._archive.open(name)
        try:
            array, _ = NPYConverter.from_npy(f, self._header_decode_cache)
        finally:
            f.close()
        array.flags.writeable = False
        return array

    def read_array_header(self, name: str) -> HeaderType:
        """Alternate reader for status displays."""
        name = f'{self.prefix}{self._delimiter}{name}'
        f = self._archive.open(name)
        try:
            header = NPYConverter.header_from_npy(f, self._header_decode_cache)
        finally:
            f.close()
        return header

    def size_array(self, name: str) -> int:
        name = f'{self.prefix}{self._delimiter}{name}'
        return self._archive.getinfo(name).file_size

    def write_metadata(self, content: tp.Any) -> None:
        name = f'{self.prefix}{self._delimiter}{self.FILE_META}'
        self._archive.writestr(
            name,
            json.dumps(content),
        )

    def read_metadata(self) -> tp.Any:
        name = f'{self.prefix}{self._delimiter}{self.FILE_META}'
        return json.loads(self._archive.read(name))

    def size_metadata(self) -> int:
        name = f'{self.prefix}{self._delimiter}{self.FILE_META}'
        return self._archive.getinfo(name).file_size


# -------------------------------------------------------------------------------


class ArchiveIndexConverter:
    """Utility methods for converting Index or index components."""

    @staticmethod
    def index_encode(
        *,
        metadata: tp.Dict[str, TLabel],
        archive: Archive,
        index: 'IndexBase',
        key_template_values: str,
        key_types: str,
        depth: int,
        include: bool,
    ) -> None:
        """
        Args:
            metadata: mutates in place with json components for class names of index types.
        """
        if depth == 1 and index._map is None:  # type: ignore
            pass  # do not store anything
        elif include:
            if depth == 1:
                archive.write_array(key_template_values.format(0), index.values)
            else:
                for i in range(depth):
                    archive.write_array(
                        key_template_values.format(i), index.values_at_depth(i)
                    )
                metadata[key_types] = [cls.__name__ for cls in index.index_types.values]  # type: ignore

    @staticmethod
    def array_encode(
        *,
        metadata: tp.Dict[str, TLabel],
        archive: Archive,
        array: TNDArrayAny,
        key_template_values: str,
    ) -> None:
        """
        Args:
            metadata: mutates in place with json components
        """
        assert array.ndim == 1
        archive.write_array(key_template_values.format(0), array)

    @staticmethod
    def index_decode(
        *,
        archive: Archive,
        metadata: tp.Dict[str, tp.Any],
        key_template_values: str,
        key_types: str,  # which key to fetch IH component types
        depth: int,
        cls_index: tp.Type['IndexBase'],
        name: TName,
    ) -> tp.Optional['IndexBase']:
        """Build index or columns."""
        from static_frame.core.type_blocks import TypeBlocks

        if key_template_values.format(0) not in archive:
            index = None
        elif depth == 1:
            index = cls_index(
                archive.read_array(key_template_values.format(0)),
                name=name,
            )
        else:
            index_tb = TypeBlocks.from_blocks(
                archive.read_array(key_template_values.format(i)) for i in range(depth)
            )
            index_constructors = [
                ContainerMap.str_to_cls(name) for name in metadata[key_types]
            ]
            index = cls_index._from_type_blocks(  # type: ignore
                index_tb,
                name=name,
                index_constructors=index_constructors,
            )
        return index


class ArchiveFrameConverter:
    _ARCHIVE_CLS: tp.Type[Archive]

    @staticmethod
    def frame_encode(
        *,
        archive: Archive,
        frame: TFrameAny,
        include_index: bool = True,
        include_columns: bool = True,
        consolidate_blocks: bool = False,
    ) -> None:
        metadata: tp.Dict[str, tp.Any] = {}

        # NOTE: isolate custom pre-json encoding only where needed: on `name` attributes; the name might be nested tuples, so we cannot assume that name is just a string
        metadata[NPYLabel.KEY_NAMES] = [
            JSONTranslator.encode_element(frame._name),
            JSONTranslator.encode_element(frame._index._name),
            JSONTranslator.encode_element(frame._columns._name),
        ]
        # do not store Frame class as caller will determine
        metadata[NPYLabel.KEY_TYPES] = [
            frame._index.__class__.__name__,
            frame._columns.__class__.__name__,
        ]

        # store shape, index depths
        depth_index = frame._index.depth
        depth_columns = frame._columns.depth

        if consolidate_blocks:
            # NOTE: by taking iter, can avoid 2x memory in some circumstances
            block_iter = frame._blocks._reblock()
        else:
            block_iter = iter(frame._blocks._blocks)

        ArchiveIndexConverter.index_encode(
            metadata=metadata,
            archive=archive,
            index=frame._index,
            key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_INDEX,
            key_types=NPYLabel.KEY_TYPES_INDEX,
            depth=depth_index,
            include=include_index,
        )
        ArchiveIndexConverter.index_encode(
            metadata=metadata,
            archive=archive,
            index=frame._columns,
            key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_COLUMNS,
            key_types=NPYLabel.KEY_TYPES_COLUMNS,
            depth=depth_columns,
            include=include_columns,
        )
        i = 0
        for i, array in enumerate(block_iter, 1):
            archive.write_array(NPYLabel.FILE_TEMPLATE_BLOCKS.format(i - 1), array)

        metadata[NPYLabel.KEY_DEPTHS] = [
            i,  # block count
            depth_index,
            depth_columns,
        ]

        archive.write_metadata(metadata)

    @classmethod
    def to_archive(
        cls,
        *,
        frame: TFrameAny,
        fp: TPathSpecifierOrIO,
        include_index: bool = True,
        include_columns: bool = True,
        consolidate_blocks: bool = False,
    ) -> None:
        """
        Write a :obj:`Frame` as an npz file.
        """
        archive = cls._ARCHIVE_CLS(
            fp,
            writeable=True,
            memory_map=False,
        )
        try:
            cls.frame_encode(
                archive=archive,
                frame=frame,
                include_index=include_index,
                include_columns=include_columns,
                consolidate_blocks=consolidate_blocks,
            )
        except ErrorNPYEncode:
            archive.close()
            archive.__del__()  # force cleanup
            # fp can be BytesIO in a to_npz/to_zip_npz scenario
            if not isinstance(fp, io.IOBase) and os.path.exists(fp):  # type: ignore[arg-type]
                cls._ARCHIVE_CLS.FUNC_REMOVE_FP(fp)
            raise

    @classmethod
    def frame_decode(
        cls,
        *,
        archive: Archive,
        constructor: tp.Type[TFrameAny],
    ) -> TFrameAny:
        """
        Create a :obj:`Frame` from an npz file.
        """
        from static_frame.core.type_blocks import TypeBlocks

        metadata = archive.read_metadata()

        # NOTE: we isolate custom post-JSON decoding to only where it is needed: the name attributes. JSON will bring back tuple `name` attributes as lists; these must be converted to tuples to be hashable. Alternatives (like storing repr and using literal_eval) are slower than JSON.
        names = metadata[NPYLabel.KEY_NAMES]

        name = JSONTranslator.decode_element(names[0])
        name_index = JSONTranslator.decode_element(names[1])
        name_columns = JSONTranslator.decode_element(names[2])

        block_count, depth_index, depth_columns = metadata[NPYLabel.KEY_DEPTHS]

        cls_index: tp.Type[IndexBase]
        cls_columns: tp.Type[IndexBase]
        cls_index, cls_columns = (  # type: ignore
            ContainerMap.str_to_cls(name) for name in metadata[NPYLabel.KEY_TYPES]
        )

        index = ArchiveIndexConverter.index_decode(
            archive=archive,
            metadata=metadata,
            key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_INDEX,
            key_types=NPYLabel.KEY_TYPES_INDEX,
            depth=depth_index,
            cls_index=cls_index,
            name=name_index,
        )

        # we need to align the mutability of the constructor with the Index type on the columns
        if constructor.STATIC != cls_columns.STATIC:
            if constructor.STATIC:
                cls_columns = cls_columns._IMMUTABLE_CONSTRUCTOR  # type: ignore
            else:
                cls_columns = cls_columns._MUTABLE_CONSTRUCTOR  # type: ignore

        columns = ArchiveIndexConverter.index_decode(
            archive=archive,
            metadata=metadata,
            key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_COLUMNS,
            key_types=NPYLabel.KEY_TYPES_COLUMNS,
            depth=depth_columns,
            cls_index=cls_columns,
            name=name_columns,
        )

        if block_count:
            tb = TypeBlocks.from_blocks(
                archive.read_array(NPYLabel.FILE_TEMPLATE_BLOCKS.format(i))
                for i in range(block_count)
            )
        else:
            tb = TypeBlocks.from_zero_size_shape()

        f = constructor(
            tb,
            own_data=True,
            index=index,
            own_index=False if index is None else True,
            columns=columns,
            own_columns=False if columns is None else True,
            name=name,
        )
        return f

    @classmethod
    def from_archive(
        cls,
        *,
        constructor: tp.Type[TFrameAny],
        fp: TPathSpecifierOrIO,
    ) -> TFrameAny:
        """
        Create a :obj:`Frame` from an npz file.
        """
        archive = cls._ARCHIVE_CLS(
            fp,
            writeable=False,
            memory_map=False,
        )
        f = cls.frame_decode(
            archive=archive,
            constructor=constructor,
        )
        return f

    @classmethod
    def from_archive_mmap(
        cls,
        *,
        constructor: tp.Type[TFrameAny],
        fp: TPathSpecifier,
    ) -> tp.Tuple[TFrameAny, tp.Callable[[], None]]:
        """
        Create a :obj:`Frame` from an npz file.
        """
        archive = cls._ARCHIVE_CLS(
            fp,
            writeable=False,
            memory_map=True,
        )
        f = cls.frame_decode(
            archive=archive,
            constructor=constructor,
        )
        return f, archive.close


class NPZFrameConverter(ArchiveFrameConverter):
    _ARCHIVE_CLS = ArchiveZip


class NPYFrameConverter(ArchiveFrameConverter):
    _ARCHIVE_CLS = ArchiveDirectory


# -------------------------------------------------------------------------------
# for converting from components, unstructured Frames


class ArchiveComponentsConverter(metaclass=InterfaceMeta):
    """
    A family of methods to write NPY/NPZ from things other than a Frame, or multi-frame collections like a Bus/Yarn/Quilt but with the intention of production a consolidate Frame, not just a zip of Frames.
    """

    _ARCHIVE_CLS: tp.Type[Archive]

    __slots__ = (
        '_archive',
        '_writeable',
    )

    def __init__(self, fp: TPathSpecifier, mode: str = 'r') -> None:
        if mode == 'w':
            writeable = True
        elif mode == 'r':
            writeable = False
        else:
            raise RuntimeError('Invalid value for mode; use "w" or "r"')

        self._writeable = writeable
        self._archive = self._ARCHIVE_CLS(
            fp,
            writeable=self._writeable,
            memory_map=False,
        )

    def __enter__(self) -> 'ArchiveComponentsConverter':
        """When entering a context manager, a handle to this instance is returned."""
        return self

    def __exit__(
        self,
        type: tp.Type[BaseException],
        value: BaseException,
        traceback: TracebackType,
    ) -> None:
        """When exiting a context manager, resources are closed as necessary."""
        self._archive.close()
        self._archive.__del__()  # force closing resources

    @property
    def contents(self) -> TFrameAny:
        """
        Return a :obj:`Frame` indicating name, dtype, shape, and bytes, of Archive components.
        """
        if self._writeable:
            raise UnsupportedOperation('Open with mode "r" to get contents.')

        from static_frame.core.frame import Frame

        def gen() -> tp.Iterator[tp.Tuple[tp.Any, ...]]:
            # metadata is in labels; sort by ext,ension first to put at top
            for name in sorted(
                self._archive.labels(), key=lambda fn: tuple(reversed(fn.split('.')))
            ):
                # NOTE: will not work with ArchiveZipWrapper
                if name == self._archive.FILE_META:
                    yield (name, self._archive.size_metadata()) + ('', '', '')
                else:
                    header = self._archive.read_array_header(name)
                    yield (name, self._archive.size_array(name)) + header

        f: TFrameAny = Frame.from_records(
            gen(),
            columns=('name', 'size', 'dtype', 'fortran', 'shape'),
            name=str(self._archive._archive),
        )
        return f.set_index('name', drop=True)

    @property
    def nbytes(self) -> int:
        """
        Return numer of bytes stored in this archive.
        """
        if self._writeable:
            raise UnsupportedOperation('Open with mode "r" to get nbytes.')

        def gen() -> tp.Iterator[int]:
            # metadata is in labels; sort by extension first to put at top
            for name in self._archive.labels():
                # NOTE: will not work with ArchiveZipWrapper
                if name == self._archive.FILE_META:
                    yield self._archive.size_metadata()
                else:
                    yield self._archive.size_array(name)

        return sum(gen())

    def from_arrays(
        self,
        blocks: tp.Iterable[TNDArrayAny],
        *,
        index: TNDArrayAny | IndexBase | None = None,
        columns: TNDArrayAny | IndexBase | None = None,
        name: TName = None,
        axis: int = 0,
    ) -> None:
        """
        Given an iterable of arrays, write out an NPZ or NPY directly, without building up intermediary :obj:`Frame`. If axis 0, the arrays are vertically stacked; if axis 1, they are horizontally stacked. For both axis, if included, indices must be of appropriate length.

        Args:
            blocks:
            *
            index: An array, :obj:`Index`, or :obj:`IndexHierarchy`.
            columns: An array, :obj:`Index`, or :obj:`IndexHierarchy`.
            name:
            axis:
        """
        if not self._writeable:
            raise UnsupportedOperation('Open with mode "w" to write.')

        metadata: tp.Dict[str, tp.Any] = {}

        if isinstance(index, IndexBase):
            depth_index = index.depth
            name_index = index.name
            cls_index = index.__class__
            ArchiveIndexConverter.index_encode(
                metadata=metadata,
                archive=self._archive,
                index=index,
                key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_INDEX,
                key_types=NPYLabel.KEY_TYPES_INDEX,
                depth=depth_index,
                include=True,
            )
        elif index is not None:
            if index.__class__ is not np.ndarray:
                raise RuntimeError(
                    'index argument must be an Index, IndexHierarchy, or 1D np.ndarray'
                )

            depth_index = 1
            name_index = None
            cls_index = dtype_to_index_cls(True, index.dtype)
            ArchiveIndexConverter.array_encode(
                metadata=metadata,
                archive=self._archive,
                array=index,
                key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_INDEX,
            )
        else:
            depth_index = 1
            name_index = None
            cls_index = Index

        if isinstance(columns, IndexBase):
            depth_columns = columns.depth
            name_columns = columns.name
            cls_columns = columns.__class__
            ArchiveIndexConverter.index_encode(
                metadata=metadata,
                archive=self._archive,
                index=columns,
                key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_COLUMNS,
                key_types=NPYLabel.KEY_TYPES_COLUMNS,
                depth=depth_columns,
                include=True,
            )
        elif columns is not None:
            if columns.__class__ is not np.ndarray:
                raise RuntimeError(
                    'columns argument must be an Index, IndexHierarchy, or 1D np.ndarray'
                )

            depth_columns = 1  # only support 1D
            name_columns = None
            cls_columns = dtype_to_index_cls(True, columns.dtype)
            ArchiveIndexConverter.array_encode(
                metadata=metadata,
                archive=self._archive,
                array=columns,
                key_template_values=NPYLabel.FILE_TEMPLATE_VALUES_COLUMNS,
            )
        else:
            depth_columns = 1  # only support 1D
            name_columns = None
            cls_columns = Index

        metadata[NPYLabel.KEY_NAMES] = [
            name,
            name_index,
            name_columns,
        ]
        # do not store Frame class as caller will determine
        metadata[NPYLabel.KEY_TYPES] = [
            cls_index.__name__,
            cls_columns.__name__,
        ]

        if axis == 1:
            rows = 0
            for i, array in enumerate(blocks):
                if not rows:
                    rows = array.shape[0]
                else:
                    if array.shape[0] != rows:
                        raise RuntimeError('incompatible block shapes')
                self._archive.write_array(NPYLabel.FILE_TEMPLATE_BLOCKS.format(i), array)
        elif axis == 0:
            # for now, just vertically concat and write, though this has a 2X memory requirement
            resolved = concat_resolved(blocks, axis=0)
            # if this results in an obect array, an exception will be raised
            self._archive.write_array(NPYLabel.FILE_TEMPLATE_BLOCKS.format(0), resolved)
            i = 0
        else:
            raise AxisInvalid(f'invalid axis {axis}')

        metadata[NPYLabel.KEY_DEPTHS] = [
            i + 1,  # block count
            depth_index,
            depth_columns,
        ]
        self._archive.write_metadata(metadata)

    def from_frames(
        self,
        frames: tp.Iterable[TFrameAny],
        *,
        include_index: bool = True,
        include_columns: bool = True,
        axis: int = 0,
        union: bool = True,
        name: TName = None,
        fill_value: object = np.nan,
    ) -> None:
        """Given an iterable of Frames, write out an NPZ or NPY directly, without building up an intermediary Frame. If axis 0, the Frames must be block compatible; if axis 1, the Frames must have the same number of rows. For both axis, if included, concatenated indices must be unique or aligned.

        Args:
            frames:
            *
            include_index:
            include_columns:
            axis:
            union:
            name:
            fill_value:

        """
        if not self._writeable:
            raise UnsupportedOperation('Open with mode "w" to write.')

        from static_frame.core.frame import Frame
        from static_frame.core.type_blocks import TypeBlocks

        frames = [f if isinstance(f, Frame) else f.to_frame(axis) for f in frames]
        index: tp.Optional[IndexBase]

        # NOTE: based on Frame.from_concat
        if axis == 1:  # stacks columns (extends rows horizontally)
            if include_columns:
                try:
                    columns = index_many_concat(
                        (f._columns for f in frames),
                        Index,
                    )
                except ErrorInitIndexNonUnique:
                    raise RuntimeError(
                        'Column names after horizontal concatenation are not unique; set include_columns to None to ignore.'
                    ) from None
            else:
                columns = None

            if include_index:
                index = index_many_to_one(
                    (f._index for f in frames),
                    Index,
                    many_to_one_type=ManyToOneType.UNION
                    if union
                    else ManyToOneType.INTERSECT,
                )
            else:
                raise RuntimeError('Must include index for horizontal alignment.')

            def blocks() -> tp.Iterator[TNDArrayAny]:
                for f in frames:
                    if len(f.index) != len(index) or (f.index != index).any():  # type: ignore
                        f = f.reindex(index=index, fill_value=fill_value)
                    for block in f._blocks._blocks:
                        yield block

        elif axis == 0:  # stacks rows (extends columns vertically)
            if include_index:
                try:
                    index = index_many_concat((f._index for f in frames), Index)
                except ErrorInitIndexNonUnique:
                    raise RuntimeError(
                        'Index names after vertical concatenation are not unique; set include_index to None to ignore'
                    ) from None
            else:
                index = None

            if include_columns:
                columns = index_many_to_one(
                    (f._columns for f in frames),
                    Index,
                    many_to_one_type=ManyToOneType.UNION
                    if union
                    else ManyToOneType.INTERSECT,
                )
            else:
                raise RuntimeError('Must include columns for vertical alignment.')

            def blocks() -> tp.Iterator[TNDArrayAny]:
                type_blocks = []
                previous_f: tp.Optional[TFrameAny] = None
                block_compatible = True
                reblock_compatible = True

                for f in frames:
                    if len(f.columns) != len(columns) or (f.columns != columns).any():  # pyright: ignore
                        f = f.reindex(columns=columns, fill_value=fill_value)

                    type_blocks.append(f._blocks)
                    # column size is all the same by this point
                    if previous_f is not None:  # after the first
                        if block_compatible:
                            block_compatible &= f._blocks.block_compatible(
                                previous_f._blocks, axis=1
                            )  # only compare columns
                        if reblock_compatible:
                            reblock_compatible &= f._blocks.reblock_compatible(
                                previous_f._blocks
                            )
                    previous_f = f

                yield from TypeBlocks.vstack_blocks_to_blocks(
                    type_blocks=type_blocks,
                    block_compatible=block_compatible,
                    reblock_compatible=reblock_compatible,
                )
        else:
            raise AxisInvalid(f'no support for {axis}')

        self.from_arrays(
            blocks=blocks(),
            index=index,
            columns=columns,
            name=name,
            axis=1,  # blocks are normalized for horizontal concat
        )


[docs] class NPZ(ArchiveComponentsConverter): """Utility object for reading characteristics from, or writing new, NPZ files from arrays or :obj:`Frame`.""" _ARCHIVE_CLS = ArchiveZip
[docs] class NPY(ArchiveComponentsConverter): """Utility object for reading characteristics from, or writing new, NPY directories from arrays or :obj:`Frame`.""" _ARCHIVE_CLS = ArchiveDirectory
# ------------------------------------------------------------------------------- class ZipCache: def __init__(self) -> None: self._cache: dict[str, ZipFile] = {} def __enter__(self) -> tp.Self: return self def __exit__(self, *args: tp.Any) -> None: for zf in self._cache.values(): zf.close() self._cache.clear() def get(self, path: str) -> ZipFile: if path not in self._cache: self._cache[path] = ZipFile(path) return self._cache[path] class ArchiveManifest: @staticmethod def _from_yarn( fp: TPathSpecifier, container: Yarn, *, label_encoder: tp.Callable[[TLabel], str] | None = None, ) -> None: """ Args: label_encoder: labels defined in the passed container must be strings or use a label_encoder to writing out `Frame` in the Manifest. """ from static_frame.core.store_zip import StoreZipNPY, StoreZipNPZ # this might only be needed for NPYs bus_fp_f_label_to_files: defaultdict[str, defaultdict[str, list[str]]] = ( defaultdict(lambda: defaultdict(list)) ) def map_zf(fp: str, zf: ZipFile) -> None: if fp not in bus_fp_f_label_to_files: for name in zf.namelist(): outer, inner = name.split(StoreZipNPY._DELIMITER) bus_fp_f_label_to_files[fp][outer].append(inner) def prepare_label(label: TLabel) -> str: if label_encoder: return label_encoder(label) if not isinstance(label, str): raise RuntimeError( 'Must provide a label_encoder to convert contained `Frame` names to strings' ) return label with ZipCache() as zcache: for idx_indexer, f_yarn_label in enumerate(container.index): # one dir will be created per frame f_dir_out = os.path.join(fp, prepare_label(f_yarn_label)) idx_h: TNDArrayIntDefault | int = container._indexer[idx_indexer] # the pos of the bus, and the label of the Frame in that bus idx_bus, f_bus_label = container._hierarchy.iloc[idx_h] bus: Bus = container._values[idx_bus] # type: ignore # if bus is a zip npz or zip npy, can copy store = bus._store f_target: str if isinstance(store, StoreZipNPY): # in an ZIP NPY, each file is bundled with outer/inner, where outer is the encoded Frame name and inner is the standardized NPY component name os.makedirs(f_dir_out, exist_ok=True) zf = zcache.get(store._fp) map_zf(store._fp, zf) f_target = store._config.default.label_encode(f_bus_label) # use mapping to avoid iterating over all names; we may not pull out all frame contained in the zip NPY for inner in bus_fp_f_label_to_files[store._fp][f_target]: fp_out = os.path.join(f_dir_out, inner) with open(fp_out, 'wb') as f: f.write(zf.read(f'{f_target}{StoreZipNPY._DELIMITER}{inner}')) elif isinstance(store, StoreZipNPZ): # has .NPZ files contained, open and write out files os.makedirs(f_dir_out, exist_ok=True) zf = zcache.get(store._fp) # get .npz name f_target = ( store._config.default.label_encode(f_bus_label) + store._EXT_CONTAINED ) with ZipFileRO(zf.open(f_target)) as zfnpz: for inner in zfnpz.namelist(): fp_out = os.path.join(f_dir_out, inner) with open(fp_out, 'wb') as f: f.write(zfnpz.read(inner)) else: # must load Frame in memory and write out frame: Frame = bus._extract_loc(f_bus_label) frame.to_npy(f_dir_out) @staticmethod def _from_bus( fp: TPathSpecifier, container: Bus, *, label_encoder: tp.Callable[[TLabel], str] | None = None, ) -> None: # we do not need a label encoder as we only have "native" bus labels from static_frame.core.store import Store from static_frame.core.store_zip import StoreZipNPY, StoreZipNPZ # this might only be needed for NPYs f_label_to_files: defaultdict[str, list[str]] = defaultdict(list) def map_zf(zf: ZipFile) -> None: if not f_label_to_files: for name in zf.namelist(): outer, inner = name.split(StoreZipNPY._DELIMITER) f_label_to_files[outer].append(inner) def prepare_label(label: TLabel) -> str: if label_encoder: return label_encoder(label) if not isinstance(label, str): raise RuntimeError( 'Must provide a label_encoder to convert contained `Frame` names to strings' ) return label store = container._store f_target: str with ZipCache() as zcache: for f_bus_label in container._index: if isinstance(store, StoreZipNPY): f_target = store._config.default.label_encode(f_bus_label) f_dir_out = os.path.join(fp, f_target) os.makedirs(f_dir_out, exist_ok=True) zf = zcache.get(store._fp) map_zf(zf) for inner in f_label_to_files[f_target]: fp_out = os.path.join(f_dir_out, inner) with open(fp_out, 'wb') as f: f.write(zf.read(f'{f_target}{StoreZipNPY._DELIMITER}{inner}')) elif isinstance(store, StoreZipNPZ): f_encoded_label = store._config.default.label_encode(f_bus_label) f_target = f_encoded_label + store._EXT_CONTAINED f_dir_out = os.path.join(fp, f_encoded_label) os.makedirs(f_dir_out, exist_ok=True) zf = zcache.get(store._fp) # f_target will only be opened once with ZipFileRO(zf.open(f_target)) as zfnpz: for inner in zfnpz.namelist(): fp_out = os.path.join(f_dir_out, inner) with open(fp_out, 'wb') as f: f.write(zfnpz.read(inner)) else: # must load Frame in memory and write out if store is not None and isinstance( store, Store ): # not StoreManifest f_target = store._config.default.label_encode(f_bus_label) else: f_target = prepare_label(f_bus_label) f_dir_out = os.path.join(fp, f_target) frame: Frame = container._extract_loc(f_bus_label) frame.to_npy(f_dir_out) @classmethod def to_manifest( cls, fp: TPathSpecifier, container: Bus | Yarn, /, *, label_encoder: tp.Callable[[TLabel], str] | None = None, ) -> None: """ Args: label_encoder: labels defined in the passed container must be strings or use a label_encoder to writing out `Frame` in the Manifest. """ if not os.path.exists(fp): os.makedirs(fp) elif not os.path.isdir(fp): raise RuntimeError(f'Provided path {fp} must be a directory.') from static_frame.core.bus import Bus from static_frame.core.yarn import Yarn if isinstance(container, Yarn): return cls._from_yarn(fp, container, label_encoder=label_encoder) if isinstance(container, Bus): return cls._from_bus(fp, container, label_encoder=label_encoder) raise NotImplementedError(f'{container.__class__} not supported.')