XGCa
load_balance.hpp
1 #ifndef LOAD_BALANCE_HPP
2 #define LOAD_BALANCE_HPP
3 
4 #include "timer_macro.hpp"
5 #include "shift.hpp"
6 #include "count_ptl_per_node.hpp"
7 #include "view_arithmetic.hpp"
8 #include "f0_redistribute.hpp"
9 
10 // Used for the original Fortran load balancer
11 extern "C" void calculate_load_imbalance(int istep, int gstep, double f0_cost);
12 extern "C" int assess_whether_to_rebalance_load(int istep);
13 extern "C" void reset_cost_trackers();
14 extern "C" void set_weights(int* gvid0_pid, double* ptl_count, double* f0_node_cost);
15 
16 class LoadRegion{
17  public:
18  enum class UpdateMethod{
19  NoHistory=0,
20  ExpHistory
21  };
22 
23  private:
24 
25  View<double*,HostType> estimated_time_per_vertex;
26  std::string region_name;
27  std::vector<std::string> timer_names;
28  bool verbose;
29  bool model_is_initialized;
30  bool model_has_history;
31  double predicted_max_region_time;
32  double observed_max_region_time;
33  double observed_load_imbalance;
34  double prediction_undershoot;
35 
36 #ifdef USE_MPI
37  MPI_Comm inter_period_comm;
38  MPI_Comm period_comm;
39 #endif
40  int n_periods; ///< How many repeating periods there are; in tokamaks this is planes
41  int n_unique_ranks; ///< How many ranks are in a 'period' (in tokamaks, in a plane)
42  int my_period_rank;
43 
44  double time_accumulated;
45 
46  UpdateMethod update_method;
47 
48  void update_model_no_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
49  // Loop over ranks' timers
50  for(int i=0; i<all_periods_timings.size(); i++){
51  int node_offset = current_partition(i) - 1;
52  int next_node_offset = current_partition(i+1) - 1;
53  int nnodes = next_node_offset - node_offset;
54  // Best 0th order estimate is to assume all vertices took the same time
55  double time_per_node = all_periods_timings(i)/nnodes;
56  for(int j=0; j<nnodes; j++){
57  estimated_time_per_vertex(j + node_offset) = time_per_node;
58  }
59  }
60  }
61 
62  void update_model_exp_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
63  // Loop over ranks' timers
64  for(int i=0; i<all_periods_timings.size(); i++){
65  // Get partition owned by this rank
66  int node_offset = current_partition(i) - 1;
67  int next_node_offset = current_partition(i+1) - 1;
68  int nnodes = next_node_offset - node_offset;
69 
70  // Get average time per node for this rank
71  double avg_time_per_node = all_periods_timings(i)/nnodes;
72 
73  // Get expected time based on existing model
74  double expected_time = 0.0;
75  for(int j=0; j<nnodes; j++){
76  expected_time += estimated_time_per_vertex(j + node_offset);
77  }
78  double expected_time_per_node = expected_time/nnodes;
79 
80  // Difference between observed time and expected time
81  double extra_time_per_node = avg_time_per_node - expected_time_per_node;
82 
83  for(int j=0; j<nnodes; j++){
84  // Distribute extra time evenly
85  constexpr double adjustment_rate = 0.5;
86 
87  estimated_time_per_vertex(j + node_offset) += extra_time_per_node*adjustment_rate;
88  }
89  }
90  }
91 
92  double get_time_since_previous_call(){
93  // Get time from camtimers (accumulated)
94  double new_accumulated_time = 0.0;
95 
96  // Loop through all timers associated with this region, and add the accumulated time for each
97  for(int i=0; i<timer_names.size(); i++){
98  double accumulated_single_timer = 0;
99  int ierr = GPTLget_wallclock(timer_names[i].c_str(), -1, &accumulated_single_timer);
100 
101  // If ierr != 0, timer is not yet defined, so the accumulated time is zero
102  if(ierr != 0) accumulated_single_timer = 0.0;
103 
104  new_accumulated_time += accumulated_single_timer;
105  }
106 
107  // Subtract previous accumulated time to get time spent since last update
108  double time_spent = new_accumulated_time - time_accumulated;
109 
110  // Save new accumulated time
111  time_accumulated = new_accumulated_time;
112 
113  return time_spent;
114  }
115 
116  void touch_timers(){
117  for(int i=0; i<timer_names.size(); i++){
118  GPTLstart(timer_names[i].c_str());
119  GPTLstop(timer_names[i].c_str());
120  }
121  }
122 
123  public:
124 
125 #ifdef USE_MPI
126  LoadRegion(int n_vertices, const MPI_Comm& inter_period_comm, const MPI_Comm& period_comm, UpdateMethod update_method, bool verbose, std::string region_name, std::vector<std::string> timer_names)
128  verbose(verbose),
129  time_accumulated(0.0),
130  inter_period_comm(inter_period_comm),
131  period_comm(period_comm),
132  estimated_time_per_vertex("estimated_time_per_vertex",n_vertices),
138  {
139  // Touch GPTL timers to be sure they exist
140  touch_timers();
141 
142  // Initialize timer
143  reset_timer();
144 
145  // Get problem size from comms
146  MPI_Comm_rank(period_comm, &my_period_rank);
147  MPI_Comm_size(period_comm, &n_unique_ranks);
148  MPI_Comm_size(inter_period_comm, &n_periods);
149  }
150 #endif
151 
152  void reset_timer(){
153  // Reset timer by getting the time since the previous call and suppressing the output
154  double time_spent = get_time_since_previous_call();
155  }
156 
157  double get_estimated_time_per_vertex(int i) const{
158  return estimated_time_per_vertex(i);
159  }
160 
161  View<double*,HostType> get_estimated_time_per_vertex() const{
162  return estimated_time_per_vertex;
163  }
164 
165  double get_prediction_undershoot() const{
166  return prediction_undershoot;
167  }
168 
169  double get_observed_max_region_time() const{
170  return observed_max_region_time;
171  }
172 
173  double get_observed_load_imbalance() const{
174  return observed_load_imbalance;
175  }
176 
177  bool get_model_is_initialized() const{
178  return model_is_initialized;
179  }
180 
181  // Predict performance of new partition based on model
182  double get_largest_predicted_time(const View<int*,CLayout,HostType>& proposed_partition) const{
183  double largest_time = 0.0;
184  //int largest_time_ind = 0;
185  for(int i=0; i<(proposed_partition.size()-1); i++){
186  double proc_time = 0.0;
187  for(int i_node=proposed_partition(i)-1; i_node<proposed_partition(i+1)-1; i_node++){
188  proc_time += estimated_time_per_vertex(i_node);
189  }
190  if(proc_time>largest_time){
191  //largest_time_ind = i;
192  largest_time = proc_time;
193  }
194  }
195  return largest_time;
196  }
197 
198  void initialize_model(){
199  reset_timer();
200 
201  // Model can now be used
202  model_is_initialized = true;
203  }
204 
205  void update_model(const View<int*,CLayout,HostType>& current_partition, double manual_time=-1.0){
206  if(update_method==UpdateMethod::NoHistory || update_method==UpdateMethod::ExpHistory){
207 
208  // Allocate for the timing of each rank
209  View<double*, HostType> all_periods_timings(NoInit("all_periods_timings"), n_unique_ranks);
210 
211 #ifdef USE_MPI
212  // Get time spent since last model update
213  // Time can be entered manually; it's used if positive (i.e. the input was provided)
214  double time_spent_this_rank = (manual_time<0.0 ? get_time_since_previous_call()
215  : manual_time);
216 
217  // Reduce onto plane 0
218  // Could try MPI_SUM rather than MPI_MAX
219  double time_spent;
220  if(n_periods>1){
221  MPI_Reduce(&time_spent_this_rank, &time_spent, 1, MPI_DOUBLE, MPI_MAX, 0, inter_period_comm);
222  }else{
223  time_spent = time_spent_this_rank;
224  }
225 
226  // Gather from all ranks in plane 0
227  MPI_Gather(&time_spent, 1, MPI_DOUBLE, all_periods_timings.data(), 1, MPI_DOUBLE, 0, period_comm);
228 #endif
229 
230  if (is_rank_zero()){
231  // Look at the timing prediction made last time
233  if(verbose) printf("\nPredicted max time in this region was %1.5e", predicted_max_region_time);
234 
235  // Get max region time
236  observed_max_region_time = 0.0;
237  double observed_sum_region_time = 0.0;
238  for(int i=0; i<all_periods_timings.size(); i++){
239  observed_max_region_time = std::max(observed_max_region_time, all_periods_timings(i));
240  observed_sum_region_time += all_periods_timings(i);
241  }
242  double observed_idle_region_time = observed_max_region_time*all_periods_timings.size() - observed_sum_region_time;
243  observed_load_imbalance = observed_idle_region_time/observed_sum_region_time;
244  if(verbose) printf("\nObserved max time in this region was %1.5e", observed_max_region_time);
245  if(verbose) printf("\n - Load imbalance (T_idle/T_work): %1.2f%%", observed_load_imbalance*100);
246 
247  // Take the max here so that there is never an assumed overshoot
248  if(predicted_max_region_time!=0.0){ // If time is zero, there hasn't been a prediction yet
249  prediction_undershoot = std::max(1.0, observed_max_region_time/predicted_max_region_time);
250  }
251  if(verbose) printf("\nThe prediction undershot by a factor of %1.5e", observed_max_region_time/predicted_max_region_time);
252 
253  if(update_method==UpdateMethod::NoHistory){
254  update_model_no_history(current_partition, all_periods_timings);
255  }else if(update_method==UpdateMethod::ExpHistory){
256  if(model_has_history){
257  update_model_exp_history(current_partition, all_periods_timings);
258  }else{
259  update_model_no_history(current_partition, all_periods_timings);
260  model_has_history = true;
261  }
262  }
263  }
264  }else{
265  exit_XGC("Error: Update method not available\n");
266  }
267  }
268 };
269 
270 class LoadBalance{
271  public:
272  enum class WeightingAlgorithm{
273  SingleRegionBalance=0,
274  ParticleBalance,
275  Fortran,
276  Default
277  };
278 
279  enum class ConstraintOption{
280  ParticleCount=0
281  };
282 
283  enum class ReweightOption{
284  Always=0,
285  IfBetter
286  };
287 
287 
288  bool shift_before_push;
289 
290  private:
291 
292  WeightingAlgorithm default_weighting_algorithm;
293 
294  std::vector<LoadRegion> regions;
295 
296  View<int*,CLayout,HostType> proposed_partition;
297 
298 #ifdef USE_MPI
299  MPI_Comm comm;
300 #endif
301 
302  double threshold_to_rebalance;
303  bool verbose;
304 
305  double constraint1_max;
306 
307  double get_even_division(const View<double*,HostType>& input, int n) const{
308  double total = 0.0;
309  Kokkos::parallel_reduce("sum", Kokkos::RangePolicy<HostExSpace>(0,input.size()), [=]( const int i, double& l_total){
310  l_total += input(i);
311  }, total);
312 
313  return (total/n);
314  }
315 
316 
317  bool greedily_fill_partition(const View<double*,HostType>& weight, const View<double*,HostType>& constraint1, double target_weight_per_rank){
318  int nnode = weight.size();
319  int nproc = proposed_partition.size()-1;
320 
321  // Start from rank 0 of the plane; assign nodes until it meets the desired weight per rank, then move to next rank
322  double constraint1_in_this_rank = 0.0;
323  double weight_in_this_rank = 0.0;
324  proposed_partition(0) = 1; // 1-indexed
325  int pid = 0;
326  bool assign_one_node_per_proc = false;
327  for(int i_node = 0; i_node<nnode; i_node++){
328  // Since every rank needs at least one node, switch to assigning one node per
329  // iteration if there are only as many nodes left as ranks
330  if(nnode-i_node==nproc-pid) assign_one_node_per_proc = true;
331 
332  // Check if a criterion is met to consider the rank sufficiently loaded
333  bool rank_is_loaded = false;
334 
335  // Criterion 1: We are out of nodes
336  if(assign_one_node_per_proc) rank_is_loaded = true;
337 
338  // Criterion 2: We have hit a constraint
339  if(constraint1_in_this_rank>=constraint1_max) rank_is_loaded = true;
340 
341  // Criterion 3: We have loaded the rank with an even amount of work
342  if(weight_in_this_rank>=target_weight_per_rank) rank_is_loaded = true;
343 
344  // If there are enough particles assigned to this rank, move to the next rank
345  if(rank_is_loaded){
346  proposed_partition(pid+1) = i_node+1; // 1-indexed
347  if(verbose) printf("\nRank %d is loaded (Nodes %d-%d); weight=%1.4e, ptl*nplanes=%d", pid, proposed_partition(pid), proposed_partition(pid+1)-1, weight_in_this_rank, int(constraint1_in_this_rank));
348  constraint1_in_this_rank = 0.0;
349  weight_in_this_rank = 0.0;
350  pid++;
351  if(pid==nproc-1) break;
352  }
353  constraint1_in_this_rank += constraint1(i_node);
354  weight_in_this_rank += weight(i_node);
355  }
356 
357  // Last value will always be nnode+1
358  proposed_partition(nproc) = nnode+1;
359 
360  // Check n_ptl in final rank
361  constraint1_in_this_rank = 0.0;
362  for(int i_node=proposed_partition(nproc-1)-1; i_node<nnode; i_node++){
363  constraint1_in_this_rank += constraint1(i_node);
364  }
365  if(constraint1_in_this_rank>=constraint1_max){
366  // Return false if the constraint was not met
367  return false;
368  }else{
369  return true;
370  }
371  }
372 
373  // Predict performance of new partition based on model
374  double get_largest_predicted_time(const View<int*,CLayout,HostType>& partition, const View<double*,HostType>& weight) const{
375  double largest_time = 0.0;
376  //int largest_time_ind = 0;
377  for(int i=0; i<(partition.size()-1); i++){
378  double proc_time = 0.0;
379  for(int i_node=partition(i)-1; i_node<partition(i+1)-1; i_node++){
380  proc_time += weight(i_node);
381  }
382  if(proc_time>largest_time){
383  //largest_time_ind = i;
384  largest_time = proc_time;
385  }
386  }
387  return largest_time;
388  }
389 
390  void one_weight_balance(const View<double*,HostType>& weight, const View<double*,CLayout,HostType> constraint1){
391  int nproc = proposed_partition.size()-1;
392 
393  // Ideally, each rank would get the same amount of work, i.e. the average
394  double ideal_weight_per_rank = get_even_division(weight, nproc);
395 
396  // Make initial attempt, targeting even distribution of work
397  bool meets_constraints = greedily_fill_partition(weight, constraint1, ideal_weight_per_rank);
398 
399  if(!meets_constraints){
400  // An equal work distribution cannot be assigned due to a constraint.
401  // First, determine the weights if only the constraint were followed
402  meets_constraints = greedily_fill_partition(constraint1, constraint1, get_even_division(constraint1, nproc));
403  if(!meets_constraints) exit_XGC("\nUnexpected issue in load balance: constraint-based partition doesn't satisfy constraints\n");
404 
405  // This partition meets the constraints, but the proposed timing is likely unacceptable
406  double upper_limit_weight_per_rank = get_largest_predicted_time(proposed_partition, weight);
407  if(verbose) printf("The ideal distribution (%1.3e) does not satisfy constraints. This distribution (%1.3e) does.\n", ideal_weight_per_rank, upper_limit_weight_per_rank);
408 
409  // Bisect the difference between the original target time and the upper limit, and see if that satisfies the constraints
410  // Continue until the load imbalance has been minimized to a precision based on the original desired time
411  const double desired_precision = 0.01; // Fraction of imbalance that's worth even checking whether it can be removed
412  double desired_step_size = ideal_weight_per_rank*desired_precision;
413  double step_size = upper_limit_weight_per_rank - ideal_weight_per_rank;
414  double compromise_weight_per_rank = upper_limit_weight_per_rank;
415  if(verbose) printf("\nEmploying a binary search to narrow down on the best option.");
416  while(step_size>desired_step_size){
417  // Halve the size of the step
418  step_size /= 2;
419 
420  if(meets_constraints){
421  // Try reducing the load imbalance since constraints were met
422  compromise_weight_per_rank -= step_size;
423  }else{
424  // Try raising the load imbalance since constraints were broken
425  compromise_weight_per_rank += step_size;
426  }
427 
428  // Get new partition
429  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
430  if(verbose) printf("\n Stepped by %1.3e to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "nt");
431  }
432  // In case we end at a partition that fails
433  while(!meets_constraints){
434  compromise_weight_per_rank += step_size;
435  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
436  if(verbose) printf("\n Stepped by %1.3e UP to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "nt");
437  }
438  if(verbose) printf("\n");
439  }
440  }
441 
442  // TODO
443  // Evaluate overall model performance
444  // MAIN_LOOP time vs sum of regions (% coverage)
445  //
446 
447  /* This function assesses the proposed partition and decides whether it is worth switching to.
448  * The simplest formulation is to recommend the new partition if the new partition is predicted to be faster
449  * than the existing one. There is also a factor to account for historical inaccuracy of the previous estimate.
450  * A future option would be to require a certain level of improvement before repartitioning. */
451  bool recommend_proposed_partition(){
452  // Total time spent in regions (assuming that global barriers demarcate them):
453  double observed_max_all_rgns_time = 0.0;
454  for(int i=0; i<regions.size(); i++){
455  observed_max_all_rgns_time += regions[i].get_observed_max_region_time();
456  }
457 
458  // Total predicted time, accounting for observed prediction undershoot for each region
459  double predicted_max_all_rgns_time = 0.0;
460  for(int i=0; i<regions.size(); i++){
461  predicted_max_all_rgns_time += regions[i].get_largest_predicted_time(proposed_partition)*regions[i].get_prediction_undershoot();
462  }
463 
464  double fractional_improvement = 1.0 - predicted_max_all_rgns_time/observed_max_all_rgns_time;
465 
466  if(verbose){
467  printf("\nDetermining whether to adopt the proposed partition:");
468  printf("\n Observed time with current partition was: %1.3e", observed_max_all_rgns_time);
469  printf("\n Predicted time with proposed partition, adjusting for historical undershoot (of Rgn 0: %1.3e): %1.3e", regions[0].get_prediction_undershoot(), predicted_max_all_rgns_time);
470  printf("\n Adopted if Fractional improvement of adjusted prediction (%1.3e) exceeds specified threshold (%1.3e)\n", fractional_improvement, threshold_to_rebalance);
471  }
472 
473  // Recommend the new partition if the predicted improvement over the current partition exceeds the threshold
474  if(fractional_improvement > threshold_to_rebalance){
475  return true;
476  }else{
477  return false;
478  }
479  }
480 
481  void print_new_partition(){
482  // Print new partition
483  for(int i = 0; i<proposed_partition.size()-1; i++){
484  int nnodes_for_proc = proposed_partition(i+1) - proposed_partition(i);
485  printf("process %d : nnodes = %d; range = (%d - %d)\n", i, nnodes_for_proc, proposed_partition(i), proposed_partition(i+1)-1);
486  }
487  }
488 
489  void update_model(const View<int*,CLayout,HostType>& current_partition){
490  for(int i=0; i<regions.size(); i++){
491  regions[i].update_model(current_partition);
492  }
493  }
494 
495  // Overload if providing timings manually rather than using camtimers. This is used for testing.
496  void update_model(const View<int*,CLayout,HostType>& current_partition, const std::vector<double>& manual_times){
497  for(int i=0; i<regions.size(); i++){
498  regions[i].update_model(current_partition, manual_times[i]);
499  }
500  }
501 
502  void initialize_model(){
503  for(int i=0; i<regions.size(); i++){
504  regions[i].initialize_model();
505  }
506  }
507 
508  bool model_is_initialized(){
509  bool is_initialized=true;
510  for(int i=0; i<regions.size(); i++){
511  is_initialized = (is_initialized && regions[i].get_model_is_initialized());
512  }
513  return is_initialized;
514  }
515 
516  void propose_new_partition(const Kokkos::View<double*,Kokkos::LayoutRight,HostType>& ptl_count, WeightingAlgorithm weighting_algorithm){
517  auto constraint1 = ptl_count; // Constrain based on particle count (memory)
518  if(weighting_algorithm == WeightingAlgorithm::SingleRegionBalance){
519  if(regions.size()!=1) exit_XGC("Error: Load balancing is currently single-region only\n");
520  one_weight_balance(regions[0].get_estimated_time_per_vertex(), constraint1);
521  }else if(weighting_algorithm == WeightingAlgorithm::ParticleBalance){
522  one_weight_balance(ptl_count, constraint1);
523  }
524 
525  if(verbose){
526  printf("\nNewly PROPOSED partition:\n");
527  print_new_partition();
528  }
529  }
530 
531  void set_new_partition(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
532  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, WeightingAlgorithm weighting_algorithm){
533  if(weighting_algorithm==WeightingAlgorithm::Fortran){
534  // In the fortran algorithm the new distribution is calculated and set here
535  // Count number of particles belonging to each node
536  Kokkos::View<double**,Kokkos::LayoutRight,HostType> ptl_count = count_ptl_per_node_elec_main_ion(grid, magnetic_field, plasma, sml.electron_on);
537 
538  if(pol_decomp.gvid0_pid_h.size()==2){
539  // Skip set_weights if there is only one rank per plane
540  pol_decomp.gvid0_pid_h(0) = 1;
541  pol_decomp.gvid0_pid_h(1) = pol_decomp.nnodes_on_plane + 1;
542  }else{
543 #ifndef NO_FORTRAN_MODULES
544  TIMER("SET_WEIGHTS",
545  set_weights(pol_decomp.gvid0_pid_h.data(), ptl_count.data(), plasma.f0_node_cost.data()) );
546 #endif
547  }
548  }else{
549 #ifdef USE_MPI
550  // Broadcast proposal to all ranks
551  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
552 #endif
553 
554  // Copy proposal to pol_decomp
555  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
556 
557  if(is_rank_zero()){
558  printf("\nNEW PARTITION:\n");
559  print_new_partition();
560  }
561  }
562 
563  // Update bounds and device copy of the decomposition array
564  pol_decomp.update_pol_decomp();
565  GPTLstart("update_flux_surf");
566  pol_decomp.update_flux_surf(grid.flux_surfaces_h);
567  GPTLstop("update_flux_surf");
568  }
569 
570  void redistribute_load(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
571  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp,
572  const View<int*,CLayout,HostType>& old_partition){
573  if (plasma.f0_grid()){
574  // Move f information to correct MPI rank after decomposition update
575  TIMER("F0_REDISTRIBUTE",
576  f0_redistribute(plasma, pol_decomp, grid, magnetic_field, vgrid, old_partition) );
577  }
578 
579  // Shift particles to correct MPI rank after decomposition update
580  TIMER("SHIFT_R",
581  shift_all_species(plasma, grid, magnetic_field, pol_decomp) );
582  }
583 
584  bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, int istep, int gstep, double f0_cost){
585  if(reweight_option==ReweightOption::Always){
586  // No need to check if model is an improvement
587  return true;
588  }else{
589  if(weighting_algorithm==WeightingAlgorithm::Fortran){
590 #ifndef NO_FORTRAN_MODULES
591  // Calculate load imbalance
592  calculate_load_imbalance(istep, gstep, f0_cost);
593 
594  // Assess if load imbalance merits a rebalancing
595  int should_rebalance_int = assess_whether_to_rebalance_load(istep);
596 
597  // Reset fortran cost trackers
598  reset_cost_trackers();
599 
600  return (should_rebalance_int==1);
601 #else
602  return false;
603 #endif
604  }else{
605  // Evaluate the proposed partition on rank 0
606  bool should_rebalance;
607  if(weighting_algorithm==WeightingAlgorithm::ParticleBalance){
608  // Since ParticleBalance doesn't contain an internal model, we can't evaluate the partition,
609  // so just always approve, for now.
610  // Probably better to have the model updatable by something other than time so it can still
611  // be assessed in this mode
612  return true;
613  }else{
614  if(is_rank_zero()) should_rebalance = recommend_proposed_partition();
615  // Convert to int because MPI_C_BOOL documentation is confusing
616  int should_rebalance_int = (should_rebalance ? 1 : 0);
617 #ifdef USE_MPI
618  MPI_Bcast(&should_rebalance_int, 1, MPI_INT, 0, comm);
619 #endif
620  return (should_rebalance_int==1);
621  }
622  }
623  }
624  }
625 
626  public:
627 
628  LoadBalance(){}
629 
630  LoadBalance(NLReader::NamelistReader& nlr, const DomainDecomposition<DeviceType>& pol_decomp, bool sync_planes = true)
631  : proposed_partition(NoInit("proposed_partition"), pol_decomp.gvid0_pid_h.layout())
632 #ifdef USE_MPI
633  , comm(sync_planes ? pol_decomp.mpi.comm : pol_decomp.mpi.plane_comm)
634 #endif
635  {
636 
637  nlr.use_namelist("load_balance_param", NLReader::NamelistReader::Optional);
638  double max_mem_redist_gb = nlr.get<double>("max_mem_redist_gb", 10.0); // Sets the maximum amount of memory per rank that can be allocated to particles, in gigabytes.
639  std::string weighting_algorithm_str = nlr.get<std::string>("weighting_algorithm", "Fortran"); // Load balancing method. "Fortran" is the default since the newer methods are experimental. "ParticleBalance" does not incorporate timing data, but only tries to equally distribute particles. "SingleRegionBalance" currently tries to balance the push.
640  threshold_to_rebalance = nlr.get<double>("threshold_to_rebalance", 0.02); // How much better the projected timing of a proposed load redistribution must be than the current distribution in order to adopt the new one. e.g. 0.02 sets a threshold of 2% better.
641  verbose = nlr.get<bool>("verbose", false); // Verbose output of the internal load distribution model and load balancing decisions.
642  std::string update_method_str = nlr.get<std::string>("update_method", "NoHistory"); // Methods for updating internal model of load distribution. "NoHistory" uses only the most recent time step, while "ExpHistory" averages the existing model with the new step's timing information. Does not apply when weighting_algorithm is "Fortran".
643 
644  shift_before_push = nlr.get<bool>("shift_before_push",false); // Calls a particle shift to evenly distribute particles on plane right before the particle push
645 
646  // Simple model for memory usage from load balance - just particles
647  double max_n_ptl_on_rank = max_mem_redist_gb*1024*1024*1024/80.0; // ~ 80 B per particle
648 
649  if(weighting_algorithm_str=="Fortran"){
650  default_weighting_algorithm = WeightingAlgorithm::Fortran;
651  }else if(weighting_algorithm_str=="ParticleBalance"){
652  default_weighting_algorithm = WeightingAlgorithm::ParticleBalance;
653  }else if(weighting_algorithm_str=="SingleRegionBalance"){
654  default_weighting_algorithm = WeightingAlgorithm::SingleRegionBalance;
655  }else{
656  exit_XGC("\nError: weighting_algorithm input not valid.");
657  }
658 
659 #ifdef USE_MPI
660  // If planes are sync'd, use intpl_comm to coordinate them; otherwise, use MPI_COMM_SELF
661  MPI_Comm inter_period_comm = sync_planes ? pol_decomp.mpi.intpl_comm : MPI_COMM_SELF;
662 
663  // Set up load balance regions
664  {
665  LoadRegion::UpdateMethod update_method;
666  if(update_method_str=="NoHistory"){
667  update_method = LoadRegion::UpdateMethod::NoHistory;
668  }else if(update_method_str=="ExpHistory"){
669  update_method = LoadRegion::UpdateMethod::ExpHistory;
670  }else{
671  exit_XGC("\nError: update_method input not valid.");
672  }
673 
674  regions.push_back(LoadRegion(pol_decomp.nnodes_on_plane, inter_period_comm, pol_decomp.mpi.plane_comm, update_method, verbose,
675  "Push",
676  {"ipc1:PUSHE",
677  "ipc1:PUSHI",
678  "ipc2:PUSHE",
679  "ipc2:PUSHI"}));
680  }
681 
682  // Hard-code for now
683  ConstraintOption constraint_option = ConstraintOption::ParticleCount;
684 
685  if(constraint_option == ConstraintOption::ParticleCount){
686  // Normalize by n_planes since we are using the sum of planes as the constraint
687  int n_periods;
688  MPI_Comm_size(inter_period_comm, &n_periods);
689  constraint1_max = max_n_ptl_on_rank*n_periods;
690  }else{
691  exit_XGC("\nError: ParticleCount is the only available ConstraintOption.\n");
692  }
693 #endif
694  }
695 
696  void rebalance(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
697  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, int istep, int gstep, ReweightOption reweight_option,
698  WeightingAlgorithm weighting_algorithm = WeightingAlgorithm::Default){
699 
700  // Skip rebalancing if poloidal decomposition is turned off. Normally we would wrap the call in
701  // an if statement rather than returning early; the early return is kept in case we want some
702  // balancing for some reason even without poloidal decomposition
703  if(pol_decomp.pol_decomp==false) return;
704 
705  // Rebalance is trivial if there is only one rank on a plane (generalize this from no-MPI case)
706 #ifdef USE_MPI
707 
708  if(weighting_algorithm == WeightingAlgorithm::Default){
709  weighting_algorithm = default_weighting_algorithm;
710  }
711 
712  if(default_weighting_algorithm == WeightingAlgorithm::Fortran){
713  // Override input algorithm; if the default is Fortran, it should always use the Fortran load balancer
714  weighting_algorithm = default_weighting_algorithm;
715  }
716 
717  if(is_rank_zero() && verbose){
718  if(weighting_algorithm==WeightingAlgorithm::Fortran) printf("\nLoad balance called with Fortran algorithm\n");
719  else if(weighting_algorithm==WeightingAlgorithm::ParticleBalance) printf("\nLoad balance called with Ptl algorithm\n");
720  else if(weighting_algorithm==WeightingAlgorithm::SingleRegionBalance) printf("\nLoad balance called with SingleRegion algorithm\n");
721  }
722 
723  GPTLstart("REBALANCE");
724  if(weighting_algorithm!=WeightingAlgorithm::Fortran){
725  GPTLstart("PTL_COUNT");
726  plasma.for_all_nonadiabatic_species([&](Species<DeviceType>& species){
727  long long int total_n_ptl = species.get_total_n_ptl();
728  int max_n_ptl = species.get_max_n_ptl();
729  double avg_n_ptl = (double)(total_n_ptl)/pol_decomp.mpi.nranks;
730  double max_ratio = max_n_ptl/avg_n_ptl;
731  if(is_rank_zero()) printf(" Species %d: (max/avg ptl per rank = %1.2f); total n_ptl = %lld\n", species.idx, max_ratio, total_n_ptl);
732  });
733  GPTLstop("PTL_COUNT");
734 
735  GPTLstart("PTL_COUNT_PER_NODE");
736  auto ptl_count = count_all_ptl_per_node(grid, magnetic_field, plasma);
737  GPTLstop("PTL_COUNT_PER_NODE");
738 
739  if(weighting_algorithm!=WeightingAlgorithm::ParticleBalance){
740  if(model_is_initialized()){
741  // Update the model with the latest timing
742  TIMER("LOAD_BAL_UPDATE",
743  update_model(pol_decomp.gvid0_pid_h) );
744  }else{
745  if(is_rank_zero() && verbose) printf("Initializing timing model, no partition proposal yet.");
746  initialize_model();
747  GPTLstop("REBALANCE");
748  return;
749  }
750  }
751 
752  // With the updated model, propose a new partition
753  GPTLstart("LOAD_BAL_NEW_PART");
754  if (is_rank_zero()) propose_new_partition(ptl_count, weighting_algorithm);
755  GPTLstop("LOAD_BAL_NEW_PART");
756  }
757 
758  GPTLstart("LOAD_BAL_REBAL");
759  double f0_cost = plasma.f0_grid() ? sum_view(plasma.f0_node_cost) : 0.0;
760  if(will_rebalance(reweight_option, weighting_algorithm, istep, gstep, f0_cost)){
761  // Save the old partition; it is used to save some communication in the f0 redistribution
762  View<int*,CLayout,HostType> old_partition(NoInit("old_partition"), pol_decomp.gvid0_pid_h.layout());
763  Kokkos::deep_copy(old_partition, pol_decomp.gvid0_pid_h);
764 
765  // Update pol_decomp
766  TIMER("LOAD_BAL_SET_NEW",
767  set_new_partition(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, weighting_algorithm) );
768 
769  // Update particles and f0
770  TIMER("LOAD_BAL_REDIST",
771  redistribute_load(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, old_partition) );
772  }
773  GPTLstop("LOAD_BAL_REBAL");
774  GPTLstop("REBALANCE");
775 #endif
776  }
777 
778 
779  // More generic rebalance
780  void rebalance(DomainDecomposition<DeviceType>& pol_decomp, const View<double*,CLayout,HostType>& constraint, const std::vector<double>& timings, double& load_imbalance, View<double*,HostType>& model_belief){
781 #ifdef USE_MPI
782  update_model(pol_decomp.gvid0_pid_h, timings);
783 
784  load_imbalance = regions[0].get_observed_load_imbalance();
785  model_belief = regions[0].get_estimated_time_per_vertex();
786 
787  // Propose new partition
788  if(is_rank_zero()) propose_new_partition(constraint, WeightingAlgorithm::SingleRegionBalance);
789 
790  double f0_cost = 0.0;
791  int istep = 0;
792  int gstep = 0;
793  if(will_rebalance(ReweightOption::IfBetter, WeightingAlgorithm::SingleRegionBalance, istep, gstep, f0_cost)){
794  // Broadcast proposal to all ranks
795  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
796 
797  // Copy proposal to pol_decomp
798  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
799 
800  // Update bounds and device copy of the decomposition array
801  pol_decomp.update_pol_decomp();
802 
803  // If there were a real load, redistribute it here
804  }
805 #endif
806  }
807 };
808 
809 #endif
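The core of the newer (non-Fortran) balancing path is the greedy assignment in greedily_fill_partition together with the bisection fallback in one_weight_balance. The standalone sketch below (not part of the header) mirrors that greedy pass with plain std::vector standing in for the Kokkos Views; the helper name toy_fill_partition and the sample numbers are hypothetical, and the bisection fallback is only noted in a comment.

#include <cstdio>
#include <vector>

// Hypothetical helper mirroring greedily_fill_partition: assign contiguous vertex
// ranges to ranks until each rank reaches the target weight or the constraint cap.
// Returns false if the leftover vertices on the final rank break the constraint.
bool toy_fill_partition(const std::vector<double>& weight,
                        const std::vector<double>& constraint,
                        double constraint_max,
                        double target_weight_per_rank,
                        std::vector<int>& partition){        // 1-indexed offsets, size nproc+1
  const int nnode = (int)weight.size();
  const int nproc = (int)partition.size() - 1;
  partition[0] = 1;
  double w = 0.0, c = 0.0;
  int pid = 0;
  bool one_node_per_proc = false;
  for(int i = 0; i < nnode; i++){
    // Switch to one vertex per rank when only as many vertices remain as ranks
    if(nnode - i == nproc - pid) one_node_per_proc = true;
    const bool loaded = one_node_per_proc
                     || c >= constraint_max
                     || w >= target_weight_per_rank;
    if(loaded){
      partition[pid + 1] = i + 1;   // close the current rank's range
      w = 0.0; c = 0.0;
      if(++pid == nproc - 1) break; // the last rank takes whatever remains
    }
    c += constraint[i];
    w += weight[i];
  }
  partition[nproc] = nnode + 1;
  // Verify the constraint on the final rank's leftover range
  c = 0.0;
  for(int i = partition[nproc - 1] - 1; i < nnode; i++) c += constraint[i];
  return c < constraint_max;
}

int main(){
  // Made-up per-vertex costs (e.g. push time) and particle counts
  std::vector<double> weight = {8, 7, 6, 5, 4, 3, 2, 1};
  std::vector<double> ptl    = {5, 5, 5, 5, 5, 5, 5, 5};
  const int nproc = 4;
  double total = 0.0;
  for(double x : weight) total += x;

  std::vector<int> partition(nproc + 1);
  bool ok = toy_fill_partition(weight, ptl, /*constraint_max=*/100.0,
                               /*target=*/total / nproc, partition);
  // one_weight_balance would bisect the target toward a constraint-only
  // partition if this returned false.
  for(int p = 0; p < nproc; p++)
    printf("rank %d: vertices %d-%d\n", p, partition[p], partition[p + 1] - 1);
  return ok ? 0 : 1;
}

For this data the result is the 1-indexed offsets {1, 3, 5, 7, 9}, i.e. vertex ranges 1-2, 3-4, 5-6 and 7-8, the same offset convention stored in proposed_partition and gvid0_pid_h.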
int nnodes_on_plane
Number of nodes on local plane.
Definition: domain_decomposition.hpp:89
bool pol_decomp
Use poloidal decomposition.
Definition: domain_decomposition.hpp:82
void update_pol_decomp()
Definition: domain_decomposition.cpp:235
Kokkos::View< int *, Kokkos::LayoutRight, HostType > gvid0_pid_h
Which processors get which vertices (host)
Definition: domain_decomposition.hpp:99
void update_flux_surf(const HostArray< VertexList > &surfaces)
Definition: domain_decomposition.cpp:124
HostArray< VertexList > flux_surfaces_h
Definition: grid.hpp:204
Definition: load_balance.hpp:270
WeightingAlgorithm default_weighting_algorithm
Definition: load_balance.hpp:292
ConstraintOption
Definition: load_balance.hpp:279
double threshold_to_rebalance
Definition: load_balance.hpp:302
bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, int istep, int gstep, double f0_cost)
Definition: load_balance.hpp:584
bool greedily_fill_partition(const View< double *, HostType > &weight, const View< double *, HostType > &constraint1, double target_weight_per_rank)
Definition: load_balance.hpp:317
double get_even_division(const View< double *, HostType > &input, int n) const
Definition: load_balance.hpp:307
double get_largest_predicted_time(const View< int *, CLayout, HostType > &partition, const View< double *, HostType > &weight) const
Definition: load_balance.hpp:374
void print_new_partition()
Definition: load_balance.hpp:481
double constraint1_max
Definition: load_balance.hpp:305
void initialize_model()
Definition: load_balance.hpp:502
void set_new_partition(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:531
void update_model(const View< int *, CLayout, HostType > &current_partition, const std::vector< double > &manual_times)
Definition: load_balance.hpp:496
std::vector< LoadRegion > regions
Definition: load_balance.hpp:294
void update_model(const View< int *, CLayout, HostType > &current_partition)
Definition: load_balance.hpp:489
LoadBalance()
Definition: load_balance.hpp:628
void propose_new_partition(const Kokkos::View< double *, Kokkos::LayoutRight, HostType > &ptl_count, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:516
bool model_is_initialized()
Definition: load_balance.hpp:508
WeightingAlgorithm
Definition: load_balance.hpp:272
bool shift_before_push
Definition: load_balance.hpp:288
void rebalance(DomainDecomposition< DeviceType > &pol_decomp, const View< double *, CLayout, HostType > &constraint, const std::vector< double > &timings, double &load_imbalance, View< double *, HostType > &model_belief)
Definition: load_balance.hpp:780
bool verbose
Definition: load_balance.hpp:303
LoadBalance(NLReader::NamelistReader &nlr, const DomainDecomposition< DeviceType > &pol_decomp, bool sync_planes=true)
Definition: load_balance.hpp:630
ReweightOption
Definition: load_balance.hpp:283
void rebalance(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, int istep, int gstep, ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm=WeightingAlgorithm::Default)
Definition: load_balance.hpp:696
void redistribute_load(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, const View< int *, CLayout, HostType > &old_partition)
Definition: load_balance.hpp:570
void one_weight_balance(const View< double *, HostType > &weight, const View< double *, CLayout, HostType > constraint1)
Definition: load_balance.hpp:390
bool recommend_proposed_partition()
Definition: load_balance.hpp:451
View< int *, CLayout, HostType > proposed_partition
Which processors get which vertices.
Definition: load_balance.hpp:296
Definition: load_balance.hpp:16
View< double *, HostType > get_estimated_time_per_vertex() const
Definition: load_balance.hpp:161
double get_estimated_time_per_vertex(int i) const
Definition: load_balance.hpp:157
double get_largest_predicted_time(const View< int *, CLayout, HostType > &proposed_partition) const
Definition: load_balance.hpp:182
View< double *, HostType > estimated_time_per_vertex
Definition: load_balance.hpp:25
double time_accumulated
Definition: load_balance.hpp:44
double get_time_since_previous_call()
Definition: load_balance.hpp:92
double get_prediction_undershoot() const
Definition: load_balance.hpp:165
void initialize_model()
Definition: load_balance.hpp:198
void update_model_exp_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:62
bool model_has_history
Definition: load_balance.hpp:30
UpdateMethod update_method
Definition: load_balance.hpp:46
bool get_model_is_initialized() const
Definition: load_balance.hpp:177
std::string region_name
Definition: load_balance.hpp:26
std::vector< std::string > timer_names
Definition: load_balance.hpp:27
int n_unique_ranks
How many ranks are in a 'period' (in tokamaks, in a plane)
Definition: load_balance.hpp:41
double prediction_undershoot
Definition: load_balance.hpp:34
bool verbose
Definition: load_balance.hpp:28
bool model_is_initialized
Definition: load_balance.hpp:29
double observed_max_region_time
Definition: load_balance.hpp:32
double observed_load_imbalance
Definition: load_balance.hpp:33
double predicted_max_region_time
Definition: load_balance.hpp:31
void update_model_no_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:48
double get_observed_max_region_time() const
Definition: load_balance.hpp:169
void update_model(const View< int *, CLayout, HostType > &current_partition, double manual_time=-1.0)
Definition: load_balance.hpp:205
int my_period_rank
Definition: load_balance.hpp:42
int n_periods
How many repeating periods there are; in tokamaks this is planes.
Definition: load_balance.hpp:40
void touch_timers()
Definition: load_balance.hpp:116
void reset_timer()
Definition: load_balance.hpp:152
UpdateMethod
Definition: load_balance.hpp:18
double get_observed_load_imbalance() const
Definition: load_balance.hpp:173
Definition: magnetic_field.hpp:12
Definition: NamelistReader.hpp:199
T get(const string &param, const T default_val, int val_ind=0)
Definition: NamelistReader.hpp:392
void use_namelist(const string &namelist, Options required=Required)
Definition: NamelistReader.hpp:366
@ Optional
Definition: NamelistReader.hpp:293
Definition: plasma.hpp:13
@ NoDevicePtl
Definition: plasma.hpp:101
Definition: sml.hpp:10
bool electron_on
Use kinetic electrons.
Definition: sml.hpp:57
Definition: species.hpp:75
int get_max_n_ptl()
Definition: species.hpp:743
int idx
Index in all_species.
Definition: species.hpp:78
long long int get_total_n_ptl()
Definition: species.hpp:732
Kokkos::View< double **, Kokkos::LayoutRight, HostType > count_ptl_per_node_elec_main_ion(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma, bool kinetic_electrons)
Definition: count_ptl_per_node.cpp:29
View< double *, CLayout, HostType > count_all_ptl_per_node(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma)
Definition: count_ptl_per_node.cpp:71
void f0_redistribute(Plasma &plasma, const DomainDecomposition< DeviceType > &pol_decomp, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, const View< int *, CLayout, HostType > &old_partition)
Definition: f0_redistribute.cpp:81
void exit_XGC(std::string msg)
Definition: globals.hpp:38
bool is_rank_zero()
Definition: globals.hpp:28
void reset_cost_trackers()
int assess_whether_to_rebalance_load(int istep)
void calculate_load_imbalance(int istep, int gstep, double f0_cost)
void set_weights(int *gvid0_pid, double *ptl_count, double *f0_node_cost)
Definition: magnetic_field.F90:1
logical false
Definition: module.F90:103
subroutine plasma(grid, itr, p, dene_out, deni_out, Te_out, Ti_out, Vparai_out, ignore_vacuum)
Calculate the plasma density, temperature, and parallel velocity for a point in triangle itr using pl...
Definition: neutral_totalf.F90:1541
void shift_all_species(Plasma &plasma, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const DomainDecomposition< DeviceType > &pol_decomp, bool evenly_distribute_ptl)
Definition: shift.cpp:440
Kokkos::ViewAllocateWithoutInitializing NoInit
Definition: space_settings.hpp:69
Definition: velocity_grid.hpp:8
#define TIMER(N, F)
Definition: timer_macro.hpp:24
static int GPTLstart(const char *name)
Definition: timer_macro.hpp:9
static int GPTLstop(const char *name)
Definition: timer_macro.hpp:10
T::value_type sum_view(const T &view)
Definition: view_arithmetic.hpp:135
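For reference, a small self-contained sketch of the arithmetic behind the model update and the repartition decision above: the ExpHistory update in update_model_exp_history nudges each vertex estimate halfway (adjustment_rate = 0.5) toward the newly observed per-vertex average, and recommend_proposed_partition adopts a proposal only if the undershoot-adjusted prediction beats the observed time by more than threshold_to_rebalance. All numbers below are invented for illustration.

#include <cstdio>
#include <vector>

int main(){
  // (1) ExpHistory blend for one rank owning three vertices
  std::vector<double> estimate = {1.0, 2.0, 3.0};   // current model (seconds per vertex)
  const double observed_rank_time = 9.0;            // this rank's measured region time
  const double adjustment_rate = 0.5;
  double expected = 0.0;
  for(double e : estimate) expected += e;           // model predicted 6.0 s
  const double extra_per_vertex = (observed_rank_time - expected) / estimate.size();
  for(double& e : estimate) e += adjustment_rate * extra_per_vertex;
  printf("updated estimates: %.2f %.2f %.2f\n", estimate[0], estimate[1], estimate[2]);

  // (2) Repartition decision
  const double observed_max_time  = 10.0;  // slowest rank under the current partition
  const double predicted_max_time = 8.0;   // slowest rank under the proposed partition
  const double undershoot         = 1.1;   // past predictions ran ~10% low
  const double threshold_to_rebalance = 0.02;
  const double fractional_improvement =
      1.0 - (predicted_max_time * undershoot) / observed_max_time;
  printf("improvement %.3f -> %s\n", fractional_improvement,
         fractional_improvement > threshold_to_rebalance ? "adopt" : "keep");
  return 0;
}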