摘要
上一篇内容我介绍了wordpress工作台的常用功能以及网站样式的构建,但网站不能空有架构,还需要有内容填充,在实际生产过程中,不可能在wordpress后台一篇一篇的添加文章,然后点击发布(这样效率也太低了,当然个人博客除外)。对于有大批量更新文章需求的站点,我们需要一个内容采集发布器来帮助我们自动完成这些操作。
市面上以 火车头、八爪鱼采集器为代表的一大批自动采集软件 会在我们使用的某一个关键节点要充值才能使用。比如我之前用过的火车头采集器,在使用css采集时 按class名字搜索内容要收费、发布时也要收费。所以我就考虑自己写一个采集器。
对于有需要的人,看完这篇文章,应该可以节省几百块的采集器会员费用。
废话太多了,下面直接开始...
操作步骤
先了解一下wordpress数据库各表的作用和字段blog.51cto.com/u_15967457/…
主要思路比较简单,就是从互联网上爬取内容,然后存到数据库中,就可以实现自动发布了。
实现步骤
1:定义采集源集合
category_list = [
# {'category_name': '头条资讯', 'category_flag': 'toutiao', 'url': 'https://www.xxx.cc/latest-2/page/'},
{'category_name': '亚马逊Amazon', 'category_flag': 'amazon', 'url': 'https://www.xxx.cc/category/amazon/page/'},
{'category_name': '虾皮Shopee', 'category_flag': 'shopee', 'url': 'https://www.xxx.cc/category/shopee/page/'},
{'category_name': '希音SheIn', 'category_flag': 'shein', 'url': 'https://www.xxx.cc/category/shein/page/'},
{'category_name': '速卖通', 'category_flag': 'aliexpress', 'url': 'https://www.xxx.cc/category/aliexpress/page/'},
{'category_name': 'Wish', 'category_flag': 'wish', 'url': 'https://www.xxx.cc/category/wish/page/'},
{'category_name': 'shopify', 'category_flag': 'shopify', 'url': 'https://www.xxx.cc/category/shopify/page/'},
{'category_name': 'tiktok', 'category_flag': 'tiktok', 'url': 'https://www.xxx.cc/category/tiktok/page/'},
{'category_name': '跨境百科', 'category_flag': 'kuajingbaike', 'url': 'https://www.xxx.cc/category/kuajingbaike/page/'},
{'category_name': '卖家必备工具', 'category_flag': 'tools', 'url': 'https://www.xxx.cc/category/tools/page/'},
]
要采集互联网上的内容,得先有个站点供我们采集,可以在网络上找到心仪的站点 打包成一个集合。
注意这个集合中的 category_name是跟我分类目录的名字一样,category_flag对应的值是分类目录的别名,通过上面的这个集合,规定某个url对应的资源被发布到相应的分类下(可以通过下面的sql语句查看有几个分类)
select wt.* from wp_terms wt LEFT JOIN wp_term_taxonomy wtt on wt.term_id=wtt.term_id where wtt.taxonomy='category'
2:根据资源站url找到分类列表url集合
我直接把代码贴出来,然后做讲解
2.1 先引入相关库,定义一些常量
import re
import time # 用于-线程休眠
import urllib3
import requests
from bs4 import BeautifulSoup
from wordpress_xmlrpc import Client, WordPressPost
from wordpress_xmlrpc.methods import media, posts
from utils.common_util import update_remote_url_name
from utils.http_util import get_header
from utils.redis_util import mredis
from datetime import datetime
import pymysql
import traceback
# WordPress站点的URL和登录凭据
wordpress_url = 'http://xxx.xxx.com/xmlrpc.php'
wordpress_username = 'admin'
wordpress_password = '123456'
client = Client(wordpress_url, wordpress_username, wordpress_password)
conn = pymysql.connect(host='xxx.mysql.rds.aliyuncs.com', user='root', password='wp@xxxx', database='wordpress', charset='utf8')
cur = conn.cursor()
2.2 获取本次爬取的页数范围,主要是结尾页,防止重复爬取 和 资源浪费,下面方法的入参就是上面
category_list集合中的其中一个。
因为一个采集器会多次执行采集任务,不可能每次执行任务时都把一个站点的所有文章采集下来,没必要也耗资源。为了解决这个问题,我的思路是这样的:
由于每次最新更新的文章都会出现在前几页,以前发布的文章会按序往后面的页数推移。比如,第一次我们爬取了250页,过了几天这个网站有新文章发布,总页数增加到了253页,那么我们再次爬取的时候,其实就只用爬取 第 1到第3页(253-250)的内容。
按照上面的思路,我们需要记录 上次爬取的总页数,上次最大页数包含的文章数。我们把这些数值存在redis中供代码调用。
def get_page_range(category_item):
# 本次爬取到的最大页数
max_page_number = 1
# 本次最大页数包含的文章数
max_page_article_count = 0
# 上一次爬取到的最大页数
old_max_page_number = mredis.hash_get("pythonspider_kuajing", category_item['category_flag'] + ':max_page_number')
# 上一次最大页数包含的文章数
old_max_page_article_count = mredis.hash_get("pythonspider_kuajing", category_item['category_flag'] + ':max_page_article_count')
if old_max_page_number == None: old_max_page_number = 0
if old_max_page_article_count == None: old_max_page_article_count = 0
old_max_page_number = int(old_max_page_number)
old_max_page_article_count = int(old_max_page_article_count)
# 如果上一次的最大页数不为空,那么检测所有页数时,先从上一次最大页数之后开始检测(此处只为获取一个数值,并非去获取内容),防止资源浪费
if old_max_page_number >= 1:
max_page_number = old_max_page_number
continue_scrolling = True
while continue_scrolling:
try:
# 根据HTML元素的选择器提取内容
urllib3.disable_warnings()
response = requests.get(url=(category_item['url'] + str(max_page_number)), headers=get_header(),
verify=False)
# 解析HTML内容
soup = BeautifulSoup(response.text, "html.parser")
# 替换为你想要提取内容的元素选择器
source_list = []
if len(soup.select(".posts-loop")) != 0:
source_list = soup.select(".posts-loop")[0].find_all('div', {'id': lambda x: x and 'post-' in x})
if len(source_list) == 0:
# 逻辑是先进else的,在else的末尾 max_page_number+1了,所以这里要-1
max_page_number = max_page_number - 1
# 如果本次跟上次一样 那么就不存,没有最新的文章更新,那么不更新redis
# 如果本次跟上次一样 有文章页面或文章数更新,那么同步更新redis
if max_page_number == old_max_page_number and max_page_article_count == old_max_page_article_count:
raise Exception(
str(category_item['category_name']) + "——获取文章资源列表,上次已完全获取,没有最新的更新文章")
else:
mredis.hash_add("pythonspider_kuajing", category_item['category_flag'] + ':max_page_number', max_page_number)
mredis.hash_add("pythonspider_kuajing", category_item['category_flag'] + ':max_page_article_count', max_page_article_count)
raise Exception(str(category_item['category_name']) + "——获取文章资源列表,找不到更多文章了")
else:
max_page_article_count = len(source_list)
print(str(category_item['category_name']) + "——获取文章资源列表,第" + str(max_page_number) + "页")
max_page_number = max_page_number + 1
except Exception as e:
continue_scrolling = False
print(e)
# 如果页数相同:文章数相同,那么就不继续爬了
# 如果页数相同,本次文章数多,那么说明更新的文章数没有一整页,那就爬第1页就好了
# 如果页数不同,本次页数多,那么说明更新的文章多了(本次的最大页数-上次最大页数=x)这么多页数,
# 由于最早更新的文章是排列在前面的,所以只用爬取第1页到第x页就好了
if max_page_number == old_max_page_number and max_page_article_count == old_max_page_article_count:
raise Exception(str(category_item['category_name']) + "——获取文章资源列表,找不到更多文章了")
elif max_page_number == old_max_page_number and max_page_article_count > old_max_page_article_count:
return 1
elif max_page_number > old_max_page_number:
return max_page_number - old_max_page_number
在上面的代码中,我们得到了本次要爬取的页面范围。通过遍历范围,拼接参数的方式,如:url+"/page?1"的方式可以获取到第1页的内容,通过url+"/page?2"的方式可以获取到第2页的内容。这个规则是根据站点的页面参数分析得到的,不同站点的规则可能不同,但原理应该相近。
3:根据分类列表url找到文章url集合
下面这个方法的入参就是 url+"/page?1" 拼成的具体分类列表页的目录
方法的内容就是通过css选择器、id选择器提取 文章标题,缩略图的url,和文章url。将这些信息打包返回,为下一步提取文章内容做准备
def get_category_list(category_list_url):
article_list = []
urllib3.disable_warnings()
response = requests.get(url=category_list_url, headers=get_header(),
verify=False)
# 解析HTML内容
soup = BeautifulSoup(response.text, "html.parser")
# 替换为你想要提取内容的元素选择器
source_list = soup.select(".posts-loop")[0].find_all('div', {'id': lambda x: x and 'post-' in x})
if source_list:
for source in source_list:
# 文章id
source_post_id = str(source.get('id'))
select_terms_sql = "select count(id) from wp_source_info where source_post_id=%s"
select_terms_param = (source_post_id)
cur.execute(select_terms_sql, select_terms_param)
result = cur.fetchone()
if len(result) > 0 and result[0] > 0:
continue
# url链接
url = '' if len(source.select(".entry-title")) == 0 else source.select(".entry-title")[0].find('a').get('href').strip()
# 文章标题
title = '' if len(source.select(".entry-title")) == 0 else source.select(".entry-title")[0].find('a').text.strip()
# 列表页简介
introduction = '' if len(source.select(".entry-summary")) == 0 else source.select(".entry-summary")[0].text.strip()
# 列表页略缩图
img = '' if len(source.select(".thumbnail-wrap")) == 0 else source.select(".thumbnail-wrap")[0].find('img').get('src')
article = {
'source_post_id': source_post_id,
'title': title,
'introduction': introduction,
'url': url,
'image_url': img,
}
article_list.append(article)
else:
print("未找到匹配的元素")
return article_list
4,根据文章url集合爬取文章并发布
下面的方法是整篇内容的重中之重,但实操起来很简单,因为python有wordpress相关的类库来帮我们执行 文章发布,缩略图插入的操作。有几个点要注意下
1:在爬取文章内容的时候,通过request.get()方法请求目标网址,其中verify要设置为false,这样网站就不会进行安全校验了,同时headers要设置随机,主要是headers中的userAgent要随机,才能尽量模拟真实用户请求。
2:在requests.get之前要调用urllib3.disable_warnings() 方法避免http请求报错
3:推送缩略图的时候调用 client.call(media.UploadFile(image_data))方法,这个方法会把图片上传到wordpress的媒体库中,并返回缩略图的的id(这里要说明一下为何需要上传到媒体库中,我们从网页爬取的图片如果按照源链接存到我们自己的数据库中并作为内容出现在文章页面中时,那么每当有用户访问该文章,就会有一部分流量被用去加载 图片源链接的资源,如果流量很多,那么对源站点来说会有很高的负载,同时也会引起源站点的反爬机制,于己于人都不好。所以我们要把源链接替换成自己的链接存到文章内容中)
4:推送文章的时候调用client.call(posts.NewPost(post))方法
def get_push_articles(category_name, article_list):
article_num = 1
url_pattern = r"https?://\S+?\.(?:jpg|jpeg|png|gif|bmp)"
for article_item in article_list:
urllib3.disable_warnings()
article_response = requests.get(url=article_item.get('url'), headers=get_header(), verify=False)
# 解析HTML内容
article_soup = BeautifulSoup(article_response.text, "html.parser")
article_div = article_soup.find_all('article', {'id': lambda x: x and article_item.get('source_post_id') in x})[0]
post_time_str = article_div.select(".entry-date")[0].contents[0].strip()
# 将原始时间字符串解析为 datetime 对象
original_datetime = datetime.strptime(post_time_str, '%Y年 %m月 %d日 %H:%M')
# 将 datetime 对象格式化为目标时间字符串
article_item['post_time'] = original_datetime.strftime('%Y-%m-%d %H:%M:%S')
print(str(category_name) + '——第' + str(article_num) + '/' + str(len(article_list)) + '篇文章《' + str(article_item.get('title')) + '》,采集开始')
article_item['content'] = str(article_div.select(".entry-content")[0])
article_item['image_id'] = 0
# 使用正则表达式查找所有匹配的URL
urls = re.findall(url_pattern, article_item['content'])
if len(urls) == 0 and article_item.get('image_url') != '':
image_response = requests.get(url=article_item.get('image_url'), headers=get_header(), verify=False)
image_data = {
'name': update_remote_url_name(article_item.get('image_url')),
'type': 'image/jpeg',
'bits': image_response.content
}
upload_response = {}
try:
upload_response = client.call(media.UploadFile(image_data))
except Exception as e:
print("图片上传异常:" + e)
traceback.print_exc()
article_item['image_id'] = upload_response['id']
article_item['image_new_url'] = upload_response['url']
# 用新的URL替换匹配的URL
for index, image_url in enumerate(urls):
image_response = requests.get(url=image_url, headers=get_header(), verify=False)
image_data = {
'name': update_remote_url_name(image_url),
'type': 'image/jpeg',
'bits': image_response.content
}
upload_response = client.call(media.UploadFile(image_data))
image_new_url = upload_response['url']
image_id = upload_response['id']
# 文章内的第一张图片为列表缩略图
if index == 0:
article_item['image_url'] = image_new_url
article_item['image_id'] = image_id
article_item['content'] = article_item['content'].replace(image_url, image_new_url)
print(str(category_name) + '——第' + str(article_num) + '/' + str(len(article_list)) + '篇文章,第' + str(index + 1) + '张图片替换外链结束')
print(str(category_name) + '——第' + str(article_num) + '/' + str(len(article_list)) + '篇文章,采集结束')
post = WordPressPost()
post.title = article_item.get('title')
post.content = article_item.get('content')
post.excerpt = article_item.get('introduction')
post.post_status = 'publish' # 文章状态,不写默认是草稿,private表示私密的,draft表示草稿,publish表示发布
post.terms_names = {
# 'post_tag': ['技术'], # 文章所属标签,没有则自动创建
'category': [category_name] # 文章所属分类,没有则自动创建
}
if article_item['image_id'] != 0:
post.thumbnail = article_item['image_id'] # 缩略图的id
else:
print(str(category_name) + '——第' + str(article_num) + '/' + str(len(article_list)) + '篇文章,内容没有外链图片')
post.id = client.call(posts.NewPost(post))
print(str(category_name) + '——第' + str(article_num) + '/' + str(len(article_list)) + '篇文章新增成功,文章id:' + post.id)
if int(post.id) > 0:
insert_postinfo_params = []
insert_postinfo_params.append(
(int(post.id), str(article_item['source_post_id']), article_item['post_time']))
insert_postinfo_sql = "INSERT INTO wp_source_info(`local_post_id`,`source_post_id`,`source_post_time`,`create_time`) VALUES (%s, %s, %s, now());"
cur.executemany(insert_postinfo_sql, insert_postinfo_params)
conn.commit()
print(str(category_name) + '——第' + str(article_num) + '/' + str(len(article_list)) + '篇文章,源文章信息记录完毕,发布完毕')
print('————————————————————————————————')
article_num = article_num + 1
time.sleep(1)
代码写到这里,主要的逻辑其实已经讲完了。下面写个main方法把上面的代码整合一下
5,整合
def main():
for category_item in category_list:
end_page = 0
all_article_list = []
try:
# 计算本次获取该类别的最大页数
end_page = get_page_range(category_item)
except Exception as e:
print("获取页面范围异常:" + str(e))
traceback.print_exc()
try:
for page_number in range(1, (end_page + 1)):
page_url = category_item.get('url') + str(page_number)
article_list = get_category_list(page_url)
all_article_list.extend(article_list)
print(str(category_item.get('category_name')) + "——第" + str(page_number) + "页获取资源列表结束,获取文章" + str(len(article_list)) + "篇")
except Exception as e:
print("获取页面链接异常:" + e)
traceback.print_exc()
try:
get_push_articles(category_item.get('category_name'), all_article_list)
except Exception as e:
print("获取页面内容并发布异常:" + str(e))
traceback.print_exc()
if __name__ == "__main__":
main()
写在后面
运行上面的程序基本上已经能够把一个网站的内容给爬取下来并发布了,不同网站的采集规则不同,比如这个网站通过class采集内容,另外的网站通过id采集内容,这都是很好分析的。
运行上面的程序,等运行完毕之后就可以看到效果了,然而这一篇文章还是基于本地wordpress运行。那么怎么放到服务器上去运行呢,比如我的服务器是阿里云的centos,其实迁移上去还是蛮简单的,下一篇文章会主要讲这个