/* Partial MPI library based on Open Transport in the Macintosh OS, using TCP/IP protocol. No local buffering of messages is implemented, so that all messages must be received in the order sent, and receives with wildcard sources are not supported. the following subroutines are implemented: MPI_Init, MPI_Finalize, MPI_Send, MPI_Recv, MPI_Isend, MPI_Irecv MPI_Test, MPI_Wait, MPI_Sendrecv, MPI_Ssend, MPI_Issend, MPI_Waitall MPI_Waitany, MPI_Get_count, MPI_Initialized, MPI_Comm_size MPI_Comm_rank, MPI_Comm_dup, MPI_Comm_split, MPI_Comm_free MPI_Cart_create, MPI_Cart_coords, MPI_Cart_get, MPI_Cart_shift MPI_Cart_rank, MPI_Cart_sub, MPI_Dims_create MPI_Bcast, MPI_Barrier, MPI_Reduce, MPI_Scan MPI_Allreduce, MPI_Gather, MPI_Allgather, MPI_Scatter, MPI_Alltoall MPI_Gatherv, MPI_Allgatherv, MPI_Scatterv, MPI_Alltoallv MPI_Reduce_scatter, MPI_Abort, MPI_Wtime, MPI_Wtick, MPI_Type_extent MPI_Request_free, MPI_Get_processor_name Open Transport is described in Inside Macintosh: Networking with Open Transport, verson 1.3 [Apple Computer, Cupertino, CA, 1997], and at: http://developer.apple.com/techpubs/mac/NetworkingOT/NetworkingWOT-2. html The Message Passing Interface (MPI) is described in the reference, M. Snir, S. Otto, S. Huss-Lederman, D. Walker, and J. Dongarra, MPI: The Complete Reference [MIT Press, Cambridge, MA,1996]. The file MPIerrs is used throughout for error messages written by viktor k. decyk, ucla copyright 1999, regents of the university of california. all rights reserved. no warranty for proper operation of this software is given or implied. software or information may be copied, distributed, and used at own risk; it may not be distributed without this notice included verbatim with each file. update: march 26, 2004 */ #define TARGET_API_MAC_CARBON 1 #include #include #include #include #include "macmpi.h" #include #include #include #include #include #include #include /* MAXS = maximum number of nodes connected */ #define MAXS 32 /* MAXM = maximum number of outstanding messages on a node */ #define MAXM (2*MAXS) /* MAXC = maximum number of communicators */ #define MAXC 10 /* MAXD = maximum number of topology dimensions */ #define MAXD 6 /* LDM = (0,1) = (NO,YES) register AppleTalk port for Launch Den Mother */ #define LDM 0 /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id cfig0 = OTConfiguration structure template epref = array of endpoint references for each participating node ioc = array of context pointers for notifier function nevents = log of unknown notifier events stime = first time stamp if MPI_Init successful mapcomm = communicator map */ static int nproc = -1, idproc; static OTConfigurationRef cfig0 = 0; static EndpointRef epref[MAXS+1]; static int ioc[MAXS+1][4], nevents[MAXS+1], mapcomm[MAXC][MAXS+MAXD+3]; static OTTimeStamp stime; static OTNotifyUPP notifierUPP = 0; /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages curreq = request record for transmission parameters header = message envelope rwrec = read/write record for asynchronous messages trash = trash bin for unwanted data mqueue = message request queue */ struct RWRec { EndpointRef ref; int ioflag; void *buf; size_t nbytes; OTFlags flags; void *sbuf; int sln; int len; OTTimeStamp ts[2]; int nextm; int nfatal; }; struct ipdata { OTData first; OTData second; int hdata[4]; }; static int monitor = 1, curreq[MAXM][5], mqueue[MAXS+1][2]; static struct ipdata header[MAXM]; static struct RWRec rwrec[MAXM]; static unsigned char trash[1024]; /* internal common block for message window cpptr = pointer to window structure crect = current drag region nsp = amount of space between boxes nbx = size of box nds = number of message sizes monitored mbs = maximum maximum bandwidth in MB/sec expected */ static WindowPtr cpptr = 0; static Rect crect; static short nsp = 8, nbx = 16, nds = 24, mbs = 10; /* internal common block for adsp nameid = adsp portname ID */ static OTNameID nameid = 0; static FILE *unit2; /* prototypes for internal procedures */ int regport(int portnum); void delport(); static pascal void notifier(void* context, OTEventCode event, OTResult result, void* cookie); long otpinit(int np, TBind *reqad, TBind *repad); long setopts(EndpointRef epref, unsigned long level, unsigned long name, long *value); int ioresult(int *pblock); int checkesc(long stk); void sndmsgf(MPI_Request request, int dest); void rcvmsgf(MPI_Request request, int source); static int imax(int val1, int val2); static int imin(int val1, int val2); static float flmax(float val1, float val2); static float flmin(float val1, float val2); static double dmax(double val1, double val2); static double dmin(double val1, double val2); static void iredux(int *recvbuf, int *sendbuf, int offset, int count, MPI_Op op); static void fredux(float *recvbuf, float *sendbuf, int offset, int count, MPI_Op op); static void dredux(double *recvbuf, double *sendbuf, int offset, int count, MPI_Op op); void writerrs(char *source, int ierror); void rwstat(int request, FILE *unit); void wqueue(FILE *unit); void messwin(int nvp); void logmess(int idp, int lstat, int lsize, int mticks, int tag); void showmess(int idp, int istat, int istyle); void showdism(int ibin, int nbin, int mbin, int lmax, int istyle); void shospeed(float atime, float ctime, float arate, float crate); void Logname(char *name); void Set_Mon(int monval); int Get_Mon(); void delmess(); /* function definitions */ int MPI_Init(int *argc, char ***argv) { /* initialize the MPI execution environment input: argc, argv, output: none local data */ int ierror, nerr, nv, i; int portnum = 0, pshift = 0, pnumid[3], epnum[8]; long oss, response; char *cnerr = 0; OTConfigurationRef cfig = 0; EndpointRef epref0 = 0; TEndpointInfo epinfo; InetInterfaceInfo info; InetHost address, oldadd; char ename[36], myself[18], location[18], fname[18]; InetAddress addrb, addrl, addrn; TBind reqad, repad; TCall scall; OTTimeStamp ptime, delta; MPI_Status stat; FILE *unit3; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id cfig0 = OTConfiguration structure template epref = array of endpoint references for each participating node ioc = array of context pointers for notifier function ioc[i][0] = endpoint reference for notifier for endpoint i ioc[i][1] = processor id for listener for endpoint i ioc[i][2] = handle for current receive from endpoint i ioc[i][3] = handle for current send to endpoint i nevents = log of unknown notifier events stime = first time stamp if MPI_Init successful mapcomm = communicator map mapcomm[i][0:nproc-1] = actual proc id for given rank in communicator i mapcomm[i][nproc:MAXS-1] = MPI_UNDEFINED mapcomm[i][MAXSi] = number of processes in comm i mapcomm[i][MAXS+1] = rank for this node in comm i mapcomm[i][MAXS+2] = ndims = number of dimensions in topology in comm i mapcomm[i][iMAXS+3:MAXS+2+ndims] = size of dimension in comm i, negative if non-periodic mapcomm[i][MAXS+3+ndims:MAXS+2+MAXD] = 0 */ /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages curreq = request record for transmission parameters, see rwstat header = message envelope hdata[i][0] = communicator, hdata[i][1] = tag, hdata[i][2] = datatype hdata[i][3] = length of data (in bytes) for message handle i rwrec = read/write record for asynchronous messages, see rwstat mqueue = message request queue mqueue[i][0] = end of message queue for receives from endpoint i mqueue[i][1] = end of message queue for sends to endpoint i */ /* internal common block for adsp epref0 = endpoint reference used by tcp and adsp providers */ /* Initialize common block data */ for (i = 0; i < MAXS+1; i++) { epref[i] = 0; nevents[i] = 0; for (nv = 0; nv < 2; nv++) mqueue[i][nv] = 0; } for (i = 0; i < MAXC; i++) { for (nv = 0; nv < MAXS+MAXD+3; nv++) mapcomm[i][nv] = 0; } for (i = 0; i < MAXM; i++) { for (nv = 0; nv < 5; nv++) curreq[i][nv] = 0; } /* set MPI_COMM_WORLD and MPI_COMM_SELF mapping */ for (i = 0; i < MAXS; i++) { mapcomm[0][i] = i; mapcomm[1][i] = MPI_UNDEFINED; } /* Open error file */ unit2 = fopen("MPIerrs","w"); if (monitor==2) fprintf(unit2,"MPI_Init started\n"); /* Open Open Transport */ /* get information about the operating environment */ Gestalt(gestaltSystemVersion,&response); /* Check if MacOS 8.6 or higher is installed */ if (response < 2144) { fprintf(unit2,"MacMPI_X requires MacOS 8.6 or higher\n"); nv = response/256; response = response - 256*nv; nerr = response/16; response = response - 16*nerr; sprintf(ename,"%d.%d%d\n",nv,nerr,response); fprintf(unit2,"MacOS detected: %s\n",ename); ierror = -7; return ierror; } /* Initialize Open Transport for use by application */ oss = InitOpenTransportInContext(kInitOTForApplicationMask,NULL); if (oss) { fprintf(unit2,"Cannot Open Transport, oss = %d\n",oss); ierror = oss; return ierror; } /* Obtain the current time stamp */ OTGetTimeStamp(&stime); /* Create a structure defining a provider's configuration */ cfig0 = OTCreateConfiguration("tcp"); if (!cfig0) { fprintf(unit2,"Insufficient Memory for tcp Endpoint\n"); ierror = -4; CloseOpenTransportInContext(NULL); return ierror; } else if (cfig0==((OTConfigurationRef)-1L)) { fprintf(unit2,"Invalid Configuration for tcp Endpoint\n"); ierror = -4; CloseOpenTransportInContext(NULL); return ierror; } /* Copy an OTConfiguration structure */ cfig = OTCloneConfiguration(cfig0); /* Create a synchronous endpoint provider */ epref0 = OTOpenEndpointInContext(cfig,0,&epinfo,&oss,NULL); /* Obtain information about the Internet environment */ oss = OTInetGetInterfaceInfo(&info,kDefaultInetInterface); if (oss) { fprintf(unit2,"INS GetInterfaceInfo Error, oss = %d\n",oss); if (oss==kOTNotFoundErr) { fprintf(unit2,"Requested information does not exist\n"); fprintf(unit2,"TCP/IP may not be active\n"); } ierror = oss; oss = OTCloseProvider(epref0); CloseOpenTransportInContext(NULL); return ierror; } /* Close the dummy endpoint provider */ oss = OTCloseProvider(epref0); address = info.fAddress; oldadd = address; /* Everyone opens a port */ /* Open file containing portnum (and possibly participating nodes) first line in nodelist file on all nodes contains common portnum if the file is missing or empty, a default number of 5013 is used */ unit3 = fopen("nodelist_ip","r"); if (!unit3) unit3 = fopen("nodelist","r"); if (unit3) { cnerr = fgets(location,11,unit3); if (!cnerr) strcpy(location,"5013"); /* Replace trailing newlines with nulls */ cnerr = strchr(location,'\n'); if (cnerr) cnerr[0] = '\0'; if (location[0]=='\0') strcpy(location,"5013"); } else strcpy(location,"5013"); /* Convert string to integer */ nerr = sscanf(location,"%d",&portnum); if ((!nerr) || (portnum < 5000) || (portnum > 49152)) portnum = 5013; /* Construct InetAddress of listener */ addrn.fAddressType = AF_INET; addrn.fPort = portnum; addrn.fHost = info.fAddress; /* Convert an address into a character string */ OTInetHostToString(addrn.fHost,myself); strcpy(ename,myself); /* Establish port connection end */ /* Set TNetbuf for specifying address */ reqad.addr.len = 16; reqad.addr.buf = (unsigned char *)&addrn; reqad.qlen = 1; /* Set TNetbuf for address which is returned */ repad.addr.maxlen = 16; repad.addr.buf = (unsigned char *)&addrb; /* Set TNetbuf for specifying address for local computer */ scall.addr.len = 16; scall.addr.buf = (unsigned char *)&addrn; nproc = 0; /* Create Univeral Proc Ptr for notifier function */ notifierUPP = NewOTNotifyUPP(¬ifier); /* Initialize synchronous listener endpoint provider */ oss = otpinit(MAXS,&reqad,&repad); if (oss) { if (oss==kOTNoAddressErr) fprintf(unit2,"portnum likely already exists = %d\n",portnum); ierror = oss; nerr = MPI_Finalize(); return ierror; } /* Save copy of portnum */ pnumid[0] = addrn.fPort; /* Register adsp endpoint for puppy */ if (LDM) { ierror = regport(portnum); if (ierror) { fprintf(unit2,"Unable to register adsp port for puppy\n"); /* nerr = MPI_Finalize(); */ } } /* Pass processor id to listener notifier */ ioc[MAXS][1] = nproc + 1; /* debug */ if (monitor==2) { fprintf(unit2,"local host=%s,port=%d\n",myself,addrb.fPort); } /* Determine if node is master (idproc=0) or slave (idproc>0). on the master node, the second and subsequent lines of nodelist file contain IP addresses of the nodes participating, in dotted-decimal format (for example, "12.13.14.15"). if this list of nodes is missing, then the node is a slave. every node also makes a connection to itself. */ if (unit3) { cnerr = fgets(location,17,unit3); } /* Must be slave */ if (!unit3 || !cnerr) idproc = 1; else { /* Replace trailing newlines with nulls */ cnerr = strchr(location,'\n'); if (cnerr) cnerr[0] = '\0'; /* Must be slave */ if (location[0]=='\0') idproc = 1; /* May be master */ else { if (((!strcmp(location,"self")) || (!strcmp(location,myself))) && (pnumid[0]==portnum)) { idproc = 0; /* Delete file if empty */ if (!fseek(unit2,0,SEEK_END)) { i = ftell(unit2); fclose(unit2); if (!i) remove("MPIerrs"); } sprintf(fname,"MPIerrs%.2d",idproc); unit2 = fopen(fname,"w"); } else /* Must be slave */ idproc = 1; } } /* Update port number */ portnum = pnumid[0]; /* * * * begin main iteration loop * * * Prepare to accept connection */ /* Establish connection end */ /* Connections to oneself requires a new socket */ L10: addrb.fPort = kOTAnyInetAddress; /* Set TNetbuf for specifying address */ reqad.addr.len = 16; reqad.addr.buf = (unsigned char *)&addrb; reqad.qlen = 0; /* Set TNetbuf for address which is returned */ repad.addr.maxlen = 16; repad.addr.buf = (unsigned char *)&addrl; /* Initialize synchronous endpoint provider */ oss = otpinit(nproc,&reqad,&repad); if (oss) { ierror = oss; nerr = MPI_Finalize(); return ierror; } /* For connections to oneself, jump to OTConnect */ if (idproc==nproc) { ioc[MAXS][1] = MAXS+1; goto L70; } /* Obtain the current time stamp */ OTGetTimeStamp(&ptime); /* Wait for connection */ L20: if (ioresult(ioc[MAXS]) > 0) { if (checkesc(1)) { ierror = -9; writerrs("MPI_Init: ",ierror); return ierror; } /* Wait up to one minute for next connection */ else { if (OTElapsedMilliseconds(&ptime) < 60000) goto L20; else { fprintf(unit2,"OTListen Wait Time Exceeded\n"); ierror = -5; nerr = MPI_Finalize(); return ierror; } } } else { oss = ioresult(ioc[MAXS]); if (oss) { fprintf(unit2,"OTListen failed, oss=%d\n",oss); ierror = oss; nerr = MPI_Finalize(); return ierror; } } /* Receive portnum for verification and processor id from remote node */ nproc += 1; mapcomm[0][MAXS] = nproc; /* Reset iocompletion flag to notifier */ ioc[MAXS][1] = nproc + 1; ierror = MPI_Recv(epnum,3,MPI_INT,nproc-1,3,0,&stat); /* Extract processor id on first connection */ if (nproc==1) { idproc = epnum[1]; pshift = portnum - epnum[2]; portnum = epnum[2]; mapcomm[0][MAXS+1] = idproc; /* Delete file if empty */ if (!fseek(unit2,0,SEEK_END)) { i = ftell(unit2); fclose(unit2); if (!i) remove("MPIerrs"); } sprintf(fname,"MPIerrs%.2d",idproc); unit2 = fopen(fname,"w"); } /* Check if remote portnum agrees with local portnum */ nerr = pnumid[0] - epnum[0]; /* Send reject flag to remote node */ ierror = MPI_Send(&nerr,1,MPI_INT,nproc-1,4,0); /* Reject if portnums disagree */ if (nerr) { fprintf(unit2,"Session rejected, idproc = %d\n",idproc); fprintf(unit2,"Portnums do not agree\n"); ierror = 5; fprintf(unit2,"remote portnum = %d\n",epnum[0]); nerr = MPI_Finalize(); return ierror; } /* Check for processor number overflow */ if (nproc > MAXS) { fprintf(unit2,"processor number overflow, nproc = %d\n",nproc); ierror = 6; nerr = MPI_Finalize(); return ierror; } /* debug */ if (monitor==2) fprintf(unit2,"connection accepted with idproc=%d\n",nproc-1); /* Accept more connections */ if (idproc >= nproc) goto L10; /* Master prepares to start connection */ /* Find internet address of requested node */ /* Convert a character string into an address */ L40: oss = OTInetStringToHost(location,&address); if (oss) { fprintf(unit2,"OTInetStringToHost failed, oss = %d\n",oss); if (oss==kOTBadAddressErr) { fprintf(unit2,"Invalid IP address = %s\n",location); fprintf(unit2,"Invalid nodelist file possibly being used\n"); } ierror = oss; nerr = MPI_Finalize(); return ierror; } /* second cpu on a node uses incremented port number */ if (address==oldadd) pshift = pshift + 1; else { pshift = 0; oldadd = address; } pnumid[0] = portnum + pshift; addrn.fAddressType = AF_INET; addrn.fPort = pnumid[0]; addrn.fHost = address; /* Convert an address into a character string */ OTInetHostToString(addrn.fHost,ename); /* Establish connection end */ /* Create a new socket for this endpoint */ addrb.fPort = kOTAnyInetAddress; /* Set TNetbuf for specifying address */ reqad.addr.len = 16; reqad.addr.buf = (unsigned char *)&addrb; reqad.qlen = 0; /* Set TNetbuf for address which is returned */ repad.addr.maxlen = 16; repad.addr.buf = (unsigned char *)&addrl; /* Initialize synchronous endpoint provider */ oss = otpinit(nproc,&reqad,&repad); if (oss) { ierror = oss; nerr = MPI_Finalize(); return ierror; } /* Set TNetbuf for specifying address for remote computer */ scall.addr.len = 16; scall.addr.buf = (unsigned char *)&addrn; /* Set TNetbuf for specifying options */ L70: scall.opt.len = 0; scall.opt.buf = 0; /* Set TNetbuf for specifying data */ scall.udata.len = 0; scall.udata.buf = 0; /* debug */ if (monitor==2) fprintf(unit2,"requesting connection with =%s,%d\n",ename,pnumid[0]); /* Pause to let OS get some time */ if (checkesc(1)) { ierror = -9; writerrs("MPI_Init: ",ierror); return ierror; } /* Obtain the current time stamp */ OTGetTimeStamp(&ptime); OTGetTimeStamp(&delta); /* Request a connection to a remote peer */ oss = OTConnect(epref[nproc],&scall,0); /* Wait for connection */ L80: if (ioresult(ioc[nproc]) > 0) { if (checkesc(1)) { ierror = -9; writerrs("MPI_Init: ",ierror); return ierror; } /* Wait up to one minute for next connection */ else { if (OTElapsedMilliseconds(&ptime) < 60000) { response = OTGetEndpointState(epref[nproc]); /* Try again every second if no response */ if ((response < 5) && (OTElapsedMilliseconds(&delta) > 1000)) { oss = OTConnect(epref[nproc],&scall,0); /* Obtain the current time stamp */ OTGetTimeStamp(&delta); } goto L80; } else { fprintf(unit2,"OTConnect Wait Time Exceeded\n"); fprintf(unit2,"Trying to start location = %s,%d\n",ename,pnumid[0]); ierror = -6; nerr = MPI_Finalize(); return ierror; } } } else { oss = ioresult(ioc[nproc]); if (oss) { fprintf(unit2,"OTConnect Error, oss = %d\n",oss); fprintf(unit2,"Trying to start location = %s,%d\n",ename,pnumid[0]); if (oss==kOTOutStateErr) { fprintf(unit2,"Endpoint not in an appropriate state\n"); fprintf(unit2,"Remote node may be unknown or inaccessible\n"); } ierror = oss; nerr = MPI_Finalize(); return ierror; } } /* debug */ if (monitor==2) fprintf(unit2,"tentative connection started with =%s,%d\n",ename,pnumid[0]); /* Send portnum for verification and processor id to remote node */ nerr = nproc; nproc += 1; mapcomm[0][MAXS] = nproc; if (nerr > idproc) { /* Set processor id */ pnumid[1] = nerr; pnumid[2] = portnum; ierror = MPI_Send(pnumid,3,MPI_INT,nproc-1,3,0); /* Read and check reject flag */ ierror = MPI_Recv(&nerr,1,MPI_INT,nproc-1,4,0,&stat); if (nerr) { fprintf(unit2,"Connection rejected, reject info = %d\n",nerr); fprintf(unit2,"Portnums do not agree, idproc = %d\n",nproc-1); ierror = 12; nerr = MPI_Finalize(); return ierror; } } /* Check for processor number overflow */ if (nproc > MAXS) { fprintf(unit2,"processor number overflow, nproc = %d\n",nproc); ierror = 6; nerr = MPI_Finalize(); return ierror; } /* debug */ if (monitor==2) fprintf(unit2,"connection confirmed with idproc=%d\n",nproc-1); /* Pass current location to next node */ if (nproc > (idproc+2)) nerr = MPI_Send(location,4,MPI_INT,idproc+1,1,0); /* Read location of next node from file */ if (idproc==0) { if ((nproc >= 2) || (!strcmp(location,"self")) || (!strcmp(location,myself))) { if (!unit3) goto L90; cnerr = fgets(location,17,unit3); if (!cnerr) goto L90; /* Replace trailing newlines with nulls */ cnerr = strchr(location,'\n'); if (cnerr) cnerr[0] = '\0'; if (location[0]=='\0') goto L90; } } /* Receive location of next node from another processor */ else { nerr = MPI_Recv(location,4,MPI_INT,idproc-1,1,0,&stat); /* End of file marker received */ if (stat.len==0) goto L90; } /* Start another connection */ goto L40; /* * * * end main iteration loop * * * */ /* All expected nodes activated */ L90: nv = nproc - 1; /* debug */ if (monitor==2) fprintf(unit2,"all nodes activated, idproc, nproc=%d,%d\n", idproc,nproc); /* Send null record to next processor */ if (idproc < nv) nerr = MPI_Send(location,0,MPI_INT,idproc+1,1,0); if (unit3) fclose(unit3); /* Check number of processors */ if (idproc==nv) { for (i = 1; i <= nv; i++) { nerr = MPI_Send(&nproc,1,MPI_INT,nv-i,2,0); } } else { nerr = MPI_Recv(&response,1,MPI_INT,nv,2,0,&stat); /* Local processor does not agree with last processor on total number */ if (response != nproc) { fprintf(unit2,"processor number error, local/remote nproc = %d,%d\n", nproc,response); ierror = 7; nerr = MPI_Finalize(); return ierror; } } /* Clear unused MPI_COMM_WORLD mapping */ for (i = nproc; i < MAXS; i++) { mapcomm[0][i] = MPI_UNDEFINED; } mapcomm[0][MAXS+2] = 0; /* set MPI_COMM_SELF */ mapcomm[1][0] = idproc; mapcomm[1][MAXS] = 1; mapcomm[1][MAXS+1] = 0; mapcomm[1][MAXS+2] = 0; /* Create window for showing MPI message status */ if (monitor > 0) { messwin(nproc); checkesc(1); if (monitor==2) fprintf(unit2,"MPI_Init complete\n\n"); } /* Set error code to success */ ierror = 0; return ierror; } int regport(int portnum) { /* Finds name of current computer by first finding network address of current computer, then finding a registered entity with type name "PPCToolBox" which resides at that address. The object name corresponding to that entity is the current computer name. This only works if the user has set Program Linking on in the File Sharing control panel. If computer name is found, then a portname is created whose entity name consists of object = computer name, type = portnum If it already exists, it is first deleted, then registered. local data */ #include /* kATalkFullSelfSend = command to turn on classic ADSP SelfSend */ #define kATalkFullSelfSend (MIOC_CMD(MIOC_ATALK,47)) int ierror, address, nodeid, nv, i; long oss; unsigned char selfsend = 1; unsigned char *ucptr = 0; OTConfigurationRef cfig = 0; ATSvcRef aref = 0; TNetbuf info; struct AppleTalkInfo atinfo; MapperRef mref = 0; TLookupRequest req; char ename[36]; unsigned char rbuff[1888]; TLookupReply rep; TLookupBuffer *club; DDPAddress addrb; NBPEntity aname; NBPAddress addrn; TRegisterRequest treq; TRegisterReply trep; /* internal common block for adsp nameid = adsp portname ID */ ierror = 0; /* Open a synchronous AppleTalk service provider */ aref = OTOpenAppleTalkServicesInContext(kDefaultAppleTalkServicesPath,0,&oss,NULL); if (!aref) { fprintf(unit2,"Invalid ATS, oss=%d\n",oss); ierror = oss; return ierror; } /* Set TNetbuf to receive AppleTalk Info */ info.maxlen = sizeof(struct AppleTalkInfo); info.len = 0; info.buf = (unsigned char *)&atinfo; /* Obtain information about the AppleTalk environment */ oss = OTATalkGetInfo(aref,&info); if (oss) { fprintf(unit2,"ATS GetInfo Error, oss = %d\n",oss); ierror = oss; OTCloseProvider(aref); return ierror; } address = atinfo.fOurAddress.fNetwork; nodeid = atinfo.fOurAddress.fNodeID; /* Close a provider of any type */ oss = OTCloseProvider(aref); if (oss) fprintf(unit2,"Close ATS Provider Error, oss = %d\n",oss); /* Create a structure defining a provider's configuration */ cfig = OTCreateConfiguration("nbp"); if (!cfig) { fprintf(unit2,"Insufficient Memory for nbp Mapper\n"); ierror = -4; return ierror; } else if (cfig==((OTConfigurationRef)-1L)) { fprintf(unit2,"Invalid Configuration for nbp Mapper\n"); ierror = -4; return ierror; } /* Create a synchronous mapper provider */ mref = OTOpenMapperInContext(cfig,0,&oss,NULL); if (!mref) { fprintf(unit2,"Invalid NBP Mapper, oss=%d\n",oss); ierror = oss; OTDestroyConfiguration(cfig); return ierror; } /* Enable ADSP SelfSend */ /* Send a module-specific command to Open Transport protocol module */ oss = OTIoctl(mref,kATalkFullSelfSend,(void *)selfsend); if (oss < 0) fprintf(unit2,"OTIoctl SelfSend Error, oss=%d\n",oss); /* Save old value */ else selfsend = oss; /* Set a provider to wait or block until function can complete */ oss = OTSetBlocking(mref); if (oss) { fprintf(unit2,"NBP Mapper Set Blocking Error, oss = %d\n",oss); ierror = oss; OTCloseProvider(mref); return ierror; } /* Look for names with type "PPCToolBox" */ strcpy(ename,"=:PPCToolBox@*"); /* Set TNetbuf for name to be looked up */ req.name.len = 14; req.name.buf = (unsigned char *)ename; /* Set TNetbuf to use defaults for address search */ req.addr.len = 0; req.addr.buf = 0; /* Set number of names you expect to be returned */ req.maxcnt = 16; /* Set timeout to default */ req.timeout = 0; req.flags = 0; /* Set TNetbuf for names which are found */ rep.names.maxlen = 1888; rep.names.buf = &rbuff[0]; /* Find all addresses that correspond to a particular name or pattern */ oss = OTLookupName(mref,&req,&rep); if (oss) { fprintf(unit2,"OTLookup Error, rspcount, oss = %d,%d\n", rep.rspcount,oss); if (oss==kOTNoDataErr) fprintf(unit2,"Cannot find PPCToolBox on any computer\n"); ierror = oss; } else { nv = 0; i = 1; L5: club = (TLookupBuffer *)&rbuff[nv]; /* Get network address of found entity */ addrb = *(DDPAddress *)&club->fAddressBuffer[0]; /* Check if network and nodeid agrees */ if ((addrb.fNetwork != address) || (addrb.fNodeID != nodeid)) { i += 1; if (i > rep.rspcount) { fprintf(unit2,"Cannot find PPCToolBox on current computer\n"); ierror = 8; } /* Look up more addresses */ else { nv += ((club->fAddressLength-1)/4 + (club->fNameLength-1)/4 + 3)*4; goto L5; } } /* Found correct address */ else ierror = 0; } if (ierror) { fprintf(unit2,"Program Linking may not be enabled\n"); OTCloseProvider(mref); return ierror; } /* Extract object name */ nv = club->fAddressLength; ucptr = (unsigned char *)&club->fAddressBuffer[nv]; nv = club->fNameLength; OTSetNBPEntityFromAddress(&aname,ucptr,nv); /* Check if port name is already being used */ sprintf(ename,"%d",portnum); /* Set the type part of an NBP entity structure */ OTSetNBPType(&aname,ename); /* Set the zone part of an NBP entity structure */ OTSetNBPZone(&aname,"*"); /* Store an NBP entity structure as an NBP address string */ OTSetAddressFromNBPEntity(addrn.fNBPNameBuffer,&aname); addrn.fAddressType = 258; /* Obtain the size of an NBP entity structure formatted as string */ nv = OTGetNBPEntityLengthAsAddress(&aname); addrn.fNBPNameBuffer[nv] = '\0'; /* Set TNetbuf for name to be deleted */ info.len = nv; info.buf = addrn.fNBPNameBuffer; /* Remove a previously registered entity name */ oss = OTDeleteName(mref,&info); if (!oss) fprintf(unit2,"closed old adsp port: %s\n",addrn.fNBPNameBuffer); /* Set TNetbufs for name to be registered */ treq.name.len = nv; treq.name.buf = addrn.fNBPNameBuffer; treq.addr.len = 0; treq.addr.buf = 0; treq.flags = 0; trep.addr.maxlen = 8; trep.addr.buf = (unsigned char *)&addrb; trep.nameid = 0; /* Register a name on the network */ oss = OTRegisterName(mref,&treq,&trep); nameid = trep.nameid; if (oss) fprintf(unit2,"OTRegisterName Error, oss = %d\n",oss); /* Close a provider of any type */ oss = OTCloseProvider(mref); if (oss) fprintf(unit2,"Close NBP Provider Error, oss = %d\n",oss); return ierror; #undef kATalkFullSelfSend } void delport() { /* this subroutine deletes adsp portname local data */ long oss; OTConfigurationRef cfig = 0; MapperRef mref = 0; /* internal common block for adsp nameid = adsp portname ID */ /* Create a structure defining a provider's configuration */ cfig = OTCreateConfiguration("nbp"); if (!cfig) { fprintf(unit2,"delport: Insufficient Memory for nbp Mapper\n"); return; } else if (cfig==((OTConfigurationRef)-1L)) { fprintf(unit2,"delport: Invalid Configuration for nbp Mapper\n"); return; } /* Create a synchronous mapper provider */ mref = OTOpenMapperInContext(cfig,0,&oss,NULL); if (!mref) { fprintf(unit2,"delport: Invalid NBP Mapper, oss=%d\n",oss); OTDestroyConfiguration(cfig); return; } /* Set a provider to wait or block until function can complete */ oss = OTSetBlocking(mref); if (oss) { fprintf(unit2,"delport: NBP Mapper Set Blocking Error, oss=%d\n",oss); OTCloseProvider(mref); return; } /* Remove a previously registered name as specified by its name ID */ oss = OTDeleteNameByID(mref,nameid); if (oss) fprintf(unit2,"OTDeleteNameByID Error, oss = %d\n",oss); /* Close a provider of any type */ oss = OTCloseProvider(mref); if (oss) fprintf(unit2,"delport: Close NBP Provider Error, oss = %d\n",oss); return; } static pascal void notifier(void* context, OTEventCode event, OTResult result, void* cookie) { /* notifier function for asynchronous and completion events local data */ int n, m; long oss, tryagn = 0; InetAddress addrb; TCall call; /* internal mpi common block epref = array of endpoint references for each participating node nevents = log of unknown notifier events */ /* internal common block for non-blocking messages rwrec = read/write record for asynchronous messages trash = trash bin for unwanted data mqueue = message request queue */ /* Get reference to endpoint */ n = *(int *)context; /* Check event */ L10: switch (event) { case T_LISTEN: { /* Set TNetbuf for address which is returned */ call.addr.maxlen = 16; call.addr.buf = (unsigned char *)&addrb; /* Set TNetbuf for options which are returned */ call.opt.maxlen = 0; call.opt.buf = 0; /* Set TNetbuf for data which is returned */ call.udata.maxlen = 0; call.udata.buf = 0; /* Listen for an incoming connection request */ oss = OTListen(epref[n],&call); if (oss) /* Set completion flag to error */ *((int *)context+1) = oss; else { m = *((int *)context+1); /* Accept an incoming connection request */ oss = OTAccept(epref[n],epref[m-1],&call); } return; } case T_CONNECT: { /* Read the status of an asynchronous call to OTConnect */ oss = OTRcvConnect(epref[n],0); /* Set completion flag */ *((int *)context+1) = oss; return; } case T_DISCONNECT: { /* Identify cause of connection rejection and clear event */ oss = OTRcvDisconnect(epref[n],0); /* Set completion flag */ /* *((int *)context+1) = oss; */ /* Read pointer to current read record */ m = *((int *)context+2) - 1; /* Set iocompletion flag, if there is a pending read */ if ((m >= 0) && (m < MAXM)) rwrec[m].ioflag = -2; /* Read pointer to current send record */ m = *((int *)context+3) - 1; /* Set iocompletion flag, if there is a pending send */ if ((m >= 0) && (m < MAXM)) rwrec[m].ioflag = -2; return; } case T_DISCONNECTCOMPLETE: { /* Set completion flag */ /* *((int *)context+1) = oss; */ return; } case T_ACCEPTCOMPLETE: { *((int *)context+1) = result; return; } case T_PASSCON: { /* Set completion flag */ *((int *)context+1) = 0; return; } case T_ORDREL: { /* Reset event to complete OTLookErr handling */ if (tryagn) event = tryagn; /* Clear an incoming orderly disconnect event */ oss = OTRcvOrderlyDisconnect(epref[n]); /* Set completion flag */ /* *((int *)context+1) = oss; */ /* Read pointer to current read record */ m = *((int *)context+2) - 1; /* Set iocompletion flag, if there is a pending read */ if ((m >= 0) && (m < MAXM)) rwrec[m].ioflag = -1; /* Read pointer to current send record */ m = *((int *)context+3) - 1; /* Set iocompletion flag, if there is a pending send */ if ((m >= 0) && (m < MAXM)) rwrec[m].ioflag = -1; /* Obtain the current state of an endpoint */ oss = OTGetEndpointState(epref[n]); if (oss==T_INREL) /* Send a module-specific command to Open Transport protocol module */ oss = OTIoctl(epref[n],I_FLUSH,(void *)FLUSHRW); break; } case kStreamIoctlEvent: { /* Initiate an orderly disconnect */ oss = OTSndOrderlyDisconnect(epref[n]); /* Set completion flag */ *((int *)context+1) = oss; return; } case T_DATA: { tryagn = 0; /* Read pointer to current read record */ m = *((int *)context+2) - 1; /* Read data sent from a remote peer */ if ((m >= 0) && (m < MAXM)) { /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[m].ts[1]); L20: oss = OTRcv(rwrec[m].ref,rwrec[m].buf,rwrec[m].nbytes, &rwrec[m].flags); /* Unexpected flag returned */ if (rwrec[m].flags > 1) { rwrec[m].ioflag = -3; /* Clear pointer to current read record */ *((int *)context+2) = 0; } /* Process data which arrived */ else if (oss >= 0) { /* Clear more flag */ rwrec[m].flags = 0; /* Clear non-fatal error code */ rwrec[m].nfatal = 0; /* Set actual length received */ rwrec[m].len += oss; /* Check if all the data has arrived */ if (rwrec[m].len < header[m].hdata[3]) { /* Incomplete message */ if (rwrec[m].nbytes > oss) { /* Readjust buffer pointer */ rwrec[m].buf = (void *)(((unsigned char *)rwrec[m].buf) + oss); rwrec[m].nbytes -= oss; if (rwrec[m].len >= 0) rwrec[m].ioflag += 1; } /* Header is received, readjust parameters to receive data */ else if (rwrec[m].ioflag==1) { rwrec[m].buf = rwrec[m].sbuf; rwrec[m].nbytes = imin(header[m].hdata[3],rwrec[m].sln); rwrec[m].ioflag = 2; } /* Data is received and buffer is full */ else { rwrec[m].buf = &trash; rwrec[m].nbytes = 1024; } goto L20; } /* Message complete */ else { /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[m].ts[1]); /* Set iocompletion flag */ rwrec[m].ioflag = 0; /* Get next message if messages are queued */ if (rwrec[m].nextm > 0) { m = rwrec[m].nextm; if (m==mqueue[n][0]) mqueue[n][0] = 0; *((int *)context+2) = m; m -= 1; goto L20; } /* Clear pointer to current send record */ else *((int *)context+2) = 0; } } /* Check for errors */ else { /* Quit if no data is available */ if (oss==kOTNoDataErr) return; /* Determine cause of a kOTLookErr */ else if (oss==kOTLookErr) { event = OTLook(rwrec[m].ref); if ((event==T_GODATA) || (event==T_ORDREL)) { tryagn = T_DATA; break; } /* Store unprocessed event returned by OTLookErr */ else nevents[n] = event; } /* Store non-fatal error code */ else if (oss==kENOMEMErr) { rwrec[m].nfatal = oss; return; } /* Set iocompletion flag to error */ rwrec[m].ioflag = oss; *((int *)context+2) = 0; } } return; } case T_GODATA: { /* Read pointer to current send record */ m = *((int *)context+3) - 1; /* Reset event to complete OTLookErr handling */ if (tryagn) event = tryagn; /* Send data to a remote peer */ if ((m >= 0) && (m < MAXM)) { /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[m].ts[1]); L30: oss = OTSnd(rwrec[m].ref,rwrec[m].buf,rwrec[m].nbytes, rwrec[m].flags); /* Process data which has been sent */ if (oss >= 0) { /* Clear non-fatal error code */ rwrec[m].nfatal = 0; /* Set actual length sent */ rwrec[m].len += oss; /* Check for incomplete header */ if (rwrec[m].len < 0) { header[m].first.fData = (void *)(((unsigned char *)header[m].first.fData) + oss); header[m].first.fLen -= oss; goto L30; } /* Check for incomplete data */ else if (rwrec[m].sln > rwrec[m].len) { /* Header is sent, readjust parameters to send data */ if (rwrec[m].ioflag==1) { rwrec[m].buf = rwrec[m].sbuf; rwrec[m].nbytes = rwrec[m].sln; oss -= header[m].first.fLen; } /* Readjust buffer pointer */ rwrec[m].buf = (void *)(((unsigned char *)rwrec[m].buf) + oss); rwrec[m].nbytes -= oss; rwrec[m].ioflag += 1; goto L30; } /* Data is sent */ else { /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[m].ts[1]); /* Set iocompletion flag */ rwrec[m].ioflag = 0; /* Get next message if messages are queued */ if (rwrec[m].nextm > 0) { m = rwrec[m].nextm; if (m==mqueue[n][1]) mqueue[n][1] = 0; *((int *)context+3) = m; m -= 1; goto L30; } /* Clear pointer to current send record */ else *((int *)context+3) = 0; } } /* Check for errors */ else { /* Quit if no data can be sent */ if (oss==kOTFlowErr) break; /* Determine cause of a kOTLookErr */ else if (oss==kOTLookErr) { event = OTLook(rwrec[m].ref); /* Store unprocessed event returned by OTLookErr */ nevents[n] = event; } /* Store non-fatal error code */ else if (oss==kENOMEMErr) { rwrec[m].nfatal = oss; break; } /* Set iocompletion flag to error */ rwrec[m].ioflag = oss; *((int *)context+3) = 0; } } break; } default: { /* unknown event */ nevents[n] = event; return; } } if (tryagn) goto L10; return; } long otpinit(int np, TBind *reqad, TBind *repad) { /* this subroutine initializes an Open Transport provider for index np, using address specified in reqad. The cfig0 OTConfiguration template is assumed to be already created provider is left in asynchronous, blocking mode np = index to endpoint reference array reqad = address to which endpoint is to be bound repad = address to which endpoint is actually bound returns OSStatus indicator input: np, reqad local data */ int i; long oss, value; OTConfigurationRef cfig; TEndpointInfo epinfo; InetAddress *addrn; /* internal mpi common block cfig0 = OTConfiguration structure template epref = array of endpoint references for each participating node ioc = array of context pointers for notifier function */ oss = 0; /* Copy an OTConfiguration structure */ cfig = OTCloneConfiguration(cfig0); /* Create a synchronous endpoint provider */ epref[np] = OTOpenEndpointInContext(cfig,0,&epinfo,&oss,NULL); if (!epref[np]) { fprintf(unit2,"Invalid Endpoint, oss=%d\n",oss); OTDestroyConfiguration(cfig);; return oss; } /* Set a provider to wait or block until function can complete */ oss = OTSetBlocking(epref[np]); if (oss) { fprintf(unit2,"Endpoint Set Blocking Error, oss = %d\n",oss); return oss; } /* set TCP_NODELAY option */ value = 1; oss = setopts(epref[np],INET_TCP,TCP_NODELAY,&value); if (oss) fprintf(unit2,"TCP_NODELAY OTOptionManagement Error, oss = %d\n",oss); else if (!value) fprintf(unit2,"Info: TCP_NODELAY was not set\n"); /* Install a notifier function */ oss = OTInstallNotifier(epref[np],notifierUPP,&ioc[np]); if (oss) { fprintf(unit2,"OTInstall Notifier Error, oss = %d\n",oss); return oss; } i = 0; addrn = (InetAddress *)reqad->addr.buf; /* Assign an address to an endpoint */ L10: oss = OTBind(epref[np],reqad,repad); if (oss) { if ((addrn->fPort) && (i < 16)) { addrn->fPort += 1; i += 1; goto L10; } fprintf(unit2,"OTBind Error, oss = %d\n",oss); if (oss==kOTNoAddressErr) fprintf(unit2,"unable to allocate address\n"); return oss; } /* Pass processor epref index and iocompletion to notifier */ ioc[np][0] = np; ioc[np][1] = 1; /* Clear read and send record pointers */ ioc[np][2] = 0; ioc[np][3] = 0; /* Set a provider to asynchronous mode */ oss = OTSetAsynchronous(epref[np]); return oss; } long setopts(EndpointRef epref, unsigned long level, unsigned long name, long *value) { /* This function sets value of option local data */ long oss; TOption option; TOptMgmt req, ret; /* Set option structure */ option.len = 20; option.level = level; option.name = name; option.status = 0; option.value[0] = *value; /* Set OptMgmt request structure */ req.opt.len = 20; req.opt.buf = (unsigned char *)&option; req.flags = T_NEGOTIATE; /* Set OptMgmt reply structure */ ret.opt.maxlen = 20; ret.opt.buf = (unsigned char *)&option; /* Determine an endpoint's option values or change the values */ oss = OTOptionManagement(epref,&req,&ret); if (!oss) { if ((option.status==T_SUCCESS)) *value = option.value[0]; else { oss = option.status; if (oss==T_NOTSUPPORT) fprintf(unit2,"Option Not Supported\n"); } } else { if (oss==kOTBufferOverflowErr) fprintf(unit2,"Option reply buffer is too small\n"); } return oss; } int ioresult(int *pblock) { /* this function returns ioResult for asynchronous procedures input: pblock */ return pblock[1]; } int checkesc(long stk) { /* this procedure allows user to abort a procedure by checking for escape, Cmd-. or Ctrl-C keystrokes. Calling EventAvail also permits an idle procedure to time-share and checks for Quit Events returns true if an escape event occurred. recent keyboard events not processed are not removed from event queue stk = maximum number of sleepTicks (sixtieths of a second) that application agrees to relinquish the processor if no events are pending for it. input: stk local data */ /* myEventMask looks for mouse, keyboard, and quit events */ short myEventMask = 1086; EventRecord event; int key, checkesc = 0, nvp; WindowPtr which; static OTTimeStamp times; /* internal common block for message window cpptr = pointer to window structure crect = current drag region */ /* if monitor window is open, look for update events also */ if (cpptr) myEventMask += 64; /* get an event without removing it from the queue */ if (EventAvail(myEventMask,&event)) { if ((event.what==keyDown) || (event.what==autoKey)) { /* check for escape key */ key = event.message - 256*(event.message/256); if (key==27) checkesc = 1; /* check for Cmd-. */ else if (key==46) { if ((event.modifiers/256) != (2*(event.modifiers/512))) checkesc = 1; } /* check for Ctrl-C */ else if (key==3) checkesc = 1; /* remove processed or keyboard event more than 3 seconds old */ if (checkesc || ((TickCount()-event.when) > 180)) /* remove next available event */ GetNextEvent(myEventMask,&event); } /* check for 'QuitApplication' Apple Event */ else if (event.what==kHighLevelEvent) { /* remove high level events more than 5 seconds old */ if ((TickCount()-event.when) > 300) /* remove next available event */ GetNextEvent(myEventMask,&event); if ((event.where.v=='qu') && (event.where.h=='it')) { fprintf(unit2,"Quit Application Apple Event received\n"); checkesc = 1; } } /* check for update events */ else if (event.what==updateEvt) { /* get window pointer */ if (cpptr==(WindowPtr)event.message) { /* remove next available event */ GetNextEvent(myEventMask,&event); /* signal start of window update */ BeginUpdate(cpptr); key = MPI_Comm_size(MPI_COMM_WORLD,&nvp); messwin(nvp); /* signal end of update after BeginUpdate */ EndUpdate(cpptr); } } /* check for drag window event */ else if (event.what==mouseDown) { /* see which window part, including menu bar, is at a point */ if (FindWindow(event.where,&which)==4) { if (cpptr==which) { /* remove next available event */ GetNextEvent(myEventMask,&event); /* track the mouse and move a window */ DragWindow(cpptr,event.where,&crect); } } } } /* yield time when time slice since last yield > 1 second */ if (stk==0) { /* calculate time elapsed in milliseconds */ if (OTElapsedMilliseconds(×) > 1000) { /* receive null event from event manager to relinquish the processor */ WaitNextEvent(0,&event,1,0); /* Obtain the current time stamp */ OTGetTimeStamp(×); } } /* yield time if requested */ else { /* receive null event from event manager to relinquish the processor */ WaitNextEvent(0,&event,stk,0); /* Obtain the current time stamp */ OTGetTimeStamp(×); } return checkesc; } int MPI_Finalize(void) { /* terminate MPI execution environment local data */ int ierror, i; long state, oss; OTTimeStamp ptime; /* internal mpi common block nproc = number of real or virtual processors obtained cfig0 = OTConfiguration structure template epref = array of endpoint references for each participating node nevents = log of unknown notifier events mapcomm = communicator map */ /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages curreq = request record for transmission parameters */ ierror = 0; /* MPI already finalized */ if (nproc < 0) { ierror = 1; return ierror; } if (monitor==2) fprintf(unit2,"MPI_Finalize started\n"); /* Dispose of an OTConfiguration structure */ OTDestroyConfiguration(cfig0); /* Flush endpoint providers */ for (i = 0; i <= MAXS; i++) { if (epref[i]) { /* Obtain the current state of an endpoint */ state = OTGetEndpointState(epref[i]); if (state==T_DATAXFER) /* Send a module-specific command to Open Transport protocol module */ oss = OTIoctl(epref[i],I_FLUSH,(void *)FLUSHRW); } } /* Obtain the current time stamp */ OTGetTimeStamp(&ptime); /* Disconnect endpoints with orderly disconnect */ for (i = 0; i <= MAXS; i++) { if (epref[i]) { /* Wait 5 seconds for endpoints to acknowledge disconnect */ L20: state = OTGetEndpointState(epref[i]); if (state > T_IDLE) { /* Calculate time elapsed in milliseconds */ if (OTElapsedMilliseconds(&ptime) < 5000) goto L20; else { fprintf(unit2,"OrderlyDisconnect Timeout: i,state=%d,%d\n", i,state); /* Tear down an open connection (abortive disconnect) */ oss = OTSndDisconnect(epref[i],0); if (oss) { fprintf(unit2,"OTSndDisconnect Error, i,oss=%d,%d\n",i,oss); if (oss==kOTOutStateErr) fprintf(unit2,"Endpoint not in an appropriate state=%d\n", OTGetEndpointState(epref[i])); } } } } } /* Obtain the current time stamp */ OTGetTimeStamp(&ptime); /* Unbind and close providers */ for (i = 0; i <= MAXS; i++) { if (epref[i]) { /* Wait another 5 seconds for endpoints to acknowledge disconnect */ L40: state = OTGetEndpointState(epref[i]); if (state > T_IDLE) { /* Calculate time elapsed in milliseconds */ if (OTElapsedMilliseconds(&ptime) < 5000) goto L40; else fprintf(unit2,"Disconnect Timeout: i,state=%d,%d\n", i,state); } /* Set a provider to synchronous mode */ oss = OTSetSynchronous(epref[i]); /* Dissociate an endpoint from its address */ oss = OTUnbind(epref[i]); if (oss) { fprintf(unit2,"Unbind Endpoint Error, i, oss = %d,%d\n",i,oss); if (oss==kOTOutStateErr) fprintf(unit2,"Endpoint not in an appropriate state=%d\n", OTGetEndpointState(epref[i])); } /* Close a provider of any type */ oss = OTCloseProvider(epref[i]); if (oss) { fprintf(unit2,"Close Endpoint Error, i, oss = %d,%d\n",i,oss); ierror = oss; } } } /* Dispose of Univeral Proc Ptr */ DisposeOTNotifyUPP(notifierUPP); notifierUPP = 0; /* Unregister adsp endpoint created for puppy */ if (LDM) delport(); /* Unregister your application connection to Open Transport */ CloseOpenTransportInContext(NULL); /* Write out any unknown notifier events */ for (i = 0; i <= MAXS; i++) { if (nevents[i]) fprintf(unit2,"Unknown notifier event, endpoint=%d,%d\n",nevents[i],i); } /* Close window for showing MPI message status */ if (monitor > 0) { logmess(0,0,-1,0,0); delmess(); } /* Nullify nproc */ nproc = -1; /* Nullify communicator mappings */ for (i = 0; i < MAXC; i++) { mapcomm[i][MAXS] = 0; } /* Nullify endpoint references and unknown notifier events */ for (i = 0; i <= MAXS; i++) { epref[i] = 0; nevents[i] = 0; } /* check if any messages remain outstanding */ state = 0; for (i = 0; i < MAXM; i++) { if (curreq[i][0]) state += 1; } if (state > 0) { fprintf(unit2,"%d message(s) never cleared\n",state); for (i = 0; i < MAXM; i++) { if (curreq[i][0]) { if (curreq[i][0]==(-1)) fprintf(unit2," transmission mode = send\n"); else if (curreq[i][0]==1) fprintf(unit2," transmission mode = receive\n"); fprintf(unit2," destination/source = %d\n",curreq[i][1]); fprintf(unit2," communicator = %d\n",curreq[i][2]); fprintf(unit2," tag = %d\n",curreq[i][3]); fprintf(unit2," datatype = %d\n",curreq[i][4]); fprintf(unit2,"\n"); } } } if (monitor==2) fprintf(unit2,"MPI_Finalize complete\n"); /* Delete file if empty */ if (!fseek(unit2,0,SEEK_END)) { i = ftell(unit2); fclose(unit2); if (!i) remove("MPIerrs"); } return ierror; } int MPI_Send(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) { /* blocking standard mode send buf = initial address of send buffer count = number of entries to send datatype = datatype of each entry dest = rank of destination tag = message tag comm = communicator input: buf, count, datatype, dest, tag, comm local data */ int ierror; MPI_Request request; MPI_Status status; ierror = MPI_Isend(buf,count,datatype,dest,tag,comm,&request); ierror = MPI_Wait(&request,&status); return ierror; } int MPI_Recv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status) { /* blocking receive buf = initial address of receive buffer count = maximum number of entries to receive datatype = datatype of each entry source = rank of source tag = message tag comm = communicator status = return status input: count, datatype, source, tag, comm output: buf, status local data */ int ierror; MPI_Request request; ierror = MPI_Irecv(buf,count,datatype,source,tag,comm,&request); ierror = MPI_Wait(&request,status); return ierror; } int MPI_Isend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request) { /* start a non-blocking send buf = initial address of send buffer count = number of entries to send datatype = datatype of each entry dest = rank of destination tag = message tag comm = communicator request = request handle input: buf, count, datatype, dest, tag, comm output: request local data */ int ierror, np, longw, i; long response; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id epref = array of endpoint references for each participating node ioc = array of context pointers for notifier function mapcomm = communicator map */ /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages curreq = request record for transmission parameters header = message envelope rwrec = read/write record for asynchronous messages mqueue = message request queue */ ierror = 0; if (dest==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return ierror; } /* find space for record */ i = -1; L10: i += 1; if (i >= MAXM) { fprintf(unit2,"too many sends waiting, dest, tag = %d,%d,\n", dest,tag); *request = MPI_REQUEST_NULL; ierror = 14; writerrs("MPI_Isend: ",ierror); return ierror; } else if (curreq[i][0]) goto L10; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid count */ else if (count < 0) ierror = 3; /* invalid destination */ else if ((dest < 0) || (dest >= nproc)) { fprintf(unit2,"destination = %d\n",dest); ierror = 4; } /* invalid tag */ else if (tag < (-1)) ierror = 6; /* communicator errors */ else { longw = mapcomm[comm][MAXS]; np = mapcomm[comm][dest]; /* communicator not mapped */ if ((longw <= 0) || (longw > nproc)) ierror = 2; /* invalid destination */ else if ((dest < 0) || (dest >= longw)) { fprintf(unit2,"destination = %d\n",dest); ierror = 4; } /* invalid mapping */ else if ((np < 0) || (np >= nproc)) { fprintf(unit2,"Invalid mapping, destination, node = %d,%d\n",dest,np); ierror = 2; } } /* handle errors */ if (ierror) { writerrs("MPI_Isend: ",ierror); return ierror; } /* Create header */ /* OTData structure for header */ header[i].first.fNext = &header[i].second; header[i].first.fData = &header[i].hdata[0]; header[i].first.fLen = 16; /* Save communicator */ header[i].hdata[0] = comm; /* Save tag */ header[i].hdata[1] = tag; /* Save datatype */ header[i].hdata[2] = datatype; /* Set destination id for selfsends */ if (idproc==np) np = MAXS; /* Set endpoint reference pointer */ rwrec[i].ref = epref[np]; /* Reset iocompletion flag */ rwrec[i].ioflag = 1; /* Set pointer to header */ rwrec[i].buf = (void *)&header[i]; /* Set buffer length for header */ rwrec[i].nbytes = kNetbufDataIsOTData; /* Find buffer length for data */ if ((datatype==MPI_INT) || (datatype==MPI_FLOAT)) longw = 4*count; else if (datatype==MPI_DOUBLE) longw = 8*count; else if (datatype==MPI_BYTE) longw = count; else if ((datatype==MPI_2INT) || (datatype==MPI_FLOAT_INT)) longw = 8*count; else if (datatype==MPI_DOUBLE_INT) longw = 16*count; /* Invalid datatype */ else { ierror = 7; writerrs("MPI_Isend: ",ierror); return ierror; } /* Set pointer to data buffer */ rwrec[i].sbuf = buf; /* Set buffer lengths for data */ rwrec[i].sln = longw; rwrec[i].len = -header[i].first.fLen; /* Clear more flag */ rwrec[i].flags = 0; /* Clear next message flag */ rwrec[i].nextm = 0; /* Clear non-fatal error code */ rwrec[i].nfatal = 0; /* OTData structure for data */; header[i].second.fNext = 0; header[i].second.fData = rwrec[i].sbuf; header[i].second.fLen = longw; /* Save length */ header[i].hdata[3] = longw; /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[0]); /* Limit notifications that can be sent to notifier */ OTEnterNotifier(epref[np]); /* Append this message to send queue if necessary */ if (ioc[np][3] > 0) { if (mqueue[np][1] > 0) rwrec[mqueue[np][1]-1].nextm = i + 1; else rwrec[ioc[np][3]-1].nextm = i + 1; mqueue[np][1] = i + 1; /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[1]); goto L30; } /* First send 4 word header, then data */ sndmsgf(i,np); response = ioresult((int *)&rwrec[i]); /* Set pointer to current send record */ if (response > 0) ioc[np][3] = i + 1; /* Set iocompletion flag to error */ else if (response < 0) ierror = response; /* Allow Open Transport to resume sending events */ L30: OTLeaveNotifier(epref[np]); /* Handle read and write errors */ if (ierror) { /* Write out readwrite record */ rwstat(i,unit2); wqueue(unit2); for (i = 0; i < MAXM; i++) { if (curreq[i][0] != 0) rwstat(i,unit2); } writerrs("MPI_ISend: ",ierror); } /* Find actual destination */ if (np==MAXS) np = idproc; /* log MPI message state change and display status */ if (monitor > 0) logmess(np,1,rwrec[i].sln,0,tag); /* save transmission mode as send */ curreq[i][0] = -1; /* save destination/source id */ curreq[i][1] = np; /* save communicator */ curreq[i][2] = comm; /* save tag */ curreq[i][3] = tag; /* save datatype */ curreq[i][4] = datatype; /* assign request handle */ *request = i; return ierror; } int MPI_Irecv(void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request) { /* begin a non-blocking receive buf = initial address of receive buffer count = maximum number of entries to receive datatype = datatype of each entry source = rank of source tag = message tag comm = communicator request = request handle input: count, datatype, source, tag, comm output: buf, request local data */ int ierror, np, longw, i; long response; /* internal mpi common block nproc = number of real or virtual processors obtained epref = array of endpoint references for each participating node ioc = array of context pointers for notifier function mapcomm = communicator map */ /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages curreq = request record for transmission parameters header = message envelope rwrec = read/write record for asynchronous messages mqueue = message request queue */ ierror = 0; if (source==MPI_PROC_NULL) { *request = MPI_REQUEST_NULL; return ierror; } /* find space for record */ i = -1; L10: i += 1; if (i >= MAXM) { fprintf(unit2,"too many receives waiting, source, tag = %d,%d,\n", source,tag); *request = MPI_REQUEST_NULL;; ierror = 15; writerrs("MPI_Irecv: ",ierror); return ierror; } else if (curreq[i][0]) goto L10; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid count */ else if (count < 0) ierror = 3; /* invalid source */ else if ((source < 0) || (source >= nproc)) { if (source==MPI_ANY_SOURCE) fprintf(unit2,"MPI_ANY_SOURCE not supported\n"); else fprintf(unit2,"source = %d\n",source); ierror = 5; } /* invalid tag */ else if (tag < (-1)) ierror = 6; /* communicator errors */ else { longw = mapcomm[comm][MAXS]; np = mapcomm[comm][source]; /* communicator not mapped */ if ((longw <= 0) || (longw > nproc)) ierror = 2; /* invalid source */ else if ((source < 0) || (source >= longw)) { fprintf(unit2,"source = %d\n",source); ierror = 4; } /* invalid mapping */ else if ((np < 0) || (np >= nproc)) { fprintf(unit2,"Invalid mapping, source, node = %d,%d\n",source,np); ierror = 2; } } /* handle errors */ if (ierror) { writerrs("MPI_Irecv: ",ierror); return ierror; } /* Set endpoint reference pointer */ rwrec[i].ref = epref[np]; /* Reset iocompletion flag for receive */ rwrec[i].ioflag = 1; /* Set pointer to header */ rwrec[i].buf = (void *)&header[i].hdata[0]; /* Set buffer length for header */ rwrec[i].nbytes = 16; /* Set buffer length for data */ if ((datatype==MPI_INT) || (datatype==MPI_FLOAT)) longw = 4*count; else if (datatype==MPI_DOUBLE) longw = 8*count; else if (datatype==MPI_BYTE) longw = count; else if ((datatype==MPI_2INT) || (datatype==MPI_FLOAT_INT)) longw = 8*count; else if (datatype==MPI_DOUBLE_INT) longw = 16*count; /* Invalid datatype */ else { ierror = 7; writerrs("MPI_Irecv: ",ierror); return ierror; } /* Set pointer to data buffer */ rwrec[i].sbuf = buf; /* Set buffer lengths */ rwrec[i].sln = longw; rwrec[i].len = -rwrec[i].nbytes; /* Clear more flag */ rwrec[i].flags = 0; /* Clear next message flag */ rwrec[i].nextm = 0; /* Clear non-fatal error code */ rwrec[i].nfatal = 0; /* Clear length */ header[i].hdata[3] = 0; /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[0]); /* Limit notifications that can be sent to notifier */ OTEnterNotifier(epref[np]); /* Append this message to receive queue if necessary */ if (ioc[np][2] > 0) { if (mqueue[np][0] > 0) rwrec[mqueue[np][0]-1].nextm = i + 1; else rwrec[ioc[np][2]-1].nextm = i + 1; mqueue[np][0] = i + 1; /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[1]); goto L40; } /* First receive 4 word header, then data */ rcvmsgf(i,np); response = ioresult((int *)&rwrec[i]); /* Set pointer to current receive record */ if (response > 0) ioc[np][2] = i + 1; /* Set iocompletion flag to error */ else if (response < 0) ierror = response; /* Allow Open Transport to resume sending events */ L40: OTLeaveNotifier(epref[np]); /* Handle read and write errors */ if (ierror) { /* Write out readwrite record */ rwstat(i,unit2); wqueue(unit2); for (i = 0; i < MAXM; i++) { if (curreq[i][0] != 0) rwstat(i,unit2); } writerrs("MPI_IRecv: ",ierror); } /* log MPI message state change and display status */ if (monitor > 0) logmess(np,2,rwrec[i].sln,0,tag); /* save transmission mode as receive */ curreq[i][0] = 1; /* save destination/source id */ curreq[i][1] = np; /* save communicator */ curreq[i][2] = comm; /* save tag */ curreq[i][3] = tag; /* save datatype */ curreq[i][4] = datatype; /* assign request handle */ *request = i; return ierror; } int MPI_Test(MPI_Request *request, int *flag, MPI_Status *status) { /* check to see if a nonblocking send or receive operation has completed request = request handle flag = true if operation completed status = status object input: request output: request, flag, status local data */ int ierror, i, dest, source, slen, tag, rlen, rtag, nerr, j; float ts; MPI_Comm comm, rcomm; MPI_Datatype datatype, rdatat; OTTimeStamp delta; /* mstime = maximum time (msec) to wait for message to start arriving mptime = maximum time (msec) to wait for next part of message */ static int mstime = 300000, mptime = 10000; /* internal mpi common block nproc = number of real or virtual processors obtained ioc = array of context pointers for notifier function */ /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages curreq = request record for transmission parameters header = message envelope rwrec = read/write record for asynchronous messages */ ierror = 0; /* check for error conditions */ i = *request; /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* null request */ else if (*request < 0) { *flag = 1; return 0; } /* invalid request handle */ else if (i >= MAXM) ierror = 16; else if (curreq[i][0]==0) ierror = 16; /* handle errors */ if (ierror) { status->error = ierror; writerrs("MPI_Test: ",ierror); return ierror; } /* set status to empty */ status->source = -1; status->tag = -1; status->error = ierror; status->len = 0; status->type = 0; *flag = 0; /* Check if data read or write has completed */ if (ioresult((int *)&rwrec[i]) > 0) { if (checkesc(0)) { if (curreq[i][0] < 0) fprintf(unit2,"Send killed,dest,tag=%d,%d\n", curreq[i][1],curreq[i][3]); else fprintf(unit2,"Receive killed,source,tag=%d,%d\n", curreq[i][1],curreq[i][3]); ierror = -9; writerrs("MPI_Test: ",ierror); return ierror; } /* Check Timeout */ else { /* Limit notifications that can be sent to notifier */ OTEnterNotifier(rwrec[i].ref); /* Calculate time elapsed in milliseconds */ nerr = OTElapsedMilliseconds(&rwrec[i].ts[1]); /* Retry Send or Receive */ if ((nerr < mptime) && (rwrec[i].nfatal != kENOMEMErr)) { /* Allow Open Transport to resume sending events */ OTLeaveNotifier(rwrec[i].ref); return ierror; } /* Check if message arrived during checkesc */ nerr = ioresult((int *)&rwrec[i]); if (nerr <= 0) { fprintf(unit2,"Info: message arrived during checkesc\n"); *flag = 1; goto L20; } /* Check if any other messages need service */ for (j = 0; j < MAXM; j++) { if ((j != i) && (curreq[j][0] < 0)) { /* Limit notifications that can be sent to notifier */ if (rwrec[i].ref != rwrec[j].ref) OTEnterNotifier(rwrec[j].ref); /* Check if out of memory flag is set and message is incomplete */ if (rwrec[j].nfatal==kENOMEMErr) { if (ioresult((int *)&rwrec[j]) > 0) { dest = curreq[j][1]; fprintf(unit2,"send endpt %d out of memory\n",dest); if (idproc==dest) dest = MAXS; /* Attempt to send next block of data */ sndmsgf(j,dest); fprintf(unit2,"Info: request, response=%d,%d",j, ioresult((int *)&rwrec[j])); } } /* Allow Open Transport to resume sending events */ if (rwrec[i].ref != rwrec[j].ref) OTLeaveNotifier(rwrec[j].ref); } else if ((j != i) && (curreq[j][0] > 0)) { /* Limit notifications that can be sent to notifier */ if (rwrec[i].ref != rwrec[j].ref) OTEnterNotifier(rwrec[j].ref); /* Check if out of memory flag is set and message is incomplete */ if (rwrec[j].nfatal==kENOMEMErr) { if (ioresult((int *)&rwrec[j]) > 0) { source = curreq[j][1]; fprintf(unit2,"recv endpt %d out of memory\n",source); /* Attempt to receive next block of data */ rcvmsgf(j,source); fprintf(unit2,"Info: request, response=%d,%d",j, ioresult((int *)&rwrec[j])); } } /* Allow Open Transport to resume sending events */ if (rwrec[i].ref != rwrec[j].ref) OTLeaveNotifier(rwrec[j].ref); } } /* Try to determine cause of Timeout in Send */ if (curreq[i][0] < 0) { dest = curreq[i][1]; if (rwrec[i].nfatal==kENOMEMErr) fprintf(unit2,"OT Temporarily out of Memory\n"); else { ts = .001*(float) OTElapsedMilliseconds(&rwrec[i].ts[0]); fprintf(unit2,"Send Timeout, %f sec, Retrying...\n",ts); } /* Debug information */ fprintf(unit2,"destination=%d,size=%d,tag=%d\n", dest,rwrec[i].sln,curreq[i][3]); if (idproc==dest) dest = MAXS; /* Attempt to send next block of data */ sndmsgf(i,dest); nerr = ioresult((int *)&rwrec[i]); /* Non-fatal errors */ if (rwrec[i].nfatal) { if (rwrec[i].nfatal==kOTFlowErr) fprintf(unit2,"Flow Control prevents sending data\n"); /* Do not wait more than 5 minutes for message to start sending */ if (OTElapsedMilliseconds(&rwrec[i].ts[0]) >= mstime) { nerr = rwrec[i].nfatal; if (nerr==kENOMEMErr) fprintf(unit2,"Open Transport has run out of Memory\n"); fprintf(unit2,"OTSnd Retry failed\n"); ierror = nerr; ioc[dest][3] = 0; *flag = 1; } } /* Fatal errors */ else if (nerr < 0) { ierror = nerr; *flag = 1; } /* Data successfully sent */ else { /* Debug information */ fprintf(unit2,"data block sent, current total = %d\n", rwrec[i].len); if (!nerr) *flag = 1; } } /* Try to determine cause of Timeout in Receive */ else { source = curreq[i][1]; if (rwrec[i].nfatal==kENOMEMErr) fprintf(unit2,"OT Temporarily out of Memory\n"); else { ts = .001*(float) OTElapsedMilliseconds(&rwrec[i].ts[0]); fprintf(unit2,"Receive Timeout, %f sec, Retrying...\n",ts); } /* Debug information */ fprintf(unit2,"source=%d,size=%d,tag=%d\n", source,rwrec[i].sln,curreq[i][3]); /* Attempt to send next block of data */ rcvmsgf(i,source); nerr = ioresult((int *)&rwrec[i]); /* Non-fatal errors */ if (rwrec[i].nfatal) { if (rwrec[i].nfatal==kOTFlowErr) fprintf(unit2,"Flow Control prevents accepting data\n"); if (rwrec[i].nfatal==kOTNoDataErr) fprintf(unit2,"No data is currently available\n"); /* Do not wait more than 5 minutes for message to start sending */ if (OTElapsedMilliseconds(&rwrec[i].ts[0]) >= mstime) { nerr = rwrec[i].nfatal; if (nerr==kENOMEMErr) fprintf(unit2,"Open Transport has run out of Memory\n"); fprintf(unit2,"OTRcv Retry failed\n"); ierror = nerr; ioc[source][2] = 0; *flag = 1; } } /* Fatal errors */ else if (nerr < 0) { ierror = nerr; *flag = 1; } /* Data successfully received */ else { /* Debug information */ fprintf(unit2,"data block received, current total = %d\n", rwrec[i].len); if (!nerr) *flag = 1; } } /* Allow Open Transport to resume sending events */ L20: OTLeaveNotifier(rwrec[i].ref); if (!(*flag)) return ierror; } } /* Data read or write has completed */ else { nerr = ioresult((int *)&rwrec[i]); *flag = 1; } /* Get requested length */ slen = rwrec[i].sln; /* Get actual length */ rlen = rwrec[i].len; /* Read current request record */ tag = curreq[i][3]; /* Define length for MPI_Get_count */ status->len = rlen; /* Check for send errors */ if (curreq[i][0] < 0) { dest = curreq[i][1]; /* Define type for MPI_Get_count */ status->type = curreq[i][4]; /* Check for write errors */ if (nerr < 0) { if (!ierror) { fprintf(unit2,"OTSnd Error, nerr, dest, tag = %d,%d,%d\n", nerr,dest,tag); if (nerr==kOTOutStateErr) fprintf(unit2,"Endpoint not in an appropriate state\n"); else if (nerr==kENOMEMErr) fprintf(unit2,"Open Transport has run out of memory\n"); ierror = nerr; } } else if (rlen != slen) { fprintf(unit2,"Send Length Error,dest,tag,requested/actual length=%d,%d,%d,%d\n", dest,tag,slen,rlen); ierror = 8; } /* log MPI message state change and display status */ if (monitor > 0) { if (checkesc(0)) { ierror = -9; writerrs("MPI_Test: ",ierror); return ierror; } else if (!ierror) { /* Subtract one timestamp value from another */ OTSubtractTimeStamps(&delta,&rwrec[i].ts[0],&rwrec[i].ts[1]); /* Convert difference between time steps into microseconds */ nerr = OTTimeStampInMicroseconds(&delta); logmess(dest,-1,rlen,nerr,tag); } } goto L30; } /* Read current request record */ source = curreq[i][1]; comm = curreq[i][2]; datatype = curreq[i][4]; /* Read header */ /* Get received tag from header */ rtag = header[i].hdata[1]; /* Get received comm from header */ rcomm = header[i].hdata[0]; /* Get received datatype from header */ rdatat = header[i].hdata[2]; /* Define source, tag and type for MPI_Get_count */ status->source = source; status->tag = rtag; status->type = rdatat; /* Check for receive errors */ if (nerr < 0) { if (!ierror) { fprintf(unit2,"OTRcv Error, nerr, source, tag = %d,%d,%d\n", nerr,source,tag); if (nerr==kOTOutStateErr) fprintf(unit2,"Endpoint not in an appropriate state\n"); else if (nerr==kENOMEMErr) fprintf(unit2,"Open Transport has run out of memory\n"); ierror = nerr; } } /* Length error */ else if (rlen > slen) { fprintf(unit2,"Read Length Error, source, tag, requested/actual = %d,%d,%d,%d\n", source,tag,slen,rlen); fprintf(unit2,"Possibly attempting to receive data out of order\n"); ierror = 13; } /* Check for incomplete message, this should never be able to happen */ else if (rlen < header[i].hdata[3]) { fprintf(unit2,"Incomplete Read, source, tag, requested/actual = %d,%d,%d,%d\n", source,tag,header[i].hdata[3],rlen); ierror = 12; } /* Comm error */ else if (rcomm != comm) { fprintf(unit2,"Read Comm Error, source, tag, expected/received comm = %d,%d,%d,%d\n", source,tag,comm,rcomm); ierror = 9; } /* Tag error */ else if ((tag >= 0) && (rtag != tag)) { fprintf(unit2,"Read Tag Error, source, expected/received tag = %d,%d,%d\n", source,tag,rtag); fprintf(unit2,"Possibly attempting to receive data out of order\n"); ierror = 10; } /* Type error */ else if (rdatat != datatype) { fprintf(unit2,"Read Type Error, source, tag, expected/received type = %d,%d,%d,%d\n", source,tag,datatype,rdatat); ierror = 11; } /* log MPI message state change and display status */ if (monitor > 0) { if (checkesc(0)) { ierror = -9; writerrs("MPI_Test: ",ierror); return ierror; } else if (!ierror) { /* Subtract one timestamp value from another */ OTSubtractTimeStamps(&delta,&rwrec[i].ts[0],&rwrec[i].ts[1]); /* Convert difference between time steps into microseconds */ nerr = OTTimeStampInMicroseconds(&delta); logmess(source,-2,rlen,nerr,tag); } } /* Store error code */ L30: status->error = ierror; /* Nullify transmission mode */ curreq[i][0] = 0; /* Nullify request handle */ *request = MPI_REQUEST_NULL;; /* Handle read and write errors */ if (ierror) { /* Write out readwrite record */ rwstat(i,unit2); wqueue(unit2); for (i = 0; i < MAXM; i++) { if (curreq[i][0] != 0) rwstat(i,unit2); } writerrs("MPI_Test: ",ierror); } return ierror; } void sndmsgf(MPI_Request request, int dest) { /* send a message fragment request = request handle dest = rank of destination local data */ int i, np, nps; long response; /* internal mpi common block ioc = array of context pointers for notifier function */ /* internal common block for non-blocking messages curreq = request record for transmission parameters rwrec = read/write record for asynchronous messages mqueue = message request queue */ i = request; np = dest; /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[1]); /* Send data to a remote peer */ L10: response = OTSnd(rwrec[i].ref,rwrec[i].buf,rwrec[i].nbytes, rwrec[i].flags); /* Process data which has been sent */ if (response >= 0) { /* Clear non-fatal error code */ rwrec[i].nfatal = 0; /* Set actual length sent */ rwrec[i].len += response; /* Check for incomplete header */ if (rwrec[i].len < 0) { header[i].first.fData = (void *)(((unsigned char *)header[i].first.fData) + response); header[i].first.fLen -= response; } /* Check for incomplete data */ else if (rwrec[i].sln > rwrec[i].len) { /* Header is sent, readjust parameters to send data */ if (rwrec[i].ioflag==1) { rwrec[i].buf = rwrec[i].sbuf; rwrec[i].nbytes = rwrec[i].sln; response -= header[i].first.fLen; } /* Readjust buffer pointer */ rwrec[i].buf = (void *)(((unsigned char *)rwrec[i].buf) + response); rwrec[i].nbytes -= response; rwrec[i].ioflag += 1; } /* Data is sent */ else { /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[1]); rwrec[i].ioflag = 0; /* Get next message if messages are queued */ if (rwrec[i].nextm > 0) { i = rwrec[i].nextm; if (i==mqueue[np][1]) mqueue[np][1] = 0; ioc[np][3] = i; i -= 1; goto L10; } /* Clear pointer to current send record */ else ioc[np][3] = 0; } } /* Check for errors */ else { /* Process non-fatal errors */ if ((response==kOTFlowErr) || (response==kENOMEMErr)) rwrec[i].nfatal = response; /* Process fatal errors */ else { /* Find actual destination */ nps = np; if (nps==MAXS) nps = idproc; fprintf(unit2,"OTSnd Error, ierr, dest, tag = %d,%d,%d\n", response,nps,header[i].hdata[1]); if (response==kOTOutStateErr) fprintf(unit2,"Endpoint not in an appropriate state\n"); /* Determine cause of a kOTLookErr */ else if (response==kOTLookErr) fprintf(unit2,"OTLookErr, cause=%d\n",OTLook(rwrec[i].ref)); /* Set iocompletion flag to error */ rwrec[i].ioflag = response; /* Clear pointer to current send record */ ioc[np][3] = 0; } } return; } void rcvmsgf(MPI_Request request, int source) { /* receive a message fragment request = request handle source = rank of source local data */ int i, np, j; long response; /* internal mpi common block ioc = array of context pointers for notifier function */ /* internal common block for non-blocking messages curreq = request record for transmission parameters rwrec = read/write record for asynchronous messages trash = trash bin for unwanted data mqueue = message request queue */ i = request; np = source; /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[1]); /* Read data sent from a remote peer */ L10: response = OTRcv(rwrec[i].ref,rwrec[i].buf,rwrec[i].nbytes, &rwrec[i].flags); /* Unexpected flag returned */ if (rwrec[i].flags > 1) { rwrec[i].ioflag = -3; /* Clear pointer to current receive record */ ioc[np][2] = 0; } /* Process data which has arrived */ else if (response >= 0) { /* Clear more flag */ rwrec[i].flags = 0; /* Clear non-fatal error code */ rwrec[i].nfatal = 0; /* Set actual length received */ rwrec[i].len += response; /* Check if all the data has arrived */ if (rwrec[i].len < header[i].hdata[3]) { /* Incomplete data */ if (rwrec[i].nbytes > response) { /* Readjust buffer pointer */ rwrec[i].buf = (void *)(((unsigned char *)rwrec[i].buf) + response); rwrec[i].nbytes -= response; if (rwrec[i].len >= 0) rwrec[i].ioflag += 1; } /* Header is received, readjust parameters to receive data */ else if (rwrec[i].ioflag==1) { rwrec[i].buf = rwrec[i].sbuf; rwrec[i].nbytes = imin(header[i].hdata[3],rwrec[i].sln); rwrec[i].ioflag = 2; goto L10; } /* Data is received and buffer is full */ else { rwrec[i].buf = &trash; rwrec[i].nbytes = 1024; goto L10; } } /* Data is received */ else { /* Obtain the current time stamp */ OTGetTimeStamp(&rwrec[i].ts[1]); rwrec[i].ioflag = 0; /* Get next message if messages are queued */ if (rwrec[i].nextm > 0) { i = rwrec[i].nextm; if (i==mqueue[np][0]) mqueue[np][0] = 0; ioc[np][2] = i; } /* Clear pointer to current read record */ else ioc[np][2] = 0; } } /* Check for errors */ else { /* Process non-fatal errors */ if ((response==kOTFlowErr) || (response==kOTNoDataErr) || (response==kENOMEMErr)) rwrec[i].nfatal = response; /* Process potentially fatal errors */ else { /* Determine cause of a kOTLookErr */ if (response==kOTLookErr) { j = OTLook(rwrec[i].ref); if (j==T_GODATA) { /* Set pointer to current send record */ j = ioc[np][3] - 1; /* Send next block of data */ if ((j >= 0) && (j < MAXM)) { fprintf(unit2,"rcvmsgf: OTLookErr for node=%d\n",np); sndmsgf(j,np); response = ioresult((int *)&rwrec[j]); /* Report send error */ if (response < 0) { fprintf(unit2,"rcvmsgf: send error=%d\n",response); ioc[np][3] = 0; } } /* Clear more flag */ rwrec[i].flags = 0; goto L10; } } /* Process fatal errors */ fprintf(unit2,"OTRcv Error, ierr, source, tag = %d,%d,%d\n", response,np,header[i].hdata[1]); if (response==kOTOutStateErr) fprintf(unit2,"Endpoint not in an appropriate state\n"); else if (response==kOTLookErr) fprintf(unit2,"OTLookErr, unknown cause=%d",j); /* Set iocompletion flag to error */ rwrec[i].ioflag = response; /* Clear pointer to current receive record */ ioc[np][2] = 0; } } return; } int MPI_Wait(MPI_Request *request, MPI_Status *status) { /* wait for an MPI send or receive to complete request = request handle status = status object input: request output: request, status local data */ int ierror, flag; L10: ierror = MPI_Test(request,&flag,status); if (!flag) goto L10; return ierror; } int MPI_Request_free(MPI_Request *request) { /* free a communication request object request = request handle input: request output: request local data */ int ierror, i; /* internal mpi common block nproc = number of real or virtual processors obtained */ /* internal common block for non-blocking messages curreq = request record for transmission parameters rwrec = read/write record for asynchronous messages */ ierror = 0; /* check for error conditions */ i = *request; /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* null request */ else if (*request < 0) { return 0; } /* invalid request handle */ else if (i >= MAXM) ierror = 16; else if (curreq[i][0]==0) ierror = 16; /* handle errors */ if (ierror) { writerrs("MPI_Request_free ",ierror); return ierror; } /* Check if data read or write has completed */ if (ioresult((int *)&rwrec[i]) <= 0) { /* Nullify transmission mode */ curreq[i][0] = 0; /* Nullify request handle */ *request = MPI_REQUEST_NULL; } else { fprintf(unit2,"MPI_Request_free: Message not Completed\n"); /* Write out readwrite record */ rwstat(i,unit2); ierror = 32; } return ierror; } int MPI_Sendrecv(void* sendbuf, int sendcount, MPI_Datatype sendtype, int dest, int sendtag, void* recvbuf, int recvcount, MPI_Datatype recvtype, int source, int recvtag, MPI_Comm comm, MPI_Status *status) { /* blocking send and receive operation sendbuf = initial address of send buffer sendcount = number of entries to send sendtype = type of entries in send buffer dest = rank of destination sendtag = send tag recvbuf = initial address of receive buffer recvcount = max number of entries to receive recvtype = type of entries in receive buffer source = rank of source recvtag = receive tag comm = communicator status = return status input: sendbuf, sendcount, sendtype, dest, sendtag, recvbuf, recvcount recvtype, source, recvtag, comm output: recvbuf, status local data */ int ierror; MPI_Request recvreq, sendreq; /* post non-blocking receive and send */ ierror = MPI_Irecv(recvbuf,recvcount,recvtype,source,recvtag,comm,&recvreq); ierror = MPI_Isend(sendbuf,sendcount,sendtype,dest,sendtag,comm,&sendreq); /* wait for send and receive */ ierror = MPI_Wait(&sendreq,status); ierror = MPI_Wait(&recvreq,status); return ierror; } int MPI_Ssend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm) { /* blocking synchronous mode send buf = initial address of send buffer count = number of entries to send datatype = datatype of each entry dest = rank of destination tag = message tag comm = communicator input: buf, count, datatype, dest, tag, comm comment: this is just a temporary stub local data */ int ierror; ierror = MPI_Send(buf,count,datatype,dest,tag,comm); return ierror; } int MPI_Issend(void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request) { /* start a non-blocking synchronous mode send buf = initial address of send buffer count = number of entries to send datatype = datatype of each entry dest = rank of destination tag = message tag comm = communicator request = request handle input: buf, count, datatype, dest, tag, comm output: request comment: this is just a temporary stub local data */ int ierror; ierror = MPI_Isend(buf,count,datatype,dest,tag,comm,request); return ierror; } int MPI_Waitall(int count, MPI_Request *array_of_requests, MPI_Status *array_of_statuses) { /* wait for a collection of specified MPI sends or receives to complete count = list length array_of_requests = array of request handles array_of_statuses = array of status objects input: count, array_of_requests output: array_of_requests, array_of_statuses local data */ int ierror, i, ierr; /* invalid count */ if (count < 0) { fprintf(unit2,"Invalid list length = %d\n",count); ierror = 17; writerrs("MPI_Waitall: ",ierror); return ierror; } ierror = 0; for (i = 0; i < count; i++) { ierr = MPI_Wait(&array_of_requests[i],&array_of_statuses[i]); if (ierr) ierror = MPI_ERR_IN_STATUS; } return ierror; } int MPI_Waitany(int count, MPI_Request *array_of_requests, int *index, MPI_Status *status) { /* wait for any specified MPI send or receive to complete count = list length array_of_requests = array of request handles index = index of request handle that completed status = status object input: count, array_of_requests output: array_of_requests, index, status local data */ int ierror, i, k, flag; /* invalid count */ if (count < 0) { fprintf(unit2,"Invalid list length = %d\n",count); ierror = 17; writerrs("MPI_Waitany: ",ierror); return ierror; } /* find number of requests already completed */ k = 0; for (i = 0; i < count; i++) if (array_of_requests[i] < 0) k = k + 1; if (k==count) { *index = -1; ierror = 0; return ierror; } i = 0; L20: flag = 0; if (array_of_requests[i] >= 0) ierror = MPI_Test(&array_of_requests[i],&flag,status); if (flag) *index = i; else { i += 1; if (i >= count) i = 0; goto L20; } return ierror; } int MPI_Get_count(MPI_Status *status, MPI_Datatype datatype, int *count) { /* get the number of "top level" elements status = return status of receive operation datatype = datatype of each receive buffer entry count = number of received entries input: status, datatype output: count local data */ int ierror; /* internal mpi common block nproc = number of real or virtual processors obtained */ ierror = 0; *count = 0; /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* mismatched datatype */ else if (datatype != status->type) ierror = 18; /* calculate count */ else if ((datatype==MPI_INT) || (datatype==MPI_FLOAT)) { *count = status->len/4; if (4*(*count) != status->len) *count = MPI_UNDEFINED; } else if (datatype==MPI_DOUBLE) { *count = status->len/8; if (8*(*count) != status->len) *count = MPI_UNDEFINED; } else if (datatype==MPI_BYTE) *count = status->len; else if ((datatype==MPI_2INT) || (datatype==MPI_FLOAT_INT)) { *count = status->len/8; if (8*(*count) != status->len) *count = MPI_UNDEFINED; } else if (datatype==MPI_DOUBLE_INT) { *count = status->len/16; if (16*(*count) != status->len) *count = MPI_UNDEFINED; } /* invalid datatype */ else ierror = 7; /* handle errors */ if (ierror) writerrs("MPI_Get_count: ",ierror); return ierror; } int MPI_Initialized(int *flag) { /* indicate whether MPI_Init has been called flag = true if MPI_Init has been called, false otherwise output: flag local data */ int ierror; /* internal mpi common block nproc = number of real or virtual processors obtained */ if (nproc > 0) *flag = 1; else *flag = 0; ierror = 0; return ierror; } int MPI_Comm_size(MPI_Comm comm, int *size) { /* determine the size of the group associated with a communicator comm = communicator (this is ignored) size = number of processors in the group of comm input: comm output: size local data */ int ierror, np; /* internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* get size */ else { np = mapcomm[comm][MAXS]; if ((np > 0) && (np <= nproc)) *size = np; else ierror = 2; } /* handle errors */ if (ierror) writerrs("MPI_Comm_size: ",ierror); return ierror; } int MPI_Comm_rank(MPI_Comm comm, int *rank) { /* determine the rank of the calling process in the communicator comm = communicator (this is ignored) rank = rank of the calling process in group of comm input: comm output: rank local data */ int ierror, np; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* get rank */ else { np = mapcomm[comm][MAXS]; if ((np > 0) && (np <= nproc)) { *rank = mapcomm[comm][MAXS+1]; if ((*rank >= 0) && (*rank < np)) { if (mapcomm[comm][*rank] != idproc) ierror = 29; } else ierror = 29; } else ierror = 2; } /* handle errors */ if (ierror) writerrs("MPI_Comm_rank: ",ierror); return ierror; } int MPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm) { /* duplicate existing communicator with all its cached information comm = communicator newcomm = new communicator input: comm output: newcomm local data */ int ierror, np, i, j, k; /* declare internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; *newcomm = MPI_COMM_NULL; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; } /* handle errors */ if (ierror) { writerrs("MPI_Comm_dup: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Comm_dup started\n"); /* find space for communication record */ i = 1; L10: i += 1; if (i >= MAXC) { fprintf(unit2,"too many communicators\n"); ierror = 25; writerrs("MPI_Comm_dup: ",ierror); return ierror; } else if (mapcomm[i][MAXS] > 0) goto L10; /* check if all nodes agree on new communicator */ ierror = MPI_Allreduce(&i,&j,1,MPI_INT,MPI_MIN,comm); ierror = MPI_Allreduce(&i,&k,1,MPI_INT,MPI_MAX,comm); if (j != k) { /* try to find another communicator */ i = k - 1; goto L10; } /* duplicate mapping */ for (j = 0; j < MAXS+MAXD+3; j++) mapcomm[i][j]= mapcomm[comm][j]; /* assign communicator */ *newcomm = i; if (monitor==2) fprintf(unit2,"MPI_Comm_dup complete\n"); return ierror; } int MPI_Comm_split(MPI_Comm comm, int color, int key, MPI_Comm *newcomm) { /* create a new communicator based on color and key comm = communicator color = control of subset assignment key = control of rank assignment newcomm = new communicator input: comm, color, key output: newcomm local data */ int ierror, np, i, mp, j, k, l, kmin; /* declare internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; *newcomm = MPI_COMM_NULL; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid color */ else if (color < (-1)) { fprintf(unit2,"Invalid color = %d\n",color); ierror = 23; } /* communicator errors */ else { np = mapcomm[comm][MAXS]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; } /* handle errors */ if (ierror) { writerrs("MPI_Comm_split: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Comm_split started\n"); /* find space for communication record */ i = 1; L10: i += 1; if (i >= MAXC) { fprintf(unit2,"too many communicators, color = %d\n",color); ierror = 25; writerrs("MPI_Comm_split: ",ierror); return ierror; } else if (mapcomm[i][MAXS] > 0) goto L10; /* check if all nodes agree on new communicator */ ierror = MPI_Allreduce(&i,&j,1,MPI_INT,MPI_MIN,comm); ierror = MPI_Allreduce(&i,&k,1,MPI_INT,MPI_MAX,comm); if (j != k) { /* try to find another communicator */ i = k - 1; goto L10; } /* gather all the colors */ ierror = MPI_Allgather(&color,1,MPI_INT,&mapcomm[i][0],1,MPI_INT,comm); /* this node does not participate */ if (color==MPI_UNDEFINED) { if (monitor==2) fprintf(unit2,"MPI_Comm_split complete\n"); return ierror; } /* find other processors with same color */ mp = -1; mapcomm[i][MAXS+1] = -1; for (j = 0; j < np; j++) { if (mapcomm[i][j]==color) { mp += 1; k = mapcomm[comm][j]; if ((k >= 0) || (k < np)) { mapcomm[i][mp] = k; if (k==idproc) mapcomm[i][MAXS+1] = mp; } else ierror = 2; } } mp += 1; /* no nodes with found with color */ if (!mp) { fprintf(unit2,"Self color not found\n"); ierror = 2; } /* no nodes found with idproc */ else if (mapcomm[i][MAXS+1] < 0) { fprintf(unit2,"Self rank not found\n"); ierror = 2; } /* handle errors */ if (ierror) { writerrs("MPI_Comm_split: ",ierror); return ierror; } /* finish mapping */ for (j = mp; j < MAXS; j++) mapcomm[i][j] = MPI_UNDEFINED; mapcomm[i][MAXS] = mp; /* assign communicator */ *newcomm = i; /* gather all the keys into MPI_COMM_SELF mapping */ ierror = MPI_Allgather(&key,1,MPI_INT,&mapcomm[1][0],1,MPI_INT,*newcomm); k = 0; /* find lowest remaining key */ L40: kmin = mapcomm[1][k]; for (j = k+1; j < mp; j++) { if (mapcomm[1][j] < kmin) kmin = mapcomm[1][j]; } /* order all nodes with lowest remaining key */ for (j = k; j < mp; j++) { if (mapcomm[1][j]==kmin) { k += 1; /* right shift node and key order, if necessary */ if (j >= k) { np = mapcomm[i][j]; for (l = 0; l < j-k+1; l++) { mapcomm[i][j-l] = mapcomm[i][j-l-1]; mapcomm[1][j-l] = mapcomm[1][j-l-1]; } mapcomm[i][k-1] = np; } } } if (k < mp) goto L40; /* find self rank */ mapcomm[i][MAXS+1] = -1; for (j = 0; j < mp; j++) { k = mapcomm[i][j]; if (k==idproc) mapcomm[i][MAXS+1] = j; } mapcomm[i][MAXS+2] = 0; /* restore MPI_COMM_SELF map */ mapcomm[1][0] = idproc; for (j = 1; j < mp; j++) mapcomm[1][j] = MPI_UNDEFINED; if (monitor==2) fprintf(unit2,"MPI_Comm_split complete\n"); return ierror; } int MPI_Comm_free(MPI_Comm *comm) { /* mark the communicator object for deallocation comm = communicator input: comm output: comm local data */ int ierror, np, i; /* declare internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((*comm < 0) || (*comm >= MAXC)) ierror = 2; /* release communicator */ else if (*comm > 1) { np = mapcomm[*comm][MAXS]; if ((np > 0) && (np <= nproc)) { /* clear mapping */ for (i = 0; i < MAXS; i++) { mapcomm[*comm][i] = 0; mapcomm[*comm][MAXS] = 0; mapcomm[*comm][MAXS+1] = MPI_UNDEFINED; mapcomm[*comm][MAXS+2] = 0; *comm = MPI_COMM_NULL; } } else ierror = 2; } /* handle errors */ if (ierror) writerrs("MPI_Comm_free: ",ierror); return ierror; } int MPI_Cart_create(MPI_Comm comm_old, int ndims, int *dims, int *periods, int reorder, MPI_Comm *comm_cart) { /* make new communicator to which topology information has been attached comm_old = input communicator ndims = number of dimensions of Cartesian grid dims = array specifying the number of processes in each dimension periods = array specifying whether grid is periodic or not reorder = specifies whether ranks may be reordered or not (ignored) comm_cart = communicator with new Carteisan topology input: comm_old, ndims, dims, periods, reorder output: comm_cart local data */ int ierror, np, mp, i, j, k; /* declare internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; *comm_cart = MPI_COMM_NULL; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm_old < 0) || (comm_old >= MAXC)) ierror = 2; /* invalid topology */ else if ((ndims < 0) || (ndims > MAXD)) { fprintf(unit2,"Invalid topology dimension = %d\n",ndims); ierror = 26; } /* communicator errors */ else { np = mapcomm[comm_old][MAXS]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* check topology length */ else { mp = 1; for (i = 0; i < ndims; i++) mp = mp*dims[i]; if (!ndims) mp = 0; if ((mp < 0) || (mp > np)) { fprintf(unit2,"Invalid Cartesian topology size = %d\n",mp); ierror = 27; } } } /* handle errors */ if (ierror) { writerrs("MPI_Cart_create: ",ierror); return ierror; } if (!mp) return ierror; if (monitor==2) fprintf(unit2,"MPI_Cart_create started\n"); /* find space for communication record */ i = 1; L20: i += 1; if (i >= MAXC) { fprintf(unit2,"too many communicators\n"); ierror = 25; writerrs("MPI_Cart_create: ",ierror); return ierror; } else if (mapcomm[i][MAXS] > 0) goto L20; /* check if all nodes agree on new communicator */ ierror = MPI_Allreduce(&i,&j,1,MPI_INT,MPI_MIN,comm_old); ierror = MPI_Allreduce(&i,&k,1,MPI_INT,MPI_MAX,comm_old); if (j != k) { /* try to find another communicator */ i = k - 1; goto L20; } /* quit if processor is beyond range of topology */ if (mapcomm[comm_old][MAXS+1] >= mp) { if (monitor==2) fprintf(unit2,"MPI_Cart_create complete\n"); return ierror; } /* create mapping */ for (j = 0; j < mp; j++) mapcomm[i][j] = mapcomm[comm_old][j]; for (j = mp; j < MAXS; j++) mapcomm[i][j]= MPI_UNDEFINED; mapcomm[i][MAXS] = mp; mapcomm[i][MAXS+1] = mapcomm[comm_old][MAXS+1]; /* store topology */ mapcomm[i][MAXS+2] = ndims; for (j = 0; j < ndims; j++) if (periods[j]) mapcomm[i][MAXS+3+j] = dims[j]; else mapcomm[i][MAXS+3+j] = -dims[j]; /* assign communicator */ *comm_cart = i; if (monitor==2) fprintf(unit2,"MPI_Cart_create complete\n"); return ierror; } int MPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int *coords) { /* determine process coords in Cartesian topology, given rank in group comm = communicator with Cartesian structure rank = rank of a process within group of comm maxdims = length of vector coords in the calling program coords = array containing Cartesian coordinates of specified process input: comm, rank, maxdims output: coords local data */ int ierror, np, ndims, i, j; /* declare internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; ndims = mapcomm[comm][MAXS+2]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid topology */ else if ((ndims < 1) || (ndims > MAXD)) { fprintf(unit2,"Invalid topology dimension = %d\n",ndims); ierror = 26; } /* invalid vector length */ else if (maxdims < ndims) { fprintf(unit2,"Vector length too small = %d\n",maxdims); ierror = 28; } /* invalid rank */ else { if ((rank < 0) || (rank >= np)) { fprintf(unit2,"Invalid rank = %d\n",rank); ierror = 29; } } } /* handle errors */ if (ierror) { writerrs("MPI_Cart_coords: ",ierror); return ierror; } /* calculate cartesian coordinates */ j = rank; for (i = 0; i < ndims; i++) { np = np/abs(mapcomm[comm][MAXS+3+i]); coords[i] = j/np; j -= coords[i]*np; } return ierror; } int MPI_Cart_get(MPI_Comm comm, int maxdims, int *dims, int *periods, int *coords) { /* retrieve cartesian topology information associated with communicator comm = communicator with Cartesian structure maxdims = length of vectors dims, periods and coords dims = number of processes for each Cartesian dimension periods = periodicity (true/false) for each Cartesian dimension coords = array containing Cartesian coordinates of specified process input: comm, maxdims output: dims, periods, coords local data */ int ierror, np, ndims, rank, i, j, k; /* declare internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; ndims = mapcomm[comm][MAXS+2]; rank = mapcomm[comm][MAXS+1]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid topology */ else if ((ndims < 1) || (ndims > MAXD)) { fprintf(unit2,"Invalid topology dimension = %d\n",ndims); ierror = 26; } /* invalid vector length */ else if (maxdims < ndims) { fprintf(unit2,"Vector length too small = %d\n",maxdims); ierror = 28; } /* get rank */ else if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } /* handle errors */ if (ierror) { writerrs("MPI_Cart_get: ",ierror); return ierror; } /* calculate dimension, periodicity, and cartesian coordinates */ j = rank; for (i = 0; i < ndims; i++) { k = mapcomm[comm][MAXS+3+i]; if (k > 0) periods[i] = 1; else { periods[i] = 0; k = -k; } dims[i] = k; np = np/k; coords[i] = j/np; j -= coords[i]*np; } return ierror; } int MPI_Cart_shift(MPI_Comm comm, int direction, int disp, int *rank_source, int *rank_dest) { /* return shifted source and destination ranks given shift direction and amount comm = communicator with Cartesian structure direction = coordinate dimension shift disp = displacement (> 0: upwards shift, < 0: downwards shift) rank_source = rank of source process rank_dest = rank of destination process input: comm, direction, disp output: rank_source, rank_dest local data */ int ierror, np, ndims, rank, i, j, k, l, shift; /* declare internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; ndims = mapcomm[comm][MAXS+2]; rank = mapcomm[comm][MAXS+1]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid topology */ else if ((ndims < 1) || (ndims > MAXD)) { fprintf(unit2,"Invalid topology dimension = %d\n",ndims); ierror = 26; } /* invalid direction */ else if ((direction < 0) || (direction >= ndims)) { fprintf(unit2,"Invalid direction = %d\n",direction); ierror = 31; } /* get rank */ else if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } /* handle errors */ if (ierror) { writerrs("MPI_Cart_shift: ",ierror); return ierror; } /* check if shift amount is zero */ if (!disp) { *rank_source = rank; *rank_dest = rank; return ierror; } /* find coordinate for selected direction */ j = rank; shift = np; for (i = 0; i < direction+1; i++) { shift = shift/abs(mapcomm[comm][MAXS+3+i]); k = j/shift; j -= k*shift; } /* calculate size of shift */ shift = 1; for (i = direction+1; i < ndims; i++) shift = shift*abs(mapcomm[comm][MAXS+3+i]); /* size of selected direction */ l = mapcomm[comm][MAXS+3+direction]; /* calculate new coordinate */ /* periodic boundary conditions */ if (l > 0) { /* make disp also periodic */ i = abs(disp)%l; if (disp < 0) i = -i; /* right neighbor */ j = k + i; if (j < 0) j += l; else if (j >= l) j -= l; *rank_dest = rank + (j - k)*shift; /* left neighbor */ j = k - i; if (j < 0) j += l; else if (j >= l) j -= l; *rank_source = rank + (j - k)*shift; } /* non-periodic boundary conditions */ else if (l < 0) { /* right neighbor */ j = k + disp; if ((j < 0) || (j >= (-l))) *rank_dest = MPI_PROC_NULL; else *rank_dest = rank + (j - k)*shift; /* left neighbor */ j = k - disp; if ((j < 0) || (j >= (-l))) *rank_source = MPI_PROC_NULL; else *rank_source = rank + (j - k)*shift; } /* verify ranks */ if (*rank_source != MPI_PROC_NULL) { if ((*rank_source < 0) || (*rank_source >= np)) { fprintf(unit2,"rank_source = %d\n",*rank_source); ierror = 29; } } if (*rank_dest != MPI_PROC_NULL) { if ((*rank_dest < 0) || (*rank_dest >= np)) { fprintf(unit2,"rank_dest = %d\n",*rank_dest); ierror = 29; } } /* process errors */ if (ierror) writerrs("MPI_Cart_shift: ",ierror); return ierror; } int MPI_Cart_rank(MPI_Comm comm, int *coords, int *rank) { /* determine process rank in communicator, given Cartesian location comm = communicator with Cartesian structure coords = array specifying Cartesian coordinates of a process rank = rank of specified process input: comm, coords output: rank local data */ int ierror, np, ndims, i, j, k, l; /* declare internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; ndims = mapcomm[comm][MAXS+2]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid topology */ else if ((ndims < 1) || (ndims > MAXD)) { fprintf(unit2,"Invalid topology dimension = %d\n",ndims); ierror = 26; } /* invalid coords */ else { for (i = 0; i < ndims; i++) { j = mapcomm[comm][MAXS+3+i]; if (j < 0) { if ((coords[i] < 0) || (coords[i] >= (-j))) { fprintf(unit2,"Invalid ith coord = %d,%d\n",i,coords[i]); ierror = 30; } } } } } /* handle errors */ if (ierror) { writerrs("MPI_Cart_rank: ",ierror); return ierror; } /* calculate rank, wrap periodic coordinates */ l = 0; for (i = 0; i < ndims; i++) { j = mapcomm[comm][MAXS+3+i]; k = coords[i]; if (j > 0) { if (k < 0) k += j; else if (k >= j) k -= j; } else j = -j; l = k + j*l; } /* verify rank */ if ((l < 0) || (l >= np)) { fprintf(unit2,"Invalid coords, resulting rank = %d\n",l); ierror = 30; writerrs("MPI_Cart_rank: ",ierror); } else *rank = l; return ierror; } int MPI_Cart_sub(MPI_Comm comm, int *remain_dims, MPI_Comm *newcomm) { /* partition communicator into subgroups that form lower-dimensional Cartesian subgrids comm = communicator with Cartesian structure remain_dims = each entry of remain_dims specifies whether dimension is kept in the subgrid or not newcomm = communicator containing the subgrid input: comm, remain_dims output: newcomm local data */ int ierror, np, ndims, rank, i, j, k, l, m, mp, color, key; /* declare internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; *newcomm = MPI_COMM_NULL; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; ndims = mapcomm[comm][MAXS+2]; rank = mapcomm[comm][MAXS+1]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid topology */ else if ((ndims < 1) || (ndims > MAXD)) { fprintf(unit2,"Invalid topology dimension = %d\n",ndims); ierror = 26; } /* get rank */ else if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } /* handle errors */ if (ierror) { writerrs("MPI_Cart_sub: ",ierror); return ierror; } /* find dimension of new topology */ k = 0; for (j = 0; j < ndims; j++) { if (remain_dims[j]) k += 1; } /* empty topology */ if (!k) return ierror; if (monitor==2) fprintf(unit2,"MPI_Cart_sub started\n"); /* find space for communication record */ i = 1; L20: i += 1; if (i >= MAXC) { fprintf(unit2,"too many communicators\n"); ierror = 25; writerrs("MPI_Cart_sub: ",ierror); return ierror; } else if (mapcomm[i][MAXS] > 0) goto L20; /* check if all nodes agree on new communicator */ ierror = MPI_Allreduce(&i,&j,1,MPI_INT,MPI_MIN,comm); ierror = MPI_Allreduce(&i,&k,1,MPI_INT,MPI_MAX,comm); if (j != k) { /* try to find another communicator */ i = k - 1; goto L20; } /* find color for missing dimension */ j = rank; color = 0; key = 0; mp = np; for (l = 0; l < ndims; l++) { m = abs(mapcomm[comm][MAXS+3+l]); mp = mp/m; k = j/mp; j -= k*mp; if (remain_dims[l]) key = k + key*m; else color = k + color*m; } /* gather all the colors */ ierror = MPI_Allgather(&color,1,MPI_INT,&mapcomm[i][0],1,MPI_INT,comm); /* find other processors with same color */ mp = -1; mapcomm[i][MAXS+1] = -1; for (j = 0; j < np; j++) { if (mapcomm[i][j]==color) { mp += 1; k = mapcomm[comm][j]; if ((k >= 0) || (k < np)) { mapcomm[i][mp] = k; if (k==idproc) mapcomm[i][MAXS+1] = mp; } else ierror = 2; } } mp += 1; /* no nodes with found with color */ if (!mp) { fprintf(unit2,"Self color not found\n"); ierror = 2; } /* no nodes found with idproc */ else if (mapcomm[i][MAXS+1] < 0) { fprintf(unit2,"Self rank not found\n"); ierror = 2; } /* handle errors */ if (ierror) { writerrs("MPI_Cart_sub: ",ierror); return ierror; } /* finish mapping */ for (j = mp; j < MAXS; j++) mapcomm[i][j] = MPI_UNDEFINED; mapcomm[i][MAXS] = mp; /* assign communicator */ *newcomm = i; /* create new topology */ k = 0; for (j = 0; j < ndims; j++) { if (remain_dims[j]) { k += 1; mapcomm[i][MAXS+2+k] = mapcomm[comm][MAXS+3+j]; } } mapcomm[i][MAXS+2] = k; if (monitor==2) fprintf(unit2,"MPI_Cart_sub complete\n"); return ierror; } int MPI_Dims_create(int nnodes, int ndims, int *dims) { /* create a division of processes in a Cartesian grid nnodes = number of nodes in a grid ndims = number of Cartesian dimensions dims = array specifying number of nodes in each dimension input: nnodes, ndims, dims output: dims local data */ int ierror, i, j, nd, mp, md; ierror = 0; /* check for errors */ nd = 0; mp = 1; for (i = 0; i < ndims; i++) { if (dims[i]==0) nd += 1; else if (dims[i] > 0) mp = mp*dims[i]; else if (dims[i] < 0) ierror = 26; } if (!nd) return ierror; if (ierror) fprintf(unit2,"Invalid topology dimension\n"); else if ((nnodes < 1) || (nnodes > MAXS)) { fprintf(unit2,"Invalid number of nodes = %d\n",nnodes); ierror = 24; } else { md = nnodes/mp; if ((md*mp) != nnodes) { fprintf(unit2,"Topology dimension, node number inconsistent\n"); ierror = 26; } } /* handle errors */ if (ierror) { writerrs("MPI_Dims_create: ",ierror); return ierror; } /* look for nd factors of md */ for (i = 0; i < ndims; i++) { if (!dims[i]) { mp = (int) (exp(log((float) md/(float) nd)) + .0001); if (((int) (pow(mp,nd) + .0001)) < md) mp += 1; L20: j = md/mp; if ((j*mp)==md) { dims[i] = mp; md = j; nd -= 1; } else { mp += 1; if (mp <= md) goto L20; fprintf(unit2,"MPI_Dims_create: factor not found\n"); ierror = 26; } } } /* sanity check */ if ((md != 1) || (nd != 0)) { fprintf(unit2,"MPI_Dims_create: missing factors\n"); ierror = 26; } return ierror; } int MPI_Bcast(void* buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm) { /* broadcast a message from root to all processes in comm buffer = starting address of buffer count = number of entries in buffer datatype = datatype of buffer root = rank of broadcast root comm = communicator input: buffer, count, datatype, root, comm output: buffer local data */ int ierror, i, np, id, rank; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= nproc)) ierror = 19; /* communicator errors */ else { np = mapcomm[comm][MAXS]; id = mapcomm[comm][root]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= np)) ierror = 19; /* invalid mapping */ else if ((id < 0) || (id >= nproc)) { fprintf(unit2,"Invalid mapping, root, node = %d,%d\n",root,id); ierror = 2; } /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Bcast: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Bcast started\n"); /* start broadcast */ if (rank==root) { for (i = 0; i < np; i++) { if (i != root) ierror = MPI_Send(buffer,count,datatype,i,0,comm); } } else ierror = MPI_Recv(buffer,count,datatype,root,0,comm,&status); if (monitor==2) fprintf(unit2,"MPI_Bcast complete\n"); return ierror; } int MPI_Barrier(MPI_Comm comm) { /* blocks each process in comm until all processes have called it. comm = communicator input: comm local data */ int ierror, np, rank, ntasks, isync, irync, i; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Barrier: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Barrier started\n"); /* begin synchronization */ ntasks = np - 1; isync = -1; if (rank==0) { /* processor 0 receives a message from everyone else */ for (i=1; i <= ntasks; i++) { ierror = MPI_Recv(&irync,1,MPI_INT,i,0,comm,&status); if (irync != isync) fprintf(unit2,"sync error from proc %d\n",i); } /* then sends an acknowledgment back */ isync = 1; ierror = MPI_Bcast(&isync,1,MPI_INT,0,comm); } else { /* remaining processors send a message to processor 0 */ ierror = MPI_Send(&isync,1,MPI_INT,0,0,comm); /* then receive an acknowledgement back */ isync = 1; ierror = MPI_Bcast(&irync,1,MPI_INT,0,comm); if (irync != isync) fprintf(unit2,"rsync error at proc %d\n",rank); } if (monitor==2) fprintf(unit2,"MPI_Barrier complete\n"); return ierror; } int MPI_Reduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) { /* applies a reduction operation to the vector sendbuf over the set of processes specified by comm and places the result in recvbuf on root sendbuf = address of send buffer recvbuf = address of receive buffer count = number of elements in send buffer datatype = datatype of elements in send buffer op = reduce operation (only max, min and sum currently supported) root = rank of root process comm = communicator input: sendbuf, count, datatype, op, root, comm output: recvbuf local data */ int ierror, i, j, np, rank, id, ltmp, loct, nl, lcnt, lsize; void *tmpbuf; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= nproc)) ierror = 19; /* invalid op */ else if ((op < 0) || (op > 5) || (op==3)) ierror = 20; /* communicator errors */ else { np = mapcomm[comm][MAXS]; id = mapcomm[comm][root]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= np)) ierror = 19; /* invalid mapping */ else if ((id < 0) || (id >= nproc)) { fprintf(unit2,"Invalid mapping, root, node = %d,%d\n",root,id); ierror = 2; } /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Reduce: ",ierror); return ierror; } /* determine size of temporary buffer */ if ((datatype==MPI_INT) || (datatype==MPI_FLOAT)) lsize = 4; else if (datatype==MPI_DOUBLE) lsize = 8; else if ((datatype==MPI_2INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 8; else if ((datatype==MPI_FLOAT_INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 8; else if ((datatype==MPI_DOUBLE_INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 16; /* invalid datatype */ else { ierror = 7; writerrs("MPI_Reduce: ",ierror); return ierror; } loct = 0; if (rank==root) { /* initialize by copying from send to receive buffer */ if (datatype==MPI_INT) iredux((int *)recvbuf,(int *)sendbuf,loct,count,-1); else if (datatype==MPI_FLOAT) fredux((float *)recvbuf,(float *)sendbuf,loct,count,-1); else if (datatype==MPI_DOUBLE) dredux((double *)recvbuf,(double *)sendbuf,loct,count,-1); else if (datatype==MPI_2INT) iredux((int *)recvbuf,(int *)sendbuf,loct,2*count,-1); else if (datatype==MPI_FLOAT_INT) fredux((float *)recvbuf,(float *)sendbuf,loct,2*count,-1); } ltmp = lsize*count; /* allocate a nonrelocatable block of memory */ tmpbuf = NewPtr(ltmp); /* memory not available */ if (!tmpbuf) { ierror = 21; writerrs("MPI_Reduce: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Reduce started\n"); ltmp = ltmp/lsize; /* send messages in groups of ltmp */ nl = (count - 1)/ltmp + 1; lcnt = ltmp; lsize = lsize*ltmp/4; for (j = 0; j < nl; j++) { /* better check to see if this is OK */ if (j==(nl-1)) lcnt = count - ltmp*(nl - 1); if (rank==root) { /* root receives data from everyone else */ for (i = 0; i < np; i++) { if (i != root) { ierror = MPI_Recv(tmpbuf,lcnt,datatype,i,j+1,comm,&status); /* reduce data */ if (datatype==MPI_INT) iredux((int *)recvbuf,(int *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_FLOAT) fredux((float *)recvbuf,(float *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_DOUBLE) dredux((double *)recvbuf,(double *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_2INT) iredux((int *)recvbuf,(int *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_FLOAT_INT) fredux((float *)recvbuf,(float *)tmpbuf,loct,lcnt,op); } } loct = loct + ltmp; } else { /* remaining processors send data to root */ ierror = MPI_Send(&((int *)sendbuf)[loct],lcnt,datatype,root,j+1,comm); loct = loct + lsize; } } /* release nonrelocatable memory block */ DisposePtr((char *)tmpbuf); if (monitor==2) fprintf(unit2,"MPI_Reduce complete\n"); return ierror; } int MPI_Scan(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { /* performs a parallel prefix reduction on data distributed across a group sendbuf = address of send buffer recvbuf = address of receive buffer count = number of elements in send buffer datatype = datatype of elements in send buffer op = reduce operation (only max, min and sum currently supported) comm = communicator input: sendbuf, count, datatype, op, comm output: recvbuf local data */ int ierror, i, j, np, rank, id, root = 0, ltmp, loct, nl, lcnt, lsize; void *tmpbuf; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= nproc)) ierror = 19; /* invalid op */ else if ((op < 0) || (op > 5) || (op==3)) ierror = 20; /* communicator errors */ else { np = mapcomm[comm][MAXS]; id = mapcomm[comm][root]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= np)) ierror = 19; /* invalid mapping */ else if ((id < 0) || (id >= nproc)) { fprintf(unit2,"Invalid mapping, root, node = %d,%d\n",root,id); ierror = 2; } /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Scan: ",ierror); return ierror; } /* determine size of temporary buffer */ if ((datatype==MPI_INT) || (datatype==MPI_FLOAT)) lsize = 4; else if (datatype==MPI_DOUBLE) lsize = 8; else if ((datatype==MPI_2INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 8; else if ((datatype==MPI_FLOAT_INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 8; else if ((datatype==MPI_DOUBLE_INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 16; /* invalid datatype */ else { ierror = 7; writerrs("MPI_Scan: ",ierror); return ierror; } loct = 0; if (rank==root) { /* initialize by copying from send to receive buffer */ if (datatype==MPI_INT) iredux((int *)recvbuf,(int *)sendbuf,loct,count,-1); else if (datatype==MPI_FLOAT) fredux((float *)recvbuf,(float *)sendbuf,loct,count,-1); else if (datatype==MPI_DOUBLE) dredux((double *)recvbuf,(double *)sendbuf,loct,count,-1); else if (datatype==MPI_2INT) iredux((int *)recvbuf,(int *)sendbuf,loct,2*count,-1); else if (datatype==MPI_FLOAT_INT) fredux((float *)recvbuf,(float *)sendbuf,loct,2*count,-1); } ltmp = lsize*count; /* allocate a nonrelocatable block of memory */ tmpbuf = NewPtr(ltmp); /* memory not available */ if (!tmpbuf) { ierror = 21; writerrs("MPI_Scan: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Scan started\n"); ltmp = ltmp/lsize; /* send messages in groups of ltmp */ nl = (count - 1)/ltmp + 1; lcnt = ltmp; lsize = lsize*ltmp/4; for (j = 0; j < nl; j++) { /* better check to see if this is OK */ if (j==(nl-1)) lcnt = count - ltmp*(nl - 1); if (rank==root) { /* root receives data from everyone else */ for (i = 0; i < np; i++) { if (i != root) { ierror = MPI_Recv(tmpbuf,lcnt,datatype,i,j+1,comm,&status); /* reduce data */ if (datatype==MPI_INT) iredux((int *)recvbuf,(int *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_FLOAT) fredux((float *)recvbuf,(float *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_DOUBLE) dredux((double *)recvbuf,(double *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_2INT) iredux((int *)recvbuf,(int *)tmpbuf,loct,lcnt,op); else if (datatype==MPI_FLOAT_INT) fredux((float *)recvbuf,(float *)tmpbuf,loct,lcnt,op); /* send partial result data to processor i */ ierror = MPI_Send(&((int *)recvbuf)[loct],lcnt,datatype,i, j+nproc+1,comm); } } loct = loct + ltmp; } else { /* remaining processors send data to root */ ierror = MPI_Send(&((int *)sendbuf)[loct],lcnt,datatype,root,j+1,comm); /* receive partial result data from root */ ierror = MPI_Recv(&((int *)recvbuf)[loct],lcnt,datatype,root,j+nproc+1, comm,&status); loct = loct + lsize; } } if (rank==root) { loct = 0; /* initialize by copying from send to receive buffer */ if (datatype==MPI_INT) iredux((int *)recvbuf,(int *)sendbuf,loct,count,-1); else if (datatype==MPI_FLOAT) fredux((float *)recvbuf,(float *)sendbuf,loct,count,-1); else if (datatype==MPI_DOUBLE) dredux((double *)recvbuf,(double *)sendbuf,loct,count,-1); else if (datatype==MPI_2INT) iredux((int *)recvbuf,(int *)sendbuf,loct,2*count,-1); else if (datatype==MPI_FLOAT_INT) fredux((float *)recvbuf,(float *)sendbuf,loct,2*count,-1); } /* release nonrelocatable memory block */ DisposePtr((char *)tmpbuf); if (monitor==2) fprintf(unit2,"MPI_Scan complete\n"); return ierror; } static int imax(int val1, int val2) { return (val1 > val2 ? val1 : val2); } static int imin(int val1, int val2) { return (val1 < val2 ? val1 : val2); } static float flmax(float val1, float val2) { return (val1 > val2 ? val1 : val2); } static float flmin(float val1, float val2) { return (val1 < val2 ? val1 : val2); } static double dmax(double val1, double val2) { return (val1 > val2 ? val1 : val2); } static double dmin(double val1, double val2) { return (val1 < val2 ? val1 : val2); } static void iredux(int *recvbuf, int *sendbuf, int offset, int count, MPI_Op op) { /* perform reduction operation for int types recvbuf = address of receive buffer sendbuf = address of send buffer offset = starting location in receive buffer count = number of elements in send buffer op = reduce operation (only max, min and sum currently supported) input: recvbuf, sendbuf, offset, count, op output: recvbuf local data */ int i, j, k; /* perform reduction */ if (op==MPI_MAX) { for (i = 0; i < count; i++) { recvbuf[i+offset] = imax(recvbuf[i+offset],sendbuf[i]); } } else if (op==MPI_MIN) { for (i = 0; i < count; i++) { recvbuf[i+offset] = imin(recvbuf[i+offset],sendbuf[i]); } } else if (op==MPI_SUM) { for (i = 0; i < count; i++) { recvbuf[i+offset] = recvbuf[i+offset] + sendbuf[i]; } } /* perform reduction and location */ else if (op==MPI_MAXLOC) { for (i = 0; i < count; i++) { j = recvbuf[2*i+offset]; k = sendbuf[2*i]; if (j < k) { recvbuf[2*i+offset] = sendbuf[2*i]; recvbuf[2*i+offset+1] = sendbuf[2*i+1]; } else if (j==k) recvbuf[2*i+offset+1] = imin(recvbuf[2*i+offset+1],sendbuf[2*i+1]); } } else if (op==MPI_MINLOC) { for (i = 0; i < count; i++) { j = recvbuf[2*i+offset]; k = sendbuf[2*i]; if (j > k) { recvbuf[2*i+offset] = sendbuf[2*i]; recvbuf[2*i+offset+1] = sendbuf[2*i+1]; } else if (j==k) recvbuf[2*i+offset+1] = imax(recvbuf[2*i+offset+1],sendbuf[2*i+1]); } } /* copy initial data */ else if (op==(-1)) { for (i = 0; i < count; i++) { recvbuf[i+offset] = sendbuf[i]; } } return; } static void fredux(float *recvbuf, float *sendbuf, int offset, int count, MPI_Op op) { /* perform reduction operation for float types recvbuf = address of receive buffer sendbuf = address of send buffer offset = starting location in receive buffer count = number of elements in send buffer op = reduce operation (only max, min and sum currently supported) input: recvbuf, sendbuf, offset, count, op output: recvbuf local data */ int i; float j, k; /* perform reduction */ if (op==MPI_MAX) { for (i = 0; i < count; i++) { recvbuf[i+offset] = flmax(recvbuf[i+offset],sendbuf[i]); } } else if (op==MPI_MIN) { for (i = 0; i < count; i++) { recvbuf[i+offset] = flmin(recvbuf[i+offset],sendbuf[i]); } } else if (op==MPI_SUM) { for (i = 0; i < count; i++) { recvbuf[i+offset] = recvbuf[i+offset] + sendbuf[i]; } } /* perform reduction and location */ else if (op==MPI_MAXLOC) { for (i = 0; i < count; i++) { j = recvbuf[2*i+offset]; k = sendbuf[2*i]; if (j < k) { recvbuf[2*i+offset] = sendbuf[2*i]; *((int *)&recvbuf[2*i+offset+1]) = *((int *)&sendbuf[2*i+1]); } else if (j==k) *((int *)&recvbuf[2*i+offset+1]) = imin( *((int *)&recvbuf[2*i+offset+1]),*((int *)&sendbuf[2*i+1])); } } else if (op==MPI_MINLOC) { for (i = 0; i < count; i++) { j = recvbuf[2*i+offset]; k = sendbuf[2*i]; if (j > k) { recvbuf[2*i+offset] = sendbuf[2*i]; *((int *)&recvbuf[2*i+offset+1]) = *((int *)&sendbuf[2*i+1]); } else if (j==k) *((int *)&recvbuf[2*i+offset+1]) = imax( *((int *)&recvbuf[2*i+offset+1]),*((int *)&sendbuf[2*i+1])); } } /* copy initial data */ else if (op==(-1)) { for (i = 0; i < count; i++) { recvbuf[i+offset] = sendbuf[i]; } } return; } static void dredux(double *recvbuf, double *sendbuf, int offset, int count, MPI_Op op) { /* perform reduction operation for double types recvbuf = address of receive buffer sendbuf = address of send buffer offset = starting location in receive buffer count = number of elements in send buffer op = reduce operation (only max, min and sum currently supported) input: recvbuf, sendbuf, offset, count, op output: recvbuf local data */ int i; double j, k; /* perform reduction */ if (op==MPI_MAX) { for (i = 0; i < count; i++) { recvbuf[i+offset] = dmax(recvbuf[i+offset],sendbuf[i]); } } else if (op==MPI_MIN) { for (i = 0; i < count; i++) { recvbuf[i+offset] = dmin(recvbuf[i+offset],sendbuf[i]); } } else if (op==MPI_SUM) { for (i = 0; i < count; i++) { recvbuf[i+offset] = recvbuf[i+offset] + sendbuf[i]; } } /* perform reduction and location */ else if (op==MPI_MAXLOC) { for (i = 0; i < count; i++) { j = recvbuf[2*i+offset]; k = sendbuf[2*i]; if (j < k) { recvbuf[2*i+offset] = sendbuf[2*i]; *((int *)&recvbuf[2*i+offset+1]) = *((int *)&sendbuf[2*i+1]); } else if (j==k) *((int *)&recvbuf[2*i+offset+1]) = imin( *((int *)&recvbuf[2*i+offset+1]),*((int *)&sendbuf[2*i+1])); } } else if (op==MPI_MINLOC) { for (i = 0; i < count; i++) { j = recvbuf[2*i+offset]; k = sendbuf[2*i]; if (j > k) { recvbuf[2*i+offset] = sendbuf[2*i]; *((int *)&recvbuf[2*i+offset+1]) = *((int *)&sendbuf[2*i+1]); } else if (j==k) *((int *)&recvbuf[2*i+offset+1]) = imax( *((int *)&recvbuf[2*i+offset+1]),*((int *)&sendbuf[2*i+1])); } } /* copy initial data */ else if (op==(-1)) { for (i = 0; i < count; i++) { recvbuf[i+offset] = sendbuf[i]; } } return; } int MPI_Allreduce(void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { /* applies a reduction operation to the vector sendbuf over the set of processes specified by comm and places result in recvbuf on all nodes sendbuf = address of send buffer recvbuf = address of receive buffer count = number of elements in send buffer datatype = datatype of elements in send buffer op = reduce operation (only max, min and sum currently supported) comm = communicator input: sendbuf, count, datatype, op, root, comm output: recvbuf local data */ int ierror, root = 0, ierr; /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ if (monitor==2) fprintf(unit2,"MPI_Allreduce started\n"); ierror = MPI_Reduce(sendbuf,recvbuf,count,datatype,op,root,comm); ierr = MPI_Bcast(recvbuf,count,datatype,root,comm); if (monitor==2) fprintf(unit2,"MPI_Allreduce complete\n"); return ierror; } int MPI_Gather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { /* collect individual messages from each process in comm at root sendbuf = starting address of send buffer sendcount = number of elements in send buffer sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcount = number of elements for any single receive recvtype = datatype of recv buffer elements root = rank of receiving process comm = communicator input: sendbuf, sendcount, sendtype, recvcount, recvtype, root, comm output: recvbuf local data */ int ierror, np, loct, lsize, id, rank, i, j; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= nproc)) ierror = 19; /* invalid count */ else if (sendcount < 0) ierror = 3; /* communicator errors */ else { np = mapcomm[comm][MAXS]; id = mapcomm[comm][root]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= np)) ierror = 19; /* invalid mapping */ else if ((id < 0) || (id >= nproc)) { fprintf(unit2,"Invalid mapping, root, node = %d,%d\n",root,id); ierror = 2; } /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Gather: ",ierror); return ierror; } /* root receives data */ if (rank==root) { /* invalid count */ if (recvcount < 0) ierror = 3; /* determine size of data to be sent */ if ((sendtype==MPI_INT) || (sendtype==MPI_FLOAT)) loct = sendcount; else if (sendtype==MPI_DOUBLE) loct = 2*sendcount; /* invalid datatype */ else { loct = 0; ierror = 7; } /* determine size of data to be received */ if ((recvtype==MPI_INT) || (recvtype==MPI_FLOAT)) lsize = recvcount; else if (recvtype==MPI_DOUBLE) lsize = 2*recvcount; /* invalid datatype */ else { lsize = 0; ierror = 7; } /* unequal message length error */ if (loct != lsize) { fprintf(unit2,"Unequal message length, send/receive bytes = %d,%d\n", loct,lsize); ierror = 22; } /* handle count, datatype and length errors */ if (ierror) { writerrs("MPI_Gather: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Gather started\n"); for (i = 0; i < np; i++) { loct = lsize*i; /* root copies its own data directly */ if (i==root) { for (j = 0; j < lsize; j++) ((int *)recvbuf)[j+loct] = ((int *)sendbuf)[j]; } /* otherwise, root receives data from other processors */ else ierror = MPI_Recv(&((int *)recvbuf)[loct],recvcount,recvtype,i,1,comm, &status); } } /* processors other than root send data to root */ else { if (monitor==2) fprintf(unit2,"MPI_Gather started\n"); ierror = MPI_Send(sendbuf,sendcount,sendtype,root,1,comm); } if (monitor==2) fprintf(unit2,"MPI_Gather complete\n"); return ierror; } int MPI_Allgather(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { /* gather individual messages from each process in comm and distribute the resulting message to each process. sendbuf = starting address of send buffer sendcount = number of elements in send buffer sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcount = number of elements for any process recvtype = datatype of receive buffer elements comm = communicator input: sendbuf, sendcount, sendtype, recvcount, recvtype, comm output: recvbuf local data */ int ierror, np, root = 0, ierr; /* internal mpi common block mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ if (monitor==2) fprintf(unit2,"MPI_Allgather started\n"); ierror = MPI_Gather(sendbuf,sendcount,sendtype,recvbuf,recvcount,recvtype, root,comm); np = mapcomm[comm][MAXS]; ierr = MPI_Bcast(recvbuf,np*recvcount,recvtype,root,comm); if (monitor==2) fprintf(unit2,"MPI_Allgather complete\n"); return ierror; } int MPI_Scatter(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { /* distribute individual messages from root to each process in comm sendbuf = starting address of send buffer sendcount = number of elements in send buffer sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcount = number of elements for any single receive recvtype = datatype of recv buffer elements root = rank of sending process comm = communicator input: sendbuf, sendcount, sendtype, recvcount, recvtype, root, comm output: recvbuf local data */ int ierror, np, lsize, loct, id, rank, i, j; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= nproc)) ierror = 19; /* invalid count */ else if (recvcount < 0) ierror = 3; /* communicator errors */ else { np = mapcomm[comm][MAXS]; id = mapcomm[comm][root]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= np)) ierror = 19; /* invalid mapping */ else if ((id < 0) || (id >= nproc)) { fprintf(unit2,"Invalid mapping, root, node = %d,%d\n",root,id); ierror = 2; } /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Scatter: ",ierror); return ierror; } /* root sends data */ if (rank==root) { /* invalid counts */ if (sendcount < 0) ierror = 3; /* determine size of data to be sent */ if ((sendtype==MPI_INT) || (sendtype==MPI_FLOAT)) lsize = sendcount; else if (sendtype==MPI_DOUBLE) lsize = 2*sendcount; /* invalid datatype */ else { lsize = 0; ierror = 7; } /* determine size of data to be received */ if ((recvtype==MPI_INT) || (recvtype==MPI_FLOAT)) loct = recvcount; else if (recvtype==MPI_DOUBLE) loct = 2*recvcount; /* invalid datatype */ else { loct = 0; ierror = 7; } /* unequal message length error */ if (loct != lsize) { fprintf(unit2,"Unequal message length, send/receive bytes = %d,%d\n", lsize,loct); ierror = 22; } /* handle count, datatype and length errors */ if (ierror) { writerrs("MPI_Scatter: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Scatter started\n"); for (i = 0; i < np; i++) { loct = lsize*i; /* root copies its own data directly */ if (i==root) { for (j = 0; j < lsize; j++) ((int *)recvbuf)[j] = ((int *)sendbuf)[j+loct]; } /* otherwise, root sends data to other processors */ else ierror = MPI_Send(&((int *)sendbuf)[loct],sendcount,sendtype,i,1,comm); } } /* processors other than root receive data from root */ else { if (monitor==2) fprintf(unit2,"MPI_Scatter started\n"); ierror = MPI_Recv(recvbuf,recvcount,recvtype,root,1,comm,&status); } if (monitor==2) fprintf(unit2,"MPI_Scatter complete\n"); return ierror; } int MPI_Alltoall(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { /* send a distinct message from each process to every process sendbuf = starting address of send buffer sendcount = number of elements in send buffer sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcount = number of elements for any single receive recvtype = datatype of recv buffer elements comm = communicator input: sendbuf, sendcount, sendtype, recvcount, recvtype, comm output: recvbuf local data */ int ierror, np, loct, lsize, id, rank, i, j; MPI_Request request; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid counts */ else if ((sendcount < 0) || (recvcount < 0)) ierror = 3; /* communicator errors */ else { np = mapcomm[comm][MAXS]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Alltoall: ",ierror); return ierror; } /* determine size of data to be sent */ if ((sendtype==MPI_INT) || (sendtype==MPI_FLOAT)) loct = sendcount; else if (sendtype==MPI_DOUBLE) loct = 2*sendcount; /* invalid datatype */ else { loct = 0; ierror = 7; } /* determine size of data to be received */ if ((recvtype==MPI_INT) || (recvtype==MPI_FLOAT)) lsize = recvcount; else if (recvtype==MPI_DOUBLE) lsize = 2*recvcount; /* invalid datatype */ else { lsize = 0; ierror = 7; } /* unequal message length error */ if (loct != lsize) { fprintf(unit2,"Unequal message length, send/receive bytes = %d,%d\n", loct,lsize); ierror = 22; } /* handle count, datatype and length errors */ if (ierror) { writerrs("MPI_Alltoall: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Alltoall started\n"); for (i = 0; i < np; i++) { id = i - rank; if (id < 0) id += np; loct = lsize*id; /* each node copies its own data directly */ if (rank==id) { for (j = 0; j < lsize; j++) ((int *)recvbuf)[j+loct] = ((int *)sendbuf)[j+loct]; } /* otherwise, each node receives data from other nodes */ else { ierror = MPI_Irecv(&((int *)recvbuf)[loct],recvcount,recvtype,id,i+1,comm, &request) ; ierror = MPI_Send(&((int *)sendbuf)[loct],sendcount,sendtype,id,i+1,comm); ierror = MPI_Wait(&request,&status); } } if (monitor==2) fprintf(unit2,"MPI_Alltoall complete\n"); return ierror; } int MPI_Gatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm) { /* collect individual messages from each process in comm at root messages can have different sizes and displacements sendbuf = starting address of send buffer sendcount = number of elements in send buffer sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcounts = integer array displs = integer array of displacements recvtype = datatype of recv buffer elements root = rank of receiving process comm = communicator input: sendbuf, sendcount, sendtype, recvcounts, displs, recvtype input: root, comm output: recvbuf local data */ int ierror, np, loct, lsize, id, rank, i, j; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= nproc)) ierror = 19; /* invalid count */ else if (sendcount < 0) ierror = 3; /* communicator errors */ else { np = mapcomm[comm][MAXS]; id = mapcomm[comm][root]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= np)) ierror = 19; /* invalid mapping */ else if ((id < 0) || (id >= nproc)) { fprintf(unit2,"Invalid mapping, root, node = %d,%d\n",root,id); ierror = 2; } /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Gatherv: ",ierror); return ierror; } /* root receives data */ if (rank==root) { /* invalid counts */ for (i = 0; i < np; i++) { if (recvcounts[i] < 0) ierror = 3; } /* determine size of data to be sent */ if ((sendtype==MPI_INT) || (sendtype==MPI_FLOAT)) loct = 1; else if (sendtype==MPI_DOUBLE) loct = 2; /* invalid datatype */ else { ierror = 7; } /* determine size of data to be received */ if ((recvtype==MPI_INT) || (recvtype==MPI_FLOAT)) lsize = 1; else if (recvtype==MPI_DOUBLE) lsize = 2; /* invalid datatype */ else { ierror = 7; } /* unequal message length error */ id = lsize*recvcounts[rank]; if ((!ierror) && (loct*sendcount != id)) { fprintf(unit2,"Unequal self message, send/receive bytes = %d,%d\n", loct*sendcount,id); ierror = 22; } /* handle count, datatype and length errors */ if (ierror) { writerrs("MPI_Gatherv: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Gatherv started\n"); for (i = 0; i < np; i++) { loct = lsize*displs[i]; /* root copies its own data directly */ if (i==root) { for (j = 0; j < lsize*recvcounts[i]; j++) ((int *)recvbuf)[j+loct] = ((int *)sendbuf)[j]; } /* otherwise, root receives data from other processors */ else { if (monitor==2) fprintf(unit2,"MPI_Gatherv started\n"); ierror = MPI_Recv(&((int *)recvbuf)[loct],recvcounts[i],recvtype, i,1,comm,&status); } } } /* processors other than root send data to root */ else ierror = MPI_Send(sendbuf,sendcount,sendtype,root,1,comm); if (monitor==2) fprintf(unit2,"MPI_Gatherv complete\n"); return ierror; } int MPI_Allgatherv(void* sendbuf, int sendcount, MPI_Datatype sendtype, void* recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, MPI_Comm comm) { /* gather individual messages from each process in comm and distribute the resulting message to each process. messages can have different sizes and displacements sendbuf = starting address of send buffer sendcount = number of elements in send buffer sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcounts = integer array displs = integer array of displacements recvtype = datatype of receive buffer elements comm = communicator input: sendbuf, sendcount, sendtype, recvcounts, displs, recvtype input: comm output: recvbuf local data */ int ierror, np, i, ierr; /* internal mpi common block nproc = number of real or virtual processors obtained mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* communicator errors */ if ((comm >= 0) && (comm < MAXC)) { np = mapcomm[comm][MAXS]; if ((np <= 0) || (np > nproc)) ierror = 2; } else ierror = 2; /* handle errors */ if (ierror) { writerrs("MPI_Allgatherv: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Allgatherv started\n"); for (i = 0; i < np; i++) { ierr = MPI_Gatherv(sendbuf,sendcount,sendtype,recvbuf,recvcounts,displs, recvtype,i,comm); if (ierr) ierror = ierr; } if (monitor==2) fprintf(unit2,"MPI_Allgatherv complete\n"); return ierror; } int MPI_Scatterv(void* sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void* recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) { /* distribute individual messages from root to each process in comm messages can have different sizes and displacements sendbuf = starting address of send buffer sendcounts = integer array displs = integer array of displacements sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcount = number of elements for any single receive recvtype = datatype of recv buffer elements root = rank of sending process comm = communicator input: sendbuf, sendcounts, displs, sendtype, recvcount, recvtype input: root, comm output: recvbuf local data */ int ierror, np, lsize, loct, id, rank, i, j; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= nproc)) ierror = 19; /* invalid counts */ else if (recvcount < 0) ierror = 3; /* communicator errors */ else { np = mapcomm[comm][MAXS]; id = mapcomm[comm][root]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* invalid root */ else if ((root < 0) || (root >= np)) ierror = 19; /* invalid mapping */ else if ((id < 0) || (id >= nproc)) { fprintf(unit2,"Invalid mapping, root, node = %d,%d\n",root,id); ierror = 2; } /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Scatterv: ",ierror); return ierror; } /* root sends data */ if (rank==root) { /* invalid counts */ for (i = 0; i < np; i++) { if (sendcounts[i] < 0) ierror = 3; } /* determine size of data to be sent */ if ((sendtype==MPI_INT) || (sendtype==MPI_FLOAT)) lsize = 1; else if (sendtype==MPI_DOUBLE) lsize = 2; /* invalid datatype */ else { ierror = 7; } /* determine size of data to be received */ if ((recvtype==MPI_INT) || (recvtype==MPI_FLOAT)) loct = 1; else if (recvtype==MPI_DOUBLE) loct = 2; /* invalid datatype */ else { ierror = 7; } /* unequal message length error */ id = lsize*sendcounts[rank]; if ((!ierror) && (loct*recvcount != id)) { fprintf(unit2,"Unequal self message, send/receive bytes = %d,%d\n", id,loct*recvcount); ierror = 22; } /* handle count, datatype and length errors */ if (ierror) { writerrs("MPI_Scatterv: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Scatterv started\n"); for (i = 0; i < np; i++) { loct = lsize*displs[i]; /* root copies its own data directly */ if (i==root) { for (j = 0; j < lsize*sendcounts[i]; j++) ((int *)recvbuf)[j] = ((int *)sendbuf)[j+loct]; } /* otherwise, root sends data to other processors */ else ierror = MPI_Send(&((int *)sendbuf)[loct],sendcounts[i],sendtype, i,1,comm); } } /* processors other than root receive data from root */ else { if (monitor==2) fprintf(unit2,"MPI_Scatterv started\n"); ierror = MPI_Recv(recvbuf,recvcount,recvtype,root,1,comm,&status); } if (monitor==2) fprintf(unit2,"MPI_Scatterv complete\n"); return ierror; } int MPI_Alltoallv(void* sendbuf, int *sendcounts, int *sdispls, MPI_Datatype sendtype, void* recvbuf, int *recvcounts, int *rdispls, MPI_Datatype recvtype, MPI_Comm comm) { /* send a distinct message from each process to every process messages can have different sizes and displacements sendbuf = starting address of send buffer sendcounts = integer array sdispls = integer array of send displacements sendtype = datatype of send buffer elements recvbuf = address of receive buffer recvcounts = integer array rdispls = integer array of receive displacements recvtype = datatype of recv buffer elements comm = communicator input: sendbuf, sendcount, sendtype, recvcount, recvtype, comm output: recvbuf local data */ int ierror, np, locs, msize, loct, lsize, id, ld, rank, i, j; MPI_Request request; MPI_Status status; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* check for error conditions */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* invalid comm */ else if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* invalid counts */ for (i = 0; i < np; i++) { if ((sendcounts[i] < 0) || (recvcounts[i] < 0)) ierror = 3; } /* handle errors */ if (ierror) { writerrs("MPI_Alltoallv: ",ierror); return ierror; } /* determine size of data to be sent */ if ((sendtype==MPI_INT) || (sendtype==MPI_FLOAT)) msize = 1; else if (sendtype==MPI_DOUBLE) msize = 2; /* invalid datatype */ else { ierror = 7; } /* determine size of data to be received */ if ((recvtype==MPI_INT) || (recvtype==MPI_FLOAT)) lsize = 1; else if (recvtype==MPI_DOUBLE) lsize = 2; /* invalid datatype */ else { ierror = 7; } /* unequal message length error */ id = msize*sendcounts[rank]; ld = lsize*recvcounts[rank]; if ((!ierror) && (id != ld)) { fprintf(unit2,"Unequal self message length, send/receive bytes = %d,%d\n", id,ld); ierror = 22; } /* handle count, datatype and length errors */ if (ierror) { writerrs("MPI_Alltoallv: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Alltoallv started\n"); for (i = 0; i < np; i++) { id = i - rank; if (id < 0) id += np; locs = msize*sdispls[id]; loct = lsize*rdispls[id]; /* each node copies its own data directly */ if (rank==id) { for (j = 0; j < lsize*recvcounts[id]; j++) ((int *)recvbuf)[j+loct] = ((int *)sendbuf)[j+locs]; } /* otherwise, each node receives data from other nodes */ else { ierror = MPI_Irecv(&((int *)recvbuf)[loct],recvcounts[id],recvtype,id, i+1,comm,&request) ; ierror = MPI_Send(&((int *)sendbuf)[locs],sendcounts[id],sendtype,id, i+1,comm); ierror = MPI_Wait(&request,&status); } } if (monitor==2) fprintf(unit2,"MPI_Alltoallv complete\n"); return ierror; } int MPI_Reduce_scatter(void* sendbuf, void* recvbuf, int *recvcounts, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) { /* applies a reduction operation to the vector sendbuf over the set of processes specified by comm and scatters the result according to the values in recvcounts sendbuf = starting address of send buffer recvbuf = starting address of receive buffer recvcounts = integer array datatype = datatype of elements in input buffer op = reduce operation (only max, min and sum currently supported) comm = communicator input: sendbuf, recvcounts, datatype, op, comm output: recvbuf local data */ int ierror, np, rank, root = 0, count, lsize, ltmp, i; int *displs; void *tmpbuf; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id mapcomm = communicator map */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ ierror = 0; /* invalid comm */ if ((comm < 0) || (comm >= MAXC)) ierror = 2; /* communicator errors */ else { np = mapcomm[comm][MAXS]; /* communicator not mapped */ if ((np <= 0) || (np > nproc)) ierror = 2; /* get rank */ else { rank = mapcomm[comm][MAXS+1]; if ((rank >= 0) && (rank < np)) { if (mapcomm[comm][rank] != idproc) ierror = 29; } else ierror = 29; } } /* handle errors */ if (ierror) { writerrs("MPI_Reduce_scatter: ",ierror); return ierror; } ltmp = 4*np; /* allocate a nonrelocatable block of memory */ displs = (int *)NewPtr(ltmp); /* memory not available */ if (!displs) { ierror = 21; writerrs("MPI_Reduce_scatter: ",ierror); return ierror; } count = 0; /* determines overall count and displacements */ for (i = 0; i < np; i++) { displs[i] = count; count = count + recvcounts[i]; } /* determine size of temporary buffer */ if ((datatype==MPI_INT) || (datatype==MPI_FLOAT)) lsize = 4; else if (datatype==MPI_DOUBLE) lsize = 8; else if ((datatype==MPI_2INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 8; else if ((datatype==MPI_FLOAT_INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 8; else if ((datatype==MPI_DOUBLE_INT) && ((op==MPI_MAXLOC) || (op==MPI_MINLOC))) lsize = 16; /* invalid datatype */ else { ierror = 7; writerrs("MPI_Reduce_scatter: ",ierror); return ierror; } if (monitor==2) fprintf(unit2,"MPI_Reduce_scatter started\n"); ltmp = lsize*count; /* allocate a nonrelocatable block of memory */ tmpbuf = NewPtr(ltmp); /* memory not available */ if (!tmpbuf) { ierror = 21; writerrs("MPI_Reduce_scatter: ",ierror); return ierror; } ierror = MPI_Reduce(sendbuf,tmpbuf,count,datatype,op,root,comm); ierror = MPI_Scatterv(tmpbuf,recvcounts,displs,datatype,recvbuf, recvcounts[rank],datatype,root,comm); /* release nonrelocatable memory block */ DisposePtr((char *)tmpbuf); /* release nonrelocatable memory block */ DisposePtr((char *)displs); if (monitor==2) fprintf(unit2,"MPI_Reduce_scatter complete\n"); return ierror; } int MPI_Abort(MPI_Comm comm, int errorcode) { /* force all tasks on an MPI environment to terminate comm = communicator errorcode = error code to return to invoking environment input: comm, errorcode local data */ int ierror; /* internal mpi common block nproc = number of real or virtual processors obtained */ /* MPI not initialized */ if (nproc <= 0) ierror = 1; /* this is just a temporary patch, have not yet notified everyone else */ else ierror = MPI_Finalize(); return ierror; } double MPI_Wtime(void) { /* return an elapsed time on the calling processor in seconds */ const double tick=1.0/1000.0; /* internal mpi common block stime = first time stamp if MPI_Init successful */ /* calculate time elapsed in milliseconds */ return (double) (OTElapsedMilliseconds(&stime))*tick; } double MPI_Wtick(void) { /* return the resolution of MPI_Wtime in seconds */ const double tick=1.0/1000.0; return tick; } int MPI_Type_extent(MPI_Datatype datatype, MPI_Aint *extent) { /* returns the size of a datatype datatype = datatype extent = datatype extent input: dataype output: extent local data */ int ierror; ierror = 0; /* find size of datatype */ if ((datatype==MPI_INT) || (datatype==MPI_FLOAT)) *extent = 4; else if (datatype==MPI_DOUBLE) *extent = 8; else if (datatype==MPI_BYTE) *extent = 1; else if ((datatype==MPI_2INT) || (datatype==MPI_FLOAT_INT)) *extent = 8; else if (datatype==MPI_DOUBLE_INT) *extent = 16; /* invalid datatype */ else { ierror = 7; fprintf(unit2,"MPI_Type_extent: Invalid datatype\n"); } return ierror; } int MPI_Get_processor_name(char *name,int *resultlen) { /* returns the name of the processor on which it was called name = a unique specifier for the current physical node resultlen = length of the result returned in name input: none output: name, resultlen local data */ int ierror, nv; InetInterfaceInfo info; long oss; char myself[18], ename[8]; /* declare internal mpi common block idproc = processor id */ ierror = 0; /* Convert processor id to printable integer */ sprintf(ename,"%d",idproc); /* Obtain information about the Internet environment */ oss = OTInetGetInterfaceInfo(&info,kDefaultInetInterface); if (oss) { ierror = oss; fprintf(unit2,"MPI_Get_processor_name: GetInterfaceInfo Error"); return ierror; } OTInetHostToString(info.fAddress,myself); strcat(myself,":"); strcat(myself,ename); nv = imin(strlen(myself),MPI_MAX_PROCESSOR_NAME); strncpy(name,myself,nv+1); *resultlen = nv; return ierror; } void writerrs(char *source, int ierror) { /* this subroutine writes out error descriptions from error codes source = source subroutine of error message ierror = error indicator input: source, ierror local data */ int ierr, fatal=1; /* check error code and print corresponding message */ if (ierror==1) fprintf(unit2,"%s MPI not initialized\n",source); else if (ierror==2) fprintf(unit2,"%s Invalid Communicator\n",source); else if (ierror==3) fprintf(unit2,"%s Invalid count\n",source); else if (ierror==4) fprintf(unit2,"%s Invalid destination\n",source); else if (ierror==5) fprintf(unit2,"%s Invalid source\n",source); else if (ierror==6) fprintf(unit2,"%s Invalid tag\n",source); else if (ierror==7) fprintf(unit2,"%s Invalid datatype\n",source); else if (ierror==12) fprintf(unit2,"%s Incomplete read\n",source); else if (ierror==16) fprintf(unit2,"%s Invalid request handle\n",source); else if (ierror==18) fprintf(unit2,"%s Mismatched dataype\n",source); else if (ierror==19) fprintf(unit2,"%s Invalid root\n",source); else if (ierror==20) fprintf(unit2,"%s Invalid operation\n",source); else if (ierror==21) fprintf(unit2,"%s Unable to allocate memory\n",source); else if (ierror==29) fprintf(unit2,"%s Invalid rank or communicator\n",source); else if (ierror==(-1)) fprintf(unit2,"%s Orderly disconnect request received\n",source); else if (ierror==(-2)) fprintf(unit2,"%s Disorderly disconnect request received\n",source); else if (ierror==(-3)) fprintf(unit2,"%s Unexpected flag returned on OTRcv\n",source); /* unlisted error code */ else fprintf(unit2,"%s Error code = %d\n",source,ierror); /* abort if error is fatal */ if (fatal) { fflush(unit2); ierr = MPI_Abort(MPI_COMM_WORLD,ierror); exit(0); } } void rwstat(int request, FILE *unit) { /* this subroutine writes out the status of a read-write record request = request handle unit = output file pointer input: request, unit */ /* internal common block for non-blocking messages rwrec = read/write record for asynchronous messages */ /* invalid request handle */ if ((request < 0) || (request >= MAXM)) return; fprintf(unit,"\n"); if (curreq[request][0]==(-1)) fprintf(unit," transmission mode = send\n"); else if (curreq[request][0]==1) fprintf(unit," transmission mode = receive\n"); fprintf(unit," destination/source = %d\n",curreq[request][1]); fprintf(unit," communicator = %d\n",curreq[request][2]); fprintf(unit," tag = %d\n",curreq[request][3]); fprintf(unit," datatype = %d\n",curreq[request][4]); fprintf(unit," request handle = %d\n",request); fprintf(unit," Endpoint reference pointer = %p\n",rwrec[request].ref); fprintf(unit," iocompletion flag = %d\n",rwrec[request].ioflag); fprintf(unit," current buffer pointer = %p\n",rwrec[request].buf); fprintf(unit," current buffer length = %d\n",rwrec[request].nbytes); fprintf(unit," T_MORE flag = %d\n",rwrec[request].flags); fprintf(unit," data pointer = %p\n",rwrec[request].sbuf); fprintf(unit," data length = %d\n",rwrec[request].sln); fprintf(unit," actual length sent/received = %d\n",rwrec[request].len); fprintf(unit," first time stamp = %d,%d\n",rwrec[request].ts[0].hi, rwrec[request].ts[0].lo); fprintf(unit," second time stamp = %d,%d\n",rwrec[request].ts[1].hi, rwrec[request].ts[1].lo); fprintf(unit," next message pointer = %d\n",rwrec[request].nextm); fprintf(unit," non-fatal error code = %d\n",rwrec[request].nfatal); fprintf(unit,"\n"); return; } void wqueue(FILE *unit) { /* this subroutine writes queue of pending messages unit = output file pointer input: iunit local data */ int i, is = 0; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id ioc = array of context pointers for notifier function */ /* internal common block for non-blocking messages mqueue = message request queue */ for (i = 0; i < nproc; i++) { if (ioc[i][2]) { fprintf(unit,"Incomplete receive: node, handle =%d,%d\n", i,ioc[i][2]); is += 1; } if (mqueue[i][0]) { fprintf(unit,"Non-empty receive queue end: node, handle =%d,%d\n", i,mqueue[i][0]); is += 1; } if (ioc[i][3]) { fprintf(unit,"Incomplete send: node, handle =%d,%d\n", i,ioc[i][3]); is += 1; } if (mqueue[i][1]) { fprintf(unit,"Non-empty send queue end: node, handle =%d,%d\n", i,mqueue[i][1]); is += 1; } } i = MAXS; if (ioc[i][3]) { fprintf(unit,"Incomplete selfsend: node, handle =%d,%d\n", idproc,ioc[i][3]); is += 1; } if (mqueue[i][1]) { fprintf(unit,"Non-empty selfsend queue end: node, handle =%d,%d\n", idproc,mqueue[i][1]); is += 1; } if (is > 0) fprintf(unit,"\n"); return; } void messwin(int nvp) { /* this subroutine creates a window for showing MPI message status nvp = number of real or virtual processors input argument: nvp local data */ Rect wrect, trect; short iw, is, ix, iy, it; GDevice** handle; GrafPtr wptr; int n, i; char name[37]; /* internal mpi common block nproc = number of real or virtual processors obtained */ /* common block for message window cpptr = pointer to window structure crect = current drag region nsp = amount of space between boxes nbx = size of box nds = number of message sizes monitored mbs = maximum maximum bandwidth in MB/sec expected */ /* get handle to main graphics device that carries a menu bar */ handle = GetMainDevice(); /* get size of screen */ crect.bottom = (*handle)->gdRect.bottom; crect.right = (*handle)->gdRect.right; wrect.bottom = crect.bottom - 40; wrect.right = crect.right - 12; /* find which grafPort is currently active */ GetPort(&wptr); /* calculate size of window */ n = imin(nvp,nproc); wrect.top = wrect.bottom - 2*(nbx + nsp) - nbx; wrect.left = wrect.right - ((nbx + nsp)*imax(n,8) + nsp); /* add more space for distribution function */ wrect.top -= (6*nbx + nsp); /* add more space for speedometer */ wrect.top -= (4*nbx + nsp); /* add more space for user-defined label */ wrect.top -= nbx; /* name = label for window */ strcpy(name," MacMPI"); name[0] = 6; /* create a window */ if (!cpptr) cpptr = NewCWindow(NULL,&wrect,(unsigned char *)name, 1,4,(WindowPtr)(-1),0,0); /* activate a GrafPort */ SetPort(GetWindowPort(cpptr)); /* keep a rectangular area from being updated */ GetPortBounds(GetWindowPort(cpptr),&trect); ValidWindowRect(cpptr,&trect); /* calculate clipping region */ wrect.bottom -= wrect.top; wrect.right -= wrect.left; wrect.top = 0; wrect.left = 0; /* select color to use in foreground drawing to black */ BackColor(33); /* fill rectangle with background pattern */ EraseRect(&wrect); /* calculate drag region */ crect.top = 4; crect.left = 4; crect.bottom -= 4; crect.right -= 4; /* select color to use in foreground drawing to white */ ForeColor(30); /* set dimensions of pen for current Grafport */ PenSize(1,1); /* set point size for subsequent text drawing to 12 points */ TextSize(12); /* set initial x and y coordinates for label */ iw = nbx + nsp; ix = nsp; iy = iw + nbx; /* write label for processor id */ for (i = 0; i < n; i++) { /* convert node number to string */ sprintf(name,"%2d",i); /* get width of unformatted text */ is = TextWidth(name,0,2); /* set pen location without drawing */ MoveTo(ix+(nbx-is)/2,iy); /* draw text from any arbitrary buffer */ DrawText(name,0,2); /* update x coordinate */ ix += iw; } /* write label for message state display */ strcpy(name,"Send=Green,Receive=Red,Both=Yellow"); /* set point size for subsequent text drawing to 10 points */ TextSize(10); ix = nsp; iy = nsp + 3*nbx; /* set pen location without drawing */ MoveTo(ix,iy); /* select color to use in foreground drawing to green */ ForeColor(341); /* draw text from any arbitrary buffer */ DrawText(name,0,11); /* get width of unformatted text */ is = TextWidth(name,0,11); ix += is; /* set pen location without drawing */ MoveTo(ix,iy); /* select color to use in foreground drawing to red */ ForeColor(205); /* draw text from any arbitrary buffer */ DrawText(name,11,12); /* get width of unformatted text */ is = TextWidth(name,11,12); ix += is; /* set pen location without drawing */ MoveTo(ix,iy); /* select color to use in foreground drawing to yellow */ ForeColor(69); /* draw text from any arbitrary buffer */ DrawText(name,23,11); /* select color to use in foreground drawing to white */ ForeColor(30); /* set point size for subsequent text drawing to 9 points */ TextSize(9); /* set initial x and y coordinates for label */ it = nbx/4; iw = nbx/2 + nsp; ix = nsp; iy = 8*nbx + 2*nsp; /* write label for message size, odd numbers only */ for (i = 0; i < nds; i+=2) { /* convert node number to string */ sprintf(name,"%2d",i); /* get width of unformatted text */ is = TextWidth(name,0,2); /* set pen location without drawing */ MoveTo(ix+(it-is)/2,iy); /* draw text from any arbitrary buffer */ DrawText(name,0,2); /* underline location of half maximum */ if (i==12) { MoveTo(ix+(it-is)/2,iy+3); DrawText("__",0,2); } /* update x coordinate */ ix += iw; } /* write second label */ strcpy(name,"Log2 Number vs. Log2 Message Size"); /* set point size for subsequent text drawing to 10 points */ TextSize(10); /* set pen location without drawing */ MoveTo(nsp,9*nbx+2*nsp); /* draw text from any arbitrary buffer */ DrawText(name,0,33); /* set rectangle for speedometer */ wrect.top = 9*nbx + 3*nsp; wrect.left = 2*nsp; wrect.bottom = wrect.top + 4*nbx + 6; iy = 11*nbx + 3*nsp + 2; /* set point size for subsequent text drawing to 9 points */ TextSize(9); for (i = 0; i < 2; i++) { if (i==1) wrect.left += 4*(nbx + nsp) + (nsp - 2); wrect.right = wrect.left + 4*nbx + 1; /* set dimensions of pen for current Grafport */ PenSize(2,2); /* fill a wedge with current pen pattern and mode */ PaintArc(&wrect,-90,180); if (i==0) { /* select color to use in foreground drawing to cyan */ ForeColor(273); /* write third label */ strcpy(name,"Communication % "); } else if (i==1) { /* select color to use in foreground drawing to red */ ForeColor(205); /* write third label */ strcpy(name,"Receiving (MB/s)"); } /* draw an arc */ FrameArc(&wrect,-90,180); /* set pen location without drawing */ MoveTo(wrect.left,iy+1); /* draw a line to specified coordinates */ LineTo(wrect.right-2,iy+1); /* set dimensions of pen for current Grafport */ PenSize(1,1); /* select color to use in foreground drawing to white */ ForeColor(30); /* set pen location without drawing */ MoveTo(wrect.left-nsp+2,iy+nbx); /* draw text from any arbitrary buffer */ DrawText(name,0,16); /* set pen location without drawing */ MoveTo(wrect.left-nsp,iy); /* draw text from any arbitrary buffer */ DrawText("0",0,1); /* set pen location without drawing */ MoveTo(wrect.right+2,iy); /* convert node number to string */ if (i==0) { sprintf(name,"%3d",100); /* draw text from any arbitrary buffer */ DrawText(name,0,3); } else { sprintf(name,"%2d",mbs); /* draw text from any arbitrary buffer */ DrawText(name,0,2); } } /* write third label */ strcpy(name," Current and Average Message-Passing"); /* set point size for subsequent text drawing to 10 points */ TextSize(10); /* set pen location without drawing */ MoveTo(nsp,13*nbx+3*nsp); /* draw text from any arbitrary buffer */ DrawText(name,0,36); /* activate the GrafPort that was originally active */ if (wptr) SetPort(wptr); /* display status */ logmess(0,0,0,0,0); return; } void logmess(int idp, int lstat, int lsize, int mticks, int tag) { /* this subroutine logs MPI message state change and displays status idp = remote processor id lstat = (-1,1,-2,2) = (clear send,add send,clear receive,add receive) lstat = 0 means display current status for all processors lsize = size of message (in bytes) lsize = -1 means print out message size distribution function mticks = wait time in microseconds tag = message tag input argument: idp, lstat, lsize, mticks, tag local data */ #define NDSIZE 24 int istat, istyle, i, i1, i2; float ar, cr, at, ct; double dt; /* mstat = number of outstanding sends and receives for each node msize = message size distribution function nmax = maximum number of messages in display lmax = log2 of nmax ssize/times = total bytes/time sent rsize/timer = total bytes/time received ptime = time since last short term average */ static int mstat[MAXS][2] = {{0,0}}, msize[2*NDSIZE] = {0}; static int lmax = 8, nmax = 256, nums = 0, numr = 0; static double ssize[2] = {0.0,0.0}, times[2] = {0.0,0.0}; static double rsize[2] = {0.0,0.0}, timer[2] = {0.0,0.0}; static double ptime = 0.0; /* internal mpi common block nproc = number of real or virtual processors obtained idproc = processor id */ /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ /* check for errors */ if ((idp < 0) || (idp >= nproc)) return; /* print out message size distribution function */ if (lsize < 0) { fprintf(unit2," Message Size Distribution Function\n"); i1 = 1; for (i = 0; i < NDSIZE; i++) { i2 = i + NDSIZE; fprintf(unit2," Size(bytes) = %d Sends = %d Receives = %d\n", i1,msize[i2],msize[i]-msize[i2]); i1 = 2*i1; } dt = 1.0/MPI_Wtime(); i1 = 100.0*(float) (times[0]*dt) + .5; ar = (float) (ssize[0]/times[0]); fprintf(unit2," Sending Time = %d %% Speed = %f MB/s\n",i1,ar); i1 = 100.0*(float) (timer[0]*dt) + .5; ar = (float) (rsize[0]/timer[0]); fprintf(unit2," Receiving Time = %d %% Speed = %f MB/s\n",i1,ar); return; } /* process message size and time data */ else if ((lstat < 0) && (lsize > 0)) { /* accumulate sending data */ if (lstat==(-1)) { dt = 1.0e-6*(double) lsize; ssize[0] += dt; ssize[1] += dt; dt = 1.0e-6*(double) mticks; times[0] += dt; times[1] += dt; /* calculate short term average */ nums += 1; if (nums==4) { nums = 0; ssize[1] = 0.0; times[1] = 0.0; } } /* accumulate receiving data */ else if (lstat==(-2)) { dt = 1.0e-6*(double) lsize; rsize[0] += dt; rsize[1] += dt; dt = 1.0e-6*(double) mticks; timer[0] += dt; timer[1] += dt; /* calculate short term average */ numr += 1; if (numr==4) { dt = MPI_Wtime(); ct = (float) ((int) (100.0*(float) ((times[0]+timer[0])/dt) + .5)); dt = dt - ptime; at = (float) ((int) (100.0*(float) ((times[1]+timer[1])/dt) + .5)); ptime = MPI_Wtime(); cr = (float) (rsize[0]/timer[0]); ar = (float) (rsize[1]/timer[1]); shospeed(at,ct,ar,cr); numr = 0; rsize[1] = 0.0; timer[1] = 0.0; } } /* increment message size distribution function */ i1 = imin((int) (log((float) (lsize))/log(2.) + .5),NDSIZE-1); msize[i1] += 1; /* increment message size distribution function for sends */ i2 = i1 + NDSIZE; if (lstat==(-1)) msize[i2] += 1; if (msize[i1] > nmax) { /* erase display of distribution function */ showdism(0,nmax,0,lmax,0); lmax += 1; nmax += nmax; /* redisplay distribution function */ for (i = 0; i < NDSIZE; i++) { showdism(i,msize[i],msize[i+NDSIZE],lmax,1); } } /* display distribution function */ else showdism(i1,msize[i1],msize[i2],lmax,1); } i1 = idp; /* calculate all current statuses */ if (lstat==0) { for (i = 0; i < nproc; i++) { istat = 0; if (mstat[i][0] >= 1) istat += 1; if (mstat[i][1] >= 1) istat += 2; /* differentiate single from multiple sends/receives */ /* if ((mstat[i][0] > 1) || (mstat[i][1] > 1)) istat += 3; */ /* display status, outline local node */ if (i==idproc) istyle = 1; else istyle = 0; showmess(i,istat,istyle); } /* display scale */ showdism(0,msize[0],msize[NDSIZE],lmax,0); /* display current distribution function */ for (i = 0; i < NDSIZE; i++) { showdism(i,msize[i],msize[i+NDSIZE],lmax,1); } return; } /* add state change to log */ else if (lstat==1) { mstat[i1][0] += 1; if (monitor==2) fprintf(unit2,"send posted: destination= %d size= %d tag= %d\n", idp,lsize,tag); } else if (lstat==(-1)) { mstat[i1][0] -= 1; if (monitor==2) fprintf(unit2,"sent: destination= %d size= %d time= %d tag= %d\n", idp,lsize,mticks,tag); } else if (lstat==2) { mstat[i1][1] += 1; if (monitor==2) fprintf(unit2,"receive posted: source= %d size= %d tag= %d\n", idp,lsize,tag); } else if (lstat==(-2)) { mstat[i1][1] -= 1; if (monitor==2) fprintf(unit2,"received: source= %d size= %d time= %d tag= %d\n", idp,lsize,mticks,tag); } /* calculate current status */ istat = 0; if (mstat[i1][0] >= 1) istat += 1; if (mstat[i1][1] >= 1) istat += 2; /* differentiate single from multiple sends/receives */ /* if ((mstat[i1][0] > 1) || (mstat[i1][1] > 1)) istat += 3; */ /* display status, outline local node */ if (idp==idproc) istyle = 1; else istyle = 0; showmess(idp,istat,istyle); if (cpptr) /* flush window buffer to screen */ QDFlushPortBuffer(GetWindowPort(cpptr),0); return; #undef NDSIZE } void showmess(int idp, int istat, int istyle) { /* this subroutine shows MPI message status idp = remote processor id istat = message status = (0,1,2,3) = (none,sending,receiving,both) istyle = (0,1) = (no,yes) outline rectangle input argument: idp, istat, istyle local data */ Rect wrect; GrafPtr wptr; /* icolor = white, green, red, yellow, blue, magenta, cyan, black */ const int icolor[8] = {30,341,205,69,409,137,273,33}; /* internal common block for message window cpptr = pointer to window structure nsp = amount of space between boxes nbx = size of box */ /* check for errors */ if ((istat < 0) || (istat > 7)) return; if (!cpptr) return; /* find which grafPort is currently active */ GetPort(&wptr); /* activate a GrafPort */ SetPort(GetWindowPort(cpptr)); /* set rectangle */ wrect.top = nsp; wrect.left = (nbx + nsp)*idp + nsp; wrect.bottom = wrect.top + nbx; wrect.right = wrect.left + nbx; /* select color to use in foreground drawing */ ForeColor(icolor[istat]); /* draw the outline of a rectangle */ FrameRect(&wrect); /* fill rectangle with current pen pattern and mode */ PaintRect(&wrect); /* outline rectangle */ if (istyle) { /* shrink or expand a rectangle */ InsetRect(&wrect,-2,-2); /* draw the outline of a rectangle */ FrameRect(&wrect); } /* activate the GrafPort that was originally active */ if (wptr) SetPort(wptr); return; } void showdism(int ibin,int nbin,int mbin,int lmax,int istyle) { /* this subroutine shows distribution of MPI messages ibin = bin number for distribution function nbin = number of total messages in ibin mbin = number of send messages in ibin lmax = log2 of maximum number of messages in display istyle = (0,1) = (erase and rescale,draw) display input argument: ibin, nbin, nmax local data */ Rect wrect; short iw, nscale, nsub; GrafPtr wptr; float scale; char name[9]; /* internal common block for message window cpptr = pointer to window structure nsp = amount of space between boxes nbx = size of box nds = number of message sizes monitored */ /* check for errors */ if (!cpptr) return; /* find which grafPort is currently active */ GetPort(&wptr); /* activate a GrafPort */ SetPort(GetWindowPort(cpptr)); /* set rectangle for entire distribution */ iw = nbx/4; wrect.bottom = 7*nbx + 2*nsp; /* erase and rescale display */ if (!istyle) { /* select color to use in foreground drawing to black */ ForeColor(33); wrect.top = wrect.bottom - 4*nbx; wrect.right = (iw + nsp/2)*nds + nsp/2; wrect.left = 0; /* fill rectangle with background pattern */ EraseRect(&wrect); /* select color to use in foreground drawing to white */ ForeColor(30); /* set point size for subsequent text drawing to 9 points */ TextSize(9); /* convert node number to string */ sprintf(name,"%2d",lmax); /* set pen location without drawing */ MoveTo(0,wrect.top+9); /* draw text from any arbitrary buffer */ DrawText(name,0,2); } /* draw display */ else { /* calculate size of total image */ scale = (float) (4*nbx)/(((float) (lmax))*log(2.)); nscale = imin((int) (log((float) (nbin+1))*scale),4*nbx); if (!nbin) goto L10; /* set width for individual bin */ wrect.right = (iw + nsp/2)*ibin + iw + nsp; wrect.left = wrect.right - iw; /* calculate size of image for sends */ nsub = (((float) (mbin))/((float) (nbin)))*((float) (nscale)) + .5; /* set height of individual bin for sends */ wrect.top = wrect.bottom - nsub; /* select color to use in foreground drawing to green */ ForeColor(341); /* draw the outline of a rectangle */ FrameRect(&wrect); /* fill rectangle with current pen pattern and mode */ PaintRect(&wrect); /* set height of individual bin for receives */ wrect.top = wrect.bottom - nscale; wrect.bottom = wrect.bottom - nsub; /* select color to use in foreground drawing to red */ ForeColor(205); /* draw the outline of a rectangle */ FrameRect(&wrect); /* fill rectangle with current pen pattern and mode */ PaintRect(&wrect); } /* activate the GrafPort that was originally active */ L10: if (wptr) SetPort(wptr); return; } void shospeed(float atime,float ctime,float arate,float crate) { /* this subroutine shows communication rates for MPI messages atime = short term average communication time (% of total) ctime = long term average communication time (% of total) arate = short term average reception rate (MB/sec) crate = long term average reception rate (MB/sec) input argument: atime, ctime, arate, crate local data */ float scale, angle, dx; const float pi=.5*6.28318530717959; short ix0, iy0; static short ix[4] = {0,0,0,0}, iy[4] = {0,0,0,0}; GrafPtr wptr; int i; /* internal common block for message window cpptr = pointer to window structure nsp = amount of space between boxes nbx = size of box mbs = maximum maximum bandwidth in MB/sec expected */ /* check for errors */ if (!cpptr) return; /* find which grafPort is currently active */ GetPort(&wptr); /* activate a GrafPort */ SetPort(GetWindowPort(cpptr)); /* set dimensions of pen for current Grafport */ PenSize(2,2); /* find center of speedometer */ iy0 = 11*nbx + 3*nsp + 1; /* parameters for communication plot */ ix0 = 2*(nbx + nsp) + 1; scale = .01; /* plot speedometer lines */ for (i = 0; i < 4; i++) { /* parameters for reception plot */ if (i==2) { ix0 += 4*(nbx + nsp) + (nsp - 2); scale = 1./(float) mbs; } /* erase old speedometer line */ /* select color to use in foreground drawing to white */ ForeColor(30); /* set pen location without drawing */ MoveTo(ix0,iy0); /* draw a line a specified distance */ Line(ix[i],-iy[i]); /* draw new speedometer line */ if (i==0) { /* select color to use in foreground drawing to cyan */ ForeColor(273); angle = flmax(0.,(1. - atime*scale))*pi; } else if (i==1) { /* select color to use in foreground drawing to black */ ForeColor(33); angle = flmax(0.,(1. - ctime*scale))*pi; } else if (i==2) { /* select color to use in foreground drawing to red */ ForeColor(205); angle = flmax(0.,(1. - arate*scale))*pi; } else if (i==3) { /* select color to use in foreground drawing to black */ ForeColor(33); angle = flmax(0.,(1. - crate*scale))*pi; } dx = (float) (2*nbx-3); ix[i] = dx*cos(angle) + .5; iy[i] = dx*sin(angle) + .5; /* set pen location without drawing */ MoveTo(ix0,iy0); /* draw a line a specified distance */ Line(ix[i],-iy[i]); } /* set dimensions of pen for current Grafport */ PenSize(1,1); /* activate the GrafPort that was originally active */ if (wptr) SetPort(wptr); return; } void Logname(char* name) { /* this subroutine records and displays user-defined label name = label to display local data */ Rect wrect; short nl; GrafPtr wptr; /* internal common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ /* internal common block for message window cpptr = pointer to window structure nsp = amount of space between boxes nbx = size of box */ if (monitor==0) return; if (monitor==2) fprintf(unit2,"%s",name); /* check for errors */ if (!cpptr) return; /* find which grafPort is currently active */ GetPort(&wptr); /* activate a GrafPort */ SetPort(GetWindowPort(cpptr)); /* set rectangle */ wrect.top = 13*nbx + 3*nsp + 2; wrect.left = nsp; wrect.bottom = wrect.top + nbx; wrect.right = wrect.left + 8*(nbx + nsp); /* select color to use in foreground drawing to black */ BackColor(33); /* fill rectangle with background pattern */ EraseRect(&wrect); /* select color to use in foreground drawing to white */ ForeColor(30); /* set pen location without drawing */ MoveTo(wrect.left,wrect.bottom-2); nl = imin(36,strlen(name)); /* draw text from any arbitrary buffer */ DrawText(name,0,nl); /* activate the GrafPort that was originally active */ if (wptr) SetPort(wptr); return; } void Set_Mon(int monval) { /* this subroutine sets new monitor value and corresponding window monval = new monitor value */ /* declare internal mpi common block nproc = number of real or virtual processors obtained */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ /* create or destroy window if MPI has been initialized */ if (nproc > 0) { /* open window */ if ((monitor==0) && (monval > 0)) messwin(nproc); /* close window */ if ((monitor > 0) && (monval < 1)) delmess(); } /* reset monitor value */ if (monval > 1) monitor = 2; else if (monval < 1) monitor = 0; else monitor = 1; return; } int Get_Mon() { /* this function gets current monitor value */ /* declare common block for non-blocking messages monitor = (0,1,2) = (suppress,display,display & log) monitor messages */ return monitor; } void delmess() { /* this subroutine closes a window for showing MPI message status /* internal common block for message window cpptr = pointer to window structure */ /* remove window from screen; keep WindowRecord */ if (cpptr) DisposeWindow(cpptr); cpptr = 0; return; }