diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst
index 6fbf8c0..9c058d2 100644
--- a/docs/source/quickstart.rst
+++ b/docs/source/quickstart.rst
@@ -115,3 +115,39 @@ method-based approach.
 Only core **global** fields support attribute access. Capture and annotation
 fields must still be accessed using the traditional ``get_captures()`` and
 ``get_annotations()`` methods.
+
+--------------------------------
+Control Fixed-Point Data Scaling
+--------------------------------
+
+For fixed-point datasets, you can control whether samples are automatically scaled to floating-point values:
+
+.. code-block:: python
+
+    import sigmf
+
+    # Default behavior: autoscale fixed-point data to the [-1.0, 1.0] range
+    handle = sigmf.fromfile("fixed_point_data.sigmf")
+    samples = handle.read_samples()  # Returns float32/complex64
+
+    # Disable autoscaling to keep the stored sample values unscaled
+    handle_raw = sigmf.fromfile("fixed_point_data.sigmf", autoscale=False)
+    raw_samples = handle_raw.read_samples()  # Same values, not normalized
+
+    # Both slicing and read_samples() respect the autoscale setting
+    assert handle[0:10].dtype == handle.read_samples(count=10).dtype
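+
+The scaling itself is ``value * 2**-(bits - 1)``, with an offset of
+``2**(bits - 1)`` subtracted first for unsigned types. A sketch of the
+equivalent arithmetic (``ru8_data.sigmf-meta`` is a hypothetical recording):
+
+.. code-block:: python
+
+    import numpy as np
+    import sigmf
+
+    raw = sigmf.fromfile("ru8_data.sigmf-meta", autoscale=False).read_samples()
+    scaled = sigmf.fromfile("ru8_data.sigmf-meta").read_samples()
+
+    # for u8 samples: 0 -> -1.0, 128 -> 0.0, 255 -> 127/128
+    assert np.allclose(scaled, (raw.astype(np.float32) - 128.0) / 128.0)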
diff --git a/sigmf/__init__.py b/sigmf/__init__.py
index 2c3d46d..7db8e80 100644
--- a/sigmf/__init__.py
+++ b/sigmf/__init__.py
@@ -5,7 +5,7 @@
 # SPDX-License-Identifier: LGPL-3.0-or-later
 
 # version of this python module
-__version__ = "1.3.0"
+__version__ = "1.4.0"
 
 # matching version of the SigMF specification
 __specification__ = "1.2.6"
diff --git a/sigmf/archivereader.py b/sigmf/archivereader.py
index 7f4c1d3..25bac69 100644
--- a/sigmf/archivereader.py
+++ b/sigmf/archivereader.py
@@ -29,7 +29,9 @@ class SigMFArchiveReader:
     map_readonly : bool, optional
         Indicate whether assignments on the numpy.memmap are allowed.
     archive_buffer : buffer, optional
-
+        Alternative buffer to read archive from.
+    autoscale : bool, optional
+        If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0).
 
     Raises
     ------
@@ -41,7 +43,7 @@ class SigMFArchiveReader:
         If metadata is invalid.
     """
 
-    def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None):
+    def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None, autoscale=True):
         if name is not None:
             path = Path(name)
             if path.suffix != SIGMF_ARCHIVE_EXT:
@@ -90,7 +92,7 @@ def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_bu
         if data_offset is None:
             raise SigMFFileError("No .sigmf-data file found in archive!")
 
-        self.sigmffile = SigMFFile(metadata=json_contents)
+        self.sigmffile = SigMFFile(metadata=json_contents, autoscale=autoscale)
         self.sigmffile.validate()
 
         self.sigmffile.set_data_file(
diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py
index 845cb30..4de20d5 100644
--- a/sigmf/sigmffile.py
+++ b/sigmf/sigmffile.py
@@ -182,7 +182,7 @@ class SigMFFile(SigMFMetafile):
     ]
     VALID_KEYS = {GLOBAL_KEY: VALID_GLOBAL_KEYS, CAPTURE_KEY: VALID_CAPTURE_KEYS, ANNOTATION_KEY: VALID_ANNOTATION_KEYS}
 
-    def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True):
+    def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True, autoscale=True):
        """
         API for SigMF I/O
 
@@ -198,6 +198,9 @@ def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksu
             When True will skip calculating hash on data_file (if present) to check against metadata.
         map_readonly: bool, default True
             Indicates whether assignments on the numpy.memmap are allowed.
+        autoscale: bool, default True
+            If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0)
+            for all sample reading operations, including slicing.
         """
         super().__init__()
         self.data_file = None
@@ -205,6 +208,7 @@ def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksu
         self.sample_count = 0
         self._memmap = None
         self.is_complex_data = False  # numpy.iscomplexobj(self._memmap) is not adequate for fixed-point complex case
+        self.autoscale = autoscale
 
         self.set_metadata(metadata)
         if global_info is not None:
@@ -310,16 +314,38 @@ def __next__(self):
     def __getitem__(self, sli):
         mem = self._memmap[sli]  # matches behavior of numpy.ndarray.__getitem__()
 
         if self._return_type is None:
-            return mem
+            if not self.autoscale:
+                return mem
+            dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY))
+            if not dtype["is_fixedpoint"]:
+                # floating-point data needs no scaling
+                return mem
+            # scale real fixed-point samples toward (-1.0, 1.0);
+            # complex fixed-point data takes the _return_type path below
+            component_size = dtype["component_size"]
+            data = mem.astype("f4")
+            if dtype["is_unsigned"]:
+                data -= 2 ** (component_size * 8 - 1)
+            data *= 2 ** -(component_size * 8 - 1)
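+            # e.g. for u8 samples: (0 - 128) * 2**-7 = -1.0, 128 -> 0.0,
+            # and 255 -> 127 / 128; signed i8 needs no offset: -128 -> -1.0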
+            return data
 
-        # is_fixed_point and is_complex
+        # complex fixed-point data: convert interleaved components to complex
         if self._memmap.ndim == 2:
             # num_channels == 1
             ray = mem[:, 0].astype(self._return_type) + 1.0j * mem[:, 1].astype(self._return_type)
         elif self._memmap.ndim == 3:
             # num_channels > 1
             ray = mem[:, :, 0].astype(self._return_type) + 1.0j * mem[:, :, 1].astype(self._return_type)
         else:
             raise ValueError("unhandled ndim in SigMFFile.__getitem__(); this shouldn't happen")
+        if self.autoscale:
+            # _return_type is only set for complex fixed-point data; match read_samples() scaling
+            dtype = dtype_info(self.get_global_field(self.DATATYPE_KEY))
+            component_size = dtype["component_size"]
+            if dtype["is_unsigned"]:
+                ray -= (1 + 1j) * 2 ** (component_size * 8 - 1)
+            ray *= 2 ** -(component_size * 8 - 1)
         return ray[0] if isinstance(sli, int) else ray  # return element instead of 1-element array
@@ -740,7 +766,7 @@ def tofile(self, file_path, pretty=True, toarchive=False, skip_validate=False):
             self.dump(fp, pretty=pretty)
             fp.write("\n")  # text files should end in carriage return
 
-    def read_samples_in_capture(self, index=0, autoscale=True):
+    def read_samples_in_capture(self, index=0):
         """
         Reads samples from the specified captures segment in its entirety.
 
@@ -763,9 +789,9 @@ def read_samples_in_capture(self, index=0, autoscale=True):
                 "an integer number of samples across channels. It may be invalid."
             )
 
-        return self._read_datafile(cb[0], (cb[1] - cb[0]) // self.get_sample_size(), autoscale, False)
+        return self._read_datafile(cb[0], (cb[1] - cb[0]) // self.get_sample_size())
 
-    def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=False):
+    def read_samples(self, start_index=0, count=-1):
         """
         Reads the specified number of samples starting at the specified index from the associated data file.
 
@@ -775,16 +801,12 @@ def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=F
             Starting sample index from which to read.
         count : int, default -1
             Number of samples to read. -1 will read whole file.
-        autoscale : bool, default True
-            If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0)
-        raw_components : bool, default False
-            If True read and return the sample components (individual I & Q for complex, samples for real)
-            with no conversions or interleaved channels.
 
         Returns
         -------
         data : ndarray
             Samples are returned as an array of float or complex, with number of dimensions equal to NUM_CHANNELS_KEY.
+            Scaling behavior depends on the autoscale parameter set during construction.
         """
         if count == 0:
             raise IOError("Number of samples must be greater than zero, or -1 for all samples.")
@@ -800,9 +822,9 @@ def read_samples(self, start_index=0, count=-1, autoscale=True, raw_components=F
 
         if not self._is_conforming_dataset():
             warnings.warn(f"Recording dataset appears non-compliant, resulting data may be erroneous")
-        return self._read_datafile(first_byte, count * self.num_channels, autoscale, False)
+        return self._read_datafile(first_byte, count * self.get_num_channels())
 
-    def _read_datafile(self, first_byte, nitems, autoscale, raw_components):
+    def _read_datafile(self, first_byte, nitems):
         """
         internal function for reading samples from datafile
         """
@@ -832,18 +854,17 @@ def _read_datafile(self, first_byte, nitems, autoscale, raw_components):
         # return reshaped view for num_channels
         # first dimension will be double size if `is_complex_data`
         data = data.reshape(data.shape[0] // num_channels, num_channels)
-        if not raw_components:
-            data = data.astype(data_type_out)
-            if autoscale and is_fixedpoint_data:
-                data = data.view(np.dtype("f4"))
-                if is_unsigned_data:
-                    data -= 2 ** (component_size * 8 - 1)
-                data *= 2 ** -(component_size * 8 - 1)
-                data = data.view(data_type_out)
-            if self.is_complex_data:
-                data = data.view(np.complex64)
-        else:
-            data = data.view(component_type_in)
+        data = data.astype(data_type_out)
+        if self.autoscale and is_fixedpoint_data:
+            data = data.view(np.dtype("f4"))
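+            # the flat f4 view lets complex (f4, f4) pairs be shifted and
+            # scaled component-wise before being viewed back below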
+            if is_unsigned_data:
+                data -= 2 ** (component_size * 8 - 1)
+            data *= 2 ** -(component_size * 8 - 1)
+            data = data.view(data_type_out)
+        if self.is_complex_data:
+            data = data.view(np.complex64)
 
         if self.data_file is not None:
             fp.close()
@@ -1144,18 +1165,34 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None):
     return None
 
 
-def fromarchive(archive_path, dir=None, skip_checksum=False):
+def fromarchive(archive_path, dir=None, skip_checksum=False, autoscale=True):
     """Extract an archive and return a SigMFFile.
 
     The `dir` parameter is no longer used as this function has been changed to
     access SigMF archives without extracting them.
+
+    Parameters
+    ----------
+    archive_path: str
+        Path to `sigmf-archive` tarball.
+    dir: str, optional
+        No longer used. Kept for compatibility.
+    skip_checksum: bool, default False
+        Skip dataset checksum calculation.
+    autoscale: bool, default True
+        If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0).
+
+    Returns
+    -------
+    SigMFFile
+        Instance created from archive.
     """
     from .archivereader import SigMFArchiveReader
 
-    return SigMFArchiveReader(archive_path, skip_checksum=skip_checksum).sigmffile
+    return SigMFArchiveReader(archive_path, skip_checksum=skip_checksum, autoscale=autoscale).sigmffile
 
 
-def fromfile(filename, skip_checksum=False):
+def fromfile(filename, skip_checksum=False, autoscale=True):
     """
     Creates and returns a SigMFFile or SigMFCollection instance with metadata loaded from the specified file.
 
@@ -1171,6 +1208,8 @@ def fromfile(filename, skip_checksum=False):
         Path for SigMF Metadata, Dataset, Archive or Collection (with or without extension).
     skip_checksum: bool, default False
         When True will not read entire dataset to calculate hash.
+    autoscale: bool, default True
+        If dataset is in a fixed-point representation, scale samples from (min, max) to (-1.0, 1.0).
 
     Returns
     -------
@@ -1187,7 +1226,7 @@ def fromfile(filename, skip_checksum=False):
     ext = file_path.suffix
 
     if (ext.lower().endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn):
-        return fromarchive(archive_fn, skip_checksum=skip_checksum)
+        return fromarchive(archive_fn, skip_checksum=skip_checksum, autoscale=autoscale)
 
     if (ext.lower().endswith(SIGMF_COLLECTION_EXT) or not Path.is_file(meta_fn)) and Path.is_file(collection_fn):
         collection_fp = open(collection_fn, "rb")
@@ -1207,7 +1246,7 @@ def fromfile(filename, skip_checksum=False):
     meta_fp.close()
 
     data_fn = get_dataset_filename_from_metadata(meta_fn, metadata)
-    return SigMFFile(metadata=metadata, data_file=data_fn, skip_checksum=skip_checksum)
+    return SigMFFile(metadata=metadata, data_file=data_fn, skip_checksum=skip_checksum, autoscale=autoscale)
 
 
 def get_sigmf_filenames(filename):
diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py
index 621f37a..e93e24d 100644
--- a/tests/test_archivereader.py
+++ b/tests/test_archivereader.py
@@ -60,7 +60,9 @@ def test_access_data_without_untar(self):
             if complex_prefix == "c":
                 # complex data will be half as long
                 target_count //= 2
-                self.assertTrue(np.all(np.iscomplex(readback_samples)))
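+                # np.iscomplex() tests values elementwise and is False for 0j imaginary parts;
+                # np.iscomplexobj() checks the dtype, which is the intent here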
+                self.assertTrue(np.iscomplexobj(readback_samples))
             if num_channels != 1:
                 # check expected # of channels
                 self.assertEqual(
diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py
index 4c3668c..f2171ae 100644
--- a/tests/test_sigmffile.py
+++ b/tests/test_sigmffile.py
@@ -197,9 +197,10 @@ def test_multichannel_seek(self):
                 SigMFFile.DATATYPE_KEY: "cu16_le",
                 SigMFFile.NUM_CHANNELS_KEY: 3,
             },
+            autoscale=False,
         )
         # read after the first sample
-        temp_samples = temp_signal.read_samples(start_index=1, autoscale=False)
+        temp_samples = temp_signal.read_samples(start_index=1)
         # ensure samples are in the order we expect
         self.assertTrue(np.all(temp_samples[:, 0] == np.array([6 + 7j, 12 + 13j])))
 
@@ -240,74 +241,73 @@ def tearDown(self) -> None:
         """remove temporary dir"""
         shutil.rmtree(self.temp_dir)
 
-    def prepare(self, data: list, meta: dict, dtype: type) -> SigMFFile:
+    def prepare(self, data: list, meta: dict, dtype: type, autoscale: bool = True) -> SigMFFile:
         """write some data and metadata to temporary paths"""
         np.array(data, dtype=dtype).tofile(self.temp_path_data)
         with open(self.temp_path_meta, "w") as handle:
             json.dump(meta, handle)
-        meta = sigmf.fromfile(self.temp_path_meta, skip_checksum=True)
+        meta = sigmf.fromfile(self.temp_path_meta, skip_checksum=True, autoscale=autoscale)
         return meta
 
     def test_000(self) -> None:
         """compliant two-capture recording"""
-        meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8)
+        meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False)
         self.assertEqual(256, meta._count_samples())
         self.assertTrue(meta._is_conforming_dataset())
         self.assertTrue((0, 0), meta.get_capture_byte_boundarys(0))
         self.assertTrue((0, 256), meta.get_capture_byte_boundarys(1))
-        self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples(autoscale=False)))
+        self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples()))
         self.assertTrue(np.array_equal(np.array([]), meta.read_samples_in_capture(0)))
-        self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1, autoscale=False)))
+        self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1)))
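+        # illustrative check: u8 autoscaling applies (value - 128) / 128
+        scaled = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8).read_samples()
+        self.assertTrue(np.allclose(scaled, (np.array(TEST_U8_DATA0, np.float32) - 128) / 128))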
 
     def test_001(self) -> None:
         """two capture recording with header_bytes and trailing_bytes set"""
-        meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8)
+        meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8, autoscale=False)
         self.assertEqual(192, meta._count_samples())
         self.assertFalse(meta._is_conforming_dataset())
         self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0))
         self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1))
-        self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0, autoscale=False)))
-        self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1, autoscale=False)))
+        self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0)))
+        self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1)))
 
     def test_002(self) -> None:
         """two capture recording with multiple header_bytes set"""
-        meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8)
+        meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8, autoscale=False)
         self.assertEqual(192, meta._count_samples())
         self.assertFalse(meta._is_conforming_dataset())
         self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0))
         self.assertTrue((176, 240), meta.get_capture_byte_boundarys(1))
-        self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0, autoscale=False)))
-        self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1, autoscale=False)))
+        self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0)))
+        self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1)))
 
     def test_003(self) -> None:
         """three capture recording with multiple header_bytes set"""
-        meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8)
+        meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8, autoscale=False)
         self.assertEqual(192, meta._count_samples())
         self.assertFalse(meta._is_conforming_dataset())
         self.assertTrue((32, 64), meta.get_capture_byte_boundarys(0))
         self.assertTrue((64, 160), meta.get_capture_byte_boundarys(1))
         self.assertTrue((192, 256), meta.get_capture_byte_boundarys(2))
-        self.assertTrue(np.array_equal(np.arange(32), meta.read_samples_in_capture(0, autoscale=False)))
-        self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1, autoscale=False)))
-        self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2, autoscale=False)))
+        self.assertTrue(np.array_equal(np.arange(32), meta.read_samples_in_capture(0)))
+        self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1)))
+        self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2)))
 
     def test_004(self) -> None:
         """two channel version of 000"""
-        meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8)
+        meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)
         self.assertEqual(96, meta._count_samples())
         self.assertFalse(meta._is_conforming_dataset())
         self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0))
         self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1))
-        self.assertTrue(
-            np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0, autoscale=False))
-        )
-        self.assertTrue(
-            np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1, autoscale=False))
-        )
+        self.assertTrue(np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0)))
+        self.assertTrue(np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1)))
 
     def test_slicing_ru8(self) -> None:
         """slice real uint8"""
-        meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8)
+        meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False)
         self.assertTrue(np.array_equal(meta[:], TEST_U8_DATA0))
         self.assertTrue(np.array_equal(meta[6], TEST_U8_DATA0[6]))
         self.assertTrue(np.array_equal(meta[1:-1], TEST_U8_DATA0[1:-1]))
@@ -320,12 +320,12 @@ def test_slicing_rf32(self) -> None:
 
     def test_slicing_multiple_channels(self) -> None:
         """slice multiple channels"""
-        meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8)
+        meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)  # compare against raw values
         channelized = np.array(TEST_U8_DATA4).reshape((-1, 2))
         self.assertTrue(np.array_equal(meta[:][:], channelized))
-        self.assertTrue(np.array_equal(meta[10:20, 0], meta.read_samples(autoscale=False)[10:20, 0]))
+        self.assertTrue(np.array_equal(meta[10:20, 0], meta.read_samples()[10:20, 0]))
         self.assertTrue(np.array_equal(meta[0], channelized[0]))
         self.assertTrue(np.array_equal(meta[1, :], channelized[1]))
 
 
 def simulate_capture(sigmf_md, n, capture_len):