Submit Dataproc serverless job using service account

Hi Folks,

I'm trying to submit a Dataproc Serverless job using a service account (which has the necessary permissions) to load multiple CSV files (> 100 GB) from a GCS bucket into a Cloud SQL PostgreSQL instance.

Can you please help me with the command that needs to be submitted? How can I reference the JSON file linked with the service account for authentication, or is it not necessary to mention it?

gcloud dataproc batches submit spark \ --region=us \ --service-account=test@Test.iam.gserviceaccount.com \ --job-type=spark \ --python-file=test.py \ --cluster=cluster_name

When I submit this batch job, how does it work in the background? Does a cluster get created and then deleted after the job completes, or does it need to be deleted manually?

Thanks,

Vigneswar Jeyaraj


Here is how to submit a Dataproc Serverless job using a service account, a potential script, and an explanation of the background processes:

Steps to Submit a Dataproc Serverless Job Using a Service Account

1. Grant Permissions

  • Crucial: Make sure the identities involved have the following roles to interact with Google Cloud services:
    • Dataproc Serverless Roles:
      • The service account that runs the batch (test@Test.iam.gserviceaccount.com) needs roles/dataproc.worker.
      • The identity that submits the batch needs roles/dataproc.editor for broad permissions (or a custom, more restrictive role tailored to your needs), plus permission to act as the service account (roles/iam.serviceAccountUser).
    • Cloud SQL Client: Allows connection to the Cloud SQL instance.
    • Storage Object Viewer: Enables reading data from GCS.

2. Create Python Script 

 
from pyspark.sql import SparkSession

def load_and_insert(spark, gcs_bucket, gcs_path, sql_host, sql_port, sql_user, sql_password, sql_db, sql_table):
    # Read all CSV files under the given GCS path (the first row is the header)
    df = spark.read.option("header", True).csv(f"gs://{gcs_bucket}/{gcs_path}/*.csv")

    # Append the rows to the Cloud SQL table over JDBC.
    # The PostgreSQL JDBC driver JAR must be available to the job (for example via --jars).
    (
        df.write
        .format("jdbc")
        .mode("append")
        .option("url", f"jdbc:postgresql://{sql_host}:{sql_port}/{sql_db}")
        .option("driver", "org.postgresql.Driver")
        .option("dbtable", sql_table)
        .option("user", sql_user)
        .option("password", sql_password)
        .save()
    )

if __name__ == "__main__":
    spark = SparkSession.builder.appName("GCS-to-SQL").getOrCreate()

    # Replace with your actual values
    gcs_bucket = "your-gcs-bucket"
    gcs_path = "your/csv/data/path"
    sql_host = "your-cloud-sql-ip-address"  # IP address or hostname reachable from the job
    sql_port = 5432
    sql_user = "your-sql-username"
    sql_password = "your-sql-password"
    sql_db = "your-database-name"
    sql_table = "your-target-table"

    load_and_insert(spark, gcs_bucket, gcs_path, sql_host, sql_port, sql_user, sql_password, sql_db, sql_table)
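
For a load of this size (> 100 GB), the JDBC write settings tend to matter more than the read. Here is a minimal sketch of collecting the writer options in one place. The helper name build_jdbc_options, the host 10.0.0.5, and the default values are assumptions for illustration only; numPartitions, batchsize, and driver are standard Spark JDBC options, and reWriteBatchedInserts is a PostgreSQL JDBC driver parameter. Tune the numbers against what your Cloud SQL instance can handle.

```python
def build_jdbc_options(host, port, database, table, user, password,
                       num_partitions=16, batch_size=10000):
    """Return Spark JDBC writer options as a dict (hypothetical helper)."""
    # reWriteBatchedInserts lets the PostgreSQL driver merge batched INSERTs
    url = f"jdbc:postgresql://{host}:{port}/{database}?reWriteBatchedInserts=true"
    return {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
        # Upper bound on parallel JDBC connections used for the write
        "numPartitions": str(num_partitions),
        # Rows sent per JDBC batch
        "batchsize": str(batch_size),
    }


# Placeholder values; replace with your own
options = build_jdbc_options(
    "10.0.0.5", 5432, "your-database-name", "your-target-table",
    "your-sql-username", "your-sql-password",
)

# In the Spark job the dict would be applied like this:
# df.write.format("jdbc").mode("append").options(**options).save()
```

Keeping numPartitions modest avoids exhausting the connection limit on the Cloud SQL side.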

3. Submit the Dataproc Serverless Batch

Bash
gcloud dataproc batches submit pyspark \
 --region=us-central1 \
 --batch=csv-to-cloudsql-batch \
 --service-account=test@Test.iam.gserviceaccount.com \
 gs://your-gcs-bucket/test.py
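
If you would rather launch the batch from a Python wrapper, a rough sketch of assembling the same command is below. Note that no JSON key file appears anywhere: the batch runs as the account passed to --service-account, and gcloud itself authenticates as whoever is logged in. The helper name build_submit_command and the JAR path are assumptions for illustration; --jars is shown because the JDBC write needs the PostgreSQL driver on the classpath.

```python
import shlex


def build_submit_command(script_uri, region, batch_id, service_account, jars=None):
    """Build the gcloud argument list for a PySpark batch (hypothetical helper)."""
    cmd = [
        "gcloud", "dataproc", "batches", "submit", "pyspark", script_uri,
        f"--region={region}",
        f"--batch={batch_id}",
        f"--service-account={service_account}",
    ]
    if jars:
        # Comma-separated list of JAR URIs added to the Spark classpath
        cmd.append("--jars=" + ",".join(jars))
    return cmd


cmd = build_submit_command(
    "gs://your-gcs-bucket/test.py",
    "us-central1",
    "csv-to-cloudsql-batch",
    "test@Test.iam.gserviceaccount.com",
    jars=["gs://your-gcs-bucket/jars/postgresql.jar"],  # assumed location
)
print(shlex.join(cmd))

# To actually submit (requires gcloud to be installed and authenticated):
# import subprocess; subprocess.run(cmd, check=True)
```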

Background Processes

  • Spin-up: Dataproc Serverless creates a Spark environment scaled for your job.
  • Execution: Your Python script runs, moving data from GCS to Cloud SQL.
  • Tear-down: Resources are released automatically after completion, optimizing costs. There is no cluster for you to create or delete manually.
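
Since nothing has to be cleaned up by hand, the only thing left to watch is the batch state. As a small sketch, the function below reads the JSON printed by `gcloud dataproc batches describe <batch-id> --region=<region> --format=json` and reports whether the batch has reached a final state. The helper name batch_finished and the sample JSON strings are made up for illustration.

```python
import json

# Batch states after which no further work happens
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def batch_finished(describe_json):
    """Return (is_finished, state) from `batches describe --format=json` output."""
    state = json.loads(describe_json).get("state", "STATE_UNSPECIFIED")
    return state in TERMINAL_STATES, state


# Made-up sample payloads, trimmed to the field used here
running = '{"name": "csv-to-cloudsql-batch", "state": "RUNNING"}'
done = '{"name": "csv-to-cloudsql-batch", "state": "SUCCEEDED"}'

print(batch_finished(running))
print(batch_finished(done))
```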

Additional Considerations

  • Security and Network: Verify proper networking (especially if using VPCs) for Dataproc Serverless and Cloud SQL to communicate.
  • Dependencies: Use a requirements.txt file or a packaged dependencies archive and the --py-files option if your script requires non-standard Python libraries.

Thank you for your response. Although we are granting the necessary permissions to the service account, is it necessary to provide the username and password in the "# Write data to Cloud SQL" section of the code? Is there any way to connect without these user credentials?

Yes, connecting to a database like Cloud SQL from a Spark job involves providing credentials such as a username and password within your code. However, embedding credentials directly in your code is not a best practice due to security concerns. Fortunately, Google Cloud offers more secure methods to authenticate and connect to Cloud SQL without hardcoding credentials in your application code.

Cloud SQL Proxy

  • One common approach to securely connect to Cloud SQL instances is using the Cloud SQL Proxy. This provides secure access to your Cloud SQL second-generation instances without the need for Authorized networks or configuring SSL.
  • The proxy works by establishing an authenticated connection to Google Cloud using IAM permissions on your environment (cluster nodes, containers, etc.). Your application then connects to the local proxy, which handles secure communication to Cloud SQL.
  • Considerations: While effective, the Cloud SQL Proxy can add setup complexity, especially in serverless environments where you have less control over the underlying infrastructure.

Cloud SQL IAM Database Authentication

  • An even more seamless method is Cloud SQL IAM database authentication. This feature allows you to authenticate database users directly with Google Cloud IAM, avoiding separate database passwords altogether.

  • To use IAM database authentication:

    1. Enable IAM database authentication for your Cloud SQL instance.
    2. Create a database user linked to an IAM service account or user account.
    3. Modify your application to obtain a short-lived access token from IAM and use that as the password during the database connection.
  • Preference: Cloud SQL IAM database authentication is generally preferred due to its ease of use and tight integration with Google Cloud's permission model.

Example

 

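import google.auth
from google.auth.transport.requests import Request
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Obtain credentials and request an access token
credentials, project = google.auth.default()
credentials.refresh(Request())

# Connection URL (replace placeholders). URL.create escapes the token safely.
# The unix_sock path assumes a Cloud SQL Proxy socket mounted under /cloudsql;
# pg8000 takes the socket path via unix_sock rather than host.
connection_url = URL.create(
    drivername="postgresql+pg8000",
    username="<db_user>",
    password=credentials.token,
    database="<db_name>",
    query={"unix_sock": "/cloudsql/<instance_connection_name>/.s.PGSQL.5432"},
)

# Create a database engine
engine = create_engine(connection_url)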

Google Cloud provides robust tools for secure database connections. Leveraging IAM, either through the Cloud SQL Proxy or even better, IAM Database Authentication, significantly enhances security compared to embedding credentials in code.

Thank you so much for your response. We have created a connection string in the code below, 

import google.auth
from google.auth.transport.requests import Request
from sqlalchemy import MetaData, Table, create_engine

# Obtain credentials and request an access token
credentials, project = google.auth.default()
credentials.refresh(Request())

# Connection string (replace placeholders)
connection_string = f"postgresql+pg8000://pipelsvc-acct@alytED?host=35.289.58.202&password={credentials.token}"

print("String: ",connection_string)
print(type(credentials))
print("Credentials:", credentials.token)

# Create a database engine
engine = create_engine(connection_string)

# Connect to the database
connection = engine.connect()
print(connection)

# Declare a metadata object
metadata = MetaData()

# Define the table you want to query
table_name = 't'
table = Table(table_name, metadata, autoload=True, autoload_with=engine)

# Query to select all data from the table
query = table.select()

# Execute the query and fetch the results
result_proxy = connection.execute(query)
results = result_proxy.fetchall()

# Print the results
for row in results:
    print(row)

# Close the connection
connection.close()

I modified some of the sensitive information for security purposes. When I submit this code, I get the errors below:

TimeoutError: [Errno 110] Connection timed out

pg8000.exceptions.InterfaceError: Can't create a connection to host 35.225.58.231 and port 5432 (timeout is None and source_address is None).

raise InterfaceError( sqlalchemy.exc.InterfaceError: (pg8000.exceptions.InterfaceError) Can't create a connection to host 35.289.58.202 and port 5432 (timeout is None and source_address is None). (Background on this error at: https://sqlalche.me/e/20/rvf5)

Can you please help me understand what is wrong with the code?

Thanks,

Vigneswar Jeyaraj

The errors you're encountering suggest that your application is unable to establish a connection to the specified Cloud SQL instance. This can be due to several reasons, including network issues, incorrect connection details, or misconfiguration of the Cloud SQL instance or the client's environment. Here are some troubleshooting steps:

1. Verify Cloud SQL Instance Details

  • Ensure that the IP address 35.289.58.202 and port 5432 are correct for your Cloud SQL instance. Cloud SQL instances are reached through a public IP address, a private IP address, or the Cloud SQL Proxy. If you're using a public IP, make sure it is enabled in your Cloud SQL instance settings.

2. Check Network Accessibility

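  • If you're connecting directly to a public IP, ensure that your network environment (e.g., firewall rules) allows outbound connections to the Cloud SQL instance's IP address and port (5432 for PostgreSQL). Dataproc Serverless workloads typically run with internal IP addresses only, so reaching a public IP generally requires Cloud NAT on the batch's subnet; otherwise, connect over the instance's private IP from the same VPC. A timeout (rather than an authentication error) usually points to this kind of routing problem.

  • For Cloud SQL instances configured with a public IP, ensure that the instance's authorized networks include the IP address(es) of the client machine or network from which you're trying to connect.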
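
Before digging into SQLAlchemy, it can help to confirm that a plain TCP connection is possible from the same environment the job runs in. A minimal sketch using only the standard library is below; the helper name can_reach is an assumption, and the address in the usage comment is a placeholder.

```python
import socket


def can_reach(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers timeouts, refused connections, and unroutable addresses
        return False


# Example usage from inside the job (placeholder address):
# print(can_reach("10.0.0.5", 5432))
```

If this returns False from the batch but True from a VM in the same VPC, the problem is almost certainly network configuration and not the connection string.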

3. Use Cloud SQL Proxy for Secure Connections

  • For a more secure and reliable connection method, especially if you're encountering network issues or if your instance uses a private IP, consider using the Cloud SQL Proxy. The Cloud SQL Proxy provides a secure tunnel between your application and the Cloud SQL instance without the need to allowlist IP addresses or configure SSL.

  • When using the Cloud SQL Proxy, your connection string's host parameter would typically be set to localhost or the path to a Unix socket provided by the proxy, rather than a direct IP address.

4. IAM Authentication Considerations

  • When using IAM authentication, ensure that the Cloud SQL instance has IAM database authentication enabled and that the IAM user or service account has the role of Cloud SQL Instance User at the minimum.

  • The database user (pipelsvc-acct in your connection string) must be created in the Cloud SQL instance and linked to the corresponding IAM account or service account. This user is different from the IAM entity and must be set up within the database.
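
One detail that is easy to miss with PostgreSQL: for a service account, the database username is the account's email without the ".gserviceaccount.com" suffix, while a regular user account logs in with its full email. A small sketch of that rule follows; the helper name iam_db_username and the user email are assumptions for illustration.

```python
def iam_db_username(principal_email):
    """Map an IAM principal email to its Cloud SQL PostgreSQL username."""
    suffix = ".gserviceaccount.com"
    if principal_email.endswith(suffix):
        # Service accounts drop the suffix
        return principal_email[: -len(suffix)]
    # Regular user accounts keep the full email
    return principal_email


print(iam_db_username("test@Test.iam.gserviceaccount.com"))  # test@Test.iam
print(iam_db_username("someone@example.com"))                # someone@example.com
```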

5. Correct Use of Access Tokens

  • Ensure that the access token is valid at the time of connection. Tokens have a limited lifetime and must be refreshed periodically. Your current code snippet correctly refreshes the token, but if there's a significant delay between token refresh and usage, the token might expire.
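
To guard against an expired token in a long-running job, one option is to check the expiry before each new connection. The sketch below is pure standard library and assumes the expiry is a naive UTC datetime, which is how google-auth exposes credentials.expiry; the helper name token_needs_refresh and the five-minute safety margin are assumptions.

```python
from datetime import datetime, timedelta


def token_needs_refresh(expiry, now, skew=timedelta(minutes=5)):
    """Return True if the token is missing an expiry or expires within `skew`."""
    if expiry is None:
        # No token has been fetched yet
        return True
    return now >= expiry - skew


# Illustrative timestamps (naive UTC)
now = datetime(2024, 1, 1, 12, 0, 0)
print(token_needs_refresh(datetime(2024, 1, 1, 12, 30, 0), now))  # False
print(token_needs_refresh(datetime(2024, 1, 1, 12, 3, 0), now))   # True

# In the job this might look like:
# from datetime import timezone
# utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
# if token_needs_refresh(credentials.expiry, utc_now):
#     credentials.refresh(Request())
```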

6. SQLAlchemy Engine Configuration

  • When creating the SQLAlchemy engine, additional parameters might be needed for optimal performance or compatibility, such as pool sizes or timeout settings. However, these are less likely to be the cause of the connection timeout issue you're facing.

7. Debugging and Logs

  • Enable verbose logging for both SQLAlchemy and the Cloud SQL Proxy (if used) to get more detailed error messages. This can help pinpoint whether the issue is at the network level, authentication level, or with the SQL query execution.
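
For the SQLAlchemy side, verbose logging can be switched on with the standard logging module, as sketched below. The logger names sqlalchemy.engine and sqlalchemy.pool are the ones SQLAlchemy uses; the helper name and the log format are assumptions.

```python
import logging


def enable_sqlalchemy_debug_logging():
    """Turn on SQL statement and connection pool logging for SQLAlchemy."""
    logging.basicConfig(format="%(asctime)s %(name)s %(levelname)s %(message)s")
    # INFO on the engine logger prints each SQL statement as it is executed
    logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
    # DEBUG on the pool logger prints connection checkouts and checkins
    logging.getLogger("sqlalchemy.pool").setLevel(logging.DEBUG)


enable_sqlalchemy_debug_logging()
```

Call this before create_engine so that connection attempts are captured from the start.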