XGCa
async_reassignment.hpp
Go to the documentation of this file.
1 #ifndef ASYNC_REASSIGNMENT_HPP
2 #define ASYNC_REASSIGNMENT_HPP
3 
4 #include "timer_macro.hpp"
5 #include "space_settings.hpp"
6 #include "globals.hpp"
7 #include "species.hpp"
9 #include "distribution.hpp"
10 #include "col_grid.hpp"
11 #include "velocity_grid.hpp"
12 #include "view_arithmetic.hpp"
13 #include "transfer_vertex_data.hpp"
14 #include "collisions.hpp"
15 
// Fortran quicksort used below to obtain a sort permutation of per-rank timings.
// idx is a one-indexed permutation array; left/right are one-indexed bounds.
// NOTE(review): it is not visible from this header whether my_qsort_f only permutes
// idx or ALSO reorders psi_surf2_tmp (the values) in place — confirm in the Fortran
// source, since rebalance() indexes rank_time by original rank id after the call.
16 extern "C" void my_qsort_f(int left,int right,int* idx,float* psi_surf2_tmp,int npsi_surf2);
17 
19  static constexpr bool ASYNC_TRANSFER = true; // Use non-blocking transfers (awaited later in execute())
20  bool this_rank_sends_work; // Whether this rank sends work to another
21  bool this_rank_recvs_work; // Whether this rank receives work from another
// NOTE(review): members declared on original lines 22-25 (send_plan/recv_plan,
// res_send_plan/res_recv_plan, inp_buffer/res_buffer, assigned — per the
// cross-reference index at the bottom of this page) are elided in this extract.
26  int first_send_offset; // The offset of the first vertex sent from this rank, LOCAL index
27  int rank_sending_to_my_rank; // Algorithm currently assumes an underloaded rank can only receive work from one other
28  int recv_global_offset; // The global offset of the received work
29 
30  public:
31 
33 
34  // TOY PROBLEM
35  // set: send_plan, recv_plan, this_rank_sends_work, this_rank_recvs_work, rank_sending_to_my_rank, first_send_offset, recv_global_offset
// NOTE(review): the function signature (original line 36,
// void toy_problem_rebalance(const CollisionGrid<DeviceType>&, const DomainDecomposition<DeviceType>&)
// per the cross-reference index) is elided in this extract.
// Test/demo rebalance: the last plane rank ("big rank") is assumed overloaded and ships
// a fixed fraction of its excess vertices, split evenly across the first
// N_RANKS_RECEIVING plane ranks.
37  static constexpr int N_RANKS_RECEIVING = 7; // Testing
38 
39  int n_ranks = pol_decomp.mpi.n_plane_ranks;
40  int my_rank = pol_decomp.mpi.my_plane_rank;
41 
42  int big_rank = n_ranks-1; // Hard-coded choice: highest-index rank is the sender
43 
// gvid0_pid_h maps rank -> first owned global vertex id (one-indexed, Fortran convention)
44  int nnodes_on_big_rank = pol_decomp.gvid0_pid_h(big_rank+1) - pol_decomp.gvid0_pid_h(big_rank);
45  int nnodes_on_a_recv_rank = pol_decomp.gvid0_pid_h(1) - pol_decomp.gvid0_pid_h(0);
46  int excess_nodes = nnodes_on_big_rank - nnodes_on_a_recv_rank;
// Send N/(N+1) of the excess, i.e. keep 1/(N+1) of it on the big rank
47  float fraction_excess_to_send = N_RANKS_RECEIVING*1.0/(1.0+N_RANKS_RECEIVING);
48  int nnodes_to_send = excess_nodes*fraction_excess_to_send;
49 
50  // Round down to nearest N_RANKS_RECEIVING for convenience until offset info is sent too
51  nnodes_to_send /= N_RANKS_RECEIVING;
52  nnodes_to_send *= N_RANKS_RECEIVING;
53 
54  int nnodes_to_each = nnodes_to_send/N_RANKS_RECEIVING;
55 
56  if(my_rank==big_rank){
57  this_rank_sends_work = true;
58 
59  // Used to unload buffer
60  first_send_offset = nnodes_on_big_rank - nnodes_to_send;
61 
// One equal-size contiguous slab per receiving rank, packed at the tail of the domain
62  for(int i=0; i<N_RANKS_RECEIVING; i++){
63  send_plan.cnts(i) = nnodes_to_each;
64  send_plan.displs(i) = first_send_offset + i*nnodes_to_each;
65  }
66  }else if(my_rank<N_RANKS_RECEIVING){
67  this_rank_recvs_work = true;
68  rank_sending_to_my_rank = big_rank;
69 
70  recv_plan.cnts(rank_sending_to_my_rank) = nnodes_to_each;
// NOTE(review): original line 71 (presumably the recv_plan.displs assignment) is
// elided in this extract.
72 
// -1 converts the one-indexed gvid0_pid_h entry to a C (zero-indexed) global offset
73  int sending_rank_node_offset = pol_decomp.gvid0_pid_h(big_rank) - 1;
74  int offset_of_all_nnodes_sent = sending_rank_node_offset + nnodes_on_big_rank - nnodes_to_send;
75  recv_global_offset = offset_of_all_nnodes_sent + nnodes_to_each*recv_plan.my_rank;
76  }
77  }
78 
79  // Sets send_plan, recv_plan, this_rank_sends_work, this_rank_recvs_work, rank_sending_to_my_rank, first_send_offset, recv_global_offset
79  // Sets send_plan, recv_plan, this_rank_sends_work, this_rank_recvs_work, rank_sending_to_my_rank, first_send_offset, recv_global_offset
// Greedy rebalance based on measured per-vertex collision timings (col_grid.timing_all):
// ranks are sorted by total time; the most expensive rank peels vertices off the tail
// of its domain into "packets" handed to the cheapest ranks, until the sender drops
// near target_time or the receiver would exceed target_recv_time.
// NOTE(review): assigned_original is not used in this function body — confirm whether
// it should constrain which vertices are eligible to be sent.
80  void rebalance(const CollisionGrid<DeviceType>& col_grid, const DomainDecomposition<DeviceType>& pol_decomp, const VertexList& assigned_original){
81  int n_ranks = pol_decomp.mpi.n_plane_ranks;
82  int my_rank = pol_decomp.mpi.my_plane_rank;
83 
84  // Sum col_grid.timing_all for each rank
85  View<float*, HostType> rank_time("rank_time",n_ranks);
86  for(int i=0; i<n_ranks; i++){
// gvid0_pid_h is one-indexed (Fortran convention); -1 converts to C indexing
87  for(int inode=pol_decomp.gvid0_pid_h(i)-1; inode<pol_decomp.gvid0_pid_h(i+1)-1; inode++){
88  rank_time(i) += col_grid.timing_all(inode);
89  }
90  }
91 
92  // Get sort index of ranks
93  View<int*, HostType> sort_index_r(NoInit("sort_index"),n_ranks);
94  for(int i=0; i<n_ranks; i++) sort_index_r(i) = i + 1; // idx is one-indexed because it is reordered in Fortran
95  int left=1; int right=n_ranks;
// NOTE(review): if my_qsort_f reorders rank_time in place (not just sort_index_r),
// then the rank_time(isend)/rank_time(irecv) lookups below — which use ORIGINAL rank
// ids — would read a reordered array. Confirm against the Fortran implementation.
96  my_qsort_f(left,right,sort_index_r.data(),rank_time.data(),n_ranks);
// Reverse to descending order (most expensive rank first) and convert to zero-indexing
97  View<int*, HostType> sort_index(NoInit("sort_index"),n_ranks);
98  for(int i=0; i<n_ranks; i++) sort_index(i) = sort_index_r(n_ranks-1-i) - 1;
99 
100  // Get average time per rank
101  float average_time = sum_view(rank_time)/rank_time.size();
102 
103  // Adjust to more realistic target
104  constexpr float IMBALANCE_MAX = 1.2; // Senders only offload the time above this multiple of the average
105  float target_time = average_time*IMBALANCE_MAX;
106  float target_recv_time = average_time*1.1; // ? Receivers are filled only up to this multiple of the average
107 
108 if(is_rank_zero()){
109  printf("\nAsync collisions: average: %1.3e, target for sender: %1.3e, target for recver: %1.3e\n", average_time, target_time, target_recv_time);
110 }
111 
// Receivers are consumed from the cheap end of the sorted order; this index persists
// across senders so each receiver is used at most once
112  int irecv_sorted = n_ranks-1;
113 
114  constexpr int MIN_PACKET_SIZE = 8; // Don't bother sending fewer vertices
115  int last_packet_size = MIN_PACKET_SIZE;
116 
117  // Start with most expensive rank
118  for(int isend_sorted=0; isend_sorted<n_ranks; isend_sorted++){
119  int isend = sort_index(isend_sorted);
// Sender's owned global vertex range, converted from one-indexed gvid0_pid_h
120  int last_node = pol_decomp.gvid0_pid_h(isend+1) - 2;
121  int first_node = pol_decomp.gvid0_pid_h(isend) - 1;
122 
123  float time_to_send = rank_time(isend) - target_time;
124  int irecv = sort_index(irecv_sorted);
125  float recv_max = target_recv_time - rank_time(irecv);
126 
127  // Nothing to send
128  if(time_to_send<=0.0){
129  //continue;
130  // Since we are sorted by time, we can break from the loop rather than continue
131  break;
132  }
133 
134  // No space to receive. Since we are sorted by time, we can break from the loop rather than continue to next receiver
135  if(recv_max<=0.0){
136  break;
137  }
138 
// last_packet_size is carried over from the previous sender's final packet
139  if(last_packet_size<MIN_PACKET_SIZE){
140  break; // Assume that it's not worth sending a small packet, and that the loop is roughly organized by possible packet size (may not be true due to variations in n_iterations and n_subcycles)
141  }
142 
143  int current_n_to_send = 0;
144  int current_n_real_to_send = 0; // Counts only vertices with nonzero timing
145  float current_time = 0.0; // Time in the CURRENT packet; reset per packet
146  float cumul_time = 0.0; // Total time peeled off this sender; NOT reset per packet
147  // Work backwards loading nodes (must leave at least one node, so use > rather than >=
148  for(int inode=last_node; inode>first_node; inode--){ // This is GLOBAL index
149  // Add vertex to current packet
150  current_n_to_send++;
151  if(col_grid.timing_all(inode)!=0.0) current_n_real_to_send++;
152  current_time += col_grid.timing_all(inode);
153  cumul_time += col_grid.timing_all(inode);
154 
// Packet is closed when the sender has shed enough, the domain is exhausted
// (one node must remain), or the current receiver is full
155  bool done_sending = (current_time>=time_to_send || inode==first_node+1);
156  bool done_recving = (current_time>=recv_max);
157 
158  bool packet_complete = (done_sending || done_recving);
159 
160  if(packet_complete){
161  // If the packet is too small, break from the recv loop; the sender loop will break too
162  last_packet_size = current_n_real_to_send;
163  if(last_packet_size<MIN_PACKET_SIZE) break;
164 
165  // Assign the packet
166  if(my_rank==isend){
// inode is the LOWEST global index of the packet (loop runs backwards), so the
// packet spans [inode, inode+current_n_to_send) in global indices
167  send_plan.cnts(irecv) = current_n_to_send;
168  send_plan.displs(irecv) = inode - pol_decomp.node_offset;
169  this_rank_sends_work = true; // Updated repeatedly
170  first_send_offset = send_plan.displs(irecv); // Updated repeatedly, last is correct
171  }
172  if(my_rank==irecv){
173  recv_plan.cnts(isend) = current_n_to_send;
174  recv_plan.displs(isend) = 0;
175  this_rank_recvs_work = true;
176  rank_sending_to_my_rank = isend;
177  recv_global_offset = inode;
178  }
179 if(is_rank_zero()) printf("\n Packet: %d recv from %d: %d nds [%d-%d] (%d real) (time est: %1.3e; %1.3e->%1.3e, %1.3e->%1.3e)", irecv, isend, current_n_to_send,inode, inode+current_n_to_send, current_n_real_to_send, current_time, rank_time(irecv), rank_time(irecv)+current_time, rank_time(isend)-cumul_time+current_time, rank_time(isend)-cumul_time);
180 
181  // Move to next recver, *even if this one is not full*
// Receivers converge toward senders in sorted order; when they meet, irecv==isend
// has high rank_time, so recv_max<=0 terminates below before any out-of-range access
182  irecv_sorted -= 1;
183  irecv = sort_index(irecv_sorted);
184  recv_max = target_recv_time - rank_time(irecv);
185 
186  if(recv_max<=0.0) break; // Next receiver can't accept anything, we are done
187 
188  // Reset counters
189  current_n_to_send = 0;
190  current_n_real_to_send = 0;
191  current_time = 0.0;
192  }
193 
194  // If all nodes that need to be reassigned have been, move to next sender
195  if(done_sending) break;
196  } // recver loop
197  } // sender loop
198  }
199 
// Top-level planning step: zero-initializes the distribution plans, runs rebalance(),
// then adjusts this rank's vertex assignment (sender: truncates assigned_original;
// receiver: builds `assigned` from the sender's range) and constructs the reversed
// result-transfer plans.
200  void rebalance_plan(const CollisionGrid<DeviceType>& col_grid, const DomainDecomposition<DeviceType>& pol_decomp, VertexList& assigned_original){
201  int n_ranks = pol_decomp.mpi.n_plane_ranks;
202  int my_rank = pol_decomp.mpi.my_plane_rank;
203 
204  // Initialize plans to 0.0
// NOTE(review): cnts/displs are integer Views (per the cross-reference index);
// deep_copy with 0.0 relies on implicit double->int conversion — 0 would be clearer.
205  send_plan = DistributionPlan(my_rank, n_ranks);
206  recv_plan = DistributionPlan(my_rank, n_ranks);
207  Kokkos::deep_copy(send_plan.cnts, 0.0);
208  Kokkos::deep_copy(send_plan.displs, 0.0);
209  Kokkos::deep_copy(recv_plan.cnts, 0.0);
210  Kokkos::deep_copy(recv_plan.displs, 0.0);
211 
212  // Set send_plan, recv_plan, this_rank_sends_work, this_rank_recvs_work, rank_sending_to_my_rank, first_send_offset
213  //toy_problem_rebalance(col_grid, pol_decomp);
214  rebalance(col_grid, pol_decomp, assigned_original);
215 
216  // Adjust assignments
// NOTE(review): original line 217 is elided here — presumably
// if(this_rank_sends_work){ guarding the truncation below.
218  // Shorten assignment to not include the sent nodes
219  assigned_original = assigned_original & VertexList(pol_decomp.node_offset, pol_decomp.node_offset + first_send_offset);
220  }
// NOTE(review): original line 221 is elided here — presumably
// if(this_rank_recvs_work){ guarding the receiver-side assignment below.
222  GPTLstart("F_COL_ASSGN");
223  // Get sending rank's assignment
224  VertexList assigned_to_sender = col_grid.vertices & pol_decomp.vertex_list(rank_sending_to_my_rank);
225  //bool symmetric_f = false;
226  //if(symmetric_f || !pol_decomp.pol_decomp) mpi_split_assignments(pol_decomp, symmetric_f, assigned_to_sender); // BUG - uses wrong my_rank
227  GPTLstop("F_COL_ASSGN");
228 
229  GPTLstart("F_COL_ASSGN2");
// Intersect the sender's vertices with the global slab this rank was handed
230  int nodes_assigned = recv_plan.cnts(rank_sending_to_my_rank);
231  assigned = assigned_to_sender & VertexList(recv_global_offset, recv_global_offset+nodes_assigned);
232  GPTLstop("F_COL_ASSGN2");
233  }
234 
235  // result plan is reversed:
// Results flow back from receiver to sender, so the result plans swap the roles
// of the input plans' cnts/displs.
236  res_send_plan = DistributionPlan(my_rank, n_ranks);
237  res_recv_plan = DistributionPlan(my_rank, n_ranks);
238  Kokkos::deep_copy(res_send_plan.cnts, recv_plan.cnts);
239  Kokkos::deep_copy(res_recv_plan.cnts, send_plan.cnts);
240  Kokkos::deep_copy(res_send_plan.displs, recv_plan.displs);
241  Kokkos::deep_copy(res_recv_plan.displs, send_plan.displs);
242 
// NOTE(review): original line 243 is elided here (possibly a guard or nothing).
244  // res_recv_plan (and recv_plan) are mappings into a buffer, so start at 0.0
// NOTE(review): the loop body (original line 246, presumably rebasing
// res_recv_plan.displs(i) to buffer-relative offsets) is elided in this extract.
245  for(int i=0; i<res_recv_plan.displs.size(); i++){
247  }
248  }
249  }
250 
// NOTE(review): this is the AsyncReassignment constructor (original line 251,
// AsyncReassignment(const DomainDecomposition&, const CollisionGrid&,
// const CollisionSpecies&, const VGridDistribution&, ...) per the cross-reference
// index); its first signature line and opening brace context are elided in this extract.
// It plans the rebalance, then posts the input transfers (receives before sends so
// receivers are ready when senders post).
252  const View<double*,CLayout,HostType>& node_cost, VertexList& assigned_original)
254  {
255  // Choose rebalance plan and split assignment
256  GPTLstart("ASYNC_COL_REBALANCE");
257  rebalance_plan(col_grid, pol_decomp, assigned_original);
258  GPTLstop("ASYNC_COL_REBALANCE");
259 
// Invariant check: a rank must not appear as both sender and receiver
260  if(this_rank_sends_work && this_rank_recvs_work) exit_XGC("\nError: Cannot send and receive work\n");
261 
// NOTE(review): original line 262 is elided here — presumably
// if(this_rank_recvs_work){ guarding the receive below.
263  GPTLstart("ASYNC_COL_RECV_INPUTS");
264  // irecv excess vertices
265  inp_buffer = transfer_data(send_plan, recv_plan, pol_decomp.mpi, ASYNC_TRANSFER, col_spall.den_moment, col_spall.temp_moment, col_spall.f, col_spall.fg_temp_ev_all);
266  GPTLstop("ASYNC_COL_RECV_INPUTS");
267  }
268 
// NOTE(review): original line 269 is elided here — presumably
// if(this_rank_sends_work){ guarding the send below.
270  GPTLstart("ASYNC_COL_SEND_INPUTS");
271  // Send excess vertices
272  inp_buffer = transfer_data(send_plan, recv_plan, pol_decomp.mpi, ASYNC_TRANSFER, col_spall.den_moment, col_spall.temp_moment, col_spall.f, col_spall.fg_temp_ev_all);
273  GPTLstop("ASYNC_COL_SEND_INPUTS");
274 
// With async transfers, the sender pre-posts the receive for its own results now
275  if(ASYNC_TRANSFER){
276  GPTLstart("ASYNC_COL_RECV_RESULTS");
277  // Irecv results
278  res_buffer = transfer_data(res_send_plan, res_recv_plan, pol_decomp.mpi, ASYNC_TRANSFER, df0g_tmp, node_cost);
279  GPTLstop("ASYNC_COL_RECV_RESULTS");
280  }
281  }
282  }
283 
// Executes the reassigned collision work: receivers await the transferred inputs,
// run subcycle_collisions on the borrowed vertices, and send results back; senders
// await their input sends and then receive/unload the computed results into
// df0g_tmp / node_cost at first_send_offset.
284  void execute(const CollisionGrid<DeviceType>& col_grid, double dt, const DomainDecomposition<DeviceType>& pol_decomp, const CollisionSpecies<DeviceType>& col_spall, const View<int*,CLayout,HostType>& converged_all, const VGridDistribution<HostType>& df0g_tmp,
285  const View<double*,CLayout,HostType>& node_cost){
// NOTE(review): original line 286 is elided here — presumably
// if(this_rank_recvs_work){ opening the receiver branch.
287  GPTLstart("ASYNC_COL_AWAIT_INPUTS");
288  // Wait for the data to arrive from other rank
// NOTE(review): original line 289 is elided — presumably inp_buffer.await_recvs();
290  GPTLstop("ASYNC_COL_AWAIT_INPUTS");
291 
292  // Resize decomposed allocations whose data was placed in the buffer
293  int nnode = inp_buffer.nnode();
294  GPTLstart("ASYNC_COL_SPALL2");
295  CollisionSpecies<DeviceType> col_spall2(col_spall, nnode);
296  GPTLstop("ASYNC_COL_SPALL2");
297  GPTLstart("ASYNC_COL_UNLOAD_INPUTS");
298  inp_buffer.unload(0, col_spall2.den_moment, col_spall2.temp_moment, col_spall2.f, col_spall2.fg_temp_ev_all);
299  GPTLstop("ASYNC_COL_UNLOAD_INPUTS");
300 
301  // Allocate df0g_tmp and set to zero to ensure that skipped/unconverged nodes don't contribute
302  GPTLstart("ASYNC_COL_DF0G2");
303  VGridDistribution<HostType> df0g_tmp2(col_spall2.f.n_species(), col_spall2.f);
304  GPTLstop("ASYNC_COL_DF0G2");
305 
306  // Set up some required inputs/outputs
// Views of the global per-node arrays restricted to the received slab
307  int node_offset = recv_global_offset;
308  const View<int*,CLayout,HostType> converged_local = Kokkos::subview(converged_all, Kokkos::make_pair(node_offset,node_offset+nnode));
309  const View<int*,CLayout,HostType> n_subcycles_local = Kokkos::subview(col_grid.n_subcycles, Kokkos::make_pair(node_offset,node_offset+nnode));
310  GPTLstart("ASYNC_COL_NC2");
311  View<double*,CLayout,HostType> node_cost2("node_cost2", nnode);
312  GPTLstop("ASYNC_COL_NC2");
313 
314  GPTLstart("F_COL_SHIFT_ASSGN");
315  // Shift to correspond to local nodes
316  assigned.shift(-node_offset);
317  GPTLstop("F_COL_SHIFT_ASSGN");
318 
319  // Run newly assigned work
320  GPTLstart("ASYNC_COL_SUBCYCLE");
321  subcycle_collisions(col_grid, col_spall2, dt, n_subcycles_local, assigned, converged_local, df0g_tmp2, node_cost2);
322  GPTLstop("ASYNC_COL_SUBCYCLE");
323 
324  // Irecv results. Note for results, send_plan and recv_plan are reversed
325  // No need to send converged_local, because converged_all is all-reduced below
326  GPTLstart("ASYNC_COL_SEND_RESULTS");
327  res_buffer = transfer_data(res_send_plan, res_recv_plan, pol_decomp.mpi, ASYNC_TRANSFER, df0g_tmp2, node_cost2);
328  GPTLstop("ASYNC_COL_SEND_RESULTS");
329 
330  // Wait on sends, seems unavoidable
331  GPTLstart("ASYNC_COL_AWAIT_RES_SENDS");
// NOTE(review): original line 332 is elided — presumably res_buffer.await_sends();
333  GPTLstop("ASYNC_COL_AWAIT_RES_SENDS");
334  }
335 
// NOTE(review): original line 336 is elided here — presumably
// if(this_rank_sends_work){ opening the sender branch.
337  // Make sure sends are complete. This could/should be done earlier (mid-collisions) but not sure how right now. Probably need to reserve a thread for that
338  GPTLstart("ASYNC_COL_AWAIT_INP_SENDS");
// NOTE(review): original line 339 is elided — presumably inp_buffer.await_sends();
340  GPTLstop("ASYNC_COL_AWAIT_INP_SENDS");
341 
// Blocking mode: the result receive was not pre-posted in the constructor, so post it now
342  if(!ASYNC_TRANSFER){
343  GPTLstart("ASYNC_COL_RECV_RESULTS");
344  // Irecv results
345  res_buffer = transfer_data(res_send_plan, res_recv_plan, pol_decomp.mpi, ASYNC_TRANSFER, df0g_tmp, node_cost);
346  GPTLstop("ASYNC_COL_RECV_RESULTS");
347  }
348 
349  // Wait for the results to arrive from other rank
350  GPTLstart("ASYNC_COL_AWAIT_RESULTS");
// NOTE(review): original line 351 is elided — presumably res_buffer.await_recvs();
352  GPTLstop("ASYNC_COL_AWAIT_RESULTS");
353 
354  // Unload into local result arrays
// first_send_offset is the local index of the first vertex that was sent away,
// so results land back in the tail of this rank's arrays
355  GPTLstart("ASYNC_COL_UNLOAD_RESULTS");
356  res_buffer.unload(first_send_offset, df0g_tmp, node_cost);
357  GPTLstop("ASYNC_COL_UNLOAD_RESULTS");
358  }
359  }
360 };
361 
362 #endif
void my_qsort_f(int left, int right, int *idx, float *psi_surf2_tmp, int npsi_surf2)
Definition: async_reassignment.hpp:18
DistributionPlan send_plan
Definition: async_reassignment.hpp:22
DistributionPlan res_recv_plan
Definition: async_reassignment.hpp:23
int rank_sending_to_my_rank
Definition: async_reassignment.hpp:27
AsyncReassignment(const DomainDecomposition< DeviceType > &pol_decomp, const CollisionGrid< DeviceType > &col_grid, const CollisionSpecies< DeviceType > &col_spall, const VGridDistribution< HostType > &df0g_tmp, const View< double *, CLayout, HostType > &node_cost, VertexList &assigned_original)
Definition: async_reassignment.hpp:251
void execute(const CollisionGrid< DeviceType > &col_grid, double dt, const DomainDecomposition< DeviceType > &pol_decomp, const CollisionSpecies< DeviceType > &col_spall, const View< int *, CLayout, HostType > &converged_all, const VGridDistribution< HostType > &df0g_tmp, const View< double *, CLayout, HostType > &node_cost)
Definition: async_reassignment.hpp:284
AsyncReassignment()
Definition: async_reassignment.hpp:32
void rebalance(const CollisionGrid< DeviceType > &col_grid, const DomainDecomposition< DeviceType > &pol_decomp, const VertexList &assigned_original)
Definition: async_reassignment.hpp:80
bool this_rank_sends_work
Definition: async_reassignment.hpp:20
DistributionPlan recv_plan
Definition: async_reassignment.hpp:22
bool this_rank_recvs_work
Definition: async_reassignment.hpp:21
static constexpr bool ASYNC_TRANSFER
Definition: async_reassignment.hpp:19
VertexBuffer< HostType > inp_buffer
Definition: async_reassignment.hpp:24
int recv_global_offset
Definition: async_reassignment.hpp:28
void toy_problem_rebalance(const CollisionGrid< DeviceType > &col_grid, const DomainDecomposition< DeviceType > &pol_decomp)
Definition: async_reassignment.hpp:36
VertexBuffer< HostType > res_buffer
Definition: async_reassignment.hpp:24
VertexList assigned
Definition: async_reassignment.hpp:25
void rebalance_plan(const CollisionGrid< DeviceType > &col_grid, const DomainDecomposition< DeviceType > &pol_decomp, VertexList &assigned_original)
Definition: async_reassignment.hpp:200
DistributionPlan res_send_plan
Definition: async_reassignment.hpp:23
int first_send_offset
Definition: async_reassignment.hpp:26
VertexList vertices
Definition: col_grid.hpp:135
View< float *, CLayout, HostType > timing_all
Definition: col_grid.hpp:133
View< int *, CLayout, HostType > n_subcycles
Number of subcycles for each vertex.
Definition: col_grid.hpp:132
Definition: col_species.hpp:105
View< double **, CLayout, HostType > den_moment
Definition: col_species.hpp:119
View< double **, CLayout, HostType > temp_moment
Definition: col_species.hpp:120
const VGridDistribution< HostType > f
Definition: col_species.hpp:107
std::vector< View< double *, CLayout, HostType > > fg_temp_ev_all
Definition: col_species.hpp:123
VertexList vertex_list() const
Definition: domain_decomposition.cpp:279
Kokkos::View< int *, Kokkos::LayoutRight, HostType > gvid0_pid_h
Which processors get which vertices (host)
Definition: domain_decomposition.hpp:95
int node_offset
Offset of first mesh node belonging to this MPI rank.
Definition: domain_decomposition.hpp:91
KOKKOS_INLINE_FUNCTION int n_species() const
Definition: vgrid_distribution.hpp:187
Definition: vertex_list.hpp:53
void shift(int shift_val)
Definition: vertex_list.cpp:246
void subcycle_collisions(const CollisionGrid< DeviceType > &col_grid, const CollisionSpecies< DeviceType > &col_spall, double dt, const View< int *, CLayout, HostType > &n_subcycles_local, VertexList &assigned, const View< int *, CLayout, HostType > &converged_local, const VGridDistribution< HostType > &df0g_tmp, const View< double *, CLayout, HostType > &node_cost)
Definition: collisions.cpp:188
void exit_XGC(std::string msg)
Definition: globals.hpp:37
bool is_rank_zero()
Definition: globals.hpp:27
logical false
Definition: module.F90:102
Kokkos::ViewAllocateWithoutInitializing NoInit
Definition: space_settings.hpp:69
Definition: domain_decomposition.hpp:15
View< int *, CLayout, HostType > displs
Definition: domain_decomposition.hpp:17
View< int *, CLayout, HostType > cnts
Definition: domain_decomposition.hpp:16
int my_rank
Definition: domain_decomposition.hpp:18
void unload(int vertex_offset, const Vs &... arrays) const
Definition: transfer_vertex_data.hpp:250
void await_sends()
Definition: transfer_vertex_data.hpp:269
int nnode() const
Definition: transfer_vertex_data.hpp:257
void await_recvs()
Definition: transfer_vertex_data.hpp:261
static int GPTLstart(const char *name)
Definition: timer_macro.hpp:9
static int GPTLstop(const char *name)
Definition: timer_macro.hpp:10
VertexBuffer< HostType > transfer_data(const DistributionPlan &send_plan, const DistributionPlan &recv_plan, const MyMPI &mpi, bool async, const Vs &... arrays)
Definition: transfer_vertex_data.hpp:282
T::value_type sum_view(const T &view)
Definition: view_arithmetic.hpp:135