#!/usr/bin/env python
# relMT - Program to compute relative earthquake moment tensors
# Copyright (C) 2024 Wasja Bloch, Doriane Drolet, Michael Bostock
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Entry points for relMT command line interface."""
from relmt import io, utils, align, core, signal, amp, ls, mt, qc, angle, plot
from scipy import sparse
from pathlib import Path
import yaml
import numpy as np
import sys
import multiprocessing as mp
from multiprocessing import shared_memory as sm
from threadpoolctl import threadpool_limits
from argparse import ArgumentParser, Namespace
from collections import defaultdict
logger = core.register_logger(__name__)
[docs]
def align_entry(
config: core.Config,
directory: Path = Path(),
iteration: int = 0,
do_mccc: bool = True,
do_pca: bool = True,
overwrite: bool = False,
) -> None:
"""
Align waveform files and write results into next alignment directory.
This function is called when executing 'relmt align' from the command line.
Parameters
----------
config:
Configuration object with alignment parameters. Content of the file read
by the '--config' option.
directory:
Root directory of the project, containing the 'data/' and 'align?/'
subfolders. Path of the file referenced by the '--config' option.
iteration:
Current alignment iteration number. `0`: read from 'data/', `>0` read from
alignment iteration folder. Number parsed to the '--alignment' option.
do_mccc:
Align by multi-channel cross-correlation. Activated by '--mccc'
do_pca:
Align by principal component analysis. Activated by '--pca'
overwrite:
Overwrite existing aligned waveform files. Activated by '--overwrite'
"""
excl = io.read_exclude_file(core.file("exclude", directory=directory))
wvids = core.iterate_waveid(directory, iteration, excl)
ncpu = config["ncpu"]
lag_times = config["lag_times"]
# Initialize event dict, only load it when we combine nearest neighbors
evd = {}
args = []
for wvid in wvids:
logger.debug(f"Getting arguments for: {wvid}")
station, phase = core.split_waveid(wvid)
source = (station, phase, iteration)
dest = (station, phase, iteration + 1)
# Check the destination
if not overwrite: # Try to read the destination files
try:
_ = io.read_waveform_array_header(*dest)
except FileNotFoundError:
pass # Continue when they are absent
else:
continue # Do not process when they exist
# Read the soure
try:
arr, hdr = io.read_waveform_array_header(*source)
except FileNotFoundError:
logger.debug("Trying Matlab file.")
try:
arr, hdr = io.read_waveform_array_header(*source, matlab=True)
except OSError:
logger.debug("Expected an absent file. Continuing.")
continue
# QC: Events included for this wavetrain
iin, _ = qc.included_events(
excl, return_bool=True, **hdr.kwargs(qc.included_events)
)
if not any(iin):
logger.warning(f"No events included for {wvid}. Continuing.")
continue
# Read optional pair list and check for singolton events
if hdr["combinations_from_file"] and hdr["combine_neighbors"]:
logger.warning(
"'combinations_from_file' given. Ignoring 'combine_neighbors'"
)
if hdr["combinations_from_file"]:
pairs = io.read_combinations(core.file("combination", *source))
iin &= np.isin(hdr["events_"], np.unique(list(pairs)))
elif hdr["combine_neighbors"]:
if not evd:
# Load the event dict, if not done yet
evd = io.read_event_table(config["event_file"])
evns = np.array(hdr["events_"])[iin] # Only parse included events
pairs = utils.nearest_neighbors(hdr["combine_neighbors"], evns, evd)
iin &= np.isin(hdr["events_"], np.unique(list(pairs)))
# Enough events left?
if (phase.startswith("P") and sum(iin) < 2) or (
phase.startswith("S") and sum(iin) < 3
):
logger.warning(f"Not enough events for {wvid}")
continue
# Only parse valid events
hdr["events_"] = list(np.array(hdr["events_"])[iin])
# Convert pair to triplet combinations and return indices into arr
combinations = np.array([])
if hdr["combinations_from_file"] or hdr["combine_neighbors"]:
logger.debug("Finding valid combinations")
combinations = utils.valid_combinations(hdr["events_"], pairs, hdr["phase"])
logger.info(f"Found {combinations.shape[0]} combinations for {wvid}")
arr = arr[iin, :, :]
args.append((arr, hdr, dest, do_mccc, do_pca, combinations, lag_times))
if ncpu > 1:
with mp.Pool(ncpu) as pool:
pool.starmap(align.run_limited, args)
else:
for arg in args:
align.run(*arg)
[docs]
def exclude_entry(
config: core.Config,
iteration: int = 0,
overwrite: bool = False,
directory: Path = Path(),
do_nodata: bool = False,
do_snr: bool = False,
do_cc: bool = False,
do_ecn: bool = False,
cc_from_file: bool = False,
) -> None:
"""Exclude phase observations based on waveform quality criteria.
Thresholds are read from the resective waveform header files and can be
different for each phase. Default values from a `default-hdr.yaml` are
respected. `None` thresholds are ignored.
This function is called when executing 'relmt exclude' from the command line.
Parameters
----------
config:
Configuration object with station file. Content of the file read
by the '--config' option.
iteration:
Current alignment iteration number. `0`: read from 'data/', `>0` read from
alignment iteration folder. Number parsed to the '--alignment' option.
overwrite:
Overwrite existing entry exclude file. Activated by '--overwrite'. If
False, append to existing lists instead.
directory:
Root directory of the project, containing the 'data/' and 'align?/'
subfolders. Path of the file referenced by the '--config' option.
do_nodata:
Exclude traces with `NaN` data, null data or data with all values below
absolute 'null_threshold'. Activated by '--nodata'
do_snr:
Exclude traces with signal-to-noise ratio below 'min_signal_noise_ratio'.
Activated by '--snr'
do_cc:
Exclude traces with average cross-correlation below 'min_correlation'.
Activated by '--cc'
do_ecn:
Exclude traces with expansion coefficient norm below
'min_expansion_coefficient_norm'. Activated by '--ecn'
cc_from_file:
Read cross-correlation values from alignment produre. Changes in
time window and frequency band are then not considerd in the exclusion
based on cc.
"""
exf = core.file("exclude", directory=directory)
try:
logger.info(f"Reading excludes from: {exf}")
excl = io.read_exclude_file(exf)
except OSError:
logger.info(f"No exlusion file found. Creating: {exf}")
excl = core.exclude
io.save_yaml(exf, excl)
# Collect new excludes in this dictionary
excludes = {"no_data": [], "snr": [], "cc": [], "ecn": []}
for wvid in core.iterate_waveid(directory, iteration):
sta, pha = core.split_waveid(wvid)
try:
arr, hdr = io.read_waveform_array_header(
sta,
pha,
iteration,
)
except FileNotFoundError:
continue
events = np.array(hdr["events_"])
# Boolean indices of events with no data
ind = qc.index_nonzero_events(
arr, null_threshold=hdr["null_threshold"], return_bool=True, return_not=True
)
logger.debug(f"{wvid}: {sum(ind)} traces with no data")
if all(ind):
excl["waveform"] += [wvid]
continue
isnr = np.full_like(ind, False)
if (minsnr := hdr["min_signal_noise_ratio"]) is not None and do_snr:
snr = signal.signal_noise_ratio(
arr, **hdr.kwargs(signal.signal_noise_ratio)
)
isnr = snr < minsnr
logger.debug(f"{wvid}: {sum(isnr)} traces with SNR < {minsnr}")
icc = np.full_like(ind, False)
if (mincc := hdr["min_correlation"]) is not None and do_cc:
if cc_from_file:
# Read cc from file
ccij = np.loadtxt(
core.file("cc_matrix", sta, pha, iteration, directory)
)
cc = utils.fisher_average(ccij)
else:
# Recompute cc based on header values
mat = utils.concat_components(
signal.demean_filter_window(
arr, **hdr.kwargs(signal.demean_filter_window)
)
)
_, _, cc, _ = signal.correlation_averages(mat, hdr["phase"], False)
icc = cc < mincc
logger.debug(f"{wvid}: {sum(icc)} traces with CC < {mincc}")
iecn = np.full_like(ind, False)
if (minecn := hdr["min_expansion_coefficient_norm"]) is not None and do_ecn:
arr = signal.demean_filter_window(
arr, **hdr.kwargs(signal.demean_filter_window)
)
ec_score = qc.expansion_coefficient_norm(arr, pha)
iecn = ec_score < minecn
logger.debug(f"{wvid}: {sum(iecn)} traces with ECN < {minecn}")
# Write the full phase ID to the exclude lists
excludes["no_data"] += [core.join_phaseid(iev, sta, pha) for iev in events[ind]]
excludes["snr"] += [core.join_phaseid(iev, sta, pha) for iev in events[isnr]]
excludes["cc"] += [core.join_phaseid(iev, sta, pha) for iev in events[icc]]
excludes["ecn"] += [core.join_phaseid(iev, sta, pha) for iev in events[iecn]]
# Add the exludes already present, in case we don't want to overwrite
if not overwrite:
excludes["no_data"] += excl["phase_auto_nodata"]
excludes["snr"] += excl["phase_auto_snr"]
excludes["cc"] += excl["phase_auto_cc"]
excludes["ecn"] += excl["phase_auto_ecn"]
# Write everything into the exclude dict
if do_nodata:
logger.info(f"Excluding {len(excludes['no_data'])} invalid traces")
excl["phase_auto_nodata"] = excludes["no_data"]
if do_snr:
logger.info(f"Excluding {len(excludes['snr'])} traces due to high SNR")
excl["phase_auto_snr"] = excludes["snr"]
if do_cc:
logger.info(f"Excluding {len(excludes['cc'])} traces due to low CC")
excl["phase_auto_cc"] = excludes["cc"]
if do_ecn:
logger.info(f"Excluding {len(excludes['ecn'])} traces due to low ECN")
excl["phase_auto_ecn"] = excludes["ecn"]
# Save it to file
io.save_yaml(exf, excl)
[docs]
def amplitude_entry(
config: core.Config,
directory: Path = Path(),
iteration: int = 0,
overwrite: bool = False,
) -> None:
"""Compute relative amplitude measurements and save to file.
This function is called when executing 'relmt amplitude' from the command line.
Parameters
----------
config:
Configuration object with amplitude measurement parameters. Content of
the file read by the '--config' option.
directory:
Root directory of the project, containing the 'align?/' and 'amplitude/'
subfolders. Path of the file referenced by the '--config' option.
iteration:
Read waveforms from this alignment iteration. `0`: read from 'data/',
`>0` read from 'align?'. Number parsed to the '--alignment' option.
overwrite:
Overwrite existing amplitude measurement and passband files. Activated
by '--overwrite'
"""
ampdir = directory / "amplitude"
if not ampdir.exists():
logger.info(f"Target directory does not exist: {ampdir}. Creating.")
ampdir.mkdir()
ncpu = config["ncpu"]
try:
evf = directory / config["event_file"]
stf = directory / config["station_file"]
except TypeError:
raise ValueError(
"No event file specified in config. Cannot compute amplitudes."
)
compare_method = config["amplitude_measure"] # direct or indirect
if compare_method not in ["direct", "indirect"]:
raise ValueError(f"Unknown 'amplitude_measure': {compare_method}. Exiting.")
filter_method = config["amplitude_filter"] # auto, manual, or fixed
if filter_method not in ["auto", "manual", "fixed"]:
raise ValueError(f"Unknown 'amplitude_filter': {filter_method}. Exiting.")
exclude = io.read_exclude_file(core.file("exclude", directory=directory))
# Exclude some observations
wvids = core.iterate_waveid(directory, iteration, exclude)
# Read the events
event_dict = io.read_event_table(evf)
station_dict = io.read_station_table(stf)
pasbnds = {}
if filter_method == "manual":
logger.info("Applying filters set in header files.")
# Write the manually set filter corners into the dictionary
for wvid in wvids:
logger.debug(f"Working on: {wvid}")
sta, pha = core.split_waveid(wvid)
try:
hdr = io.read_header(
core.file("waveform_header", sta, pha, iteration, directory),
default_name=core.file(
"waveform_header", n_align=iteration, directory=directory
),
)
except FileNotFoundError as e:
logger.debug(e)
continue
if hdr["highpass"] is None or hdr["lowpass"] is None:
logger.warning(
f"Filter corner not set in header for {wvid}. Setting to nan."
)
pasbnds[wvid] = {
evn: [hdr["highpass"] or np.nan, hdr["lowpass"] or np.nan]
for evn in hdr["events_"]
}
elif filter_method == "auto":
logger.info("Finding filter automatically")
# Read or compute bassbands
bpf = core.file(
"bandpass", directory=directory, suffix=config["amplitude_suffix"]
)
# Read the bandpass if it exists
if bpf.exists() and not overwrite:
logger.info(f"Reading bandpass from file: {bpf}")
with open(bpf, "r") as fid:
pasbnds = yaml.safe_load(fid)
# Compute it if not
else:
logger.info("No bandpass file found or overwrite option set. Computing.")
for wvid in wvids:
logger.debug(f"Working on: {wvid}")
sta, pha = core.split_waveid(wvid)
try:
arr, hdr = io.read_waveform_array_header(
sta, pha, iteration, directory
)
except FileNotFoundError as e:
logger.debug(e)
continue
pasbnds[wvid] = signal.phase_passbands(
arr,
hdr,
event_dict,
**config.kwargs(signal.phase_passbands),
exclude=exclude,
)
io.save_yaml(bpf, pasbnds, format_bandpass=True)
logger.info(f"Saved bandpass to file: {bpf}")
else:
raise ValueError(f"Unknown 'amplitude_filter': {filter_method}")
if config["lowpass_event_phase_quantile"] is not None:
pasbnds = utils.common_event_phase_lowpass(
pasbnds, config["lowpass_event_phase_quantile"]
)
# Collect the arguments to the amplitude function
pargs = []
sargs = []
shmd = {}
logger.info("Collecting observations. This may take a while...")
if compare_method == "direct":
for wvid in wvids:
sta, pha = core.split_waveid(wvid)
try:
arr, hdr = io.read_waveform_array_header(sta, pha, iteration, directory)
except FileNotFoundError as e:
logger.debug(e)
continue
# Make sure what's excluded is excluded
ievs, evns = qc.included_events(exclude, **hdr.kwargs(qc.included_events))
if len(ievs) < 1:
logger.warning(f"No events included for {wvid}. Continuing.")
continue
xarr = arr[ievs, :, :]
hdr["events_"] = evns
if hdr["combinations_from_file"] or hdr["combine_neighbors"]:
if hdr["combinations_from_file"]:
pairs = io.read_combinations(
core.file("combination", sta, pha, iteration, directory)
)
logger.info(f"Using {len(pairs)} combinations from file for {wvid}")
else:
nn = hdr["combine_neighbors"]
pairs = utils.nearest_neighbors(nn, evns, event_dict)
logger.info(
f"Using {len(pairs)} combinations from {nn} nearest neighbors for {wvid}"
)
combs = utils.valid_combinations(evns, pairs, pha)
elif pha.startswith("P"):
combs = core.iterate_event_pair(len(evns))
else:
combs = core.iterate_event_triplet(len(evns))
# Make a shared array (Inspired by CatGPT 4o)
dtype = xarr.dtype
shape = xarr.shape
shmd[wvid] = sm.SharedMemory(create=True, size=xarr.nbytes)
shm = shmd[wvid]
sharr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
sharr[:] = xarr[:] # Copy data into shared memory
mname = shm.name
# Look up correct passband and collect the arguments
if pha.startswith("P"):
for a, b in combs:
hpas, lpas = signal.choose_passband(
[pasbnds[wvid][evns[i]][0] for i in [a, b]],
[pasbnds[wvid][evns[i]][1] for i in [a, b]],
config["min_dynamic_range"],
)
# Passband is within dynamic range
if hpas is not None:
pargs.append(
(
(a, b),
mname,
shape,
dtype,
hdr,
hpas,
lpas,
evns[a],
evns[b],
)
)
if pha.startswith("S"):
for a, b, c in combs:
hpas, lpas = signal.choose_passband(
[pasbnds[wvid][evns[i]][0] for i in [a, b, c]],
[pasbnds[wvid][evns[i]][1] for i in [a, b, c]],
config["min_dynamic_range"],
)
# Passband is within dynamic range
if hpas is not None:
sargs.append(
(
(a, b, c),
mname,
shape,
dtype,
hdr,
hpas,
lpas,
evns[a],
evns[b],
evns[c],
)
)
# Argumets above pertain to these functions
p_amp_fun = amp.paired_p_amplitudes
s_amp_fun = amp.triplet_s_amplitudes
elif compare_method == "indirect":
for wvid in wvids:
sta, pha = core.split_waveid(wvid)
try:
arr, hdr = io.read_waveform_array_header(sta, pha, iteration, directory)
except FileNotFoundError as e:
logger.debug(e)
continue
# Exclude the excluded events
ievs, evns = qc.included_events(exclude, **hdr.kwargs(qc.included_events))
xarr = arr[ievs, :, :]
hdr["events_"] = evns
# Choose highest highpass and lowest lowpass
# Note all event passbands are equl if filter method is "manual"
hpas, lpas = signal.choose_passband(
[pasbnds[wvid][evn][0] for evn in pasbnds[wvid] if evn in evns],
[pasbnds[wvid][evn][1] for evn in pasbnds[wvid] if evn in evns],
config["min_dynamic_range"],
)
# If we are strict (positive min_dynamic_range), don't process.
if hpas is None:
continue
if pha.startswith("P"):
pargs += [(xarr, hdr, hpas, lpas)]
if pha.startswith("S"):
sargs += [(xarr, hdr, hpas, lpas)]
# Arguments above pertain to these functions
p_amp_fun = amp.principal_p_amplitudes
s_amp_fun = amp.principal_s_amplitudes
else:
raise ValueError(f"Unknown 'amplitude_measure': {compare_method}")
# Number of P and S combinations
npc = len(pargs)
nsc = len(sargs)
if compare_method == "indirect":
# n choose 2
npc = sum([(nev := arg[0].shape[0]) * (nev - 1) // 2 - 1 for arg in pargs])
# n choose 3
nsc = sum(
[(nev := arg[0].shape[0]) * (nev - 1) * (nev - 2) // 6 - 1 for arg in pargs]
)
logger.info(f"Collected {npc} P- and {nsc} S-combinations")
logger.info("Computing relative P-amplitudes...")
# First process and save P ...
if ncpu > 1:
with mp.Pool(ncpu) as pool:
result = pool.starmap(p_amp_fun, pargs)
else:
result = [p_amp_fun(*arg) for arg in pargs]
if compare_method == "indirect":
# Result is a list of list. Let's make it just a list
abA = []
for pamplist in result:
abA.extend(pamplist)
else:
abA = result
abA = amp.sort_amplitudes(abA, event_dict, station_dict)
io.save_amplitudes(
core.file(
"amplitude_observation",
sta,
"P",
directory=directory,
suffix=config["amplitude_suffix"],
),
abA,
)
logger.info("Computing relative S-amplitudes...")
# ... later S
if ncpu > 1:
with mp.Pool(ncpu) as pool:
result = pool.starmap(s_amp_fun, sargs)
else:
result = [s_amp_fun(*arg) for arg in sargs]
if compare_method == "indirect":
# Result is a list of list. Let's make it just a list
abcB = []
for samplist in result:
abcB.extend(samplist)
else:
abcB = result
abcB = amp.sort_amplitudes(abcB, event_dict, station_dict)
io.save_amplitudes(
core.file(
"amplitude_observation",
sta,
"S",
directory=directory,
suffix=config["amplitude_suffix"],
),
abcB,
)
# Let's release the shared memory
logger.debug("Releasing shared memory")
for shm in shmd.values():
shm.close()
shm.unlink()
[docs]
def admit_entry(config: core.Config, directory: Path = Path()) -> None:
"""Admit amplitude measurements into the inversion
This function is called when executing 'relmt admit' from the command line.
Parameters
----------
config:
Configuration object with admission parameters. Content of the file read
by the '--config' option.
directory:
Root directory of the project, containing the 'amplitude/' subfolder.
Path of the file referenced by the '--config' option.
"""
# A-priori meassures
max_mis = config["max_amplitude_misfit"]
max_smis = config["max_s_amplitude_misfit"] or max_mis
max_mag_diff = config["max_magnitude_difference"]
min_shared_path = config["min_shared_path"]
max_s1 = config["max_s_sigma1"]
max_ev_dist = config["max_event_distance"]
min_eq = config["min_equations"]
min_sta = config["min_stations"]
max_gap = config["max_gap"]
two_s = config["two_s_equations"]
max_p_equations = config["max_p_equations"]
max_s_equations = config["max_s_equations"]
eq_batches = config["equation_batches"]
keep_ev = config["keep_events"]
exclude = io.read_yaml(core.file("exclude", directory=directory))
exclude_wvid = exclude["waveform"]
ampsuf = config["amplitude_suffix"]
admsuf = config["admit_suffix"]
outsuf = f"{ampsuf}-{admsuf}"
exclude_phase = set(exclude["phase_manual"]).union(
exclude["phase_auto_nodata"]
+ exclude["phase_auto_snr"]
+ exclude["phase_auto_ecn"]
)
exclude_events = set(exclude["event"])
try:
evd = io.read_event_table(directory / config["event_file"])
except TypeError:
if max_mag_diff is not None or max_ev_dist is not None:
raise ValueError(
"No event file specified in config. Cannot apply magnitude or distance criteria."
)
else:
logger.warning(
"No event file specified. Cannot sort output amplitudes by magnitude or azimuth"
)
evd = {}
try:
std = io.read_station_table(directory / config["station_file"])
except TypeError:
logger.warning(
"No station file specified. Cannot sort output amplitudes by station azimuth"
)
std = {}
try:
phd = io.read_phase_table(directory / config["phase_file"])
except TypeError:
raise ValueError(
"No phase file specified in config. Cannot sanitize amplitude file."
)
for phase_type in "PS":
infile = core.file(
"amplitude_observation",
directory=directory,
phase=phase_type,
suffix=ampsuf,
)
amps = io.read_amplitudes(infile, phase_type)
exclude_stations = set(
core.split_waveid(wvid)[0]
for wvid in exclude_wvid
if core.split_waveid(wvid)[1] == phase_type
).union(exclude["station"])
# Read the files again, this time unpack QCed values in arrays
if phase_type == "P":
sta, pha, eva, evb, amp1, mis, cc, *_ = io.read_amplitudes(
infile, phase_type, unpack=True
)
s1 = np.full_like(mis, -np.inf) # Never exclude
amp2 = np.zeros_like(amp1) # No second amplitude
evc = eva
else:
sta, pha, eva, evb, evc, amp1, amp2, mis, cc, s1, *_ = io.read_amplitudes(
infile, phase_type, unpack=True
)
# Direct exlusions using numpy
# Stations
iout = np.isin(sta, list(exclude_stations))
logger.info(
f"Excluded {(nout := sum(iout))} {phase_type}-observations because stations "
"or waveforms are excluded"
)
# Phases
iout |= [
np.any(
(
core.join_phaseid(a, s, ph) in exclude_phase,
core.join_phaseid(b, s, ph) in exclude_phase,
core.join_phaseid(c, s, ph) in exclude_phase,
a in exclude_events,
b in exclude_events,
c in exclude_events,
)
)
for a, b, c, s, ph in zip(eva, evb, evc, sta, pha)
]
logger.info(
f"Excluded {sum(iout) - nout} more {phase_type}-observations from exclude file"
)
# Valid amplitude
iout |= ~np.isfinite(amp1) | ~np.isfinite(amp2)
logger.info(
f"Excluded {sum(iout) - nout} more {phase_type}-observations due bad amplitude"
)
# Misfit
nout = sum(iout)
if np.isfinite(max_mis) and phase_type == "P":
iout |= ~(mis < max_mis) # Exclude also nans
logger.info(
f"Excluded {sum(iout) - nout} more P-observations due to high misfit"
)
if np.isfinite(max_smis) and phase_type == "S":
iout |= ~(mis < max_smis) # Exclude also nans
logger.info(
f"Excluded {sum(iout) - nout} more S-observations due to high misfit"
)
# Sigma1
nout = sum(iout)
if max_s1 < 1.0 and phase_type == "S":
iout |= ~(s1 < max_s1) # Exclude also nans
logger.info(
f"Excluded {sum(iout) - nout} more {phase_type}-observations due high sigma1"
)
iin = (~iout).nonzero()[0] # Included amplitude indices
# Reduce the list using indices
amps = [amps[n] for n in iin]
# Only include observations for which we have takeoff angles, ...
amps = qc.clean_by_valid_takeoff_angle(amps, phd)
# ... within magnitude difference ...
if max_mag_diff is not None:
amps = qc.clean_by_magnitude_difference(amps, evd, max_mag_diff)
# ... within inter-event distance ...
if max_ev_dist is not None:
amps = qc.clean_by_event_distance(amps, evd, max_ev_dist)
if min_shared_path is not None:
amps = qc.clean_by_shared_path(amps, evd, std, min_shared_path)
# Let's not overwrite, but save for later use
if phase_type == "P":
pamps = amps.copy()
else:
samps = amps # No need to copy as we leave the "PS"-loop either way
# Make sure we have enough equations
pamps, samps = qc.clean_by_equation_station_count_gap(
pamps, samps, phd, min_eq, min_sta, max_gap, two_s
)
# Make sure we don't have too many equations
if max_s_equations is not None or max_p_equations is not None:
max_p_equations = max_p_equations or len(pamps)
max_s_equations = max_s_equations or len(samps) * (int(two_s) + 1)
pamps, samps = qc.reduce_equations(
pamps,
samps,
phd,
max_p_equations,
max_s_equations,
two_s,
keep_ev,
min_eq,
min_sta,
max_gap,
max_mis,
max_smis,
max_s1,
eq_batches,
)
if len(pamps) + len(samps) == 0:
raise RuntimeError("No observations left. Relax your admission criteria.")
# Write to file
for phase_type, amps in zip("PS", [pamps, samps]):
if evd and std:
amps = amp.sort_amplitudes(amps, evd, std)
fac = 1
if phase_type == "S" and two_s:
fac = 2
logger.info(f"Selected {len(amps)*fac} {phase_type} equations")
outfile = core.file(
"amplitude_observation",
directory=directory,
phase=phase_type,
suffix=outsuf,
)
io.save_amplitudes(outfile, amps)
[docs]
def solve_entry(
config: core.Config, directory: Path = Path(), do_predict=False, iteration: int = 0
) -> None:
"""Construct linear system ans solve for moment tensors.
This function is called when executing 'relmt solve' from the command line.
Parameters
----------
config:
Configuration object with solver parameters. Content of the file read
by the '--config' option.
directory:
Root directory of the project, containing the 'amplitude/' and 'result/'
subfolders. Path of the file referenced by the '--config' option.
do_predict:
If `True`, additionaly predict amplitudes of the found solution. This
option is slow.
iteration:
Current alignment iteration number. `0`: read from 'data/', `>0` read from
alignment iteration folder. Number parsed to the '--alignment' option.
Only required when do_predict is `True`
"""
# These values require a conscious choice
try:
evf = directory / config["event_file"]
stf = directory / config["station_file"]
phf = directory / config["phase_file"]
refmtf = directory / config["reference_mt_file"]
except TypeError:
raise ValueError("Event, station, phase, reference MT files are required.")
irefs = config["reference_mts"]
if irefs is None:
raise ValueError(f"Need a reference MT, not: {irefs}")
# These values have defaults
max_mis = config["max_amplitude_misfit"]
max_smis = config["max_s_amplitude_misfit"] or max_mis
min_mis = config["min_amplitude_misfit"]
min_weight = config["min_amplitude_weight"]
constraint = config["mt_constraint"]
ncpu = config["ncpu"]
two_s = config["two_s_equations"]
refmt_weight = config["reference_weight"]
nboot = config["bootstrap_samples"]
sfac = 1 + int(two_s)
ampsuf = config["amplitude_suffix"]
admsuf = config["admit_suffix"]
resuf = config["result_suffix"]
insuf = core.combine_suffixes(ampsuf, admsuf)
outsuf = core.combine_suffixes(insuf, resuf)
synsuf = core.combine_suffixes(outsuf, core.synthetic_amplitude_suffix)
mt_elements = ls.mt_elements(constraint)
evd = io.read_event_table(evf)
phd = io.read_phase_table(phf)
stad = io.read_station_table(stf)
mtd = io.read_mt_table(refmtf, harvard_convention=config["harvard_convention"])
# Read amplitudes from file
p_amplitudes = io.read_amplitudes(
core.file(
"amplitude_observation", phase="P", suffix=insuf, directory=directory
),
"P",
)
s_amplitudes = io.read_amplitudes(
core.file(
"amplitude_observation", phase="S", suffix=insuf, directory=directory
),
"S",
)
# Find the connected events, given moment tensors
links = qc.connected_events(irefs, p_amplitudes, s_amplitudes)
incl_ev = list(links) # Only event list, no connection count
# Reduced set of amplitudes, containing only valid observations
pamp_subset = [
pamp
for pamp in p_amplitudes
if all(np.isin([pamp.event_a, pamp.event_b], incl_ev))
]
samp_subset = [
samp
for samp in s_amplitudes
if all(np.isin([samp.event_a, samp.event_b, samp.event_c], incl_ev))
]
n_p = len(pamp_subset)
n_s = len(samp_subset) * sfac
n_ref = len(irefs)
# Build homogenos part of linear system
logger.info(
f"Building linear system of {n_p} P, {n_s} S, and {n_ref * mt_elements} "
"reference equations."
)
isparse = True
if isparse:
# as sparse array
Ah, bh = ls.homogenous_equations_sparse(
pamp_subset,
samp_subset,
incl_ev,
stad,
evd,
phd,
constraint,
None,
two_s,
ncpu,
)
else:
# as dense array
Ah, bh = ls.homogenous_equations(
pamp_subset,
samp_subset,
incl_ev,
stad,
evd,
phd,
constraint,
None,
two_s,
)
logger.info("Computing norms and weights...")
# Weight applied by row, independent of values of Ah
mis_weights = np.vstack(
[
ls.weight_misfit(amp, min_mis, max_mis, min_weight, "P")
for amp in pamp_subset
]
+ [
ls.weight_misfit(amp, min_mis, max_smis, min_weight, "S", two_s)
for amp in samp_subset
]
)
pm_wght, sm_wght, _ = ls.unpack_equation_vector(
mis_weights, n_p, 0, mt_elements, two_s
)
amp_norm = np.vstack(
[1.0 for _ in pamp_subset]
+ [ls.weight_s_amplitude(amp, two_s) for amp in samp_subset]
)
pa_norm, sa_norm, _ = ls.unpack_equation_vector(
amp_norm, n_p, 0, mt_elements, two_s
)
# Apply amplitude norm before measuring the equation norm.
# After this operation, Ah is in the -R to R range (i.e. close to 1).
if isparse:
Ah = (Ah * amp_norm).tocsc(copy=False)
else:
Ah *= amp_norm
# Normalization applied to columns
ev_norm = ls.norm_event_median_value(Ah, mt_elements)
if isparse:
Ah = (Ah * ev_norm).tocsc(copy=False)
else:
Ah *= ev_norm
# Equation norm
eq_norm = ls.condition_by_norm(Ah)
p_norm, s_norm, _ = ls.unpack_equation_vector(eq_norm, n_p, 0, mt_elements, two_s)
# Apply the weights only after measuring the norm
if isparse:
Ah = (Ah * mis_weights).tocsc(copy=False)
Ah = (Ah * eq_norm).tocsc(copy=False)
else:
Ah *= mis_weights
Ah *= eq_norm
# Build inhomogenous equations
Ai, bi = ls.reference_mt_equations(irefs, mtd, incl_ev, constraint)
# Collect and apply weights
mean_moment = mt.mean_moment([mtd[iev] for iev in irefs])
# Indices of reference events
incl_ref = [incl_ev.index(iref) for iref in irefs]
# Normalization of the reference event
refev_norm = ls.reference_mt_event_norm(ev_norm, incl_ref, mt_elements)
Ai *= refmt_weight
bi *= refmt_weight / mean_moment / refev_norm
# Scale of resulting relative moment tensors
ev_scale = mean_moment * ev_norm
# Dense matrix
if isparse:
# Sparse matrix
A = sparse.vstack((Ah, Ai), format="csc")
else:
# Dense matrix
A = sparse.coo_matrix(np.vstack((Ah, Ai))).tocsc()
b = np.vstack((bh, bi))
# Save weights and norms for analysis
io.save_amplitudes(
core.file("amplitude_summary", phase="P", suffix=outsuf, directory=directory),
pamp_subset,
[pa_norm.flat[:], p_norm.flat[:], pm_wght.flat[:]],
["AmplNorm", "EquaNorm", "MisfitWght"],
["{:8.3e}", "{:8.2e}", "{:10.8f}"],
)
io.save_amplitudes(
core.file("amplitude_summary", phase="S", suffix=outsuf, directory=directory),
samp_subset,
[sa_norm[:, 0], s_norm[:, 0], s_norm[:, 1], sm_wght[:, 0]],
["AmplNorm", "EquaNorm1", "EquaNorm2", "MisfitWght"],
["{:8.3e}", "{:8.2e}", "{:8.2e}", "{:10.8f}"],
)
logger.info(
f"Solving linear system of {A.shape[1]} variables and {A.shape[0]} equations..."
)
# Invert and save results
m, residuals = ls.solve_lsmr(A, b, ev_scale)
# Moment residuals
p_residuals, s_residuals, _ = ls.unpack_equation_vector(
residuals, n_p, n_ref, mt_elements, two_s
)
relmts = {
incl_ev[i]: momt
for i, momt in enumerate(mt.mt_tuples(m, constraint))
if any(momt)
}
# Save the results right away
io.write_mt_table(
relmts,
core.file("relative_mt", suffix=outsuf, directory=directory),
harvard_convention=config["harvard_convention"],
)
# Indices in subsets of events
ievp = utils.event_indices(pamp_subset)
ievs = utils.event_indices(samp_subset)
if do_predict:
# Compute synthetic amplitudes and posteori-residuals
logger.info("Computing synthetic amplitudes and residuals")
p_pairs = [(pamp.station, pamp.event_a, pamp.event_b) for pamp in pamp_subset]
p_sta = set(p_pair[0] for p_pair in p_pairs)
Asyn, p_sigmas = amp.synthetic_p(relmts, evd, stad, phd, p_pairs)
s_triplets = [
(samp.station, samp.event_a, samp.event_b, samp.event_c)
for samp in samp_subset
]
s_sta = set(s_trip[0] for s_trip in s_triplets)
Bsyn, _, s_sigmas = amp.synthetic_s(
relmts,
evd,
stad,
phd,
s_triplets,
False,
)
# Load the waveforms to compute a posteori misfits and ccs
arrd = {}
hdrd = {}
for stas, pha in zip([p_sta, s_sta], "PS"):
for sta in stas:
wvid = core.join_waveid(sta, pha)
arrd[wvid], hdrd[wvid] = io.read_waveform_array_header(
sta, pha, iteration, directory
)
logger.info("Computing amplitude misfits and correlations...")
# ppms, ppcs, spms, spcs = amp.predicted_misfit_correlation(
# pamp_subset, Asyn, samp_subset, Bsyn, arrd, hdrd, max_workers=ncpu,
# )
ppms, ppcs = zip(
*[
(
amp.p_misfit(
mat := utils.concat_components(
signal.demean_filter_window(
arrd[core.join_waveid(pamp.station, "P")][
[
(
hdr := hdrd[
core.join_waveid(pamp.station, "P")
]
)["events_"].index(pamp.event_a),
hdr["events_"].index(pamp.event_b),
]
],
hdr["sampling_rate"],
hdr["phase_start"],
hdr["phase_end"],
hdr["taper_length"],
pamp.highpass,
pamp.lowpass,
)
),
Aab,
),
amp.p_reconstruction_correlation(mat),
)
for pamp, Aab in zip(pamp_subset, Asyn)
]
)
# S-wave posterior misfit and correlation
spms, spcs = zip(
*[
(
amp.s_misfit(
mat := utils.concat_components(
signal.demean_filter_window(
arrd[core.join_waveid(samp.station, "S")][
[
(
hdr := hdrd[
core.join_waveid(samp.station, "S")
]
)["events_"].index(samp.event_a),
hdr["events_"].index(samp.event_b),
hdr["events_"].index(samp.event_c),
]
],
hdr["sampling_rate"],
hdr["phase_start"],
hdr["phase_end"],
hdr["taper_length"],
samp.highpass,
samp.lowpass,
)
),
B[0],
B[1],
),
amp.s_reconstruction_correlation(mat, B[0], B[1]),
)
for samp, B in zip(samp_subset, Bsyn)
]
)
logger.info("Collecting results...")
pamp_synthetic = [
core.P_Amplitude_Ratio(
pamp.station,
pamp.phase,
pamp.event_a,
pamp.event_b,
Aab,
ppm,
ppc,
p_sigma[0],
p_sigma[1],
pamp.highpass,
pamp.lowpass,
)
for pamp, Aab, p_sigma, ppm, ppc in zip(
pamp_subset, Asyn, p_sigmas, ppms, ppcs
)
]
samp_synthetic = [
core.S_Amplitude_Ratios(
samp.station,
samp.phase,
samp.event_a,
samp.event_b,
samp.event_c,
B[0],
B[1],
spm,
spc,
ss[0],
ss[1],
ss[2],
samp.highpass,
samp.lowpass,
)
for samp, B, ss, spm, spc in zip(samp_subset, Bsyn, s_sigmas, spms, spcs)
]
# Amplitude oberservations
Aobs = np.array([pamp.amp_ab for pamp in pamp_subset])
B1obs = np.array([samp.amp_abc for samp in samp_subset])
B2obs = np.array([samp.amp_acb for samp in samp_subset])
# Amplitude residuals
Ares = Aobs / Asyn
B1res = B1obs / Bsyn[:, 0]
B2res = B2obs / Bsyn[:, 1]
logger.info("Collecting event statistics...")
# amplitude RMS per event
amp_rmss = {
evn: np.sqrt(
np.sum(utils.signed_log(Ares[ievp[evn]]) ** 2)
+ np.sum(utils.signed_log(B1res[ievs[evn]]) ** 2)
+ np.sum(utils.signed_log(B2res[ievs[evn]]) ** 2)
)
/ (len(ievp[evn]) + sfac * len(ievs[evn]))
for evn in relmts
}
# Synthetic amplitudes
for ph, amps in zip("PS", [pamp_synthetic, samp_synthetic]):
outfile = core.file(
"amplitude_observation",
directory=directory,
phase=ph,
suffix=synsuf,
)
io.save_amplitudes(outfile, amps)
# Save reduced amplitude set to file. Use the output suffix.
io.save_amplitudes(
core.file(
"amplitude_summary", phase="P", suffix=synsuf, directory=directory
),
pamp_subset,
[p_residuals, Ares, mis_weights[:n_p].flat[:], amp_norm[:n_p].flat[:]]
+ [p_norm.flat[:], Asyn, ppms],
[" MomResidual", "AmpResidual", "MisfitWght", "AmplWght", "EquaNorm"]
+ ["PredAab ", "PredMis"],
["{:11.3e}", "{:11.3e}", "{:10.5f}", "{:8.4f}", "{:7.2e}"]
+ ["{:8.2e}", "{:7.5f}"],
)
# Recall: s_residual and eq_norm do not have nan for the 2nd S equation, if we excluded them
io.save_amplitudes(
core.file(
"amplitude_summary", phase="S", suffix=synsuf, directory=directory
),
samp_subset,
[s_residuals[:, 0], s_residuals[:, 1], B1res, B2res]
+ [mis_weights[n_p : n_p + n_s : sfac].flat[:]]
+ [amp_norm[n_p : n_p + n_s : sfac].flat[:]]
+ [s_norm[:, 0], s_norm[:, 1], Bsyn[:, 0], Bsyn[:, 1], spms],
[" MomResidual1", "MomResidual2", "AmpResidual1", "AmpResidual2"]
+ ["MisfitWght", "AmplWght", "EquaNorm1", "EquaNorm2", "PredBabc"]
+ ["PredBacb", "PredMis"],
["{:12.3e}", "{:12.3e}", "{:12.3e}", "{:12.3e}", "{:10.5f}"]
+ ["{:8.4f}", "{:9.3e}", "{:9.3e}", "{:8.2e}", "{:8.2e}", "{:7.5f}"],
)
# Moment RMS per event
mom_rmss = {
evn: np.sqrt(
np.sum(p_residuals[ievp[evn]] ** 2) + np.sum(s_residuals[ievs[evn], :] ** 2)
)
/ (len(ievp[evn]) + 2 * len(ievs[evn]))
for evn in relmts
}
# Average misfits and correlation coefficients
misps, ccps = np.array([(amp.misfit, amp.correlation) for amp in pamp_subset]).T
misss, ccss = np.array([(amp.misfit, amp.correlation) for amp in samp_subset]).T
avmiss = {
evn: np.mean(np.concatenate((misps[ievp[evn]], misss[ievs[evn]])))
for evn in relmts
}
avccs = {
evn: utils.fisher_average(np.concatenate((ccps[ievp[evn]], ccss[ievs[evn]])))
for evn in relmts
}
gaps = angle.azimuth_gap(phd, pamp_subset, samp_subset)
if not do_predict:
# We need predictions to compute the amplitude RMS
amp_rmss = {evn: np.nan for evn in relmts}
# Bootstrap
if nboot is not None and nboot > 0:
logger.info(f"Computing {nboot} bootstrap samples...")
m_boots = ls.bootstrap_lsmr(A, b, ev_scale, n_p, n_s, nboot, 0, 1)
# Convert to MT dict
# bootmts = {i: [] for i in range(m_boots.shape[1] // mt_elements)}
bootmts = {i: [] for i in incl_ev}
for m_boot in m_boots:
for i, momt in enumerate(mt.mt_tuples(m_boot, constraint)):
bootmts[incl_ev[i]].append(momt)
# Make and save a
io.write_mt_table(
bootmts,
core.file("relative_mt", suffix=outsuf + "-boot", directory=directory),
harvard_convention=config["harvard_convention"],
)
boot_rms = mt.norm_rms(bootmts, relmts)
boot_kag = mt.kagan_rms(bootmts, relmts)
else:
boot_rms = {}
boot_kag = {}
sumf = core.file("mt_summary", suffix=outsuf, directory=directory)
logger.info(f"Saving full summary to: {sumf}")
io.save_mt_result_summary(
sumf,
evd,
relmts,
gaps,
links,
avmiss,
avccs,
mom_rmss,
amp_rmss,
boot_rms,
boot_kag,
)
[docs]
def plot_alignment_entry(
arrf: Path,
config: core.Config | None = None,
do_exclude: bool = False,
sort: str = "pci",
highlight_events: list[int] = [],
cc_method: str = "calculate",
saveas: str | None = None,
confirm: bool = True,
) -> None:
"""Plot the waveform array and parameters relevant to judging the alignment
Parameters
----------
arrf:
Path to the waveform array file to plot
config:
Configuration object. If given, event and station tables are read from
files specified in the configuration.
do_exclude:
Read the exclude file and only plot events that are not excluded.
sort:
Sorting method for events. See `plot.alignment` for options.
highligh_events:
List of event IDs to highlight in the plot.
cc_method:
Method to obtain the cross-correlation matrix:
* 'calculate': Re-calculate using current header values.
* 'file': Use cross-correlation values from alignment procedure. Faster,
but cc values do not reflect changes in time window or filter band
* 'none': Do not show cc values.
saveas:
If given, save the figure to the given path instead of showing it
"""
# Find where we are
subdir = arrf.parts[-2]
directory = Path(*arrf.parts[:-2])
iteration = 0
if subdir.startswith("align"):
iteration = int(subdir.replace("align", ""))
# Load the header first
hdrf = arrf.with_name(arrf.stem.replace("-wvarr", "-hdr.yaml"))
hdr = io.read_header(
hdrf,
default_name=core.file(
"waveform_header", n_align=iteration, directory=directory
),
)
phase = hdr["phase"]
station = hdr["station"]
event_list = np.array(hdr["events_"])
iin = np.arange(len(event_list), dtype=int)
event_dict = {}
station_dict = {}
if config is not None:
if config["event_file"] is not None:
evf = directory / config["event_file"]
event_dict = io.read_event_table(evf)
if config["station_file"] is not None:
stf = directory / config["station_file"]
station_dict = io.read_station_table(stf)
dest = (station, phase, iteration, directory)
arr = np.load(arrf)
try:
dt_mccc, dt_rms = np.loadtxt(core.file("mccc_time_shift", *dest), unpack=True)
except (FileNotFoundError, ValueError):
dt_mccc = dt_rms = None
try:
dt_pca = np.loadtxt(core.file("pca_time_shift", *dest))
except FileNotFoundError:
dt_pca = None
if do_exclude:
excl = io.read_exclude_file(core.file("exclude", directory=directory))
iin, event_list = qc.included_events(excl, **hdr.kwargs(qc.included_events))
# Get cc from file or recompute
if cc_method == "file":
try:
ccf = core.file("cc_matrix", *dest)
ccij = np.loadtxt(ccf)
except FileNotFoundError:
logger.warning(
f"Cross-correlation file not found: {ccf}. Not showing cc values."
)
ccij = None
elif cc_method == "calculate":
mat = utils.concat_components(
signal.demean_filter_window(
arr[iin, :, :], **hdr.kwargs(signal.demean_filter_window)
)
)
_, ccij, _, _ = signal.correlation_averages(mat, hdr["phase"], False)
elif cc_method == "none":
ccij = None
else:
raise ValueError(f"Unknown cc_method: {cc_method}")
fig, _ = plot.alignment(
arr,
hdr,
dt_mccc,
dt_rms,
dt_pca,
ccij,
event_list,
event_dict,
station_dict,
sort,
highlight_events,
)
if saveas is not None:
fig.savefig(saveas)
elif confirm:
input("Press any key to continue...")
[docs]
def plot_spectra_entry(
arrf: Path,
bandpf: str | None = None,
highlight: list[int] = [],
integrate: bool = False,
saveas: Path | str | None = None,
) -> None:
"""Plot spectra of the waveform array. Highlight events or filter bands
Parameters
----------
arrf:
Path to the waveform array file to plot
bandpf:
Path to the bandpass filter file
highlight:
List of event IDs to highlight in the plot.
integrate:
Integrate waveforms
saveas:
If given, save the figure to the given path instead of showing it
"""
# Find where we are
subdir = arrf.parts[-2]
directory = Path(*arrf.parts[:-2])
iteration = 0
if subdir.startswith("align"):
iteration = int(subdir.replace("align", ""))
# Load the header first
hdrf = arrf.with_name(arrf.stem.replace("-wvarr", "-hdr.yaml"))
hdr = io.read_header(
hdrf,
default_name=core.file(
"waveform_header", n_align=iteration, directory=directory
),
)
arr = np.load(arrf)
if bandpf is None:
# Single band from header
bandpassd = {
core.join_waveid(hdr["station"], hdr["phase"]): defaultdict(
lambda: (hdr["highpass"] or np.nan, hdr["lowpass"] or np.nan)
)
}
else:
bandpassd = io.read_yaml(bandpf)
fig, _ = plot.spectra(arr, hdr, bandpassd, highlight, integrate)
if saveas is not None:
fig.savefig(saveas)
else:
input("Press any key to continue...")
[docs]
def plot_amplitude_entry(
ampfile: Path,
highlight: list[int] = [],
saveas: Path | str | None = None,
) -> None:
"""Plot amlitude observations
Parameters
----------
ampfile:
Path to the amplitude or amplitude-summary file to plot
highlight:
List of event IDs to highlight in the plot.
saveas:
If given, save the figure to this path instead of showing it interactively
"""
if ampfile.stem.startswith("P-"):
amp = io.read_amplitudes(ampfile, "P")
usecols = (12, 13)
elif ampfile.stem.startswith("S-"):
amp = io.read_amplitudes(ampfile, "S")
usecols = (14, 15, 16, 17)
else:
raise ValueError(f"Unknown phase in file name: {ampfile}")
try:
norm_weigts = np.loadtxt(ampfile, usecols=usecols)
norms = norm_weigts[:, :-1]
weights = norm_weigts[:, -1:]
except ValueError:
norms = weights = None
fig, _ = plot.amplitudes(amp, highlight, norms=norms, weights=weights)
fig.suptitle(f"File: {ampfile}")
if saveas is not None:
fig.savefig(saveas)
else:
input("Press any key to continue...")
[docs]
def plot_connections_entry(
ampfile: Path,
sfile: Path | None = None,
highlight: list[int] | None = None,
saveas: Path | str | None = None,
) -> None:
"""Plot graph representation of amplitude-observation event connections.
Parameters
----------
ampfile:
Path to the primary amplitude or amplitude-summary file to plot.
sfile:
Optional second S-amplitude file to combine with the primary file.
highlight:
Event IDs to highlight.
saveas:
If given, save the figure to this path instead of showing it interactively.
"""
if ampfile.stem.startswith("P-"):
amp = io.read_amplitudes(ampfile, "P")
elif ampfile.stem.startswith("S-"):
amp = io.read_amplitudes(ampfile, "S")
else:
raise ValueError(f"Unknown phase in file name: {ampfile}")
s_amp = None
if sfile is not None:
s_amp = io.read_amplitudes(sfile, "S")
ax = plot.amplitude_connections(amp, s_amp, highlight=highlight)
fig = ax.figure
title = f"File: {ampfile}"
if sfile is not None:
title += f"\nS file: {sfile}"
fig.suptitle(title)
if saveas is not None:
fig.savefig(saveas)
else:
input("Press any key to continue...")
# Mapping of sorting/coloring keys to their description
_attr_keys = {
"number": "Event ID",
"name": "Event Name",
"mag": "Input magnitude",
"mw": "Relative moment magnitude",
"gap": "Azimuthal gap (deg)",
"links": "Total links",
"p-links": "P links",
"s-links": "S links",
"moment-rms": "Moment RMS (scaled Nm)",
"amplitude-rms": "Amplitude RMS",
"boot-moment": "normalized Bootstrap Moment (Nm/M0)",
"boot-kagan": "Bootstrap Kagan angle (deg)",
}
[docs]
def plot_mt_entry(
mtfile: Path,
config: core.Config,
highlight: list[int] = [],
overlay_dc_at: float = 1.0,
sort_by: str | None = None,
color_by: str | None = None,
saveas: Path | str | None = None,
) -> None:
"""Plot moment tensors
Parameters
----------
mtfile:
Path to the moment tensor file to plot
config:
Configuration object
highlight:
List of event IDs to highlight in the plot.
overlay_dc_at:
Overlay DC component at this fraction of the full moment
saveas:
If given, save the figure to this path instead of showing it interactively
"""
# Attributes only found in sumary file
summary_atts = [
"gap",
"links",
"p-links",
"s-links",
"moment-rms",
"amplitude-rms",
"boot-moment",
"boot-kagan",
]
mtd = io.read_mt_table(mtfile, harvard_convention=config["harvard_convention"])
evd = io.read_event_table(config["event_file"])
evns = np.array(list(mtd))
names = [evd[evn].name for evn in mtd]
mags = np.array([evd[evn].mag for evn in mtd])
mws = np.array(
[mt.magnitude_of_moment(mt.moment_of_vector(momt)) for momt in mtd.values()]
)
if sort_by in summary_atts or color_by in summary_atts:
try:
gap, plinks, slinks, mrms, arms, brms, bkag = np.loadtxt(
mtfile, usecols=(14, 16, 17, 20, 21, 22, 23), unpack=True
)
except IndexError:
msg = "Too few columns in mtfile. Please give a 'mt_summary' file."
raise IndexError(msg)
else:
gap = plinks = slinks = mrms = arms = brms = bkag = np.full(len(mtd), np.nan)
keys = {
None: np.arange(len(mtd)),
"number": evns,
"name": names,
"mag": -mags,
"mw": -mws,
"gap": gap,
"links": plinks + slinks,
"p-links": plinks,
"s-links": slinks,
"moment-rms": mrms,
"amplitude-rms": arms,
"boot-moment": brms,
"boot-kagan": bkag,
}
try:
isort = np.argsort(keys[sort_by])
except KeyError:
raise ValueError(f"Unknown sort attribute: {sort_by}")
colord = {}
colorn = None
if color_by is not None:
try:
colord = {evn: val for evn, val in zip(evns, keys[color_by])}
colorn = _attr_keys[color_by]
except KeyError:
raise ValueError(f"Unknown coloring attribute: {color_by}")
# Apply sorting
mtd = {iev: mtd[iev] for iev in np.array(list(mtd))[isort]}
fig, _ = plot.mt_matrix(
mtd,
highlight,
values=colord,
valuename=colorn,
overlay_dc_at=overlay_dc_at,
)
fig.suptitle(f"File: {mtfile}")
if saveas is not None:
fig.savefig(saveas)
else:
input("Press any key to continue...")
plot_mt_entry.__doc__ += """
sort_by:
Sorting method for MTs. One of:
- None: as in file
- {:}
""".format(
"\n - ".join(f"'{key}': {_attr_keys[key]}" for key in _attr_keys)
)
plot_mt_entry.__doc__ += """
color_by:
Color MTs by attribute. One of:
- None: no coloring
- {}
""".format(
"\n - ".join(f"'{key}': {_attr_keys[key]}" for key in _attr_keys)
)
[docs]
def make_parser() -> ArgumentParser:
"""Create the ArgumentParser for the relMT command line interface."""
# Common options for various modes
option = {
"overwrite": (
("-o", "--overwrite"),
dict(help="Overwrite existing files", action="store_true"),
),
"config": (
("-c", "--config"),
dict(
type=Path,
help="Use this configuration file",
default=core.file("config"),
),
),
"alignment": (
("-a", "--alignment"),
dict(type=int, nargs="?", help="Alignment iteration", default=0),
),
"highlight": (
("--highlight",),
dict(
type=int,
nargs="+",
help="Event IDs to highligh in the plot",
default=[],
),
),
"exclude": (
("--exclude",),
dict(
action="store_true",
help="Exclude events listed in the exclude file",
),
),
"saveas": (
("--saveas", "-s"),
dict(
type=Path,
help="Save the figure to file",
),
),
}
parser = ArgumentParser(
description="""
Software for computing relative seismic moment tensors"""
)
subpars = parser.add_subparsers(dest="mode")
# Subparsers
init_p = subpars.add_parser("init", help="Initialize default directories and files")
align_p = subpars.add_parser(
"align",
help="Align waveforms",
epilog=("When neither '--pca' nor '--mccc' are given, " "we assume both."),
)
exclude_p = subpars.add_parser(
"exclude",
help=(
"Exclude phase observations from alignment based on criteria in "
"header files"
),
)
amp_p = subpars.add_parser(
"amplitude", help="Measure relative amplitudes on aligned waveforms"
)
admit_p = subpars.add_parser(
"admit",
help=(
"Apply admission parameters from configuration file to "
"amplitude measurements"
),
)
solve_p = subpars.add_parser(
"solve", help="Compute moment tensors from amplitude measurements"
)
# Now set the functions to be called
init_p.set_defaults(command=core.init)
align_p.set_defaults(command=align_entry)
exclude_p.set_defaults(command=exclude_entry)
amp_p.set_defaults(command=amplitude_entry)
admit_p.set_defaults(command=admit_entry)
solve_p.set_defaults(command=solve_entry)
# Subparser arguments
init_p.add_argument(
"directory",
type=Path,
default=".",
nargs="?",
help="Name of the directory to initiate",
)
# Sub arguments of the alignment routine
align_p.add_argument(*option["config"][0], **option["config"][1])
align_p.add_argument(*option["alignment"][0], **option["alignment"][1])
align_p.add_argument(*option["overwrite"][0], **option["overwrite"][1])
align_p.add_argument(
"--mccc",
action="store_true",
help="Align with Multi-Channel Cross Correlation (MCCC)",
)
# Sub arguments of the alignment routine
align_p.add_argument(
"--pca",
action="store_true",
help="Align with principal component analysis (PCA).",
)
# Sub arguments of the exlusion routine
exclude_p.add_argument(
"--nodata",
action="store_true",
help="Exlude data with no data or data containing NaNs",
)
exclude_p.add_argument(
"--snr",
action="store_true",
help=(
"Exlude data with signal to noise ratio lower than "
"'min_signal_noise_ratio' in the station header file"
),
)
exclude_p.add_argument(
"--cc",
action="store_true",
help=(
"Exlude data with correlation coefficient lower than "
"'min_correlation' in the station header file"
),
)
exclude_p.add_argument(
"--ecn",
action="store_true",
help=(
"Exlude data with expansion coefficient norm lower than "
"'min_expansion_coefficient_norm' in the station header file"
),
)
exclude_p.add_argument(
"--overwrite",
"-o",
help=(
"Overwrite existing entries per category. Never overwrites manually"
"exluded phases"
),
action="store_true",
)
exclude_p.add_argument(*option["alignment"][0], **option["alignment"][1])
exclude_p.add_argument(*option["config"][0], **option["config"][1])
exclude_p.add_argument(
"--cc_from_file",
action="store_true",
help=(
"Use cross-correlation values from alignment procedure. Faster, "
"but cc values do not reflect changes in time window or filter band."
),
)
# Amplitude sub-arguments
amp_p.add_argument(*option["config"][0], **option["config"][1])
amp_p.add_argument(*option["alignment"][0], **option["alignment"][1])
amp_p.add_argument(*option["overwrite"][0], **option["overwrite"][1])
# Admit sub-arguments
admit_p.add_argument(*option["config"][0], **option["config"][1])
# Sub arguments of the solve routine
solve_p.add_argument(*option["config"][0], **option["config"][1])
solve_p.add_argument(*option["alignment"][0], **option["alignment"][1])
solve_p.add_argument(*option["overwrite"][0], **option["overwrite"][1])
solve_p.add_argument(
"--predict",
action="store_true",
help=(
"Predict relative amplitudes of the solution and compute "
"prediction misfits"
),
)
# Plot alignment
plot_align_p = subpars.add_parser(
"plot-alignment", help="Plot waveform alignment resuts to screen"
)
plot_align_p.set_defaults(command=plot_alignment_entry)
plot_align_p.add_argument(*option["config"][0], **option["config"][1])
plot_align_p.add_argument(
"file",
type=Path,
help="Path to -wvarr.npy file",
)
plot_align_p.add_argument(
"--sort",
type=str,
help="The sorting to apply: 'pci' (default), 'magnitude', 'none'",
choices=["pci", "magnitude", "none"],
default="pci",
)
plot_align_p.add_argument(*option["highlight"][0], **option["highlight"][1])
plot_align_p.add_argument(*option["exclude"][0], **option["exclude"][1])
plot_align_p.add_argument(
"--cc",
help=(
"Method to obtain the cross-correlation matrix:"
"* calculate: Re-calculate using current header values."
"* file: Use cross-correlation values from alignment procedure. Faster, "
"but cc values do not reflect changes in time window or filter band."
"* none: Do not show cc values."
),
choices=["calculate", "file", "none"],
default="calculate",
)
plot_align_p.add_argument(*option["saveas"][0], **option["saveas"][1])
# Plot spectra
plot_spec_p = subpars.add_parser(
"plot-spectra", help="Plot waveform spectra to screen"
)
plot_spec_p.add_argument(*option["config"][0], **option["config"][1])
plot_spec_p.add_argument(
"waveformfile",
type=Path,
help="Path to -wvarr.npy file",
)
plot_spec_p.add_argument(
"bandpassfile",
nargs="?",
default=None,
help="Path to bandpass.yaml file",
)
plot_spec_p.add_argument(*option["highlight"][0], **option["highlight"][1])
plot_spec_p.add_argument(
"--integrate",
action="store_true",
help="Integrate waveforms (e.g. velocity to displacement)",
)
plot_spec_p.add_argument(*option["saveas"][0], **option["saveas"][1])
# Plot Amplitudes
plot_amp_p = subpars.add_parser(
"plot-amplitude", help="Plot waveform amplitudes to screen"
)
plot_amp_p.add_argument(
"file", type=Path, help="Path to amplitude or amplitude-summary file"
)
plot_amp_p.add_argument(*option["highlight"][0], **option["highlight"][1])
plot_amp_p.add_argument(*option["saveas"][0], **option["saveas"][1])
# Plot amplitude connections
plot_conn_p = subpars.add_parser(
"plot-connections",
help="Plot event connections of relative amplitude observations",
)
plot_conn_p.add_argument(
"file", type=Path, help="Path to primary P- or S-amplitude file"
)
plot_conn_p.add_argument(
"--sfile",
"-f",
type=Path,
default=None,
help="Optional second S-amplitude file",
)
plot_conn_p.add_argument(*option["highlight"][0], **option["highlight"][1])
plot_conn_p.add_argument(*option["saveas"][0], **option["saveas"][1])
# Plot MTs
plot_mt_p = subpars.add_parser("plot-mt", help="Plot waveform spectra to screen")
plot_mt_p.add_argument(*option["config"][0], **option["config"][1])
plot_mt_p.add_argument(
"mtfile",
type=Path,
help="Path to mt summary file",
)
plot_mt_p.add_argument(
"--dc",
type=float,
help="Overlay DC component at this fraction of the full moment (0-1)",
default=1.0,
)
plot_mt_p.add_argument(*option["highlight"][0], **option["highlight"][1])
plot_mt_p.add_argument(
"--sort-by",
type=str,
help=(
"Sorting method for MTs. One of: "
"* " + "\n* ".join(f"'{key}': {desc}" for key, desc in _attr_keys.items())
),
choices=list(_attr_keys.keys()),
default=None,
)
plot_mt_p.add_argument(
"--color-by",
type=str,
help=(
"Coloring method for MTs. One of: "
"* " + "\n* ".join(f"'{key}': {desc}" for key, desc in _attr_keys.items())
),
choices=list(_attr_keys.keys()),
default=None,
)
plot_mt_p.add_argument(*option["saveas"][0], **option["saveas"][1])
return parser
[docs]
def get_arguments(args: list[str] | None = None) -> Namespace:
"""Collect the command line arguments.
Parameters
----------
args:
List of command line arguments. If ``None``, collect via ArgumentParser.
Returns
-------
Parsed arguments
"""
parser = make_parser()
parsed = parser.parse_args(args)
if parsed.mode is None:
parser.print_help(sys.stderr)
sys.exit(1)
return parsed
[docs]
def main(args=None):
"""Entry point for the relMT command line interface
Parameters
----------
args:
Optional list of command line arguments. If ``None``, use
:data:`sys.argv`.
Returns
-------
None
Executes the selected subcommand and exits
"""
# Subdirectory, e.g. A_Muji
parsed = get_arguments(args)
if parsed.mode == "init":
parent = parsed.directory
parsed.command(parent)
return
try:
conff = parsed.config
config = io.read_config(conff)
parent = conff.parent
except AttributeError:
config = core.Config()
parent = Path()
if not parsed.mode.startswith("plot-") and parsed.mode != "admit":
n_align = parsed.alignment
overwrite = parsed.overwrite
# Convert parsers to function kwargs
kwargs = dict(directory=parent)
if parsed.mode == "solve":
kwargs.update(dict(iteration=n_align, do_predict=parsed.predict))
if parsed.mode == "amplitude" or parsed.mode == "align" or parsed.mode == "exclude":
kwargs.update(dict(iteration=n_align, overwrite=overwrite))
if parsed.mode == "align":
if parsed.mccc:
logger.info("Aligning with MCCC")
if parsed.pca:
logger.info("Aligning with PCA")
if not (parsed.pca or parsed.mccc):
logger.info("Aligning with MCCC and PCA")
parsed.pca = True
parsed.mccc = True
kwargs.update(dict(do_pca=parsed.pca, do_mccc=parsed.mccc))
if parsed.mode == "exclude":
kwargs.update(
dict(
do_nodata=parsed.nodata,
do_snr=parsed.snr,
do_cc=parsed.cc,
do_ecn=parsed.ecn,
cc_from_file=parsed.cc_from_file,
)
)
if parsed.mode == "plot-alignment":
plot_alignment_entry(
parsed.file,
config,
parsed.exclude,
parsed.sort,
parsed.highlight,
parsed.cc,
parsed.saveas,
)
return
if parsed.mode == "plot-amplitude":
plot_amplitude_entry(
parsed.file,
parsed.highlight,
parsed.saveas,
)
return
if parsed.mode == "plot-connections":
plot_connections_entry(
parsed.file,
parsed.sfile,
parsed.highlight,
parsed.saveas,
)
return
if parsed.mode == "plot-spectra":
plot_spectra_entry(
parsed.waveformfile,
parsed.bandpassfile,
parsed.highlight,
parsed.integrate,
parsed.saveas,
)
return
if parsed.mode == "plot-mt":
plot_mt_entry(
parsed.mtfile,
config,
parsed.highlight,
parsed.dc,
parsed.sort_by,
parsed.color_by,
parsed.saveas,
)
return
# The command to be executed is defined above for each of the subparsers
parsed.command(config, **kwargs)
if __name__ == "__main__":
main()