// Include guard. NOTE(review): this is a doxygen-extracted fragment — the
// numeric prefixes are the original file's line numbers fused into the text,
// and interior source lines (3-29, 31-33, 35-36, 39, 42-43, 46-47) are
// missing from this view, so the preprocessor structure below is incomplete.
1 #ifndef STREAMED_PARALLEL_FOR_HPP
2 #define STREAMED_PARALLEL_FOR_HPP
// Refuse to build if USE_STREAMS is already defined elsewhere; the enclosing
// #ifdef that triggers this error is not visible in this fragment.
30 # error Preprocessor flag USE_STREAMS is already in use
// CUDA/HIP GPU builds: real asynchronous streams exist, so GPU_STREAM(x)
// forwards the stream argument (note the trailing comma) into Kokkos calls.
34 #if defined(USE_GPU) && (defined(USE_CUDA) || defined(USE_HIP))
37 # define GPU_STREAM(x) x,
38 #elif defined(USE_GPU)
// Other GPU builds: no per-stream API — stub GPUStream whose fence() falls
// back to a global Kokkos::fence(), and GPU_STREAM drops its argument.
40 struct GPUStream {
void fence(){Kokkos::fence();} };
41 # define GPU_STREAM(x)
// Remaining (presumably CPU-only) branch — the enclosing #else/#endif is not
// visible here: fence() is a no-op and GPU_STREAM drops its argument.
44 struct GPUStream {
void fence(){} };
45 # define GPU_STREAM(x)
// Size (in SoA units) of partition i_partition when n_soa_on_device SoAs are
// divided as evenly as possible among n_partitions_of_device_aosoa partitions.
// The "+ n_partitions - i_partition - 1" numerator distributes the remainder
// so that lower-indexed partitions receive one extra SoA each (hence partition
// 0 is always the largest).
48 inline int partition_size(
int i_partition,
int n_soa_on_device,
int n_partitions_of_device_aosoa){
50 return (n_soa_on_device + n_partitions_of_device_aosoa - i_partition - 1)/n_partitions_of_device_aosoa;
// Alternate build branch (the #if/#else around the two returns is missing
// from this view — orig lines 49, 51-52): no partitioning, partition 0 holds
// every SoA and all other partitions are empty.
53 return (i_partition==0 ? n_soa_on_device : 0);
// StreamView constructor (fragment — orig lines 84-89, 92, 97-98 and the rest
// of the body, including the closing brace, are missing from this view).
// Sizes the double-buffered staging areas from the largest device partition:
// partition 0 is largest because partition_size() gives lower indices the
// remainder SoAs.
82 template<
typename H,
typename D>
83 StreamView(H& view_h, D& view_d,
bool stage_in_pinned_memory_in,
int n_on_device,
int n_partitions_of_device_view)
90 int i_largest_partition = 0;
91 int size_of_largest_partition =
partition_size(i_largest_partition, n_on_device, n_partitions_of_device_view);
// CUDA path: allocate page-locked (pinned) host buffers for the two "send"
// and two "return" staging areas; pinned memory is required for true
// asynchronous host<->device copies. NOTE(review): the cudaMallocHost return
// codes are not checked in this fragment — confirm against the full source.
93 cudaMallocHost((
void**)&
pinned_send[0], size_of_largest_partition*
sizeof(T));
94 cudaMallocHost((
void**)&
pinned_send[1], size_of_largest_partition*
sizeof(T));
95 cudaMallocHost((
void**)&
pinned_return[0], size_of_largest_partition*
sizeof(T));
96 cudaMallocHost((
void**)&
pinned_return[1], size_of_largest_partition*
sizeof(T));
// HIP path (enclosing #elif not visible): same four pinned allocations via
// hipHostMalloc. NOTE(review): ierr is assigned but no check of it is visible
// in this fragment — presumably handled in the elided lines; confirm.
99 ierr = hipHostMalloc((
void**)&
pinned_send[0], size_of_largest_partition*
sizeof(T));
100 ierr = hipHostMalloc((
void**)&
pinned_send[1], size_of_largest_partition*
sizeof(T));
101 ierr = hipHostMalloc((
void**)&
pinned_return[0], size_of_largest_partition*
sizeof(T));
102 ierr = hipHostMalloc((
void**)&
pinned_return[1], size_of_largest_partition*
sizeof(T));
// Copy n elements from the host staging location to the device view at
// offset_d, associated with the given GPU stream (GPU_STREAM(gpu_stream)
// expands to the stream argument only on CUDA/HIP builds, otherwise to
// nothing). Fragment: orig line 146 — which presumably derives host_loc from
// offset_h / i_staged_area — and the closing brace are missing from this view.
144 template<
typename ST>
145 inline void copy_to_device(
int offset_h,
int offset_d,
int n,
int i_staged_area, ST& gpu_stream){
// Wrap the raw pointers in unmanaged Kokkos views so deep_copy can operate
// on them without taking ownership of the underlying memory.
147 Kokkos::View<T*, HostType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_h(host_loc,n);
148 Kokkos::View<T*, DeviceType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_d(
d_view_ptr + offset_d,n);
// Host -> device deep copy (destination first).
149 Kokkos::deep_copy(
GPU_STREAM(gpu_stream) view_d, view_h);
// Mirror of copy_to_device: copy n elements from the device view at offset_d
// back to the host staging location, associated with the given GPU stream.
// Fragment: orig line 166 — which presumably derives host_loc from
// offset_h / i_staged_area — and the closing brace are missing from this view.
164 template<
typename ST>
165 inline void copy_to_host(
int offset_h,
int offset_d,
int n,
int i_staged_area, ST& gpu_stream){
// Unmanaged views over the raw host/device pointers (no ownership taken).
167 Kokkos::View<T*, HostType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_r_h(host_loc,n);
168 Kokkos::View<T*, DeviceType, Kokkos::MemoryTraits<Kokkos::Unmanaged>> view_r_d(
d_view_ptr + offset_d,n);
// Device -> host deep copy (destination first).
169 Kokkos::deep_copy(
GPU_STREAM(gpu_stream) view_r_h, view_r_d);
// Fragment of copy_to_pinned (per the extraction's index this is orig line
// ~181): OpenMP-parallel loop over n elements; the loop body — presumably the
// element-wise copy from the host view into pinned_send — is missing here.
183 #pragma omp parallel for
184 for(
int i_p = 0; i_p<n; i_p++){
// Fragment of copy_from_pinned (per the extraction's index, orig line ~198):
// OpenMP-parallel loop over n elements; the loop body — presumably the
// element-wise copy from pinned_return back into the host view — is missing.
200 #pragma omp parallel for
201 for(
int i_p = 0; i_p<n; i_p++){
// Streamed parallel_for (fragment — many interior lines are elided in this
// view): executes func over a host AoSoA that may be larger than device
// memory, by cycling fixed-size chunks through partitions of a smaller device
// AoSoA via a software pipeline of tasks
// ([ToPinned] -> Send -> Run -> Return -> [FromPinned]).
251 template<
typename Function,
typename HostAoSoA,
typename DeviceAoSoA>
252 void parallel_for(
const std::string name,
int n_ptl, Function func,
Option option, HostAoSoA aosoa_h, DeviceAoSoA aosoa_d){
// Target chunk size in particles (2e6 truncated to int).
255 const int desired_n_ptl_per_chunk = 2e6;
// The two stage_in_pinned_memory declarations below sit in different #if
// branches (directives at orig lines 257/259 are missing from this view):
// pinned staging on in one build configuration, off in the other.
256 const bool stage_in_pinned_memory =
true;
258 const bool stage_in_pinned_memory =
false;
260 const bool verbose =
false;
// Likewise two gpu_streams definitions from different build branches: the
// Kokkos path splits the default execution space into 3 equal instances
// (streams); the fallback uses NStreams stub GPUStream objects.
264 auto gpu_streams = Kokkos::Experimental::partition_space(Kokkos::DefaultExecutionSpace(),1,1,1);
267 std::vector<GPUStream> gpu_streams(
NStreams);
// Both AoSoAs must consist of whole SoAs (size an exact multiple of VEC_LEN),
// and some device memory must exist; hard-exit otherwise.
271 int n_soa_total = aosoa_h.size()/VEC_LEN;
272 if(n_soa_total*VEC_LEN != aosoa_h.size()) {printf(
"\nERROR: streamed_parallel_for assumes the last SoA in the AoSoA is full\n"); exit(1);}
275 int n_soa_on_device = aosoa_d.size()/VEC_LEN;
276 if(n_soa_on_device*VEC_LEN != aosoa_d.size()) {printf(
"\nERROR: streamed_parallel_for assumes the last device SoA in the AoSoA is full\n"); exit(1);}
278 if(n_soa_on_device==0 && n_soa_total!=0) {printf(
"\nERROR: streamed_parallel_for requires non-zero amount of device memory\n"); exit(1);}
// Partition the device AoSoA into ceil-divided chunks of the desired size,
// but at least 3 partitions — presumably so send/run/return stages can
// operate on distinct partitions concurrently (TODO confirm). The second
// declaration is an alternate build branch fixing the count at 3.
281 int desired_n_soa_per_chunk = desired_n_ptl_per_chunk/VEC_LEN;
282 int n_partitions_of_device_aosoa = (n_soa_on_device+desired_n_soa_per_chunk-1)/desired_n_soa_per_chunk;
283 n_partitions_of_device_aosoa = std::max(n_partitions_of_device_aosoa, 3);
286 int n_partitions_of_device_aosoa = 3;
// One Task slot per task type; ordered_tasks lists the pipeline stages in
// execution order, with the pinned-staging stages only when enabled.
293 std::vector<Task> tasks(
NTasks);
294 std::vector<Tasks> ordered_tasks;
296 if(stage_in_pinned_memory) ordered_tasks.push_back(
ToPinned);
297 ordered_tasks.push_back(
Send);
299 ordered_tasks.push_back(
Run);
301 ordered_tasks.push_back(
Return);
302 if(stage_in_pinned_memory) ordered_tasks.push_back(
FromPinned);
305 int n_soa_remaining = n_soa_total;
307 bool finished_all =
false;
// Pipeline loop; the step counter i is declared in lines elided from this
// view. Each iteration injects the next chunk and advances every stage.
309 while(!finished_all){
310 if (verbose) printf(
"\nStep %d", i);
// Device partitions are used round-robin; the new chunk is clamped to the
// SoAs still waiting to be processed.
312 int i_partition = i % n_partitions_of_device_aosoa;
313 int p_size =
partition_size(i_partition, n_soa_on_device, n_partitions_of_device_aosoa);
314 int n_first_op = std::min(p_size, n_soa_remaining);
315 n_soa_remaining -= n_first_op;
// Shift chunk sizes one stage down the pipeline (last stage inherits from
// its predecessor), then feed the new chunk into the first stage.
318 for (
int it=ordered_tasks.size()-1; it>0; it--){
319 tasks[ordered_tasks[it]].n = tasks[ordered_tasks[it-1]].n;
321 tasks[ordered_tasks[0]].n = n_first_op;
// Double-buffered pinned staging: areas alternate between steps.
324 int i_staging_area = i%2;
325 int i_staged_area = (i+1)%2;
// Send stage: the cumulative offset wraps modulo n_soa_on_device, i.e. the
// device AoSoA is used as a ring buffer over its partitions.
329 int offset_send_d = tasks[
Send].offset % n_soa_on_device;
331 if (verbose) printf(
"\n Copy (%d - %d) on host to (%d - %d) on device", tasks[
Send].offset, tasks[
Send].offset+tasks[
Send].n, offset_send_d, offset_send_d+tasks[
Send].n);
332 tasks[
Send].advance();
// Run stage: convert the SoA window into a particle index range.
337 int offset_run_d = tasks[
Run].offset % n_soa_on_device;
340 int ptl_offset_d = offset_run_d*VEC_LEN;
341 int ptl_last_d = ptl_offset_d + tasks[
Run].n*VEC_LEN;
// Clamp the final chunk so indices past n_ptl (padding in the last SoA of
// the overall set) are not executed.
344 int n_ptl_d = n_ptl - (tasks[
Run].offset-offset_run_d)*VEC_LEN;
345 ptl_last_d = std::min(ptl_last_d, n_ptl_d);
351 Kokkos::parallel_for(name.c_str(), Kokkos::RangePolicy<ExSpace>(ptl_offset_d, ptl_last_d), func);
353 if (verbose) printf(
"\n Run (%d - %d) on device", offset_run_d, offset_run_d+tasks[
Run].n);
354 if (verbose) printf(
"\n i.e. local ptl (%d - %d), global ptl (%d - %d)", ptl_offset_d, ptl_last_d, tasks[
Run].offset*VEC_LEN, tasks[
Run].offset*VEC_LEN+(ptl_last_d-ptl_offset_d));
355 tasks[
Run].advance();
// Return stage: copy results back from the device ring buffer to the host;
// the actual copy call is in lines elided from this view.
360 int offset_return_d = tasks[
Return].offset % n_soa_on_device;
362 if (verbose) printf(
"\n Copy to (%d - %d) on host from (%d - %d) on device", tasks[
Return].offset, tasks[
Return].offset+tasks[
Return].n, offset_return_d, offset_return_d+tasks[
Return].n);
// ToPinned stage (only when pinned staging is enabled); the copy itself and
// the FromPinned stage are in lines elided from this view.
370 if (verbose) printf(
"\n Copy (%d - %d) on host to pinned_send", tasks[
ToPinned].offset, tasks[
ToPinned].offset+tasks[
ToPinned].n);
// Terminate once every pipeline stage has drained (all task sizes zero);
// finished_all is presumably reset to true in an elided line before this
// AND-reduction — confirm against the full source.
388 for (
int it = 0; it<
NTasks; it++){ finished_all = finished_all && (tasks[it].n==0); }
391 if (verbose) printf(
"\nComplete in %d steps", i);
Definition: streamed_parallel_for.hpp:7
void copy_to_device(int offset_h, int offset_d, int n, int i_staged_area, ST &gpu_stream)
Definition: streamed_parallel_for.hpp:145
Definition: streamed_parallel_for.hpp:8
T * pinned_send[2]
Definition: streamed_parallel_for.hpp:69
void advance()
Definition: streamed_parallel_for.hpp:218
~StreamView()
Definition: streamed_parallel_for.hpp:113
Definition: streamed_parallel_for.hpp:207
Definition: streamed_parallel_for.hpp:9
Definition: streamed_parallel_for.hpp:62
Definition: streamed_parallel_for.hpp:22
bool stage_in_pinned_memory
Definition: streamed_parallel_for.hpp:64
Tasks
Definition: streamed_parallel_for.hpp:19
Definition: streamed_parallel_for.hpp:16
Definition: streamed_parallel_for.hpp:14
int n
Definition: streamed_parallel_for.hpp:208
Definition: streamed_parallel_for.hpp:10
Definition: streamed_parallel_for.hpp:25
void copy_from_pinned(int offset_h, int n, int i_staging_area)
Definition: streamed_parallel_for.hpp:198
Definition: streamed_parallel_for.hpp:20
Definition: streamed_parallel_for.hpp:21
T * h_view_ptr
Definition: streamed_parallel_for.hpp:66
Option
Definition: streamed_parallel_for.hpp:13
StreamView(H &view_h, D &view_d, bool stage_in_pinned_memory_in, int n_on_device, int n_partitions_of_device_view)
Definition: streamed_parallel_for.hpp:83
void copy_to_host(int offset_h, int offset_d, int n, int i_staged_area, ST &gpu_stream)
Definition: streamed_parallel_for.hpp:165
StreamJob
Definition: streamed_parallel_for.hpp:6
#define GPU_STREAM(x)
Definition: streamed_parallel_for.hpp:37
T * pinned_return[2]
Definition: streamed_parallel_for.hpp:70
Definition: streamed_parallel_for.hpp:24
int offset
Definition: streamed_parallel_for.hpp:209
void copy_to_pinned(int offset_h, int n, int i_staging_area)
Definition: streamed_parallel_for.hpp:181
Definition: streamed_parallel_for.hpp:15
Definition: streamed_parallel_for.hpp:23
int partition_size(int i_partition, int n_soa_on_device, int n_partitions_of_device_aosoa)
Definition: streamed_parallel_for.hpp:48
void parallel_for(const std::string name, int n_ptl, Function func, Option option, HostAoSoA aosoa_h, DeviceAoSoA aosoa_d)
Definition: streamed_parallel_for.hpp:252
T * d_view_ptr
Definition: streamed_parallel_for.hpp:67
Task()
Definition: streamed_parallel_for.hpp:211