WalkerControlMPI.cpp
//////////////////////////////////////////////////////////////////////////////////////
// This file is distributed under the University of Illinois/NCSA Open Source License.
// See LICENSE file in top directory for details.
//
// Copyright (c) 2020 QMCPACK developers.
//
// File developed by: Peter Doak, doakpw@ornl.gov, Oak Ridge National Lab
//                    Jeongnim Kim, jeongnim.kim@gmail.com, University of Illinois at Urbana-Champaign
//                    Jeremy McMinnis, jmcminis@gmail.com, University of Illinois at Urbana-Champaign
//                    Mark A. Berrill, berrillma@ornl.gov, Oak Ridge National Laboratory
//
// File created by: Jeongnim Kim, jeongnim.kim@gmail.com, University of Illinois at Urbana-Champaign
//////////////////////////////////////////////////////////////////////////////////////


#include <array>
#include <cmath>
#include <sstream>

#include "WalkerControlMPI.h"
#include "Utilities/FairDivide.h"

namespace qmcplusplus
{
//#define MCWALKERSET_MPI_DEBUG

enum DMC_MPI_Timers
{
  DMC_MPI_branch,
  DMC_MPI_imbalance,
  DMC_MPI_prebalance,
  DMC_MPI_copyWalkers,
  DMC_MPI_allreduce,
  DMC_MPI_loadbalance,
  DMC_MPI_send,
  DMC_MPI_recv
};

TimerNameList_t<DMC_MPI_Timers> DMCMPITimerNames = {{DMC_MPI_branch, "WalkerControlMPI::branch"},
                                                    {DMC_MPI_imbalance, "WalkerControlMPI::imbalance"},
                                                    {DMC_MPI_prebalance, "WalkerControlMPI::pre-loadbalance"},
                                                    {DMC_MPI_copyWalkers, "WalkerControlMPI::copyWalkers"},
                                                    {DMC_MPI_allreduce, "WalkerControlMPI::allreduce"},
                                                    {DMC_MPI_loadbalance, "WalkerControlMPI::loadbalance"},
                                                    {DMC_MPI_send, "WalkerControlMPI::send"},
                                                    {DMC_MPI_recv, "WalkerControlMPI::recv"}};

/** default constructor
 *
 * set SwapMode? SwapMode is set to 1 but what does that mean?
 * This object persists inside the SFNB (SimpleFixedNodeBranch), which also persists.
 * The zeroing here will not happen in later QMC sections...
 * This seems problematic in that NumWalkersSent will start at a
 * value of no concern to the current section.
 *
 * In the new drivers SFNB should throw an exception if there is attempted
 * reuse of WalkerController
 */
WalkerControlMPI::WalkerControlMPI(Communicate* comm)
    : WalkerControlBase(comm), myTimers(getGlobalTimerManager(), DMCMPITimerNames, timer_level_medium)
{
  NumWalkersSent = 0;
  SwapMode       = 1;
  Cur_min        = 0;
  Cur_max        = 0;
}

/** Perform branch and swap walkers as required
 *
 * It takes 5 steps:
 * 1. sortWalkers marks good and bad walkers.
 * 2. allreduce collects the number of good walkers + copies on every rank.
 * 3. applyNmaxNmin prevents the global population from growing too large or too small.
 * 4. swapWalkersSimple decides the load-balancing plan and sends/receives walkers.
 *    The receiving side recycles bad walkers' memory first.
 * 5. copyWalkers generates copies of good walkers.
 * In order to minimize the memory-footprint fluctuation,
 * the walker copying is placed as the last step.
 * In order to reduce the time for allocating walker memory,
 * this algorithm does not destroy the bad walkers in step 1.
 * All the bad walkers are recycled as much as possible in steps 3/4.
 */
int WalkerControlMPI::branch(int iter, MCWalkerConfiguration& W, FullPrecRealType trigger)
{
  ScopedTimer local_timer(myTimers[DMC_MPI_branch]);
  {
    ScopedTimer local_timer(myTimers[DMC_MPI_prebalance]);
    std::fill(curData.begin(), curData.end(), 0.0);
    sortWalkers(W);
    //use NumWalkersSent from the previous exchange
    curData[SENTWALKERS_INDEX] = NumWalkersSent;
    //update the number of walkers for this rank
    //Causes implicit conversion to FullPrecRealType
    curData[LE_MAX + MyContext] = NumWalkers;
    //{ ScopedTimer local_timer(myTimers[DMC_MPI_imbalance]);
    //}
    {
      ScopedTimer local_timer(myTimers[DMC_MPI_allreduce]);
      myComm->allreduce(curData);
    }
    measureProperties(iter);
    W.EnsembleProperty = ensemble_property_;
    for (int i = 0, j = LE_MAX; i < num_contexts_; i++, j++)
      NumPerRank[i] = static_cast<int>(curData[j]);
    int current_population = std::accumulate(NumPerRank.begin(), NumPerRank.end(), 0);

    Cur_pop = applyNmaxNmin(current_population);
  }
  {
    ScopedTimer local_timer(myTimers[DMC_MPI_loadbalance]);
    swapWalkersSimple(W);
  }
  {
    ScopedTimer local_timer(myTimers[DMC_MPI_copyWalkers]);
    copyWalkers(W);
  }
  //set Weight and Multiplicity to default values
  for (auto& walker : W)
  {
    walker->Weight       = 1.0;
    walker->Multiplicity = 1.0;
  }
  //update the walkers offsets
  W.setWalkerOffsets(FairOffSet);

  return Cur_pop;
}
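
// A sketch of the curData bookkeeping used above, assuming the index layout
// defined in WalkerControlBase (ENERGY_INDEX ... SENTWALKERS_INDEX, LE_MAX):
// slots [0, LE_MAX) accumulate scalar ensemble data such as energy and weight
// sums, while slot LE_MAX + rank holds that rank's walker count. A single
// allreduce therefore combines the ensemble averages and the per-rank
// population table in one collective:
//
//   curData[LE_MAX + MyContext] = NumWalkers;  // post this rank's count
//   myComm->allreduce(curData);                // element-wise sum over ranks
//   // now curData[LE_MAX + i] is the walker count of rank i, for every i
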
130 
131 // determine new walker population on each MPI rank
133  int num_contexts,
134  int my_context,
135  std::vector<int>& num_per_rank,
136  std::vector<int>& fair_offset,
137  std::vector<int>& minus,
138  std::vector<int>& plus)
139 {
140  FairDivideLow(cur_pop, num_contexts, fair_offset);
141  for (int ip = 0; ip < num_contexts; ip++)
142  {
143  // (FairOffSet[ip + 1] - FairOffSet[ip]) gives the partiion ip walker pop
144  int dn = num_per_rank[ip] - (fair_offset[ip + 1] - fair_offset[ip]);
145  num_per_rank[ip] -= dn;
146  if (dn > 0)
147  {
148  plus.insert(plus.end(), dn, ip);
149  }
150  else if (dn < 0)
151  {
152  minus.insert(minus.end(), -dn, ip);
153  }
154  }
155 #ifndef NDEBUG
156  if (plus.size() != minus.size())
157  {
158  app_error() << "Walker send/recv pattern doesn't match. "
159  << "The send size " << plus.size() << " is not equal to the recv size " << minus.size() << " ."
160  << std::endl;
161  throw std::runtime_error("Trying to swap in WalkerControlMPI::swapWalkersSimple with mismatched queues");
162  }
163 #endif
164 }
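
// Worked example (a sketch; it assumes FairDivideLow hands the remainder to
// the highest-index partitions, per its documentation in Utilities/FairDivide.h):
//
//   cur_pop = 10, num_contexts = 4  ->  fair shares {2, 2, 3, 3},
//                                       fair_offset = {0, 2, 4, 7, 10}
//   num_per_rank = {5, 1, 3, 1}     ->  dn = {+3, -1, 0, -2}
//   plus  = {0, 0, 0}   (rank 0 has three surplus walkers)
//   minus = {1, 3, 3}   (rank 1 needs one, rank 3 needs two)
//
// swapWalkersSimple pairs the two lists index by index: (0 -> 1), (0 -> 3),
// (0 -> 3). The last two pairs share the same sender and receiver, so,
// provided rank 0 has enough pending copies of one walker, only one walker
// crosses the network, carrying a copy count of 1.
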

/** swap Walkers with Recv/Send or Irecv/Isend
 *
 * The algorithm ensures that the load per rank can differ only by one walker.
 * Each MPI rank can only send, receive, or stay silent.
 * The communication is one-dimensional and very local.
 * If multiple copies of a walker need to be sent to the target rank, only one is sent;
 * the number of copies is communicated ahead via blocking send/recv.
 * Then the walkers are transferred via blocking or non-blocking send/recv.
 * The blocking send/recv may become serialized and worsen load imbalance.
 * The non-blocking send/recv algorithm avoids serialization completely.
 */
void WalkerControlMPI::swapWalkersSimple(MCWalkerConfiguration& W)
{
  std::vector<int> minus, plus;
  //legacy code does not modify NumPerRank in this call so we copy NumPerRank
  std::vector<int> num_per_rank(NumPerRank);
  determineNewWalkerPopulation(Cur_pop, num_contexts_, MyContext, num_per_rank, FairOffSet, minus, plus);

  if (good_w.empty() && bad_w.empty())
  {
    app_error() << "It should never happen that no walkers, "
                << "neither good nor bad, exist on a rank. "
                << "Please report to developers. " << std::endl;
    APP_ABORT("WalkerControlMPI::swapWalkersSimple no existing walker");
  }

  Walker_t& wRef(*(good_w.empty() ? bad_w[0] : good_w[0]));
  std::vector<std::unique_ptr<Walker_t>> newW;
  std::vector<int> ncopy_newW;
#ifdef MCWALKERSET_MPI_DEBUG
  std::array<char, 128> fname;
  if (std::snprintf(fname.data(), fname.size(), "test.%d", MyContext) < 0)
    throw std::runtime_error("Error generating filename");
  std::ofstream fout(fname.data(), std::ios::app);
  //fout << NumSwaps << " " << Cur_pop << " ";
  //for(int ic=0; ic<NumContexts; ic++) fout << NumPerRank[ic] << " ";
  //fout << " | ";
  //for(int ic=0; ic<NumContexts; ic++) fout << FairOffSet[ic+1]-FairOffSet[ic] << " ";
  //fout << " | ";
  for (int ic = 0; ic < plus.size(); ic++)
  {
    fout << plus[ic] << " ";
  }
  fout << " | ";
  for (int ic = 0; ic < minus.size(); ic++)
  {
    fout << minus[ic] << " ";
  }
  fout << std::endl;
#endif
  int nswap = plus.size();
  // sort good walkers by the number of copies
  assert(good_w.size() == ncopy_w.size());
  std::vector<std::pair<int, int>> ncopy_pairs;
  for (int iw = 0; iw < ncopy_w.size(); iw++)
    ncopy_pairs.push_back(std::make_pair(ncopy_w[iw], iw));
  std::sort(ncopy_pairs.begin(), ncopy_pairs.end());

  int nsend = 0;
  struct job
  {
    const int walkerID;
    const int target;
    job(int wid, int target_in) : walkerID(wid), target(target_in){};
  };
  std::vector<job> job_list;
  for (int ic = 0; ic < nswap; ic++)
  {
    if (plus[ic] == MyContext)
    {
      // always send the last good walker
      auto& awalker = good_w[ncopy_pairs.back().second];
      // count the possible copies in one send
      int nsentcopy = 0;

      for (int id = ic + 1; id < nswap; id++)
        if (plus[ic] == plus[id] && minus[ic] == minus[id] && ncopy_pairs.back().first > 0)
        { // increment copy counter
          ncopy_pairs.back().first--;
          nsentcopy++;
        }
        else
        { // not enough copies to send or not the same send/recv pair
          break;
        }

      // send the number of copies to the target
      myComm->comm.send_value(nsentcopy, minus[ic]);
      job_list.push_back(job(ncopy_pairs.back().second, minus[ic]));
#ifdef MCWALKERSET_MPI_DEBUG
      fout << "rank " << plus[ic] << " sends a walker with " << nsentcopy << " copies to rank " << minus[ic]
           << std::endl;
#endif

      // update counter and cursor
      ++nsend;
      ic += nsentcopy;

      // update copy counter
      if (ncopy_pairs.back().first > 0)
      {
        ncopy_pairs.back().first--;
        std::sort(ncopy_pairs.begin(), ncopy_pairs.end());
      }
      else
      {
        ncopy_pairs.pop_back();
        bad_w.push_back(std::make_unique<Walker_t>(*awalker));
      }
    }
    if (minus[ic] == MyContext)
    {
      std::unique_ptr<Walker_t> awalker;
      if (!bad_w.empty())
      {
        awalker = std::move(bad_w.back());
        bad_w.pop_back();
      }

      int nsentcopy = 0;
      // recv the number of copies from the target
      myComm->comm.receive_n(&nsentcopy, 1, plus[ic]);
      job_list.push_back(job(newW.size(), plus[ic]));
      if (plus[ic] != plus[ic + nsentcopy] || minus[ic] != minus[ic + nsentcopy])
        APP_ABORT("WalkerControlMPI::swapWalkersSimple send/recv pair checking failed!");
#ifdef MCWALKERSET_MPI_DEBUG
      fout << "rank " << minus[ic] << " recvs a walker with " << nsentcopy << " copies from rank " << plus[ic]
           << std::endl;
#endif

      // save the new walker
      if (awalker)
      {
        newW.push_back(std::make_unique<Walker_t>(*awalker));
      }
      else
      {
        newW.push_back(nullptr);
      }
      ncopy_newW.push_back(nsentcopy);
      // update cursor
      ic += nsentcopy;
    }
  }

  if (nsend > 0)
  {
    std::vector<mpi3::request> requests;
    // mark all walkers not in send
    for (auto jobit = job_list.begin(); jobit != job_list.end(); jobit++)
      good_w[jobit->walkerID]->SendInProgress = false;
    for (auto jobit = job_list.begin(); jobit != job_list.end(); jobit++)
    {
      // pack data and send
      auto& awalker   = good_w[jobit->walkerID];
      size_t byteSize = awalker->byteSize();
      if (!awalker->SendInProgress)
      {
        awalker->updateBuffer();
        awalker->SendInProgress = true;
      }
      if (use_nonblocking)
        requests.push_back(myComm->comm.isend_n(awalker->DataSet.data(), byteSize, jobit->target));
      else
      {
        ScopedTimer local_timer(myTimers[DMC_MPI_send]);
        myComm->comm.send_n(awalker->DataSet.data(), byteSize, jobit->target);
      }
    }
    if (use_nonblocking)
    {
      // wait for all the isends to complete
      for (int im = 0; im < requests.size(); im++)
      {
        ScopedTimer local_timer(myTimers[DMC_MPI_send]);
        requests[im].wait();
      }
      requests.clear();
    }
  }
  else
  {
    std::vector<mpi3::request> requests;
    for (auto jobit = job_list.begin(); jobit != job_list.end(); jobit++)
    {
      // recv and unpack data
      auto& awalker = newW[jobit->walkerID];
      if (!awalker)
        awalker = std::make_unique<Walker_t>(wRef);
      size_t byteSize = awalker->byteSize();
      if (use_nonblocking)
        requests.push_back(myComm->comm.ireceive_n(awalker->DataSet.data(), byteSize, jobit->target));
      else
      {
        ScopedTimer local_timer(myTimers[DMC_MPI_recv]);
        myComm->comm.receive_n(awalker->DataSet.data(), byteSize, jobit->target);
        awalker->copyFromBuffer();
      }
    }
    if (use_nonblocking)
    {
      std::vector<bool> not_completed(requests.size(), true);
      bool completed = false;
      while (!completed)
      {
        completed = true;
        for (int im = 0; im < requests.size(); im++)
          if (not_completed[im])
          {
            if (requests[im].completed())
            {
              newW[job_list[im].walkerID]->copyFromBuffer();
              not_completed[im] = false;
            }
            else
              completed = false;
          }
      }
      requests.clear();
    }
  }
  //save the number of walkers sent
  NumWalkersSent = nsend;
  // rebuild good_w and ncopy_w
  std::vector<std::unique_ptr<Walker_t>> good_w_temp(std::move(good_w));
  good_w.resize(ncopy_pairs.size());
  ncopy_w.resize(ncopy_pairs.size());
  for (int iw = 0; iw < ncopy_pairs.size(); iw++)
  {
    good_w[iw]  = std::move(good_w_temp[ncopy_pairs[iw].second]);
    ncopy_w[iw] = ncopy_pairs[iw].first;
  }
  //add walkers from other ranks
  if (newW.size())
  {
    good_w.insert(good_w.end(), std::make_move_iterator(newW.begin()), std::make_move_iterator(newW.end()));
    ncopy_w.insert(ncopy_w.end(), ncopy_newW.begin(), ncopy_newW.end());
  }

  assert(std::accumulate(ncopy_w.begin(), ncopy_w.end(), ncopy_w.size()) == num_per_rank[MyContext]);
}
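
// The accounting behind the assert above, as a sketch: each entry of ncopy_w
// counts the extra copies still owed for one kept walker, so the final local
// population is ncopy_w.size() (the walkers themselves, passed as the initial
// value of std::accumulate) plus the sum of ncopy_w (their pending copies,
// materialized later by copyWalkers). For example, ncopy_w = {0, 2, 1} means
// 3 walkers + 3 copies = 6, which must equal the planned num_per_rank[MyContext].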

} // namespace qmcplusplus