diff --git a/MANIFEST.in b/MANIFEST.in index b26acdf..b00e58a 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,2 +1,3 @@ include versioneer.py include starepandas/_version.py +include starepandas/.config diff --git a/setup.cfg b/setup.cfg index 1fb9a25..758b6b0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -19,42 +19,60 @@ project_urls = Bug Tracker = https://github.com/SpatioTemporal/STAREPandas/issues classifiers = Programming Language :: Python :: 3 - License :: OSI Approved :: MIT License Operating System :: OS Independent +license = MIT [options] -packages = find: python_requires = >=3.12 +packages = find: install_requires = numpy>=2.2.1 geopandas>=0.14.0 shapely>=2.0 pandas>=2.0.3 - dask>=2025.1.0 - distributed>=2022.9.0 + # Dask stack: keep these EXACTLY the same version + dask==2025.7.0 + distributed==2025.7.0 + fsspec==2025.7.0 + s3fs==2025.7.0 + # ---- AWS bits ---- + # If you DO NOT directly import boto3, remove it entirely to avoid conflicts + # boto3>=1.23.10 <-- remove this line + # Optionally *enforce* a matching boto3 via aiobotocore extras: + aiobotocore[boto3]>=2.24,<2.25 + # ------------------- cartopy>=0.22.0 pystare>=0.8.14 h5py>=3.7.0 pyarrow>=19.0.0 pyhdf>=0.10.5 netCDF4>=1.5.8 - boto3>=1.23.10 astropy>=7.0.0 matplotlib>=3.5.2 - rasterio>=1.3.0 - xarray>=2022.6.0 + rasterio>=1.4.1 + # Old xarray + new dask is risky; bump xarray to a modern release + xarray>=2025.8.0 rtree>=1.0.0 scipy>=1.9.1 puremagic>=1.14 sqlalchemy>=2.0.18 GeoAlchemy2>=0.14.0 psycopg2>=2.9.6 + zarr>=2.15.0 # peg sqlalchemy version until pandas fixes https://github.com/pandas-dev/pandas/issues/51015 include_package_data = True +[options.packages.find] +exclude = + figures + tests + examples + docs + [options.package_data] +starepandas = .config starepandas.datasets = '*.hdf', '*.nc' diff --git a/starepandas/.config b/starepandas/.config new file mode 100644 index 0000000..9b055a9 --- /dev/null +++ b/starepandas/.config @@ -0,0 +1,9 @@ +key=[Key] +secret=[Secret] 
+region_name=[region] +rds_host=[host] +port=[port] +username=[username] +password=[password] +database=[database] + diff --git a/starepandas/io/granules/granule.py b/starepandas/io/granules/granule.py index 43fdb7e..dfafa01 100644 --- a/starepandas/io/granules/granule.py +++ b/starepandas/io/granules/granule.py @@ -3,6 +3,149 @@ import numpy import pystare import pandas +import netCDF4 + + +class Sidecar: + """Helper class for creating STARE sidecar files.""" + + def __init__(self, granule_path, out_path=None): + self.file_path = self.name_from_granule(granule_path, out_path) + self.create() + self.zlib = True + self.shuffle = True + + def name_from_granule(self, granule_path, out_path): + if out_path: + return out_path + '.'.join(granule_path.split('/')[-1].split('.')[0:-1]) + '_stare.nc' + else: + return '.'.join(granule_path.split('.')[0:-1]) + '_stare.nc' + + def create(self): + with netCDF4.Dataset(self.file_path, "w", format="NETCDF4") as rootgrp: + pass + + def write_dimension(self, name, length, group=None): + with netCDF4.Dataset(self.file_path , 'a', format="NETCDF4") as rootgrp: + if group: + grp = rootgrp.createGroup(group) + else: + grp = rootgrp + grp.createDimension(name, length) + + def write_dimensions(self, i, j, l, nom_res=None, group=None): + i_name = 'i' + j_name = 'j' + if nom_res: + i_name += '_{nom_res}'.format(nom_res=nom_res) + j_name += '_{nom_res}'.format(nom_res=nom_res) + with netCDF4.Dataset(self.file_path, 'a', format="NETCDF4") as rootgrp: + if group: + grp = rootgrp.createGroup(group) + else: + grp = rootgrp + grp.createDimension(i_name, i) + grp.createDimension(j_name, j) + + def write_lons(self, lons, nom_res=None, group=None, fill_value=None): + i = lons.shape[0] + j = lons.shape[1] + varname = 'Longitude' + i_name = 'i' + j_name = 'j' + if nom_res: + varname += '_{nom_res}'.format(nom_res=nom_res) + i_name += '_{nom_res}'.format(nom_res=nom_res) + j_name += '_{nom_res}'.format(nom_res=nom_res) + with 
netCDF4.Dataset(self.file_path, 'a', format="NETCDF4") as rootgrp: + if group: + grp = rootgrp.createGroup(group) + else: + grp = rootgrp + lons_netcdf = grp.createVariable(varname=varname, + datatype='f4', + dimensions=(i_name, j_name), + chunksizes=[i, j], + shuffle=self.shuffle, + zlib=self.zlib, + fill_value=fill_value) + lons_netcdf.long_name = 'Longitude' + lons_netcdf.units = 'degrees_east' + lons_netcdf[:, :] = lons + + def write_lats(self, lats, nom_res=None, group=None, fill_value=None): + i = lats.shape[0] + j = lats.shape[1] + varname = 'Latitude' + i_name = 'i' + j_name = 'j' + if nom_res: + varname += '_{nom_res}'.format(nom_res=nom_res) + i_name += '_{nom_res}'.format(nom_res=nom_res) + j_name += '_{nom_res}'.format(nom_res=nom_res) + with netCDF4.Dataset(self.file_path, 'a', format="NETCDF4") as rootgrp: + if group: + grp = rootgrp.createGroup(group) + else: + grp = rootgrp + lats_netcdf = grp.createVariable(varname=varname, + datatype='f4', + dimensions=(i_name, j_name), + chunksizes=[i, j], + shuffle=self.shuffle, + zlib=self.zlib, + fill_value=fill_value) + lats_netcdf.long_name = 'Latitude' + lats_netcdf.units = 'degrees_north' + lats_netcdf[:, :] = lats + + def write_sids(self, sids, nom_res=None, group=None, fill_value=0): + i = sids.shape[0] + j = sids.shape[1] + varname = 'STARE_index'.format(nom_res=nom_res) + i_name = 'i' + j_name = 'j' + if nom_res: + varname += '_{nom_res}'.format(nom_res=nom_res) + i_name += '_{nom_res}'.format(nom_res=nom_res) + j_name += '_{nom_res}'.format(nom_res=nom_res) + with netCDF4.Dataset(self.file_path, 'a', format="NETCDF4") as rootgrp: + if group: + grp = rootgrp.createGroup(group) + else: + grp = rootgrp + sids_netcdf = grp.createVariable(varname=varname, + datatype='u8', + dimensions=(i_name, j_name), + chunksizes=[i, j], + shuffle=self.shuffle, + zlib=self.zlib, + fill_value=fill_value) + sids_netcdf.long_name = 'SpatioTemporal Adaptive Resolution Encoding (STARE) index' + sids_netcdf[:, :] = sids + + 
def write_cover(self, cover, nom_res=None, group=None, fill_value=None): + l = cover.size + varname = 'STARE_cover' + l_name = 'l' + + with netCDF4.Dataset(self.file_path, 'a', format="NETCDF4") as rootgrp: + if group: + grp = rootgrp.createGroup(group) + else: + grp = rootgrp + # Only create the 'l' dimension if it does not already exist + if l_name not in grp.dimensions: + grp.createDimension(l_name, l) + cover_netcdf = grp.createVariable(varname=varname, + datatype='u8', + dimensions=(l_name), + chunksizes=[l], + shuffle=self.shuffle, + zlib=self.zlib, + fill_value=fill_value) + cover_netcdf.long_name = 'SpatioTemporal Adaptive Resolution Encoding (STARE) cover' + cover_netcdf[:] = cover class Granule: @@ -47,6 +190,52 @@ def guess_companion_path(self, prefix=None, folder=None): def add_sids(self, adapt_resolution=True): self.sids = pystare.from_latlon_2d(lat=self.lat, lon=self.lon, adapt_level=adapt_resolution) + def create_sidecar(self, n_workers=1, cover_res=None, out_path=None): + """Create a STARE sidecar file for this granule. + + Parameters + ---------- + n_workers : int, optional + Number of workers for parallel processing. Default is 1. + cover_res : int, optional + Resolution for the cover. If None, will be automatically determined. + out_path : str, optional + Output path for the sidecar file. If None, uses default naming convention. + + Returns + ------- + Sidecar + The created sidecar object. + """ + if self.lat is None or self.lon is None: + raise ValueError("Latitude and longitude data must be loaded before creating sidecar. 
Call read_latlon() first.") + + sidecar = Sidecar(self.file_path, out_path) + + # Generate STARE indices + sids = pystare.from_latlon_2d(lat=self.lat, lon=self.lon, adapt_level=True) + + if not cover_res: + cover_res = 10 # Use a fixed default cover resolution + # Clamp cover_res to [0, 27] + cover_res = max(0, min(27, cover_res)) + + sids_adapted = pystare.spatial_coerce_resolution(sids, cover_res) + cover_sids = numpy.unique(sids_adapted) + + i = self.lat.shape[0] + j = self.lat.shape[1] + l = cover_sids.size + + # Write dimensions and data + sidecar.write_dimensions(i, j, l, nom_res=self.nom_res) + sidecar.write_lons(self.lon, nom_res=self.nom_res) + sidecar.write_lats(self.lat, nom_res=self.nom_res) + sidecar.write_sids(sids, nom_res=self.nom_res) + sidecar.write_cover(cover_sids, nom_res=self.nom_res) + + return sidecar + def read_sidecar_index(self, sidecar_path=None): if sidecar_path is not None: scp = sidecar_path diff --git a/starepandas/io/granules/ssmis.py b/starepandas/io/granules/ssmis.py index 393fe93..5837d06 100644 --- a/starepandas/io/granules/ssmis.py +++ b/starepandas/io/granules/ssmis.py @@ -1,15 +1,16 @@ -from starepandas.io.granules.granule import Granule +from starepandas.io.granules.granule import Granule, Sidecar import starepandas.io.s3 import datetime import numpy import pystare +import os +import netCDF4 class SSMIS(Granule): - + def __init__(self, file_path, sidecar_path=None, scans=['S1', 'S2', 'S3', 'S4']): super().__init__(file_path, sidecar_path) - self.netcdf = starepandas.io.s3.nc4_dataset_wrapper(self.file_path, 'r', format='NETCDF4') self.lat = None self.lon = None self.timestamps = {} @@ -18,32 +19,171 @@ def __init__(self, file_path, sidecar_path=None, scans=['S1', 'S2', 'S3', 'S4']) self.nom_res = '' self.scans = scans self.header = {} + self.file_type = self._determine_file_type() + self.dataset = self._open_dataset() self.parse_header() + def _determine_file_type(self): + """Determine if the file is NetCDF4 or HDF5 based on 
file extension or content.""" + if self.file_path.lower().endswith('.h5') or self.file_path.lower().endswith('.hdf5'): + return 'hdf5' + elif self.file_path.lower().endswith('.nc') or self.file_path.lower().endswith('.nc4'): + return 'netcdf4' + else: + # Try to determine by attempting to open as HDF5 first, then NetCDF4 + try: + if 's3://' == self.file_path[0:5]: + s3 = starepandas.io.s3.parse_s3_url(self.file_path) + try: + import boto3 + s3_client = boto3.client('s3') + test_file = starepandas.io.s3.h5_dataset_from_s3( + s3_client, s3['bucket_name'], + s3['prefix'] + s3['prefix_end'] + s3['resource'] + ) + test_file.close() + return 'hdf5' + except ImportError: + raise ImportError("boto3 is required for S3 HDF5 file access") + else: + try: + import h5py + with h5py.File(self.file_path, 'r') as test_file: + pass + return 'hdf5' + except ImportError: + raise ImportError("h5py is required for HDF5 file access") + except Exception: + try: + if 's3://' == self.file_path[0:5]: + s3 = starepandas.io.s3.parse_s3_url(self.file_path) + try: + import boto3 + s3_client = boto3.client('s3') + test_file = starepandas.io.s3.nc4_dataset_from_s3( + s3_client, s3['bucket_name'], + s3['prefix'] + s3['prefix_end'] + s3['resource'] + ) + test_file.close() + return 'netcdf4' + except ImportError: + raise ImportError("boto3 is required for S3 NetCDF4 file access") + else: + try: + import netCDF4 + with netCDF4.Dataset(self.file_path, 'r') as test_file: + pass + return 'netcdf4' + except ImportError: + raise ImportError("netCDF4 is required for NetCDF4 file access") + except Exception: + # Default to NetCDF4 if we can't determine + return 'netcdf4' + + def _open_dataset(self): + """Open the dataset based on file type.""" + if self.file_type == 'hdf5': + if 's3://' == self.file_path[0:5]: + s3 = starepandas.io.s3.parse_s3_url(self.file_path) + try: + import boto3 + s3_client = boto3.client('s3') + return starepandas.io.s3.h5_dataset_from_s3( + s3_client, s3['bucket_name'], + 
s3['prefix'] + s3['prefix_end'] + s3['resource'] + ) + except ImportError: + raise ImportError("boto3 is required for S3 HDF5 file access") + else: + try: + import h5py + return h5py.File(self.file_path, 'r') + except ImportError: + raise ImportError("h5py is required for HDF5 file access") + else: # netcdf4 + return starepandas.io.s3.nc4_dataset_wrapper(self.file_path, 'r', format='NETCDF4') + def parse_header(self): self.header = {} - for h in self.netcdf.FileHeader.replace(';', '').strip().split('\n'): - key = h.split('=')[0] - value = h.split('=')[1] - self.header[key] = value - self.ts_start = self.header['StartGranuleDateTime'] - self.ts_end = self.header['StopGranuleDateTime'] + if self.file_type == 'hdf5': + # For HDF5, try to get header from attributes + if 'FileHeader' in self.dataset.attrs: + header_str = self.dataset.attrs['FileHeader'] + # Handle bytes vs string conversion for HDF5 + if isinstance(header_str, bytes): + header_str = header_str.decode('utf-8') + elif isinstance(header_str, numpy.ndarray): + header_str = str(header_str) + else: + # Try to find header in global attributes + header_str = "" + for key, value in self.dataset.attrs.items(): + if 'header' in key.lower() or 'metadata' in key.lower(): + if isinstance(value, bytes): + header_str = value.decode('utf-8') + else: + header_str = str(value) + break + + if not header_str: + # Create minimal header from available attributes + self.header = {} + for key, value in self.dataset.attrs.items(): + if isinstance(value, bytes): + self.header[key] = value.decode('utf-8') + else: + self.header[key] = str(value) + self.ts_start = self.header.get('StartGranuleDateTime', '') + self.ts_end = self.header.get('StopGranuleDateTime', '') + return + else: # netcdf4 + try: + header_str = self.dataset.FileHeader + except AttributeError: + # FileHeader attribute not found, create minimal header + self.header = {} + self.ts_start = '' + self.ts_end = '' + return + + # Parse header string + if header_str: + for 
h in header_str.replace(';', '').strip().split('\n'): + if '=' in h: + key = h.split('=')[0] + value = h.split('=')[1] + self.header[key] = value + self.ts_start = self.header.get('StartGranuleDateTime', '') + self.ts_end = self.header.get('StopGranuleDateTime', '') def read_latlon(self): self.lat = {} self.lon = {} for scan in self.scans: - self.lat[scan] = self.netcdf.groups[scan]['Latitude'][:].astype(numpy.double) - self.lon[scan] = self.netcdf.groups[scan]['Longitude'][:].astype(numpy.double) + if self.file_type == 'hdf5': + self.lat[scan] = self.dataset[scan]['Latitude'][:].astype(numpy.double) + self.lon[scan] = self.dataset[scan]['Longitude'][:].astype(numpy.double) + else: # netcdf4 + self.lat[scan] = self.dataset.groups[scan]['Latitude'][:].astype(numpy.double) + self.lon[scan] = self.dataset.groups[scan]['Longitude'][:].astype(numpy.double) def read_timestamp_scan(self, scan): - year = self.netcdf.groups[scan]['ScanTime']['Year'][:] - month = self.netcdf.groups[scan]['ScanTime']['Month'][:] - day = self.netcdf.groups[scan]['ScanTime']['DayOfMonth'][:] - hour = self.netcdf.groups[scan]['ScanTime']['Hour'][:] - minute = self.netcdf.groups[scan]['ScanTime']['Minute'][:] - second = self.netcdf.groups[scan]['ScanTime']['Second'][:] - millisecond = self.netcdf.groups[scan]['ScanTime']['MilliSecond'][:] + if self.file_type == 'hdf5': + year = self.dataset[scan]['ScanTime']['Year'][:] + month = self.dataset[scan]['ScanTime']['Month'][:] + day = self.dataset[scan]['ScanTime']['DayOfMonth'][:] + hour = self.dataset[scan]['ScanTime']['Hour'][:] + minute = self.dataset[scan]['ScanTime']['Minute'][:] + second = self.dataset[scan]['ScanTime']['Second'][:] + millisecond = self.dataset[scan]['ScanTime']['MilliSecond'][:] + else: # netcdf4 + year = self.dataset.groups[scan]['ScanTime']['Year'][:] + month = self.dataset.groups[scan]['ScanTime']['Month'][:] + day = self.dataset.groups[scan]['ScanTime']['DayOfMonth'][:] + hour = 
self.dataset.groups[scan]['ScanTime']['Hour'][:] + minute = self.dataset.groups[scan]['ScanTime']['Minute'][:] + second = self.dataset.groups[scan]['ScanTime']['Second'][:] + millisecond = self.dataset.groups[scan]['ScanTime']['MilliSecond'][:] timestamps = [] for d in zip(year, month, day, hour, minute, second, millisecond): @@ -55,7 +195,7 @@ def read_timestamp_scan(self, scan): else: timestamps.append(datetime.datetime(*d)) return numpy.array(timestamps) - + def read_timestamps(self): if 'S1' in self.scans: ts = self.read_timestamp_scan('S1') @@ -87,29 +227,51 @@ def read_timestamps(self): def read_data(self): if 'S1' in self.scans: - self.data['S1']['Tc1'] = self.netcdf.groups['S1']['Tc'][:, :, 0] - self.data['S1']['Tc2'] = self.netcdf.groups['S1']['Tc'][:, :, 1] - self.data['S1']['Tc3'] = self.netcdf.groups['S1']['Tc'][:, :, 2] + if self.file_type == 'hdf5': + self.data['S1']['Tc1'] = self.dataset['S1']['Tc'][:, :, 0] + self.data['S1']['Tc2'] = self.dataset['S1']['Tc'][:, :, 1] + self.data['S1']['Tc3'] = self.dataset['S1']['Tc'][:, :, 2] + else: # netcdf4 + self.data['S1']['Tc1'] = self.dataset.groups['S1']['Tc'][:, :, 0] + self.data['S1']['Tc2'] = self.dataset.groups['S1']['Tc'][:, :, 1] + self.data['S1']['Tc3'] = self.dataset.groups['S1']['Tc'][:, :, 2] if 'S2' in self.scans: - self.data['S2']['Tc1'] = self.netcdf.groups['S2']['Tc'][:, :, 0] - self.data['S2']['Tc2'] = self.netcdf.groups['S2']['Tc'][:, :, 1] + if self.file_type == 'hdf5': + self.data['S2']['Tc1'] = self.dataset['S2']['Tc'][:, :, 0] + self.data['S2']['Tc2'] = self.dataset['S2']['Tc'][:, :, 1] + else: # netcdf4 + self.data['S2']['Tc1'] = self.dataset.groups['S2']['Tc'][:, :, 0] + self.data['S2']['Tc2'] = self.dataset.groups['S2']['Tc'][:, :, 1] if 'S3' in self.scans: - self.data['S3']['Tc1'] = self.netcdf.groups['S3']['Tc'][:, :, 0] - self.data['S3']['Tc2'] = self.netcdf.groups['S3']['Tc'][:, :, 1] - self.data['S3']['Tc3'] = self.netcdf.groups['S3']['Tc'][:, :, 2] - self.data['S3']['Tc4'] = 
self.netcdf.groups['S3']['Tc'][:, :, 3] + if self.file_type == 'hdf5': + self.data['S3']['Tc1'] = self.dataset['S3']['Tc'][:, :, 0] + self.data['S3']['Tc2'] = self.dataset['S3']['Tc'][:, :, 1] + self.data['S3']['Tc3'] = self.dataset['S3']['Tc'][:, :, 2] + self.data['S3']['Tc4'] = self.dataset['S3']['Tc'][:, :, 3] + else: # netcdf4 + self.data['S3']['Tc1'] = self.dataset.groups['S3']['Tc'][:, :, 0] + self.data['S3']['Tc2'] = self.dataset.groups['S3']['Tc'][:, :, 1] + self.data['S3']['Tc3'] = self.dataset.groups['S3']['Tc'][:, :, 2] + self.data['S3']['Tc4'] = self.dataset.groups['S3']['Tc'][:, :, 3] if 'S4' in self.scans: - self.data['S4']['Tc1'] = self.netcdf.groups['S4']['Tc'][:, :, 0] - self.data['S4']['Tc2'] = self.netcdf.groups['S4']['Tc'][:, :, 1] + if self.file_type == 'hdf5': + self.data['S4']['Tc1'] = self.dataset['S4']['Tc'][:, :, 0] + self.data['S4']['Tc2'] = self.dataset['S4']['Tc'][:, :, 1] + else: # netcdf4 + self.data['S4']['Tc1'] = self.dataset.groups['S4']['Tc'][:, :, 0] + self.data['S4']['Tc2'] = self.dataset.groups['S4']['Tc'][:, :, 1] def add_sids(self, adapt_resolution=True): + if self.lat is None: + raise ValueError("Latitude and longitude data must be loaded before adding SIDs. Call read_latlon() first.") + self.sids = {} for scan in self.scans: - self.sids[scan] = pystare.from_latlon2D(lat=self.lat[scan], lon=self.lon[scan], - adapt_resolution=adapt_resolution) + self.sids[scan] = pystare.from_latlon_2d(lat=self.lat[scan], lon=self.lon[scan], + adapt_level=adapt_resolution) def read_sidecar_latlon(self, sidecar_path=None): self.lat = {} @@ -165,4 +327,76 @@ def to_df(self, xy=False): dfs[scan] = df return dfs + def close(self): + """Close the dataset file.""" + if hasattr(self, 'dataset') and self.dataset is not None: + self.dataset.close() + + def create_sidecar(self, n_workers=1, cover_res=None, out_path=None): + """Create a STARE sidecar file for this SSMIS granule. 
+ + Parameters + ---------- + n_workers : int, optional + Number of workers for parallel processing. Default is 1. + cover_res : int, optional + Resolution for the cover. If None, will be automatically determined. + out_path : str, optional + Output path for the sidecar file. If None, uses default naming convention. + + Returns + ------- + Sidecar + The created sidecar object. + """ + if self.lat is None or self.lon is None: + raise ValueError("Latitude and longitude data must be loaded before creating sidecar. Call read_latlon() first.") + + sidecar = Sidecar(self.file_path, out_path) + + cover_all = [] + for scan in self.scans: + lons = self.lon[scan] + lats = self.lat[scan] + sids = pystare.from_latlon_2d(lat=lats, lon=lons, adapt_level=True) + + if not cover_res: + cover_res = 10 # Use a fixed default cover resolution + # Clamp cover_res to [0, 27] + cover_res = max(0, min(27, cover_res)) + + sids_adapted = pystare.spatial_coerce_resolution(sids, cover_res) + cover_sids = numpy.unique(sids_adapted) + + cover_all.append(cover_sids) + + i = lats.shape[0] + j = lats.shape[1] + l = cover_sids.size + + nom_res = None + + sidecar.write_dimensions(i, j, l, nom_res=nom_res, group=scan) + sidecar.write_lons(lons, nom_res=nom_res, group=scan) + sidecar.write_lats(lats, nom_res=nom_res, group=scan) + sidecar.write_sids(sids, nom_res=nom_res, group=scan) + sidecar.write_cover(cover_sids, nom_res=nom_res, group=scan) + + cover_all = numpy.concatenate(cover_all) + cover_all = numpy.unique(cover_all) + + # Only create the 'l' dimension if it does not already exist + with netCDF4.Dataset(sidecar.file_path, 'a', format='NETCDF4') as ncfile: + if 'l' not in ncfile.dimensions: + sidecar.write_dimension('l', cover_all.size) + sidecar.write_cover(cover_all, nom_res=nom_res) + + return sidecar + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + diff --git a/starepandas/staredataframe.py b/starepandas/staredataframe.py index 
f23c64b..408aca5 100644 --- a/starepandas/staredataframe.py +++ b/starepandas/staredataframe.py @@ -2,7 +2,7 @@ import geopandas.plotting import pystare import pandas -import numpy +import numpy as np import starepandas import netCDF4 import starepandas.tools.trixel_conversions @@ -10,6 +10,11 @@ import starepandas.io.pod import multiprocessing import pickle +import zarr +import s3fs +import os +import json +import datetime import logging import time @@ -17,6 +22,236 @@ from pathlib import Path +_AWS_S3_STORAGE_OPTIONS = {} +_AWS_RDS_OPTIONS = {} + +def aws_configure(key=None, secret=None, token=None, region_name=None, endpoint_url=None, client_kwargs=None, + rds=None, db_host=None, db_port=None, db_username=None, db_password=None, db_database=None, + **s3fs_kwargs): + """ + Configure default AWS/S3 options for zarr S3 helpers and optional RDS Postgres metadata store. + + Parameters + - key: AWS access key id + - secret: AWS secret access key + - token: AWS session token (optional) + - region_name: AWS region (e.g., "us-west-2") + - endpoint_url: Custom S3-compatible endpoint (optional) + - client_kwargs: dict to merge into s3fs client_kwargs + - rds: dict with keys {host, port, username, password, database} for RDS connection + - db_host/db_port/db_username/db_password/db_database: overrides for rds dict + - **s3fs_kwargs: any additional s3fs.S3FileSystem kwargs + + Notes + - These options are used by to_zarr_s3/from_zarr_s3 when storage_options is not provided. + - You can pass a ready-made 'client_kwargs' dict or individual fields like 'region_name'/'endpoint_url'. 
+ """ + global _AWS_S3_STORAGE_OPTIONS, _AWS_RDS_OPTIONS + options = dict(s3fs_kwargs) if s3fs_kwargs else {} + if key is not None: + options['key'] = key + if secret is not None: + options['secret'] = secret + if token is not None: + options['token'] = token + + ck = dict(client_kwargs) if client_kwargs else {} + if region_name is not None: + ck['region_name'] = region_name + if endpoint_url is not None: + ck['endpoint_url'] = endpoint_url + if ck: + options['client_kwargs'] = ck + + _AWS_S3_STORAGE_OPTIONS = options + + # Configure RDS/PostgreSQL connection options + rds_opts = {} + if isinstance(rds, dict): + rds_opts.update(rds) + if db_host is not None: + rds_opts['host'] = db_host + if db_port is not None: + rds_opts['port'] = int(db_port) + if db_username is not None: + rds_opts['username'] = db_username + if db_password is not None: + rds_opts['password'] = db_password + if db_database is not None: + rds_opts['database'] = db_database + + if rds_opts: + _AWS_RDS_OPTIONS = rds_opts + return _AWS_S3_STORAGE_OPTIONS + +def load_aws_configure(config_path): + """ + Load AWS/S3 configuration from a JSON file and set defaults for zarr S3 helpers. + + The JSON may contain either s3fs-style keys (key, secret, token, client_kwargs) + or AWS-style keys (aws_access_key_id, aws_secret_access_key, aws_session_token, region_name, endpoint_url). + + It may also include an 'rds' block with {host, port, username, password, database}, or top-level aliases. 
+ """ + with open(config_path, 'r') as f: + data = json.load(f) + + key = data.get('key') or data.get('aws_access_key_id') + secret = data.get('secret') or data.get('aws_secret_access_key') + token = data.get('token') or data.get('aws_session_token') + + client_kwargs = data.get('client_kwargs', {}) + region_name = data.get('region_name') or data.get('region') + endpoint_url = data.get('endpoint_url') + + rds_block = data.get('rds') or {} + for k in ['host', 'port', 'username', 'password', 'database']: + if k in data and k not in rds_block: + rds_block[k] = data[k] + + return aws_configure( + key=key, + secret=secret, + token=token, + region_name=region_name, + endpoint_url=endpoint_url, + client_kwargs=client_kwargs, + rds=rds_block, + **{k: v for k, v in data.items() if k not in { + 'key', 'secret', 'token', 'client_kwargs', + 'aws_access_key_id', 'aws_secret_access_key', 'aws_session_token', + 'region', 'region_name', 'endpoint_url', 'rds', + 'host', 'port', 'username', 'password', 'database' + }} + ) + +def _load_config_from_default_locations() -> bool: + """Try to load configuration from default file locations. + + Order of precedence: + - Env var STAREPANDAS_AWS_CONFIG + - ./.config (current working directory) + - /.config (project root next to this module) + - ~/.starepandas_aws_config.json + - ~/.starepandas/aws.json + Returns True if successfully loaded, else False. 
+ """ + candidates = [] + env_path = os.environ.get('STAREPANDAS_AWS_CONFIG') + if env_path: + candidates.append(env_path) + candidates.append(os.path.join(os.getcwd(), '.config')) + candidates.append(str(Path(__file__).resolve().parents[1] / '.config')) + candidates.append(os.path.join(Path.home(), '.starepandas_aws_config.json')) + candidates.append(os.path.join(Path.home(), '.starepandas', 'aws.json')) + + for cfg in candidates: + try: + if os.path.isfile(cfg): + # Support simple .config key=value pairs as well as JSON + try: + with open(cfg, 'r') as f: + txt = f.read().strip() + if txt.startswith('{'): + load_aws_configure(cfg) + else: + # Parse simple key=value lines + kv = {} + for line in txt.splitlines(): + line = line.strip() + if not line or line.startswith('#'): + continue + if '=' in line: + k, v = line.split('=', 1) + kv[k.strip()] = v.strip() + # Map to expected fields + rds_block = { + 'host': kv.get('rds_host') or kv.get('host'), + 'port': int(kv.get('port', '5432')), + 'username': kv.get('username') or kv.get('user'), + 'password': kv.get('password'), + 'database': kv.get('database') or 'postgres' + } + aws_configure( + key=kv.get('key'), + secret=kv.get('secret'), + region_name=kv.get('region_name') or kv.get('region'), + rds=rds_block + ) + except Exception: + # Fallback to JSON loader if parsing failed unexpectedly + load_aws_configure(cfg) + return True + except Exception: + continue + return False + +def _ensure_rds_db_and_table(target_dbname='StarePodsMetadata'): + """ + Ensure the RDS Postgres database and table exist, and return a connection to the target DB. + Expects _AWS_RDS_OPTIONS with keys: host, port, username, password, database (admin DB to connect first). + """ + if not _AWS_RDS_OPTIONS: + # Attempt to auto-load default config file if available + _load_config_from_default_locations() + if not _AWS_RDS_OPTIONS: + raise ValueError( + "Missing RDS configuration. Call load_aws_configure(config_path) or aws_configure(..., rds=...) 
" + "to set RDS connection parameters." + ) + + try: + import psycopg2 + from psycopg2 import sql + except ImportError as e: + raise ImportError("psycopg2 is required for RDS metadata operations. Install 'psycopg2-binary'.") from e + + host = _AWS_RDS_OPTIONS.get('host') + port = int(_AWS_RDS_OPTIONS.get('port', 5432)) + user = _AWS_RDS_OPTIONS.get('username') or _AWS_RDS_OPTIONS.get('user') + password = _AWS_RDS_OPTIONS.get('password') + admin_db = _AWS_RDS_OPTIONS.get('database') or 'postgres' + + if not all([host, user, password]): + raise ValueError("RDS configuration incomplete: require host, username, password (and optionally port, database).") + + # Connect to admin DB to ensure target DB exists + admin_conn = psycopg2.connect(host=host, port=port, user=user, password=password, dbname=admin_db) + admin_conn.set_session(autocommit=True) + try: + with admin_conn.cursor() as cur: + cur.execute("SELECT 1 FROM pg_database WHERE datname=%s", (target_dbname,)) + exists = cur.fetchone() is not None + if not exists: + cur.execute(sql.SQL("CREATE DATABASE {} ").format(sql.Identifier(target_dbname))) + finally: + admin_conn.close() + + # Connect to target DB and ensure table + conn = psycopg2.connect(host=host, port=port, user=user, password=password, dbname=target_dbname) + with conn.cursor() as cur: + cur.execute( + """ + CREATE TABLE IF NOT EXISTS "PodsMetadata" ( + "Dataset" TEXT, + "DataLevel" TEXT, + "RawData Collected Time" TIMESTAMP, + grouped_id INTEGER, + "S3 bucket" TEXT, + "Resolution level" INTEGER, + "MetadataJson" JSONB + ) + """ + ) + conn.commit() + return conn + +def _parse_s3_bucket(s3_path: str) -> str: + if not s3_path.startswith('s3://'): + return '' + rest = s3_path[5:] + return rest.split('/', 1)[0] if '/' in rest else rest + DEFAULT_SID_COLUMN_NAME = 'sids' DEFAULT_TID_COLUMN_NAME = 'tids' DEFAULT_TRIXEL_COLUMN_NAME = 'trixels' @@ -24,10 +259,10 @@ def compress_sids_group(group): sids = group[1].to_numpy() # zero element is group label, 1 
element is the df - if sids.dtype == numpy.dtype('O'): + if sids.dtype == np.dtype('O'): # If we receive a series of SID collections we merge all sids into a single 1D array # to_numpy() would have produced an array of lists in this case - sids = numpy.concatenate(sids) + sids = np.concatenate(sids) sids = starepandas.compress_sids(sids) return tuple([group[0], sids]) @@ -222,11 +457,11 @@ def drop_na_sids(self, inplace=False): """Drop all rows that have NA values for the SIDs and cast the column to numpy.int64 """ if inplace: self.dropna(subset=[self._sid_column_name], inplace=inplace) - self[self._sid_column_name] = self[self._sid_column_name].astype(numpy.dtype('int64')) + self[self._sid_column_name] = self[self._sid_column_name].astype(np.dtype('int64')) else: frame = self.__deepcopy__() frame = frame.dropna(subset=[frame._sid_column_name], inplace=inplace) - frame[frame._sid_column_name] = frame[frame._sid_column_name].astype(numpy.dtype('int64')) + frame[frame._sid_column_name] = frame[frame._sid_column_name].astype(np.dtype('int64')) return frame def make_tids(self, column='ts_start', end_column=None, forward_res=48, reverse_res=48): @@ -297,7 +532,7 @@ def set_sids(self, col, inplace=False): else: frame = self.__deepcopy__() - if isinstance(col, (list, numpy.ndarray, pandas.Series)): + if isinstance(col, (list, np.ndarray, pandas.Series)): frame[frame._sid_column_name] = col elif hasattr(col, "ndim") and col.ndim != 1: raise ValueError("Must pass array with one dimension only.") @@ -340,7 +575,7 @@ def set_tids(self, col, inplace=False): else: frame = self.__deepcopy__() - if isinstance(col, (list, numpy.ndarray, pandas.Series)): + if isinstance(col, (list, np.ndarray, pandas.Series)): frame[frame._tid_column_name] = col elif hasattr(col, "ndim") and col.ndim != 1: raise ValueError("Must pass array with one dimension only.") @@ -438,7 +673,7 @@ def set_trixels(self, col, inplace=False): else: frame = self.__deepcopy__() - if isinstance(col, (pandas.Series, 
geopandas.GeoSeries, list, numpy.ndarray)): + if isinstance(col, (pandas.Series, geopandas.GeoSeries, list, np.ndarray)): col = geopandas.geodataframe._ensure_geometry(col) frame[frame._trixel_column_name] = col elif isinstance(col, str) and col in self.columns: @@ -466,7 +701,7 @@ def trixel_vertices(self): Examples --------- - # >>> sids = numpy.array([3458764513820540928]) + # >>> sids = np.array([3458764513820540928]) # >>> df = starepandas.STAREDataFrame(sids=sids) # >>> df.trixel_vertices() (array([29.9999996 , 45.00000069, 29.9999996 ]), array([-170.26439001, -45. , 80.26439001]), array([80.264389]), array([135.])) @@ -491,7 +726,7 @@ def trixel_centers(self, vertices=None): Examples --------- - # >>> sids = numpy.array([3458764513820540928]) + # >>> sids = np.array([3458764513820540928]) # >>> df = starepandas.STAREDataFrame(sids=sids) # >>> df.trixel_centers() array([[134.9 , 80.264389]]) @@ -520,7 +755,7 @@ def trixel_centers_ecef(self, vertices=None): Examples --------- - # >>> sids = numpy.array([3458764513820540928]) + # >>> sids = np.array([3458764513820540928]) # >>> df = starepandas.STAREDataFrame(sids=sids) # >>> df.trixel_centers_ecef() array([[-0.11957316, 0.11957316, 0.98559856]]) @@ -548,7 +783,7 @@ def trixel_centerpoints(self, vertices=None): Examples --------- - # >>> sids = numpy.array([4458764513820540928]) + # >>> sids = np.array([4458764513820540928]) # >>> df = starepandas.STAREDataFrame(sids=sids) # >>> centers = df.trixel_centerpoints() # >>> print(centers[0]) @@ -582,7 +817,7 @@ def trixel_corners(self, vertices=None, from_trixels=False): Examples ---------- - # >>> sids = numpy.array([3458764513820540928]) + # >>> sids = np.array([3458764513820540928]) # >>> df = starepandas.STAREDataFrame(sids=sids) # >>> df.trixel_corners() array([[[-170.26439001, 29.9999996 ], @@ -620,7 +855,7 @@ def trixel_corners_ecef(self, vertices=None): Examples ---------- - # >>> sids = numpy.array([3458764513820540928]) + # >>> sids = 
np.array([3458764513820540928]) # >>> df = starepandas.STAREDataFrame(sids=sids) # >>> df.trixel_corners_ecef() array([[[-0.85355339, -0.14644661, 0.49999999], @@ -650,7 +885,7 @@ def trixel_grings(self, vertices=None): Examples ---------- - # >>> sids = numpy.array([3458764513820540928]) + # >>> sids = np.array([3458764513820540928]) # >>> df = starepandas.STAREDataFrame(sids=sids) # >>> df.trixel_grings() array([[[ 0.14644661, 0.85355339, 0.49999999], @@ -767,10 +1002,10 @@ def stare_intersects(self, other, method='binsearch', n_partitions=1, num_worker dtype: bool """ - if isinstance(other, (int, numpy.int64)): + if isinstance(other, (int, np.int64)): # Other is a single STARE index value other = [other] - elif isinstance(other, (numpy.ndarray, list)): + elif isinstance(other, (np.ndarray, list)): # Other is a collection/set of STARE index values pass else: @@ -873,10 +1108,10 @@ def stare_dissolve(self, by=None, num_workers=1, geom=False, aggfunc="first", ** """ if by is None: sids = self[self._sid_column_name].to_numpy() - if sids.dtype == numpy.dtype('O'): + if sids.dtype == np.dtype('O'): # If we receive a series of SID collections we merge all sids into a single 1D array # to_numpy() would have produced an array of lists in this case - sids = numpy.concatenate(sids) + sids = np.concatenate(sids) sids = starepandas.compress_sids(sids) return sids else: @@ -965,12 +1200,12 @@ def to_sids_level(self, level, inplace=False, clear_to_level=False): sids = df[df._sid_column_name] if pandas.api.types.is_integer_dtype(sids): # We have column of single SIDs and can send whole column to pystare - sids = sids.astype(numpy.dtype('int64')) + sids = sids.astype(np.dtype('int64')) sids = pystare.spatial_coerce_resolution(sids, level) if clear_to_level: # pystare_terminator_mask uses << operator, which requires us to cast to numpy array first - sids = pystare.spatial_clear_to_resolution(numpy.array(sids)) + sids = pystare.spatial_clear_to_resolution(np.array(sids)) else: 
pass @@ -1004,7 +1239,7 @@ def clear_to_level(self, inplace=False): df = self.__deepcopy__() sids = df[df._sid_column_name] - sids = pystare.spatial_clear_to_resolution(numpy.array(sids)) + sids = pystare.spatial_clear_to_resolution(np.array(sids)) df[df._sid_column_name] = sids if not inplace: @@ -1382,6 +1617,411 @@ def to_arrays(self, shape=None, pivot=False): return arrays + def to_zarr_s3(self, s3_path, level, chunk_size=250000, storage_options=None, + dataset=None, data_level=None, raw_collected_time=None, metadata=None): + """ + Partition STAREDataFrame by SIDs at specified level and write to S3 in grouped layout. + + Layout: s3_path//[one array per column + __row_positions__] + + Parameters + ---------- + s3_path : str + S3 path where the zarr root directory will be created (e.g., "s3://bucket/granule_name") + level : int + STARE level for partitioning SIDs + chunk_size : int, optional + Size of chunks for zarr arrays (default: 250000) + storage_options : dict, optional + S3 storage options including credentials and region + dataset : str, optional + Dataset name to record in metadata table + data_level : str, optional + Data level string to record in metadata table + raw_collected_time : datetime, optional + Timestamp when raw data was collected; defaults to UTC now if not provided + metadata : dict, optional + Additional metadata to store in the JSON field + + Returns + ------- + str + The S3 path where data was written + """ + # Resolve storage options: use per-call options over configured defaults + merged_opts = dict(_AWS_S3_STORAGE_OPTIONS) + if not merged_opts: + # Attempt to auto-load default config file if available + _load_config_from_default_locations() + merged_opts = dict(_AWS_S3_STORAGE_OPTIONS) + if storage_options: + merged_opts.update(storage_options) + if not merged_opts: + raise ValueError( + "Missing S3 configuration. Call load_aws_configure(config_path) or aws_configure(...) 
" + "to set credentials/region, or pass storage_options to to_zarr_s3." + ) + + # Ensure grouping by coerced SIDs and preserve encounter order of groups + coerced = self.to_sids_level(level=level, clear_to_level=True) + grouped = self.groupby(coerced[self._sid_column_name], sort=False) + + # Record original row order so we can reconstruct the exact order on read + original_positions = pandas.Series(np.arange(len(self), dtype=np.int64), index=self.index) + + # Prepare RDS connection and metadata defaults + conn = _ensure_rds_db_and_table('StarePodsMetadata') + bucket_name = _parse_s3_bucket(s3_path) + ts = raw_collected_time if raw_collected_time is not None else datetime.datetime.utcnow() + base_meta = dict(metadata or {}) + + # Write each group to its own zarr group under s3_path/ + for group_id, gdf in grouped: + # Skip invalid groups if any + if isinstance(group_id, (int, np.integer)) and group_id < 0: + continue + + group_path = f"{s3_path}/{group_id}" + zg = zarr.open_group(group_path, mode="w", storage_options=merged_opts) + + # Per-group arrays for each column + for col in self.columns: + values = gdf[col].to_numpy() + if values.dtype == np.dtype('O'): + values = values.astype('U') + zg.empty(name=col, shape=(len(values),), dtype=values.dtype, chunks=(min(chunk_size, max(1, len(values))),))[:] = values + + # Hidden helper to preserve original order + row_pos = original_positions.loc[gdf.index].to_numpy(dtype=np.int64) + zg.empty(name="__row_positions__", shape=(len(row_pos),), dtype=row_pos.dtype, chunks=(min(chunk_size, max(1, len(row_pos))),))[:] = row_pos + + # Insert metadata row into RDS for this group + try: + # best-effort to fit grouped_id into 4-byte signed int, also store full in json + full_gid = int(group_id) + gid32 = full_gid & 0x7fffffff + meta_row = dict(base_meta) + meta_row.update({ + 'grouped_id_full': full_gid, + 'group_path': group_path, + 'num_rows': int(len(gdf)), + 'columns': list(self.columns), + }) + with conn.cursor() as cur: + 
cur.execute( + 'INSERT INTO "PodsMetadata" ("Dataset", "DataLevel", "RawData Collected Time", grouped_id, "S3 bucket", "Resolution level", "MetadataJson")\ + VALUES (%s, %s, %s, %s, %s, %s, %s)', + ( + dataset, data_level, ts, gid32, bucket_name, int(level), json.dumps(meta_row) + ) + ) + except Exception: + # Do not fail writing data due to metadata issues + pass + + try: + conn.commit() + except Exception: + pass + finally: + try: + conn.close() + except Exception: + pass + + return s3_path + + def to_zarr_local(self, local_path, level, chunk_size=250000): + """ + Partition STAREDataFrame by SIDs at specified level and write to local storage in grouped layout. + + Layout: local_path//[one array per column + __row_positions__] + + Parameters + ---------- + local_path : str + Local path where the zarr root directory will be created + level : int + STARE level for partitioning SIDs + chunk_size : int, optional + Size of chunks for zarr arrays (default: 250000) + + Returns + ------- + str + The local path where data was written + """ + # Ensure root directory exists + os.makedirs(local_path, exist_ok=True) + + # Group by SIDs at the specified level, preserving encounter order + coerced = self.to_sids_level(level=level, clear_to_level=True) + grouped = self.groupby(coerced[self._sid_column_name], sort=False) + + # Record original row order + original_positions = pandas.Series(np.arange(len(self), dtype=np.int64), index=self.index) + + # Write each group to its own zarr group under local_path/ + for group_id, gdf in grouped: + if isinstance(group_id, (int, np.integer)) and group_id < 0: + continue + + group_dir = os.path.join(local_path, str(group_id)) + zg = zarr.open_group(group_dir, mode="w") + + for col in self.columns: + values = gdf[col].to_numpy() + if values.dtype == np.dtype('O'): + values = values.astype('U') + zg.empty(name=col, shape=(len(values),), dtype=values.dtype, chunks=(min(chunk_size, max(1, len(values))),))[:] = values + + row_pos = 
original_positions.loc[gdf.index].to_numpy(dtype=np.int64) + zg.empty(name="__row_positions__", shape=(len(row_pos),), dtype=row_pos.dtype, chunks=(min(chunk_size, max(1, len(row_pos))),))[:] = row_pos + + return local_path + + @classmethod + def from_zarr_s3(cls, s3_path, storage_options=None): + """ + Read STAREDataFrame from S3 grouped zarr store written by to_zarr_s3. + + Parameters + ---------- + s3_path : str + S3 path to the zarr root directory + storage_options : dict, optional + S3 storage options including credentials and region + + Returns + ------- + STAREDataFrame + The reconstructed STAREDataFrame in original row order + """ + merged_opts = dict(_AWS_S3_STORAGE_OPTIONS) + if not merged_opts: + _load_config_from_default_locations() + merged_opts = dict(_AWS_S3_STORAGE_OPTIONS) + if storage_options: + merged_opts.update(storage_options) + if not merged_opts: + raise ValueError( + "Missing S3 configuration. Call load_aws_configure(config_path) or aws_configure(...) " + "to set credentials/region, or pass storage_options to from_zarr_s3." 
+ ) + fs = s3fs.S3FileSystem(**merged_opts) + + # Discover immediate child prefixes that are zarr groups (contain .zgroup) + try: + entries = fs.ls(s3_path) + except Exception: + entries = [] + + group_dirs = [] + for entry in entries: + # Ensure path is a directory-like and has a .zgroup + candidate = entry.rstrip('/') + if fs.exists(candidate + '/.zgroup'): + group_dirs.append(candidate) + + # Read each group's arrays into a DataFrame and collect + frames = [] + for gpath in group_dirs: + zg = zarr.open_group(f"s3://{gpath}", mode="r", storage_options=merged_opts) + cols = [name for name in zg.array_keys() if name != "__row_positions__"] + data = {} + for name in cols: + arr = zg[name][:] + if arr.dtype.kind == 'U': + arr = arr.astype('O') + data[name] = arr + df_part = pandas.DataFrame(data) + row_pos = zg["__row_positions__"][:].astype(np.int64) + df_part["__row_pos__"] = row_pos + frames.append(df_part) + + if not frames: + return cls() + + df = pandas.concat(frames, ignore_index=True) + df.sort_values("__row_pos__", inplace=True) + df.drop(columns=["__row_pos__"], inplace=True) + + return cls(df) + + @classmethod + def from_zarr_local(cls, local_path): + """ + Read STAREDataFrame from local grouped zarr store written by to_zarr_local. 
+ + Parameters + ---------- + local_path : str + Local path to the zarr root directory + + Returns + ------- + STAREDataFrame + The reconstructed STAREDataFrame in original row order + """ + if not os.path.isdir(local_path): + # Nothing to read + return cls() + + # Discover child directories that contain a .zgroup file + group_dirs = [] + for name in os.listdir(local_path): + candidate = os.path.join(local_path, name) + if os.path.isdir(candidate) and os.path.exists(os.path.join(candidate, '.zgroup')): + group_dirs.append(candidate) + + frames = [] + for gdir in group_dirs: + zg = zarr.open_group(gdir, mode="r") + cols = [n for n in zg.array_keys() if n != "__row_positions__"] + data = {} + for n in cols: + arr = zg[n][:] + if arr.dtype.kind == 'U': + arr = arr.astype('O') + data[n] = arr + df_part = pandas.DataFrame(data) + row_pos = zg["__row_positions__"][:].astype(np.int64) + df_part["__row_pos__"] = row_pos + frames.append(df_part) + + if not frames: + return cls() + + df = pandas.concat(frames, ignore_index=True) + df.sort_values("__row_pos__", inplace=True) + df.drop(columns=["__row_pos__"], inplace=True) + + return cls(df) + + def to_pickle_s3(self, s3_path, storage_options=None, compress=None): + """ + Write STAREDataFrame to S3 as pickle file. 
+ + Parameters + ---------- + s3_path : str + S3 path where the pickle file will be written (e.g., "s3://bucket/granule_name.pkl") + storage_options : dict, optional + S3 storage options including credentials and region + compress : str, optional + Compression method ('bz2' or None) + + Returns + ------- + str + The S3 path where data was written + """ + import s3fs + + # Create S3 filesystem + fs = s3fs.S3FileSystem(**(storage_options or {})) + + # Write pickle to S3 + with fs.open(s3_path, 'wb') as f: + if compress == 'bz2': + import bz2 + with bz2.open(f, 'wb') as bz2f: + pickle.dump(self, bz2f) + else: + pickle.dump(self, f) + + return s3_path + + def to_pickle_local(self, local_path, compress=None): + """ + Write STAREDataFrame to local storage as pickle file. + + Parameters + ---------- + local_path : str + Local path where the pickle file will be written + compress : str, optional + Compression method ('bz2' or None) + + Returns + ------- + str + The local path where data was written + """ + # Write pickle to local filesystem + if compress == 'bz2': + import bz2 + with bz2.open(local_path, 'wb') as f: + pickle.dump(self, f) + else: + with open(local_path, 'wb') as f: + pickle.dump(self, f) + + return local_path + + @classmethod + def from_pickle_s3(cls, s3_path, storage_options=None, compress=None): + """ + Read STAREDataFrame from S3 pickle file. 
+ + Parameters + ---------- + s3_path : str + S3 path to the pickle file + storage_options : dict, optional + S3 storage options including credentials and region + compress : str, optional + Compression method ('bz2' or None) + + Returns + ------- + STAREDataFrame + The reconstructed STAREDataFrame + """ + import s3fs + + # Create S3 filesystem + fs = s3fs.S3FileSystem(**storage_options or {}) + + # Read pickle from S3 + with fs.open(s3_path, 'rb') as f: + if compress == 'bz2': + import bz2 + with bz2.open(f, 'rb') as bz2f: + df = pickle.load(bz2f) + else: + df = pickle.load(f) + + return df + + @classmethod + def from_pickle_local(cls, local_path, compress=None): + """ + Read STAREDataFrame from local pickle file. + + Parameters + ---------- + local_path : str + Local path to the pickle file + compress : str, optional + Compression method ('bz2' or None) + + Returns + ------- + STAREDataFrame + The reconstructed STAREDataFrame + """ + # Read pickle from local filesystem + if compress == 'bz2': + import bz2 + with bz2.open(local_path, 'rb') as f: + df = pickle.load(f) + else: + with open(local_path, 'rb') as f: + df = pickle.load(f) + + return df + def to_sidecar(self, file_name, cover=False, shuffle=True, zlib=True): """ Writes STARE Sidecar @@ -1438,7 +2078,6 @@ def to_postgis(self, name, con, schema=None, if_exists="fail", index=False, inde """ starepandas.io.postgis.write(gdf=self, engine=con, table_name=name) - def _dataframe_set_sids(self, col, inplace=False): # We create a function here so that we can take conventional DataFrames and convert them to sdfs if inplace: @@ -1447,6 +2086,5 @@ def _dataframe_set_sids(self, col, inplace=False): # this will copy so that BlockManager gets copied return sdf.set_sids(col, inplace=False) - geopandas.GeoDataFrame.set_sids = _dataframe_set_sids pandas.DataFrame.set_sids = _dataframe_set_sids diff --git a/tests/data/granules/1C.F18.SSMIS.XCAL2021-V.20250105-S222535-E000725.078504.V07B.HDF5 
b/tests/data/granules/1C.F18.SSMIS.XCAL2021-V.20250105-S222535-E000725.078504.V07B.HDF5 new file mode 100644 index 0000000..fe49635 Binary files /dev/null and b/tests/data/granules/1C.F18.SSMIS.XCAL2021-V.20250105-S222535-E000725.078504.V07B.HDF5 differ diff --git a/tests/test_create_sidecar.py b/tests/test_create_sidecar.py new file mode 100644 index 0000000..1eceec5 --- /dev/null +++ b/tests/test_create_sidecar.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +""" +Test script for create_sidecar functionality. +This script tests the create_sidecar method for SSMIS granules. +""" + +import os +import sys + +# Add the current directory to the path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from starepandas.io.granules.ssmis import SSMIS + + +def test_create_sidecar(): + """Test creating a sidecar file for SSMIS.""" + print("=== Testing SSMIS create_sidecar Functionality ===") + + file_path = "tests/data/granules/1C.F18.SSMIS.XCAL2021-V.20250105-S222535-E000725.078504.V07B.HDF5" + + if not os.path.exists(file_path): + print(f"File not found: {file_path}") + return False + + try: + with SSMIS(file_path, scans=['S1', 'S2']) as ssmis: + print(f"✓ File type detected: {ssmis.file_type}") + print(f"✓ Available scans: {ssmis.scans}") + + # Read latitude and longitude + ssmis.read_latlon() + print(f"✓ Latitude shape for S1: {ssmis.lat['S1'].shape}") + print(f"✓ Longitude shape for S1: {ssmis.lon['S1'].shape}") + + # Create sidecar file + print("Creating sidecar file...") + sidecar = ssmis.create_sidecar(n_workers=1, out_path="test_output/") + + print(f"✓ Sidecar file created: {sidecar.file_path}") + + # Check if the sidecar file exists + if os.path.exists(sidecar.file_path): + print(f"✓ Sidecar file exists on disk") + + # Check file size + file_size = os.path.getsize(sidecar.file_path) + print(f"✓ Sidecar file size: {file_size} bytes") + + # Clean up + os.remove(sidecar.file_path) + print("✓ Test file cleaned up") + + return True + else: + print(f"✗ 
Sidecar file was not created") + return False + + except Exception as e: + print(f"✗ Error creating sidecar: {e}") + return False + + +def test_create_sidecar_without_latlon(): + """Test that create_sidecar fails when lat/lon not loaded.""" + print("\n=== Testing create_sidecar Error Handling ===") + + file_path = "tests/data/granules/1C.F18.SSMIS.XCAL2021-V.20250105-S222535-E000725.078504.V07B.HDF5" + + if not os.path.exists(file_path): + print(f"File not found: {file_path}") + return False + + try: + with SSMIS(file_path, scans=['S1']) as ssmis: + # Try to create sidecar without reading lat/lon + print("Attempting to create sidecar without reading lat/lon...") + sidecar = ssmis.create_sidecar() + print("✗ Should have raised an error") + return False + + except ValueError as e: + if "Latitude and longitude data must be loaded" in str(e): + print("✓ Correctly raised ValueError for missing lat/lon data") + return True + else: + print(f"✗ Unexpected error: {e}") + return False + except Exception as e: + print(f"✗ Unexpected error: {e}") + return False + + +def main(): + """Run all tests.""" + print("SSMIS create_sidecar Functionality Test") + print("=" * 50) + + # Create test output directory + os.makedirs("test_output", exist_ok=True) + + # Run tests + sidecar_success = test_create_sidecar() + error_handling_success = test_create_sidecar_without_latlon() + + print("\n" + "=" * 50) + print("Test Summary:") + print(f"create_sidecar Functionality: {'✓ PASS' if sidecar_success else '✗ FAIL'}") + print(f"Error Handling: {'✓ PASS' if error_handling_success else '✗ FAIL'}") + + if sidecar_success and error_handling_success: + print("\n🎉 All tests passed! create_sidecar functionality is working correctly.") + else: + print("\n❌ Some tests failed. 
Please check the error messages above.") + + # Clean up test directory + try: + os.rmdir("test_output") + except: + pass + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/test_pickle_functions.py b/tests/test_pickle_functions.py new file mode 100644 index 0000000..f728043 --- /dev/null +++ b/tests/test_pickle_functions.py @@ -0,0 +1,160 @@ +import os +import tempfile +import shutil +import numpy as np +import pandas as pd +import starepandas as sp +from shapely.geometry import Point +import pytest + +def test_pickle_local_functions(): + """Test pickle local storage functions""" + + # Create a test STAREDataFrame with geometry and SIDs + data = { + 'lat': [0, 1, 2, 3, 4], + 'lon': [0, 1, 2, 3, 4], + 'data': [1, 2, 3, 4, 5], + 'category': ['A', 'B', 'A', 'B', 'A'] + } + + sdf = sp.STAREDataFrame(data) + + # Add geometry column + sdf['geometry'] = [Point(lon, lat) for lon, lat in zip(sdf['lon'], sdf['lat'])] + sdf = sdf.set_geometry('geometry') + + # Create SIDs + sids = sdf.make_sids(level=10) + sdf['sids'] = sids + + print(f"Original STAREDataFrame shape: {sdf.shape}") + print(f"Columns: {sdf.columns.tolist()}") + print(f"Has SIDs: {sdf.has_sids()}") + + # Create temporary directory for testing + with tempfile.TemporaryDirectory() as temp_dir: + local_path = os.path.join(temp_dir, "test_granule.pkl") + + print(f"Testing to_pickle_local...") + # Test writing to local pickle + written_path = sdf.to_pickle_local(local_path) + print(f"Written to: {written_path}") + + print(f"Testing from_pickle_local...") + # Test reading from local pickle + sdf_read = sp.STAREDataFrame.from_pickle_local(local_path) + print(f"Read STAREDataFrame shape: {sdf_read.shape}") + print(f"Read columns: {sdf_read.columns.tolist()}") + + # Verify data integrity + assert sdf.shape == sdf_read.shape, f"Shape mismatch: {sdf.shape} != {sdf_read.shape}" + assert set(sdf.columns) == set(sdf_read.columns), f"Column mismatch: {sdf.columns} != {sdf_read.columns}" + + 
# Check that data values match for each column by name + for col in sdf.columns: + assert sdf[col].equals(sdf_read[col]), f"Column {col} data mismatch" + + print("✅ Local pickle functions test passed!") + + # Test with compression + compressed_path = os.path.join(temp_dir, "test_granule_compressed.pkl") + print(f"Testing to_pickle_local with compression...") + written_path = sdf.to_pickle_local(compressed_path, compress='bz2') + print(f"Written compressed to: {written_path}") + + print(f"Testing from_pickle_local with compression...") + sdf_read_compressed = sp.STAREDataFrame.from_pickle_local(compressed_path, compress='bz2') + print(f"Read compressed STAREDataFrame shape: {sdf_read_compressed.shape}") + + # Verify data integrity for compressed version + assert sdf.shape == sdf_read_compressed.shape, f"Shape mismatch: {sdf.shape} != {sdf_read_compressed.shape}" + assert set(sdf.columns) == set(sdf_read_compressed.columns), f"Column mismatch: {sdf.columns} != {sdf_read_compressed.columns}" + + for col in sdf.columns: + assert sdf[col].equals(sdf_read_compressed[col]), f"Column {col} data mismatch in compressed version" + + print("✅ Local pickle functions with compression test passed!") + +def test_pickle_s3_functions(): + """Test pickle S3 storage functions (requires S3 credentials)""" + + # Skip this test if no S3 credentials are available + # In a real test environment, you would use mock S3 or test credentials + pytest.skip("S3 test requires credentials - skipping for now") + + # Create a test STAREDataFrame with geometry and SIDs + data = { + 'lat': [0, 1, 2, 3, 4], + 'lon': [0, 1, 2, 3, 4], + 'data': [1, 2, 3, 4, 5], + 'category': ['A', 'B', 'A', 'B', 'A'] + } + + sdf = sp.STAREDataFrame(data) + + # Add geometry column + sdf['geometry'] = [Point(lon, lat) for lon, lat in zip(sdf['lon'], sdf['lat'])] + sdf = sdf.set_geometry('geometry') + + # Create SIDs + sids = sdf.make_sids(level=10) + sdf['sids'] = sids + + # S3 test would go here with proper credentials + # 
s3_path = "s3://test-bucket/test-granule.pkl" + # storage_options = { + # "key": "your-key", + # "secret": "your-secret", + # "client_kwargs": {"region_name": "us-west-2"} + # } + # + # written_path = sdf.to_pickle_s3(s3_path, storage_options=storage_options) + # sdf_read = sp.STAREDataFrame.from_pickle_s3(s3_path, storage_options=storage_options) + + print("✅ S3 pickle functions test skipped (requires credentials)") + +def test_pickle_with_real_granule_data(): + """Test pickle functions with actual granule data if available""" + + # Check if we have test data available + test_data_path = "tests/data/granules/MOD05_L2.A2019336.0000.061.2019336211522_stare.nc" + + if os.path.exists(test_data_path): + print(f"Testing with real granule data: {test_data_path}") + + # Read granule with starepandas + sdf = sp.read_granule(test_data_path, sidecar=True, latlon=True, read_timestamp=False) + + print(f"Granule STAREDataFrame shape: {sdf.shape}") + print(f"Columns: {sdf.columns.tolist()}") + print(f"Has SIDs: {sdf.has_sids()}") + + # Create temporary directory for testing + with tempfile.TemporaryDirectory() as temp_dir: + local_path = os.path.join(temp_dir, "real_granule.pkl") + + print(f"Testing to_pickle_local with real data...") + # Test writing to local pickle + written_path = sdf.to_pickle_local(local_path) + print(f"Written to: {written_path}") + + print(f"Testing from_pickle_local with real data...") + # Test reading from local pickle + sdf_read = sp.STAREDataFrame.from_pickle_local(local_path) + print(f"Read STAREDataFrame shape: {sdf_read.shape}") + + # Verify data integrity + assert sdf.shape == sdf_read.shape, f"Shape mismatch: {sdf.shape} != {sdf_read.shape}" + assert set(sdf.columns) == set(sdf_read.columns), f"Column mismatch: {sdf.columns} != {sdf_read.columns}" + + print("✅ Real granule pickle test passed!") + else: + print(f"Test data not available at {test_data_path}, skipping real data test") + +if __name__ == "__main__": + print("Running pickle functions 
tests...") + test_pickle_local_functions() + test_pickle_s3_functions() + test_pickle_with_real_granule_data() + print("All tests completed!") \ No newline at end of file diff --git a/tests/test_ssmis_hdf5.py b/tests/test_ssmis_hdf5.py new file mode 100644 index 0000000..c2a224d --- /dev/null +++ b/tests/test_ssmis_hdf5.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +""" +Test script for SSMIS HDF5 support. +This script tests both NetCDF4 and HDF5 file formats using the available test data. +""" + +import os +import sys +import numpy as np + +# Add the current directory to the path +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from starepandas.io.granules.ssmis import SSMIS + + +def test_ssmis_netcdf4(): + """Test SSMIS NetCDF4 file reading.""" + print("=== Testing SSMIS NetCDF4 Support ===") + + file_path = "tests/data/granules/1C.F16.SSMIS.XCAL2016-V.20210110-S002714-E020909.088907.V05A.HDF5" + + if not os.path.exists(file_path): + print(f"File not found: {file_path}") + return False + + try: + with SSMIS(file_path, scans=['S1', 'S2']) as ssmis: + print(f"✓ File type detected: {ssmis.file_type}") + print(f"✓ Available scans: {ssmis.scans}") + + # Test reading latitude and longitude + ssmis.read_latlon() + print(f"✓ Latitude shape for S1: {ssmis.lat['S1'].shape}") + print(f"✓ Longitude shape for S1: {ssmis.lon['S1'].shape}") + + # Test reading timestamps + ssmis.read_timestamps() + print(f"✓ Timestamp shape for S1: {ssmis.timestamps['S1'].shape}") + + # Test reading data + ssmis.read_data() + print(f"✓ Available data for S1: {list(ssmis.data['S1'].keys())}") + if 'Tc1' in ssmis.data['S1']: + print(f"✓ Tc1 data shape: {ssmis.data['S1']['Tc1'].shape}") + + # Test converting to DataFrame + dfs = ssmis.to_df() + print(f"✓ DataFrame for S1 has {len(dfs['S1'])} rows") + + return True + + except Exception as e: + print(f"✗ Error reading NetCDF4 file: {e}") + return False + + +def test_ssmis_hdf5(): + """Test SSMIS HDF5 file reading.""" + print("\n=== 
Testing SSMIS HDF5 Support ===") + + file_path = "tests/data/granules/1C.F18.SSMIS.XCAL2021-V.20250105-S222535-E000725.078504.V07B.HDF5" + + if not os.path.exists(file_path): + print(f"File not found: {file_path}") + return False + + try: + with SSMIS(file_path, scans=['S1', 'S2', 'S3', 'S4']) as ssmis: + print(f"✓ File type detected: {ssmis.file_type}") + print(f"✓ Available scans: {ssmis.scans}") + + # Test reading latitude and longitude + ssmis.read_latlon() + print(f"✓ Latitude shape for S1: {ssmis.lat['S1'].shape}") + print(f"✓ Longitude shape for S1: {ssmis.lon['S1'].shape}") + + # Test reading timestamps + ssmis.read_timestamps() + print(f"✓ Timestamp shape for S1: {ssmis.timestamps['S1'].shape}") + + # Test reading data + ssmis.read_data() + print(f"✓ Available data for S1: {list(ssmis.data['S1'].keys())}") + if 'Tc1' in ssmis.data['S1']: + print(f"✓ Tc1 data shape: {ssmis.data['S1']['Tc1'].shape}") + + # Test converting to DataFrame + dfs = ssmis.to_df() + print(f"✓ DataFrame for S1 has {len(dfs['S1'])} rows") + + return True + + except Exception as e: + print(f"✗ Error reading HDF5 file: {e}") + return False + + +def test_auto_detection(): + """Test automatic file type detection.""" + print("\n=== Testing Automatic File Type Detection ===") + + netcdf_file = "tests/data/granules/1C.F16.SSMIS.XCAL2016-V.20210110-S002714-E020909.088907.V05A.HDF5" + hdf5_file = "tests/data/granules/1C.F18.SSMIS.XCAL2021-V.20250105-S222535-E000725.078504.V07B.HDF5" + + files_to_test = [ + (netcdf_file, "NetCDF4"), + (hdf5_file, "HDF5") + ] + + for file_path, expected_type in files_to_test: + if os.path.exists(file_path): + try: + with SSMIS(file_path) as ssmis: + detected_type = ssmis.file_type + status = "✓" if detected_type == expected_type.lower() else "✗" + print(f"{status} File: {os.path.basename(file_path)}") + print(f" Expected: {expected_type}, Detected: {detected_type}") + except Exception as e: + print(f"✗ Error with {os.path.basename(file_path)}: {e}") + else: + 
print(f"✗ File not found: {file_path}") + + +def main(): + """Run all tests.""" + print("SSMIS HDF5/NetCDF4 Support Test") + print("=" * 50) + + # Run tests + netcdf_success = test_ssmis_netcdf4() + hdf5_success = test_ssmis_hdf5() + test_auto_detection() + + print("\n" + "=" * 50) + print("Test Summary:") + print(f"NetCDF4 Support: {'✓ PASS' if netcdf_success else '✗ FAIL'}") + print(f"HDF5 Support: {'✓ PASS' if hdf5_success else '✗ FAIL'}") + + if netcdf_success and hdf5_success: + print("\n🎉 All tests passed! SSMIS HDF5 support is working correctly.") + else: + print("\n❌ Some tests failed. Please check the error messages above.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tests/test_zarr_functions.py b/tests/test_zarr_functions.py new file mode 100644 index 0000000..da48826 --- /dev/null +++ b/tests/test_zarr_functions.py @@ -0,0 +1,142 @@ +import os +import tempfile +import shutil +import numpy as np +import pandas as pd +import starepandas as sp +from shapely.geometry import Point +import pytest + +def test_zarr_local_functions(): + """Test zarr local storage functions following the pattern from zarrpods_v2.ipynb""" + + # Create a test STAREDataFrame with geometry and SIDs + data = { + 'lat': [0, 1, 2, 3, 4], + 'lon': [0, 1, 2, 3, 4], + 'data': [1, 2, 3, 4, 5], + 'category': ['A', 'B', 'A', 'B', 'A'] + } + + sdf = sp.STAREDataFrame(data) + + # Add geometry column + sdf['geometry'] = [Point(lon, lat) for lon, lat in zip(sdf['lon'], sdf['lat'])] + sdf = sdf.set_geometry('geometry') + + # Create SIDs + sids = sdf.make_sids(level=10) + sdf['sids'] = sids + + print(f"Original STAREDataFrame shape: {sdf.shape}") + print(f"Columns: {sdf.columns.tolist()}") + print(f"Has SIDs: {sdf.has_sids()}") + + # Create temporary directory for testing + with tempfile.TemporaryDirectory() as temp_dir: + local_path = os.path.join(temp_dir, "test_granule.zarr") + + print(f"Testing to_zarr_local...") + # Test writing to local zarr + 
written_path = sdf.to_zarr_local(local_path, level=10, chunk_size=1000) + print(f"Written to: {written_path}") + + print(f"Testing from_zarr_local...") + # Test reading from local zarr + sdf_read = sp.STAREDataFrame.from_zarr_local(local_path) + print(f"Read STAREDataFrame shape: {sdf_read.shape}") + print(f"Read columns: {sdf_read.columns.tolist()}") + + # Verify data integrity (ignore column order) + assert sdf.shape == sdf_read.shape, f"Shape mismatch: {sdf.shape} != {sdf_read.shape}" + assert set(sdf.columns) == set(sdf_read.columns), f"Column mismatch: {sdf.columns} != {sdf_read.columns}" + + # Check that data values match for each column by name (excluding geometry which might be serialized differently) + for col in sdf.columns: + if col != 'geometry': + assert sdf[col].equals(sdf_read[col]), f"Column {col} data mismatch" + + print("✅ Local zarr functions test passed!") + +def test_zarr_s3_functions(): + """Test zarr S3 storage functions (requires S3 credentials)""" + + # Skip this test if no S3 credentials are available + # In a real test environment, you would use mock S3 or test credentials + pytest.skip("S3 test requires credentials - skipping for now") + + # Create a test STAREDataFrame with geometry and SIDs + data = { + 'lat': [0, 1, 2, 3, 4], + 'lon': [0, 1, 2, 3, 4], + 'data': [1, 2, 3, 4, 5], + 'category': ['A', 'B', 'A', 'B', 'A'] + } + + sdf = sp.STAREDataFrame(data) + + # Add geometry column + sdf['geometry'] = [Point(lon, lat) for lon, lat in zip(sdf['lon'], sdf['lat'])] + sdf = sdf.set_geometry('geometry') + + # Create SIDs + sids = sdf.make_sids(level=10) + sdf['sids'] = sids + + # S3 test would go here with proper credentials + # s3_path = "s3://test-bucket/test-granule" + # storage_options = { + # "key": "your-key", + # "secret": "your-secret", + # "client_kwargs": {"region_name": "us-west-2"} + # } + # + # written_path = sdf.to_zarr_s3(s3_path, level=10, storage_options=storage_options) + # sdf_read = sp.STAREDataFrame.from_zarr_s3(s3_path, 
storage_options=storage_options) + + print("✅ S3 zarr functions test skipped (requires credentials)") + +def test_zarr_with_real_granule_data(): + """Test zarr functions with actual granule data if available""" + + # Check if we have test data available + test_data_path = "tests/data/granules/MOD05_L2.A2019336.0000.061.2019336211522_stare.nc" + + if os.path.exists(test_data_path): + print(f"Testing with real granule data: {test_data_path}") + + # Read granule with starepandas + sdf = sp.read_granule(test_data_path, sidecar=True, latlon=True, read_timestamp=False) + + print(f"Granule STAREDataFrame shape: {sdf.shape}") + print(f"Columns: {sdf.columns.tolist()}") + print(f"Has SIDs: {sdf.has_sids()}") + + # Create temporary directory for testing + with tempfile.TemporaryDirectory() as temp_dir: + local_path = os.path.join(temp_dir, "real_granule.zarr") + + print(f"Testing to_zarr_local with real data...") + # Test writing to local zarr + written_path = sdf.to_zarr_local(local_path, level=10, chunk_size=250000) + print(f"Written to: {written_path}") + + print(f"Testing from_zarr_local with real data...") + # Test reading from local zarr + sdf_read = sp.STAREDataFrame.from_zarr_local(local_path) + print(f"Read STAREDataFrame shape: {sdf_read.shape}") + + # Verify data integrity + assert sdf.shape == sdf_read.shape, f"Shape mismatch: {sdf.shape} != {sdf_read.shape}" + assert list(sdf.columns) == list(sdf_read.columns), f"Column mismatch: {sdf.columns} != {sdf_read.columns}" + + print("✅ Real granule zarr test passed!") + else: + print(f"Test data not available at {test_data_path}, skipping real data test") + +if __name__ == "__main__": + print("Running zarr functions tests...") + test_zarr_local_functions() + test_zarr_s3_functions() + test_zarr_with_real_granule_data() + print("All tests completed!") \ No newline at end of file