Spark Streaming with PubSub Lite

I want to create a Spark Streaming application on Dataproc that subscribes to Pub/Sub Lite or Pub/Sub (I discovered there is no support for the regular Pub/Sub service, so I tried the "Lite" service).

The idea is straightforward: as soon as a new message arrives in Pub/Sub Lite, the Spark pipeline should process it.

Here is a simple code example I used - https://cloud.google.com/pubsub/lite/docs/samples/pubsublite-spark-streaming-from-pubsublite

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

# TODO(developer):
# project_number = 11223344556677
# location = "us-central1-a"
# subscription_id = "your-subscription-id"

spark = SparkSession.builder.appName("read-app").master("yarn").getOrCreate()

sdf = (
    spark.readStream.format("pubsublite")
    .option(
        "pubsublite.subscription",
        f"projects/{project_number}/locations/{location}/subscriptions/{subscription_id}",
    )
    .load()
)

sdf = sdf.withColumn("data", sdf.data.cast(StringType()))

query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="1 second")
    .start()
)

# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()

I expected the pipeline to be triggered immediately after a new message is published to Pub/Sub Lite, but that doesn't happen; instead, all the accumulated messages are processed at roughly 1-minute intervals.


1. What configuration properties am I missing?
2. Is there a third-party connector for the regular Pub/Sub service, or some workaround?


4 REPLIES

Your current Spark Streaming setup with Pub/Sub Lite processes data every second. To enhance this for near-real-time processing, consider the following adjustments:

Reducing Trigger Interval:

  • While Spark Structured Streaming doesn't support triggering on individual messages, you can approximate near-real-time processing by reducing the trigger interval.
  • Example of setting it to 100 milliseconds:
 
# ... previous code ...
# Trigger every 100 milliseconds
query = (
    sdf.writeStream.format("console")
    .outputMode("append")
    .trigger(processingTime="100 milliseconds")
    .start()
)
# Wait 120 seconds (must be >= 60 seconds) to start receiving messages.
query.awaitTermination(120)
query.stop()
  • This decreases latency but doesn't guarantee that each message is processed the moment it arrives.
  • Be aware that very short intervals can increase resource usage, such as CPU.

Exploring Pub/Sub Connectors:

  • Spark doesn't have native support for Google Cloud Pub/Sub, but you can explore third-party connectors:
    • Google Cloud Dataproc Pub/Sub Connector: Official Google connector for batch processing with Dataproc, allowing Spark jobs to interact with Pub/Sub. Not designed for streaming.
    • Spark PubSub Connector: Open-source connector that integrates Spark with Google Cloud Pub/Sub. Not officially supported by Google, so evaluate its compatibility with your Spark environment.

Alternative Approaches:

  • Cloud Dataflow: For real-time streaming with per-message processing, Google Cloud Dataflow is an excellent choice. Offers native Pub/Sub integration and is a fully managed service for running Apache Beam pipelines.
    • Direct Pub/Sub Integration: Implement custom code within your application to interact directly with the Pub/Sub API. This requires careful management of message acknowledgment and error handling; a minimal sketch follows at the end of this reply.

Select the method that aligns best with your project's latency, throughput, and resource requirements.
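
As a rough illustration of the "Direct Pub/Sub Integration" option above, here is a minimal sketch that pulls messages from a regular Pub/Sub subscription with the google-cloud-pubsub Python client. The project and subscription IDs are placeholders, and feeding the received messages into a Spark DataFrame is left out; this only shows per-message delivery and acknowledgment.

# Minimal sketch: pull from a regular Pub/Sub subscription and process each
# message in a callback as soon as it is delivered.
# Assumes the google-cloud-pubsub package is installed and the subscription exists.
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

project_id = "my-project"        # placeholder
subscription_id = "my-sub"       # placeholder

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    # Per-message logic goes here; acknowledge only after successful processing.
    print(f"Received: {message.data!r}")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)

with subscriber:
    try:
        # Block for up to 120 seconds, then stop the streaming pull.
        streaming_pull_future.result(timeout=120)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()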

@ms4446 
Thank you so much for the detailed explanation! I will definitely take a look at the alternative solutions.

Regarding the delay in message processing: I decreased the trigger interval to 100 milliseconds, but I still experience a 1-minute delay. Here is my console output:

24/01/09 17:11:26 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
|        subscription|partition|offset|key|                data|   publish_timestamp|event_timestamp|attributes|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
|projects/sandbox-...|        0|     0| []|[7B 20 20 20 20 2...|2024-01-09 17:04:...|           NULL|        {}|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+

24/01/09 17:11:32 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 100 milliseconds, but spent 4145 milliseconds
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
|        subscription|partition|offset|key|                data|   publish_timestamp|event_timestamp|attributes|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+
|projects/sandbox-...|        0|     1| []|[7B 20 20 20 20 2...|2024-01-09 17:11:...|           NULL|        {}|
|projects/sandbox-...|        0|     2| []|[7B 20 20 20 20 2...|2024-01-09 17:11:...|           NULL|        {}|
+--------------------+---------+------+---+--------------------+--------------------+---------------+----------+

24/01/09 17:12:29 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 100 milliseconds, but spent 586 milliseconds

As you can see, the messages are not processed in near-real-time but in batches, and each batch is triggered only after a roughly 1-minute delay.

I use the gcloud CLI to publish a message into Pub/Sub Lite:

gcloud pubsub lite-topics publish my-topic --location=europe-west2 --message='my message'

Is there something I might be missing?

The issue you're encountering, where messages are processed in batches with a delay despite setting a 100-millisecond trigger interval, can be attributed to several factors. Here are some potential causes and solutions:

  1. Processing Time vs. Data Arrival Time:

    • The trigger interval in Spark Structured Streaming controls how often the system checks for new data, not necessarily how quickly it processes data after arrival. If the data arrives just after a trigger interval, it might only be processed in the next interval.
    • The warning message Current batch is falling behind suggests that the processing time for each batch is longer than the trigger interval. This can cause delays and backlog in processing.
  2. Resource Constraints and Configuration:

    • Check if your Spark cluster has sufficient resources (CPU, memory) to handle the processing load. Insufficient resources can lead to delays.
    • Review Spark configurations related to streaming, such as spark.executor.memory (for memory allocation) and spark.streaming.backpressure.enabled (for rate limiting, though note that this setting applies to the legacy DStream API rather than Structured Streaming); a small configuration sketch follows at the end of this reply.
  3. PubSub Lite Configuration:

    • Ensure that your PubSub Lite subscription and topic are configured correctly. For instance, the throughput capacity of the topic and subscription can affect how quickly messages are delivered and processed.
    • PubSub Lite's partitioning might also impact performance. More partitions can lead to better throughput but require careful management.
  4. Network Latency and System Overheads:

    • Network latency between your Spark cluster and PubSub Lite can contribute to delays.
    • System overheads, such as serialization/deserialization and I/O operations, can also add to processing time.
  5. Debugging Steps:

    • Monitor the Spark UI to understand the job's execution details and identify bottlenecks (see the progress-inspection sketch after this list).
    • Experiment with different trigger intervals and observe the changes in processing behavior.
    • Test with a simpler Spark job to isolate whether the issue is with the PubSub Lite source or with the Spark processing logic.
  6. Alternative Approaches:

    • If real-time processing is critical and these delays are unacceptable, consider alternative architectures or technologies that guarantee lower latency, such as Apache Kafka with Spark or Google Cloud Dataflow.
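
As a concrete starting point for the debugging step above, Structured Streaming exposes per-batch progress through the StreamingQuery handle, and printing it shows where the time between batches is actually spent. A minimal sketch, assuming query is the handle returned by .start() in the earlier code:

# Sketch: inspect micro-batch progress to see where time goes between batches.
import json
import time

time.sleep(30)  # let a few micro-batches run first

# Most recent batch: row counts and per-stage durations (addBatch, getBatch, ...).
if query.lastProgress is not None:
    print(json.dumps(query.lastProgress, indent=2))

# Recent history, useful to spot the ~1-minute gaps between batches.
for progress in query.recentProgress:
    print(progress["timestamp"], progress["numInputRows"], progress["durationMs"])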

While reducing the trigger interval is a step in the right direction, achieving near-real-time processing in Spark Structured Streaming, especially with PubSub Lite, involves a combination of appropriate resource allocation, efficient job configuration, and understanding the inherent limitations of the technologies involved.
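
For the resource-configuration point above, a minimal sketch of the same session built with explicit executor settings is below; the values are illustrative assumptions, not recommendations, and should be tuned for your cluster size and workload.

# Sketch: build the SparkSession with explicit resource settings.
# The values below are assumptions; tune them for your cluster.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("read-app")
    .master("yarn")
    .config("spark.executor.memory", "4g")        # memory per executor
    .config("spark.executor.cores", "2")          # cores per executor
    .config("spark.sql.shuffle.partitions", "8")  # keep shuffles small for tiny micro-batches
    .getOrCreate()
)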

cool, thanks for the detailed replies!