Package edu.rit.pj.cluster

Package edu.rit.pj.cluster contains client and server programs for message passing parallel programming using Parallel Java (PJ).

Package edu.rit.pj.cluster Description

Package edu.rit.pj.cluster contains client and server programs for message passing parallel programming using Parallel Java (PJ). PJ message passing programs use class edu.rit.pj.Comm, and class Comm in turn uses package edu.rit.pj.cluster. PJ message passing programs do not use package edu.rit.pj.cluster directly. For further information about PJ, see package edu.rit.pj.

PJ Cluster Operation
PJ Cluster Configuration
PJ Cluster Design
PJ Job Scheduler Debugging


PJ Cluster Operation

A cluster parallel computer typically consists of a frontend node and one or more backend nodes connected via a dedicated high-speed network. To run a PJ message passing program on a cluster parallel computer, several things must be set up first:

  • Set up a configuration file on the frontend node containing information about the cluster. For further information, see class Configuration.
     
  • Run the PJ Job Scheduler Daemon on the frontend node. This allows the PJ message passing program to determine which backend nodes are available for running a process. The Job Scheduler Daemon reads the configuration file to get information about the cluster. For further information, see class JobScheduler.
     
  • The PJ message passing program uses the Secure Shell (SSH) to start a backend process in the user's account on each backend node. Therefore, it must be possible to do an SSH remote login from the frontend node to each backend node. Each backend node must have SSH enabled.
     
  • Furthermore, it must be possible to do an SSH remote login from the frontend node to each backend node without the user having to type a password. This is done using public key authentication. SSH must have public key authentication enabled. Each user must set up SSH public and private keys in his or her account. For further information, refer to the SSH documentation.

Once the above setup has been done, you can run a PJ message passing program that uses class edu.rit.pj.Comm on the frontend node. Class Comm in turn uses a JobFrontend. The job frontend contacts the Job Scheduler Daemon running on the frontend node to start a parallel processing job using a given number of backend nodes. If there are not enough idle backend nodes, the job frontend waits until there are enough. Once the backend nodes have been assigned to the job, the job frontend prints the job number and the names of the backend nodes. Using SSH, the job frontend then starts a JobBackend process on each backend node. The job backend processes run in the user's account, and they run in the same current directory as the job frontend process. The job backend processes execute the parallel program, passing messages among themselves. The job backend processes' standard output and standard error streams are redirected to the job frontend process, which copies them to its own standard output and standard error.

The Job Scheduler Daemon uses the nn, np, and nt settings to assign resources to a job. These settings are specified by the -Dpj.nn, -Dpj.np, and -Dpj.nt flags on the java command line; for example:

    $ java -Dpj.np=4 . . .

For further information, see class edu.rit.pj.PJProperties.
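
Flags given with -D on the java command line appear inside the JVM as Java system properties. The following sketch shows, using only the standard library, how the pj.nn, pj.np, and pj.nt settings could be read. It is not the code of class edu.rit.pj.PJProperties; the class name PjSettings and the convention of returning 0 for a missing or malformed setting are assumptions made for illustration.

```java
// Illustrative sketch only; not the implementation of edu.rit.pj.PJProperties.
final class PjSettings {

    /**
     * Returns the integer value of the named system property, or 0 if the
     * property is absent or not a valid integer (assumed convention).
     */
    static int read(String name) {
        String value = System.getProperty(name);
        if (value == null) {
            return 0;
        }
        try {
            return Integer.parseInt(value.trim());
        } catch (NumberFormatException e) {
            return 0;
        }
    }

    public static void main(String[] args) {
        // Try it with: java -Dpj.np=4 PjSettings
        System.out.println("nn=" + read("pj.nn")
                + " np=" + read("pj.np")
                + " nt=" + read("pj.nt"));
    }
}
```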

If neither nn nor np is specified, the Job Scheduler will run the job with one process on one node.

If nn or np is specified but not both, the Job Scheduler will run the job on nn (or np) nodes with one process on each node.

If nn and np are both specified and nn >= np, the Job Scheduler will run the job on np nodes with one process on each node.

If nn and np are both specified and nn < np, the Job Scheduler will run the job on nn nodes with more than one process on some or all of the nodes, apportioning the np processes as equally as possible among the nn nodes. Note that in this case, different nodes may be assigned different numbers of processes.

On each node, the Job Scheduler will assign nt CPUs to each process. If nt is not specified, the default is to use all the CPUs in the node, apportioning the CPUs as equally as possible among the processes on the node. Note that in this case, different processes may be assigned different numbers of CPUs.
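
The apportioning rules above can be sketched in a few lines of Java. This is an illustration of the rules as stated, not the Job Scheduler's actual code. The class and method names are hypothetical, a value of 0 stands for "not specified", and giving the extra processes (or CPUs) to the lowest-numbered nodes (or processes) is an assumption, since the text only says "as equally as possible".

```java
import java.util.Arrays;

// Illustrative sketch of the nn/np/nt rules; not the Job Scheduler's code.
final class JobAllocation {

    /** Number of processes on each node; 0 for nn or np means "not specified". */
    static int[] processesPerNode(int nn, int np) {
        if (nn <= 0 && np <= 0) {
            nn = 1;                 // neither given: one process on one node
            np = 1;
        } else if (nn <= 0) {
            nn = np;                // only np given: np nodes, one process each
        } else if (np <= 0) {
            np = nn;                // only nn given: nn nodes, one process each
        }
        int nodes = Math.min(nn, np);   // nn >= np: only np nodes are used
        return split(np, nodes);
    }

    /** CPUs for each process on one node; 0 for nt means "not specified". */
    static int[] cpusPerProcess(int cpusInNode, int processesOnNode, int nt) {
        if (nt > 0) {
            int[] cpus = new int[processesOnNode];
            Arrays.fill(cpus, nt);
            return cpus;
        }
        return split(cpusInNode, processesOnNode);
    }

    /** Splits total into the given number of shares that differ by at most one. */
    static int[] split(int total, int parts) {
        int[] shares = new int[parts];
        for (int i = 0; i < parts; i++) {
            shares[i] = total / parts + (i < total % parts ? 1 : 0);
        }
        return shares;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(processesPerNode(3, 8)));
        System.out.println(Arrays.toString(cpusPerProcess(4, 3, 0)));
    }
}
```

For example, nn=3 and np=8 yields three nodes with 3, 3, and 2 processes, and three processes sharing a 4-CPU node with nt unspecified receive 2, 1, and 1 CPUs.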

The Job Scheduler Daemon has a web interface that lets you examine the cluster status. Just point your web browser at this URL:

    http://<hostname>:8080/

where <hostname> is replaced by the host name of the frontend node. The default port for the cluster status web interface is port 8080. The Job Scheduler Daemon can be configured to use a different port. For further information, see class Configuration.


PJ Cluster Configuration

Here are three examples showing how the PJ cluster middleware is configured to run on the RIT Computer Science Department's parallel computers. For further information about configuring the Job Scheduler Daemon, see class Configuration.

Paranoia Cluster

The Paranoia cluster consists of a frontend node, hostname paranoia.cs.rit.edu, and 32 backend nodes, hostnames thug01 through thug32. The frontend node has a Sun Microsystems UltraSPARC-II CPU with a 296 MHz clock and 192 MB of main memory. Each backend node has a Sun Microsystems UltraSPARC-IIi CPU with a 440 MHz clock and 256 MB of main memory. The cluster machines are interconnected via a dedicated 100 Mbps switched Ethernet. The frontend node also has a connection to the campus network.

Here is the configuration file for the Job Scheduler Daemon. It is stored in the file "/var/tmp/parajava/scheduler.conf" on the paranoia.cs.rit.edu machine.

# Parallel Java Job Scheduler configuration file
# Frontend node: paranoia
# Backend nodes: thug01-thug32

cluster RIT CS Paranoia 32-Node Cluster
logfile /var/tmp/parajava/scheduler.log
webhost paranoia.cs.rit.edu
webport 8080
schedulerhost localhost
schedulerport 20617
frontendhost paranoia.cs.rit.edu
backend thug01 1 thug01 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug02 1 thug02 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug03 1 thug03 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug04 1 thug04 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug05 1 thug05 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug06 1 thug06 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug07 1 thug07 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug08 1 thug08 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug09 1 thug09 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug10 1 thug10 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug11 1 thug11 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug12 1 thug12 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug13 1 thug13 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug14 1 thug14 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug15 1 thug15 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug16 1 thug16 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug17 1 thug17 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug18 1 thug18 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug19 1 thug19 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug20 1 thug20 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug21 1 thug21 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug22 1 thug22 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug23 1 thug23 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug24 1 thug24 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug25 1 thug25 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug26 1 thug26 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug27 1 thug27 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug28 1 thug28 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug29 1 thug29 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug30 1 thug30 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug31 1 thug31 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server
backend thug32 1 thug32 /usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server

Because webhost is set to paranoia.cs.rit.edu, a user can view the Job Scheduler Daemon's job status web interface from any machine on the Internet. However, because schedulerhost is set to localhost, a user must log into the paranoia.cs.rit.edu machine to run PJ jobs. (Since the Job Scheduler process is listening to localhost, the job frontend process must be running on the same host as the Job Scheduler process in order for the job frontend process to communicate with the Job Scheduler process.)

Although the backend nodes are not "server-class" machines, we want to run the Hotspot Server JVM on the backend nodes anyway. Thus, each backend entry includes the -server JVM flag to force the Hotspot Server JVM to run.

Here are the commands to run the Job Scheduler Daemon on the paranoia.cs.rit.edu machine.

    cd /var/tmp/parajava/
    java -classpath pj.jar edu.rit.pj.cluster.JobScheduler scheduler.conf

Parasite Machine

The Parasite machine is not actually a cluster parallel computer; rather, it is a single SMP parallel computer. The PJ cluster middleware is used to run a job queue for the Parasite machine, ensuring that only one job runs at a time on the Parasite machine.

The Parasite "cluster" consists of a frontend node, hostname paragon.cs.rit.edu, and one backend node, hostname parasite.cs.rit.edu. The backend node is a Sun Microsystems machine with four Sun UltraSPARC-IV dual-core CPUs (eight CPUs total), a 1.35 GHz clock speed, and 16 GB of main memory.

Here is the configuration file for the Job Scheduler Daemon. It is stored in the file "/var/tmp/parajava/scheduler.conf" on the paragon.cs.rit.edu machine.

# Parallel Java Job Scheduler configuration file
# Frontend node: paragon
# Backend node: parasite

cluster RIT CS Parasite 8-Core SMP
logfile /var/tmp/parajava/scheduler.log
webhost paragon.cs.rit.edu
webport 8080
schedulerhost localhost
schedulerport 20617
frontendhost paragon.cs.rit.edu
backend parasite 8 parasite.cs.rit.edu /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar

Here are the commands to run the Job Scheduler Daemon on the paragon.cs.rit.edu machine.

    cd /var/tmp/parajava/
    java -classpath pj.jar edu.rit.pj.cluster.JobScheduler scheduler.conf

Tardis Hybrid SMP Cluster

The Tardis cluster consists of a frontend node, hostname tardis.cs.rit.edu, and 10 backend nodes, hostnames dr00 through dr09. The frontend node has a Sun Microsystems UltraSPARC-IIe CPU chip with a 650 MHz clock and 512 MB of main memory. Each backend node has two AMD Opteron 2218 dual-core CPU chips with a 2.6 GHz clock and 8 GB of main memory. The cluster machines are interconnected via a dedicated 1 Gbps switched Ethernet. The frontend node also has a connection to the campus network.

Here is the configuration file for the Job Scheduler Daemon. It is stored in the file "/var/tmp/parajava/scheduler.conf" on the tardis.cs.rit.edu machine.

# Parallel Java Job Scheduler configuration file
# Frontend node: tardis.cs.rit.edu
# Backend nodes: dr00-dr09

cluster RIT CS Tardis Hybrid SMP Cluster
logfile /var/tmp/parajava/scheduler.log
webhost tardis.cs.rit.edu
webport 8080
schedulerhost localhost
schedulerport 20617
frontendhost 10.10.221.1
backend dr00 4 10.10.221.10 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr01 4 10.10.221.11 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr02 4 10.10.221.12 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr03 4 10.10.221.13 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr04 4 10.10.221.14 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr05 4 10.10.221.15 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr06 4 10.10.221.16 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr07 4 10.10.221.17 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr08 4 10.10.221.18 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar
backend dr09 4 10.10.221.19 /usr/local/versions/jdk-1.5.0_15/bin/java /var/tmp/parajava/pj.jar

Because webhost is set to tardis.cs.rit.edu, a user can view the Job Scheduler Daemon's job status web interface from any machine on the Internet. However, because schedulerhost is set to localhost, a user must log into the tardis.cs.rit.edu machine to run PJ jobs. (Since the Job Scheduler process is listening to localhost, the job frontend process must be running on the same host as the Job Scheduler process in order for the job frontend process to communicate with the Job Scheduler process.)

Here are the commands to run the Job Scheduler Daemon on the tardis.cs.rit.edu machine.

    cd /var/tmp/parajava/
    java -classpath pj.jar edu.rit.pj.cluster.JobScheduler scheduler.conf

Paradise and Paradox Machines

The PJ Library is installed on two additional RIT Computer Science Department SMP parallel computers, hostnames paradise.cs.rit.edu and paradox.cs.rit.edu. However, the Job Scheduler Daemon is not running on these machines. Thus, a PJ program will run directly on these machines without going through the job queue.

The paradise.cs.rit.edu machine is a Sun Microsystems machine with four Sun UltraSPARC-IV dual-core CPUs (eight CPUs total), a 1.35 GHz clock speed, and 16 GB of main memory. The paradox.cs.rit.edu machine is a Sun Microsystems machine with four Sun UltraSPARC-II CPUs, a 450 MHz clock, and 4 GB of main memory.


PJ Cluster Design

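A PJ message passing program running on a PJ cluster is a distributed system comprising four kinds of processes: the Job Scheduler Daemon, the SSH Daemon on each backend node, the job frontend process, and the job backend processes.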

The Job Scheduler Daemon, a process always running on the frontend node, has a channel group (class ChannelGroup) that is listening for connections to a well-known port on localhost. The default PJ port is 20617.

Each backend node has an always-running SSH Daemon process, typically a system process.

The Job Scheduler Daemon is configured with the following information, provided in a configuration file (see class Configuration):

  • File name for logging Job Scheduler actions.
  • Host and port for Job Scheduler's web interface for displaying status.
  • Host on which job frontend processes will listen for connections from job backend processes.
  • Port on which the Job Scheduler listens for connections from job frontend processes (default 20617).
  • For each backend node:
    • Backend node's name.
    • Number of CPUs.
    • Host name for SSH remote login to the backend node.
    • Full pathname for executing the Java Virtual Machine on the backend node.
    • Java class path for the PJ Library on the backend node.
    • JVM command line flags (zero or more).
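
The per-backend fields listed above correspond, in order, to the tokens of a backend line in the example configuration files. The sketch below parses one such line. It is not the parser in class Configuration; the class name BackendEntry is hypothetical, and the sketch assumes that tokens are separated by whitespace and that no token contains embedded spaces.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only; the real parsing is done by class Configuration.
final class BackendEntry {
    final String name;            // backend node's name
    final int cpus;               // number of CPUs
    final String host;            // host name for SSH remote login
    final String jvm;             // full pathname of the JVM executable
    final String classpath;       // class path for the PJ Library
    final List<String> jvmFlags;  // zero or more JVM command line flags

    private BackendEntry(String name, int cpus, String host, String jvm,
                         String classpath, List<String> jvmFlags) {
        this.name = name;
        this.cpus = cpus;
        this.host = host;
        this.jvm = jvm;
        this.classpath = classpath;
        this.jvmFlags = jvmFlags;
    }

    /** Parses a line of the form: backend name cpus host jvm classpath [jvmflag ...] */
    static BackendEntry parse(String line) {
        String[] t = line.trim().split("\\s+");
        if (t.length < 6 || !t[0].equals("backend")) {
            throw new IllegalArgumentException("Not a backend entry: " + line);
        }
        List<String> flags = Arrays.asList(Arrays.copyOfRange(t, 6, t.length));
        return new BackendEntry(t[1], Integer.parseInt(t[2]), t[3], t[4], t[5], flags);
    }

    public static void main(String[] args) {
        BackendEntry e = parse("backend thug01 1 thug01 "
                + "/usr/jdk/jdk1.5.0_17/bin/java /var/tmp/parajava/pj.jar -server");
        System.out.println(e.name + " cpus=" + e.cpus + " flags=" + e.jvmFlags);
    }
}
```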

The user starts things off by running a PJ message passing program that calls the Comm.init() method (see class Comm). The Comm.init() method creates an instance of class JobFrontend, the job frontend object takes over execution, and the user's process becomes the job frontend process.

The job frontend process begins by setting up three channel groups; each channel group is listening for connections to a separate host and port:

  1. The "middleware channel group" is for the PJ middleware itself. The job frontend process uses the middleware channel group to communicate with the Job Scheduler Daemon and the job backend processes.
     
  2. The "world channel group" is used to implement the "world communicator," which the parallel program uses to do message passing. All the job backend processes, but not the job frontend process, are part of the world communicator.
     
  3. The "frontend channel group" is used to implement the "frontend communicator," which the parallel program can also use to do message passing. All the job backend processes and the job frontend process are part of the frontend communicator. (Actually, the frontend communicator is optional. A flag on the Comm.init() method says whether to create the frontend communicator.)

Next, the job frontend process opens a connection to the Job Scheduler Daemon. (If the Job Scheduler Daemon is not listening to the default PJ port, the user must specify the "-Dpj.port=" flag on the Java command line when running the PJ program.) The job frontend process and the Job Scheduler Daemon exchange the following messages during the course of the job:

Job                                   Job
frontend                              Scheduler
process                               Daemon
|                                     |
|                                     |  Nn = Number of nodes
|  Request job (username,Nn,Np,Nt)    |  Np = Number of processes
|------------------------------------>|  Nt = Threads per process
|                                     |
|  Cancel job (errmsg)                |  If insufficient resources
|<------------------------------------|
|                                     |
|  Assign job number (jobnum)         |  If sufficient resources
|<------------------------------------|
|                                     |
|  Assign backend (name, host, port)  |  Repeat as each backend
|<------------------------------------|  node is assigned; job
|                                     |  commences once all are
|                                     |  assigned
|                                     |
|  Renew lease ()                     |  Periodically while job
|------------------------------------>|  is in progress
|                                     |
|  Renew lease ()                     |  Periodically while job
|<------------------------------------|  is in progress
|                                     |
|  Job finished ()                    |  When job finishes
|------------------------------------>|
|                                     |
|  Cancel job (errmsg)                |  If user cancels job
|------------------------------------>|
|                                     |
|  Cancel job (errmsg)                |  If error in Job Scheduler,
|<------------------------------------|  or admin cancels job
|                                     |
|  Backend failed (name)              |  If frontend detects
|------------------------------------>|  backend has failed
|                                     |

The Job Scheduler Daemon renews a lease on the job frontend process, and vice versa, by sending a "renew lease" message every 60 seconds. If one side fails to receive a "renew lease" message for 150 seconds, the lease expires. If the lease expires, that process assumes the process at the other end has failed, and the process takes the appropriate actions to abort the job.
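
The lease timing can be modeled as a small bookkeeping class, sketched below. This is not the PJ middleware's implementation; the class name Lease and its methods are hypothetical, and the caller passes in the current time in milliseconds so that the logic is easy to exercise.

```java
// Illustrative sketch of the lease timing; not the PJ middleware's code.
final class Lease {
    /** Interval at which each side sends a "renew lease" message. */
    static final long RENEW_INTERVAL_MS = 60_000L;
    /** Time without a "renew lease" message after which the lease expires. */
    static final long EXPIRE_INTERVAL_MS = 150_000L;

    private long lastRenewalMs;

    Lease(long nowMs) {
        this.lastRenewalMs = nowMs;
    }

    /** Call when a "renew lease" message arrives from the other side. */
    synchronized void renewReceived(long nowMs) {
        lastRenewalMs = nowMs;
    }

    /** True once no renewal has been received for the expiry interval. */
    synchronized boolean isExpired(long nowMs) {
        return nowMs - lastRenewalMs >= EXPIRE_INTERVAL_MS;
    }

    public static void main(String[] args) {
        Lease lease = new Lease(0L);
        lease.renewReceived(RENEW_INTERVAL_MS);
        System.out.println(lease.isExpired(3 * RENEW_INTERVAL_MS));
    }
}
```

With these values, two consecutive renewals can be missed before the lease expires: after a renewal at time 0, the messages due at 60 and 120 seconds may both be lost, and the lease expires at 150 seconds, before the one due at 180 seconds.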

The Job Scheduler Daemon sends messages to the job frontend process to assign a job number for the job and to assign backend processes, np of them, to the job. When a backend process is assigned to run on a backend node, the job frontend process performs an SSH command to log into the user's account on the backend node; the requisite host name is given in the "assign backend" message. The job frontend process tells SSH to execute the following command string on the backend node:

sh -c "cd '<dir>'; nohup <jvm> -classpath '<classpath>' <jvmflags> \
edu.rit.pj.cluster.JobBackend <args> >/dev/null 2>/dev/null &"

After verifying the user using public key authentication, SSH changes the working directory to the job frontend process's working directory, <dir>. Then SSH executes a Java Virtual Machine using the <jvm> command from the configuration file. This JVM becomes the job backend process. The Java class path is <classpath> from the configuration file. Any JVM flags from the configuration file (<jvmflags>), as well as any flags the user specified on the command line with the -Dpj.jvmflags property, are included. The main program class is JobBackend, followed by class JobBackend's command line arguments. The job backend process's standard output and standard error streams are redirected to /dev/null; the job backend process will intercept these streams and send them to the job frontend process. The job backend process is set to ignore hangup signals (nohup). The SSH session terminates without waiting for the job backend process to terminate (because of the trailing &). This leaves the job backend process running in the background on the backend node.

Note that the SSH command string uses Unix shell syntax. If this syntax is not appropriate for the job backend nodes, the code in class JobFrontend must be changed to emit the proper command string.
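
As an illustration of how such a command string might be assembled, here is a minimal sketch that fills in the template shown above. It is not the code in class JobFrontend. The class name BackendCommand, the method signature, and the sample paths are hypothetical, the string is emitted on one line rather than with a backslash continuation, and the sketch does not escape quote characters embedded in its inputs.

```java
// Illustrative sketch only; the real command string is built by class JobFrontend.
final class BackendCommand {

    /**
     * Fills in the Unix shell template. jvmFlags and args are already-joined,
     * space-separated strings and may be empty. Inputs are assumed not to
     * contain quote characters.
     */
    static String build(String dir, String jvm, String classpath,
                        String jvmFlags, String args) {
        StringBuilder sb = new StringBuilder();
        sb.append("sh -c \"cd '").append(dir).append("'; nohup ").append(jvm);
        sb.append(" -classpath '").append(classpath).append("'");
        if (!jvmFlags.isEmpty()) {
            sb.append(' ').append(jvmFlags);
        }
        sb.append(" edu.rit.pj.cluster.JobBackend");
        if (!args.isEmpty()) {
            sb.append(' ').append(args);
        }
        // Discard stdout/stderr and leave the process in the background.
        sb.append(" >/dev/null 2>/dev/null &\"");
        return sb.toString();
    }

    public static void main(String[] args) {
        // Sample values are placeholders, not JobBackend's real arguments.
        System.out.println(build("/home/alice/work", "/usr/bin/java",
                "/var/tmp/parajava/pj.jar", "-server", "arg1 arg2"));
    }
}
```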

The job backend process sets up its middleware channel group, world communicator, and frontend communicator using the information supplied in the command line arguments, which were specified by the job frontend process when it performed the SSH command. The job frontend process and the job backend process exchange the following messages during the course of the job using the middleware channel group:

Job                                   Job
frontend                              backend
process                               process
|                                     |
|  Backend ready (rank, mcg, wcg,     |  mcg = middleware channel group
|     fcg)                            |  wcg = world channel group
|<------------------------------------|  fcg = frontend channel group
|                                     |
|  Commence job (mcg[], wcg[], fcg[], |  Sent to each backend when all
|     properties, mainclass, args)    |  backends are ready
|----------------------------------->>|
|                                     |
|  Request resource (name)            |  To load a class from the user's
|<------------------------------------|  program
|                                     |
|  Report resource (name, bytecodes)  |
|------------------------------------>|
|                                     |
|  Write file (fd, buf, off, len)     |  Buffer goes to job frontend which
|<------------------------------------|  writes it to file or stdout/stderr
|                                     |
|  Renew lease ()                     |  Periodically while job
|------------------------------------>|  is in progress
|                                     |
|  Renew lease ()                     |  Periodically while job
|<------------------------------------|  is in progress
|                                     |
|  Backend finished ()                |  When main program finishes
|<------------------------------------|
|                                     |
|  Job finished ()                    |  Sent to each backend when all
|----------------------------------->>|  backend main programs have
|                                     |  finished
|  Cancel job (errmsg)                |
|------------------------------------>|  If job aborts
|                                     |
|  Cancel job (errmsg)                |  If error in job backend
|<------------------------------------|
|                                     |

The job backend process begins by sending a "backend ready" message to the job frontend process with the following information:

  • Job backend process's rank.
  • Host and port for the job backend process's middleware channel group.
  • Host and port for the job backend process's world communicator.
  • Host and port for the job backend process's frontend communicator (if the frontend communicator exists).

Once all the job backend processes have reported ready, the job frontend process sends a "commence job" message to each job backend process with this information:

  • Array of hosts and ports for the middleware channel group.
  • Array of hosts and ports for the world communicator.
  • Array of hosts and ports for the frontend communicator (if the frontend communicator exists).
  • Names and values of all defined Java system properties.
  • Fully qualified class name of the Java main program class to execute.
  • Java command line arguments (array of 0 or more strings).

Once the job commences running, the job frontend process and job backend processes exchange further messages as shown above.
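
The rule that "commence job" is sent only after every job backend process has reported ready amounts to a simple barrier on the frontend side. The sketch below shows one way to track it. It is not the logic in class JobFrontend; the class name ReadyTracker is hypothetical, and ignoring a duplicate "backend ready" message for the same rank is an assumption.

```java
// Illustrative sketch only; not the logic in class JobFrontend.
final class ReadyTracker {
    private final boolean[] ready;   // ready[rank] is true once that backend reported
    private int readyCount;

    ReadyTracker(int np) {
        this.ready = new boolean[np];
    }

    /**
     * Records a "backend ready" message from the given rank. Returns true
     * exactly once: when the last outstanding backend reports, which is the
     * point at which "commence job" would be sent to every backend.
     */
    synchronized boolean backendReady(int rank) {
        if (rank < 0 || rank >= ready.length) {
            throw new IllegalArgumentException("Bad rank: " + rank);
        }
        if (ready[rank]) {
            return false;            // duplicate report; ignored (assumption)
        }
        ready[rank] = true;
        readyCount++;
        return readyCount == ready.length;
    }

    public static void main(String[] args) {
        ReadyTracker tracker = new ReadyTracker(2);
        System.out.println(tracker.backendReady(1));
        System.out.println(tracker.backendReady(0));
    }
}
```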

In summary, the messages sent from and to each process are:

Sent From       Sent To
                Job Scheduler     Job Frontend        Job Backend
--------------  ----------------  ------------------  ----------------
Job Scheduler                     Assign backend
                                  Assign job number
                                  Cancel job
                                  Renew lease

Job Frontend    Backend failed                        Cancel job
                Cancel job                            Commence job
                Job finished                          Job finished
                Renew lease                           Renew lease
                Request job                           Report resource

Job Backend                       Backend finished
                                  Backend ready
                                  Cancel job
                                  Renew lease
                                  Request resource
                                  Write file
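
The summary of who sends which message can also be written down as data, which is handy for checking a message trace against the protocol. The sketch below encodes the summary as a lookup table. The class name MessageTable and its API are hypothetical and are not part of the PJ Library; the message names are the ones used in this description.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; not part of the PJ Library.
final class MessageTable {
    enum Party { JOB_SCHEDULER, JOB_FRONTEND, JOB_BACKEND }

    // Key is "sender>receiver"; value is the set of messages sent that way.
    private static final Map<String, Set<String>> ALLOWED =
            new HashMap<String, Set<String>>();

    private static void allow(Party from, Party to, String... messages) {
        ALLOWED.put(from + ">" + to, new HashSet<String>(Arrays.asList(messages)));
    }

    static {
        allow(Party.JOB_SCHEDULER, Party.JOB_FRONTEND,
                "Assign backend", "Assign job number", "Cancel job", "Renew lease");
        allow(Party.JOB_FRONTEND, Party.JOB_SCHEDULER,
                "Backend failed", "Cancel job", "Job finished", "Renew lease",
                "Request job");
        allow(Party.JOB_FRONTEND, Party.JOB_BACKEND,
                "Cancel job", "Commence job", "Job finished", "Renew lease",
                "Report resource");
        allow(Party.JOB_BACKEND, Party.JOB_FRONTEND,
                "Backend finished", "Backend ready", "Cancel job", "Renew lease",
                "Request resource", "Write file");
    }

    /** True if the summary lists the message for this sender and receiver. */
    static boolean isAllowed(Party from, Party to, String message) {
        Set<String> messages = ALLOWED.get(from + ">" + to);
        return messages != null && messages.contains(message);
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(Party.JOB_BACKEND, Party.JOB_FRONTEND, "Write file"));
    }
}
```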
 


PJ Job Scheduler Debugging

As an aid to debugging, the Job Scheduler Daemon's web interface also lets you examine the Daemon's internal state. Point your web browser at this URL:

    http://<hostname>:8080/debug

where <hostname> is replaced by the host name of the frontend node. In the current release, this displays a stack dump for each thread.
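
For readers curious how a per-thread stack dump can be produced in Java, the sketch below uses the standard Thread.getAllStackTraces() method. It only illustrates the general technique; whether the Job Scheduler Daemon generates its debug page this way is not stated here, and the class name ThreadDump and the output layout are assumptions.

```java
import java.util.Map;

// Illustrative sketch of a per-thread stack dump; not the Job Scheduler's code.
final class ThreadDump {

    /** Returns a text dump with one section per live thread. */
    static String dump() {
        StringBuilder sb = new StringBuilder();
        Map<Thread, StackTraceElement[]> traces = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> entry : traces.entrySet()) {
            Thread thread = entry.getKey();
            sb.append('"').append(thread.getName()).append('"');
            sb.append(" state=").append(thread.getState()).append('\n');
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```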
