Source code for pybert.tdip.tdipdata

from math import pi, sqrt

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages

import pygimli as pg
import pybert as pb
from . mipmodelling import DCIPMModelling


from pybert.importer import importAsciiColumns, importRes2dInv


[docs]def importTDIPdata(filename, verbose=False): """Read in TDIP data.""" header = {} ext = filename[filename.rfind('.')+1:] if ext.lower() in ['txt', 'tx2', 'gdd']: data, header = importAsciiColumns(filename, verbose=verbose, return_header=True) elif ext.lower() in ['dat', 'res2dinv']: # what about future BERT format? data, header = importRes2dInv(filename, verbose=verbose, return_header=True) else: data = pb.importData(filename) ipkey = '' testkeys = ['IP_#{}(mV/V)', 'M{}', 'ip{}'] for key in testkeys: if data.exists(key.format(1)): ipkey = key MA = [] i = 1 while data.exists(ipkey.format(i)): ma = data(ipkey.format(i)) if max(ma) <= 0: break MA.append(ma) i += 1 MA = np.array(MA) t = np.arange(MA.shape[0]) + 1 # default: t=gate no if 'ipGateT' in header: t = header['ipGateT'][:-1] + np.diff(header['ipGateT'])/2 testkeys = ['TM{}'] for key in testkeys: if data.exists(key.format(1)): tt = [data(key.format(i+1))[0] for i in range( MA.shape[0])] t = np.array(tt) if sum(t) > 100: t *= 0.001 return data, MA, t, header
[docs]class TDIPdata(): """Class managing time-domain induced polarisation (TDIP) field data.""" def __init__(self, filename=None, **kwargs): """Constructor with optional data load. Parameters ---------- filename : str name of file to read in, allowed formats are: * Syscal Pro Export File (*.txt) * ABEM TXT export file (*.txt) or raw time series) * Aarhus Workbench processed data (*.tx2) * res2dinv data **kwargs: * paraGeometry : PLC (pygimli mesh holding geometry) plc for the 2d inversion domain * paraMesh : pygimli Mesh instance plc for the 2d inversion domain * verbose : bool Be verbose. """ self.verbose = kwargs.get('verbose', False) self.basename = 'base' # for saving results and images self.figs = {} # figure container self.header = {} # header for supplemental information self.t = kwargs.pop('t', []) # time vector self.data = kwargs.pop('data', None) # data container self.rhoa = kwargs.pop('rhoa', None) # app. resistivity matrix [Ohm m] self.MA = kwargs.pop('ma', None) # app. phases matrix [Grad, deg] self.ERT = None # Resistivity manager class instance self.sINV = None # single inversion instance self.pd = None # paraDomain self.res = None # resistivity self.m = None # single-time spectral chargeability self.customParaGeometry = kwargs.pop('paraGeometry', None) self.customParaMesh = kwargs.pop('paraMesh', None) if filename is not None: self.load(filename, **kwargs) self.data.set('k', pg.geometricFactor(self.data, dim=2)) def __repr__(self): # for print function """String representation of the class.""" out = ['TDIP data: ' + self.data.__str__() + ' nt=' + str(len(self.t))] if hasattr(self, 'header'): for key in self.header: val = self.header[key] if isinstance(val, str): out.append(val) elif isinstance(val, int) or isinstance(val, float): out.append(key+' = '+str(val)) else: out.append(key+' = array('+str(val.shape)+')') return "\n".join(out)
[docs] def load(self, filename, **kwargs): """Try loading all supported file types.""" self.data, self.MA, self.t, self.header = importTDIPdata(filename) self.data.checkDataValidity() self.data.removeInvalid() self.ensureRhoa() self.basename = filename[:filename.rfind('.')] return assert isinstance(filename, str) self.data = pb.importData(filename) self.data.checkDataValidity() self.data.removeInvalid() ipkey = '' testkeys = ['IP_#{}(mV/V)', 'M{}', 'ip{}'] for key in testkeys: if key.format(1) in self.data.dataMap(): ipkey = key break MA = [] i = 1 while self.data.exists(ipkey.format(i)): ma = self.data(ipkey.format(i)) if max(ma) <= 0: break MA.append(ma) i += 1 self.MA = np.array(MA) self.t = np.arange(self.MA.shape[0]) + 1 # default: t=gate no testkeys = ['TM{}'] for key in testkeys: if self.data.exists(key.format(1)): tt = [self.data(key.format(i+1))[0] for i in range( self.MA.shape[0])] self.t = np.array(tt) if sum(self.t) > 100: self.t *= 0.001 self.basename = filename[:filename.rfind('.')] self.ensureRhoa()
[docs] def ensureRhoa(self): """Make sure apparent resistivity is present in file.""" if not self.data.allNonZero('k'): self.data.set('k', pb.geometricFactors(self.data, 2)) # check dim if not self.data.allNonZero('rhoa'): if not self.data.allNonZero('r'): self.data.set('r', self.data('u')/self.data('i')) self.data.set('rhoa', self.data('r') * self.data('k'))
[docs] def filter(self, tmin=0, tmax=1e9, kmax=1e6, electrode=None, forward=False, a=None, b=None, m=None, n=None, ab=None, mn=None, corrSID=1, nr=[]): """Filter data with respect to frequencies and geometric factor. Parameters ---------- tmin : double minimum frequency tmax : double maximum frequency kmax : double maximum (absolute) geometric factor electrode : int electrode to be removed completely a/b/m/n : int delete data with specific current or potential dipole lengths ab/mn : int delete data with specific current or potential dipole lengths corrSID: int [1] correct sensor index (like in data files) """ print("filtering: nd={:d}, nt={:d}".format(*self.MA.shape)) ind = (self.t >= tmin) & (self.t <= tmax) self.MA = self.MA[ind, :] self.t = self.t[ind] ind = (np.abs(self.data('k')) <= kmax) # maximum geometric factor ind[nr] = False # insividual numbers am = self.data("m") - self.data("a") if ab is not None: ind[np.isclose(np.abs(self.data("b")-self.data("a")), ab)] = False if mn is not None: ind[np.isclose(np.abs(self.data("n")-self.data("m")), mn)] = False print(sum(ind)) if forward: ind[am < 0] = False # reverse measurements print(sum(ind)) for name in ['a', 'b', 'm', 'n']: u = list(np.atleast_1d(eval(name))) if electrode is not None: u.extend(list(np.atleast_1d(electrode))) for uu in u: ind = ind & np.not_equal(self.data(name) + corrSID, uu) self.data.set('valid', pg.RVector(self.data.size())) self.data.markValid(pg.find(ind)) self.data.removeInvalid() self.MA = self.MA[:, ind] if electrode is not None: self.data.removeUnusedSensors() print("filtered: nd={:d}, nf={:d}".format(*self.MA.shape))
[docs] def showRhoa(self, **kwargs): """Show apparent resistivity.""" kwargs.setdefault('cMap', 'Spectral_r') kwargs.setdefault('logScale', True) return pb.show(self.data, **kwargs)
[docs] def showMa(self, nr=0, **kwargs): """Show apparent chargeability.""" kwargs.setdefault('cMap', 'plasma') kwargs.setdefault('logScale', True) return pb.show(self.data, self.MA[nr], **kwargs)
[docs] def generatePDF(self, rdict=None, mdict=None, **kwargs): """Generate a multi-page pdf file with all data as pseudosections.""" if rdict is None: rdict = dict(logScale=True, cMap='Spectral_r', label=r'$\rho_a$ [$\Omega$m]', xlabel='x [m]') if mdict is None: mdict = dict(cMin=2.5, cMax=250, logScale=True, cMap='plasma', label=r'$m_a$ [mV/V]', xlabel='x [m]') mdict.update(**kwargs) fig, ax = plt.subplots() with PdfPages(self.basename+'-alldata.pdf') as pdf: pb.show(self.data, 'rhoa', ax=ax, **rdict) ax.set_title(r'$\rho_a$ in $\Omega$m') fig.savefig(pdf, format='pdf') for i, ma in enumerate(self.MA): fig.clf() ax = fig.add_subplot(111) pb.show(self.data, ma, ax=ax, **mdict) # tstr = ' (t={:g}-{:g})'.format(*header['ipGateT'][i:i+2]) # ax.set_title(r'$m_a$ in mV/V IP' + str(i+1) + tstr) fig.savefig(pdf, format='pdf')
[docs] def showDecay(self, nr=[], ab=None, mn=None, verbose=True): """Show decay curves.""" data = self.data if ab: a = np.minimum(data('a'), data('b')) b = np.maximum(data('a'), data('b')) nr.extend(pg.find((a == min(ab)-1) & (b == max(ab)-1))) if mn: m = np.minimum(data('m'), data('n')) n = np.maximum(data('m'), data('n')) fi = pg.find((m == min(mn)-1) & (n == max(mn)-1)) if ab is not None: # already chose AB dipole => select nr = np.intersect1d(nr, fi) else: nr.extend(fi) if verbose: print("nr=", nr) if nr: fig, ax = plt.subplots() for nn in nr: abmn = [int(self.data(t)[nn]+1) for t in ['a', 'b', 'm', 'n']] lab = ('abmn: '+'{:d} '*4).format(*abmn) ax.loglog(self.t, self.MA[:, nn], 'x-', label=lab) ax.grid(True) ax.legend() return ax
[docs] def save(self, filename=None): """Save all data in some (yet-to-be-defined or -decided) format.""" pass
[docs] def invertRhoa(self, **kwargs): """Invert apparent resistivity values.""" if self.ERT is None: self.ERT = pb.ERTManager() self.ERT.setData(self.data) if 'mesh' in kwargs: self.ERT.setMesh(kwargs.pop('mesh')) self.res = self.ERT.invert(**kwargs) self.coverage = self.ERT.coverageDC()
[docs] def invertMa(self, nr=0, ma=None, **kwargs): """Invert for chargeability.""" if ma is None: ma = self.MA[nr] * 0.001 # think about whether MA should be in V/V maerr = np.ones_like(ma) * kwargs.pop('error', 0.005) maerr[ma < 0] = 1e5 maerr[ma > 1] = 1e5 ma[ma < 0] = 0.001 ma[ma > 1] = 0.99 fIP = DCIPMModelling(self.ERT.fop, self.ERT.mesh, self.ERT.resistivity) if 'regionFile' in kwargs: fIP.regionManager().loadMap(kwargs.pop('regionFile')) else: if self.ERT.fop.regionManager().regionCount() > 1: fIP.region(1).setBackground(True) fIP.region(2).setConstraintType(1) fIP.regionManager().setZWeight(kwargs.pop('zWeight', 1.0)) fIP.createRefinedForwardMesh(True) tD, tM = pg.RTransLog(), pg.RTransLogLU(0, 0.99) INV = pg.RInversion(ma, fIP, tD, tM, True, False) mstart = pg.RVector(len(self.ERT.resistivity), pg.median(ma)) INV.setModel(mstart) INV.setAbsoluteError(maerr) INV.setLambda(kwargs.pop('lam', 100)) INV.setRobustData(kwargs.pop('robustData', False)) INV.setBlockyModel(kwargs.pop('blockyModel', False)) self.m = INV.run()
[docs] def individualInversion(self, **kwargs): """Carry out individual inversion for spectral chargeability.""" self.invertRhoa(**kwargs) self.M = np.zeros((len(self.MA), len(self.res))) for i, ma in enumerate(self.MA): print('Inverting gate {}'.format(i)) self.invertMa(ma, **kwargs) self.M[i] = self.m
if __name__ == "__main__": pass # from glob import glob # filenames = glob('example*.dat')+glob('example*.txt')+glob('example*.tx2') # tdip = TDIPdata() # filenames[2]) # self = tdip # tdip.load(filenames[2]) # tdip.showRhoa() # tdip.showMa(10) # print(tdip)