OCI Streaming & Events
Build event-driven architectures with OCI Streaming (Kafka-compatible), Events service, and Notifications for real-time data processing.
Prerequisites
- OCI account with Streaming and Events permissions
- Basic understanding of messaging and event-driven patterns
Introduction to OCI Streaming and Events
Modern cloud applications require real-time data ingestion, event-driven architectures, and reliable message delivery between services. Oracle Cloud Infrastructure provides three complementary services for these patterns: OCI Streaming for high-throughput data streaming, OCI Events for resource change notifications, and OCI Notifications for fan-out message delivery.
OCI Streaming is a fully managed, Apache Kafka-compatible service for ingesting and processing continuous data streams. It handles millions of messages per second with configurable retention periods, and because it speaks the Kafka protocol, existing Kafka producers and consumers work without code changes. OCI Events monitors OCI resource changes and emits structured events when actions occur, enabling automated responses to infrastructure changes. OCI Notifications provides pub/sub message delivery to email, PagerDuty, Slack (via Functions), and HTTPS endpoints.
This guide covers all three services in depth: creating and configuring streams, producing and consuming messages with both the OCI SDK and Kafka clients, setting up event rules to capture resource changes, configuring notification topics and subscriptions, and building event-driven architectures that chain these services together with OCI Functions.
Kafka Compatibility
OCI Streaming is compatible with Apache Kafka 2.x and 3.x client libraries. You can use any Kafka producer or consumer library (Java, Python, Go, Node.js) by pointing it at the OCI Streaming endpoint with SASL/PLAIN authentication. This means existing Kafka applications can migrate to OCI Streaming with minimal configuration changes. However, some Kafka admin operations like topic creation and consumer group management use OCI APIs rather than the Kafka admin protocol.
OCI Streaming Fundamentals
OCI Streaming organizes data into streams, which are equivalent to Kafka topics. Each stream has one or more partitions that provide parallel processing capability. Messages within a partition are strictly ordered by offset, but ordering across partitions is not guaranteed. You choose the number of partitions when creating a stream based on your throughput requirements.
Streams belong to stream pools, which are management containers that define encryption, networking, and Kafka endpoint settings. A stream pool provides a single Kafka bootstrap server endpoint that all streams within the pool share. You can create multiple stream pools for different environments or security boundaries.
Creating a Stream Pool and Stream
# Create a stream pool with default settings
oci streaming admin stream-pool create \
--compartment-id <compartment-ocid> \
--name "production-events" \
--kafka-settings '{
"autoCreateTopicsEnable": false,
"numPartitions": 1,
"logRetentionHours": 24,
"replicationFactor": 1
}'
# Get the stream pool details including Kafka endpoint
oci streaming admin stream-pool get \
--stream-pool-id <stream-pool-ocid> \
--query 'data.{"Name": name, "Kafka Endpoint": "kafka-settings"."bootstrap-servers", "State": "lifecycle-state"}' \
--output table
# Create a stream (equivalent to a Kafka topic)
oci streaming admin stream create \
--compartment-id <compartment-ocid> \
--name "order-events" \
--partitions 6 \
--stream-pool-id <stream-pool-ocid> \
--retention-in-hours 168
# List all streams in a pool
oci streaming admin stream list \
--compartment-id <compartment-ocid> \
--stream-pool-id <stream-pool-ocid> \
--lifecycle-state ACTIVE \
--query 'data[].{"Name": name, "Partitions": partitions, "Retention (hrs)": "retention-in-hours", "ID": id}' \
--output table
Partition Count Planning
Each partition provides approximately 1 MB/s write and 2 MB/s read throughput. For a stream that needs to handle 10 MB/s of incoming data, create at least 10 partitions. More partitions also mean more parallel consumers. A good rule of thumb is to set partitions equal to the maximum number of consumer instances you expect to run, since each partition is consumed by exactly one consumer within a consumer group. You cannot reduce partition count after creation, so plan for growth.
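That sizing arithmetic can be captured in a small helper. This is a sketch: `partitions_needed` and the constants are illustrative names, with the per-partition limits taken from the figures above.

```python
import math

# Per-partition throughput limits (MB/s), as cited above
PARTITION_WRITE_MBPS = 1.0
PARTITION_READ_MBPS = 2.0

def partitions_needed(write_mbps: float, read_mbps: float, max_consumers: int) -> int:
    """Return a partition count covering write load, read load, and consumer parallelism."""
    for_writes = math.ceil(write_mbps / PARTITION_WRITE_MBPS)
    for_reads = math.ceil(read_mbps / PARTITION_READ_MBPS)
    # Each partition feeds at most one consumer per group, so desired
    # consumer parallelism also sets a floor on the partition count.
    return max(for_writes, for_reads, max_consumers)

# 10 MB/s in, 15 MB/s out, up to 12 consumer instances in one group
print(partitions_needed(10, 15, 12))  # -> 12
```

Because partition count cannot be reduced later, size for the largest of the three constraints and leave headroom for growth.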
Producing Messages to Streams
You can produce messages to OCI Streaming using the OCI SDK (REST API), the Kafka producer protocol, or the OCI CLI. Each message consists of a key (optional, used for partition routing), a value (the message payload, up to 1 MB), and headers (optional key-value metadata). Messages with the same key always go to the same partition, preserving ordering for related events.
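The key-to-partition routing property can be illustrated with a short sketch. This is illustrative only: the `route` function and its CRC32 hash are stand-ins, not OCI Streaming's actual partitioner, but they show the behavior that matters — the same key always maps to the same partition, so events for one entity stay ordered.

```python
import zlib

def route(key: str, num_partitions: int) -> int:
    """Deterministically map a message key to a partition (illustrative hash)."""
    return zlib.crc32(key.encode()) % num_partitions

# The same order ID always lands on the same partition,
# so all events for that order are consumed in sequence.
print(route("ORD-2026-0001", 6) == route("ORD-2026-0001", 6))  # -> True
```

Messages produced without a key may be distributed across partitions, so use keys whenever per-entity ordering matters.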
Using the OCI SDK (Python)
import oci
import json
import base64
from datetime import datetime
# Configure the streaming client
config = oci.config.from_file()
stream_client = oci.streaming.StreamClient(
config,
service_endpoint="https://cell-1.streaming.<region>.oci.oraclecloud.com"
)
stream_id = "<stream-ocid>"
# Produce a single message
message = {
"orderId": "ORD-2026-0001",
"customerId": "CUST-4829",
"items": [
{"sku": "WIDGET-001", "quantity": 3, "price": 29.99},
{"sku": "GADGET-042", "quantity": 1, "price": 149.99}
],
"total": 239.96,
"timestamp": datetime.utcnow().isoformat()
}
# Messages must be base64-encoded
encoded_value = base64.b64encode(
json.dumps(message).encode()
).decode()
encoded_key = base64.b64encode(
"ORD-2026-0001".encode()
).decode()
result = stream_client.put_messages(
stream_id=stream_id,
put_messages_details=oci.streaming.models.PutMessagesDetails(
messages=[
oci.streaming.models.PutMessagesDetailsEntry(
key=encoded_key,
value=encoded_value
)
]
)
)
print(f"Published {len(result.data.entries)} messages")
for entry in result.data.entries:
print(f" Partition: {entry.partition}, Offset: {entry.offset}")
# Produce a batch of messages
batch_messages = []
for i in range(100):
msg = {
"eventType": "page_view",
"userId": f"user-{i % 20}",
"page": f"/products/{i}",
"timestamp": datetime.utcnow().isoformat()
}
batch_messages.append(
oci.streaming.models.PutMessagesDetailsEntry(
key=base64.b64encode(f"user-{i % 20}".encode()).decode(),
value=base64.b64encode(json.dumps(msg).encode()).decode()
)
)
# Produce in batches of 50 (API limit)
for chunk_start in range(0, len(batch_messages), 50):
chunk = batch_messages[chunk_start:chunk_start + 50]
result = stream_client.put_messages(
stream_id=stream_id,
put_messages_details=oci.streaming.models.PutMessagesDetails(
messages=chunk
)
)
failed = sum(1 for e in result.data.entries if e.error)
print(f"Batch {chunk_start//50 + 1}: {len(chunk)} sent, {failed} failed")
Using the Kafka Producer (Python)
from confluent_kafka import Producer
import json
# OCI Streaming Kafka configuration
kafka_config = {
'bootstrap.servers': 'cell-1.streaming.<region>.oci.oraclecloud.com:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
# Username format: <tenancy-name>/<username>/<stream-pool-ocid>
'sasl.username': '<tenancy>/<user>/<stream-pool-ocid>',
'sasl.password': '<auth-token>',
'acks': 'all',
'retries': 3,
'linger.ms': 5,
'batch.size': 16384
}
producer = Producer(kafka_config)
def delivery_callback(err, msg):
if err:
print(f"Delivery failed: {err}")
else:
print(f"Delivered to {msg.topic()}[{msg.partition()}] @ offset {msg.offset()}")
# Produce messages using standard Kafka API
for i in range(1000):
event = {
"eventType": "click",
"userId": f"user-{i % 50}",
"elementId": f"btn-{i % 10}",
"timestamp": "2026-03-14T10:30:00Z"
}
producer.produce(
topic="clickstream-events",
key=f"user-{i % 50}",
value=json.dumps(event),
callback=delivery_callback
)
# Poll for delivery callbacks periodically
if i % 100 == 0:
producer.poll(0)
# Wait for all messages to be delivered
producer.flush(timeout=30)
Consuming Messages from Streams
Consumers read messages from stream partitions using either the OCI SDK or Kafka consumer protocol. The OCI SDK uses a cursor-based model where you create a cursor at a specific position (beginning, latest, specific offset, or time) and then fetch messages in a loop. The Kafka consumer protocol provides the familiar consumer group model with automatic partition assignment and offset management.
import oci
import json
import base64
# OCI SDK consumer
config = oci.config.from_file()
stream_client = oci.streaming.StreamClient(
config,
service_endpoint="https://cell-1.streaming.<region>.oci.oraclecloud.com"
)
stream_id = "<stream-ocid>"
# Create a cursor to start reading from the latest messages
cursor_response = stream_client.create_cursor(
stream_id=stream_id,
create_cursor_details=oci.streaming.models.CreateCursorDetails(
partition="0",
type="LATEST"
)
)
cursor = cursor_response.data.value
# Other cursor types:
# type="TRIM_HORIZON" - Read from the beginning of retention
# type="AT_OFFSET", offset=12345 - Start at a specific offset
# type="AT_TIME", time="2026-03-14T00:00:00Z" - Start at a timestamp
# Consume messages in a loop
import time
while True:
response = stream_client.get_messages(
stream_id=stream_id,
cursor=cursor,
limit=100
)
if response.data:
for msg in response.data:
key = base64.b64decode(msg.key).decode() if msg.key else None
value = json.loads(base64.b64decode(msg.value).decode())
print(f"Partition: {msg.partition}, Offset: {msg.offset}, Key: {key}")
print(f" Value: {json.dumps(value, indent=2)}")
# The next cursor is in the response header
cursor = response.headers.get("opc-next-cursor")
if not response.data:
time.sleep(1)  # Back off when no new messages
# Kafka consumer with consumer groups
from confluent_kafka import Consumer, KafkaError
kafka_config = {
'bootstrap.servers': 'cell-1.streaming.<region>.oci.oraclecloud.com:9092',
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'PLAIN',
'sasl.username': '<tenancy>/<user>/<stream-pool-ocid>',
'sasl.password': '<auth-token>',
'group.id': 'order-processor-group',
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 5000
}
consumer = Consumer(kafka_config)
consumer.subscribe(['order-events'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
print(f"Consumer error: {msg.error()}")
break
key = msg.key().decode() if msg.key() else None
value = json.loads(msg.value().decode())
print(f"Topic: {msg.topic()}, Partition: {msg.partition()}, Offset: {msg.offset()}")
print(f" Key: {key}, Value: {value}")
# Process the message (process_order is your application's handler)
process_order(value)
finally:
consumer.close()
OCI Events Service
The OCI Events service monitors your OCI resources and emits events when specific actions occur. For example, an event is emitted when a compute instance is launched or terminated, when an object is uploaded to a bucket, when a database backup completes, or when an IAM policy is changed. You create event rules that match specific event types and route them to targets like Functions, Streaming, or Notifications.
Events follow the CloudEvents specification and include the event type, source resource OCID, timestamp, compartment information, and event-specific data. This makes them ideal for audit logging, automated remediation, and triggering downstream workflows.
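For reference, an emitted event looks roughly like the envelope below. All values here are illustrative placeholders, and the `summarize` helper is a hypothetical example of how a handler typically routes on `eventType` and the fields under `data`.

```python
import json

# Illustrative OCI event envelope (placeholder values, not real OCIDs)
sample_event = json.dumps({
    "eventType": "com.oraclecloud.objectstorage.createobject",
    "cloudEventsVersion": "0.1",
    "eventTypeVersion": "2.0",
    "source": "ObjectStorage",
    "eventTime": "2026-03-14T10:30:00Z",
    "contentType": "application/json",
    "eventID": "unique-event-id",
    "data": {
        "compartmentId": "ocid1.compartment.oc1..example",
        "resourceName": "reports/2026-03-14.csv",
        "additionalDetails": {"bucketName": "data-lake", "namespace": "mytenancy"}
    }
})

def summarize(event_json: str) -> str:
    """Pull out the fields a handler typically routes on."""
    event = json.loads(event_json)
    details = event["data"]["additionalDetails"]
    return f'{event["eventType"]}: {details["bucketName"]}/{event["data"]["resourceName"]}'

print(summarize(sample_event))
# -> com.oraclecloud.objectstorage.createobject: data-lake/reports/2026-03-14.csv
```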
# Create an event rule that triggers when objects are created in a bucket
oci events rule create \
--compartment-id <compartment-ocid> \
--display-name "object-upload-processor" \
--description "Process new objects uploaded to the data-lake bucket" \
--is-enabled true \
--condition '{
"eventType": ["com.oraclecloud.objectstorage.createobject"],
"data": {
"compartmentId": ["<compartment-ocid>"],
"additionalDetails": {
"bucketName": ["data-lake"],
"namespace": ["<namespace>"]
}
}
}' \
--actions '{
"actions": [
{
"actionType": "FAAS",
"functionId": "<function-ocid>",
"isEnabled": true
}
]
}'
# Create a rule for compute instance state changes
oci events rule create \
--compartment-id <compartment-ocid> \
--display-name "instance-state-monitor" \
--description "Alert on instance termination or stop" \
--is-enabled true \
--condition '{
"eventType": [
"com.oraclecloud.computeapi.terminateinstance.begin",
"com.oraclecloud.computeapi.instanceaction.begin"
],
"data": {
"compartmentId": ["<compartment-ocid>"]
}
}' \
--actions '{
"actions": [
{
"actionType": "ONS",
"topicId": "<topic-ocid>",
"isEnabled": true
},
{
"actionType": "OSS",
"streamId": "<stream-ocid>",
"isEnabled": true
}
]
}'
# List event rules
oci events rule list \
--compartment-id <compartment-ocid> \
--query 'data[].{"Name": "display-name", "Enabled": "is-enabled", "State": "lifecycle-state"}' \
--output table
Event Delivery Guarantees
OCI Events provides at-least-once delivery, meaning the same event may be delivered more than once in rare cases (for example, during service failovers). Your event handlers must be idempotent: processing the same event twice should produce the same result. Use the event's unique eventID field for deduplication if your downstream processing cannot tolerate duplicates.
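A minimal deduplication sketch keyed on `eventID` follows. This is illustrative: the process-local set stands in for the durable store (database or cache) you would use in production, where handler instances come and go.

```python
import json

_seen_event_ids: set = set()  # process-local; use a durable store in production

def handle_once(event_json: str, process) -> bool:
    """Invoke `process` at most once per eventID; return True if processed."""
    event = json.loads(event_json)
    event_id = event["eventID"]
    if event_id in _seen_event_ids:
        return False  # duplicate delivery, skip
    process(event)
    _seen_event_ids.add(event_id)
    return True

# A redelivered event is ignored the second time
evt = json.dumps({"eventID": "evt-001", "eventType": "com.oraclecloud.objectstorage.createobject"})
results = [handle_once(evt, lambda e: None), handle_once(evt, lambda e: None)]
print(results)  # -> [True, False]
```

Record the eventID only after processing succeeds, so a crash mid-handler causes a retry rather than a lost event.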
OCI Notifications Service
OCI Notifications provides a pub/sub messaging service for sending alerts and notifications to humans and systems. You create topics, subscribe endpoints to topics, and then publish messages to topics. Subscriptions can be email addresses, HTTPS endpoints (webhooks), PagerDuty integration keys, Slack (via Functions), SMS (via Functions), or OCI Functions for custom processing.
# Create a notification topic
oci ons topic create \
--compartment-id <compartment-ocid> \
--name "infrastructure-alerts" \
--description "Critical infrastructure event notifications"
# Subscribe an email address
oci ons subscription create \
--compartment-id <compartment-ocid> \
--topic-id <topic-ocid> \
--protocol EMAIL \
--endpoint "oncall-team@example.com"
# Subscribe an HTTPS webhook endpoint
oci ons subscription create \
--compartment-id <compartment-ocid> \
--topic-id <topic-ocid> \
--protocol HTTPS \
--endpoint "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXX"
# Subscribe a PagerDuty integration
oci ons subscription create \
--compartment-id <compartment-ocid> \
--topic-id <topic-ocid> \
--protocol PAGERDUTY \
--endpoint "https://events.pagerduty.com/integration/<routing-key>/enqueue"
# Subscribe an OCI Function
oci ons subscription create \
--compartment-id <compartment-ocid> \
--topic-id <topic-ocid> \
--protocol ORACLE_FUNCTIONS \
--endpoint "<function-ocid>"
# Publish a message to a topic
oci ons message publish \
--topic-id <topic-ocid> \
--title "High CPU Alert" \
--body "Instance i-abc123 CPU utilization exceeded 95% for 10 minutes."
# List subscriptions for a topic
oci ons subscription list \
--compartment-id <compartment-ocid> \
--topic-id <topic-ocid> \
--query 'data[].{"Protocol": protocol, "Endpoint": endpoint, "State": "lifecycle-state"}' \
--output table
Building Event-Driven Architectures
The real power of OCI messaging services emerges when you combine them. A typical event-driven architecture on OCI uses Events to detect resource changes, Streaming for high-volume data ingestion and buffering, Functions for serverless processing, and Notifications for human alerting. Here is a complete example of an image processing pipeline triggered by Object Storage uploads.
# OCI Function that processes new images uploaded to Object Storage
# Triggered by an OCI Event rule on com.oraclecloud.objectstorage.createobject
import io
import json
import logging
import oci
from fdk import response
def handler(ctx, data: io.BytesIO):
logging.getLogger().info("Image processing function triggered")
try:
event = json.loads(data.getvalue())
bucket_name = event["data"]["additionalDetails"]["bucketName"]
object_name = event["data"]["resourceName"]
namespace = event["data"]["additionalDetails"]["namespace"]
logging.info(f"Processing: {namespace}/{bucket_name}/{object_name}")
# Use resource principal auth (auto-configured in Functions)
signer = oci.auth.signers.get_resource_principals_signer()
object_storage = oci.object_storage.ObjectStorageClient(
config={}, signer=signer
)
# Download the uploaded image
obj = object_storage.get_object(namespace, bucket_name, object_name)
image_data = obj.data.content
# Process the image (resize, generate thumbnail, etc.)
from PIL import Image
img = Image.open(io.BytesIO(image_data))
img.thumbnail((256, 256))
thumbnail_buffer = io.BytesIO()
img.save(thumbnail_buffer, format='JPEG', quality=85)
thumbnail_bytes = thumbnail_buffer.getvalue()
# Upload the thumbnail to a different bucket
object_storage.put_object(
namespace,
"thumbnails",
f"thumb-{object_name}",
thumbnail_bytes,
content_type="image/jpeg"
)
# Publish a message to a stream for downstream analytics
stream_client = oci.streaming.StreamClient(
config={}, signer=signer,
service_endpoint="https://cell-1.streaming.<region>.oci.oraclecloud.com"
)
import base64
message = {
"eventType": "image_processed",
"originalObject": object_name,
"thumbnailObject": f"thumb-{object_name}",
"originalSize": len(image_data),
"thumbnailSize": len(thumbnail_bytes),
"dimensions": {"width": img.width, "height": img.height}
}
stream_client.put_messages(
stream_id="<stream-ocid>",
put_messages_details=oci.streaming.models.PutMessagesDetails(
messages=[
oci.streaming.models.PutMessagesDetailsEntry(
key=base64.b64encode(object_name.encode()).decode(),
value=base64.b64encode(json.dumps(message).encode()).decode()
)
]
)
)
return response.Response(
ctx,
response_data=json.dumps({"status": "processed", "thumbnail": f"thumb-{object_name}"}),
headers={"Content-Type": "application/json"}
)
except Exception as e:
logging.error(f"Error processing image: {str(e)}")
raise
Stream Processing Patterns
Beyond simple produce-consume patterns, OCI Streaming supports several advanced stream processing architectures. These include event sourcing (using streams as an immutable event log), CQRS (separating read and write models with streams as the communication channel), stream aggregation (computing running totals or averages over time windows), and fan-out (one stream feeding multiple independent consumer groups).
Common Streaming Patterns
| Pattern | Use Case | Implementation |
|---|---|---|
| Event Sourcing | Audit trail, state reconstruction | Long retention stream with append-only writes |
| Fan-Out | Multiple consumers processing same events | Multiple consumer groups on one stream |
| Event Filtering | Route events to specialized processors | Consumer reads all, filters by message key or type |
| Dead Letter Queue | Handle processing failures | Separate stream for failed messages |
| Stream-to-Stream | Transform and enrich events | Consumer reads, transforms, produces to new stream |
| Aggregation | Real-time metrics and counts | Consumer maintains in-memory windows |
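The aggregation row, for example, can be sketched as a consumer-side tumbling window. This is a simplified in-memory illustration — the `tumbling_counts` helper and the (timestamp, body) message shape are assumptions echoing the earlier producer examples, not a production implementation:

```python
import json
from collections import Counter, defaultdict

def tumbling_counts(messages, window_seconds=60):
    """Count events per eventType in fixed (tumbling) time windows.

    `messages` is an iterable of (epoch_seconds, json_body) pairs, as a
    consumer loop would yield after decoding each stream record.
    """
    windows = defaultdict(Counter)
    for ts, body in messages:
        # Align each event to the start of its window
        window_start = int(ts) - int(ts) % window_seconds
        event = json.loads(body)
        windows[window_start][event["eventType"]] += 1
    return dict(windows)

stream = [
    (100, '{"eventType": "click", "userId": "user-1"}'),
    (110, '{"eventType": "page_view", "userId": "user-2"}'),
    (170, '{"eventType": "click", "userId": "user-1"}'),
]
print(tumbling_counts(stream))
# -> {60: Counter({'click': 1, 'page_view': 1}), 120: Counter({'click': 1})}
```

In production the window state would be checkpointed alongside consumer offsets so a restart does not double-count.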
Connector Hub for ETL
OCI Connector Hub (formerly Service Connector Hub) can automatically move data between OCI services without custom code. For example, you can configure a connector to read from a stream and write to Object Storage, Logging Analytics, or Monitoring. This is ideal for archiving stream data, feeding analytics pipelines, or creating custom metrics from streaming events without writing any consumer code.
Monitoring and Operational Best Practices
Monitoring streaming and event-driven architectures requires tracking both the infrastructure metrics (stream throughput, consumer lag, partition utilization) and the application-level metrics (event processing latency, error rates, dead letter queue depth). OCI provides built-in metrics for Streaming, Events, and Notifications in the Monitoring service.
# View stream metrics - message throughput
oci monitoring metric-data summarize-metrics-data \
--compartment-id <compartment-ocid> \
--namespace oci_streaming \
--query-text 'PutMessages[1m]{streamId = "<stream-ocid>"}.rate' \
--start-time "2026-03-14T00:00:00Z" \
--end-time "2026-03-14T23:59:59Z"
# View consumer read activity (GetMessages request count) as a proxy for consumer health
oci monitoring metric-data summarize-metrics-data \
--compartment-id <compartment-ocid> \
--namespace oci_streaming \
--query-text 'GetMessages[5m]{streamId = "<stream-ocid>"}.count' \
--start-time "2026-03-14T00:00:00Z" \
--end-time "2026-03-14T23:59:59Z"
# Create an alarm for high consumer lag
oci monitoring alarm create \
--compartment-id <compartment-ocid> \
--display-name "High Consumer Lag" \
--namespace oci_streaming \
--query 'ConsumerLag[5m]{streamId = "<stream-ocid>"}.max > 10000' \
--severity CRITICAL \
--is-enabled true \
--destinations '["<notification-topic-ocid>"]' \
--pending-duration "PT5M" \
--body "Consumer lag exceeds 10,000 messages for stream order-events"
# View Events service delivery metrics
oci monitoring metric-data summarize-metrics-data \
--compartment-id <compartment-ocid> \
--namespace oci_events \
--query-text 'EventsDelivered[1h]{ruleId = "<rule-ocid>"}.count' \
--start-time "2026-03-14T00:00:00Z" \
--end-time "2026-03-14T23:59:59Z"
# Check notification delivery status
oci ons subscription list \
--compartment-id <compartment-ocid> \
--topic-id <topic-ocid> \
--query 'data[].{"Protocol": protocol, "Endpoint": endpoint, "State": "lifecycle-state", "Delivered": "delivery-policy"}' \
--output table
Operational Checklist
Follow these practices for reliable event-driven architectures on OCI:
- Set stream retention to match your recovery requirements (minimum 24 hours, maximum 168 hours).
- Use multiple partitions for streams that need parallel processing.
- Implement idempotent consumers to handle duplicate deliveries.
- Configure dead letter queues for messages that fail processing after retries.
- Set up alarms on consumer lag, delivery failures, and processing error rates.
- Use separate stream pools for production and development environments.
For high-availability scenarios, use private endpoints with service gateways so streaming traffic stays on the Oracle backbone network. Enable encryption at rest with customer-managed keys from OCI Vault for sensitive data streams. Tag all streaming resources with environment and application labels for cost tracking and access control.
Key Takeaways
1. OCI Streaming is Kafka-compatible, allowing existing Kafka clients to connect without code changes.
2. OCI Events monitors resource changes and routes structured events to Functions, Streaming, or Notifications.
3. Stream partitions provide parallel processing with strict ordering within each partition.
4. Connector Hub moves data between OCI services without custom consumer code.
Frequently Asked Questions
Is OCI Streaming fully compatible with Apache Kafka?
It is compatible with standard Kafka 2.x and 3.x producer and consumer clients, but some admin operations, such as topic creation and consumer group management, go through OCI APIs rather than the Kafka admin protocol.
What is the maximum retention for OCI Streaming?
Stream retention is configurable from a minimum of 24 hours up to a maximum of 168 hours (7 days).
Written by CloudToolStack Team
Cloud engineers and architects with hands-on experience across AWS, Azure, and GCP. We write guides based on real-world production patterns, not just documentation rewrites.
Disclaimer: This guide is for educational purposes. Cloud services change frequently; always refer to official documentation for the latest information. AWS, Azure, and GCP are trademarks of their respective owners.