Weird behavior in Class methods vs StaticMethods in Pyspark
Note, using Spark 2.0.0 with python 2.7
I just found a very weird behavior in PySpark. I will show it with an example. Who knows, maybe this can help someone else.
I am processing a list of text files containing data in jsonlines format. After some fiddling I set up a basic class to process the files:
class TestClassProcessor(object):
def __init__(self):
self.spark = SparkSession...GetOrCreate()
@staticmethod
def parse_record(self, record):
... do something with record...
return record_updated
def process_file(self, fname):
data = self.spark.read.text(fname)
data_processed = data.rdd.map(
lambda r: self.parse_record(r.value)
)
df = data_processed.toDF()
The reason why I set up parse_record
as a staticmethod
was simple. Initially I set up this code as a set of functions, and when I switched to classes I wanted the existing code to work. So in the existing code, I just changed
data_processed = data.rdd.map(
lambda r: parse_record(r.value)
)
to
data_processed = data.rdd.map(
lambda r: TestClassProcessor.parse_record(r.value)
)
I mean, that is the purpose of static methods, ain't it?
Big was my surprise, when I spark-submitt
ed (is that a verb?) the script and got one of Spark's wonderful spaguetti traces, ending in the infamous:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:211)
at java.lang.Thread.run(Thread.java:745)
Trace which, based on my experience, it can mean fkin anything.
Interesingly though, the old function based code still worked.
At the end, the way I fixed this baffling error was easy. Just had to change one line of the class TestClassProcessor
to
class TestClassProcessor(object):
def __init__(self):
self.spark = SparkSession...GetOrCreate()
@staticmethod
def parse_record(self, record):
... do something with record...
return record_updated
def process_file(self, fname):
data = TestClassProcessor.spark.read.text(fname) #<==============TADAAA!
data_processed = data.rdd.map(
lambda r: self.parse_record(r.value)
)
df = data_processed.toDF()
Which Im pretty sure is not required in regular python.
Cheers!