| Server IP : 61.19.30.66 / Your IP : 216.73.216.15 Web Server : Apache/2.2.22 (Ubuntu) System : Linux klw 3.11.0-15-generic #25~precise1-Ubuntu SMP Thu Jan 30 17:39:31 UTC 2014 x86_64 User : www-data ( 33) PHP Version : 5.3.10-1ubuntu3.48 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, MySQL : ON | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /usr/share/pyshared/twisted/test/ |
Upload File : |
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Test cases for twisted.protocols package.
"""
import struct
from zope.interface.verify import verifyObject
from twisted.trial import unittest
from twisted.protocols import basic, wire, portforward
from twisted.internet import reactor, protocol, defer, task, error, address
from twisted.internet.interfaces import IProtocolFactory, ILoggingContext
from twisted.test import proto_helpers
class LineTester(basic.LineReceiver):
    """
    A line receiver which records everything delivered to it and reacts to a
    handful of command tokens found in the lines it parses.

    @type delimiter: C{str}
    @ivar delimiter: the character separating received lines.

    @type MAX_LENGTH: C{int}
    @ivar MAX_LENGTH: the line size at which C{lineLengthExceeded} will be
        called.

    @type clock: L{twisted.internet.task.Clock}
    @ivar clock: a clock simulating the reactor's C{callLater}.  Pass one to
        the constructor in order to use the pause/rawpause functionality.
    """
    delimiter = '\n'
    MAX_LENGTH = 64

    def __init__(self, clock=None):
        """
        Remember C{clock}, if one is given, for later C{callLater} calls.
        """
        self.clock = clock

    def connectionMade(self):
        """
        Start the connection with an empty list of received data.
        """
        self.received = []

    def lineReceived(self, line):
        """
        Record C{line}, then act on it when it is one of the recognized
        tokens: an empty line, pause, rawpause, stop, len, produce or
        unproduce.
        """
        self.received.append(line)

        if line == '':
            # An empty line announces raw data.
            self.setRawMode()
            return

        if line == 'pause':
            self.pauseProducing()
            self.clock.callLater(0, self.resumeProducing)
            return

        if line == 'rawpause':
            self.pauseProducing()
            self.setRawMode()
            # Placeholder which rawDataReceived fills in once resumed.
            self.received.append('')
            self.clock.callLater(0, self.resumeProducing)
            return

        if line == 'stop':
            self.stopProducing()
            return

        if line.startswith('len '):
            self.length = int(line[4:])
            return

        if line.startswith('produce'):
            self.transport.registerProducer(self, False)
            return

        if line.startswith('unproduce'):
            self.transport.unregisterProducer()

    def rawDataReceived(self, data):
        """
        Consume raw data until the amount announced by an earlier 'len' line
        has been read, then go back to line mode with whatever is left over.
        """
        wanted = self.length
        payload = data[:wanted]
        leftover = data[wanted:]
        self.length = wanted - len(payload)
        # Raw data is accumulated onto the most recently recorded entry.
        self.received[-1] = self.received[-1] + payload
        if self.length == 0:
            self.setLineMode(leftover)

    def lineLengthExceeded(self, line):
        """
        Resume line mode with whatever follows the over-long part of C{line}.
        """
        cutoff = self.MAX_LENGTH + 1
        if len(line) > cutoff:
            self.setLineMode(line[cutoff:])
class LineOnlyTester(basic.LineOnlyReceiver):
    """
    A line only receiver which keeps every line it is given.
    """
    MAX_LENGTH = 64
    delimiter = '\n'

    def connectionMade(self):
        """
        Start the connection with an empty list of received lines.
        """
        self.received = []

    def lineReceived(self, line):
        """
        Remember C{line}.
        """
        lines = self.received
        lines.append(line)
class FactoryTests(unittest.TestCase):
    """
    Tests for L{protocol.Factory}.
    """

    def test_interfaces(self):
        """
        An instance of L{protocol.Factory} provides L{IProtocolFactory} as
        well as L{ILoggingContext}.
        """
        factory = protocol.Factory()
        for interface in (IProtocolFactory, ILoggingContext):
            self.assertTrue(verifyObject(interface, factory))

    def test_logPrefix(self):
        """
        The name of the factory class is what
        L{protocol.Factory.logPrefix} returns.
        """
        class SomeKindOfFactory(protocol.Factory):
            pass

        prefix = SomeKindOfFactory().logPrefix()
        self.assertEqual("SomeKindOfFactory", prefix)
class WireTestCase(unittest.TestCase):
    """
    Tests for the protocols in L{wire}.
    """

    def test_echo(self):
        """
        L{wire.Echo} writes back everything it receives.
        """
        transport = proto_helpers.StringTransport()
        echo = wire.Echo()
        echo.makeConnection(transport)
        for chunk in ("hello", "world", "how", "are", "you"):
            echo.dataReceived(chunk)
        self.assertEqual(transport.value(), "helloworldhowareyou")

    def test_who(self):
        """
        L{wire.Who} writes its user line as soon as it is connected.
        """
        transport = proto_helpers.StringTransport()
        wire.Who().makeConnection(transport)
        self.assertEqual(transport.value(), "root\r\n")

    def test_QOTD(self):
        """
        L{wire.QOTD} writes its quote as soon as it is connected.
        """
        transport = proto_helpers.StringTransport()
        wire.QOTD().makeConnection(transport)
        written = transport.value()
        self.assertEqual(written,
                         "An apple a day keeps the doctor away.\r\n")

    def test_discard(self):
        """
        L{wire.Discard} writes nothing, whatever it receives.
        """
        transport = proto_helpers.StringTransport()
        discard = wire.Discard()
        discard.makeConnection(transport)
        for chunk in ("hello", "world", "how", "are", "you"):
            discard.dataReceived(chunk)
        self.assertEqual(transport.value(), "")
class LineReceiverTestCase(unittest.TestCase):
    """
    Test LineReceiver, using the C{LineTester} wrapper.
    """
    # The empty lines are significant: LineTester only switches to raw mode
    # when it receives an empty line, and C{output} below expects the raw
    # payloads announced by each 'len' line.  The literal is spelled with
    # explicit newlines so the empty lines cannot be lost to whitespace
    # stripping.
    buffer = (
        'len 10\n'
        '\n'
        '0123456789len 5\n'
        '\n'
        '1234\n'
        'len 20\n'
        'foo 123\n'
        '\n'
        '0123456789\n'
        '012345678len 0\n'
        'foo 5\n'
        '\n'
        '1234567890123456789012345678901234567890123456789012345678901234567890\n'
        'len 1\n'
        '\n'
        'a')

    output = ['len 10', '0123456789', 'len 5', '1234\n',
              'len 20', 'foo 123', '0123456789\n012345678',
              'len 0', 'foo 5', '', '67890', 'len 1', 'a']

    def _deliverInChunks(self, receiver, data, packetSize):
        """
        Pass C{data} to C{receiver.dataReceived} in consecutive slices of
        C{packetSize} characters.

        The number of slices is C{len(data) // packetSize + 1}, so a final
        empty slice is delivered when C{packetSize} divides C{len(data)}.
        Floor division is spelled explicitly so the count is an integer
        whatever the meaning of C{/} is.
        """
        for index in range(len(data) // packetSize + 1):
            start = index * packetSize
            receiver.dataReceived(data[start:start + packetSize])

    def testBuffer(self):
        """
        Test buffering for different packet size, checking received matches
        expected data.
        """
        for packet_size in range(1, 10):
            t = proto_helpers.StringIOWithoutClosing()
            a = LineTester()
            a.makeConnection(protocol.FileWrapper(t))
            self._deliverInChunks(a, self.buffer, packet_size)
            self.assertEqual(self.output, a.received)

    pause_buf = 'twiddle1\ntwiddle2\npause\ntwiddle3\n'

    pause_output1 = ['twiddle1', 'twiddle2', 'pause']
    pause_output2 = pause_output1+['twiddle3']

    def test_pausing(self):
        """
        Test pause inside data receiving. It uses fake clock to see if
        pausing/resuming work.
        """
        for packet_size in range(1, 10):
            t = proto_helpers.StringIOWithoutClosing()
            clock = task.Clock()
            a = LineTester(clock)
            a.makeConnection(protocol.FileWrapper(t))
            self._deliverInChunks(a, self.pause_buf, packet_size)
            # Nothing after 'pause' is delivered until the clock runs the
            # scheduled resumeProducing call.
            self.assertEqual(self.pause_output1, a.received)
            clock.advance(0)
            self.assertEqual(self.pause_output2, a.received)

    rawpause_buf = 'twiddle1\ntwiddle2\nlen 5\nrawpause\n12345twiddle3\n'

    rawpause_output1 = ['twiddle1', 'twiddle2', 'len 5', 'rawpause', '']
    rawpause_output2 = ['twiddle1', 'twiddle2', 'len 5', 'rawpause', '12345',
                        'twiddle3']

    def test_rawPausing(self):
        """
        Test pause inside raw data receiving.
        """
        for packet_size in range(1, 10):
            t = proto_helpers.StringIOWithoutClosing()
            clock = task.Clock()
            a = LineTester(clock)
            a.makeConnection(protocol.FileWrapper(t))
            self._deliverInChunks(a, self.rawpause_buf, packet_size)
            self.assertEqual(self.rawpause_output1, a.received)
            clock.advance(0)
            self.assertEqual(self.rawpause_output2, a.received)

    stop_buf = 'twiddle1\ntwiddle2\nstop\nmore\nstuff\n'

    stop_output = ['twiddle1', 'twiddle2', 'stop']

    def test_stopProducing(self):
        """
        Test stop inside producing.
        """
        for packet_size in range(1, 10):
            t = proto_helpers.StringIOWithoutClosing()
            a = LineTester()
            a.makeConnection(protocol.FileWrapper(t))
            self._deliverInChunks(a, self.stop_buf, packet_size)
            self.assertEqual(self.stop_output, a.received)

    def test_lineReceiverAsProducer(self):
        """
        Test produce/unproduce in receiving.
        """
        a = LineTester()
        t = proto_helpers.StringIOWithoutClosing()
        a.makeConnection(protocol.FileWrapper(t))
        a.dataReceived('produce\nhello world\nunproduce\ngoodbye\n')
        self.assertEqual(a.received,
                         ['produce', 'hello world', 'unproduce', 'goodbye'])

    def test_clearLineBuffer(self):
        """
        L{LineReceiver.clearLineBuffer} removes all buffered data and returns
        it as a C{str} and can be called from beneath C{dataReceived}.
        """
        class ClearingReceiver(basic.LineReceiver):
            def lineReceived(self, line):
                self.line = line
                self.rest = self.clearLineBuffer()

        # Not named "protocol", so the imported module of that name is not
        # shadowed inside this test.
        receiver = ClearingReceiver()
        receiver.dataReceived('foo\r\nbar\r\nbaz')
        self.assertEqual(receiver.line, 'foo')
        self.assertEqual(receiver.rest, 'bar\r\nbaz')

        # Deliver another line to make sure the previously buffered data is
        # really gone.
        receiver.dataReceived('quux\r\n')
        self.assertEqual(receiver.line, 'quux')
        self.assertEqual(receiver.rest, '')
class LineOnlyReceiverTestCase(unittest.TestCase):
    """
    Tests for the line only receiver, using the C{LineOnlyTester} wrapper.
    """
    buffer = 'foo\nbleakness\ndesolation\nplastic forks\n'

    def test_buffer(self):
        """
        Lines delivered one character at a time are received exactly as they
        appear in the buffer.
        """
        transport = proto_helpers.StringTransport()
        receiver = LineOnlyTester()
        receiver.makeConnection(transport)
        for character in self.buffer:
            receiver.dataReceived(character)
        # The buffer ends with a delimiter, so the last split element is
        # not a line.
        expected = self.buffer.split('\n')[:-1]
        self.assertEqual(receiver.received, expected)

    def test_lineTooLong(self):
        """
        Delivering a line which is too long makes C{dataReceived} return an
        L{error.ConnectionLost} instance.
        """
        transport = proto_helpers.StringTransport()
        receiver = LineOnlyTester()
        receiver.makeConnection(transport)
        result = receiver.dataReceived('x'*200)
        self.assertIsInstance(result, error.ConnectionLost)
class TestMixin:
    """
    Mixin for string receiving protocols which records every received string
    and whether the connection has been lost.
    """
    # Class-level defaults, overridden per instance where needed.
    MAX_LENGTH = 50
    closed = 0

    def connectionMade(self):
        """
        Start the connection with an empty list of received strings.
        """
        self.received = []

    def stringReceived(self, s):
        """
        Remember the received string C{s}.
        """
        collected = self.received
        collected.append(s)

    def connectionLost(self, reason):
        """
        Note that the connection is gone; C{reason} is ignored.
        """
        self.closed = 1
class TestNetstring(TestMixin, basic.NetstringReceiver):
    """
    A L{basic.NetstringReceiver} which records each received string and
    writes it back to its transport.
    """

    def stringReceived(self, s):
        """
        Record C{s} the way L{TestMixin} does, then write it to the transport.
        """
        TestMixin.stringReceived(self, s)
        self.transport.write(s)
class LPTestCaseMixin:
    """
    Mixin providing a connected-protocol helper and a test for illegal input.
    Users set C{protocol} and C{illegalStrings}.
    """
    illegalStrings = []
    protocol = None

    def getProtocol(self):
        """
        Return a new instance of C{self.protocol} connected to a new instance
        of L{proto_helpers.StringTransport}.
        """
        transport = proto_helpers.StringTransport()
        instance = self.protocol()
        instance.makeConnection(transport)
        return instance

    def test_illegal(self):
        """
        Each illegal string, delivered one character at a time to a fresh
        protocol, leaves that protocol's transport disconnecting.
        """
        for illegal in self.illegalStrings:
            receiver = self.getProtocol()
            for character in illegal:
                receiver.dataReceived(character)
            self.assertTrue(receiver.transport.disconnecting)
class NetstringReceiverTestCase(unittest.TestCase, LPTestCaseMixin):
    """
    Tests for L{basic.NetstringReceiver}, exercised through the
    L{TestNetstring} wrapper.
    """
    strings = ['hello', 'world', 'how', 'are', 'you123', ':today', "a"*515]

    illegalStrings = [
        '9999999999999999999999', 'abc', '4:abcde',
        '51:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaab,',]

    protocol = TestNetstring

    def setUp(self):
        """
        Create a L{TestNetstring} connected to a fresh
        L{proto_helpers.StringTransport}.
        """
        self.transport = proto_helpers.StringTransport()
        self.netstringReceiver = TestNetstring()
        self.netstringReceiver.makeConnection(self.transport)

    def test_buffer(self):
        """
        Strings can be received in chunks of different lengths.
        """
        for packet_size in range(1, 10):
            t = proto_helpers.StringTransport()
            a = TestNetstring()
            a.MAX_LENGTH = 699
            a.makeConnection(t)
            for s in self.strings:
                a.sendString(s)
            out = t.value()
            # Stepping through the output yields exactly the non-empty
            # packet_size slices, without relying on "/" producing an
            # integer.
            for start in range(0, len(out), packet_size):
                a.dataReceived(out[start:start + packet_size])
            self.assertEqual(a.received, self.strings)

    def test_sendNonStrings(self):
        """
        L{basic.NetstringReceiver.sendString} will send objects that are not
        strings by sending their string representation according to str().
        """
        nonStrings = [ [], { 1 : 'a', 2 : 'b' }, ['a', 'b', 'c'], 673,
                       (12, "fine", "and", "you?") ]
        a = TestNetstring()
        t = proto_helpers.StringTransport()
        a.MAX_LENGTH = 100
        a.makeConnection(t)
        # sendString is called directly from this function so that the
        # warnings can be flushed with this test as the offending function.
        for s in nonStrings:
            a.sendString(s)
            out = t.value()
            t.clear()
            colon = out.find(":")
            length = out[:colon]
            data = out[colon + 1:-1] #[:-1] to ignore the trailing ","
            self.assertEqual(int(length), len(str(s)))
            self.assertEqual(data, str(s))

        warnings = self.flushWarnings(
            offendingFunctions=[self.test_sendNonStrings])
        self.assertEqual(len(warnings), 5)
        self.assertEqual(
            warnings[0]["message"],
            "Data passed to sendString() must be a string. Non-string support "
            "is deprecated since Twisted 10.0")
        self.assertEqual(
            warnings[0]['category'],
            DeprecationWarning)

    def test_receiveEmptyNetstring(self):
        """
        Empty netstrings (with length '0') can be received.
        """
        self.netstringReceiver.dataReceived("0:,")
        self.assertEqual(self.netstringReceiver.received, [""])

    def test_receiveOneCharacter(self):
        """
        One-character netstrings can be received.
        """
        self.netstringReceiver.dataReceived("1:a,")
        self.assertEqual(self.netstringReceiver.received, ["a"])

    def test_receiveTwoCharacters(self):
        """
        Two-character netstrings can be received.
        """
        self.netstringReceiver.dataReceived("2:ab,")
        self.assertEqual(self.netstringReceiver.received, ["ab"])

    def test_receiveNestedNetstring(self):
        """
        Netstrings with embedded netstrings. This test makes sure that
        the parser does not become confused about the ',' and ':'
        characters appearing inside the data portion of the netstring.
        """
        self.netstringReceiver.dataReceived("4:1:a,,")
        self.assertEqual(self.netstringReceiver.received, ["1:a,"])

    def test_moreDataThanSpecified(self):
        """
        Netstrings containing more data than expected are refused.
        """
        self.netstringReceiver.dataReceived("2:aaa,")
        self.assertTrue(self.transport.disconnecting)

    def test_moreDataThanSpecifiedBorderCase(self):
        """
        Netstrings that should be empty according to their length
        specification are refused if they contain data.
        """
        self.netstringReceiver.dataReceived("0:a,")
        self.assertTrue(self.transport.disconnecting)

    def test_missingNumber(self):
        """
        Netstrings without leading digits that specify the length
        are refused.
        """
        self.netstringReceiver.dataReceived(":aaa,")
        self.assertTrue(self.transport.disconnecting)

    def test_missingColon(self):
        """
        Netstrings without a colon between length specification and
        data are refused.
        """
        self.netstringReceiver.dataReceived("3aaa,")
        self.assertTrue(self.transport.disconnecting)

    def test_missingNumberAndColon(self):
        """
        Netstrings that have no leading digits nor a colon are
        refused.
        """
        self.netstringReceiver.dataReceived("aaa,")
        self.assertTrue(self.transport.disconnecting)

    def test_onlyData(self):
        """
        Netstrings consisting only of data are refused.
        """
        self.netstringReceiver.dataReceived("aaa")
        self.assertTrue(self.transport.disconnecting)

    def test_receiveNetstringPortions_1(self):
        """
        Netstrings can be received in two portions.
        """
        self.netstringReceiver.dataReceived("4:aa")
        self.netstringReceiver.dataReceived("aa,")
        self.assertEqual(self.netstringReceiver.received, ["aaaa"])
        self.assertTrue(self.netstringReceiver._payloadComplete())

    def test_receiveNetstringPortions_2(self):
        """
        Netstrings can be received in more than two portions, even if
        the length specification is split across two portions.
        """
        for part in ["1", "0:01234", "56789", ","]:
            self.netstringReceiver.dataReceived(part)
        self.assertEqual(self.netstringReceiver.received, ["0123456789"])

    def test_receiveNetstringPortions_3(self):
        """
        Netstrings can be received one character at a time.
        """
        for part in "2:ab,":
            self.netstringReceiver.dataReceived(part)
        self.assertEqual(self.netstringReceiver.received, ["ab"])

    def test_receiveTwoNetstrings(self):
        """
        A stream of two netstrings can be received in two portions,
        where the first portion contains the complete first netstring
        and the length specification of the second netstring.
        """
        self.netstringReceiver.dataReceived("1:a,1")
        self.assertTrue(self.netstringReceiver._payloadComplete())
        self.assertEqual(self.netstringReceiver.received, ["a"])
        self.netstringReceiver.dataReceived(":b,")
        self.assertEqual(self.netstringReceiver.received, ["a", "b"])

    def test_maxReceiveLimit(self):
        """
        Netstrings with a length specification exceeding the specified
        C{MAX_LENGTH} are refused.
        """
        tooLong = self.netstringReceiver.MAX_LENGTH + 1
        self.netstringReceiver.dataReceived("%s:%s" %
                                            (tooLong, "a" * tooLong))
        self.assertTrue(self.transport.disconnecting)

    def test_consumeLength(self):
        """
        C{_consumeLength} returns the expected length of the
        netstring, including the trailing comma.
        """
        self.netstringReceiver._remainingData = "12:"
        self.netstringReceiver._consumeLength()
        self.assertEqual(self.netstringReceiver._expectedPayloadSize, 13)

    def test_consumeLengthBorderCase1(self):
        """
        C{_consumeLength} works as expected if the length specification
        contains the value of C{MAX_LENGTH} (border case).
        """
        self.netstringReceiver._remainingData = "12:"
        self.netstringReceiver.MAX_LENGTH = 12
        self.netstringReceiver._consumeLength()
        self.assertEqual(self.netstringReceiver._expectedPayloadSize, 13)

    def test_consumeLengthBorderCase2(self):
        """
        C{_consumeLength} raises a L{basic.NetstringParseError} if
        the length specification exceeds the value of C{MAX_LENGTH}
        by 1 (border case).
        """
        self.netstringReceiver._remainingData = "12:"
        self.netstringReceiver.MAX_LENGTH = 11
        self.assertRaises(basic.NetstringParseError,
                          self.netstringReceiver._consumeLength)

    def test_consumeLengthBorderCase3(self):
        """
        C{_consumeLength} raises a L{basic.NetstringParseError} if
        the length specification exceeds the value of C{MAX_LENGTH}
        by more than 1.
        """
        self.netstringReceiver._remainingData = "1000:"
        self.netstringReceiver.MAX_LENGTH = 11
        self.assertRaises(basic.NetstringParseError,
                          self.netstringReceiver._consumeLength)

    def test_deprecatedModuleAttributes(self):
        """
        Accessing one of the old module attributes used by the
        NetstringReceiver parser emits a deprecation warning.
        """
        # The bare attribute accesses are the behavior under test; they stay
        # in this function so the warnings are attributed to it.
        basic.LENGTH, basic.DATA, basic.COMMA, basic.NUMBER
        warnings = self.flushWarnings(
            offendingFunctions=[self.test_deprecatedModuleAttributes])

        self.assertEqual(len(warnings), 4)
        for warning in warnings:
            self.assertEqual(warning['category'], DeprecationWarning)
        self.assertEqual(
            warnings[0]['message'],
            ("twisted.protocols.basic.LENGTH was deprecated in Twisted 10.2.0: "
             "NetstringReceiver parser state is private."))
        self.assertEqual(
            warnings[1]['message'],
            ("twisted.protocols.basic.DATA was deprecated in Twisted 10.2.0: "
             "NetstringReceiver parser state is private."))
        self.assertEqual(
            warnings[2]['message'],
            ("twisted.protocols.basic.COMMA was deprecated in Twisted 10.2.0: "
             "NetstringReceiver parser state is private."))
        self.assertEqual(
            warnings[3]['message'],
            ("twisted.protocols.basic.NUMBER was deprecated in Twisted 10.2.0: "
             "NetstringReceiver parser state is private."))
class IntNTestCaseMixin(LPTestCaseMixin):
    """
    TestCase mixin for int-prefixed protocols.  Users set C{protocol},
    C{strings}, C{illegalStrings} and C{partialStrings}.
    """
    protocol = None
    strings = None
    illegalStrings = None
    partialStrings = None

    def test_receive(self):
        """
        Length-prefixed strings delivered one character at a time are
        received in the order in which they were sent.
        """
        receiver = self.getProtocol()
        for string in self.strings:
            encoded = struct.pack(receiver.structFormat, len(string)) + string
            for character in encoded:
                receiver.dataReceived(character)
        self.assertEqual(receiver.received, self.strings)

    def test_partial(self):
        """
        Delivering partial data results in nothing being received.
        """
        for partial in self.partialStrings:
            receiver = self.getProtocol()
            for character in partial:
                receiver.dataReceived(character)
            self.assertEqual(receiver.received, [])

    def test_send(self):
        """
        C{sendString} writes the packed length followed by the string to the
        transport.
        """
        receiver = self.getProtocol()
        payload = "b" * 16
        receiver.sendString(payload)
        expected = struct.pack(receiver.structFormat, 16) + payload
        self.assertEqual(receiver.transport.value(), expected)

    def test_lengthLimitExceeded(self):
        """
        When a length prefix is received which is greater than the protocol's
        C{MAX_LENGTH} attribute, the C{lengthLimitExceeded} method is called
        with the received length prefix.
        """
        reported = []
        receiver = self.getProtocol()
        receiver.lengthLimitExceeded = reported.append
        receiver.MAX_LENGTH = 10
        prefix = struct.pack(receiver.structFormat, 11)
        receiver.dataReceived(prefix)
        self.assertEqual(reported, [11])

    def test_longStringNotDelivered(self):
        """
        If a length prefix for a string longer than C{MAX_LENGTH} is delivered
        to C{dataReceived} at the same time as the entire string, the string is
        not passed to C{stringReceived}.
        """
        receiver = self.getProtocol()
        receiver.MAX_LENGTH = 10
        oversized = struct.pack(receiver.structFormat, 11) + 'x' * 11
        receiver.dataReceived(oversized)
        self.assertEqual(receiver.received, [])
class RecvdAttributeMixin(object):
    """
    Mixin defining tests for string receiving protocols with a C{recvd}
    attribute which should be settable by application code, to be combined with
    L{IntNTestCaseMixin} on a L{TestCase} subclass
    """

    def makeMessage(self, protocol, data):
        """
        Return C{data} prefixed with message length in C{protocol.structFormat}
        form.
        """
        return struct.pack(protocol.structFormat, len(data)) + data

    def test_recvdContainsRemainingData(self):
        """
        In stringReceived, recvd contains the remaining data that was passed to
        dataReceived that was not part of the current message.
        """
        result = []
        r = self.getProtocol()
        def stringReceived(receivedString):
            result.append(r.recvd)
        r.stringReceived = stringReceived
        completeMessage = (struct.pack(r.structFormat, 5) + ('a' * 5))
        incompleteMessage = (struct.pack(r.structFormat, 5) + ('b' * 4))
        # Receive a complete message, followed by an incomplete one
        r.dataReceived(completeMessage + incompleteMessage)
        # assertEqual rather than the deprecated assertEquals alias, as in
        # the rest of this module.
        self.assertEqual(result, [incompleteMessage])

    def test_recvdChanged(self):
        """
        In stringReceived, if recvd is changed, messages should be parsed from
        it rather than the input to dataReceived.
        """
        r = self.getProtocol()
        result = []
        payloadC = 'c' * 5
        messageC = self.makeMessage(r, payloadC)
        def stringReceived(receivedString):
            # Only the first delivery replaces the buffer.
            if not result:
                r.recvd = messageC
            result.append(receivedString)
        r.stringReceived = stringReceived
        payloadA = 'a' * 5
        payloadB = 'b' * 5
        messageA = self.makeMessage(r, payloadA)
        messageB = self.makeMessage(r, payloadB)
        r.dataReceived(messageA + messageB)
        self.assertEqual(result, [payloadA, payloadC])

    def test_switching(self):
        """
        Data already parsed by L{IntNStringReceiver.dataReceived} is not
        reparsed if C{stringReceived} consumes some of the
        L{IntNStringReceiver.recvd} buffer.
        """
        proto = self.getProtocol()
        mix = []
        SWITCH = "\x00\x00\x00\x00"
        # Every message is followed by the SWITCH bytes, which
        # stringReceived below strips from the buffer.
        for s in self.strings:
            mix.append(self.makeMessage(proto, s))
            mix.append(SWITCH)

        result = []
        def stringReceived(receivedString):
            result.append(receivedString)
            proto.recvd = proto.recvd[len(SWITCH):]

        proto.stringReceived = stringReceived
        proto.dataReceived("".join(mix))
        # Just another byte, to trigger processing of anything that might have
        # been left in the buffer (should be nothing).
        proto.dataReceived("\x01")
        self.assertEqual(result, self.strings)
        # And verify that another way
        self.assertEqual(proto.recvd, "\x01")

    def test_recvdInLengthLimitExceeded(self):
        """
        The L{IntNStringReceiver.recvd} buffer contains all data not yet
        processed by L{IntNStringReceiver.dataReceived} if the
        C{lengthLimitExceeded} event occurs.
        """
        proto = self.getProtocol()
        DATA = "too long"
        proto.MAX_LENGTH = len(DATA) - 1
        message = self.makeMessage(proto, DATA)

        result = []
        def lengthLimitExceeded(length):
            result.append(length)
            result.append(proto.recvd)

        proto.lengthLimitExceeded = lengthLimitExceeded
        proto.dataReceived(message)
        self.assertEqual(result[0], len(DATA))
        self.assertEqual(result[1], message)
class TestInt32(TestMixin, basic.Int32StringReceiver):
    """
    A L{basic.Int32StringReceiver} which keeps every string it receives.

    @ivar received: the list of strings received so far.
    """
class Int32TestCase(unittest.TestCase, IntNTestCaseMixin, RecvdAttributeMixin):
    """
    Tests for the int32-prefixed protocol.
    """
    protocol = TestInt32
    strings = ["a", "b" * 16]
    illegalStrings = ["\x10\x00\x00\x00aaaaaa"]
    partialStrings = ["\x00\x00\x00", "hello there", ""]

    def test_data(self):
        """
        Strings are sent and received with a 32-bit length prefix.
        """
        receiver = self.getProtocol()
        receiver.sendString("foo")
        written = receiver.transport.value()
        self.assertEqual(written, "\x00\x00\x00\x03foo")
        receiver.dataReceived("\x00\x00\x00\x04ubar")
        self.assertEqual(receiver.received, ["ubar"])
class TestInt16(TestMixin, basic.Int16StringReceiver):
    """
    A L{basic.Int16StringReceiver} which keeps every string it receives.

    @ivar received: the list of strings received so far.
    """
class Int16TestCase(unittest.TestCase, IntNTestCaseMixin, RecvdAttributeMixin):
    """
    Tests for the int16-prefixed protocol.
    """
    protocol = TestInt16
    strings = ["a", "b" * 16]
    illegalStrings = ["\x10\x00aaaaaa"]
    partialStrings = ["\x00", "hello there", ""]

    def test_data(self):
        """
        Strings are sent and received with a 16-bit length prefix.
        """
        receiver = self.getProtocol()
        receiver.sendString("foo")
        written = receiver.transport.value()
        self.assertEqual(written, "\x00\x03foo")
        receiver.dataReceived("\x00\x04ubar")
        self.assertEqual(receiver.received, ["ubar"])

    def test_tooLongSend(self):
        """
        Sending a string longer than the length prefix can describe raises
        C{AssertionError}.
        """
        receiver = self.getProtocol()
        prefixBits = receiver.prefixLength * 8
        tooSend = "b" * (2 ** prefixBits + 1)
        self.assertRaises(AssertionError, receiver.sendString, tooSend)
class NewStyleTestInt16(TestInt16, object):
    """
    L{TestInt16} as a new-style class.
    """
class NewStyleInt16TestCase(Int16TestCase):
    """
    Runs the L{Int16TestCase} tests against L{NewStyleTestInt16}, verifying
    that IntNStringReceiver still works when inherited by a new-style class.
    """
    protocol = NewStyleTestInt16
class TestInt8(TestMixin, basic.Int8StringReceiver):
    """
    A L{basic.Int8StringReceiver} which keeps every string it receives.

    @ivar received: the list of strings received so far.
    """
class Int8TestCase(unittest.TestCase, IntNTestCaseMixin, RecvdAttributeMixin):
    """
    Tests for the int8-prefixed protocol.
    """
    protocol = TestInt8
    strings = ["a", "b" * 16]
    illegalStrings = ["\x00\x00aaaaaa"]
    partialStrings = ["\x08", "dzadz", ""]

    def test_data(self):
        """
        Strings are sent and received with an 8-bit length prefix.
        """
        receiver = self.getProtocol()
        receiver.sendString("foo")
        written = receiver.transport.value()
        self.assertEqual(written, "\x03foo")
        receiver.dataReceived("\x04ubar")
        self.assertEqual(receiver.received, ["ubar"])

    def test_tooLongSend(self):
        """
        Sending a string longer than the length prefix can describe raises
        C{AssertionError}.
        """
        receiver = self.getProtocol()
        prefixBits = receiver.prefixLength * 8
        tooSend = "b" * (2 ** prefixBits + 1)
        self.assertRaises(AssertionError, receiver.sendString, tooSend)
class OnlyProducerTransport(object):
    """
    A stand-in for a transport which only tracks its paused state and
    collects what is written to it.
    """
    # Transport which isn't really a transport, just looks like one to
    # someone not looking very hard.

    disconnecting = False
    paused = False

    def __init__(self):
        """
        Start with nothing written.
        """
        self.data = []

    def pauseProducing(self):
        """
        Mark this transport as paused.
        """
        self.paused = True

    def resumeProducing(self):
        """
        Mark this transport as no longer paused.
        """
        self.paused = False

    def write(self, bytes):
        """
        Remember C{bytes} as one more written chunk.
        """
        written = self.data
        written.append(bytes)
class ConsumingProtocol(basic.LineReceiver):
    """
    A line receiver which writes each line to its transport and then pauses
    itself.
    """
    # Protocol that really, really doesn't want any more bytes.

    def lineReceived(self, line):
        """
        Write C{line} to the transport, then pause producing.
        """
        transport = self.transport
        transport.write(line)
        self.pauseProducing()
class ProducerTestCase(unittest.TestCase):
    """
    Tests for pausing and resuming a L{basic.LineReceiver}, using
    L{ConsumingProtocol} and L{OnlyProducerTransport}.
    """

    def testPauseResume(self):
        """
        L{ConsumingProtocol} pauses itself and its transport after every
        line, so each buffered line is only delivered after a further
        C{resumeProducing} call.
        """
        # assertFalse/assertTrue are used instead of the deprecated
        # failIf/failUnless aliases, as in the rest of this module.
        p = ConsumingProtocol()
        t = OnlyProducerTransport()
        p.makeConnection(t)

        # An incomplete line delivers nothing and pauses nothing.
        p.dataReceived('hello, ')
        self.assertFalse(t.data)
        self.assertFalse(t.paused)
        self.assertFalse(p.paused)

        # Completing the line delivers it and pauses both sides.
        p.dataReceived('world\r\n')

        self.assertEqual(t.data, ['hello, world'])
        self.assertTrue(t.paused)
        self.assertTrue(p.paused)

        p.resumeProducing()

        self.assertFalse(t.paused)
        self.assertFalse(p.paused)

        # Two lines at once: only the first is delivered before the pause.
        p.dataReceived('hello\r\nworld\r\n')

        self.assertEqual(t.data, ['hello, world', 'hello'])
        self.assertTrue(t.paused)
        self.assertTrue(p.paused)

        p.resumeProducing()
        p.dataReceived('goodbye\r\n')

        self.assertEqual(t.data, ['hello, world', 'hello', 'world'])
        self.assertTrue(t.paused)
        self.assertTrue(p.paused)

        p.resumeProducing()

        self.assertEqual(t.data, ['hello, world', 'hello', 'world', 'goodbye'])
        self.assertTrue(t.paused)
        self.assertTrue(p.paused)

        # With nothing left buffered, resuming leaves both sides unpaused.
        p.resumeProducing()

        self.assertEqual(t.data, ['hello, world', 'hello', 'world', 'goodbye'])
        self.assertFalse(t.paused)
        self.assertFalse(p.paused)
class TestableProxyClientFactory(portforward.ProxyClientFactory):
    """
    A proxy client factory which remembers the protocol it built last.

    @ivar protoInstance: the last instance of the protocol.
    @type protoInstance: L{portforward.ProxyClient}
    """

    def buildProtocol(self, addr):
        """
        Build the protocol as the base class does, remember it as
        C{protoInstance}, and return it.
        """
        self.protoInstance = portforward.ProxyClientFactory.buildProtocol(
            self, addr)
        return self.protoInstance
class TestableProxyFactory(portforward.ProxyFactory):
    """
    A proxy factory which remembers the protocol it built last.

    @ivar protoInstance: the last instance of the protocol.
    @type protoInstance: L{portforward.ProxyServer}

    @ivar clientFactoryInstance: client factory used by C{protoInstance} to
        create forward connections.
    @type clientFactoryInstance: L{TestableProxyClientFactory}
    """

    def buildProtocol(self, addr):
        """
        Build the protocol as the base class does, make it use a new
        C{clientFactoryInstance} as its client factory, remember it as
        C{protoInstance}, and return it.
        """
        built = portforward.ProxyFactory.buildProtocol(self, addr)
        self.clientFactoryInstance = TestableProxyClientFactory()

        def useTrackedClientFactory():
            # Force the use of this specific instance
            return self.clientFactoryInstance

        built.clientProtocolFactory = useTrackedClientFactory
        self.protoInstance = built
        return built
class Portforwarding(unittest.TestCase):
    """
    Tests for L{portforward}.
    """

    def setUp(self):
        """
        Create the echo server protocol and the plain client protocol, and
        start with no open ports.
        """
        self.serverProtocol = wire.Echo()
        self.clientProtocol = protocol.Protocol()
        self.openPorts = []

    def tearDown(self):
        """
        Lose whichever connections were made, then stop every listening port.

        @return: a L{defer.Deferred} gathering the results of all the
            C{stopListening} calls.
        """
        transportGetters = [
            lambda: self.proxyServerFactory.protoInstance.transport,
            lambda: (self.proxyServerFactory.clientFactoryInstance
                     .protoInstance.transport),
            lambda: self.clientProtocol.transport,
            lambda: self.serverProtocol.transport,
        ]
        for getTransport in transportGetters:
            # Best effort: any of these attributes may be missing.
            try:
                getTransport().loseConnection()
            except AttributeError:
                pass
        stopping = [defer.maybeDeferred(port.stopListening)
                    for port in self.openPorts]
        return defer.gatherResults(stopping)

    def test_portforward(self):
        """
        Data written by a client to the proxy is forwarded to the echo server
        and comes back to the client.
        """
        realServerFactory = protocol.ServerFactory()
        realServerFactory.protocol = lambda: self.serverProtocol
        realServerPort = reactor.listenTCP(
            0, realServerFactory, interface='127.0.0.1')
        self.openPorts.append(realServerPort)

        self.proxyServerFactory = TestableProxyFactory(
            '127.0.0.1', realServerPort.getHost().port)
        proxyServerPort = reactor.listenTCP(
            0, self.proxyServerFactory, interface='127.0.0.1')
        self.openPorts.append(proxyServerPort)

        nBytes = 1000
        collected = []
        finished = defer.Deferred()

        def testDataReceived(data):
            # Extending the list with a string adds it character by
            # character, so its length is the number of characters so far.
            collected.extend(data)
            if len(collected) >= nBytes:
                self.assertEqual(''.join(collected), 'x' * nBytes)
                finished.callback(None)

        def testConnectionMade():
            self.clientProtocol.transport.write('x' * nBytes)

        self.clientProtocol.dataReceived = testDataReceived
        self.clientProtocol.connectionMade = testConnectionMade

        clientFactory = protocol.ClientFactory()
        clientFactory.protocol = lambda: self.clientProtocol

        proxyPortNumber = proxyServerPort.getHost().port
        reactor.connectTCP('127.0.0.1', proxyPortNumber, clientFactory)

        return finished

    def test_registerProducers(self):
        """
        The proxy client registers itself as a producer of the proxy server and
        vice versa.
        """
        # create a ProxyServer instance
        addr = address.IPv4Address('TCP', '127.0.0.1', 0)
        proxyFactory = portforward.ProxyFactory('127.0.0.1', 0)
        server = proxyFactory.buildProtocol(addr)

        # set the reactor for this test
        memoryReactor = proto_helpers.MemoryReactor()
        server.reactor = memoryReactor

        # make the connection
        serverTransport = proto_helpers.StringTransport()
        server.makeConnection(serverTransport)

        # check that the ProxyClientFactory is connecting to the backend
        self.assertEqual(len(memoryReactor.tcpClients), 1)
        # get the factory instance and check it's the one we expect
        (host, port, clientFactory,
         timeout, _) = memoryReactor.tcpClients[0]
        self.assertIsInstance(clientFactory, portforward.ProxyClientFactory)

        # Connect it
        client = clientFactory.buildProtocol(addr)
        clientTransport = proto_helpers.StringTransport()
        client.makeConnection(clientTransport)

        # check that the producers are registered
        self.assertIdentical(clientTransport.producer, serverTransport)
        self.assertIdentical(serverTransport.producer, clientTransport)
        # check the streaming attribute in both transports
        self.assertTrue(clientTransport.streaming)
        self.assertTrue(serverTransport.streaming)
class StringTransportTestCase(unittest.TestCase):
    """
    Tests for the behaviour of the L{proto_helpers.StringTransport} helper.
    """

    def test_noUnicode(self):
        """
        Writing unicode data to a L{proto_helpers.StringTransport} raises
        C{TypeError}.
        """
        transport = proto_helpers.StringTransport()
        self.assertRaises(TypeError, transport.write, u'foo')