From 4bc5909082b02e215f067b0e16566d62563744c4 Mon Sep 17 00:00:00 2001 From: Jonathan Wright Date: Mon, 6 Feb 2023 15:09:28 +0100 Subject: [PATCH 1/2] Lock free decompression --- src/bitshuffle.c | 37 +++++++------------- src/bitshuffle_core.c | 69 ++++++++++++++++++++++++++++++++++++++ src/bitshuffle_internals.h | 8 +++++ src/iochain.c | 29 ++++++++++++++++ src/iochain.h | 15 +++++++++ 5 files changed, 134 insertions(+), 24 deletions(-) diff --git a/src/bitshuffle.c b/src/bitshuffle.c index ba5cde3a..5c0fc9ab 100644 --- a/src/bitshuffle.c +++ b/src/bitshuffle.c @@ -75,23 +75,17 @@ int64_t bshuf_compress_lz4_block(ioc_chain *C_ptr, \ /* Decompress and bitunshuffle a single block. */ -int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr, +int64_t bshuf_decompress_lz4_block(o_chain *C_ptr, const size_t size, const size_t elem_size, const int option) { int64_t nbytes, count; void *out, *tmp_buf; const void *in; - size_t this_iter; int32_t nbytes_from_header; - in = ioc_get_in(C_ptr, &this_iter); - nbytes_from_header = bshuf_read_uint32_BE(in); - ioc_set_next_in(C_ptr, &this_iter, - (void*) ((char*) in + nbytes_from_header + 4)); - - out = ioc_get_out(C_ptr, &this_iter); - ioc_set_next_out(C_ptr, &this_iter, - (void *) ((char *) out + size * elem_size)); + in = C_ptr->in; + nbytes_from_header = C_ptr->nbytes; + out = C_ptr->out; tmp_buf = malloc(size * elem_size); if (tmp_buf == NULL) return -1; @@ -99,7 +93,7 @@ int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr, nbytes = LZ4_decompress_safe((const char*) in + 4, (char *) tmp_buf, nbytes_from_header, size * elem_size); CHECK_ERR_FREE_LZ(nbytes, tmp_buf); - if (nbytes != size * elem_size) { + if (nbytes != (int64_t)(size * elem_size)) { free(tmp_buf); return -91; } @@ -161,30 +155,25 @@ int64_t bshuf_compress_zstd_block(ioc_chain *C_ptr, \ /* Decompress and bitunshuffle a single block. */ -int64_t bshuf_decompress_zstd_block(ioc_chain *C_ptr, +int64_t bshuf_decompress_zstd_block(o_chain *C_ptr, const size_t size, const size_t elem_size, const int option) { int64_t nbytes, count; void *out, *tmp_buf; const void *in; - size_t this_iter; int32_t nbytes_from_header; - in = ioc_get_in(C_ptr, &this_iter); - nbytes_from_header = bshuf_read_uint32_BE(in); - ioc_set_next_in(C_ptr, &this_iter, - (void*) ((char*) in + nbytes_from_header + 4)); - - out = ioc_get_out(C_ptr, &this_iter); - ioc_set_next_out(C_ptr, &this_iter, - (void *) ((char *) out + size * elem_size)); + in = C_ptr->in; + nbytes_from_header = C_ptr->nbytes; + out = C_ptr->out; tmp_buf = malloc(size * elem_size); + if (tmp_buf == NULL) return -1; nbytes = ZSTD_decompress(tmp_buf, size * elem_size, (void *)((char *) in + 4), nbytes_from_header); CHECK_ERR_FREE_LZ(nbytes, tmp_buf); - if (nbytes != size * elem_size) { + if (nbytes != (int64_t)(size * elem_size)) { free(tmp_buf); return -91; } @@ -237,7 +226,7 @@ int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size, int64_t bshuf_decompress_lz4(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { - return bshuf_blocked_wrap_fun(&bshuf_decompress_lz4_block, in, out, size, + return bshuf_blocked_decompress_wrap_fun(&bshuf_decompress_lz4_block, in, out, size, elem_size, block_size, 0/*option*/); } @@ -273,7 +262,7 @@ int64_t bshuf_compress_zstd(const void* in, void* out, const size_t size, int64_t bshuf_decompress_zstd(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { - return bshuf_blocked_wrap_fun(&bshuf_decompress_zstd_block, in, out, size, + return bshuf_blocked_decompress_wrap_fun(&bshuf_decompress_zstd_block, in, out, size, elem_size, block_size, 0/*option*/); } #endif // ZSTD_SUPPORT diff --git a/src/bitshuffle_core.c b/src/bitshuffle_core.c index ba41473f..56da827c 100644 --- a/src/bitshuffle_core.c +++ b/src/bitshuffle_core.c @@ -1894,6 +1894,75 @@ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, } +/* Wrap a function for processing a single block to process an entire buffer in + * parallel. */ +int64_t bshuf_blocked_decompress_wrap_fun(bshufBlockFunDefDC fun, const void* in, void* out, \ + const size_t size, const size_t elem_size, size_t block_size, const int option) { + + omp_size_t ii = 0; + size_t nblocks = 0; + int64_t err = 0; + int64_t count, cum_count=0; + size_t last_block_size; + size_t leftover_bytes; + char *last_in; + char *last_out; + o_chain *C; + + if (block_size == 0) { + block_size = bshuf_default_block_size(elem_size); + } + if (block_size % BSHUF_BLOCKED_MULT) return -81; + + nblocks = (omp_size_t)( size / block_size ); + + last_block_size = size % block_size; + last_block_size = last_block_size - last_block_size % BSHUF_BLOCKED_MULT; + + if (last_block_size){ + nblocks += 1; + } else { + /* for the last run of the loop */ + last_block_size = block_size; + } + + +#if defined(_OPENMP) + #pragma omp parallel private(C, count) shared(last_in, last_out,) reduction(+ : cum_count) + { + /* thread local structure */ + C = (struct o_chain *) malloc( sizeof(struct o_chain) ); + o_chain_init(C, in, out); + #pragma omp for schedule(static) +#endif + for (ii = 0; ii < nblocks; ii ++) { + + o_chain_goto( C, ii, block_size * elem_size ); + + if ( ii < (nblocks - 1) ){ + count = fun(C, block_size, elem_size, option); + } else { + count = fun(C, last_block_size, elem_size, option); + last_in = (char *) C->in + C->nbytes + 4; + last_out = (char *) C->out + last_block_size * elem_size; + } + if (count < 0) err = count; + cum_count += count; + + } /* for loop */ + free(C); +#if defined(_OPENMP) + } /* parallel region */ +#endif + if (err < 0) return err; + + leftover_bytes = size % BSHUF_BLOCKED_MULT * elem_size; + memcpy(last_out, last_in, leftover_bytes); + + return cum_count + leftover_bytes; +} + + /* Bitshuffle a single block. */ int64_t bshuf_bitshuffle_block(ioc_chain *C_ptr, \ const size_t size, const size_t elem_size, const int option) { diff --git a/src/bitshuffle_internals.h b/src/bitshuffle_internals.h index 59356f10..08ea83a9 100644 --- a/src/bitshuffle_internals.h +++ b/src/bitshuffle_internals.h @@ -63,11 +63,19 @@ int64_t bshuf_untrans_bit_elem(const void* in, void* out, const size_t size, typedef int64_t (*bshufBlockFunDef)(ioc_chain* C_ptr, const size_t size, const size_t elem_size, const int option); +/* Function definition for worker functions that process a single block. */ +typedef int64_t (*bshufBlockFunDefDC)(o_chain* C_ptr, + const size_t size, const size_t elem_size, const int option); + /* Wrap a function for processing a single block to process an entire buffer in * parallel. */ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size, const int option); +/* Wrap a function for processing a single block to process an entire buffer in + * parallel. */ +int64_t bshuf_blocked_decompress_wrap_fun(bshufBlockFunDefDC fun, const void* in, void* out, + const size_t size, const size_t elem_size, size_t block_size, const int option); #ifdef __cplusplus } // extern "C" #endif diff --git a/src/iochain.c b/src/iochain.c index 37015614..d2b0846e 100644 --- a/src/iochain.c +++ b/src/iochain.c @@ -11,6 +11,7 @@ */ #include +#include "bitshuffle_internals.h" #include "iochain.h" @@ -88,3 +89,31 @@ void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr) { #endif } + + +void o_chain_init( o_chain *C, const void *in, const void *out){ + C->i0 = in; + C->o0 = out; + C->in = (void*) in; + C->out = (void*) out; + C->current = 0; + C->nbytes = bshuf_read_uint32_BE(C->in); +} + + + +void o_chain_goto( o_chain *C, size_t ii, size_t osize ){ + + if ( ii < C->current ){ + /* rewind to start. Should never happen with omp ordered */ + C->in = (void*) C->i0; + C->current = 0; + C->nbytes = bshuf_read_uint32_BE(C->in); + } + while ( C->current < ii ) { + C->in = (void*) ((char *) C->in + C->nbytes + 4); + C->current++; + C->nbytes = bshuf_read_uint32_BE(C->in); + } + C->out = (void*) ((char *) C->o0 + ii * osize); +} diff --git a/src/iochain.h b/src/iochain.h index 8acafeae..84793685 100644 --- a/src/iochain.h +++ b/src/iochain.h @@ -55,6 +55,17 @@ #endif + +typedef struct o_chain { + const void *i0; + const void *o0; + void *in; + void *out; + size_t current; + uint32_t nbytes; +} o_chain; + + #define IOC_SIZE 33 @@ -90,5 +101,9 @@ void ioc_set_next_in(ioc_chain *C, size_t* this_iter, void* in_ptr); void * ioc_get_out(ioc_chain *C, size_t *this_iter); void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr); + +void o_chain_init( o_chain *C, const void *in, const void* out); +void o_chain_goto( o_chain *C, size_t ii, size_t osize ); + #endif // IOCHAIN_H From 594d4e5ac3045151b96415ce2957f6be59f1ad20 Mon Sep 17 00:00:00 2001 From: Jon Wright Date: Wed, 25 Sep 2024 21:27:56 +0200 Subject: [PATCH 2/2] bugfix for macos build without openmp --- src/bitshuffle_core.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bitshuffle_core.c b/src/bitshuffle_core.c index ea35cb46..7c1bd198 100644 --- a/src/bitshuffle_core.c +++ b/src/bitshuffle_core.c @@ -1931,10 +1931,10 @@ int64_t bshuf_blocked_decompress_wrap_fun(bshufBlockFunDefDC fun, const void* in #pragma omp parallel private(C, count) shared(last_in, last_out,) reduction(+ : cum_count) { /* thread local structure */ +#endif C = (struct o_chain *) malloc( sizeof(struct o_chain) ); o_chain_init(C, in, out); #pragma omp for schedule(static) -#endif for (ii = 0; ii < nblocks; ii ++) { o_chain_goto( C, ii, block_size * elem_size );