Skip to content

aimbat.core

Core logic for AIMBAT.

All functions take a SQLModel Session and work with the models in aimbat.models. The main areas covered are:

  • Default event — get and set the default event (get_default_event, set_default_event).
  • Data — add data to the project, linking each source to its station, event, and seismogram records (add_data_to_project).
  • Events, seismograms, stations — query, update, and delete records; read and write parameters.
  • ICCS / MCCC — run the Iterative Cross-Correlation and Stack (run_iccs) and Multi-Channel Cross-Correlation (run_mccc) algorithms; update picks, time windows, and correlation thresholds.
  • Quality / views — retrieve and aggregate alignment quality metrics (get_quality_seismogram, get_quality_event, get_quality_station, dump_quality_event, dump_quality_station); structured view data for rendering (FieldSpec, FieldGroup, seismogram_quality_groups, event_quality_groups, station_quality_groups).
  • Snapshots — save, restore, and delete parameter snapshots (create_snapshot, rollback_to_snapshot).
  • Project — create and delete the project database (create_project, delete_project).

Classes:

Name Description
BoundICCS

An ICCS instance explicitly bound to a specific event.

FieldGroup

A labelled group of FieldSpec instances for display.

FieldSpec

A single labelled field value for display.

SeismogramQualityStats

Aggregated seismogram quality statistics computed from one or more seismograms.

Functions:

Name Description
add_data_to_project

Add data sources to the AIMBAT database.

build_iccs_from_snapshot

Build a read-only BoundICCS from a snapshot's parameters and live waveform data.

clear_iccs_cache

Clear the process-level ICCS cache.

clear_mccc_quality

Clear MCCC quality metrics from the live quality tables for an event.

compute_parameters_hash

Compute a deterministic SHA-256 hash of the event's current parameters.

create_iccs_instance

Return a BoundICCS instance for the given event.

create_project

Initializes a new AIMBAT project database schema and triggers.

create_snapshot

Create a snapshot of the AIMBAT processing parameters and quality metrics.

delete_event

Delete an AimbatEvent from the database.

delete_project

Delete the AIMBAT project.

delete_seismogram

Delete an AimbatSeismogram from the database.

delete_snapshot

Delete an AIMBAT parameter snapshot.

delete_station

Delete an AimbatStation from the database.

dump_data_table

Return AIMBAT datasources table as a JSON-serialisable list of dicts.

dump_event_parameter_table

Dump the event parameter table data to json.

dump_event_table

Dump the table data to json serialisable list of dicts.

dump_quality_event

Return event MCCC quality as a JSON-serialisable dict.

dump_quality_station

Return station quality as a JSON-serialisable dict.

dump_seismogram_parameter_table

Dump the seismogram parameter table data to json serialisable list of dicts.

dump_seismogram_table

Dump the AimbatSeismogram table to json serialisable list of dicts.

dump_snapshot_tables

Dump snapshot data as a dict of lists of dicts.

dump_station_table

Create a JSON serialisable dict from the AimbatStation table data.

event_quality_groups

Return MCCC quality view data for an event.

get_completed_events

Get the events marked as completed.

get_data_for_event

Returns the data sources belonging to the given event.

get_default_event

Return the currently default event, or None if no event is set as default.

get_events_using_station

Get all events that use a particular station.

get_quality_event

Get MCCC quality metrics for an event from the most recent snapshot.

get_quality_seismogram

Get the quality snapshot for a seismogram from the most recent MCCC run.

get_quality_station

Get aggregated MCCC quality metrics for a station from the most recent snapshots.

get_seismogram_mccc_map

Return per-seismogram MCCC quality values for display from the live quality table.

get_selected_seismograms

Get the selected seismograms for the given event.

get_snapshots

Get the snapshots, optional filtered by event ID.

get_station_iccs_ccs

Get ICCS cross-correlation coefficients for all seismograms of a station across all events.

get_stations_in_event

Get the stations for a particular event.

reset_seismogram_parameters

Reset an AimbatSeismogram's parameters to their default values.

resolve_event

Resolve an event from either an explicit ID or the default event.

rollback_to_snapshot

Rollback to an AIMBAT parameters snapshot.

run_iccs

Run the Iterative Cross-Correlation and Stack (ICCS) algorithm.

run_mccc

Run the Multi-Channel Cross-Correlation (MCCC) algorithm.

seismogram_quality_groups

Return quality view data for a single seismogram.

set_default_event

Set the default event (i.e. the one being processed).

set_event_parameter

Set event parameter value for the given event.

set_seismogram_parameter

Set parameter value for an AimbatSeismogram instance.

snapshot_quality_groups

Return MCCC quality view data for a snapshot.

station_quality_groups

Return quality view data for a station.

sync_from_matching_hash

Sync live quality metrics from a snapshot whose parameter hash matches the given hash.

sync_iccs_parameters

Sync an existing ICCS instance's parameters from the database.

validate_iccs_construction

Try to construct an ICCS instance for the event without caching the result.

BoundICCS dataclass

An ICCS instance explicitly bound to a specific event.

Use is_stale to detect whether the event's parameters have been modified (e.g. by a CLI command) since this instance was created.

Parameters:

Name Type Description Default
iccs ICCS
required
event_id UUID
required
created_at Timestamp
required

Methods:

Name Description
is_stale

Return True if the event has been modified since this ICCS was created.

Source code in src/aimbat/core/_iccs.py
@dataclass
class BoundICCS:
    """An ICCS instance explicitly bound to a specific event.

    Use `is_stale` to detect whether the event's parameters have been modified
    (e.g. by a CLI command) since this instance was created.
    """

    iccs: ICCS
    event_id: UUID
    created_at: Timestamp

    def is_stale(self, event: AimbatEvent) -> bool:
        """Return True if the event has been modified since this ICCS was created.

        Args:
            event: The event to check against.
        """
        if event.id != self.event_id:
            return True
        if event.last_modified is None:
            return False
        return event.last_modified > self.created_at

is_stale

is_stale(event: AimbatEvent) -> bool

Return True if the event has been modified since this ICCS was created.

Parameters:

Name Type Description Default
event AimbatEvent

The event to check against.

required
Source code in src/aimbat/core/_iccs.py
def is_stale(self, event: AimbatEvent) -> bool:
    """Return True if the event has been modified since this ICCS was created.

    Args:
        event: The event to check against.
    """
    if event.id != self.event_id:
        return True
    if event.last_modified is None:
        return False
    return event.last_modified > self.created_at

FieldGroup dataclass

A labelled group of FieldSpec instances for display.

When fields is empty the rendering layer should show empty_message if provided.

Parameters:

Name Type Description Default
title str
required
fields list[FieldSpec]

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

<dynamic>
empty_message str | None
None
Source code in src/aimbat/core/_quality.py
@dataclass
class FieldGroup:
    """A labelled group of `FieldSpec` instances for display.

    When `fields` is empty the rendering layer should show
    `empty_message` if provided.
    """

    title: str
    fields: list[FieldSpec] = field(default_factory=list)
    empty_message: str | None = None

FieldSpec dataclass

A single labelled field value for display.

The name is the canonical key used in JSON and enum lookups. The title is the human-readable label sourced from the model's Field(title=...). value and sem are raw Python values; formatters live in the rendering layer.

Parameters:

Name Type Description Default
name str
required
title str
required
value Any
required
sem Any
None
Source code in src/aimbat/core/_quality.py
@dataclass
class FieldSpec:
    """A single labelled field value for display.

    The `name` is the canonical key used in JSON and enum lookups.
    The `title` is the human-readable label sourced from the model's
    `Field(title=...)`. `value` and `sem` are raw Python values;
    formatters live in the rendering layer.
    """

    name: str
    title: str
    value: Any
    sem: Any = None

SeismogramQualityStats

Bases: BaseModel

Aggregated seismogram quality statistics computed from one or more seismograms.

All mean fields are None when no seismograms in the group have quality data. SEM fields are None when fewer than two values are available.

Parameters:

Name Type Description Default
count int
required
cc_mean float | None
None
cc_mean_sem float | None
None
mccc_cc_mean float | None
None
mccc_cc_mean_sem float | None
None
mccc_cc_std float | None
None
mccc_cc_std_sem float | None
None
mccc_error PydanticTimedelta | None
None
mccc_error_sem PydanticTimedelta | None
None
Source code in src/aimbat/core/_quality.py
class SeismogramQualityStats(BaseModel):
    """Aggregated seismogram quality statistics computed from one or more seismograms.

    All mean fields are None when no seismograms in the group have quality data.
    SEM fields are None when fewer than two values are available.
    """

    model_config = ConfigDict(frozen=True)

    count: int = Field(title="Count")
    cc_mean: float | None = Field(default=None, title="ICCS CC mean")
    cc_mean_sem: float | None = Field(default=None, title="ICCS CC mean SEM")
    mccc_cc_mean: float | None = Field(default=None, title="MCCC CC mean")
    mccc_cc_mean_sem: float | None = Field(default=None, title="MCCC CC mean SEM")
    mccc_cc_std: float | None = Field(default=None, title="MCCC CC std")
    mccc_cc_std_sem: float | None = Field(default=None, title="MCCC CC std SEM")
    mccc_error: PydanticTimedelta | None = Field(default=None, title="MCCC error")
    mccc_error_sem: PydanticTimedelta | None = Field(
        default=None, title="MCCC error SEM"
    )

add_data_to_project

add_data_to_project(
    session: Session,
    data_sources: Sequence[PathLike | str],
    data_type: DataType,
    station_id: UUID | None = None,
    event_id: UUID | None = None,
    dry_run: bool = False,
    disable_progress_bar: bool = True,
) -> None

Add data sources to the AIMBAT database.

What gets created depends on which capabilities data_type supports:

  • Station + event + seismogram: all three records are created and linked, and an AimbatDataSource entry is stored.
  • Station or event only (e.g. JSON_STATION, JSON_EVENT): only the relevant metadata records are created; no seismogram or data source entry is stored.

Use station_id or event_id to skip extracting station or event metadata from the data source and link to a pre-existing record instead.

Parameters:

Name Type Description Default
session Session

The SQLModel database session.

required
data_sources Sequence[PathLike | str]

List of data sources to add.

required
data_type DataType

Type of data.

required
station_id UUID | None

UUID of an existing station to use instead of extracting one from each data source.

None
event_id UUID | None

UUID of an existing event to use instead of extracting one from each data source.

None
dry_run bool

If True, do not commit changes to the database.

False
disable_progress_bar bool

Do not display progress bar.

True
Source code in src/aimbat/core/_data.py
def add_data_to_project(
    session: Session,
    data_sources: Sequence[os.PathLike | str],
    data_type: DataType,
    station_id: UUID | None = None,
    event_id: UUID | None = None,
    dry_run: bool = False,
    disable_progress_bar: bool = True,
) -> None:
    """Add data sources to the AIMBAT database.

    What gets created depends on which capabilities `data_type` supports:

    - Station + event + seismogram: all three records are created and linked,
      and an `AimbatDataSource` entry is stored.
    - Station or event only (e.g. `JSON_STATION`, `JSON_EVENT`): only the
      relevant metadata records are created; no seismogram or data source entry
      is stored.

    Use `station_id` or `event_id` to skip extracting station or event metadata
    from the data source and link to a pre-existing record instead.

    Args:
        session: The SQLModel database session.
        data_sources: List of data sources to add.
        data_type: Type of data.
        station_id: UUID of an existing station to use instead of extracting
            one from each data source.
        event_id: UUID of an existing event to use instead of extracting one
            from each data source.
        dry_run: If True, do not commit changes to the database.
        disable_progress_bar: Do not display progress bar.
    """

    logger.info(f"Adding {len(data_sources)} {data_type} data sources to project.")

    if station_id is not None and session.get(AimbatStation, station_id) is None:
        raise NoResultFound(f"No station found with ID {station_id}.")
    if event_id is not None and session.get(AimbatEvent, event_id) is None:
        raise NoResultFound(f"No event found with ID {event_id}.")

    # Snapshot existing IDs before entering the savepoint so we can identify
    # what would be new vs reused when running a dry run.
    if dry_run:
        existing_station_ids = set(session.exec(select(AimbatStation.id)).all())
        existing_event_ids = set(session.exec(select(AimbatEvent.id)).all())
        existing_seismogram_ids = set(session.exec(select(AimbatSeismogram.id)).all())

    try:
        added_datasources: list[AimbatDataSource] = []
        with session.begin_nested() as nested:
            for datasource in track(
                sequence=data_sources,
                description="Adding data ...",
                disable=disable_progress_bar,
            ):
                result = _process_datasource(
                    session, datasource, data_type, station_id, event_id
                )
                if result is not None:
                    added_datasources.append(result)

            if dry_run:
                logger.info("Dry run: displaying data that would be added.")
                if added_datasources:
                    session.flush()
                    _print_dry_run_results(
                        added_datasources,
                        existing_station_ids,
                        existing_event_ids,
                        existing_seismogram_ids,
                    )
                nested.rollback()
                logger.info("Dry run complete. Rolling back changes.")
                return

        session.commit()
        logger.info("Data added successfully.")

    except Exception as e:
        logger.error(f"Failed to add data. Rolling back changes. Error: {e}")
        raise

build_iccs_from_snapshot

build_iccs_from_snapshot(
    session: Session, snapshot_id: UUID
) -> BoundICCS

Build a read-only BoundICCS from a snapshot's parameters and live waveform data.

Uses the snapshot's event and seismogram parameters (window, t1, flip, select, bandpass, etc.) but reads waveform data from the live datasources. Seismograms added after the snapshot was taken are not included in the snapshot — their live parameters are used instead. No DB writes occur at any point.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot_id UUID

ID of the AimbatSnapshot to load.

required

Returns:

Type Description
BoundICCS

BoundICCS instance built from the snapshot parameters.

Raises:

Type Description
ValueError

If no snapshot with the given ID is found.

Source code in src/aimbat/core/_iccs.py
def build_iccs_from_snapshot(session: Session, snapshot_id: UUID) -> BoundICCS:
    """Build a read-only BoundICCS from a snapshot's parameters and live waveform data.

    Uses the snapshot's event and seismogram parameters (window, t1, flip, select,
    bandpass, etc.) but reads waveform data from the live datasources. Seismograms
    added after the snapshot was taken are not included in the snapshot — their live
    parameters are used instead. No DB writes occur at any point.

    Args:
        session: Database session.
        snapshot_id: ID of the AimbatSnapshot to load.

    Returns:
        BoundICCS instance built from the snapshot parameters.

    Raises:
        ValueError: If no snapshot with the given ID is found.
    """
    logger.info(f"Building ICCS from snapshot {snapshot_id}.")

    snapshot = session.get(AimbatSnapshot, snapshot_id)
    if snapshot is None:
        raise ValueError(f"Snapshot {snapshot_id} not found.")

    ep = snapshot.event_parameters_snapshot
    snap_params = AimbatEventParametersBase.model_validate(ep)

    # Build a map from seismogram_parameters_id → snapshot parameters
    snap_seis_map = {
        sp.seismogram_parameters_id: sp
        for sp in snapshot.seismogram_parameters_snapshots
    }

    seismograms = []
    for seis in snapshot.event.seismograms:
        snap_sp = snap_seis_map.get(seis.parameters.id)
        if snap_sp is None:
            # Seismogram was added after the snapshot — use live parameters
            seis_params = AimbatSeismogramParametersBase.model_validate(seis.parameters)
        else:
            seis_params = AimbatSeismogramParametersBase.model_validate(snap_sp)
        seismograms.append(
            MiniIccsSeismogram(
                begin_time=seis.begin_time,
                delta=seis.delta,
                data=seis.data,
                t0=seis.t0,
                t1=seis_params.t1,
                flip=seis_params.flip,
                select=seis_params.select,
                extra={"id": seis.id},
            )
        )

    iccs = ICCS(
        seismograms=seismograms,
        window_pre=snap_params.window_pre,
        window_post=snap_params.window_post,
        ramp_width=snap_params.ramp_width,
        bandpass_apply=snap_params.bandpass_apply,
        bandpass_fmin=snap_params.bandpass_fmin,
        bandpass_fmax=snap_params.bandpass_fmax,
        min_cc=snap_params.min_cc,
        context_width=settings.context_width,
    )
    return BoundICCS(
        iccs=iccs,
        event_id=snapshot.event_id,
        created_at=Timestamp.now("UTC"),
    )

clear_iccs_cache

clear_iccs_cache() -> None

Clear the process-level ICCS cache.

Source code in src/aimbat/core/_iccs.py
def clear_iccs_cache() -> None:
    """Clear the process-level ICCS cache."""
    _iccs_cache.clear()

clear_mccc_quality

clear_mccc_quality(
    session: Session, event: AimbatEvent
) -> None

Clear MCCC quality metrics from the live quality tables for an event.

Sets all MCCC fields (mccc_rmse, mccc_error, mccc_cc_mean, mccc_cc_std) to None for the event and all its seismograms. ICCS CC values are not affected.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent whose quality should be cleared.

required
Source code in src/aimbat/core/_iccs.py
def clear_mccc_quality(session: Session, event: AimbatEvent) -> None:
    """Clear MCCC quality metrics from the live quality tables for an event.

    Sets all MCCC fields (`mccc_rmse`, `mccc_error`, `mccc_cc_mean`,
    `mccc_cc_std`) to `None` for the event and all its seismograms.
    ICCS CC values are not affected.

    Args:
        session: Database session.
        event: AimbatEvent whose quality should be cleared.
    """
    logger.debug(f"Clearing MCCC quality for event {event.id}.")

    if event.quality is not None:
        event.quality.mccc_rmse = None
        session.add(event.quality)

    for seis in event.seismograms:
        if seis.quality is not None:
            seis.quality.mccc_error = None
            seis.quality.mccc_cc_mean = None
            seis.quality.mccc_cc_std = None
            session.add(seis.quality)

    session.commit()

compute_parameters_hash

compute_parameters_hash(event: AimbatEvent) -> str

Compute a deterministic SHA-256 hash of the event's current parameters.

Hashes the event ID, all event-level parameters, and per-seismogram parameters. Seismograms are sorted by ID so the result is independent of load order. Including the event ID means hashes are inherently event-scoped and will never collide across events.

Excluded fields:

  • completed (event): does not affect seismogram processing.
  • select (seismogram): determines which seismograms are passed to MCCC but does not affect the computation for any individual seismogram. Membership of the actual MCCC run is captured by the seismogram quality records in the snapshot, so changing selection state should not invalidate a prior MCCC result.

Parameters:

Name Type Description Default
event AimbatEvent

AimbatEvent whose current parameters should be hashed.

required

Returns:

Type Description
str

Hex-encoded SHA-256 digest.

Source code in src/aimbat/core/_snapshot.py
def compute_parameters_hash(event: AimbatEvent) -> str:
    """Compute a deterministic SHA-256 hash of the event's current parameters.

    Hashes the event ID, all event-level parameters, and per-seismogram
    parameters. Seismograms are sorted by ID so the result is independent of
    load order. Including the event ID means hashes are inherently
    event-scoped and will never collide across events.

    Excluded fields:

    - `completed` (event): does not affect seismogram processing.
    - `select` (seismogram): determines which seismograms are passed to MCCC
      but does not affect the computation for any individual seismogram.
      Membership of the actual MCCC run is captured by the seismogram quality
      records in the snapshot, so changing selection state should not
      invalidate a prior MCCC result.

    Args:
        event: AimbatEvent whose current parameters should be hashed.

    Returns:
        Hex-encoded SHA-256 digest.
    """
    logger.debug(f"Computing parameters hash for event {event.id}.")

    # exclude completed field since it does not affect the seismograms directly.
    event_data = AimbatEventParametersBase.model_validate(event.parameters).model_dump(
        mode="json", exclude={"completed"}
    )
    event_data["event_id"] = str(event.id)
    seis_data = sorted(
        [
            {
                "seismogram_id": str(seis.id),
                **AimbatSeismogramParametersBase.model_validate(
                    seis.parameters
                ).model_dump(mode="json", exclude={"select"}),
            }
            for seis in event.seismograms
        ],
        key=lambda x: x["seismogram_id"],
    )
    payload = json.dumps(
        {"event": event_data, "seismograms": seis_data}, sort_keys=True
    )
    return hashlib.sha256(payload.encode()).hexdigest()

create_iccs_instance

create_iccs_instance(
    session: Session, event: AimbatEvent
) -> BoundICCS

Return a BoundICCS instance for the given event.

Returns the cached instance when it is still fresh (i.e. event.last_modified has not advanced since the instance was created). Otherwise builds a new one and updates the cache. ICCS CC values are written to the live quality table in a separate session so the caller's session is not affected.

MiniIccsSeismogram instances are constructed directly from each AimbatSeismogram, passing data by reference to the read-only io cache. No waveform data is copied.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required

Returns:

Type Description
BoundICCS

BoundICCS instance tied to the given event.

Source code in src/aimbat/core/_iccs.py
def create_iccs_instance(session: Session, event: AimbatEvent) -> BoundICCS:
    """Return a BoundICCS instance for the given event.

    Returns the cached instance when it is still fresh (i.e. `event.last_modified`
    has not advanced since the instance was created). Otherwise builds a new one
    and updates the cache. ICCS CC values are written to the live quality table in
    a separate session so the caller's session is not affected.

    `MiniIccsSeismogram` instances are constructed directly from each
    `AimbatSeismogram`, passing `data` by reference to the read-only io cache.
    No waveform data is copied.

    Args:
        session: Database session.
        event: AimbatEvent.

    Returns:
        BoundICCS instance tied to the given event.
    """
    cached = _iccs_cache.get(event.id)
    if cached is not None and not cached.is_stale(event):
        logger.debug(f"Returning cached BoundICCS for event {event.id}.")
        return cached

    logger.debug(f"Creating ICCS instance for event {event.id}.")
    bound = BoundICCS(
        iccs=_build_iccs(event),
        event_id=event.id,
        created_at=Timestamp.now("UTC"),
    )
    _iccs_cache[event.id] = bound
    _write_iccs_stats(event.id, bound.iccs)
    return bound

create_project

create_project(engine: Engine) -> None

Initializes a new AIMBAT project database schema and triggers.

Parameters:

Name Type Description Default
engine Engine

The SQLAlchemy/SQLModel Engine instance connected to the target database.

required

Raises:

Type Description
RuntimeError

If a project schema already exists in the target database.

Source code in src/aimbat/core/_project.py
def create_project(engine: Engine) -> None:
    """Initializes a new AIMBAT project database schema and triggers.

    Args:
        engine: The SQLAlchemy/SQLModel Engine instance connected to the target database.

    Raises:
        RuntimeError: If a project schema already exists in the target database.
    """

    # Import locally to ensure SQLModel registers all table metadata before create_all()
    import aimbat.models  # noqa: F401

    logger.info(f"Creating new project in {engine.url}")

    if _project_exists(engine):
        raise RuntimeError(
            f"Unable to create a new project: project already exists at {engine.url}!"
        )

    logger.debug("Creating database tables and loading defaults.")

    SQLModel.metadata.create_all(engine)

    if engine.name == "sqlite":
        with engine.begin() as connection:
            # Trigger 1: Handle updates to existing rows
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS single_default_event_update
                BEFORE UPDATE ON aimbatevent
                FOR EACH ROW WHEN NEW.is_default = TRUE
                BEGIN
                    UPDATE aimbatevent SET is_default = NULL 
                    WHERE is_default = TRUE AND id != NEW.id;
                END;
            """)
            )

            # Trigger 2: Handle brand new default events being inserted
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS single_default_event_insert
                BEFORE INSERT ON aimbatevent
                FOR EACH ROW WHEN NEW.is_default = TRUE
                BEGIN
                    UPDATE aimbatevent SET is_default = NULL
                    WHERE is_default = TRUE;
                END;
            """)
            )

            # Trigger 3: Track last modification time when event parameters change
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS event_modified_on_params_update
                AFTER UPDATE ON aimbateventparameters
                BEGIN
                    UPDATE aimbatevent SET last_modified = datetime('now')
                    WHERE id = NEW.event_id;
                END;
            """)
            )

            # Trigger 4: Track last modification time when seismogram parameters change
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS event_modified_on_seis_params_update
                AFTER UPDATE ON aimbatseismogramparameters
                BEGIN
                    UPDATE aimbatevent
                    SET last_modified = strftime('%Y-%m-%d %H:%M:%f', 'now')
                    WHERE id = (
                        SELECT event_id FROM aimbatseismogram
                        WHERE id = NEW.seismogram_id
                    );
                END;
            """)
            )

            # Trigger 5: Null all quality when event window/bandpass/ramp parameters change.
            # These parameters change the signal data used by both ICCS and MCCC.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_all_quality_on_window_bandpass_change
                AFTER UPDATE ON aimbateventparameters
                WHEN (NEW.window_pre IS NOT OLD.window_pre)
                  OR (NEW.window_post IS NOT OLD.window_post)
                  OR (NEW.ramp_width IS NOT OLD.ramp_width)
                  OR (NEW.bandpass_apply IS NOT OLD.bandpass_apply)
                  OR (NEW.bandpass_fmin IS NOT OLD.bandpass_fmin)
                  OR (NEW.bandpass_fmax IS NOT OLD.bandpass_fmax)
                BEGIN
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE event_id = NEW.event_id;
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL, mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = NEW.event_id
                    );
                END;
            """)
            )

            # Trigger 6: Null MCCC quality when MCCC-specific event parameters change.
            # These parameters affect only the MCCC inversion, not the underlying signal,
            # so iccs_cc remains valid.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_mccc_quality_on_mccc_params_change
                AFTER UPDATE ON aimbateventparameters
                WHEN (NEW.mccc_damp IS NOT OLD.mccc_damp)
                  OR (NEW.mccc_min_cc IS NOT OLD.mccc_min_cc)
                BEGIN
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE event_id = NEW.event_id;
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = NEW.event_id
                    );
                END;
            """)
            )

            # Trigger 7a: Null quality when flip changes on a seismogram.
            # Flipping a trace only affects the ICCS stack if the seismogram is selected.
            # MCCC stats are invalidated if the seismogram was included in the last MCCC
            # run, which is inferred from the presence of live mccc_cc_mean stats —
            # not from select, because MCCC may have been run with --all.
            # The event-level UPDATE is ordered before the per-seismogram UPDATE so that
            # the EXISTS check sees the original (non-nulled) stats in both statements.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_quality_on_seis_flip_change
                AFTER UPDATE ON aimbatseismogramparameters
                WHEN NEW.flip IS NOT OLD.flip
                BEGIN
                    -- Null iccs_cc for all event seismograms if selected (stack changed),
                    -- or just locally if deselected (the flipped seismogram's own CC is stale
                    -- even though the stack is unchanged).
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL
                    WHERE (
                        NEW."select" = TRUE
                        AND seismogram_id IN (
                            SELECT id FROM aimbatseismogram WHERE event_id = (
                                SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                            )
                        )
                    ) OR (
                        NEW."select" IS NOT TRUE
                        AND seismogram_id = NEW.seismogram_id
                    );

                    -- Null event-level RMSE if this seismogram was in the last MCCC run
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND event_id = (
                        SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                    );

                    -- Null per-seismogram MCCC stats for the whole event if this seismogram
                    -- was in the last MCCC run (checked before these stats are nulled above)
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );
                END;
            """)
            )

            # Trigger 7b: Null quality when t1 changes on a seismogram.
            # ICCS: if selected, the stack is affected so iccs_cc is stale for all;
            # if deselected, only this seismogram's own iccs_cc is stale.
            # MCCC: invalidated whenever the seismogram was in the last MCCC run,
            # inferred from live mccc_cc_mean — not select — because MCCC may have
            # been run with --all, meaning a deselected seismogram could still be included.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_quality_on_seis_t1_change
                AFTER UPDATE ON aimbatseismogramparameters
                WHEN NEW.t1 IS NOT OLD.t1
                BEGIN
                    -- Null iccs_cc for all event seismograms if selected (stack changed),
                    -- otherwise only null locally.
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL
                    WHERE (
                        NEW."select" = TRUE
                        AND seismogram_id IN (
                            SELECT id FROM aimbatseismogram WHERE event_id = (
                                SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                            )
                        )
                    ) OR (
                        NEW."select" IS NOT TRUE
                        AND seismogram_id = NEW.seismogram_id
                    );

                    -- Null event-level RMSE if this seismogram was in the last MCCC run
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND event_id = (
                        SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                    );

                    -- Null per-seismogram MCCC stats for the whole event if this seismogram
                    -- was in the last MCCC run
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );
                END;
            """)
            )

            # Trigger 7c: Null quality when select changes on a seismogram.
            # ICCS stack composition changes in both directions (select → deselect and
            # vice versa), so iccs_cc is always invalidated for the whole event.
            # MCCC stats are only invalidated if the seismogram was in the last MCCC run,
            # inferred from live mccc_cc_mean — if MCCC was run with --all, changing
            # select does not change the MCCC set, so live stats remain valid.
            connection.execute(
                text("""
                CREATE TRIGGER IF NOT EXISTS null_quality_on_seis_select_change
                AFTER UPDATE ON aimbatseismogramparameters
                WHEN NEW."select" IS NOT OLD."select"
                BEGIN
                    -- Always null iccs_cc for the whole event (stack composition changed)
                    UPDATE aimbatseismogramquality
                    SET iccs_cc = NULL
                    WHERE seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );

                    -- Null event-level RMSE if this seismogram was in the last MCCC run
                    UPDATE aimbateventquality
                    SET mccc_rmse = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND event_id = (
                        SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                    );

                    -- Null per-seismogram MCCC stats for the whole event if this seismogram
                    -- was in the last MCCC run
                    UPDATE aimbatseismogramquality
                    SET mccc_cc_mean = NULL, mccc_cc_std = NULL, mccc_error = NULL
                    WHERE EXISTS (
                        SELECT 1 FROM aimbatseismogramquality
                        WHERE seismogram_id = NEW.seismogram_id
                          AND mccc_cc_mean IS NOT NULL
                    )
                      AND seismogram_id IN (
                        SELECT id FROM aimbatseismogram WHERE event_id = (
                            SELECT event_id FROM aimbatseismogram WHERE id = NEW.seismogram_id
                        )
                    );
                END;
            """)
            )

create_snapshot

create_snapshot(
    session: Session,
    event: AimbatEvent,
    comment: str | None = None,
) -> None

Create a snapshot of the AIMBAT processing parameters and quality metrics.

Parameter snapshots are always created. Quality snapshots are created whenever the corresponding live quality record has at least one non-None field. Seismogram quality is omitted when all quality fields are None (e.g. before any ICCS or MCCC run).

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
comment str | None

Optional comment.

None
Source code in src/aimbat/core/_snapshot.py
def create_snapshot(
    session: Session,
    event: AimbatEvent,
    comment: str | None = None,
) -> None:
    """Create a snapshot of the AIMBAT processing parameters and quality metrics.

    Parameter snapshots are always created. Quality snapshots are created
    whenever the corresponding live quality record has at least one non-None
    field. Seismogram quality is omitted when all quality fields are `None`
    (e.g. before any ICCS or MCCC run).

    Args:
        session: Database session.
        event: AimbatEvent.
        comment: Optional comment.
    """

    logger.info(f"Creating snapshot for event with id={event.id} with {comment=}.")

    event_parameters_snapshot = AimbatEventParametersSnapshot.model_validate(
        event.parameters,
        update={
            "id": uuid4(),  # we don't want to carry over the id from the input event parameters
            "parameters_id": event.parameters.id,
        },
    )
    logger.debug(
        f"Adding event parameters snapshot with id={event_parameters_snapshot.id} to snapshot."
    )

    seismogram_parameter_snapshots = []
    for aimbat_seismogram in event.seismograms:
        seismogram_parameter_snapshot = AimbatSeismogramParametersSnapshot.model_validate(
            aimbat_seismogram.parameters,
            update={
                "id": uuid4(),  # we don't want to carry over the id from the input seismogram parameters
                "seismogram_parameters_id": aimbat_seismogram.parameters.id,
            },
        )
        logger.debug(
            f"Adding seismogram parameters snapshot with id={seismogram_parameter_snapshot.id} to snapshot."
        )
        seismogram_parameter_snapshots.append(seismogram_parameter_snapshot)

    # Capture quality metrics from the live quality tables.
    event_quality_snap: AimbatEventQualitySnapshot | None = None
    seis_quality_snaps: list[AimbatSeismogramQualitySnapshot] = []

    if event.quality is not None and event.quality.mccc_rmse is not None:
        logger.debug("Capturing event quality snapshot from live quality table.")
        event_quality_snap = AimbatEventQualitySnapshot.model_validate(
            event.quality,
            update={
                "id": uuid4(),
                "event_quality_id": event.quality.id,
            },
        )

    for aimbat_seismogram in event.seismograms:
        sq = aimbat_seismogram.quality
        if sq is None:
            continue
        if any(
            v is not None
            for v in [sq.iccs_cc, sq.mccc_cc_mean, sq.mccc_cc_std, sq.mccc_error]
        ):
            logger.debug(
                f"Adding seismogram quality snapshot for seismogram {aimbat_seismogram.id}."
            )
            seis_quality_snaps.append(
                AimbatSeismogramQualitySnapshot.model_validate(
                    sq,
                    update={
                        "id": uuid4(),
                        "seismogram_quality_id": sq.id,
                    },
                )
            )

    aimbat_snapshot = AimbatSnapshot(
        event=event,
        event_parameters_snapshot=event_parameters_snapshot,
        seismogram_parameters_snapshots=seismogram_parameter_snapshots,
        event_quality_snapshot=event_quality_snap,
        seismogram_quality_snapshots=seis_quality_snaps,
        comment=comment,
        parameters_hash=compute_parameters_hash(event),
    )
    session.add(aimbat_snapshot)
    session.commit()

delete_event

delete_event(session: Session, event_id: UUID) -> None

Delete an AimbatEvent from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

Event ID.

required
Source code in src/aimbat/core/_event.py
def delete_event(session: Session, event_id: UUID) -> None:
    """Delete an AimbatEvent from the database.

    Args:
        session: Database session.
        event_id: Event ID.

    """

    logger.info(f"Deleting event {event_id}.")

    event = session.get(AimbatEvent, event_id)
    if event is None:
        raise NoResultFound(f"Unable to find event using id: {event_id}.")

    session.delete(event)
    session.commit()

delete_project

delete_project(engine: Engine) -> None

Delete the AIMBAT project.

Raises:

Type Description
RuntimeError

If unable to delete project.

Source code in src/aimbat/core/_project.py
def delete_project(engine: Engine) -> None:
    """Delete the AIMBAT project.

    Raises:
        RuntimeError: If unable to delete project.
    """

    logger.info(f"Deleting project in {engine=}.")

    if _project_exists(engine):
        if engine.driver == "pysqlite":
            database = engine.url.database
            engine.dispose()
            if database == ":memory:":
                logger.info("Running database in memory, nothing to delete.")
                return
            elif database:
                project_path = Path(database)
                logger.info(f"Deleting project file: {project_path=}")
                project_path.unlink()
                return
    raise RuntimeError("Unable to find/delete project.")

delete_seismogram

delete_seismogram(
    session: Session, seismogram_id: UUID
) -> None

Delete an AimbatSeismogram from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

Seismogram ID.

required
Source code in src/aimbat/core/_seismogram.py
def delete_seismogram(session: Session, seismogram_id: UUID) -> None:
    """Delete an AimbatSeismogram from the database.

    Args:
        session: Database session.
        seismogram_id: Seismogram ID.

    """

    logger.info(f"Deleting seismogram {seismogram_id}.")

    seismogram = session.get(AimbatSeismogram, seismogram_id)
    if seismogram is None:
        raise NoResultFound(f"No AimbatSeismogram found with {seismogram_id=}")

    session.delete(seismogram)
    session.commit()

delete_snapshot

delete_snapshot(
    session: Session, snapshot_id: UUID
) -> None

Delete an AIMBAT parameter snapshot.

Parameters:

Name Type Description Default
snapshot_id UUID

Snapshot id.

required
Source code in src/aimbat/core/_snapshot.py
def delete_snapshot(session: Session, snapshot_id: UUID) -> None:
    """Delete an AIMBAT parameter snapshot.

    Args:
        snapshot_id: Snapshot id.
    """
    logger.info(f"Deleting snapshot {snapshot_id}.")

    snapshot = session.get(AimbatSnapshot, snapshot_id)
    if snapshot is None:
        raise NoResultFound(f"Unable to find snapshot with {snapshot_id=}")

    session.delete(snapshot)
    session.commit()

delete_station

delete_station(session: Session, station_id: UUID) -> None

Delete an AimbatStation from the database.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

ID of the station to delete.

required
Source code in src/aimbat/core/_station.py
def delete_station(session: Session, station_id: UUID) -> None:
    """Delete an AimbatStation from the database.

    Args:
        session: Database session.
        station_id: ID of the station to delete.
    """

    logger.info(f"Deleting station with id={station_id}.")

    station = session.get(AimbatStation, station_id)
    if station is None:
        raise NoResultFound(f"No AimbatStation found with {station_id=}")

    session.delete(station)
    session.commit()

dump_data_table

dump_data_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]

Return AIMBAT datasources table as a JSON-serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

UUID of the event to filter data sources by. If None, all data sources are returned.

None
by_alias bool

Whether to use field aliases.

False
by_title bool

Whether to use field titles (from the Pydantic model) for the field names in the output. Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None

Returns:

Type Description
list[dict[str, Any]]

Aimbat datasources table as a list of dicts.

Source code in src/aimbat/core/_data.py
def dump_data_table(
    session: Session,
    event_id: UUID | None = None,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Return AIMBAT datasources table as a JSON-serialisable list of dicts.

    Args:
        session: Database session.
        event_id: UUID of the event to filter data sources by. If None, all data sources are returned.
        by_alias: Whether to use field aliases.
        by_title: Whether to use field titles (from the Pydantic model) for the
            field names in the output. Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.

    Returns:
        Aimbat datasources table as a list of dicts.
    """
    logger.debug("Dumping AIMBAT datasources table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    adapter: TypeAdapter[Sequence[AimbatDataSource]] = TypeAdapter(
        Sequence[AimbatDataSource]
    )

    if event_id is not None:
        data_source = get_data_for_event(session, event_id)
    else:
        data_source = session.exec(select(AimbatDataSource)).all()

    data = adapter.dump_python(
        data_source, exclude=exclude, by_alias=by_alias, mode="json"
    )

    if by_title:
        title_map = get_title_map(AimbatDataSource)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_event_parameter_table

dump_event_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump the event parameter table data to json.

Parameters:

Name Type Description Default
session Session

Database session.

required
by_alias bool

Whether to use serialization aliases for the field names.

False
by_title bool

Whether to use the field title metadata for the field names. Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter parameters by (if none is provided, parameters for all events are dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

Source code in src/aimbat/core/_event.py
def dump_event_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump the event parameter table data to json.

    Args:
        session: Database session.
        by_alias: Whether to use serialization aliases for the field names.
        by_title: Whether to use the field title metadata for the field names.
            Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter parameters by (if none is provided,
            parameters for all events are dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
    """

    logger.debug("Dumping AIMBAT event parameter table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    adapter: TypeAdapter[Sequence[AimbatEventParameters]] = TypeAdapter(
        Sequence[AimbatEventParameters]
    )

    if event_id is not None:
        statement = select(AimbatEventParameters).where(
            AimbatEventParameters.event_id == event_id
        )
    else:
        statement = select(AimbatEventParameters)

    parameters = session.exec(statement).all()

    data = adapter.dump_python(
        parameters, mode="json", exclude=exclude, by_alias=by_alias
    )

    if by_title:
        title_map = get_title_map(AimbatEventParameters)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_event_table

dump_event_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]] | str

Dump the table data to json serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
from_read_model bool

Whether to dump from the read model (True) or the ORM model.

False
by_alias bool

Whether to use serialization aliases for the field names.

False
by_title bool

Whether to use the field title metadata for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

ValueError

If by_title is True but from_read_model is False.

Source code in src/aimbat/core/_event.py
def dump_event_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
) -> list[dict[str, Any]] | str:
    """Dump the table data to json serialisable list of dicts.

    Args:
        session: Database session.
        from_read_model: Whether to dump from the read model (True) or the ORM model.
        by_alias: Whether to use serialization aliases for the field names.
        by_title: Whether to use the field title metadata for the field names in the
            output (only applicable when from_read_model is True). Mutually
            exclusive with by_alias.
        exclude: Set of field names to exclude from the output.

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
        ValueError: If `by_title` is True but `from_read_model` is False.
    """
    logger.debug("Dumping AIMBAT event table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    events = session.exec(select(AimbatEvent)).all()

    if from_read_model:
        event_reads = [AimbatEventRead.from_event(e, session=session) for e in events]
        adapter_reads: TypeAdapter[Sequence[AimbatEventRead]] = TypeAdapter(
            Sequence[AimbatEventRead]
        )
        data = adapter_reads.dump_python(
            event_reads, exclude=exclude, by_alias=by_alias, mode="json"
        )

        if by_title:
            title_map = get_title_map(AimbatEventRead)
            return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

        return data

    adapter: TypeAdapter[Sequence[AimbatEvent]] = TypeAdapter(Sequence[AimbatEvent])
    return adapter.dump_json(events, exclude=exclude, by_alias=by_alias).decode()

dump_quality_event

dump_quality_event(
    session: Session, event_id: UUID
) -> dict[str, Any]

Return event MCCC quality as a JSON-serialisable dict.

Reads from the most recent snapshot that has quality data. Returns null values for all fields when no MCCC has been run.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

Event UUID.

required

Returns:

Type Description
dict[str, Any]

Flat dict with event quality and seismogram aggregate statistics.

dict[str, Any]

Timedelta values are serialised as total seconds (float).

Source code in src/aimbat/core/_quality.py
def dump_quality_event(session: Session, event_id: uuid.UUID) -> dict[str, Any]:
    """Return event MCCC quality as a JSON-serialisable dict.

    Reads from the most recent snapshot that has quality data. Returns null
    values for all fields when no MCCC has been run.

    Args:
        session: Database session.
        event_id: Event UUID.

    Returns:
        Flat dict with event quality and seismogram aggregate statistics.
        Timedelta values are serialised as total seconds (float).
    """
    event_quality, stats = get_quality_event(session, event_id)

    if event_quality is not None:
        result: dict[str, Any] = event_quality.model_dump(mode="json")
    else:
        result = {
            "event_id": str(event_id),
            **{k: None for k in AimbatEventQualityBase.model_fields},
        }

    result.update(_stats_dump(stats))
    return result

dump_quality_station

dump_quality_station(
    session: Session, station_id: UUID
) -> dict[str, Any]

Return station quality as a JSON-serialisable dict.

Aggregates seismogram quality across all events recorded at the station, with means and SEMs for all and selected seismograms.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

Station UUID.

required

Returns:

Type Description
dict[str, Any]

Flat dict with seismogram aggregate statistics.

dict[str, Any]

Timedelta values are serialised as total seconds (float).

Source code in src/aimbat/core/_quality.py
def dump_quality_station(session: Session, station_id: uuid.UUID) -> dict[str, Any]:
    """Return station quality as a JSON-serialisable dict.

    Aggregates seismogram quality across all events recorded at the station,
    with means and SEMs for all and selected seismograms.

    Args:
        session: Database session.
        station_id: Station UUID.

    Returns:
        Flat dict with seismogram aggregate statistics.
        Timedelta values are serialised as total seconds (float).
    """
    all_stats, selected_stats = get_quality_station(session, station_id)
    result: dict[str, Any] = {"station_id": str(station_id)}
    result.update(_stats_dump(all_stats))
    result.update(_stats_dump(selected_stats, prefix="selected_"))
    return result

dump_seismogram_parameter_table

dump_seismogram_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump the seismogram parameter table data to json serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
by_title bool

Whether to use titles for the field names in the output.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter seismogram parameters by (if none is provided, all seismogram parameters for all events are dumped).

None

Returns:

Type Description
list[dict[str, Any]]

list of dicts representing the seismogram parameters.

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

Source code in src/aimbat/core/_seismogram.py
def dump_seismogram_parameter_table(
    session: Session,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump the seismogram parameter table data to json serialisable list of dicts.

    Args:
        session: Database session.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter seismogram parameters by (if none is provided,
            all seismogram parameters for all events are dumped).

    Returns:
        list of dicts representing the seismogram parameters.

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
    """
    logger.debug("Dumping AimbatSeismogramParameters table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    adapter: TypeAdapter[Sequence[AimbatSeismogramParameters]] = TypeAdapter(
        Sequence[AimbatSeismogramParameters]
    )

    if event_id is not None:
        statement = (
            select(AimbatSeismogramParameters)
            .join(AimbatSeismogram)
            .where(AimbatSeismogram.event_id == event_id)
        )
    else:
        statement = select(AimbatSeismogramParameters)

    parameters = session.exec(statement).all()

    data = adapter.dump_python(
        parameters, mode="json", exclude=exclude, by_alias=by_alias
    )

    if by_title:
        title_map = get_title_map(AimbatSeismogramParameters)
        return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

    return data

dump_seismogram_table

dump_seismogram_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Dump the AimbatSeismogram table to json serialisable list of dicts.

Parameters:

Name Type Description Default
session Session

Database session.

required
from_read_model bool

Whether to dump from the read model (True) or the ORM model.

False
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
by_title bool

Whether to use titles for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter seismograms by (if none is provided, seismograms for all events are dumped).

None

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

ValueError

If by_title is True but from_read_model is False.

Source code in src/aimbat/core/_seismogram.py
def dump_seismogram_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Dump the AimbatSeismogram table to json serialisable list of dicts.

    Args:
        session: Database session.
        from_read_model: Whether to dump from the read model (True) or the ORM model.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output (only
            applicable when from_read_model is True). Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter seismograms by (if none is provided,
            seismograms for all events are dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
        ValueError: If `by_title` is True but `from_read_model` is False.
    """
    logger.debug("Dumping AIMBAT seismogram table to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    if event_id is not None:
        statement = select(AimbatSeismogram).where(
            AimbatSeismogram.event_id == event_id
        )
    else:
        statement = select(AimbatSeismogram)

    seismograms = session.exec(statement).all()

    if from_read_model:
        seismogram_reads = [
            AimbatSeismogramRead.from_seismogram(s, session=session)
            for s in seismograms
        ]
        adapter_reads: TypeAdapter[Sequence[AimbatSeismogramRead]] = TypeAdapter(
            Sequence[AimbatSeismogramRead]
        )
        data = adapter_reads.dump_python(
            seismogram_reads, mode="json", exclude=exclude, by_alias=by_alias
        )

        if by_title:
            title_map = get_title_map(AimbatSeismogramRead)
            return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

        return data

    adapter: TypeAdapter[Sequence[AimbatSeismogram]] = TypeAdapter(
        Sequence[AimbatSeismogram]
    )

    return adapter.dump_python(
        seismograms, mode="json", exclude=exclude, by_alias=by_alias
    )

dump_snapshot_tables

dump_snapshot_tables(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> dict[str, list[dict[str, Any]]]

Dump snapshot data as a dict of lists of dicts.

Returns a structure with three keys:

  • snapshots: flat list of snapshot metadata.
  • event_parameters: flat list of event parameter snapshots.
  • seismogram_parameters: flat list of seismogram parameter snapshots.

Each entry includes a snapshot_id for cross-referencing.

Parameters:

Name Type Description Default
session Session

Database session.

required
from_read_model bool

Whether to dump from the read model (True) or the ORM model. Only affects the snapshots table.

False
by_alias bool

Whether to use serialization aliases for the field names in the output.

False
by_title bool

Whether to use titles for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias.

False
exclude set[str] | None

Set of field names to exclude from the output.

None
event_id UUID | None

Event ID to filter seismograms by (if none is provided, seismograms for all events are dumped).

None
Source code in src/aimbat/core/_snapshot.py
def dump_snapshot_tables(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> dict[str, list[dict[str, Any]]]:
    """Dump snapshot data as a dict of lists of dicts.

    Returns a structure with three keys:

    - `snapshots`: flat list of snapshot metadata.
    - `event_parameters`: flat list of event parameter snapshots.
    - `seismogram_parameters`: flat list of seismogram parameter snapshots.

    Each entry includes a `snapshot_id` for cross-referencing.

    Args:
        session: Database session.
        from_read_model: Whether to dump from the read model (True) or the ORM model.
            Only affects the `snapshots` table.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output (only
            applicable when from_read_model is True). Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter seismograms by (if none is provided,
            seismograms for all events are dumped).
    """
    logger.debug("Dumping AimbatSnapshot tables to json.")

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    snapshots = get_snapshots(session, event_id)

    event_params_adapter: TypeAdapter[Sequence[AimbatEventParametersSnapshot]] = (
        TypeAdapter(Sequence[AimbatEventParametersSnapshot])
    )
    event_snaps = [s.event_parameters_snapshot for s in snapshots]
    event_dicts = event_params_adapter.dump_python(
        event_snaps, mode="json", by_alias=by_alias
    )

    seis_params_adapter: TypeAdapter[Sequence[AimbatSeismogramParametersSnapshot]] = (
        TypeAdapter(Sequence[AimbatSeismogramParametersSnapshot])
    )
    seis_snaps = [sp for s in snapshots for sp in s.seismogram_parameters_snapshots]
    seis_dicts = seis_params_adapter.dump_python(
        seis_snaps, mode="json", by_alias=by_alias
    )

    if from_read_model:
        snapshot_read_adapter: TypeAdapter[Sequence[AimbatSnapshotRead]] = TypeAdapter(
            Sequence[AimbatSnapshotRead]
        )
        snapshots_read = [
            AimbatSnapshotRead.from_snapshot(s, session=session) for s in snapshots
        ]
        snapshot_dicts = snapshot_read_adapter.dump_python(
            snapshots_read, mode="json", by_alias=by_alias, exclude=exclude
        )

        if by_title:
            title_map = get_title_map(AimbatSnapshotRead)
            snapshot_dicts = [
                {title_map.get(k, k): v for k, v in row.items()}
                for row in snapshot_dicts
            ]
    else:
        snapshot_adapter: TypeAdapter[Sequence[AimbatSnapshot]] = TypeAdapter(
            Sequence[AimbatSnapshot]
        )
        snapshot_dicts = snapshot_adapter.dump_python(
            snapshots, mode="json", by_alias=by_alias, exclude=exclude
        )

    data: dict[str, list[dict[str, Any]]] = {
        "snapshots": snapshot_dicts,
        "event_parameters": event_dicts,
        "seismogram_parameters": seis_dicts,
    }

    return data

dump_station_table

dump_station_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]

Create a JSON serialisable dict from the AimbatStation table data. Args: session: Database session. from_read_model: Whether to dump from the read model (True) or the ORM model. by_alias: Whether to use serialization aliases for the field names in the output. by_title: Whether to use titles for the field names in the output (only applicable when from_read_model is True). Mutually exclusive with by_alias. exclude: Set of field names to exclude from the output. event_id: Event ID to filter seismograms by (if none is provided, seismograms for all events are dumped).

Raises:

Type Description
ValueError

If both by_alias and by_title are True.

ValueError

If by_title is True but from_read_model is False.

Source code in src/aimbat/core/_station.py
def dump_station_table(
    session: Session,
    from_read_model: bool = False,
    by_alias: bool = False,
    by_title: bool = False,
    exclude: set[str] | None = None,
    event_id: UUID | None = None,
) -> list[dict[str, Any]]:
    """Create a JSON serialisable dict from the AimbatStation table data.
    Args:
        session: Database session.
        from_read_model: Whether to dump from the read model (True) or the ORM model.
        by_alias: Whether to use serialization aliases for the field names in the output.
        by_title: Whether to use titles for the field names in the output (only
            applicable when from_read_model is True). Mutually exclusive with by_alias.
        exclude: Set of field names to exclude from the output.
        event_id: Event ID to filter seismograms by (if none is provided,
            seismograms for all events are dumped).

    Raises:
        ValueError: If both `by_alias` and `by_title` are True.
        ValueError: If `by_title` is True but `from_read_model` is False.
    """

    if by_alias and by_title:
        raise ValueError("Arguments 'by_alias' and 'by_title' are mutually exclusive.")

    if not from_read_model and by_title:
        raise ValueError("'by_title' is only supported when 'from_read_model' is True.")

    logger.debug("Dumping AIMBAT station table to json.")

    if exclude is not None:
        exclude: dict[str, set] = {"__all__": exclude}  # type: ignore[no-redef]

    if event_id is not None:
        statement = (
            select(AimbatStation)
            .join(AimbatSeismogram)
            .where(AimbatSeismogram.event_id == event_id)
            .distinct()
        )
    else:
        statement = select(AimbatStation)

    stations = session.exec(statement).all()

    if from_read_model:
        read_stations = [
            AimbatStationRead.from_station(
                station=s,
                session=session,
            )
            for s in stations
        ]
        read_adapter: TypeAdapter[Sequence[AimbatStationRead]] = TypeAdapter(
            Sequence[AimbatStationRead]
        )
        data = read_adapter.dump_python(
            read_stations, exclude=exclude, by_alias=by_alias, mode="json"
        )

        if by_title:
            title_map = get_title_map(AimbatStationRead)
            return [{title_map.get(k, k): v for k, v in row.items()} for row in data]

        return data

    adapter: TypeAdapter[Sequence[AimbatStation]] = TypeAdapter(Sequence[AimbatStation])
    return adapter.dump_python(
        stations, mode="json", by_alias=by_alias, exclude=exclude
    )

event_quality_groups

event_quality_groups(
    session: Session, event_id: UUID
) -> list[FieldGroup]

Return MCCC quality view data for an event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

Event UUID.

required

Returns:

Type Description
list[FieldGroup]

Two FieldGroup instances: event-level statistics and

list[FieldGroup]

averages across the seismograms used in the inversion.

Source code in src/aimbat/core/_quality.py
def event_quality_groups(session: Session, event_id: uuid.UUID) -> list[FieldGroup]:
    """Return MCCC quality view data for an event.

    Args:
        session: Database session.
        event_id: Event UUID.

    Returns:
        Two `FieldGroup` instances: event-level statistics and
        averages across the seismograms used in the inversion.
    """
    event_quality, stats = get_quality_event(session, event_id)

    event_group = FieldGroup(title="Event statistics")
    if event_quality is not None:
        event_group.fields = _specs_from_model(event_quality, AimbatEventQualityBase)
    else:
        event_group.empty_message = "No event quality data — run MCCC first"

    return [
        event_group,
        FieldGroup(
            title=f"Averages across {stats.count} seismograms",
            fields=_specs_from_model(stats, SeismogramQualityStats),
        ),
    ]

get_completed_events

get_completed_events(
    session: Session,
) -> Sequence[AimbatEvent]

Get the events marked as completed.

Parameters:

Name Type Description Default
session Session

SQL session.

required
Source code in src/aimbat/core/_event.py
def get_completed_events(session: Session) -> Sequence[AimbatEvent]:
    """Get the events marked as completed.

    Args:
        session: SQL session.
    """

    logger.debug("Getting completed events from project.")

    statement = (
        select(AimbatEvent)
        .join(AimbatEventParameters)
        .where(AimbatEventParameters.completed == 1)
    )

    return session.exec(statement).all()

get_data_for_event

get_data_for_event(
    session: Session, event_id: UUID
) -> Sequence[AimbatDataSource]

Returns the data sources belonging to the given event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

UUID of the AimbatEvent.

required

Returns:

Type Description
Sequence[AimbatDataSource]

Sequence of AimbatDataSource objects belonging to the event.

Source code in src/aimbat/core/_data.py
def get_data_for_event(session: Session, event_id: UUID) -> Sequence[AimbatDataSource]:
    """Returns the data sources belonging to the given event.

    Args:
        session: Database session.
        event_id: UUID of the AimbatEvent.

    Returns:
        Sequence of AimbatDataSource objects belonging to the event.
    """

    logger.debug(f"Getting data sources for event {event_id}.")

    statement = (
        select(AimbatDataSource)
        .join(AimbatSeismogram)
        .where(AimbatSeismogram.event_id == event_id)
    )
    return session.exec(statement).all()

get_default_event

get_default_event(session: Session) -> AimbatEvent | None

Return the currently default event, or None if no event is set as default.

Parameters:

Name Type Description Default
session Session

SQL session.

required

Returns:

Type Description
AimbatEvent | None

Default Event, or None.

Source code in src/aimbat/core/_default_event.py
def get_default_event(session: Session) -> AimbatEvent | None:
    """
    Return the currently default event, or None if no event is set as default.

    Args:
        session: SQL session.

    Returns:
        Default Event, or None.
    """

    logger.debug("Attempting to determine default event.")

    statement = select(AimbatEvent).where(AimbatEvent.is_default == 1)
    default_event = session.exec(statement).one_or_none()

    logger.debug(f"Default event: {default_event.id if default_event else None}")

    return default_event

get_events_using_station

get_events_using_station(
    session: Session, station_id: UUID
) -> Sequence[AimbatEvent]

Get all events that use a particular station.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

UUID of the station to return events for.

required

Returns: Events that use the station.

Source code in src/aimbat/core/_event.py
def get_events_using_station(
    session: Session, station_id: UUID
) -> Sequence[AimbatEvent]:
    """Get all events that use a particular station.

    Args:
        session: Database session.
        station_id: UUID of the station to return events for.

    Returns: Events that use the station.
    """

    logger.debug(f"Getting events for station: {station_id}.")

    statement = (
        select(AimbatEvent)
        .join(AimbatSeismogram)
        .join(AimbatStation)
        .where(AimbatStation.id == station_id)
    )

    events = session.exec(statement).all()

    logger.debug(f"Found {len(events)}.")

    return events

get_quality_event

get_quality_event(
    session: Session, event_id: UUID
) -> tuple[
    AimbatEventQualitySnapshot | None,
    SeismogramQualityStats,
]

Get MCCC quality metrics for an event from the most recent snapshot.

Returns the event-level quality record together with aggregated seismogram quality statistics across all seismograms included in that MCCC run.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

Event UUID.

required

Returns:

Type Description
AimbatEventQualitySnapshot | None

A tuple of (event_quality_snapshot, stats).

SeismogramQualityStats

event_quality_snapshot is None if no MCCC has been run yet.

Source code in src/aimbat/core/_quality.py
def get_quality_event(
    session: Session, event_id: uuid.UUID
) -> tuple[AimbatEventQualitySnapshot | None, SeismogramQualityStats]:
    """Get MCCC quality metrics for an event from the most recent snapshot.

    Returns the event-level quality record together with aggregated seismogram
    quality statistics across all seismograms included in that MCCC run.

    Args:
        session: Database session.
        event_id: Event UUID.

    Returns:
        A tuple of `(event_quality_snapshot, stats)`.
        `event_quality_snapshot` is None if no MCCC has been run yet.
    """
    logger.debug(f"Getting quality for event {event_id}.")

    stmt = (
        select(AimbatSnapshot)
        .join(
            AimbatEventQualitySnapshot,
            col(AimbatEventQualitySnapshot.snapshot_id) == col(AimbatSnapshot.id),
        )
        .where(col(AimbatSnapshot.event_id) == event_id)
        .order_by(col(AimbatSnapshot.time).desc())
        .limit(1)
    )
    latest = session.exec(stmt).first()

    if latest is None:
        return None, SeismogramQualityStats(count=0)

    event_quality = latest.event_quality_snapshot
    stats = _stats_from_quality_snapshots(latest.seismogram_quality_snapshots)
    return event_quality, stats

get_quality_seismogram

get_quality_seismogram(
    session: Session, seismogram_id: UUID
) -> AimbatSeismogramQualitySnapshot | None

Get the quality snapshot for a seismogram from the most recent MCCC run.

Returns the seismogram's quality record from the most recent snapshot that has event-level quality data. Returns None if no MCCC has been run, if the seismogram has no live quality record, or if the most recent MCCC run excluded this seismogram.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

Seismogram UUID.

required

Returns:

Type Description
AimbatSeismogramQualitySnapshot | None

The AimbatSeismogramQualitySnapshot from the most recent MCCC snapshot

AimbatSeismogramQualitySnapshot | None

that includes this seismogram, or None.

Source code in src/aimbat/core/_quality.py
def get_quality_seismogram(
    session: Session, seismogram_id: uuid.UUID
) -> AimbatSeismogramQualitySnapshot | None:
    """Get the quality snapshot for a seismogram from the most recent MCCC run.

    Returns the seismogram's quality record from the most recent snapshot that
    has event-level quality data. Returns None if no MCCC has been run, if
    the seismogram has no live quality record, or if the most recent MCCC run
    excluded this seismogram.

    Args:
        session: Database session.
        seismogram_id: Seismogram UUID.

    Returns:
        The `AimbatSeismogramQualitySnapshot` from the most recent MCCC snapshot
        that includes this seismogram, or None.
    """
    logger.debug(f"Getting quality for seismogram {seismogram_id}.")

    seismogram = session.get(AimbatSeismogram, seismogram_id)
    if seismogram is None:
        return None
    if seismogram.quality is None:
        return None
    quality_id = seismogram.quality.id

    snap_stmt = (
        select(AimbatSnapshot)
        .join(
            AimbatEventQualitySnapshot,
            col(AimbatEventQualitySnapshot.snapshot_id) == col(AimbatSnapshot.id),
        )
        .where(col(AimbatSnapshot.event_id) == seismogram.event_id)
        .order_by(col(AimbatSnapshot.time).desc())
        .limit(1)
    )
    latest = session.exec(snap_stmt).first()
    if latest is None:
        return None

    stmt = select(AimbatSeismogramQualitySnapshot).where(
        col(AimbatSeismogramQualitySnapshot.snapshot_id) == latest.id,
        col(AimbatSeismogramQualitySnapshot.seismogram_quality_id) == quality_id,
    )
    return session.exec(stmt).first()

get_quality_station

get_quality_station(
    session: Session, station_id: UUID
) -> tuple[SeismogramQualityStats, SeismogramQualityStats]

Get aggregated MCCC quality metrics for a station from the most recent snapshots.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

Station UUID.

required

Returns:

Type Description
tuple[SeismogramQualityStats, SeismogramQualityStats]

A tuple of (all_stats, selected_stats).

Source code in src/aimbat/core/_quality.py
def get_quality_station(
    session: Session, station_id: uuid.UUID
) -> tuple[SeismogramQualityStats, SeismogramQualityStats]:
    """Get aggregated MCCC quality metrics for a station from the most recent snapshots.

    Args:
        session: Database session.
        station_id: Station UUID.

    Returns:
        A tuple of `(all_stats, selected_stats)`.
    """
    logger.debug(f"Getting quality for station {station_id}.")

    # For each event that has seismograms at this station, get the most
    # recent snapshot with quality data and collect its quality records.
    stmt = (
        select(AimbatSeismogram.event_id)
        .where(col(AimbatSeismogram.station_id) == station_id)
        .distinct()
    )
    event_ids = session.exec(stmt).all()

    all_records: list[AimbatSeismogramQualitySnapshot] = []
    selected_records: list[AimbatSeismogramQualitySnapshot] = []

    for event_id in event_ids:
        snap_stmt = (
            select(AimbatSnapshot)
            .join(
                AimbatEventQualitySnapshot,
                col(AimbatEventQualitySnapshot.snapshot_id) == col(AimbatSnapshot.id),
            )
            .where(col(AimbatSnapshot.event_id) == event_id)
            .order_by(col(AimbatSnapshot.time).desc())
            .limit(1)
        )
        snap = session.exec(snap_stmt).first()
        if snap is None:
            continue

        # Seismograms at this station in this snapshot.
        station_seis_ids = {
            seis.id for seis in snap.event.seismograms if seis.station_id == station_id
        }
        select_map = {
            sp.parameters.seismogram_id: sp.select
            for sp in snap.seismogram_parameters_snapshots
        }

        for sq in snap.seismogram_quality_snapshots:
            seis_id = sq.quality.seismogram_id
            if seis_id in station_seis_ids:
                all_records.append(sq)
                if select_map.get(seis_id, False):
                    selected_records.append(sq)

    return _stats_from_quality_snapshots(all_records), _stats_from_quality_snapshots(
        selected_records
    )

get_seismogram_mccc_map

get_seismogram_mccc_map(
    event: AimbatEvent,
) -> dict[
    UUID, tuple[Timedelta | None, float, float | None]
]

Return per-seismogram MCCC quality values for display from the live quality table.

Reads directly from the AimbatSeismogramQuality live records for the event's seismograms. Only seismograms with a non-None mccc_cc_mean are included (i.e. those for which MCCC has been run).

Must be called within an active SQLModel session so that ORM relationships on event can lazy-load.

Parameters:

Name Type Description Default
event AimbatEvent

Default AimbatEvent.

required

Returns:

Type Description
dict[UUID, tuple[Timedelta | None, float, float | None]]

Mapping from seismogram ID to (mccc_error, mccc_cc_mean, mccc_cc_std).

dict[UUID, tuple[Timedelta | None, float, float | None]]

Empty when MCCC has not been run.

Source code in src/aimbat/core/_quality.py
def get_seismogram_mccc_map(
    event: AimbatEvent,
) -> dict[uuid.UUID, tuple[pd.Timedelta | None, float, float | None]]:
    """Return per-seismogram MCCC quality values for display from the live quality table.

    Reads directly from the `AimbatSeismogramQuality` live records for the
    event's seismograms. Only seismograms with a non-None `mccc_cc_mean` are
    included (i.e. those for which MCCC has been run).

    Must be called within an active SQLModel session so that ORM relationships
    on `event` can lazy-load.

    Args:
        event: Default AimbatEvent.

    Returns:
        Mapping from seismogram ID to `(mccc_error, mccc_cc_mean, mccc_cc_std)`.
        Empty when MCCC has not been run.
    """
    result: dict[uuid.UUID, tuple[pd.Timedelta | None, float, float | None]] = {}
    for seis in event.seismograms:
        sq = seis.quality
        if sq is not None and sq.mccc_cc_mean is not None:
            result[seis.id] = (sq.mccc_error, sq.mccc_cc_mean, sq.mccc_cc_std)
    return result

get_selected_seismograms

get_selected_seismograms(
    session: Session,
    event_id: UUID | None = None,
    all_events: bool = False,
) -> Sequence[AimbatSeismogram]

Get the selected seismograms for the given event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to get seismograms for (only used when all_events is False).

None
all_events bool

Get the selected seismograms for all events.

False

Returns: Selected seismograms.

Source code in src/aimbat/core/_seismogram.py
def get_selected_seismograms(
    session: Session, event_id: UUID | None = None, all_events: bool = False
) -> Sequence[AimbatSeismogram]:
    """Get the selected seismograms for the given event.

    Args:
        session: Database session.
        event_id: Event ID to get seismograms for (only used when all_events is False).
        all_events: Get the selected seismograms for all events.

    Returns: Selected seismograms.
    """

    if all_events is True:
        logger.debug("Selecting seismograms for all events.")
        statement = (
            select(AimbatSeismogram)
            .join(AimbatSeismogramParameters)
            .where(AimbatSeismogramParameters.select == 1)
        )
    else:
        if event_id is None:
            raise ValueError("An event must be provided when all_events is False.")
        logger.debug(f"Selecting seismograms for event {event_id} only.")
        statement = (
            select(AimbatSeismogram)
            .join(AimbatSeismogramParameters)
            .where(AimbatSeismogramParameters.select == 1)
            .where(AimbatSeismogram.event_id == event_id)
        )

    seismograms = session.exec(statement).all()

    logger.debug(f"Found {len(seismograms)} selected seismograms.")

    return seismograms

get_snapshots

get_snapshots(
    session: Session, event_id: UUID | None = None
) -> Sequence[AimbatSnapshot]

Get the snapshots, optional filtered by event ID.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID | None

Event ID to filter snapshots by (if none is provided, snapshots for all events are returned).

None

Returns: Snapshots.

Source code in src/aimbat/core/_snapshot.py
def get_snapshots(
    session: Session, event_id: UUID | None = None
) -> Sequence[AimbatSnapshot]:
    """Get the snapshots, optional filtered by event ID.

    Args:
        session: Database session.
        event_id: Event ID to filter snapshots by (if none is provided, snapshots for all events are returned).

    Returns: Snapshots.
    """
    logger.debug("Getting AIMBAT snapshots.")

    if event_id is None:
        statement = select(AimbatSnapshot)
    else:
        statement = select(AimbatSnapshot).where(AimbatSnapshot.event_id == event_id)

    logger.debug(f"Executing statement to get snapshots: {statement}")
    return session.exec(statement).all()

get_station_iccs_ccs

get_station_iccs_ccs(
    session: Session, station_id: UUID
) -> tuple[float | None, ...]

Get ICCS cross-correlation coefficients for all seismograms of a station across all events.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

ID of the station.

required

Returns: Tuple of ICCS CC values, one per seismogram (None if not yet computed).

Source code in src/aimbat/core/_station.py
def get_station_iccs_ccs(
    session: Session, station_id: UUID
) -> tuple[float | None, ...]:
    """Get ICCS cross-correlation coefficients for all seismograms of a station across all events.

    Args:
        session: Database session.
        station_id: ID of the station.

    Returns: Tuple of ICCS CC values, one per seismogram (None if not yet computed).
    """
    logger.debug(f"Getting ICCS CCs for {station_id=}.")

    statement = (
        select(AimbatSeismogramQuality.iccs_cc)
        .join(AimbatSeismogram)
        .where(AimbatSeismogram.station_id == station_id)
    )

    return tuple(session.exec(statement).all())

get_stations_in_event

get_stations_in_event(
    session: Session, event_id: UUID, as_json: bool = False
) -> Sequence[AimbatStation] | list[dict[str, Any]]

Get the stations for a particular event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

ID of the event to get stations for.

required
as_json bool

Whether to return the result as JSON.

False

Returns: Stations in event.

Source code in src/aimbat/core/_station.py
def get_stations_in_event(
    session: Session, event_id: UUID, as_json: bool = False
) -> Sequence[AimbatStation] | list[dict[str, Any]]:
    """Get the stations for a particular event.

    Args:
        session: Database session.
        event_id: ID of the event to get stations for.
        as_json: Whether to return the result as JSON.

    Returns: Stations in event.
    """
    logger.debug(f"Getting stations for event: {event_id}.")

    event = session.get(AimbatEvent, event_id)
    if event is None:
        raise NoResultFound(f"Unable to find event with {event_id=}")

    statement = (
        select(AimbatStation)
        .distinct()
        .join(AimbatSeismogram)
        .where(AimbatSeismogram.event_id == event.id)
    )

    logger.debug(f"Executing query: {statement}")
    results = session.exec(statement).all()

    if not as_json:
        return results

    adapter: TypeAdapter[Sequence[AimbatStation]] = TypeAdapter(Sequence[AimbatStation])

    return adapter.dump_python(results, mode="json")

reset_seismogram_parameters

reset_seismogram_parameters(
    session: Session, seismogram_id: UUID
) -> None

Reset an AimbatSeismogram's parameters to their default values.

All fields defined on AimbatSeismogramParametersBase are reset to the values produced by a fresh default instance, so newly added fields are picked up automatically.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

ID of seismogram to reset parameters for.

required
Source code in src/aimbat/core/_seismogram.py
def reset_seismogram_parameters(session: Session, seismogram_id: UUID) -> None:
    """Reset an AimbatSeismogram's parameters to their default values.

    All fields defined on AimbatSeismogramParametersBase are reset to the
    values produced by a fresh default instance, so newly added fields are
    picked up automatically.

    Args:
        session: Database session.
        seismogram_id: ID of seismogram to reset parameters for.
    """

    logger.info(f"Resetting parameters for seismogram {seismogram_id}.")

    seismogram = session.get(AimbatSeismogram, seismogram_id)
    if seismogram is None:
        raise NoResultFound(f"No AimbatSeismogram found with {seismogram_id=}")

    from ._iccs import clear_mccc_quality
    from ._snapshot import compute_parameters_hash, sync_from_matching_hash

    defaults = AimbatSeismogramParametersBase()
    for field_name in AimbatSeismogramParametersBase.model_fields:
        setattr(seismogram.parameters, field_name, getattr(defaults, field_name))
    session.add(seismogram)
    parameters_hash = compute_parameters_hash(seismogram.event)
    if not sync_from_matching_hash(session, parameters_hash):
        clear_mccc_quality(session, seismogram.event)

resolve_event

resolve_event(
    session: Session, event_id: UUID | None = None
) -> AimbatEvent

Resolve an event from either an explicit ID or the default event.

Parameters:

Name Type Description Default
session Session

SQL session.

required
event_id UUID | None

Optional event ID.

None

Returns:

Type Description
AimbatEvent

The specified event or the default event.

Raises:

Type Description
NoResultFound

If an explicit event_id is given but not found.

NoResultFound

If no event_id is given and no default event is set.

Source code in src/aimbat/core/_default_event.py
def resolve_event(session: Session, event_id: UUID | None = None) -> AimbatEvent:
    """
    Resolve an event from either an explicit ID or the default event.

    Args:
        session: SQL session.
        event_id: Optional event ID.

    Returns:
        The specified event or the default event.

    Raises:
        NoResultFound: If an explicit event_id is given but not found.
        NoResultFound: If no event_id is given and no default event is set.
    """
    if event_id:
        logger.debug(f"Resolving event by explicit ID: {event_id}")
        event = session.get(AimbatEvent, event_id)
        if event is None:
            raise NoResultFound(f"No AimbatEvent found with id: {event_id}.")
        return event
    logger.debug("Falling back to default event for resolution.")
    event = get_default_event(session)
    if event is None:
        raise NoResultFound("No event selected.")
    return event

rollback_to_snapshot

rollback_to_snapshot(
    session: Session, snapshot_id: UUID
) -> None

Rollback to an AIMBAT parameters snapshot.

Parameters:

Name Type Description Default
snapshot_id UUID

Snapshot id.

required
Source code in src/aimbat/core/_snapshot.py
def rollback_to_snapshot(session: Session, snapshot_id: UUID) -> None:
    """Rollback to an AIMBAT parameters snapshot.

    Args:
        snapshot_id: Snapshot id.
    """

    logger.info(f"Rolling back to snapshot with id={snapshot_id}.")

    snapshot = session.get(AimbatSnapshot, snapshot_id)
    if snapshot is None:
        raise ValueError(f"No AimbatSnapshot found with {snapshot_id=}")

    # create object with just the parameters
    rollback_event_parameters = AimbatEventParametersBase.model_validate(
        snapshot.event_parameters_snapshot
    )
    logger.debug(
        f"Using event parameters snapshot with id={snapshot.event_parameters_snapshot.id} for rollback."
    )
    current_event_parameters = snapshot.event.parameters

    # setting attributes explicitly brings them into the session
    for k in AimbatEventParametersBase.model_fields.keys():
        v = getattr(rollback_event_parameters, k)
        logger.debug(f"Setting event parameter {k} to {v!r} for rollback.")
        setattr(current_event_parameters, k, v)

    session.add(current_event_parameters)

    for seismogram_parameters_snapshot in snapshot.seismogram_parameters_snapshots:
        rollback_seismogram_parameters = AimbatSeismogramParametersBase.model_validate(
            seismogram_parameters_snapshot
        )
        logger.debug(
            f"Using seismogram parameters snapshot with id={seismogram_parameters_snapshot.id} for rollback."
        )
        current_seismogram_parameters = seismogram_parameters_snapshot.parameters
        for k in AimbatSeismogramParametersBase.model_fields.keys():
            v = getattr(rollback_seismogram_parameters, k)
            logger.debug(f"Setting seismogram parameter {k} to {v!r} for rollback.")
            setattr(current_seismogram_parameters, k, v)
        session.add(current_seismogram_parameters)

    session.commit()
    sync_from_matching_hash(session, snapshot_id=snapshot_id)

run_iccs

run_iccs(
    session: Session,
    event: AimbatEvent,
    iccs: ICCS,
    autoflip: bool,
    autoselect: bool,
) -> IccsResult

Run the Iterative Cross-Correlation and Stack (ICCS) algorithm.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
iccs ICCS

ICCS instance.

required
autoflip bool

If True, automatically flip seismograms to maximise cross-correlation.

required
autoselect bool

If True, automatically deselect seismograms whose cross-correlation falls below the threshold.

required

Returns:

Type Description
IccsResult

IccsResult from the algorithm run.

Source code in src/aimbat/core/_iccs.py
def run_iccs(
    session: Session, event: AimbatEvent, iccs: ICCS, autoflip: bool, autoselect: bool
) -> IccsResult:
    """Run the Iterative Cross-Correlation and Stack (ICCS) algorithm.

    Args:
        session: Database session.
        event: AimbatEvent.
        iccs: ICCS instance.
        autoflip: If True, automatically flip seismograms to maximise cross-correlation.
        autoselect: If True, automatically deselect seismograms whose cross-correlation
            falls below the threshold.

    Returns:
        IccsResult from the algorithm run.
    """

    logger.info(f"Running ICCS with {autoflip=}, {autoselect=}.")

    result = iccs(autoflip=autoflip, autoselect=autoselect)
    n_iter = len(result.convergence)
    status = "converged" if result.converged else "did not converge"
    logger.info(f"ICCS {status} after {n_iter} iterations.")
    _write_back_seismograms(session, iccs)
    _write_iccs_stats(event.id, iccs)
    return result

run_mccc

run_mccc(
    session: Session,
    event: AimbatEvent,
    iccs: ICCS,
    all_seismograms: bool,
) -> McccResult

Run the Multi-Channel Cross-Correlation (MCCC) algorithm.

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
iccs ICCS

ICCS instance.

required
all_seismograms bool

If True, include deselected seismograms in the alignment.

required

Returns:

Type Description
McccResult

McccResult from the algorithm run.

Source code in src/aimbat/core/_iccs.py
def run_mccc(
    session: Session, event: AimbatEvent, iccs: ICCS, all_seismograms: bool
) -> McccResult:
    """Run the Multi-Channel Cross-Correlation (MCCC) algorithm.

    Args:
        session: Database session.
        event: AimbatEvent.
        iccs: ICCS instance.
        all_seismograms: If True, include deselected seismograms in the alignment.

    Returns:
        McccResult from the algorithm run.
    """

    logger.info(f"Running MCCC for event {event.id} with {all_seismograms=}.")

    result = iccs.run_mccc(
        all_seismograms=all_seismograms,
        min_cc=event.parameters.mccc_min_cc,
        damping=event.parameters.mccc_damp,
    )
    _write_back_seismograms(session, iccs)
    _write_iccs_stats(event.id, iccs)
    _write_mccc_quality(event.id, iccs, result, all_seismograms)
    return result

seismogram_quality_groups

seismogram_quality_groups(
    session: Session, seismogram_id: UUID
) -> list[FieldGroup]

Return quality view data for a single seismogram.

Parameters:

Name Type Description Default
session Session

Database session.

required
seismogram_id UUID

Seismogram UUID.

required

Returns:

Type Description
list[FieldGroup]

A single-element list containing one FieldGroup with per-seismogram

list[FieldGroup]

quality fields, or an empty group with a message if no quality data

list[FieldGroup]

exists yet.

Source code in src/aimbat/core/_quality.py
def seismogram_quality_groups(
    session: Session, seismogram_id: uuid.UUID
) -> list[FieldGroup]:
    """Return quality view data for a single seismogram.

    Args:
        session: Database session.
        seismogram_id: Seismogram UUID.

    Returns:
        A single-element list containing one `FieldGroup` with per-seismogram
        quality fields, or an empty group with a message if no quality data
        exists yet.
    """
    quality = get_quality_seismogram(session, seismogram_id)
    if quality is None:
        return [
            FieldGroup(
                title="",
                empty_message="No quality data — run MCCC first",
            )
        ]
    return [
        FieldGroup(
            title="",
            fields=_specs_from_model(quality, AimbatSeismogramQualityBase),
        )
    ]

set_default_event

set_default_event(session: Session, event_id: UUID) -> None

Set the default event (i.e. the one being processed).

Parameters:

Name Type Description Default
session Session

SQL session.

required
event_id UUID

UUID of AIMBAT Event to set as default one.

required
Source code in src/aimbat/core/_default_event.py
def set_default_event(session: Session, event_id: UUID) -> None:
    """
    Set the default event (i.e. the one being processed).

    Args:
        session: SQL session.
        event_id: UUID of AIMBAT Event to set as default one.
    """

    logger.debug(f"Setting default {event_id=}")

    new_default_event = session.get(AimbatEvent, event_id)
    if new_default_event is None:
        raise ValueError(f"No AimbatEvent found with id: {event_id}.")

    current_default_event = get_default_event(session)

    # unset the current default first
    if current_default_event is not None:
        if new_default_event.id == current_default_event.id:
            return
        current_default_event.is_default = None
        session.add(current_default_event)
        session.flush()

    # set new default
    new_default_event.is_default = True
    session.add(new_default_event)
    session.commit()

set_event_parameter

set_event_parameter(
    session: Session,
    event_id: UUID,
    name: EventParameter,
    value: Timedelta | bool | float | str,
    *,
    validate_iccs: bool = False,
) -> None

Set event parameter value for the given event.

Parameters:

Name Type Description Default
session Session

Database session.

required
event_id UUID

UUID of the event to set the parameter value for.

required
name EventParameter

Name of the parameter.

required
value Timedelta | bool | float | str

Value to set.

required
validate_iccs bool

If True, attempt ICCS construction with the new value before committing. Raises and leaves the database unchanged on failure.

False
Source code in src/aimbat/core/_event.py
def set_event_parameter(
    session: Session,
    event_id: UUID,
    name: EventParameter,
    value: Timedelta | bool | float | str,
    *,
    validate_iccs: bool = False,
) -> None:
    """Set event parameter value for the given event.

    Args:
        session: Database session.
        event_id: UUID of the event to set the parameter value for.
        name: Name of the parameter.
        value: Value to set.
        validate_iccs: If True, attempt ICCS construction with the new value
            before committing. Raises and leaves the database unchanged on failure.
    """
    from ._iccs import clear_mccc_quality
    from ._snapshot import compute_parameters_hash, sync_from_matching_hash

    logger.debug(f"Setting {name=} to {value} for event {event_id=}.")

    event = session.get(AimbatEvent, event_id)
    if event is None:
        raise NoResultFound(f"No AimbatEvent found with id: {event_id}.")

    # Perform Pydantic validation (including optional ICCS validation)
    parameters = AimbatEventParametersBase.model_validate(
        event.parameters,
        update={name: value},
        context={"validate_iccs": validate_iccs, "event": event},
    )

    setattr(event.parameters, name, getattr(parameters, name))
    session.add(event)
    parameters_hash = compute_parameters_hash(event)
    if not sync_from_matching_hash(session, parameters_hash):
        clear_mccc_quality(session, event)

set_seismogram_parameter

set_seismogram_parameter(
    session: Session,
    seismogram_id: UUID,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None

Set parameter value for an AimbatSeismogram instance.

Parameters:

Name Type Description Default
session Session

Database session

required
seismogram_id UUID

Seismogram id.

required
name SeismogramParameter

Name of the parameter.

required
value Timestamp | bool | str

Value to set parameter to.

required
Source code in src/aimbat/core/_seismogram.py
def set_seismogram_parameter(
    session: Session,
    seismogram_id: UUID,
    name: SeismogramParameter,
    value: Timestamp | bool | str,
) -> None:
    """Set parameter value for an AimbatSeismogram instance.

    Args:
        session: Database session
        seismogram_id: Seismogram id.
        name: Name of the parameter.
        value: Value to set parameter to.

    """
    from ._iccs import clear_mccc_quality
    from ._snapshot import compute_parameters_hash, sync_from_matching_hash

    logger.debug(
        f"Setting seismogram {name=} to {value=} in seismogram {seismogram_id=}."
    )

    seismogram = session.get(AimbatSeismogram, seismogram_id)
    if seismogram is None:
        raise ValueError(f"No AimbatSeismogram found with {seismogram_id=}")

    parameters = AimbatSeismogramParametersBase.model_validate(
        seismogram.parameters, update={name: value}
    )
    setattr(seismogram.parameters, name, getattr(parameters, name))
    session.add(seismogram)
    parameters_hash = compute_parameters_hash(seismogram.event)
    if not sync_from_matching_hash(session, parameters_hash):
        clear_mccc_quality(session, seismogram.event)

snapshot_quality_groups

snapshot_quality_groups(
    session: Session, snapshot_id: UUID
) -> list[FieldGroup]

Return MCCC quality view data for a snapshot.

The number of groups depends on whether MCCC was run on all seismograms or only the selected ones, which is inferred from whether any non-selected seismogram has MCCC data in the snapshot.

Parameters:

Name Type Description Default
session Session

Database session.

required
snapshot_id UUID

Snapshot UUID.

required

Returns:

Type Description
list[FieldGroup]

Two FieldGroup instances: event-level MCCC statistics and

list[FieldGroup]

per-seismogram averages (scoped to selected or all seismograms

list[FieldGroup]

depending on how MCCC was run). Returns a single empty group

list[FieldGroup]

when no quality was captured.

Raises:

Type Description
ValueError

If no snapshot with the given ID is found.

Source code in src/aimbat/core/_quality.py
def snapshot_quality_groups(
    session: Session, snapshot_id: uuid.UUID
) -> list[FieldGroup]:
    """Return MCCC quality view data for a snapshot.

    The number of groups depends on whether MCCC was run on all seismograms or
    only the selected ones, which is inferred from whether any non-selected
    seismogram has MCCC data in the snapshot.

    Args:
        session: Database session.
        snapshot_id: Snapshot UUID.

    Returns:
        Two `FieldGroup` instances: event-level MCCC statistics and
        per-seismogram averages (scoped to selected or all seismograms
        depending on how MCCC was run). Returns a single empty group
        when no quality was captured.

    Raises:
        ValueError: If no snapshot with the given ID is found.
    """
    snapshot = session.get(AimbatSnapshot, snapshot_id)
    if snapshot is None:
        raise ValueError(f"Snapshot {snapshot_id} not found.")

    if snapshot.event_quality_snapshot is None:
        return [
            FieldGroup(
                title="",
                empty_message="No quality data — run MCCC then create a snapshot",
            )
        ]

    eq = snapshot.event_quality_snapshot

    event_specs = _specs_from_model(eq, AimbatEventQualityBase)

    all_sq = [
        sq
        for sq in snapshot.seismogram_quality_snapshots
        if sq.mccc_cc_mean is not None
    ]
    stats = _stats_from_quality_snapshots(all_sq)

    return [
        FieldGroup(title="Event statistics", fields=event_specs),
        FieldGroup(
            title="Averages across seismograms",
            fields=_specs_from_model(stats, SeismogramQualityStats),
        ),
    ]

station_quality_groups

station_quality_groups(
    session: Session, station_id: UUID
) -> list[FieldGroup]

Return quality view data for a station.

Parameters:

Name Type Description Default
session Session

Database session.

required
station_id UUID

Station UUID.

required

Returns:

Type Description
list[FieldGroup]

Two FieldGroup instances: averages across selected seismograms

list[FieldGroup]

and averages across all seismograms.

Source code in src/aimbat/core/_quality.py
def station_quality_groups(session: Session, station_id: uuid.UUID) -> list[FieldGroup]:
    """Return quality view data for a station.

    Args:
        session: Database session.
        station_id: Station UUID.

    Returns:
        Two `FieldGroup` instances: averages across selected seismograms
        and averages across all seismograms.
    """
    all_stats, selected_stats = get_quality_station(session, station_id)
    return [
        FieldGroup(
            title=f"Averages across {selected_stats.count} selected seismograms",
            fields=_specs_from_model(selected_stats, SeismogramQualityStats),
        ),
        FieldGroup(
            title=f"Averages across {all_stats.count} seismograms",
            fields=_specs_from_model(all_stats, SeismogramQualityStats),
        ),
    ]

sync_from_matching_hash

sync_from_matching_hash(
    session: Session,
    parameters_hash: str | None = None,
    snapshot_id: UUID | None = None,
) -> bool

Sync live quality metrics from a snapshot whose parameter hash matches the given hash.

Searches all snapshots for candidates whose parameters_hash matches and that have MCCC quality data. When multiple candidates exist, snapshot_id is used as a tie-breaker (preferred if it is among them); otherwise the most recent candidate is used.

Parameters:

Name Type Description Default
session Session

Database session.

required
parameters_hash str | None

Hash to match against snapshot hashes. If None and snapshot_id is provided, the hash is derived from that snapshot.

None
snapshot_id UUID | None

Optional tie-breaker when multiple candidates share the same hash.

None

Returns:

Type Description
bool

True if quality metrics were synced, False if no suitable candidate

bool

was found.

Raises:

Type Description
ValueError

If both are provided but the hashes differ.

Source code in src/aimbat/core/_snapshot.py
def sync_from_matching_hash(
    session: Session,
    parameters_hash: str | None = None,
    snapshot_id: UUID | None = None,
) -> bool:
    """Sync live quality metrics from a snapshot whose parameter hash matches the given hash.

    Searches all snapshots for candidates whose `parameters_hash` matches and
    that have MCCC quality data. When multiple candidates exist, `snapshot_id`
    is used as a tie-breaker (preferred if it is among them); otherwise the
    most recent candidate is used.

    Args:
        session: Database session.
        parameters_hash: Hash to match against snapshot hashes. If None and
            `snapshot_id` is provided, the hash is derived from that snapshot.
        snapshot_id: Optional tie-breaker when multiple candidates share the
            same hash.

    Returns:
        True if quality metrics were synced, False if no suitable candidate
        was found.

    Raises:
        ValueError: If both are provided but the hashes differ.
    """
    if parameters_hash is None:
        if snapshot_id is None:
            return False
        snapshot = session.get(AimbatSnapshot, snapshot_id)
        if snapshot is None:
            raise ValueError(f"No AimbatSnapshot found with {snapshot_id=}")
        parameters_hash = snapshot.parameters_hash
        if parameters_hash is None:
            return False
    elif snapshot_id is not None:
        snapshot = session.get(AimbatSnapshot, snapshot_id)
        if snapshot is not None and snapshot.parameters_hash != parameters_hash:
            raise ValueError(
                f"Provided parameters_hash does not match hash on snapshot {snapshot_id}."
            )

    logger.debug(f"Looking for quality metrics to sync for hash {parameters_hash}.")

    candidates = [
        s
        for s in get_snapshots(session)
        if s.parameters_hash == parameters_hash
        and s.event_quality_snapshot is not None
        and s.event_quality_snapshot.mccc_rmse is not None
    ]
    if not candidates:
        logger.debug("No snapshot with matching hash and MCCC quality data found.")
        return False

    preferred = next((c for c in candidates if c.id == snapshot_id), None)
    snapshot = (
        preferred if preferred is not None else max(candidates, key=lambda s: s.time)
    )

    logger.info(f"Syncing quality metrics from snapshot {snapshot.id}.")

    event_quality_snap = snapshot.event_quality_snapshot
    assert event_quality_snap is not None
    live_event_quality = session.get(
        AimbatEventQuality, event_quality_snap.event_quality_id
    )
    if live_event_quality is None:
        logger.warning(
            f"Live event quality record {event_quality_snap.event_quality_id} not found; skipping event quality sync."
        )
    else:
        for k in AimbatEventQualityBase.model_fields:
            v = getattr(event_quality_snap, k)
            logger.debug(f"Setting event quality {k} to {v!r} from snapshot.")
            setattr(live_event_quality, k, v)
        session.add(live_event_quality)

    for seis_quality_snap in snapshot.seismogram_quality_snapshots:
        live_seis_quality = session.get(
            AimbatSeismogramQuality, seis_quality_snap.seismogram_quality_id
        )
        if live_seis_quality is None:
            logger.warning(
                f"Live seismogram quality record {seis_quality_snap.seismogram_quality_id} not found; skipping."
            )
            continue
        for k in AimbatSeismogramQualityBase.model_fields:
            v = getattr(seis_quality_snap, k)
            logger.debug(f"Setting seismogram quality {k} to {v!r} from snapshot.")
            setattr(live_seis_quality, k, v)
        session.add(live_seis_quality)

    session.commit()
    return True

sync_iccs_parameters

sync_iccs_parameters(
    session: Session, event: AimbatEvent, iccs: ICCS
) -> None

Sync an existing ICCS instance's parameters from the database.

Updates event-level and per-seismogram parameters without re-reading waveform data. Use this after operations that change parameters but not the seismogram list (e.g. rolling back to a snapshot).

Parameters:

Name Type Description Default
session Session

Database session.

required
event AimbatEvent

AimbatEvent.

required
iccs ICCS

ICCS instance to update in-place.

required
Source code in src/aimbat/core/_iccs.py
def sync_iccs_parameters(session: Session, event: AimbatEvent, iccs: ICCS) -> None:
    """Sync an existing ICCS instance's parameters from the database.

    Updates event-level and per-seismogram parameters without re-reading waveform
    data. Use this after operations that change parameters but not the
    seismogram list (e.g. rolling back to a snapshot).

    Args:
        session: Database session.
        event: AimbatEvent.
        iccs: ICCS instance to update in-place.
    """

    logger.debug(f"Syncing ICCS parameters from database for event {event.id}.")

    event_params = AimbatEventParametersBase.model_validate(event.parameters)
    for field_name in AimbatEventParametersBase.model_fields:
        if hasattr(iccs, field_name):
            setattr(iccs, field_name, getattr(event_params, field_name))

    for iccs_seis in iccs.seismograms:
        db_seis = session.get(AimbatSeismogram, iccs_seis.extra["id"])
        if db_seis is not None:
            seis_params = AimbatSeismogramParametersBase.model_validate(
                db_seis.parameters
            )
            for field_name in AimbatSeismogramParametersBase.model_fields:
                setattr(iccs_seis, field_name, getattr(seis_params, field_name))

    iccs.clear_cache()

validate_iccs_construction

validate_iccs_construction(
    event: AimbatEvent,
    parameters: AimbatEventParametersBase | None = None,
) -> None

Try to construct an ICCS instance for the event without caching the result.

Use this to check whether the event's current (possibly uncommitted) parameters are compatible with ICCS construction before persisting them to the database.

Parameters:

Name Type Description Default
event AimbatEvent

AimbatEvent.

required
parameters AimbatEventParametersBase | None

Optional AimbatEventParametersBase to use instead of the live event parameters (useful for validation).

None
Source code in src/aimbat/core/_iccs.py
def validate_iccs_construction(
    event: AimbatEvent, parameters: AimbatEventParametersBase | None = None
) -> None:
    """Try to construct an ICCS instance for the event without caching the result.

    Use this to check whether the event's current (possibly uncommitted) parameters
    are compatible with ICCS construction before persisting them to the database.

    Args:
        event: AimbatEvent.
        parameters: Optional AimbatEventParametersBase to use instead of the live
            event parameters (useful for validation).

    Raises:
        Any exception raised by ICCS construction (e.g. invalid parameter values).
    """
    _build_iccs(event, parameters=parameters)