diff --git a/.github/workflows/build-deploy-cloudrun-google-sheets-trigger.yml b/.github/workflows/build-deploy-cloudrun-google-sheets-trigger.yml
new file mode 100644
index 0000000..35dcfc1
--- /dev/null
+++ b/.github/workflows/build-deploy-cloudrun-google-sheets-trigger.yml
@@ -0,0 +1,28 @@
+name: Build & Deploy Google Sheets Trigger
+
+on:
+  push:
+    branches:
+      - main
+      - poc
+      - staging
+    paths:
+      - 'google-sheets-trigger/**'
+  workflow_dispatch:
+
+jobs:
+  build_and_deploy_google_sheets_trigger:
+    uses: CruGlobal/.github/.github/workflows/build-deploy-cloudrun-function.yml@v1
+    with:
+      function_name: google-sheets-trigger
+      entry_point: trigger_sheets_check
+      runtime: python312
+      environment: ${{ github.ref == 'refs/heads/main' && 'production' || github.ref == 'refs/heads/poc' && 'poc' || github.ref == 'refs/heads/staging' && 'staging' }}
+    secrets:
+      GCP_PROJECT_ID: ${{ vars.GCP_PROJECT_ID }}
+      GCP_PROJECT_NUMBER: ${{ vars.GCP_PROJECT_NUMBER }}
+      WORKLOAD_IDENTITY_POOL: ${{ vars.WORKLOAD_IDENTITY_POOL }}
+      WORKLOAD_IDENTITY_PROVIDER: ${{ vars.WORKLOAD_IDENTITY_PROVIDER }}
+      GCP_SERVICE_ACCOUNT: ${{ vars.GCP_SERVICE_ACCOUNT }}
+      GCP_SERVICE_ACCOUNT_EMAIL: ${{ vars.GCP_SERVICE_ACCOUNT_EMAIL }}
+      GCP_REGION: ${{ vars.GCP_REGION }}
diff --git a/.gitignore b/.gitignore
index a9bffc4..55d953e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -161,4 +161,7 @@ cython_debug/
 # and can be added to the global gitignore or merged into this file. For a more nuclear
 # option (not recommended) you can uncomment the following to ignore the entire idea folder.
 .idea/
-.vscode/
\ No newline at end of file
+.vscode/
+
+# Claude Code local settings
+CLAUDE.local.md
\ No newline at end of file
diff --git a/README.md b/README.md
index ea2d32e..815697b 100644
--- a/README.md
+++ b/README.md
@@ -57,8 +57,38 @@ Creating a new secret version does not automatically take effect. You'll need
-to trigger a new deployment. See below, except ignore the advice about
-"only doing this in the POC env".
+to trigger a new deployment; see "Deploy new code manually" below.
 
-## Deploy new code manually:
-You probably should only be doing this in the POC env.
+## Environments
+
+This project uses three environments, each tied to a Git branch. Pushing code to a branch
+automatically deploys the corresponding function(s) via GitHub Actions.
+
+| Environment | Branch | GCP Project | Purpose |
+|-------------|--------|-------------|---------|
+| **Staging** | `staging` | staging GCP project | Testing and validation before production. Use this for all pre-production testing. |
+| **Production** | `main` | production GCP project | Live environment. Deploy by merging PRs to `main`. |
+| **POC** | `poc` | `cru-data-orchestration-poc` | Experimental sandbox. Requires local Terraform setup (see below). Not needed for regular testing. |
+
+### Testing in staging
+
+Staging is the recommended environment for testing new or updated functions:
+
+1. Merge your feature branch into `staging` and push (or push directly to `staging`):
+   ```bash
+   git checkout staging
+   git merge feature/your-feature
+   git push origin staging
+   ```
+2. GitHub Actions will automatically deploy the function to the staging GCP project.
+3. Use Cloud Scheduler in the [GCP Console](https://console.cloud.google.com/cloudscheduler) to **"Force Run"** the scheduled job and verify behavior.
+4. Check [Cloud Logging](https://console.cloud.google.com/logs) for function output and errors.
+
+### Deploying to production
+
+After testing in staging, merge your PR to `main`. GitHub Actions will deploy to the production GCP project.
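+
+To verify a deployment, you can check the function's last update time with the
+gcloud CLI. This is a minimal sketch; the function name and region are
+illustrative, so adjust them to the function you deployed:
+
+```bash
+gcloud functions describe google-sheets-trigger --gen2 --region=us-central1 \
+  --format='value(updateTime)'
+```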
+ +### Deploy new code manually + +You should only need to do this for experimental work in the POC environment: ```bash gcloud functions deploy fivetran-trigger --source=. --entry-point=hello_http --runtime=python312 --gen2 --region=us-central1 @@ -67,6 +97,8 @@ gcloud functions deploy fivetran-trigger --source=. --entry-point=hello_http --r ## POC environment infrastructure +> **Note:** The POC environment is for experimental work only. For regular testing, use the **staging** environment (see above). + The POC environment is contained within the [cru-data-orchestration-poc](https://console.cloud.google.com/welcome?project=cru-data-orchestration-poc) GCP project. The project and its integrations with Datadog and Github are managed by Terraform and Atlantis in the [cru-terraform repo](https://github.com/CruGlobal/cru-terraform/tree/master/applications/data-warehouse/dot/poc). However, the functions and related infrastructure are not managed that way. Instead, devs can 'spin up' the functions by using terraform locally, using local tf state. They can then use the web console or gcloud cli to experiment and learn. @@ -84,4 +116,92 @@ To spin up the POC infrastructure: To clean up when you're done: * `terraform destroy` -Infrastructure learnings here can be applied to the terraform config for the beta and production environments. +Infrastructure learnings here can be applied to the terraform config for the production environment. + + +## Service Accounts + +Service accounts for this project are managed in a separate Terraform repository: +- **Location**: `cru-terraform/applications/data-warehouse/dot/prod/permissions.tf` +- **Pattern**: One service account per Cloud Run function/job +- **Naming convention**: `{function-name}@{project-id}.iam.gserviceaccount.com` + +### Adding a New Service Account + +1. Add the service account resource to `permissions.tf` in the cru-terraform repo +2. Add required IAM bindings (Pub/Sub publisher, BigQuery access, etc.) +3. If the function needs access to Google Sheets, share those files with the service account email + + +## Pub/Sub Topics + +### cloud-run-job-completed +This topic triggers dbt jobs after a Cloud Run job completes. + +**Publishers**: okta-sync, woo-sync, process-geography, google-sheets-trigger + +**Subscriber**: A Cloud Function (not in this repo) that calls dbt-trigger + +To trigger a dbt job from your function: +```python +from google.cloud import pubsub_v1 +import json +import os + +def publish_pubsub_message(data: dict, topic_name: str) -> None: + google_cloud_project_id = os.environ.get("GOOGLE_CLOUD_PROJECT") + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(google_cloud_project_id, topic_name) + data_encoded = json.dumps(data).encode("utf-8") + future = publisher.publish(topic_path, data_encoded) + future.result() + +# Trigger dbt job: +publish_pubsub_message({"job_id": "YOUR_DBT_JOB_ID"}, "cloud-run-job-completed") +``` + + +## google-sheets-trigger + +A reusable Cloud Function that checks Google Sheets for changes and triggers dbt jobs. + +### Usage + +Configure schedules in Terraform (`poc-terraform/functions.tf`). Each schedule specifies: +- Which sheets to monitor (by ID and name) +- Which dbt job to trigger +- When to run (cron schedule) +- Whether to include weekends in change detection + +Example Terraform configuration: +```hcl +module "google-sheets-trigger" { + source = "..." 
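+  # Each key under `schedule` becomes one scheduled invocation (a Cloud
+  # Scheduler job); its `argument` map is sent to the function as the JSON
+  # request body described under "Parameters" below.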
+ schedule = { + my_sheets: { + cron: "0 17 * * 1-5", # M-F 5pm + argument = { + "sheets" = [ + { "id" = "your-sheet-id", "name" = "My Sheet" } + ], + "dbt_job_id" = "123456", + "include_weekends" = false + } + } + } +} +``` + +### Parameters + +| Parameter | Type | Required | Description | +|-----------|------|----------|-------------| +| `sheets` | array | Yes | List of sheets to monitor, each with `id` and `name` | +| `dbt_job_id` | string | Yes | The dbt job ID to trigger when changes are detected | +| `include_weekends` | boolean | No | If `true`, always looks back 24 hours. If `false` (default), looks back 72 hours on Monday to cover the weekend. | + +### Permissions + +Share all monitored Google Sheets with the service account: +- `google-sheets-trigger@{project-id}.iam.gserviceaccount.com` +- Grant "Viewer" access (read-only is sufficient) diff --git a/google-sheets-trigger/main.py b/google-sheets-trigger/main.py new file mode 100644 index 0000000..ec67d36 --- /dev/null +++ b/google-sheets-trigger/main.py @@ -0,0 +1,226 @@ +""" +Google Sheets Trigger Cloud Function + +A reusable HTTP Cloud Function that checks Google Sheets for changes +and triggers dbt jobs via Pub/Sub. + +Request JSON: +{ + "sheets": [{"id": "...", "name": "..."}], + "dbt_job_id": "920201", + "include_weekends": false // optional, defaults to false +} +""" + +import os +import json +import logging +import sys +from datetime import datetime, timedelta +from typing import Any, Dict, List + +import functions_framework +import pytz +from google.cloud import pubsub_v1 + +from sheets_client import SheetsClient + + +logger = logging.getLogger("primary_logger") +logger.propagate = False + + +class CloudLoggingFormatter(logging.Formatter): + """ + Produces messages compatible with google cloud logging + """ + + def format(self, record: logging.LogRecord) -> str: + s = super().format(record) + return json.dumps( + { + "message": s, + "severity": record.levelname, + "timestamp": {"seconds": int(record.created), "nanos": 0}, + } + ) + + +def setup_logging(): + """ + Sets up logging for the application. + """ + global logger + + # Remove any existing handlers + if logger.handlers: + for handler in logger.handlers: + logger.removeHandler(handler) + + handler = logging.StreamHandler(stream=sys.stdout) + formatter = CloudLoggingFormatter(fmt="%(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + sys.excepthook = handle_unhandled_exception + + +def handle_unhandled_exception(exc_type, exc_value, exc_traceback): + """ + Handles unhandled exceptions by logging the exception details. + """ + if issubclass(exc_type, KeyboardInterrupt): + sys.__excepthook__(exc_type, exc_value, exc_traceback) + return + + logger.exception( + "Unhandled exception", exc_info=(exc_type, exc_value, exc_traceback) + ) + + +def get_lookback_time(include_weekends: bool = False) -> datetime: + """ + Get the cutoff time for change detection. + + Args: + include_weekends: If True, always look back 24 hours (for daily schedules). + If False, on Monday look back 72 hours to cover the weekend. + + Returns: + datetime: The cutoff time - sheets modified after this time are considered changed. 
+ """ + est = pytz.timezone("America/New_York") + now = datetime.now(est) + + # Daily schedule (including weekends) - always 24 hours + if include_weekends: + return now - timedelta(hours=24) + + # Weekday-only schedule - on Monday, look back to Friday + if now.weekday() == 0: # Monday + return now - timedelta(hours=72) + + return now - timedelta(hours=24) + + +def publish_pubsub_message(data: Dict[str, Any], topic_name: str) -> None: + """ + Publishes a message to a Pub/Sub topic. + + Args: + data: The message data as a dictionary. + topic_name: The name of the Pub/Sub topic. + """ + google_cloud_project_id = os.environ.get("GOOGLE_CLOUD_PROJECT") + if not google_cloud_project_id: + raise ValueError("GOOGLE_CLOUD_PROJECT environment variable not set") + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(google_cloud_project_id, topic_name) + data_encoded = json.dumps(data).encode("utf-8") + future = publisher.publish(topic_path, data_encoded) + future.result() + logger.info(f"Published message to Pub/Sub topic '{topic_name}'") + + +@functions_framework.http +def trigger_sheets_check(request): + """ + HTTP Cloud Function to check Google Sheets for changes and trigger dbt jobs. + + Request JSON: + { + "sheets": [{"id": "...", "name": "..."}], + "dbt_job_id": "920201", + "include_weekends": false // optional + } + + Returns: + tuple: (response_message, status_code) + """ + setup_logging() + + # Parse request + request_json = request.get_json(silent=True) + + # Handle octet-stream content type (same pattern as dbt-trigger) + if ( + request_json is None + and request.headers.get("Content-Type") == "application/octet-stream" + ): + try: + request_data = request.get_data(as_text=True) + request_json = json.loads(request_data) if request_data else None + except Exception as e: + logger.exception(f"Failed to parse octet-stream data: {str(e)}") + request_json = None + + if not request_json: + logger.error("Missing request body") + return "Missing request body", 400 + + sheets = request_json.get("sheets", []) + dbt_job_id = request_json.get("dbt_job_id") + include_weekends = request_json.get("include_weekends", False) + + if not sheets: + logger.error("Missing sheets in request") + return "Missing sheets in request", 400 + + if not dbt_job_id: + logger.error("Missing dbt_job_id in request") + return "Missing dbt_job_id in request", 400 + + logger.info( + f"Checking {len(sheets)} sheets for changes (include_weekends={include_weekends})" + ) + + lookback_time = get_lookback_time(include_weekends) + logger.info(f"Looking for changes after {lookback_time.isoformat()}") + + try: + sheets_client = SheetsClient() + except Exception as e: + logger.exception(f"Failed to initialize SheetsClient: {str(e)}") + return "Failed to initialize Google Sheets client", 500 + + changes_detected = False + changed_sheets = [] + + for sheet in sheets: + sheet_id = sheet.get("id") + sheet_name = sheet.get("name", sheet_id) + + if not sheet_id: + logger.warning(f"Skipping sheet with missing id: {sheet}") + continue + + try: + modified_time = sheets_client.get_modified_time(sheet_id) + logger.info( + f"Sheet '{sheet_name}' last modified: {modified_time.isoformat()}" + ) + + if modified_time > lookback_time: + logger.info(f"Sheet '{sheet_name}' has changes (modified after lookback)") + changes_detected = True + changed_sheets.append(sheet_name) + else: + logger.info(f"Sheet '{sheet_name}' has no changes") + + except Exception as e: + logger.exception(f"Failed to check sheet '{sheet_name}' 
({sheet_id}): {str(e)}") + return f"Failed to check sheet '{sheet_name}'", 500 + + if changes_detected: + try: + publish_pubsub_message({"job_id": dbt_job_id}, "cloud-run-job-completed") + logger.info(f"Triggered dbt job {dbt_job_id} due to changes in: {', '.join(changed_sheets)}") + return f"Changes detected in {len(changed_sheets)} sheet(s), triggered dbt job {dbt_job_id}", 200 + except Exception as e: + logger.exception(f"Failed to publish Pub/Sub message: {str(e)}") + return "Failed to trigger dbt job", 500 + else: + logger.info("No changes detected in any sheets") + return "No changes detected", 200 diff --git a/google-sheets-trigger/main_test.py b/google-sheets-trigger/main_test.py new file mode 100644 index 0000000..6945b14 --- /dev/null +++ b/google-sheets-trigger/main_test.py @@ -0,0 +1,306 @@ +""" +Tests for the Google Sheets Trigger Cloud Function. + +Run with: pytest main_test.py -v +""" + +import pytest +import logging +import sys +from datetime import datetime, timedelta +from unittest import mock + +import pytz + +import main + + +@pytest.fixture(autouse=True) +def setup_logging(): + """Set up logging for tests using the same configuration as main.py""" + logger = logging.getLogger("primary_logger") + logger.handlers = [] + logger.propagate = True + + handler = logging.StreamHandler(stream=sys.stdout) + formatter = main.CloudLoggingFormatter(fmt="%(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.DEBUG) + + yield + + logger.handlers = [] + logger.propagate = False + + +@pytest.fixture +def mock_env_vars(monkeypatch): + """Set required environment variables.""" + monkeypatch.setenv("GOOGLE_CLOUD_PROJECT", "test-project") + + +class TestGetLookbackTime: + """Tests for the get_lookback_time function.""" + + def test_weekday_no_weekends_returns_24_hours(self): + """On Tue-Fri without weekends, should look back 24 hours.""" + est = pytz.timezone("America/New_York") + # Tuesday at 5pm + tuesday = datetime(2024, 1, 9, 17, 0, tzinfo=est) + + with mock.patch("main.datetime") as mock_dt: + mock_dt.now.return_value = tuesday + # Need to keep timedelta working + mock_dt.side_effect = lambda *args, **kwargs: datetime(*args, **kwargs) + + result = main.get_lookback_time(include_weekends=False) + + # Should be roughly 24 hours ago + expected = tuesday - timedelta(hours=24) + assert abs((result - expected).total_seconds()) < 60 + + def test_monday_no_weekends_returns_72_hours(self): + """On Monday without weekends, should look back 72 hours to Friday.""" + est = pytz.timezone("America/New_York") + # Monday at 5pm + monday = datetime(2024, 1, 8, 17, 0, tzinfo=est) + + with mock.patch("main.datetime") as mock_dt: + mock_dt.now.return_value = monday + mock_dt.side_effect = lambda *args, **kwargs: datetime(*args, **kwargs) + + result = main.get_lookback_time(include_weekends=False) + + # Should be roughly 72 hours ago (Friday) + expected = monday - timedelta(hours=72) + assert abs((result - expected).total_seconds()) < 60 + + def test_monday_with_weekends_returns_24_hours(self): + """On Monday with weekends included, should still look back 24 hours.""" + est = pytz.timezone("America/New_York") + monday = datetime(2024, 1, 8, 17, 0, tzinfo=est) + + with mock.patch("main.datetime") as mock_dt: + mock_dt.now.return_value = monday + mock_dt.side_effect = lambda *args, **kwargs: datetime(*args, **kwargs) + + result = main.get_lookback_time(include_weekends=True) + + # Should be roughly 24 hours ago + expected = monday - timedelta(hours=24) + assert 
abs((result - expected).total_seconds()) < 60 + + def test_weekend_with_weekends_returns_24_hours(self): + """On Saturday with weekends included, should look back 24 hours.""" + est = pytz.timezone("America/New_York") + saturday = datetime(2024, 1, 6, 17, 0, tzinfo=est) + + with mock.patch("main.datetime") as mock_dt: + mock_dt.now.return_value = saturday + mock_dt.side_effect = lambda *args, **kwargs: datetime(*args, **kwargs) + + result = main.get_lookback_time(include_weekends=True) + + expected = saturday - timedelta(hours=24) + assert abs((result - expected).total_seconds()) < 60 + + +class TestTriggerSheetsCheck: + """Tests for the trigger_sheets_check HTTP function.""" + + def test_missing_request_body_returns_400(self, mock_env_vars): + """When request body is missing, should return 400.""" + mock_request = mock.Mock() + mock_request.get_json.return_value = None + mock_request.headers.get.return_value = None + + response, status = main.trigger_sheets_check(mock_request) + + assert status == 400 + assert "Missing request body" in response + + def test_missing_sheets_returns_400(self, mock_env_vars): + """When sheets are missing from request, should return 400.""" + mock_request = mock.Mock() + mock_request.get_json.return_value = {"dbt_job_id": "920201"} + mock_request.headers.get.return_value = None + + response, status = main.trigger_sheets_check(mock_request) + + assert status == 400 + assert "Missing sheets" in response + + def test_missing_dbt_job_id_returns_400(self, mock_env_vars): + """When dbt_job_id is missing from request, should return 400.""" + mock_request = mock.Mock() + mock_request.get_json.return_value = { + "sheets": [{"id": "123", "name": "Test"}] + } + mock_request.headers.get.return_value = None + + response, status = main.trigger_sheets_check(mock_request) + + assert status == 400 + assert "Missing dbt_job_id" in response + + @mock.patch("main.SheetsClient") + @mock.patch("main.publish_pubsub_message") + def test_changes_detected_triggers_dbt( + self, mock_publish, mock_sheets_class, mock_env_vars + ): + """When sheet has recent changes, should trigger dbt job.""" + # Sheet was modified 1 hour ago (within lookback window) + recent_time = datetime.now(pytz.UTC) - timedelta(hours=1) + mock_sheets_class.return_value.get_modified_time.return_value = recent_time + + mock_request = mock.Mock() + mock_request.get_json.return_value = { + "sheets": [{"id": "123", "name": "Test Sheet"}], + "dbt_job_id": "920201", + } + mock_request.headers.get.return_value = None + + response, status = main.trigger_sheets_check(mock_request) + + assert status == 200 + assert "triggered dbt job 920201" in response + mock_publish.assert_called_once_with( + {"job_id": "920201"}, "cloud-run-job-completed" + ) + + @mock.patch("main.SheetsClient") + @mock.patch("main.publish_pubsub_message") + def test_no_changes_skips_dbt( + self, mock_publish, mock_sheets_class, mock_env_vars + ): + """When no sheets have recent changes, should not trigger dbt job.""" + # Sheet was modified 1 week ago (outside lookback window) + old_time = datetime.now(pytz.UTC) - timedelta(days=7) + mock_sheets_class.return_value.get_modified_time.return_value = old_time + + mock_request = mock.Mock() + mock_request.get_json.return_value = { + "sheets": [{"id": "123", "name": "Test Sheet"}], + "dbt_job_id": "920201", + } + mock_request.headers.get.return_value = None + + response, status = main.trigger_sheets_check(mock_request) + + assert status == 200 + assert "No changes detected" in response + 
mock_publish.assert_not_called() + + @mock.patch("main.SheetsClient") + @mock.patch("main.publish_pubsub_message") + def test_multiple_sheets_one_changed( + self, mock_publish, mock_sheets_class, mock_env_vars + ): + """When one of multiple sheets changed, should trigger dbt job.""" + old_time = datetime.now(pytz.UTC) - timedelta(days=7) + recent_time = datetime.now(pytz.UTC) - timedelta(hours=1) + + # First sheet unchanged, second sheet changed + mock_sheets_class.return_value.get_modified_time.side_effect = [ + old_time, + recent_time, + ] + + mock_request = mock.Mock() + mock_request.get_json.return_value = { + "sheets": [ + {"id": "123", "name": "Sheet 1"}, + {"id": "456", "name": "Sheet 2"}, + ], + "dbt_job_id": "920201", + } + mock_request.headers.get.return_value = None + + response, status = main.trigger_sheets_check(mock_request) + + assert status == 200 + assert "triggered dbt job 920201" in response + mock_publish.assert_called_once() + + @mock.patch("main.SheetsClient") + def test_sheets_client_error_returns_500( + self, mock_sheets_class, mock_env_vars + ): + """When SheetsClient fails, should return 500.""" + mock_sheets_class.return_value.get_modified_time.side_effect = Exception( + "API Error" + ) + + mock_request = mock.Mock() + mock_request.get_json.return_value = { + "sheets": [{"id": "123", "name": "Test Sheet"}], + "dbt_job_id": "920201", + } + mock_request.headers.get.return_value = None + + response, status = main.trigger_sheets_check(mock_request) + + assert status == 500 + assert "Failed to check sheet" in response + + @mock.patch("main.SheetsClient") + @mock.patch("main.publish_pubsub_message") + def test_include_weekends_parameter( + self, mock_publish, mock_sheets_class, mock_env_vars + ): + """Should pass include_weekends parameter correctly.""" + recent_time = datetime.now(pytz.UTC) - timedelta(hours=1) + mock_sheets_class.return_value.get_modified_time.return_value = recent_time + + mock_request = mock.Mock() + mock_request.get_json.return_value = { + "sheets": [{"id": "123", "name": "Test Sheet"}], + "dbt_job_id": "920201", + "include_weekends": True, + } + mock_request.headers.get.return_value = None + + with mock.patch("main.get_lookback_time") as mock_lookback: + mock_lookback.return_value = datetime.now(pytz.UTC) - timedelta(hours=24) + main.trigger_sheets_check(mock_request) + mock_lookback.assert_called_once_with(True) + + def test_octet_stream_content_type(self, mock_env_vars): + """Should handle application/octet-stream content type.""" + mock_request = mock.Mock() + mock_request.get_json.return_value = None + mock_request.headers.get.return_value = "application/octet-stream" + mock_request.get_data.return_value = '{"sheets": [], "dbt_job_id": "920201"}' + + response, status = main.trigger_sheets_check(mock_request) + + # Should parse the octet-stream but fail on empty sheets + assert status == 400 + assert "Missing sheets" in response + + +class TestPublishPubsubMessage: + """Tests for the publish_pubsub_message function.""" + + def test_missing_project_raises_error(self, monkeypatch): + """When GOOGLE_CLOUD_PROJECT is not set, should raise ValueError.""" + monkeypatch.delenv("GOOGLE_CLOUD_PROJECT", raising=False) + + with pytest.raises(ValueError, match="GOOGLE_CLOUD_PROJECT"): + main.publish_pubsub_message({"job_id": "123"}, "test-topic") + + @mock.patch("main.pubsub_v1.PublisherClient") + def test_publishes_message(self, mock_publisher_class, mock_env_vars): + """Should publish encoded JSON to the topic.""" + mock_publisher = 
mock_publisher_class.return_value + mock_future = mock.Mock() + mock_publisher.publish.return_value = mock_future + + main.publish_pubsub_message({"job_id": "920201"}, "cloud-run-job-completed") + + mock_publisher.topic_path.assert_called_once_with( + "test-project", "cloud-run-job-completed" + ) + mock_publisher.publish.assert_called_once() + mock_future.result.assert_called_once() diff --git a/google-sheets-trigger/requirements-test.txt b/google-sheets-trigger/requirements-test.txt new file mode 100644 index 0000000..4a42efa --- /dev/null +++ b/google-sheets-trigger/requirements-test.txt @@ -0,0 +1,4 @@ +flask==3.0.3 +pytest==8.2.0 +pytest-mock==3.14.0 +responses==0.23.1 diff --git a/google-sheets-trigger/requirements.txt b/google-sheets-trigger/requirements.txt new file mode 100644 index 0000000..337f2e1 --- /dev/null +++ b/google-sheets-trigger/requirements.txt @@ -0,0 +1,12 @@ +# For basic Cloud Run Functions infrastructure +functions-framework==3.* + +# For Google Drive API (to get sheet modifiedTime) +google-api-python-client>=2.100.0 +google-auth>=2.38.0 + +# For Pub/Sub messaging +google-cloud-pubsub>=2.18.0 + +# For timezone handling +pytz>=2024.1 diff --git a/google-sheets-trigger/sheets_client.py b/google-sheets-trigger/sheets_client.py new file mode 100644 index 0000000..f6a28c3 --- /dev/null +++ b/google-sheets-trigger/sheets_client.py @@ -0,0 +1,70 @@ +""" +Google Sheets Client + +Uses the Google Drive API to retrieve file metadata (specifically modifiedTime) +for Google Sheets files. +""" + +from datetime import datetime +import logging + +import google.auth +from googleapiclient.discovery import build + + +logger = logging.getLogger("primary_logger") + + +class SheetsClient: + """ + Client for retrieving Google Sheets metadata via the Drive API. + + Uses Application Default Credentials (ADC) for authentication, + which works automatically in Cloud Run/Cloud Functions. + """ + + def __init__(self): + """ + Initialize the SheetsClient with Google Drive API credentials. + + Uses Application Default Credentials with drive.metadata.readonly scope + to read file metadata without accessing file contents. + """ + credentials, _ = google.auth.default( + scopes=["https://www.googleapis.com/auth/drive.metadata.readonly"] + ) + self.service = build("drive", "v3", credentials=credentials) + logger.debug("SheetsClient initialized successfully") + + def get_modified_time(self, file_id: str) -> datetime: + """ + Get the last modified time of a Google Sheet. + + Args: + file_id: The Google Sheet file ID (from the URL: + docs.google.com/spreadsheets/d/{file_id}/edit) + + Returns: + datetime: The last modified time as a timezone-aware datetime (UTC). + + Raises: + googleapiclient.errors.HttpError: If the file cannot be accessed + (e.g., not shared with the service account, or invalid ID). 
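+
+        Note: the Drive API typically reports a file the caller cannot
+        access as if it did not exist (HTTP 404), so a "not found" error
+        here usually means the sheet was never shared with the service
+        account.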
+ """ + file_metadata = self.service.files().get( + fileId=file_id, + fields="modifiedTime,name", + supportsAllDrives=True + ).execute() + + modified_time_str = file_metadata["modifiedTime"] + # Google returns ISO format with Z suffix: "2024-01-15T14:30:00.000Z" + # Convert to timezone-aware datetime + modified_time = datetime.fromisoformat( + modified_time_str.replace("Z", "+00:00") + ) + + logger.debug( + f"Retrieved modifiedTime for file {file_id}: {modified_time.isoformat()}" + ) + return modified_time diff --git a/poc-terraform/functions.tf b/poc-terraform/functions.tf index e18ff29..bb16b91 100644 --- a/poc-terraform/functions.tf +++ b/poc-terraform/functions.tf @@ -22,3 +22,34 @@ module "dbt-triggers" { ] project_id = local.project_id } + +module "google-sheets-trigger" { + source = "git::https://github.com/CruGlobal/cru-terraform-modules.git//gcp/cloudrun-function/scheduled-tasks?ref=v30.14.4" + name = "google-sheets-trigger" + description = "Checks Google Sheets for changes and triggers dbt jobs" + time_zone = "America/New_York" + + schedule = { + dw_security_sheets : { + cron : "0 19 * * 1-5", # M-F 7pm EST + argument = { + "sheets" = [ + { "id" = "1bJzA7_THeCd3oZiLAfktBqHSx9_bOdFEbWLMVkY47Mc", "name" = "Power BI RLS - Financial" }, + { "id" = "1f49EQA5B0GraOHHYjw3zYo6EKzuzbfoYWUSEynGNm4s", "name" = "BigQuery Table Access" }, + { "id" = "1Wm9rVCkn2r8u_p_BzABffAysfMbW24-XsX0rWoUQXMc", "name" = "Power BI RLS - Other" } + ], + "dbt_job_id" = "920201", + "include_weekends" = false + } + } + } + + secrets = [] # No secrets needed - uses default SA credentials + + secret_managers = [ + "user:luis.rodriguez@cru.org", + "user:matt.drees@cru.org", + "group:dps-gcp-role-data-engineers@cru.org", + ] + project_id = local.project_id +}
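+
+# Reminder: every sheet id listed above must be shared with the
+# google-sheets-trigger service account (Viewer access is sufficient);
+# otherwise the Drive metadata lookups will fail.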