"""Main ACME client implementation.
Provides the AcmeClient class for automated certificate management
using the ACME protocol (RFC 8555).
"""
from __future__ import annotations
import hashlib
import hmac
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Any
from cryptography.hazmat.primitives import serialization
from acmeow._internal.crypto import (
create_csr,
generate_private_key,
get_key_authorization,
serialize_private_key,
)
from acmeow._internal.dns import DnsConfig, DnsVerifier
from acmeow._internal.encoding import base64url_encode, base64url_encode_json
from acmeow._internal.http import AcmeHttpClient, RetryConfig
from acmeow.enums import (
AuthorizationStatus,
ChallengeType,
KeyType,
OrderStatus,
RevocationReason,
)
from acmeow.exceptions import (
AcmeAuthenticationError,
AcmeAuthorizationError,
AcmeCertificateError,
AcmeConfigurationError,
AcmeDnsError,
AcmeNetworkError,
AcmeOrderError,
AcmeTimeoutError,
)
from acmeow.handlers.base import ChallengeHandler
from acmeow.models.account import Account
from acmeow.models.authorization import Authorization
from acmeow.models.identifier import Identifier
from acmeow.models.order import Order
logger = logging.getLogger(__name__)
[docs]
class AcmeClient:
"""ACME protocol client for automated certificate management.
Implements the ACME protocol (RFC 8555) for obtaining SSL/TLS
certificates from ACME-compliant certificate authorities like
Let's Encrypt.
Args:
server_url: ACME server directory URL.
email: Account email address.
storage_path: Directory for storing account and certificate data.
proxy_url: URL to the proxy. Default None.
verify_ssl: Whether to verify SSL certificates. Default True.
timeout: Request timeout in seconds. Default 30.
retry_config: Retry configuration for transient failures.
Example:
>>> client = AcmeClient(
... server_url="https://acme-staging-v02.api.letsencrypt.org/directory",
... email="admin@example.com",
... storage_path=Path("./acme_data"),
... )
>>> client.create_account()
>>> order = client.create_order([Identifier.dns("example.com")])
>>> # Complete challenges...
>>> client.finalize_order(KeyType.EC256)
>>> cert, key = client.get_certificate()
Raises:
AcmeConfigurationError: If configuration is invalid.
AcmeNetworkError: If server communication fails.
"""
[docs]
def __init__(
self,
server_url: str,
email: str,
storage_path: Path | str,
proxy_url: str | None = None,
verify_ssl: bool = True,
timeout: int = 30,
retry_config: RetryConfig | None = None,
) -> None:
self._server_url = server_url.rstrip("/")
self._email = email
self._storage_path = Path(storage_path)
self._verify_ssl = verify_ssl
# HTTP client with retry support
self._http = AcmeHttpClient(
proxy_url=proxy_url,
verify_ssl=verify_ssl,
timeout=timeout,
retry_config=retry_config,
)
# ACME directory endpoints
self._directory: dict[str, Any] = {}
# Account
self._account: Account | None = None
# Current order
self._order: Order | None = None
# External Account Binding (EAB)
self._eab_kid: str | None = None
self._eab_hmac_key: bytes | None = None
# DNS verification
self._dns_config: DnsConfig | None = None
# Fetch directory on initialization
self._fetch_directory()
@property
def server_url(self) -> str:
"""ACME server directory URL."""
return self._server_url
@property
def email(self) -> str:
"""Account email address."""
return self._email
@property
def account(self) -> Account | None:
"""Current account, or None if not created."""
return self._account
@property
def order(self) -> Order | None:
"""Current order, or None if not created."""
return self._order
[docs]
def set_dns_config(self, config: DnsConfig) -> None:
"""Set DNS verification configuration.
When configured, DNS propagation is verified before notifying the
ACME server during DNS-01 challenges.
Args:
config: DNS verification configuration.
"""
self._dns_config = config
logger.info("DNS verification configured with %d nameservers", len(config.nameservers))
[docs]
def set_external_account_binding(self, kid: str, hmac_key: str) -> None:
"""Set External Account Binding (EAB) credentials.
Some ACME servers require EAB to link the ACME account to an
existing account in an external system.
Args:
kid: Key identifier from the CA.
hmac_key: Base64url-encoded HMAC key from the CA.
"""
from acmeow._internal.encoding import base64url_decode
self._eab_kid = kid
self._eab_hmac_key = base64url_decode(hmac_key)
logger.info("External Account Binding configured with kid: %s", kid)
def _fetch_directory(self) -> None:
"""Fetch ACME directory from server.
Raises:
AcmeNetworkError: If directory cannot be fetched.
"""
logger.info("Fetching ACME directory from %s", self._server_url)
response = self._http.get(self._server_url)
try:
self._directory = response.json()
except ValueError as e:
raise AcmeNetworkError(f"Invalid directory response: {e}", e) from e
# Configure nonce URL
new_nonce_url = self._directory.get("newNonce")
if not new_nonce_url:
raise AcmeNetworkError("Directory missing newNonce URL")
self._http.set_nonce_url(new_nonce_url)
logger.debug("ACME directory: %s", self._directory)
[docs]
def create_account(self, terms_agreed: bool = True) -> Account:
"""Create or retrieve an ACME account.
Creates a new account if one doesn't exist, or retrieves the
existing account if already registered.
Args:
terms_agreed: Whether the user agrees to the CA's terms of service.
Must be True to create an account.
Returns:
The Account object.
Raises:
AcmeAuthenticationError: If account creation fails.
AcmeConfigurationError: If terms not agreed.
"""
if not terms_agreed:
raise AcmeConfigurationError("Terms of service must be agreed to")
# Initialize account
self._account = Account(
email=self._email,
storage_path=self._storage_path,
server_url=self._server_url,
)
# Check if account already exists and is valid
if self._account.is_valid:
logger.info("Using existing account: %s", self._account.uri)
# Try to load any saved order
self._load_order()
return self._account
# Create new key if needed
if not self._account.exists:
self._account.create_key()
# Build registration payload
payload: dict[str, Any] = {
"termsOfServiceAgreed": True,
"contact": [self._account.contact],
}
# Add EAB if required
if self._is_eab_required():
if not self._eab_kid or not self._eab_hmac_key:
raise AcmeConfigurationError(
"External Account Binding required but not configured"
)
payload["externalAccountBinding"] = self._create_eab_payload()
# Register account
new_account_url = self._directory.get("newAccount")
if not new_account_url:
raise AcmeNetworkError("Directory missing newAccount URL")
logger.info("Creating account for %s", self._email)
response = self._http.post(
new_account_url,
payload,
self._account.key,
jwk=self._account.jwk,
)
# Get account URL from Location header
account_uri = response.headers.get("Location")
if not account_uri:
raise AcmeAuthenticationError("Server did not return account URL")
# Parse response
data = response.json()
status = data.get("status", "valid")
# Save account
self._account.save(account_uri, status)
logger.info("Account created: %s (status: %s)", account_uri, status)
return self._account
def _is_eab_required(self) -> bool:
"""Check if External Account Binding is required.
Returns:
True if EAB is required by the server.
"""
meta = self._directory.get("meta", {})
return bool(meta.get("externalAccountRequired", False))
def _create_eab_payload(self) -> dict[str, str]:
"""Create External Account Binding JWS payload.
Returns:
EAB JWS structure.
"""
if not self._account or not self._eab_kid or not self._eab_hmac_key:
raise AcmeConfigurationError("EAB not properly configured")
# Protected header for EAB (HS256)
protected = {
"alg": "HS256",
"kid": self._eab_kid,
"url": self._directory["newAccount"],
}
protected_b64 = base64url_encode_json(protected)
# Payload is the account JWK
payload_b64 = base64url_encode_json(self._account.jwk)
# Sign with HMAC-SHA256
signing_input = f"{protected_b64}.{payload_b64}".encode("ascii")
signature = hmac.new(self._eab_hmac_key, signing_input, hashlib.sha256).digest()
signature_b64 = base64url_encode(signature)
return {
"protected": protected_b64,
"payload": payload_b64,
"signature": signature_b64,
}
[docs]
def create_order(
self,
identifiers: list[Identifier],
save: bool = True,
) -> Order:
"""Create a new certificate order.
Args:
identifiers: List of identifiers (domains/IPs) for the certificate.
save: Whether to save order state for recovery. Default True.
Returns:
The Order object.
Raises:
AcmeOrderError: If order creation fails.
AcmeAuthenticationError: If not authenticated.
"""
if not self._account or not self._account.is_valid:
raise AcmeAuthenticationError("Account not created or invalid")
if not identifiers:
raise AcmeConfigurationError("At least one identifier required")
new_order_url = self._directory.get("newOrder")
if not new_order_url:
raise AcmeNetworkError("Directory missing newOrder URL")
payload = {
"identifiers": [i.to_dict() for i in identifiers],
}
logger.info("Creating order for: %s", [i.value for i in identifiers])
response = self._http.post(
new_order_url,
payload,
self._account.key,
kid=self._account.uri,
)
# Get order URL from Location header
order_url = response.headers.get("Location")
if not order_url:
raise AcmeOrderError("Server did not return order URL")
# Parse response
data = response.json()
self._order = Order.from_dict(data, order_url)
# Fetch authorizations
self._fetch_authorizations()
# Save order for recovery
if save:
self._save_order()
logger.info("Order created: %s (status: %s)", order_url, self._order.status)
return self._order
[docs]
def load_order(self) -> Order | None:
"""Load a previously saved order.
Attempts to load and resume an incomplete order that was saved
to disk. Useful for recovering from interruptions.
Returns:
The Order if found and still valid, None otherwise.
"""
self._load_order()
return self._order
def _get_order_path(self) -> Path:
"""Get the path for storing order state."""
return self._storage_path / "orders" / "current_order.json"
def _save_order(self) -> None:
"""Save current order state to disk for recovery."""
if not self._order:
return
order_path = self._get_order_path()
order_path.parent.mkdir(parents=True, exist_ok=True)
order_data = {
"url": self._order.url,
"status": self._order.status.value,
"identifiers": [i.to_dict() for i in self._order.identifiers],
"finalize_url": self._order.finalize_url,
"expires": self._order.expires,
"certificate_url": self._order.certificate_url,
}
order_path.write_text(json.dumps(order_data, indent=2))
logger.debug("Order saved to %s", order_path)
def _load_order(self) -> None:
"""Load order state from disk."""
order_path = self._get_order_path()
if not order_path.exists():
return
try:
order_data = json.loads(order_path.read_text())
# Check if order is still relevant
status = OrderStatus(order_data.get("status", "invalid"))
if status in (OrderStatus.INVALID, OrderStatus.VALID):
# Order is terminal, delete it
order_path.unlink()
return
# Recreate order
self._order = Order(
status=status,
url=order_data["url"],
identifiers=tuple(
Identifier.from_dict(i) for i in order_data["identifiers"]
),
finalize_url=order_data["finalize_url"],
expires=order_data.get("expires"),
certificate_url=order_data.get("certificate_url"),
)
# Refresh from server to get current status
self._refresh_order()
self._fetch_authorizations()
logger.info("Loaded order from disk: %s (status: %s)", self._order.url, self._order.status)
except (KeyError, ValueError, json.JSONDecodeError) as e:
logger.warning("Failed to load order: %s", e)
order_path.unlink(missing_ok=True)
def _clear_order(self) -> None:
"""Clear saved order state."""
order_path = self._get_order_path()
order_path.unlink(missing_ok=True)
def _fetch_authorizations(self) -> None:
"""Fetch all authorizations for the current order."""
if not self._order or not self._account or not self._account.uri:
return
data = self._http.post_as_get(
self._order.url, self._account.key, self._account.uri,
).json()
auth_urls = data.get("authorizations", [])
self._order.authorizations.clear()
for auth_url in auth_urls:
response = self._http.post_as_get(
auth_url,
self._account.key,
self._account.uri,
)
auth_data = response.json()
auth = Authorization.from_dict(auth_data, auth_url)
self._order.authorizations.append(auth)
logger.debug("Fetched %d authorizations", len(self._order.authorizations))
[docs]
def complete_challenges(
self,
handler: ChallengeHandler,
challenge_type: ChallengeType = ChallengeType.DNS,
propagation_delay: int = 0,
verify_dns: bool = True,
dns_timeout: int = 300,
parallel: bool = False,
max_workers: int | None = None,
) -> None:
"""Complete all pending challenges using the provided handler.
This method sets up challenge responses, optionally verifies DNS
propagation (for DNS-01), notifies the ACME server, and waits for
validation to complete.
Args:
handler: Challenge handler for deploying/cleaning up responses.
challenge_type: Type of challenge to complete. Default DNS-01.
propagation_delay: Seconds to wait after setup before notifying server.
If handler has propagation_delay attribute, that value is used.
verify_dns: Whether to verify DNS propagation before notifying server.
Only applies to DNS-01 challenges. Default True.
dns_timeout: Maximum time to wait for DNS propagation in seconds.
Default 300 (5 minutes).
parallel: Whether to set up and clean up challenges in parallel.
Default False. Server notification and polling remain sequential.
max_workers: Maximum number of parallel workers. Default None (auto).
Only used when parallel=True.
Raises:
AcmeAuthorizationError: If challenge validation fails.
AcmeOrderError: If no order exists.
AcmeDnsError: If DNS propagation verification fails.
"""
if not self._order:
raise AcmeOrderError("No order exists")
if not self._account:
raise AcmeAuthenticationError("Account not created")
# Get propagation delay from handler if available
delay = getattr(handler, "propagation_delay", propagation_delay)
# Track challenges we set up for cleanup
setup_challenges: list[tuple[str, str, str]] = [] # domain, token, key_auth
try:
# Collect challenges to set up
challenges_to_setup: list[tuple[str, str, str]] = []
for auth in self._order.authorizations:
if auth.is_valid:
logger.debug("Authorization already valid: %s", auth.domain)
continue
challenge = auth.get_challenge(challenge_type)
if not challenge:
raise AcmeAuthorizationError(
auth.domain,
f"No {challenge_type.value} challenge available",
)
# Compute key authorization
key_auth = get_key_authorization(
challenge.token,
self._account.thumbprint,
)
challenges_to_setup.append((auth.domain, challenge.token, key_auth))
# Set up challenges (parallel or sequential)
if parallel and len(challenges_to_setup) > 1:
setup_challenges = self._setup_challenges_parallel(
handler, challenges_to_setup, challenge_type, max_workers
)
else:
setup_challenges = self._setup_challenges_sequential(
handler, challenges_to_setup, challenge_type
)
# Wait for propagation (simple delay)
if delay > 0 and setup_challenges:
logger.info("Waiting %d seconds for propagation", delay)
time.sleep(delay)
# Verify DNS propagation if configured
if (
challenge_type == ChallengeType.DNS
and verify_dns
and self._dns_config
and setup_challenges
):
self._verify_dns_propagation(setup_challenges, dns_timeout)
# Respond to all challenges (sequential - server expects order)
for auth in self._order.authorizations:
if auth.is_valid:
continue
challenge = auth.get_challenge(challenge_type)
if not challenge:
continue
logger.info("Responding to challenge for %s", auth.domain)
self._respond_to_challenge(challenge.url)
# Poll for validation
self._poll_authorizations()
# Update saved order
self._save_order()
finally:
# Clean up challenges (parallel or sequential)
if parallel and len(setup_challenges) > 1:
self._cleanup_challenges_parallel(handler, setup_challenges, max_workers)
else:
self._cleanup_challenges_sequential(handler, setup_challenges)
def _setup_challenges_sequential(
self,
handler: ChallengeHandler,
challenges: list[tuple[str, str, str]],
challenge_type: ChallengeType,
) -> list[tuple[str, str, str]]:
"""Set up challenges sequentially.
Args:
handler: Challenge handler.
challenges: List of (domain, token, key_auth) tuples.
challenge_type: The challenge type.
Returns:
List of successfully set up challenges.
"""
setup_challenges: list[tuple[str, str, str]] = []
for domain, token, key_auth in challenges:
logger.info(
"Setting up %s challenge for %s",
challenge_type.value,
domain,
)
handler.setup(domain, token, key_auth)
setup_challenges.append((domain, token, key_auth))
return setup_challenges
def _setup_challenges_parallel(
self,
handler: ChallengeHandler,
challenges: list[tuple[str, str, str]],
challenge_type: ChallengeType,
max_workers: int | None,
) -> list[tuple[str, str, str]]:
"""Set up challenges in parallel using ThreadPoolExecutor.
Args:
handler: Challenge handler.
challenges: List of (domain, token, key_auth) tuples.
challenge_type: The challenge type.
max_workers: Maximum number of workers.
Returns:
List of successfully set up challenges.
"""
setup_challenges: list[tuple[str, str, str]] = []
errors: list[tuple[str, Exception]] = []
logger.info(
"Setting up %d %s challenges in parallel",
len(challenges),
challenge_type.value,
)
def setup_one(challenge_info: tuple[str, str, str]) -> tuple[str, str, str]:
domain, token, key_auth = challenge_info
logger.info(
"Setting up %s challenge for %s",
challenge_type.value,
domain,
)
handler.setup(domain, token, key_auth)
return challenge_info
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(setup_one, c): c[0] for c in challenges
}
for future in as_completed(futures):
domain = futures[future]
try:
result = future.result()
setup_challenges.append(result)
except Exception as e:
logger.error("Failed to setup challenge for %s: %s", domain, e)
errors.append((domain, e))
if errors:
# Raise the first error, but log all
domain, error = errors[0]
raise AcmeAuthorizationError(
domain,
f"Challenge setup failed: {error}",
)
return setup_challenges
def _cleanup_challenges_sequential(
self,
handler: ChallengeHandler,
challenges: list[tuple[str, str, str]],
) -> None:
"""Clean up challenges sequentially.
Args:
handler: Challenge handler.
challenges: List of (domain, token, key_auth) tuples.
"""
for domain, token, _ in challenges:
try:
handler.cleanup(domain, token)
except Exception as e:
logger.warning("Challenge cleanup failed for %s: %s", domain, e)
def _cleanup_challenges_parallel(
self,
handler: ChallengeHandler,
challenges: list[tuple[str, str, str]],
max_workers: int | None,
) -> None:
"""Clean up challenges in parallel.
Args:
handler: Challenge handler.
challenges: List of (domain, token, key_auth) tuples.
max_workers: Maximum number of workers.
"""
logger.debug("Cleaning up %d challenges in parallel", len(challenges))
def cleanup_one(challenge_info: tuple[str, str, str]) -> None:
domain, token, _ = challenge_info
handler.cleanup(domain, token)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(cleanup_one, c): c[0] for c in challenges
}
for future in as_completed(futures):
domain = futures[future]
try:
future.result()
except Exception as e:
logger.warning("Challenge cleanup failed for %s: %s", domain, e)
def _verify_dns_propagation(
self,
challenges: list[tuple[str, str, str]],
timeout: int,
) -> None:
"""Verify DNS propagation for all challenges.
Args:
challenges: List of (domain, token, key_auth) tuples.
timeout: Maximum time to wait for propagation.
Raises:
AcmeDnsError: If any DNS record fails to propagate.
"""
if not self._dns_config:
return
verifier = DnsVerifier(self._dns_config)
for domain, _, key_auth in challenges:
# Compute expected TXT value
digest = hashlib.sha256(key_auth.encode("utf-8")).digest()
expected_value = base64url_encode(digest)
record_name = f"_acme-challenge.{domain}"
logger.info("Verifying DNS propagation for %s", record_name)
if not verifier.verify_txt_record(record_name, expected_value, max_wait=timeout):
raise AcmeDnsError(
domain,
f"TXT record {record_name} did not propagate within {timeout}s",
)
def _respond_to_challenge(self, challenge_url: str) -> None:
"""Notify the server that a challenge is ready.
Args:
challenge_url: URL of the challenge to respond to.
"""
if not self._account:
raise AcmeAuthenticationError("Account not created")
self._http.post(
challenge_url,
{}, # Empty payload signals ready
self._account.key,
kid=self._account.uri,
)
def _poll_authorizations(
self,
max_attempts: int = 30,
interval: int = 2,
) -> None:
"""Poll authorizations until all are valid or failed.
Args:
max_attempts: Maximum polling attempts.
interval: Seconds between polls.
Raises:
AcmeAuthorizationError: If any authorization fails.
AcmeTimeoutError: If polling times out.
"""
if not self._order or not self._account or not self._account.uri:
return
for attempt in range(max_attempts):
all_valid = True
any_failed = False
failed_domain = ""
failed_error = ""
for auth in self._order.authorizations:
# Fetch current status
response = self._http.post_as_get(
auth.url,
self._account.key,
self._account.uri,
)
data = response.json()
status = AuthorizationStatus(data.get("status", "pending"))
if status == AuthorizationStatus.INVALID:
any_failed = True
failed_domain = auth.domain
# Try to get error details from challenge
for c in data.get("challenges", []):
if c.get("error"):
failed_error = c["error"].get("detail", "Unknown error")
break
break
elif status != AuthorizationStatus.VALID:
all_valid = False
if any_failed:
raise AcmeAuthorizationError(failed_domain, failed_error)
if all_valid:
logger.info("All authorizations valid")
return
logger.debug(
"Polling authorizations (attempt %d/%d)",
attempt + 1,
max_attempts,
)
time.sleep(interval)
raise AcmeTimeoutError("Authorization polling timed out")
[docs]
def finalize_order(
self,
key_type: KeyType = KeyType.EC256,
common_name: str | None = None,
csr: bytes | str | None = None,
) -> None:
"""Finalize the order by submitting a CSR.
Either generates a private key and CSR internally, or uses an
externally provided CSR. When using an external CSR, the caller
is responsible for managing the corresponding private key.
Args:
key_type: Key type for the certificate. Default EC256.
Ignored when ``csr`` is provided.
common_name: Common name override (defaults to first identifier).
Ignored when ``csr`` is provided.
csr: Externally generated CSR in PEM (str/bytes) or DER (bytes)
format. When provided, no private key is generated or saved.
Raises:
AcmeOrderError: If finalization fails.
AcmeConfigurationError: If the provided CSR cannot be parsed.
"""
if not self._order:
raise AcmeOrderError("No order exists")
if not self._account:
raise AcmeAuthenticationError("Account not created")
# Refresh order status
self._refresh_order()
if not self._order.is_ready:
raise AcmeOrderError(
f"Order not ready for finalization (status: {self._order.status})"
)
if csr is not None:
# Use externally provided CSR
csr_der = self._parse_external_csr(csr)
logger.info("Using externally provided CSR")
else:
# Generate key and CSR
logger.info("Generating %s key and CSR", key_type.value)
cert_key = generate_private_key(key_type)
csr_der = create_csr(
list(self._order.identifiers),
cert_key,
common_name=common_name,
)
# Save the private key
cert_path, key_path = self._account.get_certificate_paths(
self._order.common_name
)
key_path.parent.mkdir(parents=True, exist_ok=True)
key_path.write_bytes(serialize_private_key(cert_key))
logger.info("Certificate key saved to %s", key_path)
# Submit CSR
payload = {"csr": base64url_encode(csr_der)}
logger.info("Finalizing order")
self._http.post(
self._order.finalize_url,
payload,
self._account.key,
kid=self._account.uri,
)
# Poll for certificate
self._poll_order()
# Update saved order
self._save_order()
@staticmethod
def _parse_external_csr(csr: bytes | str) -> bytes:
"""Parse an externally provided CSR into DER format.
Args:
csr: CSR in PEM (str/bytes) or DER (bytes) format.
Returns:
DER-encoded CSR bytes.
Raises:
AcmeConfigurationError: If the CSR cannot be parsed.
"""
from cryptography import x509
csr_bytes = csr.encode("utf-8") if isinstance(csr, str) else csr
# Try PEM first
try:
parsed = x509.load_pem_x509_csr(csr_bytes)
return parsed.public_bytes(serialization.Encoding.DER)
except (ValueError, Exception):
pass
# Try DER
try:
parsed = x509.load_der_x509_csr(csr_bytes)
return parsed.public_bytes(serialization.Encoding.DER)
except (ValueError, Exception):
pass
raise AcmeConfigurationError(
"Unable to parse external CSR. Provide a valid PEM or DER encoded CSR."
)
def _refresh_order(self) -> None:
"""Refresh the current order from the server."""
if not self._order or not self._account or not self._account.uri:
return
response = self._http.post_as_get(
self._order.url,
self._account.key,
self._account.uri,
)
data = response.json()
self._order.update_from_dict(data)
def _poll_order(
self,
max_attempts: int = 30,
interval: int = 2,
) -> None:
"""Poll order until certificate is ready.
Args:
max_attempts: Maximum polling attempts.
interval: Seconds between polls.
Raises:
AcmeOrderError: If order fails.
AcmeTimeoutError: If polling times out.
"""
if not self._order or not self._account:
return
for attempt in range(max_attempts):
self._refresh_order()
if self._order.is_valid:
logger.info("Order valid, certificate ready")
return
if self._order.is_invalid:
error_detail = ""
if self._order.error:
error_detail = self._order.error.get("detail", "")
raise AcmeOrderError(f"Order failed: {error_detail}")
logger.debug(
"Polling order (attempt %d/%d, status: %s)",
attempt + 1,
max_attempts,
self._order.status,
)
time.sleep(interval)
raise AcmeTimeoutError("Order polling timed out")
[docs]
def get_certificate(
self,
preferred_chain: str | None = None,
) -> tuple[str, str | None]:
"""Download the issued certificate.
Args:
preferred_chain: Preferred certificate chain issuer CN.
If the server provides multiple chains, select the one
whose issuer CN contains this string. Default None (use default chain).
Returns:
Tuple of (certificate_pem, private_key_pem).
The certificate includes the full chain.
The private key is ``None`` when an external CSR was used
during finalization (the caller already holds the key).
Raises:
AcmeCertificateError: If certificate download fails.
"""
if not self._order:
raise AcmeOrderError("No order exists")
if not self._account or not self._account.uri:
raise AcmeAuthenticationError("Account not created")
# Refresh order to get certificate URL
self._refresh_order()
if not self._order.certificate_url:
raise AcmeCertificateError("Certificate URL not available")
# Download certificate
logger.info("Downloading certificate")
response = self._http.post_as_get(
self._order.certificate_url,
self._account.key,
self._account.uri,
)
cert_pem = response.text
# Check for alternate chains if preferred chain is specified
if preferred_chain:
cert_pem = self._select_preferred_chain(response, preferred_chain) or cert_pem
# Save certificate
cert_path, key_path = self._account.get_certificate_paths(
self._order.common_name
)
cert_path.parent.mkdir(parents=True, exist_ok=True)
cert_path.write_text(cert_pem)
logger.info("Certificate saved to %s", cert_path)
# Read private key (may not exist if an external CSR was used)
key_pem: str | None = None
if key_path.exists():
key_pem = key_path.read_text()
# Clear saved order (completed)
self._clear_order()
return cert_pem, key_pem
def _select_preferred_chain(
self,
response: Any,
preferred_chain: str,
) -> str | None:
"""Select preferred certificate chain from Link headers.
Args:
response: HTTP response with potential Link headers.
preferred_chain: Preferred issuer CN substring.
Returns:
PEM certificate if preferred chain found, None otherwise.
"""
from cryptography import x509
# Parse Link headers for alternate chains
link_header = response.headers.get("Link", "")
alternate_urls: list[str] = []
for link in link_header.split(","):
link = link.strip()
if 'rel="alternate"' in link:
# Extract URL from <url>
start = link.find("<")
end = link.find(">")
if start != -1 and end != -1:
alternate_urls.append(link[start + 1 : end])
if not alternate_urls:
logger.debug("No alternate certificate chains available")
return None
logger.info("Found %d alternate certificate chain(s)", len(alternate_urls))
# Check default chain first
try:
default_cert = x509.load_pem_x509_certificate(response.text.encode())
issuer_cn = default_cert.issuer.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if issuer_cn and preferred_chain.lower() in str(issuer_cn[0].value).lower():
logger.info("Default chain matches preferred issuer: %s", issuer_cn[0].value)
return None # Use default
except Exception as e:
logger.debug("Failed to parse default certificate: %s", e)
# Check alternate chains
if not self._account or not self._account.uri:
return None
for alt_url in alternate_urls:
try:
alt_response = self._http.post_as_get(
alt_url,
self._account.key,
self._account.uri,
)
alt_cert = x509.load_pem_x509_certificate(alt_response.text.encode())
issuer_cn = alt_cert.issuer.get_attributes_for_oid(
x509.oid.NameOID.COMMON_NAME
)
if issuer_cn and preferred_chain.lower() in str(issuer_cn[0].value).lower():
logger.info("Selected alternate chain with issuer: %s", issuer_cn[0].value)
return alt_response.text
except Exception as e:
logger.debug("Failed to fetch/parse alternate chain %s: %s", alt_url, e)
logger.warning("Preferred chain '%s' not found, using default", preferred_chain)
return None
[docs]
def deactivate_account(self) -> None:
"""Deactivate the current ACME account.
This permanently deactivates the account. Once deactivated,
the account cannot be used for any further operations and
cannot be reactivated.
Per RFC 8555 Section 7.3.7.
Raises:
AcmeAuthenticationError: If not authenticated or deactivation fails.
"""
if not self._account or not self._account.uri:
raise AcmeAuthenticationError("Account not created")
logger.info("Deactivating account: %s", self._account.uri)
payload = {"status": "deactivated"}
response = self._http.post(
self._account.uri,
payload,
self._account.key,
kid=self._account.uri,
)
# Update account status
data = response.json()
new_status = data.get("status", "deactivated")
self._account.save(self._account.uri, new_status)
logger.info("Account deactivated: %s", self._account.uri)
[docs]
def update_account(self, email: str | None = None) -> Account:
"""Update the account contact information.
Updates the account's contact email address with the ACME server.
Per RFC 8555 Section 7.3.2.
Args:
email: New email address for the account.
If None, keeps the current email.
Returns:
Updated Account object.
Raises:
AcmeAuthenticationError: If not authenticated or update fails.
"""
if not self._account or not self._account.uri:
raise AcmeAuthenticationError("Account not created")
if email is None:
email = self._email
logger.info("Updating account contact to: %s", email)
payload = {"contact": [f"mailto:{email}"]}
response = self._http.post(
self._account.uri,
payload,
self._account.key,
kid=self._account.uri,
)
data = response.json()
status = data.get("status", "valid")
self._account.save(self._account.uri, status)
self._email = email
logger.info("Account contact updated: %s", email)
return self._account
[docs]
def key_rollover(self) -> None:
"""Roll over the account key to a new key.
Generates a new account key and updates the ACME server to use it.
The old key is replaced and can no longer be used.
Per RFC 8555 Section 7.3.5.
Raises:
AcmeAuthenticationError: If not authenticated or rollover fails.
AcmeNetworkError: If key-change URL is not available.
"""
from acmeow._internal.crypto import generate_account_key, get_jwk
if not self._account or not self._account.uri:
raise AcmeAuthenticationError("Account not created")
key_change_url = self._directory.get("keyChange")
if not key_change_url:
raise AcmeNetworkError("Directory missing keyChange URL")
logger.info("Rolling over account key for: %s", self._account.uri)
# Generate new key
new_key = generate_account_key()
new_jwk = get_jwk(new_key)
# Create inner JWS payload
inner_payload = {
"account": self._account.uri,
"oldKey": self._account.jwk,
}
# Create inner JWS protected header
inner_protected = {
"alg": "ES256",
"jwk": new_jwk,
"url": key_change_url,
}
# Sign inner JWS with new key
from acmeow._internal.crypto import sign_es256
inner_protected_b64 = base64url_encode_json(inner_protected)
inner_payload_b64 = base64url_encode_json(inner_payload)
inner_signing_input = f"{inner_protected_b64}.{inner_payload_b64}".encode("ascii")
inner_signature = sign_es256(new_key, inner_signing_input)
inner_signature_b64 = base64url_encode(inner_signature)
# The inner JWS becomes the outer payload
outer_payload = {
"protected": inner_protected_b64,
"payload": inner_payload_b64,
"signature": inner_signature_b64,
}
# Send outer JWS signed with old key
self._http.post(
key_change_url,
outer_payload,
self._account.key,
kid=self._account.uri,
)
# Update account with new key
self._account.update_key(new_key)
logger.info("Account key rollover complete")
[docs]
def revoke_certificate(
self,
certificate: str | bytes,
reason: RevocationReason | None = None,
) -> None:
"""Revoke a certificate.
Revokes the specified certificate with the ACME server.
Per RFC 8555 Section 7.6.
Args:
certificate: PEM-encoded certificate string or DER-encoded bytes.
reason: Optional revocation reason code.
Raises:
AcmeAuthenticationError: If not authenticated.
AcmeCertificateError: If revocation fails.
AcmeNetworkError: If revokeCert URL is not available.
"""
import base64
from cryptography import x509
if not self._account or not self._account.uri:
raise AcmeAuthenticationError("Account not created")
revoke_cert_url = self._directory.get("revokeCert")
if not revoke_cert_url:
raise AcmeNetworkError("Directory missing revokeCert URL")
# Convert PEM to DER if needed
if isinstance(certificate, str):
cert = x509.load_pem_x509_certificate(certificate.encode())
cert_der = cert.public_bytes(serialization.Encoding.DER)
else:
cert_der = certificate
logger.info("Revoking certificate")
# Build payload
payload: dict[str, Any] = {
"certificate": base64.urlsafe_b64encode(cert_der).rstrip(b"=").decode("ascii"),
}
if reason is not None:
payload["reason"] = reason.value
self._http.post(
revoke_cert_url,
payload,
self._account.key,
kid=self._account.uri,
)
logger.info("Certificate revoked successfully")
[docs]
def close(self) -> None:
"""Close the client and release resources."""
self._http.close()
[docs]
def __enter__(self) -> AcmeClient:
"""Context manager entry."""
return self
[docs]
def __exit__(self, *args: object) -> None:
"""Context manager exit."""
self.close()