Project 2

A sequencer-based multicast protocol using Java

by Tim Kindberg. Revised October 2000

 

What is required

Your task is to create a sequencer-based multicasting service, using a slight variant of the simple version of the Amoeba protocol (the first protocol described in www.cdk3.net/coordination), and a small client application to exercise it. You will use Java RMI and multicast sockets.

The main classes in your implementation will be:

Below you will find: You need to write SequencerImpl.java and History.java from scratch.

Note that the scheme you produce will not precisely implement the Amoeba protocol. In that protocol, messages are sent unreliably to the sequencer in the first instance. In yours, messages to be multicast are handed to the sequencer using RMI.
 

Choosing a multicast address and time-to-live

SequencerImpl should take as an argument the IP multicast address that the corresponding Groups are to use. To avoid collisions with numbers chosen by others, incorporate a random number into the multicast IP address that you use -- e.g. 234.day.month.rand, where day and month are chosen from a team member's birthday, and rand is a random number between 1 and 254.

Don't set your sockets' time-to-live (TTL) to more than 1. That way, your multicast packets will not be transmitted beyond the local Mbone router.
 

Organisation

This project is suitable for implementation by groups of two or three students. The following division of labour might be used:
  1. Team member 1: SequencerImpl.java and any associated helper classes apart from History
  2. Team member 2: Group.java
  3. Team member 3: TestSequencer.java and History.java
Teams hand in all code and a written report of between four and six pages, explaining the design and its rationale.

Teams are required to demonstrate their implementation. The demonstration should include as many as possible of the following features: simple message sending, stress-testing, recovery from simulated multicast datagram loss, heartbeat messages and history truncation.
 

Reference implementation

A reference implementation (class files only) is here. Documentation for the classes is here.
 

Code

Sequencer.java (complete)

package sequencer;

import java.rmi.*;
import java.net.*;
import java.io.*;

public interface Sequencer extends Remote
{
    // join -- request for "sender" to join sequencer's multicasting service;
    // returns an object specifying the multicast address and the first sequence number to expect
    public SequencerJoinInfo join(String sender)
        throws RemoteException, SequencerException;

    // send -- "sender" supplies the msg to be sent, its identifier,
    // and the sequence number of the last received message
   public void send(String sender, byte[] msg, long msgID, long lastSequenceReceived)
        throws RemoteException;

    // leave -- tell sequencer that "sender" will no longer need its services
   public void leave(String sender)
        throws RemoteException;

    // getMissing -- ask sequencer for the message whose sequence number is "sequence"
   public byte[] getMissing(String sender, long sequence)
        throws RemoteException, SequencerException;

    // heartbeat -- we have received messages up to number "lastSequenceReceived"
   public void heartbeat(String sender, long lastSequenceReceived)
        throws RemoteException;
}
 

SequencerJoinInfo.java (complete)

package sequencer;

import java.io.*;
import java.net.*;

public class SequencerJoinInfo implements Serializable
{
    public InetAddress addr;
    public long sequence;

    public SequencerJoinInfo(InetAddress addr, long sequence)
    {
        this.addr = addr;
        this.sequence = sequence;
    }
}
 
 

SequencerException.java (complete)

package sequencer;

import java.io.*;

public class SequencerException extends Exception implements Serializable
{
    public SequencerException(String s)
    {
        super(s);
    }
}
 
 

Group.java (outline only)

package sequencer;

import java.net.*;
import java.util.*;
import java.io.*;
import java.rmi.*;

public class Group implements Runnable
{

    public Group(String host, MsgHandler handler, String senderName)  throws GroupException
    {
       // contact Sequencer on "host" to join group,
       // create MulticastSocket and thread to listen on it,
       // perform other initialisations
    }

    public void send(byte[] msg) throws GroupException
    {
        // send the given message to all instances of Group using the same sequencer
    }

    public void leave()
    {
       // leave group
    }

    public void run()
    {
        // repeatedly: listen to MulticastSocket created in constructor, and on receipt
        // of a datagram call "handle" on the instance
        // of Group.MsgHandler which was supplied to the constructor
    }

    public interface MsgHandler
    {
         public void handle(int count, byte[] msg);
    }

    public class GroupException extends Exception
    {
        public GroupException(String s)
        {
            super(s);
        }
    }

    public class HeartBeater extends Thread
    {
        // This thread sends heartbeat messages when required
    }

}
 

Marshalling data into a datagram

ByteArrayOutputStream bstream =
    new ByteArrayOutputStream(MAX_MSG_LENGTH);
DataOutputStream dstream = new DataOutputStream(bstream);
dstream.writeLong(aLong); // marshals a Long into the byte array underlying bstream
 .....
 

After marshalling, the data can be obtained from bstream for inclusion in a DatagramPacket:

    byte[] theData = bstream.toByteArray();

(For unmarshalling there are corresponding classes ByteArrayInputStream and DataInputStream.)