# relMT - Program to compute relative earthquake moment tensors
# Copyright (C) 2024 Wasja Bloch, Doriane Drolet, Michael Bostock
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Functions to set up and solve the linear systems"""
import numpy as np
from numpy.typing import NDArray
from scipy.sparse.linalg import lsmr
from scipy.sparse import spmatrix, csc_array, coo_array
from relmt import utils, core
from relmt import mt as relmtmt
import multiprocessing as mp
import logging
logger = core.register_logger(__name__)
# Machine epsilon
EPS = np.finfo(float).eps
def _null_to_eps(arr):
"""Take values below machine precision to machine precision"""
arr[np.abs(arr) < EPS] = EPS
return arr
[docs]
def gamma(azimuth: float, plunge: float) -> np.ndarray:
"""
Produce 3-element direction cosine vector
Parameters
----------
azimuth:
Degree east of north
plunge:
Degree down from horizontal
Returns
-------
``(3,)`` direction cosines along north (x), east (y), down (z) axis
"""
azi = azimuth * np.pi / 180
plu = plunge * np.pi / 180
return np.array([np.cos(azi) * np.cos(plu), np.sin(azi) * np.cos(plu), np.sin(plu)])
[docs]
def dircoeff_general_p(gamma: np.ndarray) -> np.ndarray:
"""
6-element direction coefficeint vector for unconstrained moment tensor
Implementation of Ploudre & Bostock (2019, GJI), Eq. A2
Parameters
----------
gamma:
Direction cosines produced by :func:`gamma`
Returns
-------
``(6,)`` directional coefficients
"""
return np.array(
[
gamma[0] ** 2,
gamma[1] ** 2,
gamma[2] ** 2,
2 * gamma[0] * gamma[1],
2 * gamma[0] * gamma[2],
2 * gamma[1] * gamma[2],
]
)
[docs]
def dircoeff_deviatoric_p(gamma: np.ndarray) -> np.ndarray:
"""
5-element direction coefficeint vector for deviatoric moment tensor
Implementation of Ploudre & Bostock (2019, GJI), Eq. A8
Parameters
----------
gamma:
Direction cosines produced by :func:`gamma`
Returns
-------
``(5,)`` directional coefficients
"""
return np.array(
[
gamma[0] ** 2 - gamma[2] ** 2,
gamma[1] ** 2 - gamma[2] ** 2,
2 * gamma[0] * gamma[1],
2 * gamma[0] * gamma[2],
2 * gamma[1] * gamma[2],
]
)
[docs]
def dircoeff_general_s(
gamma: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
3 6-element direction coefficeint vectors for unconstrained moment tensor
Implementation of Ploudre & Bostock (2019, GJI), Eq. A3
Parameters
----------
gamma:
Direction cosines produced by :func:`gamma`
Returns
-------
``(6,)`` directional coefficients
"""
gs1 = np.array(
[
gamma[0] * (1 - gamma[0] ** 2),
-gamma[0] * gamma[1] ** 2,
-gamma[0] * gamma[2] ** 2,
gamma[1] * (1 - 2 * gamma[0] ** 2),
gamma[2] * (1 - 2 * gamma[0] ** 2),
-2 * gamma[0] * gamma[1] * gamma[2],
]
)
gs2 = np.array(
[
-gamma[0] ** 2 * gamma[1],
gamma[1] * (1 - gamma[1] ** 2),
-gamma[1] * gamma[2] ** 2,
gamma[0] * (1 - 2 * gamma[1] ** 2),
-2 * gamma[0] * gamma[1] * gamma[2],
gamma[2] * (1 - 2 * gamma[1] ** 2),
]
)
gs3 = np.array(
[
-gamma[0] ** 2 * gamma[2],
-gamma[1] ** 2 * gamma[2],
gamma[2] * (1 - gamma[2] ** 2),
-2 * gamma[0] * gamma[1] * gamma[2],
gamma[0] * (1 - 2 * gamma[2] ** 2),
gamma[1] * (1 - 2 * gamma[2] ** 2),
]
)
return gs1, gs2, gs3
[docs]
def dircoeff_deviatoric_s(
gamma: np.ndarray,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
3 5-element direction coefficeint vectors for deviatoric moment tensor
Implementation of Ploudre & Bostock (2019, GJI), Eq. A9
Parameters
----------
gamma: :class:`numpy.ndarray`
Direction cosines produced by :func:`gamma`
Returns
-------
``(5,)`` directional coefficients
"""
gs1 = np.array(
[
gamma[0] * (1 - gamma[0] ** 2 + gamma[2] ** 2),
gamma[0] * (gamma[2] ** 2 - gamma[1] ** 2),
gamma[1] * (1 - 2 * gamma[0] ** 2),
gamma[2] * (1 - 2 * gamma[0] ** 2),
-2 * gamma[0] * gamma[1] * gamma[2],
]
)
gs2 = np.array(
[
gamma[1] * (gamma[2] ** 2 - gamma[0] ** 2),
gamma[1] * (1 - gamma[1] ** 2 + gamma[2] ** 2),
gamma[0] * (1 - 2 * gamma[1] ** 2),
-2 * gamma[0] * gamma[1] * gamma[2],
gamma[2] * (1 - 2 * gamma[1] ** 2),
]
)
gs3 = np.array(
[
gamma[2] * (gamma[2] ** 2 - gamma[0] ** 2 - 1),
gamma[2] * (gamma[2] ** 2 - gamma[1] ** 2 - 1),
-2 * gamma[0] * gamma[1] * gamma[2],
gamma[0] * (1 - 2 * gamma[2] ** 2),
gamma[1] * (1 - 2 * gamma[2] ** 2),
]
)
return gs1, gs2, gs3
[docs]
def distance_ratio(
event_a: core.Event, event_b: core.Event, station: core.Station
) -> float | np.ndarray:
"""Ratio of the distance of `event_a` and `event_b` to `station`
R = distance(event_a, station) / distance(event_b, station)
"""
return utils.cartesian_distance(
*event_a[:3], *station[:3]
) / utils.cartesian_distance(*event_b[:3], *station[:3])
[docs]
def p_equation(
p_amplitude: core.P_Amplitude_Ratio,
in_events: list,
station: core.Station,
event_dict: dict[int, core.Event],
phase_dictionary: dict[str, core.Phase],
nmt: int,
return_sparse: bool = False,
) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
"""
Equation with P wave constraint on the liniear system
Relative amplitudes are corrected by direction coefficient g and the
distance ratio from the event pair to the station.
Parameters
----------
p_amplitude:
One relative P amplitude observation
in_events:
Events part of the linear system
station:
Station on which the observation has been made
event_dict:
The full seismic event catalog
phase_dictionary:
All phase observations
nmt:
Number of moment tensor elements
return_sparse:
Return values data and column indices compatible with
:class:`scipy.sparse.coo_matrix`
Returns
-------
``(1, events * nmt)`` line of the left hand side of the linear system if
return_sparse = False
Tuple of ``(2 * nmt,)`` column indices and ``(2 * nmt,)`` values of the
sparse matrix if return_sparse = True
"""
def _gamma(ev, sta, pha):
phid = core.join_phaseid(ev, sta, pha)
azi, inc = phase_dictionary[phid].azimuth, phase_dictionary[phid].plunge
return gamma(azi, inc)
# Set up one line of the A matrix
ampl = p_amplitude.amp_ab
ieva = p_amplitude.event_a
ievb = p_amplitude.event_b
# Make sure amplitude < 1
if abs(ampl) > 1:
# Invert amplitude and switch indexing
ampl = 1 / ampl
ieva = p_amplitude.event_b
ievb = p_amplitude.event_a
ia = in_events.index(ieva)
ib = in_events.index(ievb)
ga = _gamma(ieva, station.name, "P")
gb = _gamma(ievb, station.name, "P")
for iev, g in zip([ieva, ievb], [ga, gb]):
if not np.all(np.isfinite(g)):
msg = "Missing take-off angle for phase: "
msg += f"{core.join_phaseid(iev, p_amplitude.station, 'P')}."
raise ValueError(msg)
rab = distance_ratio(event_dict[ieva], event_dict[ievb], station)
if nmt == 6:
gap = dircoeff_general_p(ga)
gbp = dircoeff_general_p(gb)
elif nmt == 5:
gap = dircoeff_deviatoric_p(ga)
gbp = dircoeff_deviatoric_p(gb)
else:
raise ValueError(f"'nmt must be '5' or '6', not: '{nmt}'")
va = -gap
vb = gbp * ampl * rab
if np.any(_null_to_eps(np.abs([va, vb])) <= EPS):
msg = "Matrix value below machine precission for event combination "
msg += f"{ieva}, {ievb} on station {station.name}"
logging.warning(msg)
if return_sparse:
colis = np.concatenate(
(range(nmt * ia, nmt * ia + nmt), range(nmt * ib, nmt * ib + nmt))
)
valus = np.concatenate((va, vb))
return colis, valus
# Reutrn a line of a dense matrix. Elevate intended zeros to EPS, so
# they are not removed when constructing the sparse matrix later
line = np.zeros(nmt * len(in_events))
line[nmt * ia : nmt * ia + nmt] = _null_to_eps(va)
line[nmt * ib : nmt * ib + nmt] = _null_to_eps(vb)
return np.array(line)
[docs]
def s_equations(
s_amplitude: core.S_Amplitude_Ratios,
in_events: list,
station: core.Station,
event_dict: dict[int, core.Event],
phase_dictionary: dict[str, core.Phase],
nmt: int,
coefficient_indices: tuple[int, int] | None = None,
return_sparse: bool = False,
two_s_equations: bool = True,
) -> np.ndarray | tuple[np.ndarray, np.ndarray]:
"""
Two equations with S-wave constraints on the linear system
Relative amplitudes are corrected by direction coefficient g and the
distance ratio from the event pair to the station.
Parameters
----------
s_amplitude:
One pair of relative S amplitude observations
in_events:
Events part of the linear system
station:
Station on which the observation has been made
event_dict:
The full seismic event catalog
phase_dictionary:
All phase observations
nmt:
Number of moment tensor elements
coefficient_indices:
Select two indices (0, 1, or 2) of directional coefficients to use. If
`None`, chose those with the largest norm
return_sparse:
Return values data and column indices compatible with :class:`scipy.sparse.coo_matrix`
two_s_equations:
Keep the second S equation with the lower norm of the polarization vector
Returns
-------
``(2, events * nmt)`` lines of the left hand side of the linear system if
return_sparse = False, and `two_s_equations` = True, or
``(1, events * nmt)`` if return_sparse = False, and
`two_s_equations` = False
Tuple of ``(6 * nmt,)`` column indices and ``(6 * nmt,)`` values of the
sparse matrix if return_sparse = True and `two_s_equations` = False.
The first set of 3*nmt indices and values corresponds to one line, the
second set to the next.
Tuple of ``(3 * nmt,)`` column indices and ``(3 * nmt,)`` values of the
sparse matrix if return_sparse = True and `two_s_equations` = False.
"""
def _gamma(ev, sta, pha):
phid = core.join_phaseid(ev, sta, pha)
azi, inc = phase_dictionary[phid].azimuth, phase_dictionary[phid].plunge
return gamma(azi, inc)
nev = len(in_events)
line1 = np.zeros((nmt * nev))
line2 = np.zeros((nmt * nev))
ieva = s_amplitude.event_a
ievb = s_amplitude.event_b
ievc = s_amplitude.event_c
# Indices in the linear system
ia = in_events.index(ieva)
ib = in_events.index(ievb)
ic = in_events.index(ievc)
ga = _gamma(ieva, station.name, "S")
gb = _gamma(ievb, station.name, "S")
gc = _gamma(ievc, station.name, "S")
for iev, g in zip([ieva, ievb, ievc], [ga, gb, gc]):
if not np.all(np.isfinite(g)):
logging.warning(
f"Missing take-off angle in event {iev}. Retruning all zeros"
)
return np.array([line1, line2])
amp_abc = s_amplitude.amp_abc
amp_acb = s_amplitude.amp_acb
rab = distance_ratio(event_dict[ieva], event_dict[ievb], station)
rac = distance_ratio(event_dict[ieva], event_dict[ievc], station)
# g[abc]s are 3-tuple. Only two are independent
if nmt == 6:
gas = dircoeff_general_s(ga)
gbs = dircoeff_general_s(gb)
gcs = dircoeff_general_s(gc)
elif nmt == 5:
gas = dircoeff_deviatoric_s(ga)
gbs = dircoeff_deviatoric_s(gb)
gcs = dircoeff_deviatoric_s(gc)
else:
raise ValueError(f"'nmt must be '5' or '6', not: '{nmt}'")
# Now find set of directional coefficients that records most of the amplitude:
# Choose the coefficients with the two highest norms
if coefficient_indices is None:
g0 = np.concatenate((gas[0], gbs[0], gcs[0]))
g1 = np.concatenate((gas[1], gbs[1], gcs[1]))
g2 = np.concatenate((gas[2], gbs[2], gcs[2]))
# But first check which elements are above machine precission
iis = np.nonzero(np.all(np.abs([g0, g1, g2]) > EPS, axis=1))[0]
if len(iis) == 2:
i1, i2 = iis
else:
# Keep order of equal elements, so tests succeed.
i1, i2 = np.argsort(np.linalg.norm((g0, g1, g2), axis=1), kind="stable")[
[1, 2]
]
else:
i1 = coefficient_indices[0]
i2 = coefficient_indices[1]
a1 = -gas[i1]
b1 = amp_abc * rab * gbs[i1]
c1 = amp_acb * rac * gcs[i1]
a2 = -gas[i2]
b2 = amp_abc * rab * gbs[i2]
c2 = amp_acb * rac * gcs[i2]
if np.any(_null_to_eps(np.abs([a1, b1, c1, a2, b2, c2])) < EPS):
msg = "Matrix value below machine precission for event combination "
msg += f"{ieva}, {ievb}, {ievc} on station {station.name}"
logging.warning(msg)
# For S waves, each event triplet has two lines in the matrix
if return_sparse:
ias = range(nmt * ia, nmt * ia + nmt)
ibs = range(nmt * ib, nmt * ib + nmt)
ics = range(nmt * ic, nmt * ic + nmt)
if not two_s_equations:
colis = np.concatenate((ias, ibs, ics))
valus = np.concatenate((a1, b1, c1))
else:
colis = np.concatenate((ias, ibs, ics, ias, ibs, ics))
valus = np.concatenate((a1, b1, c1, a2, b2, c2))
return colis, valus
# Return lines of a dense matrix. Elevate intended zeros to EPS, so
# they are not removed when constructing the sparse matrix later
line1[nmt * ia : nmt * ia + nmt] = _null_to_eps(a1)
line1[nmt * ib : nmt * ib + nmt] = _null_to_eps(b1)
line1[nmt * ic : nmt * ic + nmt] = _null_to_eps(c1)
if not two_s_equations:
return np.array([line1])
line2[nmt * ia : nmt * ia + nmt] = _null_to_eps(a2)
line2[nmt * ib : nmt * ib + nmt] = _null_to_eps(b2)
line2[nmt * ic : nmt * ic + nmt] = _null_to_eps(c2)
return np.array([line1, line2])
[docs]
def weight_misfit(
amplitude: core.S_Amplitude_Ratios | core.P_Amplitude_Ratio,
min_misfit: float,
max_misfit: float,
min_weight: float,
phase: str,
two_s_equations: bool = True,
) -> np.ndarray:
"""Weights [min_weight ... 1] for each row of the amplitude matrix by misfit
Maps misfit values above `max_misfit` to `min_weight` and below `min_misfit`
to unit weight.
weight = 1 - (mis - min_misfit) * (1 - min_weight) / (max_misfit - min_misfit)
Note
----
A misfit of 0 indicates that the reconstruction and the target waveform are
identical. A misfit of 1 that the residual of the waveform reconstruction
has the same norm that the waveform itself.
Parameters
----------
amplitude:
One amplitude ratio
min_misfit:
Minimum misfit that receives unit weight
max_misfit:
Maximum allowed misfit receives zero weight
min_weight:
Weight of the maximum misfit.
phase:
If 'S', return each weight twice to account for two equations per S wave
observation
Returns
-------
Weight of shape ``(1,)`` if `phase=P`, or ``(2,1)`` if `phase=S`
Raises
------
ValueError:
If 'phase' is not 'P' or 'S'
"""
def mis_fun(mis):
# Maps min_misfit -> 1.0, max_misfit -> min_weight
wght = 1 - (mis - min_misfit) * (1 - min_weight) / (max_misfit - min_misfit)
return max(min(wght, 1.0), min_weight)
if phase.upper() == "P":
return np.array(mis_fun(amplitude.misfit))
elif phase.upper() == "S":
if two_s_equations:
return np.array(2 * [mis_fun(amplitude.misfit)])[:, np.newaxis]
return np.array([mis_fun(amplitude.misfit)])[:, np.newaxis]
else:
raise ValueError("'phase' must be 'P' or 'S'")
[docs]
def weight_s_amplitude(
s_amplitudes: core.S_Amplitude_Ratios, two_s_equations: bool = True
) -> np.ndarray:
"""Weights for each row of the amplitude matrix by S-wave amplitdue
Weight is the inverse of the larger amplitude, but not more than 1.
weight = max(1, (1 / max(abs(ampl.amp_abc, ampl.amp_acb))))
Parameters
----------
s_amplitudes:
One pair of S-amplitude ratios
two_s_equations:
Keep the second S equation with the lower norm of the polarization vector
Returns
-------
``(2, 1)`` column vector of weights if `two_s_equations` is True,
else
``(1, 1)`` column vector of weights
"""
wght = min(1.0, (1 / max(np.abs([s_amplitudes.amp_abc, s_amplitudes.amp_acb]))))
# There are 2 equations per S amplitude reading.
if two_s_equations:
return np.array(2 * [wght])[:, np.newaxis]
return np.array([wght])[:, np.newaxis]
[docs]
def homogenous_equations(
p_amplitudes: list[core.P_Amplitude_Ratio],
s_amplitudes: list[core.S_Amplitude_Ratios],
in_events: list[int],
station_dictionary: dict[str, core.Station],
event_dict: dict[int, core.Event],
phase_dictionary: dict[str, core.Phase],
constraint: str,
s_coefficients: tuple[int, int] | None = None,
two_s_equations: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
"""Homogenous part of the linear system Am = b
Some more description of the function
Parameters
----------
p_amplitudes:
P observations to include in the system
s_amplitudes:
S observations to include in the system
in_events:
Included events. Columns blocks of A and row blocks of b and m will
reference into this vector.
station_dictionary:
Lookup table for station coordinates
event_dict:
The seismic event catalog
phase_dictionary:
Lookup table for ray take-off angles
constraint:
Constraint on the moment tensor solution
s_coefficients:
Indices (0, 1, or 2) of S-wave directional coefficients to use. If
`None`, chose those with the largest norm
two_s_equations:
Keep the second S equation with the lower norm of the polarization vector
Returns
-------
Ah: :class:`numpy.ndarray`
``(p_amplitudes + 2 * s_amplitudes, event_dict * mt_elements)`` left-hand side of the homogenous part of linear system
bh: :class:`numpy.ndarray`
``(event_dict * mt_elements, 1)`` zero column vector, right-hand side of the homogenous linear system
"""
nmt = mt_elements(constraint)
sfac = 1 + int(two_s_equations)
# Number of equations
peq = len(p_amplitudes) # P observations
seq = sfac * len(s_amplitudes) # S observations
neq = peq + seq
nmod = nmt * len(in_events) # Length of model vector
# Set up homogenous matrix and data vector
Ah = np.zeros((neq, nmod))
bh = np.zeros((neq, 1))
# Populate first with P amplitdues
for n, pamp in enumerate(p_amplitudes):
station = station_dictionary[pamp.station]
# Create one P observation
Ah[n, :] = p_equation(
pamp, in_events, station, event_dict, phase_dictionary, nmt
)
# Populate then with S amplitdues
for n, samp in enumerate(s_amplitudes):
station = station_dictionary[samp.station]
# Create two S observations
lines = s_equations(
samp,
in_events,
station,
event_dict,
phase_dictionary,
nmt,
s_coefficients,
False,
two_s_equations,
)
row = peq + sfac * n
Ah[[row, row + 1], :] = lines
return Ah, bh
[docs]
def homogenous_equations_sparse(
p_amplitudes: list[core.P_Amplitude_Ratio],
s_amplitudes: list[core.S_Amplitude_Ratios],
in_events: list[int],
station_dictionary: dict[str, core.Station],
event_dict: dict[int, core.Event],
phase_dictionary: dict[str, core.Phase],
constraint: str,
s_coefficients: tuple[int, int] | None = None,
two_s_equations: bool = True,
ncpu: int = 1,
) -> tuple[csc_array, np.ndarray]:
"""Homogenous part of the linear system Am = b
This functions writes the matrices directly in
:class:`scipy.sparse.coo_matrix` format
Parameters
----------
p_amplitudes:
P observations to include in the system
s_amplitudes:
S observations to include in the system
in_events:
Included events. Columns blocks of A and row blocks of b and m will
reference into this vector.
station_dictionary:
Lookup table for station coordinates
event_dict:
The seismic event catalog
phase_dictionary:
Lookup table for ray take-off angles
constraint:
Constraint on the moment tensor solution
s_coefficients:
Indices (0, 1, or 2) of S-wave directional coefficients to use. If
`None`, chose those with the largest norm
ncpu:
Number of parallel processes
two_s_equations:
Keep the second S equation with the lower norm of the polarization vector
Returns
-------
Ah: :class:`scipy.sparse.coo_matrix`
``(p_amplitudes + 2 * s_amplitudes, event_dict * mt_elements)`` left-hand side of the homogenous part of linear system
bh: :class:`numpy.ndarray`
``(event_dict * mt_elements, 1)`` zero column vector, right-hand side of the homogenous linear system
"""
nmt = mt_elements(constraint)
# Number of equations
peq = len(p_amplitudes) # P observations
seq = (1 + int(two_s_equations)) * len(s_amplitudes) # S observations
neq = peq + seq
bh = np.zeros((neq, 1))
# Collect P-arguments ...
pargs = []
for pamp in p_amplitudes:
station = station_dictionary[pamp.station]
# Only pass subset dicts to avoid huge memory overhead
this_evd = {evn: event_dict[evn] for evn in [pamp.event_a, pamp.event_b]}
this_phd = {
(phid := core.join_phaseid(evn, station.name, "P")): phase_dictionary[phid]
for evn in [pamp.event_a, pamp.event_b]
}
pargs.append((pamp, in_events, station, this_evd, this_phd, nmt, True))
# And S-arguments
sargs = []
for samp in s_amplitudes:
station = station_dictionary[samp.station]
# Only pass subset dicts to avoid huge memory overhead
this_evd = {
evn: event_dict[evn] for evn in [samp.event_a, samp.event_b, samp.event_c]
}
this_phd = {
(phid := core.join_phaseid(evn, station.name, "S")): phase_dictionary[phid]
for evn in [samp.event_a, samp.event_b, samp.event_c]
}
sargs.append(
(
samp,
in_events,
station,
this_evd,
this_phd,
nmt,
s_coefficients,
True,
two_s_equations,
)
)
with mp.Pool(ncpu) as pool:
pcolval = pool.starmap(p_equation, pargs)
with mp.Pool(ncpu) as pool:
scolval = pool.starmap(s_equations, sargs)
vals = np.concatenate([line[1] for line in pcolval + scolval])
cols = np.concatenate([line[0] for line in pcolval + scolval])
prows = [[p] * (2 * nmt) for p in range(peq)]
if two_s_equations:
srows = [
[peq + s] * (3 * nmt) + [peq + s + 1] * (3 * nmt) for s in range(0, seq, 2)
]
else:
srows = [[peq + s] * (3 * nmt) for s in range(seq)]
rows = np.concatenate(prows + srows)
Ah = coo_array((vals, (rows, cols))).tocsc()
return Ah, bh
_mt_elements = {"none": 6, "deviatoric": 5}
[docs]
def mt_elements(constraint: str) -> int:
"""
Number of elements of the moment tensor
Parameters
----------
constraint:
* 'none' - invert for full moment tensor
* 'deviatoric' - no isotropic component
Returns
-------
`6` for 'none', `5` for 'deviatoric'
Raises
------
ValueError:
If unknown constaint
"""
try:
return _mt_elements[constraint]
except KeyError:
msg = (
"Constraint must be one of: "
+ ", ".join(_mt_elements.keys())
+ f". Not: '{constraint}'"
)
raise ValueError(msg)
[docs]
def reference_mt_equations(
reference_events: list[int],
refmt_dict: dict[int, core.MT],
included_events: list[int],
constraint: str,
) -> tuple[np.ndarray, np.ndarray]:
"""
Inhomogenous part of the linear system Am = b
Absolute moment tensor constraint on the linear system
Parameters
----------
reference_events:
Indices to the reference events
refmt_dict:
Lookup table from event index to moment tensor
included_events:
Event IDs that form the columns of the linear system
constraint:
Constraint to impose on the moment tensor
Returns
-------
Ai: :class:`numpy.ndarray`
``(reference_events * mt_elements, number_events * mt_elements)``
Left-hand side of the inhomogenous part of the linear system.
bi: :class:`numpy.ndarray`
``(reference_events * mt_elements, 1)``
Elements of the reference moment tensor, right-hand side of the
inhomogenous part of the linear system.
"""
nmt = mt_elements(constraint)
nev = len(included_events)
nref = len(reference_events)
bi = np.zeros((nmt * nref, 1))
Ai = np.zeros((nmt * nref, nmt * nev))
for n, evid in enumerate(reference_events):
# On which column in the matrix lies the reference event
iev = included_events.index(evid)
# Identity matrix for reference event
Ai[n * nmt : n * nmt + nmt, :] = _reference_mt_matrix(iev, nev, nmt)
# Reference MT refomated into column vector
bi[n * nmt : n * nmt + nmt] = _reference_mt_data_vector(
refmt_dict[evid],
constraint=constraint,
)
return Ai, bi
[docs]
def reference_mt_event_norm(
ev_norm: np.ndarray, ref_mts: list[int], nmt: int
) -> np.ndarray:
"""Pull event normalization factor for reference moment tensor out of matrix
event normalization"""
indices = [iev * nmt + mt_element for iev in ref_mts for mt_element in range(nmt)]
return ev_norm[indices][:, np.newaxis]
def _reference_mt_matrix(
reference_index: int, number_of_events: int, nmt: int
) -> np.ndarray:
"""
Left-hand side of the inhomogenous part of the linear system.
Identity matrix that imposes one reference moment tensor constraint.
Parameters
----------
reference_index:
Event index of the reference event
number_of_events:
Total number of events considered
nmt:
Number of elements of the moment tensor
Returns
-------
:class:``numpy.ndarray``
``(nmt, nmt*number_of_events)`` identity matrix with ones at inidices
corresponding to event columns and mt_element rows
"""
lines = np.zeros((nmt, nmt * number_of_events))
lines[:, nmt * reference_index : nmt * reference_index + nmt] = np.eye(nmt)
return np.array(lines)
def _reference_mt_data_vector(mt: core.MT, constraint: str) -> np.ndarray:
"""
Column vector holding the reference moment tensor
Parameters
----------
mt:
Tuple containing reference moment tensor
constraint:
Which constraint to apply to the moment tensor
Returns
-------
:class:``numpy.ndarray``
Column vector of the reference moment tensor
Warns
-----
When a deviatoric constraint is supplied, but `mt` is > 1% isotropic
"""
moment = relmtmt.moment_of_tensor(relmtmt.mt_array(mt))
if constraint == "none":
return np.array(mt)[:, np.newaxis]
elif constraint == "deviatoric":
if (iso := np.abs(np.sum([mt.nn, mt.ee, mt.dd])) / moment) > 0.01:
logger.warning(
"Reference moment tensor is not deviatoric ({:.0f}% isotorpic)".format(
iso * 100
)
)
return np.array([mt.nn, mt.ee, mt.ne, mt.nd, mt.ed])[:, np.newaxis]
else:
raise ValueError(
f"'contraint' must be 'none' or 'deviatoric', not: '{constraint}'"
)
[docs]
def condition_by_norm(mat: np.ndarray, n_homogenous: int | None = None) -> np.ndarray:
"""
Weights to condition the homogenous part of the amplitude matrix
Each weight is the inverse L2-norm of the non-zero values of each equation
Parameters
----------
mat:
``(N, ev*nmt)`` amplitude matrix
n_homogenous:
Number of rows that constitute the homogenous part of the matrix. If
given, only produce weights for the first 'n_homogenous' lines of the
matrix.
Returns
-------
``(1, ev*nmt)`` column vector of equation normalization factors
"""
if n_homogenous is None:
n_homogenous = mat.shape[0]
# TODO: This is a bottleneck. Vectorize computation of factors
factors = np.ones(mat.shape[0])
for n, line in enumerate(mat[:n_homogenous, :]):
try:
# We're dealing with a sparse array, for which devision is not defined
norm = 1 / np.linalg.norm(line[line.nonzero()].asformat("array"))
except AttributeError:
norm = 1 / np.linalg.norm(line[line.nonzero()])
if ~np.isfinite(norm):
logger.warning(f"Non finite norm for Eq. {n}. Setting to 0.")
norm = 0.0
factors[n] = norm
# Column vector multiplies as expected with matrix rows
return factors[:, np.newaxis]
[docs]
def solve_lsmr(
A: np.ndarray, b: np.ndarray, ev_scale: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
"""
Solve linear system using :func:`scipy.sparse.linalg.lsmr`
Parameters
----------
A:
Matrix A in the linear system Ax = b of shape ``(M, N)``
b:
Data vector b ``(1,M)`` in the linear system Ax = b
ev_scale:
Event-wise condition factors ``(1,M)`` A was devided by
Returns
-------
x: :class:`numpy.ndarray`
The resulting model vector
eps: :class:`numpy.ndarray`
Residuals Am - b -> 0. Residuals are computed before multiplying `m`
with `ev_scale`
"""
logger.info("Running lsmr...")
m, istop, itn, *_ = lsmr(A, b, btol=1e-8, atol=1e-8, maxiter=10000)
logger.info(f"lsmr ended after {itn} iterations with exit code {istop}")
# Residuals
# eps = A[:, :] * m
eps = A @ m - b.flatten() # make b 1-dimensional
return m * ev_scale, eps
[docs]
def unpack_equation_vector(
eq_vector: np.ndarray,
p_lines: int,
ref_lines: int,
nmt: int,
two_s_equations: bool = True,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Split vector pertaining to equations of the amplitude matrix (e.g.
residuals, equation norm) into P-, S-, and reference MT parts
Parameters
----------
eq_vector:
Reisdual vector
p_lines:
Number of equations with P-information
ref_lines:
Number of equations with reference MT information
nmt:
Number of elements that parameterize an MT
two_s_equations:
We kept the second S equation
Returns
-------
p_part, s_part, ref_part: :class:`numpy.ndarray`
Vector contianing the part pertaining to the P-, S-, and reference equations
"""
eq_vector = np.array(eq_vector)
peps = eq_vector[:p_lines] # P-amplitude ...
# P and S lines combined, as a positive integer (avoid indexing -0)
s_lines = len(eq_vector) - p_lines - nmt * ref_lines
# S amplitude ...
if two_s_equations:
seps = eq_vector[p_lines : p_lines + s_lines].reshape(-1, 2)
else:
# If we don't have a second S equation, fille second column with NaN
seps = np.full((s_lines, 2), np.nan)
seps[:, 0] = eq_vector[p_lines : p_lines + s_lines].flatten()
reps = eq_vector[p_lines + s_lines :] # Reference
return peps, seps, reps
[docs]
def bootstrap_lsmr(
A: np.ndarray,
b: np.ndarray,
ev_scale: float,
peq: int,
seq: int,
bootstrap_samples: int,
seed: int = 0,
ncpu: int = 1,
) -> np.ndarray:
"""
Draw bootstrap samples from A and solve the modified linear system Am = b
Rows of A a drawn with repetion, where:
- Rows representing P observations are drawn individually.
- Rows representing S observations are drawn as pairs that contain the same
Bacb and Bacb relative amplitudes.
- Rows representing the reference MT are always included
Parameters
----------
A:
``(N, M)`` martix of the linear system Am = b
b:
``(1, N)`` solution vector of the linear system Am = b
ev_scale:
``(1, N)`` event-wise condition factors A was devided by
peq, seq:
Number of equations in A representing P and S wave constraints, respectivley
bootstrap_samples:
Number of random realizations of the linear system
seed:
Random number seed supplied to np.random.default_rng
ncpu:
Number of parallel process to launch
Returns
-------
``(bootstrap_samples * M)`` bootstrap model vectors
"""
def run_lsmr(A, b):
m, istop, *_ = lsmr(A, b, btol=1e-8, atol=1e-8, maxiter=10000)
if istop <= 2:
return m * ev_scale
else:
return np.full_like(m, np.nan)
rng = np.random.default_rng(seed)
n_homogenous = peq + seq
# Always draw the inhomogenous equations
samples_i = np.array((np.arange(n_homogenous, A.shape[0]),) * bootstrap_samples)
# Bootstrap the homogenous part
# Draw single P
samples_p = rng.choice(peq, size=(bootstrap_samples, peq))
# Only draw every 2nd S equation...
samples_s1 = rng.choice(
range(peq, peq + seq, 2), size=(bootstrap_samples, int(seq / 2))
)
# ... and it's pair
samples_s2 = samples_s1 + 1
# Stack P, S, and Reference equations.
# Remeber we iterate over over first dimension, so rows in A are here
# stored in 2nd dimension
samples = np.hstack((samples_p, samples_s1, samples_s2, samples_i))
m_boot = np.full((bootstrap_samples, A.shape[1]), np.nan)
if ncpu <= 1:
for n, isamp in enumerate(samples):
logger.debug(f"Bootstrap: {n}")
m_boot[n, :] = run_lsmr(A[isamp, :], b[isamp, :])
else:
raise NotImplementedError("Parallel processesing needs to be implemented")
return m_boot
[docs]
def solve_irls_sparse(
G: spmatrix,
d: np.ndarray,
tolerance: float = 1e-5,
eps: float = 1e-6,
efac: float = 1.3,
) -> tuple[np.ndarray, np.ndarray]:
"""
Iteratively reweighted least squares for sparse matrices
Solves for m in d = Gm using :func:`scipy.sparse.linalg.lsmr` and iteratiley re-scaling d
Parameters
----------
G:
Matrix G in the linear system
d:
Vector d in the linear system (pair-wise time shifts)
tolerance:
Average absolute change in model update mean(abs(m - m0)) to stop
iteration. Should fall below sampling intervall (Units of m)
eps:
Truncate smallest residuals to this value to stabilize 1/residual
weighting (Units of d)
efac:
Multiply epsilon by this value when iterative solution stagnates
Returns
-------
m : :class:`numpy.ndarray`
Model vector (absolute time shifts per trace)
r : :class:`numpy.ndarray`
Data residuals (unaccounted pair wise time shifts)
"""
# First iteration solve using least squares.
m = lsmr(G, d)[0]
tol = np.inf
it = 0
while tol > tolerance:
tol1 = tol
m0 = m
it = it + 1
# Compute residual vector, and take those near zero values
# to epsilon.
res = np.abs(G @ m0 - d)
res[res < eps] = eps
# Normalize by max value and reciprocate. Take sqrt so as to use the
# LSQR function that requires G not (G.T G)
whgt = np.sqrt(1 / res)
# Now convert to diagonal weighting elements according to 1-norm.
# Extra square root is just to allow the problem to be solved using
# the LSQR (least squares) division, i.e.:
#
# G'*R*(G*m-d)=0 or
# G'*sqrt(R)'*(sqrt(R)*G*m-sqrt(R)*d) = B'*(B*m-d0) = 0.
#
# Thus we solve a modified system where
# A-->B=sqrt(R)*A and
# d-->d0=sqrt(R)*d.
#
# Create the equivalent least-squares problem.
d0 = whgt * d
# Broadcasting whgt across columns.
G0 = G.copy()
G0.data = G0.data * np.take(whgt, G0.indices)
m, istop = lsmr(G0, d0)[:2]
logger.debug("LSMR finished with exit code {:d}".format(istop))
# Evaluate tolerance as mean np.abs(m-m0). This should go to zero as
# solution to NXN subsystem is approached and should fall below sample
# interval dt.
tol = np.mean(np.abs(m - m0))
logger.debug("Iteration {:d} misfit is {:3.1e}".format(it, tol))
# Increase eps if solution starts to stagnate.
if tol > tol1:
eps = eps * efac
logger.debug("eps increased to: {:3.1e}".format(eps))
r = G @ m - d
return m, r