*This article was originally published in Computers in Physics*,
Volume 7, Number 4, 1993, pp. 418-424. Revised for MPI, Nov., 1998

Department of Physics and Astronomy

University of California, Los Angeles

Los Angeles, CA 90095-1547

Recently, a number of computer vendors have provided automatic parallelizing compilers for shared memory computers. This article describes how to use such compilers to design and write parallel code that can be used on both shared memory and distributed memory computers. The process of writing parallel code then becomes similar to the process of writing vectorized code, even for distributed memory computers. A number of guidelines for producing portable code are suggested and illustrated with examples. Portability is an important consideration when programming time is valuable. Furthermore, parallel code can for the most part be debugged serially, which simplifies the process.

      PARAMETER (NX=32)
      DIMENSION F(NX)
      DO 10 J = 1, NX
      F(J) = J
   10 CONTINUE

This can be parallelized by breaking up the single loop of length NX into NVP loops of length NXP as follows:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP,NVP)
      DO 30 K = 1, NVP
      JOFF = NXP*(K - 1)
      DO 20 J = 1, NXP
      F(J,K) = J + JOFF
   20 CONTINUE
   30 CONTINUE

where for simplicity I have assumed that NX is an exact multiple of NVP. Although the array F now has a different dimension, it contains the same number of words and the same data as before (the formula J+NXP*(K-1) is just the location in linear storage of the element (J,K) of the two dimensional Fortran array F). The compiler can now run the outer index K in parallel. In general, compilers will give messages when they can parallelize the outer loop and give reasons why when they cannot. For example, here is a sample output from the IBM compiler for this loop:

               DIMENSION F(NXP,NVP)
 PARA +------- DO 30 K = 1, NVP
 VECT |+------ DO 20 J = 1, NXP
      ||       F(J,K) = J + JOFF
      |+------ 20 CONTINUE
      +------- 30 CONTINUE

The Cray preprocessor gives similar information:

       dimension f(nxp,nvp)
 P---  do 30 k = 1, nvp
 P     joff = nxp*(k - 1)
 P V-  do 20 j = 1, nxp
 P V   f(j,k) = j + joff
 P V-  20 continue
 P---  30 continue

One can thus mentally associate each index K with one of NVP parallel processors. Note that the variable JOFF is special: if the outer loop is run in parallel, there must be NVP copies of it, one for each processor, since its value depends on the index K. Such variables are known as private variables. Variables such as NXP, which do not depend on K, are known as shared variables, since each processor sees the same value for that variable. Some years ago, extensions to Fortran were defined to help compilers recognize such parallelism. (Cray microtasking is an example of this.) These extensions were in the nature of explicitly declaring the loop over K a parallel loop and declaring the variable JOFF a private variable. Generally such extensions are no longer needed, as compilers are usually capable of recognizing such parallelism without user intervention. In reality, private variables are actually arrays of size NVP, and to avoid confusion, one can always declare them as such:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP,NVP), JOFF(NVP)
      DO 30 K = 1, NVP
      JOFF(K) = NXP*(K - 1)
      DO 20 J = 1, NXP
      F(J,K) = J + JOFF(K)
   20 CONTINUE
   30 CONTINUE

On a distributed memory computer, one can think of each index K as residing on a different computer, so it is no longer an explicit loop index or array dimension. Instead, K comes from a function call which returns a different processor ID (0 <= ID <= NVP-1) on each node:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP)
      CALL MPI_COMM_RANK(MPI_COMM_WORLD,ID,IERR)
      K = ID + 1
      JOFF = NXP*(K - 1)
      DO 40 J = 1, NXP
      F(J) = J + JOFF
   40 CONTINUE

Here we are using a call to the MPI standard message-passing library to determine the processor ID. I will use the index K to identify processors, which is one larger than the usual convention on distributed memory computers.
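The index arithmetic in these decompositions can be checked with a short sketch. The following Python fragment (not part of the original article; the names nx, nvp, nxp simply mirror the Fortran parameters) verifies that the blocked layout F(NXP,NVP) holds exactly the same values, in the same linear order, as the original one-dimensional array F(NX):

```python
# Check (in Python, for illustration only) that the blocked layout
# F(NXP,NVP) contains the same data as the 1D array F(NX).
nx, nvp = 32, 4
nxp = nx // nvp

# serial single loop: F(J) = J for J = 1..NX
f_serial = [j for j in range(1, nx + 1)]

# blocked double loop: F(J,K) = J + NXP*(K-1), one "processor" per K
f_blocked = []
for k in range(1, nvp + 1):
    joff = nxp * (k - 1)
    f_blocked.append([j + joff for j in range(1, nxp + 1)])

# Fortran is column-major, so concatenating the K blocks reproduces
# the linear storage order of the original array.
assert [x for blk in f_blocked for x in blk] == f_serial
```

Each inner list plays the role of the portion of F owned by one processor; the assertion is exactly the "same data as before" claim made above.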

Finally, it is possible to combine the two versions of this loop into one version that will run on both computers. To do this, we keep the loop index K, but its extent is now given by a new variable NBLOK, which is equal to NVP on a shared memory machine but has the value 1 on a distributed memory machine. If the value of K is needed for anything other than indexing, it is given by the new combination K + KOFF. On a shared memory machine, KOFF is always zero and K varies, while on a distributed memory machine, K is always 1 and KOFF varies, but the combination K + KOFF always has the same value on both machines. Whether a machine is a shared or distributed memory computer is controlled by a new parameter MSHARE, which has the value 1 for shared memory and 0 for distributed memory. The result is:

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXP=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXP,NBLOK)
      IF (MSHARE.EQ.1) THEN
         KOFF = 0
      ELSE
         CALL MPI_COMM_RANK(MPI_COMM_WORLD,KOFF,IERR)
      ENDIF
      DO 60 K = 1, NBLOK
      JOFF = NXP*(K + KOFF - 1)
      DO 50 J = 1, NXP
      F(J,K) = J + JOFF
   50 CONTINUE
   60 CONTINUE

On a shared memory computer we can substitute a dummy function for MPI_COMM_RANK, since it is not called and the real one may not be available.
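The claim that the combination K + KOFF has the same value on both machines can also be checked with a sketch. This Python fragment (illustration only, not from the article) simulates the combined loop for both settings of MSHARE: one process looping over NBLOK = NVP blocks when mshare = 1, and NVP simulated ranks each owning a single block with koff equal to the rank when mshare = 0. Both models must fill the same global array:

```python
# Simulate the portable loop for both memory models and check that
# K + KOFF produces identical data in both cases.
nx, nvp = 32, 4
nxp = nx // nvp

def fill(mshare):
    nblok = 1 + mshare * (nvp - 1)
    blocks = {}
    ranks = [0] if mshare == 1 else range(nvp)   # one rank, or nvp ranks
    for rank in ranks:
        koff = 0 if mshare == 1 else rank        # MPI_COMM_RANK analogue
        for k in range(1, nblok + 1):
            joff = nxp * (k + koff - 1)
            blocks[k + koff] = [j + joff for j in range(1, nxp + 1)]
    return blocks

# both memory models produce the same global array, block by block
assert fill(1) == fill(0)
```

The dictionary key k + koff is the global block number; the assertion shows that the shared memory loop and the distributed memory ranks cover the blocks identically.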

One important advantage of this approach is that the program can be run on serial computers or run on some parallel computers with the automatic parallelization turned off. This makes debugging much easier. Up to now, we have associated the outer loop index with one of NVP processors. On many shared memory computers, NVP is not necessarily the actual number of physical processors. But on distributed memory computers, it often is. Thus for portability, it is best to design code so that NVP can equal the number of physical processors.

      PARAMETER (NX=32)
      DIMENSION F(NX)
      FSUM = 0.
      DO 10 J = 1, NX
      FSUM = FSUM + F(J)
   10 CONTINUE

This can be written as a double loop for a shared memory computer trivially:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP,NVP)
      FSUM = 0.
      DO 30 K = 1, NVP
      DO 20 J = 1, NXP
      FSUM = FSUM + F(J,K)
   20 CONTINUE
   30 CONTINUE

On a distributed memory computer, the outer loop is implicit, as in the previous example, and one has:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP)
      FSUM = 0.
      DO 40 J = 1, NXP
      FSUM = FSUM + F(J)
   40 CONTINUE
      CALL MPI_ALLREDUCE(FSUM,TEMP,1,MPI_REAL,MPI_SUM,MPI_COMM_WORLD,IERR)
      FSUM = TEMP

The call to subroutine MPI_ALLREDUCE sums all instances of the variable FSUM across processors and distributes the answer to all processors in the variable TEMP. Thus on a distributed memory computer, a partial sum over the index J is first performed by each processor. Then the partial sums are added by the subroutine MPI_ALLREDUCE. Finally, the portable version of the loop looks like:

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXP=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXP,NBLOK)
      FSUM = 0.
      DO 60 K = 1, NBLOK
      DO 50 J = 1, NXP
      FSUM = FSUM + F(J,K)
   50 CONTINUE
   60 CONTINUE
      IF (MSHARE.EQ.0) THEN
         CALL MPI_ALLREDUCE(FSUM,TEMP,1,MPI_REAL,MPI_SUM,MPI_COMM_WORLD,IERR)
         FSUM = TEMP
      ENDIF

On a shared memory computer we can substitute a dummy function for MPI_ALLREDUCE, since it is not called and the real one may not be available.
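The reduction pattern above, partial sums per processor followed by a combine step, can be sketched in Python (illustration only, not from the article). Each simulated processor sums its own block of length NXP, and the partial sums are then added, which is the role MPI_ALLREDUCE plays:

```python
# Partial sums per simulated processor, then a global combine,
# mimicking the MPI_ALLREDUCE pattern in the text.
nx, nvp = 32, 4
nxp = nx // nvp
f = [float(j) for j in range(1, nx + 1)]       # test data: F(J) = J

# per-processor partial sums over contiguous blocks of length nxp
partial = [sum(f[k * nxp:(k + 1) * nxp]) for k in range(nvp)]

# the "allreduce": every processor ends up with the global sum
fsum = sum(partial)
assert fsum == sum(f)          # 1 + 2 + ... + 32 = 528
```

The key point mirrored here is that no processor ever touches another processor's block; only the scalar partial sums cross processor boundaries.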

      PARAMETER (NX=32)
      DIMENSION F(NX), R(NX)
      DO 10 J = 2, NX
      R(J) = F(J) - F(J-1)
   10 CONTINUE
      R(1) = F(1) - F(NX)

When one breaks this up into a double loop as before, one obtains:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP,NVP), R(NXP,NVP)
      DO 30 K = 1, NVP
      DO 20 J = 2, NXP
      R(J,K) = F(J,K) - F(J-1,K)
   20 CONTINUE
      IF (K.GT.1) THEN
         R(1,K) = F(1,K) - F(NXP,K-1)
      ELSE
         R(1,K) = F(1,K) - F(NXP,K+NVP-1)
      ENDIF
   30 CONTINUE

Notice that we now have expressions such as:

      R(1,K) = F(1,K) - F(NXP,K-1)

where both K and K-1 appear as indices in the same expression. That means that the processor associated with K needs information from the processor associated with K-1. On a shared memory machine, this presents no problem, and both the Cray and IBM compilers will parallelize loop 30 above. However, on a distributed memory machine, one processor must physically send data to another processor, so this loop must be written differently.

First, it should be noted that on a distributed memory computer, one cannot write expressions which mix data from different processors (that is, involving different K indices), other than simply copying the data. Second, on a distributed memory computer, a given processor does not fetch data from another processor; it sends data to another. That means that when copying data one should have the index K on the right hand side rather than on the left hand side, as in our example. Using these restrictions, we can make this example more portable by writing it as follows:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP,NVP), R(NXP,NVP)
      DO 50 K = 1, NVP
      DO 40 J = 2, NXP
      R(J,K) = F(J,K) - F(J-1,K)
   40 CONTINUE
      IF (K.LT.NVP) THEN
         R(1,K+1) = F(NXP,K)
      ELSE
         R(1,K-NVP+1) = F(NXP,K)
      ENDIF
   50 CONTINUE
      DO 60 K = 1, NVP
      R(1,K) = F(1,K) - R(1,K)
   60 CONTINUE

Note that in this new version we have two loops over K (loops 50 and 60), where we had only one before (loop 30). In loop 50, we copy the data which involves different K indices. Then in loop 60, we perform the actual arithmetic, where every term has the same index K. In some cases, loop 50 may require a compiler directive to tell the compiler there is no recursion. Now let's look at how one would write this example on a distributed memory computer:

      PARAMETER (NX=32,NVP=4)
      PARAMETER (NXP=NX/NVP)
      DIMENSION F(NXP), R(NXP)
      DIMENSION ISTATUS(MPI_STATUS_SIZE)
      CALL MPI_COMM_RANK(MPI_COMM_WORLD,ID,IERR)
      K = ID + 1
      DO 70 J = 2, NXP
      R(J) = F(J) - F(J-1)
   70 CONTINUE
      KLEFT = K - 1
      IF (KLEFT.LT.1) KLEFT = KLEFT + NVP
      KRIGHT = K + 1
      IF (KRIGHT.GT.NVP) KRIGHT = KRIGHT - NVP
      CALL MPI_IRECV(R,1,MPI_REAL,KLEFT-1,1,MPI_COMM_WORLD,MSID,IERR)
      CALL MPI_SEND(F(NXP),1,MPI_REAL,KRIGHT-1,1,MPI_COMM_WORLD,IERR)
      CALL MPI_WAIT(MSID,ISTATUS,IERR)
      R(1) = F(1) - R(1)

The loop index K is now implicit, as in the previous distributed memory examples. The first loop (50) has been replaced by calls to MPI_IRECV and MPI_SEND, which is one way distributed memory computers send data between processors. In this example, the processor associated with index K first uses MPI_IRECV to tell the system where to receive data from processor K-1 (or K+NVP-1). (The data has not necessarily been received yet.) Processor K then uses MPI_SEND to send the contents of F(NXP) either to processor K+1 or to processor K-NVP+1. Finally, processor K calls MPI_WAIT to make sure that the data from processor K-1 (or K+NVP-1) has been received and stored in R(1).
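The copy-then-compute pattern can be checked end to end with a short sketch. The following Python fragment (illustration only, not from the article) simulates NVP processors, each holding one block of F; each block's first result point needs F(NXP) from its left neighbor, with periodic wraparound, which is exactly what the MPI_IRECV/MPI_SEND pair accomplishes:

```python
# Simulate the periodic finite difference across nvp blocks and
# compare with the serial version R(J) = F(J) - F(J-1), R(1) = F(1) - F(NX).
nx, nvp = 32, 4
nxp = nx // nvp
f_lin = [float(j) ** 2 for j in range(1, nx + 1)]   # arbitrary test data
f = [f_lin[k * nxp:(k + 1) * nxp] for k in range(nvp)]  # block per "processor"

r = [[0.0] * nxp for _ in range(nvp)]
for k in range(nvp):
    # interior points: purely local to processor k
    for j in range(1, nxp):
        r[k][j] = f[k][j] - f[k][j - 1]
    # boundary point: "receive" f(nxp) from the left neighbor (wraps around)
    left = (k - 1) % nvp
    r[k][0] = f[k][0] - f[left][nxp - 1]

# serial periodic difference; index -1 wraps to the last element
serial = [f_lin[j] - f_lin[j - 1] for j in range(nx)]
assert [x for blk in r for x in blk] == serial
```

Only one word per processor crosses a block boundary, which is why the message in the MPI version has length 1.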

Finally, it is possible to combine both versions of this loop into one:

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXP=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXP,NBLOK), R(NXP,NBLOK)
      DIMENSION ISTATUS(MPI_STATUS_SIZE)
      IF (MSHARE.EQ.1) THEN
         KOFF = 0
      ELSE
         CALL MPI_COMM_RANK(MPI_COMM_WORLD,KOFF,IERR)
      ENDIF
      DO 90 K = 1, NBLOK
      DO 80 J = 2, NXP
      R(J,K) = F(J,K) - F(J-1,K)
   80 CONTINUE
      KK = K + KOFF
      KLEFT = KK - 1
      IF (KLEFT.LT.1) KLEFT = KLEFT + NVP
      IF (MSHARE.EQ.1) THEN
         R(1,K) = F(NXP,KLEFT)
      ELSE
         KRIGHT = KK + 1
         IF (KRIGHT.GT.NVP) KRIGHT = KRIGHT - NVP
         CALL MPI_IRECV(R(1,K),1,MPI_REAL,KLEFT-1,1,MPI_COMM_WORLD,MSID,IERR)
         CALL MPI_SEND(F(NXP,K),1,MPI_REAL,KRIGHT-1,1,MPI_COMM_WORLD,IERR)
         CALL MPI_WAIT(MSID,ISTATUS,IERR)
      ENDIF
      R(1,K) = F(1,K) - R(1,K)
   90 CONTINUE

As before, on a shared memory computer we can substitute dummy functions for MPI_IRECV, MPI_SEND, and MPI_WAIT, since they are not called and the real ones may not be available.

I have also made one other change in the shared memory segment of loop 90. I have not used the form shown in loop 50 above, where the index K appears on the right hand side, but rather the form in loop 30 above, where the index K is on the left hand side. The reason is that some parallel compilers have difficulty recognizing the absence of recursion without compiler directives when the index on the left hand side is not K. So to avoid having to add compiler directives, I have written the shared memory version of the loop in the form shown.

      PARAMETER (NX=32)
      DIMENSION F(NX), N(NX)
      L = 0
      DO 10 J = 1, NX
      IF (F(J).GT.BOUND) THEN
         L = L + 1
         N(L) = J
      ENDIF
   10 CONTINUE

In general, this loop cannot be vectorized (except by the IBM compiler) because the value of the counter L depends on its previous value and is subsequently used as an address inside the loop. Following our usual procedure, one can write this loop in the following portable fashion:

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXP=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXP,NBLOK), N(NXP,NBLOK)
      DO 30 K = 1, NBLOK
      L = 0
      DO 20 J = 1, NXP
      IF (F(J,K).GT.BOUND) THEN
         L = L + 1
         N(L,K) = J
      ENDIF
   20 CONTINUE
   30 CONTINUE

Note that we have not actually removed the recursion. We have just replaced it with NVP separate recursions, which one can then run in parallel.
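That the NVP separate recursions select exactly the same elements as the single serial recursion can be checked with a sketch. This Python fragment (illustration only, not from the article) runs the gather serially with one counter, then runs it with an independent counter per block, and compares the results:

```python
# Serial gather with one counter L, versus nvp independent per-block
# counters; each block records local indices J relative to its block.
nx, nvp = 32, 4
nxp = nx // nvp
bound = 0.5
f_lin = [((7 * j) % 11) / 10.0 for j in range(1, nx + 1)]  # test data

# serial recursion: one counter over the whole array
serial = [j for j in range(1, nx + 1) if f_lin[j - 1] > bound]

# parallel form: an independent gather (its own counter) per block K
blocks = []
for k in range(nvp):
    n_k = [j for j in range(1, nxp + 1)
           if f_lin[k * nxp + j - 1] > bound]
    blocks.append(n_k)

# converting local indices J back to global indices recovers the
# serial result, merely grouped block by block
globl = [k * nxp + j for k, n_k in enumerate(blocks) for j in n_k]
assert globl == serial
```

The per-block counters never interact, which is what makes the outer loop safe to run in parallel.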

For more complex recursions than this one, such as random number generators, one must exercise more care in initializing the separate recursions before being able to run them in parallel. Note that since the index K has no data dependencies, one could also use this technique to vectorize the outer loop instead of running it in parallel.
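As a sketch of the care needed for random number generators, the following Python fragment (illustration only, not from the article) carries an independent generator state per simulated processor, initialized with distinct seeds. Note this seeding scheme is only illustrative: production codes use stronger guarantees such as leapfrogging or jump-ahead to ensure the per-processor streams do not overlap.

```python
# One independent RNG state per simulated processor K, so each
# processor advances only its own recursion.  Distinct seeds are a
# simplification; real parallel codes use leapfrog or jump-ahead
# initialization to guarantee non-overlapping streams.
import random

nvp = 4
streams = [random.Random(1000 + k) for k in range(nvp)]  # one state per K

# each simulated processor draws from its own stream only
draws = [[rng.random() for _ in range(3)] for rng in streams]

# the streams are reproducible: re-seeding reproduces the same draws
again = [random.Random(1000 + k) for k in range(nvp)]
assert draws == [[rng.random() for _ in range(3)] for rng in again]
```

The essential point matches the text: the single serial recursion is replaced by NVP separate recursions whose initial states must be set up before the parallel loop begins.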

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXPMAX=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXPMAX,NBLOK), R(NXPMAX,NBLOK), NXP(NBLOK)
      DO 20 K = 1, NBLOK
      DO 10 J = 1, NXP(K)
      F(J,K) = R(J,K)
   10 CONTINUE
   20 CONTINUE

In this example, the range of the inner loop index, NXP(K), depends on the value of the outer loop index. In other words, the length of the inner loop is different for each processor. The Cray precompiler can parallelize this, but the IBM compiler cannot. The IBM compiler must have all inner loops of the same length; that is, it has a SIMD view of the world, where all processors execute the same instruction. The Cray precompiler, on the other hand, has a MIMD view of the world, where different processors can execute different instructions. Generally, such differences are fixed by having all processors execute the same instructions and masking out unneeded ones with an IF statement. Thus a more portable version of this example is given by:

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXPMAX=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXPMAX,NBLOK), R(NXPMAX,NBLOK), NXP(NBLOK)
      DO 40 K = 1, NBLOK
      DO 30 J = 1, NXPMAX
      IF (J.LE.NXP(K)) THEN
         F(J,K) = R(J,K)
      ENDIF
   30 CONTINUE
   40 CONTINUE

A similar situation occurs if one or more processors need to skip a loop altogether:

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXPMAX=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXPMAX,NBLOK), R(NXPMAX,NBLOK), NXP(NBLOK)
      DO 70 K = 1, NBLOK
      IF (NXP(K).LE.NXPMAX/2) THEN
         DO 50 J = 1, NXP(K)
         F(J,K) = R(J,K)
   50    CONTINUE
      ELSE
         DO 60 J = 1, NXP(K)
         F(J,K) = -R(J,K)
   60    CONTINUE
      ENDIF
   70 CONTINUE

The solution is to put all the IF tests inside the inner loop:

      PARAMETER (NX=32,NVP=4,MSHARE=1)
      PARAMETER (NXPMAX=NX/NVP,NBLOK=1+MSHARE*(NVP-1))
      DIMENSION F(NXPMAX,NBLOK), R(NXPMAX,NBLOK), NXP(NBLOK)
      DO 90 K = 1, NBLOK
      DO 80 J = 1, NXPMAX
      IF ((NXP(K).LE.NXPMAX/2).AND.(J.LE.NXP(K))) THEN
         F(J,K) = R(J,K)
      ELSEIF ((NXP(K).GT.NXPMAX/2).AND.(J.LE.NXP(K))) THEN
         F(J,K) = -R(J,K)
      ENDIF
   80 CONTINUE
   90 CONTINUE

This form is more portable, but less efficient on some computers.
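The equivalence of the two forms, branching per block versus masking inside a fixed-length inner loop, can be checked with a sketch. This Python fragment (illustration only, not from the article) implements both and compares them on arbitrary per-block loop lengths:

```python
# Branchy MIMD-style version (per-block loop lengths and a skipped
# branch) versus the portable SIMD-style version with every test
# moved inside a fixed-length inner loop.
nvp, nxpmax = 4, 8
nxp = [3, 8, 5, 7]                 # per-block loop lengths (test data)
r = [[float(j + 10 * k) for j in range(nxpmax)] for k in range(nvp)]

def branchy():
    f = [[0.0] * nxpmax for _ in range(nvp)]
    for k in range(nvp):
        if nxp[k] <= nxpmax // 2:          # some blocks copy...
            for j in range(nxp[k]):
                f[k][j] = r[k][j]
        else:                              # ...others negate
            for j in range(nxp[k]):
                f[k][j] = -r[k][j]
    return f

def masked():
    f = [[0.0] * nxpmax for _ in range(nvp)]
    for k in range(nvp):
        for j in range(nxpmax):            # same length on every block
            if nxp[k] <= nxpmax // 2 and j < nxp[k]:
                f[k][j] = r[k][j]
            elif nxp[k] > nxpmax // 2 and j < nxp[k]:
                f[k][j] = -r[k][j]
    return f

assert branchy() == masked()
```

The masked form does redundant iterations on short blocks, which is the efficiency cost the text mentions, but every simulated processor now executes the same instruction stream.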

As an illustration, the first simple example can be written in cmFortran as follows:

      parameter (nx=32,nvp=4,mshare=1)
      parameter (nxp=nx/nvp,nblok=1+mshare*(nvp-1))
      dimension f(nxp,nblok)
CMF$  LAYOUT f(:serial,:news)
      koff = 0
      do 10 j = 1, nxp
      f(j,:) = j + nxp*([1:nblok]+koff-1)
   10 continue

where the index K is replaced by a colon to indicate that all elements K are to be operated on, and the brackets indicate an array whose elements are the integers from 1 to nblok. The LAYOUT compiler directive declares that the second index of the array f is distributed across processors, and the first index is not. Although this example works, there is an even simpler way to write this loop, using the forall construction:

      parameter (nx=32,nvp=4,mshare=1)
      parameter (nxp=nx/nvp,nblok=1+mshare*(nvp-1))
      dimension f(nxp,nblok)
CMF$  LAYOUT f(:serial,:news)
      koff = 0
      forall (j=1:nxp,k=1:nblok) f(j,k) = j + nxp*(k + koff - 1)

The second example, a summation reduction, can be written even more simply, since a summation reduction operator already exists in cmFortran:

      parameter (nx=32,nvp=4,mshare=1)
      parameter (nxp=nx/nvp,nblok=1+mshare*(nvp-1))
      dimension f(nxp,nblok)
CMF$  LAYOUT f(:serial,:news)
      fsum = sum(f)

The third example, which uses communication, can be written simply using the circular shift communication primitive:

      parameter (nx=32,nvp=4,mshare=1)
      parameter (nxp=nx/nvp,nblok=1+mshare*(nvp-1))
      dimension f(nxp,nblok), r(nxp,nblok)
CMF$  LAYOUT f(:serial,:news), r(:serial,:news)
      r(1,:) = cshift(f(nxp,:),1,shift=1)
      r(1,:) = f(1,:) - r(1,:)
      forall (j=2:nxp,k=1:nblok) r(j,k) = f(j,k) - f(j-1,k)

The example with recursion can be written using the technique of inverting the order of the double loops discussed at the beginning of this section:

      parameter (nx=32,nvp=4,mshare=1)
      parameter (nxp=nx/nvp,nblok=1+mshare*(nvp-1))
      dimension f(nxp,nblok), n(nxp,nblok), l(nblok)
CMF$  LAYOUT f(:serial,:news), n(:serial,:news), l(:news)
      l = 0
      do 20 j = 1, nxp
      where (f(j,:).gt.bound) l = l + 1
      forall (k=1:nblok,f(j,k).gt.bound) n(l(k),k) = j
   20 continue

Finally, the last example, with masks, can be written:

      parameter (nx=32,nvp=4,mshare=1)
      parameter (nxpmax=nx/nvp,nblok=1+mshare*(nvp-1))
      dimension f(nxpmax,nblok), r(nxpmax,nblok), nxp(nblok)
CMF$  LAYOUT f(:serial,:news), r(:serial,:news), nxp(:news)
      forall (j=1:nxpmax,k=1:nblok,
     1(nxp(k).le.nxpmax/2).and.(j.le.nxp(k))) f(j,k) = r(j,k)
      forall (j=1:nxpmax,k=1:nblok,
     1(nxp(k).gt.nxpmax/2).and.(j.le.nxp(k))) f(j,k) = -r(j,k)

On the Connection Machine, it is best to make NVP as large as possible.

Using these rules, we have written a (nearly) portable one dimensional particle-in-cell code, similar to the one described in Ref. [1], using the General Concurrent PIC algorithm described in Ref. [2]. Only three subroutines involved communications and hence contained non-portable constructs: an FFT subroutine (in six places), a subroutine for passing particles from one processor to another (in four places), and a subroutine for adding and replicating guard cells (in four places). The code has run successfully in parallel on the Intel iPSC/860, the Cray C-90, the IBM ES/9000, and a cluster of workstations, and of course it runs sequentially on a serial computer. In fact, this code could even be run on disk-based systems, where communication calls are replaced by reads and writes to disk, but this has not been done.

The value of using shared memory automatic parallelizing compilers for writing parallel code first became evident while porting the Intel version of the code described in Ref. [2] to the IBM ES/9000. During this conversion, it became clear that the structure of the code on shared and distributed memory computers was the same, except that the outermost subscript was explicit on the shared memory computers but implicit (i.e., invisible) on the distributed memory computers. Furthermore, the compilers were very useful in finding mistakes in parallelism, and the code could be debugged serially in a familiar environment. (One just had to trust the compiler that the code was safe to run in parallel; one did not actually have to run it in parallel.) I have now reversed the process: all new code for distributed memory computers is first written in a shared memory environment with these new compilers, which gets most of the bugs out. Then the code is moved to the distributed memory environment. Only one new kind of bug might be introduced at this point, namely synchronization errors, which I will not discuss here. Another benefit for beginners is that they do not have to struggle with parallel debuggers. For a production code which will run only on a distributed memory computer, one could eliminate the outer loop index at the end.

When writing portable code, one must write for the lowest common denominator, and sometimes this results in performance sacrifices on some machines. Examples include an extra loop index (which is always 1) when running on a distributed memory machine, an extra step of copying data when a compiler could parallelize with the data as is, or IF tests inside the innermost loop when they could have been placed outside that loop on some computers. Experience with the particle-in-cell code shows that such extra overhead in the critical loops was quite small, a few percent. The subroutine with the largest overhead (40%) was the distributed 1D real-to-complex FFT, but little time was spent in this routine. Whether such overhead is important in some other code depends strongly on what that code's critical loops are. However, programming time is often a very scarce resource, and one will often trade even substantial inefficiencies in computer time to save programmer time.

When all these rules are followed, the resulting code looks a great deal like cmFortran code written in Fortran 77. Such code is, as one can see from the examples, quite a bit easier to write in cmFortran or Fortran 90 than in Fortran 77. So there is some hope that as Fortran 90 or High Performance Fortran style compilers become more prevalent, parallel programming will get easier in the future.

1. V. K. Decyk and J. E. Slottow, "Supercomputers in the Classroom," Computers in Physics 3, no. 2, 50 (1989).
2. P. C. Liewer and V. K. Decyk, "A General Concurrent Particle-in-Cell Algorithm," J. Computational Phys. 85, 302 (1989).
