# -*- coding: utf-8 -*-
"""Spectral induced polarization (SIP) data handling and inversion."""
# general system imports
import sys
import os.path
from math import pi, sqrt
# numpy and plotting imports
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from matplotlib.colorbar import ColorbarBase
from matplotlib.colors import LogNorm, Normalize
# pygimli library, utility functions and standard (lab) stuff
import pygimli as pg
from pygimli.physics.SIP import SIPSpectrum
from pygimli.physics.SIP.importexport import readSIP256file, fstring
from pygimli.physics.SIP.models import ColeColeRho
from pygimli.mplviewer import setCbarLevels, drawSensors
# pybert stuff for ERT-specific stuff
import pybert as pb
from pybert.data import plotERTData
from pybert.sip.sipmodelling import ERTTLmod, ERTMultiPhimod, DCIPMModelling
from pybert.importer import readAsDictionary
[docs]class SIPdata():
"""Class for managing spectral induced polarisation (SIP) field data."""
def __init__(self, filename=None, **kwargs):
"""Constructor with optional data load.
Parameters
----------
**kwargs:
* paraGeometry : PLC
plc for the 2d inversion domain
* verbose : bool
Be verbose.
* takeall : bool
Don't delete any data while reading res files.
"""
self.verbose = kwargs.get('verbose', False)
# CR: The name of the members should be a little bit more literate.
# TG: I agree, do we have to stick to the pylint/pyflakes conventions?
# CR: would by nice .. btw. we defined own
self.basename = 'base' # for saving results and images
self.figs = {} # figure container
self.freq = kwargs.pop('f', None) # frequency vector
self.RHOA = kwargs.pop('RHOA', None) # app. resistivity matrix [Ohm m]
self.PHIA = kwargs.pop('PHIA', None) # app. phases matrix [Grad, deg]
self.data = kwargs.pop('data', None) # data container
self.ERT = None # Resistivity manager class instance
self.sINV = None # single inversion instance
self.RES = None # matrix of (inverted) resistivities (redundant?)
self.PHI = None # matrix of (inverted) phases
self.pd = None # paraDomain
self.res = None # (single-frequency) resistivity
self.phi = None # (single-frequency) phase
self.coverage = None # coverage vector (from single inversion)
# Cole-Cole model
self.m = None # chargeability (from model spectrum)
self.tau = None # time constant (from model spectrum)
self.c = None # Cole-Cole exponent (from model spectrum)
self.fitChi2 = None # vector of fitting chi^2 for each model cell
self.header = {} # some useful header information (any instrument)
# TODO: SIP256C/D internals (to be removed!)
self.ABMN = None # SIP256 internals, only for display
self.DATA = None # dto.
self.AB = None # dto.
self.RU = None # dto. (will all be thrown away)
self.nc = 0 # number of current injections (redundant)
self.nv = 0 # number of voltage units (SIP256 only)
self.customParaGeometry = kwargs.pop('paraGeometry', None)
self.customParaMesh = kwargs.pop('paraMesh', None)
if filename is not None:
self.load(filename, **kwargs)
self.data.set('k', pg.geometricFactor(self.data, dim=2))
def __repr__(self): # for print function
"""String representation of the class."""
out = ['SIP data: nf=' + str(len(self.freq)) + ' nc=' +
str(self.nc) + ' nv=' + str(self.nv)]
if hasattr(self, 'header'):
for key in self.header:
val = self.header[key]
if isinstance(val, int) or isinstance(val, float):
out.append(key+' = '+str(val))
else:
out.append(key+' = array('+str(val.shape)+')')
return "\n".join(out)
[docs] def load(self, filename, verbose=False, f=None, instr='SIP256',
electrodes=None, takeall=False):
"""Load SIP data from file.
Load SIP data from file. (either Radic RES, MPT or single files)
Parameters
----------
filename: str, [str, ]
single filename, basename or filename list for shm/rhoa/phia
f : array
frequency vector (not in all instrument data files)
instr : str
instrument name (as alternative to the frequency vector)
electrodes : [[x,y],]
Overrides sensor positions
verbose : bool
Be verbose.
takeall : bool
Don't delete any data while reading res files.
"""
if isinstance(filename, list): # data, RHOA and PHIA files
self.data = pb.DataContainerERT(filename[0])
self.RHOA = np.loadtxt(filename[1])
self.PHIA = np.loadtxt(filename[2])
elif isinstance(filename, str):
if filename.lower().rfind('.res') >= 0: # SIP 256 or Fuchs file
self.header, self.DATA, self.AB, self.RU = readSIP256file(
filename, verbose)
self.basename = filename.rstrip('.res').rstrip('.RES')
self.nc = self.header['Number_of_Readings']
self.nv = self.header['Number_of_Remote_Units']
self.organiseSIP256data(electrodes, takeall)
elif (filename.lower().endswith('.mtp') or
filename.endswith('.Data')): # MTP file
self.header = {}
self.loadMPTData(filename)
self.sortFrequencies()
elif os.path.isfile(filename): # full file name
self.data = pb.DataContainerERT(filename)
self.basename = filename[:-4]
else:
self.basename = filename
if os.path.isfile(filename + '.shm'):
self.data = pb.DataContainerERT(filename + '.shm')
self.ABMN = np.column_stack(
(self.data('a'), self.data('b'),
self.data('m'), self.data('n')))
if os.path.isfile(filename + '.rhoa'):
self.RHOA = np.loadtxt(filename + '.rhoa', skiprows=1)
if os.path.isfile(filename + '.phia'):
A = np.loadtxt(filename + '.phia')
self.PHIA = A[1:, :]
self.freq = A[0, :]
self.sortFrequencies()
if f is not None:
self.freq = f
if not hasattr(self, 'freq'):
if instr == 'Fuchs':
stf = 12000./2**np.arange(25)
else:
stf = [1000, 500, 266, 125, 80, 40, 20, 10, 5, 2.5, 1.25,
0.625, 0.3125, 0.156, 0.078, 0.039, 0.02, 0.01, 5e-3,
2.5e-3, 1.25e-3]
self.freq = stf[self.RHOA.shape[1]-1::-1]
if not hasattr(self, 'nc'):
ab = self.data('a')*1000 + self.data('b')
self.nc = len(np.unique(ab))
if not hasattr(self, 'nv'):
mn = self.data('m')*1000 + self.data('n')
self.nv = len(np.unique(mn))
[docs] def loadMPTData(self, filename):
"""Read Multi-phase technology (MPT) phase SIP field data files."""
with open(filename) as fid:
dataact = False
elecact = False
ELEC, DATA = [], []
a, b, m, n = [], [], [], []
elmap = np.arange(256)
elnum = 0
for line in fid:
sp = line.split()
if line.find("#elec_start") >= 0:
elecact = True
if line.find("#elec_end") >= 0:
elecact = False
if elecact and line.lower().find("elec") < 0:
ELEC.append([float(sp[i]) for i in range(1, 5)])
elmap[int(sp[0].split(',')[1])] = elnum
elnum += 1
if line.find("#data_start") >= 0:
dataact = True
if line.find("#data_end") >= 0:
dataact = False
if dataact and line.find("Frequency =") >= 0:
self.freq = np.array(sp[4::5], dtype=np.float)
if (dataact and line.find("!") < 0 and line.find("#") < 0 and
line.find("**") < 0):
a.append(elmap[int(sp[1].split(",")[1])])
b.append(elmap[int(sp[2].split(",")[1])])
m.append(elmap[int(sp[3].split(",")[1])])
n.append(elmap[int(sp[4].split(",")[1])])
# a.append(int(sp[1].split(",")[1]))
# b.append(int(sp[2].split(",")[1]))
# m.append(int(sp[3].split(",")[1]))
# n.append(int(sp[4].split(",")[1]))
DATA.append(np.array(sp[5:-7], dtype=np.float))
DATA = np.array(DATA)
self.data = pb.DataContainerERT()
for elec in ELEC:
self.data.createSensor(pg.RVector3(elec[:3]))
self.data.resize(len(a))
self.data.set("a", pg.RVector(np.asarray(a)))
self.data.set("b", pg.RVector(np.asarray(b)))
self.data.set("m", pg.RVector(np.asarray(m)))
self.data.set("n", pg.RVector(np.asarray(n)))
self.data.set("valid", pg.RVector(self.data.size(), 1))
self.data.set("k", pb.geometricFactors(self.data))
self.basename = filename.rstrip('.mtp').rstrip('.MTP')
self.data.save(self.basename+".shm", "a b m n k")
nf = DATA.shape[1] // 5
kk = np.reshape(self.data('k'), (-1, 1))
self.RHOA = kk * DATA[:, 0:nf*5:5]
self.PHIA = -DATA[:, 2:nf*5+2:5] * 1e-3
[docs] def organiseSIP256data(self, electrodes=None, takeall=False):
"""Builds up empty data container with the quadrupoles.
Parameters
----------
electrode : list [None]
Overwrite the electrodes positions given in the SIP265.res file.
takeall : bool [False]
Don't delete any data while reading res files.
"""
self.freq = []
for line in self.header['FrequencyParameter']:
if (len(line) < 7) or (line[6] == 1):
self.freq.append(line[0].round(3))
self.freq = np.array(self.freq)
# assemble measurement logics
aa, bb, mm, nn, ii, iu = [], [], [], [], [], []
for ir in range(len(self.DATA)):
readings = self.header['Readings'][ir]
leftout = readings[3:]
iA, iB = self.AB[ir]
if ir < len(self.RU):
ru = self.RU[ir]
for iru, _ in enumerate(ru):
iM = ru[iru]
iN = iM + 1
while iN in leftout:
iN += 1
if (iM > iB and iN-iM == iB-iA) or takeall:
aa.append(iA)
bb.append(iB)
mm.append(iM)
nn.append(iN)
ii.append(ir)
iu.append(iru)
self.ABMN = np.column_stack((aa, bb, mm, nn))
# create data container
self.data = pb.DataContainerERT()
for line in self.header['Layout']:
self.data.createSensor(pg.RVector3(line[2], 0., 0.))
if electrodes is not None:
if len(electrodes) == self.data.sensorCount():
self.data.setSensorPositions(electrodes)
else:
print(self.data.sensorCount() != len(electrodes))
raise Exception("Electrode count mismatch. "
"Cannot not overwrite Electrodes.")
self.data.resize(len(aa))
if self.data.size() == 0:
raise Exception("No data found.")
self.data.set('a', pg.RVector(aa)-1) # np.array(aa)-1)
self.data.set('b', pg.RVector(bb)-1)
self.data.set('m', pg.RVector(mm)-1)
self.data.set('n', pg.RVector(nn)-1)
self.data.markValid(self.data('a') > -1)
# assemble data matrices
self.RHOA = np.ones((self.data.size(), len(self.freq))) * np.nan
self.PHIA = np.ones((self.data.size(), len(self.freq))) * np.nan
self.K = np.ones((self.data.size(), len(self.freq))) * np.nan
self.I = np.ones((self.data.size(), len(self.freq))) * np.nan
for i, _ in enumerate(ii):
if ii[i] < len(self.DATA) and iu[i] < len(self.DATA[ii[i]]):
A = self.DATA[ii[i]][iu[i]]
for ifr in range(len(self.freq)):
line = A[A[:, 0].round(3) == self.freq[ifr]]
if len(line):
self.RHOA[i, ifr] = np.abs(line[0][1])
self.PHIA[i, ifr] = -line[0][2] * pi / 180. # grad
self.K[i, ifr] = line[0][7]
self.I[i, ifr] = line[0][6] * 1e-3
else:
print("RU "+str(iu[i])+" not present, RI="+str(ii[i]))
self.sortFrequencies()
# for i, f in enumerate(self.freq):
# self.data.add('rhoa:'+ str(f), self.RHOA[:,i])
# self.data.add('phia:'+ str(f), self.PHIA[:,i])
# self.data.markInvalid(pg.isInfNaN(self.data('rhoa:'+ str(f))))
if electrodes is not None:
self.RHOA = self.RHOA / self.K
# print("correct rho")
# print(self.RHOA[0])
# print(self.K[0])
# print(self.RHOA[0])
for i in range(len(self.K[0])):
self.K[:, i] = pb.geometricFactors(self.data, dim=2)
self.RHOA = abs(self.RHOA * self.K)
# print(self.K[0])
# print(self.RHOA[0])
# exit()
self.RHOA = np.ma.masked_invalid(self.RHOA)
self.PHIA = np.ma.masked_invalid(self.PHIA)
self.data.removeInvalid()
[docs] def addData(self, name):
"""Add data from another file or sip class."""
if isinstance(name, str):
sip2 = SIPdata(name)
else:
sip2 = name
if self.RHOA.shape[1] == sip2.RHOA.shape[1]: # same frequencies
self.data.add(sip2.data)
self.RHOA = np.vstack((self.RHOA, sip2.RHOA))
self.PHIA = np.vstack((self.PHIA, sip2.PHIA))
elif self.RHOA.shape[0] == sip2.RHOA.shape[0]: # same data
self.freq = np.hstack((self.freq, sip2.freq))
self.RHOA = np.hstack((self.RHOA, sip2.RHOA))
self.PHIA = np.hstack((self.PHIA, sip2.PHIA))
self.sortFrequencies()
else:
pg.error("Neither number of data nor frequencies is equal. " +
"Don't know how to combine data.")
[docs] def sortFrequencies(self):
"""Sort frequencies (and data) in increasing order."""
ind = np.argsort(self.freq)
self.freq.sort()
self.RHOA = self.RHOA[:, ind]
self.PHIA = self.PHIA[:, ind]
if hasattr(self, 'K'):
self.K = self.K[:, ind]
if hasattr(self, 'I'):
self.I = self.I[:, ind]
[docs] def filter(self, fmin=0, fmax=1e9, kmax=1e6, electrode=None, forward=False,
a=None, b=None, m=None, n=None, ab=None, mn=None, corrSID=1,
nr=[]):
"""Filter data with respect to frequencies and geometric factor.
Parameters
----------
fmin : double
minimum frequency
fmax : double
maximum frequency
kmax : double
maximum (absolute) geometric factor
electrode : int
electrode to be removed completely
a/b/m/n : int
delete data with specific current or potential dipole lengths
ab/mn : int
delete data with specific current or potential dipole lengths
corrSID: int [1]
correct sensor index (like in data files)
"""
print("filtering: nd={:d}, nf={:d}".format(*self.RHOA.shape))
ind = (self.freq >= fmin) & (self.freq <= fmax)
self.RHOA = self.RHOA[:, ind]
self.PHIA = self.PHIA[:, ind]
if self.RES is not None:
self.RES = self.RES[:, ind]
if self.PHI is not None:
self.PHI = self.PHI[:, ind]
self.freq = self.freq[ind]
ind = (np.abs(self.data('k')) <= kmax) # maximum geometric factor
ind[nr] = False # insividual numbers
am = self.data("m") - self.data("a")
if ab is not None:
ind[np.isclose(np.abs(self.data("b")-self.data("a")), ab)] = False
if mn is not None:
ind[np.isclose(np.abs(self.data("n")-self.data("m")), mn)] = False
print(sum(ind))
if forward:
ind[am < 0] = False # reverse measurements
print(sum(ind))
for name in ['a', 'b', 'm', 'n']:
u = list(np.atleast_1d(eval(name)))
if electrode is not None:
u.extend(list(np.atleast_1d(electrode)))
for uu in u:
ind = ind & np.not_equal(self.data(name) + corrSID, uu)
self.RHOA = self.RHOA[ind, :]
self.PHIA = self.PHIA[ind, :]
self.data.set('valid', pg.RVector(self.data.size()))
self.data.markValid(pg.find(ind))
self.data.removeInvalid()
if electrode is not None:
self.data.removeUnusedSensors()
print("filtered: nd={:d}, nf={:d}".format(*self.RHOA.shape))
[docs] def sortFreq(self):
"""Old version of sortFrequency (for backward compatibility)."""
raise BaseException("sortFreq() deprecated, "
"use sortFrequencies() instead")
[docs] def simulate(self, mesh, rhovec, mvec, tauvec, cvec, **kwargs):
"""Synthetic simulation based on Cole-Cole model."""
if "scheme" in kwargs:
self.data = kwargs["scheme"]
if "fr" in kwargs:
self.freq = kwargs["fr"]
noiseLevel = kwargs.pop('noiseLevel', 0) # Ca: 0.01
noiseAbs = kwargs.pop('noiseAbs', 1e-5) # Ca: 1e-5
verbose = kwargs.pop('verbose', False)
ert = pb.Resistivity()
self.RHOA = np.zeros((self.data.size(), len(self.freq)))
self.PHIA = np.zeros((self.data.size(), len(self.freq)))
for i, fr in enumerate(self.freq):
res = ColeColeRho(fr, rhovec, mvec, tauvec, cvec)
rhoai, phiai = ert.simulate(mesh,
res=res[mesh.cellMarkers()],
scheme=self.data,
noiseLevel=noiseLevel,
noiseAbs=noiseAbs,
returnArray=True)
phiai.setVal(pi - phiai[phiai > pi/2], pg.find(phiai > pi/2))
if verbose:
phi = -np.angle(res)
print('{:d}\t{:5e}\t{:.2f}\t{:.2f}'.format(
i, fr, max(phi)*1000, max(phiai)*1000))
self.RHOA[:, i] = rhoai
self.PHIA[:, i] = phiai
[docs] def saveData(self, basename=None):
"""Save data shm and .rhoa/.phia matrices."""
if basename is not None:
self.basename = basename
self.data.save(self.basename + '.shm', 'a b m n')
self.writeDataMat()
[docs] def writeDataMat(self, fmt='%10.4f'):
"""Output the data as matrices called basename + ending rhoa/phia."""
self.sortFrequencies()
np.savetxt(self.basename + '.rhoa',
np.vstack((self.freq, self.RHOA)), fmt=fmt)
np.savetxt(self.basename + '.phia',
np.vstack((self.freq, self.PHIA)), fmt=fmt)
[docs] def writeAllData(self, floatfmt='%.2f'): # kind of deprecated (early try)
"""Output the data as complete matrices (including ABMN, k and f).
phia = apparent neg. phase (mrad).
"""
header = 'a\tb\tm\tn\tk'
fmt = '%d\t%d\t%d\t%d\t%f'
for i, f in enumerate(self.freq):
fmt += '\t' + floatfmt
header += str('\t' + str(f))
# Looking for first index with valid K values
kIdx = 0
for i in range(len(self.K[0])):
if len(self.K[:, i]) != len(np.where(np.isnan(self.K[:, i]))[0]):
kIdx = i
break
np.savetxt(self.basename + '_rhoa.all',
np.column_stack((self.ABMN, self.K[:, kIdx], self.RHOA)),
fmt=fmt, header=header)
np.savetxt(self.basename + '_phia.all',
np.column_stack((self.ABMN, self.K[:, kIdx],
self.PHIA*1e3)),
fmt=fmt, header=header)
[docs] def singleFrequencyData(self, ifr=0, kmax=None):
"""Return filled ERT data container for one frequency.
ip =neg phase (mrad)."""
if isinstance(ifr, float): # choose closest frequency
ifr = np.argmin(np.abs(self.freq - ifr))
data1 = pb.DataContainerERT(self.data)
data1.set('rhoa', np.array(self.RHOA[:, ifr]))
data1.set('ip', np.array(self.PHIA[:, ifr] * 1000))
if hasattr(self, 'K'):
data1.set('k', np.array(self.K[:, ifr]))
data1.set('r',
np.array(self.RHOA[:, ifr]) / np.array(self.K[:, ifr]))
if hasattr(self, 'I'):
data1.set('i', np.array(self.I[:, ifr]))
data1.set('u', data1('r')*data1('i'))
# DRY: why is here some filtering? .. we have a filter method
if kmax is not None:
data1.markInvalid(pg.abs(data1('k')) > kmax)
data1.checkDataValidity()
data1.removeInvalid()
return data1
[docs] def writeSingleFrequencyData(self, kmax=None):
"""Write single frequency data in unified data format."""
for ifr, fri in enumerate(self.freq):
data1 = self.singleFrequencyData(ifr, kmax=kmax)
data1.checkDataValidity()
if fri > 1.:
fname = '{:02d}-{:d}Hz.ohm'.format(ifr, int(np.round(fri)))
else:
fname = '{:02d}-{:d}mHz.ohm'.format(ifr,
int(np.round(fri*1e3)))
data1.save(self.basename+'_'+fname, 'a b m n rhoa k u i ip')
[docs] def generateSpectraPDF(self, useall=False, maxphi=100., rlim=None,
maxdist=999, figsize=(8.5, 11), **kwargs):
"""Make pdf file containing all spectra."""
pdf = PdfPages(self.basename + '-spectra.pdf')
fig, ax = plt.subplots(figsize=figsize, nrows=2, sharex=True)
colors = 'bgrcmyk'
markers = ('x', 'o', 'v', '^', 's', 'p', '>', '<', '+', 'd')
if hasattr(self, 'DATA'): # original SIP256C data structure
fak = pi*1000./180
for i, Data in enumerate(self.DATA):
act = 0
for j, data in enumerate(Data):
freq = data[:, 0]
rhoa, drhoa = data[:, 1], data[:, 3]
phi, dphi = -data[:, 2]*fak, data[:, 4]*fak
dist = j - self.AB[i][1]
useit = (dist > 0) and (dist < maxdist) or useall
if useit and np.isfinite(phi[0]) and np.isfinite(rhoa[0]):
co = colors[(j-2) % 7]
marker = markers[(j-2) // 7]
ax[0].errorbar(freq, np.abs(rhoa), yerr=drhoa*rhoa/100,
color=co, label=str(j), marker=marker,
**kwargs)
ax[1].errorbar(freq, phi, yerr=dphi,
color=co, marker=marker, label=str(j))
act += 1
if act:
if i == -1:
ax[0].legend(loc='upper right', numpoints=1, ncol=3)
ax[0].set_title(str(self.AB[i][0])+'-'+str(self.AB[i][1]))
ax[0].set_xscale('log')
ax[0].set_yscale('log')
ax[1].set_xscale('log')
ax[0].set_xlim(min(self.freq), max(self.freq))
# ax[0].set_xlabel('f in Hz') # shared x
ax[1].set_xlabel('f in Hz')
ax[0].set_ylabel(r'$\rho_a$ in $\Omega$m')
ax[1].set_ylabel(r'-$\phi_a$ in mrad')
ax[1].set_ylim(0., maxphi)
if rlim is not None:
ax[0].set_ylim(rlim)
ax[0].grid(True)
ax[1].grid(True)
fig.savefig(pdf, format='pdf')
ax[0].cla()
ax[1].cla()
for j in range(2, self.nv):
col = colors[(j-2) % 7]
marker = markers[(j-2) // 7]
ax[0].plot(1, 1, '-', color=col, marker=marker, label='1,1')
ax[0].legend(loc='upper right', numpoints=1, ncol=3)
else:
# %%
cind = np.asarray((self.data('a')+1) * 100 + self.data('b')+1)
for ci in np.unique(cind):
ind = np.nonzero(cind == ci)[0]
for ii in ind:
rhoa, phia = self.RHOA[ii, :], self.PHIA[ii, :]
j = int(self.data('m')[ii])
co = colors[j % 7]
marker = markers[j // 7]
ax[0].semilogx(self.freq, np.abs(rhoa), label=str(j),
color=co, marker=marker, **kwargs)
ax[1].semilogx(self.freq, phia*1000, color=co,
marker=marker, label=str(j))
ax[0].set_yscale('log')
if 'phiScale' in kwargs:
ax[1].set_yscale(kwargs['phiScale'])
ax[0].set_xlim(min(self.freq), max(self.freq))
# ax[0].set_xlabel('f in Hz') # shared x
ax[1].set_xlabel('f in Hz')
ax[0].set_ylabel(r'$\rho_a$ in $\Omega$m')
ax[1].set_ylabel(r'-$\phi_a$ in mrad')
ax[1].set_ylim(0., maxphi)
if rlim is not None:
ax[0].set_ylim(rlim)
ax[0].grid(True)
ax[1].grid(True)
fig.savefig(pdf, format='pdf')
ax[0].cla()
ax[1].cla()
# %%
fig.savefig(pdf, format='pdf')
pdf.close()
[docs] def generateDataPDF(self, kmax=None, ipmin=0, ipmax=None, rmin=None,
rmax=None, figsize=(8, 10), **kwargs):
"""Generate multipage pdf document for all data as pseudosections.
Each page contains app. res. and phase pseudosections for single phase.
Parameters
----------
Colorscales:
rmin : float [minvalues]
minimum apparent resistivity in mrad
rmax : float [maxvalues]
minimum apparent resistivity in mrad
ipmin : float [0]
minimum apparent phase in mrad
ipmax : float [maxvalues]
minimum apparent phase in mrad
figsize : tuple(width, height)
figure size in inches
**kwargs
options to be passed to pb.show()
"""
if self.header is not None:
if 'Layout' in self.header:
xl = self.header['Layout'][[0, -1], 1]
else:
xp = pg.x(self.data.sensorPositions())
xl = [min(xp), max(xp)]
if ipmax is None:
ipmax = np.max(self.PHIA)*0.8*1000
if rmin is None:
rmin = np.min(self.RHOA)
if rmax is None:
rmax = np.max(self.RHOA)
pdf = PdfPages(self.basename + '-data.pdf')
fig, ax = plt.subplots(nrows=2, figsize=figsize, sharex=True)
cb1 = True
cb2 = True
for i, fri in enumerate(self.freq):
if self.RHOA is not None and self.PHIA is not None:
data = self.data
rhoa = self.RHOA[:, i]
phia = self.PHIA[:, i] * 1000.
else:
data = self.singleFrequencyData(fri, kmax=kmax)
rhoa = data('rhoa')
phia = data('ip')
for axi in ax:
axi.clear()
cb1 = plotERTData(data, ax=ax[0], vals=rhoa, cMin=rmin, cMax=rmax,
logScale=True, colorBar=cb1, cmap='viridis',
label=r'apparent resistivity in $\Omega$m',
**kwargs)[1]
cb2 = plotERTData(data, ax=ax[1], vals=phia,
cMin=ipmin, cMax=ipmax,
logScale=False, colorBar=cb2, cmap='viridis',
label='-apparent phase in mrad',
**kwargs)[1]
ax[0].set_title(fstring(fri))
if 0:
ax[0].set_xlim(xl)
plt.pause(0.01)
fig.savefig(pdf, format='pdf')
pdf.close()
[docs] def removeEpsilon(self, mode=2, verbose=True):
"""Remove high-frequency parts by fitting static epsilon."""
we0 = self.freq * 2 * np.pi * 8.854e-12 # Omega epsilon_0
for i in range(self.RHOA.shape[0]):
# imaginary conductivity
sigmai = 1/self.RHOA[i, :] * np.sin(self.PHIA[i, :])
epsr = sigmai / we0 # relative permittivity
if mode == 0:
er = 2*epsr[-1] - epsr[-2] # extrapolation
else:
er = np.mean(epsr[-mode:]) # mean of last ones
sigmai -= max([er, 0]) * we0 # correct for static epsilon term
if verbose:
print(i, er)
self.PHIA[i, :] = np.arcsin(sigmai*self.RHOA[i, :])
delattr(self, 'DATA') # make sure corrected spectra are plotted
[docs] def showSingleFrequencyData(self, fr=0, ax=None, what=None, **kwargs):
"""Show pseudosections of a single frequency."""
if ax is None:
if what is None: # plot both
fig, ax = plt.subplots(nrows=2, sharex=True, sharey=True)
else:
fig, ax = plt.subplots()
data = self.singleFrequencyData(fr)
if hasattr(ax, '__iter__'): # iterable, i.e. 2 axes
pb.show(data, vals=data('rhoa'), ax=ax[0], **kwargs)
pb.show(data, vals=data('ip'), ax=ax[1], **kwargs)
else:
if what is None:
what = 'ip'
pb.show(data, vals=data(what), ax=ax, **kwargs)
return ax
[docs] def showAllFrequencyData(self, **kwargs):
"""Show pseudesections for all data in one plot with subplots."""
fig, ax = plt.subplots(ncols=2, nrows=len(self.freq), figsize=(10, 15),
sharex=True, sharey=True)
fig.subplots_adjust(hspace=0, wspace=0)
for i, f in enumerate(self.freq):
self.showSingleFrequencyData(f, ax=ax[i, :], **kwargs)
[docs] def createERTManager(self, **kwargs):
"""Create an ERT manager to do the ERT inversion with."""
self.ERT = pb.Resistivity(debug=0)
# self.data is strange here
for i, _ in enumerate(self.freq):
try:
self.data.set('rhoa', self.RHOA[:, i].data) # DRY
self.ERT.setData(self.data)
break
except BaseException as e:
print("Failed to set ERTManager data:", e)
if self.customParaMesh is not None:
self.ERT.setMesh(self.customParaMesh, refine=True)
# elif self.customParaGeometry is not None:
else:
kwargs.setdefault('quality', 34.5)
self.ERT.createMesh(plc=self.customParaGeometry, **kwargs)
self.ERT.fop.setVerbose(False)
self.pd = pg.Mesh(self.ERT.fop.regionManager().paraDomain())
self.pd.setCellMarkers(pg.RVector(self.pd.cellCount(), 2))
return self.ERT
[docs] def singleInversion(self, ifr=0, ipError=None, **kwargs):
"""Carry out single-frequency inversion with frequency (number).
Parameters
----------
ifr : int [0]
frequency number
ipError : float
error of ip measurements [10% of median ip data]
lamIP : float [100]
regularization parameter for IP inversion
**kwargs passed to ERT.invert:
* lam : float [20]
regularization parameter
* zWeight : float [0.7]
relative vertical weight
* maxIter : int [20]
maximum iteration number
* robustData : bool [False]
robust data reweighting using an L1 scheme (IRLS reweighting)
* blockyModel : bool [False]
blocky model constraint using L1 reweighting roughness vector
* startModelIsReference : bool [False]
startmodel is the reference model for the inversion
forwarded to createMesh
* depth
* quality
* paraDX
* maxCellArea
"""
if self.verbose:
print("Resistivity inversion")
lamIP = kwargs.pop('lamIP', kwargs.pop('lam', 100))
if self.ERT is None:
if self.verbose:
print("Creating ERT manager.")
self.createERTManager()
if isinstance(ifr, float): # choose closest frequency
ifr = np.argmin(np.abs(self.freq - ifr))
# hack until clearout
if pg.haveInfNaN(self.RHOA[:, ifr].data):
print(self.RHOA[:, ifr].data)
print("Skipping calculation for freq", ifr,
"due to invalid resistivity values.")
return
# hack until clearout
self.data.set('rhoa', self.RHOA[:, ifr].data)
if not self.data.allNonZero('k'):
self.data.set('k', pb.geometricFactor(self.data))
self.data.set('ip', self.PHIA[:, ifr].data)
self.data.set('error', pg.RVector(self.data.size(), 0.03))
self.data.set('err', self.ERT.estimateError(self.data,
absoluteUError=0.0001,
relativeError=0.03))
self.ERT.setData(self.data)
self.res = self.ERT.invert(**kwargs)
if self.verbose:
print("Res:", min(self.res), max(self.res))
self.pd = pg.Mesh(self.ERT.fop.regionManager().paraDomain())
self.pd.setCellMarkers(pg.RVector(self.pd.cellCount(), 2))
# CR why? TG: because in PD they are numbered but needed constant
self.coverage = self.ERT.coverageDC()
# fIP = pg.LinearModelling(self.ERT.mesh, self.ERT.fop.jacobian())
# if self.ERT.fop.regionManager().regionCount():
# if self.ERT.fop.regionManager().regionExists(1):
# self.ERT.fop.region(1).setBackground(True)
fIP = pg.LinearModelling(self.pd, self.ERT.fop.jacobian())
fIP.createRefinedForwardMesh(True)
if self.verbose:
print("IPData:", min(self.data('ip')), max(self.data('ip')))
if min(self.data('ip')) < 0:
# check if ip is in radiant not mrad .. 1000!!!
print("WARNING! found negative phases .. taking abs of ip data.")
rhoai = self.data('rhoa') * pg.sin(pg.abs(self.data('ip')))
else:
# check if ip is in radiant not mrad .. 1000!!!
rhoai = self.data('rhoa') * pg.sin(self.data('ip'))
iIP = pg.RInversion(rhoai, fIP, self.verbose)
iIP.setRecalcJacobian(False)
if ipError is None:
ipError = pg.median(self.data('ip')) * 0.1
iIP.setAbsoluteError(pg.abs(rhoai/self.data('ip') * ipError))
tLog = pg.RTransLog()
iIP.setTransModel(tLog)
iIP.setLambda(lamIP)
zWeight = kwargs.pop('zWeight', 0.3)
if 'zweight' in kwargs:
zWeight = kwargs.pop('zweight', 0.3)
print("zweight option will be removed, Please use zWeight.")
fIP.regionManager().setZWeight(zWeight)
fIP.regionManager().setConstraintType(kwargs.pop('cType', 1))
iIP.setModel(pg.RVector(self.res.size(), pg.median(rhoai)))
if self.verbose:
print("IP inversion")
ipModel = iIP.run()
self.phi = np.arctan2(ipModel, self.res)
iIP.echoStatus()
[docs] def singleMInversion(self, ifr=10, ipError=0.005, **kwargs):
"""Chargeability-based inversion."""
if ifr >= len(self.freq):
ifr = len(self.freq) - 1
ma = pg.asvector(1 - self.RHOA[:, ifr] / self.RHOA[:, 0])
iperr = pg.RVector(self.data.size(), ipError)
mmin, mmax = 0.001, 1.0
if kwargs.pop('verbose', True):
print('discarding min/max', sum(ma < mmin), sum(ma > mmax))
ma[ma < mmin] = mmin
iperr[ma < mmin] = 1e5
ma[ma > mmax] = mmax
iperr[ma > mmax] = 1e5
fIP = DCIPMModelling(self.ERT.fop, self.ERT.fop.mesh(), self.res)
fIP.region(1).setBackground(True)
fIP.region(2).setConstraintType(1)
fIP.region(2).setZWeight(kwargs.pop('zWeight', 0.3))
fIP.createRefinedForwardMesh(True)
tD, tM = pg.RTrans(), pg.RTransLogLU(0.01, 1.0)
INV = pg.RInversion(ma, fIP, tD, tM, True, False)
mstart = pg.RVector(len(self.res), 0.01) # 10 mV/V
INV.setModel(mstart)
INV.setAbsoluteError(iperr)
INV.setLambda(kwargs.pop('lam', 100))
INV.setRobustData(True)
self.m = INV.run()
[docs] def individualInversion(self):
"""Carry out individual inversion for all frequencies ==> .RES."""
nf = len(self.freq)
for i in range(nf):
self.singleInversion(ifr=i)
if i == 0:
self.RES = np.zeros((self.pd.cellCount(), nf))
self.RES[:, i] = self.res
[docs] def showSingleResult(self, res=None, phi=None, ax=None, nr=0,
rmin=None, rmax=None, imax=None, save=None,
**kwargs):
"""Show resistivity and phase from single f inversion."""
if isinstance(res, int):
nr = int(res)
phi = self.PHI[:, nr]
res = self.RES[:, nr]
if res is None:
res = self.res
if phi is None:
phi = self.phi
if ax is None:
fig, ax = plt.subplots(nrows=2) # , sharex=True, sharey=True)
else:
fig = ax[0].figure
pg.show(self.pd, data=res, ax=ax[0], colorBar=True,
logScale=False, cMax=rmax, cMin=rmin,
label=r"Resistivity in $\Omega$m",
coverage=self.coverage)
if phi is None:
raise Exception("no valid phi values found.")
pg.show(self.pd, data=phi*1000., ax=ax[1], colorBar=True,
logScale=False, cMax=imax, cMin=0,
label=r"-$\phi$ in mrad",
coverage=self.coverage, **kwargs)
showElecs = kwargs.pop('showElectrodes', False)
if showElecs:
drawSensors(ax[0], self.data.sensorPositions())
drawSensors(ax[1], self.data.sensorPositions())
if save is True:
save = self.basename + '-Ind{:02d}'.format(nr) + '.pdf'
if type(save) is str:
fig.savefig(save, bbox_inches='tight')
[docs] def simultaneousInversion(self, **kwargs):
"""Carry out both simultaneous resistivity and phase inversions."""
self.simultaneousResistivityInversion(**kwargs)
self.simultaneousPhaseInversion(**kwargs)
[docs] def simultaneousResistivityInversion(self, **kwargs):
"""Carry out simultaneous resistivity inversion of all frequencies."""
self.verbose = kwargs.get('verbose', self.verbose)
if self.ERT is None:
self.ERT = self.createERTManager()
self.res = self.ERT.invert(**kwargs)
nf = self.RHOA.shape[1]
sfwdR = ERTTLmod(nf=nf, data=self.data, mesh=self.ERT.mesh,
rotate=kwargs.pop('rotate', True))
# self.sfwdR = ERTTLmod(nf=nf, fop=self.ERT.fop.mesh(), rotate=True)
sfwdR.createRefinedForwardMesh(False) # make sure things are ok
self.pd = sfwdR.pd2d
self.pd.save(self.basename+'_pd.bms')
if self.verbose:
print(self.data.size()*nf, sfwdR.pd2d.cellCount()*nf)
# there need to be a better preprocessing here to fight nan values
alldata = self.RHOA.flatten(order='F').data
startModel = pg.RVector(sfwdR.nc*nf, np.median(self.RHOA))
# %%
tLog = pg.RTransLog()
self.sINV = pg.RInversion(alldata, sfwdR, tLog, tLog, True)
self.sINV.setRelativeError(0.02)
self.sINV.setModel(startModel)
self.sINV.setLambda(kwargs.pop('lam', 10))
sfwdR.regionManager().setZWeight(kwargs.pop('zWeight', 0.3))
res = self.sINV.run()
self.RES = np.reshape(res, (nf, sfwdR.nc)).T
[docs] def simultaneousPhaseInversion(self, **kwargs):
"""Carry out simultaneous phase inversion of all frequencies."""
# self.verbose = kwargs.get('verbose', self.verbose)
nf = self.RHOA.shape[1]
if self.ERT is None:
self.ERT = self.createERTManager()
self.res = self.ERT.invert(**kwargs)
# nm = self.pd.cellCount()
# for c in self.pd.cells():
# c.setMarker(0)
fwd = ERTMultiPhimod(self.pd, self.ERT.fop.jacobian(), nf,
rotate=kwargs.pop('rotate', True))
fwd.createRefinedForwardMesh(False)
fwd.regionManager().setZWeight(kwargs.pop('zWeight', 0.3))
alldata = pg.RVector(0)
rhoa = self.ERT.inv.response()
if len(rhoa) == 0:
rhoa = self.RHOA[:, 0]
for i in range(nf):
if min(self.PHIA[:, i]) < 0:
print("WARNING! found negative phases "
".. switch to abs of ip data.")
rhoaii = rhoa*pg.sin(np.abs(np.array(self.PHIA[:, i])))
print(len(rhoaii))
alldata = pg.cat(alldata, rhoaii)
print(len(alldata))
error = 0.001 * max(alldata) / alldata + 0.03
error[alldata <= 0] = 100
tD = pg.RTrans()
tM = pg.RTransLogLU(0, max(self.res))
INV = pg.RInversion(pg.abs(alldata), fwd, tD, tM, True)
if hasattr(self, 'phiXX'):
startModel = np.tile(self.phi, nf)
else:
startModel = pg.RVector(fwd.nc * nf, pg.median(alldata))
INV.setModel(startModel)
INV.setReferenceModel(pg.RVector(fwd.nc * nf, pg.median(startModel)))
INV.setRelativeError(pg.abs(error))
INV.setRecalcJacobian(False)
INV.setMaxIter(kwargs.pop('maxIter', 10))
INV.setLambda(kwargs.pop('lam', 50))
allresi = INV.run()
RESI = np.reshape(allresi, (nf, fwd.nc)).T
self.PHI = np.arctan(RESI / np.reshape(self.ERT.resistivity, (-1, 1)))
[docs] def saveResults(self):
"""Save inversion results to .rho and .phi file plus mesh."""
self.pd.save(self.basename+'_pd.bms')
if hasattr(self, 'RES'):
np.savetxt(self.basename+'.rho', self.RES)
if hasattr(self, 'PHI'):
np.savetxt(self.basename+'.phi', self.PHI)
self.saveFit()
[docs] def saveFit(self):
"""Save fitted chargeability, time constant & exponent to file."""
if np.any(self.m) and np.any(self.tau) and np.any(self.c):
np.savetxt(self.basename+'.mtc', np.column_stack(
(self.m, self.tau, self.c, self.fitChi2)))
[docs] def loadFit(self):
"""Load fitted chargeability, time constant & exponent from file."""
self.m, self.tau, self.c, self.fitChi2 = np.loadtxt(
self.basename+'.mtc', unpack=1)
[docs] def loadResults(self, take=0):
"""Load inversion results from file into self.RES/PHI.
Set also single-frequency result (self.res/phi) by index,
maximum (take < 0) or sum (take > nfreq)
"""
self.pd = pg.Mesh(self.basename+'_pd.bms')
self.RES = np.loadtxt(self.basename+'.rho')
self.PHI = np.loadtxt(self.basename+'.phi')
self.chooseResult(take=take)
if os.path.isfile('coverage.vector'):
self.coverage = np.loadtxt('coverage.vector') # really dirty!
[docs] def chooseResult(self, take=0):
"""Choose single-frequency result (self.res/phi) from matrices
self.RES/PHI by index, maximum (take < 0) or sum (take > nfreq)
"""
if take < 0:
self.res = np.max(self.RES, axis=1)
self.phi = np.max(self.PHI, axis=1)
elif take > len(self.freq):
self.res = np.sum(self.RES, axis=1)
self.phi = np.sum(self.PHI, axis=1)
else:
self.res = self.RES[:, take]
self.phi = self.PHI[:, take]
[docs] def printColeColeParameters(self, point):
"""Print Cole-Cole parameters for point or id"""
if isinstance(point, int):
cid = point
else:
cid = self.getCellID(point)
print("Detected ID={:d} for point ({:.1f}, {:.1f})".format(
cid, point[0], point[1]))
fstr = r"rho={:.1f} Ohmm m={:.3f} tau={:3e} s c={:.2f}"
vals = self.res[cid], self.m[cid], self.tau[cid], self.c[cid]
print(fstr.format(*vals))
[docs] def showAllPhases(self, imax=200, figsize=(10, 16), **kwargs):
"""Show all model phases in subplots using the same colorscale."""
cmap = kwargs.pop('cmap', 'viridis')
fig, ax = plt.subplots(nrows=len(self.freq)+1,
sharex=False, figsize=figsize)
fig.subplots_adjust(hspace=0, wspace=0)
self.figs['phases'] = fig
for i in range(len(self.freq)):
pg.show(self.pd, self.PHI[:, i] * 1e3, ax=ax[i], logScale=False,
cMin=0, cMax=imax, cmap=cmap, coverage=self.coverage,
**kwargs)
# if i:
# ax[i].set_xticks([])
# ax[i].set_xlabel('')
icbar = ColorbarBase(ax[-1], norm=Normalize(vmin=0, vmax=imax),
orientation='horizontal', cmap=cmap)
setCbarLevels(icbar, cMin=0, cMax=imax, nLevs=7)
icbar.set_clim(0, imax)
icbar.ax.set_title(r'$\phi$ in mrad')
icbar.ax.set_aspect(1./25)
return fig, ax
[docs] def showAllResistivities(self, figsize=(10, 16), **kwargs):
"""Show model resistivities in subplots using the same colorscale."""
cmap = kwargs.pop('cmap', 'viridis')
cMin = kwargs.pop('cMin', np.min(self.RES))
cMax = kwargs.pop('cMax', np.max(self.RES))
fig, ax = plt.subplots(nrows=len(self.freq)+1,
sharex=False, figsize=figsize)
fig.subplots_adjust(hspace=0, wspace=0)
self.figs['resistivities'] = fig
for i in range(len(self.freq)):
pg.show(self.pd, self.RES[:, i], ax=ax[i], logScale=True,
cmap=cmap, cMin=cMin, cMax=cMax, **kwargs)
# if i:
# ax[i].set_xticks([])
cbar = ColorbarBase(ax[-1], norm=LogNorm(vmin=cMin, vmax=cMax),
orientation='horizontal', cmap=cmap)
setCbarLevels(cbar, cMin=cMin, cMax=cMax, nLevs=7)
cbar.set_clim(cMin, cMax)
cbar.ax.set_title(r'$\rho$ in $\Omega$m')
cbar.ax.set_aspect(1./25)
return fig, ax
[docs] def showAllResults(self, rmin=10, rmax=1000, imax=100, figsize=(10, 16),
**kwargs):
"""Show resistivities and phases next to each other in subplots."""
cmap = kwargs.pop('cmap', 'viridis')
fig, ax = plt.subplots(nrows=len(self.freq) + 1, ncols=2,
figsize=figsize)
fig.subplots_adjust(hspace=0, wspace=0)
self.figs['results'] = fig
showElecs = kwargs.pop('showElectrodes', False)
for i, f in enumerate(self.freq):
pg.show(self.pd, self.RES[:, i], ax=ax[i, 0],
cMin=rmin, cMax=rmax, cmap=cmap,
**kwargs)
pg.show(self.pd, self.PHI[:, i]*1e3, ax=ax[i, 1],
cMin=0, cMax=imax, cmap=cmap, logScale=False,
**kwargs)
if showElecs:
drawSensors(ax[i, 0], self.data.sensorPositions())
drawSensors(ax[i, 1], self.data.sensorPositions())
ax[i, 0].text(ax[i, 0].get_xlim()[0],
ax[i, 0].get_ylim()[0],
'f=' + fstring(f))
if i:
ax[i, 0].set_xticks([])
ax[i, 1].set_xticks([])
rcbar = ColorbarBase(ax[-1, 0], norm=LogNorm(vmin=rmin, vmax=rmax),
orientation='horizontal', cmap=cmap)
setCbarLevels(rcbar, cMin=rmin, cMax=rmax, nLevs=7)
icbar = ColorbarBase(ax[-1, 1], norm=Normalize(vmin=0, vmax=imax),
orientation='horizontal', cmap=cmap)
setCbarLevels(icbar, cMin=0, cMax=imax, nLevs=7)
icbar.set_clim(0, imax)
rcbar.ax.set_title(r'$\rho$ in $\Omega$m')
icbar.ax.set_title(r'$\phi$ in mrad')
rcbar.ax.set_aspect(1./25)
icbar.ax.set_aspect(1./25)
return fig, ax
[docs] def generateResultPDF(self, rmin=10, rmax=1000, imax=200, figsize=(12, 12),
**kwargs):
"""Generate a multipage pdf with rho/phi for each frequency."""
cmap = kwargs.pop('cmap', 'viridis')
pdf = PdfPages(self.basename + '-result.pdf')
fig, ax = plt.subplots(nrows=2, figsize=figsize, sharex=True)
cb1 = True
cb2 = True
for i, fri in enumerate(self.freq):
fstr = '(f={:d} Hz)'.format(int(fri))
if fri < 1.:
fstr = '(f={:d} mHz)'.format(int(fri*1e3))
for axi in ax:
axi.cla()
cb1 = pg.show(self.pd, self.RES[:, i], ax=ax[0],
cMin=rmin, cMax=rmax, cmap=cmap, colorBar=cb1,
label=r'Resistivity in $\Omega$m ' + fstr,
**kwargs)[1]
cb2 = pg.show(self.pd, self.PHI[:, i] * 1e3, ax=ax[1],
cMin=0, cMax=imax, cmap=cmap, colorBar=cb2,
label=r'$\phi$ in mrad ' + fstr, logScale=False,
**kwargs)[1]
fig.savefig(pdf, format='pdf')
pdf.close()
[docs] def getCellID(self, pos):
"""Return cell ID of nearest cell to position."""
return self.pd.findCell(pg.RVector3(*pos)).id()
[docs] def getDataSpectrum(self, dataNo=None, abmn=None):
"""Return SIP spectrum class for single data number."""
if hasattr(abmn, '__iter__'):
bb = pg.abs(self.data('a') - abmn[0]-1) + \
pg.abs(self.data('b') - abmn[1]-1) + \
pg.abs(self.data('m') - abmn[2]-1) + \
pg.abs(self.data('n') - abmn[3]-1)
dataNo = np.argmin(bb)
return SIPSpectrum(f=self.freq, amp=self.RHOA[dataNo, :],
phi=self.PHIA[dataNo, :])
[docs] def getModelSpectrum(self, cellID):
"""Return SIP spectrum for single cell (id or position)."""
if hasattr(cellID, '__iter__'): # tuple
cellID = self.getCellID(cellID)
return SIPSpectrum(f=self.freq, amp=self.RES[cellID, :],
phi=self.PHI[cellID, :])
[docs] def showModelSpectrum(self, cellID, **kwargs):
"""Show SIP spectrum for single cell (id or position)."""
spec = self.getModelSpectrum(cellID)
spec.showData(**kwargs)
[docs] def showModelSpectra(self, positions, **kwargs):
"""Show model spectra for a number of positions or IDs."""
fig, ax = plt.subplots(nrows=2, sharex=True)
LABELS = []
for pos in positions:
label = 'x={:.1f} z={:.1f}'.format(*pos)
LABELS.append(label)
# kwargs['label'] = label
self.showModelSpectrum(pos, ax=ax, **kwargs)
for a in ax:
a.set_ylim(auto=True)
ax[0].set_xlim(min(self.freq), max(self.freq))
ax[0].legend(LABELS, loc='best')
return fig, ax
[docs] def fitAllPhi(self, show=False, **kwargs):
"""Fit all phase spectra by cole-cole models."""
mpar = kwargs.pop('mpar', [0.1, 0, 1])
minf, maxf = min(self.freq), max(self.freq)
taupar = kwargs.pop('mpar', [1./sqrt(minf*maxf), 0.1/maxf, 10/minf])
cpar = kwargs.pop('cpar', [0.25, 0, 1])
ePhi = kwargs.pop('ePhi', 0.001)
nm = self.pd.cellCount()
self.m = np.zeros(nm)
self.tau = np.zeros(nm)
self.c = np.zeros(nm)
self.fitChi2 = np.zeros(nm)
spec = SIPSpectrum(f=self.freq, amp=self.RES[0, :], phi=self.PHI[0, :])
for i in range(nm):
spec.amp = self.RES[i, :]
spec.phi = self.PHI[i, :]
spec.fitCCPhi(ePhi=ePhi, mpar=mpar, taupar=taupar, cpar=cpar)
self.m[i] = spec.mCC[0]
self.tau[i] = spec.mCC[1]
self.c[i] = spec.mCC[2]
self.fitChi2[i] = spec.chi2
if show:
self.showColeColeFit(**kwargs)
[docs] def showColeColeParameters(self, figsize=(8, 12), save=False,
mlim=(None, None), tlim=(None, None),
clim=(0, 0.5), mincov=0.05, **kwargs):
"""Show distribution of Cole-Cole parameters."""
if 'coverage' in kwargs:
coverage = kwargs.pop('coverage')
else:
coverage = 1 / np.sqrt(self.fitChi2)
coverage[coverage > 1] = 1
coverage[coverage < 0] = 0
coverage *= (1 - mincov)
coverage += mincov
fig, ax = plt.subplots(nrows=3, sharex=True, sharey=True,
figsize=figsize)
pg.show(self.pd, self.m, ax=ax[0], logScale=False, colorBar=True,
coverage=coverage, cMin=mlim[0], cMax=mlim[1],
label=r'chargeability $m$ [-]', **kwargs)
pg.show(self.pd, self.tau, ax=ax[1], logScale=True, colorBar=True,
coverage=coverage, cMin=tlim[0], cMax=tlim[1],
label=r'time constant $\tau$ [s]', **kwargs)
pg.show(self.pd, self.c, ax=ax[2], logScale=False, colorBar=True,
cMin=clim[0], cMax=clim[1], coverage=coverage,
label=r'relaxation exponent $c$ [-]', **kwargs)
if save:
fig.savefig(self.basename+'-CCfit.pdf', bbox_inches='tight')
return fig, ax
[docs] def showColeColeFit(self, *args, **kwargs):
"""Redirecto to new name showColeColeParameters."""
return self.showColeColeParameters(*args, **kwargs)
[docs]def importSIP256Test(filename, verbose=False):
"""Read SIP256 file (RES format) and return a DataContainer.
Experimental to be a little bit more flexible
Read SIP256 file (RES format) and return a DataContainer.
Supported: SIP256D
TODO: UNICODE problems with ° sign
TODO: find BEGIN END frequencies bug in fileformat
TODO: read older versions
Parameters
----------
filename: str
*.RES file (SIP256 raw output file)
verbose: bool
Do some output [False].
Returns
-------
data : pg.DataContainer
Examples
--------
data = importSIP256('myfile.res', True)
"""
def readSIP256Freqs_(content, start, endStr):
freqs = []
for i, line in enumerate(content[start:]):
if endStr in line:
break
vals = line.split()
# 20000.00000000 48000.00000000 96 170 1 0.0 1
if int(vals[6]) == 1:
freqs.append(float(vals[0]))
return i+1, freqs
def readSIP256Layout_(content, start, endStr):
sensors = []
for i, line in enumerate(content[start:]):
if endStr in line:
break
vals = line.split()
sensors.append(float(vals[2]))
return i+1, sensors
def readReading_(content, nFreq, start, endStr):
readings = []
# Reading: 1 / RU-A: 1 RU-B: 2
vals = content[start].split('\n')[0].split()
eA = int(vals[4])
eB = int(vals[6])
for i in range(start+1, len(content)):
line = content[i].split('\n')[0]
if endStr in line:
break
if "Remote Unit:" in line:
ru = line.split()[2]
# handle awful file format inconsistencies here
content[i+1] = content[i+1].replace('Frequency /Hz',
'Frequency/Hz')
mat = readAsDictionary(content[i+1:i+2+nFreq])
i += nFreq+1
readings.append(mat)
# print(ru, mat.keys(), mat['K.-F./m'])
# vals = line.split()
# #20000.00000000 48000.00000000 96 170 1 0.0 1
# if int(vals[6]) == 1:
# freqs.append(float(vals[0]))
return i+1, [eA, eB], readings
with open(filename, 'r') as fi:
content = fi.readlines()
data = pb.DataContainerERT()
version = content[0].split()[0]
if version != 'SIP256D':
print("Warning .. maybe wrong format .. until now this importer" +
"only supports SIP256D. Pls. contact Carsten.", version)
nReadings = 0
readings = []
freqs = []
injections = []
for i in range(len(content)):
line = content[i].split('\n')[0]
if '[Number of Readings]' in line:
nReadings = int(line.split(']')[1])
for r in range(nReadings):
readings.append(content[i + 2 + r].split())
i += nReadings + 2
if '[Begin Layout]' in line:
i, sensors = readSIP256Layout_(content, start=i+1,
endStr='[End Layout]')
for i, s in enumerate(sensors):
data.createSensor([s, 0, 0])
if '[Begin FrequencyParameter]' in line:
i, freqs = readSIP256Freqs_(content, start=i+1,
endStr='[End FrequencyParameter]')
if 'Reading:' in line:
i, [eA, eB], inj = readReading_(content, len(freqs), start=i,
endStr='Reading')
if eA != eB:
injections.append([[eA, eB], inj])
print(freqs)
nRU = len(readings[0]) - 3
nElecs = nRU + 1
count = 0
data.add('Ki', pg.RVector(0))
def k(a, b, m, n):
try:
a *= 4
b *= 4
m *= 4
n *= 4
return abs(1./(1./(2*np.pi) * (1./abs(m-a) - 1./abs(m-b) -
1./abs(n-a) + 1./abs(n-b))))
except:
return np.nan
for i, reading in enumerate(injections):
eA = reading[0][0]
eB = reading[0][1]
AB = int((eB-eA))
config = np.array(readings[i], int)
print(i+1, config)
for j, meas in enumerate(reading[1]):
eM = j + 1
eN = j + 2
if eN in config[3:]:
eM = (eN-1)
if eA == 9 and eB == 13 and eN == 3:
print("File Format unknown .. hack here! ask Tino!")
eN = 5
elif eA == 9 and eB == 13 and eN == 4:
print("File Format unknown .. hack here! ask Tino!")
eN = 5
else:
eN = min(nElecs-1, eM + AB)
# print("\t", i+1, eA, eB, eM, eN)
# #eN = j + 1
# print(eA, eB, eM, eN, meas['K.-F./m'][0], readings[i][j+1])
#
# print(reading['K.-F./m'])
# if not np.isnan(meas['K.-F./m'][0]):
if (abs(k(eA, eB, eM, eN) - meas['K.-F./m'][0]) > 1.3):
print(eA, eB, eM, eN, k(eA, eB, eM, eN), meas['K.-F./m'][0],
readings[i][j+1])
if not np.isnan(meas['K.-F./m'][0]):
data.createFourPointData(count, eA-1, eB-1, eM-1, eN-1)
data('Ki')[count] = meas['K.-F./m'][0]
for f in range(len(meas['Frequency/Hz'])):
freq = meas['Frequency/Hz'][f]
rhoaName = 'rhoa:'+str(freq)
if not data.exists(rhoaName):
data.add(rhoaName, pg.RVector(2))
phaseName = 'pa:'+str(freq)
if not data.exists(phaseName):
data.add(phaseName, pg.RVector(2))
iName = 'i:'+str(freq)
if not data.exists(iName):
data.add(iName, pg.RVector(2))
data(phaseName)[count] = meas['PA/'][f]
data(rhoaName)[count] = meas['RA/Ohmm'][f]
data(iName)[count] = meas['IA/mA'][f]
count += 1
data.set("k", pb.geometricFactors(data))
data.resize(count)
return data
[docs]def main(argv):
"""Main."""
sip = SIPdata(argv)
print(sip)
sip.generateSpectraPDF()
sip.generateDataPDF()
sip.writeAllData()
sip.writeDataMat()
sip.writeSingleFrequencyData()
sip.filter(kmax=30000)
sip.singleInversion()
sip.showSingleResult()
sip.simultaneousInversion()
sip.showAllResults()
if __name__ == "__main__":
main(sys.argv[1])
pg.wait()