Description
I was using Abris with the Confluent Schema Registry to deserialize Avro records consumed from a Kafka source.
When I fetch the schema from the Schema Registry myself and pass it to Spark's built-in from_avro function, skipping the Confluent wire-format header bytes, I can read the records. I would like to do the same through Abris, since handling that framing is exactly what the library is for. However, when I use Abris, only 0-byte files are written to the output path. Is this an issue with Abris?
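For context on the byte skipping: a record produced with the Confluent serializer is framed as one magic byte (0x00) followed by a 4-byte big-endian schema id, and the Avro payload only starts at offset 5, which is why the working code below takes substring(value, 6) (SQL substring is 1-based). A minimal sketch of reading that header, purely for illustration (schemaIdOf is a hypothetical helper, not part of either code path):

import java.nio.ByteBuffer

// Returns the Schema Registry id stored in bytes 1-4 of a Confluent-framed record;
// the Avro body itself begins at offset 5.
def schemaIdOf(confluentAvroBytes: Array[Byte]): Int = {
  require(confluentAvroBytes(0) == 0, "not in Confluent wire format")
  ByteBuffer.wrap(confluentAvroBytes, 1, 4).getInt
}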
Working code without Abris:
package pruthvi.kafka.poc

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.from_avro
import org.apache.spark.sql.streaming.Trigger
import io.confluent.kafka.schemaregistry.client.rest.RestService

object Important1 {
  def main(args: Array[String]): Unit = {
    println("Hello world!")
    try {
      val spark: SparkSession = SparkSession.builder
        .master("local[3]")
        .appName("Kafka testing")
        .config("spark.streaming.stopGracefullyOnShutdown", "true")
        .config("spark.sql.shuffle.partitions", 3)
        .getOrCreate()
      spark.conf.set("spark.sql.avro.compression.codec", "uncompressed")

      val topicName = "foo"
      val df = spark.readStream
        .format("kafka")
        .option("kafka.security.protocol", "SSL")
        .option("kafka.ssl.truststore.location", "certs/truststore.jks")
        .option("kafka.ssl.keystore.location", "certs/keystore.jks")
        .option("kafka.ssl.key.password", "foo")
        .option("kafka.ssl.keystore.password", "foo")
        .option("kafka.ssl.truststore.password", "foo")
        .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
        .option("subscribe", topicName)
        .option("kafka.group.id", "foo")
        .option("startingOffsets", "earliest")
        .load()

      // Fetch the latest schema for the topic from the Schema Registry
      val schemaRegistryURL = "url"
      val restService = new RestService(schemaRegistryURL)
      val valueRestResponseSchema = restService.getLatestVersion(topicName)
      val jsonSchema = valueRestResponseSchema.getSchema

      import spark.implicits._
      // Drop the 5-byte Confluent header (substring is 1-based), then decode
      // the remaining Avro payload with the schema downloaded above
      val dsAvroRecord = df
        .selectExpr("substring(value, 6) as avro_value")
        .select(from_avro($"avro_value", jsonSchema).as("RecordValue"))

      dsAvroRecord.writeStream
        .outputMode("append")
        .format("json")
        .option("path", "output")
        .trigger(Trigger.ProcessingTime(1))
        .option("checkpointLocation", "chk_point_dir")
        .start()
        .awaitTermination()
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }
}
With Abris:
package pruthvi.kafka.poc
package examples

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.config.AbrisConfig
import za.co.absa.abris.avro.functions.from_avro

object readStreamingData {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder
      .master("local[4]")
      .appName("Kafka testing")
      .config("spark.streaming.stopGracefullyOnShutdown", "true")
      .getOrCreate()

    val topicName = "foo"
    val df = spark.readStream
      .format("kafka")
      .option("kafka.security.protocol", "SSL")
      .option("kafka.ssl.truststore.location", "certs/truststore.jks")
      .option("kafka.ssl.keystore.location", "certs/keystore.jks")
      .option("kafka.ssl.key.password", "")
      .option("kafka.ssl.keystore.password", "")
      .option("kafka.ssl.truststore.password", "")
      .option("kafka.bootstrap.servers", "x:16501,y:16501,z:16501")
      .option("kafka.group.id", "foo")
      .option("subscribe", topicName)
      .option("startingOffsets", "earliest")
      .option("maxOffsetsPerTrigger", "50")
      .load()

    // Let Abris download the latest reader schema for the topic (TopicNameStrategy)
    val abrisConfig =
      AbrisConfig.fromConfluentAvro.downloadReaderSchemaByLatestVersion
        .andTopicNameStrategy(topicName)
        .usingSchemaRegistry("url")

    val deserialized = df.select(from_avro(col("value"), abrisConfig) as "data")

    deserialized
      .writeStream
      .option("path", "output")
      .option("checkpointLocation", "chk_point_dir")
      .outputMode("append")
      .format("parquet")
      .trigger(Trigger.ProcessingTime(1000))
      .start()
      .awaitTermination()
  }
}
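One way I have tried to narrow this down (just a sketch, reusing the deserialized frame from the code above; the console sink and trigger interval are arbitrary debugging choices, not part of the real job) is to print a few rows instead of writing parquet, to see whether from_avro itself produces any data:

// Hypothetical debugging variant: write the deserialized rows to the console
// instead of parquet, so empty output would point at the sink rather than Abris.
deserialized
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()
  .awaitTermination()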