diff --git a/icrawler/builtin/urllist.py b/icrawler/builtin/urllist.py
index 14719b5..4f31975 100644
--- a/icrawler/builtin/urllist.py
+++ b/icrawler/builtin/urllist.py
@@ -10,6 +10,9 @@ def worker_exec(self, queue_timeout=2, **kwargs):
             if self.signal.get("reach_max_num"):
                 self.logger.info("downloaded image reached max num, thread %s" " exit", threading.current_thread().name)
                 break
+            if self.signal.get("exceed_storage_space"):
+                self.logger.info("no more storage space, thread %s exit", threading.current_thread().name)
+                break
             try:
                 url = self.in_queue.get(timeout=queue_timeout)
             except queue.Empty:
diff --git a/icrawler/crawler.py b/icrawler/crawler.py
index 12f8f1d..34d257c 100644
--- a/icrawler/crawler.py
+++ b/icrawler/crawler.py
@@ -5,6 +5,7 @@
 import time
 from importlib import import_module
 
+from . import defaults
 from . import storage as storage_package
 from .downloader import Downloader
 from .feeder import Feeder
@@ -81,11 +82,11 @@ def set_logger(self, log_level=logging.INFO):
     def init_signal(self):
         """Init signal
 
-        3 signals are added: ``feeder_exited``, ``parser_exited`` and
-        ``reach_max_num``.
+        4 signals are added: ``feeder_exited``, ``parser_exited``,
+        ``reach_max_num`` and ``exceed_storage_space``.
         """
         self.signal = Signal()
-        self.signal.set(feeder_exited=False, parser_exited=False, reach_max_num=False)
+        self.signal.set(feeder_exited=False, parser_exited=False, reach_max_num=False, exceed_storage_space=False)
 
     def set_storage(self, storage):
         """Set storage backend for downloader
@@ -133,14 +134,7 @@ def set_session(self, headers=None):
                      header to init the session)
         """
         if headers is None:
-            headers = {
-                "Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2",
-                "User-Agent": (
-                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
-                    " AppleWebKit/537.36 (KHTML, like Gecko) "
-                    "Chrome/88.0.4324.104 Safari/537.36"
-                ),
-            }
+            headers = defaults.DEFAULT_HEADERS
         elif not isinstance(headers, dict):
             raise TypeError('"headers" must be a dict object')
 
diff --git a/icrawler/defaults.py b/icrawler/defaults.py
new file mode 100644
index 0000000..e663075
--- /dev/null
+++ b/icrawler/defaults.py
@@ -0,0 +1,12 @@
+MAX_RETRIES = 3
+BACKOFF_BASE = 1.2
+
+ACCEPT_LANGUAGES = "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"
+USER_AGENT = (
+    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
+)
+
+DEFAULT_HEADERS = {
+    "Accept-Language": ACCEPT_LANGUAGES,
+    "User-Agent": USER_AGENT,
+}
diff --git a/icrawler/downloader.py b/icrawler/downloader.py
index f60870c..2da1288 100644
--- a/icrawler/downloader.py
+++ b/icrawler/downloader.py
@@ -1,4 +1,5 @@
+import errno
 import queue
 import time
 from io import BytesIO
 from threading import current_thread
@@ -114,7 +115,7 @@ def download(self, task, default_ext, timeout=5, max_retry=3, overwrite=False, *
                     return
                 self.fetched_num -= 1
 
-        while retry > 0 and not self.signal.get("reach_max_num"):
+        while retry > 0 and not self.signal.get("reach_max_num") and not self.signal.get("exceed_storage_space"):
             try:
                 response = self.session.get(file_url, timeout=timeout)
             except Exception as e:
@@ -136,10 +137,19 @@ def download(self, task, default_ext, timeout=5, max_retry=3, overwrite=False, *
                 with self.lock:
                     self.fetched_num += 1
                 filename = self.get_filename(task, default_ext)
-                self.logger.info("image #%s\t%s", self.fetched_num, file_url)
-                self.storage.write(filename, response.content)
-                task["success"] = True
-                task["filename"] = filename
self.logger.info("image #%s\t%s %s", self.fetched_num, filename, file_url) + + task["success"] = False + try: + task["filename"] = filename # may be zero bytes if OSError happened during write() + self.storage.write(filename, response.content) + task["success"] = True + except OSError as o: + # errno.EINVAL -- name too long + if o.errno == errno.ENOSPC: + self.signal.set(exceed_storage_space=True) + else: + raise break finally: retry -= 1 diff --git a/icrawler/parser.py b/icrawler/parser.py index 7fb845a..ee31350 100644 --- a/icrawler/parser.py +++ b/icrawler/parser.py @@ -61,6 +61,11 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs): "downloaded image reached max num, thread %s " "is ready to exit", current_thread().name ) break + if self.signal.get("exceed_storage_space"): + self.logger.info( + "no more storage space, thread %s " "is ready to exit", current_thread().name + ) + break # get the page url try: url = self.in_queue.get(timeout=queue_timeout) @@ -90,8 +95,14 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs): ) else: self.logger.info(f"parsing result page {url}") - for task in self.parse(response, **kwargs): - while not self.signal.get("reach_max_num"): + task_list = self.parse(response, **kwargs) + if not task_list: + self.logger.debug("self.parse() returned no tasks") + with open("task_list_error.log", 'ab') as f: + f.write(response.content) + + for task in task_list: + while not self.signal.get("reach_max_num") and not self.signal.get("exceed_storage_space"): try: if isinstance(task, dict): self.output(task, timeout=1) @@ -110,6 +121,8 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs): break if self.signal.get("reach_max_num"): break + if self.signal.get("exceed_storage_space"): + break self.in_queue.task_done() break finally: diff --git a/icrawler/utils/proxy_pool.py b/icrawler/utils/proxy_pool.py index 76da786..abcd93a 100644 --- a/icrawler/utils/proxy_pool.py +++ b/icrawler/utils/proxy_pool.py @@ -1,10 +1,10 @@ -import json import logging import queue import random import threading import time +import chanfig import requests from bs4 import BeautifulSoup @@ -44,7 +44,12 @@ def to_dict(self): dict: A dict with four keys: ``addr``, ``protocol``, ``weight`` and ``last_checked`` """ - return dict(addr=self.addr, protocol=self.protocol, weight=self.weight, last_checked=self.last_checked) + return { + "addr": self.addr, + "protocol": self.protocol, + "weight": self.weight, + "last_checked": self.last_checked, + } class ProxyPool: @@ -146,17 +151,15 @@ def save(self, filename): for proxy in self.proxies[protocol]: serializable_proxy = self.proxies[protocol][proxy].to_dict() proxies[protocol].append(serializable_proxy) - with open(filename, "w") as fout: - json.dump(proxies, fout) + chanfig.save(proxies, filename) def load(self, filename): """Load proxies from file""" - with open(filename) as fin: - proxies = json.load(fin) - for protocol in proxies: - for proxy in proxies[protocol]: + proxies = chanfig.load(filename) + for protocol, protocol_proxies in proxies.items(): + for proxy in protocol_proxies: self.proxies[protocol][proxy["addr"]] = Proxy( - proxy["addr"], proxy["protocol"], proxy["weight"], proxy["last_checked"] + proxy["addr"], protocol, proxy.get("weight", 1.0), proxy.get("last_checked") ) self.addr_list[protocol].append(proxy["addr"]) @@ -215,7 +218,7 @@ def is_valid(self, addr, protocol="http", timeout=5): raise except requests.exceptions.Timeout: return {"valid": False, "msg": 
"timeout"} - except: + except BaseException: # noqa: B036 return {"valid": False, "msg": "exception"} else: if r.status_code == 200: @@ -278,12 +281,12 @@ def scan( t = threading.Thread( name=f"val-{i + 1:0>2d}", target=self.validate, - kwargs=dict( - proxy_scanner=proxy_scanner, - expected_num=expected_num, - queue_timeout=queue_timeout, - val_timeout=val_timeout, - ), + kwargs={ + "proxy_scanner": proxy_scanner, + "expected_num": expected_num, + "queue_timeout": queue_timeout, + "val_timeout": val_timeout, + }, ) t.daemon = True val_threads.append(t) @@ -291,7 +294,7 @@ def scan( for t in val_threads: t.join() self.logger.info("Proxy scanning done!") - except: + except BaseException: raise finally: if out_file is not None: @@ -466,18 +469,14 @@ def scan_free_proxy_list(self): def scan_file(self, src_file): """Scan candidate proxies from an existing file""" self.logger.info(f"start scanning file {src_file} for proxy list...") - with open(src_file) as fin: - proxies = json.load(fin) + proxies = chanfig.load(src_file) for protocol in proxies.keys(): for proxy in proxies[protocol]: self.proxy_queue.put({"addr": proxy["addr"], "protocol": protocol}) def is_scanning(self): """Return whether at least one scanning thread is alive""" - for t in self.scan_threads: - if t.is_alive(): - return True - return False + return any(t.is_alive() for t in self.scan_threads) def scan(self): """Start a thread for each registered scan function to scan proxy lists""" diff --git a/icrawler/utils/session.py b/icrawler/utils/session.py index febf1fd..d575571 100644 --- a/icrawler/utils/session.py +++ b/icrawler/utils/session.py @@ -1,42 +1,59 @@ +from __future__ import annotations + +import logging +from collections.abc import Mapping from urllib.parse import urlsplit import requests +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential + +from .. 
+from .proxy_pool import ProxyPool
 
 
 class Session(requests.Session):
-    def __init__(self, proxy_pool):
+    def __init__(
+        self, proxy_pool: ProxyPool | None = None, headers: Mapping | None = None, cookies: Mapping | None = None
+    ):
         super().__init__()
+        self.logger = logging.getLogger(__name__)
         self.proxy_pool = proxy_pool
+        if headers is not None:
+            self.headers.update(headers)
+        if cookies is not None:
+            self.cookies.update(cookies)
 
     def _url_scheme(self, url):
         return urlsplit(url).scheme
 
-    def get(self, url, **kwargs):
-        proxy = self.proxy_pool.get_next(protocol=self._url_scheme(url))
-        if proxy is None:
-            return super().get(url, **kwargs)
-        try:
-            response = super().get(url, proxies=proxy.format(), **kwargs)
-        except requests.exceptions.ConnectionError:
-            self.proxy_pool.decrease_weight(proxy)
-            raise
-        except:
-            raise
-        else:
-            self.proxy_pool.increase_weight(proxy)
-            return response
-
-    def post(self, url, data=None, json=None, **kwargs):
-        proxy = self.proxy_pool.get_next(protocol=self._url_scheme(url))
-        if proxy is None:
-            return super().get(url, data, json, **kwargs)
-        try:
-            response = super().post(url, data, json, proxies=proxy.format(), **kwargs)
-        except requests.exceptions.ConnectionError:
-            self.proxy_pool.decrease_weight(proxy)
-            raise
-        except:
-            raise
+    @retry(
+        stop=stop_after_attempt(defaults.MAX_RETRIES),
+        wait=wait_random_exponential(exp_base=defaults.BACKOFF_BASE),
+        retry=retry_if_exception_type((requests.RequestException, requests.HTTPError, requests.ConnectionError)),
+    )
+    def request(self, method, url, *args, **kwargs):
+        message = f"{method}ing {url}"
+        if args and kwargs:
+            message += f" with {args} and {kwargs}"
+        elif args:
+            message += f" with {args}"
+        elif kwargs:
+            message += f" with {kwargs}"
+        self.logger.debug(message)
+
+        proxy = self.proxy_pool.get_next(protocol=self._url_scheme(url)) if self.proxy_pool is not None else None
+        if proxy is not None:
+            self.logger.debug(f"Using proxy: {proxy.format()}")
+            try:
+                response = super().request(method, url, *args, proxies=proxy.format(), **kwargs)
+                response.raise_for_status()
+                self.proxy_pool.increase_weight(proxy)
+            except (requests.ConnectionError, requests.HTTPError):
+                self.proxy_pool.decrease_weight(proxy)
+                raise
         else:
-            self.proxy_pool.increase_weight(proxy)
-            return response
+            response = super().request(method, url, *args, **kwargs)
+
+        if "set-cookie" in response.headers:
+            self.cookies.update(response.cookies)
+        return response
diff --git a/pyproject.toml b/pyproject.toml
index db5f305..6f57f7d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -46,9 +46,10 @@ dynamic = [
 dependencies = [
     "beautifulsoup4",
     "bs4",
+    "chanfig",
     "lxml",
     "pillow",
-    "pyyaml",
     "requests",
     "six",
+    "tenacity",
 ]