Package dns :: Module dnssec
[hide private]
[frames] | no frames]

Source Code for Module dns.dnssec

  1  # Copyright (C) 2003-2007, 2009, 2011 Nominum, Inc. 
  2  # 
  3  # Permission to use, copy, modify, and distribute this software and its 
  4  # documentation for any purpose with or without fee is hereby granted, 
  5  # provided that the above copyright notice and this permission notice 
  6  # appear in all copies. 
  7  # 
  8  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
  9  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 10  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 11  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 12  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 13  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 14  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 15   
 16  """Common DNSSEC-related functions and constants.""" 
 17   
 18  from io import BytesIO 
 19  import struct 
 20  import time 
 21   
 22  import dns.exception 
 23  import dns.hash 
 24  import dns.name 
 25  import dns.node 
 26  import dns.rdataset 
 27  import dns.rdata 
 28  import dns.rdatatype 
 29  import dns.rdataclass 
 30  from ._compat import string_types 
 31   
 32   
33 -class UnsupportedAlgorithm(dns.exception.DNSException):
34 35 """The DNSSEC algorithm is not supported."""
36 37
38 -class ValidationFailure(dns.exception.DNSException):
39 40 """The DNSSEC signature is invalid."""
41 42 RSAMD5 = 1 43 DH = 2 44 DSA = 3 45 ECC = 4 46 RSASHA1 = 5 47 DSANSEC3SHA1 = 6 48 RSASHA1NSEC3SHA1 = 7 49 RSASHA256 = 8 50 RSASHA512 = 10 51 ECDSAP256SHA256 = 13 52 ECDSAP384SHA384 = 14 53 INDIRECT = 252 54 PRIVATEDNS = 253 55 PRIVATEOID = 254 56 57 _algorithm_by_text = { 58 'RSAMD5': RSAMD5, 59 'DH': DH, 60 'DSA': DSA, 61 'ECC': ECC, 62 'RSASHA1': RSASHA1, 63 'DSANSEC3SHA1': DSANSEC3SHA1, 64 'RSASHA1NSEC3SHA1': RSASHA1NSEC3SHA1, 65 'RSASHA256': RSASHA256, 66 'RSASHA512': RSASHA512, 67 'INDIRECT': INDIRECT, 68 'ECDSAP256SHA256': ECDSAP256SHA256, 69 'ECDSAP384SHA384': ECDSAP384SHA384, 70 'PRIVATEDNS': PRIVATEDNS, 71 'PRIVATEOID': PRIVATEOID, 72 } 73 74 # We construct the inverse mapping programmatically to ensure that we 75 # cannot make any mistakes (e.g. omissions, cut-and-paste errors) that 76 # would cause the mapping not to be true inverse. 77 78 _algorithm_by_value = dict((y, x) for x, y in _algorithm_by_text.items()) 79 80
81 -def algorithm_from_text(text):
82 """Convert text into a DNSSEC algorithm value 83 @rtype: int""" 84 85 value = _algorithm_by_text.get(text.upper()) 86 if value is None: 87 value = int(text) 88 return value
89 90
91 -def algorithm_to_text(value):
92 """Convert a DNSSEC algorithm value to text 93 @rtype: string""" 94 95 text = _algorithm_by_value.get(value) 96 if text is None: 97 text = str(value) 98 return text
99 100
101 -def _to_rdata(record, origin):
102 s = BytesIO() 103 record.to_wire(s, origin=origin) 104 return s.getvalue()
105 106
107 -def key_id(key, origin=None):
108 rdata = _to_rdata(key, origin) 109 rdata = bytearray(rdata) 110 if key.algorithm == RSAMD5: 111 return (rdata[-3] << 8) + rdata[-2] 112 else: 113 total = 0 114 for i in range(len(rdata) // 2): 115 total += (rdata[2 * i] << 8) + \ 116 rdata[2 * i + 1] 117 if len(rdata) % 2 != 0: 118 total += rdata[len(rdata) - 1] << 8 119 total += ((total >> 16) & 0xffff) 120 return total & 0xffff
121 122
123 -def make_ds(name, key, algorithm, origin=None):
124 if algorithm.upper() == 'SHA1': 125 dsalg = 1 126 hash = dns.hash.hashes['SHA1']() 127 elif algorithm.upper() == 'SHA256': 128 dsalg = 2 129 hash = dns.hash.hashes['SHA256']() 130 else: 131 raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm) 132 133 if isinstance(name, string_types): 134 name = dns.name.from_text(name, origin) 135 hash.update(name.canonicalize().to_wire()) 136 hash.update(_to_rdata(key, origin)) 137 digest = hash.digest() 138 139 dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest 140 return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0, 141 len(dsrdata))
142 143
144 -def _find_candidate_keys(keys, rrsig):
145 candidate_keys = [] 146 value = keys.get(rrsig.signer) 147 if value is None: 148 return None 149 if isinstance(value, dns.node.Node): 150 try: 151 rdataset = value.find_rdataset(dns.rdataclass.IN, 152 dns.rdatatype.DNSKEY) 153 except KeyError: 154 return None 155 else: 156 rdataset = value 157 for rdata in rdataset: 158 if rdata.algorithm == rrsig.algorithm and \ 159 key_id(rdata) == rrsig.key_tag: 160 candidate_keys.append(rdata) 161 return candidate_keys
162 163
164 -def _is_rsa(algorithm):
165 return algorithm in (RSAMD5, RSASHA1, 166 RSASHA1NSEC3SHA1, RSASHA256, 167 RSASHA512)
168 169
170 -def _is_dsa(algorithm):
171 return algorithm in (DSA, DSANSEC3SHA1)
172 173
174 -def _is_ecdsa(algorithm):
175 return _have_ecdsa and (algorithm in (ECDSAP256SHA256, ECDSAP384SHA384))
176 177
178 -def _is_md5(algorithm):
179 return algorithm == RSAMD5
180 181
182 -def _is_sha1(algorithm):
183 return algorithm in (DSA, RSASHA1, 184 DSANSEC3SHA1, RSASHA1NSEC3SHA1)
185 186
187 -def _is_sha256(algorithm):
188 return algorithm in (RSASHA256, ECDSAP256SHA256)
189 190
191 -def _is_sha384(algorithm):
192 return algorithm == ECDSAP384SHA384
193 194
195 -def _is_sha512(algorithm):
196 return algorithm == RSASHA512
197 198
199 -def _make_hash(algorithm):
200 if _is_md5(algorithm): 201 return dns.hash.hashes['MD5']() 202 if _is_sha1(algorithm): 203 return dns.hash.hashes['SHA1']() 204 if _is_sha256(algorithm): 205 return dns.hash.hashes['SHA256']() 206 if _is_sha384(algorithm): 207 return dns.hash.hashes['SHA384']() 208 if _is_sha512(algorithm): 209 return dns.hash.hashes['SHA512']() 210 raise ValidationFailure('unknown hash for algorithm %u' % algorithm)
211 212
213 -def _make_algorithm_id(algorithm):
214 if _is_md5(algorithm): 215 oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05] 216 elif _is_sha1(algorithm): 217 oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a] 218 elif _is_sha256(algorithm): 219 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01] 220 elif _is_sha512(algorithm): 221 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03] 222 else: 223 raise ValidationFailure('unknown algorithm %u' % algorithm) 224 olen = len(oid) 225 dlen = _make_hash(algorithm).digest_size 226 idbytes = [0x30] + [8 + olen + dlen] + \ 227 [0x30, olen + 4] + [0x06, olen] + oid + \ 228 [0x05, 0x00] + [0x04, dlen] 229 return struct.pack('!%dB' % len(idbytes), *idbytes)
230 231
232 -def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
233 """Validate an RRset against a single signature rdata 234 235 The owner name of the rrsig is assumed to be the same as the owner name 236 of the rrset. 237 238 @param rrset: The RRset to validate 239 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) 240 tuple 241 @param rrsig: The signature rdata 242 @type rrsig: dns.rrset.Rdata 243 @param keys: The key dictionary. 244 @type keys: a dictionary keyed by dns.name.Name with node or rdataset 245 values 246 @param origin: The origin to use for relative names 247 @type origin: dns.name.Name or None 248 @param now: The time to use when validating the signatures. The default 249 is the current time. 250 @type now: int 251 """ 252 253 if isinstance(origin, string_types): 254 origin = dns.name.from_text(origin, dns.name.root) 255 256 for candidate_key in _find_candidate_keys(keys, rrsig): 257 if not candidate_key: 258 raise ValidationFailure('unknown key') 259 260 # For convenience, allow the rrset to be specified as a (name, 261 # rdataset) tuple as well as a proper rrset 262 if isinstance(rrset, tuple): 263 rrname = rrset[0] 264 rdataset = rrset[1] 265 else: 266 rrname = rrset.name 267 rdataset = rrset 268 269 if now is None: 270 now = time.time() 271 if rrsig.expiration < now: 272 raise ValidationFailure('expired') 273 if rrsig.inception > now: 274 raise ValidationFailure('not yet valid') 275 276 hash = _make_hash(rrsig.algorithm) 277 278 if _is_rsa(rrsig.algorithm): 279 keyptr = candidate_key.key 280 (bytes_,) = struct.unpack('!B', keyptr[0:1]) 281 keyptr = keyptr[1:] 282 if bytes_ == 0: 283 (bytes_,) = struct.unpack('!H', keyptr[0:2]) 284 keyptr = keyptr[2:] 285 rsa_e = keyptr[0:bytes_] 286 rsa_n = keyptr[bytes_:] 287 keylen = len(rsa_n) * 8 288 pubkey = Crypto.PublicKey.RSA.construct( 289 (Crypto.Util.number.bytes_to_long(rsa_n), 290 Crypto.Util.number.bytes_to_long(rsa_e))) 291 sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),) 292 elif _is_dsa(rrsig.algorithm): 293 keyptr = candidate_key.key 294 (t,) = struct.unpack('!B', keyptr[0:1]) 295 keyptr = keyptr[1:] 296 octets = 64 + t * 8 297 dsa_q = keyptr[0:20] 298 keyptr = keyptr[20:] 299 dsa_p = keyptr[0:octets] 300 keyptr = keyptr[octets:] 301 dsa_g = keyptr[0:octets] 302 keyptr = keyptr[octets:] 303 dsa_y = keyptr[0:octets] 304 pubkey = Crypto.PublicKey.DSA.construct( 305 (Crypto.Util.number.bytes_to_long(dsa_y), 306 Crypto.Util.number.bytes_to_long(dsa_g), 307 Crypto.Util.number.bytes_to_long(dsa_p), 308 Crypto.Util.number.bytes_to_long(dsa_q))) 309 (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:]) 310 sig = (Crypto.Util.number.bytes_to_long(dsa_r), 311 Crypto.Util.number.bytes_to_long(dsa_s)) 312 elif _is_ecdsa(rrsig.algorithm): 313 if rrsig.algorithm == ECDSAP256SHA256: 314 curve = ecdsa.curves.NIST256p 315 key_len = 32 316 elif rrsig.algorithm == ECDSAP384SHA384: 317 curve = ecdsa.curves.NIST384p 318 key_len = 48 319 else: 320 # shouldn't happen 321 raise ValidationFailure('unknown ECDSA curve') 322 keyptr = candidate_key.key 323 x = Crypto.Util.number.bytes_to_long(keyptr[0:key_len]) 324 y = Crypto.Util.number.bytes_to_long(keyptr[key_len:key_len * 2]) 325 assert ecdsa.ecdsa.point_is_valid(curve.generator, x, y) 326 point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order) 327 verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point, 328 curve) 329 pubkey = ECKeyWrapper(verifying_key, key_len) 330 r = rrsig.signature[:key_len] 331 s = rrsig.signature[key_len:] 332 sig = ecdsa.ecdsa.Signature(Crypto.Util.number.bytes_to_long(r), 333 Crypto.Util.number.bytes_to_long(s)) 334 else: 335 raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) 336 337 hash.update(_to_rdata(rrsig, origin)[:18]) 338 hash.update(rrsig.signer.to_digestable(origin)) 339 340 if rrsig.labels < len(rrname) - 1: 341 suffix = rrname.split(rrsig.labels + 1)[1] 342 rrname = dns.name.from_text('*', suffix) 343 rrnamebuf = rrname.to_digestable(origin) 344 rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass, 345 rrsig.original_ttl) 346 rrlist = sorted(rdataset) 347 for rr in rrlist: 348 hash.update(rrnamebuf) 349 hash.update(rrfixed) 350 rrdata = rr.to_digestable(origin) 351 rrlen = struct.pack('!H', len(rrdata)) 352 hash.update(rrlen) 353 hash.update(rrdata) 354 355 digest = hash.digest() 356 357 if _is_rsa(rrsig.algorithm): 358 # PKCS1 algorithm identifier goop 359 digest = _make_algorithm_id(rrsig.algorithm) + digest 360 padlen = keylen // 8 - len(digest) - 3 361 digest = struct.pack('!%dB' % (2 + padlen + 1), 362 *([0, 1] + [0xFF] * padlen + [0])) + digest 363 elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm): 364 pass 365 else: 366 # Raise here for code clarity; this won't actually ever happen 367 # since if the algorithm is really unknown we'd already have 368 # raised an exception above 369 raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm) 370 371 if pubkey.verify(digest, sig): 372 return 373 raise ValidationFailure('verify failure')
374 375
376 -def _validate(rrset, rrsigset, keys, origin=None, now=None):
377 """Validate an RRset 378 379 @param rrset: The RRset to validate 380 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) 381 tuple 382 @param rrsigset: The signature RRset 383 @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset) 384 tuple 385 @param keys: The key dictionary. 386 @type keys: a dictionary keyed by dns.name.Name with node or rdataset 387 values 388 @param origin: The origin to use for relative names 389 @type origin: dns.name.Name or None 390 @param now: The time to use when validating the signatures. The default 391 is the current time. 392 @type now: int 393 """ 394 395 if isinstance(origin, string_types): 396 origin = dns.name.from_text(origin, dns.name.root) 397 398 if isinstance(rrset, tuple): 399 rrname = rrset[0] 400 else: 401 rrname = rrset.name 402 403 if isinstance(rrsigset, tuple): 404 rrsigname = rrsigset[0] 405 rrsigrdataset = rrsigset[1] 406 else: 407 rrsigname = rrsigset.name 408 rrsigrdataset = rrsigset 409 410 rrname = rrname.choose_relativity(origin) 411 rrsigname = rrname.choose_relativity(origin) 412 if rrname != rrsigname: 413 raise ValidationFailure("owner names do not match") 414 415 for rrsig in rrsigrdataset: 416 try: 417 _validate_rrsig(rrset, rrsig, keys, origin, now) 418 return 419 except ValidationFailure: 420 pass 421 raise ValidationFailure("no RRSIGs validated")
422 423
424 -def _need_pycrypto(*args, **kwargs):
425 raise NotImplementedError("DNSSEC validation requires pycrypto")
426 427 try: 428 import Crypto.PublicKey.RSA 429 import Crypto.PublicKey.DSA 430 import Crypto.Util.number 431 validate = _validate 432 validate_rrsig = _validate_rrsig 433 _have_pycrypto = True 434 except ImportError: 435 validate = _need_pycrypto 436 validate_rrsig = _need_pycrypto 437 _have_pycrypto = False 438 439 try: 440 import ecdsa 441 import ecdsa.ecdsa 442 import ecdsa.ellipticcurve 443 import ecdsa.keys 444 _have_ecdsa = True 445
446 - class ECKeyWrapper(object):
447
448 - def __init__(self, key, key_len):
449 self.key = key 450 self.key_len = key_len
451
452 - def verify(self, digest, sig):
453 diglong = Crypto.Util.number.bytes_to_long(digest) 454 return self.key.pubkey.verifies(diglong, sig)
455 456 except ImportError: 457 _have_ecdsa = False 458