Source code for nannos.sample

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Benjamin Vial
# This file is part of nannos
# License: GPLv3
# See the documentation at nannos.gitlab.io


__all__ = ["adaptive_sampler"]

from . import numpy as np
from .parallel import parloop


[docs]def adaptive_sampler(f, x, max_bend=10, max_x_rel=1e-3, max_df=0.05, n_jobs=1): """Adaptive sampler. Parameters ---------- f : function The function to sample. The first return value should be real scalar that would be used as a metric for adaptive sampling. x : _type_ _description_ max_bend : float, optional Maximum bend angle of the normalized angle between adjacent segments. For angles larger than the maximum bend angle, one of the adjacent intervals is subdivided, by default 10 max_x_rel : float, optional The relative space (relative to the sampling interval size) below which subdivision will not occur, by default 1e-3 max_df : float, optional The threshold below which the difference between adjacent result values will not cause an interval to be subdivided, by default 0.05 n_jobs : int, optional Number of parallel jobs, by default 1 Returns ------- tuple The sampling points and sampled values """ if n_jobs > 1: @parloop(n_jobs=n_jobs) def _function_adapted(x): return f(x) else: def _function_adapted(x): return [f(_x) for _x in x] cmax = np.cos(max_bend * np.pi / 180) def get_new(x, y): x = np.sort(x) x_min = np.min(x) x_max = np.max(x) y_min = np.min(y) y_max = np.max(y) new_x = [] for i in range(len(x) - 2): x_tmp = x[i : i + 3] y_tmp = y[i : i + 3] xp, x0, xn = x_tmp yp, y0, yn = y_tmp min_dx = max_x_rel * (x_max - x_min) min_dy = max_df * (y_max - y_min) ref_x = xn - x0 < min_dx and x0 - xp < min_dx ref_y = abs(y0 - yp) < min_dy and abs(yn - y0) < min_dy local_y_min = np.min(y_tmp) local_y_max = np.max(y_tmp) dx0 = (x0 - xp) / (xn - xp) dx1 = (xn - x0) / (xn - xp) dy0 = (y0 - yp) / (local_y_max - local_y_min) dy1 = (yn - y0) / (local_y_max - local_y_min) bend = (dx0 * dx1 + dy0 * dy1) / np.sqrt( (dx0 * dx0 + dy0 * dy0) * (dx1 * dx1 + dy1 * dy1) ) bending = (bend) < cmax or dx1 > 3 * dx0 or dx0 > 3 * dx1 refine = bending and not ref_x and not ref_y if refine: seg = [] if x0 - xp < min_dx: isegment = 1 seg.append(isegment) if xn - x0 < min_dx: isegment = 0 seg.append(isegment) isegment = 0 if x0 - xp > xn - x0 else 1 seg.append(isegment) seg = np.unique(seg) for isegment in seg: x_new = 0.5 * sum(x_tmp[isegment : isegment + 2]) new_x.append(x_new) return np.unique(new_x) y = _function_adapted(x) if hasattr(y[0], "__len__") and len(y[0]) > 0: y_monitor = [_[0] for _ in y] multi_output = True else: multi_output = False y_monitor = y.copy() while True: old_x = x.tolist() new_x = get_new(x, y_monitor) if multi_output else get_new(x, y) if len(new_x) == 0: break x = np.hstack([x, new_x]) x, iu = np.unique(x, return_index=True) q = [_x for _x in x if _x not in old_x] new_y = _function_adapted(q) y = np.vstack([y, new_y]) if multi_output else np.hstack([y, new_y]) y = y[iu] if multi_output: new_y_monitor = [_[0] for _ in new_y] y_monitor = np.hstack([y_monitor, new_y_monitor]) y_monitor = y_monitor[iu] return x, y