Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 178 additions & 18 deletions pyfive/h5d.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,191 @@
from io import UnsupportedOperation
from time import time

import os
import struct
import logging
from importlib.metadata import version
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)

# Location record for one stored chunk. filter_mask is forwarded to the
# filter pipeline when decoding, and byte_offset/size give the raw chunk's
# byte range in the file (used directly by the pread/cat_ranges readers).
# chunk_offset is presumably the chunk's offset in dataset coordinate
# space — TODO confirm against the index-building code.
StoreInfo = namedtuple("StoreInfo", "chunk_offset filter_mask byte_offset size")
# Address and dimensions of a chunk index structure; not referenced in the
# code visible here — NOTE(review): verify intended fields against callers.
ChunkIndex = namedtuple("ChunkIndex", "chunk_address chunk_dims")


class DatasetID:

class ChunkRead:
    """
    Mixin providing parallel and bulk chunk-reading strategies.

    ``DatasetID`` inherits from this class so that the hot path in
    ``_get_selection_via_chunks`` can dispatch to the best available I/O strategy:

    * **Case A - fsspec ``cat_ranges``**: a single bulk request issued to the
      filesystem; ideal for remote stores (S3, GCS, https, ...).
    * **Case B - ``os.pread`` thread pool**: parallel POSIX reads sharing a
      single file descriptor without seek contention.
    * **Case C - serial fallback**: safe for in-memory buffers and any
      custom file-like wrapper.
    """

    # ------------------------------------------------------------------ #
    # Shared helpers                                                     #
    # ------------------------------------------------------------------ #

    def set_parallelism(self, thread_count=0, cat_range_allowed=False):
        """
        Configure experimental chunk-read parallelism.

        Parameters
        ----------
        thread_count : int or None
            Controls POSIX threaded reads via ``os.pread``:
            ``0`` (or ``None``) disables threaded reads; ``>0`` enables
            threaded reads with that many workers.
        cat_range_allowed : bool
            Enables fsspec bulk reads via ``cat_ranges`` for compatible
            non-posix file handles.

        Raises
        ------
        ValueError
            If ``thread_count`` is negative.

        Notes
        -----
        This is a ``pyfive`` API extension, and is opt-in by default as it
        may not be suitable for all use cases.  It is recommended to enable
        it when working with remote files, but it may not be suitable for
        local files.
        """
        # None is treated as "disabled", same as 0.
        if thread_count is None:
            thread_count = 0
        thread_count = int(thread_count)
        if thread_count < 0:
            raise ValueError("thread_count must be >= 0")

        self._thread_count = thread_count
        self._cat_range_allowed = bool(cat_range_allowed)

    def _get_required_chunks(self, indexer):
        """
        Walk *indexer* and return a list of
        ``(chunk_coords, chunk_selection, out_selection, storeinfo)``
        tuples for every chunk needed to satisfy the selection.
        """
        result = []
        for chunk_coords, chunk_selection, out_selection in indexer:
            # Map from chunk coordinate space to array space, which is how
            # hdf5 keys the chunk index.
            chunk_coords = tuple(map(mul, chunk_coords, self.chunks))
            storeinfo = self._index[chunk_coords]
            result.append((chunk_coords, chunk_selection, out_selection, storeinfo))
        return result

    def _decode_chunk(self, chunk_buffer, filter_mask, dtype):
        """
        Apply the filter pipeline (if any) and return a shaped ndarray.
        """
        if self.filter_pipeline is not None:
            # We are only using the class method here; future filter
            # pipelines may need their own function.
            chunk_buffer = BTreeV1RawDataChunks._filter_chunk(
                chunk_buffer,
                filter_mask,
                self.filter_pipeline,
                self.dtype.itemsize,
            )
        return np.frombuffer(chunk_buffer, dtype=dtype).reshape(
            self.chunks, order=self._order
        )

    def _select_chunks(self, indexer, out, dtype):
        """
        Collect required chunks and dispatch I/O to the best strategy.
        Called by ``_get_selection_via_chunks`` in place of the serial loop.
        """
        chunks = self._get_required_chunks(indexer)
        if not chunks:
            return

        # Case A: fsspec - bulk parallel fetch via cat_ranges
        if not self.posix and self._cat_range_allowed:
            fh = self._fh
            if hasattr(fh, "fs") and hasattr(fh.fs, "cat_ranges"):
                logger.debug("[pyfive] chunk read strategy: fsspec_cat_ranges")
                self._read_bulk_fsspec(fh, chunks, out, dtype)
                return

        # Case B: POSIX - thread-parallel reads via os.pread.
        # set_parallelism validates thread_count >= 0, so "> 0" is the
        # explicit form of the enabled test.
        if self.posix and hasattr(os, "pread") and self._thread_count > 0:
            logger.debug(
                "[pyfive] chunk read strategy: posix_pread_threads workers=%s",
                self._thread_count,
            )
            self._read_parallel_threads(chunks, out, dtype)
            return

        # Case C: serial fallback
        logger.debug("[pyfive] chunk read strategy: serial")
        self._read_serial(chunks, out, dtype)

    # ------------------------------------------------------------------ #
    # Strategy implementations                                           #
    # ------------------------------------------------------------------ #

    def _read_serial(self, chunks, out, dtype):
        """
        Read one chunk at a time (safe for any file-like object).
        """
        fh = self._fh
        for _coords, chunk_sel, out_sel, storeinfo in chunks:
            fh.seek(storeinfo.byte_offset)
            chunk_buffer = fh.read(storeinfo.size)
            out[out_sel] = self._decode_chunk(
                chunk_buffer, storeinfo.filter_mask, dtype
            )[chunk_sel]
        if self.posix:
            # NOTE(review): this closes self._fh after a posix serial read.
            # Confirm self._fh is a per-read handle in the posix case and
            # not a shared handle that later reads still need.
            fh.close()

    def _read_parallel_threads(self, chunks, out, dtype):
        """
        Thread-parallel read via ``os.pread``.

        ``os.pread`` does not advance the file-position pointer, so all
        worker threads share a single open file descriptor safely.
        """

        def _read_one(item):
            # Returns everything needed to place the decoded chunk later;
            # fd is bound by closure before the pool starts.
            _coords, chunk_sel, out_sel, storeinfo = item
            return (
                chunk_sel,
                out_sel,
                storeinfo.filter_mask,
                os.pread(fd, storeinfo.size, storeinfo.byte_offset),
            )

        # Context managers guarantee the descriptor and the pool are
        # released even if a worker raises (idiomatic form of the original
        # try/finally around the executor).
        with open(self._filename, "rb") as fh:
            fd = fh.fileno()
            with ThreadPoolExecutor(max_workers=self._thread_count) as executor:
                results = list(executor.map(_read_one, chunks))

        # Decode outside the I/O section: the buffers are plain bytes, so
        # the file can be closed before filtering/reshaping.
        for chunk_sel, out_sel, filter_mask, chunk_buffer in results:
            out[out_sel] = self._decode_chunk(chunk_buffer, filter_mask, dtype)[
                chunk_sel
            ]

    def _read_bulk_fsspec(self, fh, chunks, out, dtype):
        """
        Bulk read via ``fsspec`` ``cat_ranges``.

        Issues a single pipelined request for all required byte-ranges,
        which on object stores typically translates to a small number of
        HTTP range requests rather than one round-trip per chunk.
        """
        path = fh.path
        starts = [si.byte_offset for _, _, _, si in chunks]
        stops = [si.byte_offset + si.size for _, _, _, si in chunks]
        # cat_ranges returns buffers in the same order as the requests, so
        # a positional zip with `chunks` is safe.
        buffers = fh.fs.cat_ranges([path] * len(chunks), starts, stops)

        for (_coords, chunk_sel, out_sel, storeinfo), chunk_buffer in zip(
            chunks, buffers
        ):
            out[out_sel] = self._decode_chunk(
                chunk_buffer, storeinfo.filter_mask, dtype
            )[chunk_sel]


class DatasetID(ChunkRead):
"""
Implements an "HDF5 dataset identifier", which despite the name, actually
represents the data of a dataset in a file, and not an identifier. It includes all
Expand Down Expand Up @@ -100,6 +274,8 @@ def __init__(
self.shape = dataobject.shape
self.rank = len(self.shape)
self.chunks = dataobject.chunks
# Experimental chunk-read settings are opt-in by default.
self.set_parallelism()

# experimental code. We need to find out whether or not this
# is unnecessary duplication. At the moment it seems best for
Expand Down Expand Up @@ -679,23 +855,7 @@ def _get_selection_via_chunks(self, args):
fh.close()

else:
for chunk_coords, chunk_selection, out_selection in indexer:
# map from chunk coordinate space to array space which
# is how hdf5 keeps the index
chunk_coords = tuple(map(mul, chunk_coords, self.chunks))
filter_mask, chunk_buffer = self.read_direct_chunk(chunk_coords)
if self.filter_pipeline is not None:
# we are only using the class method here, future
# filter pipelines may need their own function
chunk_buffer = BTreeV1RawDataChunks._filter_chunk(
chunk_buffer,
filter_mask,
self.filter_pipeline,
self.dtype.itemsize,
)
chunk_data = np.frombuffer(chunk_buffer, dtype=dtype)
chunk_data = chunk_data.reshape(self.chunks, order=self._order)
out[out_selection] = chunk_data[chunk_selection]
self._select_chunks(indexer, out, dtype)

if isinstance(self._ptype, P5ReferenceType):
to_reference = np.vectorize(Reference)
Expand Down
4 changes: 2 additions & 2 deletions pyfive/high_level.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,12 +281,12 @@ def __init__(
# Already wrapped
self._fh = fh
elif type(fh).__name__ == "S3File" or hasattr(fh, "fs"):
# S3 file handle - wrap with buffering
# fsspec file handle - wrap with buffering
# We check for the S3File type by name to avoid a hard dependency on s3fs,
# but also check for an 'fs' attribute which is common in s3fs file-like objects.
# This may yet be too broad, but it is unlikely to cause issues for non-S3 files.
logger.info(
"[pyfive] Detected S3 file, enabling metadata buffering (%d MB)",
"[pyfive] Detected remote file, enabling metadata buffering (%d MB)",
metadata_buffer_size,
)
self._fh = MetadataBufferingWrapper(fh, buffer_size=metadata_buffer_size)
Expand Down
Loading