Current File : /usr/lib/python2.7/dist-packages/landscape/broker/store.pyc

Message storage.

The sequencing system we use in the message store may be quite confusing
if you haven't looked at it in the last 10 minutes. For that reason, let's
review the mechanics here.

Our goal is to implement a reasonably robust system for delivering messages
from us to our peer. The system should be smart enough to recover if the peer
happens to lose messages that we have already sent, provided that these
messages are not too old (we'll see below what 'too old' means).

Messages added to the store are identified by increasing natural numbers: the
first message added is identified by 0, the second by 1, and so on. We call
"sequence" the number identifying the next message that we want to send. For
example, if ten messages have been added to the store (we represent them with
uppercase letters) and we want to start sending the first of them, our store
would look like::

    sequence: 0
    messages: A, B, C, D, E, F, G, H, I, J
              ^

The "^" marker is what we call "pending offset" and is the displacement of the
message we want to send next from the first message we have in the store.

Let's say we now send to our peer a batch of 3 sequential messages. In the
payload we include the body of the messages being sent and the sequence, which
identifies the first message of the batch. In this case the payload would look
like (pseudo-code)::

    (sequence: 0, messages: A, B, C)
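
As a rough illustration of the payload described above, the following sketch
builds a batch from an in-memory list. The function name `build_payload` and
the dict layout are assumptions made for this example; they are not part of
Landscape.

```python
def build_payload(messages, sequence, pending_offset, batch_size):
    """Return a hypothetical payload for the next batch of messages.

    `messages` is the list of stored message bodies, `pending_offset` is the
    index of the next message to send, and `sequence` is the number that
    identifies that message.
    """
    batch = messages[pending_offset:pending_offset + batch_size]
    return {"sequence": sequence, "messages": batch}


# Ten stored messages, nothing sent yet, batch of three.
store = list("ABCDEFGHIJ")
payload = build_payload(store, sequence=0, pending_offset=0, batch_size=3)
print(payload)
```

Only the sequence of the first message travels with the batch; the peer can
work out the numbers of the remaining messages by counting.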

If everything works fine on the other end, our peer replies with a payload that
would look like::

    (next-expected-sequence: 3)

meaning that the peer has received all three messages that we sent, and so
the next message it expects to receive is the one identified by the number 3.
At this point we update both our pending offset and our sequence values, and
the store now looks like::

    sequence: 3
    messages: A, B, C, D, E, F, G, H, I, J
                       ^

Great, now let's pretend that we send another batch, this time with five
messages::

    (sequence: 3, messages: D, E, F, G, H)

Our peer receives them fine, responding with a payload looking like::

    (next-expected-sequence: 8)

meaning that it received all the eight messages we sent so far and it's waiting
for the ninth, which is identified by the number 8. This is the second
successful batch that we send in a row, so we can be reasonably confident that
at least the messages in the first batch are not really needed anymore. We
delete them and we update our sequence and pending offset accordingly::

    sequence: 8
    messages: D, E, F, G, H, I, J
                             ^

Note that we still want to keep around the messages we sent in the very last
batch, just in case. Indeed we now try to send a third batch with the last two
messages that we have, but our peer surprisingly replies with this payload::

    (next-expected-sequence: 5)

Ouch! This means that something bad happened and our peer has somehow lost not
only the two messages that we sent in the last batch, but also the last three
messages of the former batch :(

Luckily we've kept enough old messages around that we can try to send them
again. We update our sequence and pending offset and the store looks like::

    sequence: 5
    messages: D, E, F, G, H, I, J
                    ^

We can now start again sending messages using the same strategy.

Note however that in the worst case scenario we could receive from our peer
a next-expected-sequence value which is so old that it falls outside our buffer
of already-sent messages. In that case there is no way we can recover the
lost messages, and we'll just send the oldest one that we have.

See L{MessageStore} for details about how messages are stored on the file
system and L{landscape.lib.message.got_next_expected} to check how the
strategy for updating the pending offset and the sequence is implemented.
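
The update strategy described in this docstring can be sketched with a small
in-memory model. This is a toy written for illustration, not the code in
L{landscape.lib.message.got_next_expected}: the class `ToyStore`, its
attributes and its boolean return value are all assumptions, and messages are
numbered from 0 as stated above.

```python
class ToyStore:
    """In-memory stand-in for a message store (hypothetical)."""

    def __init__(self, messages):
        self.messages = list(messages)  # messages still kept around
        self.sequence = 0               # number of the next message to send
        self.pending_offset = 0         # index of that message in self.messages

    def next_batch(self, size):
        """Return (sequence, bodies) for the next batch of `size` messages."""
        start = self.pending_offset
        return self.sequence, self.messages[start:start + size]

    def handle_reply(self, next_expected):
        """Update the store from the peer's next-expected-sequence value.

        Returns False when the peer asks for a message older than anything
        still stored, True otherwise.
        """
        oldest = self.sequence - self.pending_offset  # number of messages[0]
        if next_expected > self.sequence:
            # Progress: drop what was acknowledged in earlier exchanges and
            # keep the batch just sent, in case the peer loses it.
            del self.messages[:self.pending_offset]
            self.pending_offset = next_expected - self.sequence
            recoverable = True
        elif next_expected < oldest:
            # Too old: all we can do is resend from the oldest stored message.
            self.pending_offset = 0
            recoverable = False
        else:
            # The peer lost messages that are still stored: rewind to them.
            self.pending_offset = next_expected - oldest
            recoverable = True
        self.sequence = next_expected
        return recoverable


store = ToyStore("ABCDEFGHIJ")
store.handle_reply(3)   # first batch of three acknowledged
store.handle_reply(8)   # second batch of five acknowledged, first batch dropped
store.handle_reply(5)   # peer lost messages: rewind within the stored ones
print(store.sequence, store.pending_offset, store.messages)
```

The key quantity is `sequence - pending_offset`, the number of the oldest
message still stored: any request at or above it can be served by moving the
pending offset, while anything below it is lost for good.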

MessageStore:

    A message store which stores its messages in a file system hierarchy.

    Besides the "sequence" and the "pending offset" values described in the
    module docstring above, the L{MessageStore} also stores what we call
    "server sequence", which is the next message number expected by the
    *client* itself (because we are in turn the peer of a mirror-image message
    system running in the server, which tries to deliver messages to us).

    The server sequence is entirely unrelated to the stored messages, but is
    incremented when successfully receiving messages from the server, in the
    very same way described above but with the roles inverted.

    @param persist: a L{Persist} used to save state parameters like the
        accepted message types, sequence, server uuid etc.
    @param directory: base of the file system hierarchy

L{MessageStore} methods:

    __init__
        No docstring. Arguments: persist, directory, directory_size, get_time.

    commit
        Persist metadata to disk.

    set_accepted_types
        Specify the types of messages that the server will expect from us.

        If messages are added to the store which are not currently
        accepted, they will be saved but ignored until their type is
        accepted.

    get_accepted_types
        Get a list of all accepted message types.

    accepts
        Return bool indicating if C{type} is an accepted message type.

    get_sequence
        Get the current sequence.

        @return: The sequence number of the message that the server expects us
            to send on the next exchange.

    set_sequence
        Set the current sequence.

        Set the sequence number of the message that the server expects us to
        send on the next exchange.

    get_server_sequence
        Get the current server sequence.

        @return: the sequence number of the message that we will ask the server
            to send to us on the next exchange.

    set_server_sequence
        Set the current server sequence.

        Set the sequence number of the message that we will ask the server to
        send to us on the next exchange.

    get_server_uuid
        Return the currently set server UUID.

    set_server_uuid
        Change the known UUID from the server we're communicating to.

    get_exchange_token
        Get the authentication token to use for the next exchange.

    set_exchange_token
        Set the authentication token to use for the next exchange.

    get_pending_offset
        Get the current pending offset.

    set_pending_offset
        Set the current pending offset.

        Set the offset into the message pool to consider assigned to the
        current sequence number as returned by L{get_sequence}.

    add_pending_offset
        Increment the current pending offset by C{val}.

    count_pending_messages
        Return the number of pending messages.

    get_pending_messages
        Get any pending messages that aren't being held, up to max.

    delete_old_messages
        Delete messages which are unlikely to be needed in the future.

    delete_all_messages
        Remove ALL stored messages.

    add_schema
        Add a schema to be applied to messages of the given type.

        The schema must be an instance of L{landscape.schema.Message}.

    is_pending
        Return bool indicating if C{message_id} still hasn't been delivered.

        @param message_id: Identifier returned by the L{add()} method.

    add
        Queue a message for delivery.

        @param message: a C{dict} with a C{type} key and other keys conforming
            to the L{Message} schema for that specific message type.
        @return: message_id, which is an identifier for the added message.

    _walk_pending_messages
        Walk the files which are definitely pending.

    _reprocess_holding
        Unhold accepted messages left behind, and hold unaccepted
        pending messages.

    Private helpers without a docstring: _get_next_message_filename,
    _walk_messages, _get_sorted_filenames, _message_dir, _get_content,
    _get_flags, _set_flags, _add_flags.

Module-level function:

    get_default_message_store
        Get a L{MessageStore} object with all Landscape message schemas added.

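The docstrings of set_accepted_types and get_pending_messages say that
messages of a type that is not accepted are saved but ignored, and that
pending messages are returned "up to max". A simplified in-memory sketch of
that behaviour follows. It assumes messages are plain dicts with a `type` key;
the function name `pending_unheld` and the `limit` parameter are hypothetical,
and the sketch ignores the on-disk layout and any handling of unreadable
messages.

```python
def pending_unheld(messages, accepted_types, limit=None):
    """Return pending messages whose type is accepted, at most `limit` of them.

    Messages of other types are skipped but left in `messages`, so they can be
    picked up later if their type becomes accepted.
    """
    result = []
    for message in messages:
        if limit is not None and len(result) >= limit:
            break
        if message["type"] in accepted_types:
            result.append(message)
    return result


queue = [
    {"type": "ping", "id": 1},
    {"type": "report", "id": 2},
    {"type": "ping", "id": 3},
]
print(pending_unheld(queue, accepted_types={"ping"}))
```

Calling the function again after adding "report" to the accepted types
returns the skipped message too, in its original position.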