diff --git a/include/internal/comm_routines.h b/include/internal/comm_routines.h index 9d7ab45..321be0e 100644 --- a/include/internal/comm_routines.h +++ b/include/internal/comm_routines.h @@ -95,10 +95,13 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_ const std::vector& send_counts, const std::vector& send_offsets, T* recv_buff, const std::vector& recv_counts, const std::vector& recv_offsets, cudecompCommAxis comm_axis, cudaStream_t stream) { - auto comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info; - auto comm = comm_info.mpi_comm; + auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info; auto team = comm_info.nvshmem_team; int self_rank = comm_info.rank; + auto aux_stream = handle->streams[handle->device_p2p_ce_count]; + + // Enforce sync dependency between transpose operations + CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->nvshmem_sync_event)); // Event dependency on external stream for intra-group transfers CHECK_CUDA(cudaEventRecord(grid_desc->events[0], stream)); @@ -106,15 +109,9 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_ CHECK_CUDA(cudaStreamWaitEvent(handle->streams[i], grid_desc->events[0], 0)); } - // Using cudaEventSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency - CHECK_CUDA(cudaEventSynchronize(grid_desc->nvshmem_sync_event)); - CHECK_MPI(MPI_Barrier(comm)); - // nvshmemx_team_sync_on_stream(team, stream); - cudecompNvshmemA2AParams params; // Inter-group transfers (non-blocking) - bool need_quiet = false; params.send_buff = send_buff; params.recv_buff = recv_buff; int count = 0; @@ -134,13 +131,11 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_ params.ntransfers = count; cudecomp_nvshmem_alltoallv(params, stream); count = 0; - need_quiet = true; } } if (count != 0) { params.ntransfers = count; cudecomp_nvshmem_alltoallv(params, stream); - need_quiet = true; } // Intra-group transfers (blocking, scheduled after non-blocking inter-group transfers for concurrency) @@ -151,19 +146,19 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_ int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank); if (nvshmem_ptr(recv_buff, dst_rank_global)) { - if (comm_info.ngroups == 1 && handle->device_p2p_ce_count == 1 && + if (comm_info.ngroups == 1 && handle->device_p2p_ce_count == 1 && count != 0 && count % CUDECOMP_NVSHMEM_INTRAGROUP_SYNC_FREQ == 0) { // For single group, single P2P CE (e.g. NVSwitch), synchronize NVSHMEM team every // CUDECOMP_NVSHMEM_INTRAGROUP_SYNC_FREQ transfers This helps reduce CE contention due to accumulation of // jitter. for (int i = 0; i < handle->device_p2p_ce_count; ++i) { CHECK_CUDA(cudaEventRecord(grid_desc->events[0], handle->streams[i])); - CHECK_CUDA(cudaStreamWaitEvent(handle->streams[handle->device_p2p_ce_count], grid_desc->events[0], 0)); + CHECK_CUDA(cudaStreamWaitEvent(aux_stream, grid_desc->events[0], 0)); } - nvshmemx_team_sync_on_stream(team, handle->streams[handle->device_p2p_ce_count]); + nvshmemx_team_sync_on_stream(team, aux_stream); - CHECK_CUDA(cudaEventRecord(grid_desc->events[0], handle->streams[handle->device_p2p_ce_count])); + CHECK_CUDA(cudaEventRecord(grid_desc->events[0], aux_stream)); for (int i = 0; i < handle->device_p2p_ce_count; ++i) { CHECK_CUDA(cudaStreamWaitEvent(handle->streams[i], grid_desc->events[0], 0)); } @@ -186,12 +181,7 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_ CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[0], 0)); } - if (need_quiet) { nvshmemx_quiet_on_stream(stream); } - - // Using cudaStreamSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency - CHECK_CUDA(cudaStreamSynchronize(stream)); - CHECK_MPI(MPI_Barrier(comm)); - // nvshmemx_team_sync_on_stream(team, stream); + nvshmemx_barrier_on_stream(team, stream); } #endif @@ -237,7 +227,7 @@ static void cudecompAlltoall(const cudecompHandle_t& handle, const cudecompGridD #endif } case CUDECOMP_TRANSPOSE_COMM_NCCL: { - auto comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info; + auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info; // For fully intra-group alltoall, use distinct NCCL local comm instead of global comm as it is faster. auto comm = (comm_info.ngroups == 1) ? *grid_desc->nccl_local_comm : *grid_desc->nccl_comm; @@ -367,7 +357,7 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc const std::vector& recv_offsets, const std::vector& recv_offsets_nvshmem, cudecompCommAxis comm_axis, const std::vector& src_ranks, const std::vector& dst_ranks, cudaStream_t stream, - bool& synced, cudecompTransposePerformanceSample* current_sample = nullptr) { + cudecompTransposePerformanceSample* current_sample = nullptr) { // If there are no transfers to complete, quick return if (send_counts.size() == 0 && recv_counts.size() == 0) { return; } @@ -404,14 +394,17 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc case CUDECOMP_TRANSPOSE_COMM_NVSHMEM_PL: { #ifdef ENABLE_NVSHMEM if (nvshmem_ptr(send_buff, handle->rank) && nvshmem_ptr(recv_buff, handle->rank)) { - auto comm = - (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.mpi_comm : grid_desc->col_comm_info.mpi_comm; - // auto team = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.nvshmem_team - // : grid_desc->col_comm_info.nvshmem_team; + auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info; auto pl_stream = handle->streams[0]; + auto aux_stream = handle->streams[handle->device_p2p_ce_count]; int self_rank = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.rank : grid_desc->col_comm_info.rank; - bool barrier = false; + // Enforce sync dependency between transpose operations + CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->nvshmem_sync_event)); + + bool need_quiet = false; + + // Inter-group transfers and self-copy (non-blocking) for (int i = 0; i < src_ranks.size(); ++i) { int src_rank = src_ranks[i]; int dst_rank = dst_ranks[i]; @@ -421,39 +414,44 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc CHECK_CUDA(cudaMemcpyAsync(recv_buff + recv_offsets_nvshmem[self_rank], send_buff + send_offsets[self_rank], send_counts[self_rank] * sizeof(T), cudaMemcpyDeviceToDevice, stream)); } else { + int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank); + if (nvshmem_ptr(recv_buff, dst_rank_global)) { continue; } + CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->events[dst_rank], 0)); - if (!synced) { - // Using cudaEventSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency - CHECK_CUDA(cudaEventSynchronize(grid_desc->nvshmem_sync_event)); - CHECK_MPI(MPI_Barrier(comm)); - // Only need to sync on the first remote operation of an alltoall sequence to ensure reads on other ranks - // from previous communication have completed. - synced = true; - } - int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank); - nvshmemx_putmem_nbi_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank], send_buff + send_offsets[dst_rank], - send_counts[dst_rank] * sizeof(T), dst_rank_global, pl_stream); + nvshmemx_putmem_signal_nbi_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank], + send_buff + send_offsets[dst_rank], send_counts[dst_rank] * sizeof(T), + &comm_info.nvshmem_signals[comm_info.rank], 1, NVSHMEM_SIGNAL_SET, + dst_rank_global, pl_stream); - barrier = true; + need_quiet = true; } } - if (barrier) { - nvshmemx_quiet_on_stream(pl_stream); - // Using cudaStreamSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency - CHECK_CUDA(cudaStreamSynchronize(pl_stream)); - CHECK_MPI(MPI_Barrier(comm)); - - // nvshmemx_team_sync_on_stream(team, pl_stream); - // for (int i = 0; i < src_ranks.size(); ++i) { - // int src_rank = src_ranks[i]; - // int dst_rank = dst_ranks[i]; - // if (src_rank != self_rank) { - // CHECK_CUDA(cudaEventRecord(grid_desc->events[dst_rank], pl_stream)); - // CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[dst_rank], 0)); - // } - //} + // Intra-group transfers (blocking, scheduled after non-blocking inter-group transfers for concurrency) + for (int i = 0; i < src_ranks.size(); ++i) { + int src_rank = src_ranks[i]; + int dst_rank = dst_ranks[i]; + + int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank); + if (!nvshmem_ptr(recv_buff, dst_rank_global) || src_rank == self_rank) { continue; } + + CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->events[dst_rank], 0)); + + nvshmemx_putmem_signal_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank], send_buff + send_offsets[dst_rank], + send_counts[dst_rank] * sizeof(T), &comm_info.nvshmem_signals[comm_info.rank], + 1, NVSHMEM_SIGNAL_SET, dst_rank_global, pl_stream); + } + + if (need_quiet) { nvshmemx_quiet_on_stream(pl_stream); } + for (int i = 0; i < src_ranks.size(); ++i) { + int src_rank = src_ranks[i]; + int dst_rank = dst_ranks[i]; + if (src_rank != self_rank) { + nvshmemx_signal_wait_until_on_stream(&comm_info.nvshmem_signals[src_rank], NVSHMEM_CMP_EQ, 1, pl_stream); + CHECK_CUDA(cudaEventRecord(grid_desc->events[dst_rank], pl_stream)); + CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[dst_rank], 0)); + } } break; } else { @@ -594,8 +592,7 @@ static void cudecompSendRecvPair(const cudecompHandle_t& handle, const cudecompG case CUDECOMP_HALO_COMM_NVSHMEM_BLOCKING: { #ifdef ENABLE_NVSHMEM if (nvshmem_ptr(send_buff, handle->rank) && nvshmem_ptr(recv_buff, handle->rank)) { - nvshmemx_quiet_on_stream(stream); - nvshmemx_sync_all_on_stream(stream); + nvshmemx_barrier_all_on_stream(stream); for (int i = 0; i < send_counts.size(); ++i) { if (peer_ranks[i] == handle->rank) { // Self-copy with cudaMemcpy @@ -608,14 +605,12 @@ static void cudecompSendRecvPair(const cudecompHandle_t& handle, const cudecompG } } if (grid_desc->config.halo_comm_backend == CUDECOMP_HALO_COMM_NVSHMEM_BLOCKING) { - nvshmemx_quiet_on_stream(stream); - nvshmemx_sync_all_on_stream(stream); + nvshmemx_barrier_all_on_stream(stream); } } if (grid_desc->config.halo_comm_backend == CUDECOMP_HALO_COMM_NVSHMEM) { - nvshmemx_quiet_on_stream(stream); - nvshmemx_sync_all_on_stream(stream); + nvshmemx_barrier_all_on_stream(stream); }; break; } else { diff --git a/include/internal/common.h b/include/internal/common.h index cd12caf..1ae2786 100644 --- a/include/internal/common.h +++ b/include/internal/common.h @@ -127,6 +127,7 @@ struct cudecompCommInfo { #ifdef ENABLE_NVSHMEM nvshmem_team_t nvshmem_team = NVSHMEM_TEAM_INVALID; + uint64_t* nvshmem_signals = nullptr; #endif }; diff --git a/include/internal/transpose.h b/include/internal/transpose.h index ddacb76..c10ee3c 100644 --- a/include/internal/transpose.h +++ b/include/internal/transpose.h @@ -236,12 +236,26 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c T* o2 = work + pinfo_a.size; T* o3 = output; +#ifdef ENABLE_NVSHMEM if (transposeBackendRequiresNvshmem(grid_desc->config.transpose_comm_backend)) { auto max_pencil_size_a = getGlobalMaxPencilSize(handle, grid_desc, ax_a); o2 = work + max_pencil_size_a; - // Record event at start of transpose op for NVSHMEM team synchronization - CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, stream)); + + // NVSHMEM team synchronization between transpose operations + if (splits_a.size() != 1) { + auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info; + auto team = comm_info.nvshmem_team; + auto aux_stream = handle->streams[handle->device_p2p_ce_count]; + CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, stream)); + CHECK_CUDA(cudaStreamWaitEvent(aux_stream, grid_desc->nvshmem_sync_event)); + // Zero out signal buffer for this team here. + CHECK_CUDA(cudaMemsetAsync(comm_info.nvshmem_signals, 0, comm_info.nranks * sizeof(uint64_t), aux_stream)); + nvshmemx_team_sync_on_stream(team, aux_stream); + CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, aux_stream)); + // Delay final stream wait dependency to alltoall to ensure sync runs concurrently with initial transpose/pack + } } +#endif cudecompTransposePerformanceSample* current_sample = nullptr; if (handle->performance_report_enable) { @@ -604,7 +618,6 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c } if (pipelined) { - bool nvshmem_synced = false; for (int j = 0; j < splits_b.size(); ++j) { int src_rank, dst_rank; getAlltoallPeerRanks(grid_desc, comm_axis, j, src_rank, dst_rank); @@ -634,8 +647,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c if (o2 != o1) { cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets, - recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced, - current_sample); + recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample); } if (o2 != o3) { @@ -699,7 +711,6 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c if (i > 0) { strides_out[i] = strides_out[i - 1] * extents_h[i - 1]; } } - bool nvshmem_synced = false; for (int j = 0; j < splits_b.size(); ++j) { int src_rank, dst_rank; getAlltoallPeerRanks(grid_desc, comm_axis, j, src_rank, dst_rank); @@ -730,8 +741,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c if (o2 != o1) { cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets, - recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced, - current_sample); + recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample); } } @@ -755,7 +765,6 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c } } else { // Unpack - bool nvshmem_synced = false; int memcpy_count = 0; cudecompBatchedD2DMemcpy3DParams memcpy_params; for (int j = 0; j < splits_a.size(); ++j) { @@ -788,8 +797,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c if (o2 != o1) { cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets, - recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced, - current_sample); + recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample); } } diff --git a/src/autotune.cc b/src/autotune.cc index f970b55..d4872c8 100644 --- a/src/autotune.cc +++ b/src/autotune.cc @@ -275,6 +275,14 @@ void autotuneTransposeBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_d nvshmem_team_config_t tmp; nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0, &grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team); + grid_desc->row_comm_info.nvshmem_signals = + (uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t)); + CHECK_CUDA( + cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t))); + grid_desc->col_comm_info.nvshmem_signals = + (uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t)); + CHECK_CUDA( + cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t))); #endif } @@ -451,6 +459,8 @@ void autotuneTransposeBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_d #ifdef ENABLE_NVSHMEM nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team); nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team); + nvshmem_free(grid_desc->row_comm_info.nvshmem_signals); + nvshmem_free(grid_desc->col_comm_info.nvshmem_signals); grid_desc->row_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID; grid_desc->col_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID; #endif @@ -691,6 +701,14 @@ void autotuneHaloBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_desc, nvshmem_team_config_t tmp; nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0, &grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team); + grid_desc->row_comm_info.nvshmem_signals = + (uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t)); + CHECK_CUDA( + cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t))); + grid_desc->col_comm_info.nvshmem_signals = + (uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t)); + CHECK_CUDA( + cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t))); #endif } @@ -804,6 +822,8 @@ void autotuneHaloBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_desc, #ifdef ENABLE_NVSHMEM nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team); nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team); + nvshmem_free(grid_desc->row_comm_info.nvshmem_signals); + nvshmem_free(grid_desc->col_comm_info.nvshmem_signals); grid_desc->row_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID; grid_desc->col_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID; #endif diff --git a/src/cudecomp.cc b/src/cudecomp.cc index 5df3435..9daa234 100644 --- a/src/cudecomp.cc +++ b/src/cudecomp.cc @@ -764,6 +764,14 @@ cudecompResult_t cudecompGridDescCreate(cudecompHandle_t handle, cudecompGridDes nvshmem_team_config_t tmp; nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0, &grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team); + grid_desc->row_comm_info.nvshmem_signals = + (uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t)); + CHECK_CUDA( + cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t))); + grid_desc->col_comm_info.nvshmem_signals = + (uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t)); + CHECK_CUDA( + cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t))); handle->n_grid_descs_using_nvshmem++; } else { // Finalize nvshmem to reclaim symmetric heap memory if not used @@ -891,9 +899,11 @@ cudecompResult_t cudecompGridDescDestroy(cudecompHandle_t handle, cudecompGridDe haloBackendRequiresNvshmem(grid_desc->config.halo_comm_backend)) { if (grid_desc->row_comm_info.nvshmem_team != NVSHMEM_TEAM_INVALID) { nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team); + nvshmem_free(grid_desc->row_comm_info.nvshmem_signals); } if (grid_desc->col_comm_info.nvshmem_team != NVSHMEM_TEAM_INVALID) { nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team); + nvshmem_free(grid_desc->col_comm_info.nvshmem_signals); } handle->n_grid_descs_using_nvshmem--;