Pig lovers meet TOP

Have you ever needed to get the top n items for a key in Pig? For instance, the three most popular items in each country for an online store? You could always solve this the hard way by calculating a threshold per country and then filtering on that threshold. That is neither easy to write nor efficient to execute. What you want is to order the items by popularity per country and then limit to the top three for each country.

Pig has a built-in function that helps with this; it’s called TOP. It is a combined order and limit, and it can be used in a nested FOREACH. The parameters it takes are:

  • limit – the number of items to keep for each group
  • column number – the 0-based column to sort on
  • relation – the name of the relation to operate on

Read on for a detailed example of how TOP can be used.

First we need some test data. Each line has a key and a score separated by a tab. Let’s call the file test.tsv and put it in our HDFS home with hdfs dfs -put test.tsv.

a 1
b 1
a 2
b 2
c 1
c 2
a 3
c 3
c 4
a 4
a 5

Then we need to read and group the data by key. Here’s the pig code for that:

data = LOAD 'test.tsv' AS (key:chararray, score:int);
data_group = GROUP data BY key;

Running this and dumping the result gives us:

(a,{(a,1),(a,2),(a,3),(a,4),(a,5)})
(b,{(b,1),(b,2)})
(c,{(c,1),(c,2),(c,3),(c,4)})
Now iterate over that using a FOREACH and a nested TOP:

data_top = FOREACH data_group {
    top = TOP(3, 1, data);
    GENERATE top;
};

The output of this is (the order within each bag is not guaranteed):

({(a,3),(a,4),(a,5)})
({(b,1),(b,2)})
({(c,2),(c,3),(c,4)})
Let’s flatten the top:

data_top = FOREACH data_group {
    top = TOP(3, 1, data);
    GENERATE flatten(top);
};

Dumping the result gives us something like (the row order within each key is not guaranteed):

(a,3)
(a,4)
(a,5)
(b,1)
(b,2)
(c,2)
(c,3)
(c,4)
Since this is only sorted on the key, let’s also sort on descending score:

top_sorted = ORDER data_top BY key, score DESC;

The output after this is:

(a,5)
(a,4)
(a,3)
(b,2)
(b,1)
(c,4)
(c,3)
(c,2)
Clearly we have only the top 3 items per key. In the case of a, that means we have 5, 4 and 3. We only have 2 and 1 for b since b only has 2 scores. For c we have 4, 3 and 2.

The full pig script as tested on Cloudera’s pig version 0.12.0-cdh5.0.1:

SET job.name 'nested top test';

data = LOAD 'test.tsv' AS (key:chararray, score:int);
data_group = GROUP data BY key;

data_top = FOREACH data_group {
    top = TOP(3, 1, data);
    GENERATE flatten(top);
};

top_sorted = ORDER data_top BY key, score DESC;
dump top_sorted;

Let’s call the script nested_top_test.pig. We run the script with pig nested_top_test.pig and observe the results.

Now you should have the very useful TOP function in your quiver for the next time you need to get the top items for each key in a relation. Have fun!


Crash course in Erlang

This is a summary of a talk I gave on Monday May 14 2012 at an XP Meetup in Trondheim. It is meant as a teaser to get listeners to play with Erlang themselves.

First, some basic concepts. Erlang has a form of constant called an atom, which is defined on first use. Atoms are typically used the way enums or symbols are used in other languages. Variables in Erlang are immutable, so assigning a new value to an existing variable is not allowed:

1> A = 1.
1
2> A = 2.
** exception error: no match of right hand side value 2
3> A = 1.
1

The third statement shows that the assignment is actually a pattern match: matching A against the value it is already bound to succeeds, while the first statement bound the then-unbound variable.

Lists and property lists are central in Erlang. Lists behave much like lists in other functional languages or arrays in other kinds of languages. Property lists are a special type of list used the way a hash or dict is used in other languages:

1> A = [1,2,3,4,5,6,7].
[1,2,3,4,5,6,7]
2> lists:nth(1, A).
1
3> lists:nth(7, A).
7
4> lists:last(A).
7
5> length(A).
7
6> [Head|Tail] = A.
[1,2,3,4,5,6,7]
7> Head.
1
8> Tail.
[2,3,4,5,6,7]

The Erlang shell is useful and when defining a module, it can be compiled and run directly from the shell. Fibonacci numbers are an easy demonstration of the capabilities of a functional language.



-module(fib).
-export([fib/1]).

fib(0) -> 0;
fib(1) -> 1;
fib(N) -> fib(N - 1) + fib(N - 2).

Compile and test

1> c(fib).
{ok,fib}
2> lists:foreach(fun(N) -> io:format("fib ~p = ~p~n", [N, fib:fib(N)]) end, [1,2,3,4,5,6,7]).
fib 1 = 1
fib 2 = 1
fib 3 = 2
fib 4 = 3
fib 5 = 5
fib 6 = 8
fib 7 = 13



Erlang has a very powerful messaging system, and it supports distributed messaging.

First, a simple message loop that isn’t really a loop at all. Calling loop:init() will spawn a separate process waiting to receive messages.
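The module source is not inlined here; a minimal sketch of what loop might look like, consistent with the behavior described, is:

```erlang
-module(loop).
-export([init/0, ping/1]).

%% Spawn a process that waits for a single message.
init() ->
    spawn(fun loop/0).

%% Send a ping carrying the caller's pid.
ping(Pid) ->
    Pid ! {ping, self()}.

%% Not really a loop: after handling one message the function
%% returns and the process exits.
loop() ->
    receive
        {ping, From} ->
            From ! {pong, self()}
    end.
```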

1> c(loop).
2> Pid = loop:init().
3> loop:ping(Pid).
4> flush().
Shell got {pong,<0.39.0>}
5> loop:ping(Pid).
6> flush().

The first time the loop is pinged, it replies pong, but the second time, nothing happens: once a message has been received, the loop function finishes and the process exits.


Message receive statements in Erlang may time out:
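A sketch of a loop1 module with such a timeout, matching the session below:

```erlang
-module(loop1).
-export([init/0]).

init() ->
    spawn(fun loop/0).

%% Wait up to 1000 ms for any message, then give up.
loop() ->
    receive
        _Any ->
            ok
    after 1000 ->
        io:format("I have decided to die~n")
    end.
```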

2> Pid = loop1:init().
I have decided to die

The message loop times out after 1000 milliseconds and exits the function.

An actual message loop

Let’s convert the message handling function loop into a real loop through tail recursion. Tail recursion means this loop can run forever without the growing stack otherwise caused by recursion.
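A tail-recursive loop2 might be sketched as:

```erlang
-module(loop2).
-export([init/0, ping/1, stop/1]).

init() ->
    spawn(fun loop/0).

ping(Pid) ->
    Pid ! {ping, self()}.

stop(Pid) ->
    Pid ! stop.

loop() ->
    receive
        {ping, From} ->
            From ! {pong, self()},
            loop();             %% tail call keeps the process alive
        stop ->
            ok                  %% no recursive call, so the loop ends
    end.
```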

1> c(loop2).
2> Pid = loop2:init().
3> loop2:ping(Pid).
4> loop2:ping(Pid).
5> flush().
Shell got {pong,<0.39.0>}
Shell got {pong,<0.39.0>}
6> loop2:ping(Pid).
7> flush().
Shell got {pong,<0.39.0>}

Calling ping multiple times means we get multiple replies as we should from a message loop.

Calling stop terminates the message loop.

8> loop2:stop(Pid).
9> loop2:ping(Pid).
10> flush().        


While Erlang is a functional language and functions should be stateless, we can insert state into our message loop. Note that the variable State in the example never changes value within a given context.
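A loop3 sketch that threads a State parameter through the tail call:

```erlang
-module(loop3).
-export([init/0, ping/1, stop/1]).

init() ->
    spawn(fun() -> loop(0) end).

ping(Pid) ->
    Pid ! {ping, self()}.

stop(Pid) ->
    Pid ! stop.

%% State is never mutated; each iteration is called with a new value.
loop(State) ->
    receive
        {ping, From} ->
            From ! {pong, self(), State},
            loop(State + 1);
        stop ->
            io:format("Final state = ~p~n", [State])
    end.
```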

1> c(loop3).
2> Pid = loop3:init().
3> loop3:ping(Pid).
4> loop3:ping(Pid).
5> loop3:ping(Pid).
6> flush().
Shell got {pong,<0.39.0>,0}
Shell got {pong,<0.39.0>,1}
Shell got {pong,<0.39.0>,2}
7> loop3:stop(Pid).
Final state = 3

Every ping leads to an increment of State, and stop prints the final value.

Distributed messaging

First, we start an Erlang shell with the short name left and the cookie meetup. The cookie is what allows the two nodes to communicate with each other.

erl -sname left -setcookie meetup

Then start the loop in this shell.
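A loop4 sketch; the key difference from loop3 is that the process is registered under a name, so other nodes can reach it without knowing its pid:

```erlang
-module(loop4).
-export([init/0, ping/1, stop/1]).

%% Register the spawned process under the module name.
init() ->
    register(loop4, spawn(fun() -> loop(0) end)).

%% Address the process as {Name, Node}, wait for the reply and print it.
ping(Node) ->
    {loop4, Node} ! {ping, self()},
    receive
        {pong, From, State} ->
            io:format("Got ~p from ~p~n", [State, From])
    end.

stop(Node) ->
    {loop4, Node} ! stop.

loop(State) ->
    receive
        {ping, From} ->
            From ! {pong, self(), State},
            loop(State + 1);
        stop ->
            io:format("Final state = ~p~n", [State])
    end.
```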

(left@localhost)1> c(loop4).
(left@localhost)2> loop4:init().

Observe that the prompt includes the short name of the node.

Start a new shell called right with the same cookie:
erl -sname right -setcookie meetup

Send messages to the loop running in the other shell and observe the responses:

(right@localhost)1> loop4:ping(left@localhost).
Got 0 from <6032.45.0>
(right@localhost)2> loop4:ping(left@localhost).
Got 1 from <6032.45.0>

Sending stop terminates the loop

(right@localhost)3> loop4:stop(left@localhost).

This causes the following output in the left shell

Final state = 2

OTP or at least generic servers

It’s hard to talk about Erlang and messaging without at least touching on OTP and generic servers. A module defining a generic server needs to specify -behaviour(gen_server) and define a number of functions used by the generic server framework. The module also introduces Erlang macros, as the built-in macro ?MODULE is used here; ?MODULE contains the name of the module as an atom.

The functions representing an API to this server are count, increment, start_link and stop. We start the server by calling start_link and then call count and increment to see what happens.
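A gobbler sketch consistent with the description (the last three callbacks are boilerplate included to satisfy the behaviour):

```erlang
-module(gobbler).
-behaviour(gen_server).

%% API
-export([start_link/0, count/0, increment/0, stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2,
         handle_info/2, terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

count() ->
    gen_server:call(?MODULE, count).

increment() ->
    gen_server:cast(?MODULE, increment).

stop() ->
    gen_server:cast(?MODULE, stop).

%% The state is a simple counter starting at 0.
init([]) ->
    {ok, 0}.

handle_call(count, _From, State) ->
    {reply, State, State}.

handle_cast(increment, State) ->
    {noreply, State + 1};
handle_cast(stop, State) ->
    {stop, normal, State}.

handle_info(_Info, State) ->
    {noreply, State}.

terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
```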

1> c(gobbler).
2> gobbler:start_link().
3> gobbler:count().
4> gobbler:count().
5> gobbler:increment().
6> gobbler:increment().
7> gobbler:count().

No surprises there. When gobbler:count is called, gen_server:call(?MODULE, count) sends the message count to the message loop of the server. The message loop calls handle_call(count, From, State), with From identifying the caller and State containing the state variable, which is set to 0 in init, called by start_link. handle_call(count, From, State) returns {reply, State, State}: reply is mandatory in a call and indicates that the loop should send a reply to the caller, the first State is the message returned to the caller, and the last State is the new value of the state variable passed into the next iteration of the loop.

gobbler:increment uses gen_server:cast(?MODULE, increment) to send an increment message to the server. Cast sends the message and forgets it, meaning the caller will not expect a reply or even wait for one.

Selection of the right handle_cast clause is a result of pattern matching: increment matches the first handle_cast clause, so it is executed. A cast of stop would have matched the second one.

Iterating over joins in Pig

Apache Pig is a fantastic language for processing data. It is sometimes incredibly annoying, but it beats the hell out of writing a ton of map-reduces and chaining them together. One issue I know I’m not the only one to have run into is referencing data after a join in Pig.

Normally, you access fields using the dereference operators . or #, depending on the data type. The period, ., is used for tuples and bags, i.e. tuple.field0, tuple.field1, bag.field0, bag.field1. Maps are dereferenced with a hash, #, i.e. map#'field0', map#'field1'.

This does not work after a join. You might expect to iterate after a JOIN like this:

joined = JOIN list0 BY key, list1 BY key;
purified = FOREACH joined GENERATE list0.key;

This will fail with the obscure error: “scalar has more than one row in the output”. This error message is a known problem and there is a ticket for it. As can be seen from the ticket, the correct way to reference fields after the join is with the disambiguation operator :: instead of the dereference operators, like this:

joined = JOIN list0 BY key, list1 BY key;
purified = FOREACH joined GENERATE list0::key;

If you fall for the temptation of skipping the name of the list to get the field from like this:

joined = JOIN list0 BY key, list1 BY key;
purified = FOREACH joined GENERATE key;

You will get the more informative message: “Found more than one match: list0::key, list1::key”.

What you are really doing after a join is addressing columns in relations. For users, addressing columns in a relation with a period would be easier, but using :: might make the underlying code easier to understand.

Hadoop Status Reporting from Ruby

Hadoop Map-Reduce is a great tool for analyzing and processing large amounts of data. There are a few things one needs to keep in mind when working with Hadoop. This is the simple solution to one possibly annoying problem.


Hadoop expects reducers to emit something regularly. If a reducer runs for a long time without output, it will be terminated and retried. The error message in this case is something like “Task attempt X failed to report status for Y seconds”.

I bet some of you are thinking that this should not be a problem since the mappers should do all the work and not the reducers. This is mostly true, but if the job of the reducer is to feed a lot of data to a database that is not write-optimized, things may take a little time.

The trick is to regularly write to STDERR to let Hadoop know that your reducer is healthy and progressing.

Add this line to the input processing loop of your reducer:

STDERR.puts("reporter:status:things_ok") unless (count += 1) % 1000 > 0

This will emit reporter:status:things_ok every 1000 items which is a fine magical number. Substitute your favorite magic number as long as it’s not too big.

Case statement pitfall when migrating to Ruby 1.9.2

Note that the pitfall is limited to MRI (standard Ruby) version 1.9.2. MRI 1.9.3, JRuby and Rubinius do not have this behavior.

I have been using Rubinius 2.0 to run machine learning experiments with libsvm lately. When running in Ruby 1.9.2, I noticed that my classifier always classified all samples as negative. I thought this was caused by issues with libsvm-ruby-swig, so I recompiled libsvm-ruby-swig from scratch, including rerunning swig, but nothing changed. Next, I changed to libsvmffi instead, but the result was the same. Realizing that I actually had some tests running a very simple classifier and that these tests passed on 1.9.2 made me look closer at the code. What I found was that the behavior of the Ruby case statement changed from 1.8.7 to 1.9.2.

For if statements, 1 is equal to 1.0 in both 1.8.7 and 1.9.2, but while 1 matches 1.0 in 1.8.7 case statements, it does not in 1.9.2.

Code snippet that shows the difference:

#!/usr/bin/env ruby

puts(case 1.0
when 1 then "match"
else "no match"
end)

First the output when running 1.8.7:

$ rvm use ruby-1.8.7
Using /usr/local/rvm/gems/ruby-1.8.7-p334
$ ./case.rb 
match

And the same in 1.9.2:

$ rvm use ruby-1.9.2
Using /usr/local/rvm/gems/ruby-1.9.2-p180
$ ./case.rb 
no match

Needless to say, I was puzzled by this result, but I was more surprised by the 1.8.7 behavior than by 1.9.2’s. My assumption when I wrote the code was that I was dealing with integer values, and since it worked, I forgot about it. Next time you see different behavior between 1.8.7 and 1.9.2, it might be worth reviewing your case statements.

Feature prioritization for Pillow the CouchDB shard manager

I have now reached the end of my todo list for Pillow. That doesn’t mean it’s finished and ready to be stamped version 1.0. In its current incarnation it is fully usable and production ready, but in order to earn a 1.0 it needs to do a bit more.

The current resharding always doubles the number of servers required. Since you may overshard, that doesn’t necessarily mean you have to double the number of physical servers, but you need to organize more CouchDB instances than you might otherwise need. Smoother sharding algorithms that allow adding a single server at a time exist (consistent hashing), so Pillow should support one.

Pillow currently only supports rereducers written in Erlang. It would be really nice to support JavaScript for rereducers. A summing rereducer exists, and mappers without reducers work just like in CouchDB. However, when you have more complex reduction needs, being able to copy the reducer code from your CouchDB into Pillow beats writing (and maintaining) the reducers again in a new language.

Pillow should really support the bulk document API of CouchDB. I haven’t used this one myself, but adding support should be pretty straightforward.

CouchApp support is harder since it requires JavaScript support and then some. I probably need to play around with a CouchApp or two to find out more, but since I haven’t done so, it’s hard to determine how much work it would take.

While I do hope that there are no non-replicated CouchDB servers in production out there, the reality is that there probably are lots. I like the three-way replication minimum myself, and with CouchDB’s master-master scheme, it works really well. Pillow, however, is currently happily ignorant of any replication you have set up. I would really like to have Pillow manage such replication. In addition to managing replication, sets of Pillow servers should be controllable from any server in the same master-master way, ensuring full control of your cluster from any single Pillow node.

There is no clear prioritized list right now; all features listed above (and probably more) would be beneficial. However, as I am currently the only one developing Pillow and the time I can spend on it is limited, I have to prioritize. The five features can be grouped:

  • CouchDB API compatibility: JavaScript views, bulk documents, CouchApp
  • Production flexibility and scaling: Consistent Hashing and Replication management

It is not hard to admit that API compatibility is important, but the core of the API is already supported. Production flexibility and scaling is more important for me at the moment and I will probably focus on that. I also think that replication management is slightly more useful than consistent hashing. Choosing between the API features is harder since I don’t need them myself, but JavaScript views are a prerequisite for CouchApp support, and bulk document support is straightforward in comparison to CouchApp, leading to this priority list:

  1. Replication management
  2. Consistent hashing
  3. JavaScript views
  4. Bulk documents
  5. CouchApp

This list is the result of my needs at the time of writing. Others may convince me to adjust the priorities. Better yet, others may jump in and add support for the features they need.

A functional approach to Ruby

Several articles and blog posts have been written about functional Ruby. They tend to focus either on whether Ruby is a functional language or how to do functional programming in Ruby. I am not planning to do either. This post will look into the benefits of a functional approach to Ruby and the transition from thinking classic object-oriented to functional.

I consider the discussion around whether Ruby is a functional language academic, with no effect on my use of the language. There is no doubt that functional programming is possible in Ruby, but bear in mind that it does not enforce pure functions free of side-effects. For a good overview of functional programming in Ruby, I suggest Khaled alHabache’s post Ruby and Functional Programming.

When I originally started developing in Ruby, I was used to object-oriented programming. I tended to make classes for all kinds of data objects and the result looked like a nicer, more readable version of Java code. While this works, it is not the most effective way of developing in Ruby (or other dynamic programming languages).

One of the benefits of Ruby (and many other dynamic programming languages) is the lack of static typing. In classic object-oriented development, you would define member variables and methods to operate on those variables. You don’t have to do that in Ruby, since you have a flexible hash class that can store most of what you need. Once you have replaced all member variables with a hash, the hash is your object, and the methods of the old class are just functions that operate on your hash.
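As a sketch of the idea (the names are invented for illustration), the “object” is just a hash and the old methods become side-effect-free functions:

```ruby
# Build the data 'object' as a plain hash instead of a class instance.
def build_item(name, price)
  { 'name' => name, 'price' => price }
end

# A function operating on the hash; Hash#merge returns a new hash,
# leaving the input untouched (no side-effects).
def apply_discount(item, percent)
  item.merge('price' => item['price'] * (100 - percent) / 100.0)
end

item = build_item('book', 200)
cheaper = apply_discount(item, 10)
# cheaper holds the discounted price while item is unchanged
```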

There are situations where a hash doesn’t make sense. If you build an abstraction class, e.g. a storage system abstraction, you might want to keep some information internal. Connection parameters for the storage system could be kept in a hash, but that doesn’t feel right. Interestingly, since there is normally only a single storage system of a particular type in use, you can make the storage abstraction class a singleton and keep all the connection parameters internally in traditional member variables.
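A sketch of such a singleton storage abstraction (the class name and parameters are invented for illustration):

```ruby
require 'singleton'

# One storage system of this type in use, so one instance holds the
# connection parameters as ordinary member variables.
class StorageAbstraction
  include Singleton

  attr_reader :host, :port

  def configure(host, port)
    @host = host
    @port = port
    self
  end
end

# Everyone gets the same instance, so the parameters are set once.
StorageAbstraction.instance.configure('localhost', 5984)
```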

Some information that you would normally store in a database or on a different server might be used often enough to warrant a memory cache. Let me stress that I don’t like caches and I try to avoid them whenever possible, since they add complexity and the potential for inconsistent data among different servers. That being said, I do add caches when they are needed, and once again, a cache can be implemented as a hash. Some code needs to control the caches, but since you normally cache data coming either from storage or from a different server or service, you can put that code in the corresponding abstraction class. If you need to write and use common code to manage the caches, you can easily build a mixin module.

Interestingly, the model you end up with if you only have hashes you send around and keep consistent data in singletons is similar to the Erlang gen_server behavior. This behavior is a general server template for Erlang, a pure functional language with immutable variables. The state variable is given as a parameter to all the general server functions. This allows the gen_server to maintain information in a pure functional setting.

When you get used to keeping abstractions in singletons and your data in hashes, you can also use modules instead of classes to modularize your code and keep related functionality together. If you also use blocks to define exact behavior inside functions, you end up with flexible code that is very easy to reuse.

Keeping your object data in a hash and implementing functions without side-effects makes testing easy. What you get back from a function call is purely a result of the function parameters, so there is no need to test combinations of operations. In an object-oriented setting, member variables might not be observable, and even if the returned value of a method call is correct, the object might be in an undesired state that you cannot test without modifying the class to make internal variables accessible to your test system. Clearly the functional approach is cleaner and requires less test code.

The Sincerial system, being a request handling system, is built mostly functionally. There are singletons guarding the storage system and other cached data. The system also uses classic objects where that makes sense. Ruby is an object-oriented language and I believe in using the language features available when appropriate. This might sound like a contradiction of the whole post, but it’s not. My point is that you should avoid creating traditional classes when a hash can do the job and use functional programming techniques actively to improve the maintainability, testability and readability of your code.

Pillow, the CouchDB shard manager

If there is one thing that has bothered me about my choice of CouchDB as the main storage system for Sincerial, it’s the lack of an automatic system for shard management. In the early days of a startup, a single server is probably capable of handling all the necessary data. However, a successful service built around the harvesting and analysis of data will sooner or later have to shard the dataset across multiple servers. And for a new service, the sooner you have to start sharding, the better. Several good distributed storage systems exist. Google’s original Bigtable, HBase (Hadoop’s Bigtable equivalent), Cassandra and others solve this particular problem, but there is more to choosing and running a storage system than just data volume scaling. CouchDB has other strengths that made it a good choice for our application, but that is not the topic of this post.

CouchDB-lounge, originally written by Kevin Ferguson (macfergus), Vijay Ragunathan (lukatmyshu) and Shaun Lindsay (srlindsay) at Meebo.com, will handle distribution of requests to the right servers in a cluster, but you still have to handle resharding or repartitioning manually. CouchDB-lounge consists of two components: dumbproxy, handling reading and writing of documents, and smartproxy, handling views. These require Nginx and Twisted (a Python framework) respectively. If you overshard appropriately, you can scale your data volume a long way before you have to start resharding.

While manually repartitioning a CouchDB database is doable, I’d rather have an automatic way of doing it since I don’t want to make mistakes. In addition, the Sincerial system uses Ruby running with Phusion Passenger in Apache, and I didn’t want to add two more frameworks on top of that. This might sound like a not-invented-here excuse, but it isn’t, or at least I don’t think it is.

When I started developing Pillow, I chose to do so in Erlang to match CouchDB. The reason was two-fold: first, I was curious about Erlang and I like functional programming; second, CouchDB was written in Erlang and there had to be a reason for that. Now I’ve released version 0.3 of Pillow. This version supports automatic resharding, routing of requests to the right shard, and views. Reducers need to be written in Erlang, but a summing reducer is in place and mappers without reducers are supported out of the box. As such, this version of Pillow has all the functionality I set out to develop, but it does not support the full CouchDB API.

The bulk document API is not supported, and I haven’t tried running standalone CouchApps, the reason being that I am focusing on our needs. I intend to have full support of the CouchDB API eventually, but it might be that I integrate more tightly with CouchDB and use CouchDB as a library to make that happen. This would make supporting JavaScript reducers easier as well. It would of course be interesting to have Pillow become an integral part of CouchDB, provided one can still access each CouchDB server directly for maintenance purposes. The latter is one of the reasons I’m generally a bit skeptical of distributed systems that hide their inner workings: hard-to-fix problems may occur.

CouchDB Replication Monitor

CouchDB does replication, but replication needs to be set up after each server restart. This means you need to ensure that replication is restarted whenever the daemon restarts CouchDB. I have never seen replication stop working without a restart, but I prefer being safe to being sorry about replication. To be perfectly honest, I do not trust that my replication initiation after a soft CouchDB restart works properly either, so I prefer to monitor the replication and have a safety mechanism in place to restart replication if needed.

There are several ways to monitor replication. You could fetch the status page of all servers and restart replication on servers with an empty page, but that is a kind of brute force approach in my world. A better solution is to use the replication itself to monitor that it works.

Each server updates its timestamp in CouchDB, and this is again replicated to the other servers. This gets us a bit of the way, but not all the way: the server you are checking might have received updates from all the other servers, but you don’t know if it has pushed anything out to the other servers. To solve this, you can add information about the other servers to the local server as well. This gives you a matrix of server replication status.

For each server, you will see the timestamp replicated from the server and a list of timestamps replicated to that server, the latter often being a generation older than the former. Cron can be used to update this data. The cron job reads all the server timestamps and updates this server’s timestamp, followed by a list of the other servers’ timestamps.

A mapper to get server status by server id out of the db:

map: function(doc) {
  emit(doc._id, doc);
}

Our monitoring database is called server_status. The design document containing the mapper is called collections and the view server_list.

A Ruby database checker that can run from cron:

require 'rubygems'
require 'couchrest'
require 'json'
require 'open-uri'
require 'net/http'

STATUS_DB = 'http://localhost:5984/server_status'
COLLECTIONS = 'collections'
SERVER_LIST = 'server_list'

hostname = ARGV[0]

status_db = CouchRest.database!(STATUS_DB)
status_view = "#{STATUS_DB}/_design/#{COLLECTIONS}/_view/#{SERVER_LIST}"

# Get the current information about this server if available,
# or start a fresh document for it
server_status = begin
  status_db.get(hostname)
rescue RestClient::ResourceNotFound
  {'_id' => hostname}
end

server_status['time'] = Time.new.to_i
server_status['servers'] ||= {}
# Get the current times of the other servers and update this server's
# view of them
JSON(open(status_view).read)['rows'].map do |row|
  {'server' => row['id'], 'status' => row['value']}
end.each do |status|
  unless status['server'] == hostname
    server_status['servers'][status['server']] = status['status']['time']
  end
end
# Persist the updated status document so it replicates out
status_db.save_doc(server_status)

Now you need to determine when to trigger a replication restart. This can be handled in the watchdog cron job: if the age of the newest timestamp seen for this server at the other servers is above a threshold, restart replication.

The final loop triggers replication when the age is above a threshold. The init_replication method just posts a continuous replication trigger to the db:

JSON(open(status_view).read)['rows'].map do |row|
  {'server' => row['id'], 'status' => row['value']}
end.each do |status|
  unless status['server'] == hostname
    if server_status['time'] - status['status']['time'] > THRESHOLD
      init_replication(status['server'])
    end
  end
end

Rudimentary init_replication method:

def init_replication(server)
  target = "http://#{server}:5984"
  databases = ['server_status']
  databases.each do |db|
    config = {
      'source' => db,
      'target' => "#{target}/#{db}",
      'continuous' => true
    }
    payload = JSON.generate(config)
    # Post the trigger to the local CouchDB (assumed to be on localhost:5984)
    result = Net::HTTP.new('localhost', 5984).post(
      '/_replicate', payload, {'content-type' => 'text/x-json'})
    unless result.code == '200'
      p "replication to #{target}/#{db} failed with #{result.code}"
    end
  end
end

We have a monitoring view of replication ages in our system. It shows the matrix of timestamps as age in seconds rather than the actual timestamp since the age is the important metric.
Server Status

A bonus of this replication monitoring system is that we can access the status page from a mobile phone and get an accurate picture of the replication status. Replication doesn’t worry me now, but it did when we first set this up. Now it’s just a part of our general monitoring view.