Ejecutar una araña Scrapy en una tarea de apio

Esto ya no funciona , la API de scrapy ha cambiado.

Ahora la documentación muestra una forma de ” Ejecutar Scrapy desde un script ” pero obtengo el error ReactorNotRestartable .

Mi tarea:

 from celery import Task from twisted.internet import reactor from scrapy.crawler import Crawler from scrapy import log, signals from scrapy.utils.project import get_project_settings from .spiders import MySpider class MyTask(Task): def run(self, *args, **kwargs): spider = MySpider settings = get_project_settings() crawler = Crawler(settings) crawler.signals.connect(reactor.stop, signal=signals.spider_closed) crawler.configure() crawler.crawl(spider) crawler.start() log.start() reactor.run() 

    El reactor retorcido no se puede reiniciar. Una solución para esto es permitir que la tarea de apio cree un nuevo proceso secundario para cada rastreo que desee ejecutar, como se propone en la siguiente publicación:

    Ejecutar arañas Scrapy en una tarea de apio

    Esto evita el “reactor no puede ser un problema reiniciable” al utilizar el paquete de multiprocesamiento. Pero el problema con esto es que la solución ahora es obsoleta con la última versión de apio debido al hecho de que en su lugar se encontrará con otro problema donde un proceso de daemon no puede generar subprocesos. Entonces, para que la solución funcione, debes bajar a la versión de apio.

    Sí, y la API de scrapy ha cambiado. Pero con modificaciones menores (importar Crawler en lugar de CrawlerProcess). Puede hacer que la solución funcione al bajar en la versión de apio.

    El problema de apio se puede encontrar aquí: apio problema # 1709

    Aquí está mi script de rastreo actualizado que funciona con versiones de apio más nuevas al utilizar billar en lugar de multiprocesamiento:

     from scrapy.crawler import Crawler from scrapy.conf import settings from myspider import MySpider from scrapy import log, project from twisted.internet import reactor from billiard import Process from scrapy.utils.project import get_project_settings class UrlCrawlerScript(Process): def __init__(self, spider): Process.__init__(self) settings = get_project_settings() self.crawler = Crawler(settings) self.crawler.configure() self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed) self.spider = spider def run(self): self.crawler.crawl(self.spider) self.crawler.start() reactor.run() def run_spider(url): spider = MySpider(url) crawler = UrlCrawlerScript(spider) crawler.start() crawler.join() 

    Editar: Al leer el número de apio # 1709 , sugieren usar billar en lugar de multiproceso para que se levante la limitación del subproceso. En otras palabras, deberíamos probar el billar y ver si funciona.

    Editar 2: ¡ Sí, al usar billar , mi script funciona con la última versión de apio! Ver mi script actualizado

    El reactor Twisted no se puede reiniciar, por lo que una vez que una araña termina de funcionar y el crawler detiene el reactor implícitamente, ese trabajador es inútil.

    Como se publicó en las respuestas a esa otra pregunta, todo lo que tiene que hacer es matar al trabajador que ejecutó su araña y reemplazarla por una nueva, lo que evita que el reactor se inicie y se detenga más de una vez. Para hacer esto, solo establece:

     CELERYD_MAX_TASKS_PER_CHILD = 1 

    La desventaja es que no está realmente utilizando el reactor Twisted en todo su potencial y desperdiciando recursos ejecutando múltiples reactores, ya que un reactor puede ejecutar múltiples arañas a la vez en un solo proceso. Un mejor enfoque es ejecutar un reactor por trabajador (o incluso un reactor en todo el mundo) y no permita que el crawler toque.

    Estoy trabajando en esto para un proyecto muy similar, así que actualizaré esta publicación si hago algún progreso.

    Para evitar el error de ReactorNotRestartable al ejecutar Scrapy en Celery Tasks Queue, he usado threads. El mismo enfoque utilizado para ejecutar el reactor Twisted varias veces en una aplicación. Scrapy también usó Twisted, por lo que podemos hacer lo mismo.

    Aquí está el código:

     from threading import Thread from scrapy.crawler import CrawlerProcess import scrapy class MySpider(scrapy.Spider): name = 'my_spider' class MyCrawler: spider_settings = {} def run_crawler(self): process = CrawlerProcess(self.spider_settings) process.crawl(MySpider) Thread(target=process.start).start() 

    No se olvide de boost CELERYD_CONCURRENCY para el apio.

     CELERYD_CONCURRENCY = 10 

    funciona bien para mí

    Esto no está bloqueando el proceso en ejecución, pero de todos modos la mejor práctica de scrapy es procesar datos en devoluciones de llamadas. Solo hazlo de esta manera:

     for crawler in process.crawlers: crawler.spider.save_result_callback = some_callback crawler.spider.save_result_callback_params = some_callback_params Thread(target=process.start).start() 

    Yo diría que este enfoque es muy ineficiente si tiene que realizar muchas tareas. Porque el apio está enhebrado: ejecuta cada tarea dentro de su propio hilo. Digamos que con RabbitMQ como intermediario puede pasar> 10K q / s. ¡Con Aplery esto podría causar sobrecargas de 10K! Aconsejaría no usar apio aquí. ¡En lugar de eso, acceda al intermediario directamente!