Custom Search

Sunday, July 26, 2015

OpenStack zeromq send_multipart code trace from oslo.messaging._drivers.impl_zmq to eventlet.green.zmq to pyzmq

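OpenStack ZeroMQ send_multipart code trace: from oslo.messaging._drivers.impl_zmq, through eventlet.green.zmq, to pyzmq

In the excerpts below, the `<=====N` markers number the steps in the order the trace follows them (1, 2, 3, ...); the lettered markers (a-d) follow how pyzmq selects and loads its backend.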

1)

https://github.com/JioCloud/oslo.messaging/blob/stable/icehouse/oslo/messaging/_drivers/impl_zmq.py#L216

zmq = importutils.try_import('eventlet.green.zmq') <=====1

# Wrapper class from oslo.messaging's impl_zmq driver. This appears to be an
# abbreviated excerpt: only the socket creation and the send path are shown.
class ZmqSocket(object):

    """A tiny wrapper around ZeroMQ.

    Simplifies the send/recv protocol and connection management.
    Can be used as a Context (supports the 'with' statement).
    """
    def __init__(self, addr, zmq_type, bind=True, subscribe=None):
        # The underlying socket is created from the context returned by
        # _get_ctxt(); addr, bind and subscribe are not used in the lines shown.
        self.sock = _get_ctxt().socket(zmq_type) <=====4

    def send(self, data, **kwargs):
        # NOTE(review): self.can_send is not assigned anywhere in this excerpt;
        # presumably it is set in the elided part of the class -- confirm upstream.
        if not self.can_send:
            raise RPCException(_("You cannot send on this socket."))
        # Delegate to the wrapped socket, forwarding data and kwargs unchanged.
        self.sock.send_multipart(data, **kwargs) <====8

# Return the module-level ZeroMQ context, creating it on first use.
# Raises ImportError when the module-level name `zmq` is falsy.
def _get_ctxt():

    if not zmq:
        raise ImportError("Failed to import eventlet.green.zmq")
    global ZMQ_CTX
    # Create the context only when ZMQ_CTX is still falsy; later calls reuse
    # the object stored in the global.
    if not ZMQ_CTX:
        ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) <=====2
    return ZMQ_CTX

2)
https://github.com/eventlet/eventlet/blob/master/eventlet/green/zmq.py#L302

# Import the module named 'zmq' and keep references to its Socket class and to
# that class's send_multipart, so the Socket subclass defined in this file can
# call the base implementation directly.
__zmq__ = __import__('zmq')
_Socket = __zmq__.Socket
_Socket_send_multipart = _Socket.send_multipart<=====10

# Context subclass whose socket() returns this module's Socket class.
class Context(__zmq__.Context): <=====3
    """Subclass of :class:`zmq.core.context.Context`
    """
    def socket(self, socket_type): <=====5
        """Overridden method to ensure that the green version of socket is used

        Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
        that a :class:`Socket` with all of its send and recv methods set to be
        non-blocking is returned
        """
        # Refuse to create sockets on a context that is already closed.
        if self.closed:
            raise ZMQError(ENOTSUP)
        # Instantiate the Socket class defined in this module, passing the
        # context itself and the requested socket type.
        return Socket(self, socket_type) <=====6

# Socket subclass; only its send_multipart override is shown in this excerpt.
class Socket(_Socket): <=====7
    @_wraps(_Socket.send_multipart)<=====9
    def send_multipart(self, msg_parts, flags=0, copy=True, track=False): <=====12
        """A send_multipart method that's safe to use when multiple
        greenthreads are calling send, send_multipart, recv and
        recv_multipart on the same socket.
        """
        # When the NOBLOCK bit is set in flags, call the base implementation
        # directly without taking the send lock.
        if flags & NOBLOCK:
            return _Socket_send_multipart(self, msg_parts, flags, copy, track) <=====13/or

        # acquire lock here so the subsequent calls to send for the
        # message parts after the first don't block
        # NOTE(review): _eventlet_send_lock is not created in the lines shown;
        # presumably it is set up in the elided part of the class -- confirm.
        with self._eventlet_send_lock:
            return _Socket_send_multipart(self, msg_parts, flags, copy, track) <=====13/or

# Decorator factory: _wraps(source_fn) returns a decorator that overwrites the
# decorated function's __name__ and __doc__ and returns that same function
# object (no new wrapper function is created around it).
def _wraps(source_fn): <=====11
    """A decorator that copies the __name__ and __doc__ from the given
    function
    """
    def wrapper(dest_fn):
        dest_fn.__name__ = source_fn.__name__
        dest_fn.__doc__ = source_fn.__doc__
        return dest_fn
    return wrapper

3)
https://github.com/zeromq/pyzmq/blob/4b8c8a680bb86fc6e0a82447abd7821f046a026a/zmq/sugar/socket.py#L47

import zmq
from zmq.backend import Socket as SocketBase <======17

# pyzmq's high-level Socket; only send_multipart is shown in this excerpt.
class Socket(SocketBase, AttributeSetter):
    def send_multipart(self, msg_parts, flags=0, copy=True, track=False): <=====14
        # Validate every part before anything is sent: zmq.Frame, bytes and
        # _buffer_type instances are accepted as-is; anything else must be
        # accepted by _buffer_type(), otherwise a TypeError naming the frame
        # index (with its repr truncated to 32 characters) is raised.
        for i,msg in enumerate(msg_parts):
            if isinstance(msg, (zmq.Frame, bytes, _buffer_type)):
                continue
            try:
                _buffer_type(msg)
            except Exception as e:
                rmsg = repr(msg)
                if len(rmsg) > 32:
                    rmsg = rmsg[:32] + '...'
                raise TypeError(
                    "Frame %i (%s) does not support the buffer interface." % (
                    i, rmsg,
                ))
        # Every part except the last is sent with SNDMORE OR-ed into flags.
        for msg in msg_parts[:-1]: <======15
            self.send(msg, SNDMORE|flags, copy=copy, track=track) <======16
        # Send the last part without the extra SNDMORE flag.
        # NOTE(review): msg_parts[-1] would raise IndexError for an empty
        # sequence; no guard for that case is visible in this excerpt.
        return self.send(msg_parts[-1], flags, copy=copy, track=track)   

__all__ = ['Socket'] <=====

4)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/__init__.py

from .select import public_api, select_backend

# Read the backend name from the PYZMQ_BACKEND environment variable.
# NOTE(review): as written, os.environ[...] raises KeyError when the variable
# is unset, and `os` is not imported in this excerpt -- the upstream file may
# differ; confirm against the linked source.
backend = os.environ['PYZMQ_BACKEND'] <====a (Find Backend 'cython' or 'cffi')
# The short names 'cython' and 'cffi' are expanded to 'zmq.backend.<name>'.
if backend in ('cython', 'cffi'):
    backend = 'zmq.backend.%s' % backend

# Merge whatever select_backend() returns into this module's globals.
_ns = select_backend(backend) <====b (Load backend api modules)
globals().update(_ns)

# The module's exported names are exactly those listed in public_api.
__all__ = public_api

5)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/select.py

# Names passed as the fromlist when select_backend() imports a backend module.
# The `... ...` entry is the post author's abbreviation of the full list.
public_api = [  <====c (Backend api modules list)
    'Context',
    'Socket',
    ... ...
    ]
   
# Import the backend module called `name`, passing public_api as the fromlist.
# NOTE(review): the excerpt stops after the import and shows no return
# statement, although the caller assigns the result to _ns -- the rest of the
# function is presumably elided; confirm against the linked source.
def select_backend(name):
    mod = __import__(name, fromlist=public_api) <====d (Load backend api modules)

6)

Here, the selected backend is "cython":
https://github.com/zeromq/pyzmq/tree/master/zmq/backend/cython

7)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/cython/socket.pyx#L619

# Cython backend Socket; only send() is shown in this excerpt.
cdef class Socket: <======18
    # Send one message part. With copy=True the data goes through _send_copy;
    # with copy=False it is sent as a Frame through _send_frame.
    cpdef object send(self, object data, int flags=0, copy=True, track=False): <======19
        _check_closed(self)
       
        # unicode objects are rejected outright with a TypeError.
        if isinstance(data, unicode):
            raise TypeError("unicode not allowed, use send_string")
       
        if copy:
            # msg.bytes never returns the input data object
            # it is always a copy, but always the same copy
            if isinstance(data, Frame):
                data = data.buffer
            return _send_copy(self.handle, data, flags) <======20
        else:
            # An existing Frame is reused as-is; when track is requested it
            # must already carry a tracker, otherwise ValueError is raised.
            if isinstance(data, Frame):
                if track and not data.tracker:
                    raise ValueError('Not a tracked message')
                msg = data
            else:
                # Any other object is wrapped in a new Frame.
                msg = Frame(data, track=track)
            return _send_frame(self.handle, msg, flags)
           
# Copy the message bytes into a new zmq message and send it on `handle`.
# NOTE(review): the declarations and setup of data, msg_c, msg_c_len and rc2
# are not shown in this excerpt; presumably they were elided by the post author.
cdef inline object _send_copy(void *handle, object msg, int flags=0): <======21
    # Initialise a zmq message of msg_c_len bytes and check the return code.
    rc = zmq_msg_init_size(&data, msg_c_len)
    _check_rc(rc)
   
    # Retry loop: the send is repeated for as long as _check_rc raises
    # InterruptedSystemCall.
    while True:
        # The copy and the send run inside a `with nogil` block.
        with nogil:
            memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data))
            rc = zmq_msg_send(&data, handle, flags) <======22
            # The message is closed here only when the send did not return
            # a negative code.
            if not rc < 0:
                rc2 = zmq_msg_close(&data)
        try:
            _check_rc(rc)
        except InterruptedSystemCall:
            continue
        else:
            break
    # Finally check the return code of zmq_msg_close.
    _check_rc(rc2)

1 comment:

  1. zeromq publish subscribe example https://github.com/zeromq/pyzmq/issues/669

    ReplyDelete