diff --git a/include/internal/comm_routines.h b/include/internal/comm_routines.h
index 321be0e..9d7ab45 100644
--- a/include/internal/comm_routines.h
+++ b/include/internal/comm_routines.h
@@ -95,13 +95,10 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
                  const std::vector& send_counts, const std::vector& send_offsets, T* recv_buff,
                  const std::vector& recv_counts, const std::vector& recv_offsets, cudecompCommAxis comm_axis,
                  cudaStream_t stream) {
-  auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
+  auto comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
+  auto comm = comm_info.mpi_comm;
   auto team = comm_info.nvshmem_team;
   int self_rank = comm_info.rank;
-  auto aux_stream = handle->streams[handle->device_p2p_ce_count];
-
-  // Enforce sync dependency between transpose operations
-  CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->nvshmem_sync_event));

   // Event dependency on external stream for intra-group transfers
   CHECK_CUDA(cudaEventRecord(grid_desc->events[0], stream));
@@ -109,9 +106,15 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
     CHECK_CUDA(cudaStreamWaitEvent(handle->streams[i], grid_desc->events[0], 0));
   }

+  // Using cudaEventSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
+  CHECK_CUDA(cudaEventSynchronize(grid_desc->nvshmem_sync_event));
+  CHECK_MPI(MPI_Barrier(comm));
+  // nvshmemx_team_sync_on_stream(team, stream);
+
   cudecompNvshmemA2AParams params;

   // Inter-group transfers (non-blocking)
+  bool need_quiet = false;
   params.send_buff = send_buff;
   params.recv_buff = recv_buff;
   int count = 0;
@@ -131,11 +134,13 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
       params.ntransfers = count;
       cudecomp_nvshmem_alltoallv(params, stream);
       count = 0;
+      need_quiet = true;
     }
   }
   if (count != 0) {
     params.ntransfers = count;
     cudecomp_nvshmem_alltoallv(params, stream);
+    need_quiet = true;
   }

   // Intra-group transfers (blocking, scheduled after non-blocking inter-group transfers for concurrency)
@@ -146,19 +151,19 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
     int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
     if (nvshmem_ptr(recv_buff, dst_rank_global)) {
-      if (comm_info.ngroups == 1 && handle->device_p2p_ce_count == 1 && count != 0 &&
+      if (comm_info.ngroups == 1 && handle->device_p2p_ce_count == 1 &&
           count % CUDECOMP_NVSHMEM_INTRAGROUP_SYNC_FREQ == 0) {
         // For single group, single P2P CE (e.g. NVSwitch), synchronize NVSHMEM team every
         // CUDECOMP_NVSHMEM_INTRAGROUP_SYNC_FREQ transfers. This helps reduce CE contention due to accumulation of
         // jitter.
         for (int i = 0; i < handle->device_p2p_ce_count; ++i) {
           CHECK_CUDA(cudaEventRecord(grid_desc->events[0], handle->streams[i]));
-          CHECK_CUDA(cudaStreamWaitEvent(aux_stream, grid_desc->events[0], 0));
+          CHECK_CUDA(cudaStreamWaitEvent(handle->streams[handle->device_p2p_ce_count], grid_desc->events[0], 0));
         }
-        nvshmemx_team_sync_on_stream(team, aux_stream);
+        nvshmemx_team_sync_on_stream(team, handle->streams[handle->device_p2p_ce_count]);
-        CHECK_CUDA(cudaEventRecord(grid_desc->events[0], aux_stream));
+        CHECK_CUDA(cudaEventRecord(grid_desc->events[0], handle->streams[handle->device_p2p_ce_count]));
         for (int i = 0; i < handle->device_p2p_ce_count; ++i) {
           CHECK_CUDA(cudaStreamWaitEvent(handle->streams[i], grid_desc->events[0], 0));
         }
@@ -181,7 +186,12 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
     CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[0], 0));
   }

-  nvshmemx_barrier_on_stream(team, stream);
+  if (need_quiet) { nvshmemx_quiet_on_stream(stream); }
+
+  // Using cudaStreamSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
+  CHECK_CUDA(cudaStreamSynchronize(stream));
+  CHECK_MPI(MPI_Barrier(comm));
+  // nvshmemx_team_sync_on_stream(team, stream);
 }
 #endif
@@ -227,7 +237,7 @@ static void cudecompAlltoall(const cudecompHandle_t& handle, const cudecompGridD
 #endif
   }
   case CUDECOMP_TRANSPOSE_COMM_NCCL: {
-    auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
+    auto comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;

     // For fully intra-group alltoall, use distinct NCCL local comm instead of global comm as it is faster.
     auto comm = (comm_info.ngroups == 1) ? *grid_desc->nccl_local_comm : *grid_desc->nccl_comm;
@@ -357,7 +367,7 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc
                           const std::vector& recv_offsets, const std::vector& recv_offsets_nvshmem,
                           cudecompCommAxis comm_axis, const std::vector& src_ranks,
                           const std::vector& dst_ranks, cudaStream_t stream,
-                          cudecompTransposePerformanceSample* current_sample = nullptr) {
+                          bool& synced, cudecompTransposePerformanceSample* current_sample = nullptr) {

   // If there are no transfers to complete, quick return
   if (send_counts.size() == 0 && recv_counts.size() == 0) { return; }
@@ -394,17 +404,14 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc
   case CUDECOMP_TRANSPOSE_COMM_NVSHMEM_PL: {
 #ifdef ENABLE_NVSHMEM
     if (nvshmem_ptr(send_buff, handle->rank) && nvshmem_ptr(recv_buff, handle->rank)) {
-      auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
+      auto comm =
+          (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.mpi_comm : grid_desc->col_comm_info.mpi_comm;
+      // auto team = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.nvshmem_team
+      //                                              : grid_desc->col_comm_info.nvshmem_team;
       auto pl_stream = handle->streams[0];
-      auto aux_stream = handle->streams[handle->device_p2p_ce_count];
       int self_rank = (comm_axis == CUDECOMP_COMM_ROW) ?
                           grid_desc->row_comm_info.rank : grid_desc->col_comm_info.rank;

-      // Enforce sync dependency between transpose operations
-      CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->nvshmem_sync_event));
-
-      bool need_quiet = false;
-
-      // Inter-group transfers and self-copy (non-blocking)
+      bool barrier = false;
       for (int i = 0; i < src_ranks.size(); ++i) {
         int src_rank = src_ranks[i];
         int dst_rank = dst_ranks[i];
@@ -414,44 +421,39 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc
           CHECK_CUDA(cudaMemcpyAsync(recv_buff + recv_offsets_nvshmem[self_rank], send_buff + send_offsets[self_rank],
                                      send_counts[self_rank] * sizeof(T), cudaMemcpyDeviceToDevice, stream));
         } else {
-          int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
-          if (nvshmem_ptr(recv_buff, dst_rank_global)) { continue; }
-          CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->events[dst_rank], 0));
+          if (!synced) {
+            // Using cudaEventSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
+            CHECK_CUDA(cudaEventSynchronize(grid_desc->nvshmem_sync_event));
+            CHECK_MPI(MPI_Barrier(comm));
+            // Only need to sync on the first remote operation of an alltoall sequence to ensure reads on other ranks
+            // from previous communication have completed.
+            synced = true;
+          }

-          nvshmemx_putmem_signal_nbi_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank],
-                                               send_buff + send_offsets[dst_rank], send_counts[dst_rank] * sizeof(T),
-                                               &comm_info.nvshmem_signals[comm_info.rank], 1, NVSHMEM_SIGNAL_SET,
-                                               dst_rank_global, pl_stream);
+          int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
+          nvshmemx_putmem_nbi_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank], send_buff + send_offsets[dst_rank],
+                                        send_counts[dst_rank] * sizeof(T), dst_rank_global, pl_stream);

-          need_quiet = true;
+          barrier = true;
         }
       }

-      // Intra-group transfers (blocking, scheduled after non-blocking inter-group transfers for concurrency)
-      for (int i = 0; i < src_ranks.size(); ++i) {
-        int src_rank = src_ranks[i];
-        int dst_rank = dst_ranks[i];
-
-        int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
-        if (!nvshmem_ptr(recv_buff, dst_rank_global) || src_rank == self_rank) { continue; }
-
-        CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->events[dst_rank], 0));
-
-        nvshmemx_putmem_signal_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank], send_buff + send_offsets[dst_rank],
-                                         send_counts[dst_rank] * sizeof(T), &comm_info.nvshmem_signals[comm_info.rank],
-                                         1, NVSHMEM_SIGNAL_SET, dst_rank_global, pl_stream);
-      }
-
-      if (need_quiet) { nvshmemx_quiet_on_stream(pl_stream); }
-      for (int i = 0; i < src_ranks.size(); ++i) {
-        int src_rank = src_ranks[i];
-        int dst_rank = dst_ranks[i];
-        if (src_rank != self_rank) {
-          nvshmemx_signal_wait_until_on_stream(&comm_info.nvshmem_signals[src_rank], NVSHMEM_CMP_EQ, 1, pl_stream);
-          CHECK_CUDA(cudaEventRecord(grid_desc->events[dst_rank], pl_stream));
-          CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[dst_rank], 0));
-        }
+      if (barrier) {
+        nvshmemx_quiet_on_stream(pl_stream);
+        // Using cudaStreamSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
+        CHECK_CUDA(cudaStreamSynchronize(pl_stream));
+        CHECK_MPI(MPI_Barrier(comm));
+
+        // nvshmemx_team_sync_on_stream(team, pl_stream);
+        // for (int i = 0; i < src_ranks.size(); ++i) {
+        //   int src_rank = src_ranks[i];
+        //   int dst_rank = dst_ranks[i];
+        //   if (src_rank != self_rank) {
+        //     CHECK_CUDA(cudaEventRecord(grid_desc->events[dst_rank], pl_stream));
+        //     CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[dst_rank], 0));
+        //   }
+        //}
       }
       break;
     } else {
@@ -592,7 +594,8 @@ static void cudecompSendRecvPair(const cudecompHandle_t& handle, const cudecompG
   case CUDECOMP_HALO_COMM_NVSHMEM_BLOCKING: {
 #ifdef ENABLE_NVSHMEM
     if (nvshmem_ptr(send_buff, handle->rank) && nvshmem_ptr(recv_buff, handle->rank)) {
-      nvshmemx_barrier_all_on_stream(stream);
+      nvshmemx_quiet_on_stream(stream);
+      nvshmemx_sync_all_on_stream(stream);
       for (int i = 0; i < send_counts.size(); ++i) {
         if (peer_ranks[i] == handle->rank) {
           // Self-copy with cudaMemcpy
@@ -605,12 +608,14 @@ static void cudecompSendRecvPair(const cudecompHandle_t& handle, const cudecompG
         }
       }
       if (grid_desc->config.halo_comm_backend == CUDECOMP_HALO_COMM_NVSHMEM_BLOCKING) {
-        nvshmemx_barrier_all_on_stream(stream);
+        nvshmemx_quiet_on_stream(stream);
+        nvshmemx_sync_all_on_stream(stream);
       }
     }
     if (grid_desc->config.halo_comm_backend == CUDECOMP_HALO_COMM_NVSHMEM) {
-      nvshmemx_barrier_all_on_stream(stream);
+      nvshmemx_quiet_on_stream(stream);
+      nvshmemx_sync_all_on_stream(stream);
     };
     break;
   } else {
diff --git a/include/internal/common.h b/include/internal/common.h
index f11e9ca..d03de7a 100644
--- a/include/internal/common.h
+++ b/include/internal/common.h
@@ -127,7 +127,6 @@ struct cudecompCommInfo {

 #ifdef ENABLE_NVSHMEM
   nvshmem_team_t nvshmem_team = NVSHMEM_TEAM_INVALID;
-  uint64_t* nvshmem_signals = nullptr;
 #endif

   bool mnnvl_active = false; // flag to indicate whether communicator has MNNVL connections
diff --git a/include/internal/transpose.h b/include/internal/transpose.h
index 84cbe77..2b24a5a 100644
--- a/include/internal/transpose.h
+++ b/include/internal/transpose.h
@@ -236,26 +236,12 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
   T* o2 = work + pinfo_a.size;
   T* o3 = output;

-#ifdef ENABLE_NVSHMEM
   if (transposeBackendRequiresNvshmem(grid_desc->config.transpose_comm_backend)) {
     auto max_pencil_size_a = getGlobalMaxPencilSize(handle, grid_desc, ax_a);
     o2 = work + max_pencil_size_a;
-
-    // NVSHMEM team synchronization between transpose operations
-    if (splits_a.size() != 1) {
-      auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
-      auto team = comm_info.nvshmem_team;
-      auto aux_stream = handle->streams[handle->device_p2p_ce_count];
-      CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, stream));
-      CHECK_CUDA(cudaStreamWaitEvent(aux_stream, grid_desc->nvshmem_sync_event));
-      // Zero out signal buffer for this team here.
-      CHECK_CUDA(cudaMemsetAsync(comm_info.nvshmem_signals, 0, comm_info.nranks * sizeof(uint64_t), aux_stream));
-      nvshmemx_team_sync_on_stream(team, aux_stream);
-      CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, aux_stream));
-      // Delay final stream wait dependency to alltoall to ensure sync runs concurrently with initial transpose/pack
-    }
+    // Record event at start of transpose op for NVSHMEM team synchronization
+    CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, stream));
   }
-#endif

   cudecompTransposePerformanceSample* current_sample = nullptr;
   if (handle->performance_report_enable) {
@@ -623,6 +609,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
   }

   if (pipelined) {
+    bool nvshmem_synced = false;
     for (int j = 0; j < splits_b.size(); ++j) {
       int src_rank, dst_rank;
       getAlltoallPeerRanks(grid_desc, comm_axis, j, src_rank, dst_rank);
@@ -652,7 +639,8 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
       if (o2 != o1) {
         cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets,
-                                  recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample);
+                                  recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced,
+                                  current_sample);
       }

       if (o2 != o3) {
@@ -716,6 +704,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
         if (i > 0) { strides_out[i] = strides_out[i - 1] * extents_h[i - 1]; }
       }

+      bool nvshmem_synced = false;
       for (int j = 0; j < splits_b.size(); ++j) {
         int src_rank, dst_rank;
         getAlltoallPeerRanks(grid_desc, comm_axis, j, src_rank, dst_rank);
@@ -746,7 +735,8 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
        if (o2 != o1) {
          cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets,
-                                   recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample);
+                                   recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced,
+                                   current_sample);
        }
      }
@@ -770,6 +760,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
       }
     } else {
       // Unpack
+      bool nvshmem_synced = false;
       int memcpy_count = 0;
       cudecompBatchedD2DMemcpy3DParams memcpy_params;
       for (int j = 0; j < splits_a.size(); ++j) {
@@ -802,7 +793,8 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
        if (o2 != o1) {
          cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets,
-                                   recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample);
+                                   recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced,
+                                   current_sample);
        }
      }
diff --git a/src/autotune.cc b/src/autotune.cc
index d4872c8..f970b55 100644
--- a/src/autotune.cc
+++ b/src/autotune.cc
@@ -275,14 +275,6 @@ void autotuneTransposeBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_d
     nvshmem_team_config_t tmp;
     nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0,
                           &grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team);
-    grid_desc->row_comm_info.nvshmem_signals =
-        (uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t));
-    CHECK_CUDA(
-        cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t)));
-    grid_desc->col_comm_info.nvshmem_signals =
-        (uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t));
-    CHECK_CUDA(
-        cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t)));
 #endif
   }
@@ -459,8 +451,6 @@ void autotuneTransposeBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_d
 #ifdef ENABLE_NVSHMEM
     nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team);
     nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team);
-    nvshmem_free(grid_desc->row_comm_info.nvshmem_signals);
-    nvshmem_free(grid_desc->col_comm_info.nvshmem_signals);
     grid_desc->row_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
     grid_desc->col_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
 #endif
@@ -701,14 +691,6 @@ void autotuneHaloBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_desc,
     nvshmem_team_config_t tmp;
     nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0,
                           &grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team);
-    grid_desc->row_comm_info.nvshmem_signals =
-        (uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t));
-    CHECK_CUDA(
-        cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t)));
-    grid_desc->col_comm_info.nvshmem_signals =
-        (uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t));
-    CHECK_CUDA(
-        cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t)));
 #endif
   }
@@ -822,8 +804,6 @@ void autotuneHaloBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_desc,
 #ifdef ENABLE_NVSHMEM
     nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team);
     nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team);
-    nvshmem_free(grid_desc->row_comm_info.nvshmem_signals);
-    nvshmem_free(grid_desc->col_comm_info.nvshmem_signals);
     grid_desc->row_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
     grid_desc->col_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
 #endif
diff --git a/src/cudecomp.cc b/src/cudecomp.cc
index 9daa234..5df3435 100644
--- a/src/cudecomp.cc
+++ b/src/cudecomp.cc
@@ -764,14 +764,6 @@ cudecompResult_t cudecompGridDescCreate(cudecompHandle_t handle, cudecompGridDes
       nvshmem_team_config_t tmp;
       nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0,
                             &grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team);
-      grid_desc->row_comm_info.nvshmem_signals =
-          (uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t));
-      CHECK_CUDA(
-          cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t)));
-      grid_desc->col_comm_info.nvshmem_signals =
-          (uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t));
-      CHECK_CUDA(
-          cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t)));
       handle->n_grid_descs_using_nvshmem++;
     } else {
       // Finalize nvshmem to reclaim symmetric heap memory if not used
@@ -899,11 +891,9 @@ cudecompResult_t cudecompGridDescDestroy(cudecompHandle_t handle, cudecompGridDe
         haloBackendRequiresNvshmem(grid_desc->config.halo_comm_backend)) {
       if (grid_desc->row_comm_info.nvshmem_team != NVSHMEM_TEAM_INVALID) {
         nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team);
-        nvshmem_free(grid_desc->row_comm_info.nvshmem_signals);
       }
       if (grid_desc->col_comm_info.nvshmem_team != NVSHMEM_TEAM_INVALID) {
         nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team);
-        nvshmem_free(grid_desc->col_comm_info.nvshmem_signals);
       }
       handle->n_grid_descs_using_nvshmem--;