import json
import mmap
import os
import shutil
import struct
import typing as tp
from ast import literal_eval
from io import BytesIO
from io import UnsupportedOperation
from types import TracebackType
from zipfile import ZIP_STORED
from zipfile import ZipFile
import numpy as np
from static_frame.core.container_util import ContainerMap
from static_frame.core.container_util import index_many_concat
from static_frame.core.container_util import index_many_to_one
from static_frame.core.exception import AxisInvalid
from static_frame.core.exception import ErrorInitIndexNonUnique
from static_frame.core.exception import ErrorNPYDecode
from static_frame.core.exception import ErrorNPYEncode
from static_frame.core.index import Index
from static_frame.core.index_base import IndexBase
from static_frame.core.index_datetime import dtype_to_index_cls
from static_frame.core.interface_meta import InterfaceMeta
from static_frame.core.util import DTYPE_OBJECT_KIND
from static_frame.core.util import IndexInitializer
from static_frame.core.util import JSONTranslator
from static_frame.core.util import ManyToOneType
from static_frame.core.util import NameType
from static_frame.core.util import PathSpecifier
from static_frame.core.util import concat_resolved
if tp.TYPE_CHECKING:
import pandas as pd # pylint: disable=W0611 #pragma: no cover
from static_frame.core.frame import Frame # pylint: disable=W0611,C0412 #pragma: no cover
HeaderType = tp.Tuple[np.dtype, bool, tp.Tuple[int, ...]]
HeaderDecodeCacheType = tp.Dict[bytes, HeaderType]
#-------------------------------------------------------------------------------
class NPYConverter:
'''Optimized implementation based on numpy/lib/format.py
'''
MAGIC_PREFIX = b'\x93NUMPY' + bytes((1, 0)) # version 1.0
MAGIC_LEN = len(MAGIC_PREFIX)
ARRAY_ALIGN = 64
STRUCT_FMT = '<H' # version 1.0, unsigned short
STRUCT_FMT_SIZE = struct.calcsize(STRUCT_FMT) # 2 bytes
MAGIC_AND_STRUCT_FMT_SIZE_LEN = MAGIC_LEN + STRUCT_FMT_SIZE
ENCODING = 'latin1' # version 1.0
BUFFERSIZE_NUMERATOR = 16 * 1024 ** 2 # ~16 MB
NDITER_FLAGS = ('external_loop', 'buffered', 'zerosize_ok')
@classmethod
def _header_encode(cls, header: str) -> bytes:
'''
        Take a string header and attach the prefix and padding to it.
        This is hard-coded to use only NPY version 1.0.
'''
center = header.encode(cls.ENCODING)
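        # the header length includes one byte for the terminating newline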
hlen = len(center) + 1
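        # pad so that the magic prefix, size field, header, and newline together align to ARRAY_ALIGN (64) bytes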
padlen = cls.ARRAY_ALIGN - (
(cls.MAGIC_AND_STRUCT_FMT_SIZE_LEN + hlen) % cls.ARRAY_ALIGN
)
prefix = cls.MAGIC_PREFIX + struct.pack(cls.STRUCT_FMT, hlen + padlen)
postfix = b' ' * padlen + b'\n'
return prefix + center + postfix
@classmethod
def to_npy(cls, file: tp.IO[bytes], array: np.ndarray) -> None:
        '''Write an NPY 1.0 file to the open, writeable, binary file given by ``file``. Only NPY version 1.0 is used, as structured arrays are not supported.
'''
dtype = array.dtype
if dtype.kind == DTYPE_OBJECT_KIND:
preview = repr(array)
raise ErrorNPYEncode(
f'No support for object dtypes: {preview[:40]}{"..." if len(preview) > 40 else ""}')
if dtype.names is not None:
raise ErrorNPYEncode('No support for structured arrays')
if array.ndim == 0 or array.ndim > 2:
raise ErrorNPYEncode('No support for ndim other than 1 and 2.')
flags = array.flags
fortran_order = flags.f_contiguous
        # NOTE: do not know how to create an array with itemsize 0; assume array.itemsize > 0
# NOTE: derived numpy configuration
buffersize = max(cls.BUFFERSIZE_NUMERATOR // array.itemsize, 1)
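        # buffersize is a count of elements, so each nditer chunk is approximately 16 MB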
header = f'{{"descr":"{dtype.str}","fortran_order":{fortran_order},"shape":{array.shape}}}'
file.write(cls._header_encode(header))
# NOTE: this works but forces copying everything in memory
# if flags.f_contiguous and not flags.c_contiguous:
# file.write(array.T.tobytes())
# else:
# file.write(array.tobytes())
# NOTE: this approach works with normal open files (but not zip files) and is not materially faster than using buffering
# if isinstance(file, io.BufferedWriter):
# if fortran_order and not flags.c_contiguous:
# array.T.tofile(file)
# else:
# array.tofile(file)
# NOTE: this might be made more efficient by creating an ArrayKit function that extracts bytes directly, avoiding creating an array for each chunk.
if fortran_order and not flags.c_contiguous:
for chunk in np.nditer(
array,
flags=cls.NDITER_FLAGS,
buffersize=buffersize,
order='F',
):
file.write(chunk.tobytes('C'))
else:
for chunk in np.nditer(
array,
flags=cls.NDITER_FLAGS,
buffersize=buffersize,
order='C',
):
file.write(chunk.tobytes('C'))
@classmethod
def _header_decode(cls,
file: tp.IO[bytes],
header_decode_cache: HeaderDecodeCacheType,
) -> HeaderType:
'''Extract and decode the header.
'''
length_size = file.read(cls.STRUCT_FMT_SIZE)
# unpack tuple of one element
length_header, = struct.unpack(cls.STRUCT_FMT, length_size)
header = file.read(length_header)
if header not in header_decode_cache:
            # eval the dict literal and unpack its values, relying on insertion order
dtype_str, fortran_order, shape = literal_eval(
header.decode(cls.ENCODING)
).values()
header_decode_cache[header] = np.dtype(dtype_str), fortran_order, shape
return header_decode_cache[header]
@classmethod
def header_from_npy(cls,
file: tp.IO[bytes],
header_decode_cache: HeaderDecodeCacheType,
) -> HeaderType:
        '''Utility method to read just the header.
'''
if cls.MAGIC_PREFIX != file.read(cls.MAGIC_LEN):
raise ErrorNPYDecode('Invalid NPY header found.') # COV_MISSING
return cls._header_decode(file, header_decode_cache)
@classmethod
def from_npy(cls,
file: tp.IO[bytes],
header_decode_cache: HeaderDecodeCacheType,
memory_map: bool = False,
) -> tp.Tuple[np.ndarray, tp.Optional[mmap.mmap]]:
'''Read an NPY 1.0 file.
'''
if cls.MAGIC_PREFIX != file.read(cls.MAGIC_LEN):
raise ErrorNPYDecode('Invalid NPY header found.')
dtype, fortran_order, shape = cls._header_decode(file, header_decode_cache)
if dtype.kind == DTYPE_OBJECT_KIND:
raise ErrorNPYDecode('no support for object dtypes')
ndim = len(shape)
if ndim == 1:
size = shape[0]
elif ndim == 2:
size = shape[0] * shape[1]
else:
raise ErrorNPYDecode(f'No support for {ndim}-dimensional arrays')
if memory_map:
# offset calculations derived from numpy/core/memmap.py
offset_header = file.tell()
byte_count = offset_header + size * dtype.itemsize
            # ALLOCATIONGRANULARITY is 4096 on Linux; when offset_header is 64 (or anything less than 4096), offset_mmap will be zero
offset_mmap = offset_header - offset_header % mmap.ALLOCATIONGRANULARITY
byte_count -= offset_mmap
offset_array = offset_header - offset_mmap
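            # e.g., with offset_header == 64: offset_mmap == 0, byte_count still includes the header bytes, and offset_array == 64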
mm = mmap.mmap(file.fileno(),
byte_count,
access=mmap.ACCESS_READ,
offset=offset_mmap,
)
# will always be immutable
array = np.ndarray(shape,
dtype=dtype,
buffer=mm,
offset=offset_array,
order='F' if fortran_order else 'C',
)
# assert not array.flags.writeable
return array, mm
        # NOTE: we cannot use np.fromfile, as the file object from a Zip is not a normal file
# NOTE: np.frombuffer produces a read-only view on the existing data
# array = np.frombuffer(file.read(size * dtype.itemsize), dtype=dtype)
array = np.frombuffer(file.read(), dtype=dtype)
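        # Fortran-ordered data was written as the transpose in C order; reshape to the transposed shape and transpose back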
if fortran_order and ndim == 2:
array.shape = (shape[1], shape[0])
array = array.transpose()
else:
array.shape = shape
# assert not array.flags.writeable
return array, None
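# A minimal round-trip sketch of NPYConverter (illustrative only; the file path is hypothetical):
#     with open('/tmp/example.npy', 'wb') as f:
#         NPYConverter.to_npy(f, np.arange(6).reshape(2, 3))
#     with open('/tmp/example.npy', 'rb') as f:
#         array, _ = NPYConverter.from_npy(f, {})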
#-------------------------------------------------------------------------------
class Archive:
'''Abstraction of a read/write archive, such as a directory or a zip archive. Holds state over the life of writing / reading a Frame.
'''
FILE_META = '__meta__.json'
__slots__ = (
'_memory_map',
'_archive',
'_closable',
'_header_decode_cache',
)
_memory_map: bool
_header_decode_cache: HeaderDecodeCacheType
_archive: tp.Union[ZipFile, PathSpecifier]
# set per subclass
FUNC_REMOVE_FP: tp.Callable[[PathSpecifier], None]
def __init__(self,
fp: PathSpecifier,
writeable: bool,
memory_map: bool,
):
raise NotImplementedError() #pragma: no cover
def __del__(self) -> None:
pass
def __contains__(self, name: str) -> bool:
raise NotImplementedError() # pragma: no cover
def labels(self) -> tp.Iterator[str]:
raise NotImplementedError() # pragma: no cover
def write_array(self, name: str, array: np.ndarray) -> None:
raise NotImplementedError() #pragma: no cover
def read_array(self, name: str) -> np.ndarray:
raise NotImplementedError() #pragma: no cover
def read_array_header(self, name: str) -> HeaderType:
raise NotImplementedError() #pragma: no cover
def size_array(self, name: str) -> int:
raise NotImplementedError() #pragma: no cover
def write_metadata(self, content: tp.Any) -> None:
raise NotImplementedError() #pragma: no cover
def read_metadata(self) -> tp.Any:
raise NotImplementedError() #pragma: no cover
def size_metadata(self) -> int:
raise NotImplementedError() #pragma: no cover
def close(self) -> None:
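        # close any resources (such as memory maps) that subclasses registered in _closable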
for f in getattr(self, '_closable', ()):
f.close()
class ArchiveZip(Archive):
    '''An Archive based on a new ZipFile per Frame; the ZipFile is created in __init__.
'''
__slots__ = ()
_archive: ZipFile
FUNC_REMOVE_FP = os.remove
def __init__(self,
fp: PathSpecifier,
writeable: bool,
memory_map: bool,
):
mode = 'w' if writeable else 'r'
self._archive = ZipFile(fp, # pylint: disable=R1732
mode=mode,
compression=ZIP_STORED,
allowZip64=True,
)
if not writeable:
self._header_decode_cache = {}
if memory_map:
raise RuntimeError(f'Cannot memory_map with {self}')
self._memory_map = memory_map
def __del__(self) -> None:
# Note: If the fp we were given didn't exist, _archive also doesn't exist.
archive = getattr(self, '_archive', None)
if archive:
archive.close()
def __contains__(self, name: str) -> bool:
try:
self._archive.getinfo(name)
except KeyError:
return False
return True
def labels(self) -> tp.Iterator[str]:
yield from self._archive.namelist()
def write_array(self, name: str, array: np.ndarray) -> None:
# NOTE: zip only has 'w' mode, not 'wb'
# NOTE: force_zip64 required for large files
f = self._archive.open(name, 'w', force_zip64=True) # pylint: disable=R1732
try:
NPYConverter.to_npy(f, array)
finally:
f.close()
def read_array(self, name: str) -> np.ndarray:
f = self._archive.open(name) # pylint: disable=R1732
try:
array, _ = NPYConverter.from_npy(f, self._header_decode_cache)
finally:
f.close()
array.flags.writeable = False
return array
def read_array_header(self, name: str) -> HeaderType:
'''Alternate reader for status displays.
'''
f = self._archive.open(name) # pylint: disable=R1732
try:
header = NPYConverter.header_from_npy(f, self._header_decode_cache)
finally:
f.close()
return header
def size_array(self, name: str) -> int:
return self._archive.getinfo(name).file_size
def write_metadata(self, content: tp.Any) -> None:
# writestr is a method on the ZipFile
self._archive.writestr(
self.FILE_META,
json.dumps(content),
)
def read_metadata(self) -> tp.Any:
return json.loads(self._archive.read(self.FILE_META))
def size_metadata(self) -> int:
return self._archive.getinfo(self.FILE_META).file_size
class ArchiveDirectory(Archive):
    '''Archive interface to a directory, where the directory is created on write and NPY files are written into the file system.
'''
__slots__ = ()
_archive: PathSpecifier
FUNC_REMOVE_FP = shutil.rmtree
def __init__(self,
fp: PathSpecifier,
writeable: bool,
memory_map: bool,
):
if writeable:
            # because an error in writing will remove the entire directory, we require the directory to be newly created
            if os.path.exists(fp):
                raise RuntimeError(f'Attempting to write to an existing directory: {fp}')
os.mkdir(fp)
else:
if not os.path.exists(fp):
                raise RuntimeError(f'Attempting to read from a non-existent directory: {fp}')
if not os.path.isdir(fp):
raise RuntimeError(f'A directory must be provided, not {fp}')
self._header_decode_cache = {}
self._archive = fp
self._memory_map = memory_map
def labels(self) -> tp.Iterator[str]:
# NOTE: should this filter?
yield from (f.name for f in os.scandir(self._archive)) #type: ignore
def __contains__(self, name: str) -> bool:
fp = os.path.join(self._archive, name)
return os.path.exists(fp)
def write_array(self, name: str, array: np.ndarray) -> None:
fp = os.path.join(self._archive, name)
f = open(fp, 'wb') # pylint: disable=R1732
try:
NPYConverter.to_npy(f, array)
finally:
f.close()
def read_array(self, name: str) -> np.ndarray:
fp = os.path.join(self._archive, name)
if self._memory_map:
if not hasattr(self, '_closable'):
self._closable = []
f = open(fp, 'rb') # pylint: disable=R1732
try:
array, mm = NPYConverter.from_npy(f,
self._header_decode_cache,
self._memory_map,
)
finally:
f.close() # NOTE: can close the file after creating memory map
# self._closable.append(f)
self._closable.append(mm)
return array
f = open(fp, 'rb') # pylint: disable=R1732
try:
array, _ = NPYConverter.from_npy(f,
self._header_decode_cache,
self._memory_map,
)
finally:
f.close()
return array
def read_array_header(self, name: str) -> HeaderType:
'''Alternate reader for status displays.
'''
fp = os.path.join(self._archive, name)
f = open(fp, 'rb') # pylint: disable=R1732
try:
header = NPYConverter.header_from_npy(f, self._header_decode_cache)
finally:
f.close()
return header
def size_array(self, name: str) -> int:
fp = os.path.join(self._archive, name)
return os.path.getsize(fp)
def write_metadata(self, content: tp.Any) -> None:
fp = os.path.join(self._archive, self.FILE_META)
f = open(fp, 'w', encoding='utf-8') # pylint: disable=R1732
try:
f.write(json.dumps(content))
finally:
f.close()
def read_metadata(self) -> tp.Any:
fp = os.path.join(self._archive, self.FILE_META)
f = open(fp, 'r', encoding='utf-8') # pylint: disable=R1732
try:
post = json.loads(f.read())
finally:
f.close()
return post
def size_metadata(self) -> int:
fp = os.path.join(self._archive, self.FILE_META)
return os.path.getsize(fp)
class ArchiveZipWrapper(Archive):
'''Archive based on a shared (and already open/created) ZipFile.
'''
__slots__ = ('prefix', '_delimiter')
_archive: ZipFile
def __init__(self,
zf: ZipFile,
writeable: bool,
memory_map: bool,
delimiter: str,
):
self._archive = zf
self.prefix = '' # must be directly set by clients
self._delimiter = delimiter
if not writeable:
self._header_decode_cache = {}
if memory_map:
raise RuntimeError(f'Cannot memory_map with {self}')
self._memory_map = memory_map
def labels(self) -> tp.Iterator[str]:
        '''Only return unique outer-directory labels, not all contents (NPY files) in the archive. These labels are exclusively strings (they are added in post-processing with label_encoding).
'''
        dir_last = '' # a directory name cannot be an empty string
for name in self._archive.namelist():
# split on the last observed separator
if name.endswith(self._delimiter):
continue #pragma: no cover
dir_current, _ = name.rsplit(self._delimiter, maxsplit=1)
if dir_current != dir_last:
dir_last = dir_current
yield dir_current
def __del__(self) -> None:
# let the creator of the zip perform any cleanup
pass
def __contains__(self, name: str) -> bool:
name = f'{self.prefix}{self._delimiter}{name}'
try:
self._archive.getinfo(name)
except KeyError:
return False
return True
def write_array(self, name: str, array: np.ndarray) -> None:
# NOTE: zip only has 'w' mode, not 'wb'
# NOTE: force_zip64 required for large files
name = f'{self.prefix}{self._delimiter}{name}'
f = self._archive.open(name, 'w', force_zip64=True)
try:
NPYConverter.to_npy(f, array)
finally:
f.close()
def read_array(self, name: str) -> np.ndarray:
name = f'{self.prefix}{self._delimiter}{name}'
f = self._archive.open(name)
try:
array, _ = NPYConverter.from_npy(f, self._header_decode_cache)
finally:
f.close()
array.flags.writeable = False
return array
def read_array_header(self, name: str) -> HeaderType:
'''Alternate reader for status displays.
'''
name = f'{self.prefix}{self._delimiter}{name}'
f = self._archive.open(name)
try:
header = NPYConverter.header_from_npy(f, self._header_decode_cache)
finally:
f.close()
return header
def size_array(self, name: str) -> int:
name = f'{self.prefix}{self._delimiter}{name}'
return self._archive.getinfo(name).file_size
def write_metadata(self, content: tp.Any) -> None:
name = f'{self.prefix}{self._delimiter}{self.FILE_META}'
self._archive.writestr(name,
json.dumps(content),
)
def read_metadata(self) -> tp.Any:
name = f'{self.prefix}{self._delimiter}{self.FILE_META}'
return json.loads(self._archive.read(name))
def size_metadata(self) -> int:
name = f'{self.prefix}{self._delimiter}{self.FILE_META}'
return self._archive.getinfo(name).file_size
#-------------------------------------------------------------------------------
class Label:
KEY_NAMES = '__names__'
KEY_TYPES = '__types__'
KEY_DEPTHS = '__depths__'
KEY_TYPES_INDEX = '__types_index__'
KEY_TYPES_COLUMNS = '__types_columns__'
FILE_TEMPLATE_VALUES_INDEX = '__values_index_{}__.npy'
FILE_TEMPLATE_VALUES_COLUMNS = '__values_columns_{}__.npy'
FILE_TEMPLATE_BLOCKS = '__blocks_{}__.npy'
class ArchiveIndexConverter:
'''Utility methods for converting Index or index components.
'''
@staticmethod
def index_encode(
*,
metadata: tp.Dict[str, tp.Hashable],
archive: Archive,
index: 'IndexBase',
key_template_values: str,
key_types: str,
depth: int,
include: bool,
) -> None:
'''
Args:
metadata: mutates in place with json components for class names of index types.
'''
        if depth == 1 and index._map is None: # type: ignore
            pass # an auto-generated integer index (loc_is_iloc) does not need to be stored
elif include:
if depth == 1:
archive.write_array(key_template_values.format(0), index.values)
else:
for i in range(depth):
archive.write_array(key_template_values.format(i), index.values_at_depth(i))
metadata[key_types] = [cls.__name__ for cls in index.index_types.values] # type: ignore
@staticmethod
def array_encode(
*,
metadata: tp.Dict[str, tp.Hashable],
archive: Archive,
array: tp.Union[np.ndarray, tp.Iterable[tp.Hashable]],
key_template_values: str,
) -> None:
'''
Args:
metadata: mutates in place with json components
'''
assert array.ndim == 1 # type: ignore
archive.write_array(key_template_values.format(0), array)
@staticmethod
def index_decode(*,
archive: Archive,
metadata: tp.Dict[str, tp.Any],
key_template_values: str,
key_types: str,
depth: int,
cls_index: tp.Type['IndexBase'],
name: NameType,
) -> tp.Optional['IndexBase']:
'''Build index or columns.
'''
from static_frame.core.type_blocks import TypeBlocks
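        # if no values were stored, the index was either auto-generated or excluded on write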
if key_template_values.format(0) not in archive:
index = None
elif depth == 1:
index = cls_index(archive.read_array(key_template_values.format(0)),
name=name,
)
else:
index_tb = TypeBlocks.from_blocks(
archive.read_array(key_template_values.format(i))
for i in range(depth)
)
index_constructors = [ContainerMap.str_to_cls(name)
for name in metadata[key_types]]
index = cls_index._from_type_blocks(index_tb, # type: ignore
name=name,
index_constructors=index_constructors,
)
return index
class ArchiveFrameConverter:
_ARCHIVE_CLS: tp.Type[Archive]
@staticmethod
def frame_encode(*,
archive: Archive,
frame: 'Frame',
include_index: bool = True,
include_columns: bool = True,
consolidate_blocks: bool = False,
) -> None:
metadata: tp.Dict[str, tp.Any] = {}
# NOTE: isolate custom pre-json encoding only where needed: on `name` attributes; the name might be nested tuples, so we cannot assume that name is just a string
metadata[Label.KEY_NAMES] = [
JSONTranslator.encode_element(frame._name),
JSONTranslator.encode_element(frame._index._name),
JSONTranslator.encode_element(frame._columns._name),
]
# do not store Frame class as caller will determine
metadata[Label.KEY_TYPES] = [
frame._index.__class__.__name__,
frame._columns.__class__.__name__,
]
# store shape, index depths
depth_index = frame._index.depth
depth_columns = frame._columns.depth
if consolidate_blocks:
# NOTE: by taking iter, can avoid 2x memory in some circumstances
block_iter = frame._blocks._reblock()
else:
block_iter = iter(frame._blocks._blocks)
ArchiveIndexConverter.index_encode(
metadata=metadata,
archive=archive,
index=frame._index,
key_template_values=Label.FILE_TEMPLATE_VALUES_INDEX,
key_types=Label.KEY_TYPES_INDEX,
depth=depth_index,
include=include_index,
)
ArchiveIndexConverter.index_encode(
metadata=metadata,
archive=archive,
index=frame._columns,
key_template_values=Label.FILE_TEMPLATE_VALUES_COLUMNS,
key_types=Label.KEY_TYPES_COLUMNS,
depth=depth_columns,
include=include_columns,
)
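        # enumerate from 1 so that, after the loop, i equals the number of blocks written; i stays 0 if there are none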
i = 0
for i, array in enumerate(block_iter, 1):
archive.write_array(Label.FILE_TEMPLATE_BLOCKS.format(i-1), array)
metadata[Label.KEY_DEPTHS] = [
i, # block count
depth_index,
depth_columns]
archive.write_metadata(metadata)
@classmethod
def to_archive(cls,
*,
frame: 'Frame',
fp: PathSpecifier,
include_index: bool = True,
include_columns: bool = True,
consolidate_blocks: bool = False,
) -> None:
'''
        Write a :obj:`Frame` to an NPZ file or NPY directory, depending on the configured archive class.
'''
archive = cls._ARCHIVE_CLS(fp,
writeable=True,
memory_map=False,
)
try:
cls.frame_encode(
archive=archive,
frame=frame,
include_index=include_index,
include_columns=include_columns,
consolidate_blocks=consolidate_blocks,
)
except ErrorNPYEncode:
archive.close()
archive.__del__() # force cleanup
# fp can be BytesIO in a to_zip_npz scenario
if not isinstance(fp, BytesIO) and os.path.exists(fp): #type: ignore
cls._ARCHIVE_CLS.FUNC_REMOVE_FP(fp)
raise
@classmethod
def frame_decode(cls,
*,
archive: Archive,
constructor: tp.Type['Frame'],
) -> 'Frame':
'''
        Create a :obj:`Frame` from the contents of an archive.
'''
from static_frame.core.type_blocks import TypeBlocks
metadata = archive.read_metadata()
# NOTE: we isolate custom post-JSON decoding to only where it is needed: the name attributes. JSON will bring back tuple `name` attributes as lists; these must be converted to tuples to be hashable. Alternatives (like storing repr and using literal_eval) are slower than JSON.
names = metadata[Label.KEY_NAMES]
name = JSONTranslator.decode_element(names[0])
name_index = JSONTranslator.decode_element(names[1])
name_columns = JSONTranslator.decode_element(names[2])
block_count, depth_index, depth_columns = metadata[Label.KEY_DEPTHS]
cls_index, cls_columns = (ContainerMap.str_to_cls(name)
for name in metadata[Label.KEY_TYPES])
index = ArchiveIndexConverter.index_decode(
archive=archive,
metadata=metadata,
key_template_values=Label.FILE_TEMPLATE_VALUES_INDEX,
key_types=Label.KEY_TYPES_INDEX,
depth=depth_index,
cls_index=cls_index,
name=name_index,
)
# we need to align the mutability of the constructor with the Index type on the columns
if constructor.STATIC != cls_columns.STATIC:
if constructor.STATIC:
cls_columns = cls_columns._IMMUTABLE_CONSTRUCTOR #type: ignore
else:
cls_columns = cls_columns._MUTABLE_CONSTRUCTOR #type: ignore
columns = ArchiveIndexConverter.index_decode(
archive=archive,
metadata=metadata,
key_template_values=Label.FILE_TEMPLATE_VALUES_COLUMNS,
key_types=Label.KEY_TYPES_COLUMNS,
depth=depth_columns,
cls_index=cls_columns,
name=name_columns,
)
if block_count:
tb = TypeBlocks.from_blocks(
archive.read_array(Label.FILE_TEMPLATE_BLOCKS.format(i))
for i in range(block_count)
)
else:
tb = TypeBlocks.from_zero_size_shape()
        f = constructor(tb,
                own_data=True,
                index=index,
                own_index=index is not None,
                columns=columns,
                own_columns=columns is not None,
                name=name,
                )
return f
@classmethod
def from_archive(cls,
*,
constructor: tp.Type['Frame'],
fp: PathSpecifier,
) -> 'Frame':
'''
        Create a :obj:`Frame` from an NPZ file or NPY directory.
'''
archive = cls._ARCHIVE_CLS(fp,
writeable=False,
memory_map=False,
)
f = cls.frame_decode(
archive=archive,
constructor=constructor,
)
return f
@classmethod
def from_archive_mmap(cls,
*,
constructor: tp.Type['Frame'],
fp: PathSpecifier,
) -> tp.Tuple['Frame', tp.Callable[[], None]]:
'''
        Create a :obj:`Frame` from an archive using memory-mapped arrays; return the Frame and a callable that closes the memory maps.
'''
archive = cls._ARCHIVE_CLS(fp,
writeable=False,
memory_map=True,
)
f = cls.frame_decode(
archive=archive,
constructor=constructor,
)
return f, archive.close
class NPZFrameConverter(ArchiveFrameConverter):
_ARCHIVE_CLS = ArchiveZip
class NPYFrameConverter(ArchiveFrameConverter):
_ARCHIVE_CLS = ArchiveDirectory
#-------------------------------------------------------------------------------
# for converting from components, unstructured Frames
class ArchiveComponentsConverter(metaclass=InterfaceMeta):
'''
    A family of methods to write NPY/NPZ from components other than a Frame, or from multi-frame collections like a Bus/Yarn/Quilt, with the intention of producing a consolidated Frame, not just a zip of Frames.
'''
_ARCHIVE_CLS: tp.Type[Archive]
__slots__ = (
'_archive',
'_writeable',
)
def __init__(self, fp: PathSpecifier, mode: str = 'r') -> None:
if mode == 'w':
writeable = True
elif mode == 'r':
writeable = False
else:
raise RuntimeError('Invalid value for mode; use "w" or "r"')
self._writeable = writeable
self._archive = self._ARCHIVE_CLS(fp,
writeable=self._writeable,
memory_map=False,
)
def __enter__(self) -> 'ArchiveComponentsConverter':
'''When entering a context manager, a handle to this instance is returned.
'''
return self
def __exit__(self,
type: tp.Type[BaseException],
value: BaseException,
traceback: TracebackType,
) -> None:
'''When exiting a context manager, resources are closed as necessary.
'''
self._archive.close()
self._archive.__del__() # force closing resources
@property
def contents(self) -> 'Frame':
'''
        Return a :obj:`Frame` listing the name, size, dtype, Fortran-order flag, and shape of each Archive component.
'''
if self._writeable:
raise UnsupportedOperation('Open with mode "r" to get contents.')
from static_frame.core.frame import Frame
def gen() -> tp.Iterator[tp.Tuple[tp.Any, ...]]:
            # metadata is in labels; sort by extension first to put the JSON metadata at the top
for name in sorted(
self._archive.labels(),
key=lambda fn: tuple(reversed(fn.split('.')))
):
# NOTE: will not work with ArchiveZipWrapper
if name == self._archive.FILE_META:
yield (name, self._archive.size_metadata()) + ('', '', '')
else:
header = self._archive.read_array_header(name)
yield (name, self._archive.size_array(name)) + header
f = Frame.from_records(gen(),
columns=('name', 'size', 'dtype', 'fortran', 'shape'),
name=str(self._archive._archive),
)
return f.set_index('name', drop=True) #type: ignore
@property
def nbytes(self) -> int:
'''
        Return the number of bytes stored in this archive.
'''
if self._writeable:
raise UnsupportedOperation('Open with mode "r" to get nbytes.')
def gen() -> tp.Iterator[int]:
# metadata is in labels; sort by extension first to put at top
for name in self._archive.labels():
# NOTE: will not work with ArchiveZipWrapper
if name == self._archive.FILE_META:
yield self._archive.size_metadata()
else:
yield self._archive.size_array(name)
return sum(gen())
def from_arrays(self,
blocks: tp.Iterable[np.ndarray],
*,
index: tp.Optional[IndexInitializer] = None,
columns: tp.Optional[IndexInitializer] = None,
name: NameType = None,
axis: int = 0,
) -> None:
'''
        Given an iterable of arrays, write out an NPZ or NPY directly, without building up an intermediary :obj:`Frame`. If axis is 0, the arrays are vertically stacked; if axis is 1, they are horizontally stacked. For either axis, if included, indices must be of appropriate length.
Args:
blocks:
*,
index: An array, :obj:`Index`, or :obj:`IndexHierarchy`.
columns: An array, :obj:`Index`, or :obj:`IndexHierarchy`.
name:
axis:
'''
if not self._writeable:
raise UnsupportedOperation('Open with mode "w" to write.')
metadata: tp.Dict[str, tp.Any] = {}
if isinstance(index, IndexBase):
depth_index = index.depth
name_index = index.name
cls_index = index.__class__
ArchiveIndexConverter.index_encode(
metadata=metadata,
archive=self._archive,
index=index,
key_template_values=Label.FILE_TEMPLATE_VALUES_INDEX,
key_types=Label.KEY_TYPES_INDEX,
depth=depth_index,
include=True,
)
elif index is not None:
if index.__class__ is not np.ndarray:
raise RuntimeError('index argument must be an Index, IndexHierarchy, or 1D np.ndarray')
depth_index = 1
name_index = None
cls_index = dtype_to_index_cls(True, index.dtype) #type: ignore
ArchiveIndexConverter.array_encode(
metadata=metadata,
archive=self._archive,
array=index,
key_template_values=Label.FILE_TEMPLATE_VALUES_INDEX,
)
else:
depth_index = 1
name_index = None
cls_index = Index
if isinstance(columns, IndexBase):
depth_columns = columns.depth
name_columns = columns.name
cls_columns = columns.__class__
ArchiveIndexConverter.index_encode(
metadata=metadata,
archive=self._archive,
index=columns,
key_template_values=Label.FILE_TEMPLATE_VALUES_COLUMNS,
key_types=Label.KEY_TYPES_COLUMNS,
depth=depth_columns,
include=True,
)
elif columns is not None:
if columns.__class__ is not np.ndarray:
raise RuntimeError('columns argument must be an Index, IndexHierarchy, or 1D np.ndarray')
depth_columns = 1 # only support 1D
name_columns = None
cls_columns = dtype_to_index_cls(True, columns.dtype) #type: ignore
ArchiveIndexConverter.array_encode(
metadata=metadata,
archive=self._archive,
array=columns,
key_template_values=Label.FILE_TEMPLATE_VALUES_COLUMNS,
)
else:
depth_columns = 1 # only support 1D
name_columns = None
cls_columns = Index
metadata[Label.KEY_NAMES] = [name,
name_index,
name_columns,
]
# do not store Frame class as caller will determine
metadata[Label.KEY_TYPES] = [
cls_index.__name__,
cls_columns.__name__,
]
        if axis == 1:
            rows = 0
            i = -1 # ensure the block count (i + 1) is zero if blocks is empty
for i, array in enumerate(blocks):
if not rows:
rows = array.shape[0]
else:
if array.shape[0] != rows:
raise RuntimeError('incompatible block shapes')
self._archive.write_array(Label.FILE_TEMPLATE_BLOCKS.format(i), array)
elif axis == 0:
# for now, just vertically concat and write, though this has a 2X memory requirement
resolved = concat_resolved(blocks, axis=0)
            # if this results in an object array, an exception will be raised
self._archive.write_array(Label.FILE_TEMPLATE_BLOCKS.format(0), resolved)
            i = 0 # a single consolidated block was written, so the block count (i + 1) is 1
else:
raise AxisInvalid(f'invalid axis {axis}')
metadata[Label.KEY_DEPTHS] = [
i + 1, # block count
depth_index,
depth_columns]
self._archive.write_metadata(metadata)
def from_frames(self,
frames: tp.Iterable['Frame'],
*,
include_index: bool = True,
include_columns: bool = True,
axis: int = 0,
union: bool = True,
name: NameType = None,
fill_value: object = np.nan,
) -> None:
        '''Given an iterable of Frames, write out an NPZ or NPY directly, without building up an intermediary Frame. If axis is 0, the Frames must be block compatible; if axis is 1, the Frames must have the same number of rows. For either axis, if included, concatenated indices must be unique or aligned.
Args:
frames:
*
include_index:
include_columns:
axis:
union:
name:
fill_value:
'''
if not self._writeable:
raise UnsupportedOperation('Open with mode "w" to write.')
from static_frame.core.frame import Frame
from static_frame.core.type_blocks import TypeBlocks
frames = [f if isinstance(f, Frame) else f.to_frame(axis) for f in frames] # type: ignore
index: tp.Optional[IndexBase]
# NOTE: based on Frame.from_concat
if axis == 1: # stacks columns (extends rows horizontally)
if include_columns:
try:
columns = index_many_concat(
(f._columns for f in frames),
Index,
)
except ErrorInitIndexNonUnique:
                    raise RuntimeError('Column names after horizontal concatenation are not unique; set include_columns to False to ignore.') from None
else:
columns = None
if include_index:
index = index_many_to_one(
(f._index for f in frames),
Index,
many_to_one_type=ManyToOneType.UNION if union else ManyToOneType.INTERSECT,
)
else:
raise RuntimeError('Must include index for horizontal alignment.')
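            # each Frame is realigned to the shared index before its blocks are yielded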
def blocks() -> tp.Iterator[np.ndarray]:
for f in frames:
if len(f.index) != len(index) or (f.index != index).any():
f = f.reindex(index=index, fill_value=fill_value)
for block in f._blocks._blocks:
yield block
elif axis == 0: # stacks rows (extends columns vertically)
if include_index:
try:
index = index_many_concat((f._index for f in frames), Index)
except ErrorInitIndexNonUnique:
                    raise RuntimeError('Index names after vertical concatenation are not unique; set include_index to False to ignore.') from None
else:
index = None
if include_columns:
columns = index_many_to_one(
(f._columns for f in frames),
Index,
many_to_one_type=ManyToOneType.UNION if union else ManyToOneType.INTERSECT,
)
else:
raise RuntimeError('Must include columns for vertical alignment.')
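            # realign each Frame to the shared columns, then vertically stack compatible blocks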
def blocks() -> tp.Iterator[np.ndarray]:
type_blocks = []
previous_f: tp.Optional[Frame] = None
block_compatible = True
reblock_compatible = True
for f in frames:
if len(f.columns) != len(columns) or (f.columns != columns).any():
f = f.reindex(columns=columns, fill_value=fill_value)
type_blocks.append(f._blocks)
                # all Frames have the same number of columns at this point
if previous_f is not None: # after the first
if block_compatible:
block_compatible &= f._blocks.block_compatible(
previous_f._blocks,
axis=1) # only compare columns
if reblock_compatible:
reblock_compatible &= f._blocks.reblock_compatible(
previous_f._blocks)
previous_f = f
yield from TypeBlocks.vstack_blocks_to_blocks(
type_blocks=type_blocks,
block_compatible=block_compatible,
reblock_compatible=reblock_compatible,
)
else:
raise AxisInvalid(f'no support for {axis}')
self.from_arrays(
blocks=blocks(),
index=index,
columns=columns,
name=name,
axis=1, # blocks are normalized for horizontal concat
)
class NPZ(ArchiveComponentsConverter):
'''Utility object for reading characteristics from, or writing new, NPZ files from arrays or :obj:`Frame`.
'''
_ARCHIVE_CLS = ArchiveZip
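# A minimal write sketch with NPZ (illustrative only; the file path is hypothetical):
#     with NPZ('/tmp/example.npz', mode='w') as npz:
#         npz.from_arrays(
#                 blocks=(np.array([1, 2]), np.array([3.0, 4.0])),
#                 index=np.array(['a', 'b']),
#                 axis=1,
#                 )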
class NPY(ArchiveComponentsConverter):
'''Utility object for reading characteristics from, or writing new, NPY directories from arrays or :obj:`Frame`.
'''
_ARCHIVE_CLS = ArchiveDirectory
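# A minimal read sketch with NPY (illustrative only; the directory path is hypothetical):
#     with NPY('/tmp/example-npy', mode='r') as npy:
#         print(npy.contents) # Frame of name, size, dtype, fortran, shape per component
#         print(npy.nbytes)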