Issue: NullPointerException When Using PermissiveRecordExceptionHandler in Structured Streaming Job #368

Open
@talperetz1

Description
I have a Spark Structured Streaming job where I read Avro messages from Kafka, convert them to a Spark DataFrame using Abris, and then write the stream as a Delta table to S3. Below is the relevant configuration and code snippet.

Abris Configuration

val abrisConfig = AbrisConfig.fromConfluentAvro
  .downloadReaderSchemaByVersion(1)
  .andRecordNameStrategy(schemaName, schemaNamespace)
  .usingSchemaRegistry(schemaRegistryUrl)
  .withExceptionHandler(new PermissiveRecordExceptionHandler())

Code Example
Here’s an example where I write the stream to the console for demonstration purposes:

val df = reader
  .readKafkaStream(topic, kafkaConfig, options)
  .withColumnRenamed("headers", KAFKA_HEADERS)
  .withColumn("kafka_meta", map_from_entries(col(KAFKA_HEADERS)))
  .withColumn("kafka_meta_column", col("kafka_meta.meta"))
  .withColumn(KAFKA_HEADERS_META, from_avro(col("kafka_meta_column"), abrisConfig))

df.select(KAFKA_HEADERS_META).writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "true")
  .start()

The kafka_meta.meta column contains the Avro message I want to deserialize using from_avro. The schema for the Avro message is as follows:

{
  "type": "record",
  "name": "Example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "timestamp", "type": "long" }
  ]
}
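
For reference, neither field in this schema is declared as a union with null, so both map to non-nullable Spark fields. The sketch below shows one way to check that. It uses spark-avro's SchemaConverters as a stand-in (an assumption: Abris may apply its own converter), and the object name SchemaNullability is purely illustrative.

```scala
import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

object SchemaNullability {
  // Same Avro schema as above, inlined so the sketch is self-contained.
  val avroJson: String =
    """{
      |  "type": "record",
      |  "name": "Example",
      |  "fields": [
      |    { "name": "id", "type": "string" },
      |    { "name": "timestamp", "type": "long" }
      |  ]
      |}""".stripMargin

  val avroSchema: Schema = new Schema.Parser().parse(avroJson)

  // Convert to a Spark type; a record becomes a StructType.
  val structType: StructType =
    SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]

  def main(args: Array[String]): Unit =
    // Print each field with its nullability flag.
    structType.fields.foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
}
```

Whether this nullability is related to the exception below is not established here; it is only something worth checking when a handler substitutes nulls for malformed records.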

Problem
I encounter the following error during execution:
Caused by: java.lang.NullPointerException

After investigating, I believe this occurs because the PermissiveRecordExceptionHandler returns a null record for malformed data, as expected. Downstream transformations then attempt to access the nested field meta (the column containing the Avro message), which results in the NullPointerException.

Note: None of the columns originally contain null values, so the issue is not caused by the source data.
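
If the handler does yield a null struct for malformed payloads, one possible guard is to split the stream on the decoded column before any nested field is accessed. The following is a minimal sketch under that assumption; NullRecordGuard and splitOnDecoded are hypothetical names, and it would not help if the exception is thrown inside from_avro itself.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object NullRecordGuard {
  // Splits a DataFrame on whether the decoded struct column is null.
  // `decodedCol` is the column holding the from_avro output
  // (KAFKA_HEADERS_META in the snippet above).
  // Returns (rows that decoded, rows where the handler produced null).
  def splitOnDecoded(df: DataFrame, decodedCol: String): (DataFrame, DataFrame) = {
    val decoded   = df.filter(col(decodedCol).isNotNull)
    val malformed = df.filter(col(decodedCol).isNull)
    (decoded, malformed)
  }
}
```

With the job above this could be called as NullRecordGuard.splitOnDecoded(df, KAFKA_HEADERS_META), accessing nested fields only on the first DataFrame and routing the second to a separate sink for inspection.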

Question
How can I handle this scenario to avoid the NullPointerException while keeping the permissive behavior of the exception handler? Any guidance would be appreciated.

Thank you!
