from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from builtins import zip
from builtins import range
from builtins import object
import numpy as np
import pytest
from numpy.testing import assert_allclose
from astropy.stats.lombscargle import LombScargle
from ..lombscargle import LombScargleAsyncProcess
from pycuda.tools import mark_cuda_test
import pycuda.autoinit
spp = 3
nfac = 3
lsrtol = 1E-2
lsatol = 1E-2
nfft_sigma = 5
rand = np.random.RandomState(100)
[docs]@pytest.fixture
def data(seed=100, sigma=0.1, ndata=100, freq=3.):
t = np.sort(rand.rand(ndata))
y = np.cos(2 * np.pi * freq * t)
y += sigma * rand.randn(len(t))
err = sigma * np.ones_like(y)
return t, y, err
[docs]def assert_similar(pdg0, pdg, top=5):
inds = (np.argsort(pdg0)[::-1])[:top]
p0 = np.asarray(pdg0)[inds]
p = np.asarray(pdg)[inds]
diff = np.absolute(p - p0)
res = sorted(zip(p0, p, diff), key=lambda x: -x[2])
for p0v, pv, dv in res:
if dv > 1e-3:
print(p0v, pv, dv)
assert_allclose(p, p0, atol=lsatol, rtol=lsrtol)
assert(all(diff < lsrtol * 0.5 * (p + p0) + lsatol))
[docs]class TestLombScargle(object):
[docs] def test_against_astropy_double(self):
t, y, err = data()
ls_proc = LombScargleAsyncProcess(use_double=True,
sigma=nfft_sigma)
results = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
use_fft=True,
samples_per_peak=spp)
ls_proc.finish()
fgpu, pgpu = results[0]
power = LombScargle(t, y, err).power(fgpu)
assert_similar(power, pgpu)
[docs] def test_against_astropy_single(self):
t, y, err = data()
ls_proc = LombScargleAsyncProcess(use_double=False,
sigma=nfft_sigma)
results = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
samples_per_peak=spp)
ls_proc.finish()
fgpu, pgpu = results[0]
power = LombScargle(t, y, err).power(fgpu)
assert_similar(power, pgpu)
[docs] def test_ls_kernel(self):
t, y, err = data()
ls_proc = LombScargleAsyncProcess(use_double=False,
sigma=nfft_sigma)
results = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
samples_per_peak=spp)
ls_proc.finish()
fgpu, pgpu = results[0]
ls = LombScargle(t, y, err, fit_mean=True, center_data=False)
power = ls.power(fgpu)
assert_similar(power, pgpu)
[docs] def test_ls_kernel_direct_sums(self):
t, y, err = data()
ls_proc = LombScargleAsyncProcess(use_double=True,
sigma=nfft_sigma)
results = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
samples_per_peak=spp, use_fft=False)
ls_proc.finish()
fgpu, pgpu = results[0]
ls = LombScargle(t, y, err, fit_mean=True, center_data=True)
power = ls.power(fgpu)
assert_similar(power, pgpu)
[docs] def test_ls_kernel_direct_sums_is_consistent(self):
t, y, err = data()
ls_proc = LombScargleAsyncProcess(use_double=False,
sigma=nfft_sigma)
results_ds = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
samples_per_peak=spp, use_fft=False)
ls_proc.finish()
fgpu_ds, pgpu_ds = results_ds[0]
results_reg = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
samples_per_peak=spp, use_cpu_nfft=True)
ls_proc.finish()
fgpu_reg, pgpu_reg = results_reg[0]
assert_similar(pgpu_reg, pgpu_ds)
[docs] def test_ls_kernel_direct_sums_against_python(self):
t, y, err = data()
ls_proc = LombScargleAsyncProcess(use_double=False, sigma=nfft_sigma)
result_ds = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
samples_per_peak=spp, use_fft=False)
ls_proc.finish()
fgpu_ds, pgpu_ds = result_ds[0]
result_reg = ls_proc.run([(t, y, err)], nyquist_factor=nfac,
samples_per_peak=spp,
use_fft=False,
python_dir_sums=True)
ls_proc.finish()
fgpu_reg, pgpu_reg = result_reg[0]
assert_similar(pgpu_reg, pgpu_ds)
[docs] def test_multiple_datasets(self, ndatas=5):
datas = [data() for i in range(ndatas)]
ls_proc = LombScargleAsyncProcess(sigma=nfft_sigma)
mult_results = ls_proc.run(datas, nyquist_factor=nfac,
samples_per_peak=spp)
ls_proc.finish()
sing_results = []
for d in datas:
sing_results.extend(ls_proc.run([d], nyquist_factor=nfac,
samples_per_peak=spp))
ls_proc.finish()
for rb, rnb in zip(mult_results, sing_results):
fb, pb = rb
fnb, pnb = rnb
assert_allclose(pnb, pb, rtol=lsrtol, atol=lsatol)
assert_allclose(fnb, fb, rtol=lsrtol, atol=lsatol)
[docs] def test_batched_run(self, ndatas=5, batch_size=5, sigma=nfft_sigma,
samples_per_peak=spp, nyquist_factor=nfac,
**kwargs):
datas = [data(ndata=rand.randint(50, 100))
for i in range(ndatas)]
ls_proc = LombScargleAsyncProcess(sigma=sigma, **kwargs)
kw = dict(nyquist_factor=nyquist_factor,
samples_per_peak=samples_per_peak)
batched_results = ls_proc.batched_run(datas, **kw)
ls_proc.finish()
non_batched_results = []
for d in datas:
r = ls_proc.run([d], nyquist_factor=nyquist_factor,
samples_per_peak=samples_per_peak)
ls_proc.finish()
non_batched_results.extend(r)
for rb, rnb in zip(batched_results, non_batched_results):
fb, pb = rb
fnb, pnb = rnb
assert_allclose(pnb, pb, rtol=lsrtol, atol=lsatol)
assert_allclose(fnb, fb, rtol=lsrtol, atol=lsatol)
[docs] def test_batched_run_const_nfreq(self, make_plot=False, ndatas=27,
batch_size=5, sigma=nfft_sigma,
samples_per_peak=spp,
nyquist_factor=nfac,
**kwargs):
frequencies = 10 + rand.rand(ndatas) * 100.
datas = [data(ndata=rand.randint(50, 100),
freq=freq)
for i, freq in enumerate(frequencies)]
ls_proc = LombScargleAsyncProcess(sigma=sigma, **kwargs)
kw = dict(samples_per_peak=spp,
batch_size=batch_size)
kw.update(kwargs)
batched_results = ls_proc.batched_run_const_nfreq(datas, **kw)
ls_proc.finish()
ls_procnb = LombScargleAsyncProcess(sigma=nfft_sigma,
use_double=False, **kwargs)
non_batched_results = []
for d, (frq, p) in zip(datas, batched_results):
r = ls_procnb.run([d], freqs=frq, **kwargs)
ls_procnb.finish()
non_batched_results.extend(r)
# for f0, (fb, pb), (fnb, pnb) in zip(frequencies, batched_results,
# non_batched_results):
# print f0, fb[np.argmax(pb)], fnb[np.argmax(pnb)]
for f0, (fb, pb), (fnb, pnb) in zip(frequencies, batched_results,
non_batched_results):
if make_plot:
import matplotlib.pyplot as plt
plt.plot(fnb, pnb, color='k', lw=3)
plt.plot(fb, pb, color='r')
plt.axvline(f0)
plt.show()
assert_allclose(pnb, pb, rtol=lsrtol, atol=lsatol)
assert_allclose(fnb, fb, rtol=lsrtol, atol=lsatol)