From 5b624b7d5403796011b5eb57d1e57362a3bf2797 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Tue, 16 Dec 2025 12:23:51 +0200 Subject: [PATCH 01/15] Add Domains keys endpoints; add handle_dkimkeys --- .gitignore | 3 ++ mailgun/client.py | 12 ++++- mailgun/examples/domain_examples.py | 79 ++++++++++++++++++++++++++++- mailgun/handlers/domains_handler.py | 24 +++++++++ 4 files changed, 116 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 5895d7e..40ce45b 100644 --- a/.gitignore +++ b/.gitignore @@ -302,3 +302,6 @@ dev/ pytestdebug.log */_version.py + +# local temp files +.server.key diff --git a/mailgun/client.py b/mailgun/client.py index 75c76b8..58a8541 100644 --- a/mailgun/client.py +++ b/mailgun/client.py @@ -28,6 +28,7 @@ from mailgun.handlers.bounce_classification_handler import handle_bounce_classification from mailgun.handlers.default_handler import handle_default +from mailgun.handlers.domains_handler import handle_dkimkeys from mailgun.handlers.domains_handler import handle_domainlist from mailgun.handlers.domains_handler import handle_domains from mailgun.handlers.domains_handler import handle_mailboxes_credentials @@ -63,6 +64,7 @@ "resendmessage": handle_resend_message, "domains": handle_domains, "domainlist": handle_domainlist, + "dkim": handle_dkimkeys, "dkim_authority": handle_domains, "dkim_selector": handle_domains, "web_prefix": handle_domains, @@ -132,7 +134,8 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: "mimemessage": {"base": v3_base, "keys": ["messages.mime"]}, "resendmessage": {"base": v3_base, "keys": ["resendmessage"]}, "ippools": {"base": v3_base, "keys": ["ip_pools"]}, - "dkimkeys": {"base": v1_base, "keys": ["dkim", "keys"]}, + # /v1/dkim/keys + "dkim": {"base": v1_base, "keys": ["dkim", "keys"]}, "domainlist": {"base": v4_base, "keys": ["domainlist"]}, # /v1/analytics/metrics # /v1/analytics/usage/metrics @@ -148,6 +151,7 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: "base": v2_base, "keys": ["bounce-classification", "metrics"], }, + # /v5/users "users": { "base": v5_base, "keys": ["users", "me"], @@ -157,6 +161,12 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: if key in special_cases: return special_cases[key], headers + if "dkim" in key: + return { + "base": v1_base, + "keys": key.split("_"), + }, headers + if "analytics" in key: headers |= {"Content-Type": "application/json"} return { diff --git a/mailgun/examples/domain_examples.py b/mailgun/examples/domain_examples.py index f703cbe..81b70b3 100644 --- a/mailgun/examples/domain_examples.py +++ b/mailgun/examples/domain_examples.py @@ -1,6 +1,8 @@ from __future__ import annotations import os +import subprocess +from pathlib import Path from mailgun.client import Client @@ -183,10 +185,85 @@ def get_sending_queues() -> None: :return: """ request = client.domains_sendingqueues.get(domain="python.test.domain5") - print(request) + print(request.json()) + + +def get_dkim_keys() -> None: + """ + GET /v1/dkim/keys + :return: + """ + data = { + "page": "string", + "limit": "0", + "signing_domain": "python.test.domain5", + "selector": "smtp", + } + + request = client.dkim_keys.get(data=data) + print(request.json()) + + +def post_dkim_keys() -> None: + """ + POST /v1/dkim/keys + :return: + """ + + # Private key PEM file must be generated in PKCS1 format. You need 'openssl' on your machine + # example: + # openssl genrsa -traditional -out .server.key 2048 + subprocess.run(["openssl", "genrsa", "-traditional", "-out", ".server.key", "2048"]) + + files = [ + ( + "pem", + ("server.key", Path(".server.key").read_bytes()), + ) + ] + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": files, + } + + headers = {"Content-Type": "multipart/form-data"} + + request = client.dkim_keys.create(data=data, headers=headers, files=files) + # TODO: Create test cases for these failures: + # + # {'message': 'failed to import domain key: failed to parse PEM'} + + # You must remove a domain key on WEB UI https://app.mailgun.com/mg/sending/domains selecting your "signing_domain" + # + # {'message': 'failed to create domain key: duplicate key'} + + # If you provide a string instead of a file + # + # {'message': 'failed to import domain key: failed to parse PEM'} + + # + # {'message': 'failed to import domain key: failed to parse private key: key must be PKCS1 format'} + print(request.json()) + + +def delete_dkim_keys() -> None: + """ + GET /v1/dkim/keys + :return: + """ + query = {"signing_domain": "python.test.domain5", "selector": "smtp"} + + request = client.dkim_keys.delete(filters=query) print(request.json()) if __name__ == "__main__": add_domain() get_domains() + + post_dkim_keys() + get_dkim_keys() + delete_dkim_keys() diff --git a/mailgun/handlers/domains_handler.py b/mailgun/handlers/domains_handler.py index 1aa148c..3eccd78 100644 --- a/mailgun/handlers/domains_handler.py +++ b/mailgun/handlers/domains_handler.py @@ -121,3 +121,27 @@ def handle_mailboxes_credentials( url = url["base"] + domain + final_keys + "/" + kwargs["login"] return url + + +def handle_dkimkeys( + url: dict[str, Any], + _domain: str | None, + _method: str | None, + **kwargs: Any, +) -> Any: + """Handle Mailboxes credentials. + + :param url: Incoming URL dictionary + :type url: dict + :param domain: Incoming domain + :type domain: str + :param _method: Incoming request method (it's not being used for this handler) + :type _method: str + :param kwargs: kwargs + :return: final url for Mailboxes credentials endpoint + """ + final_keys = path.join(*url["keys"]) if url["keys"] else "" + if "keys" in final_keys: + url = url["base"] + final_keys + + return url From 8c14cea061aa9cc12042df91609c8aeb4402d0b0 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Wed, 17 Dec 2025 10:06:50 +0200 Subject: [PATCH 02/15] Add Keys endpoints; add handle_keys --- .pre-commit-config.yaml | 12 ++-- mailgun/client.py | 8 +++ mailgun/examples/keys_examples.py | 94 ++++++++++++++++++++++++++++++ mailgun/examples/users_examples.py | 4 +- mailgun/handlers/keys_handler.py | 35 +++++++++++ pyproject.toml | 2 + 6 files changed, 148 insertions(+), 7 deletions(-) create mode 100644 mailgun/examples/keys_examples.py create mode 100644 mailgun/handlers/keys_handler.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e221f0c..95c5e42 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -63,12 +63,12 @@ repos: hooks: - id: detect-secrets # Detect accidentally committed secrets - - repo: https://github.com/codespell-project/codespell - rev: v2.4.1 - hooks: - - id: codespell - args: [--write] - exclude: ^tests +# - repo: https://github.com/codespell-project/codespell +# rev: v2.4.1 +# hooks: +# - id: codespell +# args: [--write] +# exclude: ^tests - repo: https://github.com/python-jsonschema/check-jsonschema rev: 0.35.0 diff --git a/mailgun/client.py b/mailgun/client.py index 58a8541..e97b84c 100644 --- a/mailgun/client.py +++ b/mailgun/client.py @@ -38,6 +38,7 @@ from mailgun.handlers.inbox_placement_handler import handle_inbox from mailgun.handlers.ip_pools_handler import handle_ippools from mailgun.handlers.ips_handler import handle_ips +from mailgun.handlers.keys_handler import handle_keys from mailgun.handlers.mailinglists_handler import handle_lists from mailgun.handlers.messages_handler import handle_resend_message from mailgun.handlers.metrics_handler import handle_metrics @@ -88,6 +89,7 @@ "analytics": handle_metrics, "bounce-classification": handle_bounce_classification, "users": handle_users, + "keys": handle_keys, } @@ -189,6 +191,12 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: "keys": key.split("_"), }, headers + if "keys" in key: + return { + "base": v1_base, + "keys": key.split("_"), + }, headers + # Handle DIPP endpoints if "subaccount" in key: if "ip_pools" in key: diff --git a/mailgun/examples/keys_examples.py b/mailgun/examples/keys_examples.py new file mode 100644 index 0000000..00492a0 --- /dev/null +++ b/mailgun/examples/keys_examples.py @@ -0,0 +1,94 @@ +from __future__ import annotations + +import os + +from mailgun.client import Client + + +key: str = os.environ["APIKEY"] +domain: str = os.environ["DOMAIN"] +mailgun_email = os.environ["MAILGUN_EMAIL"] +role = os.environ["ROLE"] +user_id = os.environ["USER_ID"] +user_name = os.environ["USER_NAME"] + +client: Client = Client(auth=("api", key)) + + +def get_keys() -> None: + """ + GET /v1/keys + :return: + """ + query = {"domain_name": "python.test.domain5", "kind": "web"} + req = client.keys.get(filters=query) + print(req.json()) + + +def post_keys() -> None: + """ + POST /v1/keys + + This code generate a Web API key tied to the account user associated with the data inputted for the USER_EMAIL field and USER_ID values. + This is returned by the API in the "secret":"API_KEY" key/value pair. This key will authenticate the call (Get one's own user details) made to the /v5/users/me endpoint, # pragma: allowlist secret + and will return the user's data associated with the USER_EMAIL and USER_ID values. + + Important Notes: + USER_EMAIL - The user login email address of the user that is trying to make the call to the /v5/users/me endpoint. + SECONDS - How many seconds you want the key to be active before it expires. + ROLE - The role of the API Key. This dictates what permissions the key has (https://help.mailgun.com/hc/en-us/articles/26016288026907-API-Key-Roles) + USER_ID - The internal User ID of the user that is trying to call the /v5/users/me endpoint. This is present in the URL in the address bar when viewing the User details in the GUI or in Admin. Both will show /users/USER_ID in the address. + DESCRIPTION - Description of the key. + + :return: + """ + + data = { + "email": mailgun_email, + "domain_name": "python.test.domain5", + "kind": "web", + "expiration": "3600", + "role": role, + "user_id": user_id, + "user_name": user_name, + "description": "a new key", + } + + headers = {"Content-Type": "multipart/form-data"} + + req = client.keys.create(data=data, headers=headers) + print(req.json()) + + +def delete_key() -> None: + """ + DELETE /vq/keys/{key_id} + :return: + """ + query = {"domain_name": "python.test.domain5", "kind": "web"} + req1 = client.keys.get(filters=query) + items = req1.json()["items"] + + for item in items: + if mailgun_email == item["requestor"]: # codespell:disable-line + req2 = client.keys.delete(key_id=item["id"]) + print(req2.json()) + + +def regenerate_key() -> None: + """ + POST /v1/keys/public + :return: + """ + req = client.keys_public.create() + print(req.json()) + + +if __name__ == "__main__": + # get_keys() + post_keys() + get_keys() + delete_key() + get_keys() + regenerate_key() + get_keys() diff --git a/mailgun/examples/users_examples.py b/mailgun/examples/users_examples.py index 172b122..5d080e0 100644 --- a/mailgun/examples/users_examples.py +++ b/mailgun/examples/users_examples.py @@ -8,6 +8,8 @@ key: str = os.environ["APIKEY"] domain: str = os.environ["DOMAIN"] mailgun_email = os.environ["MAILGUN_EMAIL"] +role = os.environ["ROLE"] + client: Client = Client(auth=("api", key)) @@ -17,7 +19,7 @@ def get_users() -> None: GET /v5/users :return: """ - query = {"role": "admin", "limit": "0", "skip": "0"} + query = {"role": role, "limit": "0", "skip": "0"} req = client.users.get(filters=query) print(req.json()) diff --git a/mailgun/handlers/keys_handler.py b/mailgun/handlers/keys_handler.py new file mode 100644 index 0000000..9982fad --- /dev/null +++ b/mailgun/handlers/keys_handler.py @@ -0,0 +1,35 @@ +"""KEYS HANDLER. + +Doc: https://documentation.mailgun.com/docs/mailgun/api-reference/send/mailgun/keys +""" + +from __future__ import annotations + +from os import path +from typing import Any + + +def handle_keys( + url: dict[str, Any], + _domain: str | None, + _method: str | None, + **kwargs: Any, +) -> Any: + """Handle Keys. + + :param url: Incoming URL dictionary + :type url: dict + :param _domain: Incoming domain (it's not being used for this handler) + :type _domain: str + :param _method: Incoming request method (it's not being used for this handler) + :type _method: str + :param kwargs: kwargs + :return: final url for Keys endpoint + """ + final_keys = path.join("/", *url["keys"]) if url["keys"] else "" + if "key_id" in kwargs: + url = url["base"][:-1] + final_keys + "/" + kwargs["key_id"] + else: + url = url["base"][:-1] + final_keys + + return url diff --git a/pyproject.toml b/pyproject.toml index 50fe3ab..a5008d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -134,6 +134,8 @@ spelling = ["typos"] other = ["toml"] +[tool.typos.default.extend-words] +requestor = "requestor" [tool.black] line-length = 100 From 1b3c62cb4d60899c63b249d88d8d3f79217b3c0b Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Thu, 18 Dec 2025 10:58:26 +0200 Subject: [PATCH 03/15] test: Add KeysTests and AsyncKeysTests; add pytest-asyncio as a test dependency --- environment-dev.yaml | 1 + mailgun/examples/users_examples.py | 40 ++- pyproject.toml | 1 + tests/tests.py | 434 ++++++++++++++++++++++------- 4 files changed, 354 insertions(+), 122 deletions(-) diff --git a/environment-dev.yaml b/environment-dev.yaml index e15ed79..79df919 100644 --- a/environment-dev.yaml +++ b/environment-dev.yaml @@ -17,6 +17,7 @@ dependencies: - conda-forge::pyfakefs - coverage >=4.5.4 - pytest + - pytest-asyncio - pytest-benchmark - pytest-cov - pytest-xdist diff --git a/mailgun/examples/users_examples.py b/mailgun/examples/users_examples.py index 5d080e0..87f8934 100644 --- a/mailgun/examples/users_examples.py +++ b/mailgun/examples/users_examples.py @@ -9,6 +9,8 @@ domain: str = os.environ["DOMAIN"] mailgun_email = os.environ["MAILGUN_EMAIL"] role = os.environ["ROLE"] +user_id = os.environ["USER_ID"] +user_name = os.environ["USER_NAME"] client: Client = Client(auth=("api", key)) @@ -29,18 +31,9 @@ def get_own_user_details() -> None: GET /v5/users/me Please note, for the command("Get one's own user details") to be successful, you must use a Web type API key for the call. Private type API keys will Not work. - This below Call will generate a Web API key tied to the account user associated with the data inputted for the USER_EMAIL field and USER_ID values. This is returned by the API in the "secret":"API_KEY" key/value pair. This key will authenticate the call(Get one's own user details) made to the /v5/users/me endpoint, and will return the user's data associated with the USER_EMAIL and USER_ID values. - - Generate Web API Key: - curl -i -X POST \ - -u api:API_KEY \ - https://api.mailgun.net/v1/keys \ - -F email=USER_EMAIL \ - -F kind=web \ - -F expiration=SECONDS (Lifetime of the key in seconds) \ - -F role=ROLE \ - -F user_id=USER_ID \ - -F description=DESCRIPTION + The below Call will generate a Web API key tied to the account user associated with the data inputted for the USER_EMAIL field and USER_ID values. + This is returned by the API in the "secret":"API_KEY" key/value pair. # pragma: allowlist secret + This key will authenticate the call(Get one's own user details) made to the /v5/users/me endpoint, and will return the user's data associated with the USER_EMAIL and USER_ID values. see https://documentation.mailgun.com/docs/mailgun/api-reference/send/mailgun/keys/api.(*keysapi).createkey-fm-7 @@ -54,10 +47,27 @@ def get_own_user_details() -> None: :return: """ - secret: str = os.environ["SECRET"] + + data = { + "email": mailgun_email, + "domain_name": "python.test.domain5", + "kind": "web", + "expiration": "3600", + "role": role, + "user_id": user_id, + "user_name": user_name, + "description": "a new key", + } + + headers = {"Content-Type": "multipart/form-data"} + + req1 = client.keys.create(data=data, headers=headers) + print(req1.json()) + secret = req1.json()["key"]["secret"] + client_with_secret_key: Client = Client(auth=("api", secret)) - req = client_with_secret_key.users.get(user_id="me") - print(req.json()) + req2 = client_with_secret_key.users.get(user_id="me") + print(req2.json()) def get_user_details() -> None: diff --git a/pyproject.toml b/pyproject.toml index a5008d9..3400c62 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -121,6 +121,7 @@ profilers = ["scalene>=1.3.16", "snakeviz"] tests = [ # tests "pytest>=7.0.0", + "pytest-asyncio", "pytest-benchmark", "pytest-order", "pytest-cov", diff --git a/tests/tests.py b/tests/tests.py index 1f4e471..aea64a8 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -2152,6 +2152,136 @@ def test_get_account_tag_incorrect_url_without_limits_part(self) -> None: self.assertIn("not found", req.json()["error"]) +class BounceClassificationTests(unittest.TestCase): + """Tests for Mailgun Bounce Classification API, https://api.mailgun.net/v2/bounce-classification/metrics. + + This class provides setup functionality for tests involving the + functionality with authentication and client initialization handled + in `setUp`. Each test in this suite operates with the configured Mailgun client + instance to simulate API interactions. + + """ + + def setUp(self) -> None: + self.auth: tuple[str, str] = ( + "api", + os.environ["APIKEY"], + ) + self.client: Client = Client(auth=self.auth) + self.domain: str = os.environ["DOMAIN"] + + now = datetime.now() + now_formatted = now.strftime("%a, %d %b %Y %H:%M:%S +0000") + yesterday = now - timedelta(days=1) + yesterday_formatted = yesterday.strftime("%a, %d %b %Y %H:%M:%S +0000") # noqa: FURB184 + + self.payload = { + "start": yesterday_formatted, + "end": now_formatted, + "resolution": "day", + "duration": "24h0m0s", + "dimensions": ["entity-name", "domain.name"], + "metrics": [ + "critical_bounce_count", + "non_critical_bounce_count", + "critical_delay_count", + "non_critical_delay_count", + "delivered_smtp_count", + "classified_failures_count", + "critical_bounce_rate", + "non_critical_bounce_rate", + "critical_delay_rate", + "non_critical_delay_rate", + ], + "filter": { + "AND": [ + { + "attribute": "domain.name", + "comparator": "=", + "values": [{"value": self.domain}], + } + ] + }, + "include_subaccounts": True, + "pagination": {"sort": "entity-name:asc", "limit": 10}, + } + self.payload_without_dimensions = { + "start": yesterday_formatted, + "end": now_formatted, + "metrics": [ + "critical_bounce_count", + ], + "pagination": {"sort": "entity-name:asc", "limit": 10}, + } + self.payload_with_old_dates = { + "start": "Wed, 12 Nov 2000 23:00:00 UTC", + "end": "Thu, 13 Nov 2000 23:00:00 UTC", + "dimensions": ["entity-name", "domain.name"], + "metrics": [ + "critical_bounce_count", + ], + "pagination": {"sort": "entity-name:asc", "limit": 10}, + } + self.empty_payload: dict[str, str] = {} + + def test_post_list_statistic(self) -> None: + """Test to post query to list statistic: Happy Path with valid data.""" + req = self.client.bounceclassification_metrics.create(data=self.payload) + + expected_keys: list[str] = [ + "start", + "end", + "resolution", + "duration", + "dimensions", + "pagination", + "items", + ] + expected_dimensions_elements: list[str] = [ + "entity-name", + "domain.name", + ] + expected_pagination_keys: list[str] = [ + "sort", + "skip", + "limit", + "total", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json().keys()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_dimensions_elements) for key in req.json()["dimensions"]] # type: ignore[func-returns-value] + [self.assertIn(key, expected_pagination_keys) for key in req.json()["pagination"]] # type: ignore[func-returns-value] + + def test_post_list_statistic_without_dimensions(self) -> None: + """Test to post query to list statistic: Wrong Path with invalid data.""" + req = self.client.bounceclassification_metrics.create(data=self.payload_without_dimensions) + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 400) + [self.assertIn(key, "message") for key in req.json().keys()] # type: ignore[func-returns-value] + self.assertIn("sort should be either one of metrics or dimensions", req.json()["message"]) + + def test_post_list_statistic_with_old_dates(self) -> None: + """Test to post query to list statistic: Wrong Path with invalid data.""" + req = self.client.bounceclassification_metrics.create(data=self.payload_with_old_dates) + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 400) + [self.assertIn(key, "message") for key in req.json().keys()] # type: ignore[func-returns-value] + self.assertIn("is out of permitted log retention", req.json()["message"]) + + def test_post_list_statistic_with_empty_payload(self) -> None: + """Test to post query to list statistic: Wrong Path with invalid data.""" + req = self.client.bounceclassification_metrics.create(data=self.empty_payload) + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 400) + [self.assertIn(key, "message") for key in req.json().keys()] # type: ignore[func-returns-value] + self.assertIn("is out of permitted log retention", req.json()["message"]) + + class UsersTests(unittest.TestCase): """Tests for Mailgun Users API, https://api.mailgun.net/v5/users. @@ -2303,6 +2433,118 @@ def test_get_invalid_user_details(self) -> None: self.assertIsInstance(req2.json(), dict) self.assertEqual(req2.status_code, 404) + +class KeysTests(unittest.TestCase): + """Tests for Mailgun Keys API, https://api.mailgun.net/v1/keys. + + This class provides setup functionality for tests involving + with authentication and client initialization handled + in `setUp`. Each test in this suite operates with the configured Mailgun client + instance to simulate API interactions. + + """ + + def setUp(self) -> None: + self.auth: tuple[str, str] = ( + "api", + os.environ["APIKEY"], + ) + self.client: Client = Client(auth=self.auth) + self.domain: str = os.environ["DOMAIN"] + self.mailgun_email = os.environ["MAILGUN_EMAIL"] + self.role = os.environ["ROLE"] + self.user_id = os.environ["USER_ID"] + self.user_name = os.environ["USER_NAME"] + + def test_get_keys(self) -> None: + """Test to get the list of Mailgun API keys: happy path with valid data.""" + query = {"domain_name": "python.test.domain5", "kind": "web"} + req = self.client.keys.get(filters=query) + + expected_keys = [ + "total_count", + "items", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + + def test_get_keys_with_invalid_url(self) -> None: + """Test to get the list of Mailgun API keys: expected failure with invalid URL.""" + query = {"domain_name": "python.test.domain5", "kind": "web"} + + with self.assertRaises(KeyError): + self.client.key.get(filters=query) + + def test_get_keys_without_filtering_data(self) -> None: + """Test to get the list of Mailgun API keys: Happy Path without filtering data.""" + req = self.client.keys.get() + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + self.assertGreater(len(req.json()["items"]), 0) + + def test_post_keys(self) -> None: + """Test to create the Mailgun API key: happy path with valid data.""" + data = { + "email": self.mailgun_email, + "domain_name": "python.test.domain5", + "kind": "web", + "expiration": "3600", + "role": self.role, + "user_id": self.user_id, + "user_name": self.user_name, + "description": "a new key", + } + + headers = {"Content-Type": "multipart/form-data"} + + req = self.client.keys.create(data=data, headers=headers) + + expected_keys = [ + "message", + "key", + ] + + expected_key_keys = [ + "id", + "description", + "kind", + "role", + "created_at", + "updated_at", + "expires_at", + "secret", + "is_disabled", + "domain_name", + "requestor", + "user_name", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + self.assertEqual(req.json()["message"], "great success") + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_key_keys) for key in req.json()["key"]] # type: ignore[func-returns-value] + + def test_delete_key(self) -> None: + """Test to delete the Mailgun API keys: happy path with valid data.""" + query = {"domain_name": "python.test.domain5", "kind": "web"} + req1 = self.client.keys.get(filters=query) + items = req1.json()["items"] + + for item in items: + if self.mailgun_email == item["requestor"]: # codespell:disable-line + req2 = self.client.keys.delete(key_id=item["id"]) + self.assertEqual(req2.json()["message"], "key deleted") + + @pytest.mark.skip("Don't regenerate a public Mailgun API without a need") + def test_regenerate_key(self) -> None: + """Test to regenerate the Mailgun API keys: happy path with valid data.""" + self.client.keys_public.create() + + # ============================================================================ # Async Test Classes (using AsyncClient and AsyncEndpoint) # ============================================================================ @@ -4435,7 +4677,7 @@ async def test_get_user_invalid_url(self) -> None: """Test to get account's users details: expected failure with invalid URL.""" query = {"role": "admin", "limit": "0", "skip": "0"} - with self.assertRaises(KeyError) as cm: + with self.assertRaises(KeyError): await self.client.user.get(filters=query) @pytest.mark.xfail @@ -4525,134 +4767,112 @@ async def test_get_invalid_user_details(self) -> None: self.assertEqual(req2.status_code, 404) -class BounceClassificationTests(unittest.TestCase): - """Tests for Mailgun Bounce Classification API, https://api.mailgun.net/v2/bounce-classification/metrics. - - This class provides setup functionality for tests involving the - functionality with authentication and client initialization handled - in `setUp`. Each test in this suite operates with the configured Mailgun client - instance to simulate API interactions. - - """ +class AsyncKeysTests(unittest.IsolatedAsyncioTestCase): + """Async tests for Mailgun Users API using AsyncClient.""" - def setUp(self) -> None: + async def asyncSetUp(self) -> None: self.auth: tuple[str, str] = ( "api", os.environ["APIKEY"], ) - self.client: Client = Client(auth=self.auth) + self.client: AsyncClient = AsyncClient(auth=self.auth) self.domain: str = os.environ["DOMAIN"] + self.mailgun_email = os.environ["MAILGUN_EMAIL"] + self.role = os.environ["ROLE"] + self.user_id = os.environ["USER_ID"] + self.user_name = os.environ["USER_NAME"] - now = datetime.now() - now_formatted = now.strftime("%a, %d %b %Y %H:%M:%S +0000") - yesterday = now - timedelta(days=1) - yesterday_formatted = yesterday.strftime("%a, %d %b %Y %H:%M:%S +0000") # noqa: FURB184 - - self.payload = { - "start": yesterday_formatted, - "end": now_formatted, - "resolution": "day", - "duration": "24h0m0s", - "dimensions": ["entity-name", "domain.name"], - "metrics": [ - "critical_bounce_count", - "non_critical_bounce_count", - "critical_delay_count", - "non_critical_delay_count", - "delivered_smtp_count", - "classified_failures_count", - "critical_bounce_rate", - "non_critical_bounce_rate", - "critical_delay_rate", - "non_critical_delay_rate", - ], - "filter": { - "AND": [ - { - "attribute": "domain.name", - "comparator": "=", - "values": [{"value": self.domain}], - } - ] - }, - "include_subaccounts": True, - "pagination": {"sort": "entity-name:asc", "limit": 10}, - } - self.payload_without_dimensions = { - "start": yesterday_formatted, - "end": now_formatted, - "metrics": [ - "critical_bounce_count", - ], - "pagination": {"sort": "entity-name:asc", "limit": 10}, - } - self.payload_with_old_dates = { - "start": "Wed, 12 Nov 2000 23:00:00 UTC", - "end": "Thu, 13 Nov 2000 23:00:00 UTC", - "dimensions": ["entity-name", "domain.name"], - "metrics": [ - "critical_bounce_count", - ], - "pagination": {"sort": "entity-name:asc", "limit": 10}, - } - self.empty_payload: dict[str, str] = {} + async def asyncTearDown(self) -> None: + await self.client.aclose() - def test_post_list_statistic(self) -> None: - """Test to post query to list statistic: Happy Path with valid data.""" - req = self.client.bounceclassification_metrics.create(data=self.payload) + async def test_get_keys(self) -> None: + """Test to get the list of Mailgun API keys: happy path with valid data.""" + query = {"domain_name": "python.test.domain5", "kind": "web"} + req = await self.client.keys.get(filters=query) - expected_keys: list[str] = [ - "start", - "end", - "resolution", - "duration", - "dimensions", - "pagination", + expected_keys = [ + "total_count", "items", ] - expected_dimensions_elements: list[str] = [ - "entity-name", - "domain.name", - ] - expected_pagination_keys: list[str] = [ - "sort", - "skip", - "limit", - "total", - ] self.assertIsInstance(req.json(), dict) self.assertEqual(req.status_code, 200) - [self.assertIn(key, expected_keys) for key in req.json().keys()] # type: ignore[func-returns-value] - [self.assertIn(key, expected_dimensions_elements) for key in req.json()["dimensions"]] # type: ignore[func-returns-value] - [self.assertIn(key, expected_pagination_keys) for key in req.json()["pagination"]] # type: ignore[func-returns-value] + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] - def test_post_list_statistic_without_dimensions(self) -> None: - """Test to post query to list statistic: Wrong Path with invalid data.""" - req = self.client.bounceclassification_metrics.create(data=self.payload_without_dimensions) + @pytest.mark.asyncio + async def test_get_keys_with_invalid_url(self) -> None: + """Test to get the list of Mailgun API keys: expected failure with invalid URL.""" + query = {"domain_name": "python.test.domain5", "kind": "web"} - self.assertIsInstance(req.json(), dict) - self.assertEqual(req.status_code, 400) - [self.assertIn(key, "message") for key in req.json().keys()] # type: ignore[func-returns-value] - self.assertIn("sort should be either one of metrics or dimensions", req.json()["message"]) + with pytest.raises(KeyError): + await self.client.key.get(filters=query) - def test_post_list_statistic_with_old_dates(self) -> None: - """Test to post query to list statistic: Wrong Path with invalid data.""" - req = self.client.bounceclassification_metrics.create(data=self.payload_with_old_dates) + async def test_get_keys_without_filtering_data(self) -> None: + """Test to get the list of Mailgun API keys: Happy Path without filtering data.""" + req = await self.client.keys.get() self.assertIsInstance(req.json(), dict) - self.assertEqual(req.status_code, 400) - [self.assertIn(key, "message") for key in req.json().keys()] # type: ignore[func-returns-value] - self.assertIn("is out of permitted log retention", req.json()["message"]) + self.assertEqual(req.status_code, 200) + self.assertGreater(len(req.json()["items"]), 0) - def test_post_list_statistic_with_empty_payload(self) -> None: - """Test to post query to list statistic: Wrong Path with invalid data.""" - req = self.client.bounceclassification_metrics.create(data=self.empty_payload) + async def test_post_keys(self) -> None: + """Test to create the Mailgun API key: happy path with valid data.""" + data = { + "email": self.mailgun_email, + "domain_name": "python.test.domain5", + "kind": "web", + "expiration": "3600", + "role": self.role, + "user_id": self.user_id, + "user_name": self.user_name, + "description": "a new key", + } + + headers = {"Content-Type": "multipart/form-data"} + + req = await self.client.keys.create(data=data, headers=headers) + + expected_keys = [ + "message", + "key", + ] + + expected_key_keys = [ + "id", + "description", + "kind", + "role", + "created_at", + "updated_at", + "expires_at", + "secret", + "is_disabled", + "domain_name", + "requestor", + "user_name", + ] self.assertIsInstance(req.json(), dict) - self.assertEqual(req.status_code, 400) - [self.assertIn(key, "message") for key in req.json().keys()] # type: ignore[func-returns-value] - self.assertIn("is out of permitted log retention", req.json()["message"]) + self.assertEqual(req.status_code, 200) + self.assertEqual(req.json()["message"], "great success") + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_key_keys) for key in req.json()["key"]] # type: ignore[func-returns-value] + + async def test_delete_key(self) -> None: + """Test to delete the Mailgun API keys: happy path with valid data.""" + query = {"domain_name": "python.test.domain5", "kind": "web"} + req1 = await self.client.keys.get(filters=query) + items = req1.json()["items"] + + for item in items: + if self.mailgun_email == item["requestor"]: # codespell:disable-line + req2 = await self.client.keys.delete(key_id=item["id"]) + self.assertEqual(req2.json()["message"], "key deleted") + + @pytest.mark.skip("Don't regenerate a public Mailgun API without a need") + async def test_regenerate_key(self) -> None: + """Test to regenerate the Mailgun API keys: happy path with valid data.""" + await self.client.keys_public.create() if __name__ == "__main__": From ea00120fb185f3e1056beb4195ff07c417749da8 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Mon, 22 Dec 2025 11:50:46 +0200 Subject: [PATCH 04/15] test: Add dkim tests to DomainTests and test_get_dkim_keys to AsyncTests; fix bugs; replace skip with xfail --- mailgun/client.py | 14 +- mailgun/examples/domain_examples.py | 18 +- pyproject.toml | 2 + tests/tests.py | 307 ++++++++++++++++++++++++++-- 4 files changed, 304 insertions(+), 37 deletions(-) diff --git a/mailgun/client.py b/mailgun/client.py index e97b84c..3b5dbe2 100644 --- a/mailgun/client.py +++ b/mailgun/client.py @@ -163,12 +163,6 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: if key in special_cases: return special_cases[key], headers - if "dkim" in key: - return { - "base": v1_base, - "keys": key.split("_"), - }, headers - if "analytics" in key: headers |= {"Content-Type": "application/json"} return { @@ -260,6 +254,14 @@ def __getitem__(self, key: str) -> tuple[Any, dict[str, str]]: base = v3_base if any(x in key for x in v3_domain_endpoints) else v4_base return {"base": f"{base}domains/", "keys": final_keys}, headers + # "dkim" must follow after "dkim_management", "dkimauthority", "dkimselector", + # otherwise a wrong base url will be chosen. + if "dkim" in key: + return { + "base": v1_base, + "keys": key.split("_"), + }, headers + if "addressvalidate" in key: return { "base": f"{v4_base}address/validate", diff --git a/mailgun/examples/domain_examples.py b/mailgun/examples/domain_examples.py index 81b70b3..dfab2ce 100644 --- a/mailgun/examples/domain_examples.py +++ b/mailgun/examples/domain_examples.py @@ -154,7 +154,7 @@ def put_dkim_authority() -> None: PUT /domains//dkim_authority :return: """ - data = {"self": "false"} + data = {"self": "true"} request = client.domains_dkimauthority.put(domain=domain, data=data) print(request.json()) @@ -232,20 +232,6 @@ def post_dkim_keys() -> None: headers = {"Content-Type": "multipart/form-data"} request = client.dkim_keys.create(data=data, headers=headers, files=files) - # TODO: Create test cases for these failures: - # - # {'message': 'failed to import domain key: failed to parse PEM'} - - # You must remove a domain key on WEB UI https://app.mailgun.com/mg/sending/domains selecting your "signing_domain" - # - # {'message': 'failed to create domain key: duplicate key'} - - # If you provide a string instead of a file - # - # {'message': 'failed to import domain key: failed to parse PEM'} - - # - # {'message': 'failed to import domain key: failed to parse private key: key must be PKCS1 format'} print(request.json()) @@ -266,4 +252,6 @@ def delete_dkim_keys() -> None: post_dkim_keys() get_dkim_keys() + get_sending_queues() + put_dkim_authority() delete_dkim_keys() diff --git a/pyproject.toml b/pyproject.toml index 3400c62..838db4f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -127,6 +127,8 @@ tests = [ "pytest-cov", "coverage>=4.5.4", "codecov", + # For a Private key PEM file generated in PKCS1 format. + "openssl", ] conda_build = ["conda-build"] diff --git a/tests/tests.py b/tests/tests.py index aea64a8..7773b71 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -5,8 +5,10 @@ import json import os import string +import subprocess import unittest import random +from pathlib import Path from typing import Any from datetime import datetime, timedelta @@ -129,6 +131,8 @@ def tearDown(self) -> None: # otherwise, test_delete_domain and test_verify_domain will fail with a new run of tests self.client.domains.delete(domain=self.test_domain) + # Make sure that you can Add New Domain (see https://app.mailgun.com/mg/sending/new-domain) in your Mailgun Plan, + # otherwise you get Error 403. @pytest.mark.order(1) def test_post_domain(self) -> None: self.client.domains.delete(domain=self.test_domain) @@ -145,7 +149,7 @@ def test_post_domain_creds(self) -> None: self.assertEqual(request.status_code, 200) self.assertIn("message", request.json()) - @pytest.mark.order(2) + @pytest.mark.order(3) def test_update_simple_domain(self) -> None: self.client.domains.delete(domain=self.test_domain) self.client.domains.create(data=self.post_domain_data) @@ -154,7 +158,7 @@ def test_update_simple_domain(self) -> None: self.assertEqual(request.status_code, 200) self.assertEqual(request.json()["message"], "Domain has been updated") - @pytest.mark.order(2) + @pytest.mark.order(3) def test_put_domain_creds(self) -> None: self.client.domains_credentials.create( domain=self.domain, @@ -169,7 +173,7 @@ def test_put_domain_creds(self) -> None: self.assertEqual(request.status_code, 200) self.assertIn("message", request.json()) - @pytest.mark.order(2) + @pytest.mark.order(3) def test_put_mailboxes_credentials(self) -> None: """Test to update Mailgun SMTP credentials: Happy Path with valid data.""" self.client.domains_credentials.create( @@ -206,7 +210,8 @@ def test_get_smtp_creds(self) -> None: self.assertEqual(request.status_code, 200) self.assertIn("items", request.json()) - @pytest.mark.order(3) + @pytest.mark.order(4) + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") def test_get_sending_queues(self) -> None: self.client.domains.delete(domain=self.test_domain) self.client.domains.create(data=self.post_domain_data) @@ -215,7 +220,7 @@ def test_get_sending_queues(self) -> None: self.assertIn("scheduled", request.json()) @pytest.mark.order(4) - @pytest.mark.skip("The test can fail because the domain name is a random string") + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") def test_get_single_domain(self) -> None: self.client.domains.create(data=self.post_domain_data) req = self.client.domains.get(domain_name=self.post_domain_data["name"]) @@ -224,7 +229,7 @@ def test_get_single_domain(self) -> None: self.assertIn("domain", req.json()) @pytest.mark.order(5) - @pytest.mark.skip("The test can fail because the domain name is a random string") + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") def test_verify_domain(self) -> None: self.client.domains.create(data=self.post_domain_data) req = self.client.domains.put(domain=self.post_domain_data["name"], verify=True) @@ -295,6 +300,231 @@ def test_put_dkim_selector(self) -> None: ) self.assertIn("message", request.json()) + @pytest.mark.order(6) + def test_get_dkim_keys(self) -> None: + """Test to get keys for all domains: happy path with valid data.""" + data = { + "page": "string", + "limit": "0", + "signing_domain": "python.test.domain5", + "selector": "smtp", + } + + req = self.client.dkim_keys.get(data=data) + + expected_keys = [ + "items", + "paging", + ] + + expected_items_keys = [ + "signing_domain", + "selector", + "dns_record", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_items_keys) for key in req.json()["items"][0]] # type: ignore[func-returns-value] + + @pytest.mark.order(6) + def test_post_dkim_keys(self) -> None: + """Test to create a domain key: happy path with valid data.""" + # Private key PEM file must be generated in PKCS1 format. You need 'openssl' on your machine + # openssl genrsa -traditional -out .server.key 2048 + subprocess.run(["openssl", "genrsa", "-traditional", "-out", ".server.key", "2048"]) + server_key_path = Path(".server.key") + files = [ + ( + "pem", + ("server.key", server_key_path.read_bytes()), + ) + ] + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": files, + } + + headers = {"Content-Type": "multipart/form-data"} + + req = self.client.dkim_keys.create(data=data, headers=headers, files=files) + + expected_keys = [ + "signing_domain", + "selector", + "dns_record", + ] + + expected_dns_record_keys = [ + "is_active", + "cached", + "name", + "record_type", + "valid", + "value", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_dns_record_keys) for key in req.json()["dns_record"]] # type: ignore[func-returns-value] + + # Also you can remove a domain key on WEB UI https://app.mailgun.com/mg/sending/domains selecting your "signing_domain" + query = {"signing_domain": "python.test.domain5", "selector": "smtp"} + req2 = self.client.dkim_keys.delete(filters=query) + + self.assertIsInstance(req2.json(), dict) + self.assertEqual(req2.status_code, 200) + self.assertIn("success", req2.json()["message"]) # type: ignore[func-returns-value] + + server_key_path.unlink(missing_ok=True) + print(f"File {server_key_path} has been removed.") + + @pytest.mark.order(6) + def test_post_dkim_keys_invalid_pem_string(self) -> None: + """Test to create a domain key: expected failure to parse PEM from string.""" + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": "lorem ipsum", + } + + req = self.client.dkim_keys.create(data=data) + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 400) + self.assertIn("failed to import domain key: failed to parse PEM", req.json()["message"]) # type: ignore[func-returns-value] + + @pytest.mark.order(6) + def test_post_dkim_keys_if_duplicate_key_exists(self) -> None: + """Test to create a domain key: expected failure because a duplicate key exists""" + + subprocess.run(["openssl", "genrsa", "-traditional", "-out", ".server.key", "2048"]) + server_key_path = Path(".server.key") + files = [ + ( + "pem", + ("server.key", server_key_path.read_bytes()), + ) + ] + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": files, + } + + headers = {"Content-Type": "multipart/form-data"} + + req = self.client.dkim_keys.create(data=data, headers=headers, files=files) + + expected_keys = [ + "signing_domain", + "selector", + "dns_record", + ] + + expected_dns_record_keys = [ + "is_active", + "cached", + "name", + "record_type", + "valid", + "value", + ] + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_dns_record_keys) for key in req.json()["dns_record"]] # type: ignore[func-returns-value] + + req2 = self.client.dkim_keys.create(data=data, headers=headers, files=files) + + self.assertIsInstance(req2.json(), dict) + self.assertEqual(req2.status_code, 400) + self.assertIn("failed to create domain key: duplicate key", req2.json()["message"]) # type: ignore[func-returns-value] + + @pytest.mark.order(6) + def test_post_dkim_keys_key_must_be_pkcs1_format(self) -> None: + """Test to create a domain key: expected failure because a key must be PKCS1 format""" + + subprocess.run(["openssl", "genpkey", "-algorithm", "Ed25519", "-out", ".server.key"]) + server_key_path = Path(".server.key") + files = [ + ( + "pem", + ("server.key", server_key_path.read_bytes()), + ) + ] + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": files, + } + + headers = {"Content-Type": "multipart/form-data"} + + req = self.client.dkim_keys.create(data=data, headers=headers, files=files) + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 400) + self.assertIn("failed to parse private key: key must be PKCS1 format", req.json()["message"]) # type: ignore[func-returns-value] + + @pytest.mark.order(7) + def test_delete_dkim_keys(self) -> None: + """Test to delete a domain key: happy path with valid data.""" + query = {"signing_domain": "python.test.domain5", "selector": "smtp"} + + req = self.client.dkim_keys.delete(filters=query) + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + self.assertIn("success", req.json()["message"]) # type: ignore[func-returns-value] + + @pytest.mark.order(7) + def test_delete_non_existing_dkim_keys(self) -> None: + """Test to delete a domain key: expected failure if a domain doesn't exist.""" + subprocess.run(["openssl", "genrsa", "-traditional", "-out", ".server.key", "2048"]) + server_key_path = Path(".server.key") + files = [ + ( + "pem", + ("server.key", server_key_path.read_bytes()), + ) + ] + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": files, + } + + headers = {"Content-Type": "multipart/form-data"} + + self.client.dkim_keys.create(data=data, headers=headers, files=files) + + query = {"signing_domain": "python.test.domain5", "selector": "smtp"} + + req1 = self.client.dkim_keys.delete(filters=query) + self.assertIsInstance(req1.json(), dict) + self.assertEqual(req1.status_code, 200) + self.assertIn("success", req1.json()["message"]) # type: ignore[func-returns-value] + + req2 = self.client.dkim_keys.delete(filters=query) + + self.assertIsInstance(req2.json(), dict) + self.assertEqual(req2.status_code, 404) + self.assertIn("domain key not found", req2.json()["message"]) # type: ignore[func-returns-value] + @pytest.mark.order(7) def test_delete_domain_creds(self) -> None: self.client.domains_credentials.create( @@ -321,6 +551,7 @@ def test_delete_all_domain_credentials(self) -> None: "All domain credentials have been deleted") @pytest.mark.order(8) + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") def test_delete_domain(self) -> None: self.client.domains.create(data=self.post_domain_data) request = self.client.domains.delete(domain=self.test_domain) @@ -1844,9 +2075,7 @@ def test_post_query_get_account_usage_metrics_invalid_data(self) -> None: req = self.client.analytics_usage_metrics.create( data=self.invalid_account_usage_metrics_data, ) - from pprint import pprint - pprint(req.json()) self.assertIsInstance(req.json(), dict) self.assertEqual(req.status_code, 400) self.assertNotIn("items", req.json()) @@ -2644,6 +2873,8 @@ async def asyncTearDown(self) -> None: await self.client.domains.delete(domain=self.test_domain) await self.client.aclose() + # Make sure that you can Add New Domain (see https://app.mailgun.com/mg/sending/new-domain) in your Mailgun Plan, + # otherwise you get Error 403. @pytest.mark.order(1) async def test_post_domain(self) -> None: await self.client.domains.delete(domain=self.test_domain) @@ -2693,8 +2924,6 @@ async def test_put_mailboxes_credentials(self) -> None: ) name = "alice_bob" req = await self.client.mailboxes.put(domain=self.domain, login=f"{name}@{self.domain}") - print(req) - print(req.json()) expected_keys = [ "message", @@ -2724,6 +2953,7 @@ async def test_get_smtp_creds(self) -> None: self.assertIn("items", request.json()) @pytest.mark.order(3) + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") async def test_get_sending_queues(self) -> None: await self.client.domains.delete(domain=self.test_domain) await self.client.domains.create(data=self.post_domain_data) @@ -2732,7 +2962,7 @@ async def test_get_sending_queues(self) -> None: self.assertIn("scheduled", request.json()) @pytest.mark.order(4) - @pytest.mark.skip("The test can fail because the domain name is a random string") + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") async def test_get_single_domain(self) -> None: await self.client.domains.create(data=self.post_domain_data) req = await self.client.domains.get(domain_name=self.post_domain_data["name"]) @@ -2741,7 +2971,7 @@ async def test_get_single_domain(self) -> None: self.assertIn("domain", req.json()) @pytest.mark.order(5) - @pytest.mark.skip("The test can fail because the domain name is a random string") + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") async def test_verify_domain(self) -> None: await self.client.domains.create(data=self.post_domain_data) req = await self.client.domains.put(domain=self.post_domain_data["name"], verify=True) @@ -2812,14 +3042,60 @@ async def test_put_dkim_selector(self) -> None: ) self.assertIn("message", request.json()) + @pytest.mark.order(6) + async def test_get_dkim_keys(self) -> None: + """Test to get keys for all domains: happy path with valid data.""" + data = { + "page": "string", + "limit": "0", + "signing_domain": "python.test.domain5", + "selector": "smtp", + } + + req = await self.client.dkim_keys.get(data=data) + + expected_keys = [ + "items", + "paging", + ] + + expected_items_keys = [ + "signing_domain", + "selector", + "dns_record", + ] + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 200) + [self.assertIn(key, expected_keys) for key in req.json()] # type: ignore[func-returns-value] + [self.assertIn(key, expected_items_keys) for key in req.json()["items"][0]] # type: ignore[func-returns-value] + + @pytest.mark.order(6) + async def test_post_dkim_keys_invalid_pem_string(self) -> None: + """Test to create a domain key: expected failure to parse PEM from string.""" + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": "lorem ipsum", + } + + req = await self.client.dkim_keys.create(data=data) + + self.assertIsInstance(req.json(), dict) + self.assertEqual(req.status_code, 400) + self.assertIn("failed to import domain key: failed to parse PEM", req.json()["message"]) # type: ignore[func-returns-value] + @pytest.mark.order(7) + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") async def test_delete_domain_creds(self) -> None: await self.client.domains_credentials.create( - domain=self.domain, + domain=self.test_domain, data=self.post_domain_creds, ) request = await self.client.domains_credentials.delete( - domain=self.domain, + domain=self.test_domain, login="alice_bob", ) @@ -2836,6 +3112,7 @@ async def test_delete_all_domain_credentials(self) -> None: self.assertIn(request.json()["message"], "All domain credentials have been deleted") @pytest.mark.order(8) + @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") async def test_delete_domain(self) -> None: await self.client.domains.create(data=self.post_domain_data) request = await self.client.domains.delete(domain=self.test_domain) @@ -4318,9 +4595,7 @@ async def test_post_query_get_account_usage_metrics_invalid_data(self) -> None: req = await self.client.analytics_usage_metrics.create( data=self.invalid_account_usage_metrics_data, ) - from pprint import pprint - pprint(req.json()) self.assertIsInstance(req.json(), dict) self.assertEqual(req.status_code, 400) self.assertNotIn("items", req.json()) From 3a338e423b96e7461ecfb6a7df926c800b29cd04 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Mon, 22 Dec 2025 13:39:08 +0200 Subject: [PATCH 05/15] Add openssl to test dependencies --- environment-dev.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/environment-dev.yaml b/environment-dev.yaml index 79df919..3e0a968 100644 --- a/environment-dev.yaml +++ b/environment-dev.yaml @@ -16,6 +16,7 @@ dependencies: # tests - conda-forge::pyfakefs - coverage >=4.5.4 + - openssl - pytest - pytest-asyncio - pytest-benchmark From 13a854a7c24b48d1ccd6b7c06ee661e6861c5442 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Tue, 23 Dec 2025 10:32:23 +0200 Subject: [PATCH 06/15] docs: Add Domain Keys and Keys examples to README.md; improve the users' example; fix structure --- README.md | 150 ++++++++++++++++++++++++++++- mailgun/examples/users_examples.py | 2 +- 2 files changed, 149 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 1b97d3b..e8d5a43 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,8 @@ Check out all the resources and Python code examples in the official - [Overview](#overview) - [Base URL](#base-url) - [Authentication](#authentication) + - [Client](#client) + - [AsyncClient](#asyncclient) - [API Response Codes](#api-response-codes) - [Request examples](#request-examples) - [Full list of supported endpoints](#full-list-of-supported-endpoints) @@ -37,6 +39,8 @@ Check out all the resources and Python code examples in the official - [Update a domain](#update-a-domain) - [Domain connections](#domain-connections) - [Domain keys](#domain-keys) + - [List keys for all domains](#list-keys-for-all-domains) + - [Create a domain key](#create-a-domain-key) - [Update DKIM authority](#update-dkim-authority) - [Domain Tracking](#domain-tracking) - [Get tracking settings](#get-tracking-settings) @@ -234,8 +238,14 @@ export DOMAINS_DEDICATED_IP="127.0.0.1" export MAILLIST_ADDRESS="everyone@mailgun.domain.com" export VALIDATION_ADDRESS_1="test1@i.ua" export VALIDATION_ADDRESS_2="test2@gmail.com" +export MAILGUN_EMAIL="username@example.com" +export USER_ID="123456789012345678901234" +export USER_NAME="Name Surname" +export ROLE="admin" ``` +### Client + Initialize your [Mailgun](http://www.mailgun.com/) client: ```python @@ -501,6 +511,68 @@ def get_connections() -> None: #### Domain keys +### List keys for all domains + +List domain keys, and optionally filter by signing domain or selector. The page & limit data is only required when paging through the data. + +```python +def get_dkim_keys() -> None: + """ + GET /v1/dkim/keys + :return: + """ + data = { + "page": "string", + "limit": "0", + "signing_domain": "python.test.domain5", + "selector": "smtp", + } + + request = client.dkim_keys.get(data=data) + print(request.json()) +``` + +#### Create a domain key + +Create a domain key. +Note that once private keys are created or imported they are never exported. +Alternatively, you can import an existing PEM file containing a RSA private key in PKCS #1, ASn.1 DER format. +Note, the pem can be passed as a file attachment or as a form-string parameter. + +```python +def post_dkim_keys() -> None: + """ + POST /v1/dkim/keys + :return: + """ + import subprocess + from pathlib import Path + + # Private key PEM file must be generated in PKCS1 format. You need 'openssl' on your machine + # example: + # openssl genrsa -traditional -out .server.key 2048 + subprocess.run(["openssl", "genrsa", "-traditional", "-out", ".server.key", "2048"]) + + files = [ + ( + "pem", + ("server.key", Path(".server.key").read_bytes()), + ) + ] + + data = { + "signing_domain": "python.test.domain5", + "selector": "smtp", + "bits": "2048", + "pem": files, + } + + headers = {"Content-Type": "multipart/form-data"} + + request = client.dkim_keys.create(data=data, headers=headers, files=files) + print(request.json()) +``` + ##### Update DKIM authority ```python @@ -1313,6 +1385,76 @@ def get_all_inbox() -> None: print(req.json()) ``` +### Keys + +The Keys API lets you view and manage api keys. + +#### List Mailgun API keys + +```python +def get_keys() -> None: + """ + GET /v1/keys + :return: + """ + query = {"domain_name": "python.test.domain5", "kind": "web"} + req = client.keys.get(filters=query) + print(req.json()) +``` + +#### Create Mailgun API key + +```python +import os + +from mailgun.client import Client + + +key: str = os.environ["APIKEY"] +domain: str = os.environ["DOMAIN"] +mailgun_email = os.environ["MAILGUN_EMAIL"] +role = os.environ["ROLE"] +user_id = os.environ["USER_ID"] +user_name = os.environ["USER_NAME"] + +client: Client = Client(auth=("api", key)) + + +def post_keys() -> None: + """ + POST /v1/keys + + This code generate a Web API key tied to the account user associated with the data inputted for the USER_EMAIL field and USER_ID values. + This is returned by the API in the "secret":"API_KEY" key/value pair. This key will authenticate the call (Get one's own user details) made to the /v5/users/me endpoint, # pragma: allowlist secret + and will return the user's data associated with the USER_EMAIL and USER_ID values. + + Important Notes: + USER_EMAIL - The user login email address of the user that is trying to make the call to the /v5/users/me endpoint. + SECONDS - How many seconds you want the key to be active before it expires. + ROLE - The role of the API Key. This dictates what permissions the key has (https://help.mailgun.com/hc/en-us/articles/26016288026907-API-Key-Roles) + USER_ID - The internal User ID of the user that is trying to call the /v5/users/me endpoint. This is present in the URL in the address bar when viewing the User details in the GUI or in Admin. Both will show /users/USER_ID in the address. + DESCRIPTION - Description of the key. + + :return: + """ + + data = { + "email": mailgun_email, + "domain_name": "python.test.domain5", + "kind": "web", + "expiration": "3600", + "role": role, + "user_id": user_id, + "user_name": user_name, + "description": "a new key", + } + + headers = {"Content-Type": "multipart/form-data"} + + req = client.keys.create(data=data, headers=headers) + print(req.json()) +``` + ### Credentials #### List Mailgun SMTP credential metadata for a given domain @@ -1361,13 +1503,17 @@ def get_users() -> None: #### Get a user's details ```python +mailgun_email = os.environ["MAILGUN_EMAIL"] +role = os.environ["ROLE"] +user_name = os.environ["USER_NAME"] + + def get_user_details() -> None: """ GET /v5/users/{user_id} :return: """ - mailgun_email = os.environ["MAILGUN_EMAIL"] - query = {"role": "admin", "limit": "0", "skip": "0"} + query = {"role": role, "limit": "0", "skip": "0"} req1 = client.users.get(filters=query) users = req1.json()["users"] diff --git a/mailgun/examples/users_examples.py b/mailgun/examples/users_examples.py index 87f8934..80b637c 100644 --- a/mailgun/examples/users_examples.py +++ b/mailgun/examples/users_examples.py @@ -75,7 +75,7 @@ def get_user_details() -> None: GET /v5/users/{user_id} :return: """ - query = {"role": "admin", "limit": "0", "skip": "0"} + query = {"role": role, "limit": "0", "skip": "0"} req1 = client.users.get(filters=query) users = req1.json()["users"] From 35d6d85efb88d4f90eb33164f6617dcab1a70eeb Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Tue, 23 Dec 2025 12:07:30 +0200 Subject: [PATCH 07/15] ci: Update pre-commit hooks --- .pre-commit-config.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 95c5e42..e5add27 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -71,7 +71,7 @@ repos: # exclude: ^tests - repo: https://github.com/python-jsonschema/check-jsonschema - rev: 0.35.0 + rev: 0.36.0 hooks: - id: check-github-workflows files: ^\.github/workflows/.*\.ya?ml$ @@ -118,7 +118,7 @@ repos: - repo: https://github.com/charliermarsh/ruff-pre-commit # Ruff version. - rev: v0.14.8 + rev: v0.14.10 hooks: # Run the linter. - id: ruff-check @@ -139,7 +139,7 @@ repos: - id: refurb - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.19.0 + rev: v1.19.1 hooks: - id: mypy args: [--config-file=./pyproject.toml] @@ -163,7 +163,7 @@ repos: additional_dependencies: [".[toml]"] - repo: https://github.com/crate-ci/typos - rev: v1.38.1 + rev: v1.40.0 hooks: - id: typos From 5f340a6593c8b5e8cfc3568ff94041dc9c5c15b2 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Tue, 23 Dec 2025 13:21:57 +0200 Subject: [PATCH 08/15] style: Fix typing issues, clean up type ignores; apply linters --- environment-dev.yaml | 2 +- environment.yaml | 2 +- mailgun/_version.py | 2 +- mailgun/client.py | 8 +++++++- pyproject.toml | 2 +- tests/tests.py | 20 +++++++++++--------- 6 files changed, 22 insertions(+), 14 deletions(-) diff --git a/environment-dev.yaml b/environment-dev.yaml index 3e0a968..50e4e7c 100644 --- a/environment-dev.yaml +++ b/environment-dev.yaml @@ -12,7 +12,7 @@ dependencies: # runtime deps - requests >=2.32.5 - httpx >=0.24.0 - - typing_extensions >=4.7.1 + - typing_extensions >=4.7.1 # [py<311] # tests - conda-forge::pyfakefs - coverage >=4.5.4 diff --git a/environment.yaml b/environment.yaml index 0ba011b..d023974 100644 --- a/environment.yaml +++ b/environment.yaml @@ -9,7 +9,7 @@ dependencies: # runtime deps - requests >=2.32.5 - httpx >=0.24.0 - - typing_extensions >=4.7.1 + - typing_extensions >=4.7.1 # [py<311] # tests - pytest >=7.0.0 # other diff --git a/mailgun/_version.py b/mailgun/_version.py index c179ed2..5b60188 100644 --- a/mailgun/_version.py +++ b/mailgun/_version.py @@ -1 +1 @@ -__version__ = "1.5.0" \ No newline at end of file +__version__ = "1.5.0" diff --git a/mailgun/client.py b/mailgun/client.py index 3b5dbe2..ead3851 100644 --- a/mailgun/client.py +++ b/mailgun/client.py @@ -17,6 +17,7 @@ import io import json +import sys from collections import defaultdict from typing import TYPE_CHECKING from typing import Any @@ -24,7 +25,6 @@ import httpx import requests -from typing_extensions import Self from mailgun.handlers.bounce_classification_handler import handle_bounce_classification from mailgun.handlers.default_handler import handle_default @@ -52,6 +52,12 @@ from mailgun.handlers.users_handler import handle_users +if sys.version_info >= (3, 11): + from typing import Self +else: + from typing_extensions import Self + + if TYPE_CHECKING: import types from collections.abc import Callable diff --git a/pyproject.toml b/pyproject.toml index 838db4f..e30f0eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,7 @@ license-files = ["LICENSE"] readme = "README.md" requires-python = ">=3.10" -dependencies = ["requests>=2.32.5", "httpx>=0.24.0", "typing_extensions>=4.7.1"] +dependencies = ["requests>=2.32.5", "httpx>=0.24.0", "typing_extensions>=4.7.1; python_version<'3.11'"] keywords = [ "Python SDK for Mailgun", diff --git a/tests/tests.py b/tests/tests.py index 7773b71..b2b8f50 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -379,7 +379,7 @@ def test_post_dkim_keys(self) -> None: self.assertIsInstance(req2.json(), dict) self.assertEqual(req2.status_code, 200) - self.assertIn("success", req2.json()["message"]) # type: ignore[func-returns-value] + self.assertIn("success", req2.json()["message"]) server_key_path.unlink(missing_ok=True) print(f"File {server_key_path} has been removed.") @@ -399,7 +399,7 @@ def test_post_dkim_keys_invalid_pem_string(self) -> None: self.assertIsInstance(req.json(), dict) self.assertEqual(req.status_code, 400) - self.assertIn("failed to import domain key: failed to parse PEM", req.json()["message"]) # type: ignore[func-returns-value] + self.assertIn("failed to import domain key: failed to parse PEM", req.json()["message"]) @pytest.mark.order(6) def test_post_dkim_keys_if_duplicate_key_exists(self) -> None: @@ -448,7 +448,7 @@ def test_post_dkim_keys_if_duplicate_key_exists(self) -> None: self.assertIsInstance(req2.json(), dict) self.assertEqual(req2.status_code, 400) - self.assertIn("failed to create domain key: duplicate key", req2.json()["message"]) # type: ignore[func-returns-value] + self.assertIn("failed to create domain key: duplicate key", req2.json()["message"]) @pytest.mark.order(6) def test_post_dkim_keys_key_must_be_pkcs1_format(self) -> None: @@ -476,7 +476,9 @@ def test_post_dkim_keys_key_must_be_pkcs1_format(self) -> None: self.assertIsInstance(req.json(), dict) self.assertEqual(req.status_code, 400) - self.assertIn("failed to parse private key: key must be PKCS1 format", req.json()["message"]) # type: ignore[func-returns-value] + self.assertIn( + "failed to parse private key: key must be PKCS1 format", req.json()["message"] + ) @pytest.mark.order(7) def test_delete_dkim_keys(self) -> None: @@ -487,7 +489,7 @@ def test_delete_dkim_keys(self) -> None: self.assertIsInstance(req.json(), dict) self.assertEqual(req.status_code, 200) - self.assertIn("success", req.json()["message"]) # type: ignore[func-returns-value] + self.assertIn("success", req.json()["message"]) @pytest.mark.order(7) def test_delete_non_existing_dkim_keys(self) -> None: @@ -517,13 +519,13 @@ def test_delete_non_existing_dkim_keys(self) -> None: req1 = self.client.dkim_keys.delete(filters=query) self.assertIsInstance(req1.json(), dict) self.assertEqual(req1.status_code, 200) - self.assertIn("success", req1.json()["message"]) # type: ignore[func-returns-value] + self.assertIn("success", req1.json()["message"]) req2 = self.client.dkim_keys.delete(filters=query) self.assertIsInstance(req2.json(), dict) self.assertEqual(req2.status_code, 404) - self.assertIn("domain key not found", req2.json()["message"]) # type: ignore[func-returns-value] + self.assertIn("domain key not found", req2.json()["message"]) @pytest.mark.order(7) def test_delete_domain_creds(self) -> None: @@ -2577,7 +2579,7 @@ def test_get_user_invalid_url(self) -> None: """Test to get account's users details: expected failure with invalid URL.""" query = {"role": "admin", "limit": "0", "skip": "0"} - with self.assertRaises(KeyError) as cm: + with self.assertRaises(KeyError): self.client.user.get(filters=query) @pytest.mark.xfail @@ -3085,7 +3087,7 @@ async def test_post_dkim_keys_invalid_pem_string(self) -> None: self.assertIsInstance(req.json(), dict) self.assertEqual(req.status_code, 400) - self.assertIn("failed to import domain key: failed to parse PEM", req.json()["message"]) # type: ignore[func-returns-value] + self.assertIn("failed to import domain key: failed to parse PEM", req.json()["message"]) @pytest.mark.order(7) @pytest.mark.xfail(reason="The test can fail because the domain name is a random string") From 5f8d4a3119baf11979527f26b4358d35ee3fa870 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Tue, 23 Dec 2025 13:37:35 +0200 Subject: [PATCH 09/15] docs: Update changelog --- CHANGELOG.md | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8bb8dda..be7fb71 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,48 @@ We [keep a changelog.](http://keepachangelog.com/) ## [Unreleased] +### Added + +- Add Keys and Domain Keys API endpoints: + + - Add `handle_keys` to `mailgun.handlers.keys_handler`. + - Add `handle_dkimkeys` to `mailgun.handlers.domains_handler`. + - Add "dkim" key to special cases in the class `Config`. + +- Examples: + + - Add the `get_dkim_keys()`, `post_dkim_keys()`, `delete_dkim_keys()` examples to `mailgun/examples/domain_examples.py`. + - Add the `get_keys()`, `post_keys()`, `delete_key()`, `regenerate_key()` examples to `mailgun/examples/keys_examples.py`. + +- Docs: + + - Add `Keys` and `Domain Keys` sections with examples to `README.md`. + - Add docstrings to the test class `KeysTests` & `AsyncKeysTests` and their methods. + +- Tests: + + - Add dkim keys tests to `DomainTests` and only `test_get_dkim_keys`, `test_post_dkim_keys_invalid_pem_string` to `AsyncDomainTests`. + - Add classes `KeysTests` and `AsyncKeysTests` to `tests/tests.py`. + - Add keys tests to `KeysTests` and `AsyncKeysTests`. + +### Changed + +- Update `get_own_user_details()` by creating `client_with_secret_key` in `mailgun/examples/users_examples.py`. +- Improve the users' example in `README.md`. +- Fix markdown structure in `README.md`. +- Update environment variables in `README.md`. +- Move `BounceClassificationTests` to another place in `tests/tests.py`. +- Replace some pytest's skip marks with xfail. +- Disable `codespell` pre-commit hook as it lashes with `typos`. +- Update `pre-commit` hooks to the latest versions. +- Update test dependencies: add `openssl` and `pytest-asyncio` to `environment-dev.yaml` and `pyproject.toml`. +- Add `.server.key` to `.gitignore`. +- Add a constraint `py<311` for `typing_extensions >=4.7.1` in files `environment.yaml`, `environment-dev.yaml`, `pyproject.toml`, and in `mailgun/client.py`. + +### Pull Requests Merged + +- [PR_27](https://github.com/mailgun/mailgun-python/pull/27) - Add Keys and Domain Keys API endpoints + ## [1.5.0] - 2025-12-11 ### Added From f2e068f8090053b7ff167ab2b381a7e9c6a550e1 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Tue, 23 Dec 2025 15:22:07 +0200 Subject: [PATCH 10/15] docs: Add CONTRIBUTING.md, update README --- CONTRIBUTING.md | 24 ++++++++++++++++++++++++ README.md | 15 +++------------ 2 files changed, 27 insertions(+), 12 deletions(-) create mode 100644 CONTRIBUTING.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..27b0e47 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,24 @@ +### Contributing + +First off, thank you for considering contributing to Mailgun Python SDK. Contributions to the `mailgun-python` repository from the community are welcome and encouraged! + +Mailgun loves developers. You can be part of this project! + +This Python SDK is a great introduction to the open source world, check out the code! + +Please create issues for any major changes so it can be discussed first. For small bug fixes feel free to submit the PR directly. + +Make sure to run tests before creating the PR. + +When submitting a pull request, please include the purpose and implementation details. + +Feel free to ask anything, and contribute: + +- Fork the project. +- Create a new branch. +- Implement your feature or bug fix. +- Add documentation to it. +- Commit, push, open a pull request and voila. + +If you have suggestions on how to improve the guides, please submit an issue in our +[Official API Documentation](https://documentation.mailgun.com). diff --git a/README.md b/README.md index e8d5a43..e44b705 100644 --- a/README.md +++ b/README.md @@ -1529,20 +1529,11 @@ def get_user_details() -> None: ## Contribute -Mailgun loves developers. You can be part of this project! +See for details [CONTRIBUTING.md](CONTRIBUTING.md) -This Python SDK is a great introduction to the open source world, check out the code! +## Security -Feel free to ask anything, and contribute: - -- Fork the project. -- Create a new branch. -- Implement your feature or bug fix. -- Add documentation to it. -- Commit, push, open a pull request and voila. - -If you have suggestions on how to improve the guides, please submit an issue in our -[Official API Documentation](https://documentation.mailgun.com). +See for details [SECURITY.md](SECURITY.md) ## Contributors From 69aa6a936652929cda1c56ca6629a4e2b91946ab Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Tue, 23 Dec 2025 15:25:03 +0200 Subject: [PATCH 11/15] docs: Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index be7fb71..a61d668 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ We [keep a changelog.](http://keepachangelog.com/) - Add `Keys` and `Domain Keys` sections with examples to `README.md`. - Add docstrings to the test class `KeysTests` & `AsyncKeysTests` and their methods. + - Add `CONTRIBUTING.md`. - Tests: From 15ae568f1e5c06059bd49c198ad13dcb62230e10 Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Wed, 7 Jan 2026 13:02:17 +0200 Subject: [PATCH 12/15] ci: Add validate-pyproject pre-commit hook --- .pre-commit-config.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e5add27..e8fe61a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -39,6 +39,7 @@ repos: # ensure syntaxes are valid - id: check-toml - id: debug-statements + language_version: python3 # Makes sure files end in a newline and only a newline; - id: end-of-file-fixer - id: mixed-line-ending @@ -175,3 +176,8 @@ repos: # gfm = GitHub Flavored Markdown - mdformat-gfm - mdformat-black + + - repo: https://github.com/abravalheri/validate-pyproject + rev: v0.24.1 + hooks: + - id: validate-pyproject From 4f813b96602863f4ad335982dd825e50d76411ab Mon Sep 17 00:00:00 2001 From: Serhii Kupriienko <61395455+skupriienko@users.noreply.github.com> Date: Wed, 7 Jan 2026 15:13:20 +0200 Subject: [PATCH 13/15] style: Apply linters & formatters; add new pre-commit hooks --- .github/dependabot.yml | 8 ++ .pre-commit-config.yaml | 222 ++++++++++++++++++++++++++++------------ LICENSE | 1 - Makefile | 2 +- README.md | 24 ++--- mailgun/__init__.py | 1 - mailgun/client.py | 28 +++-- tests/__init__.py | 7 +- tests/tests.py | 21 +--- 9 files changed, 196 insertions(+), 118 deletions(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index b101ec0..a966f0b 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -9,3 +9,11 @@ updates: python-packages: patterns: - "*" + + # Enable version updates for GitHub Actions + - package-ecosystem: 'github-actions' + # Workflow files stored in the default location of `.github/workflows` + # You don't need to specify `/.github/workflows` for `directory`. You can use `directory: "/"`. + directory: '/' + schedule: + interval: 'weekly' diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index e8fe61a..e5c5e2a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,130 +19,218 @@ ci: skip: [] submodules: false +# .pre-commit-config.yaml repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: v6.0.0 hooks: + # Python-specific checks - id: check-ast + name: "🐍 python · Validate syntax" - id: check-builtin-literals - - id: fix-byte-order-marker - - id: check-case-conflict + name: "🐍 python · Use literal syntax" - id: check-docstring-first - - id: check-vcs-permalinks - # Fail if staged files are above a certain size. - # To add a large file, use 'git lfs track ; git add to track large files with - # git-lfs rather than committing them directly to the git history + name: "🐍 python · Validate docstring placement" + - id: debug-statements + name: "🐍 python · Detect debug statements" + language_version: python3 + + # Git workflow protection - id: check-added-large-files - args: [ "--maxkb=500" ] - # Fails if there are any ">>>>>" lines in files due to merge conflicts. + name: "🌳 git · Block large files" + args: ['--maxkb=500'] - id: check-merge-conflict - # ensure syntaxes are valid + name: "🌳 git · Detect conflict markers" + - id: forbid-new-submodules + name: "🌳 git · Prevent submodules" + - id: no-commit-to-branch + name: "🌳 git · Protect main branches" + args: ["--branch", "main", "--branch", "master"] + - id: check-vcs-permalinks + name: "🌳 git · Validate VCS links" + + # Filesystem and naming validation + - id: check-case-conflict + name: "📁 filesystem · Check case sensitivity" + - id: check-illegal-windows-names + name: "📁 filesystem · Validate Windows names" + - id: check-symlinks + name: "📁 filesystem · Check symlink validity" + - id: destroyed-symlinks + name: "📁 filesystem · Detect broken symlinks" + + # File format validation - id: check-toml - - id: debug-statements - language_version: python3 - # Makes sure files end in a newline and only a newline; + name: "📋 format · Validate TOML" + - id: check-yaml + name: "📋 format · Validate YAML" + exclude: conda.recipe/meta.yaml + + # File content fixes + - id: fix-byte-order-marker + name: "✨ fix · Remove BOM markers" - id: end-of-file-fixer + name: "✨ fix · Ensure final newline" - id: mixed-line-ending - # Trims trailing whitespace. Allow a single space on the end of .md lines for hard line breaks. + name: "✨ fix · Normalize line endings" - id: trailing-whitespace - args: [ --markdown-linebreak-ext=md ] - # Sort requirements in requirements.txt files. + name: "✨ fix · Trim trailing whitespace" + args: [--markdown-linebreak-ext=md] - id: requirements-txt-fixer - # Prevent committing directly to trunk - - id: no-commit-to-branch - args: [ "--branch=master" ] - # Detects the presence of private keys + name: "✨ fix · Sort requirements" + + # Security checks - id: detect-private-key + name: "🔒 security · Detect private keys" + # Git commit quality - repo: https://github.com/jorisroovers/gitlint rev: v0.19.1 hooks: - id: gitlint + name: "🌳 git · Validate commit format" + - repo: https://github.com/commitizen-tools/commitizen + rev: v4.11.1 + hooks: + - id: commitizen + name: "🌳 git · Validate commit message" + stages: [commit-msg] + + # Security scanning (grouped together) - repo: https://github.com/Yelp/detect-secrets rev: v1.5.0 hooks: - - id: detect-secrets # Detect accidentally committed secrets + - id: detect-secrets + name: "🔒 security · Detect committed secrets" + + - repo: https://github.com/gitleaks/gitleaks + rev: v8.30.0 + hooks: + - id: gitleaks + name: "🔒 security · Scan for hardcoded secrets" + + - repo: https://github.com/PyCQA/bandit + rev: 1.9.2 + hooks: + - id: bandit + name: "🔒 security · Check Python vulnerabilities" + args: ["-c", "pyproject.toml", "-r", "."] + exclude: ^tests/ + additional_dependencies: [".[toml]"] + + - repo: https://github.com/pypa/pip-audit + rev: v2.10.0 + hooks: + - id: pip-audit + name: "🔒 security · Audit Python dependencies" + args: ['--desc', 'on'] + + # Spelling and typos + - repo: https://github.com/crate-ci/typos + rev: v1.40.0 + hooks: + - id: typos + name: "📝 spelling · Check typos" # - repo: https://github.com/codespell-project/codespell # rev: v2.4.1 # hooks: # - id: codespell +# name: "📝 spelling · Fix common misspellings" # args: [--write] # exclude: ^tests + # CI/CD validation - repo: https://github.com/python-jsonschema/check-jsonschema rev: 0.36.0 hooks: + - id: check-dependabot + name: "🔧 ci/cd · Validate Dependabot config" - id: check-github-workflows + name: "🔧 ci/cd · Validate GitHub workflows" files: ^\.github/workflows/.*\.ya?ml$ - - repo: https://github.com/akaihola/darker - rev: v3.0.0 - hooks: - - id: darker - additional_dependencies: [black] - + # Python code formatting (order matters: autoflake → pyupgrade → darker/ruff) - repo: https://github.com/PyCQA/autoflake rev: v2.3.1 hooks: - id: autoflake + name: "🐍 format · Remove unused imports" args: - --in-place - --remove-all-unused-imports - --remove-unused-variable - --ignore-init-module-imports + - repo: https://github.com/asottile/pyupgrade + rev: v3.21.2 + hooks: + - id: pyupgrade + name: "🐍 format · Modernize syntax" + args: [--py310-plus, --keep-runtime-typing] + + - repo: https://github.com/akaihola/darker + rev: v3.0.0 + hooks: + - id: darker + name: "🐍 format · Format changed lines" + additional_dependencies: [black] + + - repo: https://github.com/charliermarsh/ruff-pre-commit + rev: v0.14.10 + hooks: + - id: ruff-check + name: "🐍 lint · Check with Ruff" + args: [--fix, --preview, --exit-non-zero-on-fix] + - id: ruff-format + name: "🐍 format · Format with Ruff" + + # Python linting (comprehensive checks) - repo: https://github.com/pycqa/flake8 rev: 7.3.0 hooks: - - id: flake8 + - id: flake8 + name: "🐍 lint · Check style (Flake8)" additional_dependencies: - radon - flake8-docstrings - Flake8-pyproject - # TODO: Remove tests when we will be ready to process tests + - flake8-bugbear + - flake8-comprehensions + - flake8-tidy-imports + - pycodestyle exclude: ^tests - repo: https://github.com/PyCQA/pylint rev: v4.0.4 hooks: - id: pylint + name: "🐍 lint · Check code quality" args: - --exit-zero - - repo: https://github.com/asottile/pyupgrade - rev: v3.21.2 + - repo: https://github.com/dosisod/refurb + rev: v2.2.0 hooks: - - id: pyupgrade - args: [--py310-plus, --keep-runtime-typing] - - - repo: https://github.com/charliermarsh/ruff-pre-commit - # Ruff version. - rev: v0.14.10 - hooks: - # Run the linter. - - id: ruff-check - args: [--fix, --preview, --exit-non-zero-on-fix] - # Run the formatter. - - id: ruff-format + - id: refurb + name: "🐍 lint · Suggest modernizations" + # Python documentation - repo: https://github.com/pycqa/pydocstyle rev: 6.3.0 hooks: - id: pydocstyle + name: "🐍 docs · Validate docstrings" args: [--select=D200,D213,D400,D415] additional_dependencies: [tomli] - - repo: https://github.com/dosisod/refurb - rev: v2.2.0 - hooks: - - id: refurb - + # Python type checking - repo: https://github.com/pre-commit/mirrors-mypy rev: v1.19.1 hooks: - - id: mypy + - id: mypy + name: "🐍 types · Check with mypy" args: [--config-file=./pyproject.toml] additional_dependencies: - types-requests @@ -152,32 +240,36 @@ repos: - repo: https://github.com/RobertCraigie/pyright-python rev: v1.1.407 hooks: - - id: pyright - - - repo: https://github.com/PyCQA/bandit - rev: 1.9.2 - hooks: - - id: bandit - args: ["-c", "pyproject.toml", "-r", "."] - # ignore all tests, not just tests data - exclude: ^tests/ - additional_dependencies: [".[toml]"] + - id: pyright + name: "🐍 types · Check with pyright" - - repo: https://github.com/crate-ci/typos - rev: v1.40.0 + # Python project configuration + - repo: https://github.com/abravalheri/validate-pyproject + rev: v0.24.1 hooks: - - id: typos + - id: validate-pyproject + name: "🐍 config · Validate pyproject.toml" + # Markdown formatting - repo: https://github.com/executablebooks/mdformat rev: 1.0.0 hooks: - id: mdformat + name: "📝 markdown · Format files" additional_dependencies: - # gfm = GitHub Flavored Markdown - mdformat-gfm - mdformat-black + - mdformat-ruff +# TODO: Enable it for a single check +# - repo: https://github.com/tcort/markdown-link-check +# rev: v3.14.2 +# hooks: +# - id: markdown-link-check +# name: "📝 docs · Check markdown links" - - repo: https://github.com/abravalheri/validate-pyproject - rev: v0.24.1 + # Makefile linting + - repo: https://github.com/checkmake/checkmake + rev: 0.2.2 hooks: - - id: validate-pyproject + - id: checkmake + name: "🔧 build · Lint Makefile" diff --git a/LICENSE b/LICENSE index 6c01456..261eeb9 100644 --- a/LICENSE +++ b/LICENSE @@ -199,4 +199,3 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ->>>>>>> 6b1e6b18e0fd97cff8c7a65813d573875c1cdf0d diff --git a/Makefile b/Makefile index 9092db1..7e5a879 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: clean clean-env clean-test clean-pyc clean-build clean-other help dev test test-debug test-cov pre-commit lint format format-docs analyze docs +.PHONY: all clean clean-env clean-test clean-pyc clean-build clean-other help dev test test-debug test-cov pre-commit lint format format-docs analyze docs .DEFAULT_GOAL := help # The `.ONESHELL` and setting `SHELL` allows us to run commands that require diff --git a/README.md b/README.md index e44b705..eb21106 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ Check out all the resources and Python code examples in the official - [Create Mailgun SMTP credentials for a given domain](#create-mailgun-smtp-credentials-for-a-given-domain) - [Users](#users) - [Get users on an account](#get-users-on-an-account) - - [Get a user's details](#) + - [Get a user's details](#get-a-users-details) - [License](#license) - [Contribute](#contribute) - [Contributors](#contributors) @@ -1013,9 +1013,7 @@ def import_list_unsubs() -> None: :return: """ files = { - "unsubscribe2_csv": Path( - "mailgun/doc_tests/files/mailgun_unsubscribes.csv" - ).read_bytes() + "unsubscribe2_csv": Path("mailgun/doc_tests/files/mailgun_unsubscribes.csv").read_bytes() } req = client.unsubscribes_import.create(domain=domain, files=files) print(req.json()) @@ -1048,11 +1046,7 @@ def import_complaint_list() -> None: POST //complaints/import, Content-Type: multipart/form-data :return: """ - files = { - "complaints_csv": Path( - "mailgun/doc_tests/files/mailgun_complaints.csv" - ).read_bytes() - } + files = {"complaints_csv": Path("mailgun/doc_tests/files/mailgun_complaints.csv").read_bytes()} req = client.complaints_import.create(domain=domain, files=files) print(req.json()) ``` @@ -1237,9 +1231,7 @@ def get_all_versions() -> None: GET //templates/