Source code for xopto.mcbase.mcutil.lut

# -*- coding: utf-8 -*-
################################ Begin license #################################
# Copyright (C) Laboratory of Imaging technologies,
#               Faculty of Electrical Engineering,
#               University of Ljubljana.
#
# This file is part of PyXOpto.
#
# PyXOpto is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyXOpto is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyXOpto. If not, see <https://www.gnu.org/licenses/>.
################################# End license ##################################

from typing import List, Tuple

import numpy as np
from scipy.interpolate import interp1d
from scipy.integrate import quad

from xopto.mcbase import mcobject
from xopto.mcbase import cltypes


[docs]class LinearLut(mcobject.McObject):
[docs] @staticmethod def cl_type(mc: mcobject.McObject) -> cltypes.Structure: ''' Create a Structure type that represents LinearLut in the OpenCL kernel. Parameters ---------- mc: mcobject.McObject Simulator instance. Returns ------- struct: Structure OpenCL structure type that is used to represents a LinearLut instance in the OpenCL kernel. The returned structure type implements the following fields: - first: mc_fp_t The value of the independent variable for the first element in the lookup table. - inv_span: mc_fp_t Inverse of the span of the independent variable (the span is defined as the difference between the value of the independent variable of the last and first element in the lookup table). - n: mc_size_t The number of elements in the lookup table. - offset: mc_size_t Offset/index of the first element in the lookup table from the start of the linear array that holds data of all the lookup tables. ''' T = mc.types class ClLinearLut(cltypes.Structure): _fields_ = [ ('first', T.mc_fp_t), ('inv_span', T.mc_fp_t), ('n', T.mc_size_t), ('offset', T.mc_size_t) ] return ClLinearLut
[docs] @staticmethod def fromfile(filename: str) -> 'LinearLut': ''' Load LinearLut instance from a npz file. Parameters ---------- filename: str Data file. Returns ------- lut: LinearLut A new LinearLut instance created from the file. ''' data = np.load(filename) return LinearLut(data['lut_data'], data['first'], data['last'])
def __init__(self, lut_data: np.ndarray or 'LinearLut', first: float = 0.0, last: float = 1.0): ''' Creates a linear lookup table instance. Samples in the lookup table are uniformly spread between the positions of the first and last lookup table element that are passed to the constructor. Parameters ---------- lut_data: LinearLut, np.ndarray vector or str If a numpy array, lookup table data as an array or array-like object. If a str, a filename from which to load the instance. If a LinearLut, a weak copy is created. first: float Position of the first element/entry in the lookup table. last: float Position of the last element/entry in the lookup table. ''' if isinstance(lut_data, LinearLut): lut = lut_data self._first = lut.first self._last = lut.last self._lut_data = np.copy(lut.data) elif isinstance(lut_data, str): np_data = np.load(lut_data) self._first = np_data['first'] self._last = np_data['last'] self._lut_data = np_data['lut_data'] else: self._first = float(first) self._last = float(last) self._lut_data = np.asarray(lut_data, dtype=np.float64) def _get_first(self) -> float: return self._first first = property( _get_first, None, None, 'Position of the first point in the lookup table.') def _get_last(self) -> float: return self._last last = property( _get_last, None, None, 'Position of the last point in the lookup table.') def _get_span(self) -> Tuple[float, float]: return self._last - self._first span = property( _get_span, None, None, 'Distance between the positions of the first and last point in the ' 'lookup table.') def _get_lut_data(self) -> np.ndarray: return self._lut_data data = property( _get_lut_data, None, None, 'Lookup table data as a flat numpy array.') def __call__(self, x: np.ndarray) -> np.ndarray: n = self._lut_data.size fp_ind = (np.asarray(x) - self._first)/(self._last - self._first)*(n - 1) ind_1 = np.clip(np.floor(fp_ind), 0, n - 1).astype(np.intp) ind_2 = np.clip(ind_1 + 1, 0, n - 1) w = fp_ind - ind_1 return (1.0 - w)*self._lut_data[ind_1] + w*self._lut_data[ind_2]
[docs] def cl_pack(self, mc: mcobject.McObject, target: cltypes.Structure = None) \ -> cltypes.Structure: ''' Fills the ctypes structure (target) with the data required by the Monte Carlo simulator. Parameters ---------- mc: McObject Simulator instance. target: cltypes.Struct Ctypes structure to be filled with the lut parameters. Returns ------- lut: cltypes.Struct Filled ctypes structured received as an input arguments. ''' if target is None: target_type = self.cl_type(mc) target = target_type() lut_entry = mc.append_r_lut(self._lut_data) target.offset = lut_entry.offset target.first = self.first target.inv_span = 1.0/(self.last - self.first) target.n = self.data.size return target
[docs] def todict(self, np2list: bool = True) -> dict: ''' Save the lookup table configuration and data to a dictionary. Parameters ---------- np2list: bool Convert numpy data array (lookup table) to a python list. Returns ------- data: dict Linear lookup table configuration as a dictionary. ''' return { 'type': 'LinearLut', 'first': self._first, 'last': self._last, 'lut_data': self._lut_data.tolist() if np2list else self._lut_data }
[docs] @staticmethod def fromdict(data: dict) -> 'LinearLut': ''' Create a linear lookup table instance from a dictionary. Parameters ---------- data: dict Dictionary created by the :py:meth:`LinearLut.todict` method. ''' rt_type = data.pop('type') if rt_type != 'LinearLut': raise TypeError('Expected "LinearLut" type bot got "{}"!'.\ format(rt_type)) return LinearLut(**data)
[docs] def save(self, filename: str): ''' Save instance to a compressed numpy file with extension .npz. ''' np.savez_compressed(filename, **self.todict(np2list=False))
[docs]class EmissionLut(LinearLut): def __init__(self, radiance: np.ndarray, costheta: np.ndarray=None, n: int=2000, npts: int=10000, meth: str='simps'): ''' Constructs a lookup table for numerical sampling of the emission angles of a source with the given angular emission characteristics. Azimuthal symmetry of the emission characteristics is assumed. The constructed lookup table should be sampled by a uniform random variable :math:`r` from [0, 1] and linear interpolation that will yield the emission angle cosine (costheta): .. math: f_{i} = r*n i_1 = int(fp_index) i_2 = min(i_1 + 1, n - 1) \\delta = f_i - i_1 costheta = lut[i_1]*(1.0 - \\delta) + lut[i_2 + 1]*\\delta Parameters ---------- radiance: np.ndarray, EmissionLut or str Relative source radiance as a function of the angle of incidence cosines given in the costheta array. If the value is an instance of EmissionLut, a new weak copy of the instance is created. If the value is a str, an instance is loaded from the file represented by the string. costheta: np.ndarray Cosines of the angle of incidence at which the values in the radiance array are defined. If None, a uniformly distributed vector of values from 0 to 1 is used. n: int Size of the lookup table that can be used to numerically sample the emission angles. npts: int Number of uniformly distributed control points from min(costheta) to max(costheta) used for simpson approximation of the radiance CDF. meth: str Method that is used to compute the radiance CDF. Use "simps" for Simpson or "quad" for adaptive-step integration. ''' lut_random = np.linspace(0.0, 1.0, n) if isinstance(radiance, (str, EmissionLut)): super().__init__(radiance) else: radiance = np.asarray(radiance, dtype=np.float64) if costheta is None: costheta = np.linspace(0.0, 1.0, radiance.size) else: costheta = np.asarray(costheta, dtype=np.float64) if radiance.size != costheta.size: raise ValueError( 'The sizes of the radiance and costheta array must be equal!') radiance_f = interp1d(costheta, radiance*np.sqrt(1.0 - costheta**2)) ct_range = costheta.min(), costheta.max() if meth == 'simps': npts = max(int((max(2*n, npts)//2))*2 + 1, 3) ct = np.linspace(ct_range[0], ct_range[1], max(n, npts)) cdf = np.zeros([int(ct.size//2) + 1]) radiance_ct = radiance_f(ct) dx = (ct[-1] - ct[0])/(ct.size - 1) cdf[1:] = dx/3.0*( radiance_ct[:-2:2] + 4.0*radiance_ct[1:-1:2] + radiance_ct[2::2]) cdf = cdf.cumsum() cdf /= cdf[-1] lut = interp1d(cdf, ct[::2])(lut_random) elif meth == 'quad': ct = np.linspace(ct_range[0], ct_range[1], max(n, npts)) cdf = np.zeros_like(ct) for index, ct_item in enumerate(ct): cdf[index] = quad(radiance_f, ct_range[0], ct_item)[0] cdf /= cdf[-1] lut = interp1d(cdf, ct)(lut_random) super().__init__(lut, 0.0, 1.0)
[docs]class CollectionLut(LinearLut): def __init__(self, sensitivity: np.ndarray, costheta: np.ndarray=None, n: int = 1000): ''' A lookup table of detector sensitivity (collection efficiency) as a function of the angle of incidence. Azimuthal symmetry of the collection sensitivity is assumed. Parameters ---------- sensitivity: np.ndarray or str Detector sensitivity (efficiency) as a function of the angle of incidence. If the value is a str, an instance is loaded from the file represented by the string. costheta: np.ndarray Angle of incidence cosines at which the detector sensitivities are defined. If None, a vector of uniformly distributed values from 0 to 1 is used. n: int The size of lookup table that will be used in Monte Carlo simulations. ''' if isinstance(sensitivity, (str, CollectionLut)): super().__init__(sensitivity) else: sensitivity = np.asarray(sensitivity, dtype=np.float64) if costheta is None: costheta = np.linspace(0.0, 1.0, sensitivity.size) else: costheta = np.asarray(costheta) ct = np.linspace(costheta.min(), costheta.max(), n) lut = interp1d(costheta, sensitivity)(ct) first, last = ct[0], ct[-1] super().__init__(lut, first, last)
[docs]class LutEntry: def __init__(self, manager: 'LutManager', data: np.ndarray, offset: int): ''' Lookup table entry. Parameters ---------- manager: LutManager Manager of this lookup table entry. data: np.ndarray Lookup table data as a numpy array. offset: int Offset of the first element of this lookup table in the common lookup table buffer. ''' self._manager = manager self._data = data self._offset = int(offset) def _get_offset(self) -> int: return self._offset offset = property(_get_offset, None, None, 'Offset of the first element of this lookup table ' 'entry from the beginning of the data buffer.') def _get_manager(self) -> 'LutManager': return self._manager manager = property(_get_manager, None, None, 'Lookup table manager that manages this entry.') def _get_data(self) -> np.ndarray: return self._data data = property(_get_manager, None, None, 'Data array of the entry.') def __eq__(self, other): if isinstance(other, LutEntry): return id(self.data) == id(other.data) else: return id(self.data) == id(other) def __ne__(self, other): return not self.__eq__(other)
[docs]class LutManager: ''' Manages the data of multiple lookup tables. The lookup tables can be easily packed into a single contiguous array that is passed to an OpenCL kernel. ''' def __init__(self, dtype): ''' Creates a new instance of the lookup table manager. Parameters ---------- dtype: np.dtype Data type of the array that will contain all the managed lookup table arrays. ''' self._data = [] self._offsets = {} self._size = 0 self._dtype = np.dtype(dtype) def _get_dtype(self) -> np.dtype: return self._dtype dtype = property(_get_dtype, None, None, 'Lookup table data type.')
[docs] def append(self, lut: np.ndarray, force: bool = False) \ -> Tuple[LutEntry, bool]: ''' Appends a new lookup table to the list of managed lookup tables and returns the offset from the first element of the contiguous array of all managed lookup tables. A lookup table is appended only if an existing lookup table with the same values is not found in the list of managed lookup tables. Parameters ---------- lut: np.ndarray vector A new lookup table to append to the list of managed lookup tables. The new lookup table is appended only if the same lookup table i not found in the list of existing lookup tables. force: bool If True, the lookup table is appended without comparing its content to the list of existing managed lookup tables. This improves the performance but can grow the total size of the contiguous data array due to duplicate lookup tables in the managed list. Returns ------- lut: LutEntry Lookup table entry representing the given data. added: bool True if a lookup table entry with the same data was not found among the existing managed lookup table entries. If the value of the force argument is True, this value is always returned as True. ''' offset = 0 existing_lut_found = False if not force: for existing_lut in self._data: if np.array_equal(existing_lut, lut): existing_lut_found = True break offset += existing_lut.size else: offset = self._size if not existing_lut_found: self._data.append(lut) self._size += lut.size lut_id = id(lut) if lut_id not in self._offsets: self._offsets[lut_id] = offset return LutEntry(manager=self, data=lut, offset=offset), not existing_lut_found
def __contains__(self, lut: np.ndarray or LutEntry) -> bool: ''' Returns True, if the lookup table has been already added to the managed list of lookup tables. Note that the Python id is used to identify lookup table object and not the lookup table content which is used wen appending a new lookup table. Parameters ---------- lut: np.ndarray Lookup table. Returns ------- contains: bool Returns True, if the lookup table has been already added to the object. ''' if isinstance(lut, LutEntry): return id(lut.data) in self._offsets else: return id(lut) in self._offsets
[docs] def offset(self, lut: np.ndarray or LutEntry) -> int: ''' Get the offset of the first element in the lookup table. Parameters ---------- lut: np.ndarray Lookup table array as passed to the append method. Returns ------- offset:int Offset of the first element or None if the lookup table is not found. ''' if isinstance(lut, LutEntry): return self._offsets.get(id(lut.data)) else: return self._offsets.get(id(lut))
[docs] def clear(self): ''' Clear the lookup table manager - remove all the managed data entries. ''' self._data = [] self._offsets = {} self._size = 0
def __len__(self): return self._size
[docs] def pack_into(self, target: np.ndarray = None, dtype: np.dtype = None) \ -> np.ndarray: ''' Move the lookup tables into a contiguous numpy array. If target is None, a new target array is created using the data type specified in dtype. If the target array is too small to fit all the data, a new target of the same data type is allocated. Parameters ---------- target: np.ndarray A contiguous numpy array that will hold the lookup tables on return. One of the input arguments target and dtype must not be None! dtype: np.dtype This data type overloads the data type passed to the constructor. Returns ------- data: np.ndarray A contiguous numpy array with all the lookup tables. ''' if dtype is None: dtype = self.dtype if target is None or target.dtype != dtype or target.size < self._size: target = np.empty((self._size,), dtype=dtype) offset = 0 for item in self._data: target[offset:offset + item.size] = item offset += item.size return target
[docs]class LutManagers: ''' Manages multiple lookup table managers of type :py:class:`LutManager`. ''' def __init__(self): self._lut_managers = {}
[docs] def manager(self, dtype: np.dtype) -> LutManager: ''' Return lookup table manager for the given data type. Lookup table managers are created on the first demand/use. Parameters ---------- dtype: np.dtype Numpy data type of the lookup table manager. Returns ------- allocator: LutManager Lookup table manager. Note ---- Lookup table managers can be retrieved by the [] operator that takes the numpy data type of the lookup table manager. ''' dtype = np.dtype(dtype) manager = self._lut_managers.get(dtype) if manager is None: self._lut_managers[dtype] = manager = LutManager(dtype) return manager
[docs] def clear(self): ''' Clear the data (managed data arrays) of all the managers. ''' for _, manager in self._lut_managers.items(): manager.clear()
def __getitem__(self, dtype: np.dtype) -> LutManager: return self.manager(dtype) def __str_(self): return 'LutManagers()' def __repr__(self): return '{} #{}'.format(self.__str__(), id(self))
[docs]class RestrictedLutManagers(LutManagers): def __init__(self, dtypes: List[np.dtype] or Tuple[np.dtype]): ''' Manages multiple lookup table managers of type :py:class:`LutManager`. The range of data types is restricted to list of dtypes. Parameters ---------- dtypes: list or tuple List of data types as numpy.dtype that will be supported by the instance. ''' dtypes = [np.dtype(item) for item in dtypes] self._dtypes = tuple(dtypes) if len(self._dtypes) != len(set(self._dtypes)): raise ValueError('Duplicate data types are not allowed in ' 'restricted lookup table managers.') super().__init__()
[docs] def manager(self, dtype: np.dtype) -> LutManager: ''' Return lookup table manager for the given data type. Lookup table managers are created on the first demand/use. Parameters ---------- dtype: np.dtype Numpy data type of the lookup table manager. Returns ------- allocator: LutManager Lookup table manager. Note ---- Lookup table managers can be retrieved by the [] operator that takes the numpy data type of the lookup table manager. ''' dtype = np.dtype(dtype) if dtype not in self._dtypes: raise TypeError( 'Lookup table manager for the given data type is not available!') return super().manager(dtype) dtype = np.dtype(dtype)
def __getitem__(self, dtype: np.dtype) -> LutManager: return self.manager(dtype) def __str_(self): return 'RestrictedLutManagers()' def __repr__(self): return '{} #{}'.format(self.__str__(), id(self))