使用python创建一个高效、可用的代理池服务
代理
向目标网站或服务器隐藏IP,利用代理我们可以解决目标网站封 IP 的问题
- 由代理服务器自己去访问你的目标网站,并加载它的内容,然后再把这些加载过的内容传递到你的窗口上。目标网站就无法获取你的IP地址,取而代之获取的是代理服务器的IP地址。
设计思路
- 系统分为四块,获取模块、存储模块、检查模块、接口模块
- 获取模块需要定时去各大代理网站抓取代理,代理可以是免费公开代理也可以是付费代理,代理的形式都是 IP 加端口,尽量从不同来源获取,尽量抓取高匿代理,抓取完之后将可用代理保存到数据库中。
- 存储模块负责存储抓取下来的代理。首先我们需要保证代理不重复,另外我们还需要标识代理的可用情况,而且需要动态实时处理每个代理,所以说,一种比较高效和方便的存储方式就是使用 Redis 的 Sorted Set,也就是有序集合。
- 检测模块需要定时将数据库中的代理进行检测,在这里我们需要设置一个检测链接,最好是爬取哪个网站就检测哪个网站,这样更加有针对性,如果要做一个通用型的代理,那可以设置百度等链接来检测。另外我们需要标识每一个代理的状态,如设置分数标识,100 分代表可用,分数越少代表越不可用,检测一次如果可用,我们可以将其立即设置为 100 满分,也可以在原基础上加 1 分,当不可用,可以将其减 1 分,当减到一定阈值后就直接从数据库移除。通过这样的标识分数,我们就可以区分出代理的可用情况,选用的时候会更有针对性。
- 接口模块需要用 API 来提供对外服务的接口,其实我们可以直接连数据库来取,但是这样就需要知道数据库的连接信息,不太安全,而且需要配置连接,所以一个比较安全和方便的方式就是提供一个 Web API 接口,通过访问接口即可拿到可用代理。另外由于可用代理可能有多个,我们可以提供随机返回一个可用代理的接口,这样保证每个可用代理都可以取到,实现负载均衡。
实现思路
- 基于上述分析:
- 存储模块使用 Redis 的有序集合,用以代理的去重和状态标识,同时它也是中心模块和基础模块,将其他模块串联起来。
- 获取模块定时从代理网站获取代理,将获取的代理传递给存储模块,保存到数据库。
- 检测模块定时通过存储模块获取所有代理,并对其进行检测,根据不同的检测结果对代理设置不同的标识。
- 接口模块通过 Web API 提供服务接口,其内部还是连接存储模块,获取可用的代理
模块实现
代理存储
使用 Redis 的有序集合,集合的每一个元素都是不重复的,对于代理代理池来说,集合的元素就变成了一个个代理,也就是 IP 加端口的形式,如 60.207.237.111:8888,这样的一个代理就是集合的一个元素。另外有序集合的每一个元素还都有一个分数字段,分数是可以重复的,是一个浮点数类型,也可以是整数类型。该集合会根据每一个元素的分数对集合进行降序排序。 对于代理池来说,这个分数可以作为我们判断一个代理可用不可用的标志,我们将 100 设为最高分,代表可用,0 设为最低分,代表不可用。从代理池中获取代理的时候会随机获取分数最高的代理,注意这里是随机获取高分代理,这样可以保证每个可用代理都会被调用到。 分数是我们判断代理稳定性的重要标准,在这里我们设置分数规则如下:
分数 100 为可用,检测器会定时循环检测每个代理可用情况,一旦检测到有可用的代理就立即置为 100,检测到不可用就将分数减 1,减至 0 后移除。
新获取的代理添加时将分数置为 10,当测试可行立即置 100,不可行分数减 1,减至 0 后移除。
实现代码
- 存储代码示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90MAX_SCORE = 100
MIN_SCORE = 0
INITIAL_SCORE = 10
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_KEY = 'proxies'
import redis
from random import choice
class RedisClient(object):
def __init__(self, host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD):
"""
初始化
:param host: Redis 地址
:param port: Redis 端口
:param password: Redis密码
"""
self.db = redis.StrictRedis(host=host, port=port, password=password, decode_responses=True)
def add(self, proxy, score=INITIAL_SCORE):
"""
添加代理,设置分数为最高
:param proxy: 代理
:param score: 分数
:return: 添加结果
"""
if not self.db.zscore(REDIS_KEY, proxy):
return self.db.zadd(REDIS_KEY, score, proxy)
def random(self):
"""
随机获取有效代理,首先尝试获取最高分数代理,如果不存在,按照排名获取,否则异常
:return: 随机代理
"""
result = self.db.zrangebyscore(REDIS_KEY, MAX_SCORE, MAX_SCORE)
if len(result):
return choice(result)
else:
result = self.db.zrevrange(REDIS_KEY, 0, 100)
if len(result):
return choice(result)
else:
raise PoolEmptyError
def decrease(self, proxy):
"""
代理值减一分,小于最小值则删除
:param proxy: 代理
:return: 修改后的代理分数
"""
score = self.db.zscore(REDIS_KEY, proxy)
if score and score > MIN_SCORE:
print('代理', proxy, '当前分数', score, '减1')
return self.db.zincrby(REDIS_KEY, proxy, -1)
else:
print('代理', proxy, '当前分数', score, '移除')
return self.db.zrem(REDIS_KEY, proxy)
def exists(self, proxy):
"""
判断是否存在
:param proxy: 代理
:return: 是否存在
"""
return not self.db.zscore(REDIS_KEY, proxy) == None
def max(self, proxy):
"""
将代理设置为MAX_SCORE
:param proxy: 代理
:return: 设置结果
"""
print('代理', proxy, '可用,设置为', MAX_SCORE)
return self.db.zadd(REDIS_KEY, MAX_SCORE, proxy)
def count(self):
"""
获取数量
:return: 数量
"""
return self.db.zcard(REDIS_KEY)
def all(self):
"""
获取全部代理
:return: 全部代理列表
"""
return self.db.zrangebyscore(REDIS_KEY, MIN_SCORE, MAX_SCORE)上述代码中MAX_SCORE、MIN_SCORE、INITIAL_SCORE 分别代表最大分数、最小分数、初始分数。REDIS_HOST、REDIS_PORT、REDIS_PASSWORD 分别代表了 Redis 的连接信息,即地址、端口、密码。REDIS_KEY 是有序集合的键名
获取代理
- 定义一个 Crawler 来从各大网站抓取代理,代理网站类代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71import json
from .utils import get_page
from pyquery import PyQuery as pq
class ProxyMetaclass(type):
def __new__(cls, name, bases, attrs):
count = 0
attrs['__CrawlFunc__'] = []
for k, v in attrs.items():
if 'crawl_' in k:
attrs['__CrawlFunc__'].append(k)
count += 1
attrs['__CrawlFuncCount__'] = count
return type.__new__(cls, name, bases, attrs)
class Crawler(object, metaclass=ProxyMetaclass):
def get_proxies(self, callback):
proxies = []
for proxy in eval("self.{}()".format(callback)):
print('成功获取到代理', proxy)
proxies.append(proxy)
return proxies
def crawl_daili66(self, page_count=4):
"""
获取代理66
:param page_count: 页码
:return: 代理
"""
start_url = 'http://www.66ip.cn/{}.html'
urls = [start_url.format(page) for page in range(1, page_count + 1)]
for url in urls:
print('Crawling', url)
html = get_page(url)
if html:
doc = pq(html)
trs = doc('.containerbox table tr:gt(0)').items()
for tr in trs:
ip = tr.find('td:nth-child(1)').text()
port = tr.find('td:nth-child(2)').text()
yield ':'.join([ip, port])
def crawl_proxy360(self):
"""
获取Proxy360
:return: 代理
"""
start_url = 'http://www.proxy360.cn/Region/China'
print('Crawling', start_url)
html = get_page(start_url)
if html:
doc = pq(html)
lines = doc('div[name="list_proxy_ip"]').items()
for line in lines:
ip = line.find('.tbBottomLine:nth-child(1)').text()
port = line.find('.tbBottomLine:nth-child(2)').text()
yield ':'.join([ip, port])
def crawl_goubanjia(self):
"""
获取Goubanjia
:return: 代理
"""
start_url = 'http://www.goubanjia.com/free/gngn/index.shtml'
html = get_page(start_url)
if html:
doc = pq(html)
tds = doc('td.ip').items()
for td in tds:
td.find('p').remove()
yield td.text().replace(' ', '')上述代码通过:统一定义以 crawl 开头,抓取代理 66、Proxy360、Goubanjia 三个免费代理网站,根据yield 返回一个个代理。首先将网页获取,然后用 PyQuery 解析,解析出 IP 加端口的形式的代理然后返回。 然后定义了一个 get_proxies () 方法,将所有以 crawl 开头的方法调用一遍,获取每个方法返回的代理并组合成列表形式返回,通过此方法,我们可以只需要添加一个以 crawl 开头的方法,例如抓取快代理,在 Crawler 类中增加 crawl_kuaidaili () 方法,仿照其他的几个方法将其定义成生成器,抓取其网站的代理,然后通过 yield 返回代理即可。
- 然后,定义一个 Getter 类,动态地调用所有以 crawl 开头的方法,然后获取抓取到的代理,将其加入到数据库存储起来
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71from db import RedisClient
from crawler import Crawler
POOL_UPPER_THRESHOLD = 10000
# 获取模块中所有的类
crawlers_cls = [
base,
Ip89Crawler,
Daili66Crawler,
Data5UCrawler,
DocipCrawler,
FatezeroCrawler,
GeonodeCrawler,
GoubanjiaCrawler,
IhuanCrawler,
IP3366Crawler,
IPHaiCrawler,
JiangxianliCrawler,
KuaidailiCrawler,
SeoFangFaCrawler,
TaiyangdailiCrawler,
UqidataCrawler,
XiaoShuCrawler,
XicidailiCrawler,
XiladailiCrawler,
YqIeCrawler,
ZhandayeCrawler
]
crawlers_cls = [cls for cls in crawlers_cls
if isinstance(cls, type) and issubclass(cls, BaseCrawler) and cls is not BaseCrawler
and not getattr(cls, 'ignore', False)]
class Getter(object):
"""
getter of src
"""
def __init__(self):
"""
init db and crawlers
"""
self.redis = RedisClient()
self.crawlers_cls = crawlers_cls
self.crawlers = [crawler_cls() for crawler_cls in self.crawlers_cls]
def is_full(self):
"""
if src if full
return: bool
"""
return self.redis.count() >= PROXY_NUMBER_MAX
def run(self):
"""
run crawlers to get proxy
:return:
"""
logger.info('stating getter...')
if self.is_full():
logger.error("crawler is full!")
return
for crawler in self.crawlers:
logger.info(f'crawler {crawler} to get proxy')
for proxy in crawler.crawl():
self.redis.add(proxy)
if not self.crawlers:
logger.error("import crawler error! please check crawler import.")Getter 类就是获取器类,这其中定义了一个变量 POOL_UPPER_THRESHOLD 表示代理池的最大数量,然后定义了 is_over_threshold () 方法判断代理池是否已经达到了容量阈值,它就是调用了 RedisClient 的 count () 方法获取代理的数量,然后加以判断,如果数量达到阈值则返回 True,否则 False。如果不想加这个限制可以将此方法永久返回 True。 接下来定义了 run () 方法,首先判断了代理池是否达到阈值,然后在这里就调用了 Crawler 类的 CrawlFunc 属性,获取到所有以 crawl 开头的方法列表,依次通过 get_proxies () 方法调用,得到各个方法抓取到的代理,然后再利用 RedisClient 的 add () 方法加入数据库.
检测代理
由于代理的数量非常多,为了提高代理的检测效率,我们在这里使用异步请求库 Aiohttp 来进行检测.
- 检测代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49VALID_STATUS_CODES = [200]
TEST_URL = 'http://www.baidu.com'
BATCH_TEST_SIZE = 100
class Tester(object):
def __init__(self):
self.redis = RedisClient()
async def test_single_proxy(self, proxy):
"""
测试单个代理
:param proxy: 单个代理
:return: None
"""
conn = aiohttp.TCPConnector(verify_ssl=False)
async with aiohttp.ClientSession(connector=conn) as session:
try:
if isinstance(proxy, bytes):
proxy = proxy.decode('utf-8')
real_proxy = 'http://' + proxy
print('正在测试', proxy)
async with session.get(TEST_URL, proxy=real_proxy, timeout=15) as response:
if response.status in VALID_STATUS_CODES:
self.redis.max(proxy)
print('代理可用', proxy)
else:
self.redis.decrease(proxy)
print('请求响应码不合法', proxy)
except (ClientError, ClientConnectorError, TimeoutError, AttributeError):
self.redis.decrease(proxy)
print('代理请求失败', proxy)
def run(self):
"""
测试主函数
:return: None
"""
print('测试器开始运行')
try:
proxies = self.redis.all()
loop = asyncio.get_event_loop()
# 批量测试
for i in range(0, len(proxies), BATCH_TEST_SIZE):
test_proxies = proxies[i:i + BATCH_TEST_SIZE]
tasks = [self.test_single_proxy(proxy) for proxy in test_proxies]
loop.run_until_complete(asyncio.wait(tasks))
time.sleep(5)
except Exception as e:
print('测试器发生错误', e.args)上述代码:异步test_single_proxy () 方法,用来检测单个代理的可用情况,其参数就是被检测的代理;批量测试的最大值 BATCH_TEST_SIZE 为 100,避免当代理池过大时全部测试导致内存开销过大的问题, run () 方法里面获取了所有的代理列表,使用 Aiohttp 分配任务,启动运行
接口封装
- 以 Web API 的形式暴露可用代理,使用一个比较轻量级的库 Flask 来实现这个接口模块:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35from flask import Flask, g
from db import RedisClient
__all__ = ['app']
app = Flask(__name__)
def get_conn():
if not hasattr(g, 'redis'):
g.redis = RedisClient()
return g.redis
def index():
return '<h2>Welcome to Proxy Pool System</h2>'
def get_proxy():
"""
获取随机可用代理
:return: 随机代理
"""
conn = get_conn()
return conn.random()
def get_counts():
"""
获取代理池总量
:return: 代理池总量
"""
conn = get_conn()
return str(conn.count())
if __name__ == '__main__':
app.run()
资源调度模块
- 调用以上所定义的三个模块,将以上三个模块通过多进程的形式运行,高效利用资源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51TESTER_CYCLE = 20
GETTER_CYCLE = 20
TESTER_ENABLED = True
GETTER_ENABLED = True
API_ENABLED = True
from multiprocessing import Process
from api import app
from getter import Getter
from tester import Tester
class Scheduler():
def schedule_tester(self, cycle=TESTER_CYCLE):
"""
定时测试代理
"""
tester = Tester()
while True:
print('测试器开始运行')
tester.run()
time.sleep(cycle)
def schedule_getter(self, cycle=GETTER_CYCLE):
"""
定时获取代理
"""
getter = Getter()
while True:
print('开始抓取代理')
getter.run()
time.sleep(cycle)
def schedule_api(self):
"""
开启API
"""
app.run(API_HOST, API_PORT)
def run(self):
print('代理池开始运行')
if TESTER_ENABLED:
tester_process = Process(target=self.schedule_tester)
tester_process.start()
if GETTER_ENABLED:
getter_process = Process(target=self.schedule_getter)
getter_process.start()
if API_ENABLED:
api_process = Process(target=self.schedule_api)
api_process.start() - 以下版本兼容win平台打包exe:
1 | #!/usr/bin/env python |
三个常量,TESTER_ENABLED、GETTER_ENABLED、API_ENABLED 都是布尔类型,True 或者 False。标明了测试模块、获取模块、接口模块的开关,如果为 True,则代表模块开启。 启动入口是 run () 方法,其分别判断了三个模块的开关,如果开启的话,就新建一个 Process 进程,设置好启动目标,然后调用 start () 方法运行.
运行与测试
运行代码
- 采用如下形式
1 | import argparse |
编写测试代码
- request请求测试
1 |
|
代理使用
- 具体用法不一,举例如下:
1 | import requests |
- 抑或
1 | proxy = { |
补充
random 获取代理机制
1 |
|
base crawel
- 基础定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42class BaseCrawler(object):
urls = []
def fetch(self, url, **kwargs):
try:
headers = Headers(headers=True).generate()
kwargs.setdefault('timeout', GET_TIMEOUT)
kwargs.setdefault('verify', False)
kwargs.setdefault('headers', headers)
response = requests.get(url, **kwargs)
if response.status_code == 200:
response.encoding = 'utf-8'
return response.text
except (requests.ConnectionError, requests.ReadTimeout):
return
def process(self, html, url):
"""
used for parse html
"""
for proxy in self.parse(html):
# logger.info(f'fetched proxy {proxy.string()} from {url}')
yield proxy
def crawl(self):
"""
crawl main method
"""
try:
for url in self.urls:
logger.info(f'fetching {url}')
html = self.fetch(url)
if not html:
continue
time.sleep(.5)
yield from self.process(html, url)
except RetryError:
logger.error(
f'crawler {self} crawled proxy unsuccessfully, '
'please check if target url is valid or network issue') - 扩展应用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from pyquery import PyQuery as pq
from schemas.proxy import Proxy
from crawlers.base import BaseCrawler
from loguru import logger
import re
BASE_URL = 'https://www.zdaye.com/dayProxy/{page}.html'
MAX_PAGE = 5 * 2
class ZhandayeCrawler(BaseCrawler):
"""
zhandaye crawler, https://www.zdaye.com/dayProxy/
"""
urls_catalog = [BASE_URL.format(page=page) for page in range(1, MAX_PAGE)]
headers = {
'User-Agent': 'User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_4) AppleWebKit/537.36 (KHTML, '
'like Gecko) Chrome/83.0.4103.61 Safari/537.36 '
}
urls = []
ignore = True
def crawl(self):
self.crawl_catalog()
yield from super().crawl()
def crawl_catalog(self):
for url in self.urls_catalog:
logger.info(f'fetching {url}')
html = self.fetch(url, headers=self.headers)
self.parse_catalog(html)
def parse_catalog(self, html):
"""
parse html file to get proxies
:return:
"""
doc = pq(html)
for item in doc('#J_posts_list .thread_item div div p a').items():
url = 'https://www.zdaye.com' + item.attr('href')
logger.info(f'get detail url: {url}')
self.urls.append(url)
def parse(self, html):
doc = pq(html)
trs = doc('.cont br').items()
for tr in trs:
line = tr[0].tail
match = re.search(r'(\d+\.\d+\.\d+\.\d+):(\d+)', line)
if match:
host = match.group(1)
port = match.group(2)
yield Proxy(host=host, port=port)
if __name__ == '__main__':
crawler = ZhandayeCrawler()
for proxy in crawler.crawl():
print(proxy)