from math import pi, sqrt
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import pygimli as pg
import pybert as pb
from . mipmodelling import DCIPMModelling
from pybert.importer import importAsciiColumns, importRes2dInv
[docs]def importTDIPdata(filename, verbose=False):
"""Read in TDIP data."""
header = {}
ext = filename[filename.rfind('.')+1:]
if ext.lower() in ['txt', 'tx2', 'gdd']:
data, header = importAsciiColumns(filename, verbose=verbose,
return_header=True)
elif ext.lower() in ['dat', 'res2dinv']: # what about future BERT format?
data, header = importRes2dInv(filename, verbose=verbose,
return_header=True)
else:
data = pb.importData(filename)
ipkey = ''
testkeys = ['IP_#{}(mV/V)', 'M{}', 'ip{}']
for key in testkeys:
if data.exists(key.format(1)):
ipkey = key
MA = []
i = 1
while data.exists(ipkey.format(i)):
ma = data(ipkey.format(i))
if max(ma) <= 0:
break
MA.append(ma)
i += 1
MA = np.array(MA)
t = np.arange(MA.shape[0]) + 1 # default: t=gate no
if 'ipGateT' in header:
t = header['ipGateT'][:-1] + np.diff(header['ipGateT'])/2
testkeys = ['TM{}']
for key in testkeys:
if data.exists(key.format(1)):
tt = [data(key.format(i+1))[0] for i in range(
MA.shape[0])]
t = np.array(tt)
if sum(t) > 100:
t *= 0.001
return data, MA, t, header
[docs]class TDIPdata():
"""Class managing time-domain induced polarisation (TDIP) field data."""
def __init__(self, filename=None, **kwargs):
"""Constructor with optional data load.
Parameters
----------
filename : str
name of file to read in, allowed formats are:
* Syscal Pro Export File (*.txt)
* ABEM TXT export file (*.txt) or raw time series)
* Aarhus Workbench processed data (*.tx2)
* res2dinv data
**kwargs:
* paraGeometry : PLC (pygimli mesh holding geometry)
plc for the 2d inversion domain
* paraMesh : pygimli Mesh instance
plc for the 2d inversion domain
* verbose : bool
Be verbose.
"""
self.verbose = kwargs.get('verbose', False)
self.basename = 'base' # for saving results and images
self.figs = {} # figure container
self.header = {} # header for supplemental information
self.t = kwargs.pop('t', []) # time vector
self.data = kwargs.pop('data', None) # data container
self.rhoa = kwargs.pop('rhoa', None) # app. resistivity matrix [Ohm m]
self.MA = kwargs.pop('ma', None) # app. phases matrix [Grad, deg]
self.ERT = None # Resistivity manager class instance
self.sINV = None # single inversion instance
self.pd = None # paraDomain
self.res = None # resistivity
self.m = None # single-time spectral chargeability
self.customParaGeometry = kwargs.pop('paraGeometry', None)
self.customParaMesh = kwargs.pop('paraMesh', None)
if filename is not None:
self.load(filename, **kwargs)
self.data.set('k', pg.geometricFactor(self.data, dim=2))
def __repr__(self): # for print function
"""String representation of the class."""
out = ['TDIP data: ' + self.data.__str__() + ' nt=' + str(len(self.t))]
if hasattr(self, 'header'):
for key in self.header:
val = self.header[key]
if isinstance(val, str):
out.append(val)
elif isinstance(val, int) or isinstance(val, float):
out.append(key+' = '+str(val))
else:
out.append(key+' = array('+str(val.shape)+')')
return "\n".join(out)
[docs] def load(self, filename, **kwargs):
"""Try loading all supported file types."""
self.data, self.MA, self.t, self.header = importTDIPdata(filename)
self.data.checkDataValidity()
self.data.removeInvalid()
self.ensureRhoa()
self.basename = filename[:filename.rfind('.')]
return
assert isinstance(filename, str)
self.data = pb.importData(filename)
self.data.checkDataValidity()
self.data.removeInvalid()
ipkey = ''
testkeys = ['IP_#{}(mV/V)', 'M{}', 'ip{}']
for key in testkeys:
if key.format(1) in self.data.dataMap():
ipkey = key
break
MA = []
i = 1
while self.data.exists(ipkey.format(i)):
ma = self.data(ipkey.format(i))
if max(ma) <= 0:
break
MA.append(ma)
i += 1
self.MA = np.array(MA)
self.t = np.arange(self.MA.shape[0]) + 1 # default: t=gate no
testkeys = ['TM{}']
for key in testkeys:
if self.data.exists(key.format(1)):
tt = [self.data(key.format(i+1))[0] for i in range(
self.MA.shape[0])]
self.t = np.array(tt)
if sum(self.t) > 100:
self.t *= 0.001
self.basename = filename[:filename.rfind('.')]
self.ensureRhoa()
[docs] def ensureRhoa(self):
"""Make sure apparent resistivity is present in file."""
if not self.data.allNonZero('k'):
self.data.set('k', pb.geometricFactors(self.data, 2)) # check dim
if not self.data.allNonZero('rhoa'):
if not self.data.allNonZero('r'):
self.data.set('r', self.data('u')/self.data('i'))
self.data.set('rhoa', self.data('r') * self.data('k'))
[docs] def filter(self, tmin=0, tmax=1e9, kmax=1e6, electrode=None, forward=False,
a=None, b=None, m=None, n=None, ab=None, mn=None, corrSID=1,
nr=[]):
"""Filter data with respect to frequencies and geometric factor.
Parameters
----------
tmin : double
minimum frequency
tmax : double
maximum frequency
kmax : double
maximum (absolute) geometric factor
electrode : int
electrode to be removed completely
a/b/m/n : int
delete data with specific current or potential dipole lengths
ab/mn : int
delete data with specific current or potential dipole lengths
corrSID: int [1]
correct sensor index (like in data files)
"""
print("filtering: nd={:d}, nt={:d}".format(*self.MA.shape))
ind = (self.t >= tmin) & (self.t <= tmax)
self.MA = self.MA[ind, :]
self.t = self.t[ind]
ind = (np.abs(self.data('k')) <= kmax) # maximum geometric factor
ind[nr] = False # insividual numbers
am = self.data("m") - self.data("a")
if ab is not None:
ind[np.isclose(np.abs(self.data("b")-self.data("a")), ab)] = False
if mn is not None:
ind[np.isclose(np.abs(self.data("n")-self.data("m")), mn)] = False
print(sum(ind))
if forward:
ind[am < 0] = False # reverse measurements
print(sum(ind))
for name in ['a', 'b', 'm', 'n']:
u = list(np.atleast_1d(eval(name)))
if electrode is not None:
u.extend(list(np.atleast_1d(electrode)))
for uu in u:
ind = ind & np.not_equal(self.data(name) + corrSID, uu)
self.data.set('valid', pg.RVector(self.data.size()))
self.data.markValid(pg.find(ind))
self.data.removeInvalid()
self.MA = self.MA[:, ind]
if electrode is not None:
self.data.removeUnusedSensors()
print("filtered: nd={:d}, nf={:d}".format(*self.MA.shape))
[docs] def showRhoa(self, **kwargs):
"""Show apparent resistivity."""
kwargs.setdefault('cMap', 'Spectral_r')
kwargs.setdefault('logScale', True)
return pb.show(self.data, **kwargs)
[docs] def showMa(self, nr=0, **kwargs):
"""Show apparent chargeability."""
kwargs.setdefault('cMap', 'plasma')
kwargs.setdefault('logScale', True)
return pb.show(self.data, self.MA[nr], **kwargs)
[docs] def generatePDF(self, rdict=None, mdict=None, **kwargs):
"""Generate a multi-page pdf file with all data as pseudosections."""
if rdict is None:
rdict = dict(logScale=True, cMap='Spectral_r',
label=r'$\rho_a$ [$\Omega$m]', xlabel='x [m]')
if mdict is None:
mdict = dict(cMin=2.5, cMax=250, logScale=True, cMap='plasma',
label=r'$m_a$ [mV/V]', xlabel='x [m]')
mdict.update(**kwargs)
fig, ax = plt.subplots()
with PdfPages(self.basename+'-alldata.pdf') as pdf:
pb.show(self.data, 'rhoa', ax=ax, **rdict)
ax.set_title(r'$\rho_a$ in $\Omega$m')
fig.savefig(pdf, format='pdf')
for i, ma in enumerate(self.MA):
fig.clf()
ax = fig.add_subplot(111)
pb.show(self.data, ma, ax=ax, **mdict)
# tstr = ' (t={:g}-{:g})'.format(*header['ipGateT'][i:i+2])
# ax.set_title(r'$m_a$ in mV/V IP' + str(i+1) + tstr)
fig.savefig(pdf, format='pdf')
[docs] def showDecay(self, nr=[], ab=None, mn=None, verbose=True):
"""Show decay curves."""
data = self.data
if ab:
a = np.minimum(data('a'), data('b'))
b = np.maximum(data('a'), data('b'))
nr.extend(pg.find((a == min(ab)-1) & (b == max(ab)-1)))
if mn:
m = np.minimum(data('m'), data('n'))
n = np.maximum(data('m'), data('n'))
fi = pg.find((m == min(mn)-1) & (n == max(mn)-1))
if ab is not None: # already chose AB dipole => select
nr = np.intersect1d(nr, fi)
else:
nr.extend(fi)
if verbose:
print("nr=", nr)
if nr:
fig, ax = plt.subplots()
for nn in nr:
abmn = [int(self.data(t)[nn]+1) for t in ['a', 'b', 'm', 'n']]
lab = ('abmn: '+'{:d} '*4).format(*abmn)
ax.loglog(self.t, self.MA[:, nn], 'x-', label=lab)
ax.grid(True)
ax.legend()
return ax
[docs] def save(self, filename=None):
"""Save all data in some (yet-to-be-defined or -decided) format."""
pass
[docs] def invertRhoa(self, **kwargs):
"""Invert apparent resistivity values."""
if self.ERT is None:
self.ERT = pb.ERTManager()
self.ERT.setData(self.data)
if 'mesh' in kwargs:
self.ERT.setMesh(kwargs.pop('mesh'))
self.res = self.ERT.invert(**kwargs)
self.coverage = self.ERT.coverageDC()
[docs] def invertMa(self, nr=0, ma=None, **kwargs):
"""Invert for chargeability."""
if ma is None:
ma = self.MA[nr] * 0.001 # think about whether MA should be in V/V
maerr = np.ones_like(ma) * kwargs.pop('error', 0.005)
maerr[ma < 0] = 1e5
maerr[ma > 1] = 1e5
ma[ma < 0] = 0.001
ma[ma > 1] = 0.99
fIP = DCIPMModelling(self.ERT.fop, self.ERT.mesh, self.ERT.resistivity)
if 'regionFile' in kwargs:
fIP.regionManager().loadMap(kwargs.pop('regionFile'))
else:
if self.ERT.fop.regionManager().regionCount() > 1:
fIP.region(1).setBackground(True)
fIP.region(2).setConstraintType(1)
fIP.regionManager().setZWeight(kwargs.pop('zWeight', 1.0))
fIP.createRefinedForwardMesh(True)
tD, tM = pg.RTransLog(), pg.RTransLogLU(0, 0.99)
INV = pg.RInversion(ma, fIP, tD, tM, True, False)
mstart = pg.RVector(len(self.ERT.resistivity), pg.median(ma))
INV.setModel(mstart)
INV.setAbsoluteError(maerr)
INV.setLambda(kwargs.pop('lam', 100))
INV.setRobustData(kwargs.pop('robustData', False))
INV.setBlockyModel(kwargs.pop('blockyModel', False))
self.m = INV.run()
[docs] def individualInversion(self, **kwargs):
"""Carry out individual inversion for spectral chargeability."""
self.invertRhoa(**kwargs)
self.M = np.zeros((len(self.MA), len(self.res)))
for i, ma in enumerate(self.MA):
print('Inverting gate {}'.format(i))
self.invertMa(ma, **kwargs)
self.M[i] = self.m
if __name__ == "__main__":
pass
# from glob import glob
# filenames = glob('example*.dat')+glob('example*.txt')+glob('example*.tx2')
# tdip = TDIPdata() # filenames[2])
# self = tdip
# tdip.load(filenames[2])
# tdip.showRhoa()
# tdip.showMa(10)
# print(tdip)