37 outputBuffer(nullptr),outputBufferSize(0),outputBufferPosition(0),
39 destinationRank(destination),
40 run(const_cast<
G4Run*>(aRun)),
/// Debug-message helper: streams MSG to G4cout, followed by G4endl, when the
/// `verbose` variable visible at the expansion site is strictly greater than
/// LVL.
///
/// @param LVL  verbosity threshold; the message is printed only if verbose > LVL.
/// @param MSG  a stream-insertion chain (e.g. "text" << value). It is pasted
///             directly after `G4cout <<`, so it must NOT be parenthesized.
///
/// The body is wrapped in do { ... } while (0) so that `DMSG(...);` forms
/// exactly one statement. With a bare `{ ... }` block the trailing semicolon
/// becomes a separate empty statement, which breaks un-braced
/// `if (c) DMSG(...); else ...` constructs.
#define DMSG( LVL , MSG )                       \
  do {                                          \
    if ( verbose > ( LVL ) ) {                  \
      G4cout << MSG << G4endl;                  \
    }                                           \
  } while ( 0 )
50 G4int nevts =
run->GetNumberOfEvent();
51 DMSG( 1 ,
"G4VUserMPIrunMerger::Send() : Sending a G4run ("
52 <<
run<<
") with "<<nevts<<
" events to: "<<destination);
58 G4int newbuffsize = 0;
60 newbuffsize += (el.dt.Get_size()*el.count);
62 char* buffer =
new char[newbuffsize];
65 std::fill(buffer,buffer+newbuffsize,0);
72#ifdef G4MPI_USE_MPI_PACK_NOT_CONST
73 MPI_Pack(
const_cast<void*
>(el.p_data),el.count,el.dt,
75 MPI_Pack(el.p_data,el.count,el.dt,
84 DMSG(2 ,
"G4VUserMPIrunMerger::Send() : Done ");
91 DMSG( 1 ,
"G4VUserMPIrunMerger::Receive(...) , this rank : "
92 <<parentComm->Get_rank()<<
" and receiving from : "<<source);
100 const G4int newbuffsize = status.Get_count(MPI::PACKED);
101 DMSG(2,
"Preparing to receive buffer of size: "<<newbuffsize);
104 DMSG(3,
"New larger buffer expected, resize");
107 buffer =
new char[newbuffsize];
110 std::fill(buffer,buffer+newbuffsize,0);
120 if ( aNewRun ==
nullptr ) aNewRun =
new G4Run;
129 for ( G4int i = 0 ; i<nevets ; ++i ) aNewRun->RecordEvent(
nullptr);
132 DMSG(2,
"Before G4Run::Merge : "<<
run->GetNumberOfEvent());
133 run->Merge( aNewRun );
134 DMSG(2,
"After G4Run::Merge : "<<
run->GetNumberOfEvent());
143 DMSG(0,
"G4VUserMPIrunMerger::Merge called");
145 const unsigned int myrank = parentComm->Get_rank();
150 DMSG(1,
"Comm world size is 1, nothing to do");
155 const G4double sttime = MPI::Wtime();
158 typedef std::function<void(
unsigned int)> handler_t;
159 using std::placeholders::_1;
162 std::function<void(
void)> barrier =
181 const G4double elapsed = MPI::Wtime() - sttime;
187 G4cout<<
"G4VUserMPIrunMerger::Merge() - data transfer performances: "
188 <<double(total)/1000./elapsed<<
" kB/s"
189 <<
" (Total Data Transfer= "<<double(total)/1000<<
" kB in "
190 <<elapsed<<
" s)."<<G4endl;
194 DMSG(0,
"G4VUserMPIrunMerger::Merge done");
static G4MPImanager * GetManager()
G4int GetActiveSize() const
const MPI::Intracomm * GetComm() const
void OutputUserData(void *input_data, const MPI::Datatype &dt, int count)
unsigned int destinationRank
G4int outputBufferPosition
void Send(const unsigned int destination)
std::vector< const_registered_data > input_userdata
std::vector< registered_data > output_userdata
virtual G4Run * UnPack()=0
MPI::Intracomm COMM_G4COMMAND_
void InputUserData(void *input_data, const MPI::Datatype &dt, int count)
void SetupOutputBuffer(char *buff, G4int size, G4int position)
void Receive(const unsigned int source)
void Merge(std::function< void(unsigned int)> senderF, std::function< void(unsigned int)> receiverF, std::function< void(void)> barrierF, unsigned int commSize, unsigned int myrank)