Source code for acmeow.handlers.dns

"""DNS-01 challenge handlers.

Provides handlers for DNS-01 challenge validation.
"""

from __future__ import annotations

import hashlib
import logging
from collections.abc import Callable

from acmeow._internal.encoding import base64url_encode
from acmeow.handlers.base import ChallengeHandler

logger = logging.getLogger(__name__)


[docs] class CallbackDnsHandler(ChallengeHandler): """DNS-01 handler using user-provided callbacks. This handler delegates DNS record management to user-provided callback functions, allowing integration with any DNS provider. Args: create_record: Callback to create a DNS TXT record. Signature: (domain: str, record_name: str, record_value: str) -> None - domain: The domain being validated (e.g., "example.com") - record_name: The full record name (e.g., "_acme-challenge.example.com") - record_value: The TXT record value (base64url SHA-256 hash) delete_record: Callback to delete a DNS TXT record. Signature: (domain: str, record_name: str) -> None propagation_delay: Seconds to wait after creating record. Default 60. Example: >>> def create_txt(domain, name, value): ... dns_api.create_record(name, "TXT", value) >>> def delete_txt(domain, name): ... dns_api.delete_record(name, "TXT") >>> handler = CallbackDnsHandler(create_txt, delete_txt) """
[docs] def __init__( self, create_record: Callable[[str, str, str], None], delete_record: Callable[[str, str], None], propagation_delay: int = 60, ) -> None: self._create_record = create_record self._delete_record = delete_record self._propagation_delay = propagation_delay
@property def propagation_delay(self) -> int: """Seconds to wait after creating DNS record for propagation.""" return self._propagation_delay
[docs] def setup(self, domain: str, token: str, key_authorization: str) -> None: """Create DNS TXT record for the challenge. The TXT record value is the base64url-encoded SHA-256 hash of the key authorization, as specified by RFC 8555. Args: domain: The domain being validated. token: The challenge token (not used for DNS record value). key_authorization: The key authorization string to hash. """ record_name = self._get_record_name(domain) record_value = self._compute_record_value(key_authorization) logger.info( "Creating DNS TXT record: %s = %s", record_name, record_value, ) self._create_record(domain, record_name, record_value)
[docs] def cleanup(self, domain: str, token: str) -> None: """Remove DNS TXT record. Args: domain: The domain that was validated. token: The challenge token (not used). """ record_name = self._get_record_name(domain) logger.info("Removing DNS TXT record: %s", record_name) try: self._delete_record(domain, record_name) except Exception as e: logger.warning("Failed to cleanup DNS record %s: %s", record_name, e)
@staticmethod def _get_record_name(domain: str) -> str: """Get the DNS record name for a domain. Args: domain: The domain being validated. Returns: The full record name (e.g., "_acme-challenge.example.com"). """ return f"_acme-challenge.{domain}" @staticmethod def _compute_record_value(key_authorization: str) -> str: """Compute the DNS TXT record value. Args: key_authorization: The key authorization string. Returns: Base64url-encoded SHA-256 hash of the key authorization. """ digest = hashlib.sha256(key_authorization.encode("utf-8")).digest() return base64url_encode(digest)