# -*- coding: utf-8 -*-
################################ Begin license #################################
# Copyright (C) Laboratory of Imaging technologies,
# Faculty of Electrical Engineering,
# University of Ljubljana.
#
# This file is part of PyXOpto.
#
# PyXOpto is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PyXOpto is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PyXOpto. If not, see <https://www.gnu.org/licenses/>.
################################# End license ##################################
import os.path
import argparse
import matplotlib as mpl
from matplotlib.animation import FuncAnimation
import numpy as np
from xopto.util.animation.common import create_frame_animation, \
create_path_animation
from xopto.mcml import mc
from xopto.cl import clinfo
[docs]def create_sv_animation_xz(
data_source: str or dict, filename: str = None,
overwrite=False, logscale=False,
fps: float = None, duration: float = None,
axis_off: bool = False, title: str = None,
autoscale: bool = True, writer: str or mpl.animation.MovieWriter = None,
verbose: bool = False) -> FuncAnimation:
'''
Creates a continuous animation of the sampling volume projected
onto the x-z plane.
Parameters
----------
data_source: str or dict
File with the simulation results or simulation results as returned by
:py:func:`sv_fiber_incidence`.
filename: str
Output file for the animation.
overwrite: bool
If False, existing animation files will not be overwritten.
logscale: bool
If True, logarithmically scale the sampling volume data.
fps: float
Frame rate of the animation (1/s). Another way to control the frame
rate is specify the duration parameter.
duration: float
Duration of one full animation (s). The same effect can be achieved
with the fps parameter.
axis_off: bool
Turns off the axis and related labels.
title: str
Plot title.
autoscale: bool
Independently autoscale the intensity of each frame.
writer: str or mpl.animation.MovieWriter
Movie writer.
verbose: bool
Turns on verbose progress report.
Returns
-------
animation: FuncAnimation
Animation instance
'''
if isinstance(data_source, str):
data = np.load(data_source, allow_pickle=True)
elif isinstance(data_source, dict):
data = data_source
else:
raise TypeError('Data source must be a dict or a filename!')
if filename is None and isinstance(data_source, str):
filename = os.path.splitext(data_source)[0] + '.gif'
if filename is not None and os.path.isfile(filename) and not overwrite:
if verbose:
print('Animation file "{}" already exists!'.format(filename))
print('Animation will not be saved!')
filename = None
sv_dict = data['sv']
if not isinstance(sv_dict, dict):
sv_dict = sv_dict.item()
sv = mc.mcsv.SamplingVolume.fromdict(sv_dict)
x_range = sv.xaxis.edges[0]*1e3, sv.xaxis.edges[-1]*1e3
z_range = sv.zaxis.edges[-1]*1e3, sv.zaxis.edges[0]*1e3
extent = [x_range[0], x_range[1], z_range[0], z_range[1]]
data = np.array(data['sv_data'])
if data.ndim > 3:
data = data.mean(2)
frames = np.vstack([data, data[-2::-1]])
return create_frame_animation(
frames, filename, overwrite=overwrite, logscale=logscale,
extent=extent, xrange=x_range, yrange=z_range, title=title,
xlabel='$x$ (mm)', ylabel='$y$ (mm)', axis_off=axis_off,
autoscale=autoscale, fps=fps, duration=duration,
writer=writer, verbose=verbose)
[docs]def create_path_animation_xz(
data_source: str or dict, filename: str = None,
overwrite=False, fps: float = None,
duration: float = None, logscale: bool = False,
axis_off: bool = False, title: str = None,
autoscale: bool = True, writer: str or mpl.animation.MovieWriter = None,
verbose: bool = False) -> FuncAnimation:
'''
Creates a continuous animation of the packet paths that are projected
onto the x-z plane.
Parameters
----------
data_source: str or dict
File with the simulation results or simulation results as returned by
:py:func:`sv_fiber_incidence`.
filename: str
Output file for the animation. Use an empty string for no title.
overwrite: bool
If False, existing animation files will not be overwritten.
fps: float
Frame rate of the animation (1/s). Another way to control the frame
rate is specify the duration parameter.
duration: float
Duration of one full animation (s). The same effect can be achieved
with the fps parameter.
axis_off: bool
Turns off the axis and related labels.
title: str
Plot title.
logscale: bool
Not used by this animation.
autoscale: bool
Not used by this animation.
writer: str or mpl.animation.MovieWriter
Movie writer.
verbose: bool
Turns on verbose progress report.
Returns
-------
animation: AnimationFunc
Animation instance.
'''
if isinstance(data_source, str):
data = np.load(data_source, allow_pickle=True)
elif isinstance(data_source, dict):
data = data_source
else:
raise TypeError('Data source must be a dict or a filename!')
if filename is None and isinstance(data_source, str):
filename = os.path.splitext(data_source)[0] + '.gif'
if filename is not None and os.path.isfile(filename) and not overwrite:
if verbose:
print('Animation file "{}" already exists!'.format(filename))
print('Animation will not be saved!')
filename = None
sv_dict = data['sv']
if not isinstance(sv_dict, dict):
sv_dict = sv_dict.item()
sv = mc.mcsv.SamplingVolume.fromdict(sv_dict)
x_range = sv.xaxis.edges[0]*1e3, sv.xaxis.edges[-1]*1e3
z_range = sv.zaxis.edges[-1]*1e3, sv.zaxis.edges[0]*1e3
trace_data = data['trace_data']
trace_n= data['trace_n']
num_packets, trace_len_max = trace_data.shape
num_frames = trace_n.max()
# fill the remaining space in the trace with the terminal location
for packet_index in range(num_packets):
entries = trace_n[packet_index]
for coordinate in ('x', 'y', 'z'):
trace_data[packet_index][coordinate][entries:] = \
trace_data[packet_index][coordinate][entries - 1]
return create_path_animation(
trace_data[:]['x']*1e3, trace_data[:]['z']*1e3, filename,
xrange=x_range, yrange=z_range,
title=title, xlabel='$x$ (mm)', ylabel='$z$ (mm)',
axis_off=axis_off, fps=fps, duration=duration,
writer=writer, verbose=verbose)
[docs]def mc_sv_fiber_cli(description: str = None,
parser: argparse.ArgumentParser = None):
'''
Collects command line arguments and returns a dict of values.
Parameters
----------
description: str
Command line description.
parser: argparse.ArgumentParser
Externally created argument parser can be used to add custom command
line arguments.
Returns
-------
args: argparse.Namespace
Parsed command line arguments. If the parser was passed
as an input argument, use the returned namespace to extract
any custom parameters.
arguments: dict
Returns the collected arguments as a dict with the following keys:
- num_packets: int
Minimum number of packets that should reach the detector.
- batch_packets:
int Number of packets to launch in a single simulation run.
- filename: str
File for the simulation results.
- overwrite: bool
Overwrite existing file.
- verbose: bool
Enables verbose reports.
- animation: bool
Creates animation after completing the sampling volume simulations.
- fps: float
Frame rate (1/s) to use in the sampling volume animation.
- autoscale: bool
Independently autoscale the intensity of each frame.
- opencl_device: str
OpenCL device to use for the simulations.
- opencl_index: int
Index (zero-based) of the OpenCL device to use for the simulations.
'''
if description is None:
description = 'Sampling volume simulation'
parser = argparse.ArgumentParser(description='description')
parser.add_argument(
'-n', '--num-packets', dest='sv_num_packets', default=1000, type=int,
help='Minimum number of packets reaching the detector.')
parser.add_argument(
'-p', '--projection', dest='sv_projection', action='store_true',
help='Reduce the number of sampling volume accumulators '
'along the y axis to 1.')
parser.add_argument(
'-b', '--batch-packets', dest='batch_packets', default=1000000, type=int,
help='Number of photon packets to launch in a single Monte Carlo '
'simulation.')
parser.add_argument(
'-f', '--file', dest='filename', default='', type=str,
help='File for the simulation results. Note that the extension .npz '
'is addedd automatically.')
parser.add_argument(
'-o', '--overwrite', action='store_true',
help='Set this flag to force overwrite of existing files.')
parser.add_argument(
'-v', '--verbose', action='store_true',
help='Enables verbose progress report.')
parser.add_argument(
'-a', '--animation', action='store_true',
help='Create animation after completing the Monte Carlo simulations.')
parser.add_argument(
'--title', dest='title', type=str, default='',
help='Animation plot title.')
parser.add_argument(
'--axis-off', dest='axis_off', action='store_true',
help='Turn off animation plot axis and related labels.')
parser.add_argument(
'--fps', dest='fps', type=float, default=0.0,
help='Frame rate of the animation. Alternatively, use the duration '
'parameter to set the FPS.')
parser.add_argument(
'--duration', dest='duration', type=float, default=0.0,
help='Duration (s) of one full animation. Alternatively, use the fps '
'parameter to set the duration of the animation.')
parser.add_argument(
'-l', '--logscale', dest='logscale', action='store_true',
help='Log scale intensities in animations.')
parser.add_argument(
'--no-autoscale', dest='no_autoscale', action='store_true',
help='Disable independent automatic intensity scaling of the '
'individual frames in the animation.')
parser.add_argument(
'-d', '--device', dest='opencl_device', default='', type=str,
help='OpenCL device to use for the simulations.')
parser.add_argument(
'-i', '--index', dest='opencl_index', default=0, type=int,
help='OpenCL device index to use for the simulations.')
parser.add_argument(
'--sample-g', dest='sample_g', default=-1.0, type=float,
help='Anisotropy of the sample. Use a negative value of less than '
'-1 for default.')
parser.add_argument(
'--sample-mua', dest='sample_mua', default=-1.0, type=float,
help='Absorption coefficient of the sample (1/m). '
'Use a negative value for default.')
parser.add_argument(
'--sample-mus', dest='sample_mus', default=-1.0, type=float,
help='Scattering coefficient of the sample (1/m). '
'Use a negative value for default.')
parser.add_argument(
'--sample-n', dest='sample_n', default=-1.0, type=float,
help='Refractive index of the sample. '
'Use a negative value for default.')
parser.add_argument(
'--fiber-sds', dest='fiber_sds', default=-1.0, type=float,
help='Source detector separation (m).'
'Use a negative value for default.')
args = parser.parse_args()
filename = args.filename
if not filename:
filename = None
sv_num_packets = max(1, args.sv_num_packets)
batch_packets = max(1, args.batch_packets)
fps = args.fps
if fps <= 0.0:
fps = None
duration = args.duration
if duration <= 0.0:
duration = None
title = args.title
axis_off = args.axis_off
sample_mua = args.sample_mua
if sample_mua < 0.0:
sample_mua = None
sample_mus = args.sample_mus
if sample_mus < 0.0:
sample_mus = None
sample_g = args.sample_g
if sample_g <= -1.0:
sample_g = None
sample_n = args.sample_n
if sample_n <= 0.0:
sample_n = None
fiber_sds = args.fiber_sds
if fiber_sds < 0.0:
fiber_sds = None
return {'filename': filename, 'batch_packets': batch_packets,
'sv_num_packets': sv_num_packets, 'overwrite': args.overwrite,
'sv_projection': args.sv_projection,
'verbose': args.verbose, 'animation': args.animation,
'fps': fps, 'duration': duration,
'autoscale': not args.no_autoscale,
'title': title, 'axis_off': axis_off,
'opencl_device': args.opencl_device,
'opencl_index': args.opencl_index, 'logscale': args.logscale,
'sample_mua': sample_mua, 'sample_mus': sample_mus,
'sample_g': sample_g, 'sample_n': sample_n,
'fiber_sds': fiber_sds}, args
[docs]def main(sim_task: callable, ani_task: callable, config: dict):
'''
Code for the __main__ section of the sampling volume simulation modules.
Parameters
----------
sim_task: callable
A function that runs the sampling volume simulations.
ani_task: callable
A function that runs/prepares the animations from the simulation
results.
config: dict
Sampling volume simulation configuration as a dict. Note that the
entries of this configuration dict can change during the call.
'''
kwargs, parsed_args = mc_sv_fiber_cli(
'Sampling volume for optical fiber transmittance')
if kwargs['verbose']:
print('Arguments returned by the command line interface:')
print(kwargs)
if kwargs['batch_packets'] is not None:
config['batch_packets'] = kwargs['batch_packets']
if kwargs['sv_num_packets'] is not None:
config['sv_num_packets'] = kwargs['sv_num_packets']
if kwargs['sv_projection'] and 'sv' in config:
config['sv']['yaxis']['n'] = 1
if kwargs['sample_g'] is not None:
config['sample']['g'] = kwargs['sample_g']
if kwargs['sample_mua'] is not None:
config['sample']['mua'] = kwargs['sample_mua']
if kwargs['sample_mus'] is not None:
config['sample']['mus'] = kwargs['sample_mus']
if kwargs['sample_n'] is not None:
config['sample']['n'] = kwargs['sample_n']
if kwargs['fiber_sds'] is not None and 'fiber_sds' in config:
if isinstance(config['fiber_sds'], (float, int)):
config['fiber_sds'] = kwargs['fibers_sds']
elif isinstance(config['fiber_sds'], (np.ndarray, tuple, list)):
config['fiber_sds'] = np.linspace(
config['fiber_sds'][0],
kwargs['fiber_sds'],
len(config['fiber_sds'])
)
if kwargs['opencl_device']:
cl_device = clinfo.device(
kwargs['opencl_device'], index=kwargs['opencl_index'])
else:
cl_device = clinfo.gpus()[kwargs['opencl_index']]
try:
result = sim_task(
kwargs['filename'], config, cl_device=cl_device,
overwrite=kwargs['overwrite'], verbose=kwargs['verbose'])
except FileExistsError:
if kwargs['verbose']:
print('Existing file with simulation results found!')
print('Skipping simulations!')
pass
kwargs['animation'] = True
if kwargs['animation']:
filename = kwargs['filename']
if kwargs['filename'] is None:
filename = result
ani_task(
filename, overwrite=kwargs['overwrite'],
fps=kwargs['fps'], duration=kwargs['duration'],
title=kwargs['title'], axis_off=kwargs['axis_off'],
logscale=kwargs['logscale'], autoscale=kwargs['autoscale'],
verbose=kwargs['verbose'])