diff --git a/src/bitshuffle.c b/src/bitshuffle.c index f641b0f..955dbba 100644 --- a/src/bitshuffle.c +++ b/src/bitshuffle.c @@ -80,23 +80,17 @@ int64_t bshuf_compress_lz4_block(ioc_chain *C_ptr, /* Decompress and bitunshuffle a single block. */ -int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr, +int64_t bshuf_decompress_lz4_block(o_chain *C_ptr, const size_t size, const size_t elem_size, const int option) { int64_t nbytes, count; void *out, *tmp_buf; const void *in; - size_t this_iter; int32_t nbytes_from_header; - in = ioc_get_in(C_ptr, &this_iter); - nbytes_from_header = bshuf_read_uint32_BE(in); - ioc_set_next_in(C_ptr, &this_iter, - (void*) ((char*) in + nbytes_from_header + 4)); - - out = ioc_get_out(C_ptr, &this_iter); - ioc_set_next_out(C_ptr, &this_iter, - (void *) ((char *) out + size * elem_size)); + in = C_ptr->in; + nbytes_from_header = C_ptr->nbytes; + out = C_ptr->out; tmp_buf = malloc(size * elem_size); if (tmp_buf == NULL) return -1; @@ -104,7 +98,7 @@ int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr, nbytes = LZ4_decompress_safe((const char*) in + 4, (char *) tmp_buf, nbytes_from_header, size * elem_size); CHECK_ERR_FREE_LZ(nbytes, tmp_buf); - if (nbytes != size * elem_size) { + if (nbytes != (int64_t)(size * elem_size)) { free(tmp_buf); return -91; } @@ -166,30 +160,25 @@ int64_t bshuf_compress_zstd_block(ioc_chain *C_ptr, /* Decompress and bitunshuffle a single block. */ -int64_t bshuf_decompress_zstd_block(ioc_chain *C_ptr, +int64_t bshuf_decompress_zstd_block(o_chain *C_ptr, const size_t size, const size_t elem_size, const int option) { int64_t nbytes, count; void *out, *tmp_buf; const void *in; - size_t this_iter; int32_t nbytes_from_header; - in = ioc_get_in(C_ptr, &this_iter); - nbytes_from_header = bshuf_read_uint32_BE(in); - ioc_set_next_in(C_ptr, &this_iter, - (void*) ((char*) in + nbytes_from_header + 4)); - - out = ioc_get_out(C_ptr, &this_iter); - ioc_set_next_out(C_ptr, &this_iter, - (void *) ((char *) out + size * elem_size)); + in = C_ptr->in; + nbytes_from_header = C_ptr->nbytes; + out = C_ptr->out; tmp_buf = malloc(size * elem_size); + if (tmp_buf == NULL) return -1; nbytes = ZSTD_decompress(tmp_buf, size * elem_size, (void *)((char *) in + 4), nbytes_from_header); CHECK_ERR_FREE_LZ(nbytes, tmp_buf); - if (nbytes != size * elem_size) { + if (nbytes != (int64_t)(size * elem_size)) { free(tmp_buf); return -91; } @@ -242,7 +231,7 @@ int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size, int64_t bshuf_decompress_lz4(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { - return bshuf_blocked_wrap_fun(&bshuf_decompress_lz4_block, in, out, size, + return bshuf_blocked_decompress_wrap_fun(&bshuf_decompress_lz4_block, in, out, size, elem_size, block_size, 0/*option*/); } @@ -278,7 +267,7 @@ int64_t bshuf_compress_zstd(const void* in, void* out, const size_t size, int64_t bshuf_decompress_zstd(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { - return bshuf_blocked_wrap_fun(&bshuf_decompress_zstd_block, in, out, size, + return bshuf_blocked_decompress_wrap_fun(&bshuf_decompress_zstd_block, in, out, size, elem_size, block_size, 0/*option*/); } #endif // ZSTD_SUPPORT diff --git a/src/bitshuffle_core.c b/src/bitshuffle_core.c index 985d768..8ae0c4b 100644 --- a/src/bitshuffle_core.c +++ b/src/bitshuffle_core.c @@ -1927,6 +1927,75 @@ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, } +/* Wrap a function for processing a single block to process an entire buffer in + * parallel. */ +int64_t bshuf_blocked_decompress_wrap_fun(bshufBlockFunDefDC fun, const void* in, void* out, \ + const size_t size, const size_t elem_size, size_t block_size, const int option) { + + omp_size_t ii = 0; + size_t nblocks = 0; + int64_t err = 0; + int64_t count, cum_count=0; + size_t last_block_size; + size_t leftover_bytes; + char *last_in; + char *last_out; + o_chain *C; + + if (block_size == 0) { + block_size = bshuf_default_block_size(elem_size); + } + if (block_size % BSHUF_BLOCKED_MULT) return -81; + + nblocks = (omp_size_t)( size / block_size ); + + last_block_size = size % block_size; + last_block_size = last_block_size - last_block_size % BSHUF_BLOCKED_MULT; + + if (last_block_size){ + nblocks += 1; + } else { + /* for the last run of the loop */ + last_block_size = block_size; + } + + +#if defined(_OPENMP) + #pragma omp parallel private(C, count) shared(last_in, last_out,) reduction(+ : cum_count) + { + /* thread local structure */ +#endif + C = (struct o_chain *) malloc( sizeof(struct o_chain) ); + o_chain_init(C, in, out); + #pragma omp for schedule(static) + for (ii = 0; ii < nblocks; ii ++) { + + o_chain_goto( C, ii, block_size * elem_size ); + + if ( ii < (nblocks - 1) ){ + count = fun(C, block_size, elem_size, option); + } else { + count = fun(C, last_block_size, elem_size, option); + last_in = (char *) C->in + C->nbytes + 4; + last_out = (char *) C->out + last_block_size * elem_size; + } + if (count < 0) err = count; + cum_count += count; + + } /* for loop */ + free(C); +#if defined(_OPENMP) + } /* parallel region */ +#endif + if (err < 0) return err; + + leftover_bytes = size % BSHUF_BLOCKED_MULT * elem_size; + memcpy(last_out, last_in, leftover_bytes); + + return cum_count + leftover_bytes; +} + + /* Bitshuffle a single block. */ int64_t bshuf_bitshuffle_block(ioc_chain *C_ptr, const size_t size, const size_t elem_size, const int option) { diff --git a/src/bitshuffle_internals.h b/src/bitshuffle_internals.h index 74e0567..aaac5a4 100644 --- a/src/bitshuffle_internals.h +++ b/src/bitshuffle_internals.h @@ -69,11 +69,19 @@ int64_t bshuf_untrans_bit_elem(const void* in, void* out, const size_t size, typedef int64_t (*bshufBlockFunDef)(ioc_chain* C_ptr, const size_t size, const size_t elem_size, const int option); +/* Function definition for worker functions that process a single block. */ +typedef int64_t (*bshufBlockFunDefDC)(o_chain* C_ptr, + const size_t size, const size_t elem_size, const int option); + /* Wrap a function for processing a single block to process an entire buffer in * parallel. */ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size, const int option); +/* Wrap a function for processing a single block to process an entire buffer in + * parallel. */ +int64_t bshuf_blocked_decompress_wrap_fun(bshufBlockFunDefDC fun, const void* in, void* out, + const size_t size, const size_t elem_size, size_t block_size, const int option); #ifdef __cplusplus } // extern "C" #endif diff --git a/src/iochain.c b/src/iochain.c index 1822c43..e29f426 100644 --- a/src/iochain.c +++ b/src/iochain.c @@ -11,6 +11,7 @@ */ #include +#include "bitshuffle_internals.h" #include "iochain.h" @@ -87,3 +88,31 @@ void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr) { omp_unset_lock(&(C->in_pl[(*this_iter) % IOC_SIZE].lock)); #endif } + + +void o_chain_init( o_chain *C, const void *in, const void *out){ + C->i0 = in; + C->o0 = out; + C->in = (void*) in; + C->out = (void*) out; + C->current = 0; + C->nbytes = bshuf_read_uint32_BE(C->in); +} + + + +void o_chain_goto( o_chain *C, size_t ii, size_t osize ){ + + if ( ii < C->current ){ + /* rewind to start. Should never happen with omp ordered */ + C->in = (void*) C->i0; + C->current = 0; + C->nbytes = bshuf_read_uint32_BE(C->in); + } + while ( C->current < ii ) { + C->in = (void*) ((char *) C->in + C->nbytes + 4); + C->current++; + C->nbytes = bshuf_read_uint32_BE(C->in); + } + C->out = (void*) ((char *) C->o0 + ii * osize); +} \ No newline at end of file diff --git a/src/iochain.h b/src/iochain.h index d6923f8..8a629f2 100644 --- a/src/iochain.h +++ b/src/iochain.h @@ -55,6 +55,17 @@ #endif + +typedef struct o_chain { + const void *i0; + const void *o0; + void *in; + void *out; + size_t current; + uint32_t nbytes; +} o_chain; + + #define IOC_SIZE 33 @@ -90,4 +101,8 @@ void ioc_set_next_in(ioc_chain *C, size_t* this_iter, void* in_ptr); void * ioc_get_out(ioc_chain *C, size_t *this_iter); void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr); + +void o_chain_init( o_chain *C, const void *in, const void* out); +void o_chain_goto( o_chain *C, size_t ii, size_t osize ); + #endif // IOCHAIN_H