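OpenStack ZeroMQ send_multipart code trace: from oslo.messaging._drivers.impl_zmq, through eventlet.green.zmq, down to pyzmq.
The numbered arrows (<=====N) mark the order in which execution reaches each line; the lettered arrows (a-d) mark the pyzmq backend-selection steps.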
1)
https://github.com/JioCloud/oslo.messaging/blob/stable/icehouse/oslo/messaging/_drivers/impl_zmq.py#L216
zmq = importutils.try_import('eventlet.green.zmq') <=====1
class ZmqSocket(object):
"""A tiny wrapper around ZeroMQ.
Simplifies the send/recv protocol and connection management.
Can be used as a Context (supports the 'with' statement).
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = _get_ctxt().socket(zmq_type) <=====4
def send(self, data, **kwargs):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
self.sock.send_multipart(data, **kwargs) <====8
def _get_ctxt():
if not zmq:
raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) <=====2
return ZMQ_CTX
2)
https://github.com/eventlet/eventlet/blob/master/eventlet/green/zmq.py#L302
__zmq__ = __import__('zmq')
_Socket = __zmq__.Socket
_Socket_send_multipart = _Socket.send_multipart<=====10
class Context(__zmq__.Context): <=====3
"""Subclass of :class:`zmq.core.context.Context`
"""
def socket(self, socket_type): <=====5
"""Overridden method to ensure that the green version of socket is used
Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
that a :class:`Socket` with all of its send and recv methods set to be
non-blocking is returned
"""
if self.closed:
raise ZMQError(ENOTSUP)
return Socket(self, socket_type) <=====6
class Socket(_Socket): <=====7
@_wraps(_Socket.send_multipart)<=====9
def send_multipart(self, msg_parts, flags=0, copy=True, track=False): <=====12
"""A send_multipart method that's safe to use when multiple
greenthreads are calling send, send_multipart, recv and
recv_multipart on the same socket.
"""
if flags & NOBLOCK:
return _Socket_send_multipart(self, msg_parts, flags, copy, track) <=====13 (this branch when NOBLOCK is set in flags)
# acquire lock here so the subsequent calls to send for the
# message parts after the first don't block
with self._eventlet_send_lock:
return _Socket_send_multipart(self, msg_parts, flags, copy, track) <=====13 (otherwise this branch, while holding the send lock)
def _wraps(source_fn): <=====11
"""A decorator that copies the __name__ and __doc__ from the given
function
"""
def wrapper(dest_fn):
dest_fn.__name__ = source_fn.__name__
dest_fn.__doc__ = source_fn.__doc__
return dest_fn
return wrapper
3)
https://github.com/zeromq/pyzmq/blob/4b8c8a680bb86fc6e0a82447abd7821f046a026a/zmq/sugar/socket.py#L47
import zmq
from zmq.backend import Socket as SocketBase <======17
class Socket(SocketBase, AttributeSetter):
def send_multipart(self, msg_parts, flags=0, copy=True, track=False): <=====14
for i,msg in enumerate(msg_parts):
if isinstance(msg, (zmq.Frame, bytes, _buffer_type)):
continue
try:
_buffer_type(msg)
except Exception as e:
rmsg = repr(msg)
if len(rmsg) > 32:
rmsg = rmsg[:32] + '...'
raise TypeError(
"Frame %i (%s) does not support the buffer interface." % (
i, rmsg,
))
for msg in msg_parts[:-1]: <======15
self.send(msg, SNDMORE|flags, copy=copy, track=track) <======16
# Send the last part without the extra SNDMORE flag.
return self.send(msg_parts[-1], flags, copy=copy, track=track)
__all__ = ['Socket'] <=====
4)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/__init__.py
from .select import public_api, select_backend
backend = os.environ['PYZMQ_BACKEND'] <====a (Find Backend 'cython' or 'cffi')
if backend in ('cython', 'cffi'):
backend = 'zmq.backend.%s' % backend
_ns = select_backend(backend) <====b (Load backend api modules)
globals().update(_ns)
__all__ = public_api
5)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/select.py
public_api = [ <====c (Backend api modules list)
'Context',
'Socket',
... ...
]
def select_backend(name):
mod = __import__(name, fromlist=public_api) <====d (Load backend api modules)
6)
Here, the selected backend is "cython":
https://github.com/zeromq/pyzmq/tree/master/zmq/backend/cython
7)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/cython/socket.pyx#L619
cdef class Socket: <======18
cpdef object send(self, object data, int flags=0, copy=True, track=False): <======19
_check_closed(self)
if isinstance(data, unicode):
raise TypeError("unicode not allowed, use send_string")
if copy:
# msg.bytes never returns the input data object
# it is always a copy, but always the same copy
if isinstance(data, Frame):
data = data.buffer
return _send_copy(self.handle, data, flags) <======20
else:
if isinstance(data, Frame):
if track and not data.tracker:
raise ValueError('Not a tracked message')
msg = data
else:
msg = Frame(data, track=track)
return _send_frame(self.handle, msg, flags)
cdef inline object _send_copy(void *handle, object msg, int flags=0): <======21
rc = zmq_msg_init_size(&data, msg_c_len)
_check_rc(rc)
while True:
with nogil:
memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data))
rc = zmq_msg_send(&data, handle, flags) <======22
if not rc < 0:
rc2 = zmq_msg_close(&data)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
else:
break
_check_rc(rc2)
1)
https://github.com/JioCloud/oslo.messaging/blob/stable/icehouse/oslo/messaging/_drivers/impl_zmq.py#L216
zmq = importutils.try_import('eventlet.green.zmq') <=====1
class ZmqSocket(object):
"""A tiny wrapper around ZeroMQ.
Simplifies the send/recv protocol and connection management.
Can be used as a Context (supports the 'with' statement).
"""
def __init__(self, addr, zmq_type, bind=True, subscribe=None):
self.sock = _get_ctxt().socket(zmq_type) <=====4
def send(self, data, **kwargs):
if not self.can_send:
raise RPCException(_("You cannot send on this socket."))
self.sock.send_multipart(data, **kwargs) <====8
def _get_ctxt():
if not zmq:
raise ImportError("Failed to import eventlet.green.zmq")
global ZMQ_CTX
if not ZMQ_CTX:
ZMQ_CTX = zmq.Context(CONF.rpc_zmq_contexts) <=====2
return ZMQ_CTX
2)
https://github.com/eventlet/eventlet/blob/master/eventlet/green/zmq.py#L302
__zmq__ = __import__('zmq')
_Socket = __zmq__.Socket
_Socket_send_multipart = _Socket.send_multipart<=====10
class Context(__zmq__.Context): <=====3
"""Subclass of :class:`zmq.core.context.Context`
"""
def socket(self, socket_type): <=====5
"""Overridden method to ensure that the green version of socket is used
Behaves the same as :meth:`zmq.core.context.Context.socket`, but ensures
that a :class:`Socket` with all of its send and recv methods set to be
non-blocking is returned
"""
if self.closed:
raise ZMQError(ENOTSUP)
return Socket(self, socket_type) <=====6
class Socket(_Socket): <=====7
@_wraps(_Socket.send_multipart)<=====9
def send_multipart(self, msg_parts, flags=0, copy=True, track=False): <=====12
"""A send_multipart method that's safe to use when multiple
greenthreads are calling send, send_multipart, recv and
recv_multipart on the same socket.
"""
if flags & NOBLOCK:
return _Socket_send_multipart(self, msg_parts, flags, copy, track) <=====13 (this branch when NOBLOCK is set in flags)
# acquire lock here so the subsequent calls to send for the
# message parts after the first don't block
with self._eventlet_send_lock:
return _Socket_send_multipart(self, msg_parts, flags, copy, track) <=====13 (otherwise this branch, while holding the send lock)
def _wraps(source_fn): <=====11
"""A decorator that copies the __name__ and __doc__ from the given
function
"""
def wrapper(dest_fn):
dest_fn.__name__ = source_fn.__name__
dest_fn.__doc__ = source_fn.__doc__
return dest_fn
return wrapper
3)
https://github.com/zeromq/pyzmq/blob/4b8c8a680bb86fc6e0a82447abd7821f046a026a/zmq/sugar/socket.py#L47
import zmq
from zmq.backend import Socket as SocketBase <======17
class Socket(SocketBase, AttributeSetter):
def send_multipart(self, msg_parts, flags=0, copy=True, track=False): <=====14
for i,msg in enumerate(msg_parts):
if isinstance(msg, (zmq.Frame, bytes, _buffer_type)):
continue
try:
_buffer_type(msg)
except Exception as e:
rmsg = repr(msg)
if len(rmsg) > 32:
rmsg = rmsg[:32] + '...'
raise TypeError(
"Frame %i (%s) does not support the buffer interface." % (
i, rmsg,
))
for msg in msg_parts[:-1]: <======15
self.send(msg, SNDMORE|flags, copy=copy, track=track) <======16
# Send the last part without the extra SNDMORE flag.
return self.send(msg_parts[-1], flags, copy=copy, track=track)
__all__ = ['Socket'] <=====
4)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/__init__.py
from .select import public_api, select_backend
backend = os.environ['PYZMQ_BACKEND'] <====a (Find Backend 'cython' or 'cffi')
if backend in ('cython', 'cffi'):
backend = 'zmq.backend.%s' % backend
_ns = select_backend(backend) <====b (Load backend api modules)
globals().update(_ns)
__all__ = public_api
5)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/select.py
public_api = [ <====c (Backend api modules list)
'Context',
'Socket',
... ...
]
def select_backend(name):
mod = __import__(name, fromlist=public_api) <====d (Load backend api modules)
6)
Here, the selected backend is "cython":
https://github.com/zeromq/pyzmq/tree/master/zmq/backend/cython
7)
https://github.com/zeromq/pyzmq/blob/master/zmq/backend/cython/socket.pyx#L619
cdef class Socket: <======18
cpdef object send(self, object data, int flags=0, copy=True, track=False): <======19
_check_closed(self)
if isinstance(data, unicode):
raise TypeError("unicode not allowed, use send_string")
if copy:
# msg.bytes never returns the input data object
# it is always a copy, but always the same copy
if isinstance(data, Frame):
data = data.buffer
return _send_copy(self.handle, data, flags) <======20
else:
if isinstance(data, Frame):
if track and not data.tracker:
raise ValueError('Not a tracked message')
msg = data
else:
msg = Frame(data, track=track)
return _send_frame(self.handle, msg, flags)
cdef inline object _send_copy(void *handle, object msg, int flags=0): <======21
rc = zmq_msg_init_size(&data, msg_c_len)
_check_rc(rc)
while True:
with nogil:
memcpy(zmq_msg_data(&data), msg_c, zmq_msg_size(&data))
rc = zmq_msg_send(&data, handle, flags) <======22
if not rc < 0:
rc2 = zmq_msg_close(&data)
try:
_check_rc(rc)
except InterruptedSystemCall:
continue
else:
break
_check_rc(rc2)
ZeroMQ publish/subscribe example: https://github.com/zeromq/pyzmq/issues/669
ReplyDelete