Package dns :: Module dnssec
[hide private]
[frames] | no frames]

Source Code for Module dns.dnssec

  1  # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 
  2   
  3  # Copyright (C) 2003-2017 Nominum, Inc. 
  4  # 
  5  # Permission to use, copy, modify, and distribute this software and its 
  6  # documentation for any purpose with or without fee is hereby granted, 
  7  # provided that the above copyright notice and this permission notice 
  8  # appear in all copies. 
  9  # 
 10  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
 11  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 12  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 13  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 14  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 15  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 16  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 17   
 18  """Common DNSSEC-related functions and constants.""" 
 19   
 20  from io import BytesIO 
 21  import struct 
 22  import time 
 23   
 24  import dns.exception 
 25  import dns.name 
 26  import dns.node 
 27  import dns.rdataset 
 28  import dns.rdata 
 29  import dns.rdatatype 
 30  import dns.rdataclass 
 31  from ._compat import string_types 
 32   
 33   
34 -class UnsupportedAlgorithm(dns.exception.DNSException):
35 """The DNSSEC algorithm is not supported."""
36 37
38 -class ValidationFailure(dns.exception.DNSException):
39 """The DNSSEC signature is invalid."""
40 41 42 #: RSAMD5 43 RSAMD5 = 1 44 #: DH 45 DH = 2 46 #: DSA 47 DSA = 3 48 #: ECC 49 ECC = 4 50 #: RSASHA1 51 RSASHA1 = 5 52 #: DSANSEC3SHA1 53 DSANSEC3SHA1 = 6 54 #: RSASHA1NSEC3SHA1 55 RSASHA1NSEC3SHA1 = 7 56 #: RSASHA256 57 RSASHA256 = 8 58 #: RSASHA512 59 RSASHA512 = 10 60 #: ECDSAP256SHA256 61 ECDSAP256SHA256 = 13 62 #: ECDSAP384SHA384 63 ECDSAP384SHA384 = 14 64 #: INDIRECT 65 INDIRECT = 252 66 #: PRIVATEDNS 67 PRIVATEDNS = 253 68 #: PRIVATEOID 69 PRIVATEOID = 254 70 71 _algorithm_by_text = { 72 'RSAMD5': RSAMD5, 73 'DH': DH, 74 'DSA': DSA, 75 'ECC': ECC, 76 'RSASHA1': RSASHA1, 77 'DSANSEC3SHA1': DSANSEC3SHA1, 78 'RSASHA1NSEC3SHA1': RSASHA1NSEC3SHA1, 79 'RSASHA256': RSASHA256, 80 'RSASHA512': RSASHA512, 81 'INDIRECT': INDIRECT, 82 'ECDSAP256SHA256': ECDSAP256SHA256, 83 'ECDSAP384SHA384': ECDSAP384SHA384, 84 'PRIVATEDNS': PRIVATEDNS, 85 'PRIVATEOID': PRIVATEOID, 86 } 87 88 # We construct the inverse mapping programmatically to ensure that we 89 # cannot make any mistakes (e.g. omissions, cut-and-paste errors) that 90 # would cause the mapping not to be true inverse. 91 92 _algorithm_by_value = {y: x for x, y in _algorithm_by_text.items()} 93 94
95 -def algorithm_from_text(text):
96 """Convert text into a DNSSEC algorithm value. 97 98 Returns an ``int``. 99 """ 100 101 value = _algorithm_by_text.get(text.upper()) 102 if value is None: 103 value = int(text) 104 return value
105 106
107 -def algorithm_to_text(value):
108 """Convert a DNSSEC algorithm value to text 109 110 Returns a ``str``. 111 """ 112 113 text = _algorithm_by_value.get(value) 114 if text is None: 115 text = str(value) 116 return text
117 118
119 -def _to_rdata(record, origin):
120 s = BytesIO() 121 record.to_wire(s, origin=origin) 122 return s.getvalue()
123 124
125 -def key_id(key, origin=None):
126 """Return the key id (a 16-bit number) for the specified key. 127 128 Note the *origin* parameter of this function is historical and 129 is not needed. 130 131 Returns an ``int`` between 0 and 65535. 132 """ 133 134 rdata = _to_rdata(key, origin) 135 rdata = bytearray(rdata) 136 if key.algorithm == RSAMD5: 137 return (rdata[-3] << 8) + rdata[-2] 138 else: 139 total = 0 140 for i in range(len(rdata) // 2): 141 total += (rdata[2 * i] << 8) + \ 142 rdata[2 * i + 1] 143 if len(rdata) % 2 != 0: 144 total += rdata[len(rdata) - 1] << 8 145 total += ((total >> 16) & 0xffff) 146 return total & 0xffff
147 148
149 -def make_ds(name, key, algorithm, origin=None):
150 """Create a DS record for a DNSSEC key. 151 152 *name* is the owner name of the DS record. 153 154 *key* is a ``dns.rdtypes.ANY.DNSKEY``. 155 156 *algorithm* is a string describing which hash algorithm to use. The 157 currently supported hashes are "SHA1" and "SHA256". Case does not 158 matter for these strings. 159 160 *origin* is a ``dns.name.Name`` and will be used as the origin 161 if *key* is a relative name. 162 163 Returns a ``dns.rdtypes.ANY.DS``. 164 """ 165 166 if algorithm.upper() == 'SHA1': 167 dsalg = 1 168 hash = SHA1.new() 169 elif algorithm.upper() == 'SHA256': 170 dsalg = 2 171 hash = SHA256.new() 172 else: 173 raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) 174 175 if isinstance(name, string_types): 176 name = dns.name.from_text(name, origin) 177 hash.update(name.canonicalize().to_wire()) 178 hash.update(_to_rdata(key, origin)) 179 digest = hash.digest() 180 181 dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest 182 return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0, 183 len(dsrdata))
184 185
186 -def _find_candidate_keys(keys, rrsig):
187 candidate_keys = [] 188 value = keys.get(rrsig.signer) 189 if value is None: 190 return None 191 if isinstance(value, dns.node.Node): 192 try: 193 rdataset = value.find_rdataset(dns.rdataclass.IN, 194 dns.rdatatype.DNSKEY) 195 except KeyError: 196 return None 197 else: 198 rdataset = value 199 for rdata in rdataset: 200 if rdata.algorithm == rrsig.algorithm and \ 201 key_id(rdata) == rrsig.key_tag: 202 candidate_keys.append(rdata) 203 return candidate_keys
204 205
206 -def _is_rsa(algorithm):
207 return algorithm in (RSAMD5, RSASHA1, 208 RSASHA1NSEC3SHA1, RSASHA256, 209 RSASHA512)
210 211
212 -def _is_dsa(algorithm):
213 return algorithm in (DSA, DSANSEC3SHA1)
214 215
216 -def _is_ecdsa(algorithm):
217 return _have_ecdsa and (algorithm in (ECDSAP256SHA256, ECDSAP384SHA384))
218 219
220 -def _is_md5(algorithm):
221 return algorithm == RSAMD5
222 223
224 -def _is_sha1(algorithm):
225 return algorithm in (DSA, RSASHA1, 226 DSANSEC3SHA1, RSASHA1NSEC3SHA1)
227 228
229 -def _is_sha256(algorithm):
230 return algorithm in (RSASHA256, ECDSAP256SHA256)
231 232
233 -def _is_sha384(algorithm):
234 return algorithm == ECDSAP384SHA384
235 236
237 -def _is_sha512(algorithm):
238 return algorithm == RSASHA512
239 240
241 -def _make_hash(algorithm):
242 if _is_md5(algorithm): 243 return MD5.new() 244 if _is_sha1(algorithm): 245 return SHA1.new() 246 if _is_sha256(algorithm): 247 return SHA256.new() 248 if _is_sha384(algorithm): 249 return SHA384.new() 250 if _is_sha512(algorithm): 251 return SHA512.new() 252 raise ValidationFailure('unknown hash for algorithm %u' % algorithm)
253 254
255 -def _make_algorithm_id(algorithm):
256 if _is_md5(algorithm): 257 oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05] 258 elif _is_sha1(algorithm): 259 oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a] 260 elif _is_sha256(algorithm): 261 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01] 262 elif _is_sha512(algorithm): 263 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03] 264 else: 265 raise ValidationFailure('unknown algorithm %u' % algorithm) 266 olen = len(oid) 267 dlen = _make_hash(algorithm).digest_size 268 idbytes = [0x30] + [8 + olen + dlen] + \ 269 [0x30, olen + 4] + [0x06, olen] + oid + \ 270 [0x05, 0x00] + [0x04, dlen] 271 return struct.pack('!%dB' % len(idbytes), *idbytes)
272 273
274 -def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
275 """Validate an RRset against a single signature rdata 276 277 The owner name of *rrsig* is assumed to be the same as the owner name 278 of *rrset*. 279 280 *rrset* is the RRset to validate. It can be a ``dns.rrset.RRset`` or 281 a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple. 282 283 *rrsig* is a ``dns.rdata.Rdata``, the signature to validate. 284 285 *keys* is the key dictionary, used to find the DNSKEY associated with 286 a given name. The dictionary is keyed by a ``dns.name.Name``, and has 287 ``dns.node.Node`` or ``dns.rdataset.Rdataset`` values. 288 289 *origin* is a ``dns.name.Name``, the origin to use for relative names. 290 291 *now* is an ``int``, the time to use when validating the signatures, 292 in seconds since the UNIX epoch. The default is the current time. 293 """ 294 295 if isinstance(origin, string_types): 296 origin = dns.name.from_text(origin, dns.name.root) 297 298 candidate_keys = _find_candidate_keys(keys, rrsig) 299 if candidate_keys is None: 300 raise ValidationFailure('unknown key') 301 302 for candidate_key in candidate_keys: 303 # For convenience, allow the rrset to be specified as a (name, 304 # rdataset) tuple as well as a proper rrset 305 if isinstance(rrset, tuple): 306 rrname = rrset[0] 307 rdataset = rrset[1] 308 else: 309 rrname = rrset.name 310 rdataset = rrset 311 312 if now is None: 313 now = time.time() 314 if rrsig.expiration < now: 315 raise ValidationFailure('expired') 316 if rrsig.inception > now: 317 raise ValidationFailure('not yet valid') 318 319 hash = _make_hash(rrsig.algorithm) 320 321 if _is_rsa(rrsig.algorithm): 322 keyptr = candidate_key.key 323 (bytes_,) = struct.unpack('!B', keyptr[0:1]) 324 keyptr = keyptr[1:] 325 if bytes_ == 0: 326 (bytes_,) = struct.unpack('!H', keyptr[0:2]) 327 keyptr = keyptr[2:] 328 rsa_e = keyptr[0:bytes_] 329 rsa_n = keyptr[bytes_:] 330 try: 331 pubkey = CryptoRSA.construct( 332 (number.bytes_to_long(rsa_n), 333 number.bytes_to_long(rsa_e))) 334 except ValueError: 335 raise ValidationFailure('invalid public key') 336 sig = rrsig.signature 337 elif _is_dsa(rrsig.algorithm): 338 keyptr = candidate_key.key 339 (t,) = struct.unpack('!B', keyptr[0:1]) 340 keyptr = keyptr[1:] 341 octets = 64 + t * 8 342 dsa_q = keyptr[0:20] 343 keyptr = keyptr[20:] 344 dsa_p = keyptr[0:octets] 345 keyptr = keyptr[octets:] 346 dsa_g = keyptr[0:octets] 347 keyptr = keyptr[octets:] 348 dsa_y = keyptr[0:octets] 349 pubkey = CryptoDSA.construct( 350 (number.bytes_to_long(dsa_y), 351 number.bytes_to_long(dsa_g), 352 number.bytes_to_long(dsa_p), 353 number.bytes_to_long(dsa_q))) 354 sig = rrsig.signature[1:] 355 elif _is_ecdsa(rrsig.algorithm): 356 # use ecdsa for NIST-384p -- not currently supported by pycryptodome 357 358 keyptr = candidate_key.key 359 360 if rrsig.algorithm == ECDSAP256SHA256: 361 curve = ecdsa.curves.NIST256p 362 key_len = 32 363 elif rrsig.algorithm == ECDSAP384SHA384: 364 curve = ecdsa.curves.NIST384p 365 key_len = 48 366 367 x = number.bytes_to_long(keyptr[0:key_len]) 368 y = number.bytes_to_long(keyptr[key_len:key_len * 2]) 369 if not ecdsa.ecdsa.point_is_valid(curve.generator, x, y): 370 raise ValidationFailure('invalid ECDSA key') 371 point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order) 372 verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point, 373 curve) 374 pubkey = ECKeyWrapper(verifying_key, key_len) 375 r = rrsig.signature[:key_len] 376 s = rrsig.signature[key_len:] 377 sig = ecdsa.ecdsa.Signature(number.bytes_to_long(r), 378 number.bytes_to_long(s)) 379 380 else: 381 raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) 382 383 hash.update(_to_rdata(rrsig, origin)[:18]) 384 hash.update(rrsig.signer.to_digestable(origin)) 385 386 if rrsig.labels < len(rrname) - 1: 387 suffix = rrname.split(rrsig.labels + 1)[1] 388 rrname = dns.name.from_text('*', suffix) 389 rrnamebuf = rrname.to_digestable(origin) 390 rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, 391 rrsig.original_ttl) 392 rrlist = sorted(rdataset) 393 for rr in rrlist: 394 hash.update(rrnamebuf) 395 hash.update(rrfixed) 396 rrdata = rr.to_digestable(origin) 397 rrlen = struct.pack('!H', len(rrdata)) 398 hash.update(rrlen) 399 hash.update(rrdata) 400 401 try: 402 if _is_rsa(rrsig.algorithm): 403 verifier = pkcs1_15.new(pubkey) 404 # will raise ValueError if verify fails: 405 verifier.verify(hash, sig) 406 elif _is_dsa(rrsig.algorithm): 407 verifier = DSS.new(pubkey, 'fips-186-3') 408 verifier.verify(hash, sig) 409 elif _is_ecdsa(rrsig.algorithm): 410 digest = hash.digest() 411 if not pubkey.verify(digest, sig): 412 raise ValueError 413 else: 414 # Raise here for code clarity; this won't actually ever happen 415 # since if the algorithm is really unknown we'd already have 416 # raised an exception above 417 raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) 418 # If we got here, we successfully verified so we can return without error 419 return 420 except ValueError: 421 # this happens on an individual validation failure 422 continue 423 # nothing verified -- raise failure: 424 raise ValidationFailure('verify failure')
425 426
427 -def _validate(rrset, rrsigset, keys, origin=None, now=None):
428 """Validate an RRset. 429 430 *rrset* is the RRset to validate. It can be a ``dns.rrset.RRset`` or 431 a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple. 432 433 *rrsigset* is the signature RRset to be validated. It can be a 434 ``dns.rrset.RRset`` or a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple. 435 436 *keys* is the key dictionary, used to find the DNSKEY associated with 437 a given name. The dictionary is keyed by a ``dns.name.Name``, and has 438 ``dns.node.Node`` or ``dns.rdataset.Rdataset`` values. 439 440 *origin* is a ``dns.name.Name``, the origin to use for relative names. 441 442 *now* is an ``int``, the time to use when validating the signatures, 443 in seconds since the UNIX epoch. The default is the current time. 444 """ 445 446 if isinstance(origin, string_types): 447 origin = dns.name.from_text(origin, dns.name.root) 448 449 if isinstance(rrset, tuple): 450 rrname = rrset[0] 451 else: 452 rrname = rrset.name 453 454 if isinstance(rrsigset, tuple): 455 rrsigname = rrsigset[0] 456 rrsigrdataset = rrsigset[1] 457 else: 458 rrsigname = rrsigset.name 459 rrsigrdataset = rrsigset 460 461 rrname = rrname.choose_relativity(origin) 462 rrsigname = rrsigname.choose_relativity(origin) 463 if rrname != rrsigname: 464 raise ValidationFailure("owner names do not match") 465 466 for rrsig in rrsigrdataset: 467 try: 468 _validate_rrsig(rrset, rrsig, keys, origin, now) 469 return 470 except ValidationFailure: 471 pass 472 raise ValidationFailure("no RRSIGs validated")
473 474
475 -def _need_pycrypto(*args, **kwargs):
476 raise NotImplementedError("DNSSEC validation requires pycryptodome/pycryptodomex")
477 478 479 try: 480 try: 481 # test we're using pycryptodome, not pycrypto (which misses SHA1 for example) 482 from Crypto.Hash import MD5, SHA1, SHA256, SHA384, SHA512 483 from Crypto.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA 484 from Crypto.Signature import pkcs1_15, DSS 485 from Crypto.Util import number 486 except ImportError: 487 from Cryptodome.Hash import MD5, SHA1, SHA256, SHA384, SHA512 488 from Cryptodome.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA 489 from Cryptodome.Signature import pkcs1_15, DSS 490 from Cryptodome.Util import number 491 except ImportError: 492 validate = _need_pycrypto 493 validate_rrsig = _need_pycrypto 494 _have_pycrypto = False 495 _have_ecdsa = False 496 else: 497 validate = _validate 498 validate_rrsig = _validate_rrsig 499 _have_pycrypto = True 500 501 try: 502 import ecdsa 503 import ecdsa.ecdsa 504 import ecdsa.ellipticcurve 505 import ecdsa.keys 506 except ImportError: 507 _have_ecdsa = False 508 else: 509 _have_ecdsa = True 510
511 - class ECKeyWrapper(object):
512
513 - def __init__(self, key, key_len):
514 self.key = key 515 self.key_len = key_len
516
517 - def verify(self, digest, sig):
518 diglong = number.bytes_to_long(digest) 519 return self.key.pubkey.verifies(diglong, sig)
520