1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 """Common DNSSEC-related functions and constants."""
17
18 import cStringIO
19 import struct
20 import time
21
22 import dns.exception
23 import dns.hash
24 import dns.name
25 import dns.node
26 import dns.rdataset
27 import dns.rdata
28 import dns.rdatatype
29 import dns.rdataclass
30
32 """Raised if an algorithm is not supported."""
33 pass
34
36 """The DNSSEC signature is invalid."""
37 pass
38
39 RSAMD5 = 1
40 DH = 2
41 DSA = 3
42 ECC = 4
43 RSASHA1 = 5
44 DSANSEC3SHA1 = 6
45 RSASHA1NSEC3SHA1 = 7
46 RSASHA256 = 8
47 RSASHA512 = 10
48 ECDSAP256SHA256 = 13
49 ECDSAP384SHA384 = 14
50 INDIRECT = 252
51 PRIVATEDNS = 253
52 PRIVATEOID = 254
53
54 _algorithm_by_text = {
55 'RSAMD5' : RSAMD5,
56 'DH' : DH,
57 'DSA' : DSA,
58 'ECC' : ECC,
59 'RSASHA1' : RSASHA1,
60 'DSANSEC3SHA1' : DSANSEC3SHA1,
61 'RSASHA1NSEC3SHA1' : RSASHA1NSEC3SHA1,
62 'RSASHA256' : RSASHA256,
63 'RSASHA512' : RSASHA512,
64 'INDIRECT' : INDIRECT,
65 'ECDSAP256SHA256' : ECDSAP256SHA256,
66 'ECDSAP384SHA384' : ECDSAP384SHA384,
67 'PRIVATEDNS' : PRIVATEDNS,
68 'PRIVATEOID' : PRIVATEOID,
69 }
70
71
72
73
74
75 _algorithm_by_value = dict([(y, x) for x, y in _algorithm_by_text.iteritems()])
76
78 """Convert text into a DNSSEC algorithm value
79 @rtype: int"""
80
81 value = _algorithm_by_text.get(text.upper())
82 if value is None:
83 value = int(text)
84 return value
85
87 """Convert a DNSSEC algorithm value to text
88 @rtype: string"""
89
90 text = _algorithm_by_value.get(value)
91 if text is None:
92 text = str(value)
93 return text
94
99
112
113 -def make_ds(name, key, algorithm, origin=None):
114 if algorithm.upper() == 'SHA1':
115 dsalg = 1
116 hash = dns.hash.get('SHA1')()
117 elif algorithm.upper() == 'SHA256':
118 dsalg = 2
119 hash = dns.hash.get('SHA256')()
120 else:
121 raise UnsupportedAlgorithm, 'unsupported algorithm "%s"' % algorithm
122
123 if isinstance(name, (str, unicode)):
124 name = dns.name.from_text(name, origin)
125 hash.update(name.canonicalize().to_wire())
126 hash.update(_to_rdata(key, origin))
127 digest = hash.digest()
128
129 dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
130 return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
131 len(dsrdata))
132
151
156
159
162
165
169
172
175
178
191
193 if _is_md5(algorithm):
194 oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
195 elif _is_sha1(algorithm):
196 oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a]
197 elif _is_sha256(algorithm):
198 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
199 elif _is_sha512(algorithm):
200 oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
201 else:
202 raise ValidationFailure, 'unknown algorithm %u' % algorithm
203 olen = len(oid)
204 dlen = _make_hash(algorithm).digest_size
205 idbytes = [0x30] + [8 + olen + dlen] + \
206 [0x30, olen + 4] + [0x06, olen] + oid + \
207 [0x05, 0x00] + [0x04, dlen]
208 return ''.join(map(chr, idbytes))
209
211 """Validate an RRset against a single signature rdata
212
213 The owner name of the rrsig is assumed to be the same as the owner name
214 of the rrset.
215
216 @param rrset: The RRset to validate
217 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
218 tuple
219 @param rrsig: The signature rdata
220 @type rrsig: dns.rrset.Rdata
221 @param keys: The key dictionary.
222 @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
223 @param origin: The origin to use for relative names
224 @type origin: dns.name.Name or None
225 @param now: The time to use when validating the signatures. The default
226 is the current time.
227 @type now: int
228 """
229
230 if isinstance(origin, (str, unicode)):
231 origin = dns.name.from_text(origin, dns.name.root)
232
233 for candidate_key in _find_candidate_keys(keys, rrsig):
234 if not candidate_key:
235 raise ValidationFailure, 'unknown key'
236
237
238
239 if isinstance(rrset, tuple):
240 rrname = rrset[0]
241 rdataset = rrset[1]
242 else:
243 rrname = rrset.name
244 rdataset = rrset
245
246 if now is None:
247 now = time.time()
248 if rrsig.expiration < now:
249 raise ValidationFailure, 'expired'
250 if rrsig.inception > now:
251 raise ValidationFailure, 'not yet valid'
252
253 hash = _make_hash(rrsig.algorithm)
254
255 if _is_rsa(rrsig.algorithm):
256 keyptr = candidate_key.key
257 (bytes,) = struct.unpack('!B', keyptr[0:1])
258 keyptr = keyptr[1:]
259 if bytes == 0:
260 (bytes,) = struct.unpack('!H', keyptr[0:2])
261 keyptr = keyptr[2:]
262 rsa_e = keyptr[0:bytes]
263 rsa_n = keyptr[bytes:]
264 keylen = len(rsa_n) * 8
265 pubkey = Crypto.PublicKey.RSA.construct(
266 (Crypto.Util.number.bytes_to_long(rsa_n),
267 Crypto.Util.number.bytes_to_long(rsa_e)))
268 sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
269 elif _is_dsa(rrsig.algorithm):
270 keyptr = candidate_key.key
271 (t,) = struct.unpack('!B', keyptr[0:1])
272 keyptr = keyptr[1:]
273 octets = 64 + t * 8
274 dsa_q = keyptr[0:20]
275 keyptr = keyptr[20:]
276 dsa_p = keyptr[0:octets]
277 keyptr = keyptr[octets:]
278 dsa_g = keyptr[0:octets]
279 keyptr = keyptr[octets:]
280 dsa_y = keyptr[0:octets]
281 pubkey = Crypto.PublicKey.DSA.construct(
282 (Crypto.Util.number.bytes_to_long(dsa_y),
283 Crypto.Util.number.bytes_to_long(dsa_g),
284 Crypto.Util.number.bytes_to_long(dsa_p),
285 Crypto.Util.number.bytes_to_long(dsa_q)))
286 (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
287 sig = (Crypto.Util.number.bytes_to_long(dsa_r),
288 Crypto.Util.number.bytes_to_long(dsa_s))
289 elif _is_ecdsa(rrsig.algorithm):
290 if rrsig.algorithm == ECDSAP256SHA256:
291 curve = ecdsa.curves.NIST256p
292 key_len = 32
293 digest_len = 32
294 elif rrsig.algorithm == ECDSAP384SHA384:
295 curve = ecdsa.curves.NIST384p
296 key_len = 48
297 digest_len = 48
298 else:
299
300 raise ValidationFailure, 'unknown ECDSA curve'
301 keyptr = candidate_key.key
302 x = Crypto.Util.number.bytes_to_long(keyptr[0:key_len])
303 y = Crypto.Util.number.bytes_to_long(keyptr[key_len:key_len * 2])
304 assert ecdsa.ecdsa.point_is_valid(curve.generator, x, y)
305 point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
306 verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point,
307 curve)
308 pubkey = ECKeyWrapper(verifying_key, key_len)
309 r = rrsig.signature[:key_len]
310 s = rrsig.signature[key_len:]
311 sig = ecdsa.ecdsa.Signature(Crypto.Util.number.bytes_to_long(r),
312 Crypto.Util.number.bytes_to_long(s))
313 else:
314 raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
315
316 hash.update(_to_rdata(rrsig, origin)[:18])
317 hash.update(rrsig.signer.to_digestable(origin))
318
319 if rrsig.labels < len(rrname) - 1:
320 suffix = rrname.split(rrsig.labels + 1)[1]
321 rrname = dns.name.from_text('*', suffix)
322 rrnamebuf = rrname.to_digestable(origin)
323 rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
324 rrsig.original_ttl)
325 rrlist = sorted(rdataset);
326 for rr in rrlist:
327 hash.update(rrnamebuf)
328 hash.update(rrfixed)
329 rrdata = rr.to_digestable(origin)
330 rrlen = struct.pack('!H', len(rrdata))
331 hash.update(rrlen)
332 hash.update(rrdata)
333
334 digest = hash.digest()
335
336 if _is_rsa(rrsig.algorithm):
337
338 digest = _make_algorithm_id(rrsig.algorithm) + digest
339 padlen = keylen // 8 - len(digest) - 3
340 digest = chr(0) + chr(1) + chr(0xFF) * padlen + chr(0) + digest
341 elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm):
342 pass
343 else:
344
345
346
347 raise ValidationFailure, 'unknown algorithm %u' % rrsig.algorithm
348
349 if pubkey.verify(digest, sig):
350 return
351 raise ValidationFailure, 'verify failure'
352
353 -def _validate(rrset, rrsigset, keys, origin=None, now=None):
354 """Validate an RRset
355
356 @param rrset: The RRset to validate
357 @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
358 tuple
359 @param rrsigset: The signature RRset
360 @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
361 tuple
362 @param keys: The key dictionary.
363 @type keys: a dictionary keyed by dns.name.Name with node or rdataset values
364 @param origin: The origin to use for relative names
365 @type origin: dns.name.Name or None
366 @param now: The time to use when validating the signatures. The default
367 is the current time.
368 @type now: int
369 """
370
371 if isinstance(origin, (str, unicode)):
372 origin = dns.name.from_text(origin, dns.name.root)
373
374 if isinstance(rrset, tuple):
375 rrname = rrset[0]
376 else:
377 rrname = rrset.name
378
379 if isinstance(rrsigset, tuple):
380 rrsigname = rrsigset[0]
381 rrsigrdataset = rrsigset[1]
382 else:
383 rrsigname = rrsigset.name
384 rrsigrdataset = rrsigset
385
386 rrname = rrname.choose_relativity(origin)
387 rrsigname = rrname.choose_relativity(origin)
388 if rrname != rrsigname:
389 raise ValidationFailure, "owner names do not match"
390
391 for rrsig in rrsigrdataset:
392 try:
393 _validate_rrsig(rrset, rrsig, keys, origin, now)
394 return
395 except ValidationFailure, e:
396 pass
397 raise ValidationFailure, "no RRSIGs validated"
398
400 raise NotImplementedError, "DNSSEC validation requires pycrypto"
401
402 try:
403 import Crypto.PublicKey.RSA
404 import Crypto.PublicKey.DSA
405 import Crypto.Util.number
406 validate = _validate
407 validate_rrsig = _validate_rrsig
408 _have_pycrypto = True
409 except ImportError:
410 validate = _need_pycrypto
411 validate_rrsig = _need_pycrypto
412 _have_pycrypto = False
413
414 try:
415 import ecdsa
416 import ecdsa.ecdsa
417 import ecdsa.ellipticcurve
418 import ecdsa.keys
419 _have_ecdsa = True
420
423 self.key = key
424 self.key_len = key_len
425 - def verify(self, digest, sig):
426 diglong = Crypto.Util.number.bytes_to_long(digest)
427 return self.key.pubkey.verifies(diglong, sig)
428
429 except ImportError:
430 _have_ecdsa = False
431