Professor: Samer Al-Kiswany | Term: Winter 2026

Lecture 1

Definition

A distributed system is a collection of independent computers that appear to the users of the system as a single computer.

Distributed software systems should provide easy-to-use abstractions, hide complexities, handle failures, efficiently use resources, and leverage emerging technologies.

Example

  • A network of workstations allocated to users
  • A pool of processors in the machine room allocated dynamically
  • A single file system (all users access files with the same path name)
  • User command executed in the best place (user workstation, a workstation belonging to someone else, or on an unassigned processor in the machine room)

Question

Why distributed?

  • Economics: Microprocessors offer a better price/performance than mainframes
  • Speed: A distributed system may have more total computing power than a mainframe
  • Inherent distribution: Some applications involve spatially separated machines
  • Reliability: If one machine crashes, the system as a whole can still survive
  • Incremental Growth: Computing power can be added in small increments

Primary features

  • Multiple computers
    • Concurrent execution
    • Independent operation and failures
  • Communications
    • Ability to communicate
    • No tight synchronization (no global clock)
  • “Virtual” computer
    • Transparency

A bunch of computers that work together, but appear to be as one.

Challenges

  1. Heterogeneity

Distributed systems require communication and coordination across a variety of:

  • Networks
  • Hardware
  • OS
  • Programming Languages
  • Implementations
  2. Scalability

Question

Can the system behave properly if the number of users and components increases?

  • Scale-up: Increase the number of “users” while keeping the system performance unaffected
  • Speed-up: Improvement in the system performance as system resources are increased

Tradeoffs need to be made (e.g., availability vs. consistency; see CS451)

Impediments:

  • Centralized data
    • A single file
  • Centralized services
    • A single server
  • Centralized algorithms
    • Algorithms that “know it all”

Scalability is not black and white; there are shades of scalability. Some things cannot be done at scale.

  3. Failure handling
  • Partial failures
    • Can non-failed components continue operation?
    • Can the failed components easily recover?
  • Failure detection
  • Failure masking
  • Failure tolerance
  • Recovery
  • An important technique is replication. This is a hard problem.
  4. Concurrency

With multiple clients sharing a resource, how do you maintain the integrity of the resource and proper operation (without interference) of these clients?

Managing failures in concurrent systems is difficult.

  5. Transparency

Transparency is not easy to maintain. There are many types of transparency:

| Transparency | Description |
| --- | --- |
| Access | Local and remote resources are accessed using identical operations |
| Location | Hide where an object is located |
| Relocation | Hide that an object may be moved to another location while in use |
| Migration | Hide that an object may move to another location |
| Replication | Hide that an object is replicated |
| Concurrency | Hide that an object may be shared by several independent users |
| Failure | Hide the failure and recovery of an object |

Aiming at full distribution transparency may be too much. There are communication latencies that cannot be hidden.

Completely hiding failures of networks and nodes is impossible. You cannot distinguish a slow computer from a failing one. You can never be sure a server performed an operation before a crash.

Full transparency will cost performance. Keeping replicas up-to-date with the master takes time.

  6. Security

Confidentiality: Information is disclosed only to authorized parties.

Integrity: Ensure that alterations to assets of a system can be made only in an authorized way.

Authentication: How do we verify the correctness of a claimed identity?

Authorization: Does an identified entity have proper access rights?

Developing distributed systems: Pitfalls

Many distributed systems are needlessly complex, caused by mistakes that required patching later on. Many false assumptions are made.

False assumptions include:

  • The network is reliable
  • The network is secure
  • The network is homogeneous
  • The topology does not change
  • Latency is zero
  • Bandwidth is infinite
  • Transport cost is zero
  • There is one administrator

Lecture 2

I skipped this lecture, it was a CS456 crash course.

Lecture 3

Communication Middleware

A distributed system organized as middleware.

The middleware layer extends over multiple machines.

Communication Alternatives:

Raw message passing

  • TCP/IP: Reliable byte stream
  • UDP/IP: Unreliable, packet oriented, connectionless

Problem: Low level, not easy to use.

OS Abstractions: Distributed Shared Memory

On a page fault, get the remote page.

  • Pros: Easy to program
  • Cons: High overhead, hard to handle failures

Remote Procedure Call RPC (Xerox RPC)

Main idea: Call remote procedures as if they are local ones. The key advantage is that it is general, easy to use, and efficient.

Uses the well-known procedure call semantics.

The caller makes a procedure call and then waits. If it is a local procedure call, then it is handled normally. If it is a remote procedure, then it is handled as a remote procedure call.

Caller semantics is a blocking send (send the request, stop, wait for the result, continue after it arrives). Callee semantics is a blocking receive to get the parameters and a non-blocking send at the end to transmit results.

RPC architecture

flowchart TD

a[Caller Procedure]
b[Caller Stub]
c[Comm]
d[Comm]
e[Callee stub]
f[Callee procedure]


subgraph Caller
	a --> b
	b --> c
	c --> b
	b --> a
end
c -- Call packet(s) ---> d
subgraph Callee
	d --> e
	e --> f
	f --> e
	e --> d
end
d -- Result packet(s) ---> c
Interface{
	// has enough information for compile time
	// checking, and generating proper calling sequence
	int func1(int arg)
	int func2(char * buffer, int size)
}
client.c
// Client stub for func1
int func1(int arg) {
	// Create req
	// Pack fid, arg, etc. 
	// Send req
	// Wait for reply
	// Unpack results
	// Return result
}
server.c
// Server stub for func1
int func1(void *request) {
	// Unpack request
	// Find the requested server function
	// Call the function with arg in the request
	// Pack results
	// Send results
}

Question

How about for func2?

Problems

  • * buffer points into a different address space (solution: copy/restore if needed)
  • Can be input or output (solution: add an argument type (in | out | both))
  • Complex data structures

Question

How does the client know where to send the RPC request?

Binding

Server side:

  • Load the function in memory
  • Populate export table (fid, fname, call_back function). Each function is assigned a unique id (fid) every time the function is exported
  • Export interface

A registry stores the server IP and the function names supported.

Client Side:

The client queries the registry to see who provides the function they want to call (fname).

The registry will respond with the appropriate server ip.

The client will then query the server to get the binding info for the function.

The server will respond with the fname, fid, and the client stores this info in memory.

At runtime, the client already knows the server address and function id. The client sends the function id and arguments to the server. The server looks up the fid in the export table, runs the function, and sends back the result.

Runtime choices:

TCP/IP

  • High overhead on server, many connections and state
  • High latency (setting/closing connections)

UDP/IP

  • Unreliable

Fault Tolerance

Lost Request

When the function is called, the client sends the fid and arguments to the server. The request was lost in the network in transit.

The client waits, times out, and the client retries. Request hits the server and returns the result back to the client. Retrying was safe because the function ran once.

Lost Reply

Client sends the fid and arguments to the server. Server receives the payload, runs the function, and sends the result back to the client. The reply back to the client is lost.

The client times out and retries, sending the function id and arguments to the server again. The server runs the function and returns the result back to the client.

Problem

The function was executed twice.

Call Semantics

At least once: the function is executed at least once.

At most once: the function is executed once or not at all.

Neither one is perfect: exactly-once is desired, but has high overhead.

Xerox RPC implements at-most-once semantics: if success is returned, it is guaranteed that the function was executed exactly once. On failure, it was executed at most once.

A client issues one RPC at a time.

Xerox at most once semantics:

Key idea: Every RPC request is uniquely identifiable.

The client sends the client ip, process id, sequence number (strictly increasing number), fid, arguments. This tuple uniquely identifies this call.

The server receives this, checks the export table, checks the previous-calls table in memory, runs the function, and returns the result.

The previous call table checks if it has seen this (client_ip, process_id, seq_num) tuple before.

We return the client ip, process id, sequence number and result.

Fault Tolerance: Lost Reply

Client calls function, sends client ip, process id, sequence number, fid, arguments.

Server runs the function, lost in transit on the way back to the client.

The client times out and sends the request again (same client ip, process id, sequence number, fid, arguments). The server checks the previous_calls table and notices that this is a duplicate. It returns an ACK.

Fault Tolerance: Server Crash

Client calls function, sends client ip, process id, sequence number, fid, arguments. Server runs function, updates previous calls table. Then a reboot happens, server does not send anything back.

Client times out and sends the exact same request again. The server checks the previous calls, but there’s nothing in it, so it runs the function again and sends the result back to the client.

The solution is to rebind with a new fid on server restart.

Client calls function, sends client ip, process id, sequence number, fid, arguments. Server runs function (export table has fid = 5). Then reboot hits. Rebind functions with a different fid. So now the same function has a fid = 11 in the export table.

The client times out and sends the same request as before (with fid = 5). The fid doesn’t match, so the server responds with an error that there is no such function id.

Upon receiving this error, the client should query the registry again for the new bindings.

Fault Tolerance: Client failure with repeated sequence numbers

Client calls function, sends client ip, process id, sequence number (10), fid, arguments. Server checks previous call table, populates it with the client ip, process id and sequence number. It runs the function, and returns the result.

Reboot after the client receives this.

We then have a later call of a different function, client sends client ip, process id, sequence number (1), fid, arguments. Server checks previous calls table, notices it is a duplicate (since sequence numbers are meant to be strictly increasing), and returns ack (without executing the function!). The server doesn’t know if the client rebooted and started fresh or if it is a delayed duplicate packet.

The solution is to add clock-based conversation id, to every call. <conversation_id, seq #> is strictly increasing.
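A minimal sketch of the resulting duplicate filter (class and field names are hypothetical): the server keeps, per (client ip, process id), the last <conversation_id, seq #> pair seen, and executes a call only if its pair is strictly larger.

```cpp
// Sketch of the at-most-once duplicate filter: every call carries a
// strictly increasing <conversation_id, seq>, and the server rejects
// anything not newer than the last pair seen from that client process.
#include <map>
#include <string>
#include <utility>

struct CallId {
    std::string client_ip;
    int pid;
    long conv_id;   // clock-based conversation id, set when the client starts
    long seq;       // strictly increasing within a conversation
};

class PreviousCalls {
    // last <conv_id, seq> seen per (client_ip, pid)
    std::map<std::pair<std::string, int>, std::pair<long, long>> last_;
public:
    // True if the call is new and should be executed;
    // false means duplicate (or stale): just re-ack.
    bool should_execute(const CallId &c) {
        auto key = std::make_pair(c.client_ip, c.pid);
        auto cur = std::make_pair(c.conv_id, c.seq);
        auto it = last_.find(key);
        if (it != last_.end() && cur <= it->second) return false;
        last_[key] = cur;
        return true;
    }
};
```

After a reboot the client draws a fresh, larger conversation id from its clock, so a small sequence number is no longer mistaken for a delayed duplicate.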

Lecture 4

File system is the Operating Systems interface to storage.

Virtual file system is an abstraction layer in the Unix kernel that acts as an interface for the file system. Provides an API for the file system.

Specific file systems are assigned subtrees.

Application wants to read some data. Send call to VFS, VFS gets data from the right local file system (based on path).

Application wants to write some data. Send call to VFS, VFS writes to the file system, the file system returns with success, though the data isn't written to disk yet. fsync is a blocking call that writes the data to disk.

We don’t write to disk every time for performance reasons, must invoke fsync.

Consider a hash table and a data table (the hash table stores a pointer to the data in the data table).

If we want to update a key, we perform:

write(ptr) then write(data). This is risky if we crash.

So we can do

write(data)
write(ptr)
fsync(data)
fsync(ptr)

This is still not the right way to do it. There is no guarantee of ordering.

Correct ordering is

write(data)
fsync(data)
write(ptr)
fsync(ptr)

The disk gives no ordering guarantee, for performance reasons: e.g., the head may be over a location on disk while the write operation for that location is issued later, so operations can be reordered to optimize speed.
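The correct ordering can be sketched with POSIX calls (the file layout and the durable_write helper are made up for illustration): the data is forced to disk before the pointer is, so a crash between the two steps leaves the old pointer intact.

```cpp
// Sketch of an ordered durable update: fsync the data block first, and
// only then write and fsync the pointer, so a crash can never leave a
// pointer referring to data that was never persisted.
#include <fcntl.h>
#include <unistd.h>
#include <string>

// Hypothetical helper: durably append `buf` to the file at `path`.
bool durable_write(const std::string &path, const char *buf, size_t len) {
    int fd = open(path.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (fd < 0) return false;
    bool ok = write(fd, buf, len) == (ssize_t)len && fsync(fd) == 0;
    close(fd);
    return ok;
}

// Update a key: data first (durably), pointer second (durably).
bool update_key(const std::string &data_file, const std::string &ptr_file,
                const std::string &data, long offset) {
    if (!durable_write(data_file, data.c_str(), data.size()))
        return false;                 // crash here: old pointer still valid
    std::string ptr = std::to_string(offset) + "\n";
    return durable_write(ptr_file, ptr.c_str(), ptr.size());
}
```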

Distributed file system emulates non-distributed file system behaviour on a physically distributed set of files, usually within an intranet.

We require transparency, concurrent access, file replication, and security.

Sun Network File System

On client computer, virtual file system goes remote to NFS client, NFS client uses NFS protocol to contact NFS server on server computer (uses RPC). NFS server sits in the UNIX kernel (not necessarily, just optimization), sends request to VFS, VFS gets data from UNIX file system.

flowchart TD
a[VFS]
b[NFS client]
c[NFS server]
d[VFS]
e[UNIX file system]
f[Application Program]
g[Unix file system]

subgraph Client computer
f -- UNIX system calls --> a
subgraph UNIX kernel
a --> b 
a --> g
end
g --> ida[(Disk)]
end
b -- NFS Protocol ---> c
subgraph Server Computer
subgraph UNIX kernel
c --> d
d --> e
end
e --> id[(Disk)]
end

NFS V.3 Design Principles

  • Access transparency: No distinction between local and remote files.
  • Stateless design: Server does not maintain any state about clients. Easier implementation and easier to handle crashes. File operations are idempotent, which simplifies fault tolerance and implementation.

All NFS operations take a file handle.

Idempotent Example

Client wants to write to a file, performs: fd = open(path), then write(fd, buf, size)

Server will write to file, send ack

Question

What if ack is lost?

The client will time out, run the same write again, and send write(fd, buf, size) to the server.

Server will write again, send ack.

Error

This is not idempotent.

We include an offset in write(fd, buf, size, offset). Now this is idempotent.
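A tiny model of why the offset helps (in-memory "file" and the apply_write helper are invented for the sketch): replaying the identical request leaves the file in the same state, whereas an append-style write without an offset would apply twice.

```cpp
// Sketch of idempotent write: applying write(fh, buf, size, offset)
// twice leaves the file unchanged, because the offset is explicit
// rather than implied by server-side state.
#include <algorithm>
#include <string>

// Server-side apply of a write request to an in-memory "file".
void apply_write(std::string &file, const std::string &buf, size_t offset) {
    if (file.size() < offset + buf.size())
        file.resize(offset + buf.size());
    std::copy(buf.begin(), buf.end(), file.begin() + offset);
}
```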

Sketchy idempotence

Client wants to unlink file1.

Server unlinks the file, returns an ack. Ack is lost in transport.

Client times out and sends unlink file1 back to server.

Server is going to return an error since the file doesn’t exist anymore.

We account for this in NFS client side design. If we delete a file and it returns an error, then we have successfully deleted it.
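That client-side rule might look like this (the TIMEOUT constant and the retry wrapper are invented for the sketch): a "no such file" error on a retried unlink is reported as success, because the first attempt must have deleted the file.

```cpp
// Sketch of the NFS client-side fix for unlink's imperfect idempotence.
#include <cerrno>

// Result of one unlink RPC attempt: 0 on success, -errno on failure,
// or TIMEOUT (hypothetical sentinel) if no reply arrived.
constexpr int TIMEOUT = -10000;

// Hypothetical retry wrapper around an unlink RPC `attempt(path)`.
template <typename Rpc>
int unlink_with_retry(Rpc attempt, const char *path, int max_tries = 3) {
    bool retried = false;
    for (int i = 0; i < max_tries; i++) {
        int rc = attempt(path);
        if (rc == TIMEOUT) { retried = true; continue; }
        // ENOENT on a retry means our earlier attempt already deleted it.
        if (rc == -ENOENT && retried) return 0;
        return rc;
    }
    return TIMEOUT;
}
```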

A file handle uniquely identifies a file on the server side.

flowchart TD
a[VFS]
b[NFS server]
c[FS1]
d[FS2]

a --> b
a --> c[(FS1)]
a --> d[(FS2)]

We use i-node to uniquely identify files and to determine which file system has our files.

Warning

Client 1 writes to file with an inode 10. Server performs this action.

Client 2 unlinks the file with inode 10 (delete). Client 2 then creates a new file, and an i-node is generated and assigned to it. Since i-nodes are reused, this new file can be assigned i-node 10.

Client 1 writes to inode 10 (thinking it is its own file), but it is corrupting the new file.

To solve this we store an additional parameter, the i-node generation number, and include the pair i-node.generation in the function call.

It would first be i-node.0 for Client 1's file; when Client 2 creates the new file with the same i-node, it becomes i-node.1. When Client 1 tries to write to i-node.0, the server responds with an error: the file doesn't exist anymore. This solution is necessary since there is a limited number of i-node values.
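A sketch of the handle check (struct and class names are hypothetical): the server bumps the generation whenever an i-node is reallocated, and rejects any handle whose generation no longer matches.

```cpp
// Sketch of an NFS-style file handle carrying an i-node generation
// number: a reused i-node gets a new generation, so stale handles from
// before the delete can no longer corrupt the new file.
#include <map>

struct FileHandle { int inode; int generation; };

class Server {
    std::map<int, int> current_gen_;   // inode -> current generation
public:
    FileHandle create(int inode) {     // (re)allocate inode, bump generation
        return {inode, ++current_gen_[inode]};
    }
    // True if the operation may proceed; false = stale handle
    // (reported by real NFS servers as a "stale file handle" error).
    bool check(const FileHandle &fh) const {
        auto it = current_gen_.find(fh.inode);
        return it != current_gen_.end() && it->second == fh.generation;
    }
};
```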

To get the file handle of the root directory on a remote server, we use mount.

Client requests for mount path, server checks if path is exported, checks user permissions, returns file handle of exported root directory.

Lookup:

NFS does lookup iteratively, one step at a time.

Client sends lookup for (people_fh, "bob")
Server responds with bob_fh
Client sends lookup for (bob_fh, "hw")
Server responds with hw_fh
Client sends lookup for (hw_fh, "a3")
Server responds with a3_fh
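The iterative walk can be sketched like this (the in-memory directory tree and the Fh handle type are stand-ins for the server's real structures): one lookup RPC per path component, each starting from the handle returned by the previous step.

```cpp
// Sketch of NFS-style iterative lookup: resolve one path component per
// round trip, starting from the mounted root's file handle.
#include <map>
#include <optional>
#include <sstream>
#include <string>
#include <utility>

using Fh = int;                                  // file handle
// Server directory tree: (dir handle, name) -> child handle.
using Tree = std::map<std::pair<Fh, std::string>, Fh>;

// One LOOKUP RPC: (parent_fh, name) -> child fh, if it exists.
std::optional<Fh> lookup(const Tree &t, Fh dir, const std::string &name) {
    auto it = t.find({dir, name});
    if (it == t.end()) return std::nullopt;
    return it->second;
}

// Client side: walk e.g. "people/bob/hw" one component at a time.
std::optional<Fh> resolve(const Tree &t, Fh root, const std::string &path) {
    Fh cur = root;
    std::istringstream ss(path);
    for (std::string seg; std::getline(ss, seg, '/'); ) {
        auto next = lookup(t, cur, seg);         // one round trip per segment
        if (!next) return std::nullopt;
        cur = *next;
    }
    return cur;
}
```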

This is slow. Recursive would be faster but it is not implemented for the following reason.

Nested Mounting

Server A imports a directory from Server B
Client imports directory from Server A. It wants that directory from Server B. 
Client sends request to Server A for the fh that is from Server B. Server A realizes it is from Server B, requests it, receives it, and sends it to Client. 

This is dangerous. Server B may not want the client to access the directory, but when Server A requests the directory on behalf of the client, Server B does not know this. The client should make the request to Server B itself.

The client needs to explicitly import subdirectory from server B.

To support all of this we need to do iterative lookup (recursive won’t work). We can still optimize by chopping into segments.

Read operation: Client sends read request for file handle. Server performs read and returns data blocks.

Write operation: Client writes to a file handle. Server does a persistent write, then acks back to the client (write-through caching).

NFS requests transmitted via Remote Procedure Calls (RPCs). Client sends authentication information, checked against access permission in file attributes.

Any client may address RPC requests to the server providing another client's identification information. This motivated the introduction of security mechanisms in NFS v4.

Semantics of File Sharing

On a single processor, when a read follows a write, the value returned by the read is the value just written. In a distributed system with caching, obsolete values may be returned.

| Method | Comment |
| --- | --- |
| UNIX semantics | Each operation on a file is instantly visible to all processes |
| Session semantics (NFS) | No changes are visible to other processes until the file is closed |
| Immutable files | No updates are possible |
| Transaction | All changes occur atomically |

Lecture 5

WatDFS Project

Create a client module that acts as local file system, hook it to VFS, and have it issue commands to the remote server.

The client module does not need to be in the kernel space. We use FUSE. FUSE helps build a file system without writing kernel code.

Hook the FUSE kernel module to the client module via libfuse. The FUSE kernel module takes system calls and sends them to the user-space client module, which ships them to the server module. The server module executes the operations, computes the result, and sends it back to the client module.

Requests from the client to the server are via a custom RPC library.

Components for each call

  1. libfuse handler (handle from FUSE kernel module in kernel space to user space)
  2. RPC calls
  3. RPC listener on the server in user space
  4. Request handler on the server in user space
flowchart TD
f[Applications]
g[Virtual File System]
h[Fuse Kernel Module]
i[libfuse]
a[libfuse handler]
b[RPC calls]
c[RPC listener]
d[Request handler]
e[Virtual File System]

subgraph client
	subgraph user space
		f
		i
		i --> a
		a --> b
	end
	f --> g
	h --> i
	subgraph kernel
		g
		g --> h
	end
end

b --> c

subgraph server
	subgraph user space
		c --> d
	end
	d--> e
	subgraph kernel 
		e
	end
end

Implementation walkthrough of getting file attributes (for request handler)

int watdfs_cli_getattr(void *userdata, const char *path, struct stat *statbuf);
 
int stat(const char *pathname, struct stat *statbuf);

Part 2: Setting up a RPC Call

// GET FILE ATTRIBUTES (need to apply this for all system calls)
int watdfs_cli_getattr(void *userdata, const char *path, struct stat *statbuf) {
	// Set up the RPC call
	
	int ARG_COUNT = 3;
	
	// Allocate space for the arguments
	void **args = new void*[ARG_COUNT];
	
	// Allocate the space for arg types
	int arg_types[ARG_COUNT + 1];
	
	int pathlen = strlen(path) + 1; // +1 for null terminator 
	
	// Fill in the arguments
	// The first argument is the path
	arg_types[0] = (1 << ARG_INPUT) | (1 << ARG_ARRAY) | 
				(ARG_CHAR << 16u) | pathlen;
				
	// The first 3 bits of the first byte tell us whether the argument is an input, needs to be returned as an output, and whether the argument is an array, respectively. The last 2 bytes are the length of the array. 
	args[0] = (void *)path;
	
	// The second argument is the stat structure
	arg_types[1] = (1 << ARG_OUTPUT) | (1 << ARG_ARRAY) | 
				(ARG_CHAR << 16) | sizeof(struct stat); // statbuf
	args[1] = (void *)statbuf;
	
	// The third argument is the return code
	// TODO: Fill in this argument type and argument
	
	// Finally, the last position of the arg type is 0
	arg_types[3] = 0;
	
	
	// Making the RPC call
	
	int rpc_ret = rpcCall((char *)"getattr", arg_types, args);
	
	// Handle the return
 
}

If the size is larger than the limit (), then we have to write in multiple RPC calls.

Part 3: WatDFS Server: Setup RPC call

int main(int argc, char *argv[]) {
	// Set up arguments
	int argTypes[4];
	
	// First is the path
	argTypes[0] = (1 << ARG_INPUT) | (1 << ARG_ARRAY) |
	(ARG_CHAR << 16) | 1;
	
	argTypes[1] = (1 << ARG_OUTPUT) | (1 << ARG_ARRAY) | 
	(ARG_CHAR << 16) | 1;
	// Retcode
	argTypes[2] = (1 << ARG_OUTPUT) | (ARG_INT << 16);
	// Fill in the null terminator
	argTypes[3] = 0;
	
	// Register function
	int ret = rpcRegister((char*)"getattr", argTypes, watdfs_getattr);
	// hand over control to RPC library (rpcExecute)
}

Part 4: WatDFS Server: Define RPC function handler

int watdfs_getattr(int *argTypes, void **args) {
	// Get the arguments
	
	// The first argument is the path
	char *short_path = (char*) args[0];
	
	// The second argument is the stat structure
	struct stat *statbuf = (struct stat*) args[1];
	
	// The third argument is the return code
	// which will be 0, or -errno
	int *ret = (int *)args[2];
	
	// Convert path name
	char *full_path = get_full_path(short_path);
	
	// Initially we set the return code to be 0 
	*ret = 0;
	
	// Make the stat system call
	// Remember to set *ret to -errno if there is an error
	
	// The RPC call should always succeed, so return 0
	return 0;	
}

Need to repeat this 8 more times for Project 1.

Example

Path manipulation

Home is on client, rest is on server (mounted).

stat('/mnt/home/Bob/F.dat', SB)

watdfs_cli_getattr('/Bob/F.dat', SB) (this is the path given).

full_path() takes the above path and converts it into the full path.

Caution

Most errors happen in passing the arguments between the client and server. Print arguments and argument types before calls.

Lecture 6

Optimizations

  • Client-side cache: File pages, directories, and file attributes.
  • Read-ahead: Read the next page in the file. Once the file is open for read, NFS client will not wait for a read call, it will start reading.
  • Write-delay (behind): Cache the writes (client or server side), until the page is flushed from the cache.
  • Commit protocol: Server delays writing to disk until client commit(s).
    • Client sends a write to the server. VFS on server will store the write in memory, send ACK back. Client sends another write, adds to cache, server ACKs back. Client commits, writes from cache get flushed to disk on server, ACKs back.

Question

Why is this okay for app developers?

Similar protocol for writing locally. Must use fsync to commit.

Caching in server and client is indispensable to improve performance.

Server caching

  • Disk caching as in non-network file systems
  • Write-through caching
    • Store updated data in cache and write it to disk before sending the reply to the client. Inefficient if writes are frequent
  • Commit operation
    • Stored only in cache memory

Client caching of write operations: mark the modified cache page as ‘dirty’ and schedule the page to be flushed to the server. The flush happens when the file is closed, or when sync is used.

The data cached in client may not be identical to the same data stored on the server.

Timestamp-based scheme used in polling server about freshness of a data object.

  • Tc: Time cache entry was last validated
  • Tm: Time when block was last modified at the server, as recorded by the client (Tm_client) and the server (Tm_server)
  • t: Freshness interval

Freshness condition at time T: (T - Tc < t) or (Tm_client = Tm_server)

If T - Tc < t, then the entry is presumed to be valid. If not, Tm_server needs to be obtained by a getattr call. If Tm_client = Tm_server, then the entry is presumed valid; update its Tc to the current time. Else obtain the data from the server and update Tm_client.
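The two-step check can be sketched as follows (helper names hypothetical; Tc is the last validation time, Tm the last modification time, t the freshness interval):

```cpp
// Sketch of the timestamp-based freshness check used when polling the
// server about a cached block.
struct CacheEntry { long Tc; long Tm_client; };

// Step 1: cheap local check -- entry presumed valid if recently validated.
bool locally_fresh(const CacheEntry &e, long now, long t) {
    return now - e.Tc < t;
}

// Step 2: otherwise fetch Tm from the server (getattr). If unchanged,
// revalidate (update Tc); else the cached data itself must be refetched.
bool revalidate(CacheEntry &e, long now, long Tm_server) {
    if (e.Tm_client == Tm_server) { e.Tc = now; return true; }
    e.Tm_client = Tm_server;   // caller must also refetch the data
    e.Tc = now;
    return false;
}
```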

Lookup: Instead of iterative lookup NFS v4 performs full path lookup. Support for callback features.

flowchart TD
a[Old file]
b[Local copy]
c[Updated file]
subgraph Client
	b
end
subgraph Server
	direction TB
	a
	c
end
a --Server delegates file--> b
b --Client sends return file--> c

Client --Client asks for file --> a
Server --Server recalls delegation--> b

Andrew File System (AFS)

Infrequently updated shared files and local user files will remain valid for long periods of time. Allocate a large local disk cache. Key assumptions: usually small files, reads are more common than writes, sequential access, most files are used by only one user, burstiness of file references.

Design decisions:

  • Whole-file serving: Entire contents of directories and files transferred from server to client (there is no recall delegation)
  • Whole-file caching: When file is transferred to client, it will be stored on that client’s local disk.

Implementation of System Calls

When a file is opened, check if we have a cached copy of the file. Check if there is a valid callback promise. We don’t need to talk to the server. If we don’t have the file, or there is no valid callback promise, we need to get a fresh copy of the file from the server (and we get a valid callback promise). Place the copy of the file in the local file system, opened by the UNIX kernel, treated like a local file.

When closing the file, we notify Venus that the file has been closed. If the local copy has been changed, send a copy to the Vice server that is the custodian of the file. The server replaces the file contents and sends a callback to all other clients holding callback promises on the file.

AFS is a server-initiated protocol: the server keeps a list of clients and tells them when there is a change (which can only happen after a client closes a file), as opposed to the frequent polling in Sun NFS.

Dealing with failures: The client may have missed some callbacks. Venus sends cache validation request to the Vice server (contains the file modification timestamp). If timestamp is current, server sends valid and callback promise is reinstated with valid. If timestamp is not current, server sends cancelled.

Problem: Communication link failures. The callback must be renewed with the above protocol before a new open if time has elapsed since the file was cached or the callback promise was last validated.

Last Write Wins: AFS does not control concurrent updates of files, this is left up to the application.

Back to Communication

  • Remote Procedure Call (RPC)
  • Remote Method Invocation (RMI) - object-oriented
  • Message-Based Communication

If client and server are on different CPU / OS, they may represent memory differently. Need to translate.

Option 1: Devise a canonical form for data types and require all senders to convert their internal representation to this form while marshalling. This can be inefficient if both processes use the same format; there is unnecessary conversion at both ends.

Option 2: Sender uses its own native format and indicates in the first byte of the message which format is used. The receiver stub examines the first byte to see what the client's format is. Everyone can convert from everyone else's format.

Client locates the server via dynamic binding.

The client kernel cannot differentiate between a) request to server, receives, executes, crashes, no ACK and b) request to server, receives, crashes, no ACK.

3 schools of thought:

  1. Wait until the server reboots and try the operation again. At-least-once semantics.
  2. Give up immediately and report back failure. At-most-once semantics.
  3. Client gets no help, guarantees nothing. Easy to implement.

Google RPC (gRPC) uses TCP/IP, protobufs for marshalling, and at-most-once semantics. Missed deadlines are handled by applications.

flowchart LR
a[Client App]
b[Server Application]

subgraph gRPC_Client_Library
a
end

subgraph gRPC_Server_Library
b
end
gRPC_Client_Library <--The wire--> gRPC_Server_Library

If failure in gRPC client library, then try again. If failure in gRPC server library (before a hit to the server application), then try again.

Set a deadline on any gRPC call made (otherwise if stuck at server, client will be waiting forever).

Lecture 7

Remote Object is capable of receiving RMIs.

Remote Object Reference is needed by an object performing an RMI. It refers to the server of the RMI. It can be passed as a parameter or result.

Client machine has a proxy that marshals the request to the server. On the server side, skeleton receives request and invokes same methods at the object. Similar to RPC.

RMI is implemented via a request/reply protocol (blocking).

Remote Interface

public interface HelloInterface extends Remote{
 
	
	// Throws a RemoteException if the remote invocation fails
	public String say() throws RemoteException;
}

Remote Object

public class Hello extends UnicastRemoteObject implements HelloInterface {
	private String message;
	
	public Hello (String msg) throws RemoteException {
		message = msg;
	}
	
	public String say() throws RemoteException {
		return message;
	}
}

Server

public class HelloServer{
	
	public static void main (String [] args) {
		try {
			Naming.rebind ("SHello", new Hello ("Hello world!"));
			
			System.out.println("HelloServer is ready.");
		} catch (Exception e) {
			System.out.println("HelloServer failed : " + e);
		}
	}
 
}

Client

public class HelloClient {
	public static void main (String [] args) {
		try {
			HelloInterface hello = (HelloInterface)
			Naming.lookup("//server_URL/SHello");
			System.out.println(hello.say());
		} catch (Exception e) {
			System.out.println("HelloClient failed : " + e);
		}
	}
}

At most-once semantics: Invoker receives result, in which case they know the operation was executed exactly once. Or an exception is received informing that no result was received, in which case the operation was either performed once or not at all.

Remote Reference Module sits in the server and maps remote object id to a reference of a local object.

Parameter Passing

  • Primitive type: pass by value
  • Object implementing the Remote interface: pass by reference
  • Object implementing the Serializable interface: pass by copy
  • Non-serializable object: an exception is raised

Garbage collection in non-distributed systems:

Solutions:

  • Reference counting
  • Tracing based solutions (mark and sweep)

Add a visited flag. Start from the core set of objects, follow all references and flag them. We then look at the total list of objects, and garbage collect the ones that are not marked.
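A compact sketch of that mark-and-sweep pass over an in-memory object graph (the Id/Heap types are invented for illustration): mark everything reachable from the root set, then collect whatever is left unmarked.

```cpp
// Sketch of tracing garbage collection (mark-and-sweep).
#include <map>
#include <set>
#include <vector>

using Id = int;
using Heap = std::map<Id, std::vector<Id>>;   // object -> outgoing references

// Returns the set of unreachable (collectible) object ids.
std::set<Id> sweep(const Heap &heap, const std::vector<Id> &roots) {
    std::set<Id> marked;
    std::vector<Id> stack(roots);
    while (!stack.empty()) {                  // mark phase: flag reachable
        Id cur = stack.back(); stack.pop_back();
        if (!marked.insert(cur).second) continue;   // already visited
        auto it = heap.find(cur);
        if (it != heap.end())
            for (Id ref : it->second) stack.push_back(ref);
    }
    std::set<Id> garbage;                     // sweep phase: collect unmarked
    for (const auto &kv : heap)
        if (!marked.count(kv.first)) garbage.insert(kv.first);
    return garbage;
}
```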

The problem with reference counting is maintaining a proper count in the presence of unreliable communication.

We can copy the reference and let the destination increment the counter. However P1 can delete its reference before P2 increments the counter.

We can first increment the counter, get the reference, and then delete the reference after. The problem is that the two processes are coupled.

Solution is reference listing.

The skeleton maintains a list of proxies.

Advantage: Adds and deletes are idempotent.

Drawback: List of proxies can grow large.
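A sketch of reference listing (class names hypothetical): because the skeleton stores a set of proxies rather than a counter, retransmitted add and remove messages are harmless.

```cpp
// Sketch of reference listing: the skeleton records which clients hold a
// proxy. Insert and erase on a set are idempotent, so duplicates caused
// by unreliable communication do not corrupt the count.
#include <set>
#include <string>

class Skeleton {
    std::set<std::string> proxies_;   // one entry per client holding a ref
public:
    void add(const std::string &client)    { proxies_.insert(client); }
    void remove(const std::string &client) { proxies_.erase(client); }
    bool collectible() const { return proxies_.empty(); }
};
```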

Caution

Client process could crash, never clearing a reference from a server.

Use timeouts.

Lease based approach: Skeleton promises to keep info only for limited time. (Created by David Cheriton).

Message-Based Communication

Lower-level interface to provide more flexibility. Two abstract primitives are used to implement these: send and receive.

Message-oriented middleware (MOM)

Producers broadcast to brokers, brokers announce to consumers. Producers can send a task to a broker, and the broker will send it to a consumer.

Synchronous: The sender is blocked until its message is stored in the local buffer at the broker.

Asynchronous: The sender continues immediately after executing a send. Message is stored in the local buffer at sending host.

Transient: The sender puts the message on the network; if it cannot be delivered to the receiver, it is lost.

Persistent: The message is stored in the communication system as long as it takes to deliver the message to the receiver.
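A minimal in-memory broker sketch of the asynchronous, persistent-until-delivered style (class and method names are illustrative):

```java
import java.util.concurrent.*;

// Single-topic, in-memory broker sketch. publish() returns as soon as the
// message is buffered at the broker (asynchronous); the message stays
// buffered until a consumer takes it (persistent until delivered).
public class TinyBroker {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    // Producer side: returns immediately after the message is stored.
    void publish(String msg) { buffer.add(msg); }

    // Consumer side: blocks until a message is available.
    String consume() {
        try {
            return buffer.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        TinyBroker broker = new TinyBroker();
        broker.publish("task-1");   // producer continues immediately
        broker.publish("task-2");
        // A consumer, possibly much later, picks the messages up in order:
        System.out.println(broker.consume() + ", " + broker.consume());  // task-1, task-2
    }
}
```

Producer and consumer never interact directly and need not be up at the same time — the space and time decoupling listed below.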

Question

How does this abstraction differ from other communication middleware?

MOM systems enable:

  1. Space decoupling: Producers and consumers don’t know each other.
  2. Time decoupling: Producers and consumers communicate asynchronously with the MOM service.
  3. Flexibility: Allow providing reliability and delivery semantics.
  4. Separation of concerns and extensibility: Clients communicate through a simple API.

MOM Topologies

  1. Single broker
  2. Mesh
  3. Flexible
  4. Peer-to-Peer

Single Broker

A single node runs the MOM service. It hosts all topics, receives all messages, and serves all consumers.

Single point of failure. Mitigation is replication.

Limited scalability and performance: Mitigation is partitioned topics.

Ill-suited for multi-data-centre deployments.

Mesh

Brokers can send to other brokers.

Flexible

Support a combination of single broker and mesh topologies. Can build a graph that fits unique deployments.

Brokerless peer-to-peer

No central brokers. Producers locally run the MOM service.

A discovery service provides consumers with the addresses of the producers of a topic of interest. Consumers need to subscribe to all producers contributing to the topic. There is no coordination between producers.

Lecture 8

Question

What is a data centre?

Costco without the sign.

Houses tens of thousands of servers, organized into racks. Switches are usually placed in the middle of the rack (for wiring purposes).

Many of the problems in data centres are not computer science / computer engineering problems.

Cooling strategies:

  • Cold aisle, hot aisle. Servers are faced to each other. Pump cold air into the front of the servers, hot air gets pushed out to the back.
  • Put servers in liquid baths (they do this in MC)

The new M4 building will use servers to heat water, and use the water to heat the building.

Compute, storage, network are three main components of a compute cluster.

Design metrics are performance (requests per second), cost (requests per dollar), power (requests per Watt).

Compute Node Design

Option 1: SMP: Symmetric Multi-processor

Shared-memory multiprocessor: a set of CPUs, each with its own cache, sharing the main memory over a single bus. Very efficient communication between cores, high performance per node, but expensive.

Option 2: Commodity Nodes

Purchase the same number of cores as off-the-shelf components, and use switches to connect them. Much lower cost, performance equal to SMP at scale, but fails more often.

Execution time = CPU time + communication time.

Assume a local access takes 100 ns and a remote access takes r (orders of magnitude longer).

Communication time = (# of operations) × [100 ns × (fraction of local accesses) + r × (fraction of remote accesses)].

As the cluster scales, the fraction of local accesses shrinks, so the benefit of locality diminishes.

Option 3: Wimpy nodes

Using low end CPU (e.g. ARM processors). These are lower cost and lower energy, but hard to use effectively.

Design disadvantages:

Amdahl’s law bounds:

Task execution time is (1 − p) + p, where p is the ratio of code that can run in parallel (total time normalized to 1).

After parallelization on n cores: (1 − p) + p/n.

If n → ∞, then the second term goes to 0.

Speed-up S = 1 / ((1 − p) + p/n). From the above, S ≤ 1 / (1 − p).

Wimpy nodes give lower utilization: parallelization improves, but the sequential portion gets slower since per-core performance is worse. They are a good fit only if the code is very parallelizable. They also bring higher communication cost (more threads), are harder to program, and increase networking cost.
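The speed-up bound above can be checked numerically; this sketch just evaluates the formula:

```java
// Amdahl's law: with parallel fraction p on n cores,
// speedup S(n) = 1 / ((1 - p) + p / n), bounded above by 1 / (1 - p).
public class Amdahl {
    static double speedup(double p, int n) {
        return 1.0 / ((1.0 - p) + p / n);
    }

    public static void main(String[] args) {
        // Even with 90% parallel code, piling on wimpy cores saturates at 10x:
        System.out.printf("8 cores:  %.2f%n", speedup(0.9, 8));   // ~4.71
        System.out.printf("64 cores: %.2f%n", speedup(0.9, 64));  // ~8.77
        System.out.printf("bound:    %.2f%n", 1.0 / (1.0 - 0.9)); // 10.00
    }
}
```

This is why a slower sequential portion (weak single cores) hurts even highly parallel code.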

Storage Node Design

Option 1: Network attached storage (NAS)

Dedicated storage appliance. This has simpler deployment, control and management, lower network overhead.

Option 2: Distributed storage: Aggregate storage space from nodes in cluster.

Reduce cost by using cheap disks, will fail more but will replicate anyway.

Has lower cost, higher availability, higher performance, and higher data locality. But it comes with higher network overhead (the network sees the data as many times as it is replicated) and lower component reliability.

Use NAS storage for long-term reliability. Use in-cluster storage for something that needs high performance access.

Network Design

Challenge: Build high speed, scalable network at lower cost.

We don’t want the link between the TOR (top-of-rack) switch and the aggregation switch to be a bottleneck: it needs to support the bandwidth of all servers sending at full rate at once. The same goes for the links from the aggregation switches to the core. At the core level, such links are very expensive.

Optimization tricks:

  • Reduce core bandwidth: 5:1 ratio is common
  • Multiple networks (network for storage, control networks)

Data Centre Design Implications

Software using the DC needs to be aware of the storage hierarchy. Software fault tolerance is necessary: at this scale, server crashes happen roughly every 30 minutes.

Lecture 9

Software using DC needs to be aware of the storage hierarchy.

Back of envelope math.

| Data location | Response Time | Throughput |
| --- | --- | --- |
| RAM | 100 ns | 20 GBps |
| Hard Disk | 10 ms | 80 MBps |
| Network (rack) | 70 µs | 128 MBps (1 Gbps) |
| Network (DC) | 500 µs | 32 MBps |

Response Time: How long does it take to get one item from the device

Throughput: Volume

Ranking (1 = best) of data sources as seen from machine A, with B in the same rack and C across the DC network:

| | A RAM | A Disk | B RAM | B Disk | C RAM | C Disk |
| --- | --- | --- | --- | --- | --- | --- |
| Time | 1 (100 ns) | 4 (10 ms) | 2 (70.1 µs) | 4 (10 ms) | 3 (500.1 µs) | 4 (10 ms) |
| BW | 1 (20 GBps) | 3 (80 MBps) | 2 (128 MBps) | 3 (80 MBps) | 4 (32 MBps) | 4 (32 MBps) |

Special Cases: Pipeline vs Stop-and-Forward

flowchart LR
A -- 128 MBps --> B 
B -- 128 MBps --> C

Question

How long does it take to transfer 1 GB of data from A’s memory to C’s memory?

Stop-and-forward: B waits to receive all the data from A before sending to C.

Pipeline: As soon as B gets data from A, it starts sending to C.

Stop-and-forward, sending 1 GB from A to C:

A → B: 1024 MB / 128 MBps = 8 s. B → C: another 8 s.

Combining gives 16 s.

Pipelined approach

flowchart LR
A -- 128 MBps --> B 
B -- 128 MBps --> C
A -. 128 MBps .-> C

Throughput is the slower of the two hop throughputs. Latency is

1024 MB / 128 MBps = 8 s (the first-byte hop delay is negligible next to the transfer time).
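A quick sanity check of the two designs, taking 1 GB as 1024 MB:

```java
// Back-of-envelope: 1 GB over two 128 MBps hops (A -> B -> C).
public class TransferTime {
    static double seconds(double megabytes, double mbps) {
        return megabytes / mbps;
    }

    public static void main(String[] args) {
        double data = 1024;   // 1 GB in MB
        double link = 128;    // MBps per hop
        // Stop-and-forward: B receives everything before sending it on.
        double stopAndForward = seconds(data, link) + seconds(data, link);
        // Pipelined: both hops run concurrently; end-to-end time is governed
        // by the slowest link (ignoring the tiny first-byte delay).
        double pipelined = seconds(data, link);
        System.out.println(stopAndForward + " s vs " + pipelined + " s");  // 16.0 s vs 8.0 s
    }
}
```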

Bottleneck Link

flowchart LR
A -- 128 MBps --> B
B -- 64 MBps --> C
A -. 64 MBps .-> C

The end-to-end throughput is the bottleneck link’s 64 MBps, so 1 GB takes 1024 MB / 64 MBps = 16 s.

Concurrent Transfers

Question

How long does it take to transfer 1 GB of data from A’s memory to B’s and D’s memories, concurrently?

flowchart LR

Tor -- 128 MBps --> B
Tor -- 128 MBps --> D
A -. 64 MBps .-> Tor
A -- 128 MBps --> Tor
A -. 64 MBps.-> Tor
Tor -. 64 MBps.-> B
Tor -. 64 MBps.-> D

Contention: one input link, two output flows. TCP is fair and splits the throughput.

Each flow gets 128 / 2 = 64 MBps.

T(A → B) = 1024 MB / 64 MBps = 16 s.

T(A → D) = 1024 MB / 64 MBps = 16 s.

Take the max: 16 s.

Question

How long does it take to replicate 1 GB of data from A’s memory to B’s and C’s memories, concurrently?

flowchart LR
x[DC Net]

Tor -- 128 MBps --> B
Tor -- 32 MBps --> x
x --> C

A -.32 MBps.-> Tor
A -- 128 MBps --> Tor
A -.96 MBps.-> Tor
Tor -.96 MBps.-> B
Tor -.32 MBps.-> x
x -.32 MBps.-> C

TCP initially tries to split A’s 128 MBps uplink in half, but the DC network can only carry 32 MBps, so the A → C flow gets just 32 MBps. The unused bandwidth is allocated to the A → B flow (96 MBps).

T(A → B) = 1024 MB / 96 MBps ≈ 10.7 s.

T(A → C) = 1024 MB / 32 MBps = 32 s.

Take the max: 32 s.

How long does it take to generate an image results page (30 thumbnails)?

Design 1: Read serially, thumbnailing 256 KB images on the fly: 30 × (10 ms disk access + 256 KB / 80 MBps) ≈ 30 × 13.2 ms ≈ 0.4 s.

Design 2: Issue the reads in parallel: ≈ 10 ms + 3.2 ms ≈ 13 ms.

Architecture Models

System architectures define the structure of the system. Identify the components, define the functions, define relationships.

Styles: Organize into logically different components, and subsequently distribute those components over the various machines.

  • Layered style (client server systems)
  • Object-based style (service-oriented architectures)

System Architectures

  • Client-server
  • Multitier systems
  • Peer-to-peer systems

Multiple-client/single server has problems. Server forms bottleneck, server is a single point of failure, system scaling is difficult.

Multiple clients/multiple servers has multiple approaches.

flowchart TB
a[Client]
b[Client]
c[Server]
d[Server]
e[Server]

subgraph Clients
direction TB
a 
b
end
subgraph Service
direction TB
c --> d
d --> c
d --> e
e --> d
end
a --> c
c --> a
b --> e
e --> b

Alternatively, the initial server can invoke other servers.

flowchart LR
a[Client]
b[Client]
c[Server]
d[Server]
a
b
a --> c
c --> a
b --> c
c --> b
c --> d
d --> c

Lecture 10

Thank you to Kaushik for these lecture notes

Service Design

Client-Facing Services Performance Metrics

  • Throughput: The number of requests that can be served
  • Latency: How long does it take to serve an individual request? A short average latency is good, but tail latency is what’s critical
  • Fairness: A single or few requests that are heavy cannot slow down other short requests
  • Programmability: Ease of programming, debugging, maintenance

System Model

Load manager balances the load across the logic servers. Approaches

DNS-based: Use DNS translation to route the same name to different servers. Issues: it can take hours to adapt, and it is not suitable for small clusters.

Appliance or switch (L4): Balance at the TCP layer.

Smart client (L7) (rarely used): A trusted client is exposed to the server architecture; clients randomly select a server that offers the service they wish to use.

Load Balancing Techniques:

Round robin: Only works for a homogeneous workload and platform. Use weighted round robin for a heterogeneous platform.

Least number of connections: Monitor how many active connections each server has; send the request to the server with the fewest.

Response time: Monitor how long each server takes to respond to a request; pick the server with the lowest response time.

Source IP hash: Remember which client is connected to which server by hashing the client IP and the source port number.

SDN Based

Chained failover: Fill one server to its maximum, then use more servers as necessary.
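A sketch of the source-IP-hash technique; server names and the specific hash are illustrative:

```java
import java.util.*;

// Source-IP-hash balancing: the same (client IP, source port) pair always
// maps to the same server, so the balancer keeps no per-connection state.
public class IpHashBalancer {
    private final List<String> servers;

    IpHashBalancer(List<String> servers) { this.servers = servers; }

    String pick(String clientIp, int srcPort) {
        int h = Objects.hash(clientIp, srcPort);          // combine IP and port
        return servers.get(Math.floorMod(h, servers.size()));
    }

    public static void main(String[] args) {
        IpHashBalancer lb = new IpHashBalancer(List.of("s1", "s2", "s3"));
        // Same client, same port -> same server every time:
        System.out.println(lb.pick("10.0.0.7", 40213)
                .equals(lb.pick("10.0.0.7", 40213)));     // true
    }
}
```

Determinism is the point: session affinity comes from the hash itself, not from a connection table.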

Server Designs

Single Process

Use a single process to handle all requests: wait on a socket, receive and read a request, execute the request, and return the result. Even if there are many cores on the machine, it will only use one. Even that single core is not used well, sitting idle for ~50% of the time on I/O operations.

Caution

Blocking networking and disk operations.

Hard to program with low throughput and high latency.

Multi-Threaded

Make a multi-threaded application thread/request model.

When a request is received, create a new thread to process the new request, allow the thread to process the request, send the response back to the client, and finally kill that thread.

flowchart LR
z[network]
x[network]
a[Request 1]
b[Request 2]
c[Request 3]
d[Request 4]
e[Request N]

z --> dispatcher
dispatcher --dispatch--> a
dispatcher --dispatch--> b
dispatcher --dispatch--> c
dispatcher --dispatch--> d
dispatcher --dispatch--> e
a -- send result--> x
b -- send result--> x
c -- send result--> x
d -- send result--> x
e -- send result--> x

There is no queueing as requests are immediately placed on a thread for processing.

Maximum throughput is reached when the number of threads equals the number of cores on the machine. When threads exceed the available cores, throughput decreases; latency eventually blows up, as the cores spend their time context switching rather than making progress on requests.

Medium programmability with low throughput and high latency when under heavy load.

Multi-Process

Same architecture as a multi-thread design, except you fork a process for every new request.

Heavy overhead (more than threading) but offers greater fault and security isolation since memory is not shared. Under a heavy load it has low throughput, high latency, and is unfair to short requests. Can combine multi-process with multithreading to balance performance and isolation.

Thread/Process Pool

Create and maintain a limited number of threads/processes to reduce overhead. Requests are continuously fetched from the queue, read and processed, and the result is returned to the client. Fix the number of threads at the throughput peak.

Programmability is medium (heavy configuration), but throughput is high and latency is medium. It is difficult to find the ideal number of threads: if the service is CPU-bound, then the number of threads = the number of CPU cores; but the bottleneck may be I/O or disk instead, in which case the peak is at fewer threads than cores. The sweet spot depends on the ratio of CPU to I/O.
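A minimal thread-pool server skeleton using `java.util.concurrent`; the `handle` method is a stand-in for real request processing:

```java
import java.util.concurrent.*;

// Thread-pool server sketch: a fixed pool of workers serves all requests,
// so thread-creation cost is paid once, not once per request.
public class PoolServer {
    // Stand-in for real request processing.
    static int handle(int request) { return request * 2; }

    public static void main(String[] args) throws Exception {
        // Sizing the pool at the core count matches the CPU-bound sweet spot.
        ExecutorService pool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        // Incoming requests go onto the pool's internal queue:
        Future<Integer> f1 = pool.submit(() -> handle(21));
        Future<Integer> f2 = pool.submit(() -> handle(50));
        System.out.println(f1.get() + " " + f2.get());  // 42 100
        pool.shutdown();
    }
}
```

`Executors.newFixedThreadPool` gives exactly the bounded-pool-plus-queue structure described above.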

Single Event Based Processing

Idea: I/O is the bottleneck; a single process is good enough for the compute part, as long as the process never blocks.

Have a primitive such as select() or poll()

Non-Blocking I/O

Divide processing such that the process running on the core never blocks on network I/O.

Server structure

while (true) {
	events = select();   // block until at least one socket has ready data
	process(events);     // handle every ready event without blocking
}

Use Key.attachment to maintain state between events.

The server is only doing CPU work while the selector maintains events in the buffer; tasks are split between CPU and I/O. We have isolated the waiting (on the selector) from the processing (on the server): the selector performs the I/O, so the server receives only ready data.

If the selector has only part of a request in the buffer when it passes the packets to the CPU, they must be buffered until the whole request arrives. This creates a state machine that must be maintained for each client.
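Such a per-client state machine is exactly what one would store in Key.attachment. A minimal sketch, assuming (hypothetically) newline-terminated requests:

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

// Per-client state machine: buffers partial reads and emits a request only
// once its terminator ('\n' here, an assumption) has arrived.
public class RequestAssembler {
    private final StringBuilder partial = new StringBuilder();

    // Feed whatever bytes the selector handed us; return the completed requests.
    List<String> feed(byte[] chunk) {
        partial.append(new String(chunk, StandardCharsets.UTF_8));
        List<String> complete = new ArrayList<>();
        int nl;
        while ((nl = partial.indexOf("\n")) >= 0) {
            complete.add(partial.substring(0, nl));   // one full request
            partial.delete(0, nl + 1);                // keep the remainder buffered
        }
        return complete;
    }
}
```

Feeding `"GET /a\nGET"` then `" /b\n"` yields `["GET /a"]` and then `["GET /b"]` — the second request only completes once its final bytes arrive.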

This is very hard to program, with high throughput and low latency.

Debugging is incredibly difficult for thousands of state machines for thousands of clients all with various states.

Asymmetric Multi-Process Event Driven (AMPED)

AMPED is a single process-event based design with multiple helper threads for disk I/O.

This enables us to completely separate compute from I/O. Very hard to program, but has high throughput and low latency.

Staged Event Driven Architecture (SEDA)

SEDA breaks the processing of the request into the CPU and I/O parts.

Well-Conditioned: Performance does not degrade with higher workload. Architect the application into stages that communicate only through queues.

flowchart LR
a[Accept Connection]
b[Read Packet]
c[Parse Packet]
d[Check Cache]
e[Handle Miss]
f[Send Response]
g[File I/O]
h[Write Packet]

subgraph SocketListen
	a
end
subgraph SocketRead
	b
end
a --> c
b --> c
subgraph HttpParse
	c
end
c --> d
subgraph PageCache
d
end
d --> e
d --> f
subgraph CacheMiss
e
end

subgraph HttpSend
f
end
e --> g
subgraph FileI/O
g
end
f --> h
subgraph SocketWrite
h
end

Each stage follows a thread-pool design. Per-stage controllers adjust the number of threads and the batch size.

Looks into the application and divides it into stages based on which resources are used. The role of a stage is to access exactly one resource, giving a pipeline of stages. The input to a stage is a queue; a stage may output to multiple queues, and queues are the only method of communication between stages.

Stages run handler to process task using a thread pool.

Benefits: Modular, easy to test and debug, automated performance tuning, fine granularity of control (per stage).

Good design since it abstracts complex generic parts letting programmer focus on application-specific parts.
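A toy two-stage pipeline in this style, with queues as the only inter-stage channel. The stages themselves (trim, respond) are illustrative stand-ins:

```java
import java.util.concurrent.*;

// SEDA-style sketch: stages communicate only through queues, and each stage
// has its own (here single-thread) pool.
public class SedaSketch {
    static String run(String raw) {
        BlockingQueue<String> parsed = new LinkedBlockingQueue<>();
        BlockingQueue<String> responses = new LinkedBlockingQueue<>();
        ExecutorService parseStage = Executors.newFixedThreadPool(1);
        ExecutorService respondStage = Executors.newFixedThreadPool(1);
        try {
            parseStage.submit(() -> parsed.add(raw.trim()));   // stage 1: parse
            respondStage.submit(() -> {                        // stage 2: respond
                try {
                    responses.add("OK: " + parsed.take());
                } catch (InterruptedException ignored) { }
            });
            return responses.take();   // wait for the pipeline to drain
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            parseStage.shutdown();
            respondStage.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("  hello  "));  // OK: hello
    }
}
```

Because stages share nothing but queues, each stage's pool size (and batch size) can be tuned independently — the per-stage control SEDA is known for.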

| | Throughput | Latency | Programming |
| --- | --- | --- | --- |
| Single proc | low | bad | easy |
| Multiproc | low | bad | medium |
| Multithread | low | bad | medium |
| Pool proc/thread | high | medium | medium |
| Event | high | good | hard |
| SEDA | high | good | medium |

Lecture 11

Design requirements for distributed systems:

  • Performance
  • Dependability
  • Security

Performance metrics

  • Response time
  • Throughput
  • System utilization

Dependability:

  • Reliability: A system can run continuously without failure
  • Availability: The probability that the system is operational at a given time.

If a system crashes for 1 ms every hour, the system is not reliable, but it is available.

Availability is measured via uptime.

MTBF: Mean time between failures.

MTTR: Mean time to repair.

MTTR = mean time to detect + mean time to diagnose + mean time to fix.

Failure Models

  • Fail-stop/Fail-restart: The components stop working.
  • Byzantine failure: Components are running but return corrupted data. Not implemented by default.
  • Limping: The component is very slow.

Hardware Failure Rates

  • Early Infant Mortality Failure: Servers fail young, but stabilize.
  • Wear Out Failures: Doesn’t fail when new, but breaks down over time.
  • Bathtub Curve: Infant mortality, stabilizes, wears out.

Don’t replace clusters all at once, to avoid early infant mortality failures. Don’t buy all machines from one vendor.

Multiple ways to increase uptime: increase MTBF, or reduce MTTR. Availability = MTBF / (MTBF + MTTR); if MTTR → 0, then uptime is 100%.

Question

Which one is the better investment (increase MTBF or decrease MTTR)?

Decreasing MTTR.

Design for fault tolerance:

  • Decompose system into modules (limit shared state, OOP principles)
  • Each module fail-fast (speeds up detection)
  • Make module failure noticeable quickly
  • Reduce configuration
  • Redundancy in hardware, software, and data
  • Use sessions or a transaction mechanism

Naming

Name services: Entries of the form <name, attributes>, where attributes are typically network addresses. Type of lookup query: name → attribute values.

Directory services: <name, attribute> entries. Types of lookup queries: name → attribute values, and attribute values → names.

Scalability: Naming directories tend to grow very fast

Consistency: Short and mid-term inconsistencies are tolerable. In the long term, system should converge towards a consistent state.

Flat names (e.g. MAC addresses) and hierarchical names (e.g. UNIX file system paths).

Domain Name System (DNS)

Performs name-to-IP mapping

Distributed database, implemented in hierarchy of many name servers.

Question

Why not centralize DNS?

Single point of failure, traffic volume, doesn’t scale.

How is it distributed? Global layer stores TLD, then keeps narrowing in scope as we go down.

Iterative vs Recursive (covered in CS456).

Communication cost comparison of iterative vs recursive depends on the distance from the client to the server, and from the servers to each other.

Server-controlled Hybrid Navigation

Non-recursive, server controlled: Server contacts peers if it cannot resolve name itself.

Recursive, server-controlled: If name cannot be resolved, server contacts superior responsible for a larger prefix of the name space. Recursively applied until name is resolved.

It makes sense to do iterative resolution at the top levels and recursive resolution at the lower levels. This reduces the load on the top-level servers (at the cost of long-range communication for a few queries).

Lecture 12

Name & Directory Services

X.500

  • ITU - standardized directory service

LDAP

  • Directory service
  • Lightweight implementation of X.500
  • Often used in intranets

Synchronization

Synchronization problem: How processes cooperate and synchronize with one another in a distributed system. In single CPU systems, critical regions, mutual exclusion, and other synchronization problems are solved using methods such as semaphores. These don’t work in distributed systems because they rely on the existence of shared memory.

We will review clocks, election, mutual exclusion.

Clock synchronization

In a centralized system: Time is unambiguous. A process gets the time by issuing a system call to the kernel. If one process gets the time and a later process gets the time, the later value is higher than (or possibly equal to) the earlier one.

Example: UNIX make examines the times at which all the source and object files were last modified.

  • If time(input.c) > time(input.o), then recompile input.c.
  • If time(input.c) < time(input.o), then no compilation is needed.

In a distributed system: Achieving agreement on time is not trivial.

The computer on which the compiler runs and the computer on which the editor runs may be different. The times are according to the local clocks, which may not be synchronized.

Question

Is it possible to synchronize all the clocks in a distributed system?

A computer has a timer, not really a clock. The timer is a precisely machined quartz crystal oscillating at a frequency that depends on how the crystal was cut and the amount of tension.

Two registers are associated with the crystal: a counter (which we track) and a holding register (hard-coded). Each oscillation decrements the counter by one. When the counter hits 0, it generates a clock tick (an interrupt) to the OS, and the counter is reloaded from the holding register. The OS translates clock ticks into real time.

On different computers the crystals run at slightly different frequencies, causing the software clocks to gradually get out of sync (clock skew). We can adjust the holding-register value to account for impurities in the quartz, but ticks are still discrete, so skew is inevitable.

Each machine’s timer causes an interrupt H times a second. The interrupt adds 1 to the software clock. When UTC is t, the value of the clock on machine p is C_p(t). In a perfect world, C_p(t) = t for all p and all t.

In practice, timers do not interrupt exactly H times a second. The relative error obtainable with modern chips is about 10⁻⁵.

A timer is said to be working within its specification if

1 − ρ ≤ dC/dt ≤ 1 + ρ

where the constant ρ is the maximum drift rate.

If, after synchronization, 2 clocks drift from UTC in opposite directions, they may be as much as 2ρΔt apart at time Δt after synchronization.

For clocks not to differ by more than δ, they must be resynchronized every δ/(2ρ) seconds.

Cristian’s Algorithm

One time server (with a WWV receiver); all other machines stay synchronized with the time server. Changes are introduced gradually by adding more or fewer milliseconds per interrupt. The propagation time is included in the adjustment.

This is based on the assumption that the server’s timestamp is taken exactly in the middle of the round trip, i.e. that the path delay is the same in both directions.
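Under that assumption, the client sets its clock to the server’s timestamp plus half the measured round trip. A sketch (timestamps in milliseconds are illustrative):

```java
// Cristian's algorithm: assuming the one-way delay is half the round trip,
// the client's new time is Tserver + (T1 - T0) / 2.
public class Cristian {
    static double estimate(double tServer, double t0, double t1) {
        return tServer + (t1 - t0) / 2.0;
    }

    public static void main(String[] args) {
        // Request sent at t0 = 100 ms, reply received at t1 = 140 ms (client
        // clock); the server stamped the reply with 1000 ms.
        System.out.println(estimate(1000, 100, 140));  // 1020.0
    }
}
```

If the two directions have unequal delays, the error is bounded by half the round-trip asymmetry — which is why the symmetric-path assumption matters.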

Network Time Protocol (NTP)

Offset between A and B: with T1 (request sent, A’s clock), T2 (request received, B’s clock), T3 (reply sent, B’s clock), and T4 (reply received, A’s clock), the offset is θ = ((T2 − T1) + (T3 − T4)) / 2. Gradually adjust the clock to minimize the offset.

This assumes the one-way delay is the same in both directions.

It is not safe to set back a clock that was once ahead: this can produce negative durations, which will break processes. Instead we slow the clock down (the hardware clock ticks at the same rate, but the tick-to-time conversion changes). It is also unsafe to jump ahead a clock that was once behind; the same procedure is used, with the clock rate increased.

With the invention of the atomic clock in 1948, it became possible to measure time more accurately. Labs around the world have atomic clocks, and each periodically tells the BIH in Paris how many times its clock has ticked.

The BIH averages these to produce TAI (International Atomic Time). Originally the atomic time was computed to make the atomic second equal to the mean solar second.

Since the mean solar day keeps getting longer, the BIH introduced a correction, producing UTC (Universal Coordinated Time).

Berkeley Algorithm (coordinates clocks internally, without caring about external time)

The time server (a time daemon) is active: it polls every machine periodically to ask what time it is there. Based on the answers, it computes an average time, then tells all other machines to advance their clocks to the new time or slow their clocks down until some specified reduction has been achieved.

The time daemon’s time is set manually by operator periodically.

Averaging Algorithm

Each machine broadcasts the current time according to its own clock. After a machine broadcasts its time, it starts a timer to collect all other broadcasts that arrive during some interval. When all broadcasts have arrived, an algorithm is run to compute the new time from them.

Simplest algorithm: Average the values from all other machines.

Variations: Discard the highest and lowest values and average the rest; add to each message an estimate of the propagation time from its source.

For algorithms where clocks must not only be the same, but also must not deviate from real-time, we speak of physical clocks.

For algorithms where only internal consistency of clocks matters, we speak of logical clocks.

Logical Clock Synchronization

Happens-before relation:

If a and b are events in the same process, and a occurs before b, then a → b is true.

If a is the event of a message being sent by one process, and b is the event of the same message being received by another process, then a → b is also true.

If two events, a and b, happen in different processes that do not exchange messages, then a → b is not true, but neither is b → a. These events are said to be concurrent (a ∥ b).

Happens-before is transitive: if a → b and b → c, then a → c.

Lamport’s Algorithm

Capturing happens-before relation.

Each process Pi has a local, monotonically increasing counter, called its logical clock Ci.

Each event e that occurs at process Pi is assigned a Lamport timestamp Ci(e).

Rules:

  • Ci is incremented before an event is issued at Pi.
  • When Pi sends message m, it adds ts(m) = Ci [this is event send(m)].
  • On receiving m, Pj computes Cj = max(Cj, ts(m)), then increments Cj; this timestamps event receive(m).
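The rules can be sketched as a small class (a single-file demo; real processes would carry the timestamp inside their messages):

```java
// Lamport logical clock: a per-process counter updated by the three rules.
public class LamportClock {
    private long time = 0;

    long tick() { return ++time; }   // increment before issuing a local event

    long send() { return tick(); }   // the returned value is ts(m), attached to m

    long receive(long msgTime) {     // C <- max(C, ts(m)), then increment
        time = Math.max(time, msgTime) + 1;
        return time;
    }
}
```

A receiver’s clock always jumps past the sender’s timestamp, which is what makes send(m) precede receive(m) in timestamp order.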

Lecture 13

Lamport Timestamps

Two data centres, one on east coast other on west coast. $1,000 in account.

Depositing $100 into east coast, and replicating on west coast.

Interest of 10% applied on west coast, replicate on east coast.

On east coast, $100 deposit may be registered first, and then interest is computed ($1,210).

On west coast, interest may be registered first, and then $100 deposit is added ($1,200).

We want to define an order such that all replicas follow that same order.

Total-ordered multicast

To send a message, the sender adds its local logical time to the message, broadcasts the message, and waits for ACKs from all nodes.

Upon receiving a message, a node adds the message to a queue ordered by the timestamp in the message, updates its local logical time, and broadcasts an ACK to all nodes. When the message at the head of the queue has received ACKs from all nodes, deliver it to the application.

Assumes the network is reliable (no packet loss, no packet reordering, nodes do not fail).

Note that an ACK can arrive before the message it acknowledges; in that case we buffer the ACK until the message itself arrives, at which point the message can be processed immediately.

To deal with multiple servers having the same logical clock value, we concatenate the IP to the logical clock, creating a unique value (think of it as a number of the form clock.IP).

Question

In the concatenation, does the IP or logical clock come first?

Logical clock comes first. If IP was first, some servers with higher IP addresses will always be higher than others (and thus later).

Clients don’t participate in total-ordered multicast, servers do.

Vector clocks are constructed by letting each process Pi maintain a vector VCi with the following two properties:

  1. VCi[i] is the number of events that have occurred so far at Pi. In other words, VCi[i] is the local logical clock at process Pi.
  2. If VCi[j] = k, then Pi knows that k events have occurred at Pj. It is Pi’s knowledge of the local time at Pj.

Steps carried out to accomplish property 2:

  1. Before executing an event, Pi executes VCi[i] = VCi[i] + 1.
  2. When process Pi sends a message m to Pj, it sets m’s (vector) timestamp ts(m) equal to VCi, after having executed the previous step.
  3. Upon the receipt of a message m, process Pj adjusts its own vector by setting VCj[k] = max(VCj[k], ts(m)[k]) for each k, after which it executes the first step and delivers the message to the application.
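A sketch of these rules, plus the componentwise test for happens-before (the int-array representation is an assumption):

```java
import java.util.Arrays;

// Vector clock sketch for n processes; index i is process Pi's own counter.
public class VectorClock {
    final int[] v;
    final int self;

    VectorClock(int n, int self) { v = new int[n]; this.self = self; }

    void event() { v[self]++; }                 // step 1: bump own entry

    int[] send() { event(); return v.clone(); } // step 2: attach a copy as ts(m)

    void receive(int[] msg) {                   // step 3: merge, then bump own entry
        for (int i = 0; i < v.length; i++) v[i] = Math.max(v[i], msg[i]);
        event();
    }

    // a happened-before b iff a <= b componentwise and a != b;
    // if neither dominates, the two events are concurrent.
    static boolean happenedBefore(int[] a, int[] b) {
        for (int i = 0; i < a.length; i++) if (a[i] > b[i]) return false;
        return !Arrays.equals(a, b);
    }
}
```

Unlike Lamport clocks, comparing two vector timestamps tells us whether the events are causally related or concurrent.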

Causal-ordered multicast

To send a message, the sender increments its vector clock (incrementing only on message-send events), adds the local vector clock to the message, and broadcasts the message.

Upon receiving a message m with timestamp ts(m) from node Pi:

  • On node Pj, buffer the message until:
    • ts(m)[i] = VCj[i] + 1 (the message is the next one Pj expects from the sender)
    • ts(m)[k] ≤ VCj[k] for all k ≠ i (Pj has already seen every message the sender had seen)

Then update the local clock for the sender’s entry only: VCj[i] = ts(m)[i].

Difference from total-ordered multicast: causal ordering only constrains causally related messages; concurrent messages may be delivered in different orders at different nodes.