XGCa
transfer_vertex_data.hpp
1 #ifndef TRANSFER_VERTEX_DATA_HPP
2 #define TRANSFER_VERTEX_DATA_HPP
3 
5 
6 inline int pair(int np, int p, int k){
7  int out = p ^ k;
8  return (out<np ? out : -1);
9 }
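// Note on pair(): this is an XOR pairing. At exchange step k, rank r is partnered with rank r^k;
// since (r^k)^k == r the pairing is symmetric, each rank has at most one partner per step, and
// every pair of ranks meets exactly once over all steps.
// Worked example with np=3 (the caller rounds the step count up to the next power of 2, i.e. 4):
//   rank 1 sees partners  k=1: 1^1=0,  k=2: 1^2=3 -> -1 (no such rank, skipped),  k=3: 1^3=2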
10 
11 
12 template<typename T>
13 inline int n_doubles_per_vertex(const T& array);
14 
15 // 1D view has 1 value per vertex
16 template<>
17 inline int n_doubles_per_vertex(const View<double*,CLayout,HostType>& array){
18  return 1;
19 }
20 
21 // 2D view has extent(1) values per vertex
22 template<>
23 inline int n_doubles_per_vertex(const View<double**,CLayout,HostType>& array){
24  return array.extent(1);
25 }
26 
27 // VGridDistribution has species*vr*vz values per vertex
28 template<>
29 inline int n_doubles_per_vertex(const VGridDistribution<HostType>& array){
30  return array.n_species()*array.n_vr()*array.n_vz();
31 }
32 
33 // std::vector<View<double*,CLayout,HostType>> has array.size() values per vertex
34 template<>
35 inline int n_doubles_per_vertex(const std::vector<View<double*,CLayout,HostType>>& array){
36  return array.size();
37 }
38 
39 /* total_n_doubles_per_vertex is used to determine the necessary buffer size when using the constructor that takes a template pack of Views.
40  * This base-case overload has only one View left, so it returns the size of that View.
41  */
42 template<class V>
43 inline int total_n_doubles_per_vertex(const V& first) {
44  return n_doubles_per_vertex(first);
45 }
46 
47 /* total_n_doubles_per_vertex is used to determine the necessary buffer size when using the constructor that takes a template pack of Views.
48  * The generic overload adds the size of the first View in the pack, then recurses on the remaining Views in the pack.
49  */
50 template<class V, class... TRest>
51 inline int total_n_doubles_per_vertex(const V& first, const TRest&... args) {
52  return n_doubles_per_vertex(first) + total_n_doubles_per_vertex(args...);
53 }
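// For illustration (hypothetical arrays): total_n_doubles_per_vertex(density, temperature, f0)
// expands to n_doubles_per_vertex(density) + n_doubles_per_vertex(temperature) + n_doubles_per_vertex(f0),
// e.g. 1 + extent(1) + n_species()*n_vr()*n_vz() for a 1D view, a 2D view, and a VGridDistribution.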
54 
55 // Access value ip of vertex i for a given data type
56 template<typename T>
57 KOKKOS_INLINE_FUNCTION double& vertex_access(const T& array, int i, int ip);
58 
59 // 1D view: The single index is assumed to be the vertex index
60 template<>
61 KOKKOS_INLINE_FUNCTION double& vertex_access(const View<double*,CLayout,HostType>& array, int i, int ip){
62  return array(i);
63 }
64 
65 // 2D view: The first index is assumed to be the vertex index
66 template<>
67 KOKKOS_INLINE_FUNCTION double& vertex_access(const View<double**,CLayout,HostType>& array, int i, int ip){
68  return array(i,ip);
69 }
70 
71 // VGridDistribution: Use the function VGridDistribution::pull_node_index for correct access
72 template<>
73 KOKKOS_INLINE_FUNCTION double& vertex_access(const VGridDistribution<HostType>& array, int i, int ip){
74  return array.pull_node_index(i, ip);
75 }
76 
77 
78 // Resize a data type to a new number of vertices
79 template<typename T>
80 inline void resize_n_vertices(int nnodes, T& array);
81 
82 template<typename T, typename... TRest>
83 inline void resize_n_vertices(int nnodes, T& array, TRest&... args){
84  resize_n_vertices(nnodes, array);
85  resize_n_vertices(nnodes, args...);
86 }
87 
88 // 1D view
89 template<>
90 inline void resize_n_vertices(int nnodes, View<double*,CLayout,HostType>& array){
91  array = View<double*,CLayout,HostType>(NoInit(array.label()), nnodes);
92 }
93 
94 // 2D view
95 template<>
96 inline void resize_n_vertices(int nnodes, View<double**,CLayout,HostType>& array){
97  array = View<double**,CLayout,HostType>(NoInit(array.label()), nnodes, array.extent(1));
98 }
99 
100 // VGridDistribution
101 template<>
102 inline void resize_n_vertices(int nnodes, VGridDistribution<HostType>& array){
103  array.resize_n_vertices(nnodes);
104 }
105 
106 // Load a single array to a buffer. The buffer is size (nvertices, nvals_per_vertex)
107 // n is the number of vertices being loaded
108 // new_offset is where to write the array relative to the vertex range of the buffer (first dimension)
109 // old_offset is where to read the array relative to the vertex range of the array
110 // buffer is the buffer being written to
111 // arr_offset is where the values for the array are stored inside buffer (second dimension)
112 // array is the array being read from
113 template<class V>
114 inline void load_arrays(int n, int new_offset, int old_offset, const View<double**,CLayout,HostType>& buffer, int& arr_offset, const V& array) {
115  int n_vals = n_doubles_per_vertex(array);
116  Kokkos::parallel_for("pol_decomp_redist_load", Kokkos::RangePolicy<typename V::execution_space>(0, n), KOKKOS_LAMBDA(const int i){
117  int inew = i + new_offset;
118  int iold = i + old_offset;
119  for(int ip = 0; ip<n_vals; ip++){
120  buffer(inew,ip+arr_offset) = vertex_access(array, iold, ip);
121  }
122  });
123  Kokkos::fence();
124  arr_offset += n_vals;
125 }
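// For illustration: packing a 1D view (1 double per vertex) followed by a 2D view with extent(1)==2
// writes buffer columns 0 and 1..2 respectively, with arr_offset advancing 0 -> 1 -> 3,
// so each array occupies its own contiguous block of the buffer's second dimension.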
126 
127 // Load multiple arrays by loading the first in the pack, then calling this function recursively on the rest
128 template<class V, class... TRest>
129 inline void load_arrays(int n, int new_offset, int old_offset, const View<double**,CLayout,HostType>& buffer, int& arr_offset, const V& first, const TRest&... args) {
130  load_arrays(n, new_offset, old_offset, buffer, arr_offset, first);
131  load_arrays(n, new_offset, old_offset, buffer, arr_offset, args...);
132 }
133 
134 // Specialization for std::vector<View<double*,CLayout,HostType>>
135 template<>
136 inline void load_arrays(int n, int new_offset, int old_offset, const View<double**,CLayout,HostType>& buffer, int& arr_offset, const std::vector<View<double*,CLayout,HostType>>& array) {
137  int n_vals = n_doubles_per_vertex(array);
138  for(int ip=0; ip<n_vals; ip++){
139  auto view = array[ip];
140  Kokkos::parallel_for("pol_decomp_redist_load", Kokkos::RangePolicy<HostExSpace>(0, n), KOKKOS_LAMBDA(const int i){
141  int inew = i + new_offset;
142  int iold = i + old_offset;
143  buffer(inew,ip+arr_offset) = view(iold);
144  });
145  }
146  Kokkos::fence();
147  arr_offset += n_vals;
148 }
149 
150 // Load the entire buffer. Cannot call load_arrays directly because arr_offset needs to be initialized to 0.
151 template<class... Vs>
152 inline void load_buffer(int n, int new_offset, int old_offset, const View<double**,CLayout,HostType>& buffer, const Vs&... arrays){
153  if(n<=0) return;
154 
155  int arr_offset = 0;
156  load_arrays(n, new_offset, old_offset, buffer, arr_offset, arrays...);
157 }
158 
159 // Unload a single array from a buffer. The buffer is size (nvertices, nvals_per_vertex)
160 // n is the number of vertices being unloaded
161 // buffer is the buffer being read from
162 // arr_offset is where the values for the array are stored inside buffer (second dimension)
163 // array is the array being written to
164 template<class V>
165 inline void unload_arrays(int n, int vert_offset, const View<double**,CLayout,HostType>& buffer, int& arr_offset, const V& array) {
166  int n_vals = n_doubles_per_vertex(array);
167  Kokkos::parallel_for("pol_decomp_redist_unload", Kokkos::RangePolicy<typename V::execution_space>(0, n), KOKKOS_LAMBDA(const int i){
168  for(int ip = 0; ip<n_vals; ip++){
169  vertex_access(array, i+vert_offset, ip) = buffer(i,ip+arr_offset);
170  }
171  });
172  Kokkos::fence();
173  arr_offset += n_vals;
174 }
175 
176 // Unload multiple arrays by unloading the first in the pack, then calling this function recursively
177 template<class V, class... TRest>
178 inline void unload_arrays(int n, int vert_offset, const View<double**,CLayout,HostType>& buffer, int& arr_offset, const V& first, const TRest&... args) {
179  unload_arrays(n, vert_offset, buffer, arr_offset, first);
180  unload_arrays(n, vert_offset, buffer, arr_offset, args...);
181 }
182 
183 // Specialization for std::vector<View<double*,CLayout,HostType>>
184 template<>
185 inline void unload_arrays(int n, int vert_offset, const View<double**,CLayout,HostType>& buffer, int& arr_offset, const std::vector<View<double*,CLayout,HostType>>& array) {
186  int n_vals = n_doubles_per_vertex(array);
187  for(int ip=0; ip<n_vals; ip++){
188  auto view = array[ip];
189  Kokkos::parallel_for("pol_decomp_redist_unload", Kokkos::RangePolicy<HostExSpace>(0, n), KOKKOS_LAMBDA(const int i){
190  view(i+vert_offset) = buffer(i,ip+arr_offset);
191  });
192  }
193  Kokkos::fence();
194  arr_offset += n_vals;
195 }
196 
197 
198 // Count number of recvs. If not too many, prepost all receive requests (using XOR ordering and flow control)
199 inline bool do_prepost_receive_requests(const DistributionPlan& plan){
200  int n_recv = 0;
201  for(int i=0;i<plan.cnts.size();i++){
202  if(plan.cnts(i)>0 && i!=plan.my_rank){ // Skip my_rank because those vertices are not received from elsewhere
203  n_recv+=1;
204  }
205  }
206  constexpr int MAXPREPOSTS = 20; // nothing magical about this number - just keep it reasonable
207  return (n_recv <= MAXPREPOSTS);
208 }
209 
210 inline int get_max_buf_size(const DistributionPlan& plan){
211  int max_buf_size = 1; // Minimum size 1 (0 probably ok)
212  for(int i=0;i<plan.cnts.size();i++){
213  if(i!=plan.my_rank){ // Skip my_rank because those vertices are not being sent anywhere
214  max_buf_size = std::max(max_buf_size, plan.cnts(i));
215  }
216  }
217  return max_buf_size;
218 }
219 
220 inline int get_sum_counts(const DistributionPlan& plan){
221  return sum_view(plan.cnts);
222 }
223 
224 template<class Device>
225 struct VertexBuffer{
226  View<double**,CLayout,Device> view;
227  std::vector<MPI_Request> rrequests;
228  View<bool*,CLayout,HostType> rreq_assigned;
229 
230  std::vector<View<double**,CLayout,Device>> send_buffers;
231  std::vector<MPI_Request> srequests;
232  View<bool*,CLayout,HostType> sreq_assigned;
233 
234  VertexBuffer(){}
235 
236  VertexBuffer(int nnode_in, int n_dbl_per_vertex, int n_ranks)
237  : view(NoInit("tmp_redistribute"), nnode_in, n_dbl_per_vertex),
238  rrequests(n_ranks),
239  rreq_assigned(NoInit("rreq_assigned"), n_ranks),
240  send_buffers(n_ranks),
241  srequests(n_ranks),
242  sreq_assigned(NoInit("sreq_assigned"), n_ranks)
243  {
244  Kokkos::deep_copy(rreq_assigned, false);
245  Kokkos::deep_copy(sreq_assigned, false);
246  }
247 
248  // Unload the entire buffer
249  template<class... Vs>
250  void unload(int vertex_offset, const Vs&... arrays) const{
251  if(nnode()<=0) return;
252 
253  int arr_offset = 0;
254  unload_arrays(nnode(), vertex_offset, view, arr_offset, arrays...);
255  }
256 
257  int nnode() const{
258  return view.extent(0);
259  }
260 
261  void await_recvs(){
262  for(int i=0; i<rrequests.size(); i++){
263  if(rreq_assigned(i)){
264  MPI_Wait(&(rrequests[i]), MPI_STATUS_IGNORE);
265  }
266  }
267  }
268 
269  void await_sends(){
270  for(int i=0; i<srequests.size(); i++){
271  if(sreq_assigned(i)){
272  MPI_Wait(&(srequests[i]), MPI_STATUS_IGNORE);
273 
274  // Can release send buffer now that sending is confirmed complete
275  send_buffers[i] = View<double**,CLayout,Device>();
276  }
277  }
278  }
279 };
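// Typical lifecycle (see transfer_data below): the buffer is constructed with the new vertex count,
// receives are posted directly into 'view', outgoing data is packed and sent, and the caller then
// (in the asynchronous path) calls await_recvs(), unload()s into the resized destination arrays,
// and calls await_sends() to release the send buffers.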
280 
281 template<class... Vs>
282 inline VertexBuffer<HostType> transfer_data(const DistributionPlan& send_plan, const DistributionPlan& recv_plan, const MyMPI& mpi, bool async, const Vs&... arrays){
283  GPTLstart("TRANSFER_DATA_SETUP");
284  int n_dbl_per_vertex = total_n_doubles_per_vertex(arrays...);
285  int new_nnodes = get_sum_counts(recv_plan);
286  MPI_Comm comm = mpi.plane_comm;
287  int my_rank = mpi.my_plane_rank;
288  int n_ranks = mpi.n_plane_ranks;
289  GPTLstop("TRANSFER_DATA_SETUP");
290 
291  GPTLstart("TRANSFER_DATA_BUFFER_SETUP");
292  VertexBuffer<HostType> vertex_buffer(new_nnodes, n_dbl_per_vertex, n_ranks);
293  GPTLstop("TRANSFER_DATA_BUFFER_SETUP");
294 
295  constexpr int MSG_TAG = 5; // Arbitrary
296  constexpr int SIGNAL_TAG = 10; // Arbitrary
297 
298  // Count number of recvs. If not too many, prepost all receive requests (using XOR ordering and flow control)
299  GPTLstart("TRANSFER_DATA_PREPOST_SETUP");
300  const bool prepost_receive_requests = do_prepost_receive_requests(recv_plan);
301  GPTLstop("TRANSFER_DATA_PREPOST_SETUP");
302 
303  GPTLstart("TRANSFER_DATA_PREPOST_IRECV");
304  int p=1; while ( p < n_ranks ) p*=2; // smallest power of 2 >= n_ranks
305  if(prepost_receive_requests){
306  for(int i=1;i<p;i++){
307  int pid = pair(n_ranks,i,my_rank);
308  if (pid >= 0){
309  if (recv_plan.cnts(pid) > 0){
310  double* recv_addr = &vertex_buffer.view(recv_plan.displs(pid),0);
311  MPI_Irecv(recv_addr, recv_plan.cnts(pid)*n_dbl_per_vertex, MPI_DOUBLE, pid, MSG_TAG, comm, &(vertex_buffer.rrequests[pid]));
312  vertex_buffer.rreq_assigned(pid) = true;
313  if(!async){
314  int rsignal;
315  MPI_Send(&rsignal, 1, MPI_INTEGER, pid, SIGNAL_TAG, comm);
316  }
317  }
318  }
319  }
320  }
321  GPTLstop("TRANSFER_DATA_PREPOST_IRECV");
322 
323  GPTLstart("TRANSFER_DATA_LOAD_STAY_BUFFER");
324  // Copy what is staying on this rank
325  load_buffer(send_plan.cnts(my_rank), recv_plan.displs(my_rank), send_plan.displs(my_rank), vertex_buffer.view, arrays...);
326  GPTLstop("TRANSFER_DATA_LOAD_STAY_BUFFER");
327 
328  // The on-rank copy above is written directly into the receive buffer;
329  // exchange the rest with other ranks using XOR ordering and flow control
330  GPTLstart("TRANSFER_DATA_ALLOC_SENDARR_SH");
331  View<double**,CLayout,HostType> sendarr_shared;
332  if(!async){
333  int n_sendarr = get_max_buf_size(send_plan);
334  sendarr_shared = View<double**,CLayout,HostType>(NoInit("sendarr_shared"), n_sendarr, n_dbl_per_vertex);
335  }
336  GPTLstop("TRANSFER_DATA_ALLOC_SENDARR_SH");
337 
338  for(int i=1;i<p;i++){
339  int pid = pair(n_ranks,i,my_rank);
340  if (pid >= 0){
341  if(!prepost_receive_requests){
342  GPTLstart("TRANSFER_DATA_IRECV");
343  if(recv_plan.cnts(pid) > 0){
344  // Post receive request
345  double* recv_addr = &vertex_buffer.view(recv_plan.displs(pid),0);
346  MPI_Irecv(recv_addr, recv_plan.cnts(pid)*n_dbl_per_vertex, MPI_DOUBLE, pid, MSG_TAG, comm, &(vertex_buffer.rrequests[pid]));
347  vertex_buffer.rreq_assigned(pid) = true;
348  if(!async){
349  int rsignal;
350  MPI_Send(&rsignal, 1, MPI_INTEGER, pid, SIGNAL_TAG, comm);
351  }
352  }
353  GPTLstop("TRANSFER_DATA_IRECV");
354  }
355 
356  if (send_plan.cnts(pid) > 0){
357  View<double**,CLayout,HostType> sendarr;
358  if(async){
359  GPTLstart("TRANSFER_DATA_ALLOC_SENDARR");
360  // Hang onto allocations if not using a blocking send
361  vertex_buffer.send_buffers[pid] = View<double**,CLayout,HostType>(NoInit("sendarr"), send_plan.cnts(pid), n_dbl_per_vertex);
362  sendarr = vertex_buffer.send_buffers[pid];
363  GPTLstop("TRANSFER_DATA_ALLOC_SENDARR");
364  }else{
365  GPTLstart("TRANSFER_DATA_SENDARR_SUB");
366  // Use the shared buffer since the data is sent before its next use
367  sendarr = Kokkos::subview(sendarr_shared, Kokkos::make_pair(0,send_plan.cnts(pid)), Kokkos::ALL());
368  GPTLstop("TRANSFER_DATA_SENDARR_SUB");
369  }
370 
371  GPTLstart("TRANSFER_DATA_LOAD_SEND_BUFFER");
372  // Fill send buffer
373  load_buffer(send_plan.cnts(pid), 0, send_plan.displs(pid), sendarr, arrays...);
374  GPTLstop("TRANSFER_DATA_LOAD_SEND_BUFFER");
375 
376  if(!async){
377  // Wait for signal, and then send
378  int ssignal;
379  MPI_Recv(&ssignal, 1, MPI_INTEGER, pid, SIGNAL_TAG, comm, MPI_STATUS_IGNORE);
380  }
381  GPTLstart("TRANSFER_DATA_SEND");
382  if(async){
383  MPI_Isend(sendarr.data(), sendarr.size(), MPI_DOUBLE, pid, MSG_TAG, comm, &(vertex_buffer.srequests[pid]));
384  vertex_buffer.sreq_assigned(pid) = true;
385  }else{
386  MPI_Rsend(sendarr.data(), sendarr.size(), MPI_DOUBLE, pid, MSG_TAG, comm);
387  }
388  GPTLstop("TRANSFER_DATA_SEND");
389  }
390 
391  if(!async){
392  // wait for messages here
393  if (recv_plan.cnts(pid) > 0){
394  // Note: since using XOR ordering and flow control, no reason to delay the wait
395  MPI_Wait(&(vertex_buffer.rrequests[pid]), MPI_STATUS_IGNORE);
396  }
397  }
398  }
399  }
400 
401  return vertex_buffer;
402 }
403 
404 #endif
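Below is a minimal usage sketch of the interface above. The plan objects, communicator wrapper, and array names (send_plan, recv_plan, mpi, density, f0) are hypothetical stand-ins for objects set up elsewhere in XGCa; only the calls shown are taken from this header.

// Hypothetical sketch: redistribute two per-vertex arrays to a new poloidal decomposition.
// Assumes send_plan/recv_plan (DistributionPlan) and mpi (MyMPI) already describe the transfer,
// and that density/f0 hold the data of the old decomposition.
View<double*,CLayout,HostType> density;   // 1 double per vertex
VGridDistribution<HostType> f0;           // n_species*n_vr*n_vz doubles per vertex

// Pack outgoing data, post sends/receives, and copy the on-rank portion
VertexBuffer<HostType> buf = transfer_data(send_plan, recv_plan, mpi, /*async=*/true, density, f0);

// Resize the destination arrays to the new local vertex count, then unpack the received buffer
resize_n_vertices(buf.nnode(), density, f0);
buf.await_recvs();
buf.unload(0, density, f0);
buf.await_sends();  // send buffers can be released after this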
KOKKOS_INLINE_FUNCTION int n_species() const
Definition: vgrid_distribution.hpp:187
void resize_n_vertices(int new_n_nodes)
Definition: vgrid_distribution.hpp:241
KOKKOS_INLINE_FUNCTION double & pull_node_index(int inode, int ip) const
Definition: vgrid_distribution.hpp:232
KOKKOS_INLINE_FUNCTION int n_vr() const
Definition: vgrid_distribution.hpp:191
KOKKOS_INLINE_FUNCTION int n_vz() const
Definition: vgrid_distribution.hpp:199
void parallel_for(const std::string name, int n_ptl, Function func, Option option, HostAoSoA aosoa_h, DeviceAoSoA aosoa_d)
Definition: streamed_parallel_for.hpp:252
Kokkos::ViewAllocateWithoutInitializing NoInit
Definition: space_settings.hpp:69
Definition: domain_decomposition.hpp:15
View< int *, CLayout, HostType > displs
Definition: domain_decomposition.hpp:17
View< int *, CLayout, HostType > cnts
Definition: domain_decomposition.hpp:16
int my_rank
Definition: domain_decomposition.hpp:18
Definition: my_mpi.hpp:19
int n_plane_ranks
Definition: my_mpi.hpp:40
int my_plane_rank
Definition: my_mpi.hpp:39
MPI_Comm plane_comm
Definition: my_mpi.hpp:38
Definition: transfer_vertex_data.hpp:225
VertexBuffer()
Definition: transfer_vertex_data.hpp:234
void unload(int vertex_offset, const Vs &... arrays) const
Definition: transfer_vertex_data.hpp:250
std::vector< View< double **, CLayout, Device > > send_buffers
Definition: transfer_vertex_data.hpp:230
VertexBuffer(int nnode_in, int n_dbl_per_vertex, int n_ranks)
Definition: transfer_vertex_data.hpp:236
std::vector< MPI_Request > srequests
Definition: transfer_vertex_data.hpp:231
void await_sends()
Definition: transfer_vertex_data.hpp:269
View< bool *, CLayout, HostType > sreq_assigned
Definition: transfer_vertex_data.hpp:232
View< bool *, CLayout, HostType > rreq_assigned
Definition: transfer_vertex_data.hpp:228
View< double **, CLayout, Device > view
Definition: transfer_vertex_data.hpp:226
int nnode() const
Definition: transfer_vertex_data.hpp:257
std::vector< MPI_Request > rrequests
Definition: transfer_vertex_data.hpp:227
void await_recvs()
Definition: transfer_vertex_data.hpp:261
static int GPTLstart(const char *name)
Definition: timer_macro.hpp:9
static int GPTLstop(const char *name)
Definition: timer_macro.hpp:10
void load_buffer(int n, int new_offset, int old_offset, const View< double **, CLayout, HostType > &buffer, const Vs &... arrays)
Definition: transfer_vertex_data.hpp:152
int total_n_doubles_per_vertex(const V &first)
Definition: transfer_vertex_data.hpp:43
void load_arrays(int n, int new_offset, int old_offset, const View< double **, CLayout, HostType > &buffer, int &arr_offset, const V &array)
Definition: transfer_vertex_data.hpp:114
int get_sum_counts(const DistributionPlan &plan)
Definition: transfer_vertex_data.hpp:220
bool do_prepost_receive_requests(const DistributionPlan &plan)
Definition: transfer_vertex_data.hpp:199
void unload_arrays(int n, int vert_offset, const View< double **, CLayout, HostType > &buffer, int &arr_offset, const V &array)
Definition: transfer_vertex_data.hpp:165
int pair(int np, int p, int k)
Definition: transfer_vertex_data.hpp:6
KOKKOS_INLINE_FUNCTION double & vertex_access(const T &array, int i, int ip)
void resize_n_vertices(int nnodes, T &array)
VertexBuffer< HostType > transfer_data(const DistributionPlan &send_plan, const DistributionPlan &recv_plan, const MyMPI &mpi, bool async, const Vs &... arrays)
Definition: transfer_vertex_data.hpp:282
int n_doubles_per_vertex(const T &array)
int get_max_buf_size(const DistributionPlan &plan)
Definition: transfer_vertex_data.hpp:210
T::value_type sum_view(const T &view)
Definition: view_arithmetic.hpp:135