diff --git a/src/vaultwarden/clients/bitwarden.py b/src/vaultwarden/clients/bitwarden.py index 8c8858e..624ee80 100644 --- a/src/vaultwarden/clients/bitwarden.py +++ b/src/vaultwarden/clients/bitwarden.py @@ -1,3 +1,4 @@ +from base64 import b64decode, b64encode from typing import Literal from uuid import UUID @@ -5,7 +6,13 @@ from vaultwarden.models.exception_models import BitwardenError from vaultwarden.models.sync import ConnectToken, SyncData -from vaultwarden.utils.crypto import make_master_key +from vaultwarden.utils.crypto import ( + encrypt_asym, + encrypt_sym, + make_asym_key, + make_master_key, + make_org_key, +) from vaultwarden.utils.logger import log_raise_for_status @@ -132,6 +139,46 @@ def _api_request( method, path, headers=headers, **kwargs ) + def get_public_key_for_user(self, user_id: UUID | None = None) -> str: + used_id = user_id if user_id else self.sync().Profile.Id + resp = self.api_request("GET", f"api/users/{used_id}/public-key") + return resp.json().get("publicKey") + + def create_organisation( + self, + name: str, + email: str, + default_collection_name: str = "DefaultCollection", + ) -> Response: + if not self.connect_token: + raise BitwardenError("Not connected") + + public_key_user = b64decode(self.get_public_key_for_user()) + org_key = make_org_key() + protected_organisation_symetric_key = encrypt_asym( + org_key, public_key_user + ) + + collection = encrypt_sym( + bytes(default_collection_name, "utf-8"), org_key + ) + encrypted_priv, pub, _ = make_asym_key(self.connect_token.user_key) + + payload = { + "key": protected_organisation_symetric_key, + "collectionName": collection, + "name": name, + "billingEmail": email, + "initiationPath": "New organization creation in-product", + "keys": { + "publicKey": b64encode(pub).decode("utf-8"), + "encryptedPrivateKey": encrypted_priv, + }, + "planType": 0, + } + resp = self._api_request("POST", "api/organizations", json=payload) + return resp + def sync(self, force_refresh: bool = False) -> SyncData: if self._sync is None or force_refresh: resp = self._api_request("GET", "api/sync") diff --git a/src/vaultwarden/utils/crypto.py b/src/vaultwarden/utils/crypto.py index fa77b8e..e4e95ad 100644 --- a/src/vaultwarden/utils/crypto.py +++ b/src/vaultwarden/utils/crypto.py @@ -281,6 +281,10 @@ def make_asym_key(key, stretch=True): return encrypt_sym(private_key, key), public_key, private_key +def make_org_key(): + return token_bytes(64) + + def gen_password(length=32, alphabet=None): alphabet = string.ascii_letters + string.digits while True: diff --git a/tests/e2e/test_bitwarden.py b/tests/e2e/test_bitwarden.py index ff59ad5..6a312f0 100644 --- a/tests/e2e/test_bitwarden.py +++ b/tests/e2e/test_bitwarden.py @@ -123,6 +123,11 @@ def test_add_remove_collection_cipher(self): self.assertEqual(len(res[0].CollectionIds), 2) cipher.update_collection(old_colls) + def test_add_organsiation(self): + res = bitwarden.create_organisation("test_me", "me@example.com") + self.assertTrue(res.is_success) + + def test_deduplicate(self): # Todo build test fixtures and delete them at the end of the test return