
Unable to deserialize the records via Abris Pyspark #366

Open

Description

@debuggerrr

I am trying to run the code below, installing the required packages at runtime via `spark.jars.packages`:

```python
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.session import SparkSession

spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,org.apache.spark:spark-avro_2.12:3.3.0,za.co.absa:abris_2.13:6.4.0") \
    .getOrCreate()


def from_avro(col, config):
    """
    avro deserialize

    :param col (PySpark column / str): column name "key" or "value"
    :param config (za.co.absa.abris.config.FromAvroConfig): Abris config, generated from the abris_config helper function
    :return: PySpark Column
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro

    return Column(abris_avro.functions.from_avro(_to_java_column(col), config))


def from_avro_abris_config(config_map, topic, is_key):
    """
    Create a from-avro Abris config with a schema registry URL

    :param config_map (dict[str, str]): configuration map to pass to the deserializer, e.g. {'schema.registry.url': 'http://localhost:8081'}
    :param topic (str): Kafka topic
    :param is_key (bool): whether the column is the message key
    :return: za.co.absa.abris.config.FromAvroConfig
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)

    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .fromConfluentAvro() \
        .downloadReaderSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)


def to_avro(col, config):
    """
    avro serialize

    :param col (PySpark column / str): column name "key" or "value"
    :param config (za.co.absa.abris.config.ToAvroConfig): Abris config, generated from the abris_config helper function
    :return: PySpark Column
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro

    return Column(abris_avro.functions.to_avro(_to_java_column(col), config))


def to_avro_abris_config(config_map, topic, is_key):
    """
    Create a to-avro Abris config with a schema registry URL

    :param config_map (dict[str, str]): configuration map to pass to the serializer, e.g. {'schema.registry.url': 'http://localhost:8081'}
    :param topic (str): Kafka topic
    :param is_key (bool): whether the column is the message key
    :return: za.co.absa.abris.config.ToAvroConfig
    """
    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)

    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .toConfluentAvro() \
        .downloadSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)


df = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "test01").load()

from_avro_abris_settings = from_avro_abris_config({'schema.registry.url': 'http://schema-registry:8081'}, 'test01', False)
df2 = df.withColumn("parsed", from_avro("value", from_avro_abris_settings))
df2.show()
```

But it gives me the error below:

```
:: problems summary ::
:::: WARNINGS
module not found: io.confluent#kafka-avro-serializer;6.2.1

==== local-m2-cache: tried

  file:/Users/sid/.m2/repository/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.pom

  -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:

  file:/Users/sid/.m2/repository/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.jar

==== local-ivy-cache: tried

  /Users/sid/.ivy2/local/io.confluent/kafka-avro-serializer/6.2.1/ivys/ivy.xml

  -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:

  /Users/sid/.ivy2/local/io.confluent/kafka-avro-serializer/6.2.1/jars/kafka-avro-serializer.jar

==== central: tried

  https://repo1.maven.org/maven2/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.pom

  -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:

  https://repo1.maven.org/maven2/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.jar

==== spark-packages: tried

  https://repos.spark-packages.org/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.pom

  -- artifact io.confluent#kafka-avro-serializer;6.2.1!kafka-avro-serializer.jar:

  https://repos.spark-packages.org/io/confluent/kafka-avro-serializer/6.2.1/kafka-avro-serializer-6.2.1.jar

	module not found: io.confluent#kafka-schema-registry-client;6.2.1

==== local-m2-cache: tried

  file:/Users/sid/.m2/repository/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.pom

  -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:

  file:/Users/sid/.m2/repository/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.jar

==== local-ivy-cache: tried

  /Users/sid/.ivy2/local/io.confluent/kafka-schema-registry-client/6.2.1/ivys/ivy.xml

  -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:

  /Users/sid/.ivy2/local/io.confluent/kafka-schema-registry-client/6.2.1/jars/kafka-schema-registry-client.jar

==== central: tried

  https://repo1.maven.org/maven2/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.pom

  -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:

  https://repo1.maven.org/maven2/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.jar

==== spark-packages: tried

  https://repos.spark-packages.org/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.pom

  -- artifact io.confluent#kafka-schema-registry-client;6.2.1!kafka-schema-registry-client.jar:

  https://repos.spark-packages.org/io/confluent/kafka-schema-registry-client/6.2.1/kafka-schema-registry-client-6.2.1.jar

	::::::::::::::::::::::::::::::::::::::::::::::

	::          UNRESOLVED DEPENDENCIES         ::

	::::::::::::::::::::::::::::::::::::::::::::::

	:: io.confluent#kafka-avro-serializer;6.2.1: not found

	:: io.confluent#kafka-schema-registry-client;6.2.1: not found

	::::::::::::::::::::::::::::::::::::::::::::::

:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: io.confluent#kafka-avro-serializer;6.2.1: not found, unresolved dependency: io.confluent#kafka-schema-registry-client;6.2.1: not found]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1456)
```

Please help me with this.
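
A likely cause, judging from the resolver log: the `io.confluent` artifacts that Abris pulls in transitively (`kafka-avro-serializer`, `kafka-schema-registry-client`) are not published to Maven Central; they are hosted in Confluent's own Maven repository, which Spark does not search by default. Adding that repository via `spark.jars.repositories` should let the coordinates resolve. Separately, the packages line mixes Scala 2.12 artifacts (`spark-sql-kafka-0-10_2.12`, `spark-avro_2.12`) with `abris_2.13`; on a Scala 2.12 build of Spark this should be `abris_2.12`. A minimal sketch of the session setup with both changes (the repository URL is Confluent's public Maven repo; the version numbers are taken from the snippet above):

```python
from pyspark.sql.session import SparkSession

# Sketch: same builder as above, plus Confluent's Maven repository so the
# transitive io.confluent dependencies of Abris can be resolved.
spark = SparkSession \
    .builder \
    .appName("Kafka_Test") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
            "org.apache.spark:spark-avro_2.12:3.3.0,"
            "za.co.absa:abris_2.12:6.4.0") \
    .config("spark.jars.repositories", "https://packages.confluent.io/maven/") \
    .getOrCreate()
```

The same coordinates and repository can also be passed on the command line with `spark-submit --packages ... --repositories https://packages.confluent.io/maven/`.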
