Package dns :: Module rdata
[hide private]
[frames] | no frames]

Source Code for Module dns.rdata

  1  # Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 
  2   
  3  # Copyright (C) 2001-2017 Nominum, Inc. 
  4  # 
  5  # Permission to use, copy, modify, and distribute this software and its 
  6  # documentation for any purpose with or without fee is hereby granted, 
  7  # provided that the above copyright notice and this permission notice 
  8  # appear in all copies. 
  9  # 
 10  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
 11  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 12  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 13  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 14  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 15  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 16  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 17   
 18  """DNS rdata.""" 
 19   
 20  from io import BytesIO 
 21  import base64 
 22  import binascii 
 23   
 24  import dns.exception 
 25  import dns.name 
 26  import dns.rdataclass 
 27  import dns.rdatatype 
 28  import dns.tokenizer 
 29  import dns.wiredata 
 30  from ._compat import xrange, string_types, text_type 
 31   
 32  try: 
 33      import threading as _threading 
 34  except ImportError: 
 35      import dummy_threading as _threading 
 36   
 37  _hex_chunksize = 32 
38 39 40 -def _hexify(data, chunksize=_hex_chunksize):
41 """Convert a binary string into its hex encoding, broken up into chunks 42 of chunksize characters separated by a space. 43 """ 44 45 line = binascii.hexlify(data) 46 return b' '.join([line[i:i + chunksize] 47 for i 48 in range(0, len(line), chunksize)]).decode()
49 50 _base64_chunksize = 32
51 52 53 -def _base64ify(data, chunksize=_base64_chunksize):
54 """Convert a binary string into its base64 encoding, broken up into chunks 55 of chunksize characters separated by a space. 56 """ 57 58 line = base64.b64encode(data) 59 return b' '.join([line[i:i + chunksize] 60 for i 61 in range(0, len(line), chunksize)]).decode()
62 63 __escaped = bytearray(b'"\\')
64 65 -def _escapify(qstring):
66 """Escape the characters in a quoted string which need it.""" 67 68 if isinstance(qstring, text_type): 69 qstring = qstring.encode() 70 if not isinstance(qstring, bytearray): 71 qstring = bytearray(qstring) 72 73 text = '' 74 for c in qstring: 75 if c in __escaped: 76 text += '\\' + chr(c) 77 elif c >= 0x20 and c < 0x7F: 78 text += chr(c) 79 else: 80 text += '\\%03d' % c 81 return text
82
83 84 -def _truncate_bitmap(what):
85 """Determine the index of greatest byte that isn't all zeros, and 86 return the bitmap that contains all the bytes less than that index. 87 """ 88 89 for i in xrange(len(what) - 1, -1, -1): 90 if what[i] != 0: 91 return what[0: i + 1] 92 return what[0:1]
93
94 95 -class Rdata(object):
96 """Base class for all DNS rdata types.""" 97 98 __slots__ = ['rdclass', 'rdtype'] 99
100 - def __init__(self, rdclass, rdtype):
101 """Initialize an rdata. 102 103 *rdclass*, an ``int`` is the rdataclass of the Rdata. 104 *rdtype*, an ``int`` is the rdatatype of the Rdata. 105 """ 106 107 self.rdclass = rdclass 108 self.rdtype = rdtype
109
110 - def covers(self):
111 """Return the type a Rdata covers. 112 113 DNS SIG/RRSIG rdatas apply to a specific type; this type is 114 returned by the covers() function. If the rdata type is not 115 SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when 116 creating rdatasets, allowing the rdataset to contain only RRSIGs 117 of a particular type, e.g. RRSIG(NS). 118 119 Returns an ``int``. 120 """ 121 122 return dns.rdatatype.NONE
123
124 - def extended_rdatatype(self):
125 """Return a 32-bit type value, the least significant 16 bits of 126 which are the ordinary DNS type, and the upper 16 bits of which are 127 the "covered" type, if any. 128 129 Returns an ``int``. 130 """ 131 132 return self.covers() << 16 | self.rdtype
133
134 - def to_text(self, origin=None, relativize=True, **kw):
135 """Convert an rdata to text format. 136 137 Returns a ``text``. 138 """ 139 140 raise NotImplementedError
141
142 - def to_wire(self, file, compress=None, origin=None):
143 """Convert an rdata to wire format. 144 145 Returns a ``binary``. 146 """ 147 148 raise NotImplementedError
149
150 - def to_digestable(self, origin=None):
151 """Convert rdata to a format suitable for digesting in hashes. This 152 is also the DNSSEC canonical form. 153 154 Returns a ``binary``. 155 """ 156 157 f = BytesIO() 158 self.to_wire(f, None, origin) 159 return f.getvalue()
160
161 - def validate(self):
162 """Check that the current contents of the rdata's fields are 163 valid. 164 165 If you change an rdata by assigning to its fields, 166 it is a good idea to call validate() when you are done making 167 changes. 168 169 Raises various exceptions if there are problems. 170 171 Returns ``None``. 172 """ 173 174 dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())
175
176 - def __repr__(self):
177 covers = self.covers() 178 if covers == dns.rdatatype.NONE: 179 ctext = '' 180 else: 181 ctext = '(' + dns.rdatatype.to_text(covers) + ')' 182 return '<DNS ' + dns.rdataclass.to_text(self.rdclass) + ' ' + \ 183 dns.rdatatype.to_text(self.rdtype) + ctext + ' rdata: ' + \ 184 str(self) + '>'
185
186 - def __str__(self):
187 return self.to_text()
188
189 - def _cmp(self, other):
190 """Compare an rdata with another rdata of the same rdtype and 191 rdclass. 192 193 Return < 0 if self < other in the DNSSEC ordering, 0 if self 194 == other, and > 0 if self > other. 195 196 """ 197 our = self.to_digestable(dns.name.root) 198 their = other.to_digestable(dns.name.root) 199 if our == their: 200 return 0 201 elif our > their: 202 return 1 203 else: 204 return -1
205
206 - def __eq__(self, other):
207 if not isinstance(other, Rdata): 208 return False 209 if self.rdclass != other.rdclass or self.rdtype != other.rdtype: 210 return False 211 return self._cmp(other) == 0
212
213 - def __ne__(self, other):
214 if not isinstance(other, Rdata): 215 return True 216 if self.rdclass != other.rdclass or self.rdtype != other.rdtype: 217 return True 218 return self._cmp(other) != 0
219
220 - def __lt__(self, other):
221 if not isinstance(other, Rdata) or \ 222 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 223 224 return NotImplemented 225 return self._cmp(other) < 0
226
227 - def __le__(self, other):
228 if not isinstance(other, Rdata) or \ 229 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 230 return NotImplemented 231 return self._cmp(other) <= 0
232
233 - def __ge__(self, other):
234 if not isinstance(other, Rdata) or \ 235 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 236 return NotImplemented 237 return self._cmp(other) >= 0
238
239 - def __gt__(self, other):
240 if not isinstance(other, Rdata) or \ 241 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 242 return NotImplemented 243 return self._cmp(other) > 0
244
245 - def __hash__(self):
246 return hash(self.to_digestable(dns.name.root))
247 248 @classmethod
249 - def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
250 raise NotImplementedError
251 252 @classmethod
253 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
254 raise NotImplementedError
255
256 - def choose_relativity(self, origin=None, relativize=True):
257 """Convert any domain names in the rdata to the specified 258 relativization. 259 """
260
261 -class GenericRdata(Rdata):
262 263 """Generic Rdata Class 264 265 This class is used for rdata types for which we have no better 266 implementation. It implements the DNS "unknown RRs" scheme. 267 """ 268 269 __slots__ = ['data'] 270
271 - def __init__(self, rdclass, rdtype, data):
272 super(GenericRdata, self).__init__(rdclass, rdtype) 273 self.data = data
274
275 - def to_text(self, origin=None, relativize=True, **kw):
276 return r'\# %d ' % len(self.data) + _hexify(self.data)
277 278 @classmethod
279 - def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
280 token = tok.get() 281 if not token.is_identifier() or token.value != r'\#': 282 raise dns.exception.SyntaxError( 283 r'generic rdata does not start with \#') 284 length = tok.get_int() 285 chunks = [] 286 while 1: 287 token = tok.get() 288 if token.is_eol_or_eof(): 289 break 290 chunks.append(token.value.encode()) 291 hex = b''.join(chunks) 292 data = binascii.unhexlify(hex) 293 if len(data) != length: 294 raise dns.exception.SyntaxError( 295 'generic rdata hex data has wrong length') 296 return cls(rdclass, rdtype, data)
297
298 - def to_wire(self, file, compress=None, origin=None):
299 file.write(self.data)
300 301 @classmethod
302 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
303 return cls(rdclass, rdtype, wire[current: current + rdlen])
304 305 _rdata_modules = {} 306 _module_prefix = 'dns.rdtypes' 307 _import_lock = _threading.Lock()
308 309 -def get_rdata_class(rdclass, rdtype):
310 311 def import_module(name): 312 with _import_lock: 313 mod = __import__(name) 314 components = name.split('.') 315 for comp in components[1:]: 316 mod = getattr(mod, comp) 317 return mod
318 319 mod = _rdata_modules.get((rdclass, rdtype)) 320 rdclass_text = dns.rdataclass.to_text(rdclass) 321 rdtype_text = dns.rdatatype.to_text(rdtype) 322 rdtype_text = rdtype_text.replace('-', '_') 323 if not mod: 324 mod = _rdata_modules.get((dns.rdatatype.ANY, rdtype)) 325 if not mod: 326 try: 327 mod = import_module('.'.join([_module_prefix, 328 rdclass_text, rdtype_text])) 329 _rdata_modules[(rdclass, rdtype)] = mod 330 except ImportError: 331 try: 332 mod = import_module('.'.join([_module_prefix, 333 'ANY', rdtype_text])) 334 _rdata_modules[(dns.rdataclass.ANY, rdtype)] = mod 335 except ImportError: 336 mod = None 337 if mod: 338 cls = getattr(mod, rdtype_text) 339 else: 340 cls = GenericRdata 341 return cls 342
343 344 -def from_text(rdclass, rdtype, tok, origin=None, relativize=True):
345 """Build an rdata object from text format. 346 347 This function attempts to dynamically load a class which 348 implements the specified rdata class and type. If there is no 349 class-and-type-specific implementation, the GenericRdata class 350 is used. 351 352 Once a class is chosen, its from_text() class method is called 353 with the parameters to this function. 354 355 If *tok* is a ``text``, then a tokenizer is created and the string 356 is used as its input. 357 358 *rdclass*, an ``int``, the rdataclass. 359 360 *rdtype*, an ``int``, the rdatatype. 361 362 *tok*, a ``dns.tokenizer.Tokenizer`` or a ``text``. 363 364 *origin*, a ``dns.name.Name`` (or ``None``), the 365 origin to use for relative names. 366 367 *relativize*, a ``bool``. If true, name will be relativized to 368 the specified origin. 369 370 Returns an instance of the chosen Rdata subclass. 371 """ 372 373 if isinstance(tok, string_types): 374 tok = dns.tokenizer.Tokenizer(tok) 375 cls = get_rdata_class(rdclass, rdtype) 376 if cls != GenericRdata: 377 # peek at first token 378 token = tok.get() 379 tok.unget(token) 380 if token.is_identifier() and \ 381 token.value == r'\#': 382 # 383 # Known type using the generic syntax. Extract the 384 # wire form from the generic syntax, and then run 385 # from_wire on it. 386 # 387 rdata = GenericRdata.from_text(rdclass, rdtype, tok, origin, 388 relativize) 389 return from_wire(rdclass, rdtype, rdata.data, 0, len(rdata.data), 390 origin) 391 return cls.from_text(rdclass, rdtype, tok, origin, relativize)
392
393 394 -def from_wire(rdclass, rdtype, wire, current, rdlen, origin=None):
395 """Build an rdata object from wire format 396 397 This function attempts to dynamically load a class which 398 implements the specified rdata class and type. If there is no 399 class-and-type-specific implementation, the GenericRdata class 400 is used. 401 402 Once a class is chosen, its from_wire() class method is called 403 with the parameters to this function. 404 405 *rdclass*, an ``int``, the rdataclass. 406 407 *rdtype*, an ``int``, the rdatatype. 408 409 *wire*, a ``binary``, the wire-format message. 410 411 *current*, an ``int``, the offset in wire of the beginning of 412 the rdata. 413 414 *rdlen*, an ``int``, the length of the wire-format rdata 415 416 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``, 417 then names will be relativized to this origin. 418 419 Returns an instance of the chosen Rdata subclass. 420 """ 421 422 wire = dns.wiredata.maybe_wrap(wire) 423 cls = get_rdata_class(rdclass, rdtype) 424 return cls.from_wire(rdclass, rdtype, wire, current, rdlen, origin)
425
426 427 -class RdatatypeExists(dns.exception.DNSException):
428 """DNS rdatatype already exists.""" 429 supp_kwargs = {'rdclass', 'rdtype'} 430 fmt = "The rdata type with class {rdclass} and rdtype {rdtype} " + \ 431 "already exists."
432
433 434 -def register_type(implementation, rdtype, rdtype_text, is_singleton=False, 435 rdclass=dns.rdataclass.IN):
436 """Dynamically register a module to handle an rdatatype. 437 438 *implementation*, a module implementing the type in the usual dnspython 439 way. 440 441 *rdtype*, an ``int``, the rdatatype to register. 442 443 *rdtype_text*, a ``text``, the textual form of the rdatatype. 444 445 *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e. 446 RRsets of the type can have only one member.) 447 448 *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if 449 it applies to all classes. 450 """ 451 452 existing_cls = get_rdata_class(rdclass, rdtype) 453 if existing_cls != GenericRdata: 454 raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype) 455 _rdata_modules[(rdclass, rdtype)] = implementation 456 dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)
457