I am attempting to append new JSON files to an existing parquet table defined in Databricks.
Using a dataset defined by this command (the dataframe is initially registered as a temp table):
val output = sql("""
  select
    headers.event_name,
    to_date(from_unixtime(headers.received_timestamp)) as dt,
    from_unixtime(headers.received_timestamp) as login_datetime,
    headers.ip_address,
    headers.acting_user_id
  from usersloggedInRaw_tmp
""")
I create the initial table with the following:
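The original create command was not preserved in the post; a minimal sketch of how such a table is typically created, assuming the table name `dev_session` (mentioned below) and partitioning on the `dt` column:

```scala
// Hypothetical reconstruction of the initial table creation:
// write the dataframe as a new parquet table, partitioned by dt.
// Table name and partition column are inferred from the rest of the post.
output.write
  .format("parquet")
  .partitionBy("dt")
  .saveAsTable("dev_session")
```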
The output of this table looks like the following:
If I try to append a new json file to the now existing 'dev_session' table, using the following:
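The append command itself was also not preserved; presumably it looked something like this sketch, which reuses the same format and partition column but adds `SaveMode.Append` so existing data is kept:

```scala
import org.apache.spark.sql.SaveMode

// Hypothetical reconstruction of the append step into the existing
// partitioned table. The partition column must match the original write.
output.write
  .format("parquet")
  .mode(SaveMode.Append)
  .partitionBy("dt")
  .saveAsTable("dev_session")
```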
Here is what I see:
The dataset seems to 'shift'. For example, the acting_user_id value is now populating the 'dt' column, the column used in the append command to partition the data.
I have tried this flow multiple times and can reproduce the same result. Is this a bug in dataframe.write(), or am I making a mistake somewhere? Note that prior to appending to the table, I inspect the 'output' dataframe in Databricks via the display() command and there are no issues - the values are in their expected columns. It is only after appending to the table with the write command that the problem occurs.
Any help that can be provided would be sincerely appreciated.
Answer by anil.s.langote · Apr 20, 2016 at 06:46 PM
We came across a similar situation. We are using Spark 1.6.1 and have a daily load process that pulls data from Oracle and writes it out as parquet files. This works fine for 18 days of data (through the 18th run); the problem appears on the 19th run, where the dataframe load job gets called multiple times and never completes. When we delete all the partitioned data and run just day 19 on its own, it works, which suggests there is no issue with the data itself. How can we proceed? Does disabling the metadata help, and if so, can we run into issues once we have more than 500 partitions?
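For reference, a sketch of how parquet metadata handling can be turned off in Spark 1.6.x (settings shown are standard Spark/Parquet configuration keys; whether they resolve this particular hang is an assumption to be tested):

```scala
// Disable the _metadata/_common_metadata summary files, which grow
// with the number of partitions and can slow down or stall writes.
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

// Disable schema merging across partition files on read.
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")
```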
Answer by vida · Sep 12, 2015 at 05:10 PM
Updating this question with the latest status. Omoshiroi says he found a workaround:
I have since moved away from this approach and made heavy modifications to the notebook of interest as I did not see a way to define a specific path in conjunction with saveAsTable. I would like it to point to a S3 directory of my choosing. Therefore I am now using "sqlContext.createExternalTable(tableName, warehouseDirectory)" in conjunction with "sqlContext.refreshTable(tableName)". With this approach, I do not get the same issue. When adding new data, no 'shifting' of the data takes place.
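A sketch of that workaround as described: create an external table pointing at a chosen S3 directory, then refresh it after new files land. The table name and S3 path below are placeholders, not values from the original post:

```scala
// Hypothetical names/paths for illustration only.
val tableName = "dev_session"
val warehouseDirectory = "s3n://my-bucket/warehouse/dev_session"

// Register an external table backed by the chosen directory.
sqlContext.createExternalTable(tableName, warehouseDirectory)

// ... write new parquet files into warehouseDirectory ...

// Make the new files visible to queries against the table.
sqlContext.refreshTable(tableName)
```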
If anyone runs into the issue above though - let us know - if we can get a reliable reproduction - we'd love to debug and fix that.
Answer by CliveEvans · Oct 30, 2015 at 11:42 AM
I can reproduce this quite simply here:

case class Thing(first: String, second: String)
val df = sqlContext.createDataFrame(Seq(Thing("one","two")))
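The write call that produced the listing below was not included in the answer; judging by the `first=...` directories, it was presumably a parquet write partitioned on `first`, along these lines (the path and exact options are assumptions):

```scala
// Hypothetical reconstruction of the missing write step:
// partition by "first", matching the first=... directories on disk.
df.write
  .partitionBy("first")
  .parquet("/tmp/things") // path is a placeholder
```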
Results in the following on disk:
-rw-r--r--. 1 dev dev  211 Oct 30 11:37 _common_metadata
drwxrwxr-x. 2 dev dev 4096 Oct 30 11:37 first=one
drwxrwxr-x. 2 dev dev 4096 Oct 30 11:37 first=two
-rw-r--r--. 1 dev dev  459 Oct 30 11:37 _metadata
-rw-r--r--. 1 dev dev    0 Oct 30 11:37 _SUCCESS
Tried on 1.5.0 and 1.5.1