XGCa
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
streamed_parallel_for.hpp
Go to the documentation of this file.
1 #ifndef STREAMED_PARALLEL_FOR_HPP
2 #define STREAMED_PARALLEL_FOR_HPP
3 
4 namespace Streamed{
5 
// Identifies which GPU stream a pipeline role runs on, so that the
// host->device copy, the kernel, and the device->host copy of different
// chunks can overlap.
// NOTE(review): the enumerators Runner, Returner and NStreams were dropped
// by the documentation extraction (original lines 8-10); they are
// reconstructed here from their uses in parallel_for below.
enum StreamJob{
  Sender=0,   // Stream used for host->device copies
  Runner,     // Stream used to launch the compute kernel
  Returner,   // Stream used for device->host copies
  NStreams    // Number of streams (also sizes the dummy stream vector)
};
12 
// Options controlling which pipeline stages parallel_for executes
// (e.g. skip the host->device send when the device copy is already current).
// NOTE(review): the documentation extraction dropped original lines 15-16,
// which held further enumerators -- at least NoReturn, which parallel_for
// below compares against. Restore the missing enumerators from the original
// header before relying on this listing; their exact names/order cannot be
// recovered from what is visible here.
enum Option{
 NoSend=0,
};
18 
// The pipeline stages a chunk passes through, in order; each value indexes
// into the tasks vector in parallel_for.
// NOTE(review): all enumerators except Run were dropped by the documentation
// extraction (original lines 20-21, 23-25); they are reconstructed here from
// their uses in parallel_for below, with Run kept as the third entry to match
// its original source line (22).
enum Tasks{
  ToPinned=0, // Stage chunk from host view into pinned send buffer
  Send,       // Copy chunk from host (or pinned buffer) to device
  Run,        // Execute the kernel on the chunk
  Return,     // Copy chunk from device back to host (or pinned buffer)
  FromPinned, // Unstage chunk from pinned return buffer into host view
  NTasks      // Number of task kinds (sizes the tasks vector)
};
27 
// Define streams here; check that USE_STREAMS isn't defined earlier
#ifdef USE_STREAMS
# error Preprocessor flag USE_STREAMS is already in use
#endif

// Currently streams are only set up for Cuda/HIP (and only if OpenMP is available):
// GPU_STREAM(x) expands to "x," when streams exist, so a stream/execution-space
// argument can be prepended to calls such as Kokkos::deep_copy; otherwise it
// expands to nothing and the call falls back to the default space.
#if defined(USE_GPU) && (defined(USE_CUDA) || defined(USE_HIP))
# define USE_STREAMS
// typedef Kokkos::Cuda GPUStream;
# define GPU_STREAM(x) x,
#elif defined(USE_GPU)
 // If GPU in use, but not Cuda (or no OpenMP), so streams currently unavailable
 struct GPUStream { void fence(){Kokkos::fence();} }; // Call general fence
# define GPU_STREAM(x)
#else
 // CPU only
 struct GPUStream { void fence(){} }; // Fake fence if not using GPUs
# define GPU_STREAM(x)
#endif
47 
// Number of SoAs assigned to partition i_partition when n_soa_on_device SoAs
// are split across n_partitions_of_device_aosoa partitions of the device
// AoSoA. With streams, the SoAs are spread as evenly as possible (low-index
// partitions absorb the remainder, so partition 0 is always the largest).
// Without streams, partition 0 holds everything and the remaining partitions
// are empty place-holders that keep the task loop uniform.
inline int partition_size(int i_partition, int n_soa_on_device, int n_partitions_of_device_aosoa){
#ifdef USE_STREAMS
  const int n_parts = n_partitions_of_device_aosoa;
  // Even split with remainder pushed to the lowest-index partitions
  return (n_soa_on_device + n_parts - i_partition - 1)/n_parts;
#else
  // Entire device AoSoA is the first (and only non-empty) partition
  if(i_partition==0) return n_soa_on_device;
  return 0;
#endif
}
56 
61 template<typename T>
62 struct StreamView{
63 
65 
68 
69  T* pinned_send[2];
71 
82  template<typename H,typename D>
83  StreamView(H& view_h, D& view_d, bool stage_in_pinned_memory_in, int n_on_device, int n_partitions_of_device_view)
84  : h_view_ptr((T*)view_h.data()),
85  d_view_ptr((T*)view_d.data()),
86  stage_in_pinned_memory(stage_in_pinned_memory_in)
87  {
88 #ifdef USE_STREAMS
90  int i_largest_partition = 0;
91  int size_of_largest_partition = partition_size(i_largest_partition, n_on_device, n_partitions_of_device_view);
92 #ifdef USE_CUDA
93  cudaMallocHost((void**)&pinned_send[0], size_of_largest_partition*sizeof(T));
94  cudaMallocHost((void**)&pinned_send[1], size_of_largest_partition*sizeof(T));
95  cudaMallocHost((void**)&pinned_return[0], size_of_largest_partition*sizeof(T));
96  cudaMallocHost((void**)&pinned_return[1], size_of_largest_partition*sizeof(T));
97 #else
98  int ierr;
99  ierr = hipHostMalloc((void**)&pinned_send[0], size_of_largest_partition*sizeof(T));
100  ierr = hipHostMalloc((void**)&pinned_send[1], size_of_largest_partition*sizeof(T));
101  ierr = hipHostMalloc((void**)&pinned_return[0], size_of_largest_partition*sizeof(T));
102  ierr = hipHostMalloc((void**)&pinned_return[1], size_of_largest_partition*sizeof(T));
103 #endif
104  }
105 #endif
106  }
107 
114 #ifdef USE_STREAMS
116 #ifdef USE_CUDA
117  cudaFree(pinned_send[0]);
118  cudaFree(pinned_send[1]);
119  cudaFree(pinned_return[0]);
120  cudaFree(pinned_return[1]);
121 #else
122  int ierr;
123  ierr = hipFree(pinned_send[0]);
124  ierr = hipFree(pinned_send[1]);
125  ierr = hipFree(pinned_return[0]);
126  ierr = hipFree(pinned_return[1]);
127 #endif
128  }
129 #endif
130  }
131 
144  template<typename ST>
145  inline void copy_to_device(int offset_h, int offset_d, int n, int i_staged_area, ST& gpu_stream){
146  T* host_loc = (stage_in_pinned_memory ? pinned_send[i_staged_area] : h_view_ptr + offset_h);
147  Kokkos::View<T*, HostType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_h(host_loc,n);
148  Kokkos::View<T*, DeviceType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_d(d_view_ptr + offset_d,n);
149  Kokkos::deep_copy(GPU_STREAM(gpu_stream) view_d, view_h);
150  }
151 
164  template<typename ST>
165  inline void copy_to_host(int offset_h, int offset_d, int n, int i_staged_area, ST& gpu_stream){
166  T* host_loc = (stage_in_pinned_memory ? pinned_return[i_staged_area] : h_view_ptr + offset_h);
167  Kokkos::View<T*, HostType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_r_h(host_loc,n);
168  Kokkos::View<T*, DeviceType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_r_d(d_view_ptr + offset_d,n);
169  Kokkos::deep_copy(GPU_STREAM(gpu_stream) view_r_h, view_r_d);
170  }
171 
181  inline void copy_to_pinned(int offset_h, int n, int i_staging_area){
182  //std::memcpy(pinned_send, h_view_ptr + offset_h, n*sizeof(T));
183  #pragma omp parallel for
184  for(int i_p = 0; i_p<n; i_p++){
185  pinned_send[i_staging_area][i_p] = h_view_ptr[offset_h+i_p];
186  }
187  }
188 
198  inline void copy_from_pinned(int offset_h, int n, int i_staging_area){
199  //std::memcpy(h_view_ptr + offset_h, pinned_return, n*sizeof(T));
200  #pragma omp parallel for
201  for(int i_p = 0; i_p<n; i_p++){
202  h_view_ptr[offset_h+i_p] = pinned_return[i_staging_area][i_p];
203  }
204  }
205 };
206 
/// Book-keeping for one pipeline stage: how many SoAs the stage handles this
/// iteration (n) and the running SoA offset it has reached so far (offset).
struct Task{
  int n{0};      ///< Number of SoAs to process in the current iteration
  int offset{0}; ///< SoA offset of the current chunk within the host AoSoA

  Task(){}

  /// Advance the offset past the chunk just processed.
  inline void advance(){ offset += n; }
};
222 
/// Streamed particle parallel_for: runs func over n_ptl particles stored in
/// the host AoSoA aosoa_h, pushing them through the (possibly smaller)
/// device AoSoA aosoa_d in chunks. With USE_STREAMS the pipeline stages of
/// consecutive chunks (stage-to-pinned, send, run, return, unstage) overlap
/// on separate streams; without streams the whole device AoSoA is treated as
/// a single chunk and the stages execute back to back.
/// @param name     Label passed to Kokkos::parallel_for (profiling/debugging)
/// @param n_ptl    Number of particles to operate on
/// @param func     Functor executed per particle index on the device
/// @param option   Controls whether the send and/or return copies are skipped
/// @param aosoa_h  Host AoSoA holding all particles
/// @param aosoa_d  Device AoSoA used as the staging/compute buffer
template<typename Function, typename HostAoSoA, typename DeviceAoSoA>
void parallel_for(const std::string name, int n_ptl, Function func, Option option, HostAoSoA aosoa_h, DeviceAoSoA aosoa_d){
  // Performance options
#ifdef USE_STREAMS
  const int desired_n_ptl_per_chunk = 2e6; // How many particles are needed to saturate the GPU
  const bool stage_in_pinned_memory = true;
#else
  const bool stage_in_pinned_memory = false; // Pinned staging only pays off with streams
#endif
  const bool verbose = false;

#ifdef USE_STREAMS
  // Initialize streams: one execution space instance per pipeline role
  // (Sender/Runner/Returner), weights 1,1,1
  auto gpu_streams = Kokkos::Experimental::partition_space(Kokkos::DefaultExecutionSpace(),1,1,1);
#else
  // If not using streams, create a dummy variable (the fake GPUStream above)
  std::vector<GPUStream> gpu_streams(NStreams);
#endif

  // Total number of SoAs that need to be run on
  int n_soa_total = aosoa_h.size()/VEC_LEN;
  if(n_soa_total*VEC_LEN != aosoa_h.size()) {printf("\nERROR: streamed_parallel_for assumes the last SoA in the AoSoA is full\n"); exit(1);}

  // Total amount of SoAs that can fit on the device
  int n_soa_on_device = aosoa_d.size()/VEC_LEN;
  if(n_soa_on_device*VEC_LEN != aosoa_d.size()) {printf("\nERROR: streamed_parallel_for assumes the last device SoA in the AoSoA is full\n"); exit(1);}

  if(n_soa_on_device==0 && n_soa_total!=0) {printf("\nERROR: streamed_parallel_for requires non-zero amount of device memory\n"); exit(1);}

#ifdef USE_STREAMS
  int desired_n_soa_per_chunk = desired_n_ptl_per_chunk/VEC_LEN;
  int n_partitions_of_device_aosoa = (n_soa_on_device+desired_n_soa_per_chunk-1)/desired_n_soa_per_chunk; // ceiling
  // At least 3 partitions so send/run/return can each own a different
  // partition of the device AoSoA at the same time
  n_partitions_of_device_aosoa = std::max(n_partitions_of_device_aosoa, 3);
#else
  // If not streaming, then there is no need to partition device AoSoA; use 3 partitions to simplify task loop
  int n_partitions_of_device_aosoa = 3;
#endif

  // Set up stream view to handle streaming of the aosoa to device and back
  StreamView<typename HostAoSoA::soa_type> stream_view(aosoa_h, aosoa_d, stage_in_pinned_memory, n_soa_on_device, n_partitions_of_device_aosoa);

  // Set up tasks: ordered_tasks lists the stages actually in use, in the
  // order a given chunk passes through them (optional stages are skipped
  // according to option and the staging flag)
  std::vector<Task> tasks(NTasks);
  std::vector<Tasks> ordered_tasks;
  if(option!=NoSend){
    if(stage_in_pinned_memory) ordered_tasks.push_back(ToPinned);
    ordered_tasks.push_back(Send);
  }
  ordered_tasks.push_back(Run);
  if(option!=NoReturn){
    ordered_tasks.push_back(Return);
    if(stage_in_pinned_memory) ordered_tasks.push_back(FromPinned);
  }

  int n_soa_remaining = n_soa_total;
  int i = 0;
  bool finished_all = false;
  //GPTLstart("stream_while_loop");
  // Software pipeline: on each iteration, each active stage operates on a
  // different chunk, and chunks advance one stage per iteration
  while(!finished_all){
    if (verbose) printf("\nStep %d", i);
    // Determine number of particles to send in next chunk
    int i_partition = i % n_partitions_of_device_aosoa;
    int p_size = partition_size(i_partition, n_soa_on_device, n_partitions_of_device_aosoa);
    int n_first_op = std::min(p_size, n_soa_remaining);
    n_soa_remaining -= n_first_op;

    // For next chunk, execute on the number of SoAs used by the previous operation
    // (shift chunk sizes one stage down the pipeline, back to front)
    for (int it=ordered_tasks.size()-1; it>0; it--){
      tasks[ordered_tasks[it]].n = tasks[ordered_tasks[it-1]].n;
    }
    tasks[ordered_tasks[0]].n = n_first_op;

    // For pinned memory, alternate staging areas
    // (i_staging_area is filled this step; i_staged_area was filled last step)
    int i_staging_area = i%2;
    int i_staged_area = (i+1)%2;

    // Copy chunk to device by recasting as View
    if(tasks[Send].n>0){
      // Device offset wraps around the circular device AoSoA
      int offset_send_d = tasks[Send].offset % n_soa_on_device;
      stream_view.copy_to_device(tasks[Send].offset, offset_send_d, tasks[Send].n, i_staged_area, gpu_streams[Sender]);
      if (verbose) printf("\n  Copy (%d - %d) on host to (%d - %d) on device", tasks[Send].offset, tasks[Send].offset+tasks[Send].n, offset_send_d, offset_send_d+tasks[Send].n);
      tasks[Send].advance();
    }

    // Launch parallel_for
    if(tasks[Run].n>0){
      int offset_run_d = tasks[Run].offset % n_soa_on_device;
      // Since this is a GPU-only feature, the parallel_for loops over particles, not vectors
      // Particles bounds are:
      int ptl_offset_d = offset_run_d*VEC_LEN;
      int ptl_last_d = ptl_offset_d + tasks[Run].n*VEC_LEN;

      // Stop at n_ptl even if size of AoSoA is larger
      // (translate the global particle count into the current device window)
      int n_ptl_d = n_ptl - (tasks[Run].offset-offset_run_d)*VEC_LEN;
      ptl_last_d = std::min(ptl_last_d, n_ptl_d);

      // Launch parallel_for
#ifdef USE_STREAMS
      Kokkos::parallel_for(name.c_str(), Kokkos::RangePolicy<ExSpace>(gpu_streams[Runner], ptl_offset_d, ptl_last_d), func);
#else
      Kokkos::parallel_for(name.c_str(), Kokkos::RangePolicy<ExSpace>(ptl_offset_d, ptl_last_d), func);
#endif
      if (verbose) printf("\n  Run (%d - %d) on device", offset_run_d, offset_run_d+tasks[Run].n);
      if (verbose) printf("\n  i.e. local ptl (%d - %d), global ptl (%d - %d)", ptl_offset_d, ptl_last_d, tasks[Run].offset*VEC_LEN, tasks[Run].offset*VEC_LEN+(ptl_last_d-ptl_offset_d));
      tasks[Run].advance();
    }

    // Copy finished chunk back by recasting as View
    if(tasks[Return].n>0){
      int offset_return_d = tasks[Return].offset % n_soa_on_device;
      stream_view.copy_to_host(tasks[Return].offset, offset_return_d, tasks[Return].n, i_staged_area, gpu_streams[Returner]);
      if (verbose) printf("\n  Copy to (%d - %d) on host from (%d - %d) on device", tasks[Return].offset, tasks[Return].offset+tasks[Return].n, offset_return_d, offset_return_d+tasks[Return].n);
      tasks[Return].advance();
    }

    // Staging in pinned
    // Put staging at end since it is on CPU and thus would block GPU launches if it were placed before them
    if(tasks[ToPinned].n>0){
      stream_view.copy_to_pinned(tasks[ToPinned].offset, tasks[ToPinned].n, i_staging_area);
      if (verbose) printf("\n  Copy (%d - %d) on host to pinned_send", tasks[ToPinned].offset, tasks[ToPinned].offset+tasks[ToPinned].n);
      tasks[ToPinned].advance();
    }

    if(tasks[FromPinned].n>0){
      stream_view.copy_from_pinned(tasks[FromPinned].offset, tasks[FromPinned].n, i_staging_area);
      if (verbose) printf("\n  Copy (%d - %d) to host from pinned_return", tasks[FromPinned].offset, tasks[FromPinned].offset+tasks[FromPinned].n);
      tasks[FromPinned].advance();
    }

    // All processes must be complete before going to the next chunk
    Kokkos::fence();

    // Advance to next partition
    i++;

    // Exit condition: all operations are completed
    // (every stage had an empty chunk this iteration, i.e. the pipeline drained)
    finished_all = true;
    for (int it = 0; it<NTasks; it++){ finished_all = finished_all && (tasks[it].n==0); }
  }
  //GPTLstop("stream_while_loop");
  if (verbose) printf("\nComplete in %d steps", i);
}
393 
394 } // Namespace
395 #endif
Definition: streamed_parallel_for.hpp:7
void copy_to_device(int offset_h, int offset_d, int n, int i_staged_area, ST &gpu_stream)
Definition: streamed_parallel_for.hpp:145
Definition: streamed_parallel_for.hpp:8
T * pinned_send[2]
Definition: streamed_parallel_for.hpp:69
void advance()
Definition: streamed_parallel_for.hpp:218
~StreamView()
Definition: streamed_parallel_for.hpp:113
Definition: streamed_parallel_for.hpp:207
Definition: streamed_parallel_for.hpp:9
Definition: streamed_parallel_for.hpp:62
Definition: streamed_parallel_for.hpp:22
bool stage_in_pinned_memory
Definition: streamed_parallel_for.hpp:64
Tasks
Definition: streamed_parallel_for.hpp:19
Definition: streamed_parallel_for.hpp:16
Definition: streamed_parallel_for.hpp:14
int n
Definition: streamed_parallel_for.hpp:208
Definition: streamed_parallel_for.hpp:10
Definition: streamed_parallel_for.hpp:25
void copy_from_pinned(int offset_h, int n, int i_staging_area)
Definition: streamed_parallel_for.hpp:198
Definition: streamed_parallel_for.hpp:20
Definition: streamed_parallel_for.hpp:21
T * h_view_ptr
Definition: streamed_parallel_for.hpp:66
Option
Definition: streamed_parallel_for.hpp:13
StreamView(H &view_h, D &view_d, bool stage_in_pinned_memory_in, int n_on_device, int n_partitions_of_device_view)
Definition: streamed_parallel_for.hpp:83
void copy_to_host(int offset_h, int offset_d, int n, int i_staged_area, ST &gpu_stream)
Definition: streamed_parallel_for.hpp:165
StreamJob
Definition: streamed_parallel_for.hpp:6
#define GPU_STREAM(x)
Definition: streamed_parallel_for.hpp:37
T * pinned_return[2]
Definition: streamed_parallel_for.hpp:70
Definition: streamed_parallel_for.hpp:24
int offset
Definition: streamed_parallel_for.hpp:209
void copy_to_pinned(int offset_h, int n, int i_staging_area)
Definition: streamed_parallel_for.hpp:181
Definition: streamed_parallel_for.hpp:15
Definition: streamed_parallel_for.hpp:23
int partition_size(int i_partition, int n_soa_on_device, int n_partitions_of_device_aosoa)
Definition: streamed_parallel_for.hpp:48
void parallel_for(const std::string name, int n_ptl, Function func, Option option, HostAoSoA aosoa_h, DeviceAoSoA aosoa_d)
Definition: streamed_parallel_for.hpp:252
T * d_view_ptr
Definition: streamed_parallel_for.hpp:67
Task()
Definition: streamed_parallel_for.hpp:211