Description
I have a Spark Structured Streaming job that reads Avro messages from Kafka, converts them to a Spark DataFrame using Abris, and writes the stream as a Delta table to S3. Below are the relevant configuration and a code snippet.
Abris Configuration
import za.co.absa.abris.avro.errors.PermissiveRecordExceptionHandler
import za.co.absa.abris.config.AbrisConfig

// Download reader schema version 1 from the Schema Registry, resolve the
// subject via the record-name strategy, and replace records that fail
// deserialization instead of failing the job.
val abrisConfig = AbrisConfig.fromConfluentAvro
  .downloadReaderSchemaByVersion(1)
  .andRecordNameStrategy(schemaName, schemaNamespace)
  .usingSchemaRegistry(schemaRegistryUrl)
  .withExceptionHandler(new PermissiveRecordExceptionHandler())
Code Example
Here’s an example where I write the stream to the console for demonstration purposes:
import org.apache.spark.sql.functions.{col, map_from_entries}
import za.co.absa.abris.avro.functions.from_avro

val df = reader
  .readKafkaStream(topic, kafkaConfig, options)
  // Kafka headers arrive as an array of (key, value) structs.
  .withColumnRenamed("headers", KAFKA_HEADERS)
  // Turn the header array into a map so headers can be addressed by key.
  .withColumn("kafka_meta", map_from_entries(col(KAFKA_HEADERS)))
  // Extract the "meta" header, which carries the Avro-encoded payload.
  .withColumn("kafka_meta_column", col("kafka_meta.meta"))
  // Deserialize the payload with the Abris config above.
  .withColumn(KAFKA_HEADERS_META, from_avro(col("kafka_meta_column"), abrisConfig))

df.select(KAFKA_HEADERS_META).writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "true")
  .start()
The kafka_meta.meta column contains the Avro message I want to deserialize using from_avro. The schema for the Avro message is as follows:
{
  "type": "record",
  "name": "Example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "timestamp", "type": "long" }
  ]
}
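For reference, when deserialization succeeds, KAFKA_HEADERS_META is a struct matching this schema, so its fields can be read directly. A minimal sketch, reusing the names above:

// Sketch only: select fields of the decoded struct.
df.select(
  col(s"$KAFKA_HEADERS_META.id"),
  col(s"$KAFKA_HEADERS_META.timestamp")
)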
Problem
I encounter the following error during execution:
Caused by: java.lang.NullPointerException
Upon investigation, I concluded that this happens because the PermissiveRecordExceptionHandler returns a null record for malformed data, as it is expected to. Downstream transformations then attempt to access the nested field meta (the column containing the Avro message), which results in the NullPointerException.
Note: None of the columns originally contain null values, so the issue is not caused by the source data.
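For illustration, the kind of guard I would rather not hand-roll around every access looks roughly like this (an untested sketch reusing the names above; meta_id is just an illustrative output column name):

import org.apache.spark.sql.functions.when

// Untested sketch: read a field only when the permissive handler
// produced a non-null record; leave it null otherwise.
val guarded = df.withColumn(
  "meta_id",
  when(col(KAFKA_HEADERS_META).isNotNull, col(s"$KAFKA_HEADERS_META.id"))
)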
Question
How can I handle this scenario to avoid the NullPointerException while keeping the permissive behavior of the exception handler? Any guidance would be appreciated.
Thank you!