Dict issues in PySpark

I was running some Spark jobs that showed odd results. The output had complex fields in which values that should always be set showed up as null:

    "year": null,
    "name": "John Smith",
    "age": null

This puzzled me. I tried hardcoding all those values and setting them in one go by assigning a dict to the field. The name did change to the hardcoded value, but the other two values remained null. Without going into detail, I tried every plausible and implausible solution before the truth dawned on me.

When converting an RDD, which I used to set these values, to a DataFrame, all the values of the dict need to be the same type.

Let’s look at a simple example. Some fake data highlighting the issue.

>>> fake_data.show(truncate=False)
|age|name    |year|
|256|John Doe|2000|

It’s a simple DataFrame with three fields, two numbers and a string.

>>> fake_data.map(lambda r: Row(person={'age': r.age})).toDF()
DataFrame[person: map<string,bigint>]

Here’s what’s happening. The dict is turned into a checked map, forcing the values to be bigints.

>>> fake_data.map(lambda r: Row(person={'age': r.age, 'name': r.name})).toDF()
DataFrame[person: map<string,bigint>]

Confirmed. The name will cause problems since the name is not a number.
Let’s verify that this happens in the transition from RDD to DataFrame.

>>> from __future__ import print_function
>>> fake_data.map(lambda r: Row(
    person={'age': r.age, 'name': r.name})).foreach(print)
Row(person={'age': 256, 'name': u'John Doe'})

So as long as this is an RDD, we are fine.

Let’s have a look at the DataFrame:

>>> fake_data.map(lambda r: Row(
    person={'age': r.age, 'name': r.name})).toDF().show(truncate=False)
|person                       |
|Map(name -> null, age -> 256)|
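
As a rough illustration of the behaviour seen above, here is a toy model in plain Python. It is not Spark's actual implementation. The function name `toy_map_coercion` is made up for this sketch, and it simply assumes that the value type is taken from the first non-null value and that every value of a different type is replaced with null.

```python
def toy_map_coercion(d):
    """Toy model only: mimic the null values observed in the DataFrame."""
    # Assumption: the map's value type comes from the first non-None value.
    value_type = None
    for value in d.values():
        if value is not None:
            value_type = type(value)
            break
    if value_type is None:
        # Nothing to infer from, so return an unchanged copy.
        return dict(d)
    # Assumption: values of any other type are replaced with None (null).
    return {k: (v if type(v) is value_type else None) for k, v in d.items()}


person = {'age': 256, 'name': 'John Doe'}
coerced = toy_map_coercion(person)
print(coerced)
```

In this sketch, which type wins depends on the dict's iteration order, so a different ordering would keep the name and null out the numbers instead.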

The only solution I have found for this is to convert all the fields to strings or, since I am outputting this as JSON anyway, to just dump the dict to JSON in the RDD before converting to a DataFrame.

>>> from json import dumps
>>> fake_data.map(lambda r: Row(
    person={'age': r.age, 'name': r.name}
)).map(lambda r: Row(
    person=dumps(r.person))).toDF().show(truncate=False)
|person                          |
|{"age": 256, "name": "John Doe"}|
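
The two workarounds mentioned above can be sketched as small plain-Python helpers that would run inside the RDD `map` before the conversion to a DataFrame. The names `stringify_values` and `to_json_string` are hypothetical, and the sketch assumes Python 3 and a flat dict with JSON-serialisable values.

```python
import json


def stringify_values(d):
    """Return a copy of d with every value converted to a string."""
    # Note: None becomes the string 'None' here; handle it separately if needed.
    return {key: str(value) for key, value in d.items()}


def to_json_string(d):
    """Serialise d to a JSON string; sort_keys keeps the output deterministic."""
    return json.dumps(d, sort_keys=True)


person = {'age': 256, 'name': 'John Doe'}
as_strings = stringify_values(person)
as_json = to_json_string(person)
print(as_strings)
print(as_json)
```

With the first helper the map would hold only string values, so the age has to be parsed back into a number downstream. The second keeps the original types inside the JSON text, which is why it fits better when the final output is JSON anyway.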

I am using Spark 1.5.2 for this. I would be happy to hear if this is not an issue in more recent versions.
