Source code for relmt.angle

"""
Functions used to compute take-off angles from 1D velocity model

Copied from SKHASH

Significant portions of the functions in this file are based on the Fortran HASH
code originally written by Jeanne L. Hardebeck & Peter M. Shearer, and all of it
is inspired by their work. Please cite the appropriate references if you use
this code.
"""

# relMT - Program to compute relative earthquake moment tensors
# Copyright (C) 2024 Wasja Bloch, Doriane Drolet, Michael Bostock
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# this program. If not, see <http://www.gnu.org/licenses/>.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import numpy as np
from relmt import core, utils

logger = core.register_logger(__name__)


[docs] def hash_plunge_table( depth_velocity: np.ndarray, depths: np.ndarray, distances: np.ndarray, nray: int, station_depth: float | None = None, ): """ Create tables of takeoff plunge angles given a 1D velocity model. Modified from Public Domain code SKHASH (Skoumal, Hardebeck & Shearer, 2024; Hardebeck & Shearer, 2002) by courtesey of USGS. Parameters ---------- depth_velocity: ``(layers, 2)`` array of depth and seismic velocity (arbitrary consistent units, e.g. m and m/s or km and km/s) depths: Array of depths of the seismic source (unit as above) dists: Array of source-receiver distances (unit as above). Must start at 0-range nray: Number of rays traced station_depth: Depth of the station in the velocity model (unit as above). If None, station is assumed at the top of the velocity model, consistent with SKHASH implementation. Returns ------- ``(distance, depth)`` lookup table of takeoff plunge angles """ # TODO: consider station at depth (borehole) # If the velocity model does not reach to the deepest depth, extend to that # point if depth_velocity[:, 0][-1] < depths[-1]: depth_velocity = np.vstack((depth_velocity, depth_velocity[-1, :])) depth_velocity[-1, 0] = depths[-1] + 1 depth_velocity[-1, 1] = depth_velocity[-1, 1] + 0.001 # Querry source depth and and station distance ndel = len(distances) ndep = len(depths) table = np.full((ndel, ndep), np.nan) pmin = 0 # Pull out irregularly spaced model depth and value... z = depth_velocity[:, 0] alpha = depth_velocity[:, 1] # ... now add one extra layer at the bottom nz = len(z) z = np.hstack((z, z[nz - 1])) alpha = np.hstack((alpha, alpha[nz - 1])) # Re-sample velocity model onto depth-vector # Add velocity steps in-between grid points. for i in range(nz - 1, 0, -1): # Irregular depth of the v model from bottom up for idep in range(ndep - 1, -1, -1): # Regular depth vector if (z[i - 1] <= (depths[idep] - 0.00001)) & ( z[i] >= (depths[idep] + 0.00001) ): z = np.insert(z, i, z[i - 1]) alpha = np.insert(alpha, i, alpha[i - 1]) z[i] = depths[idep] frac = (z[i] - z[i - 1]) / (z[i + 1] - z[i - 1]) alpha[i] = alpha[i - 1] + frac * (alpha[i + 1] - alpha[i - 1]) # Decimal places to round to when comparing depths. dec = -int(np.floor(np.log10(min(np.diff(z[:-1]))))) if station_depth is not None: if station_depth < z[0] or station_depth > z[-1]: raise ValueError( f"Station depth ({station_depth}) is outside the velocity model " f"bounds ({z[0]}, {z[-1]})." ) # Depth index and depth on grid of the station ista = np.argmin(np.abs(z - station_depth)) sdep = z[ista] # Crop velocity model at station depth z = z[ista:] - sdep alpha = alpha[ista:] # do the ray tracing slow = 1 / alpha pmax = slow[0] pstep = (pmax - pmin) / nray npmax = int((pmax + pstep / 2 - pmin) / pstep) + 1 depxcor = np.zeros((npmax, ndep)) # depucor = np.zeros((npmax, ndep)) # Get's initialized at correct size below deptcor = np.zeros((npmax, ndep)) tmp_ind = np.where(depths != 0)[0] depxcor[:, tmp_ind] = -999 deptcor[:, tmp_ind] = -999 ptab = np.linspace(pmin, pmin + pstep * (npmax - 1), num=npmax) # Layer thicknesses and slownesses h_array = z[1:] - z[:-1] utop = slow[:-1] ubot = slow[1:] """LAYERTRACE equivalent""" dx = np.zeros((npmax, len(utop))) dt = np.zeros((npmax, len(utop))) irtr = np.zeros((npmax, len(utop)), dtype=int) qs = np.zeros((npmax, len(utop))) qs[:] = np.nan qr = np.zeros((npmax, len(utop))) qr[:] = np.nan ytop = utop - ptab[:, np.newaxis] ytop_pos_flag = ytop > 0 qs[ytop_pos_flag] = ( ytop[ytop_pos_flag] * (utop + ptab[:, np.newaxis])[ytop_pos_flag] ) qs[ytop_pos_flag] = np.sqrt(qs[ytop_pos_flag]) qr = np.arctan2(qs, ptab[:, np.newaxis]) b = np.ma.divide(-np.log(ubot / utop), h_array) b = b.filled(np.nan) # integral at upper limit, 1/b factor omitted until end etau = qs - qr * ptab[:, np.newaxis] ex = qr # check lower limit to see if we have turning point ybot = ubot - ptab[:, np.newaxis] # if turning point, then no contribution from bottom point y_subzero_flag = ybot <= 0 y_greaterzero_flag = ybot > 0 irtr[y_subzero_flag] = 2 irtr[y_greaterzero_flag] = 1 irtr[~ytop_pos_flag] = 0 dx[y_subzero_flag] = ex[y_subzero_flag] dx = dx / b dtau = etau / b dt[y_subzero_flag] = ( dtau[y_subzero_flag] + (dx * ptab[:, np.newaxis])[y_subzero_flag] ) # converts tau to t q = np.zeros(ybot.shape) q[:] = np.nan q[y_greaterzero_flag] = ybot[y_greaterzero_flag] * ( (ubot + ptab[:, np.newaxis])[y_greaterzero_flag] ) qs = np.sqrt(q) qr = np.arctan2(qs, ptab[:, np.newaxis]) etau = etau - qs + ptab[:, np.newaxis] * qr ex = ex - qr exb = ex / b dtau = etau / b dx[y_greaterzero_flag] = exb[y_greaterzero_flag] dt[y_greaterzero_flag] = ( dtau[y_greaterzero_flag] + (exb * ptab[:, np.newaxis])[y_greaterzero_flag] ) # Ensures values after ray has turned are nan x = (irtr == 0) | (irtr == 2) idx = np.arange(npmax), x.argmax(axis=1) tmp = x[idx] == True idx_1 = idx[0][tmp], idx[1][tmp] + 1 tmp_1 = idx_1[1] < (len(utop) - 1) idx_1 = idx_1[0][tmp_1], idx_1[1][tmp_1] for xx in range(len(idx_1[0])): row = idx_1[0][xx] col = idx_1[1][xx] dx[row, col:] = np.nan dt[row, col:] = np.nan # Distance table, travel time table deltab = np.nansum(dx, axis=1) * 2 tttab = np.nansum(dt, axis=1) * 2 idx_2 = idx[0][tmp], idx[1][tmp] tmp_2 = idx_2[1] < (len(utop) - 1) idx_2 = idx_2[0][tmp_2], idx_2[1][tmp_2] for xx in range(len(idx_2[0])): row = idx_2[0][xx] col = idx_2[1][xx] dx[row, col:] = np.nan dt[row, col:] = np.nan depxcor = np.cumsum(dx, axis=1) deptcor = np.cumsum(dt, axis=1) # Find the corresponding indices, round to avoid floating point issues. output_col_ind = np.where(np.isin(np.round(z, dec), np.round(depths, dec)))[0] - 1 depxcor = depxcor[:, output_col_ind] deptcor = deptcor[:, output_col_ind] depxcor[:, 0] = 0 deptcor[:, 0] = 0 # Original code fails if size changes because velocity model is cropped # depucor[:] = slow[output_col_ind + 1] depucor = np.vstack([slow[output_col_ind + 1]] * npmax) depucor[np.isnan(depxcor)] = -999 depxcor[np.isnan(depxcor)] = -999 deptcor[np.isnan(deptcor)] = -999 # Only this many dephts are recoverd, given cropped velocity model ndep = len(output_col_ind) x = np.diff(depxcor, axis=0) <= 0 idx = x.argmax(axis=0) + 1 # TODO: The line below caused IndexError for certain velocity models # Because x is one smaller than depxor along dimension 0 # (the +1 above points correctly into idx, but not into tmp # tmp = x[idx, np.arange(ndep)] == False # So we substract that one index to find the one that should be set to the # maximum ray index. This should be safe. tmp = x[idx - 1, np.arange(ndep)] == False idx[tmp] = npmax - 1 for idep in range(ndep): # upgoing rays from source xsave_up = depxcor[: (idx[idep]), idep] tsave_up = deptcor[: (idx[idep]), idep] usave_up = depucor[: (idx[idep]), idep] psave_up = -1 * ptab[: (idx[idep])] # downgoing rays from source down_idx = np.where((depxcor[:, idep] != -999) & (deltab != -999))[0][::-1] xsave_down = deltab[down_idx] - depxcor[down_idx, idep] tsave_down = tttab[down_idx] - deptcor[down_idx, idep] usave_down = depucor[down_idx, idep] psave_down = ptab[down_idx] # Merges upgoing and downgoing ray arrays xsave = np.hstack([xsave_up, xsave_down]) tsave = np.hstack([tsave_up, tsave_down]) usave = np.hstack([usave_up, usave_down]) psave = np.hstack([psave_up, psave_down]) # Now search the rays closest to the querry distances scr1 = np.zeros(ndel) for idel in range(1, ndel): del_x = distances[idel] ind = np.where((xsave[:-1] <= del_x) & (xsave[1:] >= del_x))[0] + 1 frac = (del_x - xsave[ind - 1]) / (xsave[ind] - xsave[ind - 1]) t1 = tsave[ind - 1] + frac * (tsave[ind] - tsave[ind - 1]) min_ind = ind[np.argmin(t1)] scr1[idel] = psave[min_ind] / usave[min_ind] angle = np.rad2deg(np.arcsin(scr1)) angle_flag = angle >= 0 angle *= -1 # SKHASH convention angle[angle_flag] += 180 table[:, idep] = angle if distances[0] == 0: # SKHASH convention table[0, :] = 0.0 # relMT convention table -= 90.0 if station_depth is not None and ista > 0: # Move values from surface to station depth table[:, ista:] = table[:, :-ista] # Set values above the station to nan table[:, :ista] = np.nan return table
[docs] def clean_vmodel(vmodel: np.ndarray) -> np.ndarray: """Clean velocity model as to adhere to create_takeoff_angle convention Function courtesy of the SKHASH developers Parameters ---------- vmodel: ``(layers, 3)`` table holding depth,(m) P and S-wave velocity (m/s) Returns ------- ``(layers, 3)`` tabel with monotonically increasing velocities """ if len(vmodel) == 1: msg = ( "Velocity model has only a single velocity. " "There must be at least two points." ) raise IndexError(msg) else: if not (all(np.diff(vmodel[:, 0]) >= 0)): msg = ( "The velocity model is expected to be ordered in terms of " "increasing depth. Ordering it now. " "Unexpected results may occurr." ) logger.warning(msg) vm_sort_ind = np.argsort(vmodel[:, 0]) vmodel = vmodel[vm_sort_ind, :] if np.any(idup := (np.diff(vmodel[:, 0]) == 0)): msg = ( "Duplicate velocities for a given depth. Removing layers: " + ", ".join(map(str, idup.nonzero()[0])) ) logger.warning(msg) vmodel = np.delete( vmodel, np.where(idup)[0], axis=0, ) if vmodel[:, 0].max() < 1000: msg = ( "Velocity model only reaches down to " f"{vmodel[:, 0].max()} m. Is this intentional?" ) logger.warning(msg) if vmodel[:, 1].max() < 1000: msg = ( "Maximum velocity is only " f"{vmodel[:, 1].max()} m/s. Is this intentional?" ) logger.warning(msg) # If there are constant velocity layers, merge those rows drop_constant_vel_ind = np.where(np.diff(vmodel[:, 1]) <= 0)[0] + 1 if len(drop_constant_vel_ind) > 0: msg = ( "Constant velocities for layers: " + ", ".join(map(str, drop_constant_vel_ind)) + ". Deleting." ) logger.warning(msg) vmodel = np.delete(vmodel, drop_constant_vel_ind, axis=0) return vmodel
[docs] def plunge(x1: float, y1: float, z1: float, x2: float, y2: float, z2: float) -> float: """Plunge (degree down from horizontal) from point 1 to point 2""" epi_dist = np.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) return np.arctan2(z2 - z1, epi_dist) * 180 / np.pi
[docs] def azimuth( x1: float | np.ndarray, y1: float | np.ndarray, x2: float | np.ndarray, y2: float | np.ndarray, ) -> float | np.ndarray: """Azimuth (degree x -> y) from point (x1, y1) to (x2, y2)""" azi = (np.arctan2((y2 - y1), (x2 - x1)) * 180 / np.pi) % 360.0 return azi
[docs] def azimuth_gap( phase_dict: dict[str, core.Phase], p_amplitudes: list[core.P_Amplitude_Ratio], s_amplitudes: list[core.S_Amplitude_Ratios], ) -> dict[int, list[float]]: """Azimuthal gap for each event Parameters ---------- phase_dict: Dictionary of phases p_amplitudes: List of P-wave amplitude ratios s_amplitudes: List of S-wave amplitude ratios Returns ------- Mapping of event number to azimuthal gaps in descending order in degrees """ ipev = utils.event_indices(p_amplitudes) isev = utils.event_indices(s_amplitudes) evns = set(ipev).union(isev) # Mapping Phase ID -> azimuth azis = {phid: phase.azimuth for phid, phase in phase_dict.items()} # Mapping Event -> Wave IDs wvobs = { evn: set( [(p_amplitudes[pev].station, p_amplitudes[pev].phase) for pev in ipev[evn]] + [ (s_amplitudes[sev].station, s_amplitudes[sev].phase) for sev in isev[evn] ] ) for evn in evns } gaps = {} for evn in evns: evazis = sorted( [ azis[core.join_phaseid(evn, *sta_pha)] for sta_pha in wvobs[evn] if core.join_phaseid(evn, *sta_pha) in azis ] ) if len(evazis) < 1: logger.debug(f"No azimuths for event {evn}") continue gaps[evn] = sorted( # account for gap spanning 0 deg here --v np.diff(sorted(set([azi for azi in evazis] + [evazis[0] + 360]))) )[::-1] return gaps