Получение данных из источника блокировки в обратном вызове цикла Gevent

Я пытаюсь использовать Pycurl с Gevent для загрузки HTTP. Для этого я полагаюсь на модуль geventcurl.py, который изменяет Multi API libcurl для использования цикла событий Gevent.

Проблема заключается в обратном вызове READFUNCTION. Этот обратный вызов выполняется в контексте HUB, поэтому мы не можем wait(), но этот обратный вызов должен возвращать данные для загрузки, и в моем случае эти данные поступают из блокирующего источник.

Вот фрагмент кода, демонстрирующий проблему:

#!/usr/bin/env python
from gevent import monkey; monkey.patch_all()
import gevent
from gevent.queue import Queue
import pycurl
from geventcurl import Curl

URL = 'http://localhost:8000/'

class QueueReader:
    def __init__(self, q):
        self.q = q
    def read_callback(self, size):
        return self.q.get(timeout=10)

dataq = Queue(10)
c = Curl()
c.setopt(pycurl.URL, URL)
c.setopt(pycurl.UPLOAD, 1)
c.setopt(pycurl.READFUNCTION, QueueReader(dataq).read_callback)

# Start transfer
g = gevent.spawn(c.perform)
for i in xrange(10):
    dataq.put(str(i))
    gevent.sleep(1)
g.join()
c.close()

Чтобы запустить фрагмент, вам просто нужно, чтобы что-то слушало на localhost: 8000, nc -l 8000 подойдет. Что происходит, так это то, что, поскольку read_callback() выполняется в контексте HUB, он не будет ждать, если очередь пуста, он немедленно вызовет исключение Empy. Использование AsyncResult также не помогает, так как нам нужно ждать () результата.

Есть ли способ получить данные из возможно блокирующего источника в обратном вызове цикла событий?


person André Cruz    schedule 06.08.2012    source источник


Ответы (1)


Если вы избавитесь от timeout=10, он будет ждать данных от источника блокировки. (См.: http://www.gevent.org/gevent.queue.html, поведение по умолчанию.)

Если блокировка read_callback() будет мешать работе цикла for i in xrange(10), поместите этот цикл в отдельный гринлет, например, через gevent.spawn().

Также обратите внимание, что по умолчанию put() будет блокироваться, если очередь заполнена. Используйте put_nowait(), put(timeout=...) или put(block=False), чтобы изменить поведение по умолчанию.

person kkurian    schedule 03.10.2012