Package dns :: Package rdtypes :: Package IN :: Module APL
[hide private]
[frames | no frames]

Source Code for Module dns.rdtypes.IN.APL

  1  # Copyright (C) 2003-2007, 2009-2011 Nominum, Inc. 
  2  # 
  3  # Permission to use, copy, modify, and distribute this software and its 
  4  # documentation for any purpose with or without fee is hereby granted, 
  5  # provided that the above copyright notice and this permission notice 
  6  # appear in all copies. 
  7  # 
  8  # THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 
  9  # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 
 10  # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 
 11  # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 
 12  # WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 
 13  # ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 
 14  # OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
 15   
 16  import struct 
 17  import binascii 
 18   
 19  import dns.exception 
 20  import dns.inet 
 21  import dns.rdata 
 22  import dns.tokenizer 
 23  from dns._compat import xrange 
class APLItem(object):

    """An APL list item.

    @ivar family: the address family (IANA address family registry)
    @type family: int
    @ivar negation: is this item negated?
    @type negation: bool
    @ivar address: the address
    @type address: string
    @ivar prefix: the prefix length
    @type prefix: int
    """

    __slots__ = ['family', 'negation', 'address', 'prefix']

    def __init__(self, family, negation, address, prefix):
        """Store the four fields of the item unchanged (no validation)."""
        self.family = family
        self.negation = negation
        self.address = address
        self.prefix = prefix

    def __str__(self):
        """Return the text form ``[!]family:address/prefix``.

        The leading ``!`` is emitted only when the item is negated.
        """
        if self.negation:
            return "!%d:%s/%s" % (self.family, self.address, self.prefix)
        else:
            return "%d:%s/%s" % (self.family, self.address, self.prefix)

    def to_wire(self, file):
        """Write this item to *file* in wire format.

        The output is a 4-byte header packed as ``'!HBB'`` (family, prefix,
        address length with the high bit set when the item is negated)
        followed by the address bytes with trailing zero bytes removed.

        @param file: object with a ``write`` method accepting binary data
        @raises AssertionError: if the truncated address is 128 bytes or
        longer (the length must fit in the low 7 bits of the header byte)
        """
        if self.family == 1:
            address = dns.inet.inet_pton(dns.inet.AF_INET, self.address)
        elif self.family == 2:
            address = dns.inet.inet_pton(dns.inet.AF_INET6, self.address)
        else:
            # Unknown families carry their address as a hex string.
            address = binascii.unhexlify(self.address)
        #
        # Truncate least significant zero bytes.
        #
        # rstrip() is used rather than comparing address[i] with chr(0):
        # indexing bytes yields an int on Python 3, so that comparison was
        # always true there and nothing was ever truncated.
        #
        address = address.rstrip(b'\x00')
        length = len(address)
        assert length < 128
        if self.negation:
            length |= 0x80
        header = struct.pack('!HBB', self.family, self.prefix, length)
        file.write(header)
        file.write(address)
77
class APL(dns.rdata.Rdata):

    """APL record.

    @ivar items: a list of APL items
    @type items: list of APLItem
    @see: RFC 3123"""

    __slots__ = ['items']

    def __init__(self, rdclass, rdtype, items):
        """Initialize the base rdata and keep a reference to *items*."""
        super(APL, self).__init__(rdclass, rdtype)
        self.items = items

    def to_text(self, origin=None, relativize=True, **kw):
        """Return the items' text forms joined by single spaces.

        *origin*, *relativize* and any extra keyword arguments are accepted
        but not used by this method.
        """
        return ' '.join(map(str, self.items))

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an APL record from tokens of the form
        ``[!]family:address/prefix``, reading until end of line or file.

        *origin* and *relativize* are accepted but not used by this method.
        """
        items = []
        while True:
            token = tok.get().unescape()
            if token.is_eol_or_eof():
                break
            text = token.value
            # A leading '!' marks a negated item.
            if text[0] == '!':
                negation = True
                text = text[1:]
            else:
                negation = False
            (family, rest) = text.split(':', 1)
            family = int(family)
            (address, prefix) = rest.split('/', 1)
            prefix = int(prefix)
            items.append(APLItem(family, negation, address, prefix))

        return cls(rdclass, rdtype, items)

    def to_wire(self, file, compress=None, origin=None):
        """Write every item to *file* in order, using each item's own
        ``to_wire``.  *compress* and *origin* are not used by this method.
        """
        for item in self.items:
            item.to_wire(file)

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Parse *rdlen* bytes of *wire* starting at offset *current*.

        Each item is a 4-byte ``'!HBB'`` header (family, prefix, length with
        the high bit used as the negation flag) followed by that many
        address bytes.

        @raises dns.exception.FormError: if fewer than 4 bytes remain for a
        header, or fewer bytes remain than the header's address length
        """
        items = []
        while rdlen != 0:
            if rdlen < 4:
                raise dns.exception.FormError
            header = struct.unpack('!HBB', wire[current: current + 4])
            afdlen = header[2]
            # The high bit of the length byte is the negation flag.
            if afdlen > 127:
                negation = True
                afdlen -= 128
            else:
                negation = False
            current += 4
            rdlen -= 4
            if rdlen < afdlen:
                raise dns.exception.FormError
            address = wire[current: current + afdlen].unwrap()
            l = len(address)
            #
            # Restore the trailing zero bytes that were dropped on the wire.
            # The padding is a bytes literal so that the concatenation also
            # works when the address is a bytes object (Python 3); on
            # Python 2 b'\x00' is the same as '\x00'.
            #
            if header[0] == 1:
                if l < 4:
                    address += b'\x00' * (4 - l)
                address = dns.inet.inet_ntop(dns.inet.AF_INET, address)
            elif header[0] == 2:
                if l < 16:
                    address += b'\x00' * (16 - l)
                address = dns.inet.inet_ntop(dns.inet.AF_INET6, address)
            else:
                #
                # This isn't really right according to the RFC, but it
                # seems better than throwing an exception
                #
                # hexlify() is used instead of .encode('hex_codec'), which
                # bytes objects do not provide on Python 3.  The result is
                # normalised to the native str type so it formats cleanly
                # in text output and can be passed back to unhexlify().
                #
                address = binascii.hexlify(address)
                if not isinstance(address, str):
                    address = address.decode('ascii')
            current += afdlen
            rdlen -= afdlen
            item = APLItem(header[0], negation, address, header[1])
            items.append(item)
        return cls(rdclass, rdtype, items)
162