Run Spark on a Ray cluster on Vertex AI

The RayDP Python library makes it possible to run Spark on a Ray cluster. This document covers installing, configuring, and running RayDP on a Ray cluster on Vertex AI.

Installation

Ray on Vertex AI lets you run applications using the open source Ray framework, and RayDP provides APIs for running Spark on Ray. The prebuilt container images for creating a Ray cluster on Vertex AI don't come with RayDP pre-installed, so you must build a custom container image that includes RayDP and use it for your cluster. The following section explains how to build that image.

Build a Ray on Vertex AI custom container image

Use the following Dockerfile to create a custom container image for Ray on Vertex AI that has RayDP installed.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

You can use the latest Ray cluster on Vertex AI prebuilt image as the base for the RayDP custom image. You can also install other Python packages that you expect to use in your applications. The pyarrow==14.0 pin is due to a dependency constraint of Ray 2.42.0.

Build and push the custom container image

Create a Docker repository in Artifact Registry before you build your custom image (see Work with container images for how to create and configure your Docker repository). After you create the Docker repository, build and push the custom container image using the Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Where:

  • LOCATION: The location (for example, us-central1) of the Docker repository that you created in Artifact Registry.
  • PROJECT_ID: Your Google Cloud project ID.
  • DOCKER_REPOSITORY: The name of the Docker repository that you created.
  • IMAGE_NAME: The name of your custom container image.
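
The image URI in both commands follows a fixed pattern, so it can help to assemble it once and reuse the same string later when you create the cluster. The following is a minimal sketch of that pattern. The helper name build_image_uri and the example values are illustrative assumptions, not part of any Google Cloud library.

```python
def build_image_uri(location: str, project_id: str, repository: str, image_name: str) -> str:
    """Compose an Artifact Registry Docker image URI from its four parts.

    Hypothetical helper: it only mirrors the pattern used by the
    docker build and docker push commands in this document.
    """
    parts = {
        "location": location,
        "project_id": project_id,
        "repository": repository,
        "image_name": image_name,
    }
    for name, value in parts.items():
        if not value:
            raise ValueError(f"{name} must not be empty")
    return f"{location}-docker.pkg.dev/{project_id}/{repository}/{image_name}"


# Example values are placeholders, not real resources.
print(build_image_uri("us-central1", "my-project", "my-repo", "raydp-image"))
```

The printed string is the value to pass to docker build -t and docker push, and later to the custom_image argument.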

Create a Ray cluster on Vertex AI

Use the custom container image built in the previous step to create a Ray cluster on Vertex AI. You can create the cluster with the Vertex AI SDK for Python.

If you haven't done so yet, install the required Python libraries.

pip install --quiet "google-cloud-aiplatform[ray]" \
             "ray[all]==2.9.3"

Configure the head and worker nodes and create the cluster using the Vertex AI SDK for Python. For example:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Where:

  • CUSTOM_CONTAINER_IMAGE_URI: The URI of the custom container image pushed to Artifact Registry.
  • CLUSTER_NAME: The name of your Ray cluster on Vertex AI.
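
When you choose machine types and node counts, it can help to estimate how many vCPUs the cluster offers, because the Spark executors that RayDP starts later request CPUs from Ray. The sketch below is a rough sizing aid, not part of the Vertex AI SDK. It assumes machine types whose names end in the vCPU count (as n1-standard-8 does), it ignores any CPUs that Ray or RayDP use for their own processes, and all helper names are hypothetical.

```python
import re


def vcpus_for(machine_type: str) -> int:
    """Infer the vCPU count from a machine type name such as 'n1-standard-8'."""
    match = re.search(r"-(\d+)$", machine_type)
    if match is None:
        raise ValueError(f"Cannot infer vCPU count from {machine_type!r}")
    return int(match.group(1))


def total_vcpus(node_pools) -> int:
    """Sum vCPUs over (machine_type, node_count) pairs."""
    return sum(vcpus_for(machine_type) * count for machine_type, count in node_pools)


def executors_fit(num_executors: int, executor_cores: int, node_pools) -> bool:
    """Check that the requested executor cores don't exceed the pool's vCPUs."""
    return num_executors * executor_cores <= total_vcpus(node_pools)


# The worker pool from the example above: two n1-standard-8 nodes.
worker_pools = [("n1-standard-8", 2)]
print(total_vcpus(worker_pools))          # 16
print(executors_fit(2, 4, worker_pools))  # True
```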

Spark on Ray cluster on Vertex AI

Before you run your Spark application, create a Spark session using the RayDP API. You can do this interactively with the Ray Client, or you can use the Ray Job API. The Ray Job API is recommended, especially for production and long-running applications. The RayDP API provides parameters to configure the Spark session and also accepts Spark configuration properties. To learn more about the RayDP API for creating a Spark session, see Spark master actors node affinity.

RayDP with Ray client

To create a Spark session through the Ray Client, you must create it inside a Ray task or actor that runs on the Ray cluster on Vertex AI. The following code shows how a Ray actor can create a Spark session, run a Spark application, and stop the Spark cluster on a Ray cluster on Vertex AI using RayDP.

To interactively connect to the Ray cluster on Vertex AI, see Connect to a Ray cluster through Ray Client.

import ray

# Assumes you're already connected to the cluster through Ray Client.
@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())
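
The get_data method returns the rows as a list of JSON strings, because df.toJSON().collect() serializes each row to one JSON document. A minimal sketch of turning that result back into Python dictionaries on the client side follows. The sample strings stand in for the values the actor returns, and the helper name rows_from_json is hypothetical.

```python
import json


def rows_from_json(json_rows):
    """Parse a list of JSON strings (one per row) into dictionaries."""
    return [json.loads(row) for row in json_rows]


# Stand-in for the list returned by ray.get(s.get_data.remote()).
sample = ['{"first_name":"sue","age":32}', '{"first_name":"li","age":3}']

rows = rows_from_json(sample)
adults = [row["first_name"] for row in rows if row["age"] >= 18]
print(adults)  # ['sue']
```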

RayDP with Ray Job API

The Ray Client is useful for small experiments that require an interactive connection with the Ray cluster. The Ray Job API is the recommended way to run long-running and production jobs on a Ray cluster. This also applies to running Spark applications on the Ray cluster on Vertex AI.

Create a Python script that contains your Spark application code. For example:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
        app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Submit the job to run the Python script using the Ray Job API. For example:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the Python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Where:

  • RAY_ADDRESS: The address of your Ray cluster on Vertex AI that the Ray Job API connects to.
  • SCRIPT_NAME: The filename of the script that you created.
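
After you submit the job, you typically wait for it to finish before you read its logs. The following sketch polls the job status until it reaches a terminal state. It relies only on the get_job_status method of the client. The helper name wait_for_job, the polling interval, and the timeout are assumptions for illustration.

```python
import time

# Terminal states reported by the Ray Job API.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(client, job_id, poll_seconds=5.0, timeout_seconds=600.0):
    """Poll client.get_job_status(job_id) until the job stops running.

    Returns the final status name, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_job_status(job_id)
        # The status is an enum in Ray; plain strings are accepted too.
        name = str(getattr(status, "value", status))
        if name in TERMINAL_STATES:
            return name
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Job {job_id} still {name} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

With the client and job_id from the submission example, you could call wait_for_job(client, job_id) and then print client.get_job_logs(job_id) to inspect the output of the script.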

Read Cloud Storage files from a Spark application

It's common practice to store data files in a Cloud Storage bucket. You can read these files in multiple ways from a Spark application that's running on the Ray cluster on Vertex AI. This section explains two techniques for doing so.

Use the Google Cloud Storage Connector

You can use the Cloud Storage connector for Hadoop to read files from a Cloud Storage bucket from your Spark application. When you create the Spark session using RayDP, you enable the connector with a few configuration parameters. The following code shows how to read a CSV file stored in a Cloud Storage bucket from a Spark application on the Ray cluster on Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)

Where:

  • GCS_FILE_URI: The URI of a file stored in a Cloud Storage bucket. For example: gs://my-bucket/my-file.csv.
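
If several applications need the connector, one option is to keep the three connector settings in a single place and merge in any application-specific settings. The following is a sketch under that assumption. The helper name gcs_spark_configs is hypothetical, and the JAR URL and class names are copied from the example above.

```python
GCS_CONNECTOR_JAR = (
    "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar"
)


def gcs_spark_configs(extra_configs=None, connector_jar=GCS_CONNECTOR_JAR):
    """Return Spark configs that enable the Cloud Storage connector.

    Entries in extra_configs are merged in. Because spark.jars is a
    comma-separated list, extra JARs are appended instead of replacing
    the connector JAR.
    """
    configs = {
        "spark.jars": connector_jar,
        "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
        "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    }
    for key, value in (extra_configs or {}).items():
        if key == "spark.jars":
            configs[key] = f"{configs[key]},{value}"
        else:
            configs[key] = value
    return configs
```

You could then pass configs=gcs_spark_configs() to raydp.init_spark in place of the inline dictionary.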

Use Ray Data

The Cloud Storage connector provides a way to read files from a Cloud Storage bucket, and it may be sufficient for most use cases. You might want to use Ray Data instead when you need Ray's distributed processing for reading data, or when you have trouble reading Cloud Storage files with the connector. Such trouble can arise from Java dependency conflicts when other application dependencies are added to the Spark Java classpath using either spark.jars.packages or spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector JAR and other
# parameters have been added to the Spark configuration.
# spark.read.csv([GCS_FILE_URI], header=True, inferSchema=True)

ray_dataset = ray.data.read_csv([GCS_FILE_URI])
spark_df = ray_dataset.to_spark(spark)
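
If you don't know in advance whether the connector works in your environment, one possible pattern is to try the Spark reader first and fall back to Ray Data when it raises an error. The following sketch assumes that the failure surfaces as an exception when the read is set up. The function name read_csv_as_spark_df and its ray_read_csv parameter are hypothetical and exist only so that the fallback can be swapped out.

```python
def read_csv_as_spark_df(spark, uri, ray_read_csv=None):
    """Read a CSV file into a Spark DataFrame, falling back to Ray Data.

    spark: an active Spark session created with raydp.init_spark.
    uri: for example, gs://my-bucket/my-file.csv.
    ray_read_csv: optional callable with the same role as ray.data.read_csv.
    """
    try:
        return spark.read.csv(uri, header=True, inferSchema=True)
    except Exception as err:  # Broad on purpose: JVM errors vary by cause.
        print(f"Spark reader failed ({err}); falling back to Ray Data")

    if ray_read_csv is None:
        import ray  # Imported lazily so the helper loads without Ray.

        ray_read_csv = ray.data.read_csv
    return ray_read_csv(uri).to_spark(spark)
```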

PySpark Pandas UDF on Ray cluster on Vertex AI

PySpark Pandas UDFs sometimes require additional code when you use them in a Spark application running on a Ray cluster on Vertex AI. This is usually the case when the Pandas UDF uses a Python library that isn't available on the Ray cluster on Vertex AI. You can package the Python dependencies of an application using the runtime environment with the Ray Job API. After you submit the Ray job to the cluster, Ray installs those dependencies in the Python virtual environment that it creates for running the job. The Pandas UDFs, however, don't use the same virtual environment. Instead, they use the default Python system environment. If a dependency isn't available in the system environment, you might need to install it within your Pandas UDF. The following example installs the statsmodels library within the UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

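# The Series -> scalar type hints make this an aggregating Pandas UDF:
# it receives the selected columns and returns a single string.
@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: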
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()
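
Running pip on every UDF invocation adds overhead once the package is already installed. One way to reduce it is to attempt the import first and install only when the import fails. The following sketch shows that pattern with a hypothetical helper named ensure_package, which is not part of RayDP or PySpark.

```python
import importlib
import subprocess
import sys


def ensure_package(module_name, pip_name=None):
    """Import module_name, installing it with pip first if it's missing.

    pip_name is the distribution to install when it differs from the
    module name (for example, module 'statsmodels.api' comes from the
    'statsmodels' distribution).
    """
    try:
        return importlib.import_module(module_name)
    except ImportError:
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", pip_name or module_name]
        )
        # Make the freshly installed package visible to the import system.
        importlib.invalidate_caches()
        return importlib.import_module(module_name)
```

Inside the UDF, a call such as sm = ensure_package("statsmodels.api", "statsmodels") could then replace the unconditional subprocess.check_call and the import that follows it.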