List Filter - Socket Refused

Tags: python, list, socket, timeout, exception
Question by shaunryan · Nov 26, 2018 at 04:01 PM

Hi All...

I think I know what's going on here, but I'm relatively new to Databricks/Spark and could do with a second opinion.

I have a couple of short pieces of code, both of which use a set of 90,000 records to filter a 6 GB dataset.

  1. The first one filters using a Python list and fails with a socket refused error.
  2. The second one uses a join and works fine.

I think the first one fails because the filter uses a list that has been collected to the driver, so to evaluate the filter (and the toPandas() that follows) all of the data has to be pulled back to the driver, at which point the socket times out while serializing from the JVM to Python. The second one expresses the same filter as a join between two DataFrames, so the work stays distributed across the executors, which is far more efficient and succeeds. Can someone please share their thoughts?
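For reference, this is roughly how I've been comparing the two plans (just a sketch: f is the 6 GB embeddings DataFrame and wordsdf the 90,000-word DataFrame, both built exactly as in the code further down):

# Approach 1: filter on a plain Python list. The whole list ends up embedded
# in the plan as a literal In(...) expression, and toPandas() then pulls every
# surviving row back to the driver over a single local socket.
small_list = list(f.select('val_0').limit(90000).toPandas()['val_0'])
f.filter(f.val_0.isin(small_list)).explain()

# Approach 2: join against a DataFrame holding the same words. Spark can
# broadcast the small side and do the filtering on the executors instead.
f.join(wordsdf, ['val_0']).explain()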

Things I've tried:

  • I tried to distribute the list variable as a broadcast variable, but that fails with AttributeError: _get_object_id (a minimal sketch of what I mean is just below this list).
  • I enabled Arrow - it has no effect; the job fails with and without it.
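
To show what I mean by distributing the list, here is a minimal, self-contained sketch with made-up data. My guess (not confirmed) is that the AttributeError comes from passing the Broadcast wrapper itself to isin() rather than its .value:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

words = ['the', 'of', 'and']                       # stand-in for the 90,000-word list
wl = sc.broadcast(words)

df = spark.createDataFrame([('the',), ('cat',)], ['val_0'])

# df.filter(df.val_0.isin(wl))                     # AttributeError: _get_object_id
df.filter(df.val_0.isin(wl.value)).show()          # unwrapping with .value is fine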

Data is here: https://fasttext.cc/docs/en/english-vectors.html

NOTE: Arrow is enabled but doesn't help - same error as without

from pyspark.sql.functions import split

embeddings_index = {}

# Read the 6 GB fastText vectors and split the single text column into 301 columns
f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec")
for i in range(301):
    f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i))
f = f.drop('_c0')
f = f.na.drop(subset=["val_7"])

# 90,000-word filter list, collected to the driver and broadcast back out
wl = sc.broadcast(list(
    f.select('val_0').limit(90000).toPandas()['val_0']
))

# Filter on the list, then collect the result to the driver as a dict
f_fil = f.filter((f.val_0.isin(wl)))
em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')
Exception: could not open socket: ["tried to connect to ('127.0.0.1', 36543), but an error occured: [Errno 111] Connection refused"]
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<command-537020040227877> in <module>()
     18 f_fil = f.filter((f.val_0.isin(wl)))
     19 #f_fil.explain()
---> 20 em_index = f_fil.toPandas().set_index('val_0').T.to_dict('list')
     21 #wlb.unpersist()
     22 #wlb.destroy()

/databricks/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   2137                 _check_dataframe_localize_timestamps
   2138                 import pyarrow
-> 2139                 batches = self._collectAsArrow()
   2140                 if len(batches) > 0:
   2141                     table = pyarrow.Table.from_batches(batches)

/databricks/spark/python/pyspark/sql/dataframe.py in _collectAsArrow(self)
   2197         with SCCallSiteSync(self._sc) as css:
   2198             sock_info = self._jdf.collectAsArrowToPython()
-> 2199         return list(_load_from_socket(sock_info, ArrowStreamSerializer()))
   2200
   2201 ##########################################################################################

/databricks/spark/python/pyspark/rdd.py in _load_from_socket(sock_info, serializer)
    142
    143 def _load_from_socket(sock_info, serializer):
--> 144     (sockfile, sock) = local_connect_and_auth(*sock_info)
    145     # The RDD materialization time is unpredicable, if we set a timeout for socket reading
    146     # operation, it will very possibly fail. See SPARK-18281.

/databricks/spark/python/pyspark/java_gateway.py in local_connect_and_auth(port, auth_secret)
    176             sock = None
    177         else:
--> 178             raise Exception("could not open socket: %s" % errors)
    179
    180
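For completeness, these are the driver-side settings I've been looking at while debugging, since toPandas() collects the whole filtered result back to the driver (standard Spark settings, shown only as a sketch; I don't know yet whether they're the actual cause here):

# Arrow-based toPandas() path (Spark 2.3+); this is what the traceback above goes through
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# spark.driver.maxResultSize caps the serialized size of results collected to the
# driver; it has to be set before the driver starts (e.g. in the cluster's Spark
# config as "spark.driver.maxResultSize 4g"), not from a running notebook.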

The join, however, works fine - the list has been converted back into a DataFrame:

from pyspark.sql.functions import split

embeddings_index = {}

f = sqlContext.read.format('csv').options(inferSchema='true').load("/mnt/ex_data/wiki.en.vec")
for i in range(301):
    f = f.withColumn('val_' + str(i), split(f['_c0'], ' ').getItem(i))
f = f.drop('_c0')
f = f.na.drop(subset=["val_7"])

# Same 90,000-word list, but kept as a plain Python list this time
wl = list(
    f.select('val_0').limit(90000).toPandas()['val_0']
)

from pyspark.sql.types import StringType
from pyspark.sql.functions import col

# Turn the list back into a DataFrame and filter by joining on val_0
wordsdf = spark.createDataFrame(wl, StringType())
wordsdf = wordsdf.select(col("value").alias("val_0"))
keep_words = f.join(wordsdf, ['val_0'])
em_index = keep_words.toPandas().set_index('val_0').T.to_dict('list')
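
If it's relevant, this is how I'd nudge the join to broadcast the small side explicitly (just a sketch using the standard broadcast hint; I haven't confirmed it changes anything here):

from pyspark.sql.functions import broadcast

# Hint Spark to ship the 90,000-row word list to every executor, so the join
# runs as a broadcast hash join and the 6 GB side is never shuffled
keep_words = f.join(broadcast(wordsdf), ['val_0'])
keep_words.explain()    # the physical plan should show BroadcastHashJoin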
