#!/usr/bin/env python
# -*- coding: utf-8 -*-
# This file is part of the
# Scikit-NeuroMSI Project (https://github.com/renatoparedes/scikit-neuromsi).
# Copyright (c) 2021-2025, Renato Paredes; Cabral, Juan
# License: BSD 3-Clause
# Full Text:
# https://github.com/renatoparedes/scikit-neuromsi/blob/main/LICENSE.txt
# =============================================================================
# DOCS
# =============================================================================
"""Implementation of I/O for skneuromsi.
This module provides functions for storing and loading NDResult and
NDResultCollection objects to and from files or file-like objects using a
zip-based format.
The NDResult and NDResultCollection objects are serialized using a combination
of JSON (for metadata) and NetCDF (for the underlying nddata). The resulting
files are zip archives containing the serialized metadata and data.
"""
# =============================================================================
# IMPORTS
# =============================================================================
import datetime as dt
import json
import platform
import sys
import zipfile
from tqdm.auto import tqdm
import xarray as xa
from . import core, ndcollection
from .utils import custom_json
# =============================================================================
# CONSTANTS
# =============================================================================
#: Default metadata
# Provenance block copied into the metadata of every stored archive.
# The environment entries (platform, encoding, interpreter) are evaluated
# once, when this module is imported.
_DEFAULT_METADATA = {
    # package version rendered as a dotted string
    "skneuromsi": ".".join(str(part) for part in core.VERSION),
    # authorship
    "authors": "Paredes, Cabral & Seriès",
    "author_email": "paredesrenato92@gmail.com",
    "affiliation": [
        (
            "Cognitive Science Group, "
            "Instituto de Investigaciones Psicológicas, "
            "Facultad de Psicología - UNC-CONICET. "
            "Córdoba, Córdoba, Argentina."
        ),
        (
            "Department of Psychology, "
            "Pontifical Catholic University of Peru, Lima, Peru."
        ),
        (
            "The University of Edinburgh, School of Informatics, "
            "Edinburgh, United Kingdom."
        ),
        (
            "Gerencia De Vinculacion Tecnológica "
            "Comisión Nacional de Actividades Espaciales (CONAE), "
            "Falda del Cañete, Córdoba, Argentina."
        ),
        (
            "Instituto De Astronomía Teorica y Experimental - "
            "Observatorio Astronómico Córdoba (IATE-OAC-UNC-CONICET), "
            "Cordoba, Argentina."
        ),
    ],
    "url": "https://github.com/renatoparedes/scikit-neuromsi",
    # runtime environment snapshot
    "platform": platform.platform(),
    "system_encoding": sys.getfilesystemencoding(),
    "Python": sys.version,
    # version of the on-disk layout produced by this module
    "format_version": 0.1,
}
class _Keys:
    """Names of the entries used in the JSON metadata documents."""

    # description of the serialized object
    OBJ_TYPE_KEY = "object_type"
    OBJ_KWARGS_KEY = "object_kwargs"
    OBJ_SIZE_KEY = "object_size"

    # provenance and user-supplied information
    UTC_TIMESTAMP_KEY = "utc_timestamp"
    EXTRA_METADATA_KEYS = "extra"
class _ZipFileNames:
    """Member names used inside the zip archives."""

    # NetCDF serialization of an NDResult's nddata
    NDDATA = "nddata.nc"

    # JSON document with the metadata
    METADATA = "metadata.json"
class _ObjTypes:
    """Constants for object type identifiers."""

    # identifier stored in the metadata of a single NDResult
    NDRESULT_TYPE = "ndresult"

    # identifier stored in the metadata of an NDResultCollection
    NDCOLLECTION_TYPE = "ndcollection"

    # Misspelled historical name; kept as an alias so that existing
    # references to it keep working.
    NDCOLLETION_TYPE = NDCOLLECTION_TYPE
class _Compression:
    """Zip compression defaults used when the caller provides none."""

    # 9 is the highest level accepted by ZIP_DEFLATED
    COMPRESS_LEVEL = 9

    # compression method handed to zipfile.ZipFile
    COMPRESSION = zipfile.ZIP_DEFLATED
# =============================================================================
# STORE
# =============================================================================
def _prepare_ndc_metadata(
    size, obj_type, obj_kwargs, utc_timestamp, extra_metadata
):
    """Build the metadata dictionary of an NDResultCollection archive.

    The result starts from a shallow copy of the module defaults and adds
    the entries that describe the concrete object being stored.

    Parameters
    ----------
    size : int
        The number of NDResult objects in the collection.
    obj_type : str
        Identifier of the kind of object being serialized
        (e.g., 'ndresult' or 'ndcollection').
    obj_kwargs : dict
        Keyword arguments stored alongside the object type.
    utc_timestamp : datetime.datetime
        The UTC timestamp to record.
    extra_metadata : dict
        Custom, user-provided metadata.

    Returns
    -------
    dict
        A new dictionary; the module-level defaults are not modified.

    """
    # later entries win, exactly as a copy followed by an update would do
    return {
        **_DEFAULT_METADATA,
        _Keys.OBJ_SIZE_KEY: size,
        _Keys.UTC_TIMESTAMP_KEY: utc_timestamp,
        _Keys.OBJ_TYPE_KEY: obj_type,
        _Keys.OBJ_KWARGS_KEY: obj_kwargs,
        _Keys.EXTRA_METADATA_KEYS: extra_metadata,
    }
def _ndr_split_and_serialize(ndresult):
    """Serialize an NDResult as a (data, metadata) pair.

    The dictionary form of the result is separated into its ``nddata``
    entry and everything else. The former is serialized as NetCDF, the
    latter is wrapped in a small metadata document and dumped to JSON.

    Parameters
    ----------
    ndresult : NDResult
        The NDResult object to split and serialize.

    Returns
    -------
    tuple
        ``(nddata_nc, metadata_json)``: the in-memory NetCDF serialization
        returned by ``to_netcdf(None)`` and the JSON string with the
        metadata.

    """
    # everything but the nddata ends up in the JSON document
    payload = ndresult.to_dict()
    nddata = payload.pop("nddata")

    # passing None as the target asks for an in-memory serialization
    nddata_nc = nddata.to_netcdf(None)

    metadata_json = custom_json.dumps(
        {
            _Keys.OBJ_TYPE_KEY: _ObjTypes.NDRESULT_TYPE,
            _Keys.OBJ_KWARGS_KEY: payload,
        },
        indent=2,
    )

    return nddata_nc, metadata_json
def _mk_ndr_in_zip_paths(idx):
    """Return the in-archive paths of the NDResult stored at ``idx``.

    Every NDResult lives in its own ``ndr_<idx>`` folder of the archive.

    Parameters
    ----------
    idx : int
        The index of the NDResult.

    Returns
    -------
    tuple
        ``(metadata_path, nddata_path)`` inside the zip archive.

    """
    folder = f"ndr_{idx}"
    return (
        f"{folder}/{_ZipFileNames.METADATA}",
        f"{folder}/{_ZipFileNames.NDDATA}",
    )
# API STORE ===================================================================
def store_ndresults_collection(
    path_or_stream, ndrcollection, *, metadata=None, tqdm_cls=tqdm, **kwargs
):
    """Store an NDResultCollection to a file or stream.

    The output is a zip archive. Every NDResult of the collection is
    written to its own ``ndr_<idx>/`` folder (one JSON metadata file and
    one NetCDF data file), and a ``metadata.json`` describing the whole
    collection is written last, at the root of the archive.

    Parameters
    ----------
    path_or_stream : str or file-like object
        The file path or stream to write the NDResultCollection to.
    ndrcollection : NDResultCollection
        The NDResultCollection object to store.
    metadata : dict, optional
        Additional metadata to include in the output file. A falsy value
        is stored as an empty dictionary.
    tqdm_cls : callable or None, optional
        Progress-bar factory used to wrap the iteration over the
        collection. Any falsy value disables the progress bar.
    **kwargs
        Additional keyword arguments to pass to zipfile.ZipFile. When not
        given, ``compression`` and ``compresslevel`` default to the
        values declared in ``_Compression``.

    Raises
    ------
    TypeError
        If `ndrcollection` is not an instance of NDResultCollection.

    """
    if not isinstance(ndrcollection, ndcollection.NDResultCollection):
        raise TypeError(
            "'ndrcollection' must be an instance "
            f"of {ndcollection.NDResultCollection!r}"
        )

    # default parameters for zipfile
    kwargs.setdefault("compression", _Compression.COMPRESSION)
    kwargs.setdefault("compresslevel", _Compression.COMPRESS_LEVEL)

    # timestamp: datetime.utcnow() is deprecated since Python 3.12. This is
    # the documented replacement; dropping the tzinfo keeps the value naive,
    # so the stored timestamp has the same form as before.
    timestamp = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)

    # collection of metadata
    ndc_metadata = _prepare_ndc_metadata(
        size=len(ndrcollection),
        obj_type=_ObjTypes.NDCOLLETION_TYPE,
        obj_kwargs={"name": ndrcollection.name},
        utc_timestamp=timestamp,
        extra_metadata=metadata or {},
    )

    # serialize the metadata before the collection is (optionally) wrapped
    # by the progress bar
    ndc_metadata_json = custom_json.dumps(ndc_metadata, indent=2)

    if tqdm_cls:
        ndrcollection = tqdm_cls(
            ndrcollection,
            total=len(ndrcollection),
            desc=f"Saving '{path_or_stream!s}'",
        )

    with zipfile.ZipFile(path_or_stream, "w", **kwargs) as zip_fp:
        # write every ndresult
        for idx, ndresult in enumerate(ndrcollection):
            # determine the directory
            ndr_metadata_filename, ndr_nddata_filename = _mk_ndr_in_zip_paths(
                idx
            )

            # serialize the ndresult
            ndr_nddata_nc, ndr_metadata_json = _ndr_split_and_serialize(
                ndresult
            )

            # write
            zip_fp.writestr(ndr_nddata_filename, ndr_nddata_nc)
            zip_fp.writestr(ndr_metadata_filename, ndr_metadata_json)

            # drop the references so the serialized payloads can be freed
            # before the next result is processed
            del ndresult, ndr_nddata_nc, ndr_metadata_json

        # write the collection metadata.json
        zip_fp.writestr(_ZipFileNames.METADATA, ndc_metadata_json)
def store_ndresult(path_or_stream, ndresult, *, metadata=None, **kwargs):
    """Store a single NDResult object to a file or stream.

    The result is wrapped in a one-element NDResultCollection, named after
    the class of ``ndresult``, and stored with
    ``store_ndresults_collection`` with the progress bar disabled.

    Parameters
    ----------
    path_or_stream : str or file-like object
        The file path or stream to write the NDResult to.
    ndresult : NDResult
        The NDResult object to store.
    metadata : dict, optional
        Additional metadata to include in the output file.
    **kwargs
        Additional keyword arguments to pass to
        store_ndresults_collection.

    Raises
    ------
    TypeError
        If `ndresult` is not an instance of NDResult.

    """
    if not isinstance(ndresult, core.NDResult):
        raise TypeError(f"'ndresult' must be an instance of {core.NDResult!r}")

    # a lone result is persisted as a collection of size one
    single_result_collection = (
        ndcollection.NDResultCollection.from_ndresults(
            type(ndresult).__name__, [ndresult]
        )
    )

    store_ndresults_collection(
        path_or_stream,
        single_result_collection,
        metadata=metadata,
        tqdm_cls=None,
        **kwargs,
    )
# =============================================================================
# READ
# =============================================================================
def _check_object_type(obj_type, expected):
    """Validate the object type found in a metadata document.

    Parameters
    ----------
    obj_type : str
        The object type to check.
    expected : str
        The expected object type.

    Raises
    ------
    ValueError
        If the object type does not match the expected value.

    """
    if obj_type != expected:
        message = f"'object_type' != {expected!r}. Found {obj_type!r}"
        raise ValueError(message)
def _generate_ndresults(*, zip_fp, size, tqdm_cls):
    """Lazily read the NDResult objects stored in an open zip archive.

    Parameters
    ----------
    zip_fp : zipfile.ZipFile
        Archive opened for reading. It must stay open while the generator
        is being consumed.
    size : int
        Number of NDResult objects to read; indexes ``0`` to ``size - 1``
        are looked up in the archive.
    tqdm_cls : callable or None
        Progress-bar factory used to wrap the iteration over the indexes.
        Any falsy value disables the progress bar.

    Yields
    ------
    NDResult
        The results, in index order.

    Raises
    ------
    ValueError
        If the metadata of an entry does not declare the NDResult type.

    """
    indexes = range(size)
    if tqdm_cls:
        indexes = tqdm_cls(iterable=indexes, desc="Reading ndresults")

    for idx in indexes:
        # determine the directory
        ndr_metadata_filename, ndr_nddata_filename = _mk_ndr_in_zip_paths(idx)

        # NOTE(review): the collection-level metadata is read with
        # custom_json.load while this uses the plain json module -- confirm
        # that the difference is intentional.
        with zip_fp.open(ndr_metadata_filename) as fp:
            ndr_metadata = json.load(fp)

        obj_type = ndr_metadata.pop(_Keys.OBJ_TYPE_KEY)
        _check_object_type(obj_type, _ObjTypes.NDRESULT_TYPE)

        # load_dataarray opens the data, loads it into memory and closes
        # the underlying handle, so nothing lazily references the zip
        # member after it has been closed
        with zip_fp.open(ndr_nddata_filename) as fp:
            nddata = xa.load_dataarray(fp)

        ndresult_kwargs = ndr_metadata[_Keys.OBJ_KWARGS_KEY]
        yield core.NDResult(nddata=nddata, **ndresult_kwargs)
# API READ ====================================================================
def open_ndresults_collection(
    path_or_stream,
    *,
    compression_params=core.DEFAULT_COMPRESSION_PARAMS,
    expected_size=None,
    tqdm_cls=tqdm,
    **kwargs,
):
    """Retrieve an NDResultCollection from a file or stream.

    The collection metadata stored at the root of the archive is read and
    validated first; the individual results are then read one by one and
    handed to ``NDResultCollection.from_ndresults``.

    Parameters
    ----------
    path_or_stream : str or file-like object
        The file path or stream to read the NDResultCollection from.
    compression_params : dict, optional
        Compression parameters for the NDResultCollection.
    expected_size : int, optional
        The expected number of NDResult objects in the collection. When
        ``None`` the stored size is not checked.
    tqdm_cls : callable, optional
        The tqdm class to use for progress bars.
    **kwargs
        Additional keyword arguments to pass to zipfile.ZipFile.

    Returns
    -------
    NDResultCollection
        The retrieved NDResultCollection object.

    Raises
    ------
    ValueError
        If the stored object is not a collection, or if the expected size
        doesn't match the actual size of the collection.

    """
    with zipfile.ZipFile(path_or_stream, "r", **kwargs) as archive:
        # the collection-level metadata lives at the root of the archive
        with archive.open(_ZipFileNames.METADATA) as metadata_fp:
            header = custom_json.load(metadata_fp)

        # refuse anything that was not stored as a collection
        _check_object_type(
            header.pop(_Keys.OBJ_TYPE_KEY), _ObjTypes.NDCOLLETION_TYPE
        )

        # keyword arguments needed to rebuild the collection
        collection_kwargs = header[_Keys.OBJ_KWARGS_KEY]

        # the stored size must agree with the caller's expectation, if any
        stored_size = header[_Keys.OBJ_SIZE_KEY]
        size_mismatch = (
            expected_size is not None and stored_size != int(expected_size)
        )
        if size_mismatch:
            raise ValueError(
                f"{str(path_or_stream)}: Expected {expected_size} "
                f"results, but {stored_size} were found"
            )

        # the name is passed explicitly, so take it out of the kwargs
        collection_name = collection_kwargs.pop("name", "<UNKNOW>")

        # lazy reader; it needs the archive to remain open while consumed
        results = _generate_ndresults(
            zip_fp=archive, size=stored_size, tqdm_cls=tqdm_cls
        )

        # build the collection while the archive is still open
        return ndcollection.NDResultCollection.from_ndresults(
            name=collection_name,
            results=results,
            tqdm_cls=tqdm_cls,
            compression_params=compression_params,
            **collection_kwargs,
        )
def open_ndresult(path_or_stream, **kwargs):
    """Open a single NDResult object from a file or stream.

    The archive is read as a collection that must contain exactly one
    result, without compression parameters and without a progress bar;
    the only element of that collection is returned.

    Parameters
    ----------
    path_or_stream : str or file-like object
        The file path or stream to read the NDResult from.
    **kwargs
        Additional keyword arguments to pass to open_ndresults_collection.

    Returns
    -------
    NDResult
        The retrieved NDResult object.

    """
    return open_ndresults_collection(
        path_or_stream,
        expected_size=1,
        compression_params=None,
        tqdm_cls=None,
        **kwargs,
    )[0]
# SHORTCUTS ===================================================================
# writers
to_ndr = store_ndresult
to_ndc = store_ndresults_collection

# readers
read_ndr = open_ndresult
read_ndc = open_ndresults_collection