Current File : /usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyc
ó
ê=Nc@s¦dZddlZddlZddlZddlmZddlmZmZm	Z	m
Z
mZmZddl
mZmZmZmZdejfd„ƒYZddd	„ƒYZd
ejfd„ƒYZdejfd
„ƒYZdZdejfd„ƒYZdejfd„ƒYZe	jedƒdkr^x/eeefD]Zde_qHWnddlZe	j edƒdkr¢xefD]Zde_qŒWndS(sC
Test methods in twisted.internet.threads and reactor thread APIs.
iÿÿÿÿN(tunittest(treactortdefert
interfacestthreadstprotocolterror(tfailuret
threadabletlogt
threadpooltReactorThreadsTestCasecBsheZdZd„Zd„Zd„Zd„Zd„Zd„Zd„Z	d„Z
d	„Zd
„ZRS(s.
    Tests for the reactor threading API.
    cCsJtjdƒ|jtjjdƒtjdƒ|jtjjdƒdS(s:
        Try to change maximum number of threads.
        i"iN(RtsuggestThreadPoolSizetassertEqualR
tmax(tself((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttest_suggestThreadPoolSizes

cCstjtjdƒS(sÄ
        The reactor's threadpool is only available when the reactor is running,
        so to have a sane behavior during the tests we make a dummy
        L{threads.deferToThread} call.
        i(Rt
deferToThreadttimetsleep(R((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt_waitForThread!scs"‡fd†}ˆjƒj|ƒS(s€
        Test callInThread functionality: set a C{threading.Event}, and check
        that it's not in the main thread.
        csqtjƒ‰g‰‡‡fd†}tj|ƒˆjdƒˆjƒsZˆjdƒnˆjˆtgƒdS(Ncs!ˆjtjƒƒˆjƒdS(N(tappendRtisInIOThreadtset((twaitertresult(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytthreadedFunc2sixsTimed out waiting for event.(	t	threadingtEventRtcallInThreadtwaittisSettfailR
tFalse(tignR(R(RRs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytcb/s

(RtaddCallback(RR#((Rs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttest_callInThread*s
cCsd„}|jƒj|ƒS(sk
        Test callFromThread functionality: from the main thread, and from
        another thread.
        cs`tjƒ}tjƒ‰‡fd†}tj|ƒtj|jdƒtj|ˆgdtƒS(NcstjˆjdƒdS(N(RtcallFromThreadtcallbacktNone((tfiredByOtherThread(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRHstfireOnOneErrback(	RtDeferredRRR&R'R(tDeferredListtTrue(R"tfiredByReactorThreadR((R)s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR#Ds
(RR$(RR#((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttest_callFromThread?s	
cs"‡fd†}ˆjƒj|ƒS(sT
        Try to make an overflow on the reactor waker using callFromThread.
        cs€dˆ_tjƒ‰‡‡fd†}tj|ƒˆjdƒˆjƒs]ˆjdƒnˆjdk	r|t	jˆjƒSdS(NcsSxBtdƒD]4}ytjd„ƒWq
tjƒˆ_Pq
Xq
WˆjƒdS(Ni †cSsdS(N(R((((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt<lambda>`s(txrangeRR&RtFailureR(ti(RR(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytthreadedFunction[s	ixsTimed out waiting for event(
R(RRRRRRRR R(R"R4(R(Rs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR#Xs	


(RR$(RR#((Rs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttest_wakerOverflowTscsgtjƒ‰g‰g‰‡‡‡‡‡fd†}‡‡‡‡fd†}ˆjƒj|ƒj|ƒS(sK
        Utility method to test L{threads.blockingCallFromThread}.
        cs>‡‡‡‡fd†}tj|ƒtjˆjˆjƒƒS(NcsTytjtˆƒ}Wn tk
r8}ˆj|ƒnXˆj|ƒˆjƒdS(N(RtblockingCallFromThreadRt	ExceptionRR(trte(terrorsRtresultstreactorFunc(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRus
(RRRRRt
getTimeout(R"R(R:RRR;R<(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytcb1ts	
cs&ˆjƒsˆjdƒnˆˆfS(NsTimed out waiting for event(RR (R"(RRR:R;(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytcb2s(RRRR$taddBoth(RR<R>R?((R:RRR;R<s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt_testBlockingCallFromThreadms
cs.d„}‡fd†}ˆj|ƒj|ƒS(sÀ
        Test blockingCallFromThread facility: create a thread, call a function
        in the reactor using L{threads.blockingCallFromThread}, and verify the
        result returned.
        cSs
tjdƒS(Ntfoo(Rtsucceed(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR<Žscsˆj|dddƒdS(NiRB(R
(tres(R(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR#s(RAR$(RR<R#((Rs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttest_blockingCallFromThreadˆs	cs.d„}‡fd†}ˆj|ƒj|ƒS(sx
        Test blockingCallFromThread as above, but be sure the resulting
        Deferred is not already fired.
        cSs&tjƒ}tjd|jdƒ|S(Ngš™™™™™¹?tegg(RR+Rt	callLaterR'(td((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR<šscsˆj|dddƒdS(NiRF(R
(RD(R(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR#žs(RAR$(RR<R#((Rs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt test_asyncBlockingCallFromThread•s	cs.d„}‡fd†}ˆj|ƒj|ƒS(s?
        Test error report for blockingCallFromThread.
        cSstjtdƒƒS(Ntbar(RR tRuntimeError(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR<§scsAˆjt|ddtƒƒˆj|ddjddƒdS(NiiRJ(tassert_t
isinstanceRKR
targs(RD(R(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR#©s(RAR$(RR<R#((Rs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt test_errorBlockingCallFromThread£s	cs.d„}‡fd†}ˆj|ƒj|ƒS(s‰
        Test error report for blockingCallFromThread as above, but be sure the
        resulting Deferred is not already fired.
        cSs,tjƒ}tjd|jtdƒƒ|S(Ngš™™™™™¹?tspam(RR+RRGterrbackRK(RH((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR<´scsAˆjt|ddtƒƒˆj|ddjddƒdS(NiiRP(RLRMRKR
RN(RD(R(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR#¸s(RAR$(RR<R#((Rs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt%test_asyncErrorBlockingCallFromThread¯s	(
t__name__t
__module__t__doc__RRR%R/R5RARERIRORR(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRs	
							
		tCountercBseZdZdZd„ZRS(icCs?|jd}||jdkr2d|_t‚n||_dS(sA non thread-safe method.iN(tindextproblemt
ValueError(Rtnext((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytaddÃs

		(RSRTRWRXR[(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRV¿stDeferredResultTestCasecBsDeZdZd„Zd„Zd„Zd„Zd„Zd„ZRS(s(
    Test twisted.internet.threads.
    cCstjdƒdS(Ni(RR(R((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytsetUp×scCstjdƒdS(Ni(RR(R((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttearDownÛscsg‰d‰tjƒ‰‡‡‡‡fd†}tjgtˆƒD]}ˆj|fif^qCtj|fifgƒˆS(Ni
cs'ˆjˆtˆƒƒˆjdƒdS(N(R
trangeR'R(((RHRtLtN(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytfinishedäs(RR+RtcallMultipleInThreadR1RRR&(RRbR3((RHRR`Ras=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttestCallMultipleßs+cCs5tjdd„dddƒ}|j|jdƒ|S(s’
        L{threads.deferToThread} executes the function passed, and correctly
        handles the positional and keyword arguments given.
        icSs||S(N((txty((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR0ósiRfii(RRR$R
(RRH((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttest_deferredResultîscsDdtfd„ƒY‰‡fd†}tj|ƒ}|j|ˆƒS(s²
        Check that L{threads.deferToThread} returns a failure object
        with an appropriate exception instance when the called
        function raises an exception.
        tNewErrorcBseZRS((RSRT(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRhþscs
ˆƒ‚dS(N(((Rh(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt
raiseErrors(R7RRt
assertFailure(RRiRH((Rhs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttest_deferredFailureøscCs2tjd„ƒ}|jd„ƒ|j|tƒS(s•
        Check that a successful L{threads.deferToThread} followed by one
        that raises an exception correctly results in a failure.
        cSsdS(N(R((((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR0scSstjd„ƒS(NcSsddS(Nii((((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR0s(RR(R"((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR0s(RRR$RjtZeroDivisionError(RRH((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt test_deferredFailureAfterSuccesss(	RSRTRUR]R^RdRgRkRm(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR\Òs				
	tDeferToThreadPoolTestCasecBs2eZdZd„Zd„Zd„Zd„ZRS(s=
    Test L{twisted.internet.threads.deferToThreadPool}.
    cCs&tjddƒ|_|jjƒdS(Nii(R
t
ThreadPoolttptstart(R((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR] scCs|jjƒdS(N(Rptstop(R((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR^%scCs>tjt|jdd„dddƒ}|j|jdƒ|S(s–
        L{threads.deferToThreadPool} executes the function passed, and
        correctly handles the positional and keyword arguments given.
        icSs||S(N((ReRf((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR0/siRfii(RtdeferToThreadPoolRRpR$R
(RRH((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRg)scsMdtfd„ƒY‰‡fd†}tjt|j|ƒ}|j|ˆƒS(s¶
        Check that L{threads.deferToThreadPool} returns a failure object with an
        appropriate exception instance when the called function raises an
        exception.
        RhcBseZRS((RSRT(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRh:scs
ˆƒ‚dS(N(((Rh(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRi<s(R7RRsRRpRj(RRiRH((Rhs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRk4s(RSRTRUR]R^RgRk(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRns
			s_
import time
import %(reactor)s
%(reactor)s.install()

from twisted.internet import reactor

def threadedCall():
    print 'threaded call'

reactor.callInThread(threadedCall)

# Spin very briefly to try to give the thread a chance to run, if it
# is going to.  Is there a better way to achieve this behavior?
for i in xrange(100):
    time.sleep(0.0)
tThreadStartupProcessProtocolcBs,eZd„Zd„Zd„Zd„ZRS(cCs||_g|_g|_dS(N(Rbtoutterr(RRb((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt__init__Ws		cCs|jj|ƒdS(N(RuR(RRu((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytoutReceived\scCs|jj|ƒdS(N(RvR(RRv((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyterrReceived_scCs#|jj|j|j|fƒdS(N(RbR'RuRv(Rtreason((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytprocessEndedbs(RSRTRwRxRyR{(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyRtVs			tStartupBehaviorTestCasecBseZdZd„ZRS(s
    Test cases for the behavior of the reactor threadpool near startup
    boundary conditions.

    In particular, this asserts that no threaded calls are attempted
    until the reactor starts up, that calls attempted before it starts
    are in fact executed once it has started, and that in both cases,
    the reactor properly cleans itself up (which is tested for
    somewhat implicitly, by requiring a child process be able to exit,
    something it cannot do unless the threadpool has been properly
    torn down).
    cs͈jƒ}t|dƒ}|jtitjd6ƒ|jƒ‡fd†}‡fd†}tjj	ƒ}tj
jtj
ƒ|d<tjƒj||ƒ}t|ƒ‰tjˆtjd|f|ƒ|S(NtwRcss|\}}}|jtjƒr;ˆjd||fƒn|rXtjd|fƒnˆj|d|fƒdS(Ns.Process did not exit cleanly (out: %s err: %s)s'Unexpected output on standard error: %ss(Expected no output, instead received:
%s(tcheckRtProcessTerminatedR R	tmsgtfailIf(t.0RuRvRz(R(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytprogramFinished|scs!|jtjƒˆjdƒ|S(NtKILL(ttrapRtTimeoutErrort
signalProcess(Rv(tproto(s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pytprogramTimeout„s
t
PYTHONPATHtpython(tmktemptfiletwritet_callBeforeStartupProgramRRTtclosetostenvirontcopytpathseptjointsystpathRR+taddCallbacksRttspawnProcesst
executable(RtprognametprogfileRƒR‰tenvRH((RRˆs=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyttestCallBeforeStartupUnexecutedvs
(RSRTRURž(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyR|gss(No thread support, nothing to test here.s7No process support, cannot run subprocess thread tests.((!RUR–R‘Rt
twisted.trialRttwisted.internetRRRRRRttwisted.pythonRRR	R
tTestCaseRRVR\RnRtProcessProtocolRtR|tIReactorThreadsR(tclstskipRtIReactorProcess(((s=/usr/lib/python2.7/dist-packages/twisted/test/test_threads.pyt<module>s($."­I8+

