Distributed Thinking: A gentle introduction to distributed processing using Apache Storm and Apache Spark - Part 2
Welcome to Part 2 of Distributed Thinking! Here I discuss how to look at data in a distributed environment. In case you missed anything, click here for Part 0.
1. Distributed Thinking
1.3 How to look at data?
As discussed before, our primary goal is to transform the data into the form that we need. Let us take a classic example of counting lines in a text file, starting with a simple version using Java:
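A minimal version, assuming the file path is passed in as `args[0]`, looks something like this:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

public class LineCount {
    public static void main(String[] args) throws IOException {
        int lineCount = 0;
        // Connector: open the file named in args[0]
        try (BufferedReader br = new BufferedReader(new FileReader(args[0]))) {
            String line;
            // Extract: read one line at a time, filtering out the final null
            while ((line = br.readLine()) != null) {
                // Transform: treat each line as a 1 and add it to the count
                lineCount = lineCount + 1;
            }
        }
        // Load: print the result to the screen, our data sink
        System.out.println(lineCount);
    }
}
```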
This is a simple transformation where the Data Source `DS` is the file in `args[0]`, the connector is the `try` block with a `BufferedReader`, and the Extract `XT` method is the `br.readLine()` call.

We filter the data by checking whether the line read is `null`. The Transform process `TR` merely counts all lines that aren’t filtered out: it transforms each line into the number 1 and adds it to the count.

The Load method `LD` is the `System.out.println` call, which prints the `lineCount` onto the screen; the screen acts as the Data Sink `SNK` here.
Let’s break this program into blocks:
- The `Connector` block is responsible for getting the data from the underlying system.
- The `Distributed Transformation Code` block takes the list of lines, Line 0 to Line (N-1), transforms each line into the number 1, and then sums them up to N, which is the number of lines in the file.
- The `Output` block is responsible for delivering the result to the output system, which may be a Data Sink `SNK` or the screen.
Let’s rewrite this program to look like the blocks:
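A sketch of that restructuring, using the names referenced below (a `FileConnector` class, `transform1`, `transform2`, and an `AtomicInteger` standing in as the `dataSink`):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class LineCountBlocks {
    // Connector block: gets the data from the underlying file
    static class FileConnector {
        static List<String> readLines(String path) throws IOException {
            List<String> lines = new ArrayList<>();
            try (BufferedReader br = new BufferedReader(new FileReader(path))) {
                String line;
                while ((line = br.readLine()) != null)
                    lines.add(line);
            }
            return lines;
        }
    }

    // Transformation, step 1: convert a line into the number 1
    static Integer transform1(String input) {
        return 1;
    }

    // Transformation, step 2: add the count to the data sink
    static void transform2(Integer count, AtomicInteger dataSink) {
        dataSink.addAndGet(count);
    }

    public static void main(String[] args) throws IOException {
        AtomicInteger dataSink = new AtomicInteger(0); // the Output block's sink
        for (String line : FileConnector.readLines(args[0]))
            transform2(transform1(line), dataSink);
        System.out.println(dataSink.get());
    }
}
```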
Here you can see that the `FileConnector` is responsible for getting the data from the underlying file and providing it to the `Transformation` code. The `Transformation` code is now in two methods. The first, `transform1`, converts the `String` input (the line) into the `Integer` 1. The second, `transform2`, adds the count to the `dataSink`, which is an `AtomicInteger` in our case but could be a database or any other system.
Immutability and Side Effects
Here I need to bring up a very important concept called Immutability. While there are several ways to explain it, this is the basic concept:

An Object `x` is said to be Immutable if there is no function `F` which can change its value, i.e. there is no `F(x) = y` where the value of `x` changes after calling `F()`. In other words, you can apply `F1(x), F2(x) … Fn(x)`, but the value of `x` does not change.
Let’s look at an example of an object that is immutable and an object that is mutable:
```java
import java.util.concurrent.atomic.AtomicInteger;

int x = 10;

void f1(int y) {
    y = y + 20;
    System.out.println(y); // Prints 30
}

f1(x);
System.out.println(x); // Prints 10 -- x is unchanged

AtomicInteger i = new AtomicInteger(10);

void f2(AtomicInteger y) {
    y.addAndGet(20);
    System.out.println(y); // Prints 30
}

f2(i);
System.out.println(i); // Prints 30 -- i has been mutated
```
Here you can see that the integer `x` is immutable: there is no method that can be called on `x` that will change the value 10. However, the `AtomicInteger` `i` is mutable, and the method `f2` changes its value each time it’s called.
This brings us to another important concept: Avoiding Side Effects in Functions.
A side effect of a function F is any change that the function causes to the state of the system outside the function itself. Let us look at the following method:
```java
import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger countA = new AtomicInteger(0);

void containsA(String x) {
    if (x.contains("A"))
        countA.incrementAndGet(); // side effect: mutates state outside the method
}
```
Here the method `containsA` increments the counter `countA`, which is a side effect of the method. The caller of the method does not have any control over the modifications that the method makes. Why is this a problem? Look at the following code:
```java
void runSomething() {
    for (String x : xList) {
        int retries = 3;
        do {
            try {
                containsA(x); // counts the A -- even on attempts that later fail
                doSomething();
                break;
            } catch (Exception e) {
                retries--;
            }
        } while (retries > 0);
    }
}
```
Here the method `runSomething()` loops through a list of Strings, `xList`, counts the number of As, and then does something with `doSomething()`. Now, in order to ensure that every line in `xList` is definitely accounted for, `runSomething()` retries on failure. Retry-on-failure is a common feature of distributed systems, used to guarantee reliability in large clusters where there are many possible causes of failure: nodes are down, the network is down, data is corrupt, and so on.
In this case, if `doSomething()` fails, the String `x` is retried. If `containsA` has already found an A in the String, it is counted twice, leading to an inaccurate result.
Let’s rewrite this code to avoid Side Effects:
```java
import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger countA = new AtomicInteger(0);

// Side-effect free: reports what it found instead of mutating shared state.
int containsA(String x) {
    if (x.contains("A"))
        return 1;
    return 0;
}

void runSomething() {
    for (String x : xList) {
        int retries = 3;
        do {
            int inc = 0;
            try {
                inc = containsA(x);
                countA.addAndGet(inc);
                doSomething();
                break;
            } catch (Exception e) {
                countA.addAndGet(-inc); // undo the increment before retrying
                retries--;
            }
        } while (retries > 0);
    }
}
```
Now, no matter the number of times `containsA` is called, the system state remains unchanged, and the caller now has the ability to control the side effects, thus making for a more deterministic, reliable system.
There is another important reason to avoid side effects: if the system depends on a global state that changes when a method is called, there is no easy way to replicate that change across all nodes of the cluster. Global counters on one machine do not hold the values of the counters on another machine.
To be able to run your code in a distributed way, the following pointers will help:
- Break the data into small immutable blocks which are only transformed from one immutable form to another immutable form (see the sketch after this list).
- Do not transform with side effects: avoid changing the state of the system in the transformations, in order to allow retries and reproducible results in a distributed environment.
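As a minimal sketch of both pointers, here is the line count from earlier written as a pipeline of pure transformations over Java streams; the `lines` list stands in for whatever the connector produced, and every step yields a new value instead of mutating shared state:

```java
import java.util.List;

// Every step produces a new immutable value and touches no outside state,
// so any step can be retried safely on any node.
int countLines(List<String> lines) {
    return lines.stream()
                .map(line -> 1)           // transform: each line becomes a 1
                .reduce(0, Integer::sum); // collect: sum the 1s into the count
}
```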
Often, the reason an unreliable system yields non-deterministic results (different runs over the same data yield different results) is that either the data is mutable or the functions have side effects.
In the line-counting example above, the transformation method `transform2` is not free of side effects, as it modifies the counter that is passed to it, which means that `transform2` cannot be reliably used in a distributed environment.
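One way to remove that side effect, as a sketch, is to make `transform2` return the new count instead of mutating the sink it is given:

```java
// Side-effect free: returns a new running count rather than
// writing into a shared AtomicInteger.
int transform2(int runningCount, int increment) {
    return runningCount + increment;
}
```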
In most problems, the final step of collecting the data into the data sink is not straightforward. Hence, Spark and Storm both provide means to collect the data in such a way that all the results of the previous step are grouped together on one machine at the end. This step is also called `Reduce` in the Map-Reduce terminology.
How to write these collectors and sinks for a distributed environment will be discussed later.
In the next post, let’s look at a more complex example, Word Count. Click here to continue reading.
Click on any of the links below to go directly to any part:
Part 0: Introduction
Part 1: When and Where
Part 2: How to look at data
Part 3: How to look at data (continued)
Part 4: Processing Data Streams
Part 5: Processing Large Data Chunks