# Source code for acmeow.models.account
"""Account model for ACME protocol.
Manages ACME account state, keys, and persistence.
"""
from __future__ import annotations
import json
import logging
from pathlib import Path
from urllib.parse import urlparse
from cryptography.hazmat.primitives.asymmetric import ec
from acmeow._internal.crypto import (
generate_account_key,
get_jwk,
get_jwk_thumbprint,
load_private_key,
serialize_private_key,
)
from acmeow.enums import AccountStatus
from acmeow.exceptions import AcmeAuthenticationError, AcmeConfigurationError
logger = logging.getLogger(__name__)
[docs]
class Account:
    """ACME account management.

    Handles account creation, key management, and persistence of account
    data to disk. Account keys use EC P-256 (SECP256R1) as recommended
    by the ACME specification.

    On-disk layout, relative to ``storage_path``::

        accounts/<server host>/<email>/account.json
        accounts/<server host>/<email>/keys/<email>.key
        certificates/<sanitized common name>.crt|.key

    Constructing an instance immediately attempts to load an existing
    account from that location; a failed load is logged and leaves the
    instance without a key.

    Args:
        email: Account email address. Must be a bare address
            (``user@example.com``), not a display-name form.
        storage_path: Base directory for storing account data.
        server_url: ACME server URL (used to organize storage by server).

    Raises:
        AcmeConfigurationError: If email is invalid.
    """

    def __init__(
        self,
        email: str,
        storage_path: Path,
        server_url: str,
    ) -> None:
        if not self._is_valid_email(email):
            raise AcmeConfigurationError(f"Invalid email address: {email}")

        self._email = email
        self._storage_path = storage_path
        # Accounts are grouped per ACME server; URLs without a parseable
        # host all share the "unknown" directory.
        self._server_host = urlparse(server_url).hostname or "unknown"

        self._key: ec.EllipticCurvePrivateKey | None = None
        self._jwk: dict[str, str] | None = None
        self._thumbprint: str | None = None
        self._uri: str | None = None
        self._status: AccountStatus | None = None

        # Set up paths
        self._account_dir = (
            self._storage_path / "accounts" / self._server_host / self._email
        )
        self._key_path = self._account_dir / "keys" / f"{self._email}.key"
        self._account_path = self._account_dir / "account.json"

        # Try to load existing account
        self._load_existing_account()

    @property
    def email(self) -> str:
        """Account email address."""
        return self._email

    @property
    def key(self) -> ec.EllipticCurvePrivateKey:
        """Account private key.

        Raises:
            AcmeAuthenticationError: If key is not initialized.
        """
        if self._key is None:
            raise AcmeAuthenticationError("Account key not initialized")
        return self._key

    @property
    def jwk(self) -> dict[str, str]:
        """JSON Web Key representation of public key.

        Raises:
            AcmeAuthenticationError: If key is not initialized.
        """
        if self._jwk is None:
            raise AcmeAuthenticationError("Account JWK not initialized")
        return self._jwk

    @property
    def thumbprint(self) -> str:
        """JWK thumbprint for key authorization.

        Raises:
            AcmeAuthenticationError: If key is not initialized.
        """
        if self._thumbprint is None:
            raise AcmeAuthenticationError("Account thumbprint not initialized")
        return self._thumbprint

    @property
    def uri(self) -> str | None:
        """Account URI from the ACME server, or None if not yet saved/loaded."""
        return self._uri

    @property
    def status(self) -> AccountStatus | None:
        """Account status, or None if not yet saved/loaded."""
        return self._status

    @property
    def exists(self) -> bool:
        """Check if account data exists on disk.

        True only when both ``account.json`` and the key file are present.
        """
        return self._account_path.exists() and self._key_path.exists()

    @property
    def is_valid(self) -> bool:
        """Check if account is valid and ready for use.

        Requires a loaded key, a known account URI, and a status equal to
        ``AccountStatus.VALID``.
        """
        return (
            self._key is not None
            and self._uri is not None
            and self._status == AccountStatus.VALID
        )

    @property
    def contact(self) -> str:
        """Contact URL for the account (mailto: format)."""
        return f"mailto:{self._email}"

    def create_key(self) -> None:
        """Generate a new account key.

        Creates a new private key via ``generate_account_key`` and saves it
        to disk with owner-only permissions.

        Raises:
            AcmeAuthenticationError: If key already exists.
        """
        if self._key is not None:
            raise AcmeAuthenticationError("Account key already exists")

        logger.info("Generating new account key for %s", self._email)
        self._install_key(generate_account_key())

        # Save key to disk
        self._write_private_file(
            self._key_path, serialize_private_key(self.key)
        )
        logger.debug("Account key saved to %s", self._key_path)

    def save(self, uri: str, status: str) -> None:
        """Save account data after registration.

        Updates the in-memory URI and status and writes ``account.json``
        (email, uri, status) into the account directory.

        Args:
            uri: Account URI from the ACME server.
            status: Account status string; converted with
                ``AccountStatus(status)``.
        """
        # Convert the status first so an unrecognised value fails before
        # any in-memory state has been modified.
        parsed_status = AccountStatus(status)
        self._uri = uri
        self._status = parsed_status

        account_data = {
            "email": self._email,
            "uri": self._uri,
            "status": self._status.value,
        }
        self._account_dir.mkdir(parents=True, exist_ok=True)
        self._account_path.write_text(
            json.dumps(account_data, indent=2), encoding="utf-8"
        )
        logger.info("Account saved: %s", self._uri)

    def update_key(self, new_key: ec.EllipticCurvePrivateKey) -> None:
        """Update the account key after a key rollover.

        Replaces the in-memory key (and derived JWK/thumbprint) and
        overwrites the key file on disk with owner-only permissions.

        Args:
            new_key: The new EC P-256 private key.
        """
        self._install_key(new_key)

        # Save new key to disk
        self._write_private_file(
            self._key_path, serialize_private_key(self.key)
        )
        logger.info("Account key updated and saved to %s", self._key_path)

    def _install_key(self, key: ec.EllipticCurvePrivateKey) -> None:
        """Set the in-memory key and recompute the JWK and thumbprint."""
        self._key = key
        self._jwk = get_jwk(key)
        self._thumbprint = get_jwk_thumbprint(self._jwk)

    @staticmethod
    def _write_private_file(path: Path, data: bytes) -> None:
        """Write ``data`` to ``path`` with owner-only (0o600) permissions.

        Args:
            path: Destination file; parent directories are created.
            data: Raw bytes to store.
        """
        path.parent.mkdir(parents=True, exist_ok=True)
        # Create the file with a restrictive mode *before* any secret bytes
        # are written, then chmod in case it already existed with a wider
        # mode (touch does not change the mode of an existing file).
        path.touch(mode=0o600, exist_ok=True)
        path.chmod(0o600)
        path.write_bytes(data)

    def _load_existing_account(self) -> None:
        """Load existing account data and key from disk.

        Does nothing when the account files are missing. Any failure while
        reading or parsing is logged as a warning and the in-memory state
        is reset, so construction never raises because of bad stored data.
        """
        if not self.exists:
            logger.debug("No existing account found for %s", self._email)
            return

        try:
            # Load key
            loaded_key = load_private_key(self._key_path.read_bytes())
            if not isinstance(loaded_key, ec.EllipticCurvePrivateKey):
                raise AcmeAuthenticationError("Invalid account key type")
            self._install_key(loaded_key)

            # Load account data
            account_data = json.loads(
                self._account_path.read_text(encoding="utf-8")
            )
            self._uri = account_data.get("uri")
            status = account_data.get("status")
            if status:
                self._status = AccountStatus(status)
            logger.info("Loaded existing account: %s", self._uri)
        except Exception as e:
            # Deliberately broad: loading is best-effort and a corrupt
            # store must not prevent the object from being constructed.
            logger.warning("Failed to load existing account: %s", e)
            self._reset()

    def _reset(self) -> None:
        """Reset account state (key, JWK, thumbprint, URI and status)."""
        self._key = None
        self._jwk = None
        self._thumbprint = None
        self._uri = None
        self._status = None

    @staticmethod
    def _is_valid_email(email: str) -> bool:
        """Validate email address format.

        The address must be a bare ``local@domain`` string: the raw value
        is later used verbatim in storage paths and in the ``mailto:``
        contact, so display-name forms such as ``"Jane <j@x.org>"`` and
        values with surrounding whitespace are rejected.

        Args:
            email: Email address to validate.

        Returns:
            True if email appears valid.
        """
        from email.utils import parseaddr

        _, addr = parseaddr(email)
        # Anything parseaddr had to strip or rewrite means the raw string
        # is not a plain address.
        if addr != email:
            return False
        local_part, _, domain = addr.rpartition("@")
        return bool(local_part) and bool(domain) and len(addr) > 3

    def get_certificate_paths(self, common_name: str) -> tuple[Path, Path]:
        """Get the paths for storing a certificate and its key.

        Paths live in ``<storage_path>/certificates``; the directory is not
        created here.

        Args:
            common_name: Certificate common name. Every character that is
                not alphanumeric or one of ``.-_`` is replaced by ``_``
                (so ``*.example.com`` becomes ``_.example.com``).

        Returns:
            Tuple of (certificate_path, key_path).
        """
        # Sanitize filename
        safe_name = "".join(
            c if c.isalnum() or c in ".-_" else "_"
            for c in common_name
        )
        cert_dir = self._storage_path / "certificates"
        return (
            cert_dir / f"{safe_name}.crt",
            cert_dir / f"{safe_name}.key",
        )