integ/python/zerorpc-python/debian/deb_folder/patches/0001-Use-eventlet-instead-o...

2737 lines
92 KiB
Diff

From 68d1ba0a7157850ecdee09d64063f4c004faeed5 Mon Sep 17 00:00:00 2001
From: Alyson Deives Pereira <alyson.deivespereira@windriver.com>
Date: Wed, 20 Jul 2022 12:26:15 -0300
Subject: [PATCH] Use eventlet instead of gevent
However, the following tests are still failing:
FAILED tests/test_middleware.py::test_server_inspect_exception_middleware - zerorpc.exceptions.LostRemote: Lost remote after 10s heartbeat
FAILED tests/test_middleware_before_after_exec.py::test_hook_server_before_exec_puller - AssertionError: assert 'echo: test' == 'echo: test with a middleware'
FAILED tests/test_middleware_before_after_exec.py::test_hook_server_after_exec_puller - AssertionError: assert 'echo: test' == 'echo: test with a middleware'
FAILED tests/test_pubpush.py::test_pubsub_inheritance - RuntimeError: The subscriber didn't receive any published message
FAILED tests/test_pubpush.py::test_pubsub_composite - RuntimeError: The subscriber didn't receive any published message
Signed-off-by: Alyson Deives Pereira <alyson.deivespereira@windriver.com>
---
setup.py | 13 +-
tests/test_buffered_channel.py | 48 ++--
tests/test_client.py | 6 +-
tests/test_client_async.py | 16 +-
tests/test_client_heartbeat.py | 40 +--
tests/test_heartbeat.py | 46 ++--
tests/test_middleware.py | 88 +++---
tests/test_middleware_before_after_exec.py | 81 ++++--
tests/test_middleware_client.py | 76 ++++--
tests/test_pubpush.py | 35 ++-
tests/test_reqstream.py | 6 +-
tests/test_server.py | 18 +-
tests/test_zmq.py | 8 +-
tests/zmqbug.py | 270 ++++++++++++++----
tox.ini | 2 +-
zerorpc/channel.py | 30 +-
zerorpc/context.py | 2 +-
zerorpc/core.py | 32 +--
zerorpc/events.py | 42 ++-
zerorpc/gevent_zmq.py | 301 +++++++++++++--------
zerorpc/heartbeat.py | 20 +-
21 files changed, 748 insertions(+), 432 deletions(-)
diff --git a/setup.py b/setup.py
index b07ebcb..d57ddcb 100644
--- a/setup.py
+++ b/setup.py
@@ -44,12 +44,13 @@ requirements = [
if sys.version_info < (2, 7):
requirements.append('argparse')
-if sys.version_info < (2, 7):
- requirements.append('gevent>=1.1.0,<1.2.0')
-elif sys.version_info < (3, 0):
- requirements.append('gevent>=1.0')
-else:
- requirements.append('gevent>=1.1')
+# if sys.version_info < (2, 7):
+# requirements.append('gevent>=1.1.0,<1.2.0')
+# elif sys.version_info < (3, 0):
+# requirements.append('gevent>=1.0')
+# else:
+# requirements.append('gevent>=1.1')
+requirements.append('eventlet>=0.24.1')
with open("README.rst", "r") as fh:
long_description = fh.read()
diff --git a/tests/test_buffered_channel.py b/tests/test_buffered_channel.py
index 20b8173..f94152c 100644
--- a/tests/test_buffered_channel.py
+++ b/tests/test_buffered_channel.py
@@ -28,7 +28,7 @@ from __future__ import absolute_import
from builtins import range
import pytest
-import gevent
+import eventlet
import sys
from zerorpc import zmq
@@ -57,7 +57,7 @@ def test_close_server_bufchan():
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
server_bufchan.recv()
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE SERVER SOCKET!!!')
server_bufchan.close()
if sys.version_info < (2, 7):
@@ -92,7 +92,7 @@ def test_close_client_bufchan():
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
server_bufchan.recv()
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE CLIENT SOCKET!!!')
client_bufchan.close()
if sys.version_info < (2, 7):
@@ -125,7 +125,7 @@ def test_heartbeat_can_open_channel_server_close():
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE SERVER SOCKET!!!')
server_bufchan.close()
if sys.version_info < (2, 7):
@@ -160,12 +160,12 @@ def test_heartbeat_can_open_channel_client_close():
server_bufchan = zerorpc.BufferedChannel(server_hbchan)
try:
while True:
- gevent.sleep(1)
+ eventlet.sleep(1)
finally:
server_bufchan.close()
- server_coro = gevent.spawn(server_fn)
+ server_coro = eventlet.spawn(server_fn)
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE CLIENT SOCKET!!!')
client_bufchan.close()
client.close()
@@ -173,7 +173,7 @@ def test_heartbeat_can_open_channel_client_close():
pytest.raises(zerorpc.LostRemote, server_coro.get)
else:
with pytest.raises(zerorpc.LostRemote):
- server_coro.get()
+ server_coro.wait()
print('SERVER LOST CLIENT :)')
server.close()
@@ -200,7 +200,7 @@ def test_do_some_req_rep():
assert list(event.args) == [x + x * x]
client_bufchan.close()
- coro_pool = gevent.pool.Pool()
+ coro_pool = eventlet.greenpool.GreenPool()
coro_pool.spawn(client_do)
def server_do():
@@ -217,7 +217,7 @@ def test_do_some_req_rep():
coro_pool.spawn(server_do)
- coro_pool.join()
+ coro_pool.waitall()
client.close()
server.close()
@@ -250,7 +250,7 @@ def test_do_some_req_rep_lost_server():
client_bufchan.recv()
client_bufchan.close()
- coro_pool = gevent.pool.Pool()
+ coro_pool = eventlet.greenpool.GreenPool()
coro_pool.spawn(client_do)
def server_do():
@@ -266,7 +266,7 @@ def test_do_some_req_rep_lost_server():
coro_pool.spawn(server_do)
- coro_pool.join()
+ coro_pool.waitall()
client.close()
server.close()
@@ -293,7 +293,7 @@ def test_do_some_req_rep_lost_client():
assert list(event.args) == [x + x * x]
client_bufchan.close()
- coro_pool = gevent.pool.Pool()
+ coro_pool = eventlet.greenpool.GreenPool()
coro_pool.spawn(client_do)
def server_do():
@@ -316,7 +316,7 @@ def test_do_some_req_rep_lost_client():
coro_pool.spawn(server_do)
- coro_pool.join()
+ coro_pool.waitall()
client.close()
server.close()
@@ -353,7 +353,7 @@ def test_do_some_req_rep_client_timeout():
assert list(event.args) == [x]
client_bufchan.close()
- coro_pool = gevent.pool.Pool()
+ coro_pool = eventlet.greenpool.GreenPool()
coro_pool.spawn(client_do)
def server_do():
@@ -367,7 +367,7 @@ def test_do_some_req_rep_client_timeout():
for x in range(20):
event = server_bufchan.recv()
assert event.name == 'sleep'
- gevent.sleep(TIME_FACTOR * event.args[0])
+ eventlet.sleep(TIME_FACTOR * event.args[0])
server_bufchan.emit('OK', event.args)
pytest.raises(zerorpc.LostRemote, _do_with_assert_raises)
else:
@@ -375,14 +375,14 @@ def test_do_some_req_rep_client_timeout():
for x in range(20):
event = server_bufchan.recv()
assert event.name == 'sleep'
- gevent.sleep(TIME_FACTOR * event.args[0])
+ eventlet.sleep(TIME_FACTOR * event.args[0])
server_bufchan.emit('OK', event.args)
server_bufchan.close()
coro_pool.spawn(server_do)
- coro_pool.join()
+ coro_pool.waitall()
client.close()
server.close()
@@ -410,7 +410,7 @@ def test_congestion_control_server_pushing():
read_cnt.value += 1
client_bufchan.close()
- coro_pool = gevent.pool.Pool()
+ coro_pool = eventlet.greenpool.GreenPool()
coro_pool.spawn(client_do)
def server_do():
@@ -443,7 +443,7 @@ def test_congestion_control_server_pushing():
coro_pool.spawn(server_do)
try:
- coro_pool.join()
+ coro_pool.waitall()
except zerorpc.LostRemote:
pass
finally:
@@ -485,7 +485,7 @@ def test_on_close_if():
if event.name == 'done':
return
seen.append(event.args)
- gevent.sleep(0.1)
+ eventlet.sleep(0.1)
def server_do():
for i in range(0, 10):
@@ -494,12 +494,12 @@ def test_on_close_if():
client_bufchan.on_close_if = is_stream_done
- coro_pool = gevent.pool.Pool()
+ coro_pool = eventlet.greenpool.GreenPool()
g1 = coro_pool.spawn(client_do)
g2 = coro_pool.spawn(server_do)
- g1.get() # Re-raise any exceptions...
- g2.get()
+ g1.wait() # Re-raise any exceptions...
+ g2.wait()
assert seen == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
diff --git a/tests/test_client.py b/tests/test_client.py
index 6a692b3..b9e3be3 100644
--- a/tests/test_client.py
+++ b/tests/test_client.py
@@ -24,7 +24,7 @@
from __future__ import absolute_import
-import gevent
+import eventlet
import zerorpc
from .testutils import teardown, random_ipc_endpoint
@@ -39,7 +39,7 @@ def test_client_connect():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client()
client.connect(endpoint)
@@ -56,7 +56,7 @@ def test_client_quick_connect():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(endpoint)
diff --git a/tests/test_client_async.py b/tests/test_client_async.py
index ced4b1f..a2272ff 100644
--- a/tests/test_client_async.py
+++ b/tests/test_client_async.py
@@ -26,7 +26,7 @@
from __future__ import print_function
from __future__ import absolute_import
import pytest
-import gevent
+import eventlet
import sys
from zerorpc import zmq
@@ -43,12 +43,12 @@ def test_client_server_client_timeout_with_async():
return 42
def add(self, a, b):
- gevent.sleep(TIME_FACTOR * 10)
+ eventlet.sleep(TIME_FACTOR * 10)
return a + b
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
client.connect(endpoint)
@@ -57,11 +57,11 @@ def test_client_server_client_timeout_with_async():
if sys.version_info < (2, 7):
def _do_with_assert_raises():
- print(async_result.get())
+ print(async_result.wait())
pytest.raises(zerorpc.TimeoutExpired, _do_with_assert_raises)
else:
with pytest.raises(zerorpc.TimeoutExpired):
- print(async_result.get())
+ print(async_result.wait())
client.close()
srv.close()
@@ -79,13 +79,13 @@ def test_client_server_with_async():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client()
client.connect(endpoint)
async_result = client.lolita(async_=True)
- assert async_result.get() == 42
+ assert async_result.wait() == 42
async_result = client.add(1, 4, async_=True)
- assert async_result.get() == 5
+ assert async_result.wait() == 5
diff --git a/tests/test_client_heartbeat.py b/tests/test_client_heartbeat.py
index 6b552a4..908c866 100644
--- a/tests/test_client_heartbeat.py
+++ b/tests/test_client_heartbeat.py
@@ -28,7 +28,7 @@ from __future__ import absolute_import
from builtins import next
from builtins import range
-import gevent
+import eventlet
import zerorpc
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
@@ -43,11 +43,11 @@ def test_client_server_hearbeat():
return 42
def slow(self):
- gevent.sleep(TIME_FACTOR * 10)
+ eventlet.sleep(TIME_FACTOR * 10)
srv = MySrv(heartbeat=TIME_FACTOR * 1)
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(heartbeat=TIME_FACTOR * 1)
client.connect(endpoint)
@@ -62,13 +62,13 @@ def test_client_server_activate_heartbeat():
class MySrv(zerorpc.Server):
def lolita(self):
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
return 42
srv = MySrv(heartbeat=TIME_FACTOR * 4)
srv.bind(endpoint)
- gevent.spawn(srv.run)
- gevent.sleep(0)
+ eventlet.spawn(srv.run)
+ eventlet.sleep(0)
client = zerorpc.Client(heartbeat=TIME_FACTOR * 4)
client.connect(endpoint)
@@ -86,13 +86,13 @@ def test_client_server_passive_hearbeat():
return 42
def slow(self):
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
return 2
srv = MySrv(heartbeat=TIME_FACTOR * 4)
srv.bind(endpoint)
- gevent.spawn(srv.run)
- gevent.sleep(0)
+ eventlet.spawn(srv.run)
+ eventlet.sleep(0)
client = zerorpc.Client(heartbeat=TIME_FACTOR * 4, passive_heartbeat=True)
client.connect(endpoint)
@@ -112,16 +112,16 @@ def test_client_hb_doesnt_linger_on_streaming():
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
def test_client():
assert list(client1.iter()) == list(range(42))
print('sleep 3s')
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
- gevent.spawn(test_client).join()
+ eventlet.spawn(test_client).wait()
def est_client_drop_few():
@@ -134,7 +134,7 @@ def est_client_drop_few():
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
client2 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
@@ -143,7 +143,7 @@ def est_client_drop_few():
assert client1.lolita() == 42
assert client2.lolita() == 42
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
assert client3.lolita() == 42
@@ -158,7 +158,7 @@ def test_client_drop_empty_stream():
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
@@ -167,9 +167,9 @@ def test_client_drop_empty_stream():
i = client1.iter()
print('sleep 3s')
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
- gevent.spawn(test_client).join()
+ eventlet.spawn(test_client).wait()
def test_client_drop_stream():
@@ -183,7 +183,7 @@ def test_client_drop_stream():
srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context())
@@ -195,6 +195,6 @@ def test_client_drop_stream():
assert list(next(i) for x in range(142)) == list(range(142))
print('sleep 3s')
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
- gevent.spawn(test_client).join()
+ eventlet.spawn(test_client).wait()
diff --git a/tests/test_heartbeat.py b/tests/test_heartbeat.py
index 14c66fd..c34d204 100644
--- a/tests/test_heartbeat.py
+++ b/tests/test_heartbeat.py
@@ -28,7 +28,7 @@ from __future__ import absolute_import
from builtins import range
import pytest
-import gevent
+import eventlet
import sys
from zerorpc import zmq
@@ -55,7 +55,7 @@ def test_close_server_hbchan():
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
server_hbchan.recv()
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE SERVER SOCKET!!!')
server_hbchan.close()
if sys.version_info < (2, 7):
@@ -88,7 +88,7 @@ def test_close_client_hbchan():
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
server_hbchan.recv()
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE CLIENT SOCKET!!!')
client_hbchan.close()
if sys.version_info < (2, 7):
@@ -119,7 +119,7 @@ def test_heartbeat_can_open_channel_server_close():
server_channel = server.channel(event)
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE SERVER SOCKET!!!')
server_hbchan.close()
if sys.version_info < (2, 7):
@@ -150,7 +150,7 @@ def test_heartbeat_can_open_channel_client_close():
server_channel = server.channel(event)
server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2)
- gevent.sleep(TIME_FACTOR * 3)
+ eventlet.sleep(TIME_FACTOR * 3)
print('CLOSE CLIENT SOCKET!!!')
client_hbchan.close()
client.close()
@@ -189,7 +189,7 @@ def test_do_some_req_rep():
assert list(event.args) == [x + x * x]
client_hbchan.close()
- client_task = gevent.spawn(client_do)
+ client_task = eventlet.spawn(client_do)
def server_do():
for x in range(20):
@@ -198,10 +198,10 @@ def test_do_some_req_rep():
server_hbchan.emit('OK', (sum(event.args),))
server_hbchan.close()
- server_task = gevent.spawn(server_do)
+ server_task = eventlet.spawn(server_do)
- server_task.get()
- client_task.get()
+ server_task.wait()
+ client_task.wait()
client.close()
server.close()
@@ -233,7 +233,7 @@ def test_do_some_req_rep_lost_server():
client_hbchan.recv()
client_hbchan.close()
- client_task = gevent.spawn(client_do)
+ client_task = eventlet.spawn(client_do)
def server_do():
event = server.recv()
@@ -245,10 +245,10 @@ def test_do_some_req_rep_lost_server():
server_hbchan.emit('OK', (sum(event.args),))
server_hbchan.close()
- server_task = gevent.spawn(server_do)
+ server_task = eventlet.spawn(server_do)
- server_task.get()
- client_task.get()
+ server_task.wait()
+ client_task.wait()
client.close()
server.close()
@@ -274,7 +274,7 @@ def test_do_some_req_rep_lost_client():
assert list(event.args) == [x + x * x]
client_hbchan.close()
- client_task = gevent.spawn(client_do)
+ client_task = eventlet.spawn(client_do)
def server_do():
event = server.recv()
@@ -293,10 +293,10 @@ def test_do_some_req_rep_lost_client():
server_hbchan.recv()
server_hbchan.close()
- server_task = gevent.spawn(server_do)
+ server_task = eventlet.spawn(server_do)
- server_task.get()
- client_task.get()
+ server_task.wait()
+ client_task.wait()
client.close()
server.close()
@@ -332,7 +332,7 @@ def test_do_some_req_rep_client_timeout():
assert list(event.args) == [x]
client_hbchan.close()
- client_task = gevent.spawn(client_do)
+ client_task = eventlet.spawn(client_do)
def server_do():
event = server.recv()
@@ -344,7 +344,7 @@ def test_do_some_req_rep_client_timeout():
for x in range(20):
event = server_hbchan.recv()
assert event.name == 'sleep'
- gevent.sleep(TIME_FACTOR * event.args[0])
+ eventlet.sleep(TIME_FACTOR * event.args[0])
server_hbchan.emit('OK', event.args)
pytest.raises(zerorpc.LostRemote, _do_with_assert_raises)
else:
@@ -352,13 +352,13 @@ def test_do_some_req_rep_client_timeout():
for x in range(20):
event = server_hbchan.recv()
assert event.name == 'sleep'
- gevent.sleep(TIME_FACTOR * event.args[0])
+ eventlet.sleep(TIME_FACTOR * event.args[0])
server_hbchan.emit('OK', event.args)
server_hbchan.close()
- server_task = gevent.spawn(server_do)
+ server_task = eventlet.spawn(server_do)
- server_task.get()
- client_task.get()
+ server_task.wait()
+ client_task.wait()
client.close()
server.close()
diff --git a/tests/test_middleware.py b/tests/test_middleware.py
index 3163a3a..12ba899 100644
--- a/tests/test_middleware.py
+++ b/tests/test_middleware.py
@@ -26,11 +26,13 @@
from __future__ import print_function
from __future__ import absolute_import
from builtins import str
+
+import greenlet
from future.utils import tobytes
import pytest
-import gevent
-import gevent.local
+import eventlet
+import eventlet.corolocal
import random
import hashlib
import sys
@@ -109,7 +111,7 @@ def test_resolve_endpoint_events():
cnt = c.register_middleware(Resolver())
assert cnt == 1
srv.bind('some_service')
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(heartbeat=TIME_FACTOR * 1, context=c)
client.connect('some_service')
@@ -123,7 +125,7 @@ class Tracer(object):
'''Used by test_task_context_* tests'''
def __init__(self, identity):
self._identity = identity
- self._locals = gevent.local.local()
+ self._locals = eventlet.corolocal.local()
self._log = []
@property
@@ -169,7 +171,7 @@ def test_task_context():
srv = zerorpc.Server(Srv(), context=srv_ctx)
srv.bind(endpoint)
- srv_task = gevent.spawn(srv.run)
+ srv_task = eventlet.spawn(srv.run)
c = zerorpc.Client(context=cli_ctx)
c.connect(endpoint)
@@ -179,7 +181,10 @@ def test_task_context():
assert x == 42
srv.stop()
- srv_task.join()
+ try:
+ srv_task.wait()
+ except greenlet.GreenletExit:
+ pass
assert cli_tracer._log == [
('new', cli_tracer.trace_id),
@@ -212,7 +217,7 @@ def test_task_context_relay():
srv = zerorpc.Server(Srv(), context=srv_ctx)
srv.bind(endpoint1)
- srv_task = gevent.spawn(srv.run)
+ srv_task = eventlet.spawn(srv.run)
c_relay = zerorpc.Client(context=srv_relay_ctx)
c_relay.connect(endpoint1)
@@ -223,7 +228,7 @@ def test_task_context_relay():
srv_relay = zerorpc.Server(SrvRelay(), context=srv_relay_ctx)
srv_relay.bind(endpoint2)
- srv_relay_task = gevent.spawn(srv_relay.run)
+ srv_relay_task = eventlet.spawn(srv_relay.run)
c = zerorpc.Client(context=cli_ctx)
c.connect(endpoint2)
@@ -232,8 +237,14 @@ def test_task_context_relay():
srv_relay.stop()
srv.stop()
- srv_relay_task.join()
- srv_task.join()
+ try:
+ srv_relay_task.wait()
+ except greenlet.GreenletExit:
+ pass
+ try:
+ srv_task.wait()
+ except greenlet.GreenletExit:
+ pass
assert cli_tracer._log == [
('new', cli_tracer.trace_id),
@@ -268,7 +279,7 @@ def test_task_context_relay_fork():
srv = zerorpc.Server(Srv(), context=srv_ctx)
srv.bind(endpoint1)
- srv_task = gevent.spawn(srv.run)
+ srv_task = eventlet.spawn(srv.run)
c_relay = zerorpc.Client(context=srv_relay_ctx)
c_relay.connect(endpoint1)
@@ -277,16 +288,16 @@ def test_task_context_relay_fork():
def echo(self, msg):
def dothework(msg):
return c_relay.echo(msg) + 'relayed'
- g = gevent.spawn(zerorpc.fork_task_context(dothework,
+ g = eventlet.spawn(zerorpc.fork_task_context(dothework,
srv_relay_ctx), 'relay' + msg)
print('relaying in separate task:', g)
- r = g.get()
+ r = g.wait()
print('back to main task')
return r
srv_relay = zerorpc.Server(SrvRelay(), context=srv_relay_ctx)
srv_relay.bind(endpoint2)
- srv_relay_task = gevent.spawn(srv_relay.run)
+ srv_relay_task = eventlet.spawn(srv_relay.run)
c = zerorpc.Client(context=cli_ctx)
c.connect(endpoint2)
@@ -295,8 +306,15 @@ def test_task_context_relay_fork():
srv_relay.stop()
srv.stop()
- srv_relay_task.join()
- srv_task.join()
+ try:
+ srv_relay_task.wait()
+ except greenlet.GreenletExit:
+ pass
+
+ try:
+ srv_task.wait()
+ except greenlet.GreenletExit:
+ pass
assert cli_tracer._log == [
('new', cli_tracer.trace_id),
@@ -324,25 +342,28 @@ def test_task_context_pushpull():
pusher_tracer = Tracer('[pusher]')
pusher_ctx.register_middleware(pusher_tracer)
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
class Puller(object):
def echo(self, msg):
- trigger.set()
+ trigger.send()
puller = zerorpc.Puller(Puller(), context=puller_ctx)
puller.bind(endpoint)
- puller_task = gevent.spawn(puller.run)
+ puller_task = eventlet.spawn(puller.run)
c = zerorpc.Pusher(context=pusher_ctx)
c.connect(endpoint)
- trigger.clear()
+ # trigger.reset()
c.echo('hello')
trigger.wait()
puller.stop()
- puller_task.join()
+ try:
+ puller_task.wait()
+ except greenlet.GreenletExit:
+ pass
assert pusher_tracer._log == [
('new', pusher_tracer.trace_id),
@@ -362,29 +383,32 @@ def test_task_context_pubsub():
publisher_tracer = Tracer('[publisher]')
publisher_ctx.register_middleware(publisher_tracer)
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
class Subscriber(object):
def echo(self, msg):
- trigger.set()
+ trigger.send()
subscriber = zerorpc.Subscriber(Subscriber(), context=subscriber_ctx)
subscriber.bind(endpoint)
- subscriber_task = gevent.spawn(subscriber.run)
+ subscriber_task = eventlet.spawn(subscriber.run)
c = zerorpc.Publisher(context=publisher_ctx)
c.connect(endpoint)
- trigger.clear()
+ # trigger.reset()
# We need this retry logic to wait that the subscriber.run coroutine starts
# reading (the published messages will go to /dev/null until then).
- while not trigger.is_set():
+ while not trigger.ready():
c.echo('pub...')
if trigger.wait(TIME_FACTOR * 1):
break
subscriber.stop()
- subscriber_task.join()
+ try:
+ subscriber_task.wait()
+ except greenlet.GreenletExit:
+ pass
print(publisher_tracer._log)
assert ('new', publisher_tracer.trace_id) in publisher_tracer._log
@@ -429,7 +453,7 @@ def test_server_inspect_exception_middleware():
module = Srv()
server = zerorpc.Server(module, context=ctx)
server.bind(endpoint)
- gevent.spawn(server.run)
+ eventlet.spawn(server.run)
client = zerorpc.Client()
client.connect(endpoint)
@@ -447,7 +471,7 @@ def test_server_inspect_exception_middleware():
def test_server_inspect_exception_middleware_puller():
endpoint = random_ipc_endpoint()
- barrier = gevent.event.Event()
+ barrier = eventlet.event.Event()
middleware = InspectExceptionMiddleware(barrier)
ctx = zerorpc.Context()
ctx.register_middleware(middleware)
@@ -455,12 +479,12 @@ def test_server_inspect_exception_middleware_puller():
module = Srv()
server = zerorpc.Puller(module, context=ctx)
server.bind(endpoint)
- gevent.spawn(server.run)
+ eventlet.spawn(server.run)
client = zerorpc.Pusher()
client.connect(endpoint)
- barrier.clear()
+ # barrier.reset()
client.echo('This is a test which should call the InspectExceptionMiddleware')
barrier.wait(timeout=TIME_FACTOR * 2)
@@ -479,7 +503,7 @@ def test_server_inspect_exception_middleware_stream():
module = Srv()
server = zerorpc.Server(module, context=ctx)
server.bind(endpoint)
- gevent.spawn(server.run)
+ eventlet.spawn(server.run)
client = zerorpc.Client()
client.connect(endpoint)
diff --git a/tests/test_middleware_before_after_exec.py b/tests/test_middleware_before_after_exec.py
index 5dafeb0..32bfc4c 100644
--- a/tests/test_middleware_before_after_exec.py
+++ b/tests/test_middleware_before_after_exec.py
@@ -25,7 +25,9 @@
from __future__ import absolute_import
from builtins import range
-import gevent
+import eventlet
+import greenlet
+
import zerorpc
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
@@ -39,7 +41,7 @@ class EchoModule(object):
def echo(self, msg):
self.last_msg = 'echo: ' + msg
if self._trigger:
- self._trigger.set()
+ self._trigger.send()
return self.last_msg
@zerorpc.stream
@@ -63,7 +65,7 @@ def test_hook_server_before_exec():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client()
test_client.connect(endpoint)
@@ -78,17 +80,20 @@ def test_hook_server_before_exec():
assert test_middleware.called == True
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_server_before_exec_puller():
zero_ctx = zerorpc.Context()
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
endpoint = random_ipc_endpoint()
echo_module = EchoModule(trigger)
test_server = zerorpc.Puller(echo_module, context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Pusher()
test_client.connect(endpoint)
@@ -96,7 +101,7 @@ def test_hook_server_before_exec_puller():
test_client.echo("test")
trigger.wait(timeout=TIME_FACTOR * 2)
assert echo_module.last_msg == "echo: test"
- trigger.clear()
+ # trigger.reset()
# Test with a middleware
test_middleware = ServerBeforeExecMiddleware()
@@ -108,7 +113,10 @@ def test_hook_server_before_exec_puller():
assert test_middleware.called == True
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_server_before_exec_stream():
zero_ctx = zerorpc.Context()
@@ -116,7 +124,7 @@ def test_hook_server_before_exec_stream():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client()
test_client.connect(endpoint)
@@ -135,7 +143,10 @@ def test_hook_server_before_exec_stream():
assert echo == "echo: test"
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
class ServerAfterExecMiddleware(object):
@@ -153,7 +164,7 @@ def test_hook_server_after_exec():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client()
test_client.connect(endpoint)
@@ -170,17 +181,20 @@ def test_hook_server_after_exec():
assert test_middleware.reply_event_name == 'OK'
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_server_after_exec_puller():
zero_ctx = zerorpc.Context()
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
endpoint = random_ipc_endpoint()
echo_module = EchoModule(trigger)
test_server = zerorpc.Puller(echo_module, context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Pusher()
test_client.connect(endpoint)
@@ -188,7 +202,7 @@ def test_hook_server_after_exec_puller():
test_client.echo("test")
trigger.wait(timeout=TIME_FACTOR * 2)
assert echo_module.last_msg == "echo: test"
- trigger.clear()
+ # trigger.reset()
# Test with a middleware
test_middleware = ServerAfterExecMiddleware()
@@ -202,7 +216,10 @@ def test_hook_server_after_exec_puller():
assert test_middleware.reply_event_name is None
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_server_after_exec_stream():
zero_ctx = zerorpc.Context()
@@ -210,7 +227,7 @@ def test_hook_server_after_exec_stream():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client()
test_client.connect(endpoint)
@@ -232,7 +249,10 @@ def test_hook_server_after_exec_stream():
assert test_middleware.reply_event_name == 'STREAM_DONE'
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
class BrokenEchoModule(object):
@@ -246,7 +266,7 @@ class BrokenEchoModule(object):
raise RuntimeError("BrokenEchoModule")
finally:
if self._trigger:
- self._trigger.set()
+ self._trigger.send()
@zerorpc.stream
def echoes(self, msg):
@@ -258,7 +278,7 @@ def test_hook_server_after_exec_on_error():
test_server = zerorpc.Server(BrokenEchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client()
test_client.connect(endpoint)
@@ -272,17 +292,20 @@ def test_hook_server_after_exec_on_error():
assert test_middleware.called == False
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_server_after_exec_on_error_puller():
zero_ctx = zerorpc.Context()
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
endpoint = random_ipc_endpoint()
echo_module = BrokenEchoModule(trigger)
test_server = zerorpc.Puller(echo_module, context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Pusher()
test_client.connect(endpoint)
@@ -298,7 +321,10 @@ def test_hook_server_after_exec_on_error_puller():
assert test_middleware.called == False
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_server_after_exec_on_error_stream():
zero_ctx = zerorpc.Context()
@@ -306,7 +332,7 @@ def test_hook_server_after_exec_on_error_stream():
test_server = zerorpc.Server(BrokenEchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client()
test_client.connect(endpoint)
@@ -320,4 +346,7 @@ def test_hook_server_after_exec_on_error_stream():
assert test_middleware.called == False
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
diff --git a/tests/test_middleware_client.py b/tests/test_middleware_client.py
index 943985e..64f7b5a 100644
--- a/tests/test_middleware_client.py
+++ b/tests/test_middleware_client.py
@@ -25,7 +25,9 @@
from __future__ import absolute_import
from builtins import range
-import gevent
+import eventlet
+import greenlet
+
import zerorpc
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
@@ -62,7 +64,7 @@ class EchoModule(object):
def timeout(self, msg):
self.last_msg = "timeout: " + msg
- gevent.sleep(TIME_FACTOR * 2)
+ eventlet.sleep(TIME_FACTOR * 2)
def test_hook_client_before_request():
@@ -78,7 +80,7 @@ def test_hook_client_before_request():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(context=zero_ctx)
test_client.connect(endpoint)
@@ -93,7 +95,10 @@ def test_hook_client_before_request():
assert test_middleware.method == 'echo'
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
class ClientAfterRequestMiddleware(object):
def __init__(self):
@@ -111,7 +116,7 @@ def test_hook_client_after_request():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(context=zero_ctx)
test_client.connect(endpoint)
@@ -126,7 +131,10 @@ def test_hook_client_after_request():
assert test_middleware.retcode == 'OK'
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_client_after_request_stream():
zero_ctx = zerorpc.Context()
@@ -134,7 +142,7 @@ def test_hook_client_after_request_stream():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(context=zero_ctx)
test_client.connect(endpoint)
@@ -156,7 +164,10 @@ def test_hook_client_after_request_stream():
assert test_middleware.retcode == 'STREAM_DONE'
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_client_after_request_timeout():
@@ -176,7 +187,7 @@ def test_hook_client_after_request_timeout():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx)
test_client.connect(endpoint)
@@ -189,7 +200,10 @@ def test_hook_client_after_request_timeout():
assert "timeout" in ex.args[0]
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
class ClientAfterFailedRequestMiddleware(object):
def __init__(self):
@@ -212,7 +226,7 @@ def test_hook_client_after_request_remote_error():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx)
test_client.connect(endpoint)
@@ -224,7 +238,10 @@ def test_hook_client_after_request_remote_error():
assert test_middleware.called == True
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_client_after_request_remote_error_stream():
@@ -235,7 +252,7 @@ def test_hook_client_after_request_remote_error_stream():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx)
test_client.connect(endpoint)
@@ -247,7 +264,10 @@ def test_hook_client_after_request_remote_error_stream():
assert test_middleware.called == True
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_client_handle_remote_error_inspect():
@@ -264,7 +284,7 @@ def test_hook_client_handle_remote_error_inspect():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(context=zero_ctx)
test_client.connect(endpoint)
@@ -277,7 +297,10 @@ def test_hook_client_handle_remote_error_inspect():
assert ex.name == "RuntimeError"
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
# This is a seriously broken idea, but possible nonetheless
class ClientEvalRemoteErrorMiddleware(object):
@@ -298,7 +321,7 @@ def test_hook_client_handle_remote_error_eval():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(context=zero_ctx)
test_client.connect(endpoint)
@@ -311,7 +334,10 @@ def test_hook_client_handle_remote_error_eval():
assert "BrokenEchoModule" in ex.args[0]
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_client_handle_remote_error_eval_stream():
test_middleware = ClientEvalRemoteErrorMiddleware()
@@ -321,7 +347,7 @@ def test_hook_client_handle_remote_error_eval_stream():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(context=zero_ctx)
test_client.connect(endpoint)
@@ -334,7 +360,10 @@ def test_hook_client_handle_remote_error_eval_stream():
assert "BrokenEchoModule" in ex.args[0]
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
def test_hook_client_after_request_custom_error():
@@ -360,7 +389,7 @@ def test_hook_client_after_request_custom_error():
test_server = zerorpc.Server(EchoModule(), context=zero_ctx)
test_server.bind(endpoint)
- test_server_task = gevent.spawn(test_server.run)
+ test_server_task = eventlet.spawn(test_server.run)
test_client = zerorpc.Client(context=zero_ctx)
test_client.connect(endpoint)
@@ -373,4 +402,7 @@ def test_hook_client_after_request_custom_error():
assert "BrokenEchoModule" in ex.args[0]
test_server.stop()
- test_server_task.join()
+ try:
+ test_server_task.wait()
+ except greenlet.GreenletExit:
+ pass
diff --git a/tests/test_pubpush.py b/tests/test_pubpush.py
index a99f9b4..512a1a0 100644
--- a/tests/test_pubpush.py
+++ b/tests/test_pubpush.py
@@ -27,8 +27,7 @@ from __future__ import print_function
from __future__ import absolute_import
from builtins import range
-import gevent
-import gevent.event
+import eventlet
import zerorpc
from .testutils import teardown, random_ipc_endpoint
@@ -39,19 +38,19 @@ def test_pushpull_inheritance():
pusher = zerorpc.Pusher()
pusher.bind(endpoint)
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
class Puller(zerorpc.Puller):
def lolita(self, a, b):
print('lolita', a, b)
assert a + b == 3
- trigger.set()
+ trigger.send()
puller = Puller()
puller.connect(endpoint)
- gevent.spawn(puller.run)
+ eventlet.spawn(puller.run)
- trigger.clear()
+ # trigger.reset()
pusher.lolita(1, 2)
trigger.wait()
print('done')
@@ -62,19 +61,19 @@ def test_pubsub_inheritance():
publisher = zerorpc.Publisher()
publisher.bind(endpoint)
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
class Subscriber(zerorpc.Subscriber):
def lolita(self, a, b):
print('lolita', a, b)
assert a + b == 3
- trigger.set()
+ trigger.send()
subscriber = Subscriber()
subscriber.connect(endpoint)
- gevent.spawn(subscriber.run)
+ eventlet.spawn(subscriber.run)
- trigger.clear()
+ # trigger.reset()
# We need this retry logic to wait that the subscriber.run coroutine starts
# reading (the published messages will go to /dev/null until then).
for attempt in range(0, 10):
@@ -87,13 +86,13 @@ def test_pubsub_inheritance():
def test_pushpull_composite():
endpoint = random_ipc_endpoint()
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
class Puller(object):
def lolita(self, a, b):
print('lolita', a, b)
assert a + b == 3
- trigger.set()
+ trigger.send()
pusher = zerorpc.Pusher()
pusher.bind(endpoint)
@@ -101,9 +100,9 @@ def test_pushpull_composite():
service = Puller()
puller = zerorpc.Puller(service)
puller.connect(endpoint)
- gevent.spawn(puller.run)
+ eventlet.spawn(puller.run)
- trigger.clear()
+ # trigger.reset()
pusher.lolita(1, 2)
trigger.wait()
print('done')
@@ -111,13 +110,13 @@ def test_pushpull_composite():
def test_pubsub_composite():
endpoint = random_ipc_endpoint()
- trigger = gevent.event.Event()
+ trigger = eventlet.event.Event()
class Subscriber(object):
def lolita(self, a, b):
print('lolita', a, b)
assert a + b == 3
- trigger.set()
+ trigger.send()
publisher = zerorpc.Publisher()
publisher.bind(endpoint)
@@ -125,9 +124,9 @@ def test_pubsub_composite():
service = Subscriber()
subscriber = zerorpc.Subscriber(service)
subscriber.connect(endpoint)
- gevent.spawn(subscriber.run)
+ eventlet.spawn(subscriber.run)
- trigger.clear()
+ # trigger.reset()
# We need this retry logic to wait that the subscriber.run coroutine starts
# reading (the published messages will go to /dev/null until then).
for attempt in range(0, 10):
diff --git a/tests/test_reqstream.py b/tests/test_reqstream.py
index 71e1511..2cb9266 100644
--- a/tests/test_reqstream.py
+++ b/tests/test_reqstream.py
@@ -27,7 +27,7 @@ from __future__ import print_function
from __future__ import absolute_import
from builtins import range
-import gevent
+import eventlet
import zerorpc
from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR
@@ -55,7 +55,7 @@ def test_rcp_streaming():
srv = MySrv(heartbeat=TIME_FACTOR * 4)
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(heartbeat=TIME_FACTOR * 4)
client.connect(endpoint)
@@ -67,7 +67,7 @@ def test_rcp_streaming():
assert isinstance(r, Iterator)
l = []
print('wait 4s for fun')
- gevent.sleep(TIME_FACTOR * 4)
+ eventlet.sleep(TIME_FACTOR * 4)
for x in r:
l.append(x)
assert l == list(range(10))
diff --git a/tests/test_server.py b/tests/test_server.py
index 86997a9..a58f5eb 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -28,7 +28,7 @@ from __future__ import absolute_import
from builtins import range
import pytest
-import gevent
+import eventlet
import sys
from zerorpc import zmq
@@ -49,7 +49,7 @@ def test_server_manual():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client_events = zerorpc.Events(zmq.DEALER)
client_events.connect(endpoint)
@@ -82,7 +82,7 @@ def test_client_server():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client()
client.connect(endpoint)
@@ -103,12 +103,12 @@ def test_client_server_client_timeout():
return 42
def add(self, a, b):
- gevent.sleep(TIME_FACTOR * 10)
+ eventlet.sleep(TIME_FACTOR * 10)
return a + b
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
client.connect(endpoint)
@@ -132,7 +132,7 @@ def test_client_server_exception():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
client.connect(endpoint)
@@ -159,7 +159,7 @@ def test_client_server_detailed_exception():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client = zerorpc.Client(timeout=TIME_FACTOR * 2)
client.connect(endpoint)
@@ -192,7 +192,7 @@ def test_exception_compat_v1():
srv = MySrv()
srv.bind(endpoint)
- gevent.spawn(srv.run)
+ eventlet.spawn(srv.run)
client_events = zerorpc.Events(zmq.DEALER)
client_events.connect(endpoint)
@@ -215,7 +215,7 @@ def test_exception_compat_v1():
assert event.name == 'ERR'
(msg,) = event.args
print('msg only', msg)
- assert msg == "NameError('donotexist',)"
+ assert msg == "NameError('donotexist')"
client_events.close()
srv.close()
diff --git a/tests/test_zmq.py b/tests/test_zmq.py
index 1e7b4dd..18ee39f 100644
--- a/tests/test_zmq.py
+++ b/tests/test_zmq.py
@@ -25,7 +25,7 @@
from __future__ import print_function
from __future__ import absolute_import
-import gevent
+import eventlet
from zerorpc import zmq
from .testutils import teardown, random_ipc_endpoint
@@ -61,6 +61,6 @@ def test1():
s.close()
c.term()
- s = gevent.spawn(server)
- c = gevent.spawn(client)
- c.join()
+ s = eventlet.spawn(server)
+ c = eventlet.spawn(client)
+ c.wait()
diff --git a/tests/zmqbug.py b/tests/zmqbug.py
index 1d102a2..da83fd2 100644
--- a/tests/zmqbug.py
+++ b/tests/zmqbug.py
@@ -28,29 +28,147 @@
from __future__ import print_function
import zmq
+from zmq.constants import *
-import gevent.event
-import gevent.core
+import eventlet
+import eventlet.hubs
+import greenlet
+from collections import deque
STOP_EVERYTHING = False
+class LockReleaseError(Exception):
+ pass
+
+class _QueueLock(object):
+ """A Lock that can be acquired by at most one thread. Any other
+ thread calling acquire will be blocked in a queue. When release
+ is called, the threads are awoken in the order they blocked,
+ one at a time. This lock can be required recursively by the same
+ thread."""
+
+ def __init__(self):
+ self._waiters = deque()
+ self._count = 0
+ self._holder = None
+ self._hub = eventlet.hubs.get_hub()
+
+ def __nonzero__(self):
+ return bool(self._count)
+
+ __bool__ = __nonzero__
+
+ def __enter__(self):
+ self.acquire()
+
+ def __exit__(self, type, value, traceback):
+ self.release()
+
+ def acquire(self):
+ current = greenlet.getcurrent()
+ if (self._waiters or self._count > 0) and self._holder is not current:
+ # block until lock is free
+ self._waiters.append(current)
+ self._hub.switch()
+ w = self._waiters.popleft()
+
+ assert w is current, 'Waiting threads woken out of order'
+ assert self._count == 0, 'After waking a thread, the lock must be unacquired'
+
+ self._holder = current
+ self._count += 1
+
+ def release(self):
+ if self._count <= 0:
+ raise LockReleaseError("Cannot release unacquired lock")
+
+ self._count -= 1
+ if self._count == 0:
+ self._holder = None
+ if self._waiters:
+ # wake next
+ self._hub.schedule_call_global(0, self._waiters[0].switch)
+
+
+class _BlockedThread(object):
+ """Is either empty, or represents a single blocked thread that
+ blocked itself by calling the block() method. The thread can be
+ awoken by calling wake(). Wake() can be called multiple times and
+ all but the first call will have no effect."""
+
+ def __init__(self):
+ self._blocked_thread = None
+ self._wakeupper = None
+ self._hub = eventlet.hubs.get_hub()
+
+ def __nonzero__(self):
+ return self._blocked_thread is not None
+
+ __bool__ = __nonzero__
+
+ def block(self, deadline=None):
+ if self._blocked_thread is not None:
+ raise Exception("Cannot block more than one thread on one BlockedThread")
+ self._blocked_thread = greenlet.getcurrent()
+
+ if deadline is not None:
+ self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake)
+
+ try:
+ self._hub.switch()
+ finally:
+ self._blocked_thread = None
+ # cleanup the wakeup task
+ if self._wakeupper is not None:
+ # Important to cancel the wakeup task so it doesn't
+ # spuriously wake this greenthread later on.
+ self._wakeupper.cancel()
+ self._wakeupper = None
+
+ def wake(self):
+ """Schedules the blocked thread to be awoken and return
+ True. If wake has already been called or if there is no
+ blocked thread, then this call has no effect and returns
+ False."""
+ if self._blocked_thread is not None and self._wakeupper is None:
+ self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
+ return True
+ return False
+
class ZMQSocket(zmq.Socket):
def __init__(self, context, socket_type):
super(ZMQSocket, self).__init__(context, socket_type)
on_state_changed_fd = self.getsockopt(zmq.FD)
- self._readable = gevent.event.Event()
- self._writable = gevent.event.Event()
- try:
- # gevent>=1.0
- self._state_event = gevent.hub.get_hub().loop.io(
- on_state_changed_fd, gevent.core.READ)
- self._state_event.start(self._on_state_changed)
- except AttributeError:
- # gevent<1.0
- self._state_event = gevent.core.read_event(on_state_changed_fd,
- self._on_state_changed, persist=True)
+ self.__dict__['_eventlet_send_event'] = _BlockedThread()
+ self.__dict__['_eventlet_recv_event'] = _BlockedThread()
+ self.__dict__['_eventlet_send_lock'] = _QueueLock()
+ self.__dict__['_eventlet_recv_lock'] = _QueueLock()
+
+ def event(fd):
+ # Some events arrived at the zmq socket. This may mean
+ # there's a message that can be read or there's space for
+ # a message to be written.
+ send_wake = self._eventlet_send_event.wake()
+ recv_wake = self._eventlet_recv_event.wake()
+ if not send_wake and not recv_wake:
+ # if no waiting send or recv thread was woken up, then
+ # force the zmq socket's events to be processed to
+ # avoid repeated wakeups
+ events = self.getsockopt(zmq.EVENTS)
+ if events & zmq.POLLOUT:
+ self._eventlet_send_event.wake()
+ if events & zmq.POLLIN:
+ self._eventlet_recv_event.wake()
+
+ hub = eventlet.hubs.get_hub()
+ self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
+ self.getsockopt(FD),
+ event,
+ lambda _: None,
+ lambda: None)
+ self.__dict__['_eventlet_clock'] = hub.clock
def _on_state_changed(self, event=None, _evtype=None):
if self.closed:
@@ -64,47 +182,89 @@ class ZMQSocket(zmq.Socket):
if events & zmq.POLLIN:
self._readable.set()
- def close(self):
- if not self.closed and getattr(self, '_state_event', None):
- try:
- # gevent>=1.0
- self._state_event.stop()
- except AttributeError:
- # gevent<1.0
- self._state_event.cancel()
- super(ZMQSocket, self).close()
+ def close(self, linger=None):
+ super(ZMQSocket, self).close(linger)
+ if self._eventlet_listener is not None:
+ eventlet.hubs.get_hub().remove(self._state_event)
+ self.__dict__['_eventlet_listener'] = None
+ self._eventlet_send_event.wake()
+ self._eventlet_recv_event.wake()
def send(self, data, flags=0, copy=True, track=False):
if flags & zmq.NOBLOCK:
- return super(ZMQSocket, self).send(data, flags, copy, track)
+ result = super(ZMQSocket, self).send(data, flags, copy, track)
+ # Instead of calling both wake methods, could call
+ # self.getsockopt(EVENTS) which would trigger wakeups if
+ # needed.
+ self._eventlet_send_event.wake()
+ self._eventlet_recv_event.wake()
+ return result
+
+ # TODO: pyzmq will copy the message buffer and create Message
+ # objects under some circumstances. We could do that work here
+ # once to avoid doing it every time the send is retried.
flags |= zmq.NOBLOCK
- while True:
- try:
- return super(ZMQSocket, self).send(data, flags, copy, track)
- except zmq.ZMQError as e:
- if e.errno != zmq.EAGAIN:
- raise
- self._writable.clear()
- self._writable.wait()
+ with self._eventlet_send_lock:
+ while True:
+ try:
+ return super(ZMQSocket, self).send(data, flags, copy, track)
+ except zmq.ZMQError as e:
+ if e.errno == zmq.EAGAIN:
+ self._eventlet_send_event.block()
+ else:
+ raise
+ finally:
+ # The call to send processes 0mq events and may
+ # make the socket ready to recv. Wake the next
+ # receiver. (Could check EVENTS for POLLIN here)
+ self._eventlet_recv_event.wake()
def recv(self, flags=0, copy=True, track=False):
if flags & zmq.NOBLOCK:
- return super(ZMQSocket, self).recv(flags, copy, track)
+ msg = super(ZMQSocket, self).recv(flags, copy, track)
+ # Instead of calling both wake methods, could call
+ # self.getsockopt(EVENTS) which would trigger wakeups if
+ # needed.
+ self._eventlet_send_event.wake()
+ self._eventlet_recv_event.wake()
+ return msg
+
+ deadline = None
+ if hasattr(zmq, 'RCVTIMEO'):
+ sock_timeout = self.getsockopt(zmq.RCVTIMEO)
+ if sock_timeout == -1:
+ pass
+ elif sock_timeout > 0:
+ deadline = self._eventlet_clock() + sock_timeout / 1000.0
+ else:
+ raise ValueError(sock_timeout)
+
flags |= zmq.NOBLOCK
- while True:
- try:
- return super(ZMQSocket, self).recv(flags, copy, track)
- except zmq.ZMQError as e:
- if e.errno != zmq.EAGAIN:
- raise
- self._readable.clear()
- while not self._readable.wait(timeout=10):
- events = self.getsockopt(zmq.EVENTS)
- if bool(events & zmq.POLLIN):
- print("here we go, nobody told me about new messages!")
- global STOP_EVERYTHING
- STOP_EVERYTHING = True
- raise gevent.GreenletExit()
+ with self._eventlet_recv_lock:
+ while True:
+ try:
+ return super(ZMQSocket, self).recv(flags, copy, track)
+ except zmq.ZMQError as e:
+ if e.errno == zmq.EAGAIN:
+ # zmq in its wisdom decided to reuse EAGAIN for timeouts
+ if deadline is not None and self._eventlet_clock() > deadline:
+ e.is_timeout = True
+ raise
+
+ self._eventlet_recv_event.block(deadline=deadline)
+ else:
+ raise
+ finally:
+ # The call to recv processes 0mq events and may
+ # make the socket ready to send. Wake the next
+ # receiver. (Could check EVENTS for POLLOUT here)
+ while self._eventlet_send_event.wake():
+ events = self.getsockopt(zmq.EVENTS)
+ if bool(events & zmq.POLLIN):
+ print("here we go, nobody told me about new messages!")
+ global STOP_EVERYTHING
+ STOP_EVERYTHING = True
+ raise greenlet.GreenletExit()
zmq_context = zmq.Context()
@@ -124,11 +284,11 @@ def server():
socket.send(msg)
cnt.responded += 1
- gevent.spawn(responder)
+ eventlet.spawn(responder)
while not STOP_EVERYTHING:
print("cnt.responded=", cnt.responded)
- gevent.sleep(0.5)
+ eventlet.sleep(0.5)
def client():
@@ -149,17 +309,17 @@ def client():
def sendmsg():
while not STOP_EVERYTHING:
- socket.send('', flags=zmq.SNDMORE)
- socket.send('hello')
+ socket.send(b'', flags=zmq.SNDMORE)
+ socket.send(b'hello')
cnt.send += 1
- gevent.sleep(0)
+ eventlet.sleep(0)
- gevent.spawn(recvmsg)
- gevent.spawn(sendmsg)
+ eventlet.spawn(recvmsg)
+ eventlet.spawn(sendmsg)
while not STOP_EVERYTHING:
print("cnt.recv=", cnt.recv, "cnt.send=", cnt.send)
- gevent.sleep(0.5)
+ eventlet.sleep(0.5)
-gevent.spawn(server)
+eventlet.spawn(server)
client()
diff --git a/tox.ini b/tox.ini
index 96bace8..a12cbc6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,6 +11,6 @@ commands =
passenv = ZPC_TEST_TIME_FACTOR
[flake8]
-ignore = E501,E128
+ignore = E501,E128,E129,F841,W504
filename = *.py,zerorpc
exclude = tests,.git,dist,doc,*.egg-info,__pycache__,setup.py
diff --git a/zerorpc/channel.py b/zerorpc/channel.py
index ad21c27..bd376ec 100644
--- a/zerorpc/channel.py
+++ b/zerorpc/channel.py
@@ -22,11 +22,7 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
-import gevent.pool
-import gevent.queue
-import gevent.event
-import gevent.local
-import gevent.lock
+import eventlet
import logging
from .exceptions import TimeoutExpired
@@ -43,8 +39,8 @@ class ChannelMultiplexer(ChannelBase):
self._channel_dispatcher_task = None
self._broadcast_queue = None
if events.recv_is_supported and not ignore_broadcast:
- self._broadcast_queue = gevent.queue.Queue(maxsize=1)
- self._channel_dispatcher_task = gevent.spawn(
+ self._broadcast_queue = eventlet.queue.Queue(maxsize=1)
+ self._channel_dispatcher_task = eventlet.spawn(
self._channel_dispatcher)
@property
@@ -98,7 +94,7 @@ class ChannelMultiplexer(ChannelBase):
def channel(self, from_event=None):
if self._channel_dispatcher_task is None:
- self._channel_dispatcher_task = gevent.spawn(
+ self._channel_dispatcher_task = eventlet.spawn(
self._channel_dispatcher)
return Channel(self, from_event)
@@ -117,7 +113,7 @@ class Channel(ChannelBase):
self._multiplexer = multiplexer
self._channel_id = None
self._zmqid = None
- self._queue = gevent.queue.Queue(maxsize=1)
+ self._queue = eventlet.queue.Queue(maxsize=1)
if from_event is not None:
self._channel_id = from_event.header[u'message_id']
self._zmqid = from_event.identity
@@ -156,7 +152,7 @@ class Channel(ChannelBase):
def recv(self, timeout=None):
try:
event = self._queue.get(timeout=timeout)
- except gevent.queue.Empty:
+ except eventlet.queue.Empty:
raise TimeoutExpired(timeout)
return event
@@ -172,11 +168,11 @@ class BufferedChannel(ChannelBase):
self._input_queue_size = inqueue_size
self._remote_queue_open_slots = 1
self._input_queue_reserved = 1
- self._remote_can_recv = gevent.event.Event()
- self._input_queue = gevent.queue.Queue()
+ self._remote_can_recv = eventlet.event.Event()
+ self._input_queue = eventlet.queue.Queue()
self._verbose = False
self._on_close_if = None
- self._recv_task = gevent.spawn(self._recver)
+ self._recv_task = eventlet.spawn(self._recver)
@property
def recv_is_supported(self):
@@ -211,7 +207,7 @@ class BufferedChannel(ChannelBase):
except Exception:
logger.exception('gevent_zerorpc.BufferedChannel._recver')
if self._remote_queue_open_slots > 0:
- self._remote_can_recv.set()
+ self._remote_can_recv.send()
elif self._input_queue.qsize() == self._input_queue_size:
raise RuntimeError(
'BufferedChannel, queue overflow on event:', event)
@@ -227,12 +223,12 @@ class BufferedChannel(ChannelBase):
def emit_event(self, event, timeout=None):
if self._remote_queue_open_slots == 0:
- self._remote_can_recv.clear()
+ # self._remote_can_recv.reset() # TODO Check if the result is equivalent to gevent.clear()
self._remote_can_recv.wait(timeout=timeout)
self._remote_queue_open_slots -= 1
try:
self._channel.emit_event(event)
- except:
+ except Exception:
self._remote_queue_open_slots += 1
raise
@@ -253,7 +249,7 @@ class BufferedChannel(ChannelBase):
try:
event = self._input_queue.get(timeout=timeout)
- except gevent.queue.Empty:
+ except eventlet.queue.Empty:
raise TimeoutExpired(timeout)
self._input_queue_reserved -= 1
diff --git a/zerorpc/context.py b/zerorpc/context.py
index debce26..6e20720 100644
--- a/zerorpc/context.py
+++ b/zerorpc/context.py
@@ -29,7 +29,7 @@ from future.utils import tobytes
import uuid
import random
-from . import gevent_zmq as zmq
+from eventlet.green import zmq
class Context(zmq.Context):
diff --git a/zerorpc/core.py b/zerorpc/core.py
index 9dbf5cc..ea89f36 100644
--- a/zerorpc/core.py
+++ b/zerorpc/core.py
@@ -30,13 +30,9 @@ from future.utils import iteritems
import sys
import traceback
-import gevent.pool
-import gevent.queue
-import gevent.event
-import gevent.local
-import gevent.lock
+import eventlet
-from . import gevent_zmq as zmq
+from eventlet.green import zmq
from .exceptions import TimeoutExpired, RemoteError, LostRemote
from .channel import ChannelMultiplexer, BufferedChannel
from .socket import SocketBase
@@ -52,7 +48,7 @@ logger = getLogger(__name__)
class ServerBase(object):
def __init__(self, channel, methods=None, name=None, context=None,
- pool_size=None, heartbeat=5):
+ pool_size=1000, heartbeat=5):
self._multiplexer = ChannelMultiplexer(channel)
if methods is None:
@@ -60,7 +56,7 @@ class ServerBase(object):
self._context = context or Context.get_instance()
self._name = name or self._extract_name()
- self._task_pool = gevent.pool.Pool(size=pool_size)
+ self._task_pool = eventlet.greenpool.GreenPool(size=pool_size)
self._acceptor_task = None
self._methods = self._filter_methods(ServerBase, self, methods)
@@ -171,12 +167,12 @@ class ServerBase(object):
self._task_pool.spawn(self._async_task, initial_event)
def run(self):
- self._acceptor_task = gevent.spawn(self._acceptor)
+ self._acceptor_task = eventlet.spawn(self._acceptor)
try:
- self._acceptor_task.get()
+ self._acceptor_task.wait()
finally:
self.stop()
- self._task_pool.join(raise_error=True)
+ self._task_pool.waitall()
def stop(self):
if self._acceptor_task is not None:
@@ -272,10 +268,8 @@ class ClientBase(object):
kargs.get('async_', False) is False):
return self._process_response(request_event, bufchan, timeout)
- async_result = gevent.event.AsyncResult()
- gevent.spawn(self._process_response, request_event, bufchan,
- timeout).link(async_result)
- return async_result
+ return eventlet.spawn(self._process_response, request_event, bufchan,
+ timeout)
def __getattr__(self, method):
return lambda *args, **kargs: self(method, *args, **kargs)
@@ -283,7 +277,7 @@ class ClientBase(object):
class Server(SocketBase, ServerBase):
- def __init__(self, methods=None, name=None, context=None, pool_size=None,
+ def __init__(self, methods=None, name=None, context=None, pool_size=1000,
heartbeat=5):
SocketBase.__init__(self, zmq.ROUTER, context)
if methods is None:
@@ -368,15 +362,15 @@ class Puller(SocketBase):
del exc_infos
def run(self):
- self._receiver_task = gevent.spawn(self._receiver)
+ self._receiver_task = eventlet.spawn(self._receiver)
try:
- self._receiver_task.get()
+ self._receiver_task.wait()
finally:
self._receiver_task = None
def stop(self):
if self._receiver_task is not None:
- self._receiver_task.kill(block=False)
+ self._receiver_task.kill()
class Publisher(Pusher):
diff --git a/zerorpc/events.py b/zerorpc/events.py
index f87d0b5..ce97ad6 100644
--- a/zerorpc/events.py
+++ b/zerorpc/events.py
@@ -28,15 +28,12 @@ from builtins import str
from builtins import range
import msgpack
-import gevent.pool
-import gevent.queue
-import gevent.event
-import gevent.local
-import gevent.lock
+import eventlet
+import greenlet
import logging
import sys
-from . import gevent_zmq as zmq
+from eventlet.green import zmq
from .exceptions import TimeoutExpired
from .context import Context
from .channel_base import ChannelBase
@@ -50,8 +47,8 @@ else:
return frame.buffer
# gevent <= 1.1.0.rc5 is missing the Python3 __next__ method.
-if sys.version_info >= (3, 0) and gevent.version_info <= (1, 1, 0, 'rc', '5'):
- setattr(gevent.queue.Channel, '__next__', gevent.queue.Channel.next)
+# if sys.version_info >= (3, 0) and gevent.version_info <= (1, 1, 0, 'rc', '5'):
+# setattr(gevent.queue.Channel, '__next__', gevent.queue.Channel.next)
logger = logging.getLogger(__name__)
@@ -67,20 +64,20 @@ class SequentialSender(object):
for i in range(len(parts) - 1):
try:
self._socket.send(parts[i], copy=False, flags=zmq.SNDMORE)
- except (gevent.GreenletExit, gevent.Timeout) as e:
+ except (greenlet.GreenletExit, eventlet.Timeout) as e:
if i == 0:
raise
self._socket.send(parts[i], copy=False, flags=zmq.SNDMORE)
try:
self._socket.send(parts[-1], copy=False)
- except (gevent.GreenletExit, gevent.Timeout) as e:
+ except (greenlet.GreenletExit, eventlet.Timeout) as e:
self._socket.send(parts[-1], copy=False)
if e:
raise e
def __call__(self, parts, timeout=None):
if timeout:
- with gevent.Timeout(timeout):
+ with eventlet.Timeout(timeout):
self._send(parts)
else:
self._send(parts)
@@ -97,7 +94,7 @@ class SequentialReceiver(object):
while True:
try:
part = self._socket.recv(copy=False)
- except (gevent.GreenletExit, gevent.Timeout) as e:
+ except (greenlet.GreenletExit, eventlet.Timeout) as e:
if len(parts) == 0:
raise
part = self._socket.recv(copy=False)
@@ -110,7 +107,7 @@ class SequentialReceiver(object):
def __call__(self, timeout=None):
if timeout:
- with gevent.Timeout(timeout):
+ with eventlet.Timeout(timeout):
return self._recv()
else:
return self._recv()
@@ -120,21 +117,22 @@ class Sender(SequentialSender):
def __init__(self, socket):
self._socket = socket
- self._send_queue = gevent.queue.Channel()
- self._send_task = gevent.spawn(self._sender)
+ self._send_queue = eventlet.queue.Queue(maxsize=0) # Channel
+ self._send_task = eventlet.spawn(self._sender)
def close(self):
if self._send_task:
self._send_task.kill()
def _sender(self):
- for parts in self._send_queue:
+ while True:
+ parts = self._send_queue.get()
super(Sender, self)._send(parts)
def __call__(self, parts, timeout=None):
try:
self._send_queue.put(parts, timeout=timeout)
- except gevent.queue.Full:
+ except eventlet.queue.Full:
raise TimeoutExpired(timeout)
@@ -142,8 +140,8 @@ class Receiver(SequentialReceiver):
def __init__(self, socket):
self._socket = socket
- self._recv_queue = gevent.queue.Channel()
- self._recv_task = gevent.spawn(self._recver)
+ self._recv_queue = eventlet.queue.Queue(maxsize=0) # Channel
+ self._recv_task = eventlet.spawn(self._recver)
def close(self):
if self._recv_task:
@@ -158,7 +156,7 @@ class Receiver(SequentialReceiver):
def __call__(self, timeout=None):
try:
return self._recv_queue.get(timeout=timeout)
- except gevent.queue.Empty:
+ except eventlet.queue.Empty:
raise TimeoutExpired(timeout)
@@ -281,11 +279,11 @@ class Events(ChannelBase):
def close(self):
try:
self._send.close()
- except (AttributeError, TypeError, gevent.GreenletExit):
+ except (AttributeError, TypeError, greenlet.GreenletExit):
pass
try:
self._recv.close()
- except (AttributeError, TypeError, gevent.GreenletExit):
+ except (AttributeError, TypeError, greenlet.GreenletExit):
pass
self._socket.close()
diff --git a/zerorpc/gevent_zmq.py b/zerorpc/gevent_zmq.py
index 9430695..bac9a48 100644
--- a/zerorpc/gevent_zmq.py
+++ b/zerorpc/gevent_zmq.py
@@ -22,10 +22,11 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
-# Based on https://github.com/traviscline/gevent-zeromq/
+# Based on https://github.com/eventlet/eventlet/blob/37fca06d7466a698cf53a1ae6e4b3d840a3ced7a/eventlet/green/zmq.py
# We want to act like zmq
from zmq import * # noqa
+from zmq.constants import * # noqa
# Explicit import to please flake8
from zmq import ZMQError
@@ -33,13 +34,112 @@ from zmq import ZMQError
# A way to access original zmq
import zmq as _zmq
-import gevent.event
-import gevent.core
+import eventlet
+import eventlet.hubs
+from eventlet.support import greenlets as greenlet
import errno
from logging import getLogger
+from collections import deque
logger = getLogger(__name__)
+class LockReleaseError(Exception):
+ pass
+
+class _QueueLock(object):
+ """A Lock that can be acquired by at most one thread. Any other
+ thread calling acquire will be blocked in a queue. When release
+ is called, the threads are awoken in the order they blocked,
+ one at a time. This lock can be required recursively by the same
+ thread."""
+
+ def __init__(self):
+ self._waiters = deque()
+ self._count = 0
+ self._holder = None
+ self._hub = eventlet.hubs.get_hub()
+
+ def __nonzero__(self):
+ return bool(self._count)
+
+ __bool__ = __nonzero__
+
+ def __enter__(self):
+ self.acquire()
+
+ def __exit__(self, type, value, traceback):
+ self.release()
+
+ def acquire(self):
+ current = greenlet.getcurrent()
+ if (self._waiters or self._count > 0) and self._holder is not current:
+ # block until lock is free
+ self._waiters.append(current)
+ self._hub.switch()
+ w = self._waiters.popleft()
+
+ assert w is current, 'Waiting threads woken out of order'
+ assert self._count == 0, 'After waking a thread, the lock must be unacquired'
+
+ self._holder = current
+ self._count += 1
+
+ def release(self):
+ if self._count <= 0:
+ raise LockReleaseError("Cannot release unacquired lock")
+
+ self._count -= 1
+ if self._count == 0:
+ self._holder = None
+ if self._waiters:
+ # wake next
+ self._hub.schedule_call_global(0, self._waiters[0].switch)
+
+class _BlockedThread(object):
+ """Is either empty, or represents a single blocked thread that
+ blocked itself by calling the block() method. The thread can be
+ awoken by calling wake(). Wake() can be called multiple times and
+ all but the first call will have no effect."""
+
+ def __init__(self):
+ self._blocked_thread = None
+ self._wakeupper = None
+ self._hub = eventlet.hubs.get_hub()
+
+ def __nonzero__(self):
+ return self._blocked_thread is not None
+
+ __bool__ = __nonzero__
+
+ def block(self, deadline=None):
+ if self._blocked_thread is not None:
+ raise Exception("Cannot block more than one thread on one BlockedThread")
+ self._blocked_thread = greenlet.getcurrent()
+
+ if deadline is not None:
+ self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake)
+
+ try:
+ self._hub.switch()
+ finally:
+ self._blocked_thread = None
+ # cleanup the wakeup task
+ if self._wakeupper is not None:
+ # Important to cancel the wakeup task so it doesn't
+ # spuriously wake this greenthread later on.
+ self._wakeupper.cancel()
+ self._wakeupper = None
+
+ def wake(self):
+ """Schedules the blocked thread to be awoken and return
+ True. If wake has already been called or if there is no
+ blocked thread, then this call has no effect and returns
+ False."""
+ if self._blocked_thread is not None and self._wakeupper is None:
+ self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch)
+ return True
+ return False
+
class Context(_zmq.Context):
@@ -57,47 +157,43 @@ class Socket(_zmq.Socket):
# NOTE: pyzmq 13.0.0 messed up with setattr (they turned it into a
# non-op) and you can't assign attributes normally anymore, hence the
# tricks with self.__dict__ here
- self.__dict__["_readable"] = gevent.event.Event()
- self.__dict__["_writable"] = gevent.event.Event()
- try:
- # gevent>=1.0
- self.__dict__["_state_event"] = gevent.hub.get_hub().loop.io(
- on_state_changed_fd, gevent.core.READ)
- self._state_event.start(self._on_state_changed)
- except AttributeError:
- # gevent<1.0
- self.__dict__["_state_event"] = \
- gevent.core.read_event(on_state_changed_fd,
- self._on_state_changed, persist=True)
-
- def _on_state_changed(self, event=None, _evtype=None):
- if self.closed:
- self._writable.set()
- self._readable.set()
- return
- while True:
- try:
+ self.__dict__['_eventlet_send_event'] = _BlockedThread()
+ self.__dict__['_eventlet_recv_event'] = _BlockedThread()
+ self.__dict__['_eventlet_send_lock'] = _QueueLock()
+ self.__dict__['_eventlet_recv_lock'] = _QueueLock()
+
+ def event(fd):
+ # Some events arrived at the zmq socket. This may mean
+ # there's a message that can be read or there's space for
+ # a message to be written.
+ send_wake = self._eventlet_send_event.wake()
+ recv_wake = self._eventlet_recv_event.wake()
+ if not send_wake and not recv_wake:
+ # if no waiting send or recv thread was woken up, then
+ # force the zmq socket's events to be processed to
+ # avoid repeated wakeups
events = self.getsockopt(_zmq.EVENTS)
- break
- except ZMQError as e:
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
- raise
+ if events & _zmq.POLLOUT:
+ self._eventlet_send_event.wake()
+ if events & _zmq.POLLIN:
+ self._eventlet_recv_event.wake()
- if events & _zmq.POLLOUT:
- self._writable.set()
- if events & _zmq.POLLIN:
- self._readable.set()
+ hub = eventlet.hubs.get_hub()
+ self.__dict__['_eventlet_listener'] = hub.add(hub.READ,
+ self.getsockopt(_zmq.FD),
+ event,
+ lambda _: None,
+ lambda: None)
+ self.__dict__['_eventlet_clock'] = hub.clock
- def close(self):
- if not self.closed and getattr(self, '_state_event', None):
- try:
- # gevent>=1.0
- self._state_event.stop()
- except AttributeError:
- # gevent<1.0
- self._state_event.cancel()
- super(Socket, self).close()
+ def close(self, linger=None):
+ super(Socket, self).close(linger)
+ if self._eventlet_listener is not None:
+ eventlet.hubs.get_hub().remove(self._state_event)
+ self.__dict__['_eventlet_listener'] = None
+ self._eventlet_send_event.wake()
+ self._eventlet_recv_event.wake()
def connect(self, *args, **kwargs):
while True:
@@ -109,80 +205,71 @@ class Socket(_zmq.Socket):
def send(self, data, flags=0, copy=True, track=False):
if flags & _zmq.NOBLOCK:
- return super(Socket, self).send(data, flags, copy, track)
+ result = super(Socket, self).send(data, flags, copy, track)
+ # Instead of calling both wake methods, could call
+ # self.getsockopt(EVENTS) which would trigger wakeups if
+ # needed.
+ self._eventlet_send_event.wake()
+ self._eventlet_recv_event.wake()
+ return result
+
+ # TODO: pyzmq will copy the message buffer and create Message
+ # objects under some circumstances. We could do that work here
+ # once to avoid doing it every time the send is retried.
flags |= _zmq.NOBLOCK
- while True:
- try:
- msg = super(Socket, self).send(data, flags, copy, track)
- # The following call, force polling the state of the zmq socket
- # (POLLIN and/or POLLOUT). It seems that a POLLIN event is often
- # missed when the socket is used to send at the same time,
- # forcing to poll at this exact moment seems to reduce the
- # latencies when a POLLIN event is missed. The drawback is a
- # reduced throughput (roughly 8.3%) in exchange of a normal
- # concurrency. In other hand, without the following line, you
- # loose 90% of the performances as soon as there is simultaneous
- # send and recv on the socket.
- self._on_state_changed()
- return msg
- except _zmq.ZMQError as e:
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
- raise
- self._writable.clear()
- # The following sleep(0) force gevent to switch out to another
- # coroutine and seems to refresh the notion of time that gevent may
- # have. This definitively eliminate the gevent bug that can trigger
- # a timeout too soon under heavy load. In theory it will incur more
- # CPU usage, but in practice it balance even with the extra CPU used
- # when the timeout triggers too soon in the following loop. So for
- # the same CPU load, you get a better throughput (roughly 18.75%).
- gevent.sleep(0)
- while not self._writable.wait(timeout=1):
+ with self._eventlet_send_lock:
+ while True:
try:
- if self.getsockopt(_zmq.EVENTS) & _zmq.POLLOUT:
- logger.error("/!\\ gevent_zeromq BUG /!\\ "
- "catching up after missing event (SEND) /!\\")
- break
+ return super(Socket, self).send(data, flags, copy, track)
except ZMQError as e:
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
+ if e.errno == _zmq.EAGAIN:
+ self._eventlet_send_event.block()
+ else:
raise
+ finally:
+ # The call to send processes 0mq events and may
+ # make the socket ready to recv. Wake the next
+ # receiver. (Could check EVENTS for POLLIN here)
+ self._eventlet_recv_event.wake()
def recv(self, flags=0, copy=True, track=False):
if flags & _zmq.NOBLOCK:
- return super(Socket, self).recv(flags, copy, track)
+ msg = super(Socket, self).recv(flags, copy, track)
+ # Instead of calling both wake methods, could call
+ # self.getsockopt(EVENTS) which would trigger wakeups if
+ # needed.
+ self._eventlet_send_event.wake()
+ self._eventlet_recv_event.wake()
+ return msg
+
+ deadline = None
+ if hasattr(_zmq, 'RCVTIMEO'):
+ sock_timeout = self.getsockopt(_zmq.RCVTIMEO)
+ if sock_timeout == -1:
+ pass
+ elif sock_timeout > 0:
+ deadline = self._eventlet_clock() + sock_timeout / 1000.0
+ else:
+ raise ValueError(sock_timeout)
+
flags |= _zmq.NOBLOCK
- while True:
- try:
- msg = super(Socket, self).recv(flags, copy, track)
- # The following call, force polling the state of the zmq socket
- # (POLLIN and/or POLLOUT). It seems that a POLLOUT event is
- # often missed when the socket is used to receive at the same
- # time, forcing to poll at this exact moment seems to reduce the
- # latencies when a POLLOUT event is missed. The drawback is a
- # reduced throughput (roughly 8.3%) in exchange of a normal
- # concurrency. In other hand, without the following line, you
- # loose 90% of the performances as soon as there is simultaneous
- # send and recv on the socket.
- self._on_state_changed()
- return msg
- except _zmq.ZMQError as e:
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
- raise
- self._readable.clear()
- # The following sleep(0) force gevent to switch out to another
- # coroutine and seems to refresh the notion of time that gevent may
- # have. This definitively eliminate the gevent bug that can trigger
- # a timeout too soon under heavy load. In theory it will incur more
- # CPU usage, but in practice it balance even with the extra CPU used
- # when the timeout triggers too soon in the following loop. So for
- # the same CPU load, you get a better throughput (roughly 18.75%).
- gevent.sleep(0)
- while not self._readable.wait(timeout=1):
+ with self._eventlet_recv_lock:
+ while True:
try:
- if self.getsockopt(_zmq.EVENTS) & _zmq.POLLIN:
- logger.error("/!\\ gevent_zeromq BUG /!\\ "
- "catching up after missing event (RECV) /!\\")
- break
- except ZMQError as e:
- if e.errno not in (_zmq.EAGAIN, errno.EINTR):
+ return super(Socket, self).recv(flags, copy, track)
+ except _zmq.ZMQError as e:
+ if e.errno == _zmq.EAGAIN:
+ # zmq in its wisdom decided to reuse EAGAIN for timeouts
+ if deadline is not None and self._eventlet_clock() > deadline:
+ e.is_timeout = True
+ raise
+
+ self._eventlet_recv_event.block(deadline=deadline)
+ else:
raise
+ finally:
+ # The call to recv processes 0mq events and may
+ # make the socket ready to send. Wake the next
+ # receiver. (Could check EVENTS for POLLOUT here)
+ self._eventlet_send_event.wake()
+
diff --git a/zerorpc/heartbeat.py b/zerorpc/heartbeat.py
index 23b974d..daa7d50 100644
--- a/zerorpc/heartbeat.py
+++ b/zerorpc/heartbeat.py
@@ -24,11 +24,7 @@
import time
-import gevent.pool
-import gevent.queue
-import gevent.event
-import gevent.local
-import gevent.lock
+import eventlet
from .exceptions import LostRemote, TimeoutExpired
from .channel_base import ChannelBase
@@ -40,12 +36,12 @@ class HeartBeatOnChannel(ChannelBase):
self._closed = False
self._channel = channel
self._heartbeat_freq = freq
- self._input_queue = gevent.queue.Channel()
+ self._input_queue = eventlet.queue.Queue(maxsize=0) # Channel
self._remote_last_hb = None
self._lost_remote = False
- self._recv_task = gevent.spawn(self._recver)
+ self._recv_task = eventlet.spawn(self._recver)
self._heartbeat_task = None
- self._parent_coroutine = gevent.getcurrent()
+ self._parent_coroutine = eventlet.getcurrent()
self._compat_v2 = None
if not passive:
self._start_heartbeat()
@@ -72,20 +68,20 @@ class HeartBeatOnChannel(ChannelBase):
def _heartbeat(self):
while True:
- gevent.sleep(self._heartbeat_freq)
+ eventlet.sleep(self._heartbeat_freq)
if self._remote_last_hb is None:
self._remote_last_hb = time.time()
if time.time() > self._remote_last_hb + self._heartbeat_freq * 2:
self._lost_remote = True
if not self._closed:
- gevent.kill(self._parent_coroutine,
+ eventlet.kill(self._parent_coroutine,
self._lost_remote_exception())
break
self._channel.emit(u'_zpc_hb', (0,)) # 0 -> compat with protocol v2
def _start_heartbeat(self):
if self._heartbeat_task is None and self._heartbeat_freq is not None and not self._closed:
- self._heartbeat_task = gevent.spawn(self._heartbeat)
+ self._heartbeat_task = eventlet.spawn(self._heartbeat)
def _recver(self):
while True:
@@ -120,7 +116,7 @@ class HeartBeatOnChannel(ChannelBase):
raise self._lost_remote_exception()
try:
return self._input_queue.get(timeout=timeout)
- except gevent.queue.Empty:
+ except eventlet.queue.Empty:
raise TimeoutExpired(timeout)
@property
--
2.25.1