XGC1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
load_balance.hpp
Go to the documentation of this file.
1 #ifndef LOAD_BALANCE_HPP
2 #define LOAD_BALANCE_HPP
3 
4 #include "timer_macro.hpp"
5 #include "shift.hpp"
6 #include "count_ptl_per_node.hpp"
7 #include "view_arithmetic.hpp"
8 #include "f0_redistribute.hpp"
9 
10 // Used for original fortran load balancer
11 extern "C" void calculate_load_imbalance(double f0_cost);
12 extern "C" int assess_whether_to_rebalance_load();
13 extern "C" void reset_cost_trackers();
14 extern "C" void set_weights(int* gvid0_pid, double* ptl_count, double* f0_node_cost);
15 
16 class LoadRegion{
17  public:
18  enum class UpdateMethod{
19  NoHistory=0,
21  };
22 
23  private:
24 
25  View<double*,HostType> estimated_time_per_vertex;
26  std::string region_name;
27  std::vector<std::string> timer_names;
28  bool verbose;
35 
36 #ifdef USE_MPI
37  MPI_Comm inter_period_comm;
38  MPI_Comm period_comm;
39 #endif
40  int n_periods;
43 
45 
47 
48  void update_model_no_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
49  // Loop over ranks' timers
50  for(int i=0; i<all_periods_timings.size(); i++){
51  int node_offset = current_partition(i) - 1;
52  int next_node_offset = current_partition(i+1) - 1;
53  int nnodes = next_node_offset - node_offset;
54  // Best 0th order estimate is to assume all vertices took the same time
55  double time_per_node = all_periods_timings(i)/nnodes;
56  for(int j=0; j<nnodes; j++){
57  estimated_time_per_vertex(j + node_offset) = time_per_node;
58  }
59  }
60  }
61 
62  void update_model_exp_history(const View<int*,CLayout,HostType>& current_partition, const View<double*, HostType>& all_periods_timings){
63  // Loop over ranks' timers
64  for(int i=0; i<all_periods_timings.size(); i++){
65  // Get partition owned by this rank
66  int node_offset = current_partition(i) - 1;
67  int next_node_offset = current_partition(i+1) - 1;
68  int nnodes = next_node_offset - node_offset;
69 
70  // Get average time per node for this rank
71  double avg_time_per_node = all_periods_timings(i)/nnodes;
72 
73  // Get expected time based on existing model
74  double expected_time = 0.0;
75  for(int j=0; j<nnodes; j++){
76  expected_time += estimated_time_per_vertex(j + node_offset);
77  }
78  double expected_time_per_node = expected_time/nnodes;
79 
80  // Difference between observed time and expected time
81  double extra_time_per_node = avg_time_per_node - expected_time_per_node;
82 
83  for(int j=0; j<nnodes; j++){
84  // Distribute extra time evenly
85  constexpr double adjustment_rate = 0.5;
86 
87  estimated_time_per_vertex(j + node_offset) += extra_time_per_node*adjustment_rate;
88  }
89  }
90  }
91 
93  // Get time from camtimers (accumulated)
94  double new_accumulated_time = 0.0;
95 
96  // Loop through all timers associated with this region, and add the accumulated time for each
97  for(int i=0; i<timer_names.size(); i++){
98  double accumulated_single_timer = 0;
99  int ierr = GPTLget_wallclock(timer_names[i].c_str(), -1, &accumulated_single_timer);
100 
101  // If ierr != 0, timer is not yet defined, so the accumulated time is zero
102  if(ierr != 0) accumulated_single_timer = 0.0;
103 
104  new_accumulated_time += accumulated_single_timer;
105  }
106 
107  // Subtract previous accumulated time to get time spent since last update
108  double time_spent = new_accumulated_time - time_accumulated;
109 
110  // Save new accumulated time
111  time_accumulated = new_accumulated_time;
112 
113  return time_spent;
114  }
115 
116  void touch_timers(){
117  for(int i=0; i<timer_names.size(); i++){
118  GPTLstart(timer_names[i].c_str());
119  GPTLstop(timer_names[i].c_str());
120  }
121  }
122 
123  public:
124 
125 #ifdef USE_MPI
126  LoadRegion(int n_vertices, const MPI_Comm& inter_period_comm, const MPI_Comm& period_comm, UpdateMethod update_method, bool verbose, std::string region_name, std::vector<std::string> timer_names)
127  : model_is_initialized(false),
128  verbose(verbose),
129  time_accumulated(0.0),
130  inter_period_comm(inter_period_comm),
131  period_comm(period_comm),
132  estimated_time_per_vertex("estimated_time_per_vertex",n_vertices),
134  update_method(update_method),
135  model_has_history(false),
136  region_name(region_name),
137  timer_names(timer_names)
138  {
139  // Touch GPTL timers to be sure they exist
140  touch_timers();
141 
142  // Initialize timer
143  reset_timer();
144 
145  // Get problem size from comms
146  MPI_Comm_rank(period_comm, &my_period_rank);
147  MPI_Comm_size(period_comm, &n_unique_ranks);
148  MPI_Comm_size(inter_period_comm, &n_periods);
149  }
150 #endif
151 
152  void reset_timer(){
153  // Reset timer by getting the time since the previous call and suppressing the output
154  double time_spent = get_time_since_previous_call();
155  }
156 
157  double get_estimated_time_per_vertex(int i) const{
158  return estimated_time_per_vertex(i);
159  }
160 
161  View<double*,HostType> get_estimated_time_per_vertex() const{
163  }
164 
166  return prediction_undershoot;
167  }
168 
171  }
172 
175  }
176 
178  return model_is_initialized;
179  }
180 
181  // Predict performance of new partition based on model
182  double get_largest_predicted_time(const View<int*,CLayout,HostType>& proposed_partition) const{
183  double largest_time = 0.0;
184  //int largest_time_ind = 0;
185  for(int i=0; i<(proposed_partition.size()-1); i++){
186  double proc_time = 0.0;
187  for(int i_node=proposed_partition(i)-1; i_node<proposed_partition(i+1)-1; i_node++){
188  proc_time += estimated_time_per_vertex(i_node);
189  }
190  if(proc_time>largest_time){
191  //largest_time_ind = i;
192  largest_time = proc_time;
193  }
194  }
195  return largest_time;
196  }
197 
199  reset_timer();
200 
201  // Model can now be used
202  model_is_initialized = true;
203  }
204 
205  void update_model(const View<int*,CLayout,HostType>& current_partition, double manual_time=-1.0){
207 
208  // Allocate for timing of each
209  View<double*, HostType> all_periods_timings(NoInit("all_periods_timings"), n_unique_ranks);
210 
211 #ifdef USE_MPI
212  // Get time spent since last model update
213  // Time can be entered manually; it's used if positive (i.e. the input was provided)
214  double time_spent_this_rank = (manual_time<0.0 ? get_time_since_previous_call()
215  : manual_time);
216 
217  // Reduce onto plane 0
218  // Could try MPI_SUM rather than MPI_MAX
219  double time_spent;
220  if(n_periods>1){
221  MPI_Reduce(&time_spent_this_rank, &time_spent, 1, MPI_DOUBLE, MPI_MAX, 0, inter_period_comm);
222  }else{
223  time_spent = time_spent_this_rank;
224  }
225 
226  // Gather from all ranks in plane 0
227  MPI_Gather(&time_spent, 1, MPI_DOUBLE, all_periods_timings.data(), 1, MPI_DOUBLE, 0, period_comm);
228 #endif
229 
230  if (is_rank_zero()){
231  // Look at the timing prediction made last time
233  if(verbose) printf("\nPredicted max time in this region was %1.5e", predicted_max_region_time);
234 
235  // Get max region time
237  double observed_sum_region_time = 0.0;
238  for(int i=0; i<all_periods_timings.size(); i++){
239  observed_max_region_time = std::max(observed_max_region_time, all_periods_timings(i));
240  observed_sum_region_time += all_periods_timings(i);
241  }
242  double observed_idle_region_time = observed_max_region_time*all_periods_timings.size() - observed_sum_region_time;
243  observed_load_imbalance = observed_idle_region_time/observed_sum_region_time;
244  if(verbose) printf("\nObserved max time in this region was %1.5e", observed_max_region_time);
245  if(verbose) printf("\n - Load imbalance (T_idle/T_work): %1.2f%%", observed_load_imbalance*100);
246 
247  // Take the max here so that there is never an assumed overshoot
248  if(predicted_max_region_time!=0.0){ // If time is zero, there hasn't been a prediction yet
250  }
251  if(verbose) printf("\nThe prediction undershot by a factor of %1.5e", observed_max_region_time/predicted_max_region_time);
252 
254  update_model_no_history(current_partition, all_periods_timings);
256  if(model_has_history){
257  update_model_exp_history(current_partition, all_periods_timings);
258  }else{
259  update_model_no_history(current_partition, all_periods_timings);
260  model_has_history = true;
261  }
262  }
263  }
264  }else{
265  exit_XGC("Error: Update method not available\n");
266  }
267  }
268 };
269 
271  public:
272  enum class WeightingAlgorithm{
275  Fortran,
276  Default
277  };
278 
279  enum class ConstraintOption{
280  ParticleCount=0
281  };
282 
283  enum class ReweightOption{
284  Always=0,
285  IfBetter
286  };
287 
288  private:
289 
291 
292  std::vector<LoadRegion> regions;
293 
294  View<int*,CLayout,HostType> proposed_partition;
295 
296 #ifdef USE_MPI
297  MPI_Comm comm;
298 #endif
299 
301  bool verbose;
302 
304 
305  double get_even_division(const View<double*,HostType>& input, int n) const{
306  double total = 0.0;
307  Kokkos::parallel_reduce("sum", Kokkos::RangePolicy<HostExSpace>(0,input.size()), [=]( const int i, double& l_total){
308  l_total += input(i);
309  }, total);
310 
311  return (total/n);
312  }
313 
314 
315  bool greedily_fill_partition(const View<double*,HostType>& weight, const View<double*,HostType>& constraint1, double target_weight_per_rank){
316  int nnode = weight.size();
317  int nproc = proposed_partition.size()-1;
318 
319  // Start from rank 0 of the plane; assign nodes until it meets the desired weight per rank, then move to next rank
320  double constraint1_in_this_rank = 0.0;
321  double weight_in_this_rank = 0.0;
322  proposed_partition(0) = 1; // 1-indexed
323  int pid = 0;
324  bool assign_one_node_per_proc = false;
325  for(int i_node = 0; i_node<nnode; i_node++){
326  // Since every rank needs at last one node, switch to assigning at every
327  // iteration if there are only as many nodes left as ranks
328  if(nnode-i_node==nproc-pid) assign_one_node_per_proc = true;
329 
330  // Check if a criterion is met to consider the rank sufficiently loaded
331  bool rank_is_loaded = false;
332 
333  // Criterion 1: We are out of nodes
334  if(assign_one_node_per_proc) rank_is_loaded = true;
335 
336  // Criterion 2: We have hit a constraint
337  if(constraint1_in_this_rank>=constraint1_max) rank_is_loaded = true;
338 
339  // Criterion 3: We have loaded the rank with an even amount of work
340  if(weight_in_this_rank>=target_weight_per_rank) rank_is_loaded = true;
341 
342  // If there are enough particles assigned to this rank, move to the next rank
343  if(rank_is_loaded){
344  proposed_partition(pid+1) = i_node+1; // 1-indexed
345  if(verbose) printf("\nRank %d is loaded (Nodes %d-%d); weight=%1.4e, ptl*nplanes=%d", pid, proposed_partition(pid), proposed_partition(pid+1)-1, weight_in_this_rank, int(constraint1_in_this_rank));
346  constraint1_in_this_rank = 0.0;
347  weight_in_this_rank = 0.0;
348  pid++;
349  if(pid==nproc-1) break;
350  }
351  constraint1_in_this_rank += constraint1(i_node);
352  weight_in_this_rank += weight(i_node);
353  }
354 
355  // Last value will always be nnode+1
356  proposed_partition(nproc) = nnode+1;
357 
358  // Check n_ptl in final rank
359  constraint1_in_this_rank = 0.0;
360  for(int i_node=proposed_partition(nproc-1)-1; i_node<nnode; i_node++){
361  constraint1_in_this_rank += constraint1(i_node);
362  }
363  if(constraint1_in_this_rank>=constraint1_max){
364  // Return false if the constraint was not met
365  return false;
366  }else{
367  return true;
368  }
369  }
370 
371  // Predict performance of new partition based on model
372  double get_largest_predicted_time(const View<int*,CLayout,HostType>& partition, const View<double*,HostType>& weight) const{
373  double largest_time = 0.0;
374  //int largest_time_ind = 0;
375  for(int i=0; i<(partition.size()-1); i++){
376  double proc_time = 0.0;
377  for(int i_node=partition(i)-1; i_node<partition(i+1)-1; i_node++){
378  proc_time += weight(i_node);
379  }
380  if(proc_time>largest_time){
381  //largest_time_ind = i;
382  largest_time = proc_time;
383  }
384  }
385  return largest_time;
386  }
387 
388  void one_weight_balance(const View<double*,HostType>& weight, const View<double*,CLayout,HostType> constraint1){
389  int nproc = proposed_partition.size()-1;
390 
391  // Ideally, each rank would get the same amount of work, i.e. the average
392  double ideal_weight_per_rank = get_even_division(weight, nproc);
393 
394  // Make initial attempt, targeting even distribution of work
395  bool meets_constraints = greedily_fill_partition(weight, constraint1, ideal_weight_per_rank);
396 
397  if(!meets_constraints){
398  // An equal work distribution cannot be assigned due to a constraint.
399  // First, determine the weights if only the constraint were followed
400  meets_constraints = greedily_fill_partition(constraint1, constraint1, get_even_division(constraint1, nproc));
401  if(!meets_constraints) exit_XGC("\nUnexpected issue in load balance: constraint-based partition doesn't satisfy constraints\n");
402 
403  // This partition meets the constraints, but the proposed timing is likely unacceptable
404  double upper_limit_weight_per_rank = get_largest_predicted_time(proposed_partition, weight);
405  if(verbose) printf("The ideal distribution (%1.3e) does not satisfy constraints. This distribution (%1.3e) does.\n", ideal_weight_per_rank, upper_limit_weight_per_rank);
406 
407  // Bisect the difference between the original target time and the upper limit, and see if that satisfies the constraints
408  // Continue until the load balance has been minimized to a precision based on the the original desired time
409  const double desired_precision = 0.01; // Fraction of imbalance that's worth even checking whether it can be removed
410  double desired_step_size = ideal_weight_per_rank*desired_precision;
411  double step_size = upper_limit_weight_per_rank - ideal_weight_per_rank;
412  double compromise_weight_per_rank = upper_limit_weight_per_rank;
413  if(verbose) printf("\nEmploying a binary search to narrow down on the best option.");
414  while(step_size>desired_step_size){
415  // Halve the size of the step
416  step_size /= 2;
417 
418  if(meets_constraints){
419  // Try reducing the load imbalance since constraints were met
420  compromise_weight_per_rank -= step_size;
421  }else{
422  // Try raising the load imbalance since constraints were broken
423  compromise_weight_per_rank += step_size;
424  }
425 
426  // Get new partition
427  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
428  if(verbose) printf("\n Stepped by %1.3e to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "nt");
429  }
430  // In case we end at a partition that fails
431  while(!meets_constraints){
432  compromise_weight_per_rank += step_size;
433  meets_constraints = greedily_fill_partition(weight, constraint1, compromise_weight_per_rank);
434  if(verbose) printf("\n Stepped by %1.3e UP to %1.3e. The new partition does%s meet constraints.", step_size, compromise_weight_per_rank, meets_constraints?"" : "nt");
435  }
436  if(verbose) printf("\n");
437  }
438  }
439 
440  // TODO
441  // Evaluate overall model performance
442  // MAIN_LOOP time vs sum of regions (% coverage)
443  //
444 
445  /* This function assesses the proposed partition and decides whether it is worth switching to.
446  * The simplest formulation is to recommend the new partition if the new partition is predicted to be faster
447  * than the existing one. There is also a factor to account for historical inaccuracy of the previous estimate.
448  * A future option would be to require a certain level of improvement before repartitioning. */
450  // Total time spent in regions (assuming that global barriers demarcate them):
451  double observed_max_all_rgns_time = 0.0;
452  for(int i=0; i<regions.size(); i++){
453  observed_max_all_rgns_time += regions[i].get_observed_max_region_time();
454  }
455 
456  // Total predicted time, accounting for observed prediction undershoot for each region
457  double predicted_max_all_rgns_time = 0.0;
458  for(int i=0; i<regions.size(); i++){
459  predicted_max_all_rgns_time += regions[i].get_largest_predicted_time(proposed_partition)*regions[i].get_prediction_undershoot();
460  }
461 
462  double fractional_improvement = 1.0 - predicted_max_all_rgns_time/observed_max_all_rgns_time;
463 
464  if(verbose){
465  printf("\nDetermining whether to adopt the proposed partition:");
466  printf("\n Observed time with current partition was: %1.3e", observed_max_all_rgns_time);
467  printf("\n Predicted time with proposed partition, adjusting for historical undershoot (of Rgn 0: %1.3e): %1.3e", regions[0].get_prediction_undershoot(), predicted_max_all_rgns_time);
468  printf("\n Adopted if Fractional improvement of adjusted prediction (%1.3e) exceeds specified threshold (%1.3e)\n", fractional_improvement, threshold_to_rebalance);
469  }
470 
471  // Recommend the new partition if the predicted time is less than the time observed with the current partition
472  if(fractional_improvement > threshold_to_rebalance){
473  return true;
474  }else{
475  return false;
476  }
477  }
478 
480  // Print new partition
481  for(int i = 0; i<proposed_partition.size()-1; i++){
482  int nnodes_for_proc = proposed_partition(i+1) - proposed_partition(i);
483  printf("process %d : nnodes = %d; range = (%d - %d)\n", i, nnodes_for_proc, proposed_partition(i), proposed_partition(i+1)-1);
484  }
485  }
486 
487  void update_model(const View<int*,CLayout,HostType>& current_partition){
488  for(int i=0; i<regions.size(); i++){
489  regions[i].update_model(current_partition);
490  }
491  }
492 
493  // Overload if providing timings manually rather than using camtimers. This is used for testing.
494  void update_model(const View<int*,CLayout,HostType>& current_partition, const std::vector<double>& manual_times){
495  for(int i=0; i<regions.size(); i++){
496  regions[i].update_model(current_partition, manual_times[i]);
497  }
498  }
499 
501  for(int i=0; i<regions.size(); i++){
502  regions[i].initialize_model();
503  }
504  }
505 
507  bool is_initialized=true;
508  for(int i=0; i<regions.size(); i++){
509  is_initialized = (is_initialized && regions[i].get_model_is_initialized());
510  }
511  return is_initialized;
512  }
513 
514  void propose_new_partition(const Kokkos::View<double*,Kokkos::LayoutRight,HostType>& ptl_count, WeightingAlgorithm weighting_algorithm){
515  auto constraint1 = ptl_count; // Constrain based on particle count (memory)
516  if(weighting_algorithm == WeightingAlgorithm::SingleRegionBalance){
517  if(regions.size()!=1) exit_XGC("Error: Load balancing is currently single-region only\n");
518  one_weight_balance(regions[0].get_estimated_time_per_vertex(), constraint1);
519  }else if(weighting_algorithm == WeightingAlgorithm::ParticleBalance){
520  one_weight_balance(ptl_count, constraint1);
521  }
522 
523  if(verbose){
524  printf("\nNewly PROPOSED partition:\n");
526  }
527  }
528 
530  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, WeightingAlgorithm weighting_algorithm){
531  if(weighting_algorithm==WeightingAlgorithm::Fortran){
532  // In the fortran algorithm the new distribution is calculated and set here
533  // Count number of particles belonging to each node
534  Kokkos::View<double**,Kokkos::LayoutRight,HostType> ptl_count = count_ptl_per_node_elec_main_ion(grid, magnetic_field, plasma, sml.electron_on);
535 
536  if(pol_decomp.gvid0_pid_h.size()==2){
537  // Skip set_weights if there is only one rank per plane
538  pol_decomp.gvid0_pid_h(0) = 1;
539  pol_decomp.gvid0_pid_h(1) = pol_decomp.nnodes_on_plane + 1;
540  }else{
541  TIMER("SET_WEIGHTS",
542  set_weights(pol_decomp.gvid0_pid_h.data(), ptl_count.data(), plasma.f0_node_cost.data()) );
543  }
544  }else{
545 #ifdef USE_MPI
546  // Broadcast proposal to all ranks
547  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
548 #endif
549 
550  // Copy proposal to pol_decomp
551  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
552 
553  if(is_rank_zero()){
554  printf("\nNEW PARTITION:\n");
556  }
557  }
558 
559  // Update bounds and device copy of the decomposition array
560  pol_decomp.update_pol_decomp();
561  }
562 
564  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp,
565  const View<int*,CLayout,HostType>& old_partition){
566  if (plasma.f0_grid()){
567  // Move f information to correct MPI rank after decomposition update
568  TIMER("F0_REDISTRIBUTE",
569  f0_redistribute(plasma, pol_decomp, grid, magnetic_field, vgrid, old_partition) );
570  }
571 
572  // Shift particles to correct MPI rank after decomposition update
573  TIMER("SHIFT_R",
574  shift_all_species(plasma, grid, magnetic_field, pol_decomp, Shift::NoPhase0) );
575  }
576 
577  bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, double f0_cost){
578  if(reweight_option==ReweightOption::Always){
579  // No need to check if model is an improvement
580  return true;
581  }else{
582  if(weighting_algorithm==WeightingAlgorithm::Fortran){
583  // Calculate load imbalance
584  calculate_load_imbalance(f0_cost);
585 
586  // Assess if load imbalance merits a rebalancing
587  int should_rebalance_int = assess_whether_to_rebalance_load();
588 
589  // Reset fortran cost trackers
591 
592  return (should_rebalance_int==1);
593  }else{
594  // Evaluate the proposed partition on rank 0
595  bool should_rebalance;
596  if(weighting_algorithm==WeightingAlgorithm::ParticleBalance){
597  // Since ParticleBalance doesn't contain an internal model, cant evaluate the partition
598  // so just always approve, for now.
599  // Probably better to have the model updatable by something other than time so it can still
600  // be assessed in this mode
601  return true;
602  }else{
603  if(is_rank_zero()) should_rebalance = recommend_proposed_partition();
604  // Convert to int because MPI_C_BOOL documentation is confusing
605  int should_rebalance_int = (should_rebalance ? 1 : 0);
606 #ifdef USE_MPI
607  MPI_Bcast(&should_rebalance_int, 1, MPI_INT, 0, comm);
608 #endif
609  return (should_rebalance_int==1);
610  }
611  }
612  }
613  }
614 
615  public:
616 
617  LoadBalance(NLReader::NamelistReader& nlr, const DomainDecomposition<DeviceType>& pol_decomp, bool sync_planes = true)
618  : proposed_partition(NoInit("proposed_partition"), pol_decomp.gvid0_pid_h.layout())
619 #ifdef USE_MPI
620  , comm(sync_planes ? pol_decomp.mpi.comm : pol_decomp.mpi.plane_comm)
621 #endif
622  {
623  double max_mem_redist_gb = 10.0;
624  std::string weighting_algorithm_str = "Fortran";
625  std::string update_method_str = "NoHistory";
626  if(nlr.namelist_present("load_balance_param")){
627  nlr.use_namelist("load_balance_param");
628  max_mem_redist_gb = nlr.get<double>("max_mem_redist_gb", 10.0);
629  weighting_algorithm_str = nlr.get<std::string>("weighting_algorithm", "Fortran");
630  threshold_to_rebalance = nlr.get<double>("threshold_to_rebalance", 0.02);
631  verbose = nlr.get<bool>("verbose", false);
632  update_method_str = nlr.get<std::string>("update_method", "NoHistory");
633  }else{
634  threshold_to_rebalance = 0.02;
635  verbose = false;
636  }
637 
638  // Simple model for memory usage from load balance - just particles
639  double max_n_ptl_on_rank = max_mem_redist_gb*1024*1024*1024/80.0; // ~ 80 B per particle
640 
641  if(weighting_algorithm_str=="Fortran"){
643  }else if(weighting_algorithm_str=="ParticleBalance"){
645  }else if(weighting_algorithm_str=="SingleRegionBalance"){
647  }else{
648  exit_XGC("\nError: weighting_algorithm input not valid.");
649  }
650 
651 #ifdef USE_MPI
652  // If planes are sync'd, use intpl_comm to coordinate them; otherwise, use MPI_COMM_SELF
653  MPI_Comm inter_period_comm = sync_planes ? pol_decomp.mpi.intpl_comm : MPI_COMM_SELF;
654 
655  // Set up load balance regions
657  LoadRegion::UpdateMethod update_method;
658  if(update_method_str=="NoHistory"){
659  update_method = LoadRegion::UpdateMethod::NoHistory;
660  }else if(update_method_str=="ExpHistory"){
661  update_method = LoadRegion::UpdateMethod::ExpHistory;
662  }else{
663  exit_XGC("\nError: update_method input not valid.");
664  }
665 
666  regions.push_back(LoadRegion(pol_decomp.nnodes_on_plane, inter_period_comm, pol_decomp.mpi.plane_comm, update_method, verbose,
667  "Push",
668  {"ipc1:PUSHE",
669  "ipc1:PUSHI",
670  "ipc2:PUSHE",
671  "ipc2:PUSHI"}));
672  }
673 
674  // Hard-code for now
676 
677  if(constraint_option == ConstraintOption::ParticleCount){
678  // Normalize by n_planes since we are using the sum of planes as the constraint
679  int n_periods;
680  MPI_Comm_size(inter_period_comm, &n_periods);
681  constraint1_max = max_n_ptl_on_rank*n_periods;
682  }else{
683  exit_XGC("\nError: ParticleConstraint is only available ConstraintOption.\n");
684  }
685 #endif
686  }
687 
689  const VelocityGrid& vgrid, Plasma& plasma, DomainDecomposition<DeviceType>& pol_decomp, ReweightOption reweight_option,
690  WeightingAlgorithm weighting_algorithm = WeightingAlgorithm::Default){
691 
692  // Skip rebalancing if poloidal decomposition is turned off. Normally we should put the function in
693  // an if statement rather than using return; kept here in case we might want some balancing for some reason
694  // even without poloidal decomposition
695  if(pol_decomp.pol_decomp==false) return;
696 
697  // Rebalance is trivial if there is only one rank on a plane (generalize this from no-MPI case)
698 #ifdef USE_MPI
699 
700  if(weighting_algorithm == WeightingAlgorithm::Default){
701  weighting_algorithm = default_weighting_algorithm;
702  }
703 
704  if(default_weighting_algorithm==WeightingAlgorithm::Fortran){
705  // Override input algorithm; if the default is Fortran, it should always use the fortran load balancer
706  weighting_algorithm = default_weighting_algorithm;
707  }
708 
709  if(is_rank_zero() && verbose){
710  if(weighting_algorithm==WeightingAlgorithm::Fortran) printf("\nLoad balance called with Fortran algorithm\n");
711  else if(weighting_algorithm==WeightingAlgorithm::ParticleBalance) printf("\nLoad balance called with Ptl algorithm\n");
712  else if(weighting_algorithm==WeightingAlgorithm::SingleRegionBalance) printf("\nLoad balance called with SingleRegion algorithm\n");
713  }
714 
715  GPTLstart("REBALANCE");
716  if(weighting_algorithm!=WeightingAlgorithm::Fortran){
717  GPTLstart("PTL_COUNT");
719  long long int total_n_ptl = species.get_total_n_ptl();
720  int max_n_ptl = species.get_max_n_ptl();
721  double avg_n_ptl = (double)(total_n_ptl)/pol_decomp.mpi.nranks;
722  double max_ratio = max_n_ptl/avg_n_ptl;
723  if(is_rank_zero()) printf(" Species %d: (max/avg ptl per rank = %1.2f); total n_ptl = %lld\n", species.idx, max_ratio, total_n_ptl);
725  GPTLstop("PTL_COUNT");
726 
727  GPTLstart("PTL_COUNT_PER_NODE");
728  auto ptl_count = count_all_ptl_per_node(grid, magnetic_field, plasma);
729  GPTLstop("PTL_COUNT_PER_NODE");
730 
731  if(weighting_algorithm!=WeightingAlgorithm::ParticleBalance){
732  if(model_is_initialized()){
733  // Update the model with the latest timing
734  TIMER("LOAD_BAL_UPDATE",
735  update_model(pol_decomp.gvid0_pid_h) );
736  }else{
737  if(is_rank_zero() && verbose) printf("Initializing timing model, no partition proposal yet.");
738  initialize_model();
739  GPTLstop("REBALANCE");
740  return;
741  }
742  }
743 
744  // With the updated model, proposed a new partition
745  GPTLstart("LOAD_BAL_NEW_PART");
746  if (is_rank_zero()) propose_new_partition(ptl_count, weighting_algorithm);
747  GPTLstop("LOAD_BAL_NEW_PART");
748  }
749 
750  GPTLstart("LOAD_BAL_REBAL");
751  double f0_cost = plasma.f0_grid() ? sum_view(plasma.f0_node_cost) : 0.0;
752  if(will_rebalance(reweight_option, weighting_algorithm, f0_cost)){
753  // Save the old partition, it is used to save some communication in the f0 redistribution
754  View<int*,CLayout,HostType> old_partition(NoInit("old_partition"), pol_decomp.gvid0_pid_h.layout());
755  Kokkos::deep_copy(old_partition, pol_decomp.gvid0_pid_h);
756 
757  // Update pol_decomp
758  TIMER("LOAD_BAL_SET_NEW",
759  set_new_partition(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, weighting_algorithm) );
760 
761  // Update particles and f0
762  TIMER("LOAD_BAL_REDIST",
763  redistribute_load(sml, grid, magnetic_field, vgrid, plasma, pol_decomp, old_partition) );
764  }
765  GPTLstop("LOAD_BAL_REBAL");
766  GPTLstop("REBALANCE");
767 #endif
768  }
769 
770 
771  // More generic rebalance
772  void rebalance(DomainDecomposition<DeviceType>& pol_decomp, const View<double*,CLayout,HostType>& constraint, const std::vector<double>& timings, double& load_imbalance, View<double*,HostType>& model_belief){
773 #ifdef USE_MPI
774  update_model(pol_decomp.gvid0_pid_h, timings);
775 
776  load_imbalance = regions[0].get_observed_load_imbalance();
777  model_belief = regions[0].get_estimated_time_per_vertex();
778 
779  // Propose new partition
780  if (is_rank_zero()) propose_new_partition(constraint, default_weighting_algorithm);
781 
782  double f0_cost = 0.0;
783  if(will_rebalance(ReweightOption::IfBetter, default_weighting_algorithm, f0_cost)){
784  // Broadcast proposal to all ranks
785  MPI_Bcast(proposed_partition.data(), proposed_partition.size(), MPI_INT, 0, comm);
786 
787  // Copy proposal to pol_decomp
788  Kokkos::deep_copy(pol_decomp.gvid0_pid_h, proposed_partition);
789 
790  // Update bounds and device copy of the decomposition array
791  pol_decomp.update_pol_decomp();
792 
793  // If there were a real load, redistribute it here
794  }
795 #endif
796  }
797 };
798 
799 #endif
bool model_has_history
Definition: load_balance.hpp:30
View< double *, HostType > estimated_time_per_vertex
Definition: load_balance.hpp:25
static int GPTLstart(const char *name)
Definition: timer_macro.hpp:9
bool will_rebalance(ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm, double f0_cost)
Definition: load_balance.hpp:577
bool is_rank_zero()
Definition: globals.hpp:27
void one_weight_balance(const View< double *, HostType > &weight, const View< double *, CLayout, HostType > constraint1)
Definition: load_balance.hpp:388
double threshold_to_rebalance
Definition: load_balance.hpp:300
T get(const string &param, const T default_val, int val_ind=0)
Definition: NamelistReader.hpp:373
double time_accumulated
Definition: load_balance.hpp:44
bool f0_grid() const
Definition: plasma.hpp:202
double predicted_max_region_time
Definition: load_balance.hpp:31
double observed_max_region_time
Definition: load_balance.hpp:32
double get_largest_predicted_time(const View< int *, CLayout, HostType > &partition, const View< double *, HostType > &weight) const
Definition: load_balance.hpp:372
Definition: velocity_grid.hpp:8
Definition: sml.hpp:8
void update_model_exp_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:62
int n_unique_ranks
How many ranks are in a &#39;period&#39; (in tokamaks, in a plane)
Definition: load_balance.hpp:41
LoadBalance(NLReader::NamelistReader &nlr, const DomainDecomposition< DeviceType > &pol_decomp, bool sync_planes=true)
Definition: load_balance.hpp:617
subroutine plasma(grid, itr, p, dene_out, deni_out, Te_out, Ti_out, Vparai_out)
Calculate the plasma density, temperature, and parallel velocity for a point in triangle itr using pl...
Definition: neutral_totalf.F90:1224
void rebalance(DomainDecomposition< DeviceType > &pol_decomp, const View< double *, CLayout, HostType > &constraint, const std::vector< double > &timings, double &load_imbalance, View< double *, HostType > &model_belief)
Definition: load_balance.hpp:772
Definition: plasma.hpp:107
Definition: NamelistReader.hpp:193
Definition: magnetic_field.hpp:12
void propose_new_partition(const Kokkos::View< double *, Kokkos::LayoutRight, HostType > &ptl_count, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:514
int idx
Index in all_species.
Definition: species.hpp:78
bool verbose
Definition: load_balance.hpp:301
void for_all_nonadiabatic_species(F func, DevicePtlOpt device_ptl_opt=UseDevicePtl)
Definition: plasma.hpp:126
double prediction_undershoot
Definition: load_balance.hpp:34
double get_even_division(const View< double *, HostType > &input, int n) const
Definition: load_balance.hpp:305
Kokkos::View< double **, Kokkos::LayoutRight, HostType > count_ptl_per_node_elec_main_ion(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma, bool kinetic_electrons)
Definition: count_ptl_per_node.cpp:29
double observed_load_imbalance
Definition: load_balance.hpp:33
void update_pol_decomp()
Definition: domain_decomposition.tpp:213
void update_model(const View< int *, CLayout, HostType > &current_partition)
Definition: load_balance.hpp:487
long long int get_total_n_ptl()
Definition: species.hpp:693
void f0_redistribute(Plasma &plasma, const DomainDecomposition< DeviceType > &pol_decomp, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, const View< int *, CLayout, HostType > &old_partition)
Definition: f0_redistribute.cpp:184
void set_new_partition(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, WeightingAlgorithm weighting_algorithm)
Definition: load_balance.hpp:529
#define TIMER(N, F)
Definition: timer_macro.hpp:24
void rebalance(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, ReweightOption reweight_option, WeightingAlgorithm weighting_algorithm=WeightingAlgorithm::Default)
Definition: load_balance.hpp:688
void use_namelist(const string &namelist)
Definition: NamelistReader.hpp:355
ConstraintOption
Definition: load_balance.hpp:279
void update_model(const View< int *, CLayout, HostType > &current_partition, const std::vector< double > &manual_times)
Definition: load_balance.hpp:494
double get_estimated_time_per_vertex(int i) const
Definition: load_balance.hpp:157
int my_period_rank
Definition: load_balance.hpp:42
int nnodes_on_plane
Number of nodes on local plane.
Definition: domain_decomposition.hpp:53
void calculate_load_imbalance(double f0_cost)
void initialize_model()
Definition: load_balance.hpp:198
WeightingAlgorithm default_weighting_algorithm
Definition: load_balance.hpp:290
View< int *, CLayout, HostType > proposed_partition
Which processors get which vertices.
Definition: load_balance.hpp:294
double get_time_since_previous_call()
Definition: load_balance.hpp:92
bool get_model_is_initialized() const
Definition: load_balance.hpp:177
UpdateMethod
Definition: load_balance.hpp:18
std::string region_name
Definition: load_balance.hpp:26
bool verbose
Definition: load_balance.hpp:28
std::vector< LoadRegion > regions
Definition: load_balance.hpp:292
bool namelist_present(const string &namelist)
Definition: NamelistReader.hpp:351
UpdateMethod update_method
Definition: load_balance.hpp:46
bool model_is_initialized
Definition: load_balance.hpp:29
bool electron_on
Use kinetic electrons.
Definition: sml.hpp:63
void reset_cost_trackers()
void redistribute_load(const Simulation< DeviceType > &sml, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const VelocityGrid &vgrid, Plasma &plasma, DomainDecomposition< DeviceType > &pol_decomp, const View< int *, CLayout, HostType > &old_partition)
Definition: load_balance.hpp:563
double get_largest_predicted_time(const View< int *, CLayout, HostType > &proposed_partition) const
Definition: load_balance.hpp:182
void initialize_model()
Definition: load_balance.hpp:500
T::value_type sum_view(const T &view)
Definition: view_arithmetic.hpp:87
bool recommend_proposed_partition()
Definition: load_balance.hpp:449
void exit_XGC(std::string msg)
Definition: globals.hpp:37
bool greedily_fill_partition(const View< double *, HostType > &weight, const View< double *, HostType > &constraint1, double target_weight_per_rank)
Definition: load_balance.hpp:315
int n_periods
How many repeating periods there are; in tokamaks this is planes.
Definition: load_balance.hpp:40
Definition: magnetic_field.F90:1
Definition: load_balance.hpp:16
void print_new_partition()
Definition: load_balance.hpp:479
WeightingAlgorithm
Definition: load_balance.hpp:272
int assess_whether_to_rebalance_load()
View< double *, HostType > get_estimated_time_per_vertex() const
Definition: load_balance.hpp:161
Definition: plasma.hpp:13
double constraint1_max
Definition: load_balance.hpp:303
void set_weights(int *gvid0_pid, double *ptl_count, double *f0_node_cost)
View< double *, CLayout, HostType > count_all_ptl_per_node(const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, Plasma &plasma)
Definition: count_ptl_per_node.cpp:71
ReweightOption
Definition: load_balance.hpp:283
View< double *, CLayout, HostType > f0_node_cost
Definition: plasma.hpp:36
void shift_all_species(Plasma &plasma, const Grid< DeviceType > &grid, const MagneticField< DeviceType > &magnetic_field, const DomainDecomposition< DeviceType > &pol_decomp, Shift::ShiftPh0 shift_ph0)
Definition: shift.cpp:301
void reset_timer()
Definition: load_balance.hpp:152
Definition: species.hpp:75
void update_model(const View< int *, CLayout, HostType > &current_partition, double manual_time=-1.0)
Definition: load_balance.hpp:205
bool model_is_initialized()
Definition: load_balance.hpp:506
std::vector< std::string > timer_names
Definition: load_balance.hpp:27
Definition: shift.hpp:10
bool pol_decomp
Use poloidal decomposition.
Definition: domain_decomposition.hpp:46
double get_observed_max_region_time() const
Definition: load_balance.hpp:169
Definition: load_balance.hpp:270
void touch_timers()
Definition: load_balance.hpp:116
Kokkos::ViewAllocateWithoutInitializing NoInit
Definition: space_settings.hpp:68
void update_model_no_history(const View< int *, CLayout, HostType > &current_partition, const View< double *, HostType > &all_periods_timings)
Definition: load_balance.hpp:48
double get_prediction_undershoot() const
Definition: load_balance.hpp:165
double get_observed_load_imbalance() const
Definition: load_balance.hpp:173
int get_max_n_ptl()
Definition: species.hpp:704
static int GPTLstop(const char *name)
Definition: timer_macro.hpp:10
Kokkos::View< int *, Kokkos::LayoutRight, HostType > gvid0_pid_h
Which processors get which vertices (host)
Definition: domain_decomposition.hpp:63