Professor: Dan Holtby | Term: Fall 2025

Lecture 1

Java is like C++, BUT

  • No operator overloading. if (word == "Waterloo") compares references, not contents, so it is usually false. Use the .equals method instead
  • Single Inheritance
  • It uses the term generic, not template

The price per GB of storage has dropped at an exponential rate over the years.

  • Facebook generates ~4 million GB a day
  • 500 million new tweets a day
  • 720,000 hours of new YouTube videos per day

We generate ~500 exabytes per day. This seems like a lot, but it is nothing compared to what is coming.

Big data means horizontal scaling: cheap commodity computers, just more of them.

Parallelization is hard: deadlocks, livelocks, and race conditions, except now spread across more than one computer.

We are not going to design fault-tolerant distributed computer networks. We are going to use one.

You don’t need to understand the hardware to use assembly. You don’t need to understand assembly to use C++. You don’t need to understand a hash table to use std::unordered_map.

Question

How can we abstract a distributed network?

This course involves a little bit of each: data science tools, analytics infrastructure, and execution infrastructure.

Lecture 2

For text processing we usually start with simple word counters.

Word Count in Python

from collections import Counter

counts = Counter()
with open("file.txt", "rt") as file:
	for line in file:
		counts.update(tokenize(line))  # tokenize: split a line into words
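
The notes never define tokenize; a minimal sketch of one (lowercase, split on anything that isn't a letter or digit) might be:

import re

def tokenize(line):
	# Split on non-alphanumeric characters and drop empty strings
	return [w for w in re.split(r"[^a-zA-Z0-9]+", line.lower()) if w]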

This is basically as fast as your HDD.

Most simple processing is bottlenecked by loading the data, not by the processing itself.

Horizontal scaling: 100x the servers, 100x the speed? Not always true. Not all algorithms can be run in parallel without overhead.

Word count in parallel: divide the file into 100 pieces; each server counts its piece, and the totals are added together. This is easier said than done.

Question

Why can’t we split it by line number?

We don’t know where the newlines are. We’d have to scan through the whole file and count them, which is slow.

Question

Can we split by bytes?

This works, but has an issue: we could end up splitting a word (or line) in the middle. There are many solutions to this.
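
One common fix, sketched here as an assumption rather than any particular framework's behaviour: give each worker a byte range; a worker that starts mid-line skips that partial first line, and every worker reads past the end of its range to finish the last line it started, so every line is counted exactly once.

from collections import Counter

def count_range(path, start, end):
	# Count words in the lines owned by the byte range [start, end).
	# Assumes the tokenize() helper sketched above.
	counts = Counter()
	with open(path, "rb") as f:
		f.seek(start)
		if start != 0:
			f.readline()               # partial first line: the previous range finishes it
		while f.tell() <= end:
			line = f.readline()
			if not line:               # end of file
				break
			counts.update(tokenize(line.decode("utf-8", errors="replace")))
	return counts

The per-range Counters from all workers can then simply be added together.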

MapReduce

Two functions

  • Map: Python’s / Racket’s CS135 Map
  • Reduce: Python’s Reduce

MapReduce is based around key-value pairs (records). This is a common way to break things down.

If the input is a text file:

  • Key: position of the line (byte offset, not line number)
  • Value: text of the line

Programmer defines two functions:

  • map: (k1, v1) → List[(k2, v2)]
  • reduce: (k2, List[v2]) → List[(k3, v3)]

The output key can be different from the input key.

MapReduce requires a key, even though we only need a single integer (the count).

A file gets split and sent to a bunch of mappers. Each computes its counts and sends them to a single machine, which applies the reduce function to merge the results. The map function processes the input line by line.

What about memory?

The Counter() used 8 bytes max.

How much does the dictionary use? O(|V|) if there are |V| unique words.

In 10 TB of data, what is |V|? We don’t care; we are working line by line.

How many unique words per line? Not many (Heaps’ law).
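
Heaps’ law (a standard empirical result, stated here with assumed typical parameters) says the vocabulary grows roughly as V(n) ≈ K · n^β with β well below 1, so even an enormous corpus has a comparatively small vocabulary:

def heaps_vocab(n_tokens, K=30, beta=0.5):
	# Heaps' law: V(n) ≈ K * n**beta.  K and beta here are illustrative values
	# (K is often quoted as 10-100, beta as 0.4-0.6), not measured constants.
	return K * n_tokens ** beta

print(heaps_vocab(10 ** 12))   # roughly 3e7 distinct words for a trillion tokens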

But thinking at scale means we have to be careful when we say something is probably small.

Alternative to counting with a dictionary: we don’t need a dictionary for each line; we can emit a (word, 1) pair as soon as we see each word. This might seem less efficient, but repeated words aren’t that common.

The mappers then send the list of (key, 1) pairs to the reducer, where the key is the word and the value is its count. The reducer adds up all intermediate results. But it can now be a bottleneck.

Question

Can we have multiple reducers, like we have multiple mappers?

def map(line):
	for word in tokenize(line):
		emit(word, 1)
 
def reduce(key, values):
	sum = 0
	for v in values:
		sum += v
	emit(key, sum)
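
As a sanity check of the model, here is a minimal single-machine simulation of the same dataflow (the logical picture only, not how Hadoop executes it): run map over every record, group the intermediate pairs by key, then run reduce once per key. emit is replaced by yield.

from collections import defaultdict

def run_mapreduce(records, map_fn, reduce_fn):
	# records: iterable of (key, value) input pairs.
	# map_fn(key, value) and reduce_fn(key, values) both yield (key, value) pairs.
	groups = defaultdict(list)
	for k, v in records:
		for k2, v2 in map_fn(k, v):
			groups[k2].append(v2)              # the "shuffle": group by key
	output = []
	for k2, values in groups.items():
		output.extend(reduce_fn(k2, values))   # reduce sees all values for a key
	return output

def wc_map(pos, line):
	for word in line.split():
		yield word, 1

def wc_reduce(word, counts):
	yield word, sum(counts)

print(run_mapreduce(enumerate(["to be or", "not to be"]), wc_map, wc_reduce))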

The Reduce server is getting too much data. If the file was 10TB, then more than 10TB will arrive.

Question

What if we have multiple reducers?

Each reducer gets ALL pairs for a given key.

Programmer defines three functions:

  • map: (k1, v1) → List[(k2, v2)]
  • reduce: (k2, List[v2]) → List[(k3, v3)]
  • partition: (k2, number of partitions) → partition number

Partition will default to a hash function that hashes the key and ignores the value.

Partition takes a key-value pair plus the number of reducers, n, and assigns the pair to one of the n reducers. Although it is given the value as well as the key, you normally want to decide based only on the key; otherwise pairs with the same key may not end up at the same reducer, which defeats the purpose.

def map(pos: Long, text: String):
	for word in tokenize(text):
		emit(word, 1)
 
def reduce(key: String, values: Iterator[Int]):
	sum = 0
	for v in values:
		sum += v
	emit(key, sum)
 
def partition(key: String, value: Int, reducer_count: Nat):
	return hashcode(key) % reducer_count

When dealing with hash codes that can be negative, mask off the sign bit first: (hashcode & INT_MAX) % n.
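
A quick Python illustration of the same idiom (Python’s % already returns a non-negative result, so the mask is only there to mirror the Java expression; note also that Python salts str hashes per process, so a real partitioner needs a stable hash):

INT_MAX = 2 ** 31 - 1

def partition(key, n_reducers):
	# Mirror of the Java idiom (hashCode() & Integer.MAX_VALUE) % n:
	# clear the sign bit so the partition number can never be negative.
	return (hash(key) & INT_MAX) % n_reducers

print(partition("waterloo", 4))   # some value in 0..3, stable within one process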

Sending records to the reducers that the partition function assigns them to is called shuffling.

Hadoop is the most famous open-source implementation of MapReduce. Google has a proprietary implementation in C++.

Hadoop is written in Java. Development began at Yahoo; it later became an Apache project. It is used in production at Facebook, Twitter, Netflix, etc.

The framework:

  • Assigns workers to map and reduce tasks
  • Divides the data between the map workers
  • Groups intermediate values (sorts pairs by key, determines which pairs go to which reduce worker)
  • Handles errors (some workers might fail or crash)

Input goes to the mappers. Mappers complete their tasks and sort their map output.

We then group values by key across mappers.

We then reduce and receive our output.

Question

What’s the slowest operation here?

Sending intermediate results from the mappers to the reducers.

What if each mapper sent only one value per key?

def combine(key, values):
	sum = 0
	for v in values:
		sum += v
	emit(key, sum)

Programmer defines four functions:

  • map: (k1, v1) → List[(k2, v2)]
  • combine: (k2, List[v2]) → List[(k2, v2)]
  • reduce: (k2, List[v2]) → List[(k3, v3)]
  • partition: (k2, number of partitions) → partition number

Combine is an optional step that the mapper or reducer might run when idle. Its input and output key-value types must be the same (the intermediate types produced by map).

Conceptually it should always be producing one key-value pair, since the whole point is to combine many values into one for the same key.

Combine might be the same function as reduce: if reduce’s input and output types are the same, then reusing reduce as the combiner is legal.

It also might not be. Even when it is legal, it might be inappropriate: it can run but give the wrong answer.

For averages it can give the wrong answer: Mean(2, 3, 4) = 3 versus Mean(Mean(2, 3), 4) = 3.25
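
A quick numeric check of the difference: averaging partial averages weights them incorrectly, while carrying (sum, count) pairs combines safely no matter how the values are grouped.

def mean(xs):
	return sum(xs) / len(xs)

print(mean([2, 3, 4]))                 # 3.0  -- the right answer
print(mean([mean([2, 3]), 4]))         # 3.25 -- "combining" with mean is wrong

# Carrying (sum, count) pairs instead is safe to combine in any grouping:
partials = [(2 + 3, 2), (4, 1)]        # e.g. one pair per mapper
total = sum(s for s, c in partials)
count = sum(c for s, c in partials)
print(total / count)                   # 3.0 again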

Physical View

  1. User submits a job to the cluster
  2. The cluster schedules the map and reduce tasks - the number of map tasks is derived from the size of the input file(s). The number of reduce tasks is set by the user
  3. The workers start reading and mapping their split of the file
  4. As they map, they save the “intermediate” key-value pairs to the local file system, not to the network file system
  5. When the job is nearly finished, reducer workers start asking mappers for intermediate files that are ready
  6. Once a reducer has all of its intermediate files it begins applying reduce - the output is written to the network file system

Distributed Group By in MapReduce

Map: Map outputs are buffered in memory in a circular buffer. When the buffer reaches a threshold, its contents are spilled to disk. Spills are merged into a single, partitioned file (sorted within each partition). Combine runs (or might run) during the merges.

Reduce: First, map outputs are copied over to the reducer machine. The sort is a multi-pass merge of map outputs (it happens in memory and on disk). The combiner runs during the merges. The final merge pass feeds directly into the reducer.

The combiner on the reducer side usually runs while waiting for some mappers to finish (when others are already done). It is not as advantageous as running the combiner in the mapper, though.
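
The group-by can be pictured as a merge of sorted runs: if each spill is already sorted by key, a multi-way merge followed by grouping hands the reducer one batch of values per key. A minimal standard-library sketch (not Hadoop's actual merge code):

import heapq
from itertools import groupby
from operator import itemgetter

# Each "spill" is a list of (key, value) pairs already sorted by key,
# standing in for a sorted intermediate file from one mapper.
spill_a = [("be", 1), ("not", 1), ("to", 1)]
spill_b = [("be", 1), ("or", 1), ("to", 1)]

merged = heapq.merge(spill_a, spill_b, key=itemgetter(0))
for key, pairs in groupby(merged, key=itemgetter(0)):   # one group per key
	print(key, sum(v for _, v in pairs))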

Lecture 3

# Sketch of how the reducer sees all values for one key: an iterator over the
# sorted, merged intermediate files.
class MRIterator(Iterator):
	def __init__(self, files):
		self.files = files
		self.key = getSmallestKey(files)   # the key this iterator yields values for

	def next(self):
		key, value = getSmallestRecord(self.files)
		if key != self.key:
			# The next record belongs to a different key: this key's values are done
			raise StopIteration
		commitRead(self.files)             # advance past the record just read
		return value

Transferring from mapper to reducer is the bottleneck.

We assign chunks to servers (this is called sharding).

Multiple servers go into a server rack; clusters of racks make up a data centre.

Storage Hierarchy:

  • Local machine (L1/L2/L3 cache, memory, SSD, magnetic disks)
  • Remote machine (same rack)
  • Remote machine (different rack)
  • Remote machine (different data centre)

Distributed File System

Assume we have 20 identical networked servers, each with 100 TB of disk space.

Question

How would you store a file on these servers?

Split the file into chunks and spread the chunks across the servers. But if a server that contains one of the chunks fails, the file becomes corrupted. Since failure rates are high on commodity servers, we need a solution.

If each chunk is stored on multiple servers, then when a server fails there is a backup. The number of copies determines how much resilience we get.

Hadoop Distributed File System (HDFS)

  • Very large distributed file system (10k nodes, 100 million files, 10 PB).
  • Assume commodity hardware
    • Files are replicated to handle hardware failure
    • Detect failures and recover from them
  • Optimized for batch processing
    • Provides very high aggregate bandwidth

DFS

  • Data coherency
    • Write-once-read-many access model
    • Client can only append to existing files
  • Files are broken into blocks
    • Each block is replicated on multiple DataNodes
  • Intelligent Client
    • Client can find location of blocks

HDFS Architecture

The namenode stores where the data is located on datanodes (not the actual data)

There may be a redundant namenode on standby in case the primary one fails. The HDFS client gets block information from the namenode and then interacts with the datanodes to get the actual data.

HDFS client asks the HDFS namenode to read from a file. The HDFS namenode returns the block id and block location to the client. The HDFS client then goes to the HDFS datanode to get the block data.
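
A toy in-memory sketch of that split between metadata and data (class and method names here are illustrative stand-ins, not the real HDFS client API):

class NameNode:
	def __init__(self):
		# Metadata only: path -> list of (block_id, [replica DataNodes])
		self.blocks = {}
	def lookup_block(self, path, block_index):
		return self.blocks[path][block_index]

class DataNode:
	def __init__(self):
		self.store = {}   # block_id -> bytes
	def read_block(self, block_id):
		return self.store[block_id]

# One block of one file, replicated on two DataNodes.
dn1, dn2 = DataNode(), DataNode()
dn1.store["blk_0"] = dn2.store["blk_0"] = b"hello hdfs"
nn = NameNode()
nn.blocks["/data/file.txt"] = [("blk_0", [dn1, dn2])]

# The client: block id and locations from the NameNode, block data from a DataNode.
block_id, replicas = nn.lookup_block("/data/file.txt", 0)
print(replicas[0].read_block(block_id))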

Functions of a NameNode

  • Manages the file system namespace
    • Maps a file name to a set of blocks
    • Maps a block to the DataNodes where it resides
  • Cluster configuration management

NameNode metadata

  • Metadata in memory
    • Entire metadata is in main memory
  • Types of metadata
    • List of files
    • List of Blocks for each file
    • List of DataNodes for each block
  • Transaction Log

DataNode

  • Block server
    • Stores data in the local file system
    • Stores metadata of a block
    • Serves data and metadata to clients
  • Block report
    • Sends a report of all existing blocks to the namenode
  • Facilitates pipelining of data
    • Forwards data to other specified DataNodes

Block Placement Policy

  • Current policy: 3 replicas are stored on at least 2 racks
    • One replica on a local node
    • Second replica on a remote rack
    • Third replica on same remote rack
      • Rebalance might later move this to a third rack
  • Clients read from nearest replica

If all data is on one rack, all it takes is one fire to lose it all. Inter-rack communication has higher latency and lower bandwidth than intra-rack, so the remote copies are sent to a single remote rack and assigned to two different nodes there.
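
A sketch of the stated rule (one replica on the local node, two on different nodes of a single remote rack); the function below is illustrative only, not HDFS’s actual placement code:

import random

def place_replicas(local_node, local_rack, racks):
	# racks: dict of rack name -> list of node names.
	# Replica 1: the writer's own node.  Replicas 2 and 3: two different nodes
	# on one remote rack, so losing a whole rack still leaves a copy.
	remote_rack = random.choice([r for r in racks if r != local_rack])
	remote_nodes = random.sample(racks[remote_rack], 2)
	return [local_node] + remote_nodes

racks = {"rack1": ["n1", "n2", "n3"], "rack2": ["n4", "n5", "n6"]}
print(place_replicas("n1", "rack1", racks))   # e.g. ['n1', 'n5', 'n6']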

DataNodes send a heartbeat to the NameNode once every 3 seconds. The NameNode uses missing heartbeats to detect DataNode failure.

Replication engine: the NameNode detects DataNode failures, chooses new DataNodes for new replicas, balances disk usage, and balances communication traffic to the DataNodes.

Hadoop Cluster Architecture

Question

How do we get data to the workers?

Compute nodes communicate with a SAN (Storage Area Network).

This makes sense for compute-intensive tasks, since computations are likely to take a long while even on sophisticated hardware: communication costs are outweighed by computation costs. For data-intensive tasks, computations aren’t likely to take nearly as long, so computation costs are outweighed by communication costs; we are likely to experience latency even with high-speed transfer.

The solution is not to move data to workers, but to move workers to the data.

Key idea: Co-locate storage and compute. Start up workers on the nodes that hold the data.

If a server is responsible for both data storage and processing, Hadoop can do some optimization.

On a Hadoop cluster, the resource manager is in charge of running tasks. It assigns tasks to workers that already hold the relevant data (so it talks to the NameNode).

Combiner design

Combiners are like reducers - they have the same signature. However, a reducer’s output types can differ from its input types; a combiner’s cannot. Combiners are optional.

def map(key: String, value: Int):
	emit(key, (value, 1))

def combine(key: String, values: List[(Int, Int)]):
	sum = 0
	count = 0
	for (v, c) in values:
		sum += v
		count += c
	emit(key, (sum, count))

def reduce(key: String, values: List[(Int, Int)]):
	sum = 0
	count = 0
	for (v, c) in values:
		sum += v
		count += c
	emit(key, sum / count)

To go faster: combiners improve performance by reducing network traffic, and they work during file merges. The local filesystem is faster than network access, but memory is faster than the filesystem.

class mapper:
	def setup(self):
		# Per-mapper state, preserved across calls to map
		self.sums = Map()
		self.counts = Map()
	def map(self, key, value):
		self.sums[key] += value
		self.counts[key] += 1
	def cleanup(self):
		# Emit one (sum, count) pair per key after the mapper has seen all its input
		for (key, count) in self.counts:
			emit(key, (self.sums[key], count))

In-Mapper Combining

  • Preserve state across calls to map
  • Advantage: Speed
  • Disadvantage: Requires memory management

Question

Can we do this for word frequency?

class mapper:
	def setup(self):
		self.counts = HashMap()
	def map(self, key: Long, value: String):
		for word in tokenize(value):
			self.counts[word] += 1
	def cleanup(self):
		# Emit each word's total count once, after every line has been mapped
		for (key, count) in self.counts:
			emit(key, count)