from __future__ import annotations

import codecs
import gzip
import os
import re
import tempfile
from io import BytesIO
from io import StringIO
from pathlib import Path
from urllib import request
from urllib.parse import quote
from urllib.parse import urlparse
from urllib.parse import urlunparse
from zipfile import ZipFile

import typing_extensions as tp

from static_frame.core.doc_str import doc_inject

if tp.TYPE_CHECKING:
    from types import TracebackType # pragma: no cover

class StringIOTemporaryFile(StringIO):
    '''Subclass of a StringIO that reads from a managed file that is deleted when this instance goes out of scope.

    The reading methods defined here delegate to the text file opened in ``__init__``; the inherited in-memory buffer is initialized empty and is not used by them.
    '''

    def __init__(self, fp: Path, encoding: str) -> None:
        '''
        Args:
            fp: Path of the file to read; it is removed when this instance is finalized.
            encoding: Text encoding used to open ``fp``.
        '''
        self._fp = fp
        self._file = open(fp, 'r', encoding=encoding)
        super().__init__()

    def __del__(self) -> None:
        '''Close the managed file, remove it from disk, and finalize the base class.

        Tolerates a partially constructed instance (``__init__`` raised before the attributes were set) and a file that has already been removed, so that running the finalizer more than once is harmless.
        '''
        try:
            file = getattr(self, '_file', None)
            if file is not None:
                file.close()
            fp = getattr(self, '_fp', None)
            if fp is not None:
                try:
                    os.unlink(fp)
                except FileNotFoundError:
                    # already removed: nothing left to clean up
                    pass
        finally:
            # always give the base class its chance to finalize
            super().__del__()

    def seek(self, offset: int, whence: int = 0) -> int:
        '''Move the position of the managed file and return the new position.
        '''
        return self._file.seek(offset, whence)

    def read(self, size: tp.Optional[int] = -1) -> str:
        '''Read and return up to ``size`` characters from the managed file; a negative or None ``size`` reads to the end.
        '''
        return self._file.read(size)

    def readline(self, size: int = -1) -> str: # type: ignore
        '''Read and return one line from the managed file.
        '''
        return self._file.readline(size)

    def __iter__(self) -> tp.Iterator[str]: # type: ignore
        '''Iterate over the lines of the managed file.
        '''
        return self._file.__iter__()

class BytesIOTemporaryFile(BytesIO):
    '''Subclass of a BytesIO that reads from a managed file that is deleted when this instance goes out of scope.

    The reading methods defined here delegate to the binary file opened in ``__init__``; the inherited in-memory buffer is initialized empty and is not used by them.
    '''

    def __init__(self, fp: Path) -> None:
        '''
        Args:
            fp: Path of the file to read; it is removed when this instance is finalized.
        '''
        self._fp = fp
        self._file = open(fp, 'rb')
        super().__init__()

    def __del__(self) -> None:
        '''Close the managed file, remove it from disk, and finalize the base class.

        Tolerates a partially constructed instance (``__init__`` raised before the attributes were set) and a file that has already been removed, so that running the finalizer more than once is harmless.
        '''
        try:
            file = getattr(self, '_file', None)
            if file is not None:
                file.close()
            fp = getattr(self, '_fp', None)
            if fp is not None:
                try:
                    os.unlink(fp)
                except FileNotFoundError:
                    # already removed: nothing left to clean up
                    pass
        finally:
            # always give the base class its chance to finalize
            super().__del__()

    def seek(self, offset: int, whence: int = 0) -> int:
        '''Move the position of the managed file and return the new position.
        '''
        return self._file.seek(offset, whence)

    def read(self, size: tp.Optional[int] = -1) -> bytes:
        '''Read and return up to ``size`` bytes from the managed file; a negative or None ``size`` reads to the end.
        '''
        return self._file.read(size)

    def readline(self, size: tp.Optional[int] = -1) -> bytes:
        '''Read and return one line from the managed file.
        '''
        return self._file.readline(size)

    def __iter__(self) -> tp.Iterator[bytes]:
        '''Iterate over the lines of the managed file.
        '''
        return self._file.__iter__()

#-------------------------------------------------------------------------------
class MaybeTemporaryFile:
    '''Single context manager over a file handle: the file at `fp` when an `fp` is supplied, otherwise a freshly created named temporary file.
    '''

    def __init__(self, fp: tp.Optional[Path], mode: str, encoding: str):
        '''
        Args:
            fp: Path to open; when falsy, a named temporary file is created instead.
            mode: Mode string forwarded to the underlying file constructor.
            encoding: Encoding forwarded to the underlying file constructor.
        '''
        handle: tp.IO[tp.Any]
        if not fp:
            # `delete=False`: the temporary file stays on disk after it is closed
            handle = tempfile.NamedTemporaryFile(
                    mode=mode,
                    suffix=None,
                    delete=False,
                    encoding=encoding,
                    )
        else:
            handle = open(fp, mode=mode, encoding=encoding)
        self._f = handle

    def __enter__(self) -> tp.IO[tp.Any]:
        '''Enter the wrapped file's context and return whatever it yields.
        '''
        entered = self._f.__enter__()
        return entered

    def __exit__(self,
            type: tp.Type[BaseException],
            value: BaseException,
            traceback: TracebackType,
            ) -> None:
        '''Forward the exit to the wrapped file; its return value is discarded, so exceptions are never suppressed here.
        '''
        self._f.__exit__(type, value, traceback)

#-------------------------------------------------------------------------------
# Return type shared by the `WWW` download methods: a `Path` to a written file, or a `StringIO` / `BytesIO` holding the downloaded content.
WWWReturnType = tp.Union[Path, StringIO, BytesIO]
class WWW:
    '''Utilities for downloading resources from the world-wide-web.
    '''
    __slots__ = ()

    @staticmethod
    def _url_prepare(url: str) -> str:
        '''Remove leading and trailing white space and percent-quote the path component so that characters such as spaces are handled. Percent-escapes already present in the path are preserved rather than quoted a second time. This does not handle spaces in queries.
        '''
        url_parts = urlparse(url.strip())
        # Keep '%' safe so an already-quoted path is not double-quoted ('%20' must not become '%2520').
        path = quote(url_parts.path, safe='/%')
        # A '%' that does not start a two-digit hex escape is a literal percent sign and still needs quoting.
        path = re.sub(r'%(?![0-9A-Fa-f]{2})', '%25', path)
        return urlunparse(url_parts._replace(path=path))

    @classmethod
    def _download_archive(cls,
            url: tp.Union[str, request.Request],
            in_memory: bool,
            buffer_size: int,
            extension: str,
            ) -> tp.Union[Path, BytesIO]:
        '''Download the resource at ``url``.

        Returns:
            A ``BytesIO`` with the full payload if ``in_memory``; otherwise the ``Path`` of a new temporary file (suffix ``extension``) written in ``buffer_size`` chunks. The temporary file is not deleted here; removing it is left to the caller.
        '''
        archive: tp.Union[Path, BytesIO]
        if isinstance(url, str):
            url = cls._url_prepare(url)

        with request.urlopen(url) as response:
            if in_memory:
                archive = BytesIO(response.read())
            else:
                with tempfile.NamedTemporaryFile(mode='wb',
                        suffix=extension,
                        delete=False,
                        ) as f:
                    archive = Path(f.name)
                    # an empty read marks the end of the response
                    for chunk in iter(lambda: response.read(buffer_size), b''):
                        f.write(chunk)
        return archive

    @staticmethod
    def _resolve_fp_and_in_memory(
            in_memory: tp.Optional[bool],
            fp: tp.Optional[tp.Union[Path, str]] = None,
            ) -> tp.Tuple[bool, tp.Optional[Path]]:
        '''
        If an fp is given and in_memory is True, error; else, in_memory is set to False; if an fp is not given and in_memory is None, default to True, else use in_memory.

        Returns:
            The resolved ``in_memory`` flag and ``fp`` (a ``str`` is converted to a ``Path``).

        Raises:
            RuntimeError: if an ``fp`` is supplied together with ``in_memory=True``.
        '''
        if fp is not None:
            if in_memory is True:
                raise RuntimeError('If supplying an `fp`, `in_memory` cannot be True.')
            in_memory = False
            if isinstance(fp, str): # just to pass Path
                fp = Path(fp)
        else:
            in_memory = True if in_memory is None else in_memory
        return in_memory, fp

    @staticmethod
    def _write_maybe_temporary(
            fp: tp.Optional[Path],
            encoding: tp.Optional[str],
            extractor: tp.Callable[[], tp.Union[str, bytes]]
            ) -> WWWReturnType:
        '''Write everything ``extractor`` produces to ``fp``, or to a temporary file if no ``fp`` is given.

        Args:
            fp: Destination path, or None to write to a temporary file.
            encoding: If truthy, the file is written in text mode with this encoding; otherwise in binary mode.
            extractor: Called repeatedly for the next piece of data; an empty (falsy) result ends the writing.

        Returns:
            The written ``Path`` if ``fp`` was given; otherwise a ``StringIOTemporaryFile`` (text) or ``BytesIOTemporaryFile`` (binary) reading from the temporary file.
        '''
        # A binary-mode open rejects every non-None encoding, so map any falsy value (such as '') to None.
        encoding = encoding or None

        with MaybeTemporaryFile(fp=fp,
                mode='w' if encoding else 'wb',
                encoding=encoding,
                ) as f:
            fp_written = Path(f.name)
            # Stop on any falsy piece: the empty value is '' or b'' depending on the extractor.
            while True:
                b = extractor()
                if not b:
                    break
                f.write(b)

        if fp:
            return fp_written
        if encoding:
            return StringIOTemporaryFile(fp_written, encoding=encoding)
        return BytesIOTemporaryFile(fp_written)

    #---------------------------------------------------------------------------
    @classmethod
    @doc_inject(selector='www')
    def from_file(cls,
            url: tp.Union[str, request.Request],
            *,
            encoding: tp.Optional[str] = 'utf-8',
            in_memory: tp.Optional[bool] = None,
            buffer_size: int = 8192,
            fp: tp.Optional[tp.Union[Path, str]] = None,
            ) -> WWWReturnType:
        '''
        {doc}

        Args:
            {url}
            {encoding}
            {in_memory}
            {buffer_size}
            {fp}
        '''
        in_memory, fp = cls._resolve_fp_and_in_memory(in_memory, fp)
        # prepare string URLs the same way the archive downloads do
        if isinstance(url, str):
            url = cls._url_prepare(url)

        with request.urlopen(url) as response:
            if in_memory:
                if encoding:
                    return StringIO(response.read().decode(encoding))
                return BytesIO(response.read())

            if encoding:
                # Decode incrementally: a multi-byte character may be split across two chunks, which decoding each chunk on its own cannot handle.
                decoder = codecs.getincrementaldecoder(encoding)()

                def extractor() -> tp.Union[str, bytes]:
                    while True:
                        chunk = response.read(buffer_size)
                        if not chunk:
                            # End of stream: flush the decoder; this raises if the data ends inside a character.
                            return decoder.decode(b'', final=True)
                        text = decoder.decode(chunk)
                        if text:
                            return text
                        # The chunk held only part of a character: keep reading so that an empty result is not mistaken for the end.
            else:
                def extractor() -> tp.Union[str, bytes]:
                    return response.read(buffer_size)

            return cls._write_maybe_temporary(
                    fp=fp,
                    encoding=encoding,
                    extractor=extractor,
                    )

    @classmethod
    @doc_inject(selector='www')
    def from_zip(cls,
            url: tp.Union[str, request.Request],
            *,
            encoding: tp.Optional[str] = 'utf-8',
            in_memory: tp.Optional[bool] = None,
            buffer_size: int = 8192,
            fp: tp.Optional[tp.Union[Path, str]] = None,
            component: tp.Optional[str] = None,
            ) -> WWWReturnType:
        '''
        {doc}

        Args:
            {url}
            {encoding}
            {in_memory}
            {buffer_size}
            {fp}
            {component}
        '''
        in_memory, fp = cls._resolve_fp_and_in_memory(in_memory, fp)
        archive = cls._download_archive(url=url,
                in_memory=in_memory,
                buffer_size=buffer_size,
                extension='.zip',
                )

        try:
            with ZipFile(archive) as zf:
                names = zf.namelist()
                if component:
                    name = component
                else:
                    if len(names) > 1:
                        samples = ', '.join(names[:20])
                        # only signal truncation when names were actually left out
                        etc = '...' if len(names) > 20 else ''
                        raise RuntimeError(f'More than one file found in zip archive ({samples}{etc}); name a single file with the `component` argument.')
                    name = names.pop()
                data = zf.read(name)
        finally:
            if not in_memory:
                # The archive is a temporary file on disk: delete it whether or not extraction succeeded.
                os.unlink(archive) # type: ignore

        data_io: tp.Union[StringIO, BytesIO]
        if encoding:
            data_io = StringIO(data.decode(encoding))
        else:
            data_io = BytesIO(data)

        if in_memory:
            return data_io

        # not in-memory, write a file
        return cls._write_maybe_temporary(
                fp=fp,
                encoding=encoding,
                extractor=data_io.read,
                )

    @classmethod
    @doc_inject(selector='www')
    def from_gzip(cls,
            url: tp.Union[str, request.Request],
            *,
            encoding: tp.Optional[str] = 'utf-8',
            in_memory: tp.Optional[bool] = None,
            buffer_size: int = 8192,
            fp: tp.Optional[tp.Union[Path, str]] = None,
            ) -> WWWReturnType:
        '''
        {doc}

        Args:
            {url}
            {encoding}
            {in_memory}
            {buffer_size}
            {fp}
        '''
        in_memory, fp = cls._resolve_fp_and_in_memory(in_memory, fp)
        archive = cls._download_archive(url=url,
                in_memory=in_memory,
                buffer_size=buffer_size,
                extension='.gzip',
                )

        try:
            with gzip.open(archive) as gz:
                data = gz.read()
        finally:
            if not in_memory:
                # The archive is a temporary file on disk: delete it whether or not extraction succeeded.
                os.unlink(archive) # type: ignore

        data_io: tp.Union[StringIO, BytesIO]
        if encoding:
            data_io = StringIO(data.decode(encoding))
        else:
            data_io = BytesIO(data)

        if in_memory:
            return data_io

        # not in-memory, write a file
        return cls._write_maybe_temporary(
                fp=fp,
                encoding=encoding,
                extractor=data_io.read,
                )