1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 import struct
17 import base64
18
19 import dns.exception
20 import dns.inet
21 import dns.name
25
26 """IPSECKEY record
27
28 @ivar precedence: the precedence for this key data
29 @type precedence: int
30 @ivar gateway_type: the gateway type
31 @type gateway_type: int
32 @ivar algorithm: the algorithm to use
33 @type algorithm: int
34 @ivar gateway: the gateway, in the form selected by gateway_type
35 @type gateway: None, IPv4 address, IPv6 address, or domain name
36 @ivar key: the public key
37 @type key: string
38 @see: RFC 4025"""
39
# Instance attributes are limited to the five IPSECKEY fields named here.
__slots__ = ['precedence', 'gateway_type', 'algorithm', 'gateway', 'key']
41
42 - def __init__(self, rdclass, rdtype, precedence, gateway_type, algorithm,
43 gateway, key):
65
def to_text(self, origin=None, relativize=True, **kw):
    """Render this record as text.

    The output is the precedence, gateway type and algorithm as decimal
    integers, then the gateway, then the key in base64 form.

    @param origin: passed to the gateway name's choose_relativity() when
    the gateway type is 3; otherwise unused
    @param relativize: passed to choose_relativity() together with origin
    @raises ValueError: the gateway type is not 0, 1, 2 or 3
    @rtype: string
    """
    kind = self.gateway_type
    if kind == 0:
        # No gateway is present; it is written as a lone dot.
        gateway_text = '.'
    elif kind in (1, 2):
        # Address gateways are emitted exactly as stored.
        gateway_text = self.gateway
    elif kind == 3:
        relativized = self.gateway.choose_relativity(origin, relativize)
        gateway_text = str(relativized)
    else:
        raise ValueError('invalid gateway type')
    fields = (self.precedence, kind, self.algorithm, gateway_text,
              dns.rdata._base64ify(self.key))
    return '%d %d %d %s %s' % fields
80
@classmethod
def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
    """Build a record from its text form, read from the tokenizer tok.

    Three values are read with tok.get_uint8() (precedence, gateway type,
    algorithm), then the gateway, then every remaining identifier token up
    to end-of-line or end-of-file. Those tokens are joined and decoded as
    base64 to give the key.

    @param origin: passed to choose_relativity() for a gateway of type 3
    @param relativize: passed to choose_relativity() together with origin
    @raises dns.exception.SyntaxError: a token in the key portion is not
    an identifier
    """
    precedence = tok.get_uint8()
    gateway_type = tok.get_uint8()
    algorithm = tok.get_uint8()
    if gateway_type != 3:
        # Kept as the raw string token for every non-name gateway type.
        gateway = tok.get_string()
    else:
        gateway = tok.get_name().choose_relativity(origin, relativize)
    # The base64 key may be split across several tokens.
    pieces = []
    token = tok.get().unescape()
    while not token.is_eol_or_eof():
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        pieces.append(token.value.encode())
        token = tok.get().unescape()
    key = base64.b64decode(b''.join(pieces))
    return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
               gateway, key)
102
103 - def to_wire(self, file, compress=None, origin=None):
118
@classmethod
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
    """Build a record from wire format.

    The rdata starts with a three-byte header (precedence, gateway type,
    algorithm). The gateway follows: nothing for type 0, 4 address bytes
    for type 1, 16 address bytes for type 2, or a domain name for type 3.
    Whatever remains of the rdata is the key.

    @param wire: the buffer holding the rdata; slices of it must provide
    unwrap()
    @param current: offset in wire at which the rdata starts
    @param rdlen: length of the rdata in bytes
    @param origin: not used by this method
    @raises dns.exception.FormError: the rdata is shorter than the header,
    too short to hold the address for gateway type 1 or 2, or carries an
    unknown gateway type
    """
    if rdlen < 3:
        raise dns.exception.FormError
    header = struct.unpack('!BBB', wire[current: current + 3])
    gateway_type = header[1]
    current += 3
    rdlen -= 3
    if gateway_type == 0:
        gateway = None
    elif gateway_type == 1:
        # Without this check a truncated rdata would take address bytes
        # from beyond its own end and leave rdlen negative.
        if rdlen < 4:
            raise dns.exception.FormError
        gateway = dns.inet.inet_ntop(dns.inet.AF_INET,
                                     wire[current: current + 4])
        current += 4
        rdlen -= 4
    elif gateway_type == 2:
        # Same truncation guard as for the 4-byte address above.
        if rdlen < 16:
            raise dns.exception.FormError
        gateway = dns.inet.inet_ntop(dns.inet.AF_INET6,
                                     wire[current: current + 16])
        current += 16
        rdlen -= 16
    elif gateway_type == 3:
        # The buffer is cut at the end of the rdata before name parsing.
        (gateway, cused) = dns.name.from_wire(wire[: current + rdlen],
                                              current)
        current += cused
        rdlen -= cused
    else:
        raise dns.exception.FormError('invalid IPSECKEY gateway type')
    # Everything left in the rdata is the key.
    key = wire[current: current + rdlen].unwrap()
    return cls(rdclass, rdtype, header[0], gateway_type, header[2],
               gateway, key)
149