XGC1
load_balance.hpp
1 #ifndef LOAD_BALANCE_HPP
2 #define LOAD_BALANCE_HPP
3 
4 #include "timer_macro.hpp"
5 #include "shift.hpp"
6 #include "count_ptl_per_node.hpp"
7 #include "view_arithmetic.hpp"
8 #include "f0_redistribute.hpp"
9 
10 // Used for the original Fortran load balancer
11 extern "C" void calculate_load_imbalance(double f0_cost);
12 extern "C" int assess_whether_to_rebalance_load();
13 extern "C" void reset_cost_trackers();
14 extern "C" void set_weights(int* gvid0_pid, double* ptl_count, double* f0_node_cost);
15 
16 class LoadRegion{
17  public:
18  enum class UpdateMethod{
19  NoHistory=0,
 20  ExpHistory
 21  };
22 
23  private:
24 
25  View<double*,HostType> estimated_time_per_vertex;
26  std::string region_name;
27  std::vector<std::string> timer_names;
28  bool verbose;
 29  bool model_is_initialized;
 30  bool model_has_history;
 31  double predicted_max_region_time;
 32  double observed_max_region_time;
 33  double observed_load_imbalance;
 34  double prediction_undershoot;
 35 
36 #ifdef USE_MPI
37  MPI_Comm inter_period_comm;
38  MPI_Comm period_comm;
39 #endif
 40  int n_periods; ///< How many repeating periods there are; in tokamaks this is planes
 41  int n_unique_ranks; ///< How many ranks are in a 'period' (in tokamaks, in a plane)
 42  int my_period_rank;
 43 
 44  double time_accumulated;
 45 
 46  UpdateMethod update_method;
 47 
48  void update_model_no_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
49  // Loop over ranks' timers
50  for(int i=0; i<all_periods_timings.size(); i++){
51  int node_offset = current_partition(i) - 1;
52  int next_node_offset = current_partition(i+1) - 1;
53  int nnodes = next_node_offset - node_offset;
54  // Best 0th order estimate is to assume all vertices took the same time
55  double time_per_node = all_periods_timings(i)/nnodes;
56  for(int j=0; j<nnodes; j++){
57  estimated_time_per_vertex(j + node_offset) = time_per_node;
58  }
59  }
60  }
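 // Example (hypothetical numbers): if rank i owns 4 vertices (current_partition(i)=1,
 // current_partition(i+1)=5) and its timers report 2.0 s, each of those vertices is
 // assigned an estimated 0.5 s, since no finer-grained information is available.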
61 
62  void update_model_exp_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
63  // Loop over ranks' timers
64  for(int i=0; i<all_periods_timings.size(); i++){
65  // Get partition owned by this rank
66  int node_offset = current_partition(i) - 1;
67  int next_node_offset = current_partition(i+1) - 1;
68  int nnodes = next_node_offset - node_offset;
69 
70  // Get average time per node for this rank
71  double avg_time_per_node = all_periods_timings(i)/nnodes;
72 
73  // Get expected time based on existing model
74  double expected_time = 0.0;
75  for(int j=0; j<nnodes; j++){
76  expected_time += estimated_time_per_vertex(j + node_offset);
77  }
78  double expected_time_per_node = expected_time/nnodes;
79 
80  // Difference between observed time and expected time
81  double extra_time_per_node = avg_time_per_node - expected_time_per_node;
82 
83  for(int j=0; j<nnodes; j++){
84  // Distribute extra time evenly
85  constexpr double adjustment_rate = 0.5;
86 
87  estimated_time_per_vertex(j + node_offset) += extra_time_per_node*adjustment_rate;
88  }
89  }
90  }
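 // Example (hypothetical numbers): if the model predicts 0.4 s/vertex for a rank's 4 vertices
 // but the rank reports 2.4 s (0.6 s/vertex observed), the extra 0.2 s/vertex is applied at
 // adjustment_rate=0.5, raising each estimate to 0.5 s. The damping keeps a single noisy
 // time step from completely overwriting the accumulated history.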
91 
 92  double get_time_since_previous_call(){
 93  // Get time from camtimers (accumulated)
94  double new_accumulated_time = 0.0;
95 
96  // Loop through all timers associated with this region, and add the accumulated time for each
97  for(int i=0; i<timer_names.size(); i++){
98  double accumulated_single_timer = 0;
99  int ierr = GPTLget_wallclock(timer_names[i].c_str(), -1, &accumulated_single_timer);
100 
101  // If ierr != 0, timer is not yet defined, so the accumulated time is zero
102  if(ierr != 0) accumulated_single_timer = 0.0;
103 
104  new_accumulated_time += accumulated_single_timer;
105  }
106 
107  // Subtract previous accumulated time to get time spent since last update
108  double time_spent = new_accumulated_time - time_accumulated;
109 
110  // Save new accumulated time
111  time_accumulated = new_accumulated_time;
112 
113  return time_spent;
114  }
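 // Example (hypothetical numbers): if GPTL reports 12.0 s accumulated across this region's
 // timers and the previous call recorded 10.5 s, this returns 1.5 s and stores 12.0 s as the
 // new baseline.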
115 
116  void touch_timers(){
117  for(int i=0; i<timer_names.size(); i++){
118  GPTLstart(timer_names[i].c_str());
119  GPTLstop(timer_names[i].c_str());
120  }
121  }
122 
123  public:
124 
125 #ifdef USE_MPI
126  LoadRegion(int n_vertices, const MPI_Comm& inter_period_comm, const MPI_Comm& period_comm, UpdateMethod update_method, bool verbose, std::string region_name, std::vector<std::string> timer_names)
127  : model_is_initialized(false),
128  verbose(verbose),
129  time_accumulated(0.0),
130  inter_period_comm(inter_period_comm),
131  period_comm(period_comm),
132  estimated_time_per_vertex("estimated_time_per_vertex",n_vertices),
134  update_method(update_method),
135  model_has_history(false),
136  region_name(region_name),
137  timer_names(timer_names)
138  {
139  // Touch GPTL timers to be sure they exist
140  touch_timers();
141 
142  // Initialize timer
143  reset_timer();
144 
145  // Get problem size from comms
146  MPI_Comm_rank(period_comm, &my_period_rank);
147  MPI_Comm_size(period_comm, &n_unique_ranks);
148  MPI_Comm_size(inter_period_comm, &n_periods);
149  }
150 #endif
151 
152  void reset_timer(){
153  // Reset timer by getting the time since the previous call and suppressing the output
154  double time_spent = get_time_since_previous_call();
155  }
156 
157  double get_estimated_time_per_vertex(int i) const{
158  return estimated_time_per_vertex(i);
159  }
160 
161  View<double*,HostType> get_estimated_time_per_vertex() const{
 162  return estimated_time_per_vertex;
 163  }
164 
 165  double get_prediction_undershoot() const{
 166  return prediction_undershoot;
167  }
168 
 169  double get_observed_max_region_time() const{
 170  return observed_max_region_time;
 171  }
172 
 173  double get_observed_load_imbalance() const{
 174  return observed_load_imbalance;
 175  }
176 
 177  bool get_model_is_initialized() const{
 178  return model_is_initialized;
179  }
180 
181  // Predict performance of new partition based on model
182  double get_largest_predicted_time(const View<int*,CLayout,HostType>& proposed_partition) const{
183  double largest_time = 0.0;
184  //int largest_time_ind = 0;
185  for(int i=0; i<(proposed_partition.size()-1); i++){
186  double proc_time = 0.0;
187  for(int i_node=proposed_partition(i)-1; i_node<proposed_partition(i+1)-1; i_node++){
188  proc_time += estimated_time_per_vertex(i_node);
189  }
190  if(proc_time>largest_time){
191  //largest_time_ind = i;
192  largest_time = proc_time;
193  }
194  }
195  return largest_time;
196  }
197 
 198  void initialize_model(){
 199  reset_timer();
200 
201  // Model can now be used
202  model_is_initialized = true;
203  }
204 
205  void update_model(const View<int*,CLayout,HostType>& current_partition, double manual_time=-1.0){
 206  if(update_method==UpdateMethod::NoHistory || update_method==UpdateMethod::ExpHistory){
 207 
208  // Allocate for timing of each
209  View<double*, HostType> all_periods_timings(NoInit("all_periods_timings"), n_unique_ranks);
210 
211 #ifdef USE_MPI
212  // Get time spent since last model update
213  // Time can be entered manually; it's used if positive (i.e. the input was provided)
214  double time_spent_this_rank = (manual_time<0.0 ? get_time_since_previous_call()
215  : manual_time);
216 
217  // Reduce onto plane 0
218  // Could try MPI_SUM rather than MPI_MAX
219  double time_spent;
220  if(n_periods>1){
221  MPI_Reduce(&time_spent_this_rank, &time_spent, 1, MPI_DOUBLE, MPI_MAX, 0, inter_period_comm);
222  }else{
223  time_spent = time_spent_this_rank;
224  }
225 
226  // Gather from all ranks in plane 0
227  MPI_Gather(&time_spent, 1, MPI_DOUBLE, all_periods_timings.data(), 1, MPI_DOUBLE, 0, period_comm);
228 #endif
229 
230  if (is_rank_zero()){
231  // Look at the timing prediction made last time
233  if(verbose) printf("\nPredicted max time in this region was %1.5e", predicted_max_region_time);
234 
235  // Get max region time
 236  observed_max_region_time = 0.0;
 237  double observed_sum_region_time = 0.0;
238  for(int i=0; i<all_periods_timings.size(); i++){
239  observed_max_region_time = std::max(observed_max_region_time, all_periods_timings(i));
240  observed_sum_region_time += all_periods_timings(i);
241  }
242  double observed_idle_region_time = observed_max_region_time*all_periods_timings.size() - observed_sum_region_time;
243  observed_load_imbalance = observed_idle_region_time/observed_sum_region_time;
244  if(verbose) printf("\nObserved max time in this region was %1.5e", observed_max_region_time);
245  if(verbose) printf("\n - Load imbalance (T_idle/T_work): %1.2f%%", observed_load_imbalance*100);
246 
247  // Take the max here so that there is never an assumed overshoot
248  if(predicted_max_region_time!=0.0){ // If time is zero, there hasn't been a prediction yet
 249  prediction_undershoot = std::max(1.0, observed_max_region_time/predicted_max_region_time);
 250  }
251  if(verbose) printf("\nThe prediction undershot by a factor of %1.5e", observed_max_region_time/predicted_max_region_time);
252 
 253  if(update_method==UpdateMethod::NoHistory){
 254  update_model_no_history(current_partition, all_periods_timings);
 255  }else if(update_method==UpdateMethod::ExpHistory){
 256  if(model_has_history){
257  update_model_exp_history(current_partition, all_periods_timings);
258  }else{
259  update_model_no_history(current_partition, all_periods_timings);
260  model_has_history = true;
261  }
262  }
263  }
264  }else{
265  exit_XGC("Error: Update method not available\n");
266  }
267  }
268 };
269 
 270 class LoadBalance{
 271  public:
272  enum class WeightingAlgorithm{
 273  SingleRegionBalance=0,
 274  ParticleBalance,
 275  Fortran,
276  Default
277  };
278 
279  enum class ConstraintOption{
280  ParticleCount=0
281  };
282 
283  enum class ReweightOption{
284  Always=0,
285  IfBetter
286  };
287 
288  private:
289 
 290  WeightingAlgorithm default_weighting_algorithm;
 291 
292  std::vector<LoadRegion> regions;
293 
294  View<int*,CLayout,HostType> proposed_partition;
295 
296 #ifdef USE_MPI
297  MPI_Comm comm;
298 #endif
299 
 300  double threshold_to_rebalance;
 301  bool verbose;
302 
 303  double constraint1_max;
 304 
305  double get_even_division(const View<double*,HostType>& input, int n) const{
306  double total = 0.0;
307  Kokkos::parallel_reduce("sum", Kokkos::RangePolicy<HostExSpace>(0,input.size()), [=]( const int i, double& l_total){
308  l_total += input(i);
309  }, total);
310 
311  return (total/n);
312  }
313 
314 
315  bool greedily_fill_partition(const View<double*,HostType>& weight, const View<double*,HostType>& constraint1, double target_weight_per_rank){
316  int nnode = weight.size();
317  int nproc = proposed_partition.size()-1;
318 
319  // Start from rank 0 of the plane; assign nodes until it meets the desired weight per rank, then move to next rank
320  double constraint1_in_this_rank = 0.0;
321  double weight_in_this_rank = 0.0;
322  proposed_partition(0) = 1; // 1-indexed
323  int pid = 0;
324  bool assign_one_node_per_proc = false;
325  for(int i_node = 0; i_node<nnode; i_node++){
 326  // Since every rank needs at least one node, switch to assigning at every
327  // iteration if there are only as many nodes left as ranks
328  if(nnode-i_node==nproc-pid) assign_one_node_per_proc = true;
329 
330  // Check if a criterion is met to consider the rank sufficiently loaded
331  bool rank_is_loaded = false;
332 
333  // Criterion 1: We are out of nodes
334  if(assign_one_node_per_proc) rank_is_loaded = true;
335 
336  // Criterion 2: We have hit a constraint
337  if(constraint1_in_this_rank>=constraint1_max) rank_is_loaded = true;
338 
339  // Criterion 3: We have loaded the rank with an even amount of work
340  if(weight_in_this_rank>=target_weight_per_rank) rank_is_loaded = true;
341 
 342  // If any criterion is met, this rank is full; record the boundary and move to the next rank
343  if(rank_is_loaded){
344  proposed_partition(pid+1) = i_node+1; // 1-indexed
345  if(verbose) printf("\nRank %d is loaded (Nodes %d-%d); weight=%1.4e, ptl*nplanes=%d", pid, proposed_partition(pid), proposed_partition(pid+1)-1, weight_in_this_rank, int(constraint1_in_this_rank));
346  constraint1_in_this_rank = 0.0;
347  weight_in_this_rank = 0.0;
348  pid++;
349  if(pid==nproc-1) break;
350  }
351  constraint1_in_this_rank += constraint1(i_node);
352  weight_in_this_rank += weight(i_node);
353  }
354 
355  // Last value will always be nnode+1
356  proposed_partition(nproc) = nnode+1;
357 
358  // Check n_ptl in final rank
359  constraint1_in_this_rank = 0.0;
360  for(int i_node=proposed_partition(nproc-1)-1; i_node<nnode; i_node++){
361  constraint1_in_this_rank += constraint1(i_node);
362  }
363  if(constraint1_in_this_rank>=constraint1_max){
364  // Return false if the constraint was not met
365  return false;
366  }else{
367  return true;
368  }
369  }
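 // Example (hypothetical values): with 6 vertices of weight 1.0 each, 3 ranks, a large
 // constraint1_max, and target_weight_per_rank=2.0, the sweep produces the 1-indexed
 // partition {1,3,5,7}, i.e. two vertices per rank. If constraint1 accumulates to
 // constraint1_max first, the rank is closed out early instead, and the function reports
 // whether the leftover final rank still respects the constraint.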
370 
371  // Predict performance of new partition based on model
372  double get_largest_predicted_time(const View<int*,CLayout,HostType>& partition, const View<double*,HostType>& weight) const{
373  double largest_time = 0.0;
374  //int largest_time_ind = 0;
375  for(int i=0; i<(partition.size()-1); i++){
376  double proc_time = 0.0;
377  for(int i_node=partition(i)-1; i_node<partition(i+1)-1; i_node++){
378  proc_time += weight(i_node);
379  }
380  if(proc_time>largest_time){
381  //largest_time_ind = i;
382  largest_time = proc_time;
383  }
384  }
385  return largest_time;
386  }
387 
388  void one_weight_balance(const View<double*,HostType>& weight, const View<double*,CLayout,HostType> constraint1){
389  int nproc = proposed_partition.size()-1;
390 
391  // Ideally, each rank would get the same amount of work, i.e. the average
392  double ideal_weight_per_rank = get_even_division(weight, nproc);
393 
394  // Make initial attempt, targeting even distribution of work
395  bool meets_constraints = greedily_fill_partition(weight, constraint1, ideal_weight_per_rank);
396 
397  if(!meets_constraints){
398  // An equal work distribution cannot be assigned due to a constraint.
399  // First, determine the weights if only the constraint were followed
400  meets_constraints = greedily_fill_partition(constraint1, constraint1, get_even_division(constraint1, nproc));
401  if(!meets_constraints) exit_XGC("\nUnexpected issue in load balance: constraint-based partition doesn't satisfy constraints\n");
402 
403  // This partition meets the constraints, but the proposed timing is likely unacceptable
404  double upper_limit_weight_per_rank = get_largest_predicted_time(proposed_partition, weight);
405  if(verbose) printf("The ideal distribution (%1.3e) does not satisfy constraints. This distribution (%1.3e) does.\n", ideal_weight_per_rank, upper_limit_weight_per_rank);
406 
407  // Bisect the difference between the original target time and the upper limit, and see if that satisfies the constraints
 408  // Continue until the load imbalance has been minimized to a precision based on the original desired time
409  const double desired_precision = 0.01; // Fraction of imbalance that's worth even checking whether it can be removed
410  double desired_step_size = ideal_weight_per_rank*desired_precision;
411  double step_size = upper_limit_weight_per_rank - ideal_weight_per_rank;
412  double compromise_weight_per_rank = upper_limit_weight_per_rank;
413  if(verbose) printf("\nEmploying a binary search to narrow down on the best option.");
414  while(step_size>desired_step_size){
415  // Halve the size of the step
416  step_size /= 2;
417 
418  if(meets_constraints){
419  // Try reducing the load imbalance since constraints were met
420  compromise_weight_per_rank -= step_size;
421  }else{
422  // Try raising the load imbalance since constraints were broken
423  compromise_weight_per_rank += step_size;
424  }
425 
426  // Get new partition
427  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
428  if(verbose) printf("\n Stepped by %1.3e to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "nt");
429  }
430  // In case we end at a partition that fails
431  while(!meets_constraints){
432  compromise_weight_per_rank += step_size;
433  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
434  if(verbose) printf("\n Stepped by %1.3e UP to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "nt");
435  }
436  if(verbose) printf("\n");
437  }
438  }
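 // Example (hypothetical numbers): suppose the ideal even split is 1.0 s of work per rank but
 // violates the particle constraint, while the constraint-only partition implies a worst rank
 // of 2.0 s. The search then tries 1.5 s, then 1.25 s or 1.75 s depending on whether 1.5 s
 // satisfied the constraint, and so on, halving the step each time until it falls below 1% of
 // the ideal target; if the final trial still violates the constraint, it steps back up.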
439 
440  // TODO
441  // Evaluate overall model performance
442  // MAIN_LOOP time vs sum of regions (% coverage)
443  //
444 
445  /* This function assesses the proposed partition and decides whether it is worth switching to.
446  * The simplest formulation is to recommend the new partition if the new partition is predicted to be faster
447  * than the existing one. There is also a factor to account for historical inaccuracy of the previous estimate.
448  * A future option would be to require a certain level of improvement before repartitioning. */
 449  bool recommend_proposed_partition(){
 450  // Total time spent in regions (assuming that global barriers demarcate them):
451  double observed_max_all_rgns_time = 0.0;
452  for(int i=0; i<regions.size(); i++){
453  observed_max_all_rgns_time += regions[i].get_observed_max_region_time();
454  }
455 
456  // Total predicted time, accounting for observed prediction undershoot for each region
457  double predicted_max_all_rgns_time = 0.0;
458  for(int i=0; i<regions.size(); i++){
459  predicted_max_all_rgns_time += regions[i].get_largest_predicted_time(proposed_partition)*regions[i].get_prediction_undershoot();
460  }
461 
462  double fractional_improvement = 1.0 - predicted_max_all_rgns_time/observed_max_all_rgns_time;
463 
464  if(verbose){
465  printf("\nDetermining whether to adopt the proposed partition:");
466  printf("\n Observed time with current partition was: %1.3e", observed_max_all_rgns_time);
467  printf("\n Predicted time with proposed partition, adjusting for historical undershoot (of Rgn 0: %1.3e): %1.3e", regions[0].get_prediction_undershoot(), predicted_max_all_rgns_time);
468  printf("\n Adopted if Fractional improvement of adjusted prediction (%1.3e) exceeds specified threshold (%1.3e)\n", fractional_improvement, threshold_to_rebalance);
469  }
470 
471  // Recommend the new partition if the predicted time is less than the time observed with the current partition
472  if(fractional_improvement > threshold_to_rebalance){
473  return true;
474  }else{
475  return false;
476  }
477  }
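 /* Example (hypothetical numbers): if the current partition was observed to take 1.00 s, the
  * proposed partition is predicted to take 0.90 s, and the region's historical undershoot
  * factor is 1.05, the adjusted prediction is 0.945 s, so fractional_improvement = 0.055.
  * With the default threshold_to_rebalance of 0.02, the proposal would be adopted. */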
478 
 479  void print_new_partition(){
 480  // Print new partition
481  for(int i = 0; i<proposed_partition.size()-1; i++){
482  int nnodes_for_proc = proposed_partition(i+1) - proposed_partition(i);
483  printf("process %d : nnodes = %d; range = (%d - %d)\n", i, nnodes_for_proc, proposed_partition(i), proposed_partition(i+1)-1);
484  }
485  }
486 
487  void update_model(const View<int*,CLayout,HostType>& current_partition){
488  for(int i=0; i<regions.size(); i++){
489  regions[i].update_model(current_partition);
490  }
491  }
492 
493  // Overload if providing timings manually rather than using camtimers. This is used for testing.
494  void update_model(const View<int*,CLayout,HostType>& current_partition, const std::vector<double>& manual_times){
495  for(int i=0; i<regions.size(); i++){
496  regions[i].update_model(current_partition, manual_times[i]);
497  }
498  }
499 
 500  void initialize_model(){
 501  for(int i=0; i<regions.size(); i++){
502  regions[i].initialize_model();
503  }
504  }
505 
 506  bool model_is_initialized(){
 507  bool is_initialized=true;
508  for(int i=0; i<regions.size(); i++){
509  is_initialized = (is_initialized && regions[i].get_model_is_initialized());
510  }
511  return is_initialized;
512  }
513 
514  void propose_new_partition(const Kokkos::View<double*,Kokkos::LayoutRight,HostType>& ptl_count, WeightingAlgorithm weighting_algorithm){
515  auto constraint1 = ptl_count; // Constrain based on particle count (memory)
516  if(weighting_algorithm == WeightingAlgorithm::SingleRegionBalance){
517  if(regions.size()!=1) exit_XGC("Error: Load balancing is currently single-region only\n");
518  one_weight_balance(regions[0].get_estimated_time_per_vertex(), constraint1);
519  }else if(weighting_algorithm == WeightingAlgorithm::ParticleBalance){
520  one_weight_balance(ptl_count, constraint1);
521  }
522 
523  if(verbose){
524  printf("\nNewly PROPOSED partition:\n");
 525  print_new_partition();
 526  }
527  }
528 
 529  void set_new_partition(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
 530  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, WeightingAlgorithm weighting_algorithm){
531  if(weighting_algorithm==WeightingAlgorithm::Fortran){
532  // In the fortran algorithm the new distribution is calculated and set here
533  // Count number of particles belonging to each node
534  Kokkos::View<double**,Kokkos::LayoutRight,HostType> ptl_count = count_ptl_per_node_elec_main_ion(grid, magnetic_field, plasma, sml.electron_on);
535 
536  if(pol_decomp.gvid0_pid_h.size()==2){
537  // Skip set_weights if there is only one rank per plane
538  pol_decomp.gvid0_pid_h(0) = 1;
539  pol_decomp.gvid0_pid_h(1) = pol_decomp.nnodes_on_plane + 1;
540  }else{
541  TIMER("SET_WEIGHTS",
542  set_weights(pol_decomp.gvid0_pid_h.data(), ptl_count.data(), plasma.f0_node_cost.data()) );
543  }
544  }else{
545 #ifdef USE_MPI
546  // Broadcast proposal to all ranks
547  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
548 #endif
549 
550  // Copy proposal to pol_decomp
551  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
552 
553  if(is_rank_zero()){
554  printf("\nNEW PARTITION:\n");
 555  print_new_partition();
 556  }
557  }
558 
559  // Update bounds and device copy of the decomposition array
560  pol_decomp.update_pol_decomp();
561  }
562 
 563  void redistribute_load(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
 564  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp,
565  const View<int*,CLayout,HostType>& old_partition){
566  if (plasma.f0_grid()){
567  // Move f information to correct MPI rank after decomposition update
568  TIMER("F0_REDISTRIBUTE",
569  f0_redistribute(plasma, pol_decomp, grid, magnetic_field, vgrid, old_partition) );
570  }
571 
572  // Shift particles to correct MPI rank after decomposition update
573  TIMER("SHIFT_R",
574  shift_all_species(plasma, grid, magnetic_field, pol_decomp, Shift::NoPhase0) );
575  }
576 
577  bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, double f0_cost){
578  if(reweight_option==ReweightOption::Always){
579  // No need to check if model is an improvement
580  return true;
581  }else{
582  if(weighting_algorithm==WeightingAlgorithm::Fortran){
583  // Calculate load imbalance
584  calculate_load_imbalance(f0_cost);
585 
586  // Assess if load imbalance merits a rebalancing
587  int should_rebalance_int = assess_whether_to_rebalance_load();
588 
589  // Reset fortran cost trackers
 590  reset_cost_trackers();
 591 
592  return (should_rebalance_int==1);
593  }else{
594  // Evaluate the proposed partition on rank 0
595  bool should_rebalance;
596  if(weighting_algorithm==WeightingAlgorithm::ParticleBalance){
 597  // Since ParticleBalance doesn't contain an internal model, the partition can't be evaluated,
 598  // so just always approve it, for now.
 599  // It would probably be better to make the model updatable by something other than time so it
 600  // can still be assessed in this mode.
601  return true;
602  }else{
603  if(is_rank_zero()) should_rebalance = recommend_proposed_partition();
604  // Convert to int because MPI_C_BOOL documentation is confusing
605  int should_rebalance_int = (should_rebalance ? 1 : 0);
606 #ifdef USE_MPI
607  MPI_Bcast(&should_rebalance_int, 1, MPI_INT, 0, comm);
608 #endif
609  return (should_rebalance_int==1);
610  }
611  }
612  }
613  }
614 
615  public:
616 
617  LoadBalance(NLReader::NamelistReader& nlr, const DomainDecomposition<DeviceType>& pol_decomp, bool sync_planes = true)
618  : proposed_partition(NoInit("proposed_partition"), pol_decomp.gvid0_pid_h.layout())
619 #ifdef USE_MPI
620  , comm(sync_planes ? pol_decomp.mpi.comm : pol_decomp.mpi.plane_comm)
621 #endif
622  {
623  double max_mem_redist_gb = 10.0;
624  std::string weighting_algorithm_str = "Fortran";
625  std::string update_method_str = "NoHistory";
626  if(nlr.namelist_present("load_balance_param")){
627  nlr.use_namelist("load_balance_param");
628  max_mem_redist_gb = nlr.get<double>("max_mem_redist_gb", 10.0); // Sets the maximum amount of memory per rank that can be allocated to particles, in gigabytes.
629  weighting_algorithm_str = nlr.get<std::string>("weighting_algorithm", "Fortran"); // Load balancing method. "Fortran" is the default since the newer methods are experimental. "ParticleBalance" does not incorporate timing data, but only tries to equally distribute particles. "SingleRegionBalance" currently tries to balance the push.
630  threshold_to_rebalance = nlr.get<double>("threshold_to_rebalance", 0.02); // How much better the projected timing of a proposed load redistribution must be than the current distribution in order to adopt the new one. e.g. 0.02 sets a threshold of 2% better.
631  verbose = nlr.get<bool>("verbose", false); // Verbose output of the internal load distribution model and load balancing decisions.
632  update_method_str = nlr.get<std::string>("update_method", "NoHistory"); // Methods for updating internal model of load distribution. "NoHistory" uses only the most recent time step, while "ExpHistory" averages the existing model with the new step's timing information. Does not apply when weighting_algorithm is "Fortran".
633  }else{
634  threshold_to_rebalance = 0.02;
635  verbose = false;
636  }
637 
638  // Simple model for memory usage from load balance - just particles
639  double max_n_ptl_on_rank = max_mem_redist_gb*1024*1024*1024/80.0; // ~ 80 B per particle
640 
 641  if(weighting_algorithm_str=="Fortran"){
 642  default_weighting_algorithm = WeightingAlgorithm::Fortran;
 643  }else if(weighting_algorithm_str=="ParticleBalance"){
 644  default_weighting_algorithm = WeightingAlgorithm::ParticleBalance;
 645  }else if(weighting_algorithm_str=="SingleRegionBalance"){
 646  default_weighting_algorithm = WeightingAlgorithm::SingleRegionBalance;
 647  }else{
648  exit_XGC("\nError: weighting_algorithm input not valid.");
649  }
650 
651 #ifdef USE_MPI
652  // If planes are sync'd, use intpl_comm to coordinate them; otherwise, use MPI_COMM_SELF
653  MPI_Comm inter_period_comm = sync_planes ? pol_decomp.mpi.intpl_comm : MPI_COMM_SELF;
654 
655  // Set up load balance regions
657  LoadRegion::UpdateMethod update_method;
658  if(update_method_str=="NoHistory"){
659  update_method = LoadRegion::UpdateMethod::NoHistory;
660  }else if(update_method_str=="ExpHistory"){
661  update_method = LoadRegion::UpdateMethod::ExpHistory;
662  }else{
663  exit_XGC("\nError: update_method input not valid.");
664  }
665 
666  regions.push_back(LoadRegion(pol_decomp.nnodes_on_plane, inter_period_comm, pol_decomp.mpi.plane_comm, update_method, verbose,
667  "Push",
668  {"ipc1:PUSHE",
669  "ipc1:PUSHI",
670  "ipc2:PUSHE",
671  "ipc2:PUSHI"}));
672  }
673 
674  // Hard-code for now
 675  ConstraintOption constraint_option = ConstraintOption::ParticleCount;
 676 
677  if(constraint_option == ConstraintOption::ParticleCount){
678  // Normalize by n_planes since we are using the sum of planes as the constraint
679  int n_periods;
680  MPI_Comm_size(inter_period_comm, &n_periods);
681  constraint1_max = max_n_ptl_on_rank*n_periods;
682  }else{
 683  exit_XGC("\nError: ParticleCount is the only available ConstraintOption.\n");
684  }
685 #endif
686  }
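 /* Example input block (hypothetical values; the parameter names match the nlr.get calls in the
  * constructor above, and the defaults shown there apply when a parameter is omitted):
  *
  *   &load_balance_param
  *     weighting_algorithm='SingleRegionBalance'
  *     update_method='ExpHistory'
  *     threshold_to_rebalance=0.05
  *     max_mem_redist_gb=20.0
  *     verbose=.true.
  *   /
  */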
687 
 688  void rebalance(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
 689  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, ReweightOption reweight_option,
690  WeightingAlgorithm weighting_algorithm = WeightingAlgorithm::Default){
691 
 692  // Skip rebalancing if poloidal decomposition is turned off. Normally the function body would go in
 693  // an if statement rather than using an early return; the return is kept here in case we ever want
 694  // some balancing even without poloidal decomposition
695  if(pol_decomp.pol_decomp==false) return;
696 
697  // Rebalance is trivial if there is only one rank on a plane (generalize this from no-MPI case)
698 #ifdef USE_MPI
699 
700  if(weighting_algorithm == WeightingAlgorithm::Default){
701  weighting_algorithm = default_weighting_algorithm;
702  }
703 
704  if(default_weighting_algorithm==WeightingAlgorithm::Fortran){
705  // Override input algorithm; if the default is Fortran, it should always use the fortran load balancer
706  weighting_algorithm = default_weighting_algorithm;
707  }
708 
709  if(is_rank_zero() && verbose){
710  if(weighting_algorithm==WeightingAlgorithm::Fortran) printf("\nLoad balance called with Fortran algorithm\n");
711  else if(weighting_algorithm==WeightingAlgorithm::ParticleBalance) printf("\nLoad balance called with Ptl algorithm\n");
712  else if(weighting_algorithm==WeightingAlgorithm::SingleRegionBalance) printf("\nLoad balance called with SingleRegion algorithm\n");
713  }
714 
715  GPTLstart("REBALANCE");
716  if(weighting_algorithm!=WeightingAlgorithm::Fortran){
717  GPTLstart("PTL_COUNT");
 718  plasma.for_all_nonadiabatic_species([&](int idx, Species<DeviceType>& species){
 719  long long int total_n_ptl = species.get_total_n_ptl();
720  int max_n_ptl = species.get_max_n_ptl();
721  double avg_n_ptl = (double)(total_n_ptl)/pol_decomp.mpi.nranks;
722  double max_ratio = max_n_ptl/avg_n_ptl;
723  if(is_rank_zero()) printf(" Species %d: (max/avg ptl per rank = %1.2f); total n_ptl = %lld\n", species.idx, max_ratio, total_n_ptl);
 724  });
 725  GPTLstop("PTL_COUNT");
726 
727  GPTLstart("PTL_COUNT_PER_NODE");
728  auto ptl_count = count_all_ptl_per_node(grid, magnetic_field, plasma);
729  GPTLstop("PTL_COUNT_PER_NODE");
730 
731  if(weighting_algorithm!=WeightingAlgorithm::ParticleBalance){
732  if(model_is_initialized()){
733  // Update the model with the latest timing
734  TIMER("LOAD_BAL_UPDATE",
735  update_model(pol_decomp.gvid0_pid_h) );
736  }else{
737  if(is_rank_zero() && verbose) printf("Initializing timing model, no partition proposal yet.");
738  initialize_model();
739  GPTLstop("REBALANCE");
740  return;
741  }
742  }
743 
 744  // With the updated model, propose a new partition
745  GPTLstart("LOAD_BAL_NEW_PART");
746  if (is_rank_zero()) propose_new_partition(ptl_count, weighting_algorithm);
747  GPTLstop("LOAD_BAL_NEW_PART");
748  }
749 
750  GPTLstart("LOAD_BAL_REBAL");
751  double f0_cost = plasma.f0_grid() ? sum_view(plasma.f0_node_cost) : 0.0;
752  if(will_rebalance(reweight_option, weighting_algorithm, f0_cost)){
753  // Save the old partition, it is used to save some communication in the f0 redistribution
754  View<int*,CLayout,HostType> old_partition(NoInit("old_partition"), pol_decomp.gvid0_pid_h.layout());
755  Kokkos::deep_copy(old_partition, pol_decomp.gvid0_pid_h);
756 
757  // Update pol_decomp
758  TIMER("LOAD_BAL_SET_NEW",
759  set_new_partition(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, weighting_algorithm) );
760 
761  // Update particles and f0
762  TIMER("LOAD_BAL_REDIST",
763  redistribute_load(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, old_partition) );
764  }
765  GPTLstop("LOAD_BAL_REBAL");
766  GPTLstop("REBALANCE");
767 #endif
768  }
769 
770 
771  // More generic rebalance
772  void rebalance(DomainDecomposition<DeviceType>& pol_decomp, const View<double*,CLayout,HostType>& constraint, const std::vector<double>& timings, double& load_imbalance, View<double*,HostType>& model_belief){
773 #ifdef USE_MPI
774  update_model(pol_decomp.gvid0_pid_h, timings);
775 
776  load_imbalance = regions[0].get_observed_load_imbalance();
777  model_belief = regions[0].get_estimated_time_per_vertex();
778 
779  // Propose new partition
780  if (is_rank_zero()) propose_new_partition(constraint, default_weighting_algorithm);
781 
782  double f0_cost = 0.0;
783  if(will_rebalance(ReweightOption::IfBetter, default_weighting_algorithm, f0_cost)){
784  // Broadcast proposal to all ranks
785  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
786 
787  // Copy proposal to pol_decomp
788  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
789 
790  // Update bounds and device copy of the decomposition array
791  pol_decomp.update_pol_decomp();
792 
793  // If there were a real load, redistribute it here
794  }
795 #endif
796  }
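 // Example use of this overload (a sketch with hypothetical inputs; 'nlr', 'pol_decomp' and
 // 'constraint' are assumed to already exist, and one timing is supplied per LoadRegion):
 //
 //   LoadBalance load_balance(nlr, pol_decomp);
 //   double load_imbalance;
 //   View<double*,HostType> model_belief;
 //   std::vector<double> timings = {0.7}; // seconds spent in the single "Push" region
 //   load_balance.rebalance(pol_decomp, constraint, timings, load_imbalance, model_belief);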
797 };
798 
799 #endif
bool model_has_history
Definition: load_balance.hpp:30
View< double *, HostType > estimated_time_per_vertex
Definition: load_balance.hpp:25
static int GPTLstart(const char *name)
Definition: timer_macro.hpp:9
bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, double f0_cost)
Definition: load_balance.hpp:577
bool is_rank_zero()
Definition: globals.hpp:27
void one_weight_balance(const View< double *, HostType > &weight, const View< double *, CLayout, HostType > constraint1)
Definition: load_balance.hpp:388
double threshold_to_rebalance
Definition: load_balance.hpp:300
T get(const string &param, const T default_val, int val_ind=0)
Definition: NamelistReader.hpp:373
double time_accumulated
Definition: load_balance.hpp:44
bool f0_grid() const
Definition: plasma.hpp:202
double predicted_max_region_time
Definition: load_balance.hpp:31
double observed_max_region_time
Definition: load_balance.hpp:32
double get_largest_predicted_time(const View< int *, CLayout, HostType > &partition, const View< double *, HostType > &weight) const
Definition: load_balance.hpp:372
Definition: velocity_grid.hpp:8
Definition: sml.hpp:8
void update_model_exp_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:62
int n_unique_ranks
How many ranks are in a 'period' (in tokamaks, in a plane)
Definition: load_balance.hpp:41
LoadBalance(NLReader::NamelistReader &nlr, const DomainDecomposition< DeviceType > &pol_decomp, bool sync_planes=true)
Definition: load_balance.hpp:617
subroutine plasma(grid, itr, p, dene_out, deni_out, Te_out, Ti_out, Vparai_out)
Calculate the plasma density, temperature, and parallel velocity for a point in triangle itr using pl...
Definition: neutral_totalf.F90:1224
void rebalance(DomainDecomposition< DeviceType > &pol_decomp, const View< double *, CLayout, HostType > &constraint, const std::vector< double > &timings, double &load_imbalance, View< double *, HostType > &model_belief)
Definition: load_balance.hpp:772
Definition: plasma.hpp:107
Definition: NamelistReader.hpp:193
Definition: magnetic_field.hpp:12
void propose_new_partition(const Kokkos::View< double *, Kokkos::LayoutRight, HostType > &ptl_count, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:514
int idx
Index in all_species.
Definition: species.hpp:78
bool verbose
Definition: load_balance.hpp:301
void for_all_nonadiabatic_species(F func, DevicePtlOpt device_ptl_opt=UseDevicePtl)
Definition: plasma.hpp:126
double prediction_undershoot
Definition: load_balance.hpp:34
double get_even_division(const View< double *, HostType > &input, int n) const
Definition: load_balance.hpp:305
Kokkos::View< double **, Kokkos::LayoutRight, HostType > count_ptl_per_node_elec_main_ion(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma, bool kinetic_electrons)
Definition: count_ptl_per_node.cpp:29
double observed_load_imbalance
Definition: load_balance.hpp:33
void update_pol_decomp()
Definition: domain_decomposition.tpp:209
void update_model(const View< int *, CLayout, HostType > &current_partition)
Definition: load_balance.hpp:487
long long int get_total_n_ptl()
Definition: species.hpp:693
void f0_redistribute(Plasma &plasma, const DomainDecomposition< DeviceType > &pol_decomp, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, const View< int *, CLayout, HostType > &old_partition)
Definition: f0_redistribute.cpp:182
void set_new_partition(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:529
#define TIMER(N, F)
Definition: timer_macro.hpp:24
void rebalance(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm=WeightingAlgorithm::Default)
Definition: load_balance.hpp:688
void use_namelist(const string &namelist)
Definition: NamelistReader.hpp:355
ConstraintOption
Definition: load_balance.hpp:279
void update_model(const View< int *, CLayout, HostType > &current_partition, const std::vector< double > &manual_times)
Definition: load_balance.hpp:494
double get_estimated_time_per_vertex(int i) const
Definition: load_balance.hpp:157
int my_period_rank
Definition: load_balance.hpp:42
int nnodes_on_plane
Number of nodes on local plane.
Definition: domain_decomposition.hpp:53
void calculate_load_imbalance(double f0_cost)
void initialize_model()
Definition: load_balance.hpp:198
WeightingAlgorithm default_weighting_algorithm
Definition: load_balance.hpp:290
View< int *, CLayout, HostType > proposed_partition
Which processors get which vertices.
Definition: load_balance.hpp:294
double get_time_since_previous_call()
Definition: load_balance.hpp:92
bool get_model_is_initialized() const
Definition: load_balance.hpp:177
UpdateMethod
Definition: load_balance.hpp:18
std::string region_name
Definition: load_balance.hpp:26
bool verbose
Definition: load_balance.hpp:28
std::vector< LoadRegion > regions
Definition: load_balance.hpp:292
bool namelist_present(const string &namelist)
Definition: NamelistReader.hpp:351
UpdateMethod update_method
Definition: load_balance.hpp:46
bool model_is_initialized
Definition: load_balance.hpp:29
bool electron_on
Use kinetic electrons.
Definition: sml.hpp:63
void reset_cost_trackers()
void redistribute_load(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, const View< int *, CLayout, HostType > &old_partition)
Definition: load_balance.hpp:563
double get_largest_predicted_time(const View< int *, CLayout, HostType > &proposed_partition) const
Definition: load_balance.hpp:182
void initialize_model()
Definition: load_balance.hpp:500
T::value_type sum_view(const T &view)
Definition: view_arithmetic.hpp:103
bool recommend_proposed_partition()
Definition: load_balance.hpp:449
void exit_XGC(std::string msg)
Definition: globals.hpp:37
bool greedily_fill_partition(const View< double *, HostType > &weight, const View< double *, HostType > &constraint1, double target_weight_per_rank)
Definition: load_balance.hpp:315
int n_periods
How many repeating periods there are; in tokamaks this is planes.
Definition: load_balance.hpp:40
Definition: magnetic_field.F90:1
Definition: load_balance.hpp:16
void print_new_partition()
Definition: load_balance.hpp:479
WeightingAlgorithm
Definition: load_balance.hpp:272
int assess_whether_to_rebalance_load()
View< double *, HostType > get_estimated_time_per_vertex() const
Definition: load_balance.hpp:161
Definition: plasma.hpp:13
double constraint1_max
Definition: load_balance.hpp:303
void set_weights(int *gvid0_pid, double *ptl_count, double *f0_node_cost)
View< double *, CLayout, HostType > count_all_ptl_per_node(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma)
Definition: count_ptl_per_node.cpp:71
ReweightOption
Definition: load_balance.hpp:283
View< double *, CLayout, HostType > f0_node_cost
Definition: plasma.hpp:36
void shift_all_species(Plasma &plasma, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const DomainDecomposition< DeviceType > &pol_decomp, Shift::ShiftPh0 shift_ph0)
Definition: shift.cpp:301
void reset_timer()
Definition: load_balance.hpp:152
Definition: species.hpp:75
void update_model(const View< int *, CLayout, HostType > &current_partition, double manual_time=-1.0)
Definition: load_balance.hpp:205
bool model_is_initialized()
Definition: load_balance.hpp:506
std::vector< std::string > timer_names
Definition: load_balance.hpp:27
Definition: shift.hpp:10
bool pol_decomp
Use poloidal decomposition.
Definition: domain_decomposition.hpp:46
double get_observed_max_region_time() const
Definition: load_balance.hpp:169
Definition: load_balance.hpp:270
void touch_timers()
Definition: load_balance.hpp:116
Kokkos::ViewAllocateWithoutInitializing NoInit
Definition: space_settings.hpp:69
void update_model_no_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:48
double get_prediction_undershoot() const
Definition: load_balance.hpp:165
double get_observed_load_imbalance() const
Definition: load_balance.hpp:173
int get_max_n_ptl()
Definition: species.hpp:704
static int GPTLstop(const char *name)
Definition: timer_macro.hpp:10
Kokkos::View< int *, Kokkos::LayoutRight, HostType > gvid0_pid_h
Which processors get which vertices (host)
Definition: domain_decomposition.hpp:63