How can I submit Google Cloud ML Engine training jobs directly from Python?
Yes. You can submit Google Cloud ML Engine training jobs (the service has since been folded into Vertex AI) entirely from Python, with no manual gcloud command-line calls. That suits an automated Flask web app, because the app can trigger and manage training workflows programmatically.
Here's a step-by-step breakdown:
First, make sure you have the necessary tools set up:
- Install the Google Cloud Python client library for Vertex AI, plus Flask:

      pip install google-cloud-aiplatform flask

- Authentication: If your Flask app runs on GCP (e.g., Cloud Run, App Engine), it automatically uses the attached service account. For local testing, run `gcloud auth application-default login` or set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to point to your service account key file.
- Permissions: Ensure your service account has these roles:
  - Vertex AI Administrator (or a narrower role such as Vertex AI User, which also allows creating training jobs)
  - Storage Object Admin on your GCS buckets (to read training code/data and write model outputs)
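Before wiring up the endpoint, it can help to check which credential source the app is likely to pick up. The sketch below uses only the standard library and assumes the usual Application Default Credentials lookup order (the `GOOGLE_APPLICATION_CREDENTIALS` variable first, then gcloud user credentials or an attached service account). The function name `describe_credentials_source` is hypothetical and not part of any Google library.

```python
import os


def describe_credentials_source(environ=None):
    """Report which credential source appears to be configured.

    `environ` defaults to os.environ; passing a dict makes this easy to test.
    This only inspects the environment, it does not validate the key itself.
    """
    env = os.environ if environ is None else environ
    key_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if key_path:
        if os.path.isfile(key_path):
            return f"service account key file: {key_path}"
        return f"GOOGLE_APPLICATION_CREDENTIALS is set but is not a file: {key_path}"
    # No explicit key file: the client libraries fall back to gcloud user
    # credentials locally, or to the attached service account on GCP.
    return "no key file configured; relying on gcloud ADC or an attached service account"
```

Logging the returned string at app startup makes a misconfigured key path visible before the first training request fails.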
Vertex AI is the modern successor to ML Engine, with a more intuitive Python API. Below is a Flask endpoint example that submits a Keras training job:
    import time

    from flask import Flask, request, jsonify
    from google.cloud import aiplatform

    app = Flask(__name__)

    # Initialize the Vertex AI SDK with your project and region
    aiplatform.init(project="your-gcp-project-id", location="us-central1")

    # CustomTrainingJob expects script_path to be a LOCAL file; the SDK packages
    # it and uploads it to the staging bucket before the job starts.
    LOCAL_TRAINING_SCRIPT = "train_keras_model.py"


    @app.route('/submit-keras-training', methods=['POST'])
    def submit_training():
        # Extract parameters from the incoming request (customize as needed)
        req_data = request.get_json(silent=True) or {}
        job_display_name = req_data.get('job_name', f"keras-training-{int(time.time())}")
        gcs_model_output_dir = req_data.get('output_path', "gs://your-bucket/model-output/")

        # Define a custom training job using a pre-built TensorFlow/Keras container
        training_job = aiplatform.CustomTrainingJob(
            display_name=job_display_name,
            script_path=LOCAL_TRAINING_SCRIPT,
            # Use a pre-built container matching your TensorFlow/Keras version
            container_uri="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-13.py310:latest",
            # Add any extra Python packages your script needs
            requirements=["pandas", "scikit-learn"],
            staging_bucket="gs://your-bucket/staging/",  # For temporary job files
        )

        # Start the training job with resource configuration.
        # sync=False returns without waiting for training to finish; by default
        # run() blocks, which would hold the HTTP request open for the whole job.
        # To also register the result as a Vertex AI model, pass model_display_name
        # here and model_serving_container_image_uri to the constructor above.
        training_job.run(
            replica_count=1,
            machine_type="n1-standard-4",  # Adjust based on your compute needs
            base_output_dir=gcs_model_output_dir,
            sync=False,
        )

        # Wait only until the job resource exists, so its name and state are available
        training_job.wait_for_resource_creation()

        # Return job details to the client
        return jsonify({
            "job_id": training_job.resource_name,
            "job_name": training_job.display_name,
            "current_status": training_job.state.name
        })


    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=8080)
Key Notes for This Approach:
- Pre-built Containers: GCP provides official TensorFlow/Keras training containers for different versions, so you don't need to build your own Docker image. Pick the container URI that matches your TF/Keras version.
- Training Script: `CustomTrainingJob` expects `script_path` to be a local file; the SDK packages it and uploads it to the staging bucket for you. The script should handle loading data (from GCS or another source), training the model, and saving the final model (for example an `.h5` file) to the specified GCS output directory.
- Job Monitoring: You can check the job status later by reloading the job with `aiplatform.CustomTrainingJob.get(resource_name)` and reading its `state`, or set up Cloud Pub/Sub notifications to trigger actions when the job completes (like notifying users or deploying the model).
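The job-monitoring note can be sketched as a small polling loop. This is a minimal illustration, not the SDK's own mechanism: `fetch_state` is a hypothetical callable you supply, which in a real app might wrap `aiplatform.CustomTrainingJob.get(resource_name).state.name`. The terminal state names are the Vertex AI pipeline states for success, failure, and cancellation.

```python
import time

# Pipeline states after which the job will not change any more
TERMINAL_STATES = {
    "PIPELINE_STATE_SUCCEEDED",
    "PIPELINE_STATE_FAILED",
    "PIPELINE_STATE_CANCELLED",
}


def wait_for_terminal_state(fetch_state, poll_seconds=30.0, max_polls=120,
                            sleep=time.sleep):
    """Poll fetch_state() until a terminal state appears or polls run out.

    Returns the last state seen, which may be non-terminal if max_polls
    was reached first. `sleep` is injectable so tests do not have to wait.
    """
    state = fetch_state()
    polls = 1
    while state not in TERMINAL_STATES and polls < max_polls:
        sleep(poll_seconds)
        state = fetch_state()
        polls += 1
    return state
```

A loop like this belongs in a background worker or scheduled task rather than in the Flask request handler, since it can run for as long as the training job does.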
If you need to stick with the original ML Engine API (instead of Vertex AI), you can use the google-api-python-client to call the REST API directly:
First, install the library:
pip install google-api-python-client oauth2client flask
Then, here's a Flask endpoint example:
    import time

    from flask import Flask, request, jsonify
    from googleapiclient import discovery
    from oauth2client.client import GoogleCredentials

    app = Flask(__name__)


    @app.route('/submit-ml-engine-training', methods=['POST'])
    def submit_ml_engine_job():
        credentials = GoogleCredentials.get_application_default()
        ml_service = discovery.build('ml', 'v1', credentials=credentials)

        project_id = "your-gcp-project-id"
        req_data = request.get_json(silent=True) or {}
        job_id = req_data.get('job_id', f"keras_training_job_{int(time.time())}")

        # Define the training job payload
        job_body = {
            'jobId': job_id,  # Letters, digits, and underscores only
            'trainingInput': {
                'scaleTier': 'BASIC',  # Adjust based on your compute needs
                'packageUris': ['gs://your-bucket/training_package.tar.gz'],  # Your packaged training code
                'pythonModule': 'trainer.task',  # Entry point (trainer/task.py)
                'region': 'us-central1',
                'args': [
                    '--model-dir', 'gs://your-bucket/model-output',
                    '--epochs', '10'
                ],
                # Must be a runtime version the legacy service supports, paired
                # with that runtime's Python version; check the runtime list.
                'runtimeVersion': '2.11',
                'pythonVersion': '3.7'
            }
        }

        # Submit the job. The variable is not called `request`, because that
        # would shadow Flask's request object used earlier in this function.
        job_request = ml_service.projects().jobs().create(
            parent=f'projects/{project_id}',
            body=job_body
        )
        response = job_request.execute()

        return jsonify(response)


    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=8080)
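The payload above passes `--model-dir` and `--epochs` to `trainer.task`, so the entry module needs to accept them. A minimal sketch of what `trainer/task.py` might start with is shown here; the training body is a placeholder, and the function names are illustrative rather than anything the service requires.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags that the job payload sends in trainingInput.args."""
    parser = argparse.ArgumentParser(description="Keras training entry point")
    parser.add_argument("--model-dir", required=True,
                        help="GCS directory where the trained model is written")
    parser.add_argument("--epochs", type=int, default=10,
                        help="Number of training epochs")
    return parser.parse_args(argv)


def main(argv=None):
    args = parse_args(argv)
    # Placeholder: build the Keras model, call fit() for args.epochs epochs,
    # then save the result under args.model_dir.
    print(f"Would train for {args.epochs} epochs and save to {args.model_dir}")
    return args
```

In the real module you would end the file with the usual `if __name__ == "__main__": main()` guard, since the service runs the module as a script.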
Key Notes for This Approach:
- Packaged Training Code: You need to package your training code into a Python package (with a `setup.py` file) and upload the tar.gz to GCS. The `pythonModule` points to your entry script (e.g., `trainer.task` means `trainer/task.py`).
- Error Handling: Add try/except blocks to catch API errors and return meaningful responses to users (e.g., invalid GCS paths, permission issues).
- Asynchronous Workflows: Since training jobs can take minutes or hours, return a job ID immediately and let users check status via a separate endpoint, instead of blocking the request until completion.
- Logging: Integrate GCP Cloud Logging to track training job events and Flask app activity for easier debugging.
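The error-handling and asynchronous-workflow notes can be combined in one small helper. This is a sketch under stated assumptions: `submit_fn` is a hypothetical callable that submits the job and returns its ID, the `/jobs/<id>` status route is made up for illustration, and the choice of 400, 502, and 202 status codes is one reasonable mapping rather than a requirement.

```python
import logging

logger = logging.getLogger("training_api")


def submit_safely(submit_fn, params):
    """Call submit_fn(params) and map the outcome to (payload, http_status).

    Returns 202 with the job ID on success, so the client can poll a
    separate status endpoint instead of waiting for training to finish.
    """
    try:
        job_id = submit_fn(params)
    except (KeyError, ValueError) as exc:
        # Bad input from the caller, e.g. a missing or malformed parameter
        logger.warning("Rejected training request: %s", exc)
        return {"error": f"invalid request: {exc}"}, 400
    except Exception:
        # Anything else is treated as an upstream failure (API error,
        # permission problem); details go to the log, not to the client.
        logger.exception("Training job submission failed")
        return {"error": "job submission failed"}, 502
    logger.info("Submitted training job %s", job_id)
    return {"job_id": job_id, "status_url": f"/jobs/{job_id}"}, 202
```

Inside a Flask view this could be used as `payload, status = submit_safely(my_submit, req_data)` followed by `return jsonify(payload), status`.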
This question originally appeared on Stack Exchange; it was asked by Jakob.




