from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from builtins import zip
from builtins import range
from builtins import object
import pytest
import numpy as np
from numpy.testing import assert_allclose
from pycuda.tools import mark_cuda_test
from ..bls import eebls_gpu, eebls_transit_gpu, \
q_transit, compile_bls, hone_solution,\
single_bls, eebls_gpu_custom, eebls_gpu_fast
[docs]def transit_model(phi0, q, delta, q1=0.):
def model(t, freq, q=q, phi0=phi0, delta=delta):
phi = t * freq - phi0
phi -= np.floor(phi)
if not hasattr(t, '__iter__'):
return -delta if np.absolute(phi) < q else 0
y = np.zeros(len(t))
y[np.absolute(phi) < q] -= delta
return y
return model
[docs]def plot_bls_sol(t, y, dy, freq, q, phi0):
w = np.power(dy, -2)
w /= sum(w)
phi_plot = np.linspace(0, 1, 50./q)
phi = (t * freq)
phi -= np.floor(phi)
dphi = phi - phi0 - np.floor(phi - phi0)
mask = dphi < q
ybt = np.dot(w[mask], y[mask]) / sum(w[mask])
yb0 = np.dot(w[~mask], y[~mask]) / sum(w[~mask])
delta = yb0 - ybt
model = transit_model(phi0, q, delta)
ym = model(phi_plot, 1.) + yb0
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.scatter(phi[~mask], y[~mask], c='k', s=1, alpha=0.1)
ax.scatter(phi[mask], y[mask], c='g', s=1, alpha=0.8)
ax.plot(phi_plot, ym, color='r')
ax.axvline(phi0, color='k', ls=':')
ax.axvline(phi0 + q, color='k', ls=':')
plt.show()
[docs]@pytest.fixture
def data(seed=100, sigma=0.1, ybar=12., snr=10, ndata=200, freq=10.,
q=0.01, phi0=None, baseline=1.):
rand = np.random.RandomState(seed)
if phi0 is None:
phi0 = rand.rand()
delta = snr * sigma / np.sqrt(ndata * q * (1 - q))
model = transit_model(phi0, q, delta)
t = baseline * np.sort(rand.rand(ndata))
y = model(t, freq) + sigma * rand.randn(len(t))
y += ybar - np.mean(y)
err = sigma * np.ones_like(y)
return t, y, err
[docs]def get_total_nbins(nbins0, nbinsf, dlogq):
nbins_tot = 0
while (int(x * nbins0) <= nbinsf):
nb = int(x * nbins0)
x *= 1 + dlogq
nbins_tot += nb
return nbins_tot
[docs]def mod1(x):
return x - np.floor(x)
[docs]def manual_binning(t, y, dy, freqs, nbins0, nbinsf, dlogq,
phi_min, phi_max, noverlap):
"""
for possible tests of the binning procedure. this
method has *not* been tested!
"""
w = np.power(dy, -2)
w /= sum(w)
yw = np.multiply(y, w)
nbins_tot = get_total_nbins(nbins0, nbinsf, dlogq)
yw_bins = np.zeros(nbins_tot * len(freqs) * noverlap)
w_bins = np.zeros(nbins_tot * len(freqs) * noverlap)
dphi = 1. / noverlap
for i, freq in enumerate(freqs):
nb = nbins0
nbtot = 0
x = 1.
while (int(x * nbins0) <= nbinsf):
nb = int(x * nbins0)
x *= 1 + dlogq
q = 1./nb
for s in range(noverlap):
phi = t * freq
bf = np.floor(nb * mod1(phi - s * q * dphi))
bf += i * nbins_tot * noverlap + s * nb + noverlap * nbtot
for b, YW, W in zip(bf[mask], yw[mask], w[mask]):
yw_bins[b] += YW
w_bins[b] += W
nbtot += nb
return yw_bins, w_bins
[docs]class TestBLS(object):
seed = 100
rand = np.random.RandomState(seed)
plot = False
rtol = 1e-3
atol = 1e-5
[docs] @pytest.mark.parametrize("freq", [0.3])
@pytest.mark.parametrize("phi0", [0.0, 0.5])
@pytest.mark.parametrize("dlogq", [0.2, -1])
@pytest.mark.parametrize("nstreams", [1, 3])
@pytest.mark.parametrize("freq_batch_size", [1, 3, None])
def test_transit_parameter_consistency(self, freq, phi0, dlogq, nstreams,
freq_batch_size):
q = q_transit(freq)
t, y, dy = data(snr=30, q=q, phi0=phi0, freq=freq, baseline=365.)
freqs, power, sols = eebls_transit_gpu(t, y, dy,
samples_per_peak=2,
freq_batch_size=freq_batch_size,
nstreams=nstreams,
dlogq=dlogq,
fmin=freq * 0.99,
fmax=freq * 1.01)
pcpu = [single_bls(t, y, dy, x[0], *x[1])
for x in zip(freqs, sols)]
pcpu = np.asarray(pcpu)
if self.plot:
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(freqs, pcpu)
ax.plot(freqs, power)
plt.show()
sorted_results = sorted(zip(pcpu, power, freqs, sols),
key=lambda x: -abs(x[1] - x[0]))
for i, (pcs, pgs, freq, (qs, phs)) in enumerate(sorted_results):
if i > 10:
break
print(pcs, pgs, (qs, phs))
if self.plot:
plot_bls_sol(t, y, dy, freq, qs, phs)
pows, diffs = list(zip(*sorted(zip(pcpu,
np.absolute(power - pcpu)),
key=lambda x: -x[1])))
upper_bound = self.rtol * np.array(pows) + self.atol
mostly_ok = sum(np.array(diffs) > upper_bound) / len(pows) < 1e-2
not_too_bad = max(diffs) < 1e-1
print(max(diffs))
assert mostly_ok and not_too_bad
[docs] @pytest.mark.parametrize("freq", [1.0])
@pytest.mark.parametrize("phi_index", [0, 10, -1])
@pytest.mark.parametrize("q_index", [0, 5, -1])
@pytest.mark.parametrize("nstreams", [1, 3])
@pytest.mark.parametrize("freq_batch_size", [1, 3, None])
def test_custom(self, freq, q_index, phi_index, freq_batch_size, nstreams):
q_values = np.logspace(-1.1, -0.8, num=10)
phi_values = np.linspace(0, 1, int(np.ceil(2./min(q_values))))
q = q_values[q_index]
phi = phi_values[phi_index]
t, y, dy = data(snr=10, q=q, phi0=phi, freq=freq,
baseline=365.)
df = min(q_values) / (10 * (max(t) - min(t)))
freqs = np.linspace(freq - 10 * df, freq + 10 * df, 20)
power, gsols = eebls_gpu_custom(t, y, dy, freqs,
q_values, phi_values,
freq_batch_size=freq_batch_size,
nstreams=nstreams)
# Now get CPU values
qgrid, phigrid = np.meshgrid(q_values, phi_values)
bls = np.vectorize(single_bls, excluded=set([0, 1, 2, 3]))
blses = []
max_bls = []
cpu_bls = []
sols = []
for freq, (qg, phg) in zip(freqs, gsols):
p = bls(t, y, dy, freq, qgrid, phigrid)
pc = single_bls(t, y, dy, freq, qg, phg)
mind = np.unravel_index(p.argmax(), p.shape)
qsol = qgrid[mind]
phisol = phigrid[mind]
max_bls.append(p[mind])
cpu_bls.append(pc)
sols.append((qsol, phisol))
blses.append(p)
qsg, phsg = list(zip(*gsols))
qs, phs = list(zip(*sols))
if self.plot:
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(freqs, max_bls)
ax.plot(freqs, power)
plt.show()
f, ax = plt.subplots()
ax.plot(freqs, qs)
ax.plot(freqs, qsg)
plt.show()
f, ax = plt.subplots()
ax.plot(freqs, phs)
ax.plot(freqs, phsg)
plt.show()
for bls_arr in [max_bls, cpu_bls]:
max_diffs = 0.5 * self.rtol * (bls_arr + power) + self.atol
diffs = np.absolute(bls_arr - power)
nbad = sum(diffs > max_diffs)
mostly_ok = nbad / float(len(power)) < 1e-2 or nbad == 1
not_too_bad = max(np.absolute(bls_arr - power) / power) < 1e-1
print(max(diffs) / max_diffs[np.argmax(max_diffs)])
if not (mostly_ok and not_too_bad):
print(nbad / float(len(power)), not_too_bad)
import matplotlib.pyplot as plt
plt.plot(freqs, power)
plt.plot(freqs, bls_arr)
plt.show()
assert(mostly_ok and not_too_bad)
[docs] @pytest.mark.parametrize("freq", [1.0])
@pytest.mark.parametrize("phi_index", [0, 10, -1])
@pytest.mark.parametrize("q_index", [0, 5, -1])
@pytest.mark.parametrize("nstreams", [1, 3])
@pytest.mark.parametrize("freq_batch_size", [1, 3, None])
def test_standard(self, freq, q_index, phi_index, nstreams, freq_batch_size):
q_values = np.logspace(-1.5, np.log10(0.1), num=100)
phi_values = np.linspace(0, 1, int(np.ceil(2./min(q_values))))
q = q_values[q_index]
phi = phi_values[phi_index]
t, y, dy = data(snr=10, q=q, phi0=phi, freq=freq,
baseline=365.)
df = min(q_values) / (10 * (max(t) - min(t)))
delta_f = 5 * df / freq
freqs = np.linspace(freq * (1 - delta_f),
(1 + delta_f) * freq,
int(5. * 2 * delta_f * freq / df))
power, gsols = eebls_gpu(t, y, dy, freqs,
qmin=0.1 * q, qmax=2.0 * q,
nstreams=nstreams, noverlap=2, dlogq=0.5,
freq_batch_size=freq_batch_size)
bls_c = [single_bls(t, y, dy, x[0], *x[1]) for x in zip(freqs, gsols)]
if self.plot:
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(freqs, bls_c)
ax.plot(freqs, power)
plt.show()
inds = sorted(np.arange(len(power)),
key=lambda i: -abs(power[i] - bls_c[i]))
all_qs, all_phis = zip(*gsols)
for i in inds[:100]:
qs, phis = gsols[i]
print(power[i], bls_c[i], abs(power[i] - bls_c[i]),
qs, phis)
#plot_bls_sol(t, y, dy, freqs[i], qs, phis)
pows, diffs = list(zip(*sorted(zip(bls_c, np.absolute(power - bls_c)),
key=lambda x: -x[1])))
upper_bound = self.rtol * np.array(pows) + self.atol
mostly_ok = sum(np.array(diffs) > upper_bound) / len(pows) < 1e-2
not_too_bad = max(diffs) < 1e-1
print(diffs[0], pows[0])
assert mostly_ok and not_too_bad
# assert_allclose(bls_c, power, rtol=1e-3, atol=1e-5)
[docs] @pytest.mark.parametrize("freq", [1.0])
@pytest.mark.parametrize("dlogq", [0.5, -1.0])
@pytest.mark.parametrize("freq_batch_size", [1, 10, None])
@pytest.mark.parametrize("phi0", [0.0])
@pytest.mark.parametrize("use_fast", [True, False])
@pytest.mark.parametrize("nstreams", [1, 4])
def test_transit(self, freq, use_fast, freq_batch_size, nstreams, phi0, dlogq):
q = q_transit(freq)
samples_per_peak = 2
noverlap = 2
t, y, err = data(snr=10, q=q, phi0=phi0, freq=freq,
baseline=365.)
kw = dict(samples_per_peak=samples_per_peak,
freq_batch_size=freq_batch_size, dlogq=dlogq,
nstreams=nstreams, noverlap=noverlap,
fmin=0.9 * freq, fmax=1.1 * freq,
use_fast=use_fast)
if use_fast:
freqs, power = eebls_transit_gpu(t, y, err, **kw)
kw['use_fast'] = False
freqs, power_slow, sols = eebls_transit_gpu(t, y, err, **kw)
dfsol = freqs[np.argmax(power)] - freqs[np.argmax(power_slow)]
close_enough = abs(dfsol) * (max(t) - min(t)) / q < 3
if not close_enough:
import matplotlib.pyplot as plt
plt.plot(freqs, power, alpha=0.5)
plt.plot(freqs, power_slow, alpha=0.5)
plt.show()
assert(close_enough)
return
freqs, power, sols = eebls_transit_gpu(t, y, err, **kw)
power_cpu = np.array([single_bls(t, y, err, x[0], *x[1]) for x in zip(freqs, sols)])
if self.plot:
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(freqs, power_cpu)
ax.plot(freqs, power)
pows, diffs = list(zip(*sorted(zip(power_cpu, power - power_cpu),
key=lambda x: -abs(x[1]))))
print(list(zip(pows[:10], diffs[:10])))
plt.show()
diffs = np.absolute(power - power_cpu)
upper_bound = 1e-3 * np.array(power_cpu) + 1e-5
mostly_ok = sum(np.array(diffs) > upper_bound) / len(diffs) < 1e-2
not_too_bad = max(diffs) < 1e-1
print(max(diffs))
assert mostly_ok and not_too_bad
[docs] @pytest.mark.parametrize("freq", [1.0])
@pytest.mark.parametrize("q", [0.1])
@pytest.mark.parametrize("phi0", [0.0])
@pytest.mark.parametrize("dphi", [0.0, 1.0])
@pytest.mark.parametrize("freq_batch_size", [None, 100])
@pytest.mark.parametrize("dlogq", [0.5, -1.0])
def test_fast_eebls(self, freq, q, phi0, freq_batch_size, dlogq, dphi,
**kwargs):
t, y, err = data(snr=50, q=q, phi0=phi0, freq=freq,
baseline=365.)
df = 0.25 * q / (max(t) - min(t))
fmin = 0.9 * freq
fmax = 1.1 * freq
nf = int(np.ceil((fmax - fmin) / df))
freqs = fmin + df * np.arange(nf)
kw = dict(qmin=1e-2, qmax=0.5, dphi=dphi,
freq_batch_size=freq_batch_size, dlogq=dlogq)
kw.update(kwargs)
power = eebls_gpu_fast(t, y, err, freqs, **kw)
power0, sols = eebls_gpu(t, y, err, freqs, **kw)
if self.plot:
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(freqs, power, alpha=0.5)
ax.axvline(freq, ls=':', color='k')
ax.plot(freqs, power0, alpha=0.5)
ax.set_yscale('log')
plt.show()
# this is janky. Need better test
# to ensure we're getting the best results,
# but no apples-to-apples comparison is
# possible for eebls_gpu and eebls_gpu_fast
fmax_fast = freqs[np.argmax(power)]
fmax_regular = freqs[np.argmax(power0)]
assert(abs(fmax_fast - fmax_regular) * (max(t) - min(t)) / q < 3)