# relMT - Program to compute relative earthquake moment tensors
# Copyright (C) 2024 Wasja Bloch, Doriane Drolet, Michael Bostock
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import numpy as np
from typing import Callable, Iterable
from relmt import core, signal, utils
logger = core.register_logger(__name__)
[docs]
def read_obspy_inventory_files(filenames: Iterable[str]):
"""
Read station files using :func:`obspy.core.inventory.inventory.read_inventory`
Parameters
----------
filenames:
Names of the compliant inventroy files
Returns
-------
:class:`obspy.core.inventory.inventory.Inventory`
Attention
---------
Depends on ``obspy``
"""
try:
from obspy import Inventory, read_inventory
except ModuleNotFoundError:
msg = "Could not import obspy.\n"
msg += "Please install relMT with optional obspy dependencies:\n"
msg += "pip install relmt[obspy]"
raise ModuleNotFoundError(msg)
inv = Inventory()
for fn in filenames:
inv += read_inventory(fn)
return inv
[docs]
def get_utm_zone(
latitudes: Iterable[float], longitudes: Iterable[float]
) -> tuple[int, str]:
"""
Get UTM zone number and letter from coordinates.
Parameters
----------
latitudes, longitudes:
Geographical coordinates to include in estimate
Returns
-------
num: int
Consensus UTM zone number
let: str
Consensus UTM zone letter
Raises
------
LookupError:
If coordinates exceed one zone.
Attention
---------
Depends on ``utm``
"""
try:
import utm
except ModuleNotFoundError:
msg = "Could not import utm.\n"
msg += "Please install relMT with optional obspy dependencies:\n"
msg += "pip install relmt[geo]"
raise ModuleNotFoundError(msg)
# Get possible zone letters
lets = [utm.latitude_to_zone_letter(lat) for lat in latitudes]
nums = [
utm.latlon_to_zone_number(lat, lon) for lat, lon in zip(latitudes, longitudes)
]
zones = set([f"{num}{let}" for num, let in zip(nums, lets)])
if len(zones) > 1:
msg = (
"No UTM zone consensus. Choices are: "
+ ", ".join([i for i in zones])
+ ". Choose one or narrow down coordinates."
)
raise LookupError(msg)
return nums[0], lets[0]
[docs]
def geoconverter_utm2latlon(
northing: float, easting: float, depth: float, utm_num: int, utm_let: str
) -> tuple[float, float, float]:
"""
Convert geographic to local UTM coordinates
Parameters
----------
northing, easting:
Cartesian coordinates (meters)
depth: float
Positive down in meters
utm_num:
UTM zone number
utm_let:
UTM zone letter
Return
------
latitude, longitude, depth : float
Geographical coordinates in degree and kilometers
Attention
---------
Depends on ``utm``
"""
try:
import utm
except ModuleNotFoundError:
msg = "Could not import utm.\n"
msg += "Please install relMT with optional geo dependencies:\n"
msg += "pip install relmt[geo]"
raise ModuleNotFoundError(msg)
latitude, longitude = utm.to_latlon(
easting, northing, utm_num, utm_let, strict=False
)
depth /= 1000
return latitude, longitude, depth
[docs]
def geoconverter_latlon2utm(
latitude: float, longitude: float, depth: float, utm_num: int, utm_let: str
) -> tuple[float, float, float]:
"""
Convert geographic to local UTM coordinates
Parameters
----------
latitude, longitude:
Geographic coordinates (degree)
depth:
Positive down in kilometers
utm_num:
UTM zone number
utm_let:
UTM zone letter
Return
------
north, east, depth : float
Local UTM coordinates in meters
Hint
----
Wrap into a custom function to use as a geoconverter when reading station
or event tables
.. code-block::
def geconverter_oslo(lat, lon, dep):
return geoconverter_latlon2utm(lat, lon, dep, 32, "V")
Attention
---------
Depends on ``utm``
"""
try:
import utm
except ModuleNotFoundError:
msg = "Could not import utm.\n"
msg += "Please install relMT with optional 'geo' dependencies:\n"
msg += "pip install relmt[geo]"
raise ModuleNotFoundError(msg)
east, north, _, _ = utm.from_latlon(
latitude, longitude, force_zone_number=utm_num, force_zone_letter=utm_let
)
depth *= 1000
return north, east, depth
[docs]
def read_station_inventory(
inventory, geoconverter: Callable, strict: bool = True
) -> dict[str, core.Station]:
"""
Parameters
----------
inventory: :class:`obspy.core.inventory.inventory.Inventory`
Must contain network and station codes, and coordinates.
geoconverter:
Function that takes longitude, latitude and depth as arguments and returns local
northing and easting and depth coordinates in meters. Depth conversion unused,
solely required for compatibility with geoconverter argument in
:func:`relmt.io.read_event_table`.
strict:
Raise a KeyError for repeated station codes with unequal coordinates. If
False, last occurrence overwrites previous occurrences
Returns
-------
Station dictionary
Raises
------
ValueError:
When station code contains the reserved character '_'
KeyError:
When repeated station codes are met and `stict=True`
Attention
---------
Depends on ``obspy``
"""
station_dict = dict()
for net in inventory:
for sta in net:
north, east, _ = geoconverter(sta.latitude, sta.longitude, 0)
depth = -sta.elevation
# One entry for standard, altanate and historical station code each
for scode in [sta.code, sta.alternate_code, sta.historical_code]:
if scode:
if "_" in scode:
msg = f"'_' character in station code {scode} is reserved."
raise ValueError(msg)
if scode in station_dict and strict:
n, e, d, _ = station_dict[scode]
if n != north or e != east or d != depth:
msg = f"Station code {scode} already contained."
raise KeyError(msg)
station_dict.update(
{scode: core.Station(north, east, depth, scode)}
)
return station_dict
[docs]
def read_catalog_picks(
catalog, event_dict: dict[str, core.Event], include_at: float = np.inf
) -> dict[str, core.Phase]:
"""Read arrival times from ObsPy Catalog object
Parameters
----------
catalog: :class:`obspy.core.event.catalog.Catalog`
Must contain station codes, phases, and arrival times.
event_dict:
The seismic event catalog
include_at:
Include picks within this time difference between event_dict and
catalog origin times (seconds)
Returns
-------
Phase dictionary
Attention
---------
Depends on ``obspy``
"""
cattimes = [ev.origins[0].time.timestamp for ev in catalog]
evtimes = {ev.time: evn for evn, ev in event_dict.items()}
lut = utils.approx_time_lookup(cattimes, evtimes, include_at=include_at)
phase_dict = dict()
for ev in catalog:
evn = evtimes[lut[ev.origins[0].time.timestamp]]
for pick in ev.picks:
sta = pick.waveform_id.station_code
pha = pick.phase_hint
phid = core.join_phaseid(evn, sta, pha)
phase_dict.update({phid: core.Phase(pick.time.timestamp, np.nan, np.nan)})
return phase_dict
[docs]
def spectrum(
sig: np.ndarray, sampling_rate: float, nfft: int = 0
) -> tuple[np.ndarray, np.ndarray]:
"""
Retrun the spectrum of a 1D signal
Parameters
----------
sig:
``(samples,)`` array holding the waveform
sampling_rate:
in Hertz
nfft:
Number of points in the fft
Returns
-------
Frequency vector (Hz) and corresponding amplitudes (input units)
Attention
---------
Depends on ``multitaper``
"""
try:
from multitaper import MTSpec
except ModuleNotFoundError:
msg = "Could not import multitaper.\n"
msg += "Please install relMT with optional multitaper dependencies:\n"
msg += "pip install relmt[multitaper]"
raise ModuleNotFoundError(msg)
tw = sig.shape[-1] / sampling_rate
freq, spec = MTSpec(sig, dt=1 / sampling_rate, nfft=nfft).rspec()
spec = np.sqrt(spec * tw)
return freq, spec
[docs]
def apparent_corner_frequency(
sig: np.ndarray,
sampling_rate: float,
fmin: float | None = None,
fmax: float | None = None,
) -> float:
"""
Find apparant corner frequency in velocity spectrum of 1D seismogram
Parameters
----------
sig:
``(samples,)`` velocity seismogram
sampling_rate:
in Hertz
fmin, fmax:
Minimum, maximum frequency to test (Hz)
Returns
-------
Maximum of the spectrum (Hz)
Attention
---------
Depends on ``multitaper``
"""
if sig.ndim == 1:
freqs, specs = spectrum(sig, sampling_rate)
else:
# Compute average spectrum of components
freqs, _ = spectrum(sig[0, :], sampling_rate)
specs = np.mean(
[spectrum(sig[ncha, :], sampling_rate)[1] for ncha in range(sig.shape[0])],
axis=0,
)
if0 = 0
if1 = len(freqs)
if fmin is not None:
if0 = np.argmin(abs(freqs - fmin))
if fmax is not None:
if1 = np.argmin(abs(freqs - fmax))
return freqs[if0:if1][np.argmax(specs[if0:if1])][0]
# @core._doc_config_args
[docs]
def optimal_bandpass(
wvarr: np.ndarray,
sampling_rate: float,
data_window: float,
phase_start: float,
phase_end: float,
fmin: float | None = None,
fmax: float | None = None,
min_snr: float = 0,
) -> tuple[float, float] | tuple[None, None]:
"""Find bandpass with signal-to-noise ratio above threshold
Parameters
----------
wvarr:
1D or 2D array holding the velocity event waveform(s)
fmin, fmax:
Minimum, maximum frequency to consider
min_snr:
Minimum signal to noise ratio (dB) to include
Returns
-------
highpass, lowpass: float
Optimal filter corners (Hz)
Attention
---------
Depends on ``multitaper``
"""
# Get signal and nois windows
isig, _ = signal.indices_signal(sampling_rate, phase_start, phase_end, data_window)
# Convert dB to fraction
min_snr = signal.fraction(min_snr)
# Force noise spectrum same size as frequence spectrum
if wvarr.ndim == 1:
# Preprocess
sig = signal.demean(wvarr[isig:])
noi = signal.demean(wvarr[:isig])
# Find frequency range of signal
freqs, specs = spectrum(sig, sampling_rate)
_, specn = spectrum(noi, sampling_rate, nfft=2 * len(freqs) - 1)
else:
sig = signal.demean(wvarr[:, isig:])
noi = signal.demean(wvarr[:, :isig])
# Compute average spectrum of components
freqs, _ = spectrum(sig[0, :], sampling_rate)
specs = np.mean(
[
spectrum(sig[ncha, :], sampling_rate, nfft=2 * len(freqs) - 1)[1]
for ncha in range(sig.shape[0])
],
axis=0,
)
specn = np.mean(
[
spectrum(noi[ncha, :], sampling_rate, nfft=2 * len(freqs) - 1)[1]
for ncha in range(noi.shape[0])
],
axis=0,
)
# If no frequency bracket is given, use spectrums range
if fmin is None:
fmin = np.min(freqs)
if fmax is None:
fmax = np.max(freqs)
# Find sampled frequencies
if0 = np.argmin(abs(freqs - fmin))
if1 = np.argmin(abs(freqs - fmax))
nfreq = if1 - if0
if nfreq < 1:
logger.warning("No band below threshold. Returning NaN.")
return np.nan, np.nan
# Find longest band above SNR threshold
trialbands = np.zeros((nfreq, nfreq))
dpower = np.log10(specs[if0:if1]) - np.log10(specn[if0:if1]) - np.log10(min_snr)
sumdp = np.cumsum(dpower)
for ifreq in range(nfreq):
trialbands[ifreq, ifreq:] = sumdp[ifreq:] - sumdp[ifreq]
# Passband that maximizes SNR
jf0, jf1 = np.unravel_index(np.argmax(trialbands), (nfreq, nfreq))
hpas = freqs[if0 + jf0][0]
lpas = freqs[if0 + jf1][0]
return hpas, lpas
[docs]
def focal_mechanism_to_mt(
strike: float | list[float],
dip: float | list[float],
rake: float | list[float],
magnitude: float | list[float] | None = None,
) -> core.MT | list[core.MT]:
"""Convert focal mechanism to moment tensor"""
try:
from pyrocko.moment_tensor import MomentTensor
except ModuleNotFoundError:
raise ModuleNotFoundError(core._module_hint("pyrocko"))
try:
momt = core.MT(
*MomentTensor(strike=strike, dip=dip, rake=rake, magnitude=magnitude).m6()
)
except TypeError:
# Make we iterate over all elements
if magnitude is None:
magnitude = [None] * len(strike)
assert len(strike) == len(rake) == len(dip) == len(magnitude)
momt = [
core.MT(*MomentTensor(strike=s, dip=d, rake=r, magnitude=m).m6())
for s, d, r, m in zip(strike, dip, rake, magnitude)
]
return momt