diff --git a/pyfive/h5d.py b/pyfive/h5d.py index 2d70b3a..b19c8c3 100644 --- a/pyfive/h5d.py +++ b/pyfive/h5d.py @@ -14,9 +14,11 @@ from io import UnsupportedOperation from time import time +import os import struct import logging from importlib.metadata import version +from concurrent.futures import ThreadPoolExecutor logger = logging.getLogger(__name__) @@ -24,7 +26,179 @@ ChunkIndex = namedtuple("ChunkIndex", "chunk_address chunk_dims") -class DatasetID: + +class ChunkRead: + """ + Mixin providing parallel and bulk chunk-reading strategies. + + ``DatasetID`` inherits from this class so that the hot path in + ``_get_selection_via_chunks`` can dispatch to the best available I/O strategy: + + * **Case A - fsspec ``cat_ranges``**: a single bulk request issued to the + filesystem; ideal for remote stores (S3, GCS, https, …). + * **Case B - ``os.pread`` thread pool**: parallel POSIX reads sharing a + single file descriptor without seek contention. + * **Case C - serial fallback**: safe for in-memory buffers and any + custom file-like wrapper. + """ + + # ------------------------------------------------------------------ # + # Shared helpers # + # ------------------------------------------------------------------ # + + def set_parallelism(self, thread_count=0, cat_range_allowed=False): + """ + Configure experimental chunk-read parallelism. + + ``thread_count`` controls POSIX threaded reads via ``os.pread``: + - ``0`` disables threaded reads + - ``>0`` enables threaded reads with that many workers + + ``cat_range_allowed`` enables fsspec bulk reads via ``cat_ranges`` + for compatible non-posix file handles. + + This is a ``pyfive`` API extension, and is opt-in by default as it may not be suitable for all use cases. + It is recommended to enable it when working with remote files, but it may not be suitable for local files. + + """ + if thread_count is None: + thread_count = 0 + thread_count = int(thread_count) + if thread_count < 0: + raise ValueError("thread_count must be >= 0") + + self._thread_count = thread_count + self._cat_range_allowed = bool(cat_range_allowed) + + + def _get_required_chunks(self, indexer): + """ + Walk *indexer* and return a list of + ``(chunk_coords, chunk_selection, out_selection, storeinfo)`` + tuples for every chunk needed to satisfy the selection. + """ + result = [] + for chunk_coords, chunk_selection, out_selection in indexer: + chunk_coords = tuple(map(mul, chunk_coords, self.chunks)) + storeinfo = self._index[chunk_coords] + result.append((chunk_coords, chunk_selection, out_selection, storeinfo)) + return result + + def _decode_chunk(self, chunk_buffer, filter_mask, dtype): + """ + Apply the filter pipeline (if any) and return a shaped ndarray + """ + if self.filter_pipeline is not None: + chunk_buffer = BTreeV1RawDataChunks._filter_chunk( + chunk_buffer, + filter_mask, + self.filter_pipeline, + self.dtype.itemsize, + ) + return np.frombuffer(chunk_buffer, dtype=dtype).reshape( + self.chunks, order=self._order + ) + + def _select_chunks(self, indexer, out, dtype): + """ + Collect required chunks and dispatch I/O to the best strategy. + Called by ``_get_selection_via_chunks`` in place of the serial loop. + """ + chunks = self._get_required_chunks(indexer) + if not chunks: + return + + # Case A: fsspec - bulk parallel fetch via cat_ranges + if not self.posix and self._cat_range_allowed: + fh = self._fh + if hasattr(fh, "fs") and hasattr(fh.fs, "cat_ranges"): + logger.debug("[pyfive] chunk read strategy: fsspec_cat_ranges") + self._read_bulk_fsspec(fh, chunks, out, dtype) + return + + # Case B: POSIX - thread-parallel reads via os.pread + if self.posix and hasattr(os, "pread") and self._thread_count != 0: + logger.debug( + "[pyfive] chunk read strategy: posix_pread_threads workers=%s", + self._thread_count, + ) + self._read_parallel_threads(chunks, out, dtype) + return + + # Case C: serial fallback + logger.debug("[pyfive] chunk read strategy: serial") + self._read_serial(chunks, out, dtype) + + # ------------------------------------------------------------------ # + # Strategy implementations # + # ------------------------------------------------------------------ # + + def _read_serial(self, chunks, out, dtype): + """ + Read one chunk at a time (safe for any file-like object). + """ + fh = self._fh + for _coords, chunk_sel, out_sel, storeinfo in chunks: + fh.seek(storeinfo.byte_offset) + chunk_buffer = fh.read(storeinfo.size) + out[out_sel] = self._decode_chunk( + chunk_buffer, storeinfo.filter_mask, dtype + )[chunk_sel] + if self.posix: + fh.close() + + def _read_parallel_threads(self, chunks, out, dtype): + """ + Thread-parallel read via ``os.pread``. + + ``os.pread`` does not advance the file-position pointer, so all + worker threads share a single open file descriptor safely. + """ + fh = open(self._filename, "rb") + fd = fh.fileno() + + def _read_one(item): + _coords, chunk_sel, out_sel, storeinfo = item + return ( + chunk_sel, + out_sel, + storeinfo.filter_mask, + os.pread(fd, storeinfo.size, storeinfo.byte_offset), + ) + + try: + with ThreadPoolExecutor(max_workers=self._thread_count) as executor: + results = list(executor.map(_read_one, chunks)) + finally: + fh.close() + + for chunk_sel, out_sel, filter_mask, chunk_buffer in results: + out[out_sel] = self._decode_chunk(chunk_buffer, filter_mask, dtype)[ + chunk_sel + ] + + def _read_bulk_fsspec(self, fh, chunks, out, dtype): + """ + Bulk read via ``fsspec`` ``cat_ranges``. + + Issues a single pipelined request for all required byte-ranges, + which on object stores typically translates to a small number of + HTTP range requests rather than one round-trip per chunk. + """ + path = fh.path + starts = [si.byte_offset for _, _, _, si in chunks] + stops = [si.byte_offset + si.size for _, _, _, si in chunks] + buffers = fh.fs.cat_ranges([path] * len(chunks), starts, stops) + + for (_coords, chunk_sel, out_sel, storeinfo), chunk_buffer in zip( + chunks, buffers + ): + out[out_sel] = self._decode_chunk( + chunk_buffer, storeinfo.filter_mask, dtype + )[chunk_sel] + + +class DatasetID(ChunkRead): """ Implements an "HDF5 dataset identifier", which despite the name, actually represents the data of a dataset in a file, and not an identifier. It includes all @@ -100,6 +274,8 @@ def __init__( self.shape = dataobject.shape self.rank = len(self.shape) self.chunks = dataobject.chunks + # Experimental chunk-read settings are opt-in by default. + self.set_parallelism() # experimental code. We need to find out whether or not this # is unnecessary duplication. At the moment it seems best for @@ -679,23 +855,7 @@ def _get_selection_via_chunks(self, args): fh.close() else: - for chunk_coords, chunk_selection, out_selection in indexer: - # map from chunk coordinate space to array space which - # is how hdf5 keeps the index - chunk_coords = tuple(map(mul, chunk_coords, self.chunks)) - filter_mask, chunk_buffer = self.read_direct_chunk(chunk_coords) - if self.filter_pipeline is not None: - # we are only using the class method here, future - # filter pipelines may need their own function - chunk_buffer = BTreeV1RawDataChunks._filter_chunk( - chunk_buffer, - filter_mask, - self.filter_pipeline, - self.dtype.itemsize, - ) - chunk_data = np.frombuffer(chunk_buffer, dtype=dtype) - chunk_data = chunk_data.reshape(self.chunks, order=self._order) - out[out_selection] = chunk_data[chunk_selection] + self._select_chunks(indexer, out, dtype) if isinstance(self._ptype, P5ReferenceType): to_reference = np.vectorize(Reference) diff --git a/pyfive/high_level.py b/pyfive/high_level.py index 242bb2e..fe650b5 100644 --- a/pyfive/high_level.py +++ b/pyfive/high_level.py @@ -281,12 +281,12 @@ def __init__( # Already wrapped self._fh = fh elif type(fh).__name__ == "S3File" or hasattr(fh, "fs"): - # S3 file handle - wrap with buffering + # fsspec file handle - wrap with buffering # We check for the S3File type by name to avoid a hard dependency on s3fs, # but also check for an 'fs' attribute which is common in s3fs file-like objects. # This may yet be too broad, but it is unlikely to cause issues for non-S3 files. logger.info( - "[pyfive] Detected S3 file, enabling metadata buffering (%d MB)", + "[pyfive] Detected remote file, enabling metadata buffering (%d MB)", metadata_buffer_size, ) self._fh = MetadataBufferingWrapper(fh, buffer_size=metadata_buffer_size)