Python酷我音乐爬取MP3文件案例
一、数据爬取思路分析
找到数据来源,通过浏览器抓包工具
通过网络标签和关键字搜索
查看网页源代码,找到数据位置
确定实现思路和技术实现方式
二、代码实现
发送请求
获取数据
解析数据
保存数据
使用会员登录可下载所有列表歌曲,非会员只能下载免费的MP3歌曲
支持分页下载,下载所有分页列表歌曲
关注微信公众号【站在前沿】,回复kuwo,获取完整代码下载地址
kuwo.py
def downMp3s(musicIds, musicNames):
    """Download every song in ``musicIds`` as ``<name>.mp3`` into the ``mp3`` directory.

    For each id/name pair the kuwo ``playUrl`` endpoint is queried once and the
    ``code`` field of the JSON reply decides what happens:

    * ``200``: the file at ``data.url`` is downloaded via ``until.downBinFile``.
    * ``-1``: the song is logged as paid content and skipped.
    * anything else: the returned code is logged and the song is skipped.

    Args:
        musicIds: Iterable of song ids, substituted into the ``mid`` query parameter.
        musicNames: Iterable of song names, paired positionally with ``musicIds``.

    Note:
        Relies on the module-level names ``until``, ``logging`` and ``headers``,
        which are not defined in this excerpt.
    """
    for musicId, musicName in zip(musicIds, musicNames):
        url = f'http://www.kuwo.cn/api/v1/www/music/playUrl?mid={musicId}&type=music&httpsStatus=1&reqId=7e2a25c3-51e3-11ee-b0bd-49dd3fc18b18&plat=web_www&from='
        # Fetch the reply once and reuse it for both the status code and the URL.
        resJson = until.getResJson(url=url, headers=headers)
        code = resJson['code']
        if code == 200:
            downUrl = resJson['data']['url']
            # '' means "no custom headers"; 'mp3' is the target directory.
            until.downBinFile(downUrl, '', 'mp3', musicName + '.mp3')
            logging.info('歌曲:' + musicName + '.mp3 下载完成')
        elif code == -1:
            logging.info('歌曲:' + musicName + '为付费内容,无法下载')
        else:
            # str() is required: concatenating a non-str code would raise TypeError.
            logging.info('歌曲:' + musicName + '下载时返回Code为' + str(code))
if __name__ == '__main__':
    # Script entry point: walk pages 1-15 of ranking list 93 and download each page.
    # Relies on the module-level names `until`, `logging` and `headers`, which
    # are not defined in this excerpt.
    urlMp3List = 'http://www.kuwo.cn/api/www/bang/bang/musicList'
    params = {
        "bangId": "93",
        "pn": "1",
        "rn": "20",
        "httpsStatus": "1",
        "reqId": "787a5650-51e8-11ee-adef-53b278c91f8b",
        "plat": "web_www",
        "from": ""
    }
    for pn in range(1, 16):
        params['pn'] = str(pn)
        mp3List = until.getResJson3(urlMp3List, headers, params)
        # Iterate over what the server actually returned instead of assuming
        # exactly 20 entries; a short page used to raise IndexError.
        musicList = mp3List['data']['musicList']
        if not musicList:
            # An empty page means the ranking has no further entries.
            break
        musicIds = [music['rid'] for music in musicList]
        musicNames = [music['name'] for music in musicList]
        # Download every song listed on the current page.
        downMp3s(musicIds, musicNames)
        logging.info('===============第' + str(pn) + '页下载完成=================')
until.py
import datetime
import logging
import os
import re

import requests
from tqdm import tqdm

# Configure the log output format and level.
logging.basicConfig(
    level=logging.INFO, # Log level is INFO, so DEBUG messages are not emitted
    format="%(asctime)s - %(levelname)s - %(message)s" # Log line layout: timestamp, level name, message
)
# -----------------网页内容请求-------------------
# ------------get方式
def getResText(url, headers, timeout=30):
    """Send an HTTP GET request and return the response body as text.

    Args:
        url: Address to request.
        headers: Request headers handed to ``requests.get``.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.

    Returns:
        The response body decoded as UTF-8.
    """
    with requests.get(url=url, headers=headers, timeout=timeout) as res:
        # Decode as UTF-8 regardless of the charset the server declared.
        res.encoding = "utf-8"
        return res.text
def getResJson(url, headers, timeout=30):
    """Send an HTTP GET request and return the parsed JSON response.

    Args:
        url: Address to request.
        headers: Request headers handed to ``requests.get``.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.

    Returns:
        The value produced by ``Response.json()`` for the reply.
    """
    with requests.get(url=url, headers=headers, timeout=timeout) as res:
        res.encoding = "utf-8"
        # Named resJson rather than json so the stdlib module name is not shadowed.
        resJson = res.json()
    return resJson
def getResJson3(url, headers, params, timeout=30):
    """Send an HTTP GET request with query parameters and return the parsed JSON.

    Args:
        url: Address to request.
        headers: Request headers handed to ``requests.get``.
        params: Query-string parameters handed to ``requests.get``.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.

    Returns:
        The value produced by ``Response.json()`` for the reply.
    """
    with requests.get(url=url, headers=headers, params=params, timeout=timeout) as res:
        res.encoding = "utf-8"
        # Named resJson rather than json so the stdlib module name is not shadowed.
        resJson = res.json()
    return resJson
# ------------post方式
def postResText(url, headers, data, timeout=30):
    """Send an HTTP POST request and return the response body as text.

    Args:
        url: Address to request.
        headers: Request headers handed to ``requests.post``.
        data: Request body handed to ``requests.post``.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.

    Returns:
        The response body decoded as UTF-8.
    """
    with requests.post(url=url, headers=headers, data=data, timeout=timeout) as res:
        # Decode as UTF-8 regardless of the charset the server declared.
        res.encoding = "utf-8"
        return res.text
def postResJson(url, headers, data, timeout=30):
    """Send an HTTP POST request and return the parsed JSON response.

    Args:
        url: Address to request.
        headers: Request headers handed to ``requests.post``.
        data: Request body handed to ``requests.post``.
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.

    Returns:
        The value produced by ``Response.json()`` for the reply.
    """
    with requests.post(url=url, headers=headers, data=data, timeout=timeout) as res:
        res.encoding = "utf-8"
        # Named resJson rather than json so the stdlib module name is not shadowed.
        resJson = res.json()
    return resJson
# -----------------下载文件-------------------
# -----------------------------------------------------get方式
def downBinFile(url, headers, path, fileName, timeout=30):
    """Download a binary resource (video, image, file, ...) with HTTP GET and save it.

    The whole body is read into memory and then written by ``saveBinFile``;
    the elapsed time is logged afterwards.

    Args:
        url: Address of the resource.
        headers: Request headers; an empty string means "send no custom headers".
        path: Directory the file is saved into.
        fileName: Name of the file to write (sanitised by ``saveBinFile``).
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.
    """
    startTime = datetime.datetime.now()
    # `headers or None` folds the former '' / non-'' branches into one request.
    # stream=True was dropped: reading .content buffers the full body anyway.
    with requests.get(url=url, headers=headers or None, timeout=timeout) as res:
        content = res.content
    saveBinFile(content, path, fileName)
    logging.info(
        '文件[' + fileName + ']下载完成,总共耗时{}毫秒'.format((datetime.datetime.now() - startTime).total_seconds() * 1000))
def downBinFileShowBar(url, headers, path, fileName, timeout=30):
    """Download a file with HTTP GET while showing a progress bar.

    The response is opened in streaming mode and handed to
    ``saveBinFileShowBar``, which writes it chunk by chunk; the elapsed time is
    logged afterwards.

    Args:
        url: Address of the resource.
        headers: Request headers; an empty string means "send no custom headers".
        path: Directory the file is saved into.
        fileName: Name of the file to write (sanitised by ``saveBinFileShowBar``).
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.
    """
    startTime = datetime.datetime.now()
    # `headers or None` folds the former '' / non-'' branches into one request.
    with requests.get(url=url, headers=headers or None, stream=True, timeout=timeout) as file:
        saveBinFileShowBar(file, path, fileName)
    logging.info(
        '文件[' + fileName + ']下载完成,总共耗时{}毫秒'.format((datetime.datetime.now() - startTime).total_seconds() * 1000))
# ----------------------------------------------------post方式
def postDownBinFile(url, headers, path, fileName, timeout=30):
    """Download a binary resource (video, image, file, ...) with HTTP POST and save it.

    The whole body is read into memory and then written by ``saveBinFile``;
    the elapsed time is logged afterwards.

    Args:
        url: Address of the resource.
        headers: Request headers; an empty string means "send no custom headers".
        path: Directory the file is saved into.
        fileName: Name of the file to write (sanitised by ``saveBinFile``).
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.
    """
    startTime = datetime.datetime.now()
    # `headers or None` folds the former '' / non-'' branches into one request.
    # stream=True was dropped: reading .content buffers the full body anyway.
    with requests.post(url=url, headers=headers or None, timeout=timeout) as res:
        content = res.content
    saveBinFile(content, path, fileName)
    logging.info(
        '文件[' + fileName + ']下载完成,总共耗时{}毫秒'.format((datetime.datetime.now() - startTime).total_seconds() * 1000))
def postDownBinFileShowBar(url, headers, path, fileName, timeout=30):
    """Download a file with HTTP POST while showing a progress bar.

    The response is opened in streaming mode and handed to
    ``saveBinFileShowBar``, which writes it chunk by chunk; the elapsed time is
    logged afterwards.

    Args:
        url: Address of the resource.
        headers: Request headers; an empty string means "send no custom headers".
        path: Directory the file is saved into.
        fileName: Name of the file to write (sanitised by ``saveBinFileShowBar``).
        timeout: Seconds ``requests`` waits for the server before raising a
            timeout error. Pass ``None`` to wait indefinitely, which was the
            behaviour before this parameter existed.
    """
    startTime = datetime.datetime.now()
    # `headers or None` folds the former '' / non-'' branches into one request.
    with requests.post(url=url, headers=headers or None, stream=True, timeout=timeout) as file:
        saveBinFileShowBar(file, path, fileName)
    logging.info(
        '文件[' + fileName + ']下载完成,总共耗时{}毫秒'.format((datetime.datetime.now() - startTime).total_seconds() * 1000))
# -----------------------------------------------------保存文件
def saveBinFile(content, path, fileName):
    """Write already-downloaded bytes (video, image, file, ...) to disk.

    Args:
        content: Bytes to write.
        path: Target directory. It is created if it does not exist; an empty
            string means the current working directory.
        fileName: Desired file name. The characters ``< > : " / \\ | ? *`` and
            all spaces are removed before the file is created.
    """
    # Drop characters that are not allowed in Windows file names, then spaces.
    safeName = re.sub(r"[<>:\"/\\|?*]+", "", fileName).replace(' ', '')
    if path:
        # A missing target directory used to make open() fail.
        os.makedirs(path, exist_ok=True)
    # os.path.join picks the right separator; the former hard-coded '\\' only
    # produced a directory/file path on Windows.
    with open(os.path.join(path, safeName), 'wb') as f:
        f.write(content)
def saveBinFileShowBar(file, path, fileName):
    """Stream a response body (video, image, file, ...) to disk with a progress bar.

    Args:
        file: Streaming response object; its ``headers`` mapping and
            ``iter_content`` method are used.
        path: Target directory. It is created if it does not exist; an empty
            string means the current working directory.
        fileName: Desired file name, sanitised with ``checkFileName``.
    """
    chunkSize = 1024 * 1024 * 2
    # Content-Length may be missing; int(None) used to raise TypeError.
    # total=None makes tqdm show a counter without a percentage.
    contentLength = file.headers.get('Content-Length')
    fileSize = int(contentLength) if contentLength is not None else None
    fileName = checkFileName(fileName)
    if path:
        os.makedirs(path, exist_ok=True)
    # The context manager closes the bar even if the transfer raises.
    with tqdm(total=fileSize, unit='B', unit_scale=True) as progressBar:
        # The description does not change per chunk, so set it once up front.
        progressBar.set_description('正在下载[' + fileName + ']中...')
        with open(os.path.join(path, fileName), 'wb') as f:
            for chunk in file.iter_content(chunkSize):
                f.write(chunk)
                # Advance by the bytes actually received; the last chunk is
                # usually smaller than chunkSize.
                progressBar.update(len(chunk))
        progressBar.set_description('文件[' + fileName + ']下载完成')


def checkFileName(fileName):
    """Return ``fileName`` with ``< > : " / \\ | ? *`` and all spaces removed."""
    return re.sub(r"[<>:\"/\\|?*]+", "", fileName).replace(' ', '')
关注微信公众号【站在前沿】,回复kuwo,获取完整代码下载地址