#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: Benjamin Vial# This file is part of nannos# License: GPLv3# See the documentation at nannos.gitlab.io__all__=["adaptive_sampler"]from.importnumpyasnpfrom.parallelimportparloop
[docs]defadaptive_sampler(f,x,max_bend=10,max_x_rel=1e-3,max_df=0.05,n_jobs=1):"""Adaptive sampler. Parameters ---------- f : function The function to sample. The first return value should be real scalar that would be used as a metric for adaptive sampling. x : _type_ _description_ max_bend : float, optional Maximum bend angle of the normalized angle between adjacent segments. For angles larger than the maximum bend angle, one of the adjacent intervals is subdivided, by default 10 max_x_rel : float, optional The relative space (relative to the sampling interval size) below which subdivision will not occur, by default 1e-3 max_df : float, optional The threshold below which the difference between adjacent result values will not cause an interval to be subdivided, by default 0.05 n_jobs : int, optional Number of parallel jobs, by default 1 Returns ------- tuple The sampling points and sampled values """ifn_jobs>1:@parloop(n_jobs=n_jobs)def_function_adapted(x):returnf(x)else:def_function_adapted(x):return[f(_x)for_xinx]cmax=np.cos(max_bend*np.pi/180)defget_new(x,y):x=np.sort(x)x_min=np.min(x)x_max=np.max(x)y_min=np.min(y)y_max=np.max(y)new_x=[]foriinrange(len(x)-2):x_tmp=x[i:i+3]y_tmp=y[i:i+3]xp,x0,xn=x_tmpyp,y0,yn=y_tmpmin_dx=max_x_rel*(x_max-x_min)min_dy=max_df*(y_max-y_min)ref_x=xn-x0<min_dxandx0-xp<min_dxref_y=abs(y0-yp)<min_dyandabs(yn-y0)<min_dylocal_y_min=np.min(y_tmp)local_y_max=np.max(y_tmp)dx0=(x0-xp)/(xn-xp)dx1=(xn-x0)/(xn-xp)dy0=(y0-yp)/(local_y_max-local_y_min)dy1=(yn-y0)/(local_y_max-local_y_min)bend=(dx0*dx1+dy0*dy1)/np.sqrt((dx0*dx0+dy0*dy0)*(dx1*dx1+dy1*dy1))bending=(bend)<cmaxordx1>3*dx0ordx0>3*dx1refine=bendingandnotref_xandnotref_yifrefine:seg=[]ifx0-xp<min_dx:isegment=1seg.append(isegment)ifxn-x0<min_dx:isegment=0seg.append(isegment)isegment=0ifx0-xp>xn-x0else1seg.append(isegment)seg=np.unique(seg)forisegmentinseg:x_new=0.5*sum(x_tmp[isegment:isegment+2])new_x.append(x_new)returnnp.unique(new_x)y=_function_adapted(x)ifhasattr(y[0],"__len__")andlen(y[0])>0:y_monitor=[_[0]for_iny]multi_output=Trueelse:multi_output=Falsey_monitor=y.copy()whileTrue:old_x=x.tolist()new_x=get_new(x,y_monitor)ifmulti_outputelseget_new(x,y)iflen(new_x)==0:breakx=np.hstack([x,new_x])x,iu=np.unique(x,return_index=True)q=[_xfor_xinxif_xnotinold_x]new_y=_function_adapted(q)y=np.vstack([y,new_y])ifmulti_outputelsenp.hstack([y,new_y])y=y[iu]ifmulti_output:new_y_monitor=[_[0]for_innew_y]y_monitor=np.hstack([y_monitor,new_y_monitor])y_monitor=y_monitor[iu]returnx,y