Pig lovers, meet TOP

Have you ever needed to get the top n items per key in Pig? For instance, the three most popular items in each country for an online store? You could always solve this the hard way by calculating a threshold per country and then filtering on that threshold. That is neither easy to write nor efficient to execute. What you really want is to order the items by popularity per country and then limit to the top three for each country.

Pig has a built-in function that helps with this: TOP. It combines an order and a limit and can be used in a nested FOREACH. It takes three parameters:

  • limit – the number of items to keep for each group
  • column number – the 0-based index of the column to sort on
  • relation – the name of the relation to operate on

Read on for a detailed example of how TOP can be used.

First we need some test data. Each line has a key and a score separated by a tab. Let's call the file test.tsv and put it in our HDFS home directory with hdfs dfs -put test.tsv.

a 1
b 1
a 2
b 2
c 1
c 2
a 3
c 3
c 4
a 4
a 5

Then we need to read and group the data by key. Here's the Pig code for that:

data = LOAD 'test.tsv' AS (key:chararray, score:int);
data_group = GROUP data BY key;

Running this and dumping the result gives us something like this (the order of tuples within each bag is not guaranteed):

(a,{(a,1),(a,2),(a,3),(a,4),(a,5)})
(b,{(b,1),(b,2)})
(c,{(c,1),(c,2),(c,3),(c,4)})

Now iterate over that using a FOREACH and a nested TOP:

data_top = FOREACH data_group {
    top = TOP(3, 1, data);
    GENERATE top;
};

The output of this is something like the following, each row being a single-field tuple holding the top-three bag for one key:

({(a,5),(a,4),(a,3)})
({(b,2),(b,1)})
({(c,4),(c,3),(c,2)})

Let’s flatten the top:

data_top = FOREACH data_group {
    top = TOP(3, 1, data);
    GENERATE FLATTEN(top);
};

Dumping the result gives us one tuple per surviving item (the row order at this point is not guaranteed):

(a,5)
(a,4)
(a,3)
(b,2)
(b,1)
(c,4)
(c,3)
(c,2)

Since this is only grouped by key, let's also sort by key and by descending score:

top_sorted = ORDER data_top BY key, score DESC;

The output after this is

(a,5)
(a,4)
(a,3)
(b,2)
(b,1)
(c,4)
(c,3)
(c,2)

Clearly we have only the top 3 items per key. In the case of a, that means we have 5, 4 and 3. We only have 2 and 1 for b, since b only has two scores. For c we have 4, 3 and 2.

The full Pig script, as tested on Cloudera's Pig version 0.12.0-cdh5.0.1:

SET job.name 'nested top test';

data = LOAD 'test.tsv' AS (key:chararray, score:int);
data_group = GROUP data BY key;

data_top = FOREACH data_group {
    top = TOP(3, 1, data);
    GENERATE FLATTEN(top);
};

top_sorted = ORDER data_top BY key, score DESC;
dump top_sorted;

Let’s call the script nested_top_test.pig. We run the script with pig nested_top_test.pig and observe the results.

Now you should have the very useful TOP function in your quiver for the next time you need to get the top items for each key in a relation. Have fun!


Crash course in Erlang

This is a summary of a talk I gave on Monday, May 14, 2012 at an XP Meetup in Trondheim. It is meant as a teaser to get listeners to play with Erlang themselves.

First, some basic concepts. Erlang has a form of constant called an atom, which is defined on first use. Atoms are typically used the way enums or symbols are used in other languages. Variables in Erlang are immutable, so assigning a new value to an existing variable is not allowed:

1> A = 1.
1
2> A = 2.
** exception error: no match of right hand side value 2
3> A = 1.
1

The third statement shows that the assignment is actually a pattern match: matching a bound variable against its current value succeeds, while the first statement binds the unbound variable A to a value.

Lists and property lists are central in Erlang. Lists behave much like lists in other functional languages or arrays in imperative ones. Property lists are a special kind of list used like a hash or dict in other languages:

1> A = [1,2,3,4,5,6,7].
[1,2,3,4,5,6,7]
2> lists:nth(1, A).
1
3> lists:nth(7, A).
7
4> lists:last(A).
7
5> length(A).
7
6> [Head|Tail] = A.
[1,2,3,4,5,6,7]
7> Head.
1
8> Tail.
[2,3,4,5,6,7]
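The talk's property list example is not included here, but a short shell session with the stdlib proplists module (the key names below are my own illustration) looks like this:

```erlang
1> P = [{name, "pillow"}, {version, 1}].
[{name,"pillow"},{version,1}]
2> proplists:get_value(name, P).
"pillow"
3> proplists:get_value(missing, P, default).
default
```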

The Erlang shell is useful: when you define a module, it can be compiled and run directly from the shell. Fibonacci numbers are an easy demonstration of the capabilities of a functional language.



-module(fib).
-export([fib/1]).

fib(0) -> 0;
fib(1) -> 1;
fib(N) -> fib(N - 1) + fib(N - 2).

Compile and test

1> c(fib).
2> lists:foreach(fun(N) -> io:format("fib ~p = ~p~n", [N, fib:fib(N)]) end, [1,2,3,4,5,6,7]).
fib 1 = 1
fib 2 = 1
fib 3 = 2
fib 4 = 3
fib 5 = 5
fib 6 = 8
fib 7 = 13



Erlang has a very powerful messaging system, and it supports distributed messaging.

First, a simple message loop that isn’t really a loop at all. Calling loop:init() will spawn a separate process waiting to receive messages.
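The original loop listing is missing from this copy of the post, so here is a minimal sketch consistent with the shell session below; the function bodies are my assumptions:

```erlang
-module(loop).
-export([init/0, ping/1]).

%% Spawn a process that waits for a single message.
init() ->
    spawn(fun loop/0).

%% No recursion: after handling one message the function
%% returns and the process exits.
loop() ->
    receive
        {ping, From} ->
            From ! {pong, self()}
    end.

ping(Pid) ->
    Pid ! {ping, self()}.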

1> c(loop).
2> Pid = loop:init().
3> loop:ping(Pid).
4> flush().
Shell got {pong,<0.39.0>}
5> loop:ping(Pid).
6> flush().

The first time the loop is pinged, it replies pong, but the second time nothing happens: once a message has been received, the loop function finishes and the process exits.


Message receive statements in Erlang may time out:
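The loop1 listing is also missing here; a sketch that produces the behavior shown below (the `after` clause is the timeout mechanism, the rest is my assumption):

```erlang
-module(loop1).
-export([init/0]).

init() ->
    spawn(fun() ->
        receive
            _Any -> ok
        after 1000 ->
            %% Give up if no message arrives within 1000 ms.
            io:format("I have decided to die~n")
        end
    end).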

2> Pid = loop1:init().
I have decided to die

The message loop times out after 1000 milliseconds and exits the function.

An actual message loop

Let's convert the message handling function loop into a real loop through tail recursion. Tail recursion means this loop can run forever without the stack growth otherwise caused by recursion.
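The loop2 listing is not included in this copy; a sketch consistent with the session below, with bodies being my assumption:

```erlang
-module(loop2).
-export([init/0, ping/1, stop/1]).

init() ->
    spawn(fun loop/0).

loop() ->
    receive
        {ping, From} ->
            From ! {pong, self()},
            loop();   %% tail call: the loop continues indefinitely
        stop ->
            ok        %% no recursion here, so the process exits
    end.

ping(Pid) -> Pid ! {ping, self()}.
stop(Pid) -> Pid ! stop.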

1> c(loop2).
2> Pid = loop2:init().
3> loop2:ping(Pid).
4> loop2:ping(Pid).
5> flush().
Shell got {pong,<0.39.0>}
Shell got {pong,<0.39.0>}
6> loop2:ping(Pid).
7> flush().
Shell got {pong,<0.39.0>}

Calling ping multiple times means we get multiple replies as we should from a message loop.

Calling stop terminates the message loop.

8> loop2:stop(Pid).
9> loop2:ping(Pid).
10> flush().        


While Erlang is a functional language and functions should be stateless, we may introduce state into our message loop. Note that the variable State in the example never changes value within a given context; each recursive call creates a new binding.
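Again the loop3 listing is missing; this sketch matches the counters in the session below, with everything else assumed:

```erlang
-module(loop3).
-export([init/0, ping/1, stop/1]).

init() ->
    spawn(fun() -> loop(0) end).

loop(State) ->
    receive
        {ping, From} ->
            From ! {pong, self(), State},
            loop(State + 1);  %% "new" state is a new binding, not mutation
        stop ->
            io:format("Final state = ~p~n", [State])
    end.

ping(Pid) -> Pid ! {ping, self()}.
stop(Pid) -> Pid ! stop.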

1> c(loop3).
2> Pid = loop3:init().
3> loop3:ping(Pid).
4> loop3:ping(Pid).
5> loop3:ping(Pid).
6> flush().
Shell got {pong,<0.39.0>,0}
Shell got {pong,<0.39.0>,1}
Shell got {pong,<0.39.0>,2}
7> loop3:stop(Pid).
Final state = 3

Every ping leads to an increment of State and stop prints the final value.

Distributed messaging

First, we start an Erlang shell with the short name left and a cookie meetup. The cookie is used by the shells to find each other.

erl -sname left -setcookie meetup

Then start the loop in this shell.
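The loop4 module is missing from this copy as well. For messages to reach it from another node, the process needs a registered name; the following sketch (names and bodies assumed) matches the sessions below:

```erlang
-module(loop4).
-export([init/0, ping/1, stop/1]).

init() ->
    Pid = spawn(fun() -> loop(0) end),
    register(loop4, Pid),   %% registered name makes it reachable by node
    Pid.

loop(State) ->
    receive
        {ping, From} ->
            From ! {pong, self(), State},
            loop(State + 1);
        stop ->
            io:format("Final state = ~p~n", [State])
    end.

%% The API takes a node name: {RegisteredName, Node} ! Message
ping(Node) ->
    {loop4, Node} ! {ping, self()},
    receive
        {pong, Pid, N} -> io:format("Got ~p from ~p~n", [N, Pid])
    end.

stop(Node) -> {loop4, Node} ! stop.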

(left@localhost)1> c(loop4).
(left@localhost)2> loop4:init().

Observe that the prompt includes the short name of the node.

Start a new shell called right with the same cookie:

erl -sname right -setcookie meetup

Send messages to the loop running in the other shell and observe the response

(right@localhost)1> loop4:ping(left@localhost).
Got 0 from <6032.45.0>
(right@localhost)2> loop4:ping(left@localhost).
Got 1 from <6032.45.0>

Sending stop terminates the loop

(right@localhost)3> loop4:stop(left@localhost).

This causes the following output in the left shell

Final state = 2

OTP or at least generic servers

It's hard to talk about Erlang and messaging without at least touching on OTP and generic servers. A module defining a generic server needs to specify -behaviour(gen_server) and define a set of callback functions used by the generic server framework. This example also introduces Erlang macros, as the built-in macro ?MODULE is used; ?MODULE expands to the name of the current module as an atom.

The functions representing an API to this server are count, increment, start_link and stop. We start the server by calling start_link and then call count and increment to see what happens.
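The gobbler listing itself is not included in this copy; a minimal sketch consistent with the description that follows (callback bodies are inferred from the prose, the rest assumed):

```erlang
-module(gobbler).
-behaviour(gen_server).
-export([start_link/0, count/0, increment/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% API
start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
count()      -> gen_server:call(?MODULE, count).
increment()  -> gen_server:cast(?MODULE, increment).
stop()       -> gen_server:cast(?MODULE, stop).

%% Callbacks
init([]) -> {ok, 0}.   %% State starts at 0

handle_call(count, _From, State) ->
    {reply, State, State}.

handle_cast(increment, State) ->
    {noreply, State + 1};
handle_cast(stop, State) ->
    {stop, normal, State}.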

1> c(gobbler).
2> gobbler:start_link().
3> gobbler:count().
4> gobbler:count().
5> gobbler:increment().
6> gobbler:increment().
7> gobbler:count().

No surprises there. When gobbler:count is called, gen_server:call(?MODULE, count) sends the message count to the message loop of the server. The message loop calls handle_call(count, From, State), with From identifying the caller and State containing the state variable, which is set to 0 in init (called by start_link). handle_call(count, From, State) returns {reply, State, State}: reply is mandatory in a call to indicate that the loop should send a reply to the caller, the first State is the message returned to the caller, and the last State is the new value of the state variable passed into the next iteration of the loop.

gobbler:increment uses gen_server:cast(?MODULE, increment) to send an increment message to the server. Cast sends the message and forgets it, meaning the caller will not expect a reply or even wait for one.

Selection of the right handle_cast is a result of pattern matching. increment matches the first handle_cast so it is executed. A cast of stop would have matched the second one.

Iterating over joins in Pig

Apache Pig is a fantastic language for processing data. It is sometimes incredibly annoying, but it beats the hell out of writing a ton of map-reduces and chaining them together. One issue that I know I'm not the only one to have run into is referencing data after a join.

Normally, you access fields using the dereference operators . or # depending on the data type. The period, ., is used for tuples and bags, i.e. tuple.field0, tuple.field1, bag.field0, bag.field1. Maps are dereferenced with a hash, #, i.e. map#'field0', map#'field1'.

This does not work after a join. Here is the iteration you would expect to write after a JOIN:

joined = JOIN list0 BY key, list1 BY key;
purified = FOREACH joined GENERATE list0.key;

This will fail with the obscure error "scalar has more than one row in the output". This error message is a known problem, and a Pig ticket exists for it. As can be seen from the ticket, the correct way to iterate over the join is to use the :: operator instead of the dereference operators, like this:

joined = JOIN list0 BY key, list1 BY key;
purified = FOREACH joined GENERATE list0::key;

If you give in to the temptation of skipping the name of the relation to get the field from, like this:

joined = JOIN list0 BY key, list1 BY key;
purified = FOREACH joined GENERATE key;

You will get the more informative message: “Found more than one match: list0::key, list1::key”.

What you are really doing after a join is addressing columns in relations. For users, addressing columns in a relation with a period would be easier, but using :: might make the underlying code easier to understand.

Hadoop Status Reporting from Ruby

Hadoop Map-Reduce is a great tool for analyzing and processing large amounts of data. There are a few things to keep in mind when working with Hadoop; this is a simple solution to one possibly annoying problem.


Hadoop expects reducers to emit something regularly. If a reducer runs for a long time without output, it will be terminated and retried. The error message in this case is something like “Task attempt X failed to report status for Y seconds”.

I bet some of you are thinking that this should not be a problem since the mappers should do all the work and not the reducers. This is mostly true, but if the job of the reducer is to feed a lot of data to a database that is not write-optimized, things may take a little time.

The trick is to regularly write to STDERR to let Hadoop know that your reducer is healthy and progressing.

Add this line to the input processing loop of your reducer:

STDERR.puts("reporter:status:things_ok") unless (count += 1) % 1000 > 0

This will emit reporter:status:things_ok every 1000 items which is a fine magical number. Substitute your favorite magic number as long as it’s not too big.
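For context, the heartbeat line lives inside the reducer's input loop. Here is a minimal sketch of such a loop; the `reduce` method name and the tab-separated parsing are my assumptions for illustration, while the STDERR line is the one from the post:

```ruby
# Sketch of a Streaming-style reducer input loop with the status
# heartbeat wired in. Only the STDERR heartbeat line comes from the
# original post; the rest is illustrative scaffolding.
def reduce(input = STDIN, err = STDERR)
  count = 0
  input.each_line do |line|
    key, value = line.chomp.split("\t", 2)
    # ... feed (key, value) to the slow, write-unfriendly database here ...
    err.puts("reporter:status:things_ok") unless (count += 1) % 1000 > 0
  end
  count
end
```

Hadoop Streaming treats stderr lines of the form reporter:status:<message> as status updates rather than ordinary log output, which is why this resets the timeout clock.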

Case statement pitfall when migrating to Ruby 1.9.2

Note that the pitfall is limited to MRI (standard Ruby) version 1.9.2. MRI 1.9.3, JRuby, and Rubinius do not have this behavior.

I have been using Rubinius 2.0 to run machine learning experiments with libsvm lately. When running in Ruby 1.9.2, I noticed that my classifier always classified all samples as negative. I thought this was caused by issues with libsvm-ruby-swig, so I recompiled libsvm-ruby-swig from scratch, including rerunning swig, but nothing changed. Next, I changed to libsvmffi instead, but the result was the same. Realizing that I actually had some tests running a very simple classifier, and that these tests passed on 1.9.2, made me look closer at the code. What I found was that the behavior of the Ruby case statement changed from 1.8.7 to 1.9.2.

For if statements, 1 is equal to 1.0 in both 1.8.7 and 1.9, but while 1 matches 1.0 in 1.8.7 case statements, it does not in 1.9.2.
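Since case/when dispatches through the === operator, the difference can be seen directly. On current MRI versions (where the 1.9.2 regression is gone), Integer#=== falls back to ==, so the Float matches the Integer branch again; this snippet is a sketch of that behavior, not of 1.9.2:

```ruby
# case/when selects a branch by evaluating `when_value === case_value`.
# On current MRI, 1 === 1.0 is true, so the Float takes the Integer branch.
result = case 1.0
         when 1 then :integer_branch
         else :no_match
         end

puts result        # which branch case selected
puts(1 == 1.0)     # plain equality
puts(1 === 1.0)    # what case actually evaluates
```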

Code snippet that shows the difference:

#!/usr/bin/env ruby

puts case 1.0
when 1 then "matched"
else "no match"
end

First the output when running the script under 1.8.7:

$ rvm use ruby-1.8.7
Using /usr/local/rvm/gems/ruby-1.8.7-p334
$ ./case.rb 
matched

And the same in 1.9.2:

$ rvm use ruby-1.9.2
Using /usr/local/rvm/gems/ruby-1.9.2-p180
$ ./case.rb 
no match

Needless to say, I was puzzled by this result, but I was more surprised by the 1.8.7 behavior than 1.9.2. My assumption when I wrote the code was that I was dealing with integer values and since it worked, I forgot about it. Next time you see different behavior between 1.8.7 and 1.9.2 it might be worth reviewing case statements.

Feature prioritization for Pillow the CouchDB shard manager

I have now reached the end of my todo list for Pillow. That doesn't mean it's finished and ready to be stamped version 1.0. In its current incarnation it is fully usable and production ready, but in order to earn a 1.0 it needs to do a bit more.

The current resharding always doubles the number of servers required. Since you may overshard, that doesn't necessarily mean you have to double the number of physical servers, but you need to organize more CouchDB instances than you might otherwise need. Smoother sharding algorithms that allow adding a single server at a time exist (consistent hashing), so Pillow should support one of those.

Pillow currently only supports rereducers written in Erlang. It would really be nice to support JavaScript for rereducers. A summing rereducer exists, and mappers without reducers work just like in CouchDB. However, when you have more complex reduction needs, copying the reducer code from your CouchDB installation into Pillow beats rewriting (and maintaining) it in a new language.

Pillow should really support the bulk document API of CouchDB. I haven’t used this one myself, but adding support should be pretty straightforward.

CouchApp support is harder since it requires JavaScript support and then some. I probably need to play around with a CouchApp or two to find out more, but since I haven’t done so, it’s hard to determine how much work it would take.

While I do hope that there are no non-replicated CouchDB servers in production out there, reality is that there probably are lots. I like the three-way replication minimum myself, and with CouchDB's master-master scheme, it works really well. Pillow, however, is currently happily ignorant of any replication you have set up. I would really like to have Pillow manage such replication. In addition to managing replication, sets of Pillow servers should be controllable from a random server in the same master-master way, ensuring full control of your cluster from any single Pillow node.

There is no clearly prioritized list right now; all of the features above (and probably more) would be beneficial. However, as I am currently the only one developing Pillow and the time I can spend on it is limited, I have to prioritize. The five features can be grouped:

  • CouchDB API compatibility: JavaScript views, bulk documents, CouchApp
  • Production flexibility and scaling: Consistent Hashing and Replication management

It is not hard to admit that API compatibility is important, but the core of the API is supported. Production flexibility and scaling is more important for me at the moment, and I will probably focus on that. I also think that replication management is slightly more useful than consistent hashing. Choosing between the API features is harder since I don't need them myself, but JavaScript views are a prerequisite for CouchApp, and bulk document support is straightforward in comparison to CouchApp, leading to this priority list:

  1. Replication management
  2. Consistent hashing
  3. JavaScript views
  4. Bulk documents
  5. CouchApp

This list is the result of my needs at the time of writing. Others may convince me to adjust the priorities. Better yet, others may jump in and add support for the features they need.