I have existing Hive data stored in Avro format. For whatever reason, reading these data with a plain SELECT is very slow; I haven't figured out why yet. So I decided to read the data directly, by navigating to the partition path and using the Spark SQLContext, which works much faster. However, I have a problem reading the DECIMAL values, which are stored as the Avro decimal logical type (bytes with a logicalType annotation). In the schema file they are defined as:
{"name":"ENDING_NET_RECEIVABLES_LOCAL","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":18}],"doc":"Ending Net Receivables Local","default":null}
I found a recommendation to use the following approach for converting an Avro schema into a Spark SQL schema:
import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.GenericDatumWriter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType

// Write an empty Avro file with the given schema, then let spark-avro
// infer the corresponding Spark SQL schema from it.
def getSparkSchemaForAvro(sqc: SQLContext, avroSchema: Schema): StructType = {
  val dummyFile = File.createTempFile("avro_dummy", ".avro")
  val datumWriter = new GenericDatumWriter[Any]()
  datumWriter.setSchema(avroSchema)
  val writer = new DataFileWriter(datumWriter).create(avroSchema, dummyFile)
  writer.flush()
  writer.close()
  val df = sqc.read.format("com.databricks.spark.avro")
    .load("file://" + dummyFile.getAbsolutePath)
  df.schema
}

However, it converts the above type into binary rather than decimal(38,18), i.e. the decimal logicalType is dropped.
If this conversion were correct, I could read the file like this:
val df = sqlContext.read.schema(CRDataUtils.getSparkSchemaForAvro(sqlContext, avroSchema)).avro(path)
I also looked at com.databricks.spark.avro.SchemaConverters, but its conversion method def toSqlType(avroSchema: Schema): SchemaType returns a SchemaType rather than the StructType required by the approach above.
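That said, SchemaType appears to be just a small wrapper around the data type and its nullability, so the StructType can be unwrapped from it; a sketch, assuming SchemaType exposes a dataType field as in the spark-avro source:

import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// For a top-level Avro record, the wrapped dataType is a StructType.
val sparkSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]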
Can anyone help with how to read Avro files with logical types in Spark?
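In the meantime, one workaround I am considering (a sketch, assuming the column comes back as BinaryType and using the scale of 18 from the schema above): decode the two's-complement bytes into a decimal with a UDF:

import java.math.{BigDecimal => JBigDecimal, BigInteger}
import org.apache.spark.sql.functions.udf

// Avro's decimal logical type stores the unscaled value as big-endian
// two's-complement bytes; the scale (18 here) comes from the schema.
val bytesToDecimal = udf { bytes: Array[Byte] =>
  if (bytes == null) null else new JBigDecimal(new BigInteger(bytes), 18)
}

val df2 = df.withColumn("ENDING_NET_RECEIVABLES_LOCAL",
  bytesToDecimal(df("ENDING_NET_RECEIVABLES_LOCAL")))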
Answer by subhamoy chowdhury · Mar 03, 2017 at 01:41 AM
Facing the same issue here. Can anyone please help?
I am using Spark 1.6.0 on CDH 5.7.
Answer by subhamoy chowdhury · Mar 03, 2017 at 08:03 PM
I am trying to write a DataFrame to Avro using the Databricks Scala API. The write succeeds, but reading the data back through Hive throws an exception:
Error: java.io.IOException: org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Failed to obtain scale value from file schema: "bytes" (state=,code=0)
In the .avsc file I have a column with type bytes and a decimal logicalType:
{"name":"rate","type":["null",{"type":"bytes","logicalType":"decimal","precision":38,"scale":18}],"default":null}
reading
====================
val df = sqlContext.read.format("com.databricks.spark.avro")
  .option("avroSchema", schema.toString)
  .option("inferSchema", "true")
  .avro(sourceFile)
  .filter(preparePartitionFilterClause)
====================
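After the read, it may be worth checking what type the decimal column actually received; a quick diagnostic sketch (column name taken from the schema above):

import org.apache.spark.sql.types.DecimalType

// If this prints BinaryType, the decimal logicalType was dropped during conversion.
df.schema("rate").dataType match {
  case d: DecimalType => println(s"decimal preserved: $d")
  case other          => println(s"logicalType dropped, column read as: $other")
}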
writing
=======================
df.write.mode(SaveMode.Append)
  .format("com.databricks.spark.avro")
  .partitionBy(TrlConstants.PARTITION_COLUMN_COUNTRYCODE)
  .save(path)
=======================
I am completely clueless, please help!
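Since the Hive error points at the file schema, one way to confirm the cause is to inspect the writer schema embedded in one of the written part files; a diagnostic sketch (the path is hypothetical, adjust it to your partition layout):

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.GenericDatumReader

val reader = new DataFileReader(new File("/path/to/part-00000.avro"), new GenericDatumReader[Any]())
// If "rate" shows plain "bytes" here, the decimal logicalType was lost on write,
// which would explain Hive's "Failed to obtain scale value" error.
println(reader.getSchema.toString(true))
reader.close()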
subhamoy chowdhury, how do your questions above qualify as answers?
Answer by sai krishna Pujari · Apr 10, 2017 at 12:42 PM
@subhamoy chowdhury I am also facing a similar problem.
Let me know if you found a solution.
Answer by Uthayakumar · Mar 01, 2018 at 09:41 AM
Hi Subhamoy / Pujari, the same thing is happening for me. If you have any findings, please share them with me. Thanks in advance.
UK
Answer by smiksha · Jun 11 at 12:29 PM
I am facing the same issue. Can someone suggest something? @Databricks_Support