python_proxy_server
Published in:2024-03-22 |
Words: 5.3k | Reading time: 26min | reading:

使用python创建一个高效、可用的代理池服务

代理

向目标网站或服务器隐藏IP,利用代理我们可以解决目标网站封 IP 的问题

  • 由代理服务器自己去访问你的目标网站,并加载它的内容,然后再把这些加载过的内容传递到你的窗口上。目标网站就无法获取你的IP地址,取而代之获取的是代理服务器的IP地址。

设计思路

  • 系统分为四块,获取模块、存储模块、检查模块、接口模块
  • 获取模块需要定时去各大代理网站抓取代理,代理可以是免费公开代理也可以是付费代理,代理的形式都是 IP 加端口,尽量从不同来源获取,尽量抓取高匿代理,抓取完之后将可用代理保存到数据库中。
  • 存储模块负责存储抓取下来的代理。首先我们需要保证代理不重复,另外我们还需要标识代理的可用情况,而且需要动态实时处理每个代理,所以说,一种比较高效和方便的存储方式就是使用 Redis 的 Sorted Set,也就是有序集合。
  • 检测模块需要定时将数据库中的代理进行检测,在这里我们需要设置一个检测链接,最好是爬取哪个网站就检测哪个网站,这样更加有针对性,如果要做一个通用型的代理,那可以设置百度等链接来检测。另外我们需要标识每一个代理的状态,如设置分数标识,100 分代表可用,分数越少代表越不可用,检测一次如果可用,我们可以将其立即设置为 100 满分,也可以在原基础上加 1 分,当不可用,可以将其减 1 分,当减到一定阈值后就直接从数据库移除。通过这样的标识分数,我们就可以区分出代理的可用情况,选用的时候会更有针对性。
  • 接口模块需要用 API 来提供对外服务的接口,其实我们可以直接连数据库来取,但是这样就需要知道数据库的连接信息,不太安全,而且需要配置连接,所以一个比较安全和方便的方式就是提供一个 Web API 接口,通过访问接口即可拿到可用代理。另外由于可用代理可能有多个,我们可以提供随机返回一个可用代理的接口,这样保证每个可用代理都可以取到,实现负载均衡。

实现思路

  • 基于上述分析:
  • 存储模块使用 Redis 的有序集合,用以代理的去重和状态标识,同时它也是中心模块和基础模块,将其他模块串联起来。
  • 获取模块定时从代理网站获取代理,将获取的代理传递给存储模块,保存到数据库。
  • 检测模块定时通过存储模块获取所有代理,并对其进行检测,根据不同的检测结果对代理设置不同的标识。
  • 接口模块通过 Web API 提供服务接口,其内部还是连接存储模块,获取可用的代理

模块实现

代理存储

  • 使用 Redis 的有序集合,集合的每一个元素都是不重复的,对于代理代理池来说,集合的元素就变成了一个个代理,也就是 IP 加端口的形式,如 60.207.237.111:8888,这样的一个代理就是集合的一个元素。另外有序集合的每一个元素还都有一个分数字段,分数是可以重复的,是一个浮点数类型,也可以是整数类型。该集合会根据每一个元素的分数对集合进行降序排序。 对于代理池来说,这个分数可以作为我们判断一个代理可用不可用的标志,我们将 100 设为最高分,代表可用,0 设为最低分,代表不可用。从代理池中获取代理的时候会随机获取分数最高的代理,注意这里是随机获取高分代理,这样可以保证每个可用代理都会被调用到。 分数是我们判断代理稳定性的重要标准,在这里我们设置分数规则如下:

  • 分数 100 为可用,检测器会定时循环检测每个代理可用情况,一旦检测到有可用的代理就立即置为 100,检测到不可用就将分数减 1,减至 0 后移除。

  • 新获取的代理添加时将分数置为 10,当测试可行立即置 100,不可行分数减 1,减至 0 后移除。

实现代码

  • 存储代码示例如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    MAX_SCORE = 100
    MIN_SCORE = 0
    INITIAL_SCORE = 10
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379
    REDIS_PASSWORD = None
    REDIS_KEY = 'proxies'

    import redis
    from random import choice

    class RedisClient(object):
    def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
    """
    初始化
    :param host: Redis 地址
    :param port: Redis 端口
    :param password: Redis密码
    """
    self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)

    def add(self, proxy, score=INITIAL_SCORE):
    """
    添加代理,设置分数为最高
    :param proxy: 代理
    :param score: 分数
    :return: 添加结果
    """
    if not self.db.zscore(REDIS_KEY, proxy):
    return self.db.zadd(REDIS_KEY, score, proxy)

    def random(self):
    """
    随机获取有效代理,首先尝试获取最高分数代理,如果不存在,按照排名获取,否则异常
    :return: 随机代理
    """
    result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
    if len(result):
    return choice(result)
    else:
    result = self.db.zrevrange(REDIS_KEY, 0, 100)
    if len(result):
    return choice(result)
    else:
    raise PoolEmptyError

    def decrease(self, proxy):
    """
    代理值减一分,小于最小值则删除
    :param proxy: 代理
    :return: 修改后的代理分数
    """
    score = self.db.zscore(REDIS_KEY, proxy)
    if score and score > MIN_SCORE:
    print('代理', proxy, '当前分数', score, '减1')
    return self.db.zincrby(REDIS_KEY, proxy, -1)
    else:
    print('代理', proxy, '当前分数', score, '移除')
    return self.db.zrem(REDIS_KEY, proxy)

    def exists(self, proxy):
    """
    判断是否存在
    :param proxy: 代理
    :return: 是否存在
    """
    return not self.db.zscore(REDIS_KEY, proxy) == None

    def max(self, proxy):
    """
    将代理设置为MAX_SCORE
    :param proxy: 代理
    :return: 设置结果
    """
    print('代理', proxy, '可用,设置为', MAX_SCORE)
    return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)

    def count(self):
    """
    获取数量
    :return: 数量
    """
    return self.db.zcard(REDIS_KEY)

    def all(self):
    """
    获取全部代理
    :return: 全部代理列表
    """
    return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)

    上述代码中MAX_SCORE、MIN_SCORE、INITIAL_SCORE 分别代表最大分数、最小分数、初始分数。REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 分别代表了 Redis 的连接信息,即地址、端口、密码。REDIS_KEY 是有序集合的键名

获取代理

  • 定义一个 Crawler 来从各大网站抓取代理,代理网站类代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    import json
    from .utils import get_page
    from pyquery import PyQuery as pq

    class ProxyMetaclass(type):
    def __new__(cls, name, bases, attrs):
    count = 0
    attrs['__CrawlFunc__'] = []
    for k, v in attrs.items():
    if 'crawl_' in k:
    attrs['__CrawlFunc__'].append(k)
    count += 1
    attrs['__CrawlFuncCount__'] = count
    return type.__new__(cls, name, bases, attrs)

    class Crawler(object, metaclass=ProxyMetaclass):
    def get_proxies(self, callback):
    proxies = []
    for proxy in eval("self.{}()".format(callback)):
    print('成功获取到代理', proxy)
    proxies.append(proxy)
    return proxies

    def crawl_daili66(self, page_count=4):
    """
    获取代理66
    :param page_count: 页码
    :return: 代理
    """
    start_url = 'http://www.66ip.cn/{}.html'
    urls = [start_url.format(page) for page in range(1, page_count + 1)]
    for url in urls:
    print('Crawling', url)
    html = get_page(url)
    if html:
    doc = pq(html)
    trs = doc('.containerbox table tr:gt(0)').items()
    for tr in trs:
    ip = tr.find('td:nth-child(1)').text()
    port = tr.find('td:nth-child(2)').text()
    yield ':'.join([ip, port])

    def crawl_proxy360(self):
    """
    获取Proxy360
    :return: 代理
    """
    start_url = 'http://www.proxy360.cn/Region/China'
    print('Crawling', start_url)
    html = get_page(start_url)
    if html:
    doc = pq(html)
    lines = doc('div[name="list_proxy_ip"]').items()
    for line in lines:
    ip = line.find('.tbBottomLine:nth-child(1)').text()
    port = line.find('.tbBottomLine:nth-child(2)').text()
    yield ':'.join([ip, port])

    def crawl_goubanjia(self):
    """
    获取Goubanjia
    :return: 代理
    """
    start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
    html = get_page(start_url)
    if html:
    doc = pq(html)
    tds = doc('td.ip').items()
    for td in tds:
    td.find('p').remove()
    yield td.text().replace(' ', '')

    上述代码通过:统一定义以 crawl 开头,抓取代理 66、Proxy360、Goubanjia 三个免费代理网站,根据yield 返回一个个代理。首先将网页获取,然后用 PyQuery 解析,解析出 IP 加端口的形式的代理然后返回。 然后定义了一个 get_proxies () 方法,将所有以 crawl 开头的方法调用一遍,获取每个方法返回的代理并组合成列表形式返回,通过此方法,我们可以只需要添加一个以 crawl 开头的方法,例如抓取快代理,在 Crawler 类中增加 crawl_kuaidaili () 方法,仿照其他的几个方法将其定义成生成器,抓取其网站的代理,然后通过 yield 返回代理即可。

  • 然后,定义一个 Getter 类,动态地调用所有以 crawl 开头的方法,然后获取抓取到的代理,将其加入到数据库存储起来
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    from db import RedisClient
    from crawler import Crawler

    POOL_UPPER_THRESHOLD = 10000

    # 获取模块中所有的类
    crawlers_cls = [
    base,
    Ip89Crawler,
    Daili66Crawler,
    Data5UCrawler,
    DocipCrawler,
    FatezeroCrawler,
    GeonodeCrawler,
    GoubanjiaCrawler,
    IhuanCrawler,
    IP3366Crawler,
    IPHaiCrawler,
    JiangxianliCrawler,
    KuaidailiCrawler,
    SeoFangFaCrawler,
    TaiyangdailiCrawler,
    UqidataCrawler,
    XiaoShuCrawler,
    XicidailiCrawler,
    XiladailiCrawler,
    YqIeCrawler,
    ZhandayeCrawler
    ]

    crawlers_cls = [cls for cls in crawlers_cls
    if isinstance(cls, type) and issubclass(cls, BaseCrawler) and cls is not BaseCrawler
    and not getattr(cls, 'ignore', False)]


    class Getter(object):
    """
    getter of src
    """

    def __init__(self):
    """
    init db and crawlers
    """
    self.redis = RedisClient()
    self.crawlers_cls = crawlers_cls
    self.crawlers = [crawler_cls() for crawler_cls in self.crawlers_cls]

    def is_full(self):
    """
    if src if full
    return: bool
    """
    return self.redis.count() >= PROXY_NUMBER_MAX

    @logger.catch
    def run(self):
    """
    run crawlers to get proxy
    :return:
    """
    logger.info('stating getter...')
    if self.is_full():
    logger.error("crawler is full!")
    return
    for crawler in self.crawlers:
    logger.info(f'crawler {crawler} to get proxy')
    for proxy in crawler.crawl():
    self.redis.add(proxy)
    if not self.crawlers:
    logger.error("import crawler error! please check crawler import.")

    Getter 类就是获取器类,这其中定义了一个变量 POOL_UPPER_THRESHOLD 表示代理池的最大数量,然后定义了 is_over_threshold () 方法判断代理池是否已经达到了容量阈值,它就是调用了 RedisClient 的 count () 方法获取代理的数量,然后加以判断,如果数量达到阈值则返回 True,否则 False。如果不想加这个限制可以将此方法永久返回 True。 接下来定义了 run () 方法,首先判断了代理池是否达到阈值,然后在这里就调用了 Crawler 类的 CrawlFunc 属性,获取到所有以 crawl 开头的方法列表,依次通过 get_proxies () 方法调用,得到各个方法抓取到的代理,然后再利用 RedisClient 的 add () 方法加入数据库.

检测代理

由于代理的数量非常多,为了提高代理的检测效率,我们在这里使用异步请求库 Aiohttp 来进行检测.

  • 检测代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    VALID_STATUS_CODES = [200]
    TEST_URL = 'http://www.baidu.com'
    BATCH_TEST_SIZE = 100

    class Tester(object):
    def __init__(self):
    self.redis = RedisClient()

    async def test_single_proxy(self, proxy):
    """
    测试单个代理
    :param proxy: 单个代理
    :return: None
    """
    conn = aiohttp.TCPConnector(verify_ssl=False)
    async with aiohttp.ClientSession(connector=conn) as session:
    try:
    if isinstance(proxy, bytes):
    proxy = proxy.decode('utf-8')
    real_proxy = 'http://' + proxy
    print('正在测试', proxy)
    async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
    if response.status in VALID_STATUS_CODES:
    self.redis.max(proxy)
    print('代理可用', proxy)
    else:
    self.redis.decrease(proxy)
    print('请求响应码不合法', proxy)
    except (ClientError, ClientConnectorError, TimeoutError, AttributeError):
    self.redis.decrease(proxy)
    print('代理请求失败', proxy)

    def run(self):
    """
    测试主函数
    :return: None
    """
    print('测试器开始运行')
    try:
    proxies = self.redis.all()
    loop = asyncio.get_event_loop()
    # 批量测试
    for i in range(0, len(proxies), BATCH_TEST_SIZE):
    test_proxies = proxies[i:i + BATCH_TEST_SIZE]
    tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
    loop.run_until_complete(asyncio.wait(tasks))
    time.sleep(5)
    except Exception as e:
    print('测试器发生错误', e.args)

    上述代码:异步test_single_proxy () 方法,用来检测单个代理的可用情况,其参数就是被检测的代理;批量测试的最大值 BATCH_TEST_SIZE 为 100,避免当代理池过大时全部测试导致内存开销过大的问题, run () 方法里面获取了所有的代理列表,使用 Aiohttp 分配任务,启动运行

接口封装

  • 以 Web API 的形式暴露可用代理,使用一个比较轻量级的库 Flask 来实现这个接口模块:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    from flask import Flask, g
    from db import RedisClient

    __all__ = ['app']
    app = Flask(__name__)

    def get_conn():
    if not hasattr(g, 'redis'):
    g.redis = RedisClient()
    return g.redis

    @app.route('/')
    def index():
    return '<h2>Welcome to Proxy Pool System</h2>'

    @app.route('/random')
    def get_proxy():
    """
    获取随机可用代理
    :return: 随机代理
    """
    conn = get_conn()
    return conn.random()

    @app.route('/count')
    def get_counts():
    """
    获取代理池总量
    :return: 代理池总量
    """
    conn = get_conn()
    return str(conn.count())

    if __name__ == '__main__':
    app.run()

资源调度模块

  • 调用以上所定义的三个模块,将以上三个模块通过多进程的形式运行,高效利用资源
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    TESTER_CYCLE = 20
    GETTER_CYCLE = 20
    TESTER_ENABLED = True
    GETTER_ENABLED = True
    API_ENABLED = True

    from multiprocessing import Process
    from api import app
    from getter import Getter
    from tester import Tester

    class Scheduler():
    def schedule_tester(self, cycle=TESTER_CYCLE):
    """
    定时测试代理
    """
    tester = Tester()
    while True:
    print('测试器开始运行')
    tester.run()
    time.sleep(cycle)

    def schedule_getter(self, cycle=GETTER_CYCLE):
    """
    定时获取代理
    """
    getter = Getter()
    while True:
    print('开始抓取代理')
    getter.run()
    time.sleep(cycle)

    def schedule_api(self):
    """
    开启API
    """
    app.run(API_HOST, API_PORT)

    def run(self):
    print('代理池开始运行')
    if TESTER_ENABLED:
    tester_process = Process(target=self.schedule_tester)
    tester_process.start()

    if GETTER_ENABLED:
    getter_process = Process(target=self.schedule_getter)
    getter_process.start()

    if API_ENABLED:
    api_process = Process(target=self.schedule_api)
    api_process.start()
  • 以下版本兼容win平台打包exe:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys


sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import asyncio
import threading
import time
from processors.server import app
from processors.getter import Getter
from processors.tester import Tester
from run.setting import APP_PROD_METHOD_GEVENT, APP_PROD_METHOD_MEINHELD, APP_PROD_METHOD_TORNADO, CYCLE_GETTER, CYCLE_TESTER, API_HOST, \
API_THREADED, API_PORT, ENABLE_SERVER, IS_PROD, APP_PROD_METHOD, \
ENABLE_GETTER, ENABLE_TESTER, IS_WINDOWS
from loguru import logger

tester_process, getter_process, server_process = None, None, None


class Scheduler():
"""
scheduler
"""

def run_tester(self, cycle=CYCLE_TESTER):
"""
run tester
"""
if not ENABLE_TESTER:
logger.info('tester not enabled, exit')
return
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tester = Tester()
loop = 0
while True:
logger.debug(f'tester loop {loop} start...')
tester.run()

loop += 1
time.sleep(cycle)

def run_getter(self, cycle=CYCLE_GETTER):
"""
run getter
"""
if not ENABLE_GETTER:
logger.info('getter not enabled, exit')
return
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
getter = Getter()
loop = 0
while True:
logger.debug(f'getter loop {loop} start...')
getter.run()
loop += 1
time.sleep(cycle)

def run_server(self):
"""
run server for api
"""
if not ENABLE_SERVER:
logger.info('server not enabled, exit')
return
if IS_PROD:
if APP_PROD_METHOD == APP_PROD_METHOD_GEVENT:
try:
from gevent.pywsgi import WSGIServer
except ImportError as e:
logger.exception(e)
else:
http_server = WSGIServer((API_HOST, API_PORT), app)
http_server.serve_forever()

elif APP_PROD_METHOD == APP_PROD_METHOD_TORNADO:
try:
from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
except ImportError as e:
logger.exception(e)
else:
http_server = HTTPServer(WSGIContainer(app))
http_server.listen(API_PORT)
IOLoop.instance().start()

elif APP_PROD_METHOD == APP_PROD_METHOD_MEINHELD:
try:
import meinheld
except ImportError as e:
logger.exception(e)
else:
meinheld.listen((API_HOST, API_PORT))
meinheld.run(app)

else:
logger.error("unsupported APP_PROD_METHOD")
return
else:
app.run(host=API_HOST, port=API_PORT, threaded=API_THREADED, use_reloader=False, debug=True)

def run(self):
global tester_process, getter_process, server_process
try:
logger.info('starting ..')
if ENABLE_TESTER:
tester_process = threading.Thread(
target=self.run_tester)
logger.info(f'starting tester, pid {tester_process.name}...')
tester_process.start()

if ENABLE_GETTER:
getter_process = threading.Thread(
target=self.run_getter)
logger.info(f'starting getter, pid {getter_process.name}...')
getter_process.start()

if ENABLE_SERVER:
server_process = threading.Thread(
target=self.run_server)
logger.info(f'starting server, pid {server_process.name}...')
server_process.start()

tester_process and tester_process.join()
getter_process and getter_process.join()
server_process and server_process.join()
except KeyboardInterrupt:
logger.info('received keyboard interrupt signal')
# tester_process and tester_process()
# getter_process and getter_process.stop()
# server_process and server_process.terminate()
finally:
# must call join method before calling is_alive
tester_process and tester_process.join()
getter_process and getter_process.join()
server_process and server_process.join()
logger.info(
f'tester is {"alive" if tester_process.is_alive() else "dead"}')
logger.info(
f'getter is {"alive" if getter_process.is_alive() else "dead"}')
logger.info(
f'server is {"alive" if server_process.is_alive() else "dead"}')
logger.info('proxy terminated')


if __name__ == '__main__':
scheduler = Scheduler()
scheduler.run()

三个常量,TESTER_ENABLED、GETTER_ENABLED、API_ENABLED 都是布尔类型,True 或者 False。标明了测试模块、获取模块、接口模块的开关,如果为 True,则代表模块开启。 启动入口是 run () 方法,其分别判断了三个模块的开关,如果开启的话,就新建一个 Process 进程,设置好启动目标,然后调用 start () 方法运行.

运行与测试

运行代码

  • 采用如下形式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import argparse

from run.scheduler import Scheduler

parser = argparse.ArgumentParser(description='ProxyPool')
parser.add_argument('--processor', type=str, help='processor to run')
args = parser.parse_args()

if __name__ == '__main__':
# if processor set, just run it
if args.processor:
getattr(Scheduler(), f'run_{args.processor}')()
else:
Scheduler().run()

编写测试代码

  • request请求测试
1
2
3
4
5
6
7
8
9
10
11
12

import requests

PROXY_POOL_URL = 'http://localhost:5555/random'

def get_proxy():
try:
response = requests.get(PROXY_POOL_URL)
if response.status_code == 200:
return response.text
except ConnectionError:
return None

代理使用

  • 具体用法不一,举例如下:
1
2
3
4
5
6
7
8
9
10
11
12
import requests

proxy = get_proxy()
proxies = {
'http': 'http://' + proxy,
'https': 'https://' + proxy,
}
try:
response = requests.get('http://httpbin.org/get', proxies=proxies)
print(response.text)
except requests.exceptions.ConnectionError as e:
print('Error', e.args)
  • 抑或
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
proxy = {
"proxyType": "manual",
"httpProxy": "http://" + constants.proxy_server_ip + ":" + str(constants.proxy_server_port), # 代理服务器地址和端口
"ftpProxy": "ftp://" + constants.proxy_server_ip + ":" + str(constants.proxy_server_port),
"sslProxy": "https://" + constants.proxy_server_ip + ":" + str(constants.proxy_server_port),
"noProxy": "",
"proxyAutoconfigUrl": ""
}

options = webdriver.ChromeOptions()
if constants.spider_mode == 'auto':
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
logger.warning("current spider mode: auto spider image mode!")

# open dev tools
options.add_argument("--auto-open-devtools-for-tabs")
# 接受不安全证书
options.add_argument("--ignore-certificate-errors")
# 设置日志偏好,禁用所有日志
options = disabled_log_browser(options)
# 去除 “Chrome正受到自动化测试软件的控制”
options.add_experimental_option('excludeSwitches', ['enable-automation'])
# 添加浏览器特征
options.add_argument("--disable-blink-features=AutomationControlled")
# 模拟不同浏览器访问页面 减少被封风险
user_agents = read_user_agent()
if not user_agents:
# 没有txt 使用default value
user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.150 '
'Safari/537.36 '
]
logger.warning(f"not user-agent, use default user-agent: {user_agents[0]}")
# 随机添加user-agent
cur_user_agents = random.choice(user_agents).strip()
# add user-agent
if cur_user_agents:
options.add_argument(
f"user-agent={cur_user_agents}")
logger.info(f"current use user-agent: {cur_user_agents}")
if proxy_flag == 'True':
if constants.proxy_mode == 'auto':
logger.info("cur spider use proxy is auto select proxy model.")
proxy_item = get_proxy_item() # 直接调用查询proxy接口即可
if not proxy_item:
logger.error("proxy_item None, will quit spider image!")
return None, None, None
logger.debug(f"use proxy: {proxy_item}")
proxy = {
"proxyType": "manual",
"httpProxy": "http://" + proxy_item, # 代理服务器地址和端口
"ftpProxy": "ftp://" + proxy_item,
"sslProxy": "https://" + proxy_item,
"noProxy": "",
"proxyAutoconfigUrl": ""
}
# options.set_capability("proxy", proxy)
options.add_argument("--proxy-server={}".format(proxy["httpProxy"]))
logger.info("current use internal proxy, proxy content: " + str(proxy['httpProxy']))
if constants.chrome_path != 'None':
ser = Service()
ser.path = constants.chrome_path
# 连接Edge浏览器
driver = webdriver.Chrome(service=ser, options=options)
logger.warning("user self define chrome driver exe!")
else:
driver = webdriver.Chrome(options=options)
logger.info("use system default web driver!")

补充

random 获取代理机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@logger.catch
def select_best_proxy(data):
"""

:param data:
:return:
"""
# same_max_value = []
block_split_num = 10
if block_split_num < len(data) - 1:
length_data = int(len(data) / block_split_num)
index = int(random.random() * length_data) + 1
if len(data) > length_data + 2:
logger.info(f"data length: {len(data)} > {length_data + 2}, use random proxy: {index}")
max_value = data[len(data) - index]
else:
logger.info(f"data length: {len(data)} < {length_data + 2}, use max value!")
max_value = data[len(data) - 1]
else:
logger.info(f"data length: {len(data)} < {block_split_num}, use max value!")
max_value = data[len(data) - 1]
return max_value.string()

base crawel

  • 基础定义
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    class BaseCrawler(object):
    urls = []

    @retry(stop_max_attempt_number=3, retry_on_result=lambda x: x is None, wait_fixed=2000)
    def fetch(self, url, **kwargs):
    try:
    headers = Headers(headers=True).generate()
    kwargs.setdefault('timeout', GET_TIMEOUT)
    kwargs.setdefault('verify', False)
    kwargs.setdefault('headers', headers)
    response = requests.get(url, **kwargs)
    if response.status_code == 200:
    response.encoding = 'utf-8'
    return response.text
    except (requests.ConnectionError, requests.ReadTimeout):
    return

    def process(self, html, url):
    """
    used for parse html
    """
    for proxy in self.parse(html):
    # logger.info(f'fetched proxy {proxy.string()} from {url}')
    yield proxy

    def crawl(self):
    """
    crawl main method
    """
    try:
    for url in self.urls:
    logger.info(f'fetching {url}')
    html = self.fetch(url)
    if not html:
    continue
    time.sleep(.5)
    yield from self.process(html, url)
    except RetryError:
    logger.error(
    f'crawler {self} crawled proxy unsuccessfully, '
    'please check if target url is valid or network issue')

  • 扩展应用
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import os
    import sys


    sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    from pyquery import PyQuery as pq
    from schemas.proxy import Proxy
    from crawlers.base import BaseCrawler
    from loguru import logger
    import re


    BASE_URL = 'https://www.zdaye.com/dayProxy/{page}.html'
    MAX_PAGE = 5 * 2


    class ZhandayeCrawler(BaseCrawler):
    """
    zhandaye crawler, https://www.zdaye.com/dayProxy/
    """
    urls_catalog = [BASE_URL.format(page=page) for page in range(1, MAX_PAGE)]
    headers = {
    'User-Agent': 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, '
    'like Gecko) Chrome/83.0.4103.61 Safari/537.36 '
    }
    urls = []
    ignore = True

    def crawl(self):
    self.crawl_catalog()
    yield from super().crawl()

    def crawl_catalog(self):
    for url in self.urls_catalog:
    logger.info(f'fetching {url}')
    html = self.fetch(url, headers=self.headers)
    self.parse_catalog(html)

    def parse_catalog(self, html):
    """
    parse html file to get proxies
    :return:
    """
    doc = pq(html)
    for item in doc('#J_posts_list .thread_item div div p a').items():
    url = 'https://www.zdaye.com' + item.attr('href')
    logger.info(f'get detail url: {url}')
    self.urls.append(url)

    def parse(self, html):
    doc = pq(html)
    trs = doc('.cont br').items()
    for tr in trs:
    line = tr[0].tail
    match = re.search(r'(\d+\.\d+\.\d+\.\d+):(\d+)', line)
    if match:
    host = match.group(1)
    port = match.group(2)
    yield Proxy(host=host, port=port)


    if __name__ == '__main__':
    crawler = ZhandayeCrawler()
    for proxy in crawler.crawl():
    print(proxy)

Prev:
基于opencv使用python实现人脸识别与视频生成
Next:
spider_image_system_introduction