_SIMULATING HYPERCUBES IN UNIX_ by Jeffery W. Hamilton and Eileen M. Ormsby [LISTING ONE] /***** cube.h *****/ /* Hypercube Simulation definitions */ #define NUMBER_IN_PART 4 /* number of nodes in partition */ #define PM_PORT 6000 /* Maximum message sent between nodes */ #define MAX_MESSAGE_SIZE (1024 * 16) typedef struct { char *name; /* network name of the computer hosting partition */ int socket; /* file descriptor for the socket */ int errfdp; /* file descriptor for sending "kill" values */ struct sockaddr_in addr; } subpart; typedef struct { int type; /* message type sent with the message -1 or greater */ int spid; /* sender's group number (pid) */ int snode; /* sender's node number */ int dnode; /* node this message is destined for */ int t_length; /* total length of message */ int length; /* length of the message */ char valid[NUMBER_IN_PART+2]; /* 0= no message */ char msg[MAX_MESSAGE_SIZE]; /* Actual message contents */ [LISTING TWO] /***** pm.c *****/ /* PARTITION MANAGER -- This program will run on all partitions used for an ** application. It is started via a remote execution call from "load". ** The main program gets the input arguments, sets a few variables and calls ** the Partition Manager subroutine which performs the following functions: ** determines local partition information; allocates neccessary partition ** structures; sets up interrupt handling to free system resources when the ** application is terminated; sets up server portion of socket communications; ** forks a client PM that sets up client portion of the socket communications, ** waits for a node to request data to be sent to a partition, and sends data ** over the sockets; forks and execs application children; performs server PM ** functions that waits to receive data from the sockets and notifies ** appropriate nodes when data has arrived. ** The PM server only receives data for its nodes, and the PM client sends ** data to a remote partition. 
** BASIC SOFTWARE ARCHITECTURE: The load module in the simcube library will: ** 1) Read the .pmrc file; 2) Determine how many partitions will be used ** for this application; 3) Fork and exec a local PM and the appropriate ** number of remote PMs PM is passed the name of the application program, ** its partition number, the key value for PM to communicate to the host ** process with the group number, the total number of application nodes, ** the names of the other partitions running this application. ** The initialization portion (init_simulator) which is called by the ** application processes (nodes) will set up interrupt handling and create ** the shared memory and semaphores necessary for communications ** between local nodes and the Partition Manager. */ /* These functions allow a UNIX system to simulate a hypercube environment. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "cube.h" #define NUM_TRIES 60 #define min(x,y) (((x) < (y)) ? 
(x) : (y)) /* Function Prototypes */ void *malloc(int size); void *shmat(int, void*, int); int pm (char *filename, char *pmsites[]); void setup_server_sockets(void); void pm_server(void); void setup_client_sockets(void); void pm_getmsg(void); void abort_prog(void); void sig_terminate(int sig, int code, struct sigcontext *scp); void unexpected_death(int sig, int code, struct sigcontext *scp); int _killcube(int node, int pid); int init_shared_mem(void **pointer, int size, int key); int init_semaphore(int *semid, int size, int value, int key); int semcall_all(int semid, int size, int operation); int semcall_one(int semid, int num, int operation); int numnodes(void); int numparts(void); int mypart(void); int partof(int node); int pm_partof(int node); int numbuffers(void); int mybuffer(void); int bufferof(int node); int mynode(void); int myhost(void); void pm_client(void); /* Local, Private Information */ fd_set node_part_set, temp_set; /* node_part_set is the variable that FD_XXX commands are */ /* applied to. 
Definitions of fd_set structure, and FD_ZERO, */ /* FD_SET, FD_CLR, and FD_ISSET macros are in */ /* node_part_set will have socket file descriptors.*/ static int num_parts; /* number of partitions */ static int my_part; /* partition this process is in */ static int nodes_in_part; /* number of nodes in this partition */ static subpart *partition; /* list of partition information */ static int base; /* base key value for allocating shared data */ static int my_node; /* node number for this process */ static int my_group; /* group id for this process */ /* There are two groups, host communications */ /* and inter-node communications */ static int num_nodes; /* total number of nodes in all partitions */ static int msgavail = -1; /* semaphores indicating message is available */ static int msgfree = -1; /* semaphores indicating buffer is free */ static int next_message = -1; /* which message is to be received next */ static int shmid_m = -1; /* id of shared area for messages */ static message *buffer = NULL;/* communication areas */ static int *children = NULL; /* process ids of all child processes */ static int child_index = 0; /* number of children created */ static int pmserver_pid = 0; /* pid of pmserver */ /* Main: reads arguments from command line, places them in local variables ** and calls pm. (Local variables are not necessary, but enhances readability) ** NOTE: ONLY TEN NODES (PM SITES) ARE READ FROM THE COMMAND LINE */ int main(int argc, char *argv[]) { char *filename; char *pmsites[16]; int i; if (argc < 7 ) { fprintf (stderr, "PM main: error not enough arguments\n"); fflush(stderr); exit(-1); } filename = argv[1]; my_part = atoi(argv[2]); base = atoi(argv[3]); my_group = atoi(argv[4]); num_nodes = atoi(argv[5]); for (i = 0; i < argc - 6; i++) { pmsites[i] = argv[i + 6]; } pm (filename, pmsites); } /* pm -- Determines partition information, sets up signal handling, sets up ** server sockets, forks client pm, forks application children. 
PM splits the
** application into NUMBER_IN_PART processes. The partition number is passed
** as an input parameter. The starting node number is the partition number
** times NUMBER_IN_PART and remaining processes will be numbered consecutively.
** Shared memory will be allocated to serve as a communications vehicle within
** a partition. Sockets used between partitions to allow multiple UNIX systems
** to be combined to create a larger set of CPUs to be applied to a problem. */
/* Parameters: filename - program exec'ed for every node of this partition;
**             pmsites  - site names indexed by partition number; entries
**                        0 .. num_parts-1 are read.
** Returns -1 on a setup failure (and in a child whose exec failed, or a PM
** CLIENT whose send loop ended). pm_server() loops forever, so the server
** does not return on success. */
int pm (char *filename, char *pmsites[])
{
    int i, pid;
    int saved_errno;
    int written;
    char temp[1024 + 64];      /* used to set up environment variables */
    char part_names[1024];     /* comma separated list of site names */
    size_t names_len;
    size_t piece;
    void *shared = NULL;       /* init_shared_mem() wants a void ** */
    int start_node;

    /* Determine how many other partitions exist */
    num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART;

    /* Determine which node is the first for this partition */
    start_node = mypart() * NUMBER_IN_PART;

    /* Determine how many nodes are in this partition (1-4) */
    nodes_in_part = numnodes() - (mypart() * NUMBER_IN_PART);
    nodes_in_part = min(NUMBER_IN_PART, nodes_in_part);

    /* Set PM's node to be the last node on this partition */
    /* (The children will be start_node through start_node + nodes_in_part-1) */
    my_node = nodes_in_part;

    /* Create the structure to hold the partition names and socket fds */
    if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) {
        fprintf(stderr,"PM %d SERVER: insufficient memory\n", mypart());
        fflush(stderr);
        return -1;
    }
    memset(partition, 0, num_parts * sizeof(subpart));

    /* Catch these signals so PM can notify children to clean up */
    signal(SIGINT,sig_terminate);
    signal(SIGTERM,sig_terminate);
    signal(SIGQUIT,sig_terminate);

    /* Watch for unexpected deaths */
    signal(SIGCHLD, unexpected_death);

    /* Create, bind, and listen on sockets */
    setup_server_sockets();

    if (mypart() != 0) {
        /* Only change the base on partitions that are not the one that includes
        ** host. That partition requires same base that host session is using. */
        base = getpid();
    }

    /* Allocate shared memory */
    shmid_m = init_shared_mem(&shared, sizeof(message) * numbuffers(), base);
    if (shared == NULL || shared == (void *) -1) {
        fprintf(stderr, "PM %d SERVER: unable to attach shared memory\n",
                mypart());
        fflush(stderr);
        return -1;
    }
    buffer = shared;
    if (mypart() != 0) {
        memset(buffer, 0, sizeof(message) * numbuffers());
    }

    /* Allocate communications semaphores */
    init_semaphore(&msgavail, numbuffers(), 0, base+10000);
    init_semaphore(&msgfree, numbuffers(), 0, base+20000);

    /* Flush stdout and stderr before doing a fork, so child doesn't inherit */
    fflush(stdout);
    fflush(stderr);

    /* Fork PM CLIENT here */
    if ((pmserver_pid = fork()) < 0) {
        /* Can't create the PM CLIENT. No children exist yet, so only the IPC
        ** objects and sockets need releasing. _killcube() must not be used
        ** here: with pmserver_pid == -1 it would call kill(-1, SIGTERM). */
        saved_errno = errno;
        abort_prog();
        pmserver_pid = 0;
        fprintf(stderr,
                "PM %d SERVER: unable to create PM CLIENT process, errno=%d\n",
                mypart(), saved_errno);
        fflush(stderr);
        return -1;
    } else if (pmserver_pid == 0) {
        /* Fill in the names of the other sites in the partition structure and
        ** close the socket file descriptors that this process just inherited. */
        for (i = 0; i < num_parts; i++) {
            if (mypart() != i) {
                partition[i].name = pmsites[i];
                close(partition[i].socket);
            }
        }

        /* CALL CLIENT SUBROUTINES */
        setup_client_sockets();
        pm_client();

        /* pm_client only comes back after a failed send */
        return -1;
    }

    /* SERVER: forks application children then calls pm_server subroutine */

    /* Read from pmsites array, create a comma delimited string for env */
    part_names[0] = '\0';
    names_len = 0;
    for (i = 0; i < num_parts; i ++) {
        piece = strlen(pmsites[i]) + 1;      /* the name plus its comma */
        if (names_len + piece >= sizeof(part_names)) {
            _killcube(0, 0);
            fprintf(stderr, "PM %d SERVER: partition name list is too long\n",
                    mypart());
            fflush(stderr);
            return -1;
        }
        strcat(part_names, pmsites[i]);
        strcat(part_names, ",");
        names_len += piece;
    }

    /* Allocate space for child pids */
    if ((children = malloc(nodes_in_part * sizeof(int))) == NULL) {
        /* The PM CLIENT is already running; take it down with us */
        _killcube(0, 0);
        fprintf(stderr,"PM %d SERVER: insufficient memory\n", mypart());
        fflush(stderr);
        return -1;
    }

    /* Load all nodes within this partition */
    for (i = start_node; (i < start_node + nodes_in_part); i++) {
        if ((pid = fork()) < 0) {
            /* Can't create all the children! */
            _killcube(0, 0);
            fprintf(stderr, "PM %d SERVER: unable to create node process %d\n",
                    mypart(), i);
            fflush(stderr);
            return -1;
        } else if (pid == 0) {
            /* I'm the child process */
            /* Start the node program */
            my_node = i;
            written = snprintf(temp, sizeof(temp), "SIM_INFO=%d,%d,%d,%d,%s",
                               base,my_node,my_group,num_nodes,part_names);
            if (written < 0 || (size_t) written >= sizeof(temp) ||
                putenv(temp) != 0) {
                fprintf(stderr,
                        "PM %d SERVER: Insufficient room to add env variable\n",
                        my_node);
                fflush(stderr);
                return -1;
            }
            execlp(filename,filename,(char *) NULL);

            /* If we get here, we had a problem */
            saved_errno = errno;    /* perror() may disturb errno */
            perror("execlp");
            fprintf(stderr,"PM %d SERVER: error execing node=%d file=%s errno=%d\n",
                    mypart(), my_node, filename, saved_errno);
            fflush(stderr);
            return -1;
        } else {
            /* I'm the parent process */
            children[child_index++] = pid;
        }
    }

    /* CALL SERVER SUBROUTINE */
    pm_server();
    return 0;
}

/* setup_server_sockets -- SERVER SOCKETS- for all partitions except ourself:
** Create a socket. Bind the socket to a unique PORT id. (If the socket was
** in use in a prior iteration, it may not have been reset yet - therefore we
** loop a fixed number of times retrying.) Put a listen on socket.
Put new
** socket file descriptor into our set */
/* Each listening socket uses port PM_PORT + (mypart() << 4) + i, where i is
** the peer partition. Any failure other than a retryable bind prints a
** message and exits with status 100. */
static void setup_server_sockets(void)
{
    int i, j;
    int bind_errno = 0;        /* errno of the most recent failed bind */
    struct sockaddr_in part_sock;

    /* Zero out the set of partition sockets */
    FD_ZERO(&node_part_set);
    FD_ZERO(&temp_set);

    for (i = 0; i < num_parts; i++) {
        /* Skip ourself */
        if (i == mypart () ) continue;

        for (j = 0; j < NUM_TRIES; j++) {
            /* Create a SERVER socket to receive data */
            if ((partition[i].socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
                fprintf(stderr,
                        "PM %d SERVER: can't open stream socket, errno=%d\n",
                        mypart(), errno);
                fflush(stderr);
                exit (100);
            }

            /* Bind SERVER socket to local addr so partitions can send to it */
            memset(&part_sock, 0, sizeof(part_sock));
            part_sock.sin_family = AF_INET;
            part_sock.sin_addr.s_addr = htonl (INADDR_ANY);

            /* Create unique SERVER socket port address, up to 16 per computer */
            part_sock.sin_port = htons (PM_PORT + (mypart() << 4) + i);

            if (bind(partition[i].socket, (struct sockaddr *) &part_sock,
                     sizeof(part_sock)) >= 0) {
                /* It worked, exit the loop */
                break;
            }

            /* Remember the reason now: close() and sleep() may change errno */
            bind_errno = errno;
            if ((bind_errno == EADDRINUSE) || (bind_errno == EINTR)) {
                /* Previous load hasn't shutdown yet, or we were interrupted. */
                close(partition[i].socket);
                sleep(2);
            } else {
                fprintf(stderr,"PM %d SERVER: can't bind local addr, errno=%d\n",
                        mypart(), bind_errno);
                fflush(stderr);
                exit(100);
            }
        }

        if (j == NUM_TRIES) {
            /* Exceeded retry limit */
            fprintf(stderr,"PM %d SERVER: can't bind local addr, errno=%d\n",
                    mypart(), bind_errno);
            fflush(stderr);
            exit(100);
        }

        /* Issue a listen for the server sockets */
        if (listen(partition[i].socket, 1) < 0) {
            fprintf(stderr,"PM %d SERVER: can't listen on %d, errno = %d\n",
                    mypart(), partition[i].socket, errno);
            fflush(stderr);
            exit(100);
        }

        /* Set the bit for the socket file descriptor */
        FD_SET(partition[i].socket, &node_part_set);
    } /* end for setting up SERVER sockets */
}

/* pm_server -- SERVER- go into a receiving loop: Copy file descriptors to a
** temporary set.
Determine how many sockets are ready to be accepted. For ** each file descriptor that is ready: Find file descriptor that is ready. ** If it is found in a partition's array of fd's then it is a base socket and ** it is "accept"ed and added to the fd set. Else it is an fd that has data to ** be received. Receive the size of the message. Loop until entire message is ** received. Clear the valid indicator bits. Inform nodes that a message has ** arrived. If a broadcast message, set everyone's valid bit, and wait until ** everyone receives it. Else verify that message belongs to a node on this ** part and set that node's valid bit, wait until it is recvd. */ static void pm_server(void) { int i, j; int accept_rdy; int newsockfd, templen; int size, count, partial; char *target; struct sockaddr_in tempaddr; /* forever, accept sockets and receive data */ for ( ; ; ) { temp_set = node_part_set; /* Determine how many sockets are ready to be accepted */ /* FD_SETSIZE is defined in to be 200 */ if ((accept_rdy = select( FD_SETSIZE, &temp_set, 0, 0, 0)) == -1) { if (errno != 4) { fprintf(stderr, "PM %d SERVER: error in select, errno = %d\n", mypart(), errno); perror( "pm select" ) ; fflush(stderr); _killcube(0,0); exit(-1); } else { /* We were interrupted, try again */ continue; } } for (i = 1; (accept_rdy != 0) && (i < FD_SETSIZE) ; i++) { /* Find the file descriptor that needs servicing */ if ( FD_ISSET( i, &temp_set)) { /* temporary modification */ /* accept_rdy--; */ accept_rdy = 0; /* Examine each partition's array of fd's to find ready one */ for (j = 0; j < num_parts; j++) { /* Skip examining our own partition */ if (j == mypart() ) continue; /* Since this matches our "base" socket, accept the socket */ if (i == partition[j].socket) { newsockfd = accept(partition[j].socket, (struct sockaddr_in *)&tempaddr, &templen); FD_SET (newsockfd, &node_part_set); /* Found "base" socket, break out of for each part loop */ break; } /* end if base socket */ } /* end for check file 
descriptors in partition's array */ /* If it wasn't a base socket, then need to receive data */ if (j != num_parts) { continue; } else /* receive the data from the socket */ { /* First receive the size of the message */ while (recv(i, &size, sizeof(size)) < 0) { if (errno != 22) { fprintf(stderr, "PM %d SERVER: recv size err, errno=%d, fd=%d\n", mypart(),errno, i); fflush(stderr); _killcube(0,0); exit(-1); } else { fprintf(stderr, "PM %d SERVER: recv size err, errno=%d, fd=%d\n", mypart(), errno, i); fflush(stderr); } } /* end while recv msg */ target = (char *) &buffer[nodes_in_part]; count = 0; /* Now receive the message, it could come in pieces */ while (count < size) { if ((partial = recv(i, target, size - count)) < 0) { fprintf(stderr, "PM %d SERVER: Error recvng msg; errno=%d\n", mypart(),errno); fflush(stderr); exit(-1); } count += partial; target += partial; } /* Make sure all valid bits are cleared */ memset(buffer[nodes_in_part].valid,0, sizeof(buffer[nodes_in_part].valid)); /* Tell the node(s) the message is there */ if (buffer[nodes_in_part].dnode == -1) { /* Broadcast the message to nodes in this partition */ for (j=0; j < nodes_in_part; j++) { buffer[nodes_in_part].valid[j] = 1; } semcall_all(msgavail,nodes_in_part, 1); /* Wait until everyone receives the message */ semcall_one(msgfree, nodes_in_part, -nodes_in_part); } else { if (mypart() != partof(buffer[nodes_in_part].dnode)) { fprintf(stderr, "PM %d SERVER: Recvd msg for node %d not this partition\n", mypart(), buffer[nodes_in_part].dnode); fflush(stderr); } else { /* Point to point to another node in same partition */ j = bufferof(buffer[nodes_in_part].dnode); buffer[nodes_in_part].valid[j] = 1; semcall_one(msgavail, j, 1); /* Wait until it is received */ semcall_one(msgfree, nodes_in_part, -1); } } /* endif broadcast message */ } /* endif receiving data from this socket */ } /* endif this socket */ } /* endfor */ } /* end forever receive messages on sockets */ } /* setup_client_sockets -- 
Setting up CLIENT sockets- for all partitions
** except ourself: Create a socket to send data. Look up address of host,
** place in the sockaddr_in structure. Determine appropriate PORT id (needs to
** match with SERVER). (If socket was in use in a prior iteration, it may not
** have been reset yet - therefore we loop a fixed number of times retrying.).
** Issue a connect for the socket */
/* A refused connection is retried up to NUM_TRIES times, 3 seconds apart; any
** other failure gives up at once. Giving up prints a message and exits with
** status 100. */
static void setup_client_sockets(void)
{
    int i, j;
    int last_errno;            /* reason for the most recent failure */
    size_t addr_len;
    struct hostent *hent;

    /* Establish socket communications with other partitions */
    for (i = 0; i < num_parts; i++) {
        /* Skip ourself */
        if (i == mypart () ) continue;

        /* Lookup host address and place in the socket address structure. */
        /* The answer does not change between retries, so look it up once. */
        if ((hent = gethostbyname(partition[i].name)) == NULL) {
            fprintf(stderr,"PM %d CLIENT: No entry for %s in /etc/hosts\n",
                    mypart(), partition[i].name);
            fflush(stderr);
            exit(100);
        }
        memset(&partition[i].addr, 0, sizeof(struct sockaddr_in));
        partition[i].addr.sin_family = AF_INET;
        addr_len = (size_t) hent->h_length;
        if (addr_len > sizeof(partition[i].addr.sin_addr)) {
            addr_len = sizeof(partition[i].addr.sin_addr);
        }
        memcpy(&partition[i].addr.sin_addr, hent->h_addr, addr_len);
        partition[i].addr.sin_port = htons(PM_PORT + (i << 4) + mypart());

        last_errno = 0;
        for (j = 0; j < NUM_TRIES; j++) {
            /* Create a CLIENT socket to send data */
            partition[i].socket = socket(AF_INET, SOCK_STREAM, 0);
            if (partition[i].socket < 0) {
                last_errno = errno;
                j = NUM_TRIES;
                break;
            }

            /* Connect to the socket */
            if (connect(partition[i].socket,
                        (struct sockaddr *) &partition[i].addr,
                        sizeof(struct sockaddr_in)) == 0) {
                /* successful connect, break out of loop */
                break;
            }

            /* Failed: release this descriptor so retries do not leak one each */
            last_errno = errno;
            close(partition[i].socket);
            if (last_errno != ECONNREFUSED) {
                /* another error occurred, quit trying to connect */
                j = NUM_TRIES;
                break;
            }

            /* unsuccessful connect, sleep and try again */
            sleep(3);
        } /* endfor NUM_TRIES */

        if (j == NUM_TRIES) {
            fprintf(stderr, "PM %d CLIENT: Unable to connect sock to %s, errno %d\n",
                    mypart(), partition[i].name, last_errno);
            fflush(stderr);
            exit(100);
        }
    } /* end for setting up CLIENT sockets */
}

/* pm_client -- The PM CLIENT process sends data to partitions. Set up client
** sockets. Send messages over the sockets: Get message.
Send message (if it ** is a broadcast message send it to all partitions, if not send it to ** appropriate partition). Acknowledge sending of message. Release buffer. ** Reset next message indicator. */ static void pm_client(void) { int i, size; /* CLIENT- GO INTO INFINITE SENDING LOOP */ /* Initial setting to indicate the next message has not been selected */ next_message = -1; /* Forever, wait for messages to send over socket */ for ( ; ; ) { /* Get the message */ pm_getmsg(); /* Determine where to send the message */ if (buffer[next_message].dnode == -1) { /* BROADCAST MESSAGE, SEND TO ALL PARTITIONS */ for (i = 0; i < numparts(); i++) { /* Don't send broadcast to self */ if (i == mypart () ) continue; /* First send the size of the message */ size = buffer[next_message].length; if (send(partition[i].socket, &size, sizeof(size),0) < 0) { fprintf(stderr, "PM %d CLIENT: send to PM %d failed, errno=%d\n", mypart(), i, errno); fflush(stderr); return -1; } /* Then send the actual message */ if (send(partition[i].socket, &buffer[next_message], size, 0) < 0) { fprintf(stderr, "PM %d CLIENT: send to PM %d failed, errno=%d\n", mypart(), i, errno); fflush(stderr); return -1; } } /* endfor SEND BROADCAST TO ALL PARTITIONS */ } else { /* SEND TO A SPECIFIC PARTITION */ /* First send the size of the message */ size = buffer[next_message].length; i = partof(buffer[next_message].dnode); if (send(partition[i].socket, &size, sizeof(size),0) < 0) { fprintf(stderr, "PM %d CLIENT: send to PM %d failed, errno=%d\n", mypart(), i, errno); fflush(stderr); return -1; } /* Then send the actual message */ if (send(partition[i].socket, &buffer[next_message], size, 0) < 0) { fprintf(stderr, "PM %d CLIENT: send to PM %d failed, errno=%d\n", mypart(), i, errno); return -1; } } /* FOR BOTH BROADCAST AND REGULAR MESSAGES */ /* acknowledge the sending of the message */ buffer[next_message].valid[mybuffer()] = 0; /* release (free) the buffer */ semcall_one(msgfree, next_message, 1); /* reset 
next_message so the next getmsg will work */ next_message = -1; } /* end forever CLIENT PROCESS sending messages over socket */ } /***** Initialization and Termination routines *****/ /* abort_prog -- Clean up in the case of an error */ static void abort_prog(void) { int i; /* Remove the sets of semaphores */ if (pmserver_pid != 0) { if (msgavail != -1) { semctl(msgavail, 0, IPC_RMID, 0); msgavail = -1; } if (msgfree != -1) { semctl(msgfree, 0, IPC_RMID, 0); msgfree = -1; } } /* Remove the shared memory */ if (buffer != NULL) { shmdt(buffer); buffer = NULL; } /* Only PM SERVER process should execute this code */ if (pmserver_pid != 0) { if (shmid_m != -1) { shmctl(shmid_m, IPC_RMID, 0); shmid_m = -1; } } /* Close the sockets */ for (i = 0; i < num_parts; i++) { if (i != mypart() ) { close (partition[i].socket); partition[i].socket = 0; } } /* Make sure all pending output gets out */ fflush(stdout); fflush(stderr); } /* Handle termination signals */ void sig_terminate(int sig, int code, struct sigcontext *scp) { int i; /* Send termination signal to each of PM SERVER's children */ if (pmserver_pid != 0) { for (i = 0; i < child_index; i++) { kill(children[i], SIGTERM); } child_index = 0; kill(pmserver_pid, SIGTERM); } /* Clean up the use of semaphores and shared memory */ abort_prog(); exit(100); } /* Handle unexpected termination signals */ void unexpected_death(int sig, int code, struct sigcontext *scp) { int statval; int waitpid; /* Only PM SERVER process should execute this code */ if (pmserver_pid != 0) { waitpid = wait(&statval); if (waitpid < 0) { printf("Error determining who died unexpectedly. 
Errno=%d\n", errno); } else { if (WIFSIGNALED(statval) != 0) { printf("Process %d did not catch signal %d.\n", waitpid, WTERMSIG(statval)); } else if (WIFSTOPPED(statval) != 0) { printf("Process %d stopped due to signal %d.\n", waitpid, WSTOPSIG(statval)); } else if (WIFEXITED(statval) == 0) { /* Normal termination */ } else { /* Terminated with exit code */ } } } fflush(stdout); } /* killcube -- On abort, kill off all children on the hypercube partition */ int _killcube(int node, int pid) { int i; int statval; int waitpid; /* Only PM SERVER process should execute this code */ if (pmserver_pid != 0) { for (i = 0; i < child_index; i++) { kill(children[i], SIGTERM); } kill(pmserver_pid, SIGTERM); for (i = 0; i <= child_index; i++) { waitpid = wait(&statval); if (waitpid < 0) { /* No more children left */ break; } else { if (WIFSIGNALED(statval) != 0) { printf("Process %d did not catch signal %d.\n", waitpid, WTERMSIG(statval)); } else if (WIFSTOPPED(statval) != 0) { printf("Process %d stopped due to signal %d.\n", waitpid, WSTOPSIG(statval)); } else if (WIFEXITED(statval) == 0) { /* Normal termination */ } else { /* Terminated with exit code */ } } } } /* Clean up after ourself */ abort_prog(); child_index = 0; return 0; } /* init_shared_mem -- Allocates a shared memory region. Sets pointer to region ** in this process's memory space and returns the shared memory identifier. */ static int init_shared_mem(void **pointer, int size, int key) { int shmid; if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) { printf("init_shm: allocation of shared memory failed. 
Errno=%d\n",errno); printf(" mynode=%d key=%d size=%d\n",my_node,key,size); _killcube(0,0); exit(-1); } *pointer = shmat(shmid, NULL, 0); return shmid; } /* init_semaphore -- Allocates a set of semaphores and initializes them */ static int init_semaphore(int *semid, int size, int value, int key) { register int i; if ((*semid = semget(key, size, 0666 | IPC_CREAT)) < 0) { printf("init_sem: allocation of semaphores failed. Errno=%d\n",errno); printf(" mynode=%d key=%d size=%d\n",my_node,key,size); _killcube(0,0); exit(-1); } for (i = 0; i < size; i++) { if (semctl(*semid, i, SETVAL, value) < 0) { printf("init_sem: init of semaphores failed. Errno=%d\n",errno); printf(" mynode=%d offset=%d value=%d\n",my_node,i,value); _killcube(0,0); exit(-1); } } return *semid; } /* semcall_all --Perform same operation on all elements of semaphore at once.*/ static int semcall_all(int semid, int size, int operation) { struct sembuf sbuf[NUMBER_IN_PART+1]; register int i; for (i = 0; i < size; i++) { sbuf[i].sem_num = i; sbuf[i].sem_op = operation; sbuf[i].sem_flg = 0; } while (semop(semid, sbuf, size) < 0) { /* repeat operation if interrupted */ if (errno != EINTR) { printf("PM %d: Semaphore broadcast failed. Errno = %d\n", mypart(), errno); fflush(stdout); return -1; } } return 0; } /* semcall_one -- Perform an operation on an element of a semaphore. */ static int semcall_one(int semid, int num, int operation) { struct sembuf sbuf; sbuf.sem_num = num; sbuf.sem_op = operation; sbuf.sem_flg = 0; while (semop(semid, &sbuf, 1) < 0) { /* repeat operation if interrupted */ if (errno != EINTR) { printf("PM %d: Semaphore failed. 
Errno = %d\n", mypart(), errno); fflush(stdout); return -1; } } return 0; } /***** Environment Information (External and Internal) *****/ /* numnodes -- Returns the number of simulated nodes */ int numnodes(void) { return num_nodes; } /* numparts -- number of partitions */ static int numparts(void) { return num_parts; } /* mypart -- Partition this process is in */ static int mypart(void) { return my_part; } /* partof -- Determines which partition a given node is a member of */ static int partof(int n) { if (n == myhost()) { return 0; } else { return n / NUMBER_IN_PART; } } /* pm_partof -- Determines which subpartition a given node is a member of ** A -1 can be passed if a destination node is broadcast, return -1. */ static int pm_partof(int n) { if (n == myhost()) { return 0; } else if (n == -1) { return -1; } else { return n / NUMBER_IN_PART; } } /* numbuffers -- Number of buffers in this partition */ static int numbuffers(void) { if (mypart() == 0) { return (nodes_in_part + 2); } else { return (nodes_in_part + 1); } } /* mybuffer -- returns the index for this process's buffer */ static int mybuffer(void) { return (nodes_in_part); } /* bufferof -- Returns the buffer offset of the given node. Host is always ** second to last buffer in partition 0. The PM is always the last buffer */ static int bufferof(int n) { if (mypart() != partof(n)) { return nodes_in_part; /* Return the buffer of PM */ } else if (n == myhost()) { return nodes_in_part + 1; /* This partition, buffer of host */ } else { return n % NUMBER_IN_PART; /* This partition, buffer of node */ } } /* mynode -- Returns the node number for this process */ int mynode(void) { return my_node; } /* myhost -- Returns the node number of the host */ int myhost(void) { return numnodes(); } /***** Communications *****/ /* pm_getmsg -- Wait until a message is available. This routine differs from ** getmsg, in that it checks to ensure that destination node is not in this ** partition. 
(Getmsg checks that current node equals destination node.) ** OUTPUT: next_message - set to the message found of the proper type */ static void pm_getmsg(void) { int i; /* Only wait if a message is not already selected */ if (next_message != -1) return; /* Wait for a message for me */ semcall_one(msgavail, mybuffer(), -1); /* Search for those messages that are for me */ for (i = 0; i < numbuffers(); i++) { if (buffer[i].valid[mybuffer()] != 0) { next_message = i; return; } } } [LISTING THREE] /***** simulate.c *****/ /* These functions allow a UNIX system simulate a hypercube environment. */ #include #include #include #include #include #include #include #include #include #include #include #include #include "cube.h" /* Prototypes */ char *getenv(char *variable); void *shmat(int shmid, void *shmaddr, int shmflg); char *strtok(char *, char *); char *strcpy(char *, char *); void *malloc(int size); #define min(x,y) (((x) < (y)) ? (x) : (y)) int csend(int type, void *msg, int length, int target_node, int group); int crecv(int type, void *buf, int len); int killcube(int, int); int numnodes(void); int myhost(void); int mynode(void); int numparts(void); int numbuffers(void); int mybuffer(void); int bufferof(int node); int mypart(void); int partof(int node); /* Local, Private Information */ static int num_parts; /* number of partitions */ static int my_part; /* partition this process is in */ static int nodes_in_part; /* number of nodes in this partition */ static subpart *partition = NULL; /* list of partition information */ static int base; /* base key value for allocating shared data */ static int my_node; /* node number for this process */ static int my_group; /* group id for this process */ /* There are two groups, host communications */ /* and inter-node communications */ static int num_nodes; /* total number of nodes in all partitions */ static int msgavail = -1; /* semaphores indicating message is available */ static int msgfree = -1; /* semaphores indicating buffer 
is free */ static int next_message; /* which message is to be received next */ static int shmid_m = -1; /* id of shared area for messages */ static message *buffer = NULL;/* communication areas */ static int *children = NULL; /* process ids of all child processes */ static int child_index = 0; /* number of children created */ / ** Initialization and Termination routines ** / /* abort_prog -- Clean up when the program terminates */ void abort_prog(void) { /* Remove the sets of semaphores */ if (mynode() == myhost()) { if (msgavail != -1) { semctl(msgavail, 0, IPC_RMID, 0); msgavail = -1; } if (msgfree != -1) { semctl(msgfree, 0, IPC_RMID, 0); msgfree = -1; } } /* Remove the shared memory */ if (buffer != NULL) { shmdt(buffer); buffer = NULL; } if (mynode() == myhost()) { if (shmid_m != -1) { shmctl(shmid_m, IPC_RMID, 0); shmid_m = -1; } } /* Make sure all pending output gets out */ fflush(stdout); fflush(stderr); } /* Handle termination signals */ void sig_terminate(int sig, int code, struct sigcontext *scp) { if (mynode() == myhost()) { /* Pass on the termination signal to the node processes */ killcube(0,0); } else { /* This is executed by the node processes */ /* Clean up the use of semaphores and shared memory */ abort_prog(); } exit(100); } /* Handle unexpected termination signals. Used by the host process. */ void unexpected_death(int sig, int code, struct sigcontext *scp) { int statval; int waitpid; waitpid = wait(&statval); if (waitpid < 0) { printf("Error determining who died unexpectedly. Errno=%d\n", errno); } else { if (WIFSIGNALED(statval) != 0) { printf("Process %d did not catch signal %d.\n", waitpid, WTERMSIG(statval)); } else if (WIFSTOPPED(statval) != 0) { printf("Process %d stopped due to signal %d.\n", waitpid, WSTOPSIG(statval)); } else if (WIFEXITED(statval) == 0) { /* Normal termination */ } else { /* Terminated with exit code */ } } fflush(stdout); } /* handler -- handles hypercube specific errors that do not map to UNIX. 
*/ void handler(int type, void (*proc)()) { /* ignore this */ } /* getcube -- Called by host process to gain possession of a partition in a ** hypercube. Note: Assuming getcube is only called once per host process. */ void getcube(char *cubename, char *cubetype, char *srmname, int keep, char *account) { char size[8]; int is_dimension = 0; int i; char *ptr; char *target; /* Pull out the requested number of nodes */ ptr = cubetype; if (*ptr == 'd') { ptr++; is_dimension = 1; } target = size; i = 4; while (isdigit(*ptr) && (i-- != 0)) { *target++ = *ptr++; } *target = '\0'; /* The rest of the parameters don't matter */ /* Determine the total number of nodes */ num_nodes = NUMBER_IN_PART; /* default size */ sscanf(size,"%d",&num_nodes); if (is_dimension) { num_nodes = 1 << num_nodes; } } /* cubeinfo -- Passes back information about the partitions on a hypercube. ** Input: global=0 current attached cube; 1. all cubes you own and allocated ** by the current host; 2. all cubes on the system from which the command was ** executed; 3. how cubes are allocated on all SRMs; 4. 1 addition parameter ** (srmname) returns info for that SRM */ int cubeinfo(struct cubetable *ct, int numslots, int global, ...) { /* returns the number of cubes for which information is available */ /* Ignore this for now */ return 0; } /* relcube -- release cube gained by the getcube call. 
*/ void relcube(char *cubename) { /* Ignore this for now */ } /* killcube -- On abort, kill off all processes in the hypercube partition */ int killcube(int node, int pid) { int i; int statval; int waitpid; /* Force everyone to terminate */ for (i = 0; i < child_index; i++) { kill(children[i], SIGTERM); } /* Give the children a chance to terminate */ if (child_index > 0) sleep(1); /* Wait for everyone to exit, check status in case */ for (i = 0; i < child_index; i++) { waitpid = wait(&statval); if (waitpid < 0) { /* No more children left */ break; } else { if (WIFSIGNALED(statval) != 0) { printf("Process %d did not catch signal %d.\n", waitpid, WTERMSIG(statval)); } else if (WIFSTOPPED(statval) != 0) { printf("Process %d stopped due to signal %d.\n", waitpid, WSTOPSIG(statval)); } else if (WIFEXITED(statval) == 0) { /* Normal termination */ } else { /* Terminated with exit code */ } } } /* Clean up after ourself */ abort_prog(); child_index = 0; return 0; } /* init_shared_mem -- Allocates a shared memory region. Sets pointer to region ** in this process's memory space and returns the shared memory identifier. */ static int init_shared_mem(void **pointer, int size, int key) { int shmid; if ((shmid = shmget(key, size, 0666 | IPC_CREAT)) < 0) { printf("init: allocation of shared memory failed. Errno=%d\n",errno); printf(" mynode=%d key=%d size=%d\n",my_node,key,size); fflush(stdout); sig_terminate(0,0,NULL); } *pointer = shmat(shmid, NULL, 0); return shmid; } /* init_semaphore -- Allocates a set of semaphores and initializes them */ static int init_semaphore(int *semid, int size, int value, int key) { register int i; if ((*semid = semget(key, size, 0666 | IPC_CREAT)) < 0) { printf("init: allocation of semaphores failed. Errno=%d\n",errno); printf(" mynode=%d key=%d size=%d\n",my_node,key,size); fflush(stdout); sig_terminate(0,0,NULL); } for (i = 0; i < size; i++) { if (semctl(*semid, i, SETVAL, value) < 0) { printf("init: initialization of semaphores failed. 
Errno=%d\n",errno); printf(" mynode=%d offset=%d value=%d\n",my_node,i,value); fflush(stdout); sig_terminate(0,0,NULL); } } return *semid; } /* semcall_all -- Perform same operation on all elements of a semaphore. */ static int semcall_all(int semid, int size, int operation) { struct sembuf sbuf[NUMBER_IN_PART+1]; register int i; for (i = 0; i < size; i++) { sbuf[i].sem_num = i; sbuf[i].sem_op = operation; sbuf[i].sem_flg = 0; } while (semop(semid, sbuf, size) < 0) { /* repeat operation if interrupted */ if (errno != EINTR) { printf("%d: Semaphore broadcast failed. Errno = %d\n",mynode(),errno); abort_prog(); exit(-1); } } return 0; } /* semcall_one -- Perform an operation on an element of a semaphore. */ static int semcall_one(int semid, int num, int operation) { struct sembuf sbuf; sbuf.sem_num = num; sbuf.sem_op = operation; sbuf.sem_flg = 0; while (semop(semid, &sbuf, 1) < 0) { /* repeat operation if interrupted */ if (errno != EINTR) { printf("%d: Semaphore failed. Errno = %d\n",mynode(), errno); abort_prog(); exit(-1); } } return 0; } /* setpid -- Assigns a partition identifier to the simulated partition. */ int setpid(int id) { my_group = id; return 0; } /* init_simulator -- Should be called near the beginning of an application ** before any hypercube-related functions are called. 
*/ void init_simulator(void) { register int i, pid; char filename[20]; char *temp; static char env[256]; /* must be static */ struct hostent *hent; /* parent cm will send child cm SIGINT when a CTRL-BREAK is pressed */ signal(SIGINT,sig_terminate); signal(SIGTERM,sig_terminate); signal(SIGQUIT,sig_terminate); /* Pick up the base key value from the environment */ if ((temp = getenv("SIM_INFO")) == NULL) { fprintf(stderr,"init_sim: Missing environment variable\n"); fflush(stderr); exit(-1); } strcpy(env,temp); if ((temp = strtok(env,",")) == NULL) { fprintf(stderr, "init_sim: Missing information in environment variable\n"); fflush(stderr); exit(-1); } sscanf(temp,"%d",&base); if ((temp = strtok(NULL,",")) == NULL) { fprintf(stderr,"init_sim: Missing node info in environment variable\n"); fflush(stderr); exit(-1); } sscanf(temp,"%d",&my_node); if ((temp = strtok(NULL,",")) == NULL) { fprintf(stderr,"init_sim: Missing pid info in environment variable\n"); fflush(stderr); exit(-1); } sscanf(temp,"%d",&my_group); if ((temp = strtok(NULL,",")) == NULL) { fprintf(stderr,"init_sim: Missing number of node info in environment variable\n"); fflush(stderr); exit(-1); } sscanf(temp,"%d",&num_nodes); num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART; my_part = my_node / NUMBER_IN_PART; /* Calcuate the number of nodes in this and remaining partitions */ i = numnodes() - (mypart() * NUMBER_IN_PART); nodes_in_part = min(NUMBER_IN_PART, i); /* Allocate shared memory */ shmid_m = init_shared_mem(&buffer, sizeof(message) * numbuffers(), base); /* Allocate communications semaphores */ init_semaphore(&msgavail, numbuffers(), 0, base+10000); init_semaphore(&msgfree, numbuffers(), 0, base+20000); } /* load -- Should be called near the beginning of a host application before any ** hypercube-related functions are called, except for getcube. It will start ** the appropriate number of PMs on the appropriate systems (as read from the ** .pmrc file.) 
Parent process will be node 0, which has special roles on a ** hypercube. Remaining processes will be numbered consecutively. */ int load(char *filename, int which_node, int group_id) { register int i, j, pid, size; char *argv[20]; char base_string[20]; char partition_number[20]; char group_string[20]; char number_of_nodes[20]; char temp[256]; char *ptr; struct servent *sp; FILE *fd; /* Allocate space for child pids */ if (children == NULL) { if ((children = malloc(numnodes() * sizeof(int))) == NULL) { fprintf(stderr,"load: insufficient memory\n"); fflush(stderr); return -1; } } /* parent will send us SIGINT when CTRL-BREAK is pressed */ signal(SIGINT,sig_terminate); signal(SIGTERM,sig_terminate); signal(SIGQUIT,sig_terminate); signal(SIGCHLD, unexpected_death); base = getpid(); num_parts = (num_nodes + NUMBER_IN_PART - 1) / NUMBER_IN_PART; if (partition == NULL) { if ((partition = malloc(num_parts * sizeof(subpart))) == NULL) { fprintf(stderr,"load: insufficient memory\n"); fflush(stderr); return -1; } memset(partition, 0, num_parts * sizeof(subpart)); if ((fd = fopen(".pmrc","r")) == NULL) { fprintf(stderr,"load: Missing configuration file \".pmrc\"\n"); fflush(stderr); return -1; } for (i = 0; i < num_parts; i++) { temp[0] = '\0'; fscanf(fd," %[^ \n] \n",temp); size = strlen(temp); if ((ptr = malloc(size+1)) == NULL) { fprintf(stderr,"load: Insufficent memory\n"); fflush(stderr); return -1; } strcpy(ptr,temp); partition[i].name = ptr; } fclose(fd); } /* Host program's node number is the same as the number of nodes */ my_node = numnodes(); my_part = 0; /* Calcuate the number of nodes in this and remaining partitions */ i = numnodes() - (mypart() * NUMBER_IN_PART); nodes_in_part = min(NUMBER_IN_PART, i); /* Allocate shared memory */ if (shmid_m == -1) { shmid_m = init_shared_mem(&buffer, sizeof(message) * numbuffers(),base); } memset(buffer,0,sizeof(message) * numbuffers()); /* Allocate communications semaphores */ if (msgavail == -1) { init_semaphore(&msgavail, 
numbuffers(), 0, base+10000); } if (msgfree == -1) { init_semaphore(&msgfree, numbuffers(), 0, base+20000); } /* Split into node processes */ fflush(stdout); fflush(stderr); /* Start the local and remote Partition Managers */ for (i = 0; i < num_parts; i++) { if ((pid = fork()) < 0) { /* Can't create all the children! */ killcube(0,0); fprintf(stderr, "LOAD: unable to create Partition Managers\n"); return -1; } else if (pid == 0) { /* I'm the child process */ my_node = -1; /* Start the Partition Managers */ if (i == 0) { argv[0] = "pm"; argv[1] = filename; sprintf(partition_number, "%d", i); argv[2] = partition_number; sprintf(base_string,"%d", base); argv[3] = base_string; sprintf(group_string, "%d", group_id); argv[4] = group_string; sprintf(number_of_nodes, "%d", numnodes()); argv[5] = number_of_nodes; for (i = 0; i < num_parts; i++) { argv[i+6] = partition[i].name; } argv[i+6] = NULL; execvp("pm",argv); /* If we get here, we had a problem */ printf("execvp of PM 0 failed. errno=%d\n",errno); fflush(stdout); exit(-1); } else { argv[0] = "rsh"; argv[1] = partition[i].name; argv[2] = "pm"; argv[3] = filename; sprintf(partition_number, "%d", i); argv[4] = partition_number; sprintf(base_string,"%d", base); argv[5] = base_string; sprintf(group_string, "%d", group_id); argv[6] = group_string; sprintf(number_of_nodes, "%d", numnodes()); argv[7] = number_of_nodes; for (i = 0; i < num_parts; i++) { argv[i+8] = partition[i].name; } argv[i+8] = NULL; execvp("rsh",argv); /* If we get here, we had a problem */ printf("execvp of PM 0 failed. 
errno=%d\n",errno); fflush(stdout); exit(-1); } } else { /* I'm the parent process */ children[child_index++] = pid; } } } /** Environment Information (External and Internal) **/ /* availmem -- returns amount of memory available */ int availmem(void) { return 0; } /* nodedim -- Returns the dimension of the simulated hypercube */ int nodedim(void) { unsigned int i, temp; temp = num_nodes; i = 0; while (temp != 0) { temp >> 1; i++; } return i; } /* numnodes -- Returns the number of simulated nodes */ int numnodes(void) { return num_nodes; } /* numparts -- number of simulator partitions */ static int numparts(void) { return num_parts; } /* mypart -- Simulator partition this process is in */ static int mypart(void) { return my_part; } /* partof -- Determines which simulator partition a given node is member of */ static int partof(int n) { if (n == myhost()) { return 0; } else { return n / NUMBER_IN_PART; } } /* numbuffers -- Number of buffers in this simulator partition */ static int numbuffers(void) { if (mypart() == 0) { return nodes&us.in&us.part + 2; } else { return nodes&us.in&us.part + 1; } } /* mybuffer -- returns the index for this process's buffer */ static int mybuffer(void) { if (mynode() == myhost()) { return nodes&us.in&us.part+1; } else { return mynode() % NUMBER_IN_PART; } } /* bufferof -- Returns the buffer offset of the given node. The host is always ** the last buffer in partition 0. 
The PM is always second to last buffer */ static int bufferof(int n) { if (mypart() != partof(n)) { return nodes_in_part; /* Return the buffer of PM */ } else if (n == myhost()) { return nodes_in_part + 1; /* This partition, buffer of host */ } else { return n % NUMBER_IN_PART; /* This partition, buffer of node */ } } /* mynode -- Returns the node number for this process */ int mynode(void) { return my_node; } /* mypid -- Returns the group number */ int mypid(void) { return my_group; } /* myhost -- Returns the node number of the host */ int myhost(void) { return numnodes(); } /** Communications **/ /* cread -- Special read for files on hypercube's high-speed disk system. We just issue a standard read instead. */ int cread(int fd, void *buffer, int size) { return read(fd, buffer, size); } /* gdsum -- Sum individual elements of an array on all processes */ void gdsum(double x[], long elements, double work[]) { register int i,j; double temp; if ((mybuffer()) == 0) { /* The first node in each partition sums the local data */ if (nodes_in_part > 1) { /* Only sum when we aren't the only ones in the partition */ for (i = 1; i < nodes_in_part; i++) { /* Get the next set of numbers to sum */ crecv(-2, work, elements * sizeof(double)); for (j = 0; j < elements; j++) { x[j] += work[j]; } } } /* Node 0 sums for all partitions */ if (mynode() == 0) { /* Only sum if there are more than one partition */ if (numparts() > 1) { for (i = 1; i < numparts(); i++) { /* Get the next set of numbers to sum */ crecv(-3, work, elements * sizeof(double)); for (j = 0; j < elements; j++) { x[j] += work[j]; } } } /* Only broadcast if there is more than one node */ if (nodes_in_part > 1) { /* Broadcast the results */ csend(-4,x,elements * sizeof(double),-1,mypid()); } } else { /* Each partition needs to send the partial sum to node 0 */ csend(-3,x,elements * sizeof(double),0,mypid()); /* Wait for the answer */ crecv(-4,x,elements * sizeof(double)); } } else { /* Send the data to local node to do 
the summation */ csend(-2,x,elements * sizeof(double),mypart()*4,mypid()); /* Wait for the answer */ crecv(-4,x,elements * sizeof(double)); } } /* getmsg -- Wait until a message is available. OUTPUT: next_message, set to ** the message found of the proper type */ static void getmsg(void) { int i; /* Only wait if a message is not already selected */ if (next_message != -1) return; /* Wait for a message for me */ semcall_one(msgavail, mybuffer(), -1); /* Search for those messages that are for me */ for (i = 0; i < numbuffers(); i++) { if (buffer[i].valid[mybuffer()] == 1) { if ((buffer[i].dnode == mynode()) || (buffer[i].dnode == -1)) { next_message = i; return; } } } } /* cprobe -- Wait until a message of a specific type is available. OUTPUT: ** next_message, set to the message found of the proper type */ void cprobe(int type) { int i,j; /* Make sure all pending writes in application have occured */ fflush(stdout); fflush(stderr); /* See if a specific type was requested */ if (type == -1) { getmsg(); return; } else if ((next_message != -1) && (type == buffer[next_message].type)) { /* message was already located */ return; } else { while (1) { /* Wait for a message for me */ semcall_one(msgavail, mybuffer(), -1); /* Search for those messages that are for me and is the type I need */ for (i = 0; i < numbuffers(); i++) { if (buffer[i].valid[mybuffer()] == 1) { if ((buffer[i].dnode == mynode()) || (buffer[i].dnode == -1)) { if (buffer[i].type == type) { next_message = i; /* Put back all skipped messages back */ for (j = 0; j < numbuffers(); j++) { if (buffer[j].valid[mybuffer()] == 2) { buffer[j].valid[mybuffer()] = 1; semcall_one(msgavail, mybuffer(), 1); } } return; } else { /* Mark the message so that we don't look at it again */ buffer[i].valid[mybuffer()] = 2; } } } } } } } /* infocount -- Return the length of the message that will be received. 
*/ int infocount(void) { getmsg(); return buffer[next_message].t_length; } /* infonode -- Returns the node that sent the message */ int infonode(void) { getmsg(); return buffer[next_message].snode; } /* infopid -- Returns the group (pid) of the node that sent the message */ int infopid(void) { getmsg(); return buffer[next_message].spid; } /* csend -- Synchronous message sending between two nodes. If the target node ** number is -1, then the message is broadcasted to all nodes. Limitations: ** Assumes that the message buffer is free to use. In other words, if ** an asynchronous send was previously done, we assume that a msgwait ** was done to ensure the previous message reached its destination. */ int csend(int type, void *msg, int length, int target_node, int group) { int i,j, sent_length = 0; char *source; i = mybuffer(); /* Fill in the message */ source = msg; buffer[i].type = type; buffer[i].dnode = target_node; buffer[i].spid = mypid(); buffer[i].snode = mynode(); buffer[i].t_length = length; while (length > 0) { /* Divide the message into smaller chunks */ buffer[i].length = min(MAX_MESSAGE_SIZE, length); memcpy(buffer[i].msg, source, buffer[i].length); source += buffer[i].length; sent_length += buffer[i].length; length -= buffer[i].length; /* Tell the node(s) the message is there */ if (target_node == -1) { /* Broadcast the message to nodes in this partition */ /* and to the process manager */ for (j=0; j < nodes&us.in&us.part + 1; j++) { buffer[i].valid[j] = 1; } semcall_all(msgavail,nodes&us.in&us.part+1, 1); /* Of course, we already have the message */ semcall_one(msgavail,i, -1); /* Wait until everyone receives the message */ semcall_one(msgfree, i, -nodes&us.in&us.part); } else { /* Point to point to another node */ j = bufferof(target_node); buffer[i].valid[j] = 1; semcall_one(msgavail, j, 1); /* Wait until it is received */ semcall_one(msgfree, i, -1); } } return sent_length; } /* crecv -- Synchronous message reception between two nodes. 
*/ int crecv(int type, void *buf, int len) { int recv_len = 0, copy_len = 0, temp_len, total_len; int recv_node; char *target; /* Get a message of this type */ cprobe(type); target = buf; total_len = buffer[next_message].t_length; recv_node = buffer[next_message].snode; do { if (recv_node != buffer[next_message].snode) { /* Message is from another node, put off receiving */ buffer[next_message].valid[mybuffer()] = 3; } else { /* Message is from same node, add it the previous messages */ recv_len += buffer[next_message].length; temp_len = min(len - copy_len, buffer[next_message].length); if (temp_len > 0) { memcpy(target, buffer[next_message].msg, temp_len); target += temp_len; } copy_len += buffer[next_message].length; /* Acknowledge the receipt of the message */ buffer[next_message].valid[mybuffer()] = 0; semcall_one(msgfree, next_message, 1); } /* Indicate that no message has been selected */ next_message = -1; if (recv_len < total_len) { cprobe(type); } } while (recv_len < total_len); /* Scan buffers to restore any skipped messages */ for (i = 0; i < numbuffers(); i++) { if (buffer[i].valid[mybuffer()] == 3) { buffer[i].valid[mybuffer()] = 1; semcall_one(msgavail, mybuffer(), 1); } } return total_len; } /* isend -- Asynchronous message sending between two nodes. If the target node ** number is -1, then the message is broadcasted to all nodes. Limitations: ** Assumes that the message buffer is free to use. In other words, if ** an asynchronous send was previously done, we assume that a msgwait ** was done to ensure the previous message reached its destination. 
*/ int isend(int type, void *msg, int length, int target_node, int group) { int i,j, sent_length = 0; char *source; i = mybuffer(); buffer[i].type = type; buffer[i].dnode = target_node; buffer[i].spid = mypid(); buffer[i].snode = mynode(); buffer[i].t_length = length; while (length > 0) { /* Divide the message into smaller chunks */ buffer[i].length = min(MAX_MESSAGE_SIZE, length); memcpy(buffer[i].msg, source, buffer[i].length); source += buffer[i].length; sent_length += buffer[i].length; length -= buffer[i].length; /* Tell the node(s) the message is there */ if (target_node == -1) { /* Broadcast the message to nodes in this partition */ /* and to the process manager */ for (j=0; j < nodes_in_part+1; j++) { buffer[i].valid[j] = 1; } semcall_all(msgavail,nodes_in_part+1, 1); /* Of course, we already have the message */ semcall_one(msgavail,i, -1); /* Wait for acknowledge on all but the last part */ if (length > 0) { /* Wait until everyone receives the message */ semcall_one(msgfree, i, -nodes_in_part); } } else { /* Point to point to another node */ j = bufferof(target_node); buffer[i].valid[j] = 1; semcall_one(msgavail, j, 1); /* Wait for acknowledge on all but the last part */ if (length > 0) { /* Wait until it is received */ semcall_one(msgfree, i, -1); } } } /* Return which buffer needs to be waited on */ return i; } /* irecv -- Asynchronous message reception between two nodes. Returns message ** identifier for acknowledging the message. 
*/ int irecv(int type, void *buf, int len) { int mid; int recv_len = 0, copy_len = 0, temp_len, total_len; char *target; /* Get a message of this type */ cprobe(type); mid = next_message; target = buf; total_len = buffer[next_message].t_length; do { recv_len += buffer[next_message].length; temp_len = min(len - copy_len, buffer[next_message].length); if (temp_len > 0) { memcpy(target, buffer[next_message].msg, temp_len); target += temp_len; } copy_len += buffer[next_message].length; /* Acknowledge all but last partial message */ if (recv_len < total_len) { /* Acknowledge the receipt of the message */ buffer[next_message].valid[mybuffer()] = 0; semcall_one(msgfree, next_message, 1); } /* Indicate that no message has been selected */ next_message = -1; if (recv_len < total_len) { cprobe(type); } } while (recv_len < total_len); return mid; } /* msgwait -- Wait for a message to be received by the target node(s) */ void msgwait(int mid) { if (mid == mybuffer()) { /* Then it was a send to another node */ if (buffer[mid].dnode == -1) { /* Wait for everyone to receive the message */ semcall_all(msgfree, mid, -nodes&us.in&us.part); } else { semcall_one(msgfree, mid, -1); } } else { /* It was a receive from another node */ semcall_one(msgfree, mid, 1); } } /* flushmsg -- Forces the removal of pending messages to a node */ void flushmsg(int type, int target_node, int group) { /* Do nothing for now */ fflush(stdout); fflush(stderr); } /* mclock -- Return time in milliseconds. */ unsigned long mclock(void) { unsigned long current_time; time(¤t_time); return current_time * 1000; }