1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """Common DNSSEC-related functions and constants."""
17
18 from io import BytesIO
19 import struct
20 import time
21
22 import dns.exception
23 import dns.hash
24 import dns.name
25 import dns.node
26 import dns.rdataset
27 import dns.rdata
28 import dns.rdatatype
29 import dns.rdataclass
30 from ._compat import string_types
31
32
34
35 """The DNSSEC algorithm is not supported."""
36
37
39
40 """The DNSSEC signature is invalid."""
41
# DNSSEC algorithm numbers, exposed as module-level constants.
RSAMD5 = 1
DH = 2
DSA = 3
ECC = 4
RSASHA1 = 5
DSANSEC3SHA1 = 6
RSASHA1NSEC3SHA1 = 7
RSASHA256 = 8
RSASHA512 = 10
ECDSAP256SHA256 = 13
ECDSAP384SHA384 = 14
INDIRECT = 252
PRIVATEDNS = 253
PRIVATEOID = 254

# Mnemonic -> algorithm number.
_algorithm_by_text = {
    'RSAMD5': RSAMD5,
    'DH': DH,
    'DSA': DSA,
    'ECC': ECC,
    'RSASHA1': RSASHA1,
    'DSANSEC3SHA1': DSANSEC3SHA1,
    'RSASHA1NSEC3SHA1': RSASHA1NSEC3SHA1,
    'RSASHA256': RSASHA256,
    'RSASHA512': RSASHA512,
    'INDIRECT': INDIRECT,
    'ECDSAP256SHA256': ECDSAP256SHA256,
    'ECDSAP384SHA384': ECDSAP384SHA384,
    'PRIVATEDNS': PRIVATEDNS,
    'PRIVATEOID': PRIVATEOID,
}

# Algorithm number -> mnemonic.  The reverse table is derived from the
# forward table rather than typed out by hand, so the two cannot drift
# apart.
_algorithm_by_value = dict(
    (number, mnemonic) for mnemonic, number in _algorithm_by_text.items()
)
79
80
82 """Convert text into a DNSSEC algorithm value
83 @rtype: int"""
84
85 value = _algorithm_by_text.get(text.upper())
86 if value is None:
87 value = int(text)
88 return value
89
90
92 """Convert a DNSSEC algorithm value to text
93 @rtype: string"""
94
95 text = _algorithm_by_value.get(value)
96 if text is None:
97 text = str(value)
98 return text
99
100
105
106
121
122
def make_ds(name, key, algorithm, origin=None):
    """Build a DS rdata from an owner name and a key.

    The digest is computed over the canonicalized wire form of the owner
    name followed by the key's rdata as produced by _to_rdata().

    @param name: The owner name of the key
    @type name: dns.name.Name or text (text is parsed relative to origin)
    @param key: The key; its C{algorithm} attribute is read and it is passed
    to key_id() and _to_rdata() (presumably a DNSKEY rdata -- confirm
    against callers)
    @param algorithm: The digest algorithm name, 'SHA1' or 'SHA256'
    (case-insensitive)
    @type algorithm: string
    @param origin: The origin used when parsing a textual name and when
    rendering the key
    @raises UnsupportedAlgorithm: if algorithm is neither 'SHA1' nor 'SHA256'
    @return: the rdata built by dns.rdata.from_wire() for class IN, type DS
    """

    # DS digest-type numbers keyed by upper-cased digest name; the same key
    # selects the hash constructor from dns.hash.hashes.
    digest_types = {'SHA1': 1, 'SHA256': 2}
    digest_name = algorithm.upper()
    if digest_name not in digest_types:
        raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
    dsalg = digest_types[digest_name]
    hasher = dns.hash.hashes[digest_name]()

    if isinstance(name, string_types):
        name = dns.name.from_text(name, origin)
    for chunk in (name.canonicalize().to_wire(), _to_rdata(key, origin)):
        hasher.update(chunk)
    digest = hasher.digest()

    # DS wire format: key tag (16 bits), key algorithm (8 bits),
    # digest type (8 bits), then the digest itself.
    header = struct.pack("!HBB", key_id(key), key.algorithm, dsalg)
    dsrdata = header + digest
    return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS,
                               dsrdata, 0, len(dsrdata))
142
143
162
163
168
169
172
173
176
177
180
181
185
186
189
190
193
194
197
198
211
212
214 if _is_md5(algorithm):
215 oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
216 elif _is_sha1(algorithm):
217 oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a]
218 elif _is_sha256(algorithm):
219 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
220 elif _is_sha512(algorithm):
221 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
222 else:
223 raise ValidationFailure('unknown algorithm %u' % algorithm)
224 olen = len(oid)
225 dlen = _make_hash(algorithm).digest_size
226 idbytes = [0x30] + [8 + olen + dlen] + \
227 [0x30, olen + 4] + [0x06, olen] + oid + \
228 [0x05, 0x00] + [0x04, dlen]
229 return struct.pack('!%dB' % len(idbytes), *idbytes)
230
231
233 """Validate an RRset against a single signature rdata
234
235 The owner name of the rrsig is assumed to be the same as the owner name
236 of the rrset.
237
238 @param rrset: The RRset to validate
239 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
240 tuple
241 @param rrsig: The signature rdata
242 @type rrsig: dns.rrset.Rdata
243 @param keys: The key dictionary.
244 @type keys: a dictionary keyed by dns.name.Name with node or rdataset
245 values
246 @param origin: The origin to use for relative names
247 @type origin: dns.name.Name or None
248 @param now: The time to use when validating the signatures. The default
249 is the current time.
250 @type now: int
251 """
252
253 if isinstance(origin, string_types):
254 origin = dns.name.from_text(origin, dns.name.root)
255
256 for candidate_key in _find_candidate_keys(keys, rrsig):
257 if not candidate_key:
258 raise ValidationFailure('unknown key')
259
260
261
262 if isinstance(rrset, tuple):
263 rrname = rrset[0]
264 rdataset = rrset[1]
265 else:
266 rrname = rrset.name
267 rdataset = rrset
268
269 if now is None:
270 now = time.time()
271 if rrsig.expiration < now:
272 raise ValidationFailure('expired')
273 if rrsig.inception > now:
274 raise ValidationFailure('not yet valid')
275
276 hash = _make_hash(rrsig.algorithm)
277
278 if _is_rsa(rrsig.algorithm):
279 keyptr = candidate_key.key
280 (bytes_,) = struct.unpack('!B', keyptr[0:1])
281 keyptr = keyptr[1:]
282 if bytes_ == 0:
283 (bytes_,) = struct.unpack('!H', keyptr[0:2])
284 keyptr = keyptr[2:]
285 rsa_e = keyptr[0:bytes_]
286 rsa_n = keyptr[bytes_:]
287 keylen = len(rsa_n) * 8
288 pubkey = Crypto.PublicKey.RSA.construct(
289 (Crypto.Util.number.bytes_to_long(rsa_n),
290 Crypto.Util.number.bytes_to_long(rsa_e)))
291 sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
292 elif _is_dsa(rrsig.algorithm):
293 keyptr = candidate_key.key
294 (t,) = struct.unpack('!B', keyptr[0:1])
295 keyptr = keyptr[1:]
296 octets = 64 + t * 8
297 dsa_q = keyptr[0:20]
298 keyptr = keyptr[20:]
299 dsa_p = keyptr[0:octets]
300 keyptr = keyptr[octets:]
301 dsa_g = keyptr[0:octets]
302 keyptr = keyptr[octets:]
303 dsa_y = keyptr[0:octets]
304 pubkey = Crypto.PublicKey.DSA.construct(
305 (Crypto.Util.number.bytes_to_long(dsa_y),
306 Crypto.Util.number.bytes_to_long(dsa_g),
307 Crypto.Util.number.bytes_to_long(dsa_p),
308 Crypto.Util.number.bytes_to_long(dsa_q)))
309 (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
310 sig = (Crypto.Util.number.bytes_to_long(dsa_r),
311 Crypto.Util.number.bytes_to_long(dsa_s))
312 elif _is_ecdsa(rrsig.algorithm):
313 if rrsig.algorithm == ECDSAP256SHA256:
314 curve = ecdsa.curves.NIST256p
315 key_len = 32
316 elif rrsig.algorithm == ECDSAP384SHA384:
317 curve = ecdsa.curves.NIST384p
318 key_len = 48
319 else:
320
321 raise ValidationFailure('unknown ECDSA curve')
322 keyptr = candidate_key.key
323 x = Crypto.Util.number.bytes_to_long(keyptr[0:key_len])
324 y = Crypto.Util.number.bytes_to_long(keyptr[key_len:key_len * 2])
325 assert ecdsa.ecdsa.point_is_valid(curve.generator, x, y)
326 point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
327 verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point,
328 curve)
329 pubkey = ECKeyWrapper(verifying_key, key_len)
330 r = rrsig.signature[:key_len]
331 s = rrsig.signature[key_len:]
332 sig = ecdsa.ecdsa.Signature(Crypto.Util.number.bytes_to_long(r),
333 Crypto.Util.number.bytes_to_long(s))
334 else:
335 raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
336
337 hash.update(_to_rdata(rrsig, origin)[:18])
338 hash.update(rrsig.signer.to_digestable(origin))
339
340 if rrsig.labels < len(rrname) - 1:
341 suffix = rrname.split(rrsig.labels + 1)[1]
342 rrname = dns.name.from_text('*', suffix)
343 rrnamebuf = rrname.to_digestable(origin)
344 rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
345 rrsig.original_ttl)
346 rrlist = sorted(rdataset)
347 for rr in rrlist:
348 hash.update(rrnamebuf)
349 hash.update(rrfixed)
350 rrdata = rr.to_digestable(origin)
351 rrlen = struct.pack('!H', len(rrdata))
352 hash.update(rrlen)
353 hash.update(rrdata)
354
355 digest = hash.digest()
356
357 if _is_rsa(rrsig.algorithm):
358
359 digest = _make_algorithm_id(rrsig.algorithm) + digest
360 padlen = keylen // 8 - len(digest) - 3
361 digest = struct.pack('!%dB' % (2 + padlen + 1),
362 *([0, 1] + [0xFF] * padlen + [0])) + digest
363 elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm):
364 pass
365 else:
366
367
368
369 raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
370
371 if pubkey.verify(digest, sig):
372 return
373 raise ValidationFailure('verify failure')
374
375
def _validate(rrset, rrsigset, keys, origin=None, now=None):
    """Validate an RRset

    The RRset is accepted as soon as any one signature in rrsigset
    validates it.

    @param rrset: The RRset to validate
    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param rrsigset: The signature RRset
    @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param keys: The key dictionary.
    @type keys: a dictionary keyed by dns.name.Name with node or rdataset
    values
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name or None
    @param now: The time to use when validating the signatures.  The default
    is the current time.
    @type now: int
    @raises ValidationFailure: if the owner names of rrset and rrsigset
    differ, or if no signature in rrsigset validates the rrset
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    if isinstance(rrset, tuple):
        rrname = rrset[0]
    else:
        rrname = rrset.name

    if isinstance(rrsigset, tuple):
        rrsigname = rrsigset[0]
        rrsigrdataset = rrsigset[1]
    else:
        rrsigname = rrsigset.name
        rrsigrdataset = rrsigset

    # Put both owner names in the same relativity before comparing them.
    # Each name must be normalized from itself: deriving rrsigname from
    # rrname would make the comparison below always succeed.
    rrname = rrname.choose_relativity(origin)
    rrsigname = rrsigname.choose_relativity(origin)
    if rrname != rrsigname:
        raise ValidationFailure("owner names do not match")

    for rrsig in rrsigrdataset:
        try:
            _validate_rrsig(rrset, rrsig, keys, origin, now)
            return
        except ValidationFailure:
            # Deliberate: one bad signature is not fatal, a later one in
            # the set may still validate.
            pass
    raise ValidationFailure("no RRSIGs validated")
422
423
425 raise NotImplementedError("DNSSEC validation requires pycrypto")
426
# pycrypto is an optional dependency.  When it imports cleanly the public
# names validate / validate_rrsig are bound to the real implementations;
# otherwise both are bound to _need_pycrypto.
try:
    import Crypto.PublicKey.RSA
    import Crypto.PublicKey.DSA
    import Crypto.Util.number
except ImportError:
    validate = _need_pycrypto
    validate_rrsig = _need_pycrypto
    _have_pycrypto = False
else:
    validate = _validate
    validate_rrsig = _validate_rrsig
    _have_pycrypto = True
438
439 try:
440 import ecdsa
441 import ecdsa.ecdsa
442 import ecdsa.ellipticcurve
443 import ecdsa.keys
444 _have_ecdsa = True
445
447
449 self.key = key
450 self.key_len = key_len
451
452 - def verify(self, digest, sig):
453 diglong = Crypto.Util.number.bytes_to_long(digest)
454 return self.key.pubkey.verifies(diglong, sig)
455
456 except ImportError:
457 _have_ecdsa = False
458