Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 54 additions & 60 deletions include/internal/comm_routines.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,26 +95,23 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
const std::vector<comm_count_t>& send_counts, const std::vector<comm_count_t>& send_offsets,
T* recv_buff, const std::vector<comm_count_t>& recv_counts,
const std::vector<comm_count_t>& recv_offsets, cudecompCommAxis comm_axis, cudaStream_t stream) {
auto comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
auto comm = comm_info.mpi_comm;
auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
auto team = comm_info.nvshmem_team;
int self_rank = comm_info.rank;
auto aux_stream = handle->streams[handle->device_p2p_ce_count];

// Enforce sync dependency between transpose operations
CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->nvshmem_sync_event));

// Event dependency on external stream for intra-group transfers
CHECK_CUDA(cudaEventRecord(grid_desc->events[0], stream));
for (int i = 0; i < handle->device_p2p_ce_count; ++i) {
CHECK_CUDA(cudaStreamWaitEvent(handle->streams[i], grid_desc->events[0], 0));
}

// Using cudaEventSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
CHECK_CUDA(cudaEventSynchronize(grid_desc->nvshmem_sync_event));
CHECK_MPI(MPI_Barrier(comm));
// nvshmemx_team_sync_on_stream(team, stream);

cudecompNvshmemA2AParams<T> params;

// Inter-group transfers (non-blocking)
bool need_quiet = false;
params.send_buff = send_buff;
params.recv_buff = recv_buff;
int count = 0;
Expand All @@ -134,13 +131,11 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
params.ntransfers = count;
cudecomp_nvshmem_alltoallv(params, stream);
count = 0;
need_quiet = true;
}
}
if (count != 0) {
params.ntransfers = count;
cudecomp_nvshmem_alltoallv(params, stream);
need_quiet = true;
}

// Intra-group transfers (blocking, scheduled after non-blocking inter-group transfers for concurrency)
Expand All @@ -151,19 +146,19 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
if (nvshmem_ptr(recv_buff, dst_rank_global)) {

if (comm_info.ngroups == 1 && handle->device_p2p_ce_count == 1 &&
if (comm_info.ngroups == 1 && handle->device_p2p_ce_count == 1 && count != 0 &&
count % CUDECOMP_NVSHMEM_INTRAGROUP_SYNC_FREQ == 0) {
// For single group, single P2P CE (e.g. NVSwitch), synchronize NVSHMEM team every
// CUDECOMP_NVSHMEM_INTRAGROUP_SYNC_FREQ transfers. This helps reduce CE contention due to accumulation of
// jitter.
for (int i = 0; i < handle->device_p2p_ce_count; ++i) {
CHECK_CUDA(cudaEventRecord(grid_desc->events[0], handle->streams[i]));
CHECK_CUDA(cudaStreamWaitEvent(handle->streams[handle->device_p2p_ce_count], grid_desc->events[0], 0));
CHECK_CUDA(cudaStreamWaitEvent(aux_stream, grid_desc->events[0], 0));
}

nvshmemx_team_sync_on_stream(team, handle->streams[handle->device_p2p_ce_count]);
nvshmemx_team_sync_on_stream(team, aux_stream);

CHECK_CUDA(cudaEventRecord(grid_desc->events[0], handle->streams[handle->device_p2p_ce_count]));
CHECK_CUDA(cudaEventRecord(grid_desc->events[0], aux_stream));
for (int i = 0; i < handle->device_p2p_ce_count; ++i) {
CHECK_CUDA(cudaStreamWaitEvent(handle->streams[i], grid_desc->events[0], 0));
}
Expand All @@ -186,12 +181,7 @@ nvshmemAlltoallV(const cudecompHandle_t& handle, const cudecompGridDesc_t& grid_
CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[0], 0));
}

if (need_quiet) { nvshmemx_quiet_on_stream(stream); }

// Using cudaStreamSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
CHECK_CUDA(cudaStreamSynchronize(stream));
CHECK_MPI(MPI_Barrier(comm));
// nvshmemx_team_sync_on_stream(team, stream);
nvshmemx_barrier_on_stream(team, stream);
}
#endif

Expand Down Expand Up @@ -237,7 +227,7 @@ static void cudecompAlltoall(const cudecompHandle_t& handle, const cudecompGridD
#endif
}
case CUDECOMP_TRANSPOSE_COMM_NCCL: {
auto comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
// For fully intra-group alltoall, use distinct NCCL local comm instead of global comm as it is faster.
auto comm = (comm_info.ngroups == 1) ? *grid_desc->nccl_local_comm : *grid_desc->nccl_comm;

Expand Down Expand Up @@ -367,7 +357,7 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc
const std::vector<comm_count_t>& recv_offsets,
const std::vector<comm_count_t>& recv_offsets_nvshmem, cudecompCommAxis comm_axis,
const std::vector<int>& src_ranks, const std::vector<int>& dst_ranks, cudaStream_t stream,
bool& synced, cudecompTransposePerformanceSample* current_sample = nullptr) {
cudecompTransposePerformanceSample* current_sample = nullptr) {

// If there are no transfers to complete, quick return
if (send_counts.size() == 0 && recv_counts.size() == 0) { return; }
Expand Down Expand Up @@ -404,14 +394,16 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc
case CUDECOMP_TRANSPOSE_COMM_NVSHMEM_PL: {
#ifdef ENABLE_NVSHMEM
if (nvshmem_ptr(send_buff, handle->rank) && nvshmem_ptr(recv_buff, handle->rank)) {
auto comm =
(comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.mpi_comm : grid_desc->col_comm_info.mpi_comm;
// auto team = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.nvshmem_team
// : grid_desc->col_comm_info.nvshmem_team;
auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
auto pl_stream = handle->streams[0];
int self_rank = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info.rank : grid_desc->col_comm_info.rank;

bool barrier = false;
// Enforce sync dependency between transpose operations
CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->nvshmem_sync_event));

bool need_quiet = false;

// Inter-group transfers and self-copy (non-blocking)
for (int i = 0; i < src_ranks.size(); ++i) {
int src_rank = src_ranks[i];
int dst_rank = dst_ranks[i];
Expand All @@ -421,39 +413,44 @@ cudecompAlltoallPipelined(const cudecompHandle_t& handle, const cudecompGridDesc
CHECK_CUDA(cudaMemcpyAsync(recv_buff + recv_offsets_nvshmem[self_rank], send_buff + send_offsets[self_rank],
send_counts[self_rank] * sizeof(T), cudaMemcpyDeviceToDevice, stream));
} else {
int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
if (nvshmem_ptr(recv_buff, dst_rank_global)) { continue; }

CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->events[dst_rank], 0));
if (!synced) {
// Using cudaEventSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
CHECK_CUDA(cudaEventSynchronize(grid_desc->nvshmem_sync_event));
CHECK_MPI(MPI_Barrier(comm));
// Only need to sync on the first remote operation of an alltoall sequence to ensure reads on other ranks
// from previous communication have completed.
synced = true;
}

int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
nvshmemx_putmem_nbi_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank], send_buff + send_offsets[dst_rank],
send_counts[dst_rank] * sizeof(T), dst_rank_global, pl_stream);
nvshmemx_putmem_signal_nbi_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank],
send_buff + send_offsets[dst_rank], send_counts[dst_rank] * sizeof(T),
&comm_info.nvshmem_signals[comm_info.rank], 1, NVSHMEM_SIGNAL_SET,
dst_rank_global, pl_stream);

barrier = true;
need_quiet = true;
}
}

if (barrier) {
nvshmemx_quiet_on_stream(pl_stream);
// Using cudaStreamSynchronize + barrier instead of nvshmemx_team_sync_on_stream for lower latency
CHECK_CUDA(cudaStreamSynchronize(pl_stream));
CHECK_MPI(MPI_Barrier(comm));

// nvshmemx_team_sync_on_stream(team, pl_stream);
// for (int i = 0; i < src_ranks.size(); ++i) {
// int src_rank = src_ranks[i];
// int dst_rank = dst_ranks[i];
// if (src_rank != self_rank) {
// CHECK_CUDA(cudaEventRecord(grid_desc->events[dst_rank], pl_stream));
// CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[dst_rank], 0));
// }
//}
// Intra-group transfers (blocking, scheduled after non-blocking inter-group transfers for concurrency)
for (int i = 0; i < src_ranks.size(); ++i) {
int src_rank = src_ranks[i];
int dst_rank = dst_ranks[i];

int dst_rank_global = getGlobalRank(handle, grid_desc, comm_axis, dst_rank);
if (!nvshmem_ptr(recv_buff, dst_rank_global) || src_rank == self_rank) { continue; }

CHECK_CUDA(cudaStreamWaitEvent(pl_stream, grid_desc->events[dst_rank], 0));

nvshmemx_putmem_signal_on_stream(recv_buff + recv_offsets_nvshmem[dst_rank], send_buff + send_offsets[dst_rank],
send_counts[dst_rank] * sizeof(T), &comm_info.nvshmem_signals[comm_info.rank],
1, NVSHMEM_SIGNAL_SET, dst_rank_global, pl_stream);
}

if (need_quiet) { nvshmemx_quiet_on_stream(pl_stream); }
for (int i = 0; i < src_ranks.size(); ++i) {
int src_rank = src_ranks[i];
int dst_rank = dst_ranks[i];
if (src_rank != self_rank) {
nvshmemx_signal_wait_until_on_stream(&comm_info.nvshmem_signals[src_rank], NVSHMEM_CMP_EQ, 1, pl_stream);
CHECK_CUDA(cudaEventRecord(grid_desc->events[dst_rank], pl_stream));
CHECK_CUDA(cudaStreamWaitEvent(stream, grid_desc->events[dst_rank], 0));
}
}
break;
} else {
Expand Down Expand Up @@ -594,8 +591,7 @@ static void cudecompSendRecvPair(const cudecompHandle_t& handle, const cudecompG
case CUDECOMP_HALO_COMM_NVSHMEM_BLOCKING: {
#ifdef ENABLE_NVSHMEM
if (nvshmem_ptr(send_buff, handle->rank) && nvshmem_ptr(recv_buff, handle->rank)) {
nvshmemx_quiet_on_stream(stream);
nvshmemx_sync_all_on_stream(stream);
nvshmemx_barrier_all_on_stream(stream);
for (int i = 0; i < send_counts.size(); ++i) {
if (peer_ranks[i] == handle->rank) {
// Self-copy with cudaMemcpy
Expand All @@ -608,14 +604,12 @@ static void cudecompSendRecvPair(const cudecompHandle_t& handle, const cudecompG
}
}
if (grid_desc->config.halo_comm_backend == CUDECOMP_HALO_COMM_NVSHMEM_BLOCKING) {
nvshmemx_quiet_on_stream(stream);
nvshmemx_sync_all_on_stream(stream);
nvshmemx_barrier_all_on_stream(stream);
}
}

if (grid_desc->config.halo_comm_backend == CUDECOMP_HALO_COMM_NVSHMEM) {
nvshmemx_quiet_on_stream(stream);
nvshmemx_sync_all_on_stream(stream);
nvshmemx_barrier_all_on_stream(stream);
};
break;
} else {
Expand Down
1 change: 1 addition & 0 deletions include/internal/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ struct cudecompCommInfo {

#ifdef ENABLE_NVSHMEM
nvshmem_team_t nvshmem_team = NVSHMEM_TEAM_INVALID;
uint64_t* nvshmem_signals = nullptr;
#endif

bool mnnvl_active = false; // flag to indicate whether communicator has MNNVL connections
Expand Down
30 changes: 19 additions & 11 deletions include/internal/transpose.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,26 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
T* o2 = work + pinfo_a.size;
T* o3 = output;

#ifdef ENABLE_NVSHMEM
if (transposeBackendRequiresNvshmem(grid_desc->config.transpose_comm_backend)) {
auto max_pencil_size_a = getGlobalMaxPencilSize(handle, grid_desc, ax_a);
o2 = work + max_pencil_size_a;
// Record event at start of transpose op for NVSHMEM team synchronization
CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, stream));

// NVSHMEM team synchronization between transpose operations
if (splits_a.size() != 1) {
auto& comm_info = (comm_axis == CUDECOMP_COMM_ROW) ? grid_desc->row_comm_info : grid_desc->col_comm_info;
auto team = comm_info.nvshmem_team;
auto aux_stream = handle->streams[handle->device_p2p_ce_count];
CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, stream));
CHECK_CUDA(cudaStreamWaitEvent(aux_stream, grid_desc->nvshmem_sync_event));
// Zero out signal buffer for this team here.
CHECK_CUDA(cudaMemsetAsync(comm_info.nvshmem_signals, 0, comm_info.nranks * sizeof(uint64_t), aux_stream));
nvshmemx_team_sync_on_stream(team, aux_stream);
CHECK_CUDA(cudaEventRecord(grid_desc->nvshmem_sync_event, aux_stream));
// Delay final stream wait dependency to alltoall to ensure sync runs concurrently with initial transpose/pack
}
}
#endif

cudecompTransposePerformanceSample* current_sample = nullptr;
if (handle->performance_report_enable) {
Expand Down Expand Up @@ -609,7 +623,6 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
}

if (pipelined) {
bool nvshmem_synced = false;
for (int j = 0; j < splits_b.size(); ++j) {
int src_rank, dst_rank;
getAlltoallPeerRanks(grid_desc, comm_axis, j, src_rank, dst_rank);
Expand Down Expand Up @@ -639,8 +652,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c

if (o2 != o1) {
cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets,
recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced,
current_sample);
recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample);
}

if (o2 != o3) {
Expand Down Expand Up @@ -704,7 +716,6 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
if (i > 0) { strides_out[i] = strides_out[i - 1] * extents_h[i - 1]; }
}

bool nvshmem_synced = false;
for (int j = 0; j < splits_b.size(); ++j) {
int src_rank, dst_rank;
getAlltoallPeerRanks(grid_desc, comm_axis, j, src_rank, dst_rank);
Expand Down Expand Up @@ -735,8 +746,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c

if (o2 != o1) {
cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets,
recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced,
current_sample);
recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample);
}
}

Expand All @@ -760,7 +770,6 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c
}
} else {
// Unpack
bool nvshmem_synced = false;
int memcpy_count = 0;
cudecompBatchedD2DMemcpy3DParams<T> memcpy_params;
for (int j = 0; j < splits_a.size(); ++j) {
Expand Down Expand Up @@ -793,8 +802,7 @@ static void cudecompTranspose_(int ax, int dir, const cudecompHandle_t handle, c

if (o2 != o1) {
cudecompAlltoallPipelined(handle, grid_desc, o1, send_counts, send_offsets, o2, recv_counts, recv_offsets,
recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, nvshmem_synced,
current_sample);
recv_offsets_nvshmem, comm_axis, src_ranks, dst_ranks, stream, current_sample);
}
}

Expand Down
20 changes: 20 additions & 0 deletions src/autotune.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,14 @@ void autotuneTransposeBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_d
nvshmem_team_config_t tmp;
nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0,
&grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team);
grid_desc->row_comm_info.nvshmem_signals =
(uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t));
CHECK_CUDA(
cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t)));
grid_desc->col_comm_info.nvshmem_signals =
(uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t));
CHECK_CUDA(
cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t)));
#endif
}

Expand Down Expand Up @@ -451,6 +459,8 @@ void autotuneTransposeBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_d
#ifdef ENABLE_NVSHMEM
nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team);
nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team);
nvshmem_free(grid_desc->row_comm_info.nvshmem_signals);
nvshmem_free(grid_desc->col_comm_info.nvshmem_signals);
grid_desc->row_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
grid_desc->col_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
#endif
Expand Down Expand Up @@ -691,6 +701,14 @@ void autotuneHaloBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_desc,
nvshmem_team_config_t tmp;
nvshmem_team_split_2d(NVSHMEM_TEAM_WORLD, grid_desc->config.pdims[1], &tmp, 0,
&grid_desc->row_comm_info.nvshmem_team, &tmp, 0, &grid_desc->col_comm_info.nvshmem_team);
grid_desc->row_comm_info.nvshmem_signals =
(uint64_t*)nvshmem_malloc(grid_desc->row_comm_info.nranks * sizeof(uint64_t));
CHECK_CUDA(
cudaMemset(grid_desc->row_comm_info.nvshmem_signals, 0, grid_desc->row_comm_info.nranks * sizeof(uint64_t)));
grid_desc->col_comm_info.nvshmem_signals =
(uint64_t*)nvshmem_malloc(grid_desc->col_comm_info.nranks * sizeof(uint64_t));
CHECK_CUDA(
cudaMemset(grid_desc->col_comm_info.nvshmem_signals, 0, grid_desc->col_comm_info.nranks * sizeof(uint64_t)));
#endif
}

Expand Down Expand Up @@ -804,6 +822,8 @@ void autotuneHaloBackend(cudecompHandle_t handle, cudecompGridDesc_t grid_desc,
#ifdef ENABLE_NVSHMEM
nvshmem_team_destroy(grid_desc->row_comm_info.nvshmem_team);
nvshmem_team_destroy(grid_desc->col_comm_info.nvshmem_team);
nvshmem_free(grid_desc->row_comm_info.nvshmem_signals);
nvshmem_free(grid_desc->col_comm_info.nvshmem_signals);
grid_desc->row_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
grid_desc->col_comm_info.nvshmem_team = NVSHMEM_TEAM_INVALID;
#endif
Expand Down
Loading
Loading