首页 > 科技 > Python 爬取分析全国 12 个城市 4 万条房价信息,告诉你买不起

Python 爬取分析全国 12 个城市 4 万条房价信息,告诉你买不起

通过分页、线程池、代理池等技术,快速爬取链家网近4万条在售二手房信息,速度可达 10000 条 / 5 分钟。

通过对二手房作数据分析,得到北上广深等(新)一线城市四地房价的纵向比较,同时对各个城市各个区的房价做横向对比,并将对比结果可视化出来。

主要用到的库或模块包括

  • Requests
  • PyQuery
  • ThreadPoolExecutor
  • JSON
  • Matplotlib
  • PyEcharts

环境:

  • Widnows10
  • Python3.5
  • Pycharm2018

数据抓取

爬虫架构设计

通过分析链家网的 URL ,不难发现,每一个城市的链家网的基本格式是:

城市名简拼 + ”.lianjia.com“

所以整个爬虫最外层应该是遍历一个保存城市简拼的列表,拼接得到一个个起始 URL,根据这些 URL 爬取对应城市的链家网。

针对每一个城市的链家网而言,首先得到该城市在售二手房的总套数,由于每一页显示的套数是 30,由总套数整除以30再加上1可以得到总页数,但是由于最大可浏览页数为 100,所以我们这里得加个判断,如果总页数大于 100 的话,令总页数等于 100。

分析具体城市的链家网每一页的 URL, 以北京为例,我们可以发现第 N 页的 URL 是:

bj.lianjia.com/ershoufang/pg{N},由此我们可以通过以下代码来得到每一页的 URL:

for i in range(total_page):
page_url = "bj.lianjia.com/ershoufang/pg{}".format(i+1)

本来得到每一页的 URL 后,我们可以得到该页上 30 套房的房价信息和详情页 URL,但是页面上没有房子所在区的信息。

我们只能再向下请求访问详情页 URL,从而提取出我们想要的所有数据。

综上所述,我们可以将整个框架从上往下分为四层,如下图所示:

基于上述思路,在写代码的时候,可以分层从上往下实现,方便调试。

第一层 & 第二层:获取总套数

根据城市简拼得到起始 URL,并得到总套数,为分页做准备。

def get_list_page_url(city):

start_url = "https://{}.lianjia.com/ershoufang".format(city)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
}
try:
response = requests.get(start_url, headers=headers)
# print(response.status_code, response.text)
doc = pq(response.text)
total_num = int(doc(".resultDes .total span").text())
total_page = total_num // 30 + 1
# 只能访问到前一百页
if total_page > 100:
total_page = 100

page_url_list = list()

for i in range(total_page):
url = start_url + "/pg" + str(i + 1) + "/"
page_url_list.append(url)
#print(url)
return page_url_list

except:
print("获取总套数出错,请确认起始URL是否正确")
return None

第三层:根据起始 URL 得到分页 URL

def get_detail_page_url(page_url):
global detail_list
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
'Referer': 'https://bj.lianjia.com/ershoufang'
}

try:
response = requests.get(page_url,headers=headers,timeout=3)
doc = pq(response.text)
i = 0
detail_urls = list()
for item in doc(".sellListContent li").items():
i += 1
print(i)
if i == 31:
break
child_item = item(".noresultRecommend")
if child_item == None:
i -= 1
detail_url = child_item.attr("href")
detail_urls.append(detail_url)
return detail_urls
except:
print("获取列表页" + page_url + "出错")

第四层

本层做的是具体解析,解析使用的是 PyQuery 库,支持 CSS 选择器且比 Beautiful Soup 方便。仅仅需要下面几行代码就帮助我们获得了目标数据:

response = requests.get(url=detail_url, headers=headers, proxies=proxies)
#detail_url 是得到的详情页 URL
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url

多线程爬取

p = ThreadPoolExecutor(30)
for page_url in page_url_list:
p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
p.shutdown()

IP 代理池

下载后新开一个 Pycharm 视窗运行该项目,然后我们可以用下面的方式来获取可用的代理 IP:

def get_valid_ip():
url = "http://localhost:5000/get"
try:
ip = requests.get(url).text
return ip
except:
print("请先运行代理池")

然后通过参数设置使用代理 IP:

proxies = {
"http": "http://" + get_valid_ip(),
}
response = requests.get(url=detail_url, headers=headers, proxies=proxies)

数据保存

采用 JSON文件形式保存数据,每个城市保存一个 JSON 文件,文件名为该城市简拼。

def save_data(data,filename):
with open(filename+".json", 'w', encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))

稍等一会儿,所有数据就保存在本地了:

数据分析

数据整合

在这里做一些求同地区房价最大值、最小值、平均值,以及数据格式统一化的工作:

def split_data():
global region_data
region_data = dict()
for region in dic_data.keys():
# 最大值、最小值、平均值
region_data[region] = {"max":dic_data[region][0],"min":dic_data[region][0],"average":0}
for per_price in dic_data[region]:
if per_price > region_data[region]["max"]:
region_data[region]["max"] = per_price
if per_price < region_data[region]["min"]:
region_data[region]["min"] = per_price
region_data[region]["average"] += per_price
region_data[region]["average"] /= len(dic_data[region])
# 保留两位小数
region_data[region]["average"] = round(region_data[region]["average"],2)

数据可视化

将分析结果通过 Matplotlib 直观的体现出来,该部分的代码如下:

def data_viewer():
label_list = region_data.keys() # 横坐标刻度显示值
max = []
min = []
average = []
for label in label_list:
max.append(region_data[label].get("max"))
min.append(region_data[label].get("min"))
average.append(region_data[label].get("average"))
x = range(len(max))
"""
绘制条形图
left: 长条形中点横坐标
height: 长条形高度
width: 长条形宽度,默认值0
.8
label: 为后面设置legend准备
"""
rects1 = plt.bar(x=x, height=max, width=0.25, alpha=0.8, color='red', label="最大值")
rects2 = plt.bar(x=[i + 0.25 for i in x], height=average, width=0.25, color='green', label="平均值")
rects3 = plt.bar(x=[i + 0.5 for i in x], height=min, width=0.25, color='blue', label="最小值")
#plt.ylim(0, 50) # y轴取值范围
plt.ylabel("房价/元")
"""
设置x轴刻度显示值
参数一:中点坐标
参数二:显示值
"""
plt.xticks([index + 0.2 for index in x], label_list)
plt.xlabel("地区")
plt.legend()
for rect in rects1:
height = rect.get_height()
plt.text(rect.get_x() + rect.get_width() / 2, height+1, str(height), ha="center", va="bottom")
for rect in rects2:
height = rect.get_height()
plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom")
for rect in rects3:
height = rect.get_height()
plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom")
plt.show()

结果如下:

限于篇幅,其他城市的图就不放了。

再来看全国主要一线城市二手房房价有序条形图:

可以看出,北京、上海、深圳的房价大致在同一水平线,而厦门位于第四,广州在第六,最后看一下房价地域图:

最终代码

import requests

from concurrent.futures import ThreadPoolExecutor

from pyquery import PyQuery as pq

import json

import threading


import time


def get_list_page_url(city):

start_url = "https://{}.lianjia.com/ershoufang".format(city)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
}
try:
response = requests.get(start_url, headers=headers)
# print(response.status_code, response.text)
doc = pq(response.text)
total_num = int(doc(".resultDes .total span").text())
total_page = total_num // 30 + 1
# 只能访问到前一百页
if total_page > 100:
total_page = 100

page_url_list = list()

for i in range(total_page):
url = start_url + "/pg" + str(i + 1) + "/"
page_url_list.append(url)
#print(url)
return page_url_list

except:
print("获取总套数出错,请确认起始URL是否正确")
return None


detail_list = list()

# 需要先在本地开启代理池
# 代理池仓库: https://github.com/Python3WebSpider/ProxyPool
def get_valid_ip():
url = "http://localhost:5000/get"
try:
ip = requests.get(url).text
return ip
except:
print("请先运行代理池")


def get_detail_page_url(page_url):
global detail_list
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
'Referer': 'https://bj.lianjia.com/ershoufang'
}

try:
response = requests.get(page_url,headers=headers,timeout=3)
doc = pq(response.text)
# broswer.get(page_url)
# print(page_url)
# doc = pq(broswer.page_source)
i = 0
detail_urls = list()
for item in doc(".sellListContent li").items():
i += 1
if i == 31:
break
child_item = item(".noresultRecommend")
if child_item == None:
i -= 1
detail_url = child_item.attr("href")
detail_urls.append(detail_url)
return detail_urls
except:
print("获取列表页" + page_url + "出错")

lock = threading.Lock()

def detail_page_parser(res):
global detail_list
detail_urls = res.result()
if not detail_urls:
print("detail url 为空")
return None
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36',
'Referer': 'https://bj.lianjia.com/ershoufang'
}
for detail_url in detail_urls:
try:
response = requests.get(url=detail_url, headers=headers,timeout=3)
#print(response.status_code)
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url
detail_dict["title"] = title
detail_dict["area"] = area
detail_dict["price"] = unit_price
detail_dict["url"] = url

detail_list.append(detail_dict)

print(unit_price, title, area)

except:
print("获取详情页出错,换ip重试")
proxies = {
"http": "http://" + get_valid_ip(),
}
try:
response = requests.get(url=detail_url, headers=headers, proxies=proxies)
#print(response.status_code)
detail_dict = dict()
doc = pq(response.text)
unit_price = doc(".unitPriceValue").text()
unit_price = unit_price[0:unit_price.index("元")]
title = doc("h1").text()
area = doc(".areaName .info a").eq(0).text().strip()
url = detail_url
# 已下架的还会爬取,但是没有价格
if len(unit_price)>0:
detail_dict["title"] = title
detail_dict["area"] = area
detail_dict["price"] = unit_price
detail_dict["url"] = url

detail_list.append(detail_dict)

print(unit_price, title, area)
except:
print("重试失败...")


def save_data(data,filename):
with open(filename+".json", 'w', encoding="utf-8") as f:
f.write(json.dumps(data, indent=2, ensure_ascii=False))

def main():
# cq,cs,nj,dl,wh,cc
city_list = ['nj']
for city in city_list:
page_url_list = get_list_page_url(city)

# pool = threadpool.ThreadPool(20)
# requests = threadpool.makeRequests(page_and_detail_parser, page_url_list)
# [pool.putRequest(req) for req in requests]
# pool.wait()

p = ThreadPoolExecutor(30)

for page_url in page_url_list:
p.submit(get_detail_page_url, page_url).add_done_callback(detail_page_parser)
# 这里的回调函数拿到的是一个对象。
# 先把返回的res得到一个结果。即在前面加上一个res.result(),这个结果就是get_detail_page_url的返回
p.shutdown()

print(detail_list)

save_data(detail_list, city)

detail_list.clear()

if __name__ == '__main__':
old = time.time()
main()
new = time.time()
delta_time = new - old
print("程序共运行{}s".format(delta_time))

本文来自投稿,不代表本人立场,如若转载,请注明出处:http://www.souzhinan.com/kj/290209.html