from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from builtins import zip
from builtins import range
from builtins import object
import pytest
from pycuda.tools import mark_cuda_test
import numpy as np
from numpy.testing import assert_allclose
from ..ce import ConditionalEntropyAsyncProcess
lsrtol = 1E-2
lsatol = 1E-5
seed = 100
rand = np.random.RandomState(seed)
[docs]@pytest.fixture
def data(sigma=0.1, ndata=500, freq=3., snr=1000, t0=0.):
t = np.sort(rand.rand(ndata)) + t0
y = snr * sigma * np.cos(2 * np.pi * freq * t) / np.sqrt(len(t))
y += sigma * rand.randn(len(t))
err = sigma * np.ones_like(y)
return t, y, err
[docs]def assert_similar(pdg0, pdg, top=5):
inds = (np.argsort(pdg0)[::-1])[:top]
p0 = np.asarray(pdg0)[inds]
p = np.asarray(pdg)[inds]
diff = np.absolute(p - p0)
assert(all(diff < lsrtol * 0.5 * (p + p0) + lsatol))
[docs]class TestCE(object):
plot = False
[docs] @pytest.mark.parametrize('ndatas', [1, 5, 10])
def test_multiple_datasets(self, ndatas, **kwargs):
datas = [data() for i in range(ndatas)]
proc = ConditionalEntropyAsyncProcess(**kwargs)
df = 0.02
max_freq = 1.1
min_freq = 0.9
nf = int((max_freq - min_freq) / df)
freqs = min_freq + df * np.arange(nf)
mult_results = proc.run(datas, freqs=freqs)
proc.finish()
sing_results = []
for d in datas:
sing_results.extend(proc.run([d], freqs=freqs))
proc.finish()
for rb, rnb in zip(mult_results, sing_results):
fb, pb = rb
fnb, pnb = rnb
assert(not any(np.isnan(pb)))
assert(not any(np.isnan(pnb)))
assert_allclose(pnb, pb, rtol=lsrtol, atol=lsatol)
assert_allclose(fnb, fb, rtol=lsrtol, atol=lsatol)
[docs] @pytest.mark.parametrize('ndatas', [1, 7])
@pytest.mark.parametrize('batch_size', [1, 3])
@pytest.mark.parametrize('use_double', [True, False])
@pytest.mark.parametrize('use_fast,weighted,shmem_lc,freq_batch_size',
[(True, False, False, 1),
(True, False, True, None),
(False, True, False, None),
(False, False, False, None)])
@pytest.mark.parametrize('phase_bins,phase_overlap',
[(10, 1)])
@pytest.mark.parametrize('mag_bins,mag_overlap',
[(5, 0)])
def test_batched_run(self, ndatas, batch_size, use_double,
mag_bins, phase_bins, mag_overlap,
phase_overlap, use_fast,
shmem_lc, weighted,
freq_batch_size):
datas = [data(ndata=rand.randint(50, 100))
for i in range(ndatas)]
kwargs = dict(use_double=use_double,
mag_bins=mag_bins,
phase_bins=phase_bins,
phase_overlap=phase_overlap,
mag_overlap=mag_overlap,
use_fast=use_fast,
weighted=weighted)
proc = ConditionalEntropyAsyncProcess(**kwargs)
df = 0.02
max_freq = 1.1
min_freq = 0.9
nf = int((max_freq - min_freq) / df)
freqs = min_freq + df * np.arange(nf)
run_kw = dict(shmem_lc=shmem_lc, freqs=freqs,
freq_batch_size=freq_batch_size)
batched_results = proc.batched_run(datas, **run_kw)
proc.finish()
non_batched_results = []
for d in datas:
r = proc.run([d], freqs=freqs)
proc.finish()
non_batched_results.extend(r)
for rb, rnb in zip(batched_results, non_batched_results):
fb, pb = rb
fnb, pnb = rnb
assert(not any(np.isnan(pb)))
assert(not any(np.isnan(pnb)))
assert_allclose(pnb, pb, rtol=lsrtol, atol=lsatol)
assert_allclose(fnb, fb, rtol=lsrtol, atol=lsatol)
[docs] @pytest.mark.parametrize('ndatas', [1, 7])
@pytest.mark.parametrize('batch_size', [1, 3])
@pytest.mark.parametrize('use_double', [True, False])
@pytest.mark.parametrize('use_fast,weighted,shmem_lc,freq_batch_size',
[(True, False, False, 1),
(True, False, True, None),
(False, True, False, None),
(False, False, False, None)])
@pytest.mark.parametrize('phase_bins,phase_overlap',
[(10, 1)])
@pytest.mark.parametrize('mag_bins,mag_overlap',
[(5, 0)])
def test_batched_run_const_nfreq(self, ndatas, batch_size, use_double,
mag_bins, phase_bins, mag_overlap,
phase_overlap, use_fast, weighted,
shmem_lc, freq_batch_size):
frequencies = np.sort(10 + rand.rand(ndatas) * 100.)
datas = [data(ndata=rand.randint(50, 100),
freq=freq)
for i, freq in enumerate(frequencies)]
kwargs = dict(use_double=use_double,
mag_bins=mag_bins,
phase_bins=phase_bins,
phase_overlap=phase_overlap,
mag_overlap=mag_overlap,
use_fast=use_fast)
proc = ConditionalEntropyAsyncProcess(**kwargs)
df = 0.02
max_freq = 1.1
min_freq = 0.9
nf = int((max_freq - min_freq) / df)
freqs = min_freq + df * np.arange(nf)
run_kw = dict(shmem_lc=shmem_lc, freqs=freqs,
freq_batch_size=freq_batch_size)
batched_results = proc.batched_run_const_nfreq(datas, **run_kw)
proc.finish()
procnb = ConditionalEntropyAsyncProcess(**kwargs)
non_batched_results = []
for d, (frq, p) in zip(datas, batched_results):
r = procnb.run([d], **run_kw)
procnb.finish()
non_batched_results.extend(r)
for f0, (fb, pb), (fnb, pnb) in zip(frequencies, batched_results,
non_batched_results):
if self.plot:
import matplotlib.pyplot as plt
plt.plot(fnb, pnb, color='k', lw=3)
plt.plot(fb, pb, color='r')
plt.axvline(f0)
plt.show()
assert(not any(np.isnan(pb)))
assert(not any(np.isnan(pnb)))
assert_allclose(pnb, pb, rtol=lsrtol, atol=lsatol)
assert_allclose(fnb, fb, rtol=lsrtol, atol=lsatol)
[docs] @pytest.mark.parametrize('use_double', [True, False])
@pytest.mark.parametrize('use_fast,weighted,shmem_lc,freq_batch_size',
[(True, False, False, 1),
(True, False, True, None),
(False, True, False, None),
(False, False, False, None)])
@pytest.mark.parametrize('phase_bins,phase_overlap',
[(10, 1)])
@pytest.mark.parametrize('mag_bins,mag_overlap',
[(5, 0)])
@pytest.mark.parametrize('freq', [10.0])
@pytest.mark.parametrize('t0', [0.0])
@pytest.mark.parametrize('balanced_magbins', [True, False])
def test_inject_and_recover(self, freq,
use_double, mag_bins, phase_bins, mag_overlap,
phase_overlap, use_fast, t0, balanced_magbins,
weighted, shmem_lc, freq_batch_size):
kwargs = dict(use_double=use_double,
mag_bins=mag_bins,
phase_bins=phase_bins,
phase_overlap=phase_overlap,
mag_overlap=mag_overlap,
use_fast=use_fast,
balanced_magbins=balanced_magbins,
weighted=weighted)
proc = ConditionalEntropyAsyncProcess(**kwargs)
t, y, err = data(freq=freq, t0=t0)
df = 1. / (max(t) - min(t)) / 10
max_freq = 1.1 * freq
min_freq = 0.9 * freq
nf = int((max_freq - min_freq) / df)
freqs = min_freq + df * np.arange(nf)
run_kw = dict(shmem_lc=shmem_lc, freq_batch_size=freq_batch_size)
results = proc.large_run([(t, y, err)],
freqs=freqs, **run_kw)
proc.finish()
frq, p = results[0]
best_freq = frq[np.argmin(p)]
if self.plot:
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(frq, p)
ax.axvline(freq, ls='-', color='k')
ax.axvline(best_freq, ls=':', color='r')
plt.show()
# print best_freq, freq, abs(best_freq - freq) / freq
assert(not any(np.isnan(p)))
assert(abs(best_freq - freq) / freq < 3E-2)
[docs] def test_large_run(self, make_plot=False, **kwargs):
proc = ConditionalEntropyAsyncProcess(**kwargs)
t, y, dy = data(sigma=0.01, ndata=100, freq=4.)
df = 0.001
max_freq = 100.
min_freq = df
nf = int((max_freq - min_freq) / df)
freqs = min_freq + df * np.arange(nf)
r0 = proc.run([(t, y, dy)], freqs=freqs)
r1 = proc.large_run([(t, y, dy)], freqs=freqs, max_memory=1e7)
f0, p0 = r0[0]
f1, p1 = r1[0]
rel_err = max(np.absolute(p0 - p1)) / np.median(np.absolute(p0))
print(max(np.absolute(p0 - p1)), rel_err)
assert_allclose(p0, p1, rtol=1e-4, atol=1e-2)
[docs] @pytest.mark.parametrize('use_double', [True, False])
@pytest.mark.parametrize('use_fast,weighted,shmem_lc,freq_batch_size',
[(True, False, False, 1)])
@pytest.mark.parametrize('phase_bins,phase_overlap',
[(10, 1)])
@pytest.mark.parametrize('mag_bins,mag_overlap',
[(5, 0)])
@pytest.mark.parametrize('freq', [10.0])
@pytest.mark.parametrize('balanced_magbins', [True, False])
def test_time_shift_invariance(self, freq,
use_double, mag_bins, phase_bins,
mag_overlap, phase_overlap, use_fast,
balanced_magbins, weighted,
shmem_lc, freq_batch_size):
kwargs = dict(use_double=use_double,
mag_bins=mag_bins,
phase_bins=phase_bins,
phase_overlap=phase_overlap,
mag_overlap=mag_overlap,
use_fast=use_fast,
balanced_magbins=balanced_magbins,
weighted=weighted)
proc = ConditionalEntropyAsyncProcess(**kwargs)
run_kw = dict(shmem_lc=shmem_lc, freq_batch_size=freq_batch_size)
for t0 in [-1e4, 1e4]:
t, y, err = data(freq=freq)
df = 1. / (max(t) - min(t)) / 10
max_freq = 1.1 * freq
min_freq = 0.9 * freq
nf = int((max_freq - min_freq) / df)
freqs = min_freq + df * np.arange(nf)
results = proc.run([(t, y, err)], freqs=freqs, **run_kw)
proc.finish()
frq, p = results[0]
results_shift = proc.run([(t + t0, y, err)], freqs=freqs, **run_kw)
frq_shft, p_shft = results_shift[0]
best_freq = frq[np.argmin(p)]
best_freq_shft = frq_shft[np.argmin(p_shft)]
if self.plot:
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(frq, p)
ax.plot(frq_shft, p_shft)
ax.axvline(freq, ls='-', color='k')
ax.axvline(best_freq, ls=':', color='r')
plt.show()
assert(not any(np.isnan(p)))
assert(not any(np.isnan(p_shft)))
baseline = max(t) - min(t)
delta_f = abs(best_freq - best_freq_shft)
top_freq_is_close = delta_f * baseline < 1
diffs = np.absolute(p - p_shft)
atol, rtol = 1e-1 * max(np.absolute(p)), 2e-1
upper_limit = atol + rtol * np.absolute(p)
pct_out_of_bounds = sum(diffs > upper_limit) / len(diffs)
print(pct_out_of_bounds, delta_f * baseline)
assert(top_freq_is_close and pct_out_of_bounds < 5e-2)
[docs] @pytest.mark.parametrize('use_double', [True, False])
@pytest.mark.parametrize('shmem_lc', [True, False])
@pytest.mark.parametrize('freq_batch_size', [1, None])
@pytest.mark.parametrize('phase_bins,phase_overlap,mag_bins,mag_overlap',
[(10, 0, 5, 0), (10, 1, 5, 1)])
@pytest.mark.parametrize('freq', [12.0])
@pytest.mark.parametrize('t0', [0.0])
#@pytest.mark.parametrize('balanced_magbins', [True, False])
@pytest.mark.parametrize('balanced_magbins', [False])
@pytest.mark.parametrize('weighted', [False])
@pytest.mark.parametrize('force_nblocks', [1, None])
@pytest.mark.parametrize('ndata', [300])
def test_fast(self, freq, use_double, mag_bins, phase_bins, mag_overlap,
phase_overlap, t0, balanced_magbins, weighted,
shmem_lc, freq_batch_size, force_nblocks, ndata):
kwargs = dict(use_double=use_double,
mag_bins=mag_bins,
phase_bins=phase_bins,
phase_overlap=phase_overlap,
mag_overlap=mag_overlap,
balanced_magbins=balanced_magbins,
weighted=weighted)
proc_fast = ConditionalEntropyAsyncProcess(use_fast=True, **kwargs)
proc_slow = ConditionalEntropyAsyncProcess(use_fast=False, **kwargs)
t, y, err = data(freq=freq, t0=t0, ndata=ndata)
df = 1. / (max(t) - min(t)) / 10
max_freq = 1.1 * freq
min_freq = 0.9 * freq
nf = int((max_freq - min_freq) / df)
freqs = min_freq + df * np.arange(nf)
run_kw = dict(shmem_lc=shmem_lc,
freq_batch_size=freq_batch_size,
force_nblocks=force_nblocks)
results_fast = proc_fast.run([(t + t0, y, err)], freqs=freqs,
**run_kw)
proc_fast.finish()
frq_fast, p_fast = results_fast[0]
results_slow = proc_slow.run([(t + t0, y, err)], freqs=freqs)
proc_slow.finish()
frq_slow, p_slow = results_slow[0]
max_diff = 2e-2 * max(np.absolute(p_slow))
if self.plot and \
not all(np.absolute(p_slow - p_fast) < max_diff):
import matplotlib.pyplot as plt
f, ax = plt.subplots()
ax.plot(frq_slow, p_slow, alpha=0.5)
ax.plot(frq_fast, p_fast, alpha=0.5)
ax.axvline(freq, ls='-', color='k')
plt.show()
f, ax = plt.subplots()
ax.plot(frq_slow, (p_slow - p_fast) / max(np.absolute(p_slow)))
ax.axvline(freq, ls='-', color='k')
plt.show()
# print best_freq, freq, abs(best_freq - freq) / freq
assert(not any(np.isnan(p_slow)))
assert(not any(np.isnan(p_fast)))
assert_allclose(p_slow, p_fast, atol=2e-2 * max(np.absolute(p_slow)))