XGCa
load_balance.hpp
1 #ifndef LOAD_BALANCE_HPP
2 #define LOAD_BALANCE_HPP
3 
4 #include "timer_macro.hpp"
5 #include "shift.hpp"
6 #include "count_ptl_per_node.hpp"
7 #include "view_arithmetic.hpp"
8 #include "f0_redistribute.hpp"
9 
10 // Used for original fortran load balancer
11 extern "C" void calculate_load_imbalance(int istep, int gstep, double f0_cost);
12 extern "C" int assess_whether_to_rebalance_load(int istep);
13 extern "C" void reset_cost_trackers();
14 extern "C" void set_weights(int* gvid0_pid, double* ptl_count, double* f0_node_cost);
15 
16 class LoadRegion{
17  public:
18  enum class UpdateMethod{
19  NoHistory=0,
20  ExpHistory
21  };
22 
23  private:
24 
25  View<double*,HostType> estimated_time_per_vertex;
26  std::string region_name;
27  std::vector<std::string> timer_names;
28  bool verbose;
29  bool model_is_initialized;
30  bool model_has_history;
31  double predicted_max_region_time;
32  double observed_max_region_time;
33  double observed_load_imbalance;
34  double prediction_undershoot;
35 
36 #ifdef USE_MPI
37  MPI_Comm inter_period_comm;
38  MPI_Comm period_comm;
39 #endif
40  int n_periods;
41  int n_unique_ranks;
42  int my_period_rank;
43 
44  double time_accumulated;
45 
46  UpdateMethod update_method;
47 
48  void update_model_no_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
49  // Loop over ranks' timers
50  for(int i=0; i<all_periods_timings.size(); i++){
51  int node_offset = current_partition(i) - 1;
52  int next_node_offset = current_partition(i+1) - 1;
53  int nnodes = next_node_offset - node_offset;
54  // Best 0th order estimate is to assume all vertices took the same time
55  double time_per_node = all_periods_timings(i)/nnodes;
56  for(int j=0; j<nnodes; j++){
57  estimated_time_per_vertex(j + node_offset) = time_per_node;
58  }
59  }
60  }
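For example, under this no-history rule a rank that reports 6 ms for the 3 vertices it owns contributes an estimate of 2 ms to each of those vertices, overwriting whatever was estimated before.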
61 
62  void update_model_exp_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
63  // Loop over ranks' timers
64  for(int i=0; i<all_periods_timings.size(); i++){
65  // Get partition owned by this rank
66  int node_offset = current_partition(i) - 1;
67  int next_node_offset = current_partition(i+1) - 1;
68  int nnodes = next_node_offset - node_offset;
69 
70  // Get average time per node for this rank
71  double avg_time_per_node = all_periods_timings(i)/nnodes;
72 
73  // Get expected time based on existing model
74  double expected_time = 0.0;
75  for(int j=0; j<nnodes; j++){
76  expected_time += estimated_time_per_vertex(j + node_offset);
77  }
78  double expected_time_per_node = expected_time/nnodes;
79 
80  // Difference between observed time and expected time
81  double extra_time_per_node = avg_time_per_node - expected_time_per_node;
82 
83  for(int j=0; j<nnodes; j++){
84  // Distribute extra time evenly
85  constexpr double adjustment_rate = 0.5;
86 
87  estimated_time_per_vertex(j + node_offset) += extra_time_per_node*adjustment_rate;
88  }
89  }
90  }
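As a worked illustration of the exponential-history rule above (toy numbers, not from a run): a rank owning 4 vertices, each currently estimated at 2.0 ms, reports 12.0 ms in total; the observed average is then 3.0 ms per vertex, the expected average is 2.0 ms per vertex, and with adjustment_rate = 0.5 each of the 4 estimates is nudged up by 0.5 ms to 2.5 ms, so repeated observations converge geometrically toward the observed cost.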
91 
92  double get_time_since_previous_call(){
93  // Get time from camtimers (accumulated)
94  double new_accumulated_time = 0.0;
95 
96  // Loop through all timers associated with this region, and add the accumulated time for each
97  for(int i=0; i<timer_names.size(); i++){
98  double accumulated_single_timer = 0;
99  int ierr = GPTLget_wallclock(timer_names[i].c_str(), -1, &accumulated_single_timer);
100 
101  // If ierr != 0, timer is not yet defined, so the accumulated time is zero
102  if(ierr != 0) accumulated_single_timer = 0.0;
103 
104  new_accumulated_time += accumulated_single_timer;
105  }
106 
107  // Subtract previous accumulated time to get time spent since last update
108  double time_spent = new_accumulated_time - time_accumulated;
109 
110  // Save new accumulated time
111  time_accumulated = new_accumulated_time;
112 
113  return time_spent;
114  }
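The accumulated-minus-previous pattern above can be reproduced in isolation; a minimal sketch (it assumes GPTL has already been initialized, reuses one timer name from the region list further down, and keeps 'previous' as a double owned by the caller):

double accumulated = 0.0;
int ierr = GPTLget_wallclock("ipc1:PUSHE", -1, &accumulated); // total wallclock accumulated so far
if(ierr != 0) accumulated = 0.0;                              // timer not defined yet, so nothing accumulated
double interval = accumulated - previous;                     // time spent since the previous check
previous = accumulated;                                       // remember for the next call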
115 
116  void touch_timers(){
117  for(int i=0; i<timer_names.size(); i++){
118  GPTLstart(timer_names[i].c_str());
119  GPTLstop(timer_names[i].c_str());
120  }
121  }
122 
123  public:
124 
125 #ifdef USE_MPI
126  LoadRegion(int n_vertices, const MPI_Comm& inter_period_comm, const MPI_Comm& period_comm, UpdateMethod update_method, bool verbose, std::string region_name, std::vector<std::string> timer_names)
128  verbose(verbose),
129  time_accumulated(0.0),
130  inter_period_comm(inter_period_comm),
131  period_comm(period_comm),
132  estimated_time_per_vertex("estimated_time_per_vertex",n_vertices),
138  {
139  // Touch GPTL timers to be sure they exist
140  touch_timers();
141 
142  // Initialize timer
143  reset_timer();
144 
145  // Get problem size from comms
146  MPI_Comm_rank(period_comm, &my_period_rank);
147  MPI_Comm_size(period_comm, &n_unique_ranks);
148  MPI_Comm_size(inter_period_comm, &n_periods);
149  }
150 #endif
151 
152  void reset_timer(){
153  // Reset timer by getting the time since the previous call and suppressing the output
154  double time_spent = get_time_since_previous_call();
155  }
156 
157  double get_estimated_time_per_vertex(int i) const{
158  return estimated_time_per_vertex(i);
159  }
160 
161  View<double*,HostType> get_estimated_time_per_vertex() const{
162  return estimated_time_per_vertex;
163  }
164 
165  double get_prediction_undershoot() const{
166  return prediction_undershoot;
167  }
168 
169  double get_observed_max_region_time() const{
170  return observed_max_region_time;
171  }
172 
173  double get_observed_load_imbalance() const{
174  return observed_load_imbalance;
175  }
176 
177  bool get_model_is_initialized() const{
178  return model_is_initialized;
179  }
180 
181  // Predict performance of new partition based on model
182  double get_largest_predicted_time(const View<int*,CLayout,HostType>& proposed_partition) const{
183  double largest_time = 0.0;
184  //int largest_time_ind = 0;
185  for(int i=0; i<(proposed_partition.size()-1); i++){
186  double proc_time = 0.0;
187  for(int i_node=proposed_partition(i)-1; i_node<proposed_partition(i+1)-1; i_node++){
188  proc_time += estimated_time_per_vertex(i_node);
189  }
190  if(proc_time>largest_time){
191  //largest_time_ind = i;
192  largest_time = proc_time;
193  }
194  }
195  return largest_time;
196  }
197 
198  void initialize_model(){
199  reset_timer();
200 
201  // Model can now be used
202  model_is_initialized = true;
203  }
204 
205  void update_model(const View<int*,CLayout,HostType>& current_partition, double manual_time=-1.0){
206  if(update_method==UpdateMethod::NoHistory || update_method==UpdateMethod::ExpHistory){
207 
208  // Allocate for the timing of each rank in the period
209  View<double*, HostType> all_periods_timings(NoInit("all_periods_timings"), n_unique_ranks);
210 
211 #ifdef USE_MPI
212  // Get time spent since last model update
213  // Time can be entered manually; it's used if positive (i.e. the input was provided)
214  double time_spent_this_rank = (manual_time<0.0 ? get_time_since_previous_call()
215  : manual_time);
216 
217  // Reduce onto plane 0
218  // Could try MPI_SUM rather than MPI_MAX
219  double time_spent;
220  if(n_periods>1){
221  MPI_Reduce(&time_spent_this_rank, &time_spent, 1, MPI_DOUBLE, MPI_MAX, 0, inter_period_comm);
222  }else{
223  time_spent = time_spent_this_rank;
224  }
225 
226  // Gather from all ranks in plane 0
227  MPI_Gather(&time_spent, 1, MPI_DOUBLE, all_periods_timings.data(), 1, MPI_DOUBLE, 0, period_comm);
228 #endif
229 
230  if (is_rank_zero()){
231  // Look at the timing prediction made last time
233  if(verbose) printf("\nPredicted max time in this region was %1.5e", predicted_max_region_time);
234 
235  // Get max region time
236  observed_max_region_time = 0.0;
237  double observed_sum_region_time = 0.0;
238  for(int i=0; i<all_periods_timings.size(); i++){
239  observed_max_region_time = std::max(observed_max_region_time, all_periods_timings(i));
240  observed_sum_region_time += all_periods_timings(i);
241  }
242  double observed_idle_region_time = observed_max_region_time*all_periods_timings.size() - observed_sum_region_time;
243  observed_load_imbalance = observed_idle_region_time/observed_sum_region_time;
244  if(verbose) printf("\nObserved max time in this region was %1.5e", observed_max_region_time);
245  if(verbose) printf("\n - Load imbalance (T_idle/T_work): %1.2f%%", observed_load_imbalance*100);
246 
247  // Take the max here so that there is never an assumed overshoot
248  if(predicted_max_region_time!=0.0){ // If time is zero, there hasn't been a prediction yet
249  prediction_undershoot = std::max(1.0, observed_max_region_time/predicted_max_region_time);
250  }
251  if(verbose) printf("\nThe prediction undershot by a factor of %1.5e", observed_max_region_time/predicted_max_region_time);
252 
253  if(update_method==UpdateMethod::NoHistory){
254  update_model_no_history(current_partition, all_periods_timings);
255  }else if(update_method==UpdateMethod::ExpHistory){
256  if(model_has_history){
257  update_model_exp_history(current_partition, all_periods_timings);
258  }else{
259  update_model_no_history(current_partition, all_periods_timings);
260  model_has_history = true;
261  }
262  }
263  }
264  }else{
265  exit_XGC("Error: Update method not available\n");
266  }
267  }
268 };
269 
270  class LoadBalance{
271  public:
272  enum class WeightingAlgorithm{
273  SingleRegionBalance=0,
274  ParticleBalance,
275  Fortran,
276  Default
277  };
278 
279  enum class ConstraintOption{
280  ParticleCount=0
281  };
282 
283  enum class ReweightOption{
284  Always=0,
285  IfBetter
286  };
287 
288  private:
289 
290  WeightingAlgorithm default_weighting_algorithm;
291 
292  std::vector<LoadRegion> regions;
293 
294  View<int*,CLayout,HostType> proposed_partition;
295 
296 #ifdef USE_MPI
297  MPI_Comm comm;
298 #endif
299 
300  double threshold_to_rebalance;
301  bool verbose;
302 
303  double constraint1_max;
304 
305  double get_even_division(const View<double*,HostType>& input, int n) const{
306  double total = 0.0;
307  Kokkos::parallel_reduce("sum", Kokkos::RangePolicy<HostExSpace>(0,input.size()), [=]( const int i, double& l_total){
308  l_total += input(i);
309  }, total);
310 
311  return (total/n);
312  }
313 
314 
315  bool greedily_fill_partition(const View<double*,HostType>& weight, const View<double*,HostType>& constraint1, double target_weight_per_rank){
316  int nnode = weight.size();
317  int nproc = proposed_partition.size()-1;
318 
319  // Start from rank 0 of the plane; assign nodes until it meets the desired weight per rank, then move to the next rank
320  double constraint1_in_this_rank = 0.0;
321  double weight_in_this_rank = 0.0;
322  proposed_partition(0) = 1; // 1-indexed
323  int pid = 0;
324  bool assign_one_node_per_proc = false;
325  for(int i_node = 0; i_node<nnode; i_node++){
326  // Since every rank needs at least one node, switch to assigning one node per rank at every
327  // iteration once there are only as many nodes left as ranks
328  if(nnode-i_node==nproc-pid) assign_one_node_per_proc = true;
329 
330  // Check if a criterion is met to consider the rank sufficiently loaded
331  bool rank_is_loaded = false;
332 
333  // Criterion 1: We are out of nodes
334  if(assign_one_node_per_proc) rank_is_loaded = true;
335 
336  // Criterion 2: We have hit a constraint
337  if(constraint1_in_this_rank>=constraint1_max) rank_is_loaded = true;
338 
339  // Criterion 3: We have loaded the rank with an even amount of work
340  if(weight_in_this_rank>=target_weight_per_rank) rank_is_loaded = true;
341 
342  // If the rank is sufficiently loaded, close it out and move to the next rank
343  if(rank_is_loaded){
344  proposed_partition(pid+1) = i_node+1; // 1-indexed
345  if(verbose) printf("\nRank %d is loaded (Nodes %d-%d); weight=%1.4e, ptl*nplanes=%d", pid, proposed_partition(pid), proposed_partition(pid+1)-1, weight_in_this_rank, int(constraint1_in_this_rank));
346  constraint1_in_this_rank = 0.0;
347  weight_in_this_rank = 0.0;
348  pid++;
349  if(pid==nproc-1) break;
350  }
351  constraint1_in_this_rank += constraint1(i_node);
352  weight_in_this_rank += weight(i_node);
353  }
354 
355  // Last value will always be nnode+1
356  proposed_partition(nproc) = nnode+1;
357 
358  // Check n_ptl in final rank
359  constraint1_in_this_rank = 0.0;
360  for(int i_node=proposed_partition(nproc-1)-1; i_node<nnode; i_node++){
361  constraint1_in_this_rank += constraint1(i_node);
362  }
363  if(constraint1_in_this_rank>=constraint1_max){
364  // Return false if the constraint was not met
365  return false;
366  }else{
367  return true;
368  }
369  }
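For intuition, here is a freestanding toy version of the greedy fill (plain std::vector instead of Kokkos Views, no particle constraint, no one-node-per-remaining-rank guard, and it closes a rank after rather than before adding the vertex that crosses the target; an illustrative sketch, not part of this header):

#include <cstdio>
#include <vector>

// Assign consecutive vertices to ranks until each rank reaches the target weight;
// the last rank takes whatever remains. Entries are 1-indexed like gvid0_pid.
std::vector<int> toy_greedy_fill(const std::vector<double>& weight, int nproc, double target){
  std::vector<int> partition(nproc+1, 1);
  int pid = 0;
  double w = 0.0;
  for(int i = 0; i < (int)weight.size() && pid < nproc-1; i++){
    w += weight[i];
    if(w >= target){            // rank pid is loaded; the next rank starts at vertex i+2 (1-indexed)
      partition[pid+1] = i + 2;
      w = 0.0;
      pid++;
    }
  }
  partition[nproc] = (int)weight.size() + 1;
  return partition;
}

int main(){
  std::vector<double> weight = {1, 1, 4, 1, 1, 1, 1, 2};      // 12 units of work on 8 vertices
  std::vector<int> p = toy_greedy_fill(weight, 3, 12.0/3.0);  // target 4 units per rank
  for(int r = 0; r < 3; r++) std::printf("rank %d: vertices %d-%d\n", r, p[r], p[r+1]-1);
  // Prints: rank 0: vertices 1-3, rank 1: vertices 4-7, rank 2: vertices 8-8
  return 0;
}

In the real method, a false return value signals that the final rank violated the particle constraint, which is what drives the search in one_weight_balance below.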
370 
371  // Predict performance of new partition based on model
372  double get_largest_predicted_time(const View<int*,CLayout,HostType>& partition, const View<double*,HostType>& weight) const{
373  double largest_time = 0.0;
374  //int largest_time_ind = 0;
375  for(int i=0; i<(partition.size()-1); i++){
376  double proc_time = 0.0;
377  for(int i_node=partition(i)-1; i_node<partition(i+1)-1; i_node++){
378  proc_time += weight(i_node);
379  }
380  if(proc_time>largest_time){
381  //largest_time_ind = i;
382  largest_time = proc_time;
383  }
384  }
385  return largest_time;
386  }
387 
388  void one_weight_balance(const View<double*,HostType>& weight, const View<double*,CLayout,HostType> constraint1){
389  int nproc = proposed_partition.size()-1;
390 
391  // Ideally, each rank would get the same amount of work, i.e. the average
392  double ideal_weight_per_rank = get_even_division(weight, nproc);
393 
394  // Make initial attempt, targeting even distribution of work
395  bool meets_constraints = greedily_fill_partition(weight, constraint1, ideal_weight_per_rank);
396 
397  if(!meets_constraints){
398  // An equal work distribution cannot be assigned due to a constraint.
399  // First, determine the weights if only the constraint were followed
400  meets_constraints = greedily_fill_partition(constraint1, constraint1, get_even_division(constraint1, nproc));
401  if(!meets_constraints) exit_XGC("\nUnexpected issue in load balance: constraint-based partition doesn't satisfy constraints\n");
402 
403  // This partition meets the constraints, but the proposed timing is likely unacceptable
404  double upper_limit_weight_per_rank = get_largest_predicted_time(proposed_partition, weight);
405  if(verbose) printf("The ideal distribution (%1.3e) does not satisfy constraints. This distribution (%1.3e) does.\n", ideal_weight_per_rank, upper_limit_weight_per_rank);
406 
407  // Bisect the difference between the original target time and the upper limit, and see if that satisfies the constraints
408  // Continue until the load imbalance has been minimized to a precision based on the original desired time
409  const double desired_precision = 0.01; // Fraction of imbalance that's worth even checking whether it can be removed
410  double desired_step_size = ideal_weight_per_rank*desired_precision;
411  double step_size = upper_limit_weight_per_rank - ideal_weight_per_rank;
412  double compromise_weight_per_rank = upper_limit_weight_per_rank;
413  if(verbose) printf("\nEmploying a binary search to narrow down on the best option.");
414  while(step_size>desired_step_size){
415  // Halve the size of the step
416  step_size /= 2;
417 
418  if(meets_constraints){
419  // Try reducing the load imbalance since constraints were met
420  compromise_weight_per_rank -= step_size;
421  }else{
422  // Try raising the load imbalance since constraints were broken
423  compromise_weight_per_rank += step_size;
424  }
425 
426  // Get new partition
427  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
428  if(verbose) printf("\n Stepped by %1.3e to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "n't");
429  }
430  // In case the binary search ends at a partition that fails the constraints, step back up until they are met
431  while(!meets_constraints){
432  compromise_weight_per_rank += step_size;
433  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
434  if(verbose) printf("\n Stepped by %1.3e UP to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "n't");
435  }
436  if(verbose) printf("\n");
437  }
438  }
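To make the search concrete (illustrative numbers): if the ideal target is 1.0 and the constraint-only partition implies an upper limit of 2.0, the step starts at 1.0 and the desired step is 0.01*1.0 = 0.01. Each pass halves the step and moves the target down when the trial partition met the constraints or back up when it did not (2.0 -> 1.5 -> 1.25 or 1.75 -> ...); after seven halvings the step drops below 0.01 and the loop stops, leaving a target within roughly 1% of the ideal scale, and the trailing while loop steps back up if the last trial still violated the constraints.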
439 
440  // TODO
441  // Evaluate overall model performance
442  // MAIN_LOOP time vs sum of regions (% coverage)
443  //
444 
445  /* This function assesses the proposed partition and decides whether it is worth switching to.
446  * The simplest formulation is to recommend the new partition if the new partition is predicted to be faster
447  * than the existing one. There is also a factor to account for historical inaccuracy of the previous estimate.
448  * A future option would be to require a certain level of improvement before repartitioning. */
449  bool recommend_proposed_partition(){
450  // Total time spent in regions (assuming that global barriers demarcate them):
451  double observed_max_all_rgns_time = 0.0;
452  for(int i=0; i<regions.size(); i++){
453  observed_max_all_rgns_time += regions[i].get_observed_max_region_time();
454  }
455 
456  // Total predicted time, accounting for observed prediction undershoot for each region
457  double predicted_max_all_rgns_time = 0.0;
458  for(int i=0; i<regions.size(); i++){
459  predicted_max_all_rgns_time += regions[i].get_largest_predicted_time(proposed_partition)*regions[i].get_prediction_undershoot();
460  }
461 
462  double fractional_improvement = 1.0 - predicted_max_all_rgns_time/observed_max_all_rgns_time;
463 
464  if(verbose){
465  printf("\nDetermining whether to adopt the proposed partition:");
466  printf("\n Observed time with current partition was: %1.3e", observed_max_all_rgns_time);
467  printf("\n Predicted time with proposed partition, adjusting for historical undershoot (of Rgn 0: %1.3e): %1.3e", regions[0].get_prediction_undershoot(), predicted_max_all_rgns_time);
468  printf("\n Adopted if Fractional improvement of adjusted prediction (%1.3e) exceeds specified threshold (%1.3e)\n", fractional_improvement, threshold_to_rebalance);
469  }
470 
471  // Recommend the new partition if the predicted improvement over the observed time exceeds the rebalance threshold
472  if(fractional_improvement > threshold_to_rebalance){
473  return true;
474  }else{
475  return false;
476  }
477  }
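A worked example of the decision rule above (illustrative numbers): if the current partition was observed to take 1.00 s summed over regions, the proposed partition's raw prediction is 0.94 s, and the region's historical undershoot factor is 1.03, the adjusted prediction is 0.94*1.03 ≈ 0.968 s, so the fractional improvement is 1 - 0.968/1.00 ≈ 0.032; with the default threshold_to_rebalance of 0.02 the proposal would be adopted.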
478 
479  void print_new_partition(){
480  // Print new partition
481  for(int i = 0; i<proposed_partition.size()-1; i++){
482  int nnodes_for_proc = proposed_partition(i+1) - proposed_partition(i);
483  printf("process %d : nnodes = %d; range = (%d - %d)\n", i, nnodes_for_proc, proposed_partition(i), proposed_partition(i+1)-1);
484  }
485  }
486 
487  void update_model(const View<int*,CLayout,HostType>& current_partition){
488  for(int i=0; i<regions.size(); i++){
489  regions[i].update_model(current_partition);
490  }
491  }
492 
493  // Overload if providing timings manually rather than using camtimers. This is used for testing.
494  void update_model(const View<int*,CLayout,HostType>& current_partition, const std::vector<double>& manual_times){
495  for(int i=0; i<regions.size(); i++){
496  regions[i].update_model(current_partition, manual_times[i]);
497  }
498  }
499 
500  void initialize_model(){
501  for(int i=0; i<regions.size(); i++){
502  regions[i].initialize_model();
503  }
504  }
505 
506  bool model_is_initialized(){
507  bool is_initialized=true;
508  for(int i=0; i<regions.size(); i++){
509  is_initialized = (is_initialized && regions[i].get_model_is_initialized());
510  }
511  return is_initialized;
512  }
513 
514  void propose_new_partition(const Kokkos::View<double*,Kokkos::LayoutRight,HostType>& ptl_count, WeightingAlgorithm weighting_algorithm){
515  auto constraint1 = ptl_count; // Constrain based on particle count (memory)
516  if(weighting_algorithm == WeightingAlgorithm::SingleRegionBalance){
517  if(regions.size()!=1) exit_XGC("Error: Load balancing is currently single-region only\n");
518  one_weight_balance(regions[0].get_estimated_time_per_vertex(), constraint1);
519  }else if(weighting_algorithm == WeightingAlgorithm::ParticleBalance){
520  one_weight_balance(ptl_count, constraint1);
521  }
522 
523  if(verbose){
524  printf("\nNewly PROPOSED partition:\n");
525  print_new_partition();
526  }
527  }
528 
529  void set_new_partition(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
530  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, WeightingAlgorithm weighting_algorithm){
531  if(weighting_algorithm==WeightingAlgorithm::Fortran){
532  // In the fortran algorithm the new distribution is calculated and set here
533  // Count number of particles belonging to each node
534  Kokkos::View<double**,Kokkos::LayoutRight,HostType> ptl_count = count_ptl_per_node_elec_main_ion(grid, magnetic_field, plasma, sml.electron_on);
535 
536  if(pol_decomp.gvid0_pid_h.size()==2){
537  // Skip set_weights if there is only one rank per plane
538  pol_decomp.gvid0_pid_h(0) = 1;
539  pol_decomp.gvid0_pid_h(1) = pol_decomp.nnodes_on_plane + 1;
540  }else{
541 #ifndef NO_FORTRAN_MODULES
542  TIMER("SET_WEIGHTS",
543  set_weights(pol_decomp.gvid0_pid_h.data(), ptl_count.data(), plasma.f0_node_cost.data()) );
544 #endif
545  }
546  }else{
547 #ifdef USE_MPI
548  // Broadcast proposal to all ranks
549  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
550 #endif
551 
552  // Copy proposal to pol_decomp
553  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
554 
555  if(is_rank_zero()){
556  printf("\nNEW PARTITION:\n");
557  print_new_partition();
558  }
559  }
560 
561  // Update bounds and device copy of the decomposition array
562  pol_decomp.update_pol_decomp();
563  GPTLstart("update_flux_surf");
564  pol_decomp.update_flux_surf(grid.flux_surfaces_h);
565  GPTLstop("update_flux_surf");
566  }
567 
568  void redistribute_load(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
569  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp,
570  const View<int*,CLayout,HostType>& old_partition){
571  if (plasma.f0_grid()){
572  // Move f information to correct MPI rank after decomposition update
573  TIMER("F0_REDISTRIBUTE",
574  f0_redistribute(plasma, pol_decomp, grid, magnetic_field, vgrid, old_partition) );
575  }
576 
577  // Shift particles to correct MPI rank after decomposition update
578  TIMER("SHIFT_R",
579  shift_all_species(plasma, grid, magnetic_field, pol_decomp) );
580  }
581 
582  bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, int istep, int gstep, double f0_cost){
583  if(reweight_option==ReweightOption::Always){
584  // No need to check if model is an improvement
585  return true;
586  }else{
587  if(weighting_algorithm==WeightingAlgorithm::Fortran){
588 #ifndef NO_FORTRAN_MODULES
589  // Calculate load imbalance
590  calculate_load_imbalance(istep, gstep, f0_cost);
591 
592  // Assess if load imbalance merits a rebalancing
593  int should_rebalance_int = assess_whether_to_rebalance_load(istep);
594 
595  // Reset fortran cost trackers
596  reset_cost_trackers();
597 
598  return (should_rebalance_int==1);
599 #else
600  return false;
601 #endif
602  }else{
603  // Evaluate the proposed partition on rank 0
604  bool should_rebalance;
605  if(weighting_algorithm==WeightingAlgorithm::ParticleBalance){
606  // Since ParticleBalance doesn't contain an internal model, the partition can't be evaluated,
607  // so just always approve it, for now.
608  // Probably better to have the model updatable by something other than time so it can still
609  // be assessed in this mode
610  return true;
611  }else{
612  if(is_rank_zero()) should_rebalance = recommend_proposed_partition();
613  // Convert to int because MPI_C_BOOL documentation is confusing
614  int should_rebalance_int = (should_rebalance ? 1 : 0);
615 #ifdef USE_MPI
616  MPI_Bcast(&should_rebalance_int, 1, MPI_INT, 0, comm);
617 #endif
618  return (should_rebalance_int==1);
619  }
620  }
621  }
622  }
623 
624  public:
625 
627 
628  LoadBalance(NLReader::NamelistReader& nlr, const DomainDecomposition<DeviceType>& pol_decomp, bool sync_planes = true)
629  : proposed_partition(NoInit("proposed_partition"), pol_decomp.gvid0_pid_h.layout())
630 #ifdef USE_MPI
631  , comm(sync_planes ? pol_decomp.mpi.comm : pol_decomp.mpi.plane_comm)
632 #endif
633  {
634 
635  nlr.use_namelist("load_balance_param", NLReader::NamelistReader::Optional);
636  double max_mem_redist_gb = nlr.get<double>("max_mem_redist_gb", 10.0); // Sets the maximum amount of memory per rank that can be allocated to particles, in gigabytes.
637  std::string weighting_algorithm_str = nlr.get<std::string>("weighting_algorithm", "Fortran"); // Load balancing method. "Fortran" is the default since the newer methods are experimental. "ParticleBalance" does not incorporate timing data, but only tries to equally distribute particles. "SingleRegionBalance" currently tries to balance the push.
638  threshold_to_rebalance = nlr.get<double>("threshold_to_rebalance", 0.02); // How much better the projected timing of a proposed load redistribution must be than the current distribution in order to adopt the new one. e.g. 0.02 sets a threshold of 2% better.
639  verbose = nlr.get<bool>("verbose", false); // Verbose output of the internal load distribution model and load balancing decisions.
640  std::string update_method_str = nlr.get<std::string>("update_method", "NoHistory"); // Methods for updating internal model of load distribution. "NoHistory" uses only the most recent time step, while "ExpHistory" averages the existing model with the new step's timing information. Does not apply when weighting_algorithm is "Fortran".
641 
642  // Simple model for memory usage from load balance - just particles
643  double max_n_ptl_on_rank = max_mem_redist_gb*1024*1024*1024/80.0; // ~ 80 B per particle
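With the default max_mem_redist_gb = 10.0, this works out to 10*1024^3/80 ≈ 1.3e8 particles allowed per rank. For reference, a hypothetical load_balance_param block in the input file matching the parameters read above might look like the following (Fortran-namelist style assumed; the values are illustrative only):

&load_balance_param
  max_mem_redist_gb = 10.0
  weighting_algorithm = 'SingleRegionBalance'
  threshold_to_rebalance = 0.05
  verbose = .true.
  update_method = 'ExpHistory'
/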
644 
645  if(weighting_algorithm_str=="Fortran"){
646  default_weighting_algorithm = WeightingAlgorithm::Fortran;
647  }else if(weighting_algorithm_str=="ParticleBalance"){
648  default_weighting_algorithm = WeightingAlgorithm::ParticleBalance;
649  }else if(weighting_algorithm_str=="SingleRegionBalance"){
650  default_weighting_algorithm = WeightingAlgorithm::SingleRegionBalance;
651  }else{
652  exit_XGC("\nError: weighting_algorithm input not valid.");
653  }
654 
655 #ifdef USE_MPI
656  // If planes are sync'd, use intpl_comm to coordinate them; otherwise, use MPI_COMM_SELF
657  MPI_Comm inter_period_comm = sync_planes ? pol_decomp.mpi.intpl_comm : MPI_COMM_SELF;
658 
659  // Set up load balance regions
661  LoadRegion::UpdateMethod update_method;
662  if(update_method_str=="NoHistory"){
663  update_method = LoadRegion::UpdateMethod::NoHistory;
664  }else if(update_method_str=="ExpHistory"){
665  update_method = LoadRegion::UpdateMethod::ExpHistory;
666  }else{
667  exit_XGC("\nError: update_method input not valid.");
668  }
669 
670  regions.push_back(LoadRegion(pol_decomp.nnodes_on_plane, inter_period_comm, pol_decomp.mpi.plane_comm, update_method, verbose,
671  "Push",
672  {"ipc1:PUSHE",
673  "ipc1:PUSHI",
674  "ipc2:PUSHE",
675  "ipc2:PUSHI"}));
676  }
677 
678  // Hard-code for now
679  ConstraintOption constraint_option = ConstraintOption::ParticleCount;
680 
681  if(constraint_option == ConstraintOption::ParticleCount){
682  // Normalize by n_planes since we are using the sum of planes as the constraint
683  int n_periods;
684  MPI_Comm_size(inter_period_comm, &n_periods);
685  constraint1_max = max_n_ptl_on_rank*n_periods;
686  }else{
687  exit_XGC("\nError: ParticleCount is the only available ConstraintOption.\n");
688  }
689 #endif
690  }
691 
692  void rebalance(const Simulation<DeviceType>& sml, const Grid<DeviceType>& grid, const MagneticField<DeviceType>& magnetic_field,
693  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, int istep, int gstep, ReweightOption reweight_option,
694  WeightingAlgorithm weighting_algorithm = WeightingAlgorithm::Default){
695 
696  // Skip rebalancing if poloidal decomposition is turned off. Normally the call would be wrapped in
697  // an if statement rather than using a return; the early return is kept here in case some balancing
698  // is ever wanted even without poloidal decomposition
699  if(pol_decomp.pol_decomp==false) return;
700 
701  // Rebalance is trivial if there is only one rank on a plane (generalize this from no-MPI case)
702 #ifdef USE_MPI
703 
704  if(weighting_algorithm == WeightingAlgorithm::Default){
705  weighting_algorithm = default_weighting_algorithm;
706  }
707 
708  if(default_weighting_algorithm==WeightingAlgorithm::Fortran){
709  // Override input algorithm; if the default is Fortran, it should always use the fortran load balancer
710  weighting_algorithm = default_weighting_algorithm;
711  }
712 
713  if(is_rank_zero() && verbose){
714  if(weighting_algorithm==WeightingAlgorithm::Fortran) printf("\nLoad balance called with Fortran algorithm\n");
715  else if(weighting_algorithm==WeightingAlgorithm::ParticleBalance) printf("\nLoad balance called with Ptl algorithm\n");
716  else if(weighting_algorithm==WeightingAlgorithm::SingleRegionBalance) printf("\nLoad balance called with SingleRegion algorithm\n");
717  }
718 
719  GPTLstart("REBALANCE");
720  if(weighting_algorithm!=WeightingAlgorithm::Fortran){
721  GPTLstart("PTL_COUNT");
722  plasma.for_all_nonadiabatic_species([&](Species<DeviceType>& species){
723  long long int total_n_ptl = species.get_total_n_ptl();
724  int max_n_ptl = species.get_max_n_ptl();
725  double avg_n_ptl = (double)(total_n_ptl)/pol_decomp.mpi.nranks;
726  double max_ratio = max_n_ptl/avg_n_ptl;
727  if(is_rank_zero()) printf(" Species %d: (max/avg ptl per rank = %1.2f); total n_ptl = %lld\n", species.idx, max_ratio, total_n_ptl);
728  });
729  GPTLstop("PTL_COUNT");
730 
731  GPTLstart("PTL_COUNT_PER_NODE");
732  auto ptl_count = count_all_ptl_per_node(grid, magnetic_field, plasma);
733  GPTLstop("PTL_COUNT_PER_NODE");
734 
735  if(weighting_algorithm!=WeightingAlgorithm::ParticleBalance){
736  if(model_is_initialized()){
737  // Update the model with the latest timing
738  TIMER("LOAD_BAL_UPDATE",
739  update_model(pol_decomp.gvid0_pid_h) );
740  }else{
741  if(is_rank_zero() && verbose) printf("Initializing timing model, no partition proposal yet.");
742  initialize_model();
743  GPTLstop("REBALANCE");
744  return;
745  }
746  }
747 
748  // With the updated model, propose a new partition
749  GPTLstart("LOAD_BAL_NEW_PART");
750  if (is_rank_zero()) propose_new_partition(ptl_count, weighting_algorithm);
751  GPTLstop("LOAD_BAL_NEW_PART");
752  }
753 
754  GPTLstart("LOAD_BAL_REBAL");
755  double f0_cost = plasma.f0_grid() ? sum_view(plasma.f0_node_cost) : 0.0;
756  if(will_rebalance(reweight_option, weighting_algorithm, istep, gstep, f0_cost)){
757  // Save the old partition; it is used to avoid some communication in the f0 redistribution
758  View<int*,CLayout,HostType> old_partition(NoInit("old_partition"), pol_decomp.gvid0_pid_h.layout());
759  Kokkos::deep_copy(old_partition, pol_decomp.gvid0_pid_h);
760 
761  // Update pol_decomp
762  TIMER("LOAD_BAL_SET_NEW",
763  set_new_partition(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, weighting_algorithm) );
764 
765  // Update particles and f0
766  TIMER("LOAD_BAL_REDIST",
767  redistribute_load(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, old_partition) );
768  }
769  GPTLstop("LOAD_BAL_REBAL");
770  GPTLstop("REBALANCE");
771 #endif
772  }
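A hypothetical call site for the routine above (the surrounding driver objects are assumed, not defined in this header) would reassess the partition once per step and adopt a new one only when it is predicted to help:

// Sketch of a per-step call; load_balance is a LoadBalance instance owned by the driver
load_balance.rebalance(sml, grid, magnetic_field, vgrid, plasma, pol_decomp,
                       istep, gstep, LoadBalance::ReweightOption::IfBetter);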
773 
774 
775  // More generic rebalance
776  void rebalance(DomainDecomposition<DeviceType>& pol_decomp, const View<double*,CLayout,HostType>& constraint, const std::vector<double>& timings, double& load_imbalance, View<double*,HostType>& model_belief){
777 #ifdef USE_MPI
778  update_model(pol_decomp.gvid0_pid_h, timings);
779 
780  load_imbalance = regions[0].get_observed_load_imbalance();
781  model_belief = regions[0].get_estimated_time_per_vertex();
782 
783  // Propose new partition
785 
786  double f0_cost = 0.0;
787  int istep = 0;
788  int gstep = 0;
790  // Broadcast proposal to all ranks
791  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
792 
793  // Copy proposal to pol_decomp
794  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
795 
796  // Update bounds and device copy of the decomposition array
797  pol_decomp.update_pol_decomp();
798 
799  // If there were a real load, redistribute it here
800  }
801 #endif
802  }
803 };
804 
805 #endif
int nnodes_on_plane
Number of nodes on local plane.
Definition: domain_decomposition.hpp:85
bool pol_decomp
Use poloidal decomposition.
Definition: domain_decomposition.hpp:78
void update_pol_decomp()
Definition: domain_decomposition.cpp:226
Kokkos::View< int *, Kokkos::LayoutRight, HostType > gvid0_pid_h
Which processors get which vertices (host)
Definition: domain_decomposition.hpp:95
void update_flux_surf(const HostArray< VertexList > &surfaces)
Definition: domain_decomposition.cpp:115
HostArray< VertexList > flux_surfaces_h
Definition: grid.hpp:204
Definition: load_balance.hpp:270
WeightingAlgorithm default_weighting_algorithm
Definition: load_balance.hpp:290
ConstraintOption
Definition: load_balance.hpp:279
double threshold_to_rebalance
Definition: load_balance.hpp:300
bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, int istep, int gstep, double f0_cost)
Definition: load_balance.hpp:582
bool greedily_fill_partition(const View< double *, HostType > &weight, const View< double *, HostType > &constraint1, double target_weight_per_rank)
Definition: load_balance.hpp:315
double get_even_division(const View< double *, HostType > &input, int n) const
Definition: load_balance.hpp:305
double get_largest_predicted_time(const View< int *, CLayout, HostType > &partition, const View< double *, HostType > &weight) const
Definition: load_balance.hpp:372
void print_new_partition()
Definition: load_balance.hpp:479
double constraint1_max
Definition: load_balance.hpp:303
void initialize_model()
Definition: load_balance.hpp:500
void set_new_partition(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:529
void update_model(const View< int *, CLayout, HostType > &current_partition, const std::vector< double > &manual_times)
Definition: load_balance.hpp:494
std::vector< LoadRegion > regions
Definition: load_balance.hpp:292
void update_model(const View< int *, CLayout, HostType > &current_partition)
Definition: load_balance.hpp:487
LoadBalance()
Definition: load_balance.hpp:626
void propose_new_partition(const Kokkos::View< double *, Kokkos::LayoutRight, HostType > &ptl_count, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:514
bool model_is_initialized()
Definition: load_balance.hpp:506
WeightingAlgorithm
Definition: load_balance.hpp:272
void rebalance(DomainDecomposition< DeviceType > &pol_decomp, const View< double *, CLayout, HostType > &constraint, const std::vector< double > &timings, double &load_imbalance, View< double *, HostType > &model_belief)
Definition: load_balance.hpp:776
bool verbose
Definition: load_balance.hpp:301
LoadBalance(NLReader::NamelistReader &nlr, const DomainDecomposition< DeviceType > &pol_decomp, bool sync_planes=true)
Definition: load_balance.hpp:628
ReweightOption
Definition: load_balance.hpp:283
void rebalance(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, int istep, int gstep, ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm=WeightingAlgorithm::Default)
Definition: load_balance.hpp:692
void redistribute_load(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, const View< int *, CLayout, HostType > &old_partition)
Definition: load_balance.hpp:568
void one_weight_balance(const View< double *, HostType > &weight, const View< double *, CLayout, HostType > constraint1)
Definition: load_balance.hpp:388
bool recommend_proposed_partition()
Definition: load_balance.hpp:449
View< int *, CLayout, HostType > proposed_partition
Which processors get which vertices.
Definition: load_balance.hpp:294
Definition: load_balance.hpp:16
View< double *, HostType > get_estimated_time_per_vertex() const
Definition: load_balance.hpp:161
double get_estimated_time_per_vertex(int i) const
Definition: load_balance.hpp:157
double get_largest_predicted_time(const View< int *, CLayout, HostType > &proposed_partition) const
Definition: load_balance.hpp:182
View< double *, HostType > estimated_time_per_vertex
Definition: load_balance.hpp:25
double time_accumulated
Definition: load_balance.hpp:44
double get_time_since_previous_call()
Definition: load_balance.hpp:92
double get_prediction_undershoot() const
Definition: load_balance.hpp:165
void initialize_model()
Definition: load_balance.hpp:198
void update_model_exp_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:62
bool model_has_history
Definition: load_balance.hpp:30
UpdateMethod update_method
Definition: load_balance.hpp:46
bool get_model_is_initialized() const
Definition: load_balance.hpp:177
std::string region_name
Definition: load_balance.hpp:26
std::vector< std::string > timer_names
Definition: load_balance.hpp:27
int n_unique_ranks
How many ranks are in a 'period' (in tokamaks, in a plane)
Definition: load_balance.hpp:41
double prediction_undershoot
Definition: load_balance.hpp:34
bool verbose
Definition: load_balance.hpp:28
bool model_is_initialized
Definition: load_balance.hpp:29
double observed_max_region_time
Definition: load_balance.hpp:32
double observed_load_imbalance
Definition: load_balance.hpp:33
double predicted_max_region_time
Definition: load_balance.hpp:31
void update_model_no_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:48
double get_observed_max_region_time() const
Definition: load_balance.hpp:169
void update_model(const View< int *, CLayout, HostType > &current_partition, double manual_time=-1.0)
Definition: load_balance.hpp:205
int my_period_rank
Definition: load_balance.hpp:42
int n_periods
How many repeating periods there are; in tokamaks this is planes.
Definition: load_balance.hpp:40
void touch_timers()
Definition: load_balance.hpp:116
void reset_timer()
Definition: load_balance.hpp:152
UpdateMethod
Definition: load_balance.hpp:18
double get_observed_load_imbalance() const
Definition: load_balance.hpp:173
Definition: magnetic_field.hpp:12
Definition: NamelistReader.hpp:193
T get(const string &param, const T default_val, int val_ind=0)
Definition: NamelistReader.hpp:386
void use_namelist(const string &namelist, Options required=Required)
Definition: NamelistReader.hpp:360
@ Optional
Definition: NamelistReader.hpp:287
Definition: plasma.hpp:13
@ NoDevicePtl
Definition: plasma.hpp:101
Definition: sml.hpp:9
bool electron_on
Use kinetic electrons.
Definition: sml.hpp:55
Definition: species.hpp:75
int get_max_n_ptl()
Definition: species.hpp:743
int idx
Index in all_species.
Definition: species.hpp:78
long long int get_total_n_ptl()
Definition: species.hpp:732
Kokkos::View< double **, Kokkos::LayoutRight, HostType > count_ptl_per_node_elec_main_ion(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma, bool kinetic_electrons)
Definition: count_ptl_per_node.cpp:29
View< double *, CLayout, HostType > count_all_ptl_per_node(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma)
Definition: count_ptl_per_node.cpp:71
void f0_redistribute(Plasma &plasma, const DomainDecomposition< DeviceType > &pol_decomp, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, const View< int *, CLayout, HostType > &old_partition)
Definition: f0_redistribute.cpp:81
void exit_XGC(std::string msg)
Definition: globals.hpp:37
bool is_rank_zero()
Definition: globals.hpp:27
void reset_cost_trackers()
int assess_whether_to_rebalance_load(int istep)
void calculate_load_imbalance(int istep, int gstep, double f0_cost)
void set_weights(int *gvid0_pid, double *ptl_count, double *f0_node_cost)
Definition: magnetic_field.F90:1
logical false
Definition: module.F90:102
subroutine plasma(grid, itr, p, dene_out, deni_out, Te_out, Ti_out, Vparai_out, ignore_vacuum)
Calculate the plasma density, temperature, and parallel velocity for a point in triangle itr using pl...
Definition: neutral_totalf.F90:1548
void shift_all_species(Plasma &plasma, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const DomainDecomposition< DeviceType > &pol_decomp)
Definition: shift.cpp:383
Kokkos::ViewAllocateWithoutInitializing NoInit
Definition: space_settings.hpp:69
Definition: velocity_grid.hpp:8
#define TIMER(N, F)
Definition: timer_macro.hpp:24
static int GPTLstart(const char *name)
Definition: timer_macro.hpp:9
static int GPTLstop(const char *name)
Definition: timer_macro.hpp:10
T::value_type sum_view(const T &view)
Definition: view_arithmetic.hpp:135