Package dns :: Module rdata
[hide private]
[frames] | no frames]

Source Code for Module dns.rdata

  1  # Copyright (C) 2001-2007, 2009-2011 Nominum, Inc. 
  2  # 
  3  # Permission to use, copy, modify, and distribute this software and its 
  4  # documentation for any purpose with or without fee is hereby granted, 
  5  # provided that the above copyright notice and this permission notice 
  6  # appear in all copies. 
  7  # 
  8  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
  9  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 10  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 11  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 12  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 13  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 14  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 15   
 16  """DNS rdata. 
 17   
 18  @var _rdata_modules: A dictionary mapping a (rdclass, rdtype) tuple to 
 19  the module which implements that type. 
 20  @type _rdata_modules: dict 
 21  @var _module_prefix: The prefix to use when forming modules names.  The 
 22  default is 'dns.rdtypes'.  Changing this value will break the library. 
 23  @type _module_prefix: string 
 24  @var _hex_chunk: At most this many octets that will be represented in each 
 25  chunk of hexstring that _hexify() produces before whitespace occurs. 
 26  @type _hex_chunk: int""" 
 27   
 28  from io import BytesIO 
 29  import base64 
 30  import binascii 
 31  import struct 
 32   
 33  import dns.exception 
 34  import dns.name 
 35  import dns.rdataclass 
 36  import dns.rdatatype 
 37  import dns.tokenizer 
 38  import dns.wiredata 
 39  from ._compat import xrange, string_types, text_type 
 40   
 41  _hex_chunksize = 32 
42 43 44 -def _hexify(data, chunksize=_hex_chunksize):
45 """Convert a binary string into its hex encoding, broken up into chunks 46 of I{chunksize} characters separated by a space. 47 48 @param data: the binary string 49 @type data: string 50 @param chunksize: the chunk size. Default is L{dns.rdata._hex_chunksize} 51 @rtype: string 52 """ 53 54 line = binascii.hexlify(data) 55 return b' '.join([line[i:i + chunksize] 56 for i 57 in range(0, len(line), chunksize)]).decode()
58 59 _base64_chunksize = 32
60 61 62 -def _base64ify(data, chunksize=_base64_chunksize):
63 """Convert a binary string into its base64 encoding, broken up into chunks 64 of I{chunksize} characters separated by a space. 65 66 @param data: the binary string 67 @type data: string 68 @param chunksize: the chunk size. Default is 69 L{dns.rdata._base64_chunksize} 70 @rtype: string 71 """ 72 73 line = base64.b64encode(data) 74 return b' '.join([line[i:i + chunksize] 75 for i 76 in range(0, len(line), chunksize)]).decode()
77 78 __escaped = { 79 '"': True, 80 '\\': True, 81 }
82 83 84 -def _escapify(qstring):
85 """Escape the characters in a quoted string which need it. 86 87 @param qstring: the string 88 @type qstring: string 89 @returns: the escaped string 90 @rtype: string 91 """ 92 93 if isinstance(qstring, text_type): 94 qstring = qstring.encode() 95 if not isinstance(qstring, bytearray): 96 qstring = bytearray(qstring) 97 98 text = '' 99 for c in qstring: 100 packed = struct.pack('!B', c).decode() 101 if packed in __escaped: 102 text += '\\' + packed 103 elif c >= 0x20 and c < 0x7F: 104 text += packed 105 else: 106 text += '\\%03d' % c 107 return text
108
109 110 -def _truncate_bitmap(what):
111 """Determine the index of greatest byte that isn't all zeros, and 112 return the bitmap that contains all the bytes less than that index. 113 114 @param what: a string of octets representing a bitmap. 115 @type what: string 116 @rtype: string 117 """ 118 119 for i in xrange(len(what) - 1, -1, -1): 120 if what[i] != 0: 121 break 122 return what[0: i + 1]
123
124 125 -class Rdata(object):
126 127 """Base class for all DNS rdata types. 128 """ 129 130 __slots__ = ['rdclass', 'rdtype'] 131
132 - def __init__(self, rdclass, rdtype):
133 """Initialize an rdata. 134 @param rdclass: The rdata class 135 @type rdclass: int 136 @param rdtype: The rdata type 137 @type rdtype: int 138 """ 139 140 self.rdclass = rdclass 141 self.rdtype = rdtype
142
143 - def covers(self):
144 """DNS SIG/RRSIG rdatas apply to a specific type; this type is 145 returned by the covers() function. If the rdata type is not 146 SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when 147 creating rdatasets, allowing the rdataset to contain only RRSIGs 148 of a particular type, e.g. RRSIG(NS). 149 @rtype: int 150 """ 151 152 return dns.rdatatype.NONE
153
154 - def extended_rdatatype(self):
155 """Return a 32-bit type value, the least significant 16 bits of 156 which are the ordinary DNS type, and the upper 16 bits of which are 157 the "covered" type, if any. 158 @rtype: int 159 """ 160 161 return self.covers() << 16 | self.rdtype
162
163 - def to_text(self, origin=None, relativize=True, **kw):
164 """Convert an rdata to text format. 165 @rtype: string 166 """ 167 raise NotImplementedError
168
169 - def to_wire(self, file, compress=None, origin=None):
170 """Convert an rdata to wire format. 171 @rtype: string 172 """ 173 174 raise NotImplementedError
175
176 - def to_digestable(self, origin=None):
177 """Convert rdata to a format suitable for digesting in hashes. This 178 is also the DNSSEC canonical form.""" 179 f = BytesIO() 180 self.to_wire(f, None, origin) 181 return f.getvalue()
182
183 - def validate(self):
184 """Check that the current contents of the rdata's fields are 185 valid. If you change an rdata by assigning to its fields, 186 it is a good idea to call validate() when you are done making 187 changes. 188 """ 189 dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())
190
191 - def __repr__(self):
192 covers = self.covers() 193 if covers == dns.rdatatype.NONE: 194 ctext = '' 195 else: 196 ctext = '(' + dns.rdatatype.to_text(covers) + ')' 197 return '<DNS ' + dns.rdataclass.to_text(self.rdclass) + ' ' + \ 198 dns.rdatatype.to_text(self.rdtype) + ctext + ' rdata: ' + \ 199 str(self) + '>'
200
201 - def __str__(self):
202 return self.to_text()
203
204 - def _cmp(self, other):
205 """Compare an rdata with another rdata of the same rdtype and 206 rdclass. Return < 0 if self < other in the DNSSEC ordering, 207 0 if self == other, and > 0 if self > other. 208 """ 209 our = self.to_digestable(dns.name.root) 210 their = other.to_digestable(dns.name.root) 211 if our == their: 212 return 0 213 if our > their: 214 return 1 215 216 return -1
217
218 - def __eq__(self, other):
219 if not isinstance(other, Rdata): 220 return False 221 if self.rdclass != other.rdclass or self.rdtype != other.rdtype: 222 return False 223 return self._cmp(other) == 0
224
225 - def __ne__(self, other):
226 if not isinstance(other, Rdata): 227 return True 228 if self.rdclass != other.rdclass or self.rdtype != other.rdtype: 229 return True 230 return self._cmp(other) != 0
231
232 - def __lt__(self, other):
233 if not isinstance(other, Rdata) or \ 234 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 235 236 return NotImplemented 237 return self._cmp(other) < 0
238
239 - def __le__(self, other):
240 if not isinstance(other, Rdata) or \ 241 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 242 return NotImplemented 243 return self._cmp(other) <= 0
244
245 - def __ge__(self, other):
246 if not isinstance(other, Rdata) or \ 247 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 248 return NotImplemented 249 return self._cmp(other) >= 0
250
251 - def __gt__(self, other):
252 if not isinstance(other, Rdata) or \ 253 self.rdclass != other.rdclass or self.rdtype != other.rdtype: 254 return NotImplemented 255 return self._cmp(other) > 0
256
257 - def __hash__(self):
258 return hash(self.to_digestable(dns.name.root))
259 260 @classmethod
261 - def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
262 """Build an rdata object from text format. 263 264 @param rdclass: The rdata class 265 @type rdclass: int 266 @param rdtype: The rdata type 267 @type rdtype: int 268 @param tok: The tokenizer 269 @type tok: dns.tokenizer.Tokenizer 270 @param origin: The origin to use for relative names 271 @type origin: dns.name.Name 272 @param relativize: should names be relativized? 273 @type relativize: bool 274 @rtype: dns.rdata.Rdata instance 275 """ 276 277 raise NotImplementedError
278 279 @classmethod
280 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
281 """Build an rdata object from wire format 282 283 @param rdclass: The rdata class 284 @type rdclass: int 285 @param rdtype: The rdata type 286 @type rdtype: int 287 @param wire: The wire-format message 288 @type wire: string 289 @param current: The offset in wire of the beginning of the rdata. 290 @type current: int 291 @param rdlen: The length of the wire-format rdata 292 @type rdlen: int 293 @param origin: The origin to use for relative names 294 @type origin: dns.name.Name 295 @rtype: dns.rdata.Rdata instance 296 """ 297 298 raise NotImplementedError
299
300 - def choose_relativity(self, origin=None, relativize=True):
301 """Convert any domain names in the rdata to the specified 302 relativization. 303 """ 304 305 pass
306
307 308 -class GenericRdata(Rdata):
309 310 """Generate Rdata Class 311 312 This class is used for rdata types for which we have no better 313 implementation. It implements the DNS "unknown RRs" scheme. 314 """ 315 316 __slots__ = ['data'] 317
318 - def __init__(self, rdclass, rdtype, data):
319 super(GenericRdata, self).__init__(rdclass, rdtype) 320 self.data = data
321
322 - def to_text(self, origin=None, relativize=True, **kw):
323 return r'\# %d ' % len(self.data) + _hexify(self.data)
324 325 @classmethod
326 - def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
327 token = tok.get() 328 if not token.is_identifier() or token.value != '\#': 329 raise dns.exception.SyntaxError( 330 r'generic rdata does not start with \#') 331 length = tok.get_int() 332 chunks = [] 333 while 1: 334 token = tok.get() 335 if token.is_eol_or_eof(): 336 break 337 chunks.append(token.value.encode()) 338 hex = b''.join(chunks) 339 data = binascii.unhexlify(hex) 340 if len(data) != length: 341 raise dns.exception.SyntaxError( 342 'generic rdata hex data has wrong length') 343 return cls(rdclass, rdtype, data)
344
345 - def to_wire(self, file, compress=None, origin=None):
346 file.write(self.data)
347 348 @classmethod
349 - def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
350 return cls(rdclass, rdtype, wire[current: current + rdlen])
351 352 _rdata_modules = {} 353 _module_prefix = 'dns.rdtypes'
354 355 356 -def get_rdata_class(rdclass, rdtype):
357 358 def import_module(name): 359 mod = __import__(name) 360 components = name.split('.') 361 for comp in components[1:]: 362 mod = getattr(mod, comp) 363 return mod
364 365 mod = _rdata_modules.get((rdclass, rdtype)) 366 rdclass_text = dns.rdataclass.to_text(rdclass) 367 rdtype_text = dns.rdatatype.to_text(rdtype) 368 rdtype_text = rdtype_text.replace('-', '_') 369 if not mod: 370 mod = _rdata_modules.get((dns.rdatatype.ANY, rdtype)) 371 if not mod: 372 try: 373 mod = import_module('.'.join([_module_prefix, 374 rdclass_text, rdtype_text])) 375 _rdata_modules[(rdclass, rdtype)] = mod 376 except ImportError: 377 try: 378 mod = import_module('.'.join([_module_prefix, 379 'ANY', rdtype_text])) 380 _rdata_modules[(dns.rdataclass.ANY, rdtype)] = mod 381 except ImportError: 382 mod = None 383 if mod: 384 cls = getattr(mod, rdtype_text) 385 else: 386 cls = GenericRdata 387 return cls 388
389 390 -def from_text(rdclass, rdtype, tok, origin=None, relativize=True):
391 """Build an rdata object from text format. 392 393 This function attempts to dynamically load a class which 394 implements the specified rdata class and type. If there is no 395 class-and-type-specific implementation, the GenericRdata class 396 is used. 397 398 Once a class is chosen, its from_text() class method is called 399 with the parameters to this function. 400 401 If I{tok} is a string, then a tokenizer is created and the string 402 is used as its input. 403 404 @param rdclass: The rdata class 405 @type rdclass: int 406 @param rdtype: The rdata type 407 @type rdtype: int 408 @param tok: The tokenizer or input text 409 @type tok: dns.tokenizer.Tokenizer or string 410 @param origin: The origin to use for relative names 411 @type origin: dns.name.Name 412 @param relativize: Should names be relativized? 413 @type relativize: bool 414 @rtype: dns.rdata.Rdata instance""" 415 416 if isinstance(tok, string_types): 417 tok = dns.tokenizer.Tokenizer(tok) 418 cls = get_rdata_class(rdclass, rdtype) 419 if cls != GenericRdata: 420 # peek at first token 421 token = tok.get() 422 tok.unget(token) 423 if token.is_identifier() and \ 424 token.value == r'\#': 425 # 426 # Known type using the generic syntax. Extract the 427 # wire form from the generic syntax, and then run 428 # from_wire on it. 429 # 430 rdata = GenericRdata.from_text(rdclass, rdtype, tok, origin, 431 relativize) 432 return from_wire(rdclass, rdtype, rdata.data, 0, len(rdata.data), 433 origin) 434 return cls.from_text(rdclass, rdtype, tok, origin, relativize)
435
436 437 -def from_wire(rdclass, rdtype, wire, current, rdlen, origin=None):
438 """Build an rdata object from wire format 439 440 This function attempts to dynamically load a class which 441 implements the specified rdata class and type. If there is no 442 class-and-type-specific implementation, the GenericRdata class 443 is used. 444 445 Once a class is chosen, its from_wire() class method is called 446 with the parameters to this function. 447 448 @param rdclass: The rdata class 449 @type rdclass: int 450 @param rdtype: The rdata type 451 @type rdtype: int 452 @param wire: The wire-format message 453 @type wire: string 454 @param current: The offset in wire of the beginning of the rdata. 455 @type current: int 456 @param rdlen: The length of the wire-format rdata 457 @type rdlen: int 458 @param origin: The origin to use for relative names 459 @type origin: dns.name.Name 460 @rtype: dns.rdata.Rdata instance""" 461 462 wire = dns.wiredata.maybe_wrap(wire) 463 cls = get_rdata_class(rdclass, rdtype) 464 return cls.from_wire(rdclass, rdtype, wire, current, rdlen, origin)
465