import struct
import six
from flextls.exception import NotEnoughData
[docs]class Field(object):
"""
Base class for all fields. Used to extract additional information.
:param String name: Name of the field
:param Mixed default: Default field value
:param String fmt: Format string used to decode the data
"""
def __init__(self, name, default, fmt="H"):
self._value = None
self.set_value(default)
self.name = name
if fmt[0] in "@=<>!":
self.fmt = fmt
else:
self.fmt = "!"+fmt
self.size = struct.calcsize(self.fmt)
[docs] def assemble(self):
"""
Assemble the field by using the given value.
:return: The assembled data
:rtype: bytes
"""
return struct.pack(self.fmt, self.value)
[docs] def dissect(self, data):
"""
Dissect the field.
:param bytes data: The data to extract the field value from
:return: The rest of the data not used to dissect the field value
:rtype: bytes
"""
if len(data) < self.size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
self.value = struct.unpack(self.fmt, data[:self.size])[0]
return data[self.size:]
[docs] def get_value(self):
"""
Return the field value.
:return: The value of the field
:rtype: Mixed
"""
return self._value
[docs] def set_value(self, value):
"""
Set the value of the field
:param Mixed value: The value
"""
self._value = value
value = property(get_value, set_value)
# Numbers
[docs]class UInt8Field(Field):
"""
Field representing an 8-bit unsigned integer value(range: 0 through 255 decimal).
"""
def __init__(self, name, default):
Field.__init__(self, name, default, "B")
[docs]class UInt16Field(Field):
"""
Field representing an 16-bit unsigned integer value(range: 0 through 65535 decimal).
"""
def __init__(self, name, default):
Field.__init__(self, name, default, "H")
[docs]class UInt24Field(Field):
"""
Field representing an 16-bit unsigned integer value.
"""
def __init__(self, name, default):
Field.__init__(self, name, default, "BH")
def assemble(self):
value = (int(self.value / (2**16)), int(self.value % (2**16)))
return struct.pack(self.fmt, *value)
def dissect(self, data):
if len(data) < self.size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
tmp = struct.unpack(self.fmt, data[:self.size])
self.value = (tmp[0] * (2 ** 16)) + tmp[1]
return data[self.size:]
[docs]class UInt48Field(Field):
"""
Field representing an 48-bit unsigned integer value.
"""
def __init__(self, name, default):
Field.__init__(self, name, default, "HI")
def assemble(self):
value = (int(self.value / (2**32)), int(self.value % (2**32)))
return struct.pack(self.fmt, *value)
def dissect(self, data):
if len(data) < self.size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
tmp = struct.unpack(self.fmt, data[:self.size])
self.value = (tmp[0] * (2 ** 32)) + tmp[1]
return data[self.size:]
[docs]class RandomField(Field):
"""
Random data.
"""
def __init__(self, name):
Field.__init__(self, name, default=b"A"*32, fmt="32s")
# Enums
[docs]class EnumField(Field):
"""
The field should only use the defined values.
:param String name: The name of the field
:param Mixed default: A value defined in the enums list
:param Dict enums: List of possible values.
:param String fmt: The format string
"""
def __init__(self, name, default, enums, fmt="H"):
self.enums = enums
Field.__init__(self, name, default, fmt)
[docs] def get_value_name(self, pretty=False):
"""
Get the name of the value
:param Boolean pretty: Return the name in a pretty format
:return: The name
:rtype: String
"""
if pretty:
return "%s (%x)" % (
self.enums.get(self._value, "n/a"),
self._value
)
return self.enums.get(self._value, "n/a")
[docs] def set_value(self, value, force=False):
"""
Set the value.
:param String|Integer value: The value to set. Must be in the enum list.
:param Boolean force: Set the value without checking it
:raises ValueError: If value name given but it isn't available
:raises TypeError: If value is not String or Integer
"""
if force:
self._value = value
return
if value is None:
self._value = value
return
if isinstance(value, six.integer_types):
self._value = value
return
if isinstance(value, six.string_types):
for v, n in self.enums.items():
if n == value:
self._value = v
return
raise ValueError("Unable to find value name in enum list")
raise TypeError(
"Value for '%s' must by of type String or Integer not '%s'" % (
self.name,
type(value)
)
)
value = property(Field.get_value, set_value)
[docs]class UInt8EnumField(EnumField):
"""
The field should only use the defined values. The value must be an 8-Bit unsigned integer.
:param String name: The name of the field
:param Mixed default: A value defined in the enums list
:param Dict enums: List of possible values.
"""
def __init__(self, name, default, enums):
EnumField.__init__(self, name, default, enums, "B")
[docs]class UInt16EnumField(EnumField):
"""
The field should only use the defined values. The value must be an 16-Bit unsigned integer.
:param String name: The name of the field
:param Mixed default: A value defined in the enums list
:param Dict enums: List of possible values.
"""
def __init__(self, name, default, enums):
EnumField.__init__(self, name, default, enums, "H")
# Vectors
[docs]class VectorListBaseField(object):
"""
A vector as defined by the RFC is a single dimensioned array.
:param String name: The name of the field
:param flextls.field.Field item_class:
:param List item_class_args:
:param String fmt: The format string
"""
def __init__(self, name, item_class=None, item_class_args=None, fmt="H"):
self.name = name
self.item_class = item_class
if item_class_args is None:
item_class_args = []
self.item_class_args = item_class_args
self.items = []
if fmt[0] in "@=<>!":
self.fmt = fmt
else:
self.fmt = "!"+fmt
def assemble(self):
data = b""
for item in self.items:
data = data + item.assemble()
return struct.pack(self.fmt, len(data)) + data
def dissect(self, data):
len_size = struct.calcsize(self.fmt)
if len(data) < len_size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
payload_size = struct.unpack(self.fmt, data[:len_size])[0]
data = data[len_size:]
if len(data) < payload_size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
payload_data = data[:payload_size]
while len(payload_data) > 0:
item = self.item_class(*self.item_class_args)
payload_data = item.dissect(payload_data)
self.items.append(item)
return data[payload_size:]
@property
def size(self):
size = struct.calcsize(self.fmt)
for item in self.items:
size = size + item.size
return size
@property
def value(self):
return self.items
[docs]class VectorListUInt8Field(VectorListBaseField):
"""
A vector as defined by the RFC is a single dimensioned array.
The length identifier of this vector is a 8-bit unsigned integer.
:param String name: The name of the field
:param flextls.field.Field item_class:
:param List item_class_args:
:param String fmt: The format string of the length identifier
"""
def __init__(self, name, item_class=None, item_class_args=None):
VectorListBaseField.__init__(self, name, item_class, item_class_args, fmt="B")
[docs]class VectorListUInt16Field(VectorListBaseField):
"""
A vector as defined by the RFC is a single dimensioned array.
The length identifier of this vector is a 16-bit unsigned integer.
:param String name: The name of the field
:param flextls.field.Field item_class:
:param List item_class_args:
:param String fmt: The format string of the length identifier
"""
def __init__(self, name, item_class=None, item_class_args=None):
VectorListBaseField.__init__(self, name, item_class, item_class_args, fmt="H")
[docs]class VectorListInt24Field(VectorListBaseField):
"""
A vector as defined by the RFC is a single dimensioned array.
The length identifier of this vector is a 24-bit unsigned integer.
:param String name: The name of the field
:param flextls.field.Field item_class:
:param List item_class_args:
:param String fmt: The format string of the length identifier
"""
def __init__(self, name, item_class=None, item_class_args=None):
VectorListBaseField.__init__(self, name, item_class, item_class_args, fmt="BH")
def assemble(self):
data = b""
for item in self.items:
data = data + item.assemble()
value = (int(self.value / (2**16)), int(self.value % (2**16)))
return struct.pack(self.fmt, *value) + data
def dissect(self, data):
if len(data) < self.size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
tmp = struct.unpack(self.fmt, data[:self.size])
payload_length = (tmp[0] * (2 ** 16)) + tmp[1]
data = data[self.size:]
if len(data) < payload_length:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
payload_data = data[:payload_length]
while len(payload_data) > 0:
item = self.item_class()
payload_data = item.dissect(payload_data)
self.items.append(item)
return data[payload_length:]
[docs]class CertificateListField(VectorListInt24Field):
"""
List of certificates
:param String name: The name of the field
"""
def __init__(self, name):
VectorListInt24Field.__init__(
self,
name,
CertificateField,
)
[docs]class CipherSuitesField(VectorListUInt16Field):
"""
List of cipher suites.
:param String name: The name of the field
"""
def __init__(self, name):
VectorListUInt16Field.__init__(
self,
name,
CipherSuiteField,
)
[docs]class ServerNameListField(VectorListUInt16Field):
"""
List of server names
:param String name: The name of the field
"""
def __init__(self, name):
VectorListUInt16Field.__init__(
self,
name,
ServerNameField,
)
[docs]class ExtensionsField(VectorListUInt16Field):
"""
List of extensions
:param String name: The name of the field
"""
def __init__(self, name):
from flextls.protocol.handshake.extension import Extension
VectorListUInt16Field.__init__(
self,
name,
Extension
)
def assemble(self):
if len(self.items) == 0:
return b""
return VectorListUInt16Field.assemble(self)
def dissect(self, data):
if len(data) == 0:
return data
return VectorListUInt16Field.dissect(self, data)
[docs]class CompressionMethodsField(VectorListUInt8Field):
"""
List of compression methods
:param String name: The name of the field
"""
def __init__(self, name):
VectorListUInt8Field.__init__(
self,
name,
CompressionMethodField,
)
[docs]class VectorBaseField(object):
"""
A vector as defined by the RFC is a single dimensioned array.
:param String name: The name of the field
:param Bytes default: Default value of the field
:param String fmt: The format string of the length identifier
"""
def __init__(self, name, default=b"", fmt="H", connection=None):
self.name = name
self.value = default
if fmt[0] in "@=<>!":
self.fmt = fmt
else:
self.fmt = "!"+fmt
def assemble(self):
data = self.value
if data is None:
data = b""
return struct.pack(self.fmt, len(data)) + data
def dissect(self, data):
len_size = struct.calcsize(self.fmt)
if len(data) < len_size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
value_size = struct.unpack(self.fmt, data[:len_size])[0]
data = data[len_size:]
self.value = data[:value_size]
return data[value_size:]
@property
def size(self):
size = struct.calcsize(self.fmt)
if self.value is not None:
size = size + len(self.value)
return size
[docs]class VectorUInt8Field(VectorBaseField):
"""
A vector as defined by the RFC is a single dimensioned array.
The length identifier of this vector is a 8-bit unsigned integer.
:param String name: The name of the field
:param String fmt: The format string of the length identifier
"""
def __init__(self, name):
VectorBaseField.__init__(self, name, fmt="B")
[docs]class VectorUInt16Field(VectorBaseField):
"""
A vector as defined by the RFC is a single dimensioned array.
The length identifier of this vector is a 16-bit unsigned integer.
:param String name: The name of the field
:param String fmt: The format string of the length identifier
"""
def __init__(self, name):
VectorBaseField.__init__(self, name, fmt="H")
[docs]class VectorInt24Field(VectorBaseField):
"""
A vector as defined by the RFC is a single dimensioned array.
The length identifier of this vector is a 24-bit unsigned integer.
:param String name: The name of the field
:param String fmt: The format string of the length identifier
"""
def __init__(self, name):
VectorBaseField.__init__(self, name, fmt="BH")
def assemble(self):
data_length = len(self.value)
len_value = (int(data_length / (2**16)), int(data_length % (2**16)))
return struct.pack(self.fmt, *len_value) + self.value
def dissect(self, data):
if len(data) < self.size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
tmp = struct.unpack(self.fmt, data[:self.size])
data_length = (tmp[0] * (2 ** 16)) + tmp[1]
data = data[self.size:]
if len(data) < data_length:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
self.value = data[:data_length]
return data[data_length:]
[docs]class CertificateField(VectorInt24Field):
"""
A certificate.
:param String name: The name of the field
"""
def __init__(self, name="certificate"):
VectorInt24Field.__init__(self, name)
[docs]class HostNameField(VectorUInt16Field):
"""
The hostname.
"""
pass
# Multipart
[docs]class MultiPartField(object):
"""
A field consisting of more than one value.
:param String name: The name of the field
:param fields: List of sub fields
"""
payload_list = None
def __init__(self, name, fields=[]):
self.fields = []
self.name = name
self.fields = fields
self.payload_identifier_field = None
self.payload_length_field = None
def __getattr__(self, name):
return self.get_field_value(name)
def __setattr__(self, name, value):
if name == "fields":
object.__setattr__(self, name, value)
return
for field in self.fields:
if field.name == name:
field.value = value
return
object.__setattr__(self, name, value)
@classmethod
def add_payload_type(cls, pattern, payload_class):
if cls.payload_list is None:
cls.payload_list = {}
cls.payload_list[pattern] = payload_class
def assemble(self):
data = b""
payload = b""
if not isinstance(self.payload, bytes) and self.payload is not None:
payload = self.payload.assemble()
if self.payload_identifier_field is not None:
for pay_pattern, pay_class in self.payload_list.items():
if isinstance(self.payload, pay_class):
self.set_field_value(
self.payload_identifier_field,
pay_pattern
)
break
elif self.payload is not None:
payload = self.payload
if self.payload_length_field is not None and payload is not None:
self.set_field_value(
self.payload_length_field,
len(payload)
)
for field in self.fields:
data = data + field.assemble()
data = data + payload
return data
def dissect(self, data):
for field in self.fields:
data = field.dissect(data)
if self.payload_identifier_field is not None:
if self.payload_length_field is None:
payload_data = data
data = data[:0]
else:
payload_length = self.get_field_value(self.payload_length_field)
payload_data = data[:payload_length]
data = data[payload_length:]
payload_class = None
if self.payload_list is not None:
payload_class = self.payload_list.get(
self.get_field_value(self.payload_identifier_field),
None
)
if payload_class is None:
self.payload = payload_data
else:
obj = payload_class("onknown")
payload_data = obj.dissect(payload_data)
self.payload = obj
return data
def get_field_value(self, name):
for field in self.fields:
if field.name == name:
return field.value
def set_field_value(self, name, value):
for field in self.fields:
if field.name == name:
field.value = value
@property
def value(self):
return self
[docs]class ServerNameField(MultiPartField):
"""
The server name
"""
def __init__(self, name="test", **kwargs):
MultiPartField.__init__(self, name, **kwargs)
self.fields = [
UInt8EnumField(
"name_type",
None,
{
0: "host_name",
255: None
}
),
]
self.payload_identifier_field = "name_type"
ServerNameField.add_payload_type(0, HostNameField)
[docs]class VersionField(MultiPartField):
"""
The protocol version field.
:param String name: Name of the field
"""
def __init__(self, name):
MultiPartField.__init__(
self,
name,
[
UInt8Field("major", 3),
UInt8Field("minor", 0)
]
)
[docs]class SignatureAndHashAlgorithmField(MultiPartField):
"""
Representing a signature and hash algorithm
"""
def __init__(self, name):
MultiPartField.__init__(
self,
name,
[
UInt8Field("hash", 0),
UInt8Field("signature", 0)
]
)
[docs]class ServerDHParamsField(MultiPartField):
"""
RFC5246 Section 7.4.3. Server Key Exchange Message
"""
def __init__(self, name):
MultiPartField.__init__(
self,
name,
[
VectorUInt16Field("dh_p"),
VectorUInt16Field("dh_g"),
VectorUInt16Field("dh_Ys")
]
)
[docs]class ServerECDHParamsField(MultiPartField):
"""
RFC4492 ECC Cipher Suites for TLS
"""
def __init__(self, name):
MultiPartField.__init__(
self,
name,
[
ECParametersField("curve_params", None),
ECPointField("public")
]
)
class ECParametersField(Field):
def dissect(self, data):
"""
Dissect the field.
:param bytes data: The data to extract the field value from
:return: The rest of the data not used to dissect the field value
:rtype: bytes
"""
size = struct.calcsize("B")
if len(data) < size:
raise NotEnoughData(
"Not enough data to decode field '%s' value" % self.name
)
curve_type = struct.unpack("B", data[:size])[0]
if curve_type == 0x03:
self._value = ECParametersNamedCurveField("none")
data = self._value.dissect(data)
else:
raise NotImplementedError(
"Decoding of KeyExchange message for curve 0x%.2X not implemented" % curve_type
)
return data
[docs]class ECParametersNamedCurveField(MultiPartField):
"""
RFC4492 ECC Cipher Suites for TLS
"""
def __init__(self, name):
MultiPartField.__init__(
self,
name,
[
UInt8Field("curve_type", 0x03),
UInt16Field("namedcurve", 0)
]
)
[docs]class ECPointField(VectorUInt8Field):
"""
RFC4492 ECC Cipher Suites for TLS
"""
pass
# Custom
[docs]class CipherSuiteField(UInt16Field):
"""
A cipher suite
"""
def __init__(self, name="unnamed"):
UInt16Field.__init__(self, name, None)
[docs]class SSLv2CipherSuiteField(UInt24Field):
"""
A cipher suite for SSLv2
"""
def __init__(self, name="unnamed"):
UInt24Field.__init__(self, name, None)
[docs]class CompressionMethodField(UInt8Field):
"""
Compression method
"""
def __init__(self, name="unnamed"):
UInt8Field.__init__(self, name, None)