diff --git a/src/mcp/shared/auth.py b/src/mcp/shared/auth.py index bf03a8b8d..984afb22b 100644 --- a/src/mcp/shared/auth.py +++ b/src/mcp/shared/auth.py @@ -1,4 +1,5 @@ from typing import Any, Literal +from urllib.parse import urlparse from pydantic import AnyHttpUrl, AnyUrl, BaseModel, Field, field_validator @@ -79,11 +80,51 @@ def validate_scope(self, requested_scope: str | None) -> list[str] | None: return requested_scopes # pragma: no cover def validate_redirect_uri(self, redirect_uri: AnyUrl | None) -> AnyUrl: + """Validate redirect_uri against client's registered URIs. + + Implements RFC 8252 Section 7.3 for loopback addresses: + "The authorization server MUST allow any port to be specified at the time + of the request for loopback IP redirect URIs, to accommodate clients that + obtain an available ephemeral port from the operating system at the time + of the request." + + For loopback addresses (localhost, 127.0.0.1, ::1), the port is ignored + during validation. For all other URIs, exact matching is required. + + Args: + redirect_uri: The redirect_uri from the authorization request + + Returns: + The validated redirect_uri + + Raises: + InvalidRedirectUriError: If redirect_uri is invalid or not registered + """ if redirect_uri is not None: - # Validate redirect_uri against client's registered redirect URIs - if self.redirect_uris is None or redirect_uri not in self.redirect_uris: - raise InvalidRedirectUriError(f"Redirect URI '{redirect_uri}' not registered for client") - return redirect_uri + if self.redirect_uris is None: + raise InvalidRedirectUriError("No redirect URIs registered for client") + + # Try exact match first (fast path) + if redirect_uri in self.redirect_uris: + return redirect_uri + + # RFC 8252 loopback matching: ignore port for localhost/127.0.0.1/::1 + requested_str = str(redirect_uri) + parsed_requested = urlparse(requested_str) + is_loopback = parsed_requested.hostname in ("localhost", "127.0.0.1", "::1", "[::1]") + + if is_loopback: + for registered in self.redirect_uris: + parsed_registered = urlparse(str(registered)) + if parsed_registered.hostname not in ("localhost", "127.0.0.1", "::1", "[::1]"): + continue + # Match scheme, hostname, and path - port can differ per RFC 8252 + if (parsed_requested.scheme == parsed_registered.scheme and + parsed_requested.hostname == parsed_registered.hostname and + (parsed_requested.path or "/") == (parsed_registered.path or "/")): + return redirect_uri + + raise InvalidRedirectUriError(f"Redirect URI '{redirect_uri}' not registered for client") elif self.redirect_uris is not None and len(self.redirect_uris) == 1: return self.redirect_uris[0] else: diff --git a/tests/shared/test_rfc8252_redirect_uri.py b/tests/shared/test_rfc8252_redirect_uri.py new file mode 100644 index 000000000..eba35e40b --- /dev/null +++ b/tests/shared/test_rfc8252_redirect_uri.py @@ -0,0 +1,184 @@ +"""Tests for RFC 8252 Section 7.3 compliant redirect_uri validation. + +RFC 8252 Section 7.3 states: +"The authorization server MUST allow any port to be specified at the time of +the request for loopback IP redirect URIs, to accommodate clients that obtain +an available ephemeral port from the operating system at the time of the request." +""" + +import pytest +from pydantic import AnyUrl + +from mcp.shared.auth import InvalidRedirectUriError, OAuthClientMetadata + + +def test_exact_match_non_loopback(): + """Non-loopback URIs must match exactly.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("https://example.com:8080/callback")] + ) + + # Exact match should work + result = client.validate_redirect_uri(AnyUrl("https://example.com:8080/callback")) + assert str(result) == "https://example.com:8080/callback" + + # Different port should fail + with pytest.raises(InvalidRedirectUriError): + client.validate_redirect_uri(AnyUrl("https://example.com:9090/callback")) + + +def test_loopback_localhost_port_ignored(): + """Localhost loopback URIs should ignore port per RFC 8252.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://localhost:8080/callback")] + ) + + # Different port should work for loopback + result = client.validate_redirect_uri(AnyUrl("http://localhost:9999/callback")) + assert str(result) == "http://localhost:9999/callback" + + # Same port should also work + result = client.validate_redirect_uri(AnyUrl("http://localhost:8080/callback")) + assert str(result) == "http://localhost:8080/callback" + + +def test_loopback_ipv4_port_ignored(): + """127.0.0.1 loopback URIs should ignore port per RFC 8252.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://127.0.0.1:5000/")] + ) + + # Different port should work for loopback + result = client.validate_redirect_uri(AnyUrl("http://127.0.0.1:60847/")) + assert str(result) == "http://127.0.0.1:60847/" + + +def test_loopback_ipv6_port_ignored(): + """[::1] loopback URIs should ignore port per RFC 8252.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://[::1]:8080/")] + ) + + # Different port should work for loopback + result = client.validate_redirect_uri(AnyUrl("http://[::1]:9999/")) + assert str(result) == "http://[::1]:9999/" + + +def test_loopback_scheme_must_match(): + """Loopback URIs must still match scheme.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://localhost:8080/callback")] + ) + + # HTTPS vs HTTP should fail + with pytest.raises(InvalidRedirectUriError): + client.validate_redirect_uri(AnyUrl("https://localhost:9999/callback")) + + +def test_loopback_path_must_match(): + """Loopback URIs must still match path.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://localhost:8080/callback")] + ) + + # Different path should fail + with pytest.raises(InvalidRedirectUriError): + client.validate_redirect_uri(AnyUrl("http://localhost:9999/other")) + + +def test_loopback_hostname_must_match(): + """Loopback hostname must match (can't mix localhost and 127.0.0.1).""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://localhost:8080/callback")] + ) + + # Different loopback hostname should fail + with pytest.raises(InvalidRedirectUriError): + client.validate_redirect_uri(AnyUrl("http://127.0.0.1:9999/callback")) + + +def test_multiple_redirect_uris_loopback(): + """Should match against any registered loopback URI.""" + client = OAuthClientMetadata( + redirect_uris=[ + AnyUrl("http://localhost:8080/callback"), + AnyUrl("http://127.0.0.1:5000/auth"), + ] + ) + + # Should match first with different port + result = client.validate_redirect_uri(AnyUrl("http://localhost:9999/callback")) + assert str(result) == "http://localhost:9999/callback" + + # Should match second with different port + result = client.validate_redirect_uri(AnyUrl("http://127.0.0.1:6000/auth")) + assert str(result) == "http://127.0.0.1:6000/auth" + + +def test_mixed_loopback_and_non_loopback(): + """Client can have both loopback and non-loopback URIs.""" + client = OAuthClientMetadata( + redirect_uris=[ + AnyUrl("http://localhost:8080/callback"), + AnyUrl("https://example.com:8080/callback"), + ] + ) + + # Loopback with different port should work + result = client.validate_redirect_uri(AnyUrl("http://localhost:9999/callback")) + assert str(result) == "http://localhost:9999/callback" + + # Non-loopback with different port should fail + with pytest.raises(InvalidRedirectUriError): + client.validate_redirect_uri(AnyUrl("https://example.com:9999/callback")) + + # Non-loopback exact match should work + result = client.validate_redirect_uri(AnyUrl("https://example.com:8080/callback")) + assert str(result) == "https://example.com:8080/callback" + + +def test_no_redirect_uris_registered(): + """Should fail if no redirect URIs are registered.""" + client = OAuthClientMetadata(redirect_uris=None) + + with pytest.raises(InvalidRedirectUriError, match="No redirect URIs registered"): + client.validate_redirect_uri(AnyUrl("http://localhost:8080/")) + + +def test_single_registered_uri_omit_request(): + """If only one URI registered and none provided, use the registered one.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://localhost:8080/callback")] + ) + + result = client.validate_redirect_uri(None) + assert str(result) == "http://localhost:8080/callback" + + +def test_multiple_registered_uris_omit_request(): + """Must specify redirect_uri when multiple are registered.""" + client = OAuthClientMetadata( + redirect_uris=[ + AnyUrl("http://localhost:8080/callback"), + AnyUrl("http://127.0.0.1:5000/auth"), + ] + ) + + with pytest.raises(InvalidRedirectUriError, match="must be specified"): + client.validate_redirect_uri(None) + + +def test_root_path_normalization(): + """Empty path should be treated as '/'.""" + client = OAuthClientMetadata( + redirect_uris=[AnyUrl("http://localhost:8080/")] + ) + + # Both should match + result = client.validate_redirect_uri(AnyUrl("http://localhost:9999/")) + assert str(result) == "http://localhost:9999/" + + # Without trailing slash - Pydantic normalizes to include slash + result = client.validate_redirect_uri(AnyUrl("http://localhost:9999")) + # AnyUrl normalizes URLs, so both forms match + assert "localhost:9999" in str(result)