OCI Data Flow: Managed Spark

Run serverless Apache Spark jobs on OCI Data Flow with applications, runs, Delta Lake, data sources, performance tuning, and pipeline orchestration.

CloudToolStack Team | 24 min read | Published Mar 14, 2026

Prerequisites

  • Basic understanding of Apache Spark and PySpark
  • OCI account with Data Flow and Object Storage permissions

Introduction to OCI Data Flow

OCI Data Flow is a fully managed Apache Spark service that lets you run Spark applications without provisioning clusters or managing infrastructure. You submit PySpark, Scala, or Java Spark applications, and Data Flow provisions the compute resources, executes the job, and tears down the infrastructure when the job completes. This serverless model removes the operational overhead of running Spark clusters while keeping the full Spark feature set for batch processing, ETL, data engineering, and machine learning workloads.

Data Flow integrates natively with OCI Object Storage for reading and writing data, OCI Data Catalog for metadata management, Oracle Autonomous Database for structured data sources and sinks, and OCI Logging for job monitoring. It supports Spark 3.x with Delta Lake, Iceberg, and other popular Spark extensions.

This guide covers creating Data Flow applications, configuring and submitting runs, connecting to various data sources, performance tuning, cost optimization, and production pipeline patterns.

Data Flow Pricing

OCI Data Flow charges only for the compute resources consumed during job execution, billed per VM per hour. There are no charges for idle time, cluster management, or the Data Flow service itself. Driver and executor VMs use standard OCI compute shapes, with pricing varying by the shape selected. Object Storage I/O is priced separately.
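
Because billing follows run time and VM count, a rough estimate for one run is simple arithmetic. The sketch below assumes the per-VM hourly model described above. The function name `estimate_run_cost` and both hourly rates are placeholders invented for illustration, not real OCI prices, and the calculation ignores any billing granularity or rounding the service may apply.

```python
def estimate_run_cost(run_duration_ms: int, num_executors: int,
                      driver_rate_per_hour: float,
                      executor_rate_per_hour: float) -> float:
    """Estimate compute cost for one run: one driver plus N executors.

    The hourly rates are inputs, not real OCI prices; look up the actual
    rate for your shape and region before relying on the result.
    """
    hours = run_duration_ms / 3_600_000  # milliseconds -> hours
    hourly_total = driver_rate_per_hour + num_executors * executor_rate_per_hour
    return hours * hourly_total


# Placeholder rates (NOT real prices): driver 0.10/h, each executor 0.20/h.
cost = estimate_run_cost(run_duration_ms=1_800_000, num_executors=4,
                         driver_rate_per_hour=0.10, executor_rate_per_hour=0.20)
print(f"estimated cost: {cost:.2f}")
```

The duration input is in milliseconds so that it can take the `run-duration-in-milliseconds` value a run reports.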

Creating Data Flow Applications

A Data Flow application is a reusable definition of a Spark job. It specifies the Spark application code (stored in Object Storage), the Spark version, the compute shape and count for driver and executors, configuration parameters, and access permissions. You can create an application once and run it multiple times with different parameters.

Applications reference a Spark script or JAR file stored in OCI Object Storage. For PySpark applications, this is a Python file. For Scala/Java applications, it is a JAR file with the main class specified separately.
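
The commands in this guide refer to objects as `oci://<bucket>@<namespace>/<path>`, and swapping the bucket and namespace is an easy mistake. As a minimal sketch, the URI can be built and split with the standard library. The helper names `oci_uri` and `parse_oci_uri` and the namespace `mytenancy` are made up for this example and are not part of any OCI SDK.

```python
from typing import Tuple
from urllib.parse import urlsplit


def oci_uri(bucket: str, namespace: str, object_name: str = "") -> str:
    """Build an oci://<bucket>@<namespace>/<object> URI (hypothetical helper)."""
    return f"oci://{bucket}@{namespace}/{object_name.lstrip('/')}"


def parse_oci_uri(uri: str) -> Tuple[str, str, str]:
    """Split an oci:// URI into (bucket, namespace, object_name)."""
    parts = urlsplit(uri)
    if parts.scheme != "oci" or "@" not in parts.netloc:
        raise ValueError(f"not an oci://<bucket>@<namespace>/ URI: {uri!r}")
    bucket, namespace = parts.netloc.split("@", 1)
    return bucket, namespace, parts.path.lstrip("/")


# "mytenancy" is a placeholder namespace, not a real one.
app_uri = oci_uri("data-flow-apps", "mytenancy", "apps/etl_pipeline.py")
print(app_uri)
print(parse_oci_uri(app_uri))
```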

bash
# Upload your PySpark application to Object Storage
oci os object put \
  --namespace $NAMESPACE \
  --bucket-name "data-flow-apps" \
  --file etl_pipeline.py \
  --name "apps/etl_pipeline.py"

# Create a Data Flow application
oci data-flow application create \
  --compartment-id $C \
  --display-name "daily-etl-pipeline" \
  --language "PYTHON" \
  --spark-version "3.5.0" \
  --driver-shape "VM.Standard.E4.Flex" \
  --driver-shape-config '{"ocpus": 2, "memoryInGBs": 16}' \
  --executor-shape "VM.Standard.E4.Flex" \
  --executor-shape-config '{"ocpus": 4, "memoryInGBs": 32}' \
  --num-executors 4 \
  --file-uri "oci://<bucket>@<namespace>/apps/etl_pipeline.py" \
  --warehouse-bucket-uri "oci://<warehouse-bucket>@<namespace>/" \
  --logs-bucket-uri "oci://<logs-bucket>@<namespace>/" \
  --arguments '["--input", "oci://<bucket>@<namespace>/raw-data/", "--output", "oci://<bucket>@<namespace>/processed/"]'

# Create a Scala/Java application
oci data-flow application create \
  --compartment-id $C \
  --display-name "spark-etl-scala" \
  --language "SCALA" \
  --spark-version "3.5.0" \
  --driver-shape "VM.Standard.E4.Flex" \
  --driver-shape-config '{"ocpus": 2, "memoryInGBs": 16}' \
  --executor-shape "VM.Standard.E4.Flex" \
  --executor-shape-config '{"ocpus": 4, "memoryInGBs": 32}' \
  --num-executors 8 \
  --file-uri "oci://<bucket>@<namespace>/jars/etl-pipeline-1.0.jar" \
  --class-name "com.example.ETLPipeline" \
  --logs-bucket-uri "oci://<logs-bucket>@<namespace>/"

# List applications
oci data-flow application list \
  --compartment-id $C \
  --query 'data[].{"display-name":"display-name", language:language, "spark-version":"spark-version", "lifecycle-state":"lifecycle-state"}' \
  --output table

Submitting and Managing Runs

A run is a single execution of a Data Flow application. When you submit a run, Data Flow provisions the driver and executor VMs, initializes the Spark cluster, executes your application, and then automatically terminates all resources. Each run has its own logs, Spark UI, and resource consumption tracking.

You can submit runs using the OCI CLI, SDK, Console, or REST API. Runs can override application-level settings such as the number of executors, arguments, and configuration parameters. This allows you to use the same application definition with different data sets or resource allocations.
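
Since each run can carry its own arguments, it helps to derive them from the run date rather than typing paths by hand. The sketch below is one possible way to do that. The function `run_arguments`, the bucket and namespace values, and the `raw-data/<date>/` and `processed/<date>/` layout are assumptions for illustration.

```python
import json
from datetime import date
from typing import List


def run_arguments(bucket: str, namespace: str, day: date) -> List[str]:
    """Return the argument list for one daily run (hypothetical layout)."""
    base = f"oci://{bucket}@{namespace}"
    stamp = day.isoformat()  # ISO date, for example 2026-03-14
    return ["--input", f"{base}/raw-data/{stamp}/",
            "--output", f"{base}/processed/{stamp}/"]


run_day = date(2026, 3, 14)
args = run_arguments("my-bucket", "mytenancy", run_day)
# json.dumps yields a JSON array string suitable for a CLI --arguments value
print(json.dumps(args))
print(f"daily-etl-{run_day.isoformat()}")
```

Deriving both the paths and the display name from the same date keeps a re-run of a given day pointed at the same output location.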

bash
# Submit a run using the application definition
oci data-flow run create \
  --compartment-id $C \
  --application-id <application-ocid> \
  --display-name "daily-etl-2026-03-14" \
  --arguments '["--input", "oci://<bucket>@<namespace>/raw-data/2026-03-14/", "--output", "oci://<bucket>@<namespace>/processed/2026-03-14/"]'

# Submit a run with overridden executor count
oci data-flow run create \
  --compartment-id $C \
  --application-id <application-ocid> \
  --display-name "large-backfill" \
  --num-executors 16 \
  --arguments '["--input", "oci://<bucket>@<namespace>/raw-data/", "--output", "oci://<bucket>@<namespace>/backfill/", "--date-range", "2026-01-01:2026-03-14"]'

# Monitor run status
oci data-flow run get \
  --run-id <run-ocid> \
  --query 'data.{"lifecycle-state":"lifecycle-state", "data-read-in-bytes":"data-read-in-bytes", "data-written-in-bytes":"data-written-in-bytes", "run-duration-in-milliseconds":"run-duration-in-milliseconds"}'

# List recent runs
oci data-flow run list \
  --compartment-id $C \
  --query 'data[].{"display-name":"display-name", "lifecycle-state":"lifecycle-state", "run-duration-ms":"run-duration-in-milliseconds", "time-created":"time-created"}' \
  --output table \
  --sort-by timeCreated \
  --sort-order DESC

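# Get run logs (--file - writes the log to stdout; log names vary by run,
# so list them first with: oci data-flow run-log list --run-id <run-ocid>)
oci data-flow run get-log \
  --run-id <run-ocid> \
  --name "spark_driver_stdout" \
  --file -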

# Cancel a running job
oci data-flow run cancel \
  --run-id <run-ocid>

Use the Spark UI for Debugging

Each Data Flow run generates a Spark UI that you can access from the OCI Console. The Spark UI shows detailed information about stages, tasks, executors, storage, and SQL queries. It is invaluable for understanding job performance, identifying data skew, and diagnosing failures. The Spark UI is available for 7 days after the run completes.

Connecting to Data Sources

Data Flow applications can read from and write to multiple data sources. The most common source is OCI Object Storage, which supports Parquet, ORC, JSON, CSV, Delta Lake, and Apache Iceberg formats. Additionally, Data Flow can connect to Oracle Autonomous Database, MySQL HeatWave, and other JDBC-compatible databases.

For Object Storage access, Data Flow uses resource principal authentication by default, meaning no credentials need to be stored in your application code. For database connections, use OCI Vault to securely store connection strings and passwords.

bash
# Example PySpark application reading from Object Storage
# etl_pipeline.py:
#
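# import argparse
# from pyspark.sql import SparkSession
#
# def main():
#     # Arguments arrive as flags ("--input <path> --output <path>"), so parse
#     # them by name; sys.argv[1] would be the literal string "--input".
#     parser = argparse.ArgumentParser()
#     parser.add_argument("--input", required=True)
#     parser.add_argument("--output", required=True)
#     args, _ = parser.parse_known_args()  # ignore extra flags such as --date-range
#
#     spark = SparkSession.builder \
#         .appName("DailyETL") \
#         .getOrCreate()
#
#     input_path = args.input
#     output_path = args.output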
#
#     # Read JSON data from Object Storage
#     df = spark.read.json(input_path)
#
#     # Transform: filter, aggregate, enrich
#     result = df.filter(df.status == "active") \
#         .groupBy("region", "category") \
#         .agg({"amount": "sum", "id": "count"}) \
#         .withColumnRenamed("sum(amount)", "total_amount") \
#         .withColumnRenamed("count(id)", "record_count")
#
#     # Write as Parquet to Object Storage
#     result.write \
#         .mode("overwrite") \
#         .partitionBy("region") \
#         .parquet(output_path)
#
#     spark.stop()
#
# if __name__ == "__main__":
#     main()

# Example: Reading from Autonomous Database
# jdbc_url = "jdbc:oracle:thin:@<adb-name>_high?TNS_ADMIN=/opt/spark/work-dir/wallet"
# df = spark.read \
#     .format("jdbc") \
#     .option("url", jdbc_url) \
#     .option("dbtable", "SALES") \
#     .option("user", "ADMIN") \
#     .option("password", secret_value) \
#     .load()

# Upload dependent libraries
oci os object put \
  --namespace $NAMESPACE \
  --bucket-name "data-flow-apps" \
  --file oracle-jdbc-driver.jar \
  --name "libs/oracle-jdbc-driver.jar"

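# Reference packaged dependencies in the application definition.
# --archive-uri expects a zip archive of dependencies rather than a folder:
# --archive-uri "oci://<bucket>@<namespace>/libs/archive.zip"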

Performance Tuning

Optimizing Data Flow performance involves tuning Spark configurations, choosing the right compute shapes, managing data partitioning, and minimizing shuffle operations. Here are the most impactful tuning strategies for OCI Data Flow:

Right-Size Executors: Choose executor shapes based on your workload profile. CPU-intensive workloads (complex transformations, ML) benefit from higher OCPU counts. Memory-intensive workloads (large joins, caching) benefit from higher memory ratios. Use flex shapes to customize the OCPU-to-memory ratio.

Optimize Parallelism: The number of partitions determines the parallelism of your Spark job. Too few partitions cause underutilization; too many cause excessive overhead. A good starting point is 2-4 partitions per executor OCPU.

Use Columnar Formats: Parquet and ORC provide significantly better performance than CSV or JSON due to columnar compression, predicate pushdown, and column pruning. Convert raw data to Parquet early in your pipeline.

Minimize Shuffles: Operations like groupBy(), join(), and repartition() trigger data shuffles, which are among the most expensive operations in Spark. Use broadcast joins for small lookup tables and pre-partition data by common join keys.
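
The "2-4 partitions per executor OCPU" starting point from the parallelism advice above works out as follows. This is a small arithmetic sketch, and `partition_range` is a name chosen for this example.

```python
from typing import Tuple


def partition_range(num_executors: int, ocpus_per_executor: int,
                    low: int = 2, high: int = 4) -> Tuple[int, int]:
    """Return (min, max) partition counts for 2-4 partitions per executor OCPU."""
    total_ocpus = num_executors * ocpus_per_executor
    return total_ocpus * low, total_ocpus * high


# 8 executors with 4 OCPUs each -> 32 OCPUs in total
print(partition_range(8, 4))  # (64, 128)
```

Treat the result as a first guess for settings such as `spark.sql.shuffle.partitions` and adjust it after looking at task durations in the Spark UI.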

bash
# Configure Spark properties for performance
oci data-flow application create \
  --compartment-id $C \
  --display-name "tuned-etl" \
  --language "PYTHON" \
  --spark-version "3.5.0" \
  --driver-shape "VM.Standard.E4.Flex" \
  --driver-shape-config '{"ocpus": 2, "memoryInGBs": 32}' \
  --executor-shape "VM.Standard.E4.Flex" \
  --executor-shape-config '{"ocpus": 4, "memoryInGBs": 64}' \
  --num-executors 8 \
  --file-uri "oci://<bucket>@<namespace>/apps/tuned_etl.py" \
  --logs-bucket-uri "oci://<logs-bucket>@<namespace>/" \
  --configuration '{
    "spark.sql.shuffle.partitions": "200",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.parquet.compression.codec": "zstd",
    "spark.dynamicAllocation.enabled": "false"
  }'

# Performance monitoring metrics
oci monitoring metric-data summarize-metrics-data \
  --compartment-id $C \
  --namespace "oci_dataflow" \
  --query-text 'RunDurationMs[1h].mean()'

oci monitoring metric-data summarize-metrics-data \
  --compartment-id $C \
  --namespace "oci_dataflow" \
  --query-text 'DataReadBytes[1h].sum()'
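
The `--configuration` value in the tuned application above is a JSON object in which every value is a string, including numbers and booleans. When the properties live in Python code, a small helper can produce that string consistently. This is a sketch only, and `spark_configuration` is a hypothetical name.

```python
import json
from typing import Any, Dict


def spark_configuration(props: Dict[str, Any]) -> str:
    """Serialize Spark properties to JSON with every value as a string."""
    def as_str(value: Any) -> str:
        # Booleans become lowercase "true"/"false", as in the example above
        if isinstance(value, bool):
            return "true" if value else "false"
        return str(value)
    return json.dumps({key: as_str(val) for key, val in props.items()}, indent=2)


config_json = spark_configuration({
    "spark.sql.shuffle.partitions": 200,
    "spark.sql.adaptive.enabled": True,
    "spark.dynamicAllocation.enabled": False,
})
print(config_json)
```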

Delta Lake and Iceberg Support

OCI Data Flow supports both Delta Lake and Apache Iceberg table formats, enabling ACID transactions, schema evolution, time travel, and incremental processing on OCI Object Storage. These lakehouse formats bridge the gap between data lake flexibility and data warehouse reliability.

Delta Lake provides ACID transactions for Object Storage, allowing concurrent reads and writes without data corruption. Time travel lets you query previous versions of your data, making it easy to audit changes, roll back errors, and reproduce historical analyses.

bash
# PySpark with Delta Lake:
# spark = SparkSession.builder \
#     .appName("DeltaETL") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .getOrCreate()
#
# # Write Delta table
# df.write \
#     .format("delta") \
#     .mode("overwrite") \
#     .save("oci://<bucket>@<namespace>/delta/sales")
#
# # Read Delta table
# sales = spark.read.format("delta").load("oci://<bucket>@<namespace>/delta/sales")
#
# # Time travel - read previous version
# sales_v0 = spark.read.format("delta") \
#     .option("versionAsOf", 0) \
#     .load("oci://<bucket>@<namespace>/delta/sales")
#
# # Upsert (merge) into Delta table
# from delta.tables import DeltaTable
# delta_table = DeltaTable.forPath(spark, "oci://<bucket>@<namespace>/delta/sales")
# delta_table.alias("target").merge(
#     updates.alias("source"),
#     "target.id = source.id"
# ).whenMatchedUpdateAll() \
#  .whenNotMatchedInsertAll() \
#  .execute()

# Create application with Delta Lake support
oci data-flow application create \
  --compartment-id $C \
  --display-name "delta-lake-etl" \
  --language "PYTHON" \
  --spark-version "3.5.0" \
  --driver-shape "VM.Standard.E4.Flex" \
  --driver-shape-config '{"ocpus": 2, "memoryInGBs": 16}' \
  --executor-shape "VM.Standard.E4.Flex" \
  --executor-shape-config '{"ocpus": 4, "memoryInGBs": 32}' \
  --num-executors 4 \
  --file-uri "oci://<bucket>@<namespace>/apps/delta_etl.py" \
  --logs-bucket-uri "oci://<logs-bucket>@<namespace>/" \
  --configuration '{
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }'

Scheduling and Pipeline Orchestration

Data Flow does not include a built-in scheduler, but it integrates with multiple OCI and third-party orchestration tools for production pipeline scheduling:

OCI Resource Scheduler: Schedule Data Flow runs on a cron-like schedule directly from the OCI Console without any additional infrastructure.

OCI Functions + Events: Trigger Data Flow runs in response to events, such as new files uploaded to Object Storage. An OCI Function receives the event and submits a Data Flow run using the SDK.

Apache Airflow: Use the OCI provider for Apache Airflow to orchestrate complex multi-step data pipelines that include Data Flow jobs alongside database operations, API calls, and data quality checks.
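
For the Functions + Events pattern, the function has to turn the incoming event into a run name. The sketch below isolates that step so it can be tested without any OCI services. It assumes the event body carries the object name under `data.resourceName`. The function name and the character clean-up are choices made for this example, not documented requirements.

```python
import json
import re


def display_name_from_event(raw_event: bytes, prefix: str = "triggered-run") -> str:
    """Build a run display name from an event's data.resourceName field."""
    event = json.loads(raw_event)
    resource_name = event["data"]["resourceName"]
    # Replace characters other than letters, digits, dot, dash and underscore
    # so object paths such as "raw/2026/file.json" yield a tidy name.
    safe = re.sub(r"[^A-Za-z0-9._-]+", "-", resource_name).strip("-")
    return f"{prefix}-{safe}"


# Sample payload, trimmed to the one field this sketch reads.
sample = json.dumps({"data": {"resourceName": "raw-data/2026-03-14/orders.json"}}).encode()
print(display_name_from_event(sample))  # triggered-run-raw-data-2026-03-14-orders.json
```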

bash
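# Schedule a Data Flow run using OCI CLI (in a cron job)
# Example crontab entry to run daily at 2 AM. A crontab entry must be a single line,
# "%" must be escaped as "\%", and shell variables such as $C are not set by cron,
# so the compartment OCID is written out. Redirect stdout first, then stderr:
# 0 2 * * * /usr/local/bin/oci data-flow run create --compartment-id <compartment-ocid> --application-id <application-ocid> --display-name "daily-etl-$(date +\%Y-\%m-\%d)" >> /var/log/data-flow-cron.log 2>&1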

# Trigger Data Flow from an OCI Function (Python):
# import oci, json
# def handler(ctx, data):
#     signer = oci.auth.signers.get_resource_principals_signer()
#     df_client = oci.data_flow.DataFlowClient({}, signer=signer)
#
#     run = df_client.create_run(
#         oci.data_flow.models.CreateRunDetails(
#             compartment_id="<compartment-ocid>",
#             application_id="<application-ocid>",
#             display_name=f"triggered-run-{json.loads(data.getvalue())['data']['resourceName']}"
#         )
#     )
#     return {"run_id": run.data.id}

# Check if a run completed successfully
STATUS=$(oci data-flow run get \
  --run-id <run-ocid> \
  --query 'data."lifecycle-state"' \
  --raw-output)

if [ "$STATUS" = "SUCCEEDED" ]; then
  echo "Run completed successfully"
elif [ "$STATUS" = "FAILED" ]; then
  echo "Run failed - check logs"
  oci data-flow run get-log --run-id <run-ocid> --name "spark_driver_stderr" --file -
fi
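
The shell check above inspects the state once, so a run that is still executing falls through both branches. An orchestration script usually polls until the run reaches a terminal state. The sketch below shows that loop with the state lookup passed in as a callable, so it can wrap a CLI call or an SDK call. The name `wait_for_run`, the `CANCELED` terminal state, and the intermediate states in the demo are assumptions.

```python
import time
from typing import Callable

# Terminal lifecycle states checked by this sketch; SUCCEEDED and FAILED appear
# in the shell example above, CANCELED is assumed.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELED"}


def wait_for_run(get_state: Callable[[], str], poll_seconds: float = 30.0,
                 timeout_seconds: float = 3600.0,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state() until a terminal state is returned or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run still in state {state!r} after {timeout_seconds}s")
        sleep(poll_seconds)


# Demo with a canned sequence instead of a real API call.
states = iter(["ACCEPTED", "IN_PROGRESS", "IN_PROGRESS", "SUCCEEDED"])
print(wait_for_run(lambda: next(states), sleep=lambda _: None))  # SUCCEEDED
```

Passing `sleep` as a parameter keeps the loop testable without real waiting.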

Production Best Practices

Running Data Flow in production requires attention to reliability, cost management, and operational visibility. Follow these best practices:

Idempotent Jobs: Design your Spark applications to be idempotent so they can be safely retried without producing duplicate or incorrect results. Use mode("overwrite") for full refreshes or Delta Lake merge for incremental updates.

Input Validation: Validate input data at the beginning of your Spark application before performing expensive transformations. Check for expected schema, null values, and data quality issues. Fail fast with clear error messages rather than producing incorrect output.

Cost Optimization: Use the smallest executor shape and count that meets your performance requirements. Enable Spark AQE (Adaptive Query Execution) to dynamically optimize shuffle partitions and join strategies. Monitor job durations and compare against budgets using OCI cost tracking tags.

Error Handling: Implement proper error handling in your Spark code with try/except blocks. Write failed records to a dead-letter location rather than failing the entire job. Use Data Flow run logs and the Spark UI to diagnose failures.

Version Control: Store your Spark application code in Git and deploy versioned artifacts to Object Storage. Include the version in the application display name and use tags to track which code version produced each output dataset.
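
The input validation and dead-letter advice above can be illustrated without Spark. The sketch below uses plain Python records to show the logic. In a real job the same split would typically be written as DataFrame filters. The schema in `EXPECTED_FIELDS` and the function `split_valid` are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Tuple

# Hypothetical schema: field name -> expected Python type.
EXPECTED_FIELDS = {"id": int, "region": str, "amount": float}


def split_valid(records: Iterable[Dict[str, Any]]
                ) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Separate records matching EXPECTED_FIELDS from those that do not."""
    valid, rejected = [], []
    for record in records:
        problems = [name for name, kind in EXPECTED_FIELDS.items()
                    if not isinstance(record.get(name), kind)]
        if problems:
            # Keep the reason alongside the record for the dead-letter output
            rejected.append({"record": record, "bad_fields": problems})
        else:
            valid.append(record)
    return valid, rejected


rows = [
    {"id": 1, "region": "emea", "amount": 10.5},
    {"id": 2, "region": None, "amount": 3.0},   # null region
    {"id": "3", "region": "apac"},              # wrong type, missing amount
]
good, bad = split_valid(rows)
print(len(good), len(bad))  # 1 2
print(bad[1]["bad_fields"])  # ['id', 'amount']
```

Recording which fields failed makes the dead-letter output useful for diagnosis instead of being only a pile of rejected rows.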

Key Takeaways

  1. Data Flow is fully serverless: no cluster management, pay only for compute during job execution.
  2. Applications are reusable job definitions that can be run multiple times with different parameters.
  3. Delta Lake and Iceberg support enables ACID transactions and time travel on Object Storage.
  4. Spark AQE (Adaptive Query Execution) dynamically optimizes shuffle partitions and join strategies.

Frequently Asked Questions

How does OCI Data Flow compare to AWS EMR?
OCI Data Flow is fully serverless with no cluster management, while EMR requires cluster provisioning and management (unless using EMR Serverless). Data Flow charges only for compute during job execution with no idle cluster costs. EMR offers more customization including support for Hadoop, Hive, and Presto alongside Spark. Data Flow integrates natively with OCI Object Storage, Autonomous Database, and Data Catalog.

What Spark versions does Data Flow support?
OCI Data Flow supports Apache Spark 3.x including Spark 3.2, 3.3, and 3.5. The service includes built-in support for Delta Lake, Apache Iceberg, and common Spark extensions. PySpark, Scala, and Java applications are all supported. The Spark version is specified per application and can be updated without changing application code.

Written by CloudToolStack Team

Cloud engineers and architects with hands-on experience across AWS, Azure, and GCP. We write guides based on real-world production patterns, not just documentation rewrites.

Disclaimer: This guide is for educational purposes. Cloud services change frequently; always refer to official documentation for the latest information. AWS, Azure, and GCP are trademarks of their respective owners.