Source code for pybert.sip.sip

# -*- coding: utf-8 -*-
"""Spectral induced polarization (SIP) data handling and inversion."""

# general system imports
import sys
import os.path
from math import pi, sqrt

# numpy and plotting imports
import numpy as np
import matplotlib.pyplot as plt

from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.colorbar import ColorbarBase
from matplotlib.colors import LogNorm, Normalize

# pygimli library, utility functions and standard (lab) stuff
import pygimli as pg
from pygimli.physics.SIP import SIPSpectrum
from pygimli.physics.SIP.importexport import readSIP256file, fstring
from pygimli.physics.SIP.models import ColeColeRho
from pygimli.mplviewer import setCbarLevels, drawSensors

# pybert stuff for ERT-specific stuff
import pybert as pb
from pybert.data import plotERTData
from pybert.sip.sipmodelling import ERTTLmod, ERTMultiPhimod, DCIPMModelling
from pybert.importer import readAsDictionary


[docs]class SIPdata(): """Class for managing spectral induced polarisation (SIP) field data.""" def __init__(self, filename=None, **kwargs): """Constructor with optional data load. Parameters ---------- **kwargs: * paraGeometry : PLC plc for the 2d inversion domain * verbose : bool Be verbose. * takeall : bool Don't delete any data while reading res files. """ self.verbose = kwargs.get('verbose', False) # CR: The name of the members should be a little bit more literate. # TG: I agree, do we have to stick to the pylint/pyflakes conventions? # CR: would by nice .. btw. we defined own self.basename = 'base' # for saving results and images self.figs = {} # figure container self.freq = kwargs.pop('f', None) # frequency vector self.RHOA = kwargs.pop('RHOA', None) # app. resistivity matrix [Ohm m] self.PHIA = kwargs.pop('PHIA', None) # app. phases matrix [Grad, deg] self.data = kwargs.pop('data', None) # data container self.ERT = None # Resistivity manager class instance self.sINV = None # single inversion instance self.RES = None # matrix of (inverted) resistivities (redundant?) self.PHI = None # matrix of (inverted) phases self.pd = None # paraDomain self.res = None # (single-frequency) resistivity self.phi = None # (single-frequency) phase self.coverage = None # coverage vector (from single inversion) # Cole-Cole model self.m = None # chargeability (from model spectrum) self.tau = None # time constant (from model spectrum) self.c = None # Cole-Cole exponent (from model spectrum) self.fitChi2 = None # vector of fitting chi^2 for each model cell self.header = {} # some useful header information (any instrument) # TODO: SIP256C/D internals (to be removed!) self.ABMN = None # SIP256 internals, only for display self.DATA = None # dto. self.AB = None # dto. self.RU = None # dto. (will all be thrown away) self.nc = 0 # number of current injections (redundant) self.nv = 0 # number of voltage units (SIP256 only) self.customParaGeometry = kwargs.pop('paraGeometry', None) self.customParaMesh = kwargs.pop('paraMesh', None) if filename is not None: self.load(filename, **kwargs) self.data.set('k', pg.geometricFactor(self.data, dim=2)) def __repr__(self): # for print function """String representation of the class.""" out = ['SIP data: nf=' + str(len(self.freq)) + ' nc=' + str(self.nc) + ' nv=' + str(self.nv)] if hasattr(self, 'header'): for key in self.header: val = self.header[key] if isinstance(val, int) or isinstance(val, float): out.append(key+' = '+str(val)) else: out.append(key+' = array('+str(val.shape)+')') return "\n".join(out)
[docs] def load(self, filename, verbose=False, f=None, instr='SIP256', electrodes=None, takeall=False): """Load SIP data from file. Load SIP data from file. (either Radic RES, MPT or single files) Parameters ---------- filename: str, [str, ] single filename, basename or filename list for shm/rhoa/phia f : array frequency vector (not in all instrument data files) instr : str instrument name (as alternative to the frequency vector) electrodes : [[x,y],] Overrides sensor positions verbose : bool Be verbose. takeall : bool Don't delete any data while reading res files. """ if isinstance(filename, list): # data, RHOA and PHIA files self.data = pb.DataContainerERT(filename[0]) self.RHOA = np.loadtxt(filename[1]) self.PHIA = np.loadtxt(filename[2]) elif isinstance(filename, str): if filename.lower().rfind('.res') >= 0: # SIP 256 or Fuchs file self.header, self.DATA, self.AB, self.RU = readSIP256file( filename, verbose) self.basename = filename.rstrip('.res').rstrip('.RES') self.nc = self.header['Number_of_Readings'] self.nv = self.header['Number_of_Remote_Units'] self.organiseSIP256data(electrodes, takeall) elif (filename.lower().endswith('.mtp') or filename.endswith('.Data')): # MTP file self.header = {} self.loadMPTData(filename) self.sortFrequencies() elif os.path.isfile(filename): # full file name self.data = pb.DataContainerERT(filename) self.basename = filename[:-4] else: self.basename = filename if os.path.isfile(filename + '.shm'): self.data = pb.DataContainerERT(filename + '.shm') self.ABMN = np.column_stack( (self.data('a'), self.data('b'), self.data('m'), self.data('n'))) if os.path.isfile(filename + '.rhoa'): self.RHOA = np.loadtxt(filename + '.rhoa', skiprows=1) if os.path.isfile(filename + '.phia'): A = np.loadtxt(filename + '.phia') self.PHIA = A[1:, :] self.freq = A[0, :] self.sortFrequencies() if f is not None: self.freq = f if not hasattr(self, 'freq'): if instr == 'Fuchs': stf = 12000./2**np.arange(25) else: stf = [1000, 500, 266, 125, 80, 40, 20, 10, 5, 2.5, 1.25, 0.625, 0.3125, 0.156, 0.078, 0.039, 0.02, 0.01, 5e-3, 2.5e-3, 1.25e-3] self.freq = stf[self.RHOA.shape[1]-1::-1] if not hasattr(self, 'nc'): ab = self.data('a')*1000 + self.data('b') self.nc = len(np.unique(ab)) if not hasattr(self, 'nv'): mn = self.data('m')*1000 + self.data('n') self.nv = len(np.unique(mn))
[docs] def loadMPTData(self, filename): """Read Multi-phase technology (MPT) phase SIP field data files.""" with open(filename) as fid: dataact = False elecact = False ELEC, DATA = [], [] a, b, m, n = [], [], [], [] elmap = np.arange(256) elnum = 0 for line in fid: sp = line.split() if line.find("#elec_start") >= 0: elecact = True if line.find("#elec_end") >= 0: elecact = False if elecact and line.lower().find("elec") < 0: ELEC.append([float(sp[i]) for i in range(1, 5)]) elmap[int(sp[0].split(',')[1])] = elnum elnum += 1 if line.find("#data_start") >= 0: dataact = True if line.find("#data_end") >= 0: dataact = False if dataact and line.find("Frequency =") >= 0: self.freq = np.array(sp[4::5], dtype=np.float) if (dataact and line.find("!") < 0 and line.find("#") < 0 and line.find("**") < 0): a.append(elmap[int(sp[1].split(",")[1])]) b.append(elmap[int(sp[2].split(",")[1])]) m.append(elmap[int(sp[3].split(",")[1])]) n.append(elmap[int(sp[4].split(",")[1])]) # a.append(int(sp[1].split(",")[1])) # b.append(int(sp[2].split(",")[1])) # m.append(int(sp[3].split(",")[1])) # n.append(int(sp[4].split(",")[1])) DATA.append(np.array(sp[5:-7], dtype=np.float)) DATA = np.array(DATA) self.data = pb.DataContainerERT() for elec in ELEC: self.data.createSensor(pg.RVector3(elec[:3])) self.data.resize(len(a)) self.data.set("a", pg.RVector(np.asarray(a))) self.data.set("b", pg.RVector(np.asarray(b))) self.data.set("m", pg.RVector(np.asarray(m))) self.data.set("n", pg.RVector(np.asarray(n))) self.data.set("valid", pg.RVector(self.data.size(), 1)) self.data.set("k", pb.geometricFactors(self.data)) self.basename = filename.rstrip('.mtp').rstrip('.MTP') self.data.save(self.basename+".shm", "a b m n k") nf = DATA.shape[1] // 5 kk = np.reshape(self.data('k'), (-1, 1)) self.RHOA = kk * DATA[:, 0:nf*5:5] self.PHIA = -DATA[:, 2:nf*5+2:5] * 1e-3
[docs] def organiseSIP256data(self, electrodes=None, takeall=False): """Builds up empty data container with the quadrupoles. Parameters ---------- electrode : list [None] Overwrite the electrodes positions given in the SIP265.res file. takeall : bool [False] Don't delete any data while reading res files. """ self.freq = [] for line in self.header['FrequencyParameter']: if (len(line) < 7) or (line[6] == 1): self.freq.append(line[0].round(3)) self.freq = np.array(self.freq) # assemble measurement logics aa, bb, mm, nn, ii, iu = [], [], [], [], [], [] for ir in range(len(self.DATA)): readings = self.header['Readings'][ir] leftout = readings[3:] iA, iB = self.AB[ir] if ir < len(self.RU): ru = self.RU[ir] for iru, _ in enumerate(ru): iM = ru[iru] iN = iM + 1 while iN in leftout: iN += 1 if (iM > iB and iN-iM == iB-iA) or takeall: aa.append(iA) bb.append(iB) mm.append(iM) nn.append(iN) ii.append(ir) iu.append(iru) self.ABMN = np.column_stack((aa, bb, mm, nn)) # create data container self.data = pb.DataContainerERT() for line in self.header['Layout']: self.data.createSensor(pg.RVector3(line[2], 0., 0.)) if electrodes is not None: if len(electrodes) == self.data.sensorCount(): self.data.setSensorPositions(electrodes) else: print(self.data.sensorCount() != len(electrodes)) raise Exception("Electrode count mismatch. " "Cannot not overwrite Electrodes.") self.data.resize(len(aa)) if self.data.size() == 0: raise Exception("No data found.") self.data.set('a', pg.RVector(aa)-1) # np.array(aa)-1) self.data.set('b', pg.RVector(bb)-1) self.data.set('m', pg.RVector(mm)-1) self.data.set('n', pg.RVector(nn)-1) self.data.markValid(self.data('a') > -1) # assemble data matrices self.RHOA = np.ones((self.data.size(), len(self.freq))) * np.nan self.PHIA = np.ones((self.data.size(), len(self.freq))) * np.nan self.K = np.ones((self.data.size(), len(self.freq))) * np.nan self.I = np.ones((self.data.size(), len(self.freq))) * np.nan for i, _ in enumerate(ii): if ii[i] < len(self.DATA) and iu[i] < len(self.DATA[ii[i]]): A = self.DATA[ii[i]][iu[i]] for ifr in range(len(self.freq)): line = A[A[:, 0].round(3) == self.freq[ifr]] if len(line): self.RHOA[i, ifr] = np.abs(line[0][1]) self.PHIA[i, ifr] = -line[0][2] * pi / 180. # grad self.K[i, ifr] = line[0][7] self.I[i, ifr] = line[0][6] * 1e-3 else: print("RU "+str(iu[i])+" not present, RI="+str(ii[i])) self.sortFrequencies() # for i, f in enumerate(self.freq): # self.data.add('rhoa:'+ str(f), self.RHOA[:,i]) # self.data.add('phia:'+ str(f), self.PHIA[:,i]) # self.data.markInvalid(pg.isInfNaN(self.data('rhoa:'+ str(f)))) if electrodes is not None: self.RHOA = self.RHOA / self.K # print("correct rho") # print(self.RHOA[0]) # print(self.K[0]) # print(self.RHOA[0]) for i in range(len(self.K[0])): self.K[:, i] = pb.geometricFactors(self.data, dim=2) self.RHOA = abs(self.RHOA * self.K) # print(self.K[0]) # print(self.RHOA[0]) # exit() self.RHOA = np.ma.masked_invalid(self.RHOA) self.PHIA = np.ma.masked_invalid(self.PHIA) self.data.removeInvalid()
[docs] def addData(self, name): """Add data from another file or sip class.""" if isinstance(name, str): sip2 = SIPdata(name) else: sip2 = name if self.RHOA.shape[1] == sip2.RHOA.shape[1]: # same frequencies self.data.add(sip2.data) self.RHOA = np.vstack((self.RHOA, sip2.RHOA)) self.PHIA = np.vstack((self.PHIA, sip2.PHIA)) elif self.RHOA.shape[0] == sip2.RHOA.shape[0]: # same data self.freq = np.hstack((self.freq, sip2.freq)) self.RHOA = np.hstack((self.RHOA, sip2.RHOA)) self.PHIA = np.hstack((self.PHIA, sip2.PHIA)) self.sortFrequencies() else: pg.error("Neither number of data nor frequencies is equal. " + "Don't know how to combine data.")
[docs] def sortFrequencies(self): """Sort frequencies (and data) in increasing order.""" ind = np.argsort(self.freq) self.freq.sort() self.RHOA = self.RHOA[:, ind] self.PHIA = self.PHIA[:, ind] if hasattr(self, 'K'): self.K = self.K[:, ind] if hasattr(self, 'I'): self.I = self.I[:, ind]
[docs] def filter(self, fmin=0, fmax=1e9, kmax=1e6, electrode=None, forward=False, a=None, b=None, m=None, n=None, ab=None, mn=None, corrSID=1, nr=[]): """Filter data with respect to frequencies and geometric factor. Parameters ---------- fmin : double minimum frequency fmax : double maximum frequency kmax : double maximum (absolute) geometric factor electrode : int electrode to be removed completely a/b/m/n : int delete data with specific current or potential dipole lengths ab/mn : int delete data with specific current or potential dipole lengths corrSID: int [1] correct sensor index (like in data files) """ print("filtering: nd={:d}, nf={:d}".format(*self.RHOA.shape)) ind = (self.freq >= fmin) & (self.freq <= fmax) self.RHOA = self.RHOA[:, ind] self.PHIA = self.PHIA[:, ind] if self.RES is not None: self.RES = self.RES[:, ind] if self.PHI is not None: self.PHI = self.PHI[:, ind] self.freq = self.freq[ind] ind = (np.abs(self.data('k')) <= kmax) # maximum geometric factor ind[nr] = False # insividual numbers am = self.data("m") - self.data("a") if ab is not None: ind[np.isclose(np.abs(self.data("b")-self.data("a")), ab)] = False if mn is not None: ind[np.isclose(np.abs(self.data("n")-self.data("m")), mn)] = False print(sum(ind)) if forward: ind[am < 0] = False # reverse measurements print(sum(ind)) for name in ['a', 'b', 'm', 'n']: u = list(np.atleast_1d(eval(name))) if electrode is not None: u.extend(list(np.atleast_1d(electrode))) for uu in u: ind = ind & np.not_equal(self.data(name) + corrSID, uu) self.RHOA = self.RHOA[ind, :] self.PHIA = self.PHIA[ind, :] self.data.set('valid', pg.RVector(self.data.size())) self.data.markValid(pg.find(ind)) self.data.removeInvalid() if electrode is not None: self.data.removeUnusedSensors() print("filtered: nd={:d}, nf={:d}".format(*self.RHOA.shape))
[docs] def sortFreq(self): """Old version of sortFrequency (for backward compatibility).""" raise BaseException("sortFreq() deprecated, " "use sortFrequencies() instead")
[docs] def simulate(self, mesh, rhovec, mvec, tauvec, cvec, **kwargs): """Synthetic simulation based on Cole-Cole model.""" if "scheme" in kwargs: self.data = kwargs["scheme"] if "fr" in kwargs: self.freq = kwargs["fr"] noiseLevel = kwargs.pop('noiseLevel', 0) # Ca: 0.01 noiseAbs = kwargs.pop('noiseAbs', 1e-5) # Ca: 1e-5 verbose = kwargs.pop('verbose', False) ert = pb.Resistivity() self.RHOA = np.zeros((self.data.size(), len(self.freq))) self.PHIA = np.zeros((self.data.size(), len(self.freq))) for i, fr in enumerate(self.freq): res = ColeColeRho(fr, rhovec, mvec, tauvec, cvec) rhoai, phiai = ert.simulate(mesh, res=res[mesh.cellMarkers()], scheme=self.data, noiseLevel=noiseLevel, noiseAbs=noiseAbs, returnArray=True) phiai.setVal(pi - phiai[phiai > pi/2], pg.find(phiai > pi/2)) if verbose: phi = -np.angle(res) print('{:d}\t{:5e}\t{:.2f}\t{:.2f}'.format( i, fr, max(phi)*1000, max(phiai)*1000)) self.RHOA[:, i] = rhoai self.PHIA[:, i] = phiai
[docs] def saveData(self, basename=None): """Save data shm and .rhoa/.phia matrices.""" if basename is not None: self.basename = basename self.data.save(self.basename + '.shm', 'a b m n') self.writeDataMat()
[docs] def writeDataMat(self, fmt='%10.4f'): """Output the data as matrices called basename + ending rhoa/phia.""" self.sortFrequencies() np.savetxt(self.basename + '.rhoa', np.vstack((self.freq, self.RHOA)), fmt=fmt) np.savetxt(self.basename + '.phia', np.vstack((self.freq, self.PHIA)), fmt=fmt)
[docs] def writeAllData(self, floatfmt='%.2f'): # kind of deprecated (early try) """Output the data as complete matrices (including ABMN, k and f). phia = apparent neg. phase (mrad). """ header = 'a\tb\tm\tn\tk' fmt = '%d\t%d\t%d\t%d\t%f' for i, f in enumerate(self.freq): fmt += '\t' + floatfmt header += str('\t' + str(f)) # Looking for first index with valid K values kIdx = 0 for i in range(len(self.K[0])): if len(self.K[:, i]) != len(np.where(np.isnan(self.K[:, i]))[0]): kIdx = i break np.savetxt(self.basename + '_rhoa.all', np.column_stack((self.ABMN, self.K[:, kIdx], self.RHOA)), fmt=fmt, header=header) np.savetxt(self.basename + '_phia.all', np.column_stack((self.ABMN, self.K[:, kIdx], self.PHIA*1e3)), fmt=fmt, header=header)
[docs] def singleFrequencyData(self, ifr=0, kmax=None): """Return filled ERT data container for one frequency. ip =neg phase (mrad).""" if isinstance(ifr, float): # choose closest frequency ifr = np.argmin(np.abs(self.freq - ifr)) data1 = pb.DataContainerERT(self.data) data1.set('rhoa', np.array(self.RHOA[:, ifr])) data1.set('ip', np.array(self.PHIA[:, ifr] * 1000)) if hasattr(self, 'K'): data1.set('k', np.array(self.K[:, ifr])) data1.set('r', np.array(self.RHOA[:, ifr]) / np.array(self.K[:, ifr])) if hasattr(self, 'I'): data1.set('i', np.array(self.I[:, ifr])) data1.set('u', data1('r')*data1('i')) # DRY: why is here some filtering? .. we have a filter method if kmax is not None: data1.markInvalid(pg.abs(data1('k')) > kmax) data1.checkDataValidity() data1.removeInvalid() return data1
[docs] def writeSingleFrequencyData(self, kmax=None): """Write single frequency data in unified data format.""" for ifr, fri in enumerate(self.freq): data1 = self.singleFrequencyData(ifr, kmax=kmax) data1.checkDataValidity() if fri > 1.: fname = '{:02d}-{:d}Hz.ohm'.format(ifr, int(np.round(fri))) else: fname = '{:02d}-{:d}mHz.ohm'.format(ifr, int(np.round(fri*1e3))) data1.save(self.basename+'_'+fname, 'a b m n rhoa k u i ip')
[docs] def generateSpectraPDF(self, useall=False, maxphi=100., rlim=None, maxdist=999, figsize=(8.5, 11), **kwargs): """Make pdf file containing all spectra.""" pdf = PdfPages(self.basename + '-spectra.pdf') fig, ax = plt.subplots(figsize=figsize, nrows=2, sharex=True) colors = 'bgrcmyk' markers = ('x', 'o', 'v', '^', 's', 'p', '>', '<', '+', 'd') if hasattr(self, 'DATA'): # original SIP256C data structure fak = pi*1000./180 for i, Data in enumerate(self.DATA): act = 0 for j, data in enumerate(Data): freq = data[:, 0] rhoa, drhoa = data[:, 1], data[:, 3] phi, dphi = -data[:, 2]*fak, data[:, 4]*fak dist = j - self.AB[i][1] useit = (dist > 0) and (dist < maxdist) or useall if useit and np.isfinite(phi[0]) and np.isfinite(rhoa[0]): co = colors[(j-2) % 7] marker = markers[(j-2) // 7] ax[0].errorbar(freq, np.abs(rhoa), yerr=drhoa*rhoa/100, color=co, label=str(j), marker=marker, **kwargs) ax[1].errorbar(freq, phi, yerr=dphi, color=co, marker=marker, label=str(j)) act += 1 if act: if i == -1: ax[0].legend(loc='upper right', numpoints=1, ncol=3) ax[0].set_title(str(self.AB[i][0])+'-'+str(self.AB[i][1])) ax[0].set_xscale('log') ax[0].set_yscale('log') ax[1].set_xscale('log') ax[0].set_xlim(min(self.freq), max(self.freq)) # ax[0].set_xlabel('f in Hz') # shared x ax[1].set_xlabel('f in Hz') ax[0].set_ylabel(r'$\rho_a$ in $\Omega$m') ax[1].set_ylabel(r'-$\phi_a$ in mrad') ax[1].set_ylim(0., maxphi) if rlim is not None: ax[0].set_ylim(rlim) ax[0].grid(True) ax[1].grid(True) fig.savefig(pdf, format='pdf') ax[0].cla() ax[1].cla() for j in range(2, self.nv): col = colors[(j-2) % 7] marker = markers[(j-2) // 7] ax[0].plot(1, 1, '-', color=col, marker=marker, label='1,1') ax[0].legend(loc='upper right', numpoints=1, ncol=3) else: # %% cind = np.asarray((self.data('a')+1) * 100 + self.data('b')+1) for ci in np.unique(cind): ind = np.nonzero(cind == ci)[0] for ii in ind: rhoa, phia = self.RHOA[ii, :], self.PHIA[ii, :] j = int(self.data('m')[ii]) co = colors[j % 7] marker = markers[j // 7] ax[0].semilogx(self.freq, np.abs(rhoa), label=str(j), color=co, marker=marker, **kwargs) ax[1].semilogx(self.freq, phia*1000, color=co, marker=marker, label=str(j)) ax[0].set_yscale('log') if 'phiScale' in kwargs: ax[1].set_yscale(kwargs['phiScale']) ax[0].set_xlim(min(self.freq), max(self.freq)) # ax[0].set_xlabel('f in Hz') # shared x ax[1].set_xlabel('f in Hz') ax[0].set_ylabel(r'$\rho_a$ in $\Omega$m') ax[1].set_ylabel(r'-$\phi_a$ in mrad') ax[1].set_ylim(0., maxphi) if rlim is not None: ax[0].set_ylim(rlim) ax[0].grid(True) ax[1].grid(True) fig.savefig(pdf, format='pdf') ax[0].cla() ax[1].cla() # %% fig.savefig(pdf, format='pdf') pdf.close()
[docs] def generateDataPDF(self, kmax=None, ipmin=0, ipmax=None, rmin=None, rmax=None, figsize=(8, 10), **kwargs): """Generate multipage pdf document for all data as pseudosections. Each page contains app. res. and phase pseudosections for single phase. Parameters ---------- Colorscales: rmin : float [minvalues] minimum apparent resistivity in mrad rmax : float [maxvalues] minimum apparent resistivity in mrad ipmin : float [0] minimum apparent phase in mrad ipmax : float [maxvalues] minimum apparent phase in mrad figsize : tuple(width, height) figure size in inches **kwargs options to be passed to pb.show() """ if self.header is not None: if 'Layout' in self.header: xl = self.header['Layout'][[0, -1], 1] else: xp = pg.x(self.data.sensorPositions()) xl = [min(xp), max(xp)] if ipmax is None: ipmax = np.max(self.PHIA)*0.8*1000 if rmin is None: rmin = np.min(self.RHOA) if rmax is None: rmax = np.max(self.RHOA) pdf = PdfPages(self.basename + '-data.pdf') fig, ax = plt.subplots(nrows=2, figsize=figsize, sharex=True) cb1 = True cb2 = True for i, fri in enumerate(self.freq): if self.RHOA is not None and self.PHIA is not None: data = self.data rhoa = self.RHOA[:, i] phia = self.PHIA[:, i] * 1000. else: data = self.singleFrequencyData(fri, kmax=kmax) rhoa = data('rhoa') phia = data('ip') for axi in ax: axi.clear() cb1 = plotERTData(data, ax=ax[0], vals=rhoa, cMin=rmin, cMax=rmax, logScale=True, colorBar=cb1, cmap='viridis', label=r'apparent resistivity in $\Omega$m', **kwargs)[1] cb2 = plotERTData(data, ax=ax[1], vals=phia, cMin=ipmin, cMax=ipmax, logScale=False, colorBar=cb2, cmap='viridis', label='-apparent phase in mrad', **kwargs)[1] ax[0].set_title(fstring(fri)) if 0: ax[0].set_xlim(xl) plt.pause(0.01) fig.savefig(pdf, format='pdf') pdf.close()
[docs] def removeEpsilon(self, mode=2, verbose=True): """Remove high-frequency parts by fitting static epsilon.""" we0 = self.freq * 2 * np.pi * 8.854e-12 # Omega epsilon_0 for i in range(self.RHOA.shape[0]): # imaginary conductivity sigmai = 1/self.RHOA[i, :] * np.sin(self.PHIA[i, :]) epsr = sigmai / we0 # relative permittivity if mode == 0: er = 2*epsr[-1] - epsr[-2] # extrapolation else: er = np.mean(epsr[-mode:]) # mean of last ones sigmai -= max([er, 0]) * we0 # correct for static epsilon term if verbose: print(i, er) self.PHIA[i, :] = np.arcsin(sigmai*self.RHOA[i, :]) delattr(self, 'DATA') # make sure corrected spectra are plotted
[docs] def showSingleFrequencyData(self, fr=0, ax=None, what=None, **kwargs): """Show pseudosections of a single frequency.""" if ax is None: if what is None: # plot both fig, ax = plt.subplots(nrows=2, sharex=True, sharey=True) else: fig, ax = plt.subplots() data = self.singleFrequencyData(fr) if hasattr(ax, '__iter__'): # iterable, i.e. 2 axes pb.show(data, vals=data('rhoa'), ax=ax[0], **kwargs) pb.show(data, vals=data('ip'), ax=ax[1], **kwargs) else: if what is None: what = 'ip' pb.show(data, vals=data(what), ax=ax, **kwargs) return ax
[docs] def showAllFrequencyData(self, **kwargs): """Show pseudesections for all data in one plot with subplots.""" fig, ax = plt.subplots(ncols=2, nrows=len(self.freq), figsize=(10, 15), sharex=True, sharey=True) fig.subplots_adjust(hspace=0, wspace=0) for i, f in enumerate(self.freq): self.showSingleFrequencyData(f, ax=ax[i, :], **kwargs)
[docs] def createERTManager(self, **kwargs): """Create an ERT manager to do the ERT inversion with.""" self.ERT = pb.Resistivity(debug=0) # self.data is strange here for i, _ in enumerate(self.freq): try: self.data.set('rhoa', self.RHOA[:, i].data) # DRY self.ERT.setData(self.data) break except BaseException as e: print("Failed to set ERTManager data:", e) if self.customParaMesh is not None: self.ERT.setMesh(self.customParaMesh, refine=True) # elif self.customParaGeometry is not None: else: kwargs.setdefault('quality', 34.5) self.ERT.createMesh(plc=self.customParaGeometry, **kwargs) self.ERT.fop.setVerbose(False) self.pd = pg.Mesh(self.ERT.fop.regionManager().paraDomain()) self.pd.setCellMarkers(pg.RVector(self.pd.cellCount(), 2)) return self.ERT
[docs] def singleInversion(self, ifr=0, ipError=None, **kwargs): """Carry out single-frequency inversion with frequency (number). Parameters ---------- ifr : int [0] frequency number ipError : float error of ip measurements [10% of median ip data] lamIP : float [100] regularization parameter for IP inversion **kwargs passed to ERT.invert: * lam : float [20] regularization parameter * zWeight : float [0.7] relative vertical weight * maxIter : int [20] maximum iteration number * robustData : bool [False] robust data reweighting using an L1 scheme (IRLS reweighting) * blockyModel : bool [False] blocky model constraint using L1 reweighting roughness vector * startModelIsReference : bool [False] startmodel is the reference model for the inversion forwarded to createMesh * depth * quality * paraDX * maxCellArea """ if self.verbose: print("Resistivity inversion") lamIP = kwargs.pop('lamIP', kwargs.pop('lam', 100)) if self.ERT is None: if self.verbose: print("Creating ERT manager.") self.createERTManager() if isinstance(ifr, float): # choose closest frequency ifr = np.argmin(np.abs(self.freq - ifr)) # hack until clearout if pg.haveInfNaN(self.RHOA[:, ifr].data): print(self.RHOA[:, ifr].data) print("Skipping calculation for freq", ifr, "due to invalid resistivity values.") return # hack until clearout self.data.set('rhoa', self.RHOA[:, ifr].data) if not self.data.allNonZero('k'): self.data.set('k', pb.geometricFactor(self.data)) self.data.set('ip', self.PHIA[:, ifr].data) self.data.set('error', pg.RVector(self.data.size(), 0.03)) self.data.set('err', self.ERT.estimateError(self.data, absoluteUError=0.0001, relativeError=0.03)) self.ERT.setData(self.data) self.res = self.ERT.invert(**kwargs) if self.verbose: print("Res:", min(self.res), max(self.res)) self.pd = pg.Mesh(self.ERT.fop.regionManager().paraDomain()) self.pd.setCellMarkers(pg.RVector(self.pd.cellCount(), 2)) # CR why? TG: because in PD they are numbered but needed constant self.coverage = self.ERT.coverageDC() # fIP = pg.LinearModelling(self.ERT.mesh, self.ERT.fop.jacobian()) # if self.ERT.fop.regionManager().regionCount(): # if self.ERT.fop.regionManager().regionExists(1): # self.ERT.fop.region(1).setBackground(True) fIP = pg.LinearModelling(self.pd, self.ERT.fop.jacobian()) fIP.createRefinedForwardMesh(True) if self.verbose: print("IPData:", min(self.data('ip')), max(self.data('ip'))) if min(self.data('ip')) < 0: # check if ip is in radiant not mrad .. 1000!!! print("WARNING! found negative phases .. taking abs of ip data.") rhoai = self.data('rhoa') * pg.sin(pg.abs(self.data('ip'))) else: # check if ip is in radiant not mrad .. 1000!!! rhoai = self.data('rhoa') * pg.sin(self.data('ip')) iIP = pg.RInversion(rhoai, fIP, self.verbose) iIP.setRecalcJacobian(False) if ipError is None: ipError = pg.median(self.data('ip')) * 0.1 iIP.setAbsoluteError(pg.abs(rhoai/self.data('ip') * ipError)) tLog = pg.RTransLog() iIP.setTransModel(tLog) iIP.setLambda(lamIP) zWeight = kwargs.pop('zWeight', 0.3) if 'zweight' in kwargs: zWeight = kwargs.pop('zweight', 0.3) print("zweight option will be removed, Please use zWeight.") fIP.regionManager().setZWeight(zWeight) fIP.regionManager().setConstraintType(kwargs.pop('cType', 1)) iIP.setModel(pg.RVector(self.res.size(), pg.median(rhoai))) if self.verbose: print("IP inversion") ipModel = iIP.run() self.phi = np.arctan2(ipModel, self.res) iIP.echoStatus()
[docs] def singleMInversion(self, ifr=10, ipError=0.005, **kwargs): """Chargeability-based inversion.""" if ifr >= len(self.freq): ifr = len(self.freq) - 1 ma = pg.asvector(1 - self.RHOA[:, ifr] / self.RHOA[:, 0]) iperr = pg.RVector(self.data.size(), ipError) mmin, mmax = 0.001, 1.0 if kwargs.pop('verbose', True): print('discarding min/max', sum(ma < mmin), sum(ma > mmax)) ma[ma < mmin] = mmin iperr[ma < mmin] = 1e5 ma[ma > mmax] = mmax iperr[ma > mmax] = 1e5 fIP = DCIPMModelling(self.ERT.fop, self.ERT.fop.mesh(), self.res) fIP.region(1).setBackground(True) fIP.region(2).setConstraintType(1) fIP.region(2).setZWeight(kwargs.pop('zWeight', 0.3)) fIP.createRefinedForwardMesh(True) tD, tM = pg.RTrans(), pg.RTransLogLU(0.01, 1.0) INV = pg.RInversion(ma, fIP, tD, tM, True, False) mstart = pg.RVector(len(self.res), 0.01) # 10 mV/V INV.setModel(mstart) INV.setAbsoluteError(iperr) INV.setLambda(kwargs.pop('lam', 100)) INV.setRobustData(True) self.m = INV.run()
[docs] def individualInversion(self): """Carry out individual inversion for all frequencies ==> .RES.""" nf = len(self.freq) for i in range(nf): self.singleInversion(ifr=i) if i == 0: self.RES = np.zeros((self.pd.cellCount(), nf)) self.RES[:, i] = self.res
[docs] def showSingleResult(self, res=None, phi=None, ax=None, nr=0, rmin=None, rmax=None, imax=None, save=None, **kwargs): """Show resistivity and phase from single f inversion.""" if isinstance(res, int): nr = int(res) phi = self.PHI[:, nr] res = self.RES[:, nr] if res is None: res = self.res if phi is None: phi = self.phi if ax is None: fig, ax = plt.subplots(nrows=2) # , sharex=True, sharey=True) else: fig = ax[0].figure pg.show(self.pd, data=res, ax=ax[0], colorBar=True, logScale=False, cMax=rmax, cMin=rmin, label=r"Resistivity in $\Omega$m", coverage=self.coverage) if phi is None: raise Exception("no valid phi values found.") pg.show(self.pd, data=phi*1000., ax=ax[1], colorBar=True, logScale=False, cMax=imax, cMin=0, label=r"-$\phi$ in mrad", coverage=self.coverage, **kwargs) showElecs = kwargs.pop('showElectrodes', False) if showElecs: drawSensors(ax[0], self.data.sensorPositions()) drawSensors(ax[1], self.data.sensorPositions()) if save is True: save = self.basename + '-Ind{:02d}'.format(nr) + '.pdf' if type(save) is str: fig.savefig(save, bbox_inches='tight')
[docs] def simultaneousInversion(self, **kwargs): """Carry out both simultaneous resistivity and phase inversions.""" self.simultaneousResistivityInversion(**kwargs) self.simultaneousPhaseInversion(**kwargs)
[docs] def simultaneousResistivityInversion(self, **kwargs): """Carry out simultaneous resistivity inversion of all frequencies.""" self.verbose = kwargs.get('verbose', self.verbose) if self.ERT is None: self.ERT = self.createERTManager() self.res = self.ERT.invert(**kwargs) nf = self.RHOA.shape[1] sfwdR = ERTTLmod(nf=nf, data=self.data, mesh=self.ERT.mesh, rotate=kwargs.pop('rotate', True)) # self.sfwdR = ERTTLmod(nf=nf, fop=self.ERT.fop.mesh(), rotate=True) sfwdR.createRefinedForwardMesh(False) # make sure things are ok self.pd = sfwdR.pd2d self.pd.save(self.basename+'_pd.bms') if self.verbose: print(self.data.size()*nf, sfwdR.pd2d.cellCount()*nf) # there need to be a better preprocessing here to fight nan values alldata = self.RHOA.flatten(order='F').data startModel = pg.RVector(sfwdR.nc*nf, np.median(self.RHOA)) # %% tLog = pg.RTransLog() self.sINV = pg.RInversion(alldata, sfwdR, tLog, tLog, True) self.sINV.setRelativeError(0.02) self.sINV.setModel(startModel) self.sINV.setLambda(kwargs.pop('lam', 10)) sfwdR.regionManager().setZWeight(kwargs.pop('zWeight', 0.3)) res = self.sINV.run() self.RES = np.reshape(res, (nf, sfwdR.nc)).T
[docs] def simultaneousPhaseInversion(self, **kwargs): """Carry out simultaneous phase inversion of all frequencies.""" # self.verbose = kwargs.get('verbose', self.verbose) nf = self.RHOA.shape[1] if self.ERT is None: self.ERT = self.createERTManager() self.res = self.ERT.invert(**kwargs) # nm = self.pd.cellCount() # for c in self.pd.cells(): # c.setMarker(0) fwd = ERTMultiPhimod(self.pd, self.ERT.fop.jacobian(), nf, rotate=kwargs.pop('rotate', True)) fwd.createRefinedForwardMesh(False) fwd.regionManager().setZWeight(kwargs.pop('zWeight', 0.3)) alldata = pg.RVector(0) rhoa = self.ERT.inv.response() if len(rhoa) == 0: rhoa = self.RHOA[:, 0] for i in range(nf): if min(self.PHIA[:, i]) < 0: print("WARNING! found negative phases " ".. switch to abs of ip data.") rhoaii = rhoa*pg.sin(np.abs(np.array(self.PHIA[:, i]))) print(len(rhoaii)) alldata = pg.cat(alldata, rhoaii) print(len(alldata)) error = 0.001 * max(alldata) / alldata + 0.03 error[alldata <= 0] = 100 tD = pg.RTrans() tM = pg.RTransLogLU(0, max(self.res)) INV = pg.RInversion(pg.abs(alldata), fwd, tD, tM, True) if hasattr(self, 'phiXX'): startModel = np.tile(self.phi, nf) else: startModel = pg.RVector(fwd.nc * nf, pg.median(alldata)) INV.setModel(startModel) INV.setReferenceModel(pg.RVector(fwd.nc * nf, pg.median(startModel))) INV.setRelativeError(pg.abs(error)) INV.setRecalcJacobian(False) INV.setMaxIter(kwargs.pop('maxIter', 10)) INV.setLambda(kwargs.pop('lam', 50)) allresi = INV.run() RESI = np.reshape(allresi, (nf, fwd.nc)).T self.PHI = np.arctan(RESI / np.reshape(self.ERT.resistivity, (-1, 1)))
[docs] def saveResults(self): """Save inversion results to .rho and .phi file plus mesh.""" self.pd.save(self.basename+'_pd.bms') if hasattr(self, 'RES'): np.savetxt(self.basename+'.rho', self.RES) if hasattr(self, 'PHI'): np.savetxt(self.basename+'.phi', self.PHI) self.saveFit()
[docs] def saveFit(self): """Save fitted chargeability, time constant & exponent to file.""" if np.any(self.m) and np.any(self.tau) and np.any(self.c): np.savetxt(self.basename+'.mtc', np.column_stack( (self.m, self.tau, self.c, self.fitChi2)))
[docs] def loadFit(self): """Load fitted chargeability, time constant & exponent from file.""" self.m, self.tau, self.c, self.fitChi2 = np.loadtxt( self.basename+'.mtc', unpack=1)
[docs] def loadResults(self, take=0): """Load inversion results from file into self.RES/PHI. Set also single-frequency result (self.res/phi) by index, maximum (take < 0) or sum (take > nfreq) """ self.pd = pg.Mesh(self.basename+'_pd.bms') self.RES = np.loadtxt(self.basename+'.rho') self.PHI = np.loadtxt(self.basename+'.phi') self.chooseResult(take=take) if os.path.isfile('coverage.vector'): self.coverage = np.loadtxt('coverage.vector') # really dirty!
[docs] def chooseResult(self, take=0): """Choose single-frequency result (self.res/phi) from matrices self.RES/PHI by index, maximum (take < 0) or sum (take > nfreq) """ if take < 0: self.res = np.max(self.RES, axis=1) self.phi = np.max(self.PHI, axis=1) elif take > len(self.freq): self.res = np.sum(self.RES, axis=1) self.phi = np.sum(self.PHI, axis=1) else: self.res = self.RES[:, take] self.phi = self.PHI[:, take]
[docs] def printColeColeParameters(self, point): """Print Cole-Cole parameters for point or id""" if isinstance(point, int): cid = point else: cid = self.getCellID(point) print("Detected ID={:d} for point ({:.1f}, {:.1f})".format( cid, point[0], point[1])) fstr = r"rho={:.1f} Ohmm m={:.3f} tau={:3e} s c={:.2f}" vals = self.res[cid], self.m[cid], self.tau[cid], self.c[cid] print(fstr.format(*vals))
[docs] def showAllPhases(self, imax=200, figsize=(10, 16), **kwargs): """Show all model phases in subplots using the same colorscale.""" cmap = kwargs.pop('cmap', 'viridis') fig, ax = plt.subplots(nrows=len(self.freq)+1, sharex=False, figsize=figsize) fig.subplots_adjust(hspace=0, wspace=0) self.figs['phases'] = fig for i in range(len(self.freq)): pg.show(self.pd, self.PHI[:, i] * 1e3, ax=ax[i], logScale=False, cMin=0, cMax=imax, cmap=cmap, coverage=self.coverage, **kwargs) # if i: # ax[i].set_xticks([]) # ax[i].set_xlabel('') icbar = ColorbarBase(ax[-1], norm=Normalize(vmin=0, vmax=imax), orientation='horizontal', cmap=cmap) setCbarLevels(icbar, cMin=0, cMax=imax, nLevs=7) icbar.set_clim(0, imax) icbar.ax.set_title(r'$\phi$ in mrad') icbar.ax.set_aspect(1./25) return fig, ax
[docs] def showAllResistivities(self, figsize=(10, 16), **kwargs): """Show model resistivities in subplots using the same colorscale.""" cmap = kwargs.pop('cmap', 'viridis') cMin = kwargs.pop('cMin', np.min(self.RES)) cMax = kwargs.pop('cMax', np.max(self.RES)) fig, ax = plt.subplots(nrows=len(self.freq)+1, sharex=False, figsize=figsize) fig.subplots_adjust(hspace=0, wspace=0) self.figs['resistivities'] = fig for i in range(len(self.freq)): pg.show(self.pd, self.RES[:, i], ax=ax[i], logScale=True, cmap=cmap, cMin=cMin, cMax=cMax, **kwargs) # if i: # ax[i].set_xticks([]) cbar = ColorbarBase(ax[-1], norm=LogNorm(vmin=cMin, vmax=cMax), orientation='horizontal', cmap=cmap) setCbarLevels(cbar, cMin=cMin, cMax=cMax, nLevs=7) cbar.set_clim(cMin, cMax) cbar.ax.set_title(r'$\rho$ in $\Omega$m') cbar.ax.set_aspect(1./25) return fig, ax
[docs] def showAllResults(self, rmin=10, rmax=1000, imax=100, figsize=(10, 16), **kwargs): """Show resistivities and phases next to each other in subplots.""" cmap = kwargs.pop('cmap', 'viridis') fig, ax = plt.subplots(nrows=len(self.freq) + 1, ncols=2, figsize=figsize) fig.subplots_adjust(hspace=0, wspace=0) self.figs['results'] = fig showElecs = kwargs.pop('showElectrodes', False) for i, f in enumerate(self.freq): pg.show(self.pd, self.RES[:, i], ax=ax[i, 0], cMin=rmin, cMax=rmax, cmap=cmap, **kwargs) pg.show(self.pd, self.PHI[:, i]*1e3, ax=ax[i, 1], cMin=0, cMax=imax, cmap=cmap, logScale=False, **kwargs) if showElecs: drawSensors(ax[i, 0], self.data.sensorPositions()) drawSensors(ax[i, 1], self.data.sensorPositions()) ax[i, 0].text(ax[i, 0].get_xlim()[0], ax[i, 0].get_ylim()[0], 'f=' + fstring(f)) if i: ax[i, 0].set_xticks([]) ax[i, 1].set_xticks([]) rcbar = ColorbarBase(ax[-1, 0], norm=LogNorm(vmin=rmin, vmax=rmax), orientation='horizontal', cmap=cmap) setCbarLevels(rcbar, cMin=rmin, cMax=rmax, nLevs=7) icbar = ColorbarBase(ax[-1, 1], norm=Normalize(vmin=0, vmax=imax), orientation='horizontal', cmap=cmap) setCbarLevels(icbar, cMin=0, cMax=imax, nLevs=7) icbar.set_clim(0, imax) rcbar.ax.set_title(r'$\rho$ in $\Omega$m') icbar.ax.set_title(r'$\phi$ in mrad') rcbar.ax.set_aspect(1./25) icbar.ax.set_aspect(1./25) return fig, ax
[docs] def generateResultPDF(self, rmin=10, rmax=1000, imax=200, figsize=(12, 12), **kwargs): """Generate a multipage pdf with rho/phi for each frequency.""" cmap = kwargs.pop('cmap', 'viridis') pdf = PdfPages(self.basename + '-result.pdf') fig, ax = plt.subplots(nrows=2, figsize=figsize, sharex=True) cb1 = True cb2 = True for i, fri in enumerate(self.freq): fstr = '(f={:d} Hz)'.format(int(fri)) if fri < 1.: fstr = '(f={:d} mHz)'.format(int(fri*1e3)) for axi in ax: axi.cla() cb1 = pg.show(self.pd, self.RES[:, i], ax=ax[0], cMin=rmin, cMax=rmax, cmap=cmap, colorBar=cb1, label=r'Resistivity in $\Omega$m ' + fstr, **kwargs)[1] cb2 = pg.show(self.pd, self.PHI[:, i] * 1e3, ax=ax[1], cMin=0, cMax=imax, cmap=cmap, colorBar=cb2, label=r'$\phi$ in mrad ' + fstr, logScale=False, **kwargs)[1] fig.savefig(pdf, format='pdf') pdf.close()
[docs] def getCellID(self, pos): """Return cell ID of nearest cell to position.""" return self.pd.findCell(pg.RVector3(*pos)).id()
[docs] def getDataSpectrum(self, dataNo=None, abmn=None): """Return SIP spectrum class for single data number.""" if hasattr(abmn, '__iter__'): bb = pg.abs(self.data('a') - abmn[0]-1) + \ pg.abs(self.data('b') - abmn[1]-1) + \ pg.abs(self.data('m') - abmn[2]-1) + \ pg.abs(self.data('n') - abmn[3]-1) dataNo = np.argmin(bb) return SIPSpectrum(f=self.freq, amp=self.RHOA[dataNo, :], phi=self.PHIA[dataNo, :])
[docs] def getModelSpectrum(self, cellID): """Return SIP spectrum for single cell (id or position).""" if hasattr(cellID, '__iter__'): # tuple cellID = self.getCellID(cellID) return SIPSpectrum(f=self.freq, amp=self.RES[cellID, :], phi=self.PHI[cellID, :])
[docs] def showModelSpectrum(self, cellID, **kwargs): """Show SIP spectrum for single cell (id or position).""" spec = self.getModelSpectrum(cellID) spec.showData(**kwargs)
[docs] def showModelSpectra(self, positions, **kwargs): """Show model spectra for a number of positions or IDs.""" fig, ax = plt.subplots(nrows=2, sharex=True) LABELS = [] for pos in positions: label = 'x={:.1f} z={:.1f}'.format(*pos) LABELS.append(label) # kwargs['label'] = label self.showModelSpectrum(pos, ax=ax, **kwargs) for a in ax: a.set_ylim(auto=True) ax[0].set_xlim(min(self.freq), max(self.freq)) ax[0].legend(LABELS, loc='best') return fig, ax
[docs] def fitAllPhi(self, show=False, **kwargs): """Fit all phase spectra by cole-cole models.""" mpar = kwargs.pop('mpar', [0.1, 0, 1]) minf, maxf = min(self.freq), max(self.freq) taupar = kwargs.pop('mpar', [1./sqrt(minf*maxf), 0.1/maxf, 10/minf]) cpar = kwargs.pop('cpar', [0.25, 0, 1]) ePhi = kwargs.pop('ePhi', 0.001) nm = self.pd.cellCount() self.m = np.zeros(nm) self.tau = np.zeros(nm) self.c = np.zeros(nm) self.fitChi2 = np.zeros(nm) spec = SIPSpectrum(f=self.freq, amp=self.RES[0, :], phi=self.PHI[0, :]) for i in range(nm): spec.amp = self.RES[i, :] spec.phi = self.PHI[i, :] spec.fitCCPhi(ePhi=ePhi, mpar=mpar, taupar=taupar, cpar=cpar) self.m[i] = spec.mCC[0] self.tau[i] = spec.mCC[1] self.c[i] = spec.mCC[2] self.fitChi2[i] = spec.chi2 if show: self.showColeColeFit(**kwargs)
[docs] def showColeColeParameters(self, figsize=(8, 12), save=False, mlim=(None, None), tlim=(None, None), clim=(0, 0.5), mincov=0.05, **kwargs): """Show distribution of Cole-Cole parameters.""" if 'coverage' in kwargs: coverage = kwargs.pop('coverage') else: coverage = 1 / np.sqrt(self.fitChi2) coverage[coverage > 1] = 1 coverage[coverage < 0] = 0 coverage *= (1 - mincov) coverage += mincov fig, ax = plt.subplots(nrows=3, sharex=True, sharey=True, figsize=figsize) pg.show(self.pd, self.m, ax=ax[0], logScale=False, colorBar=True, coverage=coverage, cMin=mlim[0], cMax=mlim[1], label=r'chargeability $m$ [-]', **kwargs) pg.show(self.pd, self.tau, ax=ax[1], logScale=True, colorBar=True, coverage=coverage, cMin=tlim[0], cMax=tlim[1], label=r'time constant $\tau$ [s]', **kwargs) pg.show(self.pd, self.c, ax=ax[2], logScale=False, colorBar=True, cMin=clim[0], cMax=clim[1], coverage=coverage, label=r'relaxation exponent $c$ [-]', **kwargs) if save: fig.savefig(self.basename+'-CCfit.pdf', bbox_inches='tight') return fig, ax
[docs] def showColeColeFit(self, *args, **kwargs): """Redirecto to new name showColeColeParameters.""" return self.showColeColeParameters(*args, **kwargs)
[docs]def importSIP256Test(filename, verbose=False): """Read SIP256 file (RES format) and return a DataContainer. Experimental to be a little bit more flexible Read SIP256 file (RES format) and return a DataContainer. Supported: SIP256D TODO: UNICODE problems with ° sign TODO: find BEGIN END frequencies bug in fileformat TODO: read older versions Parameters ---------- filename: str *.RES file (SIP256 raw output file) verbose: bool Do some output [False]. Returns ------- data : pg.DataContainer Examples -------- data = importSIP256('myfile.res', True) """ def readSIP256Freqs_(content, start, endStr): freqs = [] for i, line in enumerate(content[start:]): if endStr in line: break vals = line.split() # 20000.00000000 48000.00000000 96 170 1 0.0 1 if int(vals[6]) == 1: freqs.append(float(vals[0])) return i+1, freqs def readSIP256Layout_(content, start, endStr): sensors = [] for i, line in enumerate(content[start:]): if endStr in line: break vals = line.split() sensors.append(float(vals[2])) return i+1, sensors def readReading_(content, nFreq, start, endStr): readings = [] # Reading: 1 / RU-A: 1 RU-B: 2 vals = content[start].split('\n')[0].split() eA = int(vals[4]) eB = int(vals[6]) for i in range(start+1, len(content)): line = content[i].split('\n')[0] if endStr in line: break if "Remote Unit:" in line: ru = line.split()[2] # handle awful file format inconsistencies here content[i+1] = content[i+1].replace('Frequency /Hz', 'Frequency/Hz') mat = readAsDictionary(content[i+1:i+2+nFreq]) i += nFreq+1 readings.append(mat) # print(ru, mat.keys(), mat['K.-F./m']) # vals = line.split() # #20000.00000000 48000.00000000 96 170 1 0.0 1 # if int(vals[6]) == 1: # freqs.append(float(vals[0])) return i+1, [eA, eB], readings with open(filename, 'r') as fi: content = fi.readlines() data = pb.DataContainerERT() version = content[0].split()[0] if version != 'SIP256D': print("Warning .. maybe wrong format .. until now this importer" + "only supports SIP256D. Pls. contact Carsten.", version) nReadings = 0 readings = [] freqs = [] injections = [] for i in range(len(content)): line = content[i].split('\n')[0] if '[Number of Readings]' in line: nReadings = int(line.split(']')[1]) for r in range(nReadings): readings.append(content[i + 2 + r].split()) i += nReadings + 2 if '[Begin Layout]' in line: i, sensors = readSIP256Layout_(content, start=i+1, endStr='[End Layout]') for i, s in enumerate(sensors): data.createSensor([s, 0, 0]) if '[Begin FrequencyParameter]' in line: i, freqs = readSIP256Freqs_(content, start=i+1, endStr='[End FrequencyParameter]') if 'Reading:' in line: i, [eA, eB], inj = readReading_(content, len(freqs), start=i, endStr='Reading') if eA != eB: injections.append([[eA, eB], inj]) print(freqs) nRU = len(readings[0]) - 3 nElecs = nRU + 1 count = 0 data.add('Ki', pg.RVector(0)) def k(a, b, m, n): try: a *= 4 b *= 4 m *= 4 n *= 4 return abs(1./(1./(2*np.pi) * (1./abs(m-a) - 1./abs(m-b) - 1./abs(n-a) + 1./abs(n-b)))) except: return np.nan for i, reading in enumerate(injections): eA = reading[0][0] eB = reading[0][1] AB = int((eB-eA)) config = np.array(readings[i], int) print(i+1, config) for j, meas in enumerate(reading[1]): eM = j + 1 eN = j + 2 if eN in config[3:]: eM = (eN-1) if eA == 9 and eB == 13 and eN == 3: print("File Format unknown .. hack here! ask Tino!") eN = 5 elif eA == 9 and eB == 13 and eN == 4: print("File Format unknown .. hack here! ask Tino!") eN = 5 else: eN = min(nElecs-1, eM + AB) # print("\t", i+1, eA, eB, eM, eN) # #eN = j + 1 # print(eA, eB, eM, eN, meas['K.-F./m'][0], readings[i][j+1]) # # print(reading['K.-F./m']) # if not np.isnan(meas['K.-F./m'][0]): if (abs(k(eA, eB, eM, eN) - meas['K.-F./m'][0]) > 1.3): print(eA, eB, eM, eN, k(eA, eB, eM, eN), meas['K.-F./m'][0], readings[i][j+1]) if not np.isnan(meas['K.-F./m'][0]): data.createFourPointData(count, eA-1, eB-1, eM-1, eN-1) data('Ki')[count] = meas['K.-F./m'][0] for f in range(len(meas['Frequency/Hz'])): freq = meas['Frequency/Hz'][f] rhoaName = 'rhoa:'+str(freq) if not data.exists(rhoaName): data.add(rhoaName, pg.RVector(2)) phaseName = 'pa:'+str(freq) if not data.exists(phaseName): data.add(phaseName, pg.RVector(2)) iName = 'i:'+str(freq) if not data.exists(iName): data.add(iName, pg.RVector(2)) data(phaseName)[count] = meas['PA/'][f] data(rhoaName)[count] = meas['RA/Ohmm'][f] data(iName)[count] = meas['IA/mA'][f] count += 1 data.set("k", pb.geometricFactors(data)) data.resize(count) return data
[docs]def main(argv): """Main.""" sip = SIPdata(argv) print(sip) sip.generateSpectraPDF() sip.generateDataPDF() sip.writeAllData() sip.writeDataMat() sip.writeSingleFrequencyData() sip.filter(kmax=30000) sip.singleInversion() sip.showSingleResult() sip.simultaneousInversion() sip.showAllResults()
if __name__ == "__main__": main(sys.argv[1]) pg.wait()