#!/usr/bin/env python
# relMT - Program to compute relative earthquake moment tensors
# Copyright (C) 2024 Wasja Bloch, Doriane Drolet, Michael Bostock
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import numpy as np
from scipy.linalg import svd
from scipy.sparse import coo_matrix
from itertools import combinations as combs
from relmt import utils, signal, core, ls, io
from threadpoolctl import threadpool_limits
import mccore
logger = core.register_logger(__name__)
def _test_phase_shape(phase, mtx):
"""Validate phase label and number of events in waveform matrix
Parameters
----------
phase:
Phase identifier, must be ``'P'`` or ``'S'``
mtx:
Waveform matrix ``(events, samples)`` or ``(events, components, samples)``
Raises
------
ValueError:
If `phase` is not ``'P'`` or ``'S'``
IndexError:
When not enough events are provided for the chosen phase
"""
if not (phase.startswith("P") or phase.startswith("S")):
raise ValueError("Phase must start with either 'P' or 'S'")
if (phase.startswith("P") and mtx.shape[1] < 2) or (
phase.startswith("S") and mtx.shape[1] < 3
):
msg = f"mtx contains only {mtx.shape[1]} events. "
msg += "At least 2 are required for P-waves, 3 for S waves."
raise IndexError(msg)
[docs]
def mccc_align(
mtx: np.ndarray,
phase: str,
sampling_rate: float,
maxshift: float,
ndec: int = 1,
combinations: np.ndarray = np.array([]),
do_scc: bool = True,
verbose: bool = False,
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""Compute time shifts that align the seismogram matrix
Applies the multi-channel cross-correlation method (Bostock et al. 2021, BSSA)
Parameters
----------
mtx:
Waveform matrix of shape ``(events, samples)``, with time aligned along
last axis
phase:
Seismic phase ('P' or 'S')
sampling_rate:
Sampling rate of the wavefrom matrix (Hertz)
maxshift:
Maximum shift to test (seconds)
ndec:
Test correlation function at every ndec'th point
combinations:
Only combine these indices of mtx. ``(combinations, 2)`` for P-waves,
``(combinations, 3)`` for S-waves. When empty, combine all events.
do_scc:
Compute S-wave correlation coefficients. May be slow. Else, returned cc
values are all zero
verbose:
Print diagnostic output
Returns
-------
time_shifts: :class:`numpy.ndarray`
Zero-mean time shifts of shape ``(events,)`` that maximize cc
cc: :class:`numpy.ndarray`
Nominal correlation coefficients between events at optimal time lag
dd: :class:`numpy.ndarray`
Pairwise differential arrival times
dd_res: :class:`numpy.ndarray`
Resiudal when computing time lags from pairwise differential arrival
times
pairs: :class:`numpy.ndarray`
Pairwise event combination indices pointing into ``mtx`` of shape
``(C,2)``, where `C` is `events * (events-1) / 2` for P waves and
`events * (events-1) * (events-2)/ 3` for S waves
Raises
------
IndexError:
When mtx contains less than 2 events for P waves, or less than 3
for S waves
ValueError:
When phase is not 'P' or 'S'
"""
_test_phase_shape(phase, mtx)
sampling_interval = 1 / sampling_rate
nev = mtx.shape[0]
icombine = True
if len(combinations) == 0:
icombine = False
ndim = 2
ncombi = int(nev * (nev - 1) / 2)
if phase.startswith("S"):
ndim = 3
ncombi = int(nev * (nev - 1) * (nev - 2) / 6)
# combinations = np.empty((0, ndim), int)
# Pre-combute combinations. This is also handled in the fortran codes,
# but combination needs to be consistent with actual array sizes, so
# parsing an empty array violates this
combinations = np.array(list(combs(range(nev), ndim)))
else:
ncombi = combinations.shape[0]
uc = np.unique(combinations)
if np.any(uc < 0) or np.any(uc > mtx.shape[0]):
outs = [f"{i}" for i in np.concatenate((uc[uc < 0], uc[uc > mtx.shape[0]]))]
msg = "'combinations' indices out of bounds: "
msg += ", ".join(outs)
raise IndexError(msg)
combinations += 1 # Fortran indexing
# Data, sampling_interval, maxlag, ndec
if phase.startswith("P"):
rowi, coli, valu, dd, cc = mccore.mccc_ppf(
mtx.T, sampling_interval, maxshift, ndec, combinations, verbose, nci=ncombi
)
elif phase.startswith("S"):
rowi, coli, valu, dd, cc = mccore.mccc_ssf0(
mtx.T,
sampling_interval,
maxshift,
ndec,
combinations,
do_scc,
verbose,
nci=ncombi,
)
combinations -= 1 # Return to python indexing
# When limiting combinations, the last elements of the matrix of the matrix
# remain unallocated. We detect this as -1 column indices
if icombine:
ivalid = coli > -1
rowi = rowi[ivalid]
coli = coli[ivalid]
valu = valu[ivalid]
dd = dd[: rowi[-1] + 1]
# Make cc cubic (S) or symmetric square (P) matrix
if phase.startswith("S") and not icombine:
cc = utils.reshape_ccvec(cc, nev)
elif phase.startswith("S"):
cc = utils.reshape_ccvec(cc, nev, combinations)
A = coo_matrix((valu, (rowi, coli)), dtype=np.float64).tocsc()
# TODO: The line below is quite a performance bottleneck
dt, dd_res = ls.solve_irls_sparse(A, dd)
# Event pairs in dd[:-1]
evpair = np.zeros((len(dd) - 1, 2), dtype=int)
evpair[:, 0] = coli[0 : 2 * (len(dd) - 1) : 2]
evpair[:, 1] = coli[1 : 2 * (len(dd) - 1) : 2]
return dt, cc, dd, dd_res, evpair
[docs]
def pca_objective(sigma: np.ndarray, phase: str, ns: int) -> float:
"""Principal component alignment objective function
for `P` phases:
.. math::
\\phi = s_0^2 / n
For `S` phases:
.. math::
\\phi = 1 - s_2 / (s_0 + s_1)
Parameters
----------
sigma:
Singular value vector of the singular value decompostion
phase:
Seismic phase to consider. 'P' or 'S'.
ns:
Number of seismograms
Returns
-------
The objective value
"""
# worst 0 -> 1 best
if phase.startswith("P"):
return sigma[0] ** 2 / ns
elif phase.startswith("S"):
return 1 - (sigma[2] / (sigma[0] + sigma[1]))
else:
raise ValueError(f"Unknwon phase: {phase}")
[docs]
def pca_align(
mtx: np.ndarray,
sampling_rate: float,
phase: str,
iterations: int = 50,
dphi: float = 0,
dtime: float = 0,
) -> tuple[np.ndarray, float]:
"""Align waveforms by principal component analysis
Implements method of Bostock et al. (2021, BSSA).
For P-waves, we maximize the first of the singular values :math:`s` of
:math:`n` waveforms:
.. math::
\\phi = s_0^2 / n
For S-waves, we maximize the sum of the first two singular values:
.. math::
\\phi = 1 - s_2 / (s_0 + s_1),
where :math:`\\phi` ranges from `0` (worst) to `1` (best)
Parameters
----------
mtx:
Waveform matrix of shape ``(events, samples)``, with time aligned along
last axis
sampling_rate:
Sampling rate of the wavefrom matrix (Hertz)
phase:
Which seismic phase to consider ('P' or 'S')
iterations:
Maximum number of iterations
dphi:
Minimum change in objective function to stop iteration
dtime:
Smallest maximal time shift to stop iteration
Returns
-------
time_shifts : :class:`numpy.ndarray`
Zero-mean time shifts of shape ``(events,)`` that maximize phi
phi : float
Value of objective function
Raises
------
IndexError:
When mtx contains less than 2 events for P waves, or less than 3
for S waves
ValueError:
When phase is not 'P' or 'S'
"""
_test_phase_shape(phase, mtx)
ns = mtx.shape[0]
tshift_tot = np.zeros(ns)
tshift = np.zeros(ns)
ip = True
if phase.startswith("S"):
ip = False
# Remove mean and normalize.
scomp = signal.norm_power(signal.demean(mtx))
scomp1 = np.zeros(scomp.shape)
# Decompose seismograms into principal components
# s * U are the contributions of U to each seismogram
# s are the singular values ("relative importance")
# Vh are the "principal seismograms"
U, s, Vh = svd(scomp)
# Delta epsilon (objective function)
this_dphi = np.inf
# Concentrate energy on first principal component.
phi_old = pca_objective(s, phase, ns)
logger.info(f"Starting pca_align for {phase} Phase")
logger.debug(f"Phi {phase} = {phi_old :1.1e}")
iter = 0
while (
iter < iterations
and dphi < this_dphi
and (iter == 0 or dtime < max(abs(tshift)))
):
# Keep running sum for total shift
tshift_tot = tshift_tot - tshift
iter += 1
if ip:
# Create rank 1 ...
scomp1 = s[0] * np.outer(U[:, 0], Vh[0, :])
else:
# ... rank 2 approximation from singular values
scomp1 = np.zeros(scomp.shape)
for i in np.arange(2):
scomp1 += s[i] * np.outer(U[:, i], Vh[i, :])
tshift = np.zeros((ns))
# Differentiate waveforms: 2 options - differentiate full waveform or its
# rank 2 approximation. The first can converge more slowly and sometimes
# achieves better objective but not always. Split difference by averaging the two.
scomp1d = (signal.differentiate(scomp + scomp1, sampling_rate)) / 2
# Create perturbation from higher PC's (>2), resembling derivative.
scomp2N = scomp - scomp1
# Compute estimate of shift and correlation coefficients.
# (Eq. 15) Bostock et al. (2021, BSSA)
for i in range(ns):
tshift[i] = np.dot(scomp1d[i, :], scomp2N[i, :]) / np.dot(
scomp1d[i, :], scomp1d[i, :]
)
# Apply zero sum constraint and shift
tshift -= tshift.mean()
scomp = signal.shift(scomp, tshift, sampling_rate)
# Re-computed SVD and compute new objective function
U, s, Vh = svd(scomp)
phi_new = pca_objective(s, phase, ns)
this_dphi = phi_new - phi_old
phi_old = phi_new
logger.debug(
"Iteration #{:d} phi: {:1.3e} (dphi: {:1.3e})".format(
iter, phi_new, this_dphi
)
)
logger.info("Finished after {:} iterations with eobj {:1.3e}".format(iter, phi_old))
return tshift_tot, phi_old
[docs]
def complete_paired_s_lag_times(
evpairs: np.ndarray,
dd: np.ndarray,
cc: np.ndarray,
dd_res: np.ndarray,
method: str = "median",
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Pair-wise from triplet-wise lag times from complete set.
Compute pair-wise from the triplet-wise lag times ``dd`` returned by
:func:`mccc_align`. Assume all combinations are present, i.e.
`npair = events * (events-1) * (events-2) / 3`
Parameters
----------
ev_pair:
``(npair, 2)`` event indices
dd:
Corresponding ``(npair,)`` pairwise differential arrival times
cc:
``(events, events, events)`` Cross correlation matrix
dd_res:
``(npair,)`` residuals of the alignment
method:
- 'median': take the median of the tripletwise lag times
- 'residual' choose the one with the lowest residual
- 'cc' choose the one with the highest cross-correlation coefficient
Returns
-------
ev_pair: :class:`numpy.ndarray`
``(events * (events-1) / 2, 2)`` event indices
dd_pair: :class:`numpy.ndarray`
Corresponding median differential arrival times
cc_pair: :class:`numpy.ndarray`
Corresponding average cross correlation coefficient
res_pair: :class:`numpy.ndarray`
Corresponding residuals of the differential arrival times. If 'method'
is 'median', the RMS of all lag times
"""
logger.debug("Gathering complete pair of S lag times")
# All event indices
nev = evpairs[-1, -1] + 1 # We except the highest event index at the end
# Strip last zero that constrained the linear system to solve for time shifts
dd1 = dd[:-1]
dd_res1 = dd_res[:-1]
if (
(nev * (nev - 1) * (nev - 2) / 3 != evpairs.shape[0])
or (evpairs.shape[0] != dd1.shape[0])
or (nev - 1) != np.max(evpairs)
):
msg = f"Arrays must be of same and correct length. Found {nev} events."
raise IndexError(msg)
# Event pair order for P
ev_pair = np.array(
[(a, b) for a in range(nev - 1) for b in range(a + 1, nev)], dtype=int
)
ddm = np.zeros(ev_pair.shape[0], dtype=float)
ccm = np.zeros_like(ddm)
res = np.zeros_like(ddm)
# Third implicit differential time of the triplets
implicit_dd = dd1[1::2] - dd1[0::2]
implicit_dd_res = dd_res1[1::2] - dd_res1[0::2]
implicit_evpairs = np.array([evpairs[0::2, 1], evpairs[1::2, 1]]).T
for n, ab in enumerate(ev_pair):
iin = np.all(evpairs == ab, axis=-1)
iin2 = np.all(implicit_evpairs == ab, axis=-1)
all_dds = np.concatenate((dd1[iin], implicit_dd[iin2]))
all_dd_res = np.abs(np.concatenate((dd_res1[iin], implicit_dd_res[iin2])))
# Choose a best time and weight from the set
if method == "median":
ddm[n] = np.median(all_dds)
ccm[n] = utils.fisher_average(cc[ab[0], ab[1], :])
res[n] = np.sqrt(np.sum(all_dd_res**2) / all_dds.shape[0])
else:
iother = np.full(cc.shape[2], True)
if method == "cc":
# Ignore zero cc values that arise from triplets containing the
# considered event pair
iother[ab] = False
ibest = np.argmax(cc[ab[0], ab[1], iother])
elif method == "residual":
ibest = np.argmin(all_dd_res)
else:
raise ValueError(f"Unknown 'measure': {method}")
ddm[n] = all_dds[ibest]
ccm[n] = cc[ab[0], ab[1], iother][ibest]
res[n] = all_dd_res[ibest]
return ev_pair, ddm, ccm, res
[docs]
def incomplete_paired_s_lag_times(
evpairs: np.ndarray,
dd: np.ndarray,
cc: np.ndarray,
dd_res: np.ndarray,
combinations: np.ndarray,
method: str = "cc",
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Pair-wise from triplet-wise lag times from incomplete set.
Compute pair-wise from the triplet-wise lag times ``dd`` returned by
:func:`mccc_align` with a `combinations` list parsed. Do not attempt to
compute implicit combinations
Parameters
----------
evpairs:
``(npair, 2)`` event indices
dd:
Corresponding pairwise differential arrival times
cc:
``(events, events, events)`` Cross correlation matrix
dd_res:
``(npair,)`` residuals of the alignment
combinations:
``(npair/2, 3)`` triplet combinations that form the incomplete
observations
method:
- 'residual' choose the one with the lowest residual
- 'cc' choose the one with the highest cross-correlation coefficient
Returns
-------
ev_pair: :class:`numpy.ndarray`
``(events * (events-1) / 2, 2)`` event indices
dd_pair: :class:`numpy.ndarray`
Corresponding median differential arrival times
cc_pair: :class:`numpy.ndarray`
Corresponding average cross correlation coefficient
res_pair: :class:`numpy.ndarray`
Corresponding residuals of the differential arrival times.
"""
logger.debug("Gathering incomplete pair of S lag times")
# Strip last zero that constrained the linear system to solve for time shifts
dd1 = dd[:-1]
dd_res1 = dd_res[:-1]
if evpairs.shape[0] != dd1.shape[0]:
msg = "Arrays must be of same length."
raise IndexError(msg)
# Present event pairs
uniq_pairs = np.array(sorted(set([(a, b) for a, b in evpairs])))
ddm = np.zeros(uniq_pairs.shape[0], dtype=float)
ccm = np.zeros_like(ddm)
res = np.zeros_like(ddm)
for n, ab in enumerate(uniq_pairs):
iin = np.all(evpairs == ab, axis=-1)
# All 3rd events of the combination
iab = np.sum((ab[0] == combinations) + (ab[1] == combinations), axis=-1) == 2
ic = (combinations[iab, :] != ab[0]) & (combinations[iab, :] != ab[1])
# But the 3rd event is not supposed to be on the first index, because
# this would be an "implicit" event pair, which we are not considering
# here.
ic[:, 0] = False
c = combinations[ic.nonzero()] # 3rd events of the 'ab' indexed by iin
all_dds = dd1[iin]
all_dd_res = dd_res1[iin]
if method == "cc":
ibest = np.argmax(cc[ab[0], ab[1], c])
elif method == "residual":
ibest = np.argmin(all_dd_res)
else:
raise ValueError(f"Unknown 'measure': {method}")
ddm[n] = all_dds[ibest]
ccm[n] = cc[ab[0], ab[1], c][ibest]
res[n] = all_dd_res[ibest]
return uniq_pairs, ddm, ccm, res
[docs]
def run(
wvarr: np.ndarray,
header: core.Header,
destination: tuple[str, str, int, str],
do_mccc: bool = True,
do_pca: bool = True,
mccc_combinations: np.ndarray = np.array([]),
lag_times: list[str] = [],
):
"""Align waveforms and save results to disk
Parameters
----------
wvarr:
``(events, channels, samples)`` waveform array
header:
Dictionary holding the processing parameters
destination:
Tuple holding station name, phase type, alignment iteration number and
destination root directory
do_mccc:
Align using multi-channel cross correlation.
do_pca:
Align using principal component analyses. This allows for sub-sample
alignment, but impedes to combine cross correlation lag times of two
successive runs.
mccc_combinations:
When aligning using mccc, only combine these pairs (P-waves) or triplets
(S-waves).
lag_times:
MCCC lag times to save to file:
* "P": P lag times
* "S-median": median of S lag time triplets
* "S-residual": S lag times with lowest residuals
* "S-cc": S lag times with highest cross-correlation values
"""
if not (do_pca or do_mccc):
raise ValueError(
"Nothing to do. Must set at least one of: 'do_pca' or 'do_mccc'"
)
pwv = utils.concat_components(
signal.demean_filter_window(wvarr, **header.kwargs(signal.demean_filter_window))
)
# Number of events
nev = pwv.shape[0]
if do_mccc:
maxshift = header["phase_end"] - header["phase_start"]
# Align using MCCC
dt_cc, _, ddmc, ddresmc, evpairsmc = mccc_align(
pwv,
verbose=True,
maxshift=maxshift,
combinations=mccc_combinations,
do_scc=False,
**header.kwargs(mccc_align),
)
if len(evpairsmc) < 1:
logger.warning(
f"{header['station']}_{header['phase']}: Nothing to process."
)
return
# Shift the traces
arr_shift = signal.shift_3d(wvarr, -dt_cc, **header.kwargs(signal.shift_3d))
wvmat_cc = utils.concat_components(
signal.demean_filter_window(
arr_shift, **header.kwargs(signal.demean_filter_window)
)
)
# Re-compute the cc's on the shifted traces
logger.info("Computing aligned correlation coefficients")
# Recompute correlations with shifted traces
ccijk, ccij, *_ = signal.correlation_averages(
wvmat_cc,
header["phase"],
set_autocorrelation=False,
)
methods = []
if header["phase"] == "P" and "P" in lag_times:
methods = [""]
if header["phase"] == "S":
methods = [
method.split("-")[0] for method in lag_times if method.startswith("S")
]
# Default returned ...
dd = ddmc[:-1] # ... diferential times
dd_res = ddresmc[:-1] # diferential time residuals
ccp = ccij[evpairsmc[:, 0], evpairsmc[:, 1]] # cc values
evpairs = evpairsmc # event pairs
# Use different methods to extract S lag times
for method in methods:
if header["phase"] == "S":
if evpairsmc.shape[0] == nev * (nev - 1) * (nev - 2) / 3:
# Complete set of lag time combinations
evpairs, dd, ccp, dd_res = complete_paired_s_lag_times(
evpairsmc, ddmc, ccijk, ddresmc, method
)
else:
# The set is incomplete, so we can't compute the implicit
# combinations
if method == "median":
continue
evpairs, dd, ccp, dd_res = incomplete_paired_s_lag_times(
evpairsmc, ddmc, ccijk, ddresmc, mccc_combinations, method
)
# Silly format before saving lag times
# Look up actual event names
evns = np.vectorize(header["events_"].__getitem__)(evpairs)
evdd = np.char.mod(
["% 9.0f", "% 9.0f", "%13.6e", "%6.3f", "%13.2e"],
np.hstack(
(evns, dd[:, np.newaxis], ccp[:, np.newaxis], dd_res[:, np.newaxis])
),
)
# Save the lag times
np.savetxt(
core.file("mccc_lag_times", *destination, suffix=method),
evdd,
fmt="%s",
header=" EventA EventB LagTime(s) CC Residual(s)",
)
# Sum alignment residuals for each event
nev = len(header["events_"])
rms = np.zeros(nev)
for iev in range(nev):
try:
iin = np.any(evpairs == iev, axis=-1)
rms[iev] = np.sqrt(np.sum(dd_res[iin] ** 2) / np.sum(iin))
except IndexError:
rms[iev] = np.nan
# Save the time shifts
io.save_results(
core.file("mccc_time_shift", *destination), np.vstack((dt_cc, rms)).T
)
# Save the cc matrix, values are on the shifted traces
io.save_results(core.file("cc_matrix", *destination), ccij)
else:
# If we don't align with mccc, just, preprocess the input array
wvmat_cc = utils.concat_components(
signal.demean_filter_window(
wvarr, **header.kwargs(signal.demean_filter_window)
)
)
dt_cc = np.zeros(wvarr.shape[0])
if do_pca:
# Align using PCA
dt_pca, phi = pca_align(wvmat_cc, dphi=1e-9, **header.kwargs(pca_align))
io.save_results(core.file("pca_time_shift", *destination), dt_pca)
io.save_results(core.file("pca_objective", *destination), phi)
# Apply shift to input array and save
arr_shift = signal.shift_3d(
wvarr,
-dt_pca - dt_cc,
**header.kwargs(signal.shift_3d),
)
# We are saving a numpy array, not matlab
header["matlab_variable"] = None
# Fresh start with QC parameters
header["min_expansion_coefficient_norm"] = None
header["min_signal_noise_ratio"] = None
header["min_correlation"] = None
# Don't accidentialy combine only few neighbors in amplitude
header["combine_neighbors"] = None
# Write out header and array files
arrf = core.file("waveform_array", *destination)
hdrf = core.file("waveform_header", *destination)
# Save everything
np.save(arrf, arr_shift)
header.to_file(hdrf, True)
[docs]
def run_limited(*args):
"""Run the alignment with limited threadpool to avoid oversubscription
See :func:`run` for details on the arguments.
"""
# TODO: Test if we can dynamically adjust limits
with threadpool_limits(limits=1):
run(*args)