
Getting zero byte files using abris #307

@kpr9991

Description


I am using Abris with the Confluent Schema Registry to deserialize Avro records read from a Kafka source.
When I fetch the schema from the schema registry manually and pass it to Spark's built-in from_avro function after skipping the first 5 bytes of each value (the Confluent wire-format header), I am able to read the records. I want to do the same with Abris, since handling that header is exactly what the library does. But when I use Abris, only 0-byte files are written. Is this an issue with Abris?
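For context, the Confluent wire format prepends a 5-byte header to every Avro payload: a zero magic byte followed by a 4-byte big-endian schema ID. That header is what substring(value, 6) strips in the manual approach below. A minimal sketch of reading the header directly (the helper name is mine, not part of either snippet):

import java.nio.ByteBuffer

// Hypothetical helper: extract the schema id from a Confluent-framed Kafka value.
// Wire layout: [0x00 magic byte][4-byte big-endian schema id][Avro payload].
def confluentSchemaId(value: Array[Byte]): Int = {
  require(value.length > 5 && value(0) == 0, "not a Confluent-framed Avro record")
  ByteBuffer.wrap(value, 1, 4).getInt
}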

Working code without Abris:

package pruthvi.kafka.poc

import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.streaming.Trigger

object Important1 {

  def main(args: Array[String]): Unit = {
    println("Hello world!")

    try {
      val spark: SparkSession = SparkSession.builder
        .master("local[3]")
        .appName("Kafka testing")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .config("spark.sql.shuffle.partitions", 3)
        .getOrCreate()
      spark.conf.set("spark.sql.avro.compression.codec", "uncompressed")

      val topicName = "foo"

      val df = spark.readStream
        .format("kafka")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.location", "certs/truststore.jks")
        .option("kafka.ssl.keystore.location", "certs/keystore.jks")
        .option("kafka.ssl.key.password", "foo")
        .option("kafka.ssl.keystore.password", "foo")
        .option("kafka.ssl.truststore.password", "foo")
        .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
        .option("subscribe", topicName)
        .option("kafka.group.id", "foo")
        .option("startingOffsets", "earliest")
        .load()

      // Fetch the latest schema for the topic from the schema registry.
      val schemaRegistryURL = "url"
      val restService = new RestService(schemaRegistryURL)
      val valueRestResponseSchema = restService.getLatestVersion(topicName)
      val jsonSchema = valueRestResponseSchema.getSchema

      import spark.implicits._
      // Drop the 5-byte Confluent wire-format header (magic byte + 4-byte schema id)
      // before handing the raw Avro payload to Spark's built-in from_avro.
      val dsAvroRecord = df
        .selectExpr("substring(value, 6) as avro_value")
        .select(from_avro($"avro_value", jsonSchema).as("RecordValue"))

      dsAvroRecord.writeStream
        .outputMode("append")
        .format("json")
        .option("path", "output")
        .trigger(Trigger.ProcessingTime(1))
        .option("checkpointLocation", "chk_point_dir")
        .start()
        .awaitTermination()
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }
}

With Abris:

package pruthvi.kafka.poc
package examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.config.AbrisConfig

object readStreamingData {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder
      .master("local[4]")
      .appName("Kafka testing")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .getOrCreate()

    val topicName = "foo"
    val df = spark.readStream
      .format("kafka")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", "certs/truststore.jks")
      .option("kafka.ssl.keystore.location", "certs/keystore.jks")
      .option("kafka.ssl.key.password", "")
      .option("kafka.ssl.keystore.password", "")
      .option("kafka.ssl.truststore.password", "")
      .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
      .option("kafka.group.id", "foo")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "50")
      .load()

    // Let Abris download the latest reader schema for the topic's value subject
    // and strip the Confluent wire-format header itself.
    val abrisConfig =
      AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
        .andTopicNameStrategy(topicName)
        .usingSchemaRegistry("url")

    import za.co.absa.abris.avro.functions.from_avro
    val deserialized = df.select(from_avro(col("value"), abrisConfig).as("data"))

    deserialized.writeStream
      .option("path", "output")
      .option("checkpointLocation", "chk_point_dir")
      .outputMode("append")
      .format("parquet")
      .trigger(Trigger.ProcessingTime(1000))
      .start()
      .awaitTermination()
  }
}
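One way to narrow down whether the problem is the Abris deserialization or the parquet sink is to point the same deserialized stream at a console sink and see whether any rows come out. A rough sketch, assuming the same spark, df and abrisConfig as in the snippet above (and a fresh checkpoint/output location):

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro

// If the console shows rows, deserialization works and the file sink or stale
// checkpoint is the suspect; if it shows none, the reader schema or subject
// name used by Abris is the more likely cause.
val debugQuery = df
  .select(from_avro(col("value"), abrisConfig).as("data"))
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()

debugQuery.awaitTermination()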
