【Airflow】构建爬虫任务系统

news/2024/7/19 8:41:32 标签: python, 爬虫, airflow

爬虫脚本太多了需要进行管理一下,领导决定使用airflow
我了解了一下这个平台是用来做任务调度。
是一个ETL工具
ETL是将业务系统的数据经过抽取、清洗转换之后加载到数据仓库的过程
在这里插入图片描述
这里是一个github的地址

https://github.com/apache/airflow

这里是官方文档

https://airflow.apache.org/docs/apache-airflow/stable/index.html

这里是我学习的视频资料

https://www.bilibili.com/video/BV19f4y1V7UG/
博主的视频有一些老了,不过也是可以学习的。

先看一下成果吧
在这里插入图片描述

这里是首页记录了,我得dags任务
dags就相当于我得一个爬虫任务,
在这里插入图片描述
我们进入这个dag里面,可以看到
有4个小任务(task)和一task执行的状态,还有一些基本信息。
在这里插入图片描述
可以看每一个task的日志,如果发生问题也可以更方便的定位问题的。
每个dag都有很多配置,比如定时任务,失败重试,自动拉起,报警邮件等功能。
以后再也不用上服务器去看日志了
嘻嘻嘻嘻嘻

conda环境安装

这里需要一个干净的环境,我使用的miniconda
来到 conda网站

https://docs.conda.io/projects/miniconda/en/latest/miniconda-other-installer-links.html

在这里插入图片描述
这里选择linux 的 我是用的是python3.8
然后复制这个链接地址。
在这里插入图片描述
然后我们到服务器上将这个包下好。
执行命令

wget https://repo.anaconda.com/miniconda/Miniconda3-py38_23.9.0-0-Linux-x86_64.sh

在这里插入图片描述
下好之后。
安装好之后,执行

bash Miniconda3-py38_23.9.0-0-Linux-x86_64.sh

然后一直yes 就ok
在这里插入图片描述
conda这里就装好了
在这里插入图片描述
也带着python。然后我们创建一个环境

conda create -n airflow  # 创建环境
conda activate airflow   # 激活环境

这样环境就差不多了
安装一下airflow

 conda install apache-airflow

在这里插入图片描述
这里会依赖很多包
在这里插入图片描述
执行airflow命令,出现这些,说明airflow已经装好了。

airflow_76">配置airflow

找到你得airflow路径,
里面会有一个airflow.cfg文件
在这里插入图片描述
这里是配置dags的文件路径
在这里插入图片描述
这里默认使用的是sqlite数据库,
后续也可以改成mysql

https://blog.csdn.net/qq_43439214/article/details/129898191
这个博主的文章很不错,将airflow的数据库换成了mysql

我这里就先用默认的了,
执行命令

airflow db init 

在这里插入图片描述
出现下面内容说明数据库已经初始化好了。
执行下面命令进行创建用户

airflow users create --username zhang --firstname zhang --lastname zhang --role Admin --email  airflow@example.com

并为其用户设置密码。
配置好了之后
我们开一个会话
在这里插入图片描述
来通过webserver开启 web服务,这里在要访问你得公网地址默认端口是8080
在这里插入图片描述
到这里来到了ui界面输入,你刚刚创建的用户名和密码。
在这里插入图片描述
来到这里了。
这样我们的配置基本就没问题了。

举个例子

来举一个例子,
来写一个dags看一下。

python">from __future__ import annotations
# [START tutorial]
# [START import_module]
import os
import sys
from datetime import datetime, timedelta
from textwrap import dedent
from airflow.operators.python import PythonOperator, BranchPythonOperator


# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

# [END import_module]
# [START instantiate_dag]


def first_task():
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")
    print("这里是first_task")


def second_task():
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")
    print("这里是second_task")


with DAG(
    dag_id="test_airflow",
    default_args={
        'owner': "guapisansan",
        "depends_on_past": True,
        "email": ["177664833@qq.com"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    # [END default_args]
    description="测试一把",
    schedule_interval='0 15 * * *',  # 设置调度间隔为每天下午六点,
    catchup=False,
    start_date=datetime(2023, 10, 23),
    tags=["test"],
) as dag:
    first_task_op = PythonOperator(
        task_id="first_task",
        python_callable=first_task
    )

    second_task_op = PythonOperator(
        task_id="second_task",
        python_callable=second_task
    )

    first_task_op >> second_task_op


我这里给了一段非常简单的代码。
大概意思就是有两个task,分别是first和second 他们对应的也分别是两个python函数
配置了一些参数,如 description dag的简介, 还有一些参数,可以在官方文档查询。

记不记得我们的airflow.cfg文件里面有一个这个参数
在这里插入图片描述
这里就是我们存放所有 dags的路径。一定要是绝对路径
将刚刚写的脚本放到这个路径下,
然后我们在执行命令

airflow dags list

在这里插入图片描述
这里存在我刚刚的创建的test_airflow
这时候我们在创建一个会话
执行命令

airflow scheduler

这是一个调度命令
这时候我们刷新刚刚的web页面
在这里插入图片描述
发现我们dags已经挂到ui页面上了
我们点进去看看
在这里插入图片描述
看到了我们的子任务task 出现了,
现在需要手动触发一下看看结果。
在这里插入图片描述
点击这个trigger按钮
在这里插入图片描述
可以看到这两个task都执行成功了。
右边的框里面有执行时间,还有一些任务的基础信息,
在这里插入图片描述
在这里插入图片描述
点击小图片查看任务执行的日志,
发现我们print打印出来了,
我这里只是举一个小例子先跑起来,可以根据自己的脚本进行配置等等。
airflow是一个非常好用的任务调度工具,相对于数据处理更好,我们当作爬虫管理系统也可以用。
用它将分散的脚本整理,更加方便观察和调度。


http://www.niftyadmin.cn/n/5121027.html

相关文章

Istio实战(六)- Istio 部署

部署Istio方法 IstioctlIstio OperatorHelm4.2 使用Istioctl 部署 # mkdir /apps # cd /apps # curl -L https://istio.io/downloadIstio | ISTIO_VERSION=1.13.3 TARGET_ARCH=x86_64 sh - # ln -sf /apps/istio-1.13.3 /apps/istio # ln -sf /apps/istio/bin/istioctl /usr/b…

基于springboot实现乐校园二手书交易管理系统【项目源码+论文说明】

基于springboot实现乐校园二手书交易管理系统演示 摘要 在Internet高速发展的今天,我们生活的各个领域都涉及到计算机的应用,其中包括乐校园二手书交易管理系统的网络应用,在外国二手书交易管理系统已经是很普遍的方式,不过国内的…

hdlbits系列verilog解答(向量级联)-18

文章目录 一、问题描述二、verilog源码三、仿真结果一、问题描述 级联运算符允许将向量连接在一起以形成更大的向量。但是有时您希望将同一个数据级联在一起很多次,而做类似 assign a = {b,b,b,b,b,b}; .复制运算符允许重复一个向量并将它们连接在一起:{num{vector}}。这将按…

单片机核心/RTOS必备 (前言)

学习路线差别 单片机入门(HAL):简单、快速、实际上工作中涉及单片机编程时,也提倡使用HAL库。对于学习来说,HAL库封装了很多技术细节,对技术成长帮助不大。 比如,可能接触不到:重定…

【Html】交通灯问题

效果 实现方式 计时器:setTimeout或setInterval来计时。setInterval和 setTimeout 在某些情况下可能会出现计时不准确的情况。这通常是由于JavaScript的事件循环机制和其他代码执行所需的时间造成的。 问询:通过getCurrentLight将每个状态的持续时间设置…

基于 QT+CEF 开发跨平台软件

在上篇文章提到,国替给软件开发带来一些新机会,这不,机会立刻来了。为国产系统开发版本的单子已经谈妥,但客户要的比较急,希望一个月就交付。 目前跨平台软件开发的框架很多,比如 Electron、Flutter、Tarui…

【合集】Spring Cloud 组件——架构进化史话 Eureka,Nacos,OpenFeign,Ribbon,Sentinel,Gateway . . .

前言 Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线,一次性令牌,全局锁,领导选举&…

使用cannon.js创建3D物理仿真场景

本文将详细介绍使用cannon.js创建3D物理仿真场景的步骤和技巧。 一、cannon.js简介 cannon.js是一个开源的JavaScript物理库,用于实现3D物理仿真。它可以被用于游戏开发、机器人控制、交互式的3D应用以及其他需要物理交互的场景。 与其他物理库不同的是&#xff…