Hi All...
I think I know what's going on with this, but I'm relatively new to Databricks/Spark and could do with a second opinion.
I have a couple of short pieces of code. The code uses a set of 90,000 records to filter a 6 GB dataset.
I think the first one fails because the filter uses a list that was collected at the driver, so to apply the filter Spark ends up pulling all of the data back to the driver, at which point the socket times out while serializing from the JVM to Python. The second one instead uses a join between two DataFrames, so Spark distributes the join across the cluster, which is far more efficient and succeeds. Can someone please share thoughts?
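To make the comparison concrete, here is a minimal sketch on toy data (not the real vectors file) that prints the physical plan for both approaches. The SparkSession name spark, the toy sizes, and the column names are just assumptions for illustration; on my runtime the join side usually plans as a broadcast hash join, but that depends on spark.sql.autoBroadcastJoinThreshold.

    # Minimal sketch on toy data; assumes an existing SparkSession called `spark`.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.getOrCreate()

    big = spark.createDataFrame([(str(i), i) for i in range(1000)], ["val_0", "x"])
    keep = [str(i) for i in range(100)]  # stand-in for the 90k word list

    # 1) isin() with a driver-side Python list: the list is baked into the plan as literals
    filtered = big.filter(col("val_0").isin(keep))
    filtered.explain()

    # 2) join against a small DataFrame built from the same list: planned as a (broadcast) join
    keep_df = spark.createDataFrame([(w,) for w in keep], ["val_0"])
    joined = big.join(keep_df, ["val_0"])
    joined.explain()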
Things I've tried:
Data is here: https://fasttext.cc/docs/en/english-vectors.html
NOTE: Arrow is enabled but doesn't help - same error as without
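For reference, Arrow is switched on at the session level, roughly as below; the key shown is the Spark 2.x name (Spark 3.x renamed it to spark.sql.execution.arrow.pyspark.enabled). Either way the failure is the same.

    # Session-level toggle for Arrow-accelerated toPandas() (Spark 2.x key name).
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")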
from pyspark.sql.functions import split

embeddings_index = {}
f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec")
for i in range(301):
    f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i))
f = f.drop('_c0')
f = f.na.drop(subset=["val_7"])
wl = sc.broadcast(list(f.select('val_0').limit(90000).toPandas()['val_0']))
f_fil = f.filter((f.val_0.isin(wl)))
em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')

Exception: could not open socket: ["tried to connect to ('127.0.0.1', 36543), but an error occured: [Errno 111] Connection refused"]
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<command-537020040227877> in <module>()
     18 f_fil = f.filter((f.val_0.isin(wl)))
     19 #f_fil.explain()
---> 20 em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')
     21 #wlb.unpersist()
     22 #wlb.destroy()

/databricks/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   2137                 _check_dataframe_localize_timestamps
   2138                 import pyarrow
-> 2139                 batches = self._collectAsArrow()
   2140                 if len(batches) > 0:
   2141                     table = pyarrow.Table.from_batches(batches)

/databricks/spark/python/pyspark/sql/dataframe.py in _collectAsArrow(self)
   2197         with SCCallSiteSync(self._sc) as css:
   2198             sock_info = self._jdf.collectAsArrowToPython()
-> 2199         return list(_load_from_socket(sock_info, ArrowStreamSerializer()))
   2200
   2201 ##########################################################################################

/databricks/spark/python/pyspark/rdd.py in _load_from_socket(sock_info, serializer)
    142
    143 def _load_from_socket(sock_info, serializer):
--> 144     (sockfile, sock) = local_connect_and_auth(*sock_info)
    145     # The RDD materialization time is unpredicable, if we set a timeout for socket reading
    146     # operation, it will very possibly fail. See SPARK-18281.

/databricks/spark/python/pyspark/java_gateway.py in local_connect_and_auth(port, auth_secret)
    176             sock = None
    177         else:
--> 178             raise Exception("could not open socket: %s" % errors)
    179
    180
The join, however, works fine - the list has been converted back into a DataFrame:
from pyspark.sql.functions import split, col
from pyspark.sql.types import StringType

embeddings_index = {}
f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec")
for i in range(301):
    f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i))
f = f.drop('_c0')
f = f.na.drop(subset=["val_7"])
wl = list(f.select('val_0').limit(90000).toPandas()['val_0'])

wordsdf = spark.createDataFrame(wl, StringType())
wordsdf = wordsdf.select(col("value").alias("val_0"))
keep_words = f.join(wordsdf, ['val_0'])
em_index = keep_words.toPandas().set_index('val_0').T.to_dict('list')
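As an aside, since wordsdf is tiny compared with f, the same join can also be hinted to broadcast explicitly. This is just an untested variant of the last two lines above, using the same DataFrames:

    from pyspark.sql.functions import broadcast

    # Variant of the working join: explicitly broadcast the small 90k-word DataFrame
    # so the 6 GB side never needs to be shuffled.
    keep_words = f.join(broadcast(wordsdf), ['val_0'])
    em_index = keep_words.toPandas().set_index('val_0').T.to_dict('list')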