Spark - AWS EMR Serverless Runtime Service

This document describes the integration between ModelOp’s spark-serverless-runtime-service and Amazon EMR Serverless.

Overview

ModelOp Center offers a Spark serverless runtime service that leverages Amazon EMR Serverless to submit and monitor Spark jobs in a scalable, cost-effective, and fully managed environment. The serverless runtime environment removes the need to provision and manage clusters, allowing the Spark serverless runtime service to run big data processing workloads efficiently and on demand.

Architecture

Figure: High-level overview of the Spark runtime service implementation

Components

  • spark-serverless-runtime-service – Submits Spark jobs and monitors their execution

  • Amazon EMR Serverless – Executes Spark jobs

  • Amazon S3 – Stores input data, output data, model assets and other job dependencies

  • CloudWatch – Stores logs from job execution

Flow

Figure: High-level overview of the job flow

PySparkPreprocessorService

  • Translates jobs of type MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, and MODEL_BATCH_TRAINING_JOB into a PySparkJobManifest

    • Creates temporary files, both locally and in S3, that are used during job execution, such as ModelOpPySparkDriver.py, the primary source code, the job metadata, and any additional job files
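The translation step above can be pictured as computing one S3 location per temporary file. The sketch below is illustrative only: ManifestSketch is a hypothetical stand-in for PySparkJobManifest, and the field names, the s3://<bucket>/<prefix>/<job_id>/ layout, and the primary_source.py and job_metadata.json file names are assumptions. It does not upload anything or create the local copies.

```python
from dataclasses import dataclass, field

# Job types that the preprocessor translates, as listed above.
SUPPORTED_JOB_TYPES = {"MODEL_BATCH_JOB", "MODEL_BATCH_TEST_JOB", "MODEL_BATCH_TRAINING_JOB"}


@dataclass
class ManifestSketch:
    """Hypothetical stand-in for PySparkJobManifest; field names are illustrative only."""
    job_id: str
    job_type: str
    driver_uri: str          # S3 copy of ModelOpPySparkDriver.py
    primary_source_uri: str  # S3 copy of the model's primary source code
    metadata_uri: str        # S3 copy of the job metadata
    extra_file_uris: list = field(default_factory=list)  # any additional job files


def build_manifest(job_id, job_type, bucket, prefix, extra_files=()):
    """Compute where each temporary job file would live in S3 (no upload is performed)."""
    if job_type not in SUPPORTED_JOB_TYPES:
        raise ValueError(f"unsupported job type: {job_type}")
    # Hypothetical layout: s3://<bucket>/<prefix>/<job_id>/<file>
    parts = [p for p in (prefix.strip("/"), job_id) if p]
    base = f"s3://{bucket}/" + "/".join(parts)
    return ManifestSketch(
        job_id=job_id,
        job_type=job_type,
        driver_uri=f"{base}/ModelOpPySparkDriver.py",
        primary_source_uri=f"{base}/primary_source.py",
        metadata_uri=f"{base}/job_metadata.json",
        extra_file_uris=[f"{base}/{name}" for name in extra_files],
    )
```

Keeping every file for a job under one per-job prefix makes the final cleanup step (deleting the job's temporary working directory) a single prefix delete.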

SparkServerlessJobScheduler

  • Monitors jobs of type MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, and MODEL_BATCH_TRAINING_JOB that are in the CREATED state and whose runtime type is SPARK_SERVERLESS_RUNTIME

    • Updates job status from CREATED to WAITING

    • Submits job for execution using SparkServerlessJobLauncher

    • Updates job with Spark application ID

  • Monitors jobs of type MODEL_BATCH_JOB, MODEL_BATCH_TEST_JOB, and MODEL_BATCH_TRAINING_JOB that are in the WAITING or RUNNING state and whose runtime type is SPARK_SERVERLESS_RUNTIME

    • Uses Spark application ID to monitor the job and its status on the Spark cluster using SparkServerlessJobMonitor

    • Updates job status based on the latest Spark application status using SparkReportingService

    • Updates job with the logs generated by the Spark cluster using SparkReportingService

    • Updates job with output data

    • Deletes job’s temporary working directory
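The scheduler's two monitoring passes can be sketched as one polling function. The EMR Serverless job-run state names (SUBMITTED, PENDING, SCHEDULED, RUNNING, SUCCESS, FAILED, CANCELLING, CANCELLED) come from the EMR Serverless API; the ModelOp statuses COMPLETE, ERROR, and CANCELLED, the mapping between the two, and the job dictionary keys are assumptions for illustration. The submit and get_state callbacks are hypothetical placeholders for SparkServerlessJobLauncher and SparkServerlessJobMonitor.

```python
# ModelOp job types and runtime type handled by the scheduler (from the text above).
BATCH_JOB_TYPES = {"MODEL_BATCH_JOB", "MODEL_BATCH_TEST_JOB", "MODEL_BATCH_TRAINING_JOB"}
RUNTIME_TYPE = "SPARK_SERVERLESS_RUNTIME"

# Assumed mapping from EMR Serverless job-run states to ModelOp job statuses.
EMR_TO_MODELOP = {
    "SUBMITTED": "WAITING",
    "PENDING": "WAITING",
    "SCHEDULED": "WAITING",
    "RUNNING": "RUNNING",
    "SUCCESS": "COMPLETE",
    "FAILED": "ERROR",
    "CANCELLING": "RUNNING",
    "CANCELLED": "CANCELLED",
}


def is_managed(job):
    """True for the batch job types that run on the Spark serverless runtime."""
    return job["jobType"] in BATCH_JOB_TYPES and job["runtimeType"] == RUNTIME_TYPE


def schedule_tick(jobs, submit, get_state):
    """One polling pass. submit(job) returns an application ID; get_state(app_id) returns an EMR state."""
    for job in jobs:
        if not is_managed(job):
            continue
        if job["status"] == "CREATED":
            # First pass: CREATED -> WAITING, submit, then record the application ID.
            job["status"] = "WAITING"
            job["sparkApplicationId"] = submit(job)
        elif job["status"] in ("WAITING", "RUNNING"):
            # Later passes: translate the latest EMR state; unknown states leave the status unchanged.
            emr_state = get_state(job["sparkApplicationId"])
            job["status"] = EMR_TO_MODELOP.get(emr_state, job["status"])
            # Terminal-state work (fetching logs, recording output data,
            # deleting the temporary working directory) is omitted here.
    return jobs
```

A job that is submitted in one pass is only polled on the next pass, because the CREATED and WAITING/RUNNING branches are mutually exclusive for a given job.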

Configuration

The following configurations must be set for the successful integration of the Spark serverless runtime service with Amazon EMR Serverless:

ModelOp Center Configs

emr-serverless-cluster:
  defaultAppName: <The default app name for the job run when one is not provided as part of the job params>
  releaseVersion: <The desired Amazon EMR Serverless release version>
  region: <The region in which the app and the jobs will be created>
  mainEmrServerlessRoleArn: <The IAM role arn with permissions to call EMR serverless APIs and store logs for CloudWatchLogsClient>
  defaultJobExecutionRoleArn: <The default IAM role arn for the job run when one is not provided as part of the job params>
  jobExecutionConf:
    # The next key, value pairs can be specified in the YAML as shown below or
    # using the monitor wizard in ModelOp Center as job params (please see example below)
    spark.archives: <The S3 location of the Python virtual environment compressed file>
    spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON: <Python directory within the Python virtual environment compressed file>
    spark.emr-serverless.driverEnv.PYSPARK_PYTHON: <Python directory within the Python virtual environment compressed file>
    spark.executorEnv.PYSPARK_PYTHON: <Python directory within the Python virtual environment compressed file>

For the configs above, note that “jobExecutionConf” is a map, and “spark.archives”, “spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON”, and the remaining entries under it are keys of that map.
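Because jobExecutionConf is a flat map of Spark properties, one natural way to hand it to EMR Serverless is as the sparkSubmitParameters string of the StartJobRun jobDriver.sparkSubmit structure, which takes space-separated "--conf key=value" entries. That field is part of the EMR Serverless API, but whether the ModelOp service builds it this way is an assumption; the function names below are hypothetical.

```python
def to_spark_submit_parameters(job_execution_conf):
    """Render a jobExecutionConf-style map as a sparkSubmitParameters string.

    Values containing spaces would need extra handling, which this sketch omits.
    """
    return " ".join(f"--conf {key}={value}" for key, value in job_execution_conf.items())


def build_job_driver(entry_point, job_execution_conf):
    """Shape of the jobDriver argument for EMR Serverless StartJobRun (sparkSubmit variant)."""
    return {
        "sparkSubmit": {
            "entryPoint": entry_point,  # e.g. the S3 copy of ModelOpPySparkDriver.py
            "sparkSubmitParameters": to_spark_submit_parameters(job_execution_conf),
        }
    }
```

Keeping the configuration as a map until the last moment lets individual keys be overridden per job before the string is rendered.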

Example Optional ModelOp Center Job Params

The following key-value pairs are optional job params; note that --conf is part of each key:

  • Key: --conf spark.archives
    Value: s3://my_bucket/my_pyspark_venv.tar.gz#environment

  • Key: --conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON
    Value: ./environment/bin/python

  • Key: --conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON
    Value: ./environment/bin/python

  • Key: --conf spark.executorEnv.PYSPARK_PYTHON
    Value: ./environment/bin/python

  • Key: --conf modelop.serverless.job.app.name
    Value: <The app name for a particular job run>

  • Key: --conf modelop.serverless.job.rolearn
    Value: <The IAM role arn for a particular job run>
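The fallback rules described in the configs (use defaultAppName and defaultJobExecutionRoleArn unless the job params supply modelop.serverless.job.app.name or modelop.serverless.job.rolearn) can be sketched as follows. The function name and the returned dictionary keys are hypothetical, and the rule that a Spark job param overrides the same key from the YAML jobExecutionConf is an assumption, since the text only says either location can be used.

```python
CONF_PREFIX = "--conf "
APP_NAME_KEY = "modelop.serverless.job.app.name"
ROLE_ARN_KEY = "modelop.serverless.job.rolearn"


def resolve_job_settings(job_params, defaults):
    """Split --conf job params into Spark confs and per-run overrides, falling back to defaults."""
    spark_conf = dict(defaults.get("jobExecutionConf", {}))  # copy so defaults stay untouched
    overrides = {}
    for raw_key, value in job_params.items():
        if not raw_key.startswith(CONF_PREFIX):
            continue  # not a --conf job param
        key = raw_key[len(CONF_PREFIX):].strip()
        if key in (APP_NAME_KEY, ROLE_ARN_KEY):
            overrides[key] = value
        else:
            spark_conf[key] = value  # assumption: job param wins over the YAML value
    return {
        "appName": overrides.get(APP_NAME_KEY, defaults["defaultAppName"]),
        "executionRoleArn": overrides.get(ROLE_ARN_KEY, defaults["defaultJobExecutionRoleArn"]),
        "sparkConf": spark_conf,
    }
```

Separating the modelop.* keys keeps them out of the Spark properties, since they select the EMR Serverless application and execution role rather than configure Spark itself.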

Amazon EMR Serverless Configs

The required permissions can be defined as:

  1. One IAM role

  2. Multiple IAM roles

The IAM role(s) must grant access to:

  1. EMR Serverless APIs

  2. PassRole

  3. CloudWatchLogs

  4. The S3 bucket containing the input data

  5. The S3 bucket containing the model assets

    1. These assets are stored in S3 by model-manage, so access is required to the bucket configured for model-manage’s aws-s3-client.

  6. The S3 bucket containing the temporary assets

    1. These assets are stored in S3 by spark-serverless-runtime-service, so access is required to the bucket configured for spark-serverless-runtime-service’s aws-s3-client.
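Since up to three buckets are involved (input data, model-manage's bucket, and the runtime service's bucket), the S3 part of the policy can be generated from their names. In IAM, s3:ListBucket is authorized against the bucket ARN and object actions such as s3:GetObject against object ARNs, which is why each bucket appears twice. The helper names and bucket names are hypothetical; the action list mirrors the example policy for the second role.

```python
def s3_resource_arns(*bucket_names):
    """Bucket-level and object-level ARNs for each distinct bucket, in first-seen order."""
    arns, seen = [], set()
    for name in bucket_names:
        if not name or name in seen:
            continue  # skip empty names and buckets shared by several roles
        seen.add(name)
        arns.append(f"arn:aws:s3:::{name}")    # needed for s3:ListBucket
        arns.append(f"arn:aws:s3:::{name}/*")  # needed for object actions
    return arns


def s3_statement(input_bucket, model_manage_bucket, runtime_service_bucket):
    """One Allow statement covering the input, model asset, and temporary asset buckets."""
    return {
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
        "Resource": s3_resource_arns(input_bucket, model_manage_bucket, runtime_service_bucket),
    }
```

Deduplicating matters when, for example, the input data and the temporary assets share a bucket.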

Example Amazon EMR Serverless Configs

The following example shows how the required permissions can be split between two IAM roles. The second IAM role allows more granular control over S3 buckets:

  1. IAM role with permissions to call EMR Serverless APIs and CloudWatchLogs:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "emr-serverless:ListApplications",
        "emr-serverless:GetApplication",
        "emr-serverless:CreateApplication",
        "emr-serverless:StartApplication",
        "emr-serverless:StopApplication",
        "emr-serverless:StartJobRun",
        "emr-serverless:GetJobRun"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::account:role/<role with permissions to s3 bucket>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "arn:aws:logs:*:*:*"
      ]
    },
    {
      "Sid": "AllowCloudWatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:GetLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

  2. IAM role with permissions to access input and/or output buckets and CloudWatchLogs:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::<S3 BUCKET NAME>",
        "arn:aws:s3:::<S3 BUCKET NAME>/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams"
      ],
      "Resource": [
        "arn:aws:logs:*:*:*"
      ]
    },
    {
      "Sid": "AllowCloudWatchLogs",
      "Effect": "Allow",
      "Action": [
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams",
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:GetLogEvents"
      ],
      "Resource": "*"
    }
  ]
}
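Before deploying a split like the one above, it can help to check that the roles together allow every action the integration needs. The sketch below is a rough coverage check, not an IAM policy evaluator: it only looks at Action patterns in Allow statements, ignores Resource, Condition, and explicit Deny, and does not check which role holds which permission. REQUIRED_ACTIONS is a hypothetical minimal set drawn from the permission list in this section. IAM action names are case-insensitive and support * and ? wildcards, which fnmatchcase approximates after lowercasing.

```python
from fnmatch import fnmatchcase

# Hypothetical minimal set of actions, based on the required access listed above.
REQUIRED_ACTIONS = [
    "emr-serverless:StartJobRun",
    "emr-serverless:GetJobRun",
    "iam:PassRole",
    "logs:GetLogEvents",
    "s3:GetObject",
    "s3:PutObject",
]


def allowed_actions(policy):
    """Collect action patterns from Allow statements (Resource and Condition are ignored)."""
    actions = []
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        action = stmt.get("Action", [])
        actions.extend([action] if isinstance(action, str) else action)
    return actions


def missing_actions(required, *policies):
    """Required actions not matched by any Allow pattern across the given policies."""
    patterns = [p.lower() for policy in policies for p in allowed_actions(policy)]
    return [a for a in required if not any(fnmatchcase(a.lower(), p) for p in patterns)]
```

Passing both example policies should yield an empty list; passing only the first role reports the S3 and log-reading actions that the second role is meant to supply.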