Professor: Dan Holtby | Term: Fall 2025

Lecture 1

Java is like C++, BUT

  • No operator overloading. if (word == "Waterloo") compares references, not contents, so it is usually false. Use the .equals method
  • Single Inheritance
  • It uses the word Generic not Template

Price per GB over the years seems to drop at exponential rate.

  • Facebook generates ~4 million GB a day
  • 500 million new tweets a day
  • 720,000 hours of new YouTube videos per day

We generate ~500 exabytes per day. This seems like a lot, but nothing compared to what it is going to be.

Big data is horizontal scaling / cheap computers, just more of them.

Parallelization is hard. Deadlocks, livelocks, race conditions except on more than one computer.

We are not going to design fault-tolerant distributed computer networks. We are going to use one.

You don’t need to understand the hardware to use assembly. You don’t need to understand assembly to use C++. You didn’t need to understand a hash table to use std::unordered_map.

Question

How can we abstract a distributed network?

This course involves a little bit of Data Science Tools, Analytics Infrastructure, Execution Infrastructure.

Lecture 2

For text processing we usually start with simple word counters.

Word Count in Python

from collections import Counter

counts = Counter()
with open("file.txt", "rt") as file:
	for line in file:
		counts.update(tokenize(line))  # tokenize (defined elsewhere) splits a line into words

This is basically as fast as your HDD.

Most simple processing is likely bottlenecked by the load, not the processing.

Horizontal scaling: 100x the servers, 100x the speed? Not always true. Not all algorithms can be run in parallel without overhead.

Word count in parallel - Divide the file into 1/100th. Each server counts and adds the totals together. This is easier said than done.

Question

We can’t split it by line number, why?

We don’t know where the newlines are. We’d have to scan through and count them, which is slow.

Question

Can we split by bytes?

This works, but has an issue. We could end up splitting a word in the middle of the word. There are many solutions to this.
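One common fix, shown here as a minimal sketch on an in-memory byte string (a real worker would read byte ranges from the file): a worker whose range starts mid-word skips that partial word, and reads past its end to finish the word that straddles the boundary.

def words_in_range(data, start, end):
	i = start
	if i > 0 and data[i - 1:i] != b" ":
		while i < len(data) and data[i:i + 1] != b" ":
			i += 1                      # skip the partial word (the previous worker owns it)
	j = end
	while j < len(data) and data[j:j + 1] != b" ":
		j += 1                          # finish the word that straddles the boundary
	return data[i:j].split()

Every word is counted by exactly one worker, even when the byte split lands in the middle of a word.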

MapReduce

Two functions

  • Map: Python’s / Racket’s CS135 Map
  • Reduce: Python’s Reduce

MapReduce is based around key-value pairs (records). This is a common way to break things down.

If the input is a text file:

Key - position of the line (byte offset, not line number)

Value - text of the line

Programmer defines two functions:

  • map: (k1, v1) → list of (k2, v2)
  • reduce: (k2, list of v2) → list of (k3, v3)

The output key can be different than the input key.

MapReduce requires a key, even though we only need a single integer (the count).

A file gets split and sent to a bunch of mappers. They then compute their counts and send them to one of the mappers to then apply the reduce function to merge the results. The map function processes line by line.

What about memory?

Each individual count uses at most 8 bytes.

How much does the dictionary (the Counter) use? Space proportional to V, if there are V unique words.

In 10TB of data, what is V? We don’t care, we are working line by line.

How many unique words per line? Not many (Heaps law).

But thinking at scale means we have to be careful when we say something is probably small.

Alternative to map-side counting: we don’t need a dictionary for each line, we can emit words as soon as we see them (as (key, 1) pairs). This might seem less efficient, but repeated words aren’t that common within a line.

The mappers then send the list of (key, 1) pairs to the reducer, where the key is the word and the value is its count. The reducer adds up all intermediate results. But it can now be a bottleneck.

Question

Can we have multiple reducers, like we have multiple mappers?

def map(line):
	for word in tokenize(line):
		emit(word, 1)
 
def reduce(key, values):
	sum = 0
	for v in values:
		sum += v
	emit(key, sum)

The Reduce server is getting too much data. If the file was 10TB, then more than 10TB will arrive.

Question

What if we have multiple reducers?

Each reducer gets ALL pairs for a given key.

Programmer defines three functions:

  • map: (k1, v1) → list of (k2, v2)
  • reduce: (k2, list of v2) → list of (k3, v3)
  • partition: (k2, number of reducers) → reducer index

Partition will default to a hash function that hashes the key and ignores the value.

Partition takes a key-value pair plus the number of reducers (N) and assigns it to one of the N reducers. Although it is given the value as well as the key, you normally want to decide based only on the key; otherwise this defeats the purpose.

def map(pos: Long, text: String):
	for word in tokenize(text):
		emit(word, 1)
 
def reduce(key: String, values: Iterator[Int]):
	sum = 0
	for v in values:
		sum += v
	emit(key, sum)
 
def partition(key: String, value: Int, reducer_count: Nat):
	return hashcode(key) % reducer_count

When dealing with hash codes that can be negative: (hashcode(key) & INT_MAX) % n
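A minimal Python sketch of the same idea, using Python’s built-in hash as a stand-in for Hadoop’s hash code:

def partition(key, reducer_count):
	# clear the sign bit so the result of % is never negative
	return (hash(key) & 0x7FFFFFFF) % reducer_count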

The term for sending records to different reducers depending on what the partition tells us is called shuffling.

Hadoop is the most famous open-source implementation of MapReduce. Google has a proprietary implementation in C++.

Hadoop provides an open-source implementation in Java. Development was begun by Yahoo; it later became an Apache project. It is used in production at Facebook, Twitter, Netflix, etc.

Framework: Assigns workers to map and reduce. Divides data between map workers. Groups intermediate values (sorting pairs by key, determining which pairs go to which reduce worker). Handles errors (some workers might fail or crash).

Input goes to mappers. Mappers complete their tasks and sort their maps.

We then group values by key across mappers.

We then reduce and receive our output.

Question

What’s the slowest operation here?

Sending intermediate results from the mappers to the reducers.

How about only one value per key per mapper?

def combine(key, values):
	sum = 0
	for v in values:
		sum += v
	emit(key,sum)

Programmer defines four functions:

  • map: (k1, v1) → list of (k2, v2)
  • combine: (k2, list of v2) → list of (k2, v2)
  • reduce: (k2, list of v2) → list of (k3, v3)
  • partition: (k2, number of reducers) → reducer index

Combine is an optional thing the mapper / reducer might do when idle. Input and output types are not allowed to be different.

Conceptually it should always be producing one key-value pair, since the whole point is to combine many values into one for the same key.

Combine might be the same function as reduce: if reduce’s input and output value types are the same, then it would be legal to do.

It also might not. Even if legal, it might be inappropriate. It can run but give the wrong answer.

For averages it can give the wrong answer: Mean(2, 3, 4) = 3 versus Mean(Mean(2, 3), 4) = 3.25

Physical View

  1. User submits a job to the cluster
  2. The cluster schedules the map and reduce tasks - the number of map tasks is derived from the size of the input file(s). The number of reduce tasks is set by the user
  3. The workers start reading and mapping their split of the file
  4. As they map, they save the “intermediate” key-value pairs to the local file system, not to the network file system
  5. When the job is nearly finished, reducer workers start asking mappers for intermediate files that are ready
  6. Once a reducer has all of its intermediate files it begins applying reduce - the output is written to the network file system

Distributed Group By in MapReduce

Map: Map outputs are buffered in memory in a circular buffer. When the buffer reaches a threshold, its contents are spilled to disk. Spills are merged into a single, partitioned file (sorted within each partition). Combine runs (or might run) during the merges.

Reduce: First, map outputs are copied over to reducer machine. Sort is a multi-pass merge of map outputs (happens in memory and on disk). Combiner runs during the merges. Final merge pass goes directly into reducer.

The Combiner in the Reducer usually runs when waiting for some mappers to finish (when some are already done). Not as advantageous as running combiner in Mapper though.

Lecture 3

class MRIterator(Iterator):
	def __init__(self, files):
		self.files = files
		self.key = getSmallestKey(files)

	def next(self):
		key, value = getSmallestRecord(self.files)
		if key != self.key:
			raise StopIteration  # no more values for this key
		commitRead(self.files)
		return value

Transferring from mapper to reducer is the bottleneck.

We assign chunks to servers (this is called sharding).

We put multiple servers in a server rack. Clusters of racks of servers build a data centre.

Storage Hierarchy:

  • Local machine (L1/L2/L3 cache, memory, SSD, magnetic disks)
  • Remote machine (same rack)
  • Remote machine (different rack)
  • Remote machine (different data centre)

Distributed File System

Assume we have 20 identical networked servers, each with 100 TB of disk space.

Question

How would you store a file on these servers?

If a server that contains one of the chunks fails, the file becomes corrupted. Since the failure rate of commodity servers is high, we need to figure out a solution.

If each chunk is stored on multiple servers, if a server fails there is a backup. The number of copies determines how much resilience we want.

Hadoop Distributed File System (HDFS)

  • Very large distributed file system (10k nodes, 100 million files, 10 PB).
  • Assume commodity hardware
    • Files are replicated to handle hardware failure
    • Detect failures and recover from them
  • Optimized for batch processing
    • Provides very high aggregate bandwidth

DFS

  • Data coherency
    • Write-once-read-many access model
    • Client can only append to existing files
  • Files are broken into blocks
    • Each block is replicated on multiple DataNodes
  • Intelligent Client
    • Client can find location of blocks

HDFS Architecture

The namenode stores where the data is located on datanodes (not the actual data)

There may still be a redundant namenode standing by in case the primary one fails. The HDFS client gets block information from the namenode and then interacts with the datanodes to get the actual data.

HDFS client asks the HDFS namenode to read from a file. The HDFS namenode returns the block id and block location to the client. The HDFS client then goes to the HDFS datanode to get the block data.

Functions of a NameNode

  • Manages the file system namespace
    • Maps a file name to a set of blocks
    • Maps a block to the DataNodes where it resides
  • Cluster configuration management

NameNode metadata

  • Metadata in memory
    • Entire metadata is in main memory
  • Types of metadata
    • List of files
    • List of Blocks for each file
    • List of DataNodes for each block
  • Transaction Log

DataNode

  • Block server
    • Stores data in the local file system
    • Stores metadata of a block
    • Serves data and metadata to clients
  • Block report
    • Sends a report of all existing blocks to the namenode
  • Facilitates pipelining of data
    • Forwards data to other specified DataNodes

Block Placement Policy

  • Current policy: 3 replicas will be stored on at least 2 racks
    • One replica on a local node
    • Second replica on a remote rack
    • Third replica on same remote rack
      • Rebalance might later move this to a third rack
  • Clients read from nearest replica

If all data is on one rack, all it takes is one fire to lose it all. Inter-rack communication has higher latency, lower bandwidth than intra-rack, so the remote replica is sent to one rack, and assigned to two nodes there.

DataNodes send heartbeat to the Namenode once every 3 seconds. NameNode uses heartbeats to detect DataNode failure.

Replication Engine - The NameNode detects DataNode failures. It chooses new DataNodes for new replicas, balances disk usage, and balances communication traffic to DataNodes.

Hadoop Cluster Architecture

Question

How do we get data to the workers?

Compute nodes communicate with SAN: Storage Area Network

This makes sense for compute-intensive tasks, as computations are likely to take a long while even on sophisticated hardware: communication costs are outweighed by computation costs. For data-intensive tasks, computations aren’t likely to take nearly as long, so computation costs are outweighed by communication costs. We are likely to experience latency even with high-speed transfer.

Solution is to not move data to workers, but move workers to data.

Key idea: Co-locate storage and compute. Start up worker on nodes that hold the data.

If a server is responsible for both data storage and processing, Hadoop can do some optimization.

On a Hadoop cluster, resource manager is in charge of running tasks. The resource manager assigns jobs to workers that already have relevant data (so it talks to the NameNode).

Combiner design

Combiners look like Reducers - they have the same signature - but a reducer may emit different key/value types than it receives, while a combiner cannot. Combiners are optional.

def map(key: String, value: Int):
	emit(key, (value, 1))
 
def combine(key: String, values: List[(Int, Int)]):
	sum = 0
	count = 0
	for (v, c) in values:
		sum += v
		count += c
	emit(key, (sum, count))
 
def reduce(key: String, values: List[(Int, Int)]):
	sum = 0
	count = 0
	for (v, c) in values:
		sum += v
		count += c
	emit(key, sum / count)

To go faster: Combiners improve performance by reducing network traffic. Combiners work during file merges. Local filesystem is faster than network access. But memory is faster than the filesystem.

class mapper:
	def setup(self):
		self.sums = defaultdict(int)    # assumes collections.defaultdict
		self.counts = defaultdict(int)
	def map(self, key, value):
		self.sums[key] += value
		self.counts[key] += 1
	def cleanup(self):
		for (key, count) in self.counts.items():
			emit(key, (self.sums[key], count))

In-Mapper Combining

  • Preserve state across calls to map
  • Advantage: Speed
  • Disadvantage: Requires memory management

Question

Can we do this for word frequency?

class mapper:
	def setup(self):
		self.counts = defaultdict(int)
	def map(self, key: Long, value: String):
		for word in tokenize(value):
			self.counts[word] += 1
	def cleanup(self):
		for (key, count) in self.counts.items():
			emit(key, count)

Lecture 4

New Problem: Term Co-occurrence

N(u, v): the number of times word u and word v co-occur in some context.

Example

How many times is word u followed immediately by word v in a sentence?

The result is a |V| × |V| matrix, where V is the vocabulary.

There are two approaches to this.

  1. Pair - Compute individual cells
  2. Stripe - Compute individual rows

We start with describing the Pairs implementation.

Mapper

  • Input: Sentence
  • Output: ((u, v), 1) for all pairs of words u, v that co-occur in the sentence

Reducer

  • Input: Pair of words, list of counts
  • Output: Pair of words, count

The reducer can also serve as the combiner.

def map(key: Long, value: String):
	for u in tokenize(value):
		for each v that cooccurs with u in value:
			emit((u,v), 1)
 
def reduce(key: (String, String), values: List[Int]):
	sum = 0
	for value in values:
		sum += value
	emit(key, sum)

Pairs is easy to implement and easy to understand. But there are a lot of pairs and the combiner doesn’t do much, because there are |V|² potential keys.

Now on to Stripes.

Mapper

  • Input: Sentence
  • Output: (u, {v1: c1, v2: c2, …}), where
    • u is a word from the input
    • the v’s are all words that co-occur with u
    • each c is the number of times u and that v co-occur

{…} means a map (dictionary).

def map(key: Long, value: String):
	for u in tokenize(value):
		counts = {}
		for each v that cooccurs with u in value:
			counts[v] += 1
		emit(u, counts)
 
def reduce(key: String, values: List[Map[String->Int]]):
	sum = {}
	for value in values:
		sum += value  # element-wise sum of the two maps
	emit(key, sum)

Adding two maps means taking the union of the keys, and setting the value to be the sum of the two values if it occurs in both maps.
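A minimal Python sketch of that map addition (the name add_stripes is just illustrative):

def add_stripes(a, b):
	# union of the keys; values summed where a key appears in both stripes
	result = dict(a)
	for k, v in b.items():
		result[k] = result.get(k, 0) + v
	return result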

Stripes has fewer key-value pairs to send, and combiners do more work. This is computationally intensive, and the map may not fit in memory.

For English words and normal sentence lengths, the stripe fits in memory easily.

Relative frequencies (another problem)

f(b | a) = N(a, b) / N(a, ·), where N(a, b) is the number of co-occurrences of a and b, and N(a, ·) is the sum of N(a, b′) over all b′.

Pairs - reduce

def reduce(key: Pair[String], values: List[Int]):
	let (a,b) = key
	sum = 0
	for v in values:
		sum += v
	emit((b,a), sum / freq(a))

Question

What is freq(a)?

We don’t know until we have processed all keys of the form (a, *).

Pairs - mapper and partitioner

def map(key: Long, value: String):
	for u in tokenize(value):
		for v in cooccurrence(u):
			emit((u,v), 1)
			emit((u, "*"), 1)
 
def partition(key: Pair, value: Int, N: Int):
	return hash(key.left) % N

Improved version

def map(key: Long, value: String):
	for u in tokenize(value):
		for v in cooccurrence(u):
			emit((u,v), 1)
		emit((u, "*"), len(cooccurrence(u)))
 
def partition(key: Pair, value: Int, N: Int):
	return hash(key.left) % N

Pairs - reducer

marginal = 0
 
def reduce(key: Pair, values: List[Int]):
	let (a,b) = key
	sum = 0
	for v in values:
		sum += v
	if (b == "*"):
		marginal = sum
	else:
		emit((b,a), sum / marginal)

This works in Hadoop for English text: "*" sorts before any word, so the (a, "*") marginal reaches the reducer before any (a, b) pair.

The assignment requires computing Pointwise Mutual Information (PMI)

PMI is useful for establishing “semantic distance” between tokens. Tokens with similar lists of cooccurrences sorted by PMI likely have similar meaning. 1 usually means exact same meaning, and -1 usually means exact opposite meaning.
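For reference, the PMI of two tokens x and y is log(P(x, y) / (P(x) · P(y))). A minimal sketch from raw counts (the names are illustrative, not the assignment’s API):

import math

def pmi(count_xy, count_x, count_y, total):
	p_xy = count_xy / total
	p_x = count_x / total
	p_y = count_y / total
	return math.log(p_xy / (p_x * p_y))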

Lecture 5

The datacenter as a computer.

Question

What is the instruction set?

Hadoop.

Layers of abstraction

  • Higher level language
  • Lower level language
  • Assembly
  • Machine code
  • Instruction set architecture
  • Micro-architecture
  • Gates, adders, registers
  • Electronics
  • Physics

Data centre abstraction

  • Spark
  • Hadoop task
  • HDFS / Hadoop framework
  • Cluster of computers
  • Individual servers

Hadoop is an alternative to assembly, but it has a lot of boilerplate and repetition. It is very tedious to program.

Question

Can we create a “distributed C” (or Python) on top of Hadoop’s assembly?

Facebook built Apache Hive and Yahoo built Apache Pig.

Hive is on top of MapReduce. It is good for large datasets accessed in a linear fashion.

Pig and Hive are both converted to MapReduce jobs at the end of the day.

Example

Find the top 10 most visited pages in each category with Pig

visits = load '/data/visits' as (user, url, time);
gVisits = group visits by url;
visitCounts = foreach gVisits generate url, count(visits);
urlInfo = load '/data/urlInfo' as (url, category, pRank);
visitCounts = join visitCounts by url, urlInfo by url;
gCategories = group visitCounts by category;
topUrls = foreach gCategories generate top(visitCounts,10);
 
store topUrls into '/data/topUrls';
flowchart TD
a(load visits)
b(group by url)
c(for each url generate count)
e(load urlInfo)
d(join on url)
f(group by category)
g(foreach category generate top 10 urls)

subgraph Map1
a ---> b
end
subgraph Reduce1
b ---> c
end
subgraph Reduce2
c ---> d
end
subgraph Map2
e ---> d
end
subgraph Map3
d ---> f
end
subgraph Reduce 3
f ---> g
end

MapReduce workflows involve a lot of disk I/O, which significantly slows down jobs like this.

We can do Map-to-Map (usually not a very strong need).

There is a higher-order Mapper class called ChainMapper which lets you pass in 2+ Mapper classes and it will chain them together.

We can also have Map-to-Reduce-to-Reduce.

The second job’s map function is pretty much an identity function.

Question

Is there a better instruction set?

Hadoop 2.

Nodes are now managed by a generic resource manager. It can run MapReduce the same as always, and can also run other things.

Spark is more popular than Hadoop today. The only mechanism we had in MapReduce was map and reduce. Spark brings more.

Resilient Distributed Dataset - RDD. RDD[T] is a collection of values of type T.

RDDs are divided into partitions. Workers operate on partitions independently.

Lecture 6

Spark provides a more enriched instruction set.

map(f) - f is given one value of type T, and returns one value of type U. Each worker will call f on each item from RDD[T]. Each worker will put the value returned by f into a partition of RDD[U].

flatMap(f) - f is given one value of type T, and returns an iterator/iterable collection that produces values of type U. Each worker will call f on each item from RDD[T]. Each worker will then traverse the iterable returned by f, and each value gets added to RDD[U].

mapPartitions(f) - f is given an iterator that produces values of type T, and returns an iterator/iterable collection that produces values of type U. Each worker will call f ONCE, where the argument is an iterator that traverses all items in that worker’s partition of RDD[T]. Each worker will then traverse the iterable returned by f, and each value gets added to RDD[U], just like flatMap.
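A minimal PySpark sketch contrasting the three (assumes a SparkContext named sc, as in the later examples):

nums = sc.parallelize([1, 2, 3])

nums.map(lambda x: x * 10).collect()                 # [10, 20, 30] - one output per input
nums.flatMap(lambda x: range(x)).collect()           # [0, 0, 1, 0, 1, 2] - zero or more per input
nums.mapPartitions(lambda it: [sum(it)]).collect()   # one partial sum per partition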

Reduce-like operations.

groupByKey - like MapReduce’s shuffle. Not the reduce part - this is just the shuffle that brings the pairs for a key to a single place. You’d then use map or flatMap to perform the reduce action itself.

reduceByKey - like Map Reduce’s shuffle + combine + reduce.

aggregateByKey - a more complicated reduceByKey.

Question

What is Spark?

Fast and expressive cluster computing engine compatible with Apache Hadoop.

It is up to 10x faster on disk, 100x in memory, and 2-5x less code.

Efficient - uses general execution graphs and in-memory storage.

Usable - Rich APIs in Java, Scala, Python & an interactive shell.

Key concept of RDDs: Write programs in terms of operations on distributed datasets.

RDD:

  • Collections of objects spread across a cluster, stored in RAM or on disk. Built through parallel transformations. Automatically rebuilt on failure.

Operations:

  • Transformations and actions

RDDs track lineage information that can be used to efficiently recompute lost data.

msgs = textFile.filter(lambda s: s.startswith("ERROR")) \
				.map(lambda s: s.split("\t")[2])
flowchart LR
a[HDFS File]
b[Filtered RDD]
c[Mapped RDD]

a -- filter ---b
b -- map ---c

SparkContext is the main entry point to Spark functionality. Available in shell as variable sc.

# Turn a Python collection into an RDD
nums = sc.parallelize([1,2,3])
# Load text file from local FS, HDFS, s3
sc.textFile("directory/*.txt")
 
# Pass each element through a function
squares = nums.map(lambda x: x*x) # {1,4,9}
 
# Keep elements passing a predicate
even = squares.filter(lambda x : x % 2 == 0) # {4}
 
# Map each element to zero or more others
nums.flatMap(lambda x: range(x))
# => {0,0,1,0,1,2}
 
nums = sc.parallelize([1,2,3])
 
nums.collect() # => [1,2,3]
nums.take(2) # => [1,2]
nums.count() # => 3
 
nums.reduce(lambda x, y: x + y) # => 6
 
nums.saveAsTextFile("hdfs://file.txt")

Spark’s distributed reduce transformations operate on RDDs of key-value pairs.

# Python
pair = (a,b)
pair[0] # => a
pair[1] # => b

// Scala
val pair = (a,b)
pair._1 // => a
pair._2 // => b

// Java
Tuple2 pair = new Tuple2(a,b);
pair._1 // => a
pair._2 // => b

While this seems bad, we rarely need to deal with pairs in Scala. You can use pattern matching / case lambdas in a way that’s not entirely unlike packing in Python.

Key-value operations

pets = sc.parallelize(
		[("cat", 1), ("dog", 1), ("cat", 2)])
	
pets.reduceByKey(lambda x, y: x + y) # => {(cat, 3), (dog, 1)}
pets.groupByKey() # => {(cat, [1, 2]), (dog, [1])}
pets.sortByKey() # => {(cat, 1), (cat, 2), (dog, 1)}

Word count in Python

lines = sc.textFile("hamlet.txt")
counts = lines.flatMap(lambda line: line.split(" ")) \
				.map(lambda word: (word, 1)) \
				.reduceByKey(lambda x, y: x + y) \
				.saveAsTextFile("results")

Word count in Scala

val textFile = sc.textFile("hamlet.txt")
 
textFile
	.flatMap(line => line.split(" "))
	.map(word => (word, 1))
	.reduceByKey((x, y) => x + y)
	.saveAsTextFile("results")

Joining two pair RDDs (Python):

visits = sc.parallelize([("index.html", "1.2.3.4")])
 
pageNames = sc.parallelize([("index.html", "Home")])
 
visits.join(pageNames)
# ("index.html", "(1.2.3.4, "Home"))

All the pair RDD operations take an optional second parameter for number of tasks.

words.reduceByKey(lambda x, y: x + y, 5)
words.groupByKey(5)
visits.join(pageViews, 5)

The default is to use the same number of partitions for destination that the source has. If you specify spark.default.parallelism, it will use this as the default instead.

Under the hood: DAG scheduler

General task graphs that automatically pipeline functions. Data locality aware and partitioning aware to avoid shuffles. A job is broken down to multiple stages that form a DAG.

You can get the DAG from an RDD using the toDebugString method. It’s also viewable through the Hadoop monitoring page.

Narrow dependencies are faster than wide dependencies because they do not require shuffling data between worker nodes.

reduceByKey, groupByKey will have narrow dependencies if the upstream RDD is already partitioned by key.
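A minimal sketch of that case (pairs is a hypothetical pair RDD): pre-partitioning by key with the same number of partitions and hash function means the later reduceByKey should see a narrow dependency instead of a full shuffle.

partitioned = pairs.partitionBy(8).cache()               # shuffle once, by key
totals = partitioned.reduceByKey(lambda x, y: x + y, 8)  # same partitioning -> narrow dependency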

Since Spark avoids heavy disk i/o, it significantly improves the performance (relative to Hadoop).

Spark outperforms Hadoop on iterative programs because it tries to keep the data that will be used again in the next iteration in memory. In contrast, Hadoop always reads and writes from/to disk.

Hadoop’s original limitation is that it can only run MapReduce.

Question

What if we want to run other distributed frameworks?

YARN = Yet-Another-Resource-Negotiator

Provides an API to develop any generic distributed application. Handles scheduling and resource requests. MapReduce (MR2) is one such application on YARN.

The Job tracker in Hadoop v1.0 was specific to Hadoop jobs. But the resource manager in v2.0 can support different types of jobs (Hadoop, Spark, etc.).

Spark architecture has a driver program, cluster manager, and Worker Nodes.

flowchart LR
a[SparkContext]
b[Cluster Manager]
c[Executor]
d[Executor]

subgraph Driver Program
a
end
a <---> b
b
b <---> c
subgraph Worker Node
c
end
subgraph Worker Node 
d
end
d <---> c
a <---> c
a <---> d
b <---> d

There are multiple tasks per executor.

Question

Why is this important?

To work, the Spark driver must send relevant code to run each task.

thresh = 5
myRdd.map(lambda x: x >= thresh)

The lambda captures thresh, so it gets packaged up too.

If you Broadcast a value, then Spark only sends one copy per Executor (worker machine) not per Task.

thresh = sc.broadcast(5)
myRdd.filter(lambda x : x > thresh.value)

Constant means constant, Broadcast variables are read-only

thresh = sc.broadcast(5)
thresh.value = 6
# Error value is not assignable

It’s possible to make a broadcast variable where the value is a mutable type. This will work (in that it won’t give you the above errors), but it won’t work in that each worker has its own copy of this value, so if one of them updates a dictionary, the other workers don’t see that.

Lecture 7

Question

A Broadcast variable carries information from Driver to Executor. What if we want communication from Executor back to the Driver?

Accumulator.

# Python
lineCounter = sc.accumulator(0)
 
def split_and_count(line):
	lineCounter.add(1)
	return line.split()
 
myRdd.map(split_and_count)  # nothing runs until an action is triggered (map is lazy)
lineCounter.value

// Scala
val lineCounter = sc.longAccumulator
 
def split_and_count(line : String) = { 
	lineCounter.add(1)
	line.split(" ")
}
 
myRdd.map(split_and_count)
lineCounter.value

There are different types of accumulators (longAccumulator, doubleAccumulator)

By default, Spark shuffles use a hash partitioner, or sometimes a range partitioner (for sorting).

Range partitioner: workers gather statistics on their key distributions, then heuristics attempt to divide the keys into ranges of approximately equal size.

 
class myPartitioner(override val numPartitions : Int) extends Partitioner {
	def getPartition(key : Any) : Int = key match {
		case x : Int => x % numPartitions  // whatever logic we like, based only on the key
		case _ => throw new IllegalArgumentException("unexpected key type")
	}
}
 
rdd.reduceByKey(new myPartitioner(5), _+_)

The equals function lets Spark know whether two partitioners are equivalent. Scala makes little distinction between val fields and zero-argument methods, so we can override

abstract numPartitions(): Int

with

val numPartitions: Int

PySpark doesn’t use Partitioner objects, just a partition function.

Avoid groupByKey if you can. MapReduce in Spark without a combiner is essentially flatMap → groupByKey → flatMap. We are trying to do better than MapReduce.

Reduce (MapReduce)

  • (K, Iterator[V]) → List[(K’, V’)]
  • KVP are partitioned and shuffled by Partitioner
  • Reduce job calls reduce on keys in sorted order

reduceByKey (Spark)

  • RDD[(K, V)] → RDD[(K, V)]
  • Less flexible, but does what reduce should normally be used for
  • Reduces before shuffle (combiner)
  • Reduces after shuffle (reducer)

combineByKey gives more fine-grained control than reduceByKey

RDD[(K, V)].combineByKey(create, append, merge) → RDD[(K, C)]

  • Create - make a C from a V
  • Append - take a C and add a V to it
  • Merge - combine two Cs

reduceByKey(reduce) calls combineByKey(identity, reduce, reduce)

Question

What is C?

In CS135 we called these variables accumulators. Spark calls them collectors.
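A minimal sketch of combineByKey where the collector C is a (sum, count) pair, computing per-key averages (the data values are made up):

pairs = sc.parallelize([("cat", 1.0), ("cat", 3.0), ("dog", 2.0)])

sums_counts = pairs.combineByKey(
	lambda v: (v, 1),                                   # create: make a C from a V
	lambda c, v: (c[0] + v, c[1] + 1),                  # append: add a V to a C
	lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1]))      # merge: combine two Cs

averages = sums_counts.mapValues(lambda c: c[0] / c[1])  # ("cat", 2.0), ("dog", 2.0)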

aggregateByKey is between reduceByKey and combineByKey

RDD[(K, V)].aggregateByKey(zero, append, merge) → RDD[(K, C)]

  • Zero - initial value [of type C]
  • Append - take a C and add a V to it
  • Merge - combine two Cs

Instead of a create function, there is a “zero” value provided. The collector is initialized by appending the first value encountered into the zero value.
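The same average with aggregateByKey, as a sketch reusing pairs from the block above: instead of a create function, the (0.0, 0) zero value is the starting collector.

averages = pairs.aggregateByKey(
	(0.0, 0),                                           # zero value (the empty collector)
	lambda c, v: (c[0] + v, c[1] + 1),                  # append a value into the collector
	lambda c1, c2: (c1[0] + c2[0], c1[1] + c2[1])       # merge two collectors
).mapValues(lambda c: c[0] / c[1])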

In Scala, the function has been partially curried, converted from one function with 3 parameters into a function with one parameter that returns a second function with two.

Instead of calling it as myRdd.aggregateByKey(zero, append, merge), you call it as myRDD.aggregateByKey(zero)(append, merge).

Question

Why?

Type inference is done simultaneously for all parameters, so it’s not possible for the compiler to infer what type C is from the zero value and use that to infer types for append / merge.

Python doesn’t need this as it is dynamically typed.

Spark’s reduceByKey is not like the reduce phase of MapReduce.

reduceByKey partitions the RDD, then reduces each partition, then shuffles for a final reduce. repartition() triggers shuffling but gives more balanced partitions; it can be used to increase or decrease the number of partitions. coalesce() can only reduce the number of partitions; it avoids a full shuffle, so it is faster than repartition(), but it may give unbalanced partitions.

sc.textFile("movies.csv").
	map(_.split(",")).
	map(list => (list(1).toInt, (lst(2).toDouble, 1))).
	reduceByKey({case ((s1, c1), (s2, c2)) => 
						(s1 + s2, c1 + c2)}).
	mapValues({case (sum, cnt) => 
					sum / cnt}).
	coalesce(1).
	saveAsTextFile("averages")
sc.textFile("movies.csv").\
	map(lambda line: line.split(",")).\
	map(lambda lst:
		(int(lst[1]), (float(lst[2]), 1))).\
	reduceByKey(lambda p1, p2:
		(p1[0] + p2[0], p1[1] + p2[1])).\
	mapValues(lambda pair: pair[0] / pair[1]).\
	coalesce(1).\
	saveAsTextFile("averages")

Step by step:

  1. Text file loaded and partitioned
  2. Map is applied to existing partitions (split the lines)
  3. Map is applied to existing partitions (extract useful fields)
  4. reduceByKey triggers a repartition based on the keys
  5. Map is applied to the new partitions (convert (sum, count) to sum / count)
  6. Coalesce merges data into 1 partition

Next Module: Analyzing Text (Language models, natural language processing, information retrieval / search)

We start with analyzing text.

P(s) is the probability of encountering the sentence s.

Question

What good is this?

  • Machine Translation
  • Spell Checker that’s not fooled by homophones
  • Speech Recognition

We want to be able to take a sentence of n words and assign a probability to it. This lets us rank alternatives.

How LLMs work:

  1. Given obtain probability distribution for
  2. Sample word from distribution
  3. Repeat until you generate the special “stop” word

The tricky part is generating the probability distribution. There are a lot of different sampling techniques to choose from.

Lecture 8

Probabilistic model

Chain rule (similar in spirit to PMI): P(w1, w2, …, wn) = P(w1) · P(w2 | w1) · P(w3 | w1, w2) · … · P(wn | w1, …, wn−1)

Question

Can we actually use this? Is this reasonable? How long is a typical sentence?

Short answer, no.

The size of a sentence is unbounded. Even if we cap sentence length at 20 and say there are 100k commonly used English words, that is 100,000^20 possible sentences - far too many to estimate counts for.

LLMs work this way, but they don’t actually base the conditional probability of the next token on ALL of the surrounding context.

Smaller Limit: N-Grams

Basic idea: the probability of the next word only depends on the previous N − 1 words.

N = 1: Unigram model - P(wi)

N = 2: Bigram model - P(wi | wi−1)

Google uses N-Grams for suggestions (N is small).

We can do this with Hadoop.

  • Unigram: P(wi) = c(wi) / (total word count)
  • Bigram: P(wi | wi−1) = c(wi−1, wi) / c(wi−1)

Example

Bigrams

# I am Sam $
# Sam I am $
# I do not like green eggs and ham $
Counting the bigrams gives the probabilities, e.g.

P(Sam | am) = c(am, Sam) / c(am, ·) = 1/2

Recall that c(b, ·) means the count of all pairs that start with b.

Example

Bigrams

P(I like ham)
= P(I | #) P(like | I) P(ham | like) P($ | ham)
= 0

P = 0 means the sentence is impossible. This is not true (probably).

If a single N-gram in the sentence has never been seen, the whole product is 0.

Just one unusual word takes a sentence from “likely” to “impossible”. That’s a “discontinuity”.

Removing the 0s makes the distribution “smooth”.

Question

How can we remove the 0s?

Laplace Smoothing

Start each count at 1, not 0.

Laplace smoothing (bigram probabilities): P(wi | wi−1) = (c(wi−1, wi) + 1) / (c(wi−1) + V)

Question

What is V?

Vocabulary size. Since every pair of words gets a +1, we need to add V to the denominator.

Question

Shouldn’t it be V − 1?

No: for co-occurrence we did not count the pair (w, w), but for bigrams we do (a word can follow itself).
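A minimal sketch of a Laplace-smoothed bigram estimate from count dictionaries (the names here are illustrative):

def bigram_prob(w_prev, w, pair_counts, unigram_counts, vocab_size):
	# +1 in the numerator for every possible pair, +V in the denominator to keep it a distribution
	return (pair_counts.get((w_prev, w), 0) + 1) / (unigram_counts.get(w_prev, 0) + vocab_size)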

Other smoothing techniques

  • Good-Turing - used by Good and Turing as part of cracking Enigma
  • Katz backoff - “backoff” means: if the N-gram count is 0, fall back to (N−1)-grams, and ultimately unigrams
  • Dirichlet Smoothing, Witten-Bell - ways to pick the smoothing parameter
  • Kneser-Ney - current best practice
    • Google used to use this for Google Translate.

Hidden Markov Model (HMM) is used a lot in NLP.

Transformers

Question

ChatGPT can do 8k, 16k, even 64k context, how is it not all zeroes?

It is a transformer neural network, not a table of N-gram counters. Words will have latent probability - no smoothing needed.

Self-attention: in a context of 4000 words, only some words are important.

Topic Shift: Searching (Information Retrieval)

The central problem in search. Query terms and document terms should match, but language is hard.

Abstract IR architecture

flowchart TD

a[Query]
b[Representation Function]
c[Comparison Function]
d[Hits]
e[Documents]
f[Representation Function]
g[Index]

a ---> b
subgraph online
	b -- Query Representation ---> c
end
c ---> d
e ---> f
subgraph offline
	f -- Document Representation ---> g
	g ---> c
end

Representation matters. Computers can’t understand, we need to tell them what relevant means. Assumption: terms are independent, relevance is irrelevant, the concept of a “word” is well defined.

We will stick to English words. The tokenizer will remove punctuation, and the case folding will treat things as lowercase and put unicode into canonical form.

A representation is often called an embedding. You take a high-dimensional object and embed it in a lower-dimensional space.

The distance between embeddings: cosine

The distance between words: semantic similarity

Goal for embeddings: the cosine distance between two embeddings should reflect the semantic similarity between the corresponding words.

PMI is a great way to estimate the semantic similarity of words. PMI gives similarity measures similar to cosine similarity. A PMI of 0 means the terms are uncorrelated, aka orthogonal.

Inverted index maps terms to documents. Forward index maps documents to terms.

Scaling assumptions

  • Queries are small
  • Postings are not

Vocabulary Size: Heaps’ Law

V = k · T^b, where V is vocabulary size, T is collection size (number of tokens), and k and b are constants.

Typically, k is between 30 and 100, and b is between 0.4 and 0.6.

Heaps’ Law is linear in log-log space.
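A quick back-of-the-envelope sketch with assumed constants (k = 50, b = 0.5; not values from the course):

k, b = 50, 0.5
T = 10 ** 9            # collection size
V = k * T ** b         # about 1.6 million distinct terms
print(V)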

We can save space in our postings list by only storing the document numbers. This saves a lot of space since most terms do not appear in most documents.

Postings Size: Zipf’s Law

f(r) ∝ 1 / r^s, where f(r) is the number of occurrences of the element of rank r, and s is the characteristic exponent.

Zipf’s Law is also linear in log-log space.

Few elements occur very frequently, many elements occur very infrequently.

Lecture 9

Map

  • Input (docid: doctext)
  • Output (term: (docid, freq))
    • Can add metadata

Reduce

  • Input (term : Iterator[(docid, freq)])
  • Output (term : Postings List)
def map(docid: Long, doctext : String):
	counts = Counter()
	for term in tokenize(doctext):
		counts.add(term)
	for term, freq in counts:
		emit(term, (docid, freq))

We can assume each document has only a few million unique terms, so the counter will easily fit.

def reduce(term: String, postings: Iterator[(Long, Int)]):
	p = list()
	for docid, freq in postings:
		p.append((docid,freq))
	p.sort()
	emit(term,p)

How big does the postings list get? Zipf’s law says usually small, sometimes not small. Sorting is O(n log n), which is not ideal if n is large.

Question

Why not use Spark?

You can. Map your tokenizer + wordcount to get (docid, Map[term,freq]) pairs. flatMap to get ((term, docID), freq) pairs. repartitionAndSortWithinPartitions to do the secondary sort pattern. mapPartitions to build postings list - logic is the same.

Delta Compression: Zipf’s Law works for us now. If a term is rare, there are not many postings. If a term is common, the average delta between consecutive docids is small.

If a sequence is ascending, you can instead write down only the delta between consecutive elements.

This does not save anything if our output is int, but there exist other datatypes.
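A minimal sketch of gap-encoding an ascending postings list:

def deltas(docids):
	prev = 0
	gaps = []
	for d in docids:
		gaps.append(d - prev)  # store the gap to the previous docid instead of the absolute id
		prev = d
	return gaps

deltas([5, 8, 9, 30])  # [5, 3, 1, 21]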

Variable-width integer type (VInt). Uses 1-5 bytes to represent an Int (same range as 4 byte fixed width). VInt saves space when you have many packed together. ArrayWritable doesn’t pack them together.

Question

How many ways can we divide up 28 bits?

Simple-9.

Variable Length Quantity (VLQ)

How it works: Slice number into septets. Use high bit to indicate continues.

Example

767 = 0000 0010 1111 1111 → septets 0000101 and 1111111 → encoded bytes 1000 0101 0111 1111
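A minimal Python sketch of that encoding (VLQ as described above: high bit means “more bytes follow”):

def vlq_encode(n):
	septets = []
	while True:
		septets.append(n & 0x7F)   # lowest 7 bits
		n >>= 7
		if n == 0:
			break
	septets.reverse()
	# set the high bit on every byte except the last one
	return bytes(s | 0x80 for s in septets[:-1]) + bytes([septets[-1]])

vlq_encode(767).hex()  # '857f' = 1000 0101 0111 1111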

VInt (or VLong) works at the Byte level. What about the bit level?

Elias code

Assumptions:

  • Natural numbers with no upper bound (like counts, for example)
  • Small numbers are more common than large numbers

Encoding x:

  1. Let n = floor(log2 x)
  2. Write n 0s
  3. Write x as an (n + 1)-bit number

Decoding:

  1. Read 0s until you hit a 1; call the number of 0s n
  2. Interpret the next n + 1 bits as a binary number
    1. Including the 1

This does well for term frequencies (how many times the term appears in the document). Does OK for gaps too.

The Elias code assumes the values are distributed by a power law. Most numbers should be small, or it doesn’t save any space.
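A minimal sketch of the Elias gamma code following the steps above:

def elias_gamma_encode(x):
	n = x.bit_length() - 1              # n = floor(log2 x)
	return "0" * n + bin(x)[2:]         # n zeros, then x as an (n + 1)-bit number

def elias_gamma_decode(bits):
	n = bits.index("1")                 # count the leading zeros
	return int(bits[n:2 * n + 1], 2)    # next n + 1 bits (including the 1)

elias_gamma_encode(9)          # '0001001'
elias_gamma_decode("0001001")  # 9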

Question

What do you think the distribution of gaps looks like if you have N document IDs and a term that matches df documents?

It is clearly a Poisson distribution (STAT231), with mean N / df.

The overall DF can exceed , and sampling is done with replacement.

Golomb Code

For encoding a positive integer n: compute the quotient q and remainder r when dividing by M.

q gets encoded in unary (q 0s, then a 1).

r gets encoded in truncated binary.

Each term gets its own custom encoding sheet.

Truncated Binary:

For numbers 0 ≤ r < M, let k = floor(log2 M) and u = 2^(k+1) − M:

First u codewords: encoded with length k bits.

Remaining M − u codewords: encoded as r + u, with length k + 1 bits.
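A minimal sketch of truncated binary under those definitions:

import math

def truncated_binary(r, M):
	# encode 0 <= r < M
	k = int(math.floor(math.log2(M)))
	u = 2 ** (k + 1) - M                        # number of short (k-bit) codewords
	if r < u:
		return format(r, "b").zfill(k) if k > 0 else ""
	return format(r + u, "b").zfill(k + 1)      # remaining codewords use k + 1 bits

[truncated_binary(r, 5) for r in range(5)]      # ['00', '01', '10', '110', '111']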

It is best to treat ID and frequency as independent.

There is also an “Exponential Golomb Code” that uses an Elias Gamma code to write the quotient instead of using unary.

Question

MapReduce it?

The indexing problem is good for MapReduce. Scalability is critical, must be fast, a batch operation, incremental updates might be important.

MapReduce is not very good for the retrieval problem. Must have sub-second response time. However we still want to distribute it.

Boolean Retrieval: the query is formed using Boolean operators (AND, OR, NOT). Hits: the set of documents for which the query is true.

To execute a boolean query, build a syntax tree in Polish Notation (who said LeetCode wasn’t helpful).

For each clause, look up postings.

Traverse postings and apply Boolean operator.

For each term, generate sets of documents.

Term-at-a-time: Compute one operation, take the result and carry it onwards.

Document-at-a-time: For each document, see if it passes the query. Since documents are in sorted order, modified merge operation will work.

Repeat: For the smallest “next doc” - does it match?

Our index file is partitioned. We probably want to distribute lookup, even if it’s not a MapReduce task we use. Options:

  • Leave the index partitioned by term
    • Each index file has the complete collection for a subset of terms
  • Repartition by document ID
    • Each index file has a subset of the collection, but the index for all terms.

Document indexing is much nicer.

A set of hits is fine, but we probably want them to be sorted by relevance. That is a different problem: Ranked Retrieval.

Requires: relevance function

A query “X Y Z W” might yield a high Relevance value even if a document only contains terms “X” “Y” “W”.

Ranked Retrieval: simplify the query - now it is only a list of terms. We need a way to weight a hit.

Question

Can we just do Boolean Retrieval and then sort by relevance?

No.

Terms that occur many times in one document should have high weights for that document. Terms that occur many times across the entire collection should have low weights.

We need:

  • Term frequency
  • Document frequency

Term Frequency and Inverse Document Frequency (TF-IDF)

  • w(t, d) - weight (relevance) of term t in document d
  • tf(t, d) - number of occurrences of term t in document d
  • df(t) - number of documents containing term t
  • N - total number of documents

The relevance of a document is the sum of the w(t, d) values over the query terms.
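A minimal sketch of one common TF-IDF weighting (the exact logarithm/normalization used in class may differ):

import math

def tf_idf(tf, df, N):
	# tf: occurrences of the term in the document, df: documents containing the term, N: total documents
	return tf * math.log(N / df)

# relevance of a document = sum of tf_idf over the query terms it contains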

Document-At-a-Time:

For each document:

  1. Score each query term, and add them all up
  2. Accumulate best hits
    1. A min-heap is good for this

PRO: Time is proportional to (documents × query terms); memory is O(k), where k (the number of hits to keep) is probably a constant.

CON: Can’t terminate early, must look at a whole document collection.
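As mentioned above, a min-heap is good for accumulating the best hits while scoring document-at-a-time; a minimal sketch:

import heapq

def top_k(scored_docs, k):
	heap = []
	for docid, score in scored_docs:
		if len(heap) < k:
			heapq.heappush(heap, (score, docid))
		elif score > heap[0][0]:
			heapq.heapreplace(heap, (score, docid))  # evict the current worst of the top k
	return sorted(heap, reverse=True)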

Term-At-a-Time:

  1. Collect hits and ranking for rarest term into accumulator
  2. For each other term in the query:
    1. If a document does not have that term, remove from accumulator
    2. Otherwise, add next term’s ranking to overall ranking

PRO: Can have early termination heuristics, will not normally need to traverse all documents.

CON: Uses a lot of memory.

Lecture 10

Question

What is a graph?

Same definition as in CS341: G = (V, E), a set of vertices and a set of edges.

Direction:

  • Undirected - edge (u, v) implies edge (v, u)
  • Directed - edge (u, v) does not imply edge (v, u)

Edges may or may not be labelled. Numerical labels are called weights.

Most graphs are sparse. The number of edges is closer to |V| than it is to |V|².

In the past we have seen three representations:

  1. Adjacency matrix
  2. Adjacency list
  3. Edge list

Adjacency Matrix: An n × n matrix M. M[i][j] = 1 iff there is an edge between vertex i and vertex j.

Pro:

  • Useful for mathematical operations
  • CUDA loves matrix operations
  • Fast neighbourhood check (in and out)

Cons:

  • Mostly 0s in a sparse graph (wasted space)
  • O(n) to find the neighbourhood of a vertex (in and out)

Adjacency List: Row i is a list of the out-neighbours of vertex i. Now it is a stripe, with the key as the vertex label and the value as the sparse adjacency vector.

Pro:

  • Smaller than the matrix
  • Even easier to find the out-neighbours of a vertex

Con:

  • Difficult to find the in-neighbours of a vertex
    • Have to search all other adjacency lists

Edge List: Just a list of all of the edges.

Pro:

  • Easy to add an edge (just append)
  • Simple

Con:

  • Hard to find neighbours
  • Hard to find anything

The web looks like 4.77 billion - 50 billion pages (vertices), and 100 billion - 1 trillion links (directed edges).

This is big. Power laws again: the degree distribution follows a power law.

Lots of pages have 0 or 1 in-links. Fibonacci binning is like regular binning but the bin size increases by the Fibonacci sequence. Use this instead of linear binning for plotting in log-log space.

The most common out-degree is 1.

Many graph algorithms involve:

  • Local computations for each node
  • Propagating these results to neighbours

Question

Which representation fits the best?

Adjacency list makes the most sense.

Question

How do the local computations work?

Local computation means independent, sounds like a map-like task.

Question

How does the traversal work?

The pairs are “messages”. The key is the intended recipient, the value is the content of the message. Propagation: Shuffle - collecting the propagated messages is reduce-like.

It is reduce-like as sending messages to neighbours is a non-local computation. If we override the hash partitioner we can try to partition in such a way to minimize the number of inter-partition edges.

Single-Source Shortest Path

Problem: Find the shortest path from a single node (source) to all other nodes. Shortest might mean fewest links, or lowest total weight.

Dijkstra’s Algorithm (again)

Step 1:

  • Set all nodes as unvisited, with
  • Set source node’s to

Step 2:

  • Let be the unvisited node with lowest distance
  • For all out-neighbours of :

Instead of Dijkstra on MapReduce (it is not parallel because the minimum is not local), we do parallel breadth-first search (pBFS).

Simple case - unweighted graph.

We want the fewest “hops” from source to destination.

Inductive definition:

  • if there is an edge from to

Implementation on Hadoop

Keys - node id n

Values - (d - distance to n found so far, adjacency list of n)

Mapper:

  • for each m in the adjacency list, emit (m, d + 1)
  • also emit (n, the node structure itself), so the graph is preserved

Reducer:

  • Update distance to node based on messages from mapper

To iterate, the output of the reducer gets passed to another (identical) job as the input.

But we also need to emit the adjacency list.

def map(id, node):
	emit(id, node)
	for m in node.adjList:
		emit(m, node.d + 1)
 
def reduce(id, values):
	d = float("inf")
	node = None
	for o in values:
		if isNode(o):
			node = o
		else:
			d = min(d,o)
	node.d = min(node.d, d)
	emit(id, node)

Node is a Writable object with 2 fields:

  • d - distanceFromSource
  • adjList - list of IDs for the out-neighbours of the node

With Hadoop MapReduce, the Value type can be ObjectWritable, and you can use reflection to see what kind of object it actually is.

Improvement: a “last updated” field - in iteration i, if a node was not updated in iteration i − 1, do not emit messages for it from the mapper.

The most important part of recursion is knowing when to stop. The second most important part of recursion is knowing when to stop.

Question

When should this iteration stop?

If no node’s distance gets updated, then the next iteration will send the same messages.

All the nodes with distance d = i are “visited” (discovered) in iteration i.

Pseudocode with weighted edges

def map(id, node):
	emit(id, node)
	for m in node.adjList:
		emit(m.id, node.d + m.w)
 
def reduce(id, values):
	d = float("inf")
	node = None
	for o in values:
		if isNode(o):
			node = o
		else:
			d = min(d,o)
	node.d = min(node.d, d)
	emit(id, node)

You can still update a node after you first discover it (fuzzy frontier). The stopping condition is unchanged: stop when there are no more changes. The number of iterations needed for an unweighted graph is the length of the longest “shortest path”. For a weighted graph, it is up to the length of the longest cycle-free path.

Dijkstra only investigates the lowest-cost node on the frontier. Parallel BFS investigates all paths in parallel. It’s a simple optimization to restrict the frontier. But we still need to send the adjacency list back and forth.

The issue with MapReduce iteration is that everything is written to HDFS, then loaded again. We must send the entire graph structure to the reducers each iteration, only to have them send it back again.

MapReduce

flowchart TD
a[(HDFS)]
b[(HDFS)]
a ---> map
map ---> reduce
reduce ---> b
b ---> map

Spark

flowchart TD
b[Adjacency Lists]
c[Start Node]
d[join]
e[flatMap]
f[reduceByKey]
g[Updated Nodes]
h[Stop?]

id[(HDFS)] --- b
b ---> d
c ---> d
d ---> e
e ---> f
f ---> g
g ---> h
h ---> id[(HDFS)]
h ---> d

Although there are two reduce-like transformations (join and reduceByKey), they have the same keys, so there will be narrow dependencies. Conceptually, each node receives “here’s your best path so far”, and then announces this to its neighbours.

The web contains many sources of truth. How do we know whom to trust? Not all web pages are equally important. There is large diversity in web-graph node connectivity. We can rank the pages by the link structure.

PageRank: The “Flow” Formulation

Idea: Links as votes. Page is more important if it has more links. Think of in-links as votes.

Not all in-links are equal. Links from important pages count more. This is a recursive question.

Each link’s vote is proportional to the importance of its source page. If page j with importance r_j has n out-links, each link gets r_j / n votes. Page j’s own importance is the sum of the votes on its in-links.

We define the rank of page j to be r_j = Σ over i → j of r_i / d_i, where d_i is the out-degree of page i.

Flow equations:

3 equations, 3 unknowns, no constants. There is no unique solution; all solutions are equivalent up to a scale factor.

An additional constraint forces uniqueness: require the ranks to sum to 1 (r_y + r_a + r_m = 1), which gives a unique solution.

Gaussian elimination method works for small examples, but we need a better method for large web-size graphs. Need a new formulation.

Stochastic adjacency matrix M. Let page j have d_j out-links.

If j → i, then M_ij = 1 / d_j, else M_ij = 0.

M is a column stochastic matrix: the columns sum to 1.

(The slides show M for the small example graph with the three pages y, a, and m.)

Question

How do we solve this?

Power iteration:

  • Set r(0) = [1/N, …, 1/N]
  • Compute r(t+1) = M · r(t)
  • If |r(t+1) − r(t)| > ε, go back to the second step; otherwise stop
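A minimal sketch of power iteration on a column-stochastic matrix M stored as a list of rows (no teleports, no dead ends):

def power_iteration(M, eps=1e-8):
	N = len(M)
	r = [1.0 / N] * N
	while True:
		r_new = [sum(M[i][j] * r[j] for j in range(N)) for i in range(N)]
		if sum(abs(a - b) for a, b in zip(r_new, r)) < eps:
			return r_new
		r = r_new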


Random walk interpretation

Imagine a random web surfer. At any time t, the surfer is on some page i. At time t + 1, the surfer follows an out-link from i uniformly at random, ending up on some page j linked from i. The process repeats indefinitely.

There is not actually a random walker. We construct a distribution of what page the random walker may be on.

Let p(t) be the vector whose i-th coordinate is the probability that the surfer is at page i at time t. So p(t) is a probability distribution over pages.

Question

Where is the surfer at time t + 1?

The surfer follows a link uniformly at random: p(t + 1) = M · p(t).

Suppose the random walk reaches a state where M · p(t) = p(t); then p(t) is a stationary distribution of the random walk.

For graphs that satisfy certain conditions, the stationary distribution is unique and will eventually be reached no matter what the initial probability distribution is at time t = 0.

A graph is ergodic if there exists a distribution p such that the random walk converges to p for every initial distribution p(0).

Question

What is required to be ergodic?

It must be connected and not bi-partite. Connected is obvious, not bi-partite is less obvious.

In an ergodic graph, no matter the initial distribution, a random walk will always converge on a unique stationary distribution.

Necessary and sufficient conditions:

  • The graph must be connected
  • The graph must be vertex aperiodic
    • the length of all cycles involving that vertex must have a GCD of 1

PageRank: Three questions

Question

Does this converge?

It does not converge in the simple bipartite connected graph with two vertices. In a bi-partite graph, the two partitions start with some split a and 1 − a of the total importance in the initial distribution, and will vacillate between a and 1 − a. That’s because each node in partition 1 sends all of its importance to partition 2, and vice versa.

In terms of a single walk, instead of a probability distribution, the walker will alternate between which partition they are in.

Question

Does it converge to what we want?

No it doesn’t.

Lecture 11

PageRank Problems:

Some pages are dead ends (have no out-links). The random walk has “nowhere” to go. Such pages cause importance to leak out. Not connected → not ergodic.

Spider traps (all out-links are within the group). The random walker gets stuck in the trap, and eventually the spider trap absorbs all importance. Not connected → not ergodic.

Solution: Teleports

The Google solution for spider traps: At each time step, the random surfer has two options.

  • With probability β, follow a link at random
  • With probability 1 − β, jump to some random page
  • Common values for β are in the range 0.8 to 0.9

Surfer will teleport out of spider trap within a few time steps.

Solution: Always Teleport

Teleports: follow random teleport links with probability 1.0 from dead-ends. Adjust the matrix accordingly.

Question

Why Teleports solve the problem?

Why are dead-ends and spider traps a problem, and why do teleports solve the problem?

Spider-traps are not a problem, but with traps, PageRank scores are not what we want. Solution: Never get stuck in a spider trap by teleporting out of it in a finite number of steps.

Dead-ends are a problem. The matrix is not column stochastic, so our initial assumptions are not met. Solution: Make matrix column stochastic by always teleporting when there is nowhere else to go.

Solution: Random Teleports

Google’s solution that does it all. At each step, the random surfer has two options: with probability β, follow a link at random; with probability 1 − β, jump to some random page.

PageRank equation [Brin-Page, 98]: r_j = Σ over i → j of β · r_i / d_i + (1 − β) / N

This formulation assumes that M has no dead ends. We can either preprocess the matrix to remove all dead ends or explicitly follow random teleport links with probability 1.0 from dead-ends.

PageRank with MapReduce: No random jumps and no dead-ends.

Map Phase - Each node “sends” its importance to its out-links.

Reduce Phase - Each node sets its new importance to the sum of the received values.

def map(id, n):
	emit(id, n)
	p = n.rank / len(n.adj)
	for m in n.adj:
		emit(m,p)
 
def reduce(id, msgs):
	n = None
	s = 0
	for o in msgs:
		if o is Node:
			n = o
		else:
			s += o
	n.rank = s
	emit(id, n)
The structure mirrors parallel BFS: the map sends messages along out-links; the reduce aggregates them (a sum for PageRank, a min for BFS).

Missing two things:

  • Random jumps
  • Dead End Jumps (always)

Question

How can we add these?

Every node has the same chance of jumping:

If it jumps, it jumps to any random page (probability 1/N for any particular page). We only need to change the reducer side.

New reducer

def reduce(id, msgs):
	n = None
	s = 0
	for o in msgs:
		if o is Node:
			n = o
		else:
			s += o
	n.rank = s * β + (1 - β) / N
	emit(id, n)

Dead-ends send all their weight everywhere, instead of only some of it.

Option 1: Replace dead-ends with “links to everyone, everywhere” (no changes to the code, a lot of messages)

Option 2: Post-process to redistribute the “missing mass” (avoids creating nodes with out-degree N)

Option 1 is really bad.

Post-Processing

  • S = sum of all ranks after the reduce
  • m = 1 − S = the missing mass

Add m / N to all nodes.

This is easy with Spark, we can use a double accumulator.
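A minimal PySpark sketch of that idea (ranks is a hypothetical (id, rank) pair RDD; sc is the SparkContext):

N = ranks.count()                                   # number of nodes
mass = sc.accumulator(0.0)
ranks.foreach(lambda kv: mass.add(kv[1]))           # foreach is an action, so the accumulator fills in
missing = 1.0 - mass.value
ranks = ranks.mapValues(lambda r: r + missing / N)  # spread the missing mass evenly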

We have to do an extra pass: Map (send mass) → Reduce (compute new mass) → Map (add missing mass).

Or, we can do Map (add missing mass, then send mass) → Reduce (compute new mass).

Only need one extra map as the final step. After iteration finished, add missing mass one last time. Or, we can just leave it missing. If dead ends have lost some mass, we’re just spreading that evenly across all nodes. This won’t change rankings.

You can move the “post processing” map into the “next map”.

Alternative is that mappers can send:

  • node.rank divided evenly among its out-links
  • special case: dead-ends send entire rank to “everyone”

Reducer adds “everyone” rank to each sum.

However, you must send one everyone per reducer per node. That’s a lot of messages.

How do we send to all reducers?

for i in range(numReducers):
	emit(EVERYONE, (i, missingMass))

Partitioner, if key is EVERYONE, will send this to reducer i.

Small problem: N is in the billions, so the individual masses will be tiny. Solution: store the logs.

Log Masses

Ranks are in [0,1].

Mathematically, a sizeable fraction of all float values also fall in this range.

It also lets us store much smaller numbers without underflow. Most pages will have a very small but non-zero PageRank so this is important.

Problems with MapReduce

  • Sending adjacency list with each iteration
  • Needless shuffling
  • Needless filesystem access
  • Verbose programs
  • Each iteration is a new job

So we go to Spark.

Old Search Ranking - TF and DF and logarithms.

Flaw: Term Spam

The solution is to trust what others say about you, not what you say about yourself. Use link text and surrounding text as terms instead of the contents of the page.

Another problem is forum spam / comment spam: going to every page that allows posting and adding a link to your webpage. Since those pages have high rank, your page gets a high rank.

Spam Farming Techniques

Spider traps accumulate rank. Random jumps prevent them from accumulating all ranks, but it’s still boosted by the topology.

Technique:

  • Page you want to promote has millions of hidden links to farm pages
  • Farm pages all link back to the page you want to promote

Random jumps mean that the spider trap gets incoming rank even with no external links. The more websites you can insert links into, the better.

Solution to link spam - ignore links tagged as “nofollow”. Convince forums, news sites, etc. to insert “nofollow” to all links posted in comments.

Added benefit: a researcher can link to a page without boosting its ranking. Random jumps alone do not solve the problem: the promoted page’s rank is still directly proportional to the size of the link farm.

How do we solve the link farm problem?

Idea: Collect a sampling of webpages (seed pages). Oracle (Human) sorts the trustworthy from the spam. This is expensive, keep the set as small as possible.

Use the “good” pages as the source nodes for personalized page rank.

Small change: each page in the trust set is initialized to 1, so trust sums to the size of the trust set instead of to 1.

After iteration, all pages have a trust factor of between 0 and 1. Pick a threshold and mark all pages below that as spam.

Trustworthy pages mostly only link to other trustworthy pages. Spam pages mostly only link to other spam pages. By only teleporting to known good pages, only the “good” partition accumulates significant trust.

The more seed pages there are, the more time and effort is needed to curate them. The fewer seed pages, the less trust there is in the system.

Need to pick seed pages that are highly likely to point to “most” of the other good pages.

Bootstrapping trust: If your seed set is small, will miss a lot of trustworthy sites. But, anything that you find is probably pretty trustworthy.

  1. Run PageRank
  2. Select top pages as seed
  3. Run TrustRank
  4. Set threshold low enough to avoid false positives
  5. Remove spam pages from graph
  6. Goto Step 1

Alternative - Spam Mass

$r(p)$ = PageRank of page $p$

$r^{+}(p)$ = PageRank of page $p$, but where random jumps only lead to Trusted pages

$r^{-}(p) = r(p) - r^{+}(p)$ = contribution of “low trust” pages to $p$’s rank

$r^{-}(p) / r(p)$ = Spam Mass (fraction of $p$’s rank that’s from low trust pages)

The higher your Spam mass, the more likely you are to be spam.
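A tiny worked example of the spam mass calculation (the numbers are made up):

# Hypothetical PageRank values for one page p.
r = 0.004        # r(p): ordinary PageRank
r_plus = 0.0005  # r+(p): PageRank when random jumps go only to trusted pages

r_minus = r - r_plus        # rank contributed by low-trust pages
spam_mass = r_minus / r     # fraction of p's rank that comes from low-trust pages
print(spam_mass)            # 0.875 -> most of p's rank comes from low-trust pages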

Topics also matter. Instead of the teleport set being all pages, it’s all pages on a given topic $T$.

Lecture 12

Data Mining

Analyzing a large dataset in order to gain actionable insights (whatever this means).

Supervised Machine Learning

Classification Functions:

  • Input: Object
  • Output: Label(s)

In supervised machine learning, you provide a training set, where emails have been classified as “SPAM” or not, then math happens, and out comes a model that can classify things.

Objects are represented as a vector of features:

  • Dense features: Sender IP, timestamp, etc.
  • Sparse features: Subject contains “URGENT”, etc.

For the same object, each application will have its own set of features. Selecting good features is important.

An embedding is obtained by machine learning.

Example

word2vec converts a word to a dense vector of a few hundred dimensions (commonly 300)

Goal: King − Man + Woman ≈ Queen

LLMs train the embedding (encoder/decoder) as well as the language model, either concurrently or alternating. Diffusion models like Stable Diffusion do as well.

An embedding is a specific kind of dense feature vector. One where each dimension is NOT a distinct feature, but one where the vector was learned by some kind of Deep Learning model.

ML solutions look like:

  1. Acquire data (clean it)
  2. Label data (by hand)
  3. Determine features
  4. Pick an architecture
  5. Train the model
  6. Repeat as needed

There are many architectures. Naïve Bayes, Logistic regression, SVM, Random Forest, Neural Networks etc.

Simple Spam Filter (Idiot’s Spam Filter)

  1. Put emails 1 per line in a text file, with SPAM/NOT label
  2. Do a modified word count (keep two counts)
  3. Do some math
    1. $P(\text{spam})$ = fraction of emails labelled SPAM
    2. $P(w)$ = fraction of emails containing the word $w$
    3. $P(w \mid \text{spam})$ = fraction of SPAM emails containing the word $w$
  4. Bayes’ Theorem: $P(\text{spam} \mid w) = \dfrac{P(\text{spam}) \, P(w \mid \text{spam})}{P(w)}$
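A rough sketch of the counting step and the Bayes step (the tab-separated file format and the whitespace tokenizer are assumptions, not the assignment spec):

from collections import Counter

spam_word_counts, ham_word_counts = Counter(), Counter()
n_spam = n_ham = 0

# Assumed format: one email per line, "SPAM<TAB>text" or "NOT<TAB>text".
with open("emails.txt", "rt") as f:
	for line in f:
		label, text = line.rstrip("\n").split("\t", 1)
		words = set(text.lower().split())        # count each word at most once per email
		if label == "SPAM":
			n_spam += 1
			spam_word_counts.update(words)
		else:
			n_ham += 1
			ham_word_counts.update(words)

def p_spam_given(word):
	"""Bayes' theorem: P(spam | w) = P(spam) * P(w | spam) / P(w)."""
	n_total = n_spam + n_ham
	p_spam = n_spam / n_total
	p_word = (spam_word_counts[word] + ham_word_counts[word]) / n_total
	p_word_given_spam = spam_word_counts[word] / n_spam
	return p_spam * p_word_given_spam / p_word if p_word else 0.5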

Acquiring high quality, clean, and labelled data is the most important part.

To label by hand at scale, need to crowdsource, bootstrap, exploit user behaviours.

Binary classification: Label is a single binary value. Yes or no. Spam or Not Spam.

You can make more nuanced classifiers out of binary classifiers.

We can use binary classifiers to create a classifier tree, if the labels are not mutually exclusive.

Binary Classifier Cascades aren’t the only sort of AI.

Training a classifier

$D = \{(x_i, y_i)\}$: training data; $x_i$: feature vector; $y_i$: label.

We want to find $f$: a function that maps feature vectors to labels, $f(x_i) \approx y_i$.

But not any $f$: an $f$ that minimizes the “entropy” or “loss” $\ell(f(x_i), y_i)$ over the training data.

Parameterization. There are lots of functions; we’d rather have a parameterized function

$f(x; \theta)$ or $f_\theta(x)$

and we are trying to find $\operatorname{argmin}_\theta \sum_i \ell(f(x_i; \theta), y_i)$.

There probably isn’t a closed form solution to this so we use gradient descent.

Remember Newton’s Iteration.

  1. Get the slope of the tangent at the current point $x_n$
  2. Compute the $x$-intercept of that tangent line; call it $x_{n+1}$
  3. Repeat

Gradient descent is the same idea, it just has many dimensions, not just one.

For each component $\theta_j$ of $\theta$:

  1. Compute the partial derivative $\frac{\partial L}{\partial \theta_j}$
  2. Move “downhill” along $-\frac{\partial L}{\partial \theta_j}$ (like Newton’s Method with many dimensions)
  3. Compute the new $\theta_j$

To find the minimum, follow the negative gradient $-\nabla L(\theta)$.

So at any point $\theta^{(n)}$, the update is $\theta^{(n+1)} = \theta^{(n)} - \gamma \nabla L(\theta^{(n)})$.

Gamma is our step (learning rate), or how far we move along the gradient. For the update to make progress we want $L(\theta^{(n+1)}) \le L(\theta^{(n)})$.

This inequality holds for sufficiently small gamma.
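A minimal sketch of the update rule on a toy one-dimensional loss (a toy example, not course code):

def gradient_descent(grad, theta0, gamma=0.1, steps=100):
	"""Repeat theta <- theta - gamma * grad(theta)."""
	theta = theta0
	for _ in range(steps):
		theta = theta - gamma * grad(theta)
	return theta

# Toy loss L(theta) = (theta - 3)^2, so grad L(theta) = 2 * (theta - 3); the minimum is at theta = 3.
print(gradient_descent(lambda t: 2 * (t - 3), theta0=0.0))   # ~3.0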

Gradient descent is first order: it uses local linear approximations and can converge slowly. To avoid local minima we add momentum or use stochastic gradient descent (randomly selecting a subset of the training data each step). It can still get stuck.

Boolean Classifiers

The parameters to optimize are the weight vector $w$. $\theta$ is used as the abstract view of “The Model”; the vector $w$ is the concrete realization.

Loss

Loss is 0 if prediction is right, 1 if prediction is wrong.

Problem: Not continuous, not differentiable. Gradient descent doesn’t work.

Solution is a surrogate loss function.

Linear function: predict $\hat{y} = w \cdot x$ and take the loss to be the difference between $\hat{y}$ (the predicted value) and $y$ (the true value).

The cost is unbounded.

Sigmoidal functions are nice: $\hat{y} = \sigma(w \cdot x)$ where $\sigma(z) = \frac{1}{1 + e^{-z}}$.

The prediction is between 0 and 1. The loss is between 0 and 1.

Continuous and differentiable: $\sigma'(z) = \sigma(z)\,(1 - \sigma(z))$.

Now we can do gradient descent.

A better loss function to use here is Binary Cross-Entropy (BCE): $L(\hat{y}, y) = -\big[\,y \log \hat{y} + (1-y)\log(1-\hat{y})\,\big]$ with $\hat{y} = \sigma(w \cdot x)$,

giving the gradient $\nabla_w L = (\hat{y} - y)\,x$.
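A sketch of that gradient on a toy dataset (NumPy; assumes labels in {0, 1} and $\hat{y} = \sigma(w \cdot x)$, matching the formula above):

import numpy as np

def bce_gradient(w, X, y):
	"""Gradient of the summed binary cross-entropy: sum_i (sigmoid(w . x_i) - y_i) * x_i."""
	y_pred = 1.0 / (1.0 + np.exp(-(X @ w)))   # sigmoid predictions, one per example
	return X.T @ (y_pred - y)

# Toy data: 4 examples, 2 features, labels in {0, 1}.
X = np.array([[1.0, 2.0], [2.0, 1.0], [-1.0, -2.0], [-2.0, -1.0]])
y = np.array([1.0, 1.0, 0.0, 0.0])
w = np.zeros(2)
for _ in range(200):
	w -= 0.1 * bce_gradient(w, X, y)          # plain gradient descent step
print(w)                                      # both weights end up positive for this data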

Lecture 13

Gradient Descent on Hadoop

MapReduce is bad at iteration. Gradient descent must have only one reducer (bottleneck).

Spark to the rescue.

val points = sc.textFile(...).map(...).cache()   // parse once, keep cached in memory
var w = ...                                      // initial weights (e.g. random)
for (i <- 1 to Iterations) {
	val gradient = points.map { p =>
		val score = p.x * w                      // linear score w . x
		val yPred = 1 / (1 + exp(-score))        // sigmoid prediction
		p.x * (yPred - p.y)                      // per-point gradient of the BCE loss
	}.reduce((a, b) => a + b)
	w -= gradient * LR                           // descent step, LR = learning rate
}

This is much better: we cache the data, there are no shuffles, and the driver only does one final reduce per iteration.

Stochastic Gradient Descent

Pick a random subset and only compute the gradient for that subset of values. But if the subset is small enough for one machine, why have the cluster?

If we have $m$ mappers: have each compute the gradient for its own subset, then train independently?

Ensemble Learning

  1. Train multiple independent models
  2. Combine predictions
    1. Voting
    2. Merging models

You can’t always merge models, but for a simple linear / sigmoidal classifier function, averaging the weights is probably okay.

Other kinds of models are not so easily merged. You can merge neural networks sometimes if they’re all finetunes of the same base model.

Ensemble learning works because the models’ errors are (mostly) uncorrelated: it is less likely that most models are all wrong at once, so variance is reduced.

Online stochastic gradient descent as ensemble. We select elements in random order and compute the gradient one training value at a time.

In MapReduce, each mapper holds the current parameter set in memory. It trains the model by computing the gradient for one element at a time and updating the parameters (inside map).

If we only want one model, the mappers become parsers. The reducer becomes the learner by creating the model, training the model, and emitting the model. No more ensemble learning.

In Spark (MLlib), we can do model = LogisticRegressionWithSGD.train(data, iterations, stepSize)

How do we evaluate the model? It’s not enough to minimize loss. Loss is a measure of how closely it matches the training data. An overfitted model can have 0 loss, but still be useless.

Too much training → an overfitted model → poor results. But there is no way to know how much is too much except to compare results.

Epoch: 1 pass through the training data

Checkpoint: Save the current model to disk.

Usually there is a checkpoint every $N$ epochs (the smaller the $N$ the better, but also the more HDD space you use).

Metrics: Actual vs Predicted

|  | Actual Positive | Actual Negative |  |
| --- | --- | --- | --- |
| Predicted Positive | True Positive (TP) | False Positive (FP) = Type I error | Precision |
| Predicted Negative | False Negative (FN) = Type II error | True Negative (TN) | Miss rate |
|  | Recall or TPR | Fall-Out or FPR |  |
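For reference, the standard definitions behind those margins (standard formulas, not transcribed from the slides): $\text{Precision} = \frac{TP}{TP+FP}$, $\text{Recall (TPR)} = \frac{TP}{TP+FN}$, $\text{Fall-out (FPR)} = \frac{FP}{FP+TN}$, $\text{Miss rate (FNR)} = \frac{FN}{FN+TP}$.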

Improving one can harm the other.

ROC: true positive rate vs false positive rate

PR: Precision vs Recall

ROC represents: As you vary the threshold, how many more false positives slip in, and how many more true positives are successfully found.

Area under the curve is ROCA. This area represents threshold invariant accuracy.

Big data isn’t enough. The less data we train on, the worse our model. The less data we test on, the less we trust our model.

$k$-fold cross validation: divide the data into $k$ subsets and repeat the holdout method $k$ times. Each time, one subset is the test set and the remaining subsets form the training set.

Each subset is used to validate once. Each subset is used to train $k-1$ times.
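A minimal sketch of building the folds (the shuffling and indexing scheme here is just one reasonable choice):

import random

def k_fold_indices(n, k, seed=0):
	"""Shuffle indices 0..n-1 and split them into k roughly equal folds."""
	idx = list(range(n))
	random.Random(seed).shuffle(idx)
	return [idx[i::k] for i in range(k)]

# Each fold is the test set exactly once; the other k-1 folds together form the training set.
folds = k_fold_indices(n=100, k=10)
for i, test_idx in enumerate(folds):
	train_idx = [j for fold in folds if fold is not test_idx for j in fold]
	# ... train on train_idx, evaluate on test_idx, average the k scores ...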

Question

Why cross-validate?

This kind of ML is “offline” - when the technique is validated, then it can be deployed on live data in real-time. Cross-validation gives you confidence that the technique will work on typical data sets.

A/B testing for ML.

Step 1: Offline training and evaluation.

Step 2: A/B testing vs other methods. Return to step 1 if needed.

We can use this to compare different feature selection methods or to compare against current best-practice models.

A working model on the training set that has good precision and recall is half the problem. The rest is production.

Lecture 14

Sometimes the feature vector is still too high-dimensional to work with.

To reduce the dimensionality of a set of $n$-grams: hash them modulo some large prime (but much smaller than the original number of dimensions).

On the assignment:

1M << 4B : collisions are rare enough to ignore.
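A sketch of the feature-hashing idea (the bucket count 1,000,003 is just an illustrative large prime, not the assignment’s value):

def hashed_features(ngrams, num_buckets=1_000_003):
	"""Map each n-gram to a bucket index modulo a large prime."""
	# Note: Python's built-in hash() is randomized per process for strings; a real pipeline
	# would use a stable hash (e.g. MurmurHash) so features are consistent across runs.
	return {hash(gram) % num_buckets for gram in ngrams}

print(hashed_features([("to", "be"), ("be", "or"), ("or", "not")]))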

Near Neighbours Problem

  • $O$ - set of objects
  • $d(a, b)$ - distance from object $a$ to object $b$
  • $\tau$ - maximum distance threshold

Goal: Find all unordered pairs $\{a, b\}$ such that $d(a, b) \le \tau$

Template Based Protein Folding

Given a sequence, find proteins in the database with a similar sequence. This is not as easy as Hamming distance; not all mutations are equal.

Given $N$ high-dimensional datapoints and some distance function $d$: find all pairs $\{x_i, x_j\}$ such that $d(x_i, x_j) \le \tau$.

The naïve approach: just compute $d(x_i, x_j)$ for all pairs, which is $O(N^2)$. Sub-quadratic is possible to achieve.

Question

How can you find all identical items in a collection of $N$ items?

Hash table - insert is $O(1)$ - only need to compare collisions, not all pairs.

This is assuming we have a low collision rate. With sufficiently large table, we can make the number of expected collisions sub-linear.

Locality Sensitive Hashing (LSH)

For a pair of objects $a, b$, we want a hash function $h$ such that $d(a, b)$ tells us something about how the hash codes relate:

Normal Hash Function: if you know $h(a)$ and $d(a, b)$: no idea what $h(b)$ is.

LSH Hash Function: if you know $h(a)$ and $d(a, b)$: you have a good guess at $h(b)$.

Items that are close have hash codes that are close (on average).

Scalar LSH to find Near Neighbours

  1. Make a hash table
  2. Use buckets that are wide (on the order of the threshold $\tau$) and overlapped, so items go in multiple buckets
  3. Most pairs such that $d(a, b) \le \tau$ will have at least one bucket in common.
    1. Most pairs with $d(a, b) > \tau$ will not share a common bucket.

So we need to create an LSH function.

Question

How do $n$-grams give us a distance measure?

Jaccard distance. This is used for sets (which we have).

A document embedding is the set of $n$-grams it contains.

Question

What $n$ do we use?

Depends on whether we are doing byte-level or word-level $n$-grams.
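A small sketch of word-level $n$-gram embeddings plus the Jaccard distance between them (bigrams here are just an illustrative choice):

def ngrams(tokens, n):
	"""Word-level n-grams as a set of tuples."""
	return {tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)}

def jaccard_distance(a, b):
	"""1 - |A intersect B| / |A union B|."""
	return 1.0 - len(a & b) / len(a | b)

d1 = ngrams("the quick brown fox".split(), 2)
d2 = ngrams("the quick brown dog".split(), 2)
print(jaccard_distance(d1, d2))   # 0.5: the documents share 2 of 4 distinct bigrams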

Jaccard for Multisets. Just adjust the definitions of union and intersection.

Union: If $A(k) = n$ and $B(k) = m$ then $(A \cup B)(k) = \max(n, m)$

Intersection: If $A(k) = n$ and $B(k) = m$ then $(A \cap B)(k) = \min(n, m)$

$|A|$ = the sum of the counts for all keys in $A$.

$n$-gram embeddings are usually regular sets, not multisets.

Remember a set can be represented as a bit vector.

Assign natural numbers to the elements of the universe set. The bit vector is 1 at index $i$ if the set contains element $i$. Benefit: union/intersection are bitwise OR / bitwise AND.
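A tiny illustration of sets as bit vectors (element numbering is arbitrary):

# Universe elements numbered 0..7; a set becomes an 8-bit vector stored as an int.
A = 0b00100110      # A = {1, 2, 5}
B = 0b00100011      # B = {0, 1, 5}
print(bin(A | B))   # union:        0b100111 -> {0, 1, 2, 5}
print(bin(A & B))   # intersection: 0b100010 -> {1, 5}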

LSH - A hash function $h$ such that:

  • If $A$ and $B$ are highly similar, then with high probability: $h(A) = h(B)$
  • If $A$ and $B$ are highly dissimilar, then with high probability: $h(A) \ne h(B)$

Different approach is needed for each definition of similarity.

For Jaccard distance, we use min-hashing.

Two views of min-hashing

Let $S$ be a set of integers (each $n$-gram has been numbered). Imagine you have a random permutation $\pi$. Then $h(S) = \min_{x \in S} \pi(x)$.

Let $S$ be a set of $n$-grams ($n$-tuples of strings). Imagine you have a random enumeration function $E$. Then $h(S) = \min_{x \in S} E(x)$.

Question

What is an enumeration function?

It’s a bijection that maps some finite set onto the range $\{1, \dots, n\}$, where $n$ is the size of the set (MATH239).

Question

So are we permuting or enumerating?

Yes

Permuting assumes the $n$-grams have already been enumerated and represented as a bit vector.

Enumerating assumes they are still a set of string tuples and we are assigning each $n$-gram an integer.

It is easier to create a random permutation than it is to create a random -gram enumerator. Being mathematically equivalent isn’t the same thing as being computationally equivalent.
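A minimal sketch of the permutation view of min-hashing (universe size and sets are made up):

import random

# Permutation view: the universe of n-grams has been numbered 0..U-1,
# a document is a set of those integers, and pi is one random permutation.
U = 10
pi = list(range(U))
random.shuffle(pi)

def minhash(doc, pi):
	"""One min-hash value: the smallest permuted index of any element in the set."""
	return min(pi[x] for x in doc)

A = {0, 2, 5}
B = {0, 2, 7}
print(minhash(A, pi), minhash(B, pi))
# Over random permutations, Pr[minhash(A) == minhash(B)] equals the Jaccard similarity of A and B.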

Collection as a matrix

  • Row = an element of the universe set (an $n$-gram)
  • Column = one set (a document)
  • A 1 in cell $(i, j)$ means $n$-gram $i$ is in document $j$

Next objective: Compute a signature for each column such that $|\text{sig}| \ll |\text{col}|$. Ideally, column similarity = signature similarity.