# Copyright: Multiple Authors
#
# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python
#
# SPDX-License-Identifier: LGPL-3.0-or-later

"""Converter for Signal Hound Spike Zero Span files to SigMF format.

A Spike Zero Span recording consists of an XML metadata file plus a
companion ``.iq`` file of interleaved little-endian int16 I/Q samples
(``ci16_le`` in SigMF terms).
"""

import io
import json
import logging
import os
import tempfile
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import List, Optional

import numpy as np

from .. import SigMFFile, fromfile
from ..error import SigMFConversionError
from ..sigmffile import get_sigmf_filenames
from ..utils import SIGMF_DATETIME_ISO8601_FMT

# Logging is configured by the caller (e.g. __main__.py)
log = logging.getLogger(__name__)


def _to_float(x) -> Optional[float]:
    """Convert value to float, return None if invalid."""
    try:
        return float(x)
    except (TypeError, ValueError):
        return None


def _to_int(x) -> Optional[int]:
    """Convert value to int (via float so e.g. "1e6" parses), return None if invalid."""
    try:
        return int(float(x))
    except (TypeError, ValueError):
        return None


def _parse_preview_trace(text: Optional[str]) -> List[float]:
    """Parse a comma-separated PreviewTrace string into a list of floats.

    Empty fields, a trailing comma, and malformed entries are silently skipped.
    """
    if text is None:
        return []
    values: List[float] = []
    for part in text.strip().rstrip(",").split(","):
        part = part.strip()
        if not part:
            continue
        try:
            values.append(float(part))
        except ValueError:
            # skip malformed entries
            continue
    return values


def spike_to_sigmf_metadata(xml_file_path) -> dict:
    """
    Build a SigMF metadata structure from the Spike XML file.

    Parameters
    ----------
    xml_file_path : str
        Path to the Spike XML file. The companion ``.iq`` data file is
        expected next to it with the same basename.

    Returns
    -------
    dict
        SigMF metadata structure with "global", "captures" and "annotations" keys.

    Raises
    ------
    SigMFConversionError
        If required fields are missing or invalid.
    """
    log.info("===== Converting Spike XML metadata to SigMF format =====")

    root = ET.parse(Path(xml_file_path)).getroot()

    def text_of(tag) -> Optional[str]:
        """Extract and strip text from an XML element, or None when absent."""
        el = root.find(tag)
        return el.text.strip() if (el is not None and el.text is not None) else None

    # Signal Hound data elements, typed/normalized as they are read
    spike_xml = {
        "DataType": text_of("DataType"),
        "DeviceType": text_of("DeviceType"),
        "SerialNumber": text_of("SerialNumber"),
        "IQFileName": text_of("IQFileName"),
        "CenterFrequency": _to_float(text_of("CenterFrequency")),
        "SampleCount": _to_int(text_of("SampleCount")),
        "SampleRate": _to_float(text_of("SampleRate")),
        "EpochNanos": _to_int(text_of("EpochNanos")),
        # The following will be added as custom fields of the global metadata
        "ReferenceLevel": _to_float(text_of("ReferenceLevel")),
        "Decimation": _to_int(text_of("Decimation")),
        "IFBandwidth": _to_float(text_of("IFBandwidth")),
        "ScaleFactor": _to_float(text_of("ScaleFactor")),
    }

    # PreviewTrace: keep both list and numpy forms for downstream callers
    # TODO: Consider adding a flag to include preview trace or not.
    # TODO: Confirm np.int16 data type for preview data elements.
    preview_list = _parse_preview_trace(text_of("PreviewTrace"))
    spike_xml["PreviewTrace_list"] = preview_list
    spike_xml["PreviewTrace_array"] = np.array(preview_list, dtype=np.int16)

    # TODO: Confirm Zero Span Spike files are single channel
    channel_number = 1

    # Datatype mapping based on the Spike XML DataType field - should be "Complex Short"
    spike_data_type = spike_xml["DataType"]
    if spike_data_type == "Complex Short":
        data_type = "ci16_le"  # complex int16 little-endian
    else:
        raise SigMFConversionError(f"Unsupported Spike DataType: {spike_data_type}")

    # Use DeviceType for the hardware description, otherwise a generic description
    device_type = spike_xml["DeviceType"]
    hardware_description = device_type if device_type is not None else "Signal Hound Device"

    # The companion .iq file shares the basename of the XML file
    data_file_path = os.path.splitext(xml_file_path)[0] + ".iq"
    filesize = os.path.getsize(data_file_path)

    # ci16_le: one complex sample frame = 2 x int16
    elem_size = np.dtype(np.int16).itemsize
    frame_bytes = 2 * elem_size
    log.info("Element Count: %d", filesize // elem_size)
    if filesize % frame_bytes != 0:
        raise SigMFConversionError(f"File size {filesize} not divisible by {frame_bytes}; partial sample present")

    # Calculate sample count using the original IQ data file size
    sample_count = filesize // frame_bytes
    log.info("Sample count: %d", sample_count)

    # For now define static values. Perhaps take as JSON or command arg input in the future.
    spike_author = "Spike File Conversion - Unknown Author"
    spike_licence = "Spike File Conversion - Unknown License"
    spike_description = "Signal Hound Spike Zero Span File converted to SigMF format"

    # EpochNanos (UTC) -> ISO 8601 string with millisecond precision
    epoch_nanos = spike_xml["EpochNanos"]
    if epoch_nanos is None:
        raise SigMFConversionError("Missing EpochNanos in Spike XML")
    secs, rem_ns = divmod(int(epoch_nanos), 1_000_000_000)
    dt = datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(microseconds=rem_ns / 1000)
    iso_8601_string = dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

    # TODO: Confirm freq edge calculations - is this correct for Spike files? ScaleFactor? MHz? /2?
    center = spike_xml["CenterFrequency"] or 0.0
    bandwidth = spike_xml["IFBandwidth"] or 0.0
    upper_frequency_edge = center + (bandwidth / 2.0)
    lower_frequency_edge = center - (bandwidth / 2.0)

    # --- Base Global Metadata ---
    # NOTE(review): the "core:" namespace is reserved by the SigMF spec; the
    # spike_* fields below should likely move to a custom extension namespace.
    global_md = {
        "core:author": spike_author,
        "core:datatype": data_type,
        "core:description": spike_description,
        "core:hw": hardware_description,
        "core:license": spike_licence,
        "core:num_channels": channel_number,
        "core:sample_rate": spike_xml["SampleRate"],
        "core:version": "1.0.0",
        "core:spike_ReferenceLevel": spike_xml["ReferenceLevel"],
        "core:spike_Decimation": spike_xml["Decimation"],
        "core:spike_IFBandwidth": spike_xml["IFBandwidth"],
        "core:spike_ScaleFactor": spike_xml["ScaleFactor"],
        "core:spike_IQFileName": spike_xml["IQFileName"],
    }

    # --- Captures array ---
    captures = [
        {
            "core:datetime": iso_8601_string,
            # use the guarded value so a missing CenterFrequency cannot raise
            "core:frequency": center,
            "core:sample_start": 0,
        }
    ]

    # --- Annotations array using calculated values ---
    annotations = [
        {
            "core:sample_start": 0,
            "core:sample_count": sample_count,
            "core:freq_upper_edge": upper_frequency_edge,
            "core:freq_lower_edge": lower_frequency_edge,
            "core:label": "Spike",
        }
    ]

    # --- Final SigMF metadata structure ---
    return {
        "global": global_md,
        "captures": captures,
        "annotations": annotations,
    }


def convert_iq_data(xml_file_path, sigmfObj=None) -> np.ndarray:
    """
    Convert the raw .iq data that accompanies a Zero Span XML file.

    Reads the interleaved int16 I/Q samples, writes them to a
    ``<basename>.iq.sigmf-data`` file beside the input, and returns them.

    Parameters
    ----------
    xml_file_path : str
        Path to the Spike Zero Span XML file.
    sigmfObj : dict, optional
        SigMF metadata dict as produced by ``spike_to_sigmf_metadata``;
        when present its annotated sample count bounds how much is read.

    Returns
    -------
    numpy.ndarray
        Parsed int16 samples (still interleaved I, Q, I, Q, ...).
    """
    # TODO: Extend for multiple channel recordings once sample files are available.
    log.info("===== Parsing spike file data values =====")
    iq_file_path = os.path.splitext(xml_file_path)[0] + ".iq"

    # Bound the read with the annotated sample count when available;
    # fall back to reading the whole file (count=-1).
    elem_count = -1
    if isinstance(sigmfObj, dict):
        annotated_count = sigmfObj.get("annotations", [{}])[0].get("core:sample_count")
        if annotated_count is not None:
            elem_count = int(annotated_count) * 2  # *2 for interleaved I and Q elements

    # Read raw interleaved int16 IQ
    samples = np.fromfile(iq_file_path, dtype=np.int16, offset=0, count=elem_count)

    # Drop a trailing lone int16 element that has no matching I/Q partner
    # (np.fromfile already discards a trailing partial element byte-wise)
    if samples.size % 2 != 0:
        log.warning("Trimming 1 trailing int16 element to align I/Q pairs")
        samples = samples[:-1]

    # NOTE: samples remain interleaved as required by ci16_le; no reassembly
    # into complex values is needed for the SigMF data file.

    # TODO: Use consistent file names in output
    out_data_path = iq_file_path + ".sigmf-data"
    samples.tofile(out_data_path)
    log.info("==== Wrote SigMF data to %s ====", out_data_path)

    # Return the IQ data for further processing if needed in the future.
    return samples


def _add_annotations(meta: SigMFFile, sigmf_meta: dict) -> None:
    """Copy annotations from a generated metadata dict onto a SigMFFile."""
    for annotation in sigmf_meta.get("annotations", []):
        start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0)
        length = annotation.get(SigMFFile.LENGTH_INDEX_KEY)
        # Pass remaining fields as metadata (excluding standard annotation keys)
        extra = {
            key: value
            for key, value in annotation.items()
            if key not in (SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY)
        }
        meta.add_annotation(start_idx, length=length, metadata=extra)


def _write_archive(signalhound_path, sigmf_meta, global_info, capture_info, filenames) -> SigMFFile:
    """Convert the IQ data and package metadata + data as a .sigmf archive."""
    with tempfile.TemporaryDirectory() as temp_dir:
        data_path = Path(temp_dir) / filenames["data_fn"].name

        try:
            iq_data = convert_iq_data(str(signalhound_path), sigmf_meta)
        except Exception as err:
            raise SigMFConversionError(f"Failed to convert or parse IQ data values: {err}") from err

        # Write converted IQ data to a temporary file for archiving
        iq_data.tofile(data_path)
        log.info("Wrote converted IQ data to %s", data_path)

        meta = SigMFFile(data_file=data_path, global_info=global_info)
        meta.add_capture(0, metadata=capture_info)
        _add_annotations(meta, sigmf_meta)

        meta.tofile(filenames["archive_fn"], toarchive=True)
        log.info("wrote SigMF archive to %s", filenames["archive_fn"])

    # metadata returned should be for this archive
    return fromfile(filenames["archive_fn"])


def _write_dataset(signalhound_path, sigmf_meta, global_info, capture_info, filenames) -> SigMFFile:
    """Convert the IQ data and write separate .sigmf-meta / data files."""
    try:
        iq_data = convert_iq_data(str(signalhound_path), sigmf_meta)
    except Exception as err:
        raise SigMFConversionError(f"Failed to convert or parse IQ data values: {err}") from err

    # Hold the converted IQ data in a BytesIO buffer for checksum-free attachment
    data_buffer = io.BytesIO(iq_data.tobytes())
    meta = SigMFFile(global_info=global_info)
    meta.set_data_file(data_buffer=data_buffer, skip_checksum=True)
    meta.add_capture(0, metadata=capture_info)
    _add_annotations(meta, sigmf_meta)

    # Write metadata and data files
    output_dir = filenames["meta_fn"].parent
    output_dir.mkdir(parents=True, exist_ok=True)
    meta.tofile(filenames["meta_fn"], toarchive=False)
    log.info("wrote SigMF metadata and data files to %s", filenames["meta_fn"])
    return meta


def signalhound_to_sigmf(
    signalhound_path: str,
    out_path: Optional[str] = None,
    create_archive: bool = False,
    create_ncd: bool = False,
) -> SigMFFile:
    """
    Read a Signal Hound file, optionally write a sigmf archive, return the SigMF object.

    Parameters
    ----------
    signalhound_path : str
        Path to the Signal Hound XML file.
    out_path : str, optional
        Path to the output SigMF metadata file.
    create_archive : bool, optional
        When True, package output as a .sigmf archive.
    create_ncd : bool, optional
        When True, create a Non-Conforming Dataset.

    Returns
    -------
    SigMFFile
        SigMF object, potentially as Non-Conforming Dataset.

    Raises
    ------
    SigMFConversionError
        If the Signal Hound file cannot be read or converted.
    """
    signalhound_path = Path(signalhound_path)
    out_path = None if out_path is None else Path(out_path)
    meta_path = os.path.splitext(signalhound_path)[0] + ".sigmf-meta"

    # auto-enable NCD when no output path is specified
    if out_path is None:
        create_ncd = True

    # TODO: Should time be based on file modification time or the EpochNanos field
    # in the XML metadata? For now using file modification time.
    modify_time = signalhound_path.lstat().st_mtime
    signalhound_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc)

    capture_info = {
        SigMFFile.DATETIME_KEY: signalhound_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT),
    }

    data_bytes = signalhound_path.stat().st_size
    log.info("Data Bytes: %d", data_bytes)

    # Generate the SigMF metadata dict from the Spike XML
    sigmfMetaData = spike_to_sigmf_metadata(signalhound_path)
    global_info = sigmfMetaData.get("global", {})

    # create SigMF metadata
    meta = SigMFFile(global_info=global_info)
    meta.data_file = signalhound_path

    # Set captures information
    capture_info[SigMFFile.FREQUENCY_KEY] = sigmfMetaData.get("captures", [{}])[0].get("core:frequency")
    header_bytes = 0  # raw IQ files carry no header; adjust for other file types if needed
    capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes

    # Add annotations from metadata
    _add_annotations(meta, sigmfMetaData)

    # Manually set the fields that set_data_file() would normally populate
    # TODO: Consider refactoring to use set_data_file() for better consistency
    meta._data_file_offset = header_bytes
    meta._data_file_size = data_bytes
    meta._data_file_skip_checksum = True

    # Filenames for metadata, data, and archive; fall back to the input location
    # when no output path was given (previously crashed with out_path=None)
    base_path = out_path if out_path is not None else signalhound_path.with_suffix(".sigmf")
    filenames = get_sigmf_filenames(base_path)

    # Create NCD metadata if specified
    if create_ncd:
        # Write .sigmf-meta file
        with open(meta_path, "w") as handle:
            json.dump(sigmfMetaData, handle, indent=2)
        log.info("==== TEMP: Wrote SigMF metadata to %s ====", meta_path)

        # TODO: Use this code for metadata and remove code above
        if out_path is not None:
            output_dir = filenames["meta_fn"].parent
            output_dir.mkdir(parents=True, exist_ok=True)
            meta.tofile(filenames["meta_fn"], toarchive=False)
            log.info("For NCD: wrote SigMF non-conforming metadata to %s", filenames["meta_fn"])
        log.debug("created %r", meta)

    # Create archive if specified, otherwise write separate meta and data files
    if create_archive:
        meta = _write_archive(signalhound_path, sigmfMetaData, global_info, capture_info, filenames)
    else:
        meta = _write_dataset(signalhound_path, sigmfMetaData, global_info, capture_info, filenames)

    log.debug("Created %r", meta)
    return meta
-        overwrite=args.overwrite,
-    )
+    _ = wav_to_sigmf(wav_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
 elif magic_bytes == b"BLUE":
     # BLUE file
-    _ = blue_to_sigmf(
-        blue_path=input_path,
-        out_path=output_path,
-        create_archive=args.archive,
-        create_ncd=args.ncd,
-        overwrite=args.overwrite,
+    _ = blue_to_sigmf(blue_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
+
+    ## TODO: Determine proper way to integrate Signal Hound files.
+    elif magic_bytes == b"<?xm":
+        # Signal Hound Spike 1.0 file
+        # Read 17 bytes at offset 40 to skip past the XML declaration and reach the
+        # root element for a more specific detection of Signal Hound Spike files
+        expanded_magic_bytes = get_magic_bytes(input_path, count=17, offset=40)
+        if expanded_magic_bytes == b"SignalHoundIQFile":
+            _ = signalhound_to_sigmf(signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
+        else:
+            raise SigMFConversionError(
+                f"Unsupported XML file format. Expanded Magic bytes: {expanded_magic_bytes}. "
+                f"Supported formats for conversion are WAV, BLUE/Platinum and Signal Hound Spike."
             )
-        else:
             raise SigMFConversionError(
                 f"Unsupported file format. Magic bytes: {magic_bytes}. "
-                f"Supported formats for conversion are WAV and BLUE/Platinum."
+                f"Supported formats for conversion are WAV, BLUE/Platinum and Signal Hound Spike."
) From 2c0e92f08977a87f316f4dae2090d8945af45b32 Mon Sep 17 00:00:00 2001 From: KelseyCreekSoftware Date: Mon, 2 Mar 2026 14:14:24 -0800 Subject: [PATCH 03/11] Update sigmf/convert/signalhound.py Co-authored-by: Teque5 --- sigmf/convert/signalhound.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py index 98afae2..99ea698 100644 --- a/sigmf/convert/signalhound.py +++ b/sigmf/convert/signalhound.py @@ -18,7 +18,7 @@ from datetime import datetime, timezone, timedelta import xml.etree.ElementTree as ET -from typing import Optional +from typing import Optional, List from .. import SigMFFile from ..error import SigMFConversionError From fe8c0cd08f1b411e0e6c28af7a66582746bbc62a Mon Sep 17 00:00:00 2001 From: KelseyCreekSoftware Date: Mon, 2 Mar 2026 14:19:22 -0800 Subject: [PATCH 04/11] Update __main__.py From 9195ff338b796f8aa8fa543d096f7316f8b51e29 Mon Sep 17 00:00:00 2001 From: KelseyCreekSoftware Date: Mon, 2 Mar 2026 14:21:19 -0800 Subject: [PATCH 05/11] Made code updates based on PR input. Still need to work on returning SigMF object instead of dictionary Updated the last modified date and refactored comments for clarity. --- sigmf/convert/signalhound.py | 29 +++++++---------------------- 1 file changed, 7 insertions(+), 22 deletions(-) diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py index 99ea698..08b46b2 100644 --- a/sigmf/convert/signalhound.py +++ b/sigmf/convert/signalhound.py @@ -3,7 +3,7 @@ # This file is part of sigmf-python. 
https://github.com/sigmf/sigmf-python # # SPDX-License-Identifier: LGPL-3.0-or-later -# last updated 2-16-26 +# last updated 3-02-26 """converter for signalhound files to SigMF format.""" @@ -30,17 +30,7 @@ import sys -# Define constants for Spike -ENDIANNESS = "<" -DATATYPE = "ci16_le" # complex short int16 little-endian -# DATATYPE_SIZE = 4 # bytes per complex int16 sample (2 bytes I + 2 bytes Q) - -logging.basicConfig( - level=logging.INFO, - format="%(asctime)s %(levelname)s %(name)s: %(message)s", - stream=sys.stdout, # ensure logs go to stdout (or use sys.stderr) -) - +# Use logging defined in __main__.py log = logging.getLogger() def _to_float(x) -> Optional[float]: @@ -58,7 +48,7 @@ def _to_int(x) -> Optional[int]: except Exception: return None -def _parse_preview_trace(text) -> list[float]: +def _parse_preview_trace(text) -> List[float]: """Parse PreviewTrace string into list of floats.""" if text is None: return [] @@ -354,7 +344,8 @@ def signalhound_to_sigmf( if out_path is None: create_ncd = True - # TODO: Should time be based on file modification time or the EpochNanos field in the XML metadata? For now using file modification time since it is more likely to be present and accurate for the actual data capture time, whereas the EpochNanos field may be missing or inaccurate in some cases. This can be revisited in the future if needed based on user feedback or specific use cases. + # TODO: Should time be based on file modification time or the EpochNanos field in the XML metadata? 
+ # For now using file modification time modify_time = signalhound_path.lstat().st_mtime signalhound_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) @@ -389,18 +380,12 @@ def signalhound_to_sigmf( annot_metadata = {k: v for k, v in annotation.items() if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} meta.add_annotation(start_idx, length=length, metadata=annot_metadata) - - + # Manually set the fields that set_data_file() would normally populate + # TODO: Consider refactoring to use set_data_file() for better consistency meta._data_file_offset = header_bytes meta._data_file_size = data_bytes meta._data_file_skip_checksum = True - - # TODO: Determine how to use memmap to avoid issues in SigMFFile. - # Explicitly disable memmap for SignalHound files since they may not be compatible with memmap - # meta._data_file_is_memmap = False - # meta._data_file_memmap_shape = None - # meta._data_file_is_binary = True # Get filenames for metadata, data, and archive based on output path and input file name filenames = get_sigmf_filenames(out_path) From 34c9c1a5bd43fd6d7eb8ab35f0ba4020058f77f4 Mon Sep 17 00:00:00 2001 From: KelseyCreekSoftware Date: Mon, 16 Mar 2026 09:21:21 -0700 Subject: [PATCH 06/11] This version of the Signal Hound to SigMF conversion uses more of the SigMF File Object, but is not working. Implement converter for Signal Hound files to SigMF format, including metadata extraction and IQ data conversion. --- .../convert/signalhound_fileobject_broken.py | 499 ++++++++++++++++++ 1 file changed, 499 insertions(+) create mode 100644 sigmf/convert/signalhound_fileobject_broken.py diff --git a/sigmf/convert/signalhound_fileobject_broken.py b/sigmf/convert/signalhound_fileobject_broken.py new file mode 100644 index 0000000..c001812 --- /dev/null +++ b/sigmf/convert/signalhound_fileobject_broken.py @@ -0,0 +1,499 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. 
https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later +# last updated 3-16-26 + +"""converter for signalhound files to SigMF format.""" + +import os +import json +import io +import logging +import tempfile +from pathlib import Path +import hashlib +import numpy as np +from datetime import datetime, timezone, timedelta + +import xml.etree.ElementTree as ET +from typing import Optional, List + +from .. import SigMFFile +from ..error import SigMFConversionError + +from .. import __version__ as toolversion +from .. import fromfile +from ..sigmffile import get_sigmf_filenames +from ..utils import SIGMF_DATETIME_ISO8601_FMT + +import sys + +# Use logging defined in __main__.py +log = logging.getLogger() + +def _to_float(x) -> Optional[float]: + """Convert value to float, return None if invalid.""" + try: + return float(x) + except Exception: + return None + + +def _to_int(x) -> Optional[int]: + """Convert value to int, return None if invalid.""" + try: + return int(float(x)) + except Exception: + return None + +def _parse_preview_trace(text) -> List[float]: + """Parse PreviewTrace string into list of floats.""" + if text is None: + return [] + s = text.strip() + if s.endswith(","): + s = s[:-1] + if not s: + return [] + parts = [p.strip() for p in s.split(",") if p.strip() != ""] + # return both list and numpy array if caller wants either + vals = [] + for p in parts: + try: + vals.append(float(p)) + except Exception: + # skip malformed entries + continue + return vals + + +def spike_to_sigmf_metadata(xml_file_path) -> dict: + """ + Build a SigMF metadata file the spike xml file. + + Parameters + ---------- + xml_file_path : str + Path to the spike xml file. + Returns + ------- + dict + SigMF metadata structure. + # TODO:-> SigMFFile: + sigmfObj : SigMFFile + SigMF object with metadata information. + + Raises + ------ + SigMFConversionError + If required fields are missing or invalid. 
+ """ + log.info("===== Converting Spike XML metadata to SigMF format =====") + + xml_path = Path(xml_file_path) + tree = ET.parse(xml_path) + root = tree.getroot() + + def text_of(tag) -> Optional[str]: + """Extract and strip text from XML element.""" + el = root.find(tag) + return el.text.strip() if (el is not None and el.text is not None) else None + + # TODO: Determine if data dictionary is best structure for this data + + md = {} + # Signal Hound data elements + for tag in ( + "DeviceType", + "SerialNumber", + "DataType", + "ReferenceLevel", + "CenterFrequency", + "SampleRate", + "Decimation", + "IFBandwidth", + "ScaleFactor", + "IQFileName", + "EpochNanos", + "SampleCount", + "PreviewTrace", + ): + md[f"{tag}_raw"] = text_of(tag) + + # Optional log.info of data for debug >> + # log.info(md) + + # Typed fields / normalized + md["DataType"] = md.pop("DataType_raw") + md["DeviceType"] = md.pop("DeviceType_raw") + md["CenterFrequency"] = _to_float(md.pop("CenterFrequency_raw")) + md["SampleCount"] = _to_int(md.pop("SampleCount_raw")) + md["SampleRate"] = _to_float(md.pop("SampleRate_raw")) + md["EpochNanos"] = _to_int(md.pop("EpochNanos_raw")) + # Will be added as comments or annotations + md["ReferenceLevel"] = _to_float(md.pop("ReferenceLevel_raw")) + md["Decimation"] = _to_int(md.pop("Decimation_raw")) + md["IFBandwidth"] = _to_float(md.pop("IFBandwidth_raw")) + md["ScaleFactor"] = _to_float(md.pop("ScaleFactor_raw")) + md["SerialNumber"] = md.pop("SerialNumber_raw") + md["IQFileName"] = md.pop("IQFileName_raw") + + # PreviewTrace: list of floats and numpy array + # TODO: Consider adding a flag to include preview trace or not. + # TODO: Confirm np.int16 data type for preview data elements. 
+ preview_raw = text_of("PreviewTrace") + md["PreviewTrace_list"] = _parse_preview_trace(preview_raw) + md["PreviewTrace_array"] = np.array(md["PreviewTrace_list"], dtype=np.int16) + + # Create a reference to the spike XML data + spike_xml = md + + # TODO: Confirm Zero Span Spike files are single channel + channel_number = 1 + + # Check datatype mapping based on Spike XML DataType field - should be "Complex Short" + spike_data_type = spike_xml.get("DataType") + if spike_data_type == "Complex Short": + data_type= "ci16_le" # complex int16 little-endian + else: + raise SigMFConversionError(f"Unsupported Spike DataType: {spike_data_type}") + # Check for DeviceType field for hardware description, otherwise use generic description + device_type = spike_xml.get("DeviceType") + hardware_description = ( + device_type if device_type is not None else "Signal Hound Device" + ) + + # Strip the extension from the original file path + base_file_name = os.path.splitext(xml_file_path)[0] + # Build the .iq file path for data file + data_file_path = base_file_name + ".iq" + filesize = os.path.getsize(data_file_path) + # complex 16-bit integer IQ data > ci16_le in SigMF + elem_size = np.dtype(np.int16).itemsize + elem_count = filesize // elem_size + log.info(f"Element Count: {elem_count}") + frame_bytes = 2 * elem_size + if filesize % frame_bytes != 0: + raise SigMFConversionError(f"File size {filesize} not divisible by {frame_bytes}; partial sample present") + + # Calculate sample count using the original IQ data file size + sample_count = filesize // frame_bytes + log.info(f"Sample count: {sample_count}") + + # For now define static values. Perhaps take as JSON or command arg input in the future. 
+ spike_author = "Spike File Conversion - Unknown Author" + spike_licence = "Spike File Conversion - Unknown License" + spike_description = "Signal Hound Spike Zero Span File converted to SigMF format" + + # Convert the datetime object to an ISO 8601 formatted string + epoch_time = spike_xml.get("EpochNanos") + if epoch_time is None: + raise SigMFConversionError("Missing EpochNanos in Spike XML") + epoch_nanos = int(epoch_time) + secs = epoch_nanos // 1_000_000_000 + rem_ns = epoch_nanos % 1_000_000_000 + dt = datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(microseconds=rem_ns / 1000) + iso_8601_string = dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + + # TODO: Confirm freq_upper_edge and lower_frequency_edge calculations - is this correct for Spike files? ScaleFactor? Mhz? /2? + center = float(spike_xml.get("CenterFrequency") or 0.0) + bandwidth = float(spike_xml.get("IFBandwidth") or 0.0) + upper_frequency_edge = center + (bandwidth / 2.0) + lower_frequency_edge = center - (bandwidth / 2.0) + + # --- Base Global Metadata --- + global_md = { + "core:author": spike_author, + "core:datatype": data_type, + "core:description": spike_description, + "core:hw": hardware_description, + "core:license": spike_licence, + "core:num_channels": channel_number, + "core:sample_rate": spike_xml.get("SampleRate"), + "core:version": "1.0.0", + "core:spike_ReferenceLevel": spike_xml.get("ReferenceLevel"), + "core:spike_Decimation": spike_xml.get("Decimation"), + "core:spike_IFBandwidth": spike_xml.get("IFBandwidth"), + "core:spike_ScaleFactor": spike_xml.get("ScaleFactor"), + "core:spike_IQFileName": spike_xml.get("IQFileName"), + } + + # --- Captures array --- + captures = [ + { + "core:datetime": iso_8601_string, + "core:frequency": float(spike_xml.get("CenterFrequency")), + "core:sample_start": 0, + } + ] + + # --- Create annotations array using calculated values--- + annotations = [ + { + "core:sample_start": 0, + "core:sample_count": sample_count, + 
"core:freq_upper_edge": upper_frequency_edge, + "core:freq_lower_edge": lower_frequency_edge, + "core:label": "Spike", + } + ] + + # --- Final SigMF object --- + sigmf = { + "global": global_md, + "captures": captures, + "annotations": annotations, + } + + # TODO: Return SigMFFile object instead of dict + + meta = SigMFFile(global_info=sigmf, skip_checksum=True) + + # return sigmf + return meta + +def convert_iq_data(xml_file_path, sigmfObj=None) -> np.ndarray: + """ + Convert IQ data in .iq file to SigMF based on values in Zero Span XML file. + + Parameters + ---------- + xml_file_path : str + Path to the spike zero span XML file. + sigmfObj : SigMFFile + SigMF object with metadata information. + + Returns + ------- + numpy.ndarray + Parsed samples. + """ + + # TODO: Although this code may not be needed now, this function can be extended in the future to handle multiple channel recordings? + # (Samples pending for testing with multi-channel Spike files) + + log.info("===== Parsing spike file data values =====") + base_file_name = os.path.splitext(xml_file_path)[0] + iq_file_path = base_file_name + ".iq" + + # TODO: Confirm that the data that is used is correct for the Spike files + + # Gather IQ file information from generated SigMF data file + if isinstance(sigmfObj, dict): + sample_rate = ( + sigmfObj.get("global", {}).get("core:sample_rate") + or sigmfObj.get("global", {}).get("sample_rate") + or sigmfObj.get("core:sample_rate") + ) + + + meta_dict = sigmfObj._metadata + + global_info = meta_dict.get("global", {}) + captures = meta_dict.get("captures", []) + annotations = meta_dict.get("annotations", []) + + # TODO: Why am I no able to access the sample_rate value from the generated SigMF metadata dict? Is this a problem with how the metadata is being generated in the spike_to_sigmf_metadata function? 
+ # log.info(f"Sample rate from SigMF metadata dict: {sample_rate}") + # elem_count=(annotations[0].get("core:sample_count", 0))*2 #*2 for I and Q samples + + elem_size = np.dtype(np.int16).itemsize # complex 16-bit integer IQ data > ci16_le in SigMF + elem_count = os.path.getsize(iq_file_path) // np.dtype(np.int16).itemsize # complex 16-bit integer IQ data > ci16_le in SigMF + + # Read raw interleaved int16 IQ + samples = np.fromfile(iq_file_path, dtype=np.int16, offset=0, count=elem_count) + + # Trim trailing partial bytes + if samples.nbytes % elem_size != 0: + trim = samples % elem_size + log.warning("Trimming %d trailing byte(s) to align samples", trim) + samples -= trim + + # TODO: Confirm that there is no need to reassemble interleaved IQ samples + # samples = raw_samples[::2] + 1j*raw_samples[1::2] # convert to IQIQIQ... + + # TODO: Use consitent file names in output + # output_dir = filenames["meta_fn"].parent + samples.tofile(iq_file_path + ".sigmf-data") + log.info(f"==== Wrote SigMF data to {iq_file_path + '.sigmf-data'} ====") + + # Return the IQ data if needed for further processing if needed in the future. + return samples + +def signalhound_to_sigmf( + signalhound_path: str, + out_path: Optional[str] = None, + create_archive: bool = False, + create_ncd: bool = False, +) -> SigMFFile: + + + """ + Read a signalhound file, optionally write sigmf archive, return associated SigMF object. + + Parameters + ---------- + signalhound_path : str + Path to the signalhound file. + out_path : str, optional + Path to the output SigMF metadata file. + create_archive : bool, optional + When True, package output as a .sigmf archive. + create_ncd : bool, optional + When True, create Non-Conforming Dataset + + Returns + ------- + SigMFFile + SigMF object, potentially as Non-Conforming Dataset. + + Raises + ------ + SigMFConversionError + If the signalhound file cannot be read. 
+ """ + # FOR TESTING + # create_ncd=True + + signalhound_path = Path(signalhound_path) + out_path = None if out_path is None else Path(out_path) + base_file_name = os.path.splitext(signalhound_path)[0] + meta_path = base_file_name + ".sigmf-meta" + + # auto-enable NCD when no output path is specified + if out_path is None: + create_ncd = True + + # TODO: Should time be based on file modification time or the EpochNanos field in the XML metadata? + # For now using file modification time + modify_time = signalhound_path.lstat().st_mtime + signalhound_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) + + capture_info = { + SigMFFile.DATETIME_KEY: signalhound_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), + } + + data_bytes = signalhound_path.stat().st_size + log.info(f"Data Bytes: {data_bytes}") + + # TODO: Update to use SigMF File object instead of dict + # Call the SigMF conversion for metadata generation (returns dict) + SigMFMetaData = spike_to_sigmf_metadata(signalhound_path) + + # Use the generated global metadata dict for SigMFFile construction + # global_info = SigMFMetaData.get("global", {}) + SigMFMeta_dict = SigMFMetaData._metadata + global_info = SigMFMeta_dict.get("global", {}) + + # Set captures and annotations from the generated metadata dict + captures = SigMFMeta_dict.get("captures", []) + # Set annotations + annotations = SigMFMeta_dict.get("annotations", []) + + # create SigMF metadata + meta = SigMFFile(global_info=SigMFMetaData) + meta.data_file = signalhound_path + + # TBD - required? 
+ header_bytes = 0 # No header bytes for raw IQ files, but could be set to non-zero if needed for other file types or future use cases + capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + # TODO: Get actual value global_info[SigMFFile.DATATYPE_KEY] = + global_info[SigMFFile.DATATYPE_KEY] = "ci16_le" + + # Get filenames for metadata, data, and archive based on output path and input file name + filenames = get_sigmf_filenames(out_path) + + # Create NCD if specified, otherwise create standard SigMF dataset or archive + if create_ncd: + # Write .sigmf-meta file + + if out_path is not None: + output_dir = filenames["meta_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("For NCD: wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) + log.debug("created %r", meta) + + if out_path is None: + base_path = signalhound_path.with_suffix(".sigmf") + else: + base_path = Path(out_path) + + # Create Archive if specified, otherwise write separate meta and data files + if create_archive: + # use temporary directory for data file when creating archive + with tempfile.TemporaryDirectory() as temp_dir: + data_path = Path(temp_dir) / filenames["data_fn"].name + + # Convert IQ data and write to temp directory + try: + iq_data = convert_iq_data(str(signalhound_path), meta) + except Exception as e: + raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") + + # Write converted IQ data to temporary file + iq_data.tofile(data_path) + # log.info(f"Wrote converted IQ data to {data_path}") + # TODO: Get actual value global_info[SigMFFile.DATATYPE_KEY] = + global_info[SigMFFile.DATATYPE_KEY] = "ci16_le" + + meta = SigMFFile(data_file=data_path, global_info=global_info) + meta.add_capture(0, metadata=capture_info) + + # Add annotations from metadata + annotations = global_info.get("annotations", []) + + for annotation in annotations: + start_idx = 
annotation.get(SigMFFile.START_INDEX_KEY, 0) + length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) + annot_metadata = {k: v for k, v in annotation.items() + if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} + meta.add_annotation(start_idx, length=length, metadata=annot_metadata) + + meta.tofile(filenames["archive_fn"], toarchive=True) + log.info("wrote SigMF archive to %s", filenames["archive_fn"]) + # metadata returned should be for this archive + meta = fromfile(filenames["archive_fn"]) + + else: + # Write separate meta and data files + # Convert IQ data for Zero span Spike file + try: + iq_data = convert_iq_data(str(signalhound_path), meta) + except Exception as e: + raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") + + # Create SigMFFile with converted IQ data in a BytesIO buffer + data_buffer = io.BytesIO(iq_data.tobytes()) + global_info[SigMFFile.DATATYPE_KEY] = "ci16_le" + # TODO: Get actual value global_info[SigMFFile.DATATYPE_KEY] = + header_bytes = 0 # No header bytes for raw IQ files, but could be set to non-zero if needed for other file types or future use cases + capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + + meta = SigMFFile(global_info=global_info) + + meta.set_data_file(data_buffer=data_buffer, skip_checksum=True) + meta.add_capture(0, metadata=capture_info) + + # Add annotations from metadata + annotations = global_info.get("annotations", []) + + for annotation in annotations: + start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) + length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) + + # Pass remaining fields as metadata (excluding standard annotation keys) + annot_metadata = {k: v for k, v in annotation.items() + if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} + meta.add_annotation(start_idx, length=length, metadata=annot_metadata) + + # Write metadata and data files + output_dir = filenames["meta_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + 
meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote SigMF metadata and data files to %s", filenames["meta_fn"]) + + log.debug("Created %r", meta) + return meta From 6f1020684f3c7d07c0d9ee4dfd66faec04fb8052 Mon Sep 17 00:00:00 2001 From: Kyle A Logue Date: Tue, 17 Mar 2026 13:28:21 -0700 Subject: [PATCH 07/11] add parity tests for signal hound processing --- tests/test_convert_signalhound.py | 75 +++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 tests/test_convert_signalhound.py diff --git a/tests/test_convert_signalhound.py b/tests/test_convert_signalhound.py new file mode 100644 index 0000000..14619b6 --- /dev/null +++ b/tests/test_convert_signalhound.py @@ -0,0 +1,75 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Tests for Signal Hound Converter""" + +import tempfile +import unittest +import wave +from pathlib import Path + +import numpy as np + +import sigmf +from sigmf.convert.signalhound import signalhound_to_sigmf + +from .test_convert_wav import _validate_ncd +from .testdata import get_nonsigmf_path + + +class TestSignalHoundWithNonSigMFRepo(unittest.TestCase): + """Test Signal Hound converter with real example files if available.""" + + def setUp(self) -> None: + """Find a non-SigMF dataset for testing.""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + nonsigmf_path = get_nonsigmf_path(self) + # glob all files in signal hound directory + hound_dir = nonsigmf_path / "signal_hound" + self.hound_paths = [] + self.hound_paths.extend(hound_dir.glob("*.xml")) + if not self.hound_paths: + self.fail(f"No Signal Hound XML files found in {hound_dir}.") + + def tearDown(self) -> None: + """Clean up temporary directory.""" + self.tmp_dir.cleanup() + + def test_sigmf_pair(self): + """test basic signal hound to sigmf conversion with file pairs""" + for 
hound_path in self.hound_paths: + sigmf_path = self.tmp_path / hound_path.stem + meta = signalhound_to_sigmf(signalhound_path=hound_path, out_path=sigmf_path) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_sigmf_archive(self): + """test signal hound to sigmf conversion with archive output""" + for hound_path in self.hound_paths: + sigmf_path = self.tmp_path / f"{hound_path.stem}_archive" + meta = signalhound_to_sigmf(signalhound_path=hound_path, out_path=sigmf_path, create_archive=True) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_create_ncd(self): + """test direct NCD conversion""" + for hound_path in self.hound_paths: + meta = signalhound_to_sigmf(signalhound_path=hound_path) + _validate_ncd(self, meta, hound_path) + if len(meta): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_fromfile_ncd(self): + """test automatic NCD conversion with fromfile""" + for hound_path in self.hound_paths: + meta = sigmf.fromfile(hound_path) + _validate_ncd(self, meta, hound_path) From 24634ea11e6e624382f7c530892c60fef071de79 Mon Sep 17 00:00:00 2001 From: Kyle A Logue Date: Tue, 17 Mar 2026 16:25:06 -0700 Subject: [PATCH 08/11] refactor signalhound, add tests, add docs, increment to v1.8.0 --- README.md | 1 + docs/requirements.txt | 5 +- docs/source/converters.rst | 45 +- docs/source/developers.rst | 1 + sigmf/__init__.py | 2 +- sigmf/convert/__main__.py | 5 +- sigmf/convert/blue.py | 1 - sigmf/convert/signalhound.py | 658 +++++++++--------- .../convert/signalhound_fileobject_broken.py | 499 ------------- sigmf/sigmffile.py | 5 + tests/test_convert_signalhound.py | 6 
+- tests/test_convert_wav.py | 4 +- 12 files changed, 385 insertions(+), 847 deletions(-) delete mode 100644 sigmf/convert/signalhound_fileobject_broken.py diff --git a/README.md b/README.md index 201e095..e2457cc 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,7 @@ sample_rate = meta.sample_rate # get sample rate # read other formats containing RF time series as SigMF meta = sigmf.fromfile("recording.wav") # WAV meta = sigmf.fromfile("recording.cdif") # BLUE / Platinum +meta = sigmf.fromfile("recording.xml") # Signal Hound Spike ``` ### Docs diff --git a/docs/requirements.txt b/docs/requirements.txt index 1016144..cf97c2d 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,2 @@ -# pinned 2025-01-15 -sphinx==8.1.3 -sphinx-rtd-theme==3.0.2 +sphinx>=8.0 +sphinx-rtd-theme>=3.0 diff --git a/docs/source/converters.rst b/docs/source/converters.rst index 71a5e88..22d45a9 100644 --- a/docs/source/converters.rst +++ b/docs/source/converters.rst @@ -12,6 +12,7 @@ Conversion is available for: * **BLUE files** - MIDAS Blue and Platinum BLUE RF recordings (usually ``.cdif``) * **WAV files** - Audio recordings (``.wav``) +* **Signal Hound Spike files** - Signal Hound zero-span recordings (``.xml`` + ``.iq``) All converters return a :class:`~sigmf.SigMFFile` object with converted metadata. 
@@ -29,6 +30,7 @@ formats and reads without writing any output files: # auto-detect and create NCD for any supported format meta = sigmf.fromfile("recording.cdif") # BLUE file meta = sigmf.fromfile("recording.wav") # WAV file + meta = sigmf.fromfile("recording.xml") # Signal Hound Spike file meta = sigmf.fromfile("recording.sigmf") # SigMF archive all_samples = meta.read_samples() @@ -44,6 +46,7 @@ For programmatic access, use the individual converter functions directly: from sigmf.convert.wav import wav_to_sigmf from sigmf.convert.blue import blue_to_sigmf + from sigmf.convert.signalhound import signalhound_to_sigmf # convert WAV to SigMF archive _ = wav_to_sigmf(wav_path="recording.wav", out_path="recording", create_archive=True) @@ -51,6 +54,9 @@ For programmatic access, use the individual converter functions directly: # convert BLUE to SigMF pair and return metadata for new files meta = blue_to_sigmf(blue_path="recording.cdif", out_path="recording") + # convert Signal Hound Spike to SigMF pair + meta = signalhound_to_sigmf(signalhound_path="recording.xml", out_path="recording") + Command Line Usage ~~~~~~~~~~~~~~~~~~ @@ -65,8 +71,9 @@ Converters are accessed through a unified command-line interface that automatica # examples sigmf_convert recording.cdif recording.sigmf sigmf_convert recording.wav recording.sigmf + sigmf_convert recording.xml recording.sigmf -The converter uses magic byte detection to automatically identify BLUE and WAV file formats. +The converter uses magic byte detection to automatically identify BLUE, WAV, and Signal Hound Spike file formats. No need to remember format-specific commands! @@ -168,4 +175,38 @@ Examples # access standard SigMF data & metadata all_samples = meta.read_samples() - sample_rate_hz = meta.sample_rate \ No newline at end of file + sample_rate_hz = meta.sample_rate + + +Signal Hound Spike Converter +----------------------------- + +The Signal Hound Spike converter handles recordings from Signal Hound devices. 
+These recordings consist of two files: an XML metadata file (``.xml``) and a binary IQ data file (``.iq``). +The converter extracts metadata from the XML file and references the IQ data file, storing Signal Hound-specific +fields in the ``spike:`` namespace extension. + +.. autofunction:: sigmf.convert.signalhound.signalhound_to_sigmf + +Examples +~~~~~~~~ + +.. code-block:: python + + from sigmf.convert.signalhound import signalhound_to_sigmf + + # standard conversion (provide path to XML file) + meta = signalhound_to_sigmf(signalhound_path="recording.xml", out_path="recording") + + # create NCD automatically (metadata-only, references original .iq file) + meta = signalhound_to_sigmf(signalhound_path="recording.xml") + + # access standard SigMF data & metadata + all_samples = meta.read_samples() + sample_rate = meta.sample_rate + center_freq = meta.get_captures()[0]["core:frequency"] + + # access Signal Hound-specific metadata in spike: namespace + reference_level = meta.get_global_field("spike:reference_level") + if_bandwidth = meta.get_global_field("spike:if_bandwidth") + decimation = meta.get_global_field("spike:decimation") diff --git a/docs/source/developers.rst b/docs/source/developers.rst index 268c713..cd9ee1c 100644 --- a/docs/source/developers.rst +++ b/docs/source/developers.rst @@ -60,6 +60,7 @@ To build the docs and host locally: .. 
code-block:: console $ cd docs + $ pip install -r requirements.txt $ make clean $ make html $ python3 -m http.server --directory build/html/ diff --git a/sigmf/__init__.py b/sigmf/__init__.py index edcbb62..668713d 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.7.0" +__version__ = "1.8.0" # matching version of the SigMF specification __specification__ = "1.2.6" diff --git a/sigmf/convert/__main__.py b/sigmf/convert/__main__.py index 8f57a1b..e1abc6b 100644 --- a/sigmf/convert/__main__.py +++ b/sigmf/convert/__main__.py @@ -98,7 +98,6 @@ def main() -> None: # BLUE file _ = blue_to_sigmf(blue_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd) - ## TODO: Determine proper way to integrate Signal Hound files. elif magic_bytes == b" # Signal Hound Spike 1.0 file # Of the 66 Byte string move 43 bytes in to skip the XML declaration @@ -109,12 +108,12 @@ def main() -> None: else: raise SigMFConversionError( f"Unsupported XML file format. Expanded Magic bytes: {expanded_magic_bytes}. " - f"Supported formats for conversion are WAV, BLUE/Platinum and Signal Hound Spike." + f"Supported formats for conversion are WAV, BLUE/Platinum, and Signal Hound Spike." ) else: raise SigMFConversionError( f"Unsupported file format. Magic bytes: {magic_bytes}. " - f"Supported formats for conversion are WAV, BLUE/Platinum and Signal Hound Spike." + f"Supported formats for conversion are WAV, BLUE/Platinum, and Signal Hound Spike." ) diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index 46cd6c8..c988542 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -25,7 +25,6 @@ import numpy as np from packaging.version import InvalidVersion, Version -from .. 
import __version__ as toolversion from ..error import SigMFConversionError from ..sigmffile import SigMFFile, fromfile, get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py index 08b46b2..31fb922 100644 --- a/sigmf/convert/signalhound.py +++ b/sigmf/convert/signalhound.py @@ -3,53 +3,44 @@ # This file is part of sigmf-python. https://github.com/sigmf/sigmf-python # # SPDX-License-Identifier: LGPL-3.0-or-later -# last updated 3-02-26 -"""converter for signalhound files to SigMF format.""" +"""Signal Hound Converter""" -import os -import json +import getpass import io import logging import tempfile +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta, timezone from pathlib import Path -import hashlib -import numpy as np -from datetime import datetime, timezone, timedelta +from typing import List, Optional, Tuple -import xml.etree.ElementTree as ET -from typing import Optional, List +import numpy as np -from .. import SigMFFile +from .. import SigMFFile, fromfile from ..error import SigMFConversionError - -from .. import __version__ as toolversion -from .. 
import fromfile from ..sigmffile import get_sigmf_filenames from ..utils import SIGMF_DATETIME_ISO8601_FMT -import sys - -# Use logging defined in __main__.py log = logging.getLogger() -def _to_float(x) -> Optional[float]: - """Convert value to float, return None if invalid.""" - try: - return float(x) - except Exception: - return None +def _text_of(root: ET.Element, tag: str) -> Optional[str]: + """Extract and strip text from XML element.""" + element = root.find(tag) + return element.text.strip() if (element is not None and element.text is not None) else None -def _to_int(x) -> Optional[int]: - """Convert value to int, return None if invalid.""" - try: - return int(float(x)) - except Exception: - return None -def _parse_preview_trace(text) -> List[float]: - """Parse PreviewTrace string into list of floats.""" +def _parse_preview_trace(text: Optional[str]) -> List[float]: + """ + Preview trace is a max-hold trace of the signal power across the capture, represented as a comma-separated string of values. + + Example + ------- + >>> trace_str = "-1.0, 0.1, 0.5, 0.3, 0.7" + >>> _parse_preview_trace(trace_str) + [-1.0, 0.1, 0.5, 0.3, 0.7] + """ if text is None: return [] s = text.strip() @@ -58,272 +49,292 @@ def _parse_preview_trace(text) -> List[float]: if not s: return [] parts = [p.strip() for p in s.split(",") if p.strip() != ""] - # return both list and numpy array if caller wants either vals = [] - for p in parts: - try: - vals.append(float(p)) - except Exception: - # skip malformed entries - continue + for part in parts: + vals.append(float(part)) return vals -def spike_to_sigmf_metadata(xml_file_path) -> dict: +def validate_spike(xml_path: Path) -> None: + """ + Validate required Spike XML metadata fields and associated IQ file. + + Parameters + ---------- + xml_path : Path + Path to the Spike XML file. + + Raises + ------ + SigMFConversionError + If required fields are missing or invalid, or IQ file doesn't exist. 
+ """ + tree = ET.parse(xml_path) + root = tree.getroot() + + # validate CenterFrequency + center_freq_raw = _text_of(root, "CenterFrequency") + try: + center_frequency = float(center_freq_raw) + except (TypeError, ValueError) as err: + raise SigMFConversionError(f"Invalid or missing CenterFrequency: {center_freq_raw}") from err + + # validate SampleRate + sample_rate_raw = _text_of(root, "SampleRate") + try: + sample_rate = float(sample_rate_raw) + except (TypeError, ValueError) as err: + raise SigMFConversionError(f"Invalid or missing SampleRate: {sample_rate_raw}") from err + + if sample_rate <= 0: + raise SigMFConversionError(f"Invalid SampleRate: {sample_rate} (must be > 0)") + + # validate EpochNanos + epoch_nanos_raw = _text_of(root, "EpochNanos") + try: + int(epoch_nanos_raw) + except (TypeError, ValueError) as err: + raise SigMFConversionError(f"Invalid or missing EpochNanos: {epoch_nanos_raw}") from err + + # validate DataType + data_type_raw = _text_of(root, "DataType") + if data_type_raw is None: + raise SigMFConversionError("Missing DataType in Spike XML") + + # check datatype mapping - currently only "Complex Short" is supported + if data_type_raw != "Complex Short": + raise SigMFConversionError(f"Unsupported Spike DataType: {data_type_raw}") + + # validate associated IQ file exists + iq_file_path = xml_path.with_suffix(".iq") + if not iq_file_path.exists(): + raise SigMFConversionError(f"Could not find associated IQ file: {iq_file_path}") + + # validate IQ file size is aligned to sample boundary + filesize = iq_file_path.stat().st_size + elem_size = np.dtype(np.int16).itemsize + frame_bytes = 2 * elem_size # I and Q components + if filesize % frame_bytes != 0: + raise SigMFConversionError(f"IQ file size {filesize} not divisible by {frame_bytes}; partial sample present") + + +def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: """ - Build a SigMF metadata file the spike xml file. 
+ Build SigMF metadata components from the spike xml file.
PreviewTrace: list of floats and numpy array - # TODO: Consider adding a flag to include preview trace or not. - # TODO: Confirm np.int16 data type for preview data elements. - preview_raw = text_of("PreviewTrace") - md["PreviewTrace_list"] = _parse_preview_trace(preview_raw) - md["PreviewTrace_array"] = np.array(md["PreviewTrace_list"], dtype=np.int16) - - # Create a reference to the spike XML data - spike_xml = md - - # TODO: Confirm Zero Span Spike files are single channel - channel_number = 1 - - # Check datatype mapping based on Spike XML DataType field - should be "Complex Short" - spike_data_type = spike_xml.get("DataType") - if spike_data_type == "Complex Short": - data_type= "ci16_le" # complex int16 little-endian + # validate required fields and associated IQ file + validate_spike(xml_path) + + # extract and convert required fields + center_frequency = float(_text_of(root, "CenterFrequency")) + sample_rate = float(_text_of(root, "SampleRate")) + epoch_nanos = int(_text_of(root, "EpochNanos")) + data_type_raw = _text_of(root, "DataType") + + # map datatype + if data_type_raw == "Complex Short": + data_type = "ci16_le" # complex int16 little-endian else: - raise SigMFConversionError(f"Unsupported Spike DataType: {spike_data_type}") - # Check for DeviceType field for hardware description, otherwise use generic description - device_type = spike_xml.get("DeviceType") - hardware_description = ( - device_type if device_type is not None else "Signal Hound Device" - ) - - # Strip the extension from the original file path - base_file_name = os.path.splitext(xml_file_path)[0] - # Build the .iq file path for data file - data_file_path = base_file_name + ".iq" - filesize = os.path.getsize(data_file_path) - # complex 16-bit integer IQ data > ci16_le in SigMF + raise SigMFConversionError(f"Unsupported Spike DataType: {data_type_raw}") + + # optional fields - only convert if present and valid + reference_level = None + reference_level_raw = _text_of(root, 
"ReferenceLevel") + if reference_level_raw is not None: + try: + reference_level = float(reference_level_raw) + except ValueError: + log.warning(f"could not parse ReferenceLevel: {reference_level_raw}") + + decimation = None + decimation_raw = _text_of(root, "Decimation") + if decimation_raw is not None: + try: + decimation = int(float(decimation_raw)) + except ValueError: + log.warning(f"could not parse Decimation: {decimation_raw}") + + if_bandwidth = None + if_bandwidth_raw = _text_of(root, "IFBandwidth") + if if_bandwidth_raw is not None: + try: + if_bandwidth = float(if_bandwidth_raw) + except ValueError: + log.warning(f"could not parse IFBandwidth: {if_bandwidth_raw}") + + scale_factor = None + scale_factor_raw = _text_of(root, "ScaleFactor") + if scale_factor_raw is not None: + try: + scale_factor = float(scale_factor_raw) + except ValueError: + log.warning(f"could not parse ScaleFactor: {scale_factor_raw}") + + device_type = _text_of(root, "DeviceType") + iq_file_name = _text_of(root, "IQFileName") + + # parse preview trace if present + preview_trace_raw = _text_of(root, "PreviewTrace") + preview_trace = _parse_preview_trace(preview_trace_raw) if preview_trace_raw else None + + # check for devicetype field for hardware description, otherwise use generic description + hardware_description = device_type if device_type is not None else "Signal Hound Device" + + # strip the extension from the original file path + base_file_name = xml_path.with_suffix("") + # build the .iq file path for data file + data_file_path = base_file_name.with_suffix(".iq") + filesize = data_file_path.stat().st_size + + # complex 16-bit integer IQ data > ci16_le in SigMF elem_size = np.dtype(np.int16).itemsize elem_count = filesize // elem_size - log.info(f"Element Count: {elem_count}") + log.debug("element count: %d", elem_count) frame_bytes = 2 * elem_size if filesize % frame_bytes != 0: raise SigMFConversionError(f"File size {filesize} not divisible by {frame_bytes}; partial sample 
present") - # Calculate sample count using the original IQ data file size - sample_count = filesize // frame_bytes - log.info(f"Sample count: {sample_count}") - - # For now define static values. Perhaps take as JSON or command arg input in the future. - spike_author = "Spike File Conversion - Unknown Author" - spike_licence = "Spike File Conversion - Unknown License" - spike_description = "Signal Hound Spike Zero Span File converted to SigMF format" - - # Convert the datetime object to an ISO 8601 formatted string - epoch_time = spike_xml.get("EpochNanos") - if epoch_time is None: - raise SigMFConversionError("Missing EpochNanos in Spike XML") - epoch_nanos = int(epoch_time) + # calculate sample count using the original IQ data file size + sample_count_calculated = filesize // frame_bytes + log.debug("sample count: %d", sample_count_calculated) + + # convert the datetime object to an ISO 8601 formatted string secs = epoch_nanos // 1_000_000_000 rem_ns = epoch_nanos % 1_000_000_000 dt = datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(microseconds=rem_ns / 1000) - iso_8601_string = dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" + iso_8601_string = dt.strftime(SIGMF_DATETIME_ISO8601_FMT) - # TODO: Confirm freq_upper_edge and lower_frequency_edge calculations - is this correct for Spike files? ScaleFactor? Mhz? /2? 
- center = float(spike_xml.get("CenterFrequency") or 0.0) - bandwidth = float(spike_xml.get("IFBandwidth") or 0.0) - upper_frequency_edge = center + (bandwidth / 2.0) - lower_frequency_edge = center - (bandwidth / 2.0) - - # --- Base Global Metadata --- + # base global metadata global_md = { - "core:author": spike_author, - "core:datatype": data_type, - "core:description": spike_description, - "core:hw": hardware_description, - "core:license": spike_licence, - "core:num_channels": channel_number, - "core:sample_rate": spike_xml.get("SampleRate"), - "core:version": "1.0.0", - "core:spike_ReferenceLevel": spike_xml.get("ReferenceLevel"), - "core:spike_Decimation": spike_xml.get("Decimation"), - "core:spike_IFBandwidth": spike_xml.get("IFBandwidth"), - "core:spike_ScaleFactor": spike_xml.get("ScaleFactor"), - "core:spike_IQFileName": spike_xml.get("IQFileName"), + SigMFFile.AUTHOR_KEY: getpass.getuser(), + SigMFFile.DATATYPE_KEY: data_type, + SigMFFile.DESCRIPTION_KEY: "Signal Hound Spike Zero Span File converted to SigMF format", + SigMFFile.HW_KEY: hardware_description, + SigMFFile.NUM_CHANNELS_KEY: 1, + SigMFFile.RECORDER_KEY: "Official SigMF Signal Hound converter", + SigMFFile.SAMPLE_RATE_KEY: sample_rate, + SigMFFile.EXTENSIONS_KEY: [{"name": "spike", "version": "0.0.1", "optional": True}], } - # --- Captures array --- - captures = [ - { - "core:datetime": iso_8601_string, - "core:frequency": float(spike_xml.get("CenterFrequency")), - "core:sample_start": 0, - } - ] - - # --- Create annotations array using calculated values--- - annotations = [ - { - "core:sample_start": 0, - "core:sample_count": sample_count, - "core:freq_upper_edge": upper_frequency_edge, - "core:freq_lower_edge": lower_frequency_edge, - "core:label": "Spike", - } - ] - - # --- Final SigMF object --- - sigmf = { - "global": global_md, - "captures": captures, - "annotations": annotations, + # add optional spike-specific fields to global metadata using spike: namespace + if reference_level is 
not None: + global_md["spike:reference_level"] = reference_level + if decimation is not None: + global_md["spike:decimation"] = decimation + if if_bandwidth is not None: + global_md["spike:if_bandwidth"] = if_bandwidth + if scale_factor is not None: + global_md["spike:scale_factor"] = scale_factor + if iq_file_name is not None: + global_md["spike:iq_filename"] = iq_file_name + # if preview_trace is not None and len(preview_trace) > 0: + # global_md["spike:preview_trace"] = preview_trace + + # capture info + capture_info = { + SigMFFile.DATETIME_KEY: iso_8601_string, + SigMFFile.FREQUENCY_KEY: center_frequency, } - return sigmf + # create annotations array using calculated values + annotations = [] + if if_bandwidth is not None: + upper_frequency_edge = center_frequency + (if_bandwidth / 2.0) + lower_frequency_edge = center_frequency - (if_bandwidth / 2.0) + annotations.append( + { + SigMFFile.START_INDEX_KEY: 0, + SigMFFile.LENGTH_INDEX_KEY: sample_count_calculated, + SigMFFile.FLO_KEY: lower_frequency_edge, + SigMFFile.FHI_KEY: upper_frequency_edge, + SigMFFile.LABEL_KEY: "Spike", + } + ) + + return global_md, capture_info, annotations, sample_count_calculated -def convert_iq_data(xml_file_path, sigmfObj=None) -> np.ndarray: +def convert_iq_data(xml_path: Path, sample_count: int) -> np.ndarray: """ Convert IQ data in .iq file to SigMF based on values in Zero Span XML file. - Parameters - ---------- - xml_file_path : str - Path to the spike zero span XML file. - sigmfObj : SigMFFile - SigMF object with metadata information. - - Returns - ------- - numpy.ndarray - Parsed samples. + Parameters + ---------- + xml_path : Path + Path to the spike zero span XML file. + sample_count : int + Number of samples to read. + + Returns + ------- + numpy.ndarray + Parsed samples. """ - - # TODO: Although this code may not be needed now, this function can be extended in the future to handle multiple channel recordings? 
- # (Samples pending for testing with multi-channel Spike files) - - log.info("===== Parsing spike file data values =====") - base_file_name = os.path.splitext(xml_file_path)[0] - iq_file_path = base_file_name + ".iq" - - # TODO: Confirm that the data that is used is correct for the Spike files - - # Gather IQ file information from generated SigMF data file - if isinstance(sigmfObj, dict): - sample_rate = ( - sigmfObj.get("global", {}).get("core:sample_rate") - or sigmfObj.get("global", {}).get("sample_rate") - or sigmfObj.get("core:sample_rate") - ) - elem_count=(sigmfObj.get("annotations", [{}])[0].get("core:sample_count"))*2 # *2 for I and Q samples - - # complex 16-bit integer IQ data > ci16_le in SigMF + log.debug("parsing spike file data values") + base_file_name = Path(xml_path).with_suffix("") + iq_file_path = base_file_name.with_suffix(".iq") + + # calculate element count (I and Q samples) + elem_count = sample_count * 2 # *2 for I and Q samples + + # complex 16-bit integer IQ data > ci16_le in SigMF elem_size = np.dtype(np.int16).itemsize - - # Read raw interleaved int16 IQ + + # read raw interleaved int16 IQ samples = np.fromfile(iq_file_path, dtype=np.int16, offset=0, count=elem_count) - # Trim trailing partial bytes + # trim trailing partial bytes if samples.nbytes % elem_size != 0: - trim = samples % elem_size - log.warning("Trimming %d trailing byte(s) to align samples", trim) - samples -= trim - - # TODO: Confirm that there is no need to reassemble interleaved IQ samples - # samples = raw_samples[::2] + 1j*raw_samples[1::2] # convert to IQIQIQ... 
- - # TODO: Use consitent file names in output - # output_dir = filenames["meta_fn"].parent - samples.tofile(iq_file_path + ".sigmf-data") - log.info(f"==== Wrote SigMF data to {iq_file_path + '.sigmf-data'} ====") + trim = samples.nbytes % elem_size + log.warning("trimming %d trailing byte(s) to align samples", trim) + samples = samples[: -(trim // elem_size)] - # Return the IQ data if needed for further processing if needed in the future. return samples + def signalhound_to_sigmf( - signalhound_path: str, - out_path: Optional[str] = None, + signalhound_path: Path, + out_path: Optional[Path] = None, create_archive: bool = False, create_ncd: bool = False, + overwrite: bool = False, ) -> SigMFFile: """ Read a signalhound file, optionally write sigmf archive, return associated SigMF object. Parameters ---------- - signalhound_path : str + signalhound_path : Path Path to the signalhound file. - out_path : str, optional + out_path : Path, optional Path to the output SigMF metadata file. create_archive : bool, optional When True, package output as a .sigmf archive. create_ncd : bool, optional When True, create Non-Conforming Dataset + overwrite : bool, optional + If False, raise exception if output files already exist. Returns ------- @@ -337,151 +348,128 @@ def signalhound_to_sigmf( """ signalhound_path = Path(signalhound_path) out_path = None if out_path is None else Path(out_path) - base_file_name = os.path.splitext(signalhound_path)[0] - meta_path = base_file_name + ".sigmf-meta" # auto-enable NCD when no output path is specified if out_path is None: create_ncd = True - # TODO: Should time be based on file modification time or the EpochNanos field in the XML metadata? 
- # For now using file modification time - modify_time = signalhound_path.lstat().st_mtime - signalhound_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) + # call the SigMF conversion for metadata generation + global_info, capture_info, annotations, sample_count = _build_metadata(signalhound_path) - capture_info = { - SigMFFile.DATETIME_KEY: signalhound_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), - } + # get filenames for metadata, data, and archive based on output path and input file name + if out_path is None: + base_path = signalhound_path + else: + base_path = Path(out_path) + + filenames = get_sigmf_filenames(base_path) - data_bytes = signalhound_path.stat().st_size - log.info(f"Data Bytes: {data_bytes}") - - # Call the SigMF conversion for metadata generation (returns dict) - sigmfMetaData = spike_to_sigmf_metadata(signalhound_path) - - # Use the generated global metadata dict for SigMFFile construction - global_info = sigmfMetaData.get("global", {}) - - # create SigMF metadata - meta = SigMFFile(global_info=global_info) - meta.data_file = signalhound_path - - # Set captures information - capture_info[SigMFFile.FREQUENCY_KEY] = sigmfMetaData.get("captures", [{}])[0].get("core:frequency") - header_bytes = 0 # No header bytes for raw IQ files, but could be set to non-zero if needed for other file types or future use cases - capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes - - # Set the annotations information - # Add annotations from metadata - for annotation in sigmfMetaData.get("annotations", []): - start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) - length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) - # Pass remaining fields as metadata (excluding standard annotation keys) - annot_metadata = {k: v for k, v in annotation.items() - if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} - meta.add_annotation(start_idx, length=length, metadata=annot_metadata) - - # Manually set the fields that set_data_file() would 
normally populate - # TODO: Consider refactoring to use set_data_file() for better consistency - meta._data_file_offset = header_bytes - meta._data_file_size = data_bytes - meta._data_file_skip_checksum = True - - # Get filenames for metadata, data, and archive based on output path and input file name - filenames = get_sigmf_filenames(out_path) - - # Create NCD if specified, otherwise create standard SigMF dataset or archive + # create NCD if specified, otherwise create standard SigMF dataset or archive if create_ncd: - # Write .sigmf-meta file + # add ncd-specific fields + global_info[SigMFFile.DATASET_KEY] = signalhound_path.with_suffix(".iq").name + # spike files have no header or trailing bytes + capture_info[SigMFFile.HEADER_BYTES_KEY] = 0 + global_info[SigMFFile.TRAILING_BYTES_KEY] = 0 + + # build the .iq file path for data file + base_file_name = signalhound_path.with_suffix("") + data_file_path = base_file_name.with_suffix(".iq") - with open(meta_path, "w") as f: - json.dump(sigmfMetaData, f, indent=2) - log.info(f"==== TEMP: Wrote SigMF metadata to {meta_path} ====") + # create metadata-only SigMF for NCD pointing to original file + meta = SigMFFile(global_info=global_info) + meta.set_data_file(data_file=data_file_path, offset=0) + meta.data_buffer = io.BytesIO() + meta.add_capture(0, metadata=capture_info) + + # add annotations from metadata + for annotation in annotations: + start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) + length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) + # pass remaining fields as metadata (excluding standard annotation keys) + annot_metadata = { + k: v for k, v in annotation.items() if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY] + } + meta.add_annotation(start_idx, length=length, metadata=annot_metadata) - # TODO: Use this code for metadata and remove code above - # write metadata file if output path specified - if out_path is not None: + # write metadata file if output path specified + if out_path is 
not None: output_dir = filenames["meta_fn"].parent output_dir.mkdir(parents=True, exist_ok=True) - meta.tofile(filenames["meta_fn"], toarchive=False) - log.info("For NCD: wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) - log.debug("created %r", meta) + meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + log.info("wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) - if out_path is None: - base_path = signalhound_path.with_suffix(".sigmf") - else: - base_path = Path(out_path) + log.debug("created %r", meta) + return meta - # Create Archive if specified, otherwise write separate meta and data files + # create archive if specified, otherwise write separate meta and data files if create_archive: # use temporary directory for data file when creating archive with tempfile.TemporaryDirectory() as temp_dir: data_path = Path(temp_dir) / filenames["data_fn"].name - - # Convert IQ data and write to temp directory + + # convert iq data and write to temp directory try: - iq_data = convert_iq_data(str(signalhound_path), sigmfMetaData) + iq_data = convert_iq_data(signalhound_path, sample_count) except Exception as e: - raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") - - # Write converted IQ data to temporary file + raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") from e + + # write converted iq data to temporary file iq_data.tofile(data_path) - log.info(f"Wrote converted IQ data to {data_path}") + log.debug("wrote converted iq data to %s", data_path) meta = SigMFFile(data_file=data_path, global_info=global_info) meta.add_capture(0, metadata=capture_info) - - # Add annotations from metadata - for annotation in sigmfMetaData.get("annotations", []): + + # add annotations from metadata + for annotation in annotations: start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) - annot_metadata = {k: v for k, v in 
annotation.items() - if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} + annot_metadata = { + k: v + for k, v in annotation.items() + if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY] + } meta.add_annotation(start_idx, length=length, metadata=annot_metadata) - meta.tofile(filenames["archive_fn"], toarchive=True) + output_dir = filenames["archive_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + meta.tofile(filenames["archive_fn"], toarchive=True, overwrite=overwrite) log.info("wrote SigMF archive to %s", filenames["archive_fn"]) # metadata returned should be for this archive meta = fromfile(filenames["archive_fn"]) else: - # Write separate meta and data files - # Convert IQ data for Zero span Spike file + # write separate meta and data files + # convert iq data for zero span spike file try: - iq_data = convert_iq_data(str(signalhound_path), sigmfMetaData) + iq_data = convert_iq_data(signalhound_path, sample_count) except Exception as e: - raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") + raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") from e - # Create SigMFFile with converted IQ data in a BytesIO buffer - data_buffer = io.BytesIO(iq_data.tobytes()) - - meta = SigMFFile(global_info=global_info) - meta.set_data_file(data_buffer=data_buffer, skip_checksum=True) + # write data file + output_dir = filenames["data_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + iq_data.tofile(filenames["data_fn"]) + log.debug("wrote SigMF dataset to %s", filenames["data_fn"]) + + # create sigmffile with converted iq data + meta = SigMFFile(data_file=filenames["data_fn"], global_info=global_info) meta.add_capture(0, metadata=capture_info) - # Add annotations from metadata - for annotation in sigmfMetaData.get("annotations", []): + # add annotations from metadata + for annotation in annotations: start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) length = 
annotation.get(SigMFFile.LENGTH_INDEX_KEY) - # Pass remaining fields as metadata (excluding standard annotation keys) - annot_metadata = {k: v for k, v in annotation.items() - if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} + # pass remaining fields as metadata (excluding standard annotation keys) + annot_metadata = { + k: v for k, v in annotation.items() if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY] + } meta.add_annotation(start_idx, length=length, metadata=annot_metadata) - # TODO: Check which files are being written data vs. just meta data - # Previous Code... - # Write .sigmf-meta file - # meta_path = base_file_name + ".sigmf-meta" - # with open(meta_path, "w") as f: - # json.dump(sigmfMetaData, f, indent=2) - # log.info(f"==== Wrote SigMF metadata to {meta_path} ====") - # log.info("wrote SigMF dataset to %s", data_path) - - # Write metadata and data files - output_dir = filenames["meta_fn"].parent - output_dir.mkdir(parents=True, exist_ok=True) - meta.tofile(filenames["meta_fn"], toarchive=False) - log.info("wrote SigMF metadata and data files to %s", filenames["meta_fn"]) + # write metadata file + meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + log.info("wrote SigMF metadata to %s", filenames["meta_fn"]) - log.debug("Created %r", meta) + log.debug("created %r", meta) return meta diff --git a/sigmf/convert/signalhound_fileobject_broken.py b/sigmf/convert/signalhound_fileobject_broken.py deleted file mode 100644 index c001812..0000000 --- a/sigmf/convert/signalhound_fileobject_broken.py +++ /dev/null @@ -1,499 +0,0 @@ -# Copyright: Multiple Authors -# -# This file is part of sigmf-python. 
https://github.com/sigmf/sigmf-python -# -# SPDX-License-Identifier: LGPL-3.0-or-later -# last updated 3-16-26 - -"""converter for signalhound files to SigMF format.""" - -import os -import json -import io -import logging -import tempfile -from pathlib import Path -import hashlib -import numpy as np -from datetime import datetime, timezone, timedelta - -import xml.etree.ElementTree as ET -from typing import Optional, List - -from .. import SigMFFile -from ..error import SigMFConversionError - -from .. import __version__ as toolversion -from .. import fromfile -from ..sigmffile import get_sigmf_filenames -from ..utils import SIGMF_DATETIME_ISO8601_FMT - -import sys - -# Use logging defined in __main__.py -log = logging.getLogger() - -def _to_float(x) -> Optional[float]: - """Convert value to float, return None if invalid.""" - try: - return float(x) - except Exception: - return None - - -def _to_int(x) -> Optional[int]: - """Convert value to int, return None if invalid.""" - try: - return int(float(x)) - except Exception: - return None - -def _parse_preview_trace(text) -> List[float]: - """Parse PreviewTrace string into list of floats.""" - if text is None: - return [] - s = text.strip() - if s.endswith(","): - s = s[:-1] - if not s: - return [] - parts = [p.strip() for p in s.split(",") if p.strip() != ""] - # return both list and numpy array if caller wants either - vals = [] - for p in parts: - try: - vals.append(float(p)) - except Exception: - # skip malformed entries - continue - return vals - - -def spike_to_sigmf_metadata(xml_file_path) -> dict: - """ - Build a SigMF metadata file the spike xml file. - - Parameters - ---------- - xml_file_path : str - Path to the spike xml file. - Returns - ------- - dict - SigMF metadata structure. - # TODO:-> SigMFFile: - sigmfObj : SigMFFile - SigMF object with metadata information. - - Raises - ------ - SigMFConversionError - If required fields are missing or invalid. 
- """ - log.info("===== Converting Spike XML metadata to SigMF format =====") - - xml_path = Path(xml_file_path) - tree = ET.parse(xml_path) - root = tree.getroot() - - def text_of(tag) -> Optional[str]: - """Extract and strip text from XML element.""" - el = root.find(tag) - return el.text.strip() if (el is not None and el.text is not None) else None - - # TODO: Determine if data dictionary is best structure for this data - - md = {} - # Signal Hound data elements - for tag in ( - "DeviceType", - "SerialNumber", - "DataType", - "ReferenceLevel", - "CenterFrequency", - "SampleRate", - "Decimation", - "IFBandwidth", - "ScaleFactor", - "IQFileName", - "EpochNanos", - "SampleCount", - "PreviewTrace", - ): - md[f"{tag}_raw"] = text_of(tag) - - # Optional log.info of data for debug >> - # log.info(md) - - # Typed fields / normalized - md["DataType"] = md.pop("DataType_raw") - md["DeviceType"] = md.pop("DeviceType_raw") - md["CenterFrequency"] = _to_float(md.pop("CenterFrequency_raw")) - md["SampleCount"] = _to_int(md.pop("SampleCount_raw")) - md["SampleRate"] = _to_float(md.pop("SampleRate_raw")) - md["EpochNanos"] = _to_int(md.pop("EpochNanos_raw")) - # Will be added as comments or annotations - md["ReferenceLevel"] = _to_float(md.pop("ReferenceLevel_raw")) - md["Decimation"] = _to_int(md.pop("Decimation_raw")) - md["IFBandwidth"] = _to_float(md.pop("IFBandwidth_raw")) - md["ScaleFactor"] = _to_float(md.pop("ScaleFactor_raw")) - md["SerialNumber"] = md.pop("SerialNumber_raw") - md["IQFileName"] = md.pop("IQFileName_raw") - - # PreviewTrace: list of floats and numpy array - # TODO: Consider adding a flag to include preview trace or not. - # TODO: Confirm np.int16 data type for preview data elements. 
- preview_raw = text_of("PreviewTrace") - md["PreviewTrace_list"] = _parse_preview_trace(preview_raw) - md["PreviewTrace_array"] = np.array(md["PreviewTrace_list"], dtype=np.int16) - - # Create a reference to the spike XML data - spike_xml = md - - # TODO: Confirm Zero Span Spike files are single channel - channel_number = 1 - - # Check datatype mapping based on Spike XML DataType field - should be "Complex Short" - spike_data_type = spike_xml.get("DataType") - if spike_data_type == "Complex Short": - data_type= "ci16_le" # complex int16 little-endian - else: - raise SigMFConversionError(f"Unsupported Spike DataType: {spike_data_type}") - # Check for DeviceType field for hardware description, otherwise use generic description - device_type = spike_xml.get("DeviceType") - hardware_description = ( - device_type if device_type is not None else "Signal Hound Device" - ) - - # Strip the extension from the original file path - base_file_name = os.path.splitext(xml_file_path)[0] - # Build the .iq file path for data file - data_file_path = base_file_name + ".iq" - filesize = os.path.getsize(data_file_path) - # complex 16-bit integer IQ data > ci16_le in SigMF - elem_size = np.dtype(np.int16).itemsize - elem_count = filesize // elem_size - log.info(f"Element Count: {elem_count}") - frame_bytes = 2 * elem_size - if filesize % frame_bytes != 0: - raise SigMFConversionError(f"File size {filesize} not divisible by {frame_bytes}; partial sample present") - - # Calculate sample count using the original IQ data file size - sample_count = filesize // frame_bytes - log.info(f"Sample count: {sample_count}") - - # For now define static values. Perhaps take as JSON or command arg input in the future. 
- spike_author = "Spike File Conversion - Unknown Author" - spike_licence = "Spike File Conversion - Unknown License" - spike_description = "Signal Hound Spike Zero Span File converted to SigMF format" - - # Convert the datetime object to an ISO 8601 formatted string - epoch_time = spike_xml.get("EpochNanos") - if epoch_time is None: - raise SigMFConversionError("Missing EpochNanos in Spike XML") - epoch_nanos = int(epoch_time) - secs = epoch_nanos // 1_000_000_000 - rem_ns = epoch_nanos % 1_000_000_000 - dt = datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(microseconds=rem_ns / 1000) - iso_8601_string = dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" - - # TODO: Confirm freq_upper_edge and lower_frequency_edge calculations - is this correct for Spike files? ScaleFactor? Mhz? /2? - center = float(spike_xml.get("CenterFrequency") or 0.0) - bandwidth = float(spike_xml.get("IFBandwidth") or 0.0) - upper_frequency_edge = center + (bandwidth / 2.0) - lower_frequency_edge = center - (bandwidth / 2.0) - - # --- Base Global Metadata --- - global_md = { - "core:author": spike_author, - "core:datatype": data_type, - "core:description": spike_description, - "core:hw": hardware_description, - "core:license": spike_licence, - "core:num_channels": channel_number, - "core:sample_rate": spike_xml.get("SampleRate"), - "core:version": "1.0.0", - "core:spike_ReferenceLevel": spike_xml.get("ReferenceLevel"), - "core:spike_Decimation": spike_xml.get("Decimation"), - "core:spike_IFBandwidth": spike_xml.get("IFBandwidth"), - "core:spike_ScaleFactor": spike_xml.get("ScaleFactor"), - "core:spike_IQFileName": spike_xml.get("IQFileName"), - } - - # --- Captures array --- - captures = [ - { - "core:datetime": iso_8601_string, - "core:frequency": float(spike_xml.get("CenterFrequency")), - "core:sample_start": 0, - } - ] - - # --- Create annotations array using calculated values--- - annotations = [ - { - "core:sample_start": 0, - "core:sample_count": sample_count, - 
"core:freq_upper_edge": upper_frequency_edge, - "core:freq_lower_edge": lower_frequency_edge, - "core:label": "Spike", - } - ] - - # --- Final SigMF object --- - sigmf = { - "global": global_md, - "captures": captures, - "annotations": annotations, - } - - # TODO: Return SigMFFile object instead of dict - - meta = SigMFFile(global_info=sigmf, skip_checksum=True) - - # return sigmf - return meta - -def convert_iq_data(xml_file_path, sigmfObj=None) -> np.ndarray: - """ - Convert IQ data in .iq file to SigMF based on values in Zero Span XML file. - - Parameters - ---------- - xml_file_path : str - Path to the spike zero span XML file. - sigmfObj : SigMFFile - SigMF object with metadata information. - - Returns - ------- - numpy.ndarray - Parsed samples. - """ - - # TODO: Although this code may not be needed now, this function can be extended in the future to handle multiple channel recordings? - # (Samples pending for testing with multi-channel Spike files) - - log.info("===== Parsing spike file data values =====") - base_file_name = os.path.splitext(xml_file_path)[0] - iq_file_path = base_file_name + ".iq" - - # TODO: Confirm that the data that is used is correct for the Spike files - - # Gather IQ file information from generated SigMF data file - if isinstance(sigmfObj, dict): - sample_rate = ( - sigmfObj.get("global", {}).get("core:sample_rate") - or sigmfObj.get("global", {}).get("sample_rate") - or sigmfObj.get("core:sample_rate") - ) - - - meta_dict = sigmfObj._metadata - - global_info = meta_dict.get("global", {}) - captures = meta_dict.get("captures", []) - annotations = meta_dict.get("annotations", []) - - # TODO: Why am I no able to access the sample_rate value from the generated SigMF metadata dict? Is this a problem with how the metadata is being generated in the spike_to_sigmf_metadata function? 
- # log.info(f"Sample rate from SigMF metadata dict: {sample_rate}") - # elem_count=(annotations[0].get("core:sample_count", 0))*2 #*2 for I and Q samples - - elem_size = np.dtype(np.int16).itemsize # complex 16-bit integer IQ data > ci16_le in SigMF - elem_count = os.path.getsize(iq_file_path) // np.dtype(np.int16).itemsize # complex 16-bit integer IQ data > ci16_le in SigMF - - # Read raw interleaved int16 IQ - samples = np.fromfile(iq_file_path, dtype=np.int16, offset=0, count=elem_count) - - # Trim trailing partial bytes - if samples.nbytes % elem_size != 0: - trim = samples % elem_size - log.warning("Trimming %d trailing byte(s) to align samples", trim) - samples -= trim - - # TODO: Confirm that there is no need to reassemble interleaved IQ samples - # samples = raw_samples[::2] + 1j*raw_samples[1::2] # convert to IQIQIQ... - - # TODO: Use consitent file names in output - # output_dir = filenames["meta_fn"].parent - samples.tofile(iq_file_path + ".sigmf-data") - log.info(f"==== Wrote SigMF data to {iq_file_path + '.sigmf-data'} ====") - - # Return the IQ data if needed for further processing if needed in the future. - return samples - -def signalhound_to_sigmf( - signalhound_path: str, - out_path: Optional[str] = None, - create_archive: bool = False, - create_ncd: bool = False, -) -> SigMFFile: - - - """ - Read a signalhound file, optionally write sigmf archive, return associated SigMF object. - - Parameters - ---------- - signalhound_path : str - Path to the signalhound file. - out_path : str, optional - Path to the output SigMF metadata file. - create_archive : bool, optional - When True, package output as a .sigmf archive. - create_ncd : bool, optional - When True, create Non-Conforming Dataset - - Returns - ------- - SigMFFile - SigMF object, potentially as Non-Conforming Dataset. - - Raises - ------ - SigMFConversionError - If the signalhound file cannot be read. 
- """ - # FOR TESTING - # create_ncd=True - - signalhound_path = Path(signalhound_path) - out_path = None if out_path is None else Path(out_path) - base_file_name = os.path.splitext(signalhound_path)[0] - meta_path = base_file_name + ".sigmf-meta" - - # auto-enable NCD when no output path is specified - if out_path is None: - create_ncd = True - - # TODO: Should time be based on file modification time or the EpochNanos field in the XML metadata? - # For now using file modification time - modify_time = signalhound_path.lstat().st_mtime - signalhound_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) - - capture_info = { - SigMFFile.DATETIME_KEY: signalhound_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), - } - - data_bytes = signalhound_path.stat().st_size - log.info(f"Data Bytes: {data_bytes}") - - # TODO: Update to use SigMF File object instead of dict - # Call the SigMF conversion for metadata generation (returns dict) - SigMFMetaData = spike_to_sigmf_metadata(signalhound_path) - - # Use the generated global metadata dict for SigMFFile construction - # global_info = SigMFMetaData.get("global", {}) - SigMFMeta_dict = SigMFMetaData._metadata - global_info = SigMFMeta_dict.get("global", {}) - - # Set captures and annotations from the generated metadata dict - captures = SigMFMeta_dict.get("captures", []) - # Set annotations - annotations = SigMFMeta_dict.get("annotations", []) - - # create SigMF metadata - meta = SigMFFile(global_info=SigMFMetaData) - meta.data_file = signalhound_path - - # TBD - required? 
- header_bytes = 0 # No header bytes for raw IQ files, but could be set to non-zero if needed for other file types or future use cases - capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes - # TODO: Get actual value global_info[SigMFFile.DATATYPE_KEY] = - global_info[SigMFFile.DATATYPE_KEY] = "ci16_le" - - # Get filenames for metadata, data, and archive based on output path and input file name - filenames = get_sigmf_filenames(out_path) - - # Create NCD if specified, otherwise create standard SigMF dataset or archive - if create_ncd: - # Write .sigmf-meta file - - if out_path is not None: - output_dir = filenames["meta_fn"].parent - output_dir.mkdir(parents=True, exist_ok=True) - meta.tofile(filenames["meta_fn"], toarchive=False) - log.info("For NCD: wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) - log.debug("created %r", meta) - - if out_path is None: - base_path = signalhound_path.with_suffix(".sigmf") - else: - base_path = Path(out_path) - - # Create Archive if specified, otherwise write separate meta and data files - if create_archive: - # use temporary directory for data file when creating archive - with tempfile.TemporaryDirectory() as temp_dir: - data_path = Path(temp_dir) / filenames["data_fn"].name - - # Convert IQ data and write to temp directory - try: - iq_data = convert_iq_data(str(signalhound_path), meta) - except Exception as e: - raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") - - # Write converted IQ data to temporary file - iq_data.tofile(data_path) - # log.info(f"Wrote converted IQ data to {data_path}") - # TODO: Get actual value global_info[SigMFFile.DATATYPE_KEY] = - global_info[SigMFFile.DATATYPE_KEY] = "ci16_le" - - meta = SigMFFile(data_file=data_path, global_info=global_info) - meta.add_capture(0, metadata=capture_info) - - # Add annotations from metadata - annotations = global_info.get("annotations", []) - - for annotation in annotations: - start_idx = 
annotation.get(SigMFFile.START_INDEX_KEY, 0) - length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) - annot_metadata = {k: v for k, v in annotation.items() - if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} - meta.add_annotation(start_idx, length=length, metadata=annot_metadata) - - meta.tofile(filenames["archive_fn"], toarchive=True) - log.info("wrote SigMF archive to %s", filenames["archive_fn"]) - # metadata returned should be for this archive - meta = fromfile(filenames["archive_fn"]) - - else: - # Write separate meta and data files - # Convert IQ data for Zero span Spike file - try: - iq_data = convert_iq_data(str(signalhound_path), meta) - except Exception as e: - raise SigMFConversionError(f"Failed to convert or parse IQ data values: {e}") - - # Create SigMFFile with converted IQ data in a BytesIO buffer - data_buffer = io.BytesIO(iq_data.tobytes()) - global_info[SigMFFile.DATATYPE_KEY] = "ci16_le" - # TODO: Get actual value global_info[SigMFFile.DATATYPE_KEY] = - header_bytes = 0 # No header bytes for raw IQ files, but could be set to non-zero if needed for other file types or future use cases - capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes - - meta = SigMFFile(global_info=global_info) - - meta.set_data_file(data_buffer=data_buffer, skip_checksum=True) - meta.add_capture(0, metadata=capture_info) - - # Add annotations from metadata - annotations = global_info.get("annotations", []) - - for annotation in annotations: - start_idx = annotation.get(SigMFFile.START_INDEX_KEY, 0) - length = annotation.get(SigMFFile.LENGTH_INDEX_KEY) - - # Pass remaining fields as metadata (excluding standard annotation keys) - annot_metadata = {k: v for k, v in annotation.items() - if k not in [SigMFFile.START_INDEX_KEY, SigMFFile.LENGTH_INDEX_KEY]} - meta.add_annotation(start_idx, length=length, metadata=annot_metadata) - - # Write metadata and data files - output_dir = filenames["meta_fn"].parent - output_dir.mkdir(parents=True, exist_ok=True) - 
meta.tofile(filenames["meta_fn"], toarchive=False) - log.info("wrote SigMF metadata and data files to %s", filenames["meta_fn"]) - - log.debug("Created %r", meta) - return meta diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 5ae4291..2b17890 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -1364,6 +1364,11 @@ def fromfile(filename, skip_checksum=False, autoscale=True): return blue_to_sigmf(file_path, create_ncd=True) + elif ext == ".xml" and magic_bytes.startswith(b" Date: Tue, 17 Mar 2026 16:36:54 -0700 Subject: [PATCH 09/11] revert change to init to resolve conflict temporarily --- sigmf/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sigmf/__init__.py b/sigmf/__init__.py index 668713d..edcbb62 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.8.0" +__version__ = "1.7.0" # matching version of the SigMF specification __specification__ = "1.2.6" From 7fbe176f132a7dbf99c33964bb965e457e7017c3 Mon Sep 17 00:00:00 2001 From: Kyle A Logue Date: Wed, 18 Mar 2026 09:51:54 -0700 Subject: [PATCH 10/11] unified detect_converter function & better signalhound conversion --- docs/source/converters.rst | 8 ++- sigmf/convert/__init__.py | 91 ++++++++++++++++++++++++ sigmf/convert/__main__.py | 36 +++------- sigmf/convert/signalhound.py | 111 ++++++++++++++++-------------- sigmf/sigmffile.py | 30 ++++---- tests/test_convert_blue.py | 9 ++- tests/test_convert_signalhound.py | 7 +- tests/test_convert_wav.py | 27 ++------ tests/testdata.py | 19 +++++ 9 files changed, 213 insertions(+), 125 deletions(-) diff --git a/docs/source/converters.rst b/docs/source/converters.rst index 22d45a9..43e2770 100644 --- a/docs/source/converters.rst +++ b/docs/source/converters.rst @@ -207,6 +207,8 @@ Examples center_freq = meta.get_captures()[0]["core:frequency"] # access Signal Hound-specific metadata in spike: namespace - 
reference_level = meta.get_global_field("spike:reference_level") - if_bandwidth = meta.get_global_field("spike:if_bandwidth") - decimation = meta.get_global_field("spike:decimation") + reference_level_dbm = meta.get_global_field("spike:reference_level_dbm") + scale_factor_mw = meta.get_global_field("spike:scale_factor_mw") + if_bandwidth_hz = meta.get_global_field("spike:if_bandwidth_hz") + iq_filename = meta.get_global_field("spike:iq_filename") # original IQ file name + preview_trace = meta.get_global_field("spike:preview_trace") # max-hold trace diff --git a/sigmf/convert/__init__.py b/sigmf/convert/__init__.py index e69de29..1d1654e 100644 --- a/sigmf/convert/__init__.py +++ b/sigmf/convert/__init__.py @@ -0,0 +1,91 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Convert non-SigMF recordings to SigMF format""" + +from pathlib import Path + +from ..error import SigMFConversionError + + +def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes: + """ + Get magic bytes from a file to help identify file type. + + Parameters + ---------- + file_path : Path + Path to the file to read magic bytes from. + count : int, optional + Number of bytes to read. Default is 4. + offset : int, optional + Byte offset to start reading from. Default is 0. + + Returns + ------- + bytes + Magic bytes from the file. + + Raises + ------ + SigMFConversionError + If file cannot be read or is too small. 
+ """ + try: + with open(file_path, "rb") as handle: + handle.seek(offset) + magic_bytes = handle.read(count) + if len(magic_bytes) < count: + raise SigMFConversionError(f"File {file_path} too small to read {count} magic bytes at offset {offset}") + return magic_bytes + except OSError as err: + raise SigMFConversionError(f"Failed to read magic bytes from {file_path}: {err}") from err + + + def detect_converter(file_path: Path): + """ + Detect the appropriate converter for a non-SigMF file. + + Parameters + ---------- + file_path : Path + Path to the file to detect. + + Returns + ------- + str + The converter name: "wav", "blue", or "signalhound" + + Raises + ------ + SigMFConversionError + If the file format is not supported or cannot be detected. + """ + magic_bytes = get_magic_bytes(file_path, count=4, offset=0) + + if magic_bytes == b"RIFF": + return "wav" + + elif magic_bytes == b"BLUE": + return "blue" + + elif magic_bytes == b"<?xm": + # Check if it's a Signal Hound Spike file + # Skip XML declaration (40 bytes) and check for SignalHoundIQFile root element + expanded_magic_bytes = get_magic_bytes(file_path, count=17, offset=40) + if expanded_magic_bytes == b"SignalHoundIQFile": + return "signalhound" + else: + raise SigMFConversionError( + f"Unsupported XML file format. Root element: {expanded_magic_bytes}. " + f"Expected SignalHoundIQFile for Signal Hound Spike files." + ) + + else: + raise SigMFConversionError( + f"Unsupported file format. Magic bytes: {magic_bytes}. " + f"Supported formats for conversion are WAV, BLUE/Platinum, and Signal Hound Spike." + ) diff --git a/sigmf/convert/__main__.py b/sigmf/convert/__main__.py index e1abc6b..2b215fe 100644 --- a/sigmf/convert/__main__.py +++ b/sigmf/convert/__main__.py @@ -13,11 +13,10 @@ from .. import __version__ as toolversion from ..error import SigMFConversionError -from ..utils import get_magic_bytes +from . 
import detect_converter from .blue import blue_to_sigmf -from .wav import wav_to_sigmf from .signalhound import signalhound_to_sigmf - +from .wav import wav_to_sigmf def main() -> None: @@ -87,33 +86,16 @@ def main() -> None: if output_path.is_dir(): raise SigMFConversionError(f"Output path must be a filename, not a directory: {output_path}") - # detect file type using magic bytes (same logic as fromfile()) - magic_bytes = get_magic_bytes(input_path, count=4, offset=0) + # detect file type using magic bytes + converter_type = detect_converter(input_path) - if magic_bytes == b"RIFF": - # WAV file + if converter_type == "wav": _ = wav_to_sigmf(wav_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd) - - elif magic_bytes == b"BLUE": - # BLUE file + elif converter_type == "blue": _ = blue_to_sigmf(blue_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd) - - elif magic_bytes == b"<?xm": - # Signal Hound Spike 1.0 file - # Of the 66 Byte string move 43 bytes in to skip the XML declaration - # and get to the root element and take 18 chars for a more specific detection of Signal Hound Spike files - expanded_magic_bytes = get_magic_bytes(input_path, count=17, offset=40) - if expanded_magic_bytes == b"SignalHoundIQFile": - _ = signalhound_to_sigmf(signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd) - else: - raise SigMFConversionError( - f"Unsupported XML file format. Expanded Magic bytes: {expanded_magic_bytes}. " - f"Supported formats for conversion are WAV, BLUE/Platinum, and Signal Hound Spike." - ) - else: - raise SigMFConversionError( - f"Unsupported file format. Magic bytes: {magic_bytes}. " - f"Supported formats for conversion are WAV, BLUE/Platinum, and Signal Hound Spike." 
+ elif converter_type == "signalhound": + _ = signalhound_to_sigmf( + signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd ) diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py index 31fb922..7666b6c 100644 --- a/sigmf/convert/signalhound.py +++ b/sigmf/convert/signalhound.py @@ -27,8 +27,8 @@ def _text_of(root: ET.Element, tag: str) -> Optional[str]: """Extract and strip text from XML element.""" - element = root.find(tag) - return element.text.strip() if (element is not None and element.text is not None) else None + elem = root.find(tag) + return elem.text.strip() if (elem is not None and elem.text is not None) else None def _parse_preview_trace(text: Optional[str]) -> List[float]: @@ -89,13 +89,6 @@ def validate_spike(xml_path: Path) -> None: if sample_rate <= 0: raise SigMFConversionError(f"Invalid SampleRate: {sample_rate} (must be > 0)") - # validate EpochNanos - epoch_nanos_raw = _text_of(root, "EpochNanos") - try: - int(epoch_nanos_raw) - except (TypeError, ValueError) as err: - raise SigMFConversionError(f"Invalid or missing EpochNanos: {epoch_nanos_raw}") from err - # validate DataType data_type_raw = _text_of(root, "DataType") if data_type_raw is None: @@ -120,12 +113,12 @@ def validate_spike(xml_path: Path) -> None: def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: """ - Build a SigMF metadata components from the spike xml file. + Build SigMF metadata components from the Spike XML file. Parameters ---------- xml_path : Path - Path to the spike xml file. + Path to the Spike XML file. 
Returns ------- @@ -149,9 +142,17 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: # extract and convert required fields center_frequency = float(_text_of(root, "CenterFrequency")) sample_rate = float(_text_of(root, "SampleRate")) - epoch_nanos = int(_text_of(root, "EpochNanos")) data_type_raw = _text_of(root, "DataType") + # optional EpochNanos field + epoch_nanos = None + epoch_nanos_raw = _text_of(root, "EpochNanos") + if epoch_nanos_raw: + try: + epoch_nanos = int(epoch_nanos_raw) + except ValueError: + log.warning(f"could not parse EpochNanos: {epoch_nanos_raw}") + # map datatype if data_type_raw == "Complex Short": data_type = "ci16_le" # complex int16 little-endian @@ -161,7 +162,7 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: # optional fields - only convert if present and valid reference_level = None reference_level_raw = _text_of(root, "ReferenceLevel") - if reference_level_raw is not None: + if reference_level_raw: try: reference_level = float(reference_level_raw) except ValueError: @@ -169,7 +170,7 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: decimation = None decimation_raw = _text_of(root, "Decimation") - if decimation_raw is not None: + if decimation_raw: try: decimation = int(float(decimation_raw)) except ValueError: @@ -177,7 +178,7 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: if_bandwidth = None if_bandwidth_raw = _text_of(root, "IFBandwidth") - if if_bandwidth_raw is not None: + if if_bandwidth_raw: try: if_bandwidth = float(if_bandwidth_raw) except ValueError: @@ -185,21 +186,34 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: scale_factor = None scale_factor_raw = _text_of(root, "ScaleFactor") - if scale_factor_raw is not None: + if scale_factor_raw: try: scale_factor = float(scale_factor_raw) except ValueError: log.warning(f"could not parse ScaleFactor: {scale_factor_raw}") device_type = _text_of(root, "DeviceType") + 
serial_number = _text_of(root, "SerialNumber") iq_file_name = _text_of(root, "IQFileName") # parse preview trace if present preview_trace_raw = _text_of(root, "PreviewTrace") preview_trace = _parse_preview_trace(preview_trace_raw) if preview_trace_raw else None - # check for devicetype field for hardware description, otherwise use generic description - hardware_description = device_type if device_type is not None else "Signal Hound Device" + # build hardware description with available information + hw_parts = [] + if device_type: + hw_parts.append(f"Recorded with {device_type}") + else: + hw_parts.append("Recorded with Signal Hound Device") + + if serial_number: + hw_parts.append(f"S/N: {serial_number}") + + if decimation: + hw_parts.append(f"decimation: {decimation}") + + hardware_description = ", ".join(hw_parts) if hw_parts else "Signal Hound Device" # strip the extension from the original file path base_file_name = xml_path.with_suffix("") @@ -209,27 +223,24 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: # complex 16-bit integer IQ data > ci16_le in SigMF elem_size = np.dtype(np.int16).itemsize - elem_count = filesize // elem_size - log.debug("element count: %d", elem_count) - frame_bytes = 2 * elem_size - if filesize % frame_bytes != 0: - raise SigMFConversionError(f"File size {filesize} not divisible by {frame_bytes}; partial sample present") + frame_bytes = 2 * elem_size # I and Q components # calculate sample count using the original IQ data file size sample_count_calculated = filesize // frame_bytes log.debug("sample count: %d", sample_count_calculated) - # convert the datetime object to an ISO 8601 formatted string - secs = epoch_nanos // 1_000_000_000 - rem_ns = epoch_nanos % 1_000_000_000 - dt = datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(microseconds=rem_ns / 1000) - iso_8601_string = dt.strftime(SIGMF_DATETIME_ISO8601_FMT) + # convert the datetime object to an ISO 8601 formatted string if EpochNanos is present + 
iso_8601_string = None + if epoch_nanos is not None: + secs = epoch_nanos // 1_000_000_000 + rem_ns = epoch_nanos % 1_000_000_000 + dt = datetime.fromtimestamp(secs, tz=timezone.utc) + timedelta(microseconds=rem_ns / 1000) + iso_8601_string = dt.strftime(SIGMF_DATETIME_ISO8601_FMT) # base global metadata global_md = { SigMFFile.AUTHOR_KEY: getpass.getuser(), SigMFFile.DATATYPE_KEY: data_type, - SigMFFile.DESCRIPTION_KEY: "Signal Hound Spike Zero Span File converted to SigMF format", SigMFFile.HW_KEY: hardware_description, SigMFFile.NUM_CHANNELS_KEY: 1, SigMFFile.RECORDER_KEY: "Official SigMF Signal Hound converter", @@ -238,28 +249,28 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: } # add optional spike-specific fields to global metadata using spike: namespace - if reference_level is not None: - global_md["spike:reference_level"] = reference_level - if decimation is not None: - global_md["spike:decimation"] = decimation - if if_bandwidth is not None: - global_md["spike:if_bandwidth"] = if_bandwidth - if scale_factor is not None: - global_md["spike:scale_factor"] = scale_factor - if iq_file_name is not None: - global_md["spike:iq_filename"] = iq_file_name - # if preview_trace is not None and len(preview_trace) > 0: - # global_md["spike:preview_trace"] = preview_trace + # only include fields that aren't already represented in standard SigMF metadata + if reference_level: + global_md["spike:reference_level_dbm"] = reference_level + if scale_factor: + global_md["spike:scale_factor_mw"] = scale_factor # to convert raw to mW + if if_bandwidth: + global_md["spike:if_bandwidth_hz"] = if_bandwidth + if iq_file_name: + global_md["spike:iq_filename"] = iq_file_name # provenance + if preview_trace: + global_md["spike:preview_trace"] = preview_trace # max-hold trace # capture info capture_info = { - SigMFFile.DATETIME_KEY: iso_8601_string, SigMFFile.FREQUENCY_KEY: center_frequency, } + if iso_8601_string: + capture_info[SigMFFile.DATETIME_KEY] = 
iso_8601_string # create annotations array using calculated values annotations = [] - if if_bandwidth is not None: + if if_bandwidth: upper_frequency_edge = center_frequency + (if_bandwidth / 2.0) lower_frequency_edge = center_frequency - (if_bandwidth / 2.0) annotations.append( @@ -277,12 +288,12 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: def convert_iq_data(xml_path: Path, sample_count: int) -> np.ndarray: """ - Convert IQ data in .iq file to SigMF based on values in Zero Span XML file. + Convert IQ data in .iq file to SigMF based on values in Spike XML file. Parameters ---------- xml_path : Path - Path to the spike zero span XML file. + Path to the spike XML file. sample_count : int Number of samples to read. @@ -292,8 +303,7 @@ def convert_iq_data(xml_path: Path, sample_count: int) -> np.ndarray: Parsed samples. """ log.debug("parsing spike file data values") - base_file_name = Path(xml_path).with_suffix("") - iq_file_path = base_file_name.with_suffix(".iq") + iq_file_path = xml_path.with_suffix(".iq") # calculate element count (I and Q samples) elem_count = sample_count * 2 # *2 for I and Q samples @@ -366,11 +376,10 @@ def signalhound_to_sigmf( # create NCD if specified, otherwise create standard SigMF dataset or archive if create_ncd: - # add ncd-specific fields - global_info[SigMFFile.DATASET_KEY] = signalhound_path.with_suffix(".iq").name # spike files have no header or trailing bytes - capture_info[SigMFFile.HEADER_BYTES_KEY] = 0 + global_info[SigMFFile.DATASET_KEY] = signalhound_path.with_suffix(".iq").name global_info[SigMFFile.TRAILING_BYTES_KEY] = 0 + capture_info[SigMFFile.HEADER_BYTES_KEY] = 0 # build the .iq file path for data file base_file_name = signalhound_path.with_suffix("") @@ -441,7 +450,7 @@ def signalhound_to_sigmf( else: # write separate meta and data files - # convert iq data for zero span spike file + # convert iq data for spike file try: iq_data = convert_iq_data(signalhound_path, sample_count) except 
Exception as e: diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 2b17890..8cb09ec 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -30,7 +30,7 @@ SigMFFileError, SigMFFileExistsError, ) -from .utils import dict_merge, get_magic_bytes +from .utils import dict_merge class SigMFMetafile: @@ -1286,7 +1286,7 @@ def fromfile(filename, skip_checksum=False, autoscale=True): * a SigMF Metadata file (.sigmf-meta) * a SigMF Dataset file (.sigmf-data) * a SigMF Collection file (.sigmf-collection) - * a non-SigMF RF recording that can be converted (.wav, .cdif) + * a non-SigMF RF recording that can be converted (.wav, .cdif, .xml, etc.) Parameters ---------- @@ -1352,22 +1352,28 @@ def fromfile(filename, skip_checksum=False, autoscale=True): if not autoscale: # TODO: allow autoscale=False for converters warnings.warn("non-SigMF auto-detection conversion only supports autoscale=True; ignoring autoscale=False") - magic_bytes = get_magic_bytes(file_path, count=4, offset=0) - if magic_bytes == b"RIFF": - from .convert.wav import wav_to_sigmf + # lazy imports to avoid circular dependency + from .convert import detect_converter - return wav_to_sigmf(file_path, create_ncd=True) + try: + converter_type = detect_converter(file_path) - elif magic_bytes == b"BLUE": - from .convert.blue import blue_to_sigmf + if converter_type == "wav": + from .convert.wav import wav_to_sigmf - return blue_to_sigmf(file_path, create_ncd=True) + return wav_to_sigmf(file_path, create_ncd=True) + elif converter_type == "blue": + from .convert.blue import blue_to_sigmf - elif ext == ".xml" and magic_bytes.startswith(b" None: for form, atol in self.format_tolerance: self.write_minimal(form.encode()) meta = blue_to_sigmf(self.blue_path) - _validate_ncd(self, meta, self.blue_path) + validate_ncd(self, meta, self.blue_path) self.check_data_and_metadata(meta, form, atol) def test_pair_overwrite_protection(self) -> None: @@ -241,7 +240,7 @@ def test_create_ncd(self): """test direct NCD 
conversion""" for blue_path in self.blue_paths: meta = blue_to_sigmf(blue_path=blue_path) - _validate_ncd(self, meta, blue_path) + validate_ncd(self, meta, blue_path) if len(meta): # check sample read consistency np.testing.assert_allclose(meta.read_samples(count=10), meta[0:10], atol=1e-6) @@ -250,4 +249,4 @@ def test_fromfile_ncd(self): """test automatic NCD conversion with fromfile()""" for blue_path in self.blue_paths: meta = sigmf.fromfile(blue_path) - _validate_ncd(self, meta, blue_path) + validate_ncd(self, meta, blue_path) diff --git a/tests/test_convert_signalhound.py b/tests/test_convert_signalhound.py index e592655..da45f28 100644 --- a/tests/test_convert_signalhound.py +++ b/tests/test_convert_signalhound.py @@ -16,8 +16,7 @@ import sigmf from sigmf.convert.signalhound import signalhound_to_sigmf -from .test_convert_wav import _validate_ncd -from .testdata import get_nonsigmf_path +from .testdata import get_nonsigmf_path, validate_ncd class TestSignalHoundWithNonSigMFRepo(unittest.TestCase): @@ -64,7 +63,7 @@ def test_create_ncd(self): for hound_path in self.hound_paths: meta = signalhound_to_sigmf(signalhound_path=hound_path) target_path = hound_path.with_suffix(".iq") - _validate_ncd(self, meta, target_path) + validate_ncd(self, meta, target_path) if len(meta): # check sample read consistency np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) @@ -74,4 +73,4 @@ def test_fromfile_ncd(self): for hound_path in self.hound_paths: meta = sigmf.fromfile(hound_path) target_path = hound_path.with_suffix(".iq") - _validate_ncd(self, meta, target_path) + validate_ncd(self, meta, target_path) diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index ae468a5..0a5fe37 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -16,26 +16,7 @@ import sigmf from sigmf.convert.wav import wav_to_sigmf -from .testdata import get_nonsigmf_path - - -def _validate_ncd(test: unittest.TestCase, meta: sigmf.SigMFFile, 
target_path: Path): - """non-conforming dataset has a specific structure""" - test.assertEqual(str(meta.data_file), str(target_path), "Auto-detected NCD should point to original file") - test.assertIsInstance(meta, sigmf.SigMFFile) - - global_info = meta.get_global_info() - capture_info = meta.get_captures() - - # validate NCD SigMF spec compliance - test.assertGreater(len(capture_info), 0, "Should have at least one capture") - test.assertIn("core:header_bytes", capture_info[0]) - if target_path.suffix != ".iq": - # skip for Signal Hound - test.assertGreater(capture_info[0]["core:header_bytes"], 0, "Should have non-zero core:header_bytes field") - test.assertIn("core:trailing_bytes", global_info, "Should have core:trailing_bytes field.") - test.assertIn("core:dataset", global_info, "Should have core:dataset field.") - test.assertNotIn("core:metadata_only", global_info, "Should NOT have core:metadata_only field.") +from .testdata import get_nonsigmf_path, validate_ncd class TestWAVConverter(unittest.TestCase): @@ -111,7 +92,7 @@ def test_wav_to_sigmf_archive(self) -> None: def test_wav_to_sigmf_ncd(self) -> None: """test wav to sigmf conversion as Non-Conforming Dataset""" meta = wav_to_sigmf(wav_path=self.wav_path, create_ncd=True) - _validate_ncd(self, meta, self.wav_path) + validate_ncd(self, meta, self.wav_path) # verify data data = meta.read_samples() @@ -175,7 +156,7 @@ def test_create_ncd(self) -> None: """test direct NCD conversion""" for wav_path in self.wav_paths: meta = wav_to_sigmf(wav_path=wav_path) - _validate_ncd(self, meta, wav_path) + validate_ncd(self, meta, wav_path) # test file read _ = meta.read_samples(count=10) @@ -184,4 +165,4 @@ def test_autodetect_ncd(self) -> None: """test automatic NCD conversion""" for wav_path in self.wav_paths: meta = sigmf.fromfile(wav_path) - _validate_ncd(self, meta, wav_path) + validate_ncd(self, meta, wav_path) diff --git a/tests/testdata.py b/tests/testdata.py index 28e3ac7..2e93b99 100644 --- a/tests/testdata.py 
+++ b/tests/testdata.py @@ -27,6 +27,25 @@ def get_nonsigmf_path(test: unittest.TestCase) -> Path: return recordings_path +def validate_ncd(test: unittest.TestCase, meta: SigMFFile, target_path: Path): + """Validate that a SigMF object is a properly structured non-conforming dataset (NCD).""" + test.assertEqual(str(meta.data_file), str(target_path), "Auto-detected NCD should point to original file") + test.assertIsInstance(meta, SigMFFile) + + global_info = meta.get_global_info() + capture_info = meta.get_captures() + + # validate NCD SigMF spec compliance + test.assertGreater(len(capture_info), 0, "Should have at least one capture") + test.assertIn("core:header_bytes", capture_info[0]) + if target_path.suffix != ".iq": + # skip for Signal Hound + test.assertGreater(capture_info[0]["core:header_bytes"], 0, "Should have non-zero core:header_bytes field") + test.assertIn("core:trailing_bytes", global_info, "Should have core:trailing_bytes field.") + test.assertIn("core:dataset", global_info, "Should have core:dataset field.") + test.assertNotIn("core:metadata_only", global_info, "Should NOT have core:metadata_only field.") + + TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) TEST_METADATA = { SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}], From 7eaa74cc4969de4205eb0e84ebca3c911e532b78 Mon Sep 17 00:00:00 2001 From: Kyle A Logue Date: Wed, 18 Mar 2026 10:13:32 -0700 Subject: [PATCH 11/11] basic tests for signalhound without external repo --- sigmf/convert/signalhound.py | 4 +- tests/test_convert_signalhound.py | 101 ++++++++++++++++++++++++++++++ tests/test_convert_wav.py | 28 ++++----- 3 files changed, 115 insertions(+), 18 deletions(-) diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py index 7666b6c..993ba37 100644 --- a/sigmf/convert/signalhound.py +++ b/sigmf/convert/signalhound.py @@ -203,9 +203,9 @@ def _build_metadata(xml_path: Path) -> Tuple[dict, dict, list, int]: # build hardware 
description with available information hw_parts = [] if device_type: - hw_parts.append(f"Recorded with {device_type}") + hw_parts.append(f"{device_type}") else: - hw_parts.append("Recorded with Signal Hound Device") + hw_parts.append("Signal Hound Device") if serial_number: hw_parts.append(f"S/N: {serial_number}") diff --git a/tests/test_convert_signalhound.py b/tests/test_convert_signalhound.py index da45f28..4017997 100644 --- a/tests/test_convert_signalhound.py +++ b/tests/test_convert_signalhound.py @@ -19,6 +19,107 @@ from .testdata import get_nonsigmf_path, validate_ncd +class TestSignalHoundConverter(unittest.TestCase): + """Create a realistic Signal Hound XML/IQ file pair and test conversion methods.""" + + def setUp(self) -> None: + """Create temp XML/IQ file pair with tone for testing.""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + self.iq_path = self.tmp_path / "test.iq" + self.xml_path = self.tmp_path / "test.xml" + + # Generate complex IQ test data + self.samp_rate = 48000 + self.center_freq = 915e6 + duration_s = 0.1 + num_samples = int(self.samp_rate * duration_s) + ttt = np.linspace(0, duration_s, num_samples, endpoint=False) + freq = 440 # A4 note + self.iq_data = 0.5 * np.exp(2j * np.pi * freq * ttt) # complex128, normalized to [-0.5, 0.5] + + # Convert complex IQ data to interleaved int16 format (ci16_le - Signal Hound "Complex Short") + scale = 2**15 # int16 range is -32768 to 32767 + ci_real = (self.iq_data.real * scale).astype(np.int16) + ci_imag = (self.iq_data.imag * scale).astype(np.int16) + iq_interleaved = np.empty((len(self.iq_data) * 2,), dtype=np.int16) + iq_interleaved[0::2] = ci_real + iq_interleaved[1::2] = ci_imag + + # Write IQ file as raw interleaved int16 + with open(self.iq_path, "wb") as iq_file: + iq_file.write(iq_interleaved.tobytes()) + + # Write minimal XML metadata file + with open(self.xml_path, "w") as xml_file: + xml_file.write( + f'\n' + f'\n' + f" {self.center_freq}\n" + 
f" {self.samp_rate}\n" + f" Complex Short\n" + f" {self.iq_path.name}\n" + f"\n" + ) + + def tearDown(self) -> None: + """Clean up temporary directory.""" + self.tmp_dir.cleanup() + + def _verify(self, meta: sigmf.SigMFFile) -> None: + """Verify metadata fields and data integrity.""" + self.assertIsInstance(meta, sigmf.SigMFFile) + self.assertEqual(meta.get_global_field("core:datatype"), "ci16_le") + self.assertEqual(meta.get_global_field("core:sample_rate"), self.samp_rate) + # center frequency is in capture metadata + self.assertEqual(meta.get_captures()[0]["core:frequency"], self.center_freq) + # verify data + data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") + # allow numerical differences due to int16 quantization + self.assertTrue(np.allclose(self.iq_data, data, atol=1e-4)) + + def test_signalhound_to_sigmf_pair(self): + """Test standard Signal Hound to SigMF conversion with file pairs.""" + sigmf_path = self.tmp_path / "converted" + meta = signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") + self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") + self._verify(meta) + + # test overwrite protection + with self.assertRaises(sigmf.error.SigMFFileError) as context: + signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path, overwrite=False) + self.assertIn("already exists", str(context.exception)) + + # test overwrite works + meta2 = signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path, overwrite=True) + self.assertIsInstance(meta2, sigmf.SigMFFile) + + def test_signalhound_to_sigmf_archive(self): + """Test Signal Hound to SigMF conversion with archive output.""" + sigmf_path = self.tmp_path / "converted_archive" + meta = signalhound_to_sigmf(signalhound_path=self.xml_path, out_path=sigmf_path, create_archive=True) 
+ filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") + self._verify(meta) + + # test overwrite protection + with self.assertRaises(sigmf.error.SigMFFileError) as context: + signalhound_to_sigmf( + signalhound_path=self.xml_path, out_path=sigmf_path, create_archive=True, overwrite=False + ) + self.assertIn("already exists", str(context.exception)) + + def test_signalhound_to_sigmf_ncd(self): + """Test Signal Hound to SigMF conversion as Non-Conforming Dataset.""" + meta = signalhound_to_sigmf(signalhound_path=self.xml_path, create_ncd=True) + target_path = self.iq_path + validate_ncd(self, meta, target_path) + self._verify(meta) + + class TestSignalHoundWithNonSigMFRepo(unittest.TestCase): """Test Signal Hound converter with real example files if available.""" diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index 0a5fe37..cf28f88 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -46,6 +46,15 @@ def tearDown(self) -> None: """clean up temporary directory""" self.tmp_dir.cleanup() + def _verify(self, meta: sigmf.SigMFFile) -> None: + """Verify metadata fields and data integrity.""" + self.assertIsInstance(meta, sigmf.SigMFFile) + # verify data + data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") + # allow numerical differences due to PCM quantization + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + def test_wav_to_sigmf_pair(self) -> None: """test standard wav to sigmf conversion with file pairs""" sigmf_path = self.tmp_path / "bar" @@ -53,11 +62,7 @@ def test_wav_to_sigmf_pair(self) -> None: filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") - # verify data - data = meta.read_samples() - self.assertGreater(len(data), 0, "Should read 
some samples") - # allow numerical differences due to PCM quantization - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self._verify(meta) # test overwrite protection with self.assertRaises(sigmf.error.SigMFFileError) as context: @@ -74,11 +79,7 @@ def test_wav_to_sigmf_archive(self) -> None: meta = wav_to_sigmf(wav_path=self.wav_path, out_path=sigmf_path, create_archive=True) filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) self.assertTrue(filenames["archive_fn"].exists(), "archive path missing") - # verify data - data = meta.read_samples() - self.assertGreater(len(data), 0, "Should read some samples") - # allow numerical differences due to PCM quantization - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self._verify(meta) # test overwrite protection with self.assertRaises(sigmf.error.SigMFFileError) as context: @@ -93,12 +94,7 @@ def test_wav_to_sigmf_ncd(self) -> None: """test wav to sigmf conversion as Non-Conforming Dataset""" meta = wav_to_sigmf(wav_path=self.wav_path, create_ncd=True) validate_ncd(self, meta, self.wav_path) - - # verify data - data = meta.read_samples() - # allow numerical differences due to PCM quantization - self.assertGreater(len(data), 0, "Should read some samples") - self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + self._verify(meta) # test overwrite protection when creating NCD with output path sigmf_path = self.tmp_path / "ncd_test"