Source code for poppy.wfe

"""
Analytic optical element classes to introduce a specified wavefront
error in an OpticalSystem

 * ZernikeWFE
 * ParameterizedWFE (for use with hexike or zernike basis functions)
 * SineWaveWFE
 * TODO: MultiSineWaveWFE ?
 * TODO: PowerSpectrumWFE
 * TODO: KolmogorovWFE

"""

import numpy as np
import collections
from functools import wraps
import astropy.units as u

from .optics import AnalyticOpticalElement, CircularAperture
from .poppy_core import Wavefront, PlaneType, BaseWavefront
from poppy.fresnel import FresnelWavefront
from .physical_wavefront import PhysicalFresnelWavefront

from . import zernike
from . import utils
from . import accel_math

from .accel_math import xp, ensure_not_on_gpu


__all__ = ['WavefrontError', 'ParameterizedWFE', 'ZernikeWFE', 'SineWaveWFE',
           'StatisticalPSDWFE', 'PowerSpectrumWFE', 'KolmogorovWFE', 'ThermalBloomingWFE']

def _check_wavefront_arg(f):
    """Decorator that ensures the first positional method argument
    is a poppy.Wavefront or FresnelWavefront
    """

    @wraps(f)
    def wrapper(*args, **kwargs):
        if not isinstance(args[1], BaseWavefront):
            raise ValueError("The first argument must be a Wavefront or FresnelWavefront object.")
        else:
            return f(*args, **kwargs)
    return wrapper


[docs]class WavefrontError(AnalyticOpticalElement): """A base class for different sources of wavefront error Analytic optical elements that represent wavefront error should derive from this class and override methods appropriately. Defined to be a pupil-plane optic. """ def __init__(self, **kwargs): if 'planetype' not in kwargs: kwargs['planetype'] = PlaneType.pupil super(WavefrontError, self).__init__(**kwargs) # in general we will want to see phase rather than intensity at this plane self.wavefront_display_hint = 'phase'
[docs] @_check_wavefront_arg def get_opd(self, wave): """Construct the optical path difference array for a wavefront error source as evaluated across the pupil for an input wavefront `wave` Parameters ---------- wave : Wavefront Wavefront object with a `coordinates` method that returns (y, x) coordinate arrays in meters in the pupil plane """ raise NotImplementedError('Not implemented yet')
[docs] def rms(self): """RMS wavefront error induced by this surface""" raise NotImplementedError('Not implemented yet')
[docs] def peaktovalley(self): """Peak-to-valley wavefront error induced by this surface""" raise NotImplementedError('Not implemented yet')
def _wave_y_x_to_rho_theta(y, x, pupil_radius): """ Return wave coordinates in (rho, theta) for a Wavefront object normalized such that rho == 1.0 at the pupil radius Parameters ---------- wave : Wavefront Wavefront object with a `coordinates` method that returns (y, x) coordinate arrays in meters in the pupil plane pupil_radius : float Radius (in meters) of a circle circumscribing the pupil. """ if accel_math._USE_NUMEXPR and not accel_math._USE_CUPY: rho = accel_math.ne.evaluate("sqrt(x**2+y**2)/pupil_radius") theta = accel_math.ne.evaluate("arctan2(y / pupil_radius, x / pupil_radius)") else: rho = xp.sqrt(x ** 2 + y ** 2) / pupil_radius theta = xp.arctan2(y / pupil_radius, x / pupil_radius) return rho, theta
[docs]class ParameterizedWFE(WavefrontError): """ Define an optical element in terms of its distortion as decomposed into a set of orthonormal basis functions (e.g. Zernikes, Hexikes, etc.). Included basis functions are normalized such that user-provided coefficients correspond to meters RMS wavefront aberration for that basis function. Parameters ---------- coefficients : iterable of numbers The contribution of each term to the final distortion, in meters RMS wavefront error. The coefficients are interpreted as indices in the order of Noll et al. 1976: the first term corresponds to j=1, second to j=2, and so on. radius : float Pupil radius, in meters. Defines the region of the input wavefront array over which the distortion terms will be evaluated. For non-circular pupils, this should be the circle circumscribing the actual pupil shape. basis_factory : callable basis_factory will be called with the arguments `nterms`, `rho`, `theta`, and `outside`. `nterms` specifies how many terms to compute, starting with the j=1 term in the Noll indexing convention for `nterms` = 1 and counting up. `rho` and `theta` are square arrays holding the rho and theta coordinates at each pixel in the pupil plane. `rho` is normalized such that `rho` == 1.0 for pixels at `radius` meters from the center. `outside` contains the value to assign pixels outside the radius `rho` == 1.0. (Always 0.0, but provided for compatibility with `zernike.zernike_basis` and `zernike.hexike_basis`.) """ @utils.quantity_input(coefficients=u.meter, radius=u.meter) def __init__(self, name="Parameterized Distortion", coefficients=None, radius=1*u.meter, basis_factory=None, **kwargs): if not isinstance(basis_factory, collections.abc.Callable): raise ValueError("'basis_factory' must be a callable that can " "calculate basis functions") self.radius = radius self.coefficients = coefficients self.basis_factory = basis_factory self._default_display_size = radius * 3 super(ParameterizedWFE, self).__init__(name=name, **kwargs)
[docs] @_check_wavefront_arg def get_opd(self, wave): y, x = self.get_coordinates(wave) rho, theta = _wave_y_x_to_rho_theta(y, x, self.radius.to(u.meter).value) combined_distortion = xp.zeros(rho.shape) nterms = len(self.coefficients) computed_terms = self.basis_factory(nterms=nterms, rho=rho, theta=theta, outside=0.0) for idx, coefficient in enumerate(self.coefficients): if coefficient == 0.0: continue # save the trouble of a multiply-and-add of zeros coefficient_in_m = coefficient.to(u.meter).value combined_distortion += coefficient_in_m * computed_terms[idx] return combined_distortion
[docs]class ZernikeWFE(WavefrontError): """ Define an optical element in terms of its Zernike components by providing coefficients for each Zernike term contributing to the analytic optical element. Parameters ---------- coefficients : iterable of floats Specifies the coefficients for the Zernike terms, ordered according to the convention of Noll et al. JOSA 1976. The coefficient is in meters of optical path difference (not waves). radius : float Pupil radius, in meters, over which the Zernike terms should be computed such that rho = 1 at r = `radius`. """ @utils.quantity_input(coefficients=u.meter, radius=u.meter) def __init__(self, name="Zernike WFE", coefficients=None, radius=None, aperture_stop=False, **kwargs): if radius is None: raise ValueError("You must specify a radius for the unit circle " "over which the Zernike polynomials are normalized") self.radius = radius self.aperture_stop = aperture_stop self.coefficients = coefficients self.circular_aperture = CircularAperture(radius=self.radius, gray_pixel=False, **kwargs) self._default_display_size = radius * 3 kwargs.update({'name': name}) super(ZernikeWFE, self).__init__(**kwargs)
[docs] @_check_wavefront_arg def get_opd(self, wave): """ Parameters ---------- wave : poppy.Wavefront (or float) Incoming Wavefront before this optic to set wavelength and scale, or a float giving the wavelength in meters for a temporary Wavefront used to compute the OPD. """ # the Zernike optic, being normalized on a circle, is # implicitly also a circular aperture: aperture_intensity = self.circular_aperture.get_transmission(wave) pixelscale_m = wave.pixelscale.to(u.meter / u.pixel).value # whether we can use pre-cached zernikes for speed depends on whether # there are any coord offsets. See #229 has_offset_coords = (hasattr(self, "shift_x") or hasattr(self, "shift_y") or hasattr(self, "rotation")) if has_offset_coords: y, x = self.get_coordinates(wave) rho, theta = _wave_y_x_to_rho_theta(y, x, self.radius.to(u.meter).value) combined_zernikes = xp.zeros(wave.shape, dtype=xp.float64) for j, k in enumerate(self.coefficients, start=1): k_in_m = k.to(u.meter).value if has_offset_coords: combined_zernikes += k_in_m * zernike.zernike1( j, rho=rho, theta=theta, outside=0.0, noll_normalize=True ) else: try: combined_zernikes += k_in_m * zernike.cached_zernike1( j, wave.shape, pixelscale_m, self.radius.to(u.meter).value, outside=0.0, noll_normalize=True ) except TypeError: # can't use cached_zernikes if the user changed between CPU and CuPy y, x = self.get_coordinates(wave) rho, theta = _wave_y_x_to_rho_theta(y, x, self.radius.to(u.meter).value) combined_zernikes += k_in_m * zernike.zernike1( j, rho=rho, theta=theta, outside=0.0, noll_normalize=True ) combined_zernikes[aperture_intensity==0] = 0 return combined_zernikes
[docs] def get_transmission(self, wave): if self.aperture_stop: return self.circular_aperture.get_transmission(wave) else: return xp.ones(wave.shape)
[docs]class SineWaveWFE(WavefrontError): """ A single sine wave ripple across the optic Specified as a a spatial frequency in cycles per meter, an optional phase offset in cycles, and an amplitude. By default the wave is oriented in the X direction. Like any AnalyticOpticalElement class, you can also specify a rotation parameter to rotate the direction of the sine wave. (N.b. we intentionally avoid letting users specify this in terms of a spatial wavelength because that would risk potential ambiguity with the wavelength of light.) """ @utils.quantity_input(spatialfreq=1. / u.meter, amplitude=u.meter) def __init__(self, name='Sine WFE', spatialfreq=1.0, amplitude=1e-6, phaseoffset=0, **kwargs): super(WavefrontError, self).__init__(name=name, **kwargs) self.sine_spatial_freq = spatialfreq self.sine_phase_offset = phaseoffset # note, can't call this next one 'amplitude' since that's already a property self.sine_amplitude = amplitude
[docs] @_check_wavefront_arg def get_opd(self, wave): """ Parameters ---------- wave : poppy.Wavefront (or float) Incoming Wavefront before this optic to set wavelength and scale, or a float giving the wavelength in meters for a temporary Wavefront used to compute the OPD. """ y, x = self.get_coordinates(wave) # in meters opd = self.sine_amplitude.to(u.meter).value * \ np.sin(2 * np.pi * (x * self.sine_spatial_freq.to(1 / u.meter).value + self.sine_phase_offset)) return opd
[docs]class StatisticalPSDWFE(WavefrontError): """ Statistical PSD WFE class from power law for optical noise. Parameters ---------- name : string name of the optic index: float negative power law spectra index, defaults to 3 wfe: astropy quantity wfe in linear astropy units, defaults to 50 nm radius: astropy quantity radius of optic in linear astropy units, defaults to 1 m seed : integer seed for the random phase screen generator """ @utils.quantity_input(wfe=u.nm, radius=u.meter) def __init__(self, name='PSD WFE', index=3.0, wfe=50*u.nm, radius=1*u.meter, seed=None, **kwargs): super().__init__(name=name, **kwargs) self.index = index self.wfe = wfe self.radius = radius self.seed = seed
[docs] @_check_wavefront_arg def get_opd(self, wave): """ Parameters ---------- wave : poppy.Wavefront (or float) Incoming Wavefront before this optic to set wavelength and scale, or a float giving the wavelength in meters for a temporary Wavefront used to compute the OPD. """ y, x = self.get_coordinates(wave) rho, theta = _wave_y_x_to_rho_theta(y, x, self.radius.to(u.meter).value) rho[rho == 0] = 0.00001 # get rid of infinity: see Issue #452 psd = xp.power(rho, -self.index) # generate power-law PSD psd_random_state = xp.random.RandomState() psd_random_state.seed(self.seed) # if provided, set a seed for random number generator rndm_phase = psd_random_state.normal(size=(len(y), len(x))) # generate random phase screen rndm_psd = xp.fft.fftshift(xp.fft.fft2(xp.fft.fftshift(rndm_phase))) # FT of random phase screen to get random PSD scaled = xp.sqrt(psd) * rndm_psd # scale random PSD by power-law PSD phase_screen = xp.fft.ifftshift(xp.fft.ifft2(xp.fft.ifftshift(scaled))).real # FT of scaled random PSD makes phase screen phase_screen -= xp.mean(phase_screen) # force zero-mean self.opd = phase_screen / xp.std(phase_screen) * self.wfe.to(u.m).value # normalize to wanted input rms wfe return self.opd
[docs]class PowerSpectrumWFE(WavefrontError): r""" WFE model specified via a Power Spectral Density (PSD), or a list of multiple PSDs, which follow von Karman PSD model: :math:`P(k) = \frac{\beta} {\left( \left(\frac{1}{L_{0}}\right)^{2} + |k|^{2} \right)^{{\alpha/2}}} e^{-(|k|l_{0})^{2}} + \beta_{sr}` where: P: astropy quantity Power Spectral Density at a spatial frequency. Units: :math: `m^{2}m^{2}` Assumes surface units of meters (first :math: `m^{2}`) k: astropy quantity Spatial frequency value, units 1/m :math:`\alpha`: float The PSD index value :math:`\beta`: astropy quantity The normalization constant. In units of :math: `\frac{m^{2}}{m^{\alpha-2}}` Numerator assumes surface units of meters Denominator assumes spatial frequency units are 1/m :math:`L_{0}`: astropy quantity The outer scale value, where the low spatial frequency flattens. Units: m :math:`l_{0}`: float Inner scale value, where the high spatial frequency flattens. :math:`\beta_{sr}`: astropy quantity Surface roughness normalization. Should match units of PSD. References: Males, Jared. MagAO-X Preliminary-Design Review, Section 5.1: Optics Specifications, Eqn 1 https://magao-x.org/docs/handbook/appendices/pdr/ Lumbres, et al. In Prep. Parameters ---------- name : string name of the optic psd_parameters: list (for single PSD set) or list of lists (multiple PSDs) List of specified PSD parameters. If there are multiple PSDs, then each list element is a list of specified PSD parameters. i.e. [ [PSD_list_0], [PSD_list_1]] The PSD parameters in a list are ordered as follows: [alpha, beta, outer_scale, inner_scale, surf_roughness] where: alpha: float The PSD index value. beta: astropy quantity The normalization constant. In units of :math: `\frac{m^{2}}{m^{\alpha-2}}` Numerator assumes surface units of meters Denominator assumes spatial frequency units are 1/m outer_scale: astropy quantity The outer scale value, where the low spatial frequency flattens. Unit requirement: meters inner_scale: float Inner scale value, where the high spatial frequency flattens. surf_roughness: astropy quantity Surface roughness normalization. Should match units of PSD. psd_weight: iterable list of floats Specifies the weight multiplier to set onto each model PSD seed : integer Seed for the random phase screen generator apply_reflection: boolean Applies 2x scale for the OPD as needed for reflection. Default to False. Set to True if the PSD model only accounts for surface. screen_size: integer Sets how large the PSD matrix will be calculated. The PSD matrix needs to be larger than the wavefront for Fourier transform padding purposes. If None passed in, then code will default size to 4x wavefront's side. Default to None. rms: astropy quantity Optional. Use this to force the wfe RMS If a value is passed in, this is the surface rms value (not OPD) in meters. If None passed, then the wfe RMS produced is what shows up in PSD calculation. Default to None. incident_angle: astropy quantity Adjusts the WFE based on reflected beam distortion. Does not distort the beam (remains circular), but will get the rms equivalent value. Can be passed as either degrees or radians. Default is 0 degrees (paraxial). radius: astropy quantity Optional. However, mandatory if rms parameter is passed. If a value is passed in, this is the beam radius value for calculating the generated WFE rms to compare with the normalized rms value. Default to None. """ @utils.quantity_input(rms=u.nm, radius=u.meter, incident_angle=u.deg) def __init__(self, name='Model PSD WFE', psd_parameters=None, psd_weight=None, seed=None, apply_reflection=False, screen_size=None, rms=None, incident_angle=0*u.deg, radius=None, **kwargs): super().__init__(name=name, **kwargs) self.psd_parameters = psd_parameters self.seed = seed self.apply_reflection = apply_reflection self.screen_size = screen_size self.rms = rms if self.rms is not None and radius is None: raise ValueError("You must specify a radius for rms normalization.") self.radius = radius # check incident angle units if incident_angle >= 90*u.deg: raise ValueError("Incident angle must be less than 90 degrees, or equivalent in other units.") self.incident_angle = incident_angle if psd_weight is None: self.psd_weight = xp.ones((len(psd_parameters))) # default to equal weights else: self.psd_weight = psd_weight
[docs] @_check_wavefront_arg def get_opd(self, wave): """ Parameters ---------- wave : poppy.Wavefront (or float) Incoming Wavefront before this optic to set wavelength and scale, or a float giving the wavelength in meters for a temporary Wavefront used to compute the OPD. """ # check that screen size is at least larger than wavefront size wave_size = wave.shape[0] if wave.ispadded is True: # get true wave size if padded to oversample. wave_size = int(wave_size/wave.oversample) # check that screen size exists if self.screen_size is None: self.screen_size = wave.shape[0] if wave.ispadded is False: # sometimes the wave is not padded. self.screen_size = self.screen_size * 4 # default 4x, open for discussion elif self.screen_size < wave_size: raise Exception('PSD screen size smaller than wavefront size, recommend at least 2x larger') # get pixelscale to calculate spatial frequency spacing dk = 1/(self.screen_size * wave.pixelscale * u.pix) # eliminate the pixel units # build spatial frequency map cen = int(self.screen_size/2) maskY, maskX = xp.mgrid[-cen:cen, -cen:cen] ky = maskY*dk.to_value(1./u.m) kx = maskX*dk.to_value(1./u.m) k_map = xp.sqrt(kx**2 + ky**2) # unitless for the math, but actually 1/m # calculate the PSD psd = xp.zeros_like(k_map) # initialize the total PSD matrix for n in range(0, len(self.psd_weight)): # loop-internal localized PSD variables alpha = self.psd_parameters[n][0] beta = self.psd_parameters[n][1] outer_scale = self.psd_parameters[n][2] inner_scale = self.psd_parameters[n][3] surf_roughness = self.psd_parameters[n][4] # unit check psd_units = beta.unit / ((dk.unit**2)**(alpha/2)) assert surf_roughness.unit == psd_units, "PSD parameter units are not consistent, please re-evaluate parameters." surf_unit = (psd_units*(dk.unit**2))**(0.5) # initialize loop-internal PSD matrix psd_local = xp.zeros_like(psd) # Calculate the PSD equation denominator based on outer_scale presence if outer_scale.value == 0: # skip out or else PSD explodes # temporary overwrite of k_map at k=0 to stop div/0 problem k_map[cen][cen] = 1*dk.value # calculate PSD as normal psd_denom = (k_map**2)**(alpha/2) # calculate the immediate PSD value psd_interm = (beta.value*xp.exp(-((k_map*inner_scale)**2))/psd_denom) # overwrite PSD at k=0 to be 0 instead of the original infinity psd_interm[cen][cen] = 0 # return k_map to original state k_map[cen][cen] = 0 else: psd_denom = ((outer_scale.value**(-2)) + (k_map**2))**(alpha/2) # unitless currently psd_interm = (beta.value*xp.exp(-((k_map*inner_scale)**2))/psd_denom) # apply surface roughness psd_interm = psd_interm + surf_roughness.value # apply as the sum with the weight of the PSD model psd = psd + (self.psd_weight[n] * psd_interm) # this should all be m2 [surf_unit]2, but stay unitless for all calculations # set the random noise psd_random = xp.random.RandomState() psd_random.seed(self.seed) rndm_noise = xp.fft.fftshift(xp.fft.fft2(psd_random.normal(size=(self.screen_size, self.screen_size)))) psd_scaled = (xp.sqrt(psd/(wave.pixelscale.value**2)) * rndm_noise) # opd = ((np.fft.ifft2(np.fft.ifftshift(psd_scaled)).real*surf_unit).to(u.m)).value # opd = ((xp.fft.ifft2(xp.fft.ifftshift(psd_scaled)).real))*1e-9 # this is assuming the opd is calculated in nm opd = ((xp.fft.ifft2(xp.fft.ifftshift(psd_scaled)).real))*(1*surf_unit).to_value(u.m) # fixed the hardcoded 1e-9 # Set rms value based on the active region of beam if self.rms is not None: circ = CircularAperture(name='beam diameter', radius=self.radius) ap = circ.get_transmission(wave) opd_crop = utils.pad_or_crop_to_shape(array=opd, target_shape=wave.shape) active_ap = opd_crop[ap==True] rms_measure = xp.sqrt(xp.mean(xp.square(active_ap))) # measured rms from aperture opd *= self.rms.to(u.m).value/rms_measure # appropriately scales entire OPD # apply the angle adjustment for rms if self.incident_angle.value != 0: opd /= np.cos(self.incident_angle.to(u.radian).value) # for cupy, convert angle to radians to use just the value # Set reflection OPD if self.apply_reflection == True: opd *= 2 # Resize PSD screen to shape of wavefront if self.screen_size > wave.shape[0]: # crop to wave shape if needed opd = utils.pad_or_crop_to_shape(array=opd, target_shape=wave.shape) self.opd = opd return self.opd
[docs]class KolmogorovWFE(WavefrontError): """ A turbulent phase screen. This is an implementation of a turbulent phase screen as by the Kolmogorov theory of turbulence. Parameters ----------------- r0 : astropy.quantity Fried parameter (m). Cn2 : astropy.quantity Index-of-refraction structure constant (m^{-2/3}). dz : astropy.quantity Propagation distance (m). inner_scale : astropy.quantity Inner scale of the turbulence (m). The inner scale affects the calculation results if kind = 'von Karman', 'Tatarski', or 'Hill'. outer_scale : astropy.quantity Outer scale of the turbulence (m). The outer scale only affects the calculation results if kind='von Karman'. kind : string Kind of the spatial power spectrum. Must be one of 'Kolmogorov', 'Tatarski', 'von Karman', 'Hill'. seed : integer Seed for the random number generator when creating the phase screen. This can be helpful when multiple fields (for example different modes) should propagate through an identical atmosphere. References ------------------- For a general overview of the Kolmogorov theory, read L. C. Andrews and R. L. Phillips, Laser Beam Propagation Through Random Media, 2nd ed. (Society of Photo Optical, 2005). Other relevant references are mentioned in the respective functions. """ @utils.quantity_input(r0=u.meter, Cn2=u.meter**(-2/3), dz=u.meter, inner_scale=u.meter, outer_scale=u.meter) def __init__(self, name="Kolmogorov WFE", r0=None, Cn2=None, dz=None, inner_scale=None, outer_scale=None, kind='Kolmogorov', seed=None, **kwargs): if dz is None and not all(item is not None for item in [r0, Cn2]): raise ValueError('To prepare a turbulent phase screen, dz and either Cn2 or r0 must be given.') super(KolmogorovWFE, self).__init__(name=name, **kwargs) self.r0 = r0 self.Cn2 = Cn2 self.seed = seed self.dz = dz self.inner_scale = inner_scale self.outer_scale = outer_scale self.kind = kind
[docs] def get_opd(self, wave): """ Returns an optical path difference for a turbulent phase screen. Parameters ----------------- wave : wavefront object Wavefront to calculate the phase screen for. References ------------------- J. A. Fleck Jr, J. R. Morris, and M. D. Feit, Appl. Phys. 10, 129 (1976). E. M. Johansson and D. T. Gavel, in Proc. SPIE, edited by J. B. Breckinridge (International Society for Optics and Photonics, 1994), pp. 372–383. B. J. Herman and L. A. Strugala, in Proc. SPIE, edited by P. B. Ulrich and L. E. Wilson (International Society for Optics and Photonics, 1990), pp. 183–192. G. Gbur, J. Opt. Soc. Am. A 31, 2038 (2014). D. L. Knepp, Proc. IEEE 71, 722 (1983). """ npix = wave.shape[0] pixelscale = wave.pixelscale.to(u.m/u.pixel) * u.pix dq = 2.0*np.pi/npix/pixelscale # create complex random numbers with required symmetry a = self.rand_turbulent(npix) # get phase spectrum phi = self.power_spectrum(wave=wave, kind=self.kind) # calculate OPD # Note: Factor dq consequence of delta function having a unit opd_FFT = dq.value*a*xp.sqrt(2.0*np.pi*self.dz.to_value(u.m)*phi) opd = npix**2*xp.fft.ifft2(opd_FFT) self.opd = xp.real(opd) return self.opd
[docs] @utils.quantity_input(wavelength=u.meter) def get_Cn2(self, wavelength): """ Returns the index-of-refraction structure constant (m^-2/3). Parameters ----------------- wavelength : float The wavelength (m). References ------------------- B. J. Herman and L. A. Strugala, in Proc. SPIE, edited by P. B. Ulrich and L. E. Wilson (International Society for Optics and Photonics, 1990), pp. 183–192. """ if all(item is not None for item in [self.r0, self.dz]): r0 = self.r0.to(u.m) wavelength2 = wavelength.to(u.m)**2 return wavelength2/self.dz * (r0/0.185)**(-5.0/3.0) elif self.Cn2 is not None: return self.Cn2.to(u.m**(-2/3))
[docs] def rand_symmetrized(self, npix, sign): """ Returns a real-valued random number array of shape (npix, npix) with the symmetry required for a turbulent phase screen. Parameters ----------------- npix : int Number of pixels. sign : int Sign of mirror symmetry. Must be either +1 or -1. References ------------------- Eq. (65) in J. A. Fleck Jr, J. R. Morris, and M. D. Feit, Appl. Phys. 10, 129 (1976). """ if np.abs(sign) != 1: raise ValueError("sign must be either +1 or -1") sign = float(sign) # create Gaussian, zero-mean, unit variance random numbers random_numbers = xp.random.RandomState() random_numbers.seed(self.seed) a = random_numbers.normal(size=(npix, npix)) # apply required symmetry a[0, int(npix/2)+1:npix] = sign*a[0, 1:int(npix/2)][::-1] a[int(npix/2)+1:npix, 0] = sign*a[1:int(npix/2), 0][::-1] a[int(npix/2)+1:npix, int(npix/2)+1:npix] = sign*xp.rot90(a[1:int(npix/2), 1:int(npix/2)], 2) a[int(npix/2)+1:npix, 1:int(npix/2)] = sign*xp.rot90(a[1:int(npix/2), int(npix/2)+1:npix], 2) # remove any overall phase resulting from the zero-frequency component a[0, 0] = 0.0 return a
[docs] def rand_turbulent(self, npix): """ Returns a complex-valued random number array of shape (npix, npix) with the symmetry required for a turbulent phase screen. Parameters ----------------- npix : int Number of pixels. References ------------------- Eq. (63) in J. A. Fleck Jr, J. R. Morris, and M. D. Feit, Appl. Phys. 10, 129 (1976). """ # create real-valued random numbers with required symmetry a = self.rand_symmetrized(npix, 1) b = self.rand_symmetrized(npix, -1) # create complex-valued random number with required variance c = (a + 1j*b)/np.sqrt(2.0) return c
[docs] def power_spectrum(self, wave, kind='Kolmogorov'): """ Returns the spatial power spectrum. Parameters ----------------- wave : wavefront object Wavefront to calculate the power spectrum for. kind : string The type of the power spectrum, must be one of 'Kolmogorov', 'Tatarski', 'von Karman', 'Hill'. References ------------------- G. Gbur, J. Opt. Soc. Am. A 31, 2038 (2014). R. Frehlich, Appl. Opt. 39, 393 (2000). """ if not any(kind==item for item in ['Kolmogorov', 'Tatarski', 'von Karman', 'Hill']): raise ValueError('Kind of power spectrum not correctly defined.') Cn2 = self.get_Cn2(wave.wavelength) coordinates = wave.coordinates() npix = coordinates[0].shape[0] pixelscale = wave.pixelscale.to(u.m/u.pixel) * u.pixel q = xp.fft.fftfreq(npix, d=pixelscale.value)*2.0*np.pi # so q has units of 1/m? qx, qy = xp.meshgrid(q, q) q2 = (qx**2 + qy**2) if kind=='von Karman': if self.outer_scale is not None: q2 += 1.0/self.outer_scale.to_value(u.m)**2 else: raise ValueError('If von Karman kind of turbulent phase \ screen is chosen, the outer scale L_0 \ must be provided.') q2[0, 0] = xp.inf # this is to avoid a possible error message in the next line phi = 0.0330054*Cn2.value*q2**(-11.0/6.0) if kind=='Tatarski' or kind=='von Karman' or kind=='Hill': if self.inner_scale is not None: k2 = (qx**2 + qy**2) if kind=='Tatarski' or kind=='von Karman': m = (5.92/self.inner_scale.to_value(u.m))**2 phi *= xp.exp(-k2/m) elif kind=='Hill': m = xp.sqrt(k2)*self.inner_scale.to_value(u.m) # m is supposed to be dimensionless? phi *= (1.0 + 0.70937*m + 2.8235*m**2 - 0.28086*m**3 + 0.08277*m**4) * np.exp(-1.109*m) else: raise ValueError('If von Karman, Hill, or Tatarski kind \ of turbulent phase screen is chosen, the \ inner scale l_0 must be provided.') return phi
[docs]class ThermalBloomingWFE(WavefrontError): """ A thermal blooming phase screen. Parameters ----------------- abs_coeff : astropy.quantity Aerosol absorption coefficient (m^-1). dz : astropy.quantity Propagation distance (m). v0x : astropy.quantity x-component of ambient wind velocity (m.s^-1). v0y : astropy.quantity y-component of ambient wind velocity (m.s^-1). cp : astropy.quantity Specific isobaric heat capacity (J.kg^-1.K^-1). cV : astropy.quantity Specific isochore heat capacity (J.kg^-1.K^-1). rho0 : astropy.quantity Ambient mass density (kg.m^-3). eta : astropy.quantity Dynamic viscosity (Pa.s). p0 : astropy.quantity Ambient pressure (Pa). T0 : astropy.quantity Ambient temperature (K). direction : string Direction of wind velocity. Must be one of 'x' or 'y'. The direction affects the calculation results if isobaric=True. isobaric : bool Whether to use the isobaric approximation. Notes ------------------- Initial values are those for dry air at room temperature, taken from: https://www.engineeringtoolbox.com/dry-air-properties-d_973.html """ @utils.quantity_input(abs_coeff=1/u.meter, dz=u.meter, v0x=u.meter/u.second, v0y=u.meter/u.second, cp=u.Joule/u.kg/u.Kelvin, cV=u.Joule/u.kg/u.Kelvin, rho0=u.kg/u.meter**3, p0=u.Pascal, eta=u.Pascal*u.second, T0=u.Kelvin) def __init__(self, abs_coeff, dz, name="Thermal Blooming WFE", v0x=0.0*u.m/u.s, v0y=0.0*u.m/u.s, cp=1.0049*u.kJ/u.kg/u.K, cV=0.7178*u.kJ/u.kg/u.K, rho0=1.177*u.kg/u.m**3, eta=18.46*u.uPa*u.s, p0=101.325*u.kPa, T0=300.0*u.K, direction='x', isobaric=False, **kwargs): super(ThermalBloomingWFE, self).__init__(name=name, **kwargs) self.abs_coeff = abs_coeff.to(1/u.m).value self.dz = dz.to(u.m).value self.v0x = v0x.to(u.m/u.s).value self.v0y = v0y.to(u.m/u.s).value self.cp = cp.to(u.J/u.kg/u.K).value self.cV = cV.to(u.J/u.kg/u.K).value self.rho0 = rho0.to(u.kg/u.m**3).value self.eta = eta.to(u.Pa*u.s).value self.p0 = p0.to(u.Pa).value self.T0 = T0.to(u.Kelvin).value self.direction = direction self.isobaric = isobaric self.gamma = self.cp/self.cV self.cs2 = self.gamma*self.p0/self.rho0
[docs] def nat_conv_vel(self, wave): """ Approximation for natural convection velocity (m.s^-1). Parameters ----------------- wave : poppy.PhysicalFresnelWavefront Wavefront to calculate the natural convection velocity for. References ------------------- Smith, D. C. High-power laser propagation: Thermal blooming. Proc. IEEE 65, 1679–1714 (1977). """ g = 9.81 # Gravitational constant (m.s^-2) P = wave.power return (2.0*self.abs_coeff*P*g / (self.rho0*self.cp*self.T0))**(1.0/3.0)
[docs] def get_opd(self, wave): """ Returns an optical path difference for a thermal blooming phase screen (m^-1). Parameters ----------------- wave : poppy.PhysicalFresnelWavefront Wavefront to calculate the phase screen for. References ------------------- Fleck, J. A., Jr, Morris, J. R. & Feit, M. D. Time-dependent propagation of high energy laser beams through the atmosphere. Appl. Phys. 10, 129–160 (1976). Fleck, J. A., Jr, Morris, J. R. & Feit, M. D. Time-dependent propagation of high-energy laser beams through the atmosphere: II. Appl. Phys. 14, 99–115 (1977). """ # Check if correct wavefront object type if type(wave) is not PhysicalFresnelWavefront: raise AttributeError("The wavefront must be of type \ 'PhysicalFresnelWavefront' to calculate a \ thermal blooming phase screen.") # Set velocity components according to input if self.v0x==0.0 and self.v0y==0.0: # If stagnation point, use approximation for natural convection velocity self.v0y = -self.nat_conv_vel(wave) self.isobaric = True self.direction ='y' elif self.isobaric: if self.direction=='x' and self.v0x!=0.0: self.v0y = 0.0 elif self.direction=='y' and self.v0y!=0.0: self.v0x = 0.0 else: raise ValueError("The direction must be either 'x' or 'y' \ and the respective velocity non-vanishing.") else: # Use given values (defined in init) pass rho = self.rho(wave) opd = (wave.n0-1.0)*rho*self.dz/(wave.n0*self.rho0) self.opd = opd return xp.array(opd)
[docs] def rho(self, wave): """ Top-level routine to calculate density changes (kg.m^-3). Parameters ----------------- wave : poppy.PhysicalFresnelWavefront Wavefront to calculate the density changes for. """ if (self.isobaric): rho = self.rho_isobaric(wave) else: rho = self.rho_nonisobaric(wave) return rho
[docs] def rho_isobaric(self, wave): """ Isobaric density variation (kg.m^-3). Parameters ----------------- wave : poppy.PhysicalFresnelWavefront Wavefront to calculate the density changes for. References ------------------- Fleck, J. A., Jr, Morris, J. R. & Feit, M. D. Time-dependent propagation of high energy laser beams through the atmosphere. Appl. Phys. 10, 129–160 (1976). """ gamma = self.gamma cs2 = self.cs2 intens = ensure_not_on_gpu(wave.intensity) npix = wave.npix dx = wave.dx rho = np.zeros((npix, npix)) if (self.direction == 'x'): v0 = self.v0x if v0 > 0.0: for idx_x in range(npix): for idx_y in range(npix): rho[idx_x, idx_y] = np.sum(intens[0:idx_x, idx_y]) else: for idx_x in range(npix): for idx_y in range(npix): rho[idx_x, idx_y] = np.sum(intens[idx_x:-1, idx_y]) elif (self.direction == 'y'): v0 = self.v0y if v0 > 0.0: for idx_x in range(npix): for idx_y in range(npix): rho[idx_x, idx_y] = np.sum(intens[idx_x, 0:idx_y]) else: for idx_x in range(npix): for idx_y in range(npix): rho[idx_x, idx_y] = np.sum(intens[idx_x, idx_y:-1]) else: raise AttributeError('The direction must be either x or y.') rho *= -(gamma-1.0)*self.abs_coeff*dx/cs2/np.abs(v0) return xp.array(rho)
[docs] def rho_dot_FT(self, wave): """ Fourier transform of the derivative of the non-isobaric density variation (unit?). Parameters ----------------- wave : poppy.PhysicalFresnelWavefront Wavefront to calculate the density changes for. References ------------------- Fleck, J. A., Jr, Morris, J. R. & Feit, M. D. Time-dependent propagation of high-energy laser beams through the atmosphere: II. Appl. Phys. 14, 99–115 (1977). """ if (self.v0x == 0.0 and self.v0y == 0.0): raise ValueError('The velocity must be non-zero in at least one direction.') npix = wave.npix gamma = self.gamma cs2 = self.cs2 rho0 = self.rho0 intens = ensure_not_on_gpu(wave.intensity) rho_dot_FT = np.fft.fft2(intens) rho_dot_FT *= -(gamma-1.0)*self.abs_coeff/cs2 q = ensure_not_on_gpu(wave.q) for idx_x in range(npix): for idx_y in range(npix): if idx_x == idx_y == 0: rho_dot_FT[idx_x, idx_y] *= 0.0 else: rho_dot_FT[idx_x, idx_y] *= (q[idx_x]**2 + q[idx_y]**2) rho_dot_FT[idx_x, idx_y] /= (q[idx_x]**2 + q[idx_y]**2 * (1.0 + 4.0j*self.eta*(self.v0x*q[idx_x] + self.v0y*q[idx_y])/rho0/cs2/3.0) - (self.v0x*q[idx_x] + self.v0y*q[idx_y])**2/cs2) return rho_dot_FT
[docs] def rho_nonisobaric(self, wave): """ Non-isobaric density variations (kg.m^-3). Parameters ----------------- wave : poppy.PhysicalFresnelWavefront Wavefront to calculate the density changes for. References ------------------- Fleck, J. A., Jr, Morris, J. R. & Feit, M. D. Time-dependent propagation of high-energy laser beams through the atmosphere: II. Appl. Phys. 14, 99–115 (1977). """ npix = wave.npix rho = np.zeros((npix, npix), dtype=complex) eps = 1.0e-16 # this is to prevent numpy.sign to return 0 dx = wave.dx beta = np.abs((self.v0y+eps)/(self.v0x+eps)) i_prime = np.sign(self.v0x+eps) j_prime = np.sign(self.v0y+eps) rho_dot_FT = self.rho_dot_FT(wave) rho_dot = np.fft.ifft2(rho_dot_FT) if beta > 1.0: a = dx/2/abs(self.v0y) if i_prime == 1 and j_prime == 1: for idx_x in range(npix): rho[idx_x, 0] = a*rho_dot[idx_x, 0] for idx_y in range(1, npix): for idx_x in range(npix): rho[idx_x, idx_y] = (1.0-1.0/beta)*rho[idx_x, idx_y-1] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-1.0/beta)*rho_dot[idx_x, idx_y-1] if idx_x != 0: rho[idx_x, idx_y] += 1.0/beta*rho[idx_x-1, idx_y-1] rho[idx_x, idx_y] += a/beta*rho_dot[idx_x-1, idx_y-1] elif i_prime == 1 and j_prime == -1: for idx_x in range(npix): rho[idx_x, -1] = a*rho_dot[idx_x, -1] for idx_y in reversed(range(npix-1)): for idx_x in range(npix): rho[idx_x, idx_y] = (1.0-1.0/beta)*rho[idx_x, idx_y+1] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-1.0/beta)*rho_dot[idx_x, idx_y+1] if idx_x != 0: rho[idx_x, idx_y] += 1.0/beta*rho[idx_x-1, idx_y+1] rho[idx_x, idx_y] += a/beta*rho_dot[idx_x-1, idx_y+1] elif i_prime == -1 and j_prime == 1: for idx_x in range(npix): rho[idx_x, 0] = a*rho_dot[idx_x, 0] for idx_y in range(1, npix): for idx_x in range(npix): rho[idx_x, idx_y] = (1.0-1.0/beta)*rho[idx_x, idx_y-1] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-1.0/beta)*rho_dot[idx_x, idx_y-1] if idx_x != npix-1: rho[idx_x, idx_y] += 1.0/beta*rho[idx_x+1, idx_y-1] rho[idx_x, idx_y] += a/beta*rho_dot[idx_x+1, idx_y-1] elif i_prime == -1 and j_prime == -1: for idx_x in range(npix): rho[idx_x, -1] = a*rho_dot[idx_x, -1] for idx_y in reversed(range(npix-1)): for idx_x in range(npix): rho[idx_x, idx_y] = (1.0-1.0/beta)*rho[idx_x, idx_y+1] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-1.0/beta)*rho_dot[idx_x, idx_y+1] if idx_x != npix-1: rho[idx_x, idx_y] += 1.0/beta*rho[idx_x+1, idx_y+1] rho[idx_x, idx_y] += a/beta*rho_dot[idx_x+1, idx_y+1] else: # beta <= 1.0 a = dx/2.0/abs(self.v0x) if i_prime == 1 and j_prime == 1: for idx_y in range(npix): rho[0, idx_y] = a*rho_dot[0, idx_y] for idx_x in range(1, npix): for idx_y in range(npix): rho[idx_x, idx_y] = (1.0-beta)*rho[idx_x-1, idx_y] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-beta)*rho_dot[idx_x-1, idx_y] if idx_y != 0: rho[idx_x, idx_y] += beta*rho[idx_x-1, idx_y-1] rho[idx_x, idx_y] += a*beta*rho_dot[idx_x-1, idx_y-1] elif i_prime == 1 and j_prime == -1: for idx_y in range(npix): rho[0, idx_y] = a*rho_dot[0, idx_y] for idx_x in range(1, npix): for idx_y in range(npix): rho[idx_x, idx_y] = (1.0-beta)*rho[idx_x-1, idx_y] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-beta)*rho_dot[idx_x-1, idx_y] if idx_y != npix-1: rho[idx_x, idx_y] += beta*rho[idx_x-1, idx_y+1] rho[idx_x, idx_y] += a*beta*rho_dot[idx_x-1, idx_y+1] elif i_prime == -1 and j_prime == 1: for idx_y in range(npix): rho[-1, idx_y] = a*rho_dot[-1, idx_y] for idx_x in reversed(range(0, npix-1)): for idx_y in range(npix): rho[idx_x, idx_y] = (1.0-beta)*rho[idx_x+1, idx_y] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-beta)*rho_dot[idx_x+1, idx_y] if idx_y != 0: rho[idx_x, idx_y] += beta*rho[idx_x+1, idx_y-1] rho[idx_x, idx_y] += a*beta*rho_dot[idx_x+1, idx_y-1] elif i_prime == -1 and j_prime == -1: for idx_y in range(npix): rho[-1, idx_y] = a*rho_dot[-1, idx_y] for idx_x in reversed(range(0, npix-1)): for idx_y in range(npix): rho[idx_x, idx_y] = (1.0-beta)*rho[idx_x+1, idx_y] rho[idx_x, idx_y] += a*rho_dot[idx_x, idx_y] rho[idx_x, idx_y] += a*(1.0-beta)*rho_dot[idx_x+1, idx_y] if idx_y != npix-1: rho[idx_x, idx_y] += beta*rho[idx_x+1, idx_y+1] rho[idx_x, idx_y] += a*beta*rho_dot[idx_x+1, idx_y+1] return rho.real