1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """DNS rdata."""
19
20 from io import BytesIO
21 import base64
22 import binascii
23
24 import dns.exception
25 import dns.name
26 import dns.rdataclass
27 import dns.rdatatype
28 import dns.tokenizer
29 import dns.wiredata
30 from ._compat import xrange, string_types, text_type
31
32 try:
33 import threading as _threading
34 except ImportError:
35 import dummy_threading as _threading
36
37 _hex_chunksize = 32
41 """Convert a binary string into its hex encoding, broken up into chunks
42 of chunksize characters separated by a space.
43 """
44
45 line = binascii.hexlify(data)
46 return b' '.join([line[i:i + chunksize]
47 for i
48 in range(0, len(line), chunksize)]).decode()
49
50 _base64_chunksize = 32
54 """Convert a binary string into its base64 encoding, broken up into chunks
55 of chunksize characters separated by a space.
56 """
57
58 line = base64.b64encode(data)
59 return b' '.join([line[i:i + chunksize]
60 for i
61 in range(0, len(line), chunksize)]).decode()
62
63 __escaped = bytearray(b'"\\')
66 """Escape the characters in a quoted string which need it."""
67
68 if isinstance(qstring, text_type):
69 qstring = qstring.encode()
70 if not isinstance(qstring, bytearray):
71 qstring = bytearray(qstring)
72
73 text = ''
74 for c in qstring:
75 if c in __escaped:
76 text += '\\' + chr(c)
77 elif c >= 0x20 and c < 0x7F:
78 text += chr(c)
79 else:
80 text += '\\%03d' % c
81 return text
82
85 """Determine the index of greatest byte that isn't all zeros, and
86 return the bitmap that contains all the bytes less than that index.
87 """
88
89 for i in xrange(len(what) - 1, -1, -1):
90 if what[i] != 0:
91 return what[0: i + 1]
92 return what[0:1]
93
96 """Base class for all DNS rdata types."""
97
98 __slots__ = ['rdclass', 'rdtype']
99
101 """Initialize an rdata.
102
103 *rdclass*, an ``int`` is the rdataclass of the Rdata.
104 *rdtype*, an ``int`` is the rdatatype of the Rdata.
105 """
106
107 self.rdclass = rdclass
108 self.rdtype = rdtype
109
111 """Return the type a Rdata covers.
112
113 DNS SIG/RRSIG rdatas apply to a specific type; this type is
114 returned by the covers() function. If the rdata type is not
115 SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when
116 creating rdatasets, allowing the rdataset to contain only RRSIGs
117 of a particular type, e.g. RRSIG(NS).
118
119 Returns an ``int``.
120 """
121
122 return dns.rdatatype.NONE
123
125 """Return a 32-bit type value, the least significant 16 bits of
126 which are the ordinary DNS type, and the upper 16 bits of which are
127 the "covered" type, if any.
128
129 Returns an ``int``.
130 """
131
132 return self.covers() << 16 | self.rdtype
133
134 - def to_text(self, origin=None, relativize=True, **kw):
135 """Convert an rdata to text format.
136
137 Returns a ``text``.
138 """
139
140 raise NotImplementedError
141
142 - def to_wire(self, file, compress=None, origin=None):
143 """Convert an rdata to wire format.
144
145 Returns a ``binary``.
146 """
147
148 raise NotImplementedError
149
151 """Convert rdata to a format suitable for digesting in hashes. This
152 is also the DNSSEC canonical form.
153
154 Returns a ``binary``.
155 """
156
157 f = BytesIO()
158 self.to_wire(f, None, origin)
159 return f.getvalue()
160
162 """Check that the current contents of the rdata's fields are
163 valid.
164
165 If you change an rdata by assigning to its fields,
166 it is a good idea to call validate() when you are done making
167 changes.
168
169 Raises various exceptions if there are problems.
170
171 Returns ``None``.
172 """
173
174 dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())
175
185
188
189 - def _cmp(self, other):
190 """Compare an rdata with another rdata of the same rdtype and
191 rdclass.
192
193 Return < 0 if self < other in the DNSSEC ordering, 0 if self
194 == other, and > 0 if self > other.
195
196 """
197 our = self.to_digestable(dns.name.root)
198 their = other.to_digestable(dns.name.root)
199 if our == their:
200 return 0
201 elif our > their:
202 return 1
203 else:
204 return -1
205
212
219
226
232
238
244
247
248 @classmethod
249 - def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
250 raise NotImplementedError
251
252 @classmethod
253 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
254 raise NotImplementedError
255
257 """Convert any domain names in the rdata to the specified
258 relativization.
259 """
260
262
263 """Generic Rdata Class
264
265 This class is used for rdata types for which we have no better
266 implementation. It implements the DNS "unknown RRs" scheme.
267 """
268
269 __slots__ = ['data']
270
271 - def __init__(self, rdclass, rdtype, data):
274
275 - def to_text(self, origin=None, relativize=True, **kw):
276 return r'\# %d ' % len(self.data) + _hexify(self.data)
277
278 @classmethod
279 - def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
280 token = tok.get()
281 if not token.is_identifier() or token.value != r'\#':
282 raise dns.exception.SyntaxError(
283 r'generic rdata does not start with \#')
284 length = tok.get_int()
285 chunks = []
286 while 1:
287 token = tok.get()
288 if token.is_eol_or_eof():
289 break
290 chunks.append(token.value.encode())
291 hex = b''.join(chunks)
292 data = binascii.unhexlify(hex)
293 if len(data) != length:
294 raise dns.exception.SyntaxError(
295 'generic rdata hex data has wrong length')
296 return cls(rdclass, rdtype, data)
297
298 - def to_wire(self, file, compress=None, origin=None):
299 file.write(self.data)
300
301 @classmethod
302 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
304
305 _rdata_modules = {}
306 _module_prefix = 'dns.rdtypes'
307 _import_lock = _threading.Lock()
310
311 def import_module(name):
312 with _import_lock:
313 mod = __import__(name)
314 components = name.split('.')
315 for comp in components[1:]:
316 mod = getattr(mod, comp)
317 return mod
318
319 mod = _rdata_modules.get((rdclass, rdtype))
320 rdclass_text = dns.rdataclass.to_text(rdclass)
321 rdtype_text = dns.rdatatype.to_text(rdtype)
322 rdtype_text = rdtype_text.replace('-', '_')
323 if not mod:
324 mod = _rdata_modules.get((dns.rdatatype.ANY, rdtype))
325 if not mod:
326 try:
327 mod = import_module('.'.join([_module_prefix,
328 rdclass_text, rdtype_text]))
329 _rdata_modules[(rdclass, rdtype)] = mod
330 except ImportError:
331 try:
332 mod = import_module('.'.join([_module_prefix,
333 'ANY', rdtype_text]))
334 _rdata_modules[(dns.rdataclass.ANY, rdtype)] = mod
335 except ImportError:
336 mod = None
337 if mod:
338 cls = getattr(mod, rdtype_text)
339 else:
340 cls = GenericRdata
341 return cls
342
343
344 -def from_text(rdclass, rdtype, tok, origin=None, relativize=True):
345 """Build an rdata object from text format.
346
347 This function attempts to dynamically load a class which
348 implements the specified rdata class and type. If there is no
349 class-and-type-specific implementation, the GenericRdata class
350 is used.
351
352 Once a class is chosen, its from_text() class method is called
353 with the parameters to this function.
354
355 If *tok* is a ``text``, then a tokenizer is created and the string
356 is used as its input.
357
358 *rdclass*, an ``int``, the rdataclass.
359
360 *rdtype*, an ``int``, the rdatatype.
361
362 *tok*, a ``dns.tokenizer.Tokenizer`` or a ``text``.
363
364 *origin*, a ``dns.name.Name`` (or ``None``), the
365 origin to use for relative names.
366
367 *relativize*, a ``bool``. If true, name will be relativized to
368 the specified origin.
369
370 Returns an instance of the chosen Rdata subclass.
371 """
372
373 if isinstance(tok, string_types):
374 tok = dns.tokenizer.Tokenizer(tok)
375 cls = get_rdata_class(rdclass, rdtype)
376 if cls != GenericRdata:
377
378 token = tok.get()
379 tok.unget(token)
380 if token.is_identifier() and \
381 token.value == r'\#':
382
383
384
385
386
387 rdata = GenericRdata.from_text(rdclass, rdtype, tok, origin,
388 relativize)
389 return from_wire(rdclass, rdtype, rdata.data, 0, len(rdata.data),
390 origin)
391 return cls.from_text(rdclass, rdtype, tok, origin, relativize)
392
393
394 -def from_wire(rdclass, rdtype, wire, current, rdlen, origin=None):
395 """Build an rdata object from wire format
396
397 This function attempts to dynamically load a class which
398 implements the specified rdata class and type. If there is no
399 class-and-type-specific implementation, the GenericRdata class
400 is used.
401
402 Once a class is chosen, its from_wire() class method is called
403 with the parameters to this function.
404
405 *rdclass*, an ``int``, the rdataclass.
406
407 *rdtype*, an ``int``, the rdatatype.
408
409 *wire*, a ``binary``, the wire-format message.
410
411 *current*, an ``int``, the offset in wire of the beginning of
412 the rdata.
413
414 *rdlen*, an ``int``, the length of the wire-format rdata
415
416 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
417 then names will be relativized to this origin.
418
419 Returns an instance of the chosen Rdata subclass.
420 """
421
422 wire = dns.wiredata.maybe_wrap(wire)
423 cls = get_rdata_class(rdclass, rdtype)
424 return cls.from_wire(rdclass, rdtype, wire, current, rdlen, origin)
425
428 """DNS rdatatype already exists."""
429 supp_kwargs = {'rdclass', 'rdtype'}
430 fmt = "The rdata type with class {rdclass} and rdtype {rdtype} " + \
431 "already exists."
432
436 """Dynamically register a module to handle an rdatatype.
437
438 *implementation*, a module implementing the type in the usual dnspython
439 way.
440
441 *rdtype*, an ``int``, the rdatatype to register.
442
443 *rdtype_text*, a ``text``, the textual form of the rdatatype.
444
445 *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
446 RRsets of the type can have only one member.)
447
448 *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
449 it applies to all classes.
450 """
451
452 existing_cls = get_rdata_class(rdclass, rdtype)
453 if existing_cls != GenericRdata:
454 raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
455 _rdata_modules[(rdclass, rdtype)] = implementation
456 dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)
457