Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 13 additions & 24 deletions src/bitshuffle.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,31 +80,25 @@ int64_t bshuf_compress_lz4_block(ioc_chain *C_ptr,


/* Decompress and bitunshuffle a single block. */
int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr,
int64_t bshuf_decompress_lz4_block(o_chain *C_ptr,
const size_t size, const size_t elem_size, const int option) {

int64_t nbytes, count;
void *out, *tmp_buf;
const void *in;
size_t this_iter;
int32_t nbytes_from_header;

in = ioc_get_in(C_ptr, &this_iter);
nbytes_from_header = bshuf_read_uint32_BE(in);
ioc_set_next_in(C_ptr, &this_iter,
(void*) ((char*) in + nbytes_from_header + 4));

out = ioc_get_out(C_ptr, &this_iter);
ioc_set_next_out(C_ptr, &this_iter,
(void *) ((char *) out + size * elem_size));
in = C_ptr->in;
nbytes_from_header = C_ptr->nbytes;
out = C_ptr->out;

tmp_buf = malloc(size * elem_size);
if (tmp_buf == NULL) return -1;

nbytes = LZ4_decompress_safe((const char*) in + 4, (char *) tmp_buf, nbytes_from_header,
size * elem_size);
CHECK_ERR_FREE_LZ(nbytes, tmp_buf);
if (nbytes != size * elem_size) {
if (nbytes != (int64_t)(size * elem_size)) {
free(tmp_buf);
return -91;
}
Expand Down Expand Up @@ -166,30 +160,25 @@ int64_t bshuf_compress_zstd_block(ioc_chain *C_ptr,


/* Decompress and bitunshuffle a single block. */
int64_t bshuf_decompress_zstd_block(ioc_chain *C_ptr,
int64_t bshuf_decompress_zstd_block(o_chain *C_ptr,
const size_t size, const size_t elem_size, const int option) {

int64_t nbytes, count;
void *out, *tmp_buf;
const void *in;
size_t this_iter;
int32_t nbytes_from_header;

in = ioc_get_in(C_ptr, &this_iter);
nbytes_from_header = bshuf_read_uint32_BE(in);
ioc_set_next_in(C_ptr, &this_iter,
(void*) ((char*) in + nbytes_from_header + 4));

out = ioc_get_out(C_ptr, &this_iter);
ioc_set_next_out(C_ptr, &this_iter,
(void *) ((char *) out + size * elem_size));
in = C_ptr->in;
nbytes_from_header = C_ptr->nbytes;
out = C_ptr->out;

tmp_buf = malloc(size * elem_size);

if (tmp_buf == NULL) return -1;

nbytes = ZSTD_decompress(tmp_buf, size * elem_size, (void *)((char *) in + 4), nbytes_from_header);
CHECK_ERR_FREE_LZ(nbytes, tmp_buf);
if (nbytes != size * elem_size) {
if (nbytes != (int64_t)(size * elem_size)) {
free(tmp_buf);
return -91;
}
Expand Down Expand Up @@ -242,7 +231,7 @@ int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size,

int64_t bshuf_decompress_lz4(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_decompress_lz4_block, in, out, size,
return bshuf_blocked_decompress_wrap_fun(&bshuf_decompress_lz4_block, in, out, size,
elem_size, block_size, 0/*option*/);
}

Expand Down Expand Up @@ -278,7 +267,7 @@ int64_t bshuf_compress_zstd(const void* in, void* out, const size_t size,

int64_t bshuf_decompress_zstd(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_decompress_zstd_block, in, out, size,
return bshuf_blocked_decompress_wrap_fun(&bshuf_decompress_zstd_block, in, out, size,
elem_size, block_size, 0/*option*/);
}
#endif // ZSTD_SUPPORT
69 changes: 69 additions & 0 deletions src/bitshuffle_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -1927,6 +1927,75 @@ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out,
}


/* Wrap a function for processing a single block to process an entire buffer in
* parallel. */
int64_t bshuf_blocked_decompress_wrap_fun(bshufBlockFunDefDC fun, const void* in, void* out, \
const size_t size, const size_t elem_size, size_t block_size, const int option) {

omp_size_t ii = 0;
size_t nblocks = 0;
int64_t err = 0;
int64_t count, cum_count=0;
size_t last_block_size;
size_t leftover_bytes;
char *last_in;
char *last_out;
o_chain *C;

if (block_size == 0) {
block_size = bshuf_default_block_size(elem_size);
}
if (block_size % BSHUF_BLOCKED_MULT) return -81;

nblocks = (omp_size_t)( size / block_size );

last_block_size = size % block_size;
last_block_size = last_block_size - last_block_size % BSHUF_BLOCKED_MULT;

if (last_block_size){
nblocks += 1;
} else {
/* for the last run of the loop */
last_block_size = block_size;
}


#if defined(_OPENMP)
#pragma omp parallel private(C, count) shared(last_in, last_out,) reduction(+ : cum_count)
{
/* thread local structure */
#endif
C = (struct o_chain *) malloc( sizeof(struct o_chain) );
o_chain_init(C, in, out);
#pragma omp for schedule(static)
for (ii = 0; ii < nblocks; ii ++) {

o_chain_goto( C, ii, block_size * elem_size );

if ( ii < (nblocks - 1) ){
count = fun(C, block_size, elem_size, option);
} else {
count = fun(C, last_block_size, elem_size, option);
last_in = (char *) C->in + C->nbytes + 4;
last_out = (char *) C->out + last_block_size * elem_size;
}
if (count < 0) err = count;
cum_count += count;

} /* for loop */
free(C);
#if defined(_OPENMP)
} /* parallel region */
#endif
if (err < 0) return err;

leftover_bytes = size % BSHUF_BLOCKED_MULT * elem_size;
memcpy(last_out, last_in, leftover_bytes);

return cum_count + leftover_bytes;
}


/* Bitshuffle a single block. */
int64_t bshuf_bitshuffle_block(ioc_chain *C_ptr,
const size_t size, const size_t elem_size, const int option) {
Expand Down
8 changes: 8 additions & 0 deletions src/bitshuffle_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,19 @@ int64_t bshuf_untrans_bit_elem(const void* in, void* out, const size_t size,
typedef int64_t (*bshufBlockFunDef)(ioc_chain* C_ptr,
const size_t size, const size_t elem_size, const int option);

/* Function definition for worker functions that process a single block. */
typedef int64_t (*bshufBlockFunDefDC)(o_chain* C_ptr,
const size_t size, const size_t elem_size, const int option);

/* Wrap a function for processing a single block to process an entire buffer in
* parallel. */
int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out,
const size_t size, const size_t elem_size, size_t block_size, const int option);

/* Wrap a function for processing a single block to process an entire buffer in
* parallel. */
int64_t bshuf_blocked_decompress_wrap_fun(bshufBlockFunDefDC fun, const void* in, void* out,
const size_t size, const size_t elem_size, size_t block_size, const int option);
#ifdef __cplusplus
} // extern "C"
#endif
Expand Down
29 changes: 29 additions & 0 deletions src/iochain.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
*/

#include <stdlib.h>
#include "bitshuffle_internals.h"
#include "iochain.h"


Expand Down Expand Up @@ -87,3 +88,31 @@ void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr) {
omp_unset_lock(&(C->in_pl[(*this_iter) % IOC_SIZE].lock));
#endif
}


void o_chain_init( o_chain *C, const void *in, const void *out){
C->i0 = in;
C->o0 = out;
C->in = (void*) in;
C->out = (void*) out;
C->current = 0;
C->nbytes = bshuf_read_uint32_BE(C->in);
}



void o_chain_goto( o_chain *C, size_t ii, size_t osize ){

if ( ii < C->current ){
/* rewind to start. Should never happen with omp ordered */
C->in = (void*) C->i0;
C->current = 0;
C->nbytes = bshuf_read_uint32_BE(C->in);
}
while ( C->current < ii ) {
C->in = (void*) ((char *) C->in + C->nbytes + 4);
C->current++;
C->nbytes = bshuf_read_uint32_BE(C->in);
}
C->out = (void*) ((char *) C->o0 + ii * osize);
}
15 changes: 15 additions & 0 deletions src/iochain.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@
#endif



typedef struct o_chain {
const void *i0;
const void *o0;
void *in;
void *out;
size_t current;
uint32_t nbytes;
} o_chain;


#define IOC_SIZE 33


Expand Down Expand Up @@ -90,4 +101,8 @@ void ioc_set_next_in(ioc_chain *C, size_t* this_iter, void* in_ptr);
void * ioc_get_out(ioc_chain *C, size_t *this_iter);
void ioc_set_next_out(ioc_chain *C, size_t *this_iter, void* out_ptr);


void o_chain_init( o_chain *C, const void *in, const void* out);
void o_chain_goto( o_chain *C, size_t ii, size_t osize );

#endif // IOCHAIN_H