Professor: Samer Al-Kiswany | Term: Winter 2026
Lecture 1
Definition
A distributed system is a collection of independent computers that appear to the users of the system as a single computer.
Distributed software systems should provide easy-to-use abstractions, hide complexities, handle failures, efficiently use resources, and leverage emerging technologies.
Example
- A network of workstations allocated to users
- A pool of processors in the machine room allocated dynamically
- A single file system (all users access files with the same path name)
- User command executed in the best place (user workstation, a workstation belonging to someone else, or on an unassigned processor in the machine room)
Question
Why distributed?
- Economics: Microprocessors offer a better price/performance than mainframes
- Speed: A distributed system may have more total computing power than a mainframe
- Inherent distribution: Some applications involve spatially separated machines
- Reliability: If one machine crashes, the system as a whole can still survive
- Incremental Growth: Computing power can be added in small increments
Primary features
- Multiple computers
- Concurrent execution
- Independent operation and failures
- Communications
- Ability to communicate
- No tight synchronization (no global clock)
- “Virtual” computer
- Transparency
A bunch of computers that work together, but appear as one.
Challenges
- Heterogeneity
Distributed systems require communication and coordination across a variety of:
- Networks
- Hardware
- OS
- Programming Languages
- Implementations
- Scalability
Question
Can the system behave properly if the number of users and components increases?
- Scale-up: Increase the number of “users” while keeping the system performance unaffected
- Speed-up: Improvement in the system performance as system resources are increased
Tradeoffs need to be made (e.g., availability vs. consistency, CS451)
Impediments:
- Centralized data
- A single file
- Centralized services
- A single server
- Centralized algorithms
- Algorithms that “know it all”
Scalability is not black and white; there are shades of scalability. Some things cannot be done at scale.
- Failure handling
- Partial failures
- Can non-failed components continue operation?
- Can the failed components easily recover?
- Failure detection
- Failure masking
- Failure tolerance
- Recovery
- An important technique is replication. This is a hard problem.
- Concurrency
Multiple clients share a resource: how do you maintain the integrity of the resource and the proper operation (without interference) of these clients?
Managing failures in concurrent systems is difficult.
- Transparency
Transparency is not easy to maintain. There are many types of transparency:
| Transparency | Description |
|---|---|
| Access | Local and remote resources are accessed using identical operations |
| Location | Hide where an object is located |
| Relocation | Hide that an object may be moved to another location while in use |
| Migration | Hide that an object may move to another location |
| Replication | Hide that an object is replicated |
| Concurrency | Hide that an object may be shared by several independent users |
| Failure | Hide the failure and recovery of an object |
Aiming at full distribution transparency may be too much. There are communication latencies that cannot be hidden.
Completely hiding failures of networks and nodes is impossible. You cannot distinguish a slow computer from a failing one. You can never be sure a server performed an operation before a crash.
Full transparency will cost performance. Keeping replicas up-to-date with the master takes time.
- Security
Confidentiality: Information is disclosed only to authorized parties.
Integrity: Ensure that alterations to assets of a system can be made only in an authorized way.
Authentication: How do we verify the correctness of a claimed identity?
Authorization: Does an identified entity have proper access rights?
Developing distributed systems: Pitfalls
Many distributed systems are needlessly complex, caused by mistakes that required patching later on. Many false assumptions are made.
False assumptions include:
- The network is reliable
- The network is secure
- The topology does not change
- Bandwidth is infinite
- Heartbeats are a sufficient failure detector. Partial network failures mean your view of a cluster is not the holistic view. https://uwaterloo.ca/computer-science/news/cheriton-computer-scientists-create-nifty-solution-to-catastrophic-network-fault.
Lecture 2
I skipped this lecture, it was a CS456 crash course.
Lecture 3
Communication Middleware
A distributed system organized as middleware.
The middleware layer extends over multiple machines.
Communication Alternatives:
Raw message passing
- TCP/IP: Reliable byte stream
- UDP/IP: Unreliable, packet oriented, connectionless
Problem: low level, not easy to use.
OS Abstractions: Distributed Shared Memory
On a page fault, get the remote page.
- Pros: Easy to program
- Cons: High overhead, hard to handle failures
Remote Procedure Call (RPC) (Xerox RPC)
Main idea: Call remote procedures as if they are local ones. The key advantage is that it is general, easy to use, and efficient.
Uses the well-known procedure call semantics.
The caller makes a procedure call and then waits. If it is a local procedure call, then it is handled normally. If it is a remote procedure, then it is handled as a remote procedure call.
Caller semantics is a blocking send (send the request, stop, wait for the result, continue after it arrives). Callee semantics is a blocking receive to get the parameters and a non-blocking send at the end to transmit the results.
RPC architecture
```mermaid
flowchart TD
a[Caller Procedure]
b[Caller Stub]
c[Comm]
d[Comm]
e[Callee stub]
f[Callee procedure]
subgraph Caller
a --> b
b --> c
c --> b
b --> a
end
c -- Call packet(s) ---> d
subgraph Callee
d --> e
e --> f
f --> e
e --> d
end
d -- Result packet(s) ---> c
```
```c
Interface {
    // has enough information for compile-time
    // checking, and generating the proper calling sequence
    int func1(int arg)
    int func2(char *buffer, int size)
}
```

```c
// Client stub for func1
int func1(int arg) {
    // Create req
    // Pack fid, arg, etc.
    // Send req
    // Wait for reply
    // Unpack results
    // Return result
}
```

```c
// Server stub for func1
int func1(...) {
    // Unpack request
    // Find the requested server function
    // Call the function with arg in the request
    // Pack results
    // Send results
}
```

Question
How about func2?

Problems
- `*buffer` points into a different address space (solution: copy/restore if needed)
- It can be input or output (solution: add an argument type: in | out | both)
- Complex data structures
Question
How does the client know where to send the RPC request?
Binding
Server side:
- Load the function in memory
- Populate the export table (`fid`, `fname`, `callback_function`). Each function is assigned a unique id (fid) every time the function is exported
- Export the interface

A registry stores the server IP and the function names supported.
Client Side:
The client queries the registry to see who provides the function they want to call (fname).
The registry will respond with the appropriate server ip.
The client will then query the server to get the binding info for the function.
The server responds with the fname and fid, and the client stores this info in memory.
At runtime, the client already knows the server address and function id. The client sends the function id and arguments to the server. The server looks up the fid in the export table, runs the function, and sends back the result.
Runtime choices:
TCP/IP
- High overhead on server, many connections and state
- High latency (setting/closing connections)
UDP/IP
- Unreliable
Fault Tolerance
Lost Request
When the function is called, client sends the fid and arguments to the server. Request was lost in the network on transit.
The client waits, times out, and the client retries. Request hits the server and returns the result back to the client. Retrying was safe because the function ran once.
Lost Reply
Client sends the fid and arguments to the server. Server receives the payload, runs the function, and sends the result back to the client. The reply back to the client is lost.
The client times out and retries, sending the function id and arguments to the server. The server runs the function and returns the result back to the client.
Problem
The function was executed twice.
Call Semantics
At least once: the function is executed at least once.
At most once: the function is executed once or not at all.
Neither one is perfect: exactly-once is desired, but has high overhead.
Xerox RPC implements at-most-once semantics: if success is returned, it is guaranteed the function was executed exactly once. On failure, the function was executed at most once (possibly not at all).
A client issues one RPC at a time.
Xerox at most once semantics:
Key idea: Every RPC request is uniquely identifiable.
The client sends the client ip, process id, sequence number (strictly increasing number), fid, arguments. This tuple uniquely identifies this call.
Server receives this, checks export table, check previous calls table in memory, runs function, returns result.
The previous call table checks if it has seen this (client_ip, process_id, seq_num) tuple before.
We return the client ip, process id, sequence number and result.
Fault Tolerance: Lost Reply
Client calls function, sends client ip, process id, sequence number, fid, arguments.
Server runs the function, lost in transit on the way back to the client.
The client times out and sends the request again (same client ip, process id, sequence number, fid, arguments). The server checks the previous_calls table and notices that this is a duplicate. It returns an ACK.
Fault Tolerance: Server Crash
Client calls function, sends client ip, process id, sequence number, fid, arguments. Server runs function, updates previous calls table. Then a reboot happens, server does not send anything back.
Client times out and sends the exact same request again. The server checks the previous calls, but there’s nothing in it, so it runs the function again and sends the result back to the client.
Solution to this is to rebind on server restart with new fid.
Client calls function, sends client ip, process id, sequence number, fid, arguments. Server runs function (export table has fid = 5). Then reboot hits. Rebind functions with a different fid. So now the same function has a fid = 11 in the export table.
The client times out and sends the same request as before (with fid = 5). The fid doesn’t match, so the server responds with an error that there is no such function id.
Upon receiving this error, the client should query the registry again for the new bindings.
Fault Tolerance: Client failure with repeated sequence numbers
Client calls function, sends client ip, process id, sequence number (10), fid, arguments. Server checks previous call table, populates it with the client ip, process id and sequence number. It runs the function, and returns the result.
Reboot after the client receives this.
We then have a later call of a different function, client sends client ip, process id, sequence number (1), fid, arguments. Server checks previous calls table, notices it is a duplicate (since sequence numbers are meant to be strictly increasing), and returns ack (without executing the function!). The server doesn’t know if the client rebooted and started fresh or if it is a delayed duplicate packet.
The solution is to add a clock-based conversation id to every call; <conversation_id, seq#> is strictly increasing.
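The at-most-once duplicate check described above can be sketched as follows (all names and types here are my own, not Xerox's; the real implementation also garbage-collects old entries):

```cpp
#include <map>
#include <string>
#include <tuple>

// A call is uniquely identified by <client_ip, pid, conversation_id, seq>.
// The clock-based conversation id distinguishes a rebooted client (fresh,
// small seq numbers) from a delayed duplicate packet.
struct CallId {
    std::string client_ip;
    int pid;
    long conv_id;  // clock-based conversation id
    long seq;      // strictly increasing within a conversation
    bool operator<(const CallId &o) const {
        return std::tie(client_ip, pid, conv_id, seq) <
               std::tie(o.client_ip, o.pid, o.conv_id, o.seq);
    }
};

// Toy function the server can export.
static int square(int x) { return x * x; }

class Server {
    std::map<CallId, int> previous_calls;  // previous-calls table with cached results
public:
    int executions = 0;  // how many times the function actually ran
    // Execute f at most once per CallId; duplicates get the cached result back.
    int handle(const CallId &id, int (*f)(int), int arg) {
        auto it = previous_calls.find(id);
        if (it != previous_calls.end())
            return it->second;  // duplicate request: ack with the stored result
        ++executions;
        int result = f(arg);
        previous_calls[id] = result;
        return result;
    }
};
```

A retransmitted request (same tuple) returns the cached result without re-running the function.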
Lecture 4
File system is the Operating Systems interface to storage.
Virtual file system is an abstraction layer in the Unix kernel that acts as an interface for the file system. Provides an API for the file system.
Specific file systems are assigned subtrees.
Application wants to read some data. Send call to VFS, VFS gets data from the right local file system (based on path).
Application wants to write some data. Send call to VFS, writes to file system, file system returns with success, though data isn’t written to disk.
`fsync` is a blocking call that writes the data to disk.
We don't write to disk on every write for performance reasons; we must invoke `fsync` explicitly.
Hash table and Data table (Hash table stores pointer to the data in Data table).
If we want to update a key, we perform `write(ptr)` then `write(data)`. This is risky if we crash in between.

So we can do:

```c
write(data)
write(ptr)
fsync(data)
fsync(ptr)
```

This is still not the right way to do it: there is no guarantee of ordering between the two writes.
The correct ordering is:

```c
write(data)
fsync(data)
write(ptr)
fsync(ptr)
```
The OS gives no ordering guarantee for performance reasons: e.g., if the disk head is over a location whose write is scheduled later, the OS may reorder operations to optimize speed.
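The safe ordering can be sketched with POSIX calls (the file layout and offsets here are my assumptions for illustration, not the lecture's):

```cpp
#include <fcntl.h>
#include <string>
#include <unistd.h>

// Hypothetical key-value update: the data record is written and forced to
// disk *before* the pointer that references it, so a crash between the two
// fsyncs leaves the old pointer intact rather than a pointer to garbage.
bool update_key(const std::string &path, const std::string &data, long ptr) {
    int fd = open(path.c_str(), O_CREAT | O_WRONLY, 0644);
    if (fd < 0) return false;
    // 1. write the new data record (assumed to live at offset 16)
    if (pwrite(fd, data.data(), data.size(), 16) < 0) { close(fd); return false; }
    // 2. fsync: force the data to disk first
    if (fsync(fd) < 0) { close(fd); return false; }
    // 3. only now overwrite the pointer slot (assumed offset 0)
    if (pwrite(fd, &ptr, sizeof(ptr), 0) < 0) { close(fd); return false; }
    // 4. fsync: force the pointer to disk
    if (fsync(fd) < 0) { close(fd); return false; }
    close(fd);
    return true;
}
```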
Distributed file system emulates non-distributed file system behaviour on a physically distributed set of files, usually within an intranet.
We require transparency, concurrent access, file replication, and security.
Sun Network File System
On the client computer, the virtual file system goes remote to the NFS client; the NFS client uses the NFS protocol to contact the NFS server on the server computer (uses RPC). The NFS server sits in the UNIX kernel (not necessarily; this is an optimization), sends the request to the VFS, and the VFS gets the data from the UNIX file system.
```mermaid
flowchart TD
a[VFS]
b[NFS client]
c[NFS server]
d[VFS]
e[UNIX file system]
f[Application Program]
g[Unix file system]
subgraph Client computer
f -- UNIX system calls --> a
subgraph UNIX kernel
a --> b
a --> g
end
g --> ida[(Disk)]
end
b -- NFS Protocol ---> c
subgraph Server Computer
subgraph UNIX kernel
c --> d
d --> e
end
e --> id[(Disk)]
end
```
NFS V.3 Design Principles
- Access transparency: No distinction between local and remote files.
- Stateless design: Server does not maintain any state about clients. Easier implementation and easier to handle crashes. File operations are idempotent, which simplifies fault tolerance and implementation.
All NFS operations take a file handle.
Idempotent Example
The client wants to write to a file and performs:

```c
fd = open(path)
write(fd, buf, size)
```

The server writes to the file and sends an ack.
Question
What if the ack is lost?
The client times out and sends the same `write(fd, buf, size)` to the server again.
The server writes again and sends an ack.
Error
This is not idempotent.
We include an offset in write(fd, buf, size, offset). Now this is idempotent.
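Why the explicit offset makes the retry safe can be sketched with POSIX `pwrite` (the path-based wrapper is a simplification of mine; real NFS writes go through a file handle):

```cpp
#include <fcntl.h>
#include <string>
#include <unistd.h>

// NFS-style write: the offset is part of the request, so replaying the
// exact same request overwrites the exact same bytes (idempotent),
// instead of appending at a server-side file position a second time.
ssize_t nfs_write(const std::string &path, const char *buf,
                  size_t size, off_t offset) {
    int fd = open(path.c_str(), O_CREAT | O_WRONLY, 0644);
    if (fd < 0) return -1;
    ssize_t n = pwrite(fd, buf, size, offset);  // position is explicit, not fd state
    close(fd);
    return n;
}
```

Sending the identical request twice leaves the file in the same state as sending it once.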
Sketchy idempotence
Client wants to unlink file1.
Server unlinks the file, returns an ack. Ack is lost in transport.
Client times out and sends unlink file1 back to server.
Server is going to return an error since the file doesn’t exist anymore.
We account for this in NFS client side design. If we delete a file and it returns an error, then we have successfully deleted it.
A file handle uniquely identifies a file on the server side.
```mermaid
flowchart TD
a[VFS]
b[NFS server]
a --> b
a --> c[(FS1)]
a --> d[(FS2)]
```
We use i-node to uniquely identify files and to determine which file system has our files.
Warning
Client 1 writes to file with an inode 10. Server performs this action.
Client 2 unlinks the file with inode 10 (delete). Client 2 then creates a new file, and an i-node is assigned to it. i-nodes are reused, so this new file can be assigned i-node 10.
Client 1 writes to inode 10 (thinking it is its own file), corrupting the new file.
To solve this we store an additional parameter, the i-node generation number: the call carries ⟨i-node, generation number⟩.
It would first be ⟨10, 0⟩ from Client 1; when Client 2 creates the new file and the i-node is reused, it becomes ⟨10, 1⟩. When Client 1 tries to write to ⟨10, 0⟩, the server responds with an error: the file doesn't exist anymore. This solution is necessary since there is a limited number of i-node values.
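The generation-number check can be sketched like this (names and structure are mine for illustration; the real server keeps this in the on-disk i-node):

```cpp
#include <map>

// A file handle is <inode, generation>. Reusing an inode bumps the
// generation, so a stale handle from a deleted file is rejected
// instead of silently corrupting the new file.
struct Handle {
    long inode;
    long gen;
};

class InodeTable {
    std::map<long, long> current_gen;  // inode -> live generation number
public:
    // Allocate (or reuse) an inode; each reuse gets a new generation.
    Handle create(long inode) {
        long g = ++current_gen[inode];
        return Handle{inode, g};
    }
    // Server-side check performed on every operation.
    bool valid(const Handle &h) const {
        auto it = current_gen.find(h.inode);
        return it != current_gen.end() && it->second == h.gen;
    }
};
```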
To get the file handle of the root directory on a remote server, we use mount.
Client requests for mount path, server checks if path is exported, checks user permissions, returns file handle of exported root directory.
Lookup:
NFS does lookup iteratively, one step at a time.
Client sends lookup for (people_fh, "bob")
Server responds with bob_fh
Client sends lookup for (bob_fh, "hw")
Server responds with hw_fh
Client sends lookup for (hw_fh, "a3")
Server responds with a3_fh
This is slow. Recursive would be faster but it is not implemented for the following reason.
Nested Mounting
Server A imports a directory from Server B
Client imports directory from Server A. It wants that directory from Server B.
Client sends request to Server A for the fh that is from Server B. Server A realizes it is from Server B, requests it, receives it, and sends it to Client.
This is dangerous. Server B may not want the client to access the directory, but when Server A is requesting the directory on behalf of the Client, Server B does not know this. The client should request to Server B itself.
The client needs to explicitly import subdirectory from server B.
To support all of this we need to do iterative lookup (recursive won’t work). We can still optimize by chopping into segments.
Read operation: Client sends read request for file handle. Server performs read and returns data blocks.
Write operation: Client writes to a file handle. The server performs a persistent write, then acks back to the client (write-through caching).
NFS requests transmitted via Remote Procedure Calls (RPCs). Client sends authentication information, checked against access permission in file attributes.
Any client may address RPC requests to the server while providing another client's identification information. Security mechanisms were introduced in NFS v4.
Semantics of File Sharing
On a single processor, when a read follows a write, the value returned by the read is the value just written. In a distributed system with caching, obsolete values may be returned.
| Method | Comment |
|---|---|
| UNIX semantics | Each operation on a file is instantly visible to all processes |
| Session semantics (NFS) | No changes are visible to other processes until the file is closed |
| Immutable files | No updates are possible |
| Transaction | All changes occur atomically |
Lecture 5
WatDFS Project
Create a client module that acts as local file system, hook it to VFS, and have it issue commands to the remote server.
The client module does not need to be in the kernel space. We use FUSE. FUSE helps build a file system without writing kernel code.
Hook the FUSE kernel module to the client module via libfuse. The FUSE kernel module takes system calls and sends them to the user-space client module, which ships them to the server module. The server module executes the operations, computes the result, and sends it back to the client module.
Requests from the client to the server are via a custom RPC library.
Components for each call
- libfuse handler (handles calls from the FUSE kernel module in kernel space to user space)
- RPC calls
- RPC listener on the server in user space
- Request handler on the server in user space
```mermaid
flowchart TD
f[Applications]
g[Virtual File System]
h[Fuse Kernel Module]
i[libfuse]
a[libfuse handler]
b[RPC calls]
c[RPC listener]
d[Request handler]
e[Virtual File System]
subgraph client
subgraph user space
f
i
i --> a
a --> b
end
f --> g
h --> i
subgraph kernel
g
g --> h
end
end
b --> c
subgraph server
subgraph user space
c --> d
end
d--> e
subgraph kernel
e
end
end
```
Implementation walkthrough of getting file attributes (for request handler)
```cpp
int watdfs_cli_getattr(void *userdata, const char *path, struct stat *statbuf);
int stat(const char *pathname, struct stat *statbuf);
```

Part 2: Setting up an RPC Call
```cpp
// GET FILE ATTRIBUTES (need to apply this for all system calls)
int watdfs_cli_getattr(void *userdata, const char *path, struct stat *statbuf) {
    // Set up the RPC call
    int ARG_COUNT = 3;
    // Allocate space for the output arguments
    void **args = new void*[ARG_COUNT];
    // Allocate the space for arg types
    int arg_types[ARG_COUNT + 1];
    int pathlen = strlen(path) + 1; // +1 for null terminator
    // Fill in the arguments
    // The first argument is the path
    arg_types[0] = (1 << ARG_INPUT) | (1 << ARG_ARRAY) |
                   (ARG_CHAR << 16u) | pathlen;
    // The first 3 bits of the first byte tell us whether the argument is an
    // input, needs to be returned as an output, and whether the argument is
    // an array, respectively. The last 2 bytes are the length of the array.
    args[0] = (void *)path;
    // The second argument is the stat structure
    arg_types[1] = (1 << ARG_OUTPUT) | (1 << ARG_ARRAY) |
                   (ARG_CHAR << 16) | sizeof(struct stat); // statbuf
    // The third argument is the return code
    // TODO: Fill in this argument type and argument
    // Finally, the last position of the arg types array is 0
    arg_types[3] = 0;
    // Making the RPC call
    int rpc_ret = rpcCall((char *)"getattr", arg_types, args);
    // Handle the return
}
```

If the size is larger than the limit, then we have to write in multiple RPC calls.
Part 3: WatDFS Server: Setup RPC call
```cpp
int main(int argc, char *argv[]) {
    // Set up arguments
    int argTypes[4];
    // First is the path
    argTypes[0] = (1 << ARG_INPUT) | (1 << ARG_ARRAY) |
                  (ARG_CHAR << 16) | 1;
    argTypes[1] = (1 << ARG_OUTPUT) | (1 << ARG_ARRAY) |
                  (ARG_CHAR << 16) | 1;
    // Retcode
    argTypes[2] = (1 << ARG_OUTPUT) | (ARG_INT << 16);
    // Fill in the null terminator
    argTypes[3] = 0;
    // Register the function
    int ret = rpcRegister((char *)"getattr", argTypes, watdfs_getattr);
    // hand over control to the RPC library (rpcExecute)
}
```

Part 4: WatDFS Server: Define RPC function handler
```cpp
int watdfs_getattr(int *argTypes, void **args) {
    // Get the arguments
    // The first argument is the path
    char *short_path = (char *)args[0];
    // The second argument is the stat structure
    struct stat *statbuf = (struct stat *)args[1];
    // The third argument is the return code,
    // which will be 0 or -errno
    int *ret = (int *)args[2];
    // Convert the path name
    char *full_path = get_full_path(short_path);
    // Initially we set the return code to be 0
    *ret = 0;
    // Make the stat system call
    // Remember to set *ret to -errno if there is an error
    // The RPC call should always succeed, so return 0
    return 0;
}
```

Need to repeat this 8 more times for Project 1.
Example
Path manipulation
Home is on client, rest is on server (mounted).
`stat('/mnt/home/Bob/F.dat', SB)` on the client becomes `watdfs_cli_getattr('/Bob/F.dat', SB)` (this is the path given to the client module).
`get_full_path()` takes the above path and converts it into the full path on the server.
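A minimal sketch of that path conversion (the export root argument and its value are my assumptions; WatDFS's actual helper takes the root from its configuration):

```cpp
#include <string>

// The client strips its mount prefix, so the server receives a short path
// like "/Bob/F.dat" and must prepend its own export root to reach the
// real file on the server's local file system.
std::string get_full_path(const std::string &export_root,
                          const std::string &short_path) {
    // short_path arrives with a leading '/', e.g. "/Bob/F.dat"
    return export_root + short_path;
}
```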
Caution
Most errors happen in passing the arguments between the client and server. Print arguments and argument types before calls.
Lecture 6
Optimizations
- Client-side cache: File pages, directories, and file attributes.
- Read-ahead: Read the next page in the file. Once the file is open for read, the NFS client does not wait for a read call; it starts reading ahead.
- Write-delay (behind): Cache the writes (client or server side), until the page is flushed from the cache.
- Commit protocol: Server delays writing to disk until client commit(s).
- Client sends a write to the server. VFS on server will store the write in memory, send ACK back. Client sends another write, adds to cache, server ACKs back. Client commits, writes from cache get flushed to disk on server, ACKs back.
Question
Why is this okay for app developers?
Similar protocol for writing locally. Must use fsync to commit.
Caching in server and client is indispensable to improve performance.
Server caching
- Disk caching as in non-networked file systems
- Write-through caching
- Updated data is stored in the cache and written to disk before the reply is sent to the client. Inefficient if writes are frequent
- Commit operation
- Stored only in cache memory
Clients cache write operations: mark the modified cache page as 'dirty' and schedule the page to be flushed to the server. Flushing happens when the file is closed or when sync is used.
The data cached in client may not be identical to the same data stored on the server.
A timestamp-based scheme is used when polling the server about the freshness of a data object:

- `Tc`: time the cache entry was last validated
- `Tm`: time the block was last modified at the server, as recorded by the client / by the server
- `t`: freshness interval

Freshness condition at time `T`: `(T - Tc < t) OR (Tm_client = Tm_server)`

If `T - Tc < t`, the entry is presumed valid. If not, `Tm_server` needs to be obtained by a `getattr` call. If `Tm_client = Tm_server`, the entry is presumed valid; update its `Tc` to the current time. Else obtain the data from the server and update `Tm`.
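The client-side validity check can be sketched as follows (variable names follow the notation above; the getattr round-trip is represented by passing in the server's `Tm`):

```cpp
// Cached entry state kept by the NFS client.
struct CacheEntry {
    long Tc;        // time the entry was last validated
    long Tm_client; // modification time recorded when the entry was fetched
};

// May the cached entry be used at time T?
// t is the freshness interval; Tm_server is what a getattr call would return.
bool entry_valid(const CacheEntry &e, long T, long t, long Tm_server) {
    if (T - e.Tc < t)
        return true;  // validated recently: presumed fresh, no server contact
    // Otherwise the client polls the server (getattr) and compares Tm values.
    return e.Tm_client == Tm_server;
}
```

On a `true` result after the comparison, the client would also update `Tc`; on `false` it refetches the data and updates `Tm`.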
Lookup: Instead of iterative lookup NFS v4 performs full path lookup. Support for callback features.
```mermaid
flowchart TD
a[Old file]
b[Local copy]
c[Updated file]
subgraph Client
b
end
subgraph Server
direction TB
a
c
end
a --Server delegates file--> b
b --Client sends return file--> c
Client --Client asks for file --> a
Server --Server recalls delegation--> b
```
Andrew File System (AFS)
Infrequently updated shared files and local user files remain valid for long periods of time. Allocate a large local disk cache. Key assumptions: usually small files, reads are more common than writes, sequential access, most files are used by only one user, burstiness of file references.
Design decisions:
- Whole-file serving: Entire contents of directories and files transferred from server to client (there is no recall delegation)
- Whole-file caching: When file is transferred to client, it will be stored on that client’s local disk.
Implementation of System Calls
When a file is opened, check if we have a cached copy of the file and a valid callback promise; if so, we don't need to talk to the server. If we don't have the file, or there is no valid callback promise, we get a fresh copy of the file from the server (along with a valid callback promise). The copy of the file is placed in the local file system and opened by the UNIX kernel, treated like a local file.
When closing the file, we notify Venus that the file has been closed. If the local copy has been changed, send a copy to the Vice Server that is the custodian of the file. Replace the file contents and send a callback to all other clients holding callback promises on the file.
AFS is a server initiated protocol, keep a list of clients, server will tell clients when there is a change (can only happen after a client closes a file). As opposed to frequent polling in Sun NFS.
Dealing with failures: The client may have missed some callbacks. Venus sends cache validation request to the Vice server (contains the file modification timestamp). If timestamp is current, server sends valid and callback promise is reinstated with valid. If timestamp is not current, server sends cancelled.
Problem: Communication link failures. The callback must be renewed with the above protocol before a new open if a period of time has elapsed since the file was cached or the callback promise was last validated.
Last Write Wins: AFS does not control concurrent updates of files, this is left up to the application.
Back to Communication
- Remote Procedure Call (RPC)
- Remote Method Invocation (RMI) - object-oriented
- Message-Based Communication
If client and server are on different CPU / OS, they may represent memory differently. Need to translate.
Option 1: Devise a canonical form for data types and require all senders to convert their internal representation to this form while marshalling. This can be inefficient: if both processes use the same format, there is unnecessary conversion at both ends.
Option 2: The sender uses its own native format and indicates in the first byte of the message which format is used. The receiver stub examines the first byte to see which format the client used. Everyone can convert from everyone else's format.
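Option 1 can be sketched as canonical (big-endian) marshalling of a single 32-bit integer; this is a toy version of mine, whereas a real RPC runtime marshals whole argument lists:

```cpp
#include <array>
#include <cstdint>

// Marshal a 32-bit int into a canonical big-endian byte order,
// regardless of the host's native representation.
std::array<uint8_t, 4> marshal_be32(uint32_t v) {
    return {{ uint8_t(v >> 24), uint8_t(v >> 16),
              uint8_t(v >> 8),  uint8_t(v) }};
}

// Unmarshal from the canonical form back to a native uint32_t.
uint32_t unmarshal_be32(const std::array<uint8_t, 4> &b) {
    return (uint32_t(b[0]) << 24) | (uint32_t(b[1]) << 16) |
           (uint32_t(b[2]) << 8)  |  uint32_t(b[3]);
}
```

Both endpoints pay the conversion cost even when their native formats already agree, which is exactly the inefficiency Option 2 avoids.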
Client locates the server via dynamic binding.
The client kernel cannot differentiate between a) request to server, receives, executes, crashes, no ACK and b) request to server, receives, crashes, no ACK.
3 schools of thought:
- Wait until the server reboots and try the operation again. At least once semantic
- Give up immediately and report back failure. At most once semantics
- Client gets no help, guarantees nothing. Easy to implement.
Google RPC (gRPC) uses TCP/IP, protobufs for marshalling, and at-most-once semantics. Missed deadlines are handled by applications.
```mermaid
flowchart LR
a[Client App]
b[Server Application]
subgraph gRPC_Client_Library
a
end
subgraph gRPC_Server_Library
b
end
gRPC_Client_Library <--The wire--> gRPC_Server_Library
```
If failure in gRPC client library, then try again. If failure in gRPC server library (before a hit to the server application), then try again.
Set a deadline on any gRPC call made (otherwise if stuck at server, client will be waiting forever).
Lecture 7
Remote Object is capable of receiving RMIs.
A Remote Object Reference is needed by an object performing an RMI. It refers to the remote object that is the target of the RMI. It can be passed as a parameter or result.
Client machine has a proxy that marshals the request to the server. On the server side, skeleton receives request and invokes same methods at the object. Similar to RPC.
RMI is implemented via a request/reply protocol (blocking).
Remote Interface
```java
public interface HelloInterface extends Remote {
    // Throws a RemoteException if the remote invocation fails
    public String say() throws RemoteException;
}
```

Remote Object
```java
public class Hello extends UnicastRemoteObject implements HelloInterface {
    private String message;
    public Hello(String msg) throws RemoteException {
        message = msg;
    }
    public String say() throws RemoteException {
        return message;
    }
}
```

Server
```java
public class HelloServer {
    public static void main(String[] args) {
        try {
            Naming.rebind("SHello", new Hello("Hello world!"));
            System.out.println("HelloServer is ready.");
        } catch (Exception e) {
            System.out.println("HelloServer failed: " + e);
        }
    }
}
```

Client
```java
public class HelloClient {
    public static void main(String[] args) {
        try {
            HelloInterface hello = (HelloInterface)
                Naming.lookup("//server_URL/SHello");
            System.out.println(hello.say());
        } catch (Exception e) {
            System.out.println("HelloClient failed: " + e);
        }
    }
}
```

At-most-once semantics: the invoker receives the result, in which case they know the operation was executed exactly once; or an exception is received informing them that no result arrived, in which case the operation was performed either once or not at all.
Remote Reference Module sits in the server and maps remote object id to a reference of a local object.
Parameter Passing
- If the argument is a primitive type: pass by value
- Object implementing the Remote interface: pass by reference
- Object implementing the Serializable interface: pass by copy
- Non-serializable object: an exception is raised
Garbage collection in non-distributed systems:
Solutions:
- Reference counting
- Tracing based solutions (mark and sweep)
Add a visited flag. Start from the core set of objects, follow all references and flag them. We then look at the total list of objects, and garbage collect the ones that are not marked.
The problem with reference counting is maintaining a proper count in the presence of unreliable communication.
We can copy the reference and let the destination increment the counter. However P1 can delete its reference before P2 increments the counter.
We can first increment the counter, get the reference, and then delete the reference after. Problem is that the two processes are coupled.
Solution is reference listing.
The skeleton maintains a list of proxies.
Advantage: Adds and deletes are idempotent.
Drawback: List of proxies can grow large.
Caution
Client process could crash, never clearing a reference from a server.
Use timeouts.
Lease based approach: Skeleton promises to keep info only for limited time. (Created by David Cheriton).
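A sketch of a lease-based reference list (all names are mine): the skeleton records each proxy with an expiry time, adds and removes stay idempotent, and a crashed client that stops renewing is dropped when its lease expires.

```cpp
#include <map>
#include <string>

class ReferenceList {
    std::map<std::string, long> leases;  // proxy id -> lease expiry time
    long lease_len;                      // how long a lease lasts
public:
    explicit ReferenceList(long len) : lease_len(len) {}
    // Add or renew a proxy's lease. Re-adding is harmless (idempotent).
    void add(const std::string &proxy, long now) {
        leases[proxy] = now + lease_len;
    }
    // Removing twice is also harmless (idempotent).
    void remove(const std::string &proxy) {
        leases.erase(proxy);
    }
    // Drop expired leases; the object is garbage-collectable
    // once no live references remain.
    bool collectable(long now) {
        for (auto it = leases.begin(); it != leases.end(); ) {
            if (it->second <= now) it = leases.erase(it);
            else ++it;
        }
        return leases.empty();
    }
};
```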
Message-Based Communication
Lower-level interface to provide more flexibility. Two abstract primitives are used to implement these: send and receive.
Message-oriented middleware (MOM)
Producers publish to brokers, and brokers deliver to consumers. A producer can send a task to the broker, and the broker sends it to a consumer.
Synchronous: The sender is blocked until its message is stored in the local buffer at the broker.
Asynchronous: The sender continues immediately after executing a send. Message is stored in the local buffer at sending host.
Transient: The sender puts the message on the network; if it cannot be delivered to the receiver, it is lost.
Persistent: The message is stored in the communication system as long as it takes to deliver the message to the receiver.
Question
How does this abstraction differ from other communication middleware?
MOM systems enable:
- Space decoupling: Producers and consumers don’t know each other.
- Time decoupling: Producers and consumers asynchronously communicate with the MOM service.
- Flexibility: Allow providing reliability and delivery semantics.
- Separation of concerns and extensibility: Clients communicate through a simple API.
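Space and time decoupling can be illustrated with a toy in-memory broker (a sketch only, not a real MOM product): the producer never learns who consumes, and the message is buffered until a consumer fetches it.

```python
from collections import defaultdict, deque

class Broker:
    """Toy in-memory broker: one FIFO queue per topic."""
    def __init__(self):
        self.topics = defaultdict(deque)

    def publish(self, topic, msg):
        # Persistent-style send: buffer until some consumer fetches it
        # (time decoupling).
        self.topics[topic].append(msg)

    def consume(self, topic):
        # The consumer never learns who produced the message
        # (space decoupling).
        q = self.topics[topic]
        return q.popleft() if q else None

broker = Broker()
broker.publish("thumbnails", "resize img-1")
broker.publish("thumbnails", "resize img-2")
first = broker.consume("thumbnails")   # FIFO per topic
```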
MOM Topologies
- Single broker
- Mesh
- Flexible
- Peer-to-Peer
Single Broker
A single node runs the MOM service. It hosts all topics, receives all messages, and serves all consumers.
Single point of failure. Mitigation is replication.
Limited scalability and performance: Mitigation is partitioned topics.
Ill-suited for multi-data-centre deployments.
Mesh
Brokers can send to other brokers.
Flexible
Support a combination of single broker and mesh topologies. Can build a graph that fits unique deployments.
Brokerless peer-to-peer
No central brokers. Producers locally run the MOM service.
A discovery service provides consumers with the addresses of producers for the topics of interest. Consumers need to subscribe to all producers contributing to a topic. There is no coordination between producers.
Lecture 8
Question
What is a data centre?
Costco without the sign.
Houses tens of thousands of servers, organized into racks. Switches are usually placed in the middle of the rack (for wiring purposes).
Many non computer science / computer engineering problems in data centres.
Cooling strategies:
- Cold aisle, hot aisle. Servers are faced to each other. Pump cold air into the front of the servers, hot air gets pushed out to the back.
- Put servers in liquid baths (they do this in MC)
The new M4 building will use servers to heat water, and use the water to heat the building.
Compute, storage, network are three main components of a compute cluster.
Design metrics are performance (requests per second), cost (requests per dollar), power (requests per Watt).
Compute Node Design
Option 1: SMP: Symmetric Multi-processor
Shared memory multiprocessor. Set of CPU each with its own cache, sharing the main memory over a single bus. Very efficient in communication between cores, high performance per node, expensive.
Option 2: Commodity Nodes
Purchase same number of cores with off the shelf components, use switches to connect them. Much lower cost, equal performance to SMP at scale, fails more often.
Execution time = CPU time + communication time.
Assume a local access takes 100 ns and a remote access takes $t_r$ (on the order of microseconds).
Communication time = (# of operations) $\times$ [100 ns $\times f$ + $t_r \times (1 - f)$], where $f$ is the fraction of accesses that are local.
As the cluster scales, $f$ shrinks and the benefit of locality diminishes.
Option 3: Wimpy nodes
Using low end CPU (e.g. ARM processors). These are lower cost and lower energy, but hard to use effectively.
Design disadvantages:
Amdahl’s law bounds the speed-up:
Task execution time is $(1 - f) + f$, where $f$ is the ratio of code that can run in parallel (total time normalized to 1).
After parallelization on $n$ cores: $(1 - f) + \frac{f}{n}$.
If $n \to \infty$, then the second term goes to 0.
Speed-up $= \frac{1}{(1 - f) + f/n}$. From the above, speed-up $\le \frac{1}{1 - f}$.
Wimpy nodes give lower utilization: better parallelization, but a worse sequential portion, since per-core hardware performance is worse. They are good if the code is very parallelizable. They also incur higher communication cost (more threads), are harder to program, and have higher networking cost.
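Amdahl's bound above can be checked numerically; this small sketch uses the normalized form from the derivation:

```python
def amdahl_speedup(f, n):
    """Speed-up for parallel fraction f on n cores: 1 / ((1 - f) + f / n)."""
    return 1.0 / ((1.0 - f) + f / n)

print(amdahl_speedup(0.9, 10))     # about 5.26: 10 cores give nowhere near 10x
print(amdahl_speedup(0.9, 10**9))  # approaches 10, the 1 / (1 - f) bound
```

Even with 90% of the code parallelizable, infinite cores cap the speed-up at 10x, which is why wimpy nodes only pay off for highly parallelizable code.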
Storage Node Design
Option 1: Network attached storage (NAS)
Dedicated storage appliance. This has simpler deployment, control and management, lower network overhead.
Option 2: Distributed storage: Aggregate storage space from nodes in cluster.
Reduce cost by using cheap disks; they fail more often, but the data is replicated anyway.
Has lower cost, higher availability, higher performance, and higher data locality. But it comes with higher network overhead (the network sees the data as many times as it is replicated) and lower component reliability.
Use NAS storage for long-term reliability. Use in-cluster storage for something that needs high performance access.
Network Design
Challenge: Build high speed, scalable network at lower cost.
We don’t want the link between the ToR and aggregation switches to be a bottleneck: it needs to support the bandwidth of all servers sending at their maximum rate at once. The same goes for the core links from the aggregation switches. At the core level, such links are very expensive.
Optimization tricks:
- Reduce core bandwidth: a 5:1 oversubscription ratio is common
- Multiple networks (network for storage, control networks)
Data Centre Design Implications
Software using DC needs to be aware of the storage hierarchy. Software fault tolerance is necessary, server crashes happen every 30 minutes.
Lecture 9
Software using DC needs to be aware of the storage hierarchy.
Back of envelope math.
| Data location | Response Time | Throughput |
|---|---|---|
| RAM | 100 ns | 20 GBps |
| Hard Disk | 10 ms | 80 MBps |
| Network–Rack | 70 µs | 128 MBps (1 Gbps) |
| Network–DC | 500 µs | 32 MBps |
Response Time: How long does it take to get one item from the device
Throughput: Volume of data moved per unit time
Ranking (in parentheses) of reading data from node A’s own RAM and disk, from node B in the same rack, and from node C elsewhere in the DC:

| | A RAM | A Disk | B RAM | B Disk | C RAM | C Disk |
|---|---|---|---|---|---|---|
| Time | (1) 100 ns | (4) 10 ms | (2) 70.1 µs | (4) 10 ms | (3) 500.1 µs | (4) 10 ms |
| BW | (1) 20 GBps | (3) 80 MBps | (2) 128 MBps | (3) 80 MBps | (4) 32 MBps | (4) 32 MBps |
Special Cases: Pipeline vs Stop-and-Forward
flowchart LR
A -- 128 MBps --> B
B -- 128 MBps --> C
Question
How long does it take to transfer 1 GB of data from A’s memory to C’s memory (via B)?
Stop-and-forward: B waits to receive all the data from A before sending to C.
Pipeline: As soon as B gets data from A, it starts sending to C.
Stop-and-forward, sending 1 GB from A to C:
A→B: 70 µs + 1 GB / 128 MBps ≈ 8 s. B→C: the same.
Combining gives ≈ 16 s.
Pipelined approach
flowchart LR
A -- 128 MBps --> B
B -- 128 MBps --> C
A -. 128 MBps .-> C
Throughput is the slower of the two hop throughputs. Latency is
2 × 70 µs + 1 GB / 128 MBps ≈ 8 s.
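The two transfer strategies can be checked with back-of-envelope arithmetic using the table's numbers (128 MBps and 70 µs per hop):

```python
GB_MB = 1024        # 1 GB in MB (matches the lecture's "1 GB / 128 MBps = 8 s")
HOP_BW = 128        # MBps per hop
HOP_LAT = 70e-6     # 70 us per hop (network-rack response time)

push = GB_MB / HOP_BW                       # 8 s to push 1 GB over one hop
stop_and_forward = 2 * (HOP_LAT + push)     # B buffers everything: about 16 s
pipelined = 2 * HOP_LAT + push              # B forwards as it receives: about 8 s
bottleneck = GB_MB / 64                     # a 64 MBps middle link caps it at 16 s
```

Latency terms (microseconds) are negligible next to the multi-second transfer times, which is why the lecture rounds them away.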
Bottleneck Link
flowchart LR
A -- 128 MBps --> B
B -- 64 MBps --> C
A -. 64 MBps .-> C
Throughput is limited by the bottleneck link: 1 GB / 64 MBps ≈ 16 s.
Concurrent Transfers
Question
How long does it take to transfer 1 GB of data from A’s memory to B’s and D’s memories, concurrently?
flowchart LR
Tor -- 128 MBps --> B
Tor -- 128 MBps --> D
A -. 64 MBps .-> Tor
A -- 128 MBps --> Tor
A -. 64 MBps.-> Tor
Tor -. 64 MBps.-> B
Tor -. 64 MBps.-> D
Contention: one input link feeds two output flows. TCP is fair and splits the throughput.
Each flow gets 128 / 2 = 64 MBps.
A→B: 1 GB / 64 MBps ≈ 16 s.
A→D: 1 GB / 64 MBps ≈ 16 s.
Take the max: ≈ 16 s.
Question
How long does it take to replicate 1 GB of data from A’s memory to B’s and C’s memories, concurrently?
flowchart LR
x[DC Net]
Tor -- 128 MBps --> B
Tor -- 32 MBps --> x
x --> C
A -.32 MBps.-> Tor
A -- 128 MBps --> Tor
A -.96 MBps.-> Tor
Tor -.96 MBps.-> B
Tor -.32 MBps.-> x
x -.32 MBps.-> C
TCP first tries to split A’s 128 MBps link in half, but the DC network can only carry 32 MBps, so the A→C flow gets only 32 MBps. The unused share is allocated to the A→B flow, which gets 96 MBps.
A→B: 1 GB / 96 MBps ≈ 10.7 s.
A→C: 1 GB / 32 MBps = 32 s.
Take the max: ≈ 32 s.
How long to generate an image-results page (30 thumbnails)?
Design 1: Read serially, thumbnailing the 256 KB images on the fly: 30 × (10 ms + 256 KB / 80 MBps) ≈ 30 × 13 ms ≈ 0.4 s.
Design 2: Issue the reads in parallel: 10 ms + 256 KB / 80 MBps ≈ 13 ms.
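The same arithmetic, scripted, using the disk figures from the table above (10 ms response time, 80 MBps throughput; the exact constants used in lecture may have differed):

```python
SEEK = 10e-3        # 10 ms disk response time (from the table)
SIZE_MB = 0.25      # 256 KB source image
DISK_BW = 80.0      # MBps (from the table)

per_image = SEEK + SIZE_MB / DISK_BW   # about 13 ms per read
serial = 30 * per_image                # design 1: about 0.4 s
parallel = per_image                   # design 2: all 30 reads issued at once
```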
Architecture Models
System architectures define the structure of the system. Identify the components, define the functions, define relationships.
Styles: Organize into logically different components, and subsequently distribute those components over the various machines.
- Layered style (client server systems)
- Object-based style (service-oriented architectures)
System Architectures
- Client-server
- Multitier systems
- Peer-to-peer systems
Multiple-client/single server has problems. Server forms bottleneck, server is a single point of failure, system scaling is difficult.
Multiple clients/multiple servers has multiple approaches.
flowchart TB
a[Client]
b[Client]
c[Server]
d[Server]
e[Server]
subgraph Clients
direction TB
a
b
end
subgraph Service
direction TB
c --> d
d --> c
d --> e
e --> d
end
a --> c
c --> a
b --> e
e --> b
Alternatively, the initial server can invoke other servers.
flowchart LR
a[Client]
b[Client]
c[Server]
d[Server]
a
b
a --> c
c --> a
b --> c
c --> b
c --> d
d --> c
Lecture 10
Thank you to Kaushik for these lecture notes
Service Design
Client-Facing Services Performance Metrics
- Throughput: The number of requests that can be served
- Latency: How long does it take to serve an individual request? A short average latency is good, but tail latency is what’s critical
- Fairness: A few heavy requests must not slow down other, short requests
- Programmability: Ease of programming, debugging, maintenance
System Model
Load manager balances the load across the logic servers. Approaches
DNS-based: The DNS translation resolves the same name to different server addresses. Issues: can take hours to adapt, and not available to small clusters.
Appliance or switch (L4): Balance at the TCP layer.
Smart client (L7) (rarely used): A trusted client is exposed to the server architecture; clients randomly select a server that contains the services they wish to use.
Load Balancing Techniques:
Round robin: Only works for a homogeneous workload and platform. Can use weighted round robin for a heterogeneous platform.
Least number of connections: Monitor how many requests are active per server; send the request to the server with the fewest connections.
Response time: Monitor how long each server takes to respond to a request; pick the server with the lowest response time.
Source IP hash: Remember which client is connected to which server by hashing the client IP and the source port number.
SDN Based
Chained failover: Use one server to its maximum, then add more servers as necessary.
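A minimal sketch of the source-IP-hash technique (the server list is hypothetical; real L4 balancers often use consistent hashing so the mapping survives server additions and removals):

```python
import hashlib

SERVERS = ["10.0.0.1", "10.0.0.2", "10.0.0.3"]   # hypothetical backend pool

def pick_server(client_ip, src_port):
    """Hash (client IP, source port) so the same client always lands on
    the same backend: sticky sessions without per-connection state."""
    key = f"{client_ip}:{src_port}".encode()
    digest = hashlib.sha256(key).digest()
    return SERVERS[int.from_bytes(digest[:4], "big") % len(SERVERS)]

a = pick_server("192.168.1.7", 51234)
b = pick_server("192.168.1.7", 51234)   # same client, same server
```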
Server Designs
Single Process
Use a single process to handle all requests: wait on a socket, receive and read a request from the socket, execute the request, return the result. Even if there are multiple cores on the machine, it will use only one. Even that single core is not used well, sitting idle around 50% of the time on I/O operations.
Caution
Blocking networking and disk operations.
Hard to program with low throughput and high latency.
Multi-Threaded
Make a multi-threaded application thread/request model.
When a request is received, create a new thread to process the new request, allow the thread to process the request, send the response back to the client, and finally kill that thread.
flowchart LR
z[network]
x[network]
a[Request 1]
b[Request 2]
c[Request 3]
d[Request 4]
e[Request N]
z --> dispatcher
dispatcher --dispatch--> a
dispatcher --dispatch--> b
dispatcher --dispatch--> c
dispatcher --dispatch--> d
dispatcher --dispatch--> e
a -- send result--> x
b -- send result--> x
c -- send result--> x
d -- send result--> x
e -- send result--> x
There is no queueing as requests are immediately placed on a thread for processing.
Maximum throughput is reached when the number of threads equals the number of cores on the machine. When threads exceed the available cores, throughput decreases and latency eventually grows exponentially, as little useful work is done on requests.
Medium programmability with low throughput and high latency when under heavy load.
Multi-Process
Same architecture as a multi-thread design, except you fork a process for every new request.
Heavy overhead (more than threading) but offers greater fault and security isolation since memory is not shared. Under a heavy load it has low throughput, high latency, and is unfair to short requests. Can combine multi-process with multithreading to balance performance and isolation.
Thread/Process Pool
Create and maintain a limited number of threads/processes to reduce overhead. Requests are continuously fetched from the queue, read and processed, and the result returned to the client. Fix the number of threads at the peak.
Programmability is medium with high config, but throughput is high and latency is medium. Difficult to find the ideal configuration of the number of threads. If service is CPU bound, then the number of threads = the number of CPU cores. The bottleneck may not be CPU, it may be I/O or disk, which means the peak is less than CPU cores. Sweet spot depends on the ratio of CPU to I/O.
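The pool design can be sketched in a few lines (Python threads stand in for a real server's workers; the handler and request count are illustrative):

```python
import queue
import threading

NUM_WORKERS = 4   # rule of thumb: about one thread per core if CPU-bound

def handle(request):
    return f"done: {request}"          # stand-in for real request processing

requests, results = queue.Queue(), queue.Queue()

def worker():
    while True:
        req = requests.get()           # block until a request is queued
        if req is None:                # sentinel: shut this worker down
            return
        results.put(handle(req))

threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()
for i in range(10):                    # clients enqueue requests
    requests.put(i)
for _ in threads:                      # one shutdown sentinel per worker
    requests.put(None)
for t in threads:
    t.join()
```

Unlike the thread-per-request design, thread creation cost is paid once, and the queue absorbs bursts instead of oversubscribing the cores.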
Single Event Based Processing
Idea: I/O is the bottleneck, a single process is good enough for the compute part if the process does not block.
Have a primitive such as select() or poll()
Non-Blocking I/O
Divide processing such that the process running on the core never blocks on network I/O.
Server structure
while (true) {
    events = select();   // block until at least one channel is ready
    process(events);     // handle the ready events without blocking
}

Use `Key.attachment` to maintain state between events.
The server only does CPU work, with the selector maintaining ready events in the buffer: tasks are split between CPU and I/O. The waiting (on the selector) is isolated from the processing (on the server); the selector performs the I/O, so the server receives only ready data.
If the selector has only part of a request in the buffer when it passes the packets to the CPU, they must be buffered until the whole request arrives. This requires creating a state machine that is maintained for each client.
This is very hard to program, with high throughput and low latency.
Debugging is incredibly difficult for thousands of state machines for thousands of clients all with various states.
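The selector pattern can be sketched in Python (a `socketpair` stands in for an accepted client connection; the `data=` attachment plays the role of `Key.attachment` in Java NIO):

```python
import selectors
import socket

sel = selectors.DefaultSelector()
server_side, client_side = socket.socketpair()   # stand-in for an accepted connection
server_side.setblocking(False)

# Per-connection state dict survives between events (like Key.attachment).
key = sel.register(server_side, selectors.EVENT_READ, data={"buffer": b""})

client_side.sendall(b"GET /index.html")          # a client writes a request

for ready, _ in sel.select(timeout=1):           # returns only ready channels
    ready.data["buffer"] += ready.fileobj.recv(4096)  # will not block: data is ready

received = key.data["buffer"]
sel.unregister(server_side)
server_side.close()
client_side.close()
```

A real server would keep looping, accumulating bytes per connection until a full request is parsed, which is exactly the per-client state machine described above.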
Asymmetric Multi-Process Event Driven (AMPED)
AMPED is a single process-event based design with multiple helper threads for disk I/O.
This enables us to completely separate compute from I/O. Very hard to program, but has high throughput and low latency.
Staged Event Driven Architecture (SEDA)
SEDA breaks the processing of the request into the CPU and I/O parts.
Well-conditioned: Performance does not degrade with higher workload. Architect the application into stages that communicate only through queues.
flowchart LR
a[Accept Connection]
b[Read Packet]
c[Parse Packet]
d[Check Cache]
e[Handle Miss]
f[Send Response]
g[File I/O]
h[Write Packet]
subgraph SocketListen
a
end
subgraph SocketRead
b
end
a --> c
b --> c
subgraph HttpParse
c
end
c --> d
subgraph PageCache
d
end
d --> e
d --> f
subgraph CacheMiss
e
end
subgraph HttpSend
f
end
e --> g
subgraph FileI/O
g
end
f --> h
subgraph SocketWrite
h
end
Each stage follows a thread-pool design. Per-stage controllers adjust the number of threads and the batch size.
SEDA looks into the application and divides it into stages based on which resources are used: the role of a stage is to access exactly one resource, giving a pipeline of stages. The input to a stage is a queue, and a stage may output to multiple queues; queues are the only method of communication between stages.
Stages run handler to process task using a thread pool.
Benefits: Modular, easy to test and debug, automated performance tuning, fine granularity of control (per stage).
Good design since it abstracts complex generic parts letting programmer focus on application-specific parts.
| | Throughput | Latency | Programming |
|---|---|---|---|
| Single Proc | low | bad | easy |
| Multiproc | low | bad | medium |
| Multithread | low | bad | medium |
| Pool proc/thread | high | medium | medium |
| Event | high | good | hard |
| SEDA | high | good | medium |
Lecture 11
Design requirements for distributed systems:
- Performance
- Dependability
- Security
Performance metrics
- Response time
- Throughput
- System utilization
Dependability:
- Reliability: A system can run continuously without failure
- Availability: The probability that the system is operational at a given time $t$.
If a system crashes for 1 ms every hour, the system is not reliable, but it is available.
Availability is measured via uptime.
MTBF: Mean time between failures
MTTR: Mean time to repair
MTTR = MTT detection + MTT diagnosis + MTT fix
Failure Models
- Fail-stop/Fail-restart: The components stop working.
- Byzantine failure: Components keep running but return corrupted data. Tolerating these is not provided by default.
- Limping: The component is very slow.
Hardware Failure Rates
- Early Infant Mortality Failure: Servers fail young, but stabilize.
- Wear Out Failures: Doesn’t fail new, but breaks down over time.
- Bathtub Curve: Infant mortality, stabilizes, wears out.
Don’t replace clusters all at once, to avoid early infant mortality failures. Don’t buy all machines from one vendor.
There are multiple ways to improve uptime: we can increase MTBF, or we can reduce MTTR. If MTTR → 0, then uptime is 100%.
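The standard relationship behind this (stated informally above via uptime) is availability = MTBF / (MTBF + MTTR), which shows numerically why driving MTTR toward 0 pushes uptime toward 100%:

```python
def availability(mtbf, mttr):
    """Fraction of time the system is up: MTBF / (MTBF + MTTR)."""
    return mtbf / (mtbf + mttr)

three_nines = availability(1000, 1)     # MTBF 1000 h, MTTR 1 h: about 0.999
faster_fix = availability(1000, 0.01)   # same MTBF, 100x faster repair
```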
Question
Which one is the better investment (increase MTBF or decrease MTTR)?
Decreasing MTTR.
Design for fault tolerance:
- Decompose system into modules (limit shared state, OOP principles)
- Each module fail-fast (speeds up detection)
- Make module failure noticeable quickly
- Reduce configuration
- Redundancy in hardware, software, and data
- Use sessions or a transaction mechanism
Naming
Name services: Entries of the form <name, attributes>, where attributes are typically network addresses. Type of lookup queries: name → attribute values.
Directory services: <name, attributes> entries. Types of lookup queries: name → attribute values, and attribute values → names.
Scalability: Naming directories tend to grow very fast
Consistency: Short and mid-term inconsistencies are tolerable. In the long term, system should converge towards a consistent state.
Names can be flat (e.g., MAC addresses) or hierarchical (e.g., UNIX file system paths).
Domain Name System (DNS)
Performs name-to-IP mapping
Distributed database, implemented in hierarchy of many name servers.
Question
Why not centralize DNS?
Single point of failure, traffic volume, doesn’t scale.
How is it distributed? Global layer stores TLD, then keeps narrowing in scope as we go down.
Iterative vs Recursive (covered in CS456).
Communication cost comparison of iterative vs recursive depends on the distance from the client to the server, and from the servers to each other.
Server-controlled Hybrid Navigation
Non-recursive, server controlled: Server contacts peers if it cannot resolve name itself.
Recursive, server-controlled: If name cannot be resolved, server contacts superior responsible for a larger prefix of the name space. Recursively applied until name is resolved.
Makes sense to do iterative on the top levels, and at the lower levels we do recursive. This reduces load on servers (though long range communication for a few).
Lecture 12
Name & Directory Services
X.500
- ITU - standardized directory service
LDAP
- Directory service
- Lightweight implementation of X.500
- Often used in intranets
Synchronization
Synchronization problem: How processes cooperate and synchronize with one another in a distributed system. In single CPU systems, critical regions, mutual exclusion, and other synchronization problems are solved using methods such as semaphores. These don’t work in distributed systems because they rely on the existence of shared memory.
We will review clocks, election, mutual exclusion.
Clock synchronization
In a centralized system: Time is unambiguous. A process gets the time by issuing a system call to the kernel. If process A gets the time and later process B gets the time, the value B gets is higher than (or possibly equal to) the value A got.
Example: UNIX make examines the times at which all the source and object files were last modified.
- If time(`input.c`) > time(`input.o`), then recompile `input.c`.
- If time(`input.c`) < time(`input.o`), then no compilation is needed.
In a distributed system: Achieving agreement on time is not trivial.
The computer on which the compiler runs and the computer on which the editor runs may be different. The times are according to the local clocks, which may not be synced.
Question
Is it possible to synchronize all the clocks in a distributed system?
A computer has a timer, not really a clock. Timer is precisely machined quartz crystal oscillating at a frequency that depends on how the crystal was cut and the amount of tension.
Two registers are associated with the crystal: counter (track this) and holding register (hard-coded). Oscillations decrement the counter by one. When the counter hits 0, generate a clock tick (interrupt) to the OS, and set the counter back to 60. OS has a translation from clock time to real time.
On each computer, the crystal runs at a slightly different frequency, causing the software clocks to gradually get out of sync (clock skew). We can modify the holding-register value to account for impurities in the quartz, but ticks are still discrete, so skew is inevitable.
Each machine’s timer causes an interrupt $H$ times a second; the interrupt handler adds 1 to the software clock. When UTC time is $t$, the value of the clock on machine $p$ is $C_p(t)$. In a perfect world, $C_p(t) = t$ for all $p$ and all $t$.
In practice, timers do not interrupt exactly $H$ times a second. The relative error obtainable with modern chips is about $10^{-5}$.
A timer is said to be working within its specification if
$$1 - \rho \le \frac{dC}{dt} \le 1 + \rho$$
where the constant $\rho$ is the maximum drift rate.
If, after synchronization, two clocks drift from UTC in opposite directions, they may be as much as $2\rho \Delta t$ apart at time $\Delta t$ after synchronization.
For clocks not to differ by more than $\delta$, they must be resynchronized every $\delta / (2\rho)$ seconds.
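A quick numerical check of the resynchronization bound:

```python
def resync_interval(delta, rho):
    """Seconds between resyncs so two clocks, drifting apart at up to
    2*rho seconds per second, never differ by more than delta seconds."""
    return delta / (2 * rho)

# Keep clocks within 1 ms at the 10^-5 drift rate quoted above.
resync_interval(1e-3, 1e-5)   # 50 seconds
```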
Cristian’s Algorithm
One time server (WWV receiver); all other machines stay synchronized with the time server. Changes are introduced gradually by adding more or less seconds for each interrupt. The propagation time is included in the change.
This is based on the assumption that the server’s time is read exactly in the middle of the round trip, and that the path takes the same time in both directions.
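The client-side computation can be sketched as follows (the symmetric-path assumption is baked into the `rtt / 2` term):

```python
def cristian_estimate(t_sent, t_received, server_time):
    """Client clock estimate: the server's reading is assumed valid at the
    midpoint of the round trip (symmetric request/reply paths)."""
    rtt = t_received - t_sent
    return server_time + rtt / 2

# Request sent at local 100.000 s, reply at 100.020 s, server said 200.000 s:
cristian_estimate(100.000, 100.020, 200.000)   # estimate 200.010
```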
Network Time Protocol (NTP)
Let $T_1$ be when A sends the request, $T_2$ when B receives it, $T_3$ when B sends the reply, and $T_4$ when A receives the reply. The offset between A and B is $\theta = \frac{(T_2 - T_1) + (T_3 - T_4)}{2}$. Gradually adjust the clock to minimize the offset.
This assumes the request and response delays are equal ($\delta T_{req} = \delta T_{res}$).
It is not safe to set back a clock that was once ahead: this can result in negative runtimes, which will break processes. Instead we slow the clock down (the hardware clock ticks at the same rate, but the tick-to-time conversion changes). It is also not safe to jump ahead a clock that was once behind; the same procedure is used to increase the rate of the clock.
With the invention of the atomic clock in 1948, it becomes possible to measure the time more accurately. Labs around the world have atomic clocks and each of them periodically tells the BIH in Paris how many times its clock ticked.
The BIH averages these to produce TAI. Originally, the atomic time was computed to make the atomic second equal to the mean solar second.
Since the mean solar day gets longer, the BIH made a necessary correction called UTC (universal coordinated time).
Berkeley Algorithm (for coordinating clocks internally, not caring about external times)
One time server (a time daemon) is active. It polls every machine periodically to ask what the time is there. Based on the answers, it computes an average time, then tells all other machines to advance their clocks to the new time or slow their clocks down until some specified reduction has been achieved.
The time daemon’s time is set manually by operator periodically.
Averaging Algorithm
Each machine broadcasts the current time according to its own clock. After a machine broadcasts its time, it starts a timer to collect all other broadcasts that arrive during some interval. When all broadcasts have arrived, an algorithm is run to compute the new time from them.
Simplest algorithm: Average the values from all other machines.
Variations: Discard the highest and lowest values and average the rest; add to each message an estimate of the propagation time from the source.
For algorithms where clocks must not only be the same, but also must not deviate from real-time, we speak of physical clocks.
For algorithms where only internal consistency of clocks matters, we speak of logical clocks.
Logical Clock Synchronization
Happens-before relation ($\to$):
- If $a$ and $b$ are events in the same process, and $a$ happens before $b$, then $a \to b$ is true.
- If $a$ is the event of a message being sent by one process, and $b$ is the event of the same message being received by another process, then $a \to b$ is also true.
- If two events, $a$ and $b$, happen in different processes that do not exchange messages, then $a \to b$ is not true, but neither is $b \to a$. These events are said to be concurrent ($a \parallel b$).
- Happens-before is transitive: $a \to b$ and $b \to c$ imply $a \to c$.
Lamport’s Algorithm
Capturing happens-before relation.
Each process $P_i$ has a local, monotonically increasing counter, called its logical clock $C_i$.
Each event $a$ that occurs at process $P_i$ is assigned a Lamport timestamp $C_i(a)$.
Rules:
- $C_i$ is incremented before an event is issued at $P_i$: $C_i \leftarrow C_i + 1$.
- When $P_i$ sends message $m$, it adds $ts(m) = C_i$ [this is event `send(m)`].
- On receiving $m$, $P_j$ computes $C_j \leftarrow \max(C_j, ts(m)) + 1$; this timestamps event `receive(m)`.
Lecture 13
Lamport Timestamps
Two data centres, one on the east coast, the other on the west coast. $1,000 in an account.
A $100 deposit is made on the east coast and replicated to the west coast.
Interest of 10% is applied on the west coast and replicated to the east coast.
On the east coast, the $100 deposit may be registered first, and then interest is computed ($1,210).
On the west coast, interest may be registered first, and then the $100 deposit is added ($1,200).
We want to define an order such that all replicas follow that same order.
Total-ordered multicast
To send a message, the sender adds its local logical time to the message, broadcasts the message, and waits for ACKs from all nodes.
For all nodes: upon receiving a message, add the message to a queue ordered by the timestamp in the message. Update the local logical time. Broadcast an ACK to all nodes. When the message at the head of the queue has received ACKs from all nodes, deliver it to the application.
Assumes the network is reliable (no packet loss, no packet reordering, nodes do not fail).
In the lecture example, a node buffers an ACK for message 5 because message 5 is not in its queue yet (the ACK arrived before the message itself). Once message 5 arrives, it can be delivered immediately.
To deal with the problem of multiple servers having the same logical clock time, we concatenate the IP address to the logical clock to create a unique value (like a floating-point number).
Question
In the concatenation, does the IP or logical clock come first?
Logical clock comes first. If IP was first, some servers with higher IP addresses will always be higher than others (and thus later).
Clients don’t participate in total-ordered multicast, servers do.
Vector clocks are constructed by letting each process $P_i$ maintain a vector $VC_i$ with the following two properties:
- $VC_i[i]$ is the number of events that have occurred so far at $P_i$. In other words, $VC_i[i]$ is the local logical clock at process $P_i$.
- If $VC_i[j] = k$, then $P_i$ knows that $k$ events have occurred at $P_j$. It is $P_i$’s knowledge of the local time at $P_j$.
Steps carried out to accomplish property 2:
- Before executing an event, $P_i$ executes $VC_i[i] \leftarrow VC_i[i] + 1$.
- When process $P_i$ sends a message $m$ to $P_j$, it sets $m$’s (vector) timestamp $ts(m)$ equal to $VC_i$ after having executed the previous step.
- Upon receipt of a message $m$, process $P_j$ adjusts its own vector by setting $VC_j[k] \leftarrow \max(VC_j[k], ts(m)[k])$ for each $k$, after which it executes the first step and delivers the message to the application.
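The steps above can be sketched as a single-machine illustration (in practice, `ts` travels inside the message between processes):

```python
class VectorClock:
    """Sketch of a vector clock for n processes; pid is this process's index."""
    def __init__(self, pid, n):
        self.pid = pid
        self.vc = [0] * n

    def event(self):
        self.vc[self.pid] += 1               # property 1: count local events

    def send(self):
        self.event()                          # increment, then stamp the message
        return list(self.vc)                  # ts(m) = copy of VC_i

    def receive(self, ts):
        # VC_j[k] = max(VC_j[k], ts[k]) for each k, then count the receive event
        self.vc = [max(a, b) for a, b in zip(self.vc, ts)]
        self.event()

p0, p1 = VectorClock(0, 2), VectorClock(1, 2)
m = p0.send()       # p0.vc == [1, 0]; message carries [1, 0]
p1.receive(m)       # p1.vc == [1, 1]
```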
Causal-ordered multicast
To send a message, the sender increments the vector clock only on message send events. Adds the local vector clock to the message, broadcasts the message.
For all nodes: upon receiving a message $m$ from node $i$,
- node $j$ buffers the message until:
- $ts(m)[i] = VC_j[i] + 1$ (the sender’s clock in the received message is exactly one more than what $j$ thinks the sender’s logical clock is)
- $ts(m)[k] \le VC_j[k]$ for all $k \ne i$
Then update the local clock on $j$ for the sender only: $VC_j[i] \leftarrow ts(m)[i]$.
Difference between this and total ordering multicast.