Intro to the Python DataStream API #

news/2024/7/19 11:27:10 标签: python, 爬虫, 开发语言

Intro to the Python DataStream API

Python DataStream API简介

Flink中的DataStream程序是对数据流实现转换(例如,过滤,更新状态,定义窗口,聚合)

数据流最初是通过各种来源创建的(例如,消息队列,socket streams,文件)

结果是通过sinks返回, 可以写入到文件或者标准输出

Python DataStream API 是一个 DataStream API Python版本  

它允许Python users 写Python DatStream API  job

Python DataStream API程序的通用结构#

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.PICKLED_BYTE_ARRAY())
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None or cnt < 2:
            self.cnt_state.update(1 if cnt is None else cnt + 1)
            return value[0], value[1] + 1
        else:
            return value[0], value[1]


def state_access_demo():
    # 1. create a StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. create source DataStream
    seq_num_source = NumberSequenceSource(1, 10000)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. define the execution logic
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. create sink and emit result to sink
    output_path = '/opt/output/'
    file_sink = FileSink \
        .for_row_format(output_path, Encoder.simple_string_encoder()) \
        .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
        .build()
    ds.sink_to(file_sink)

    # 5. execute the job
    env.execute('state_access_demo')


if __name__ == '__main__':
    state_access_demo()

创建流执行环境

StreamExecutionEnvironment 是DataStream API program的核心概念

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()


Create a DataStream #


DataStream API  从special DataStream class 获取它的名字

用于表示一组数据 

一个DataStream 是类似于一个普通的Python Collection

但在某些关键方面确截然不同  

它们是不变的,意味着一旦它们被创建出来 

你不能增加或者删除元素 

你可以同添加一个源来创建一个初始化流  。

然后你可以从中派出新的流 

Create from a list object #


你可以从列表对象创建数据流:

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
    
    
[root@master flink]# cat t108.py 
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
ds.print()
env.execute()
[root@master flink]# python3 t108.py 
4> +I[3, aaa|a]
2> +I[1, aaa|bb]
3> +I[2, bb|a]
[root@master flink]# python3 t108.py 
3> +I[2, bb|a]
4> +I[3, aaa|a]
2> +I[1, aaa|bb]

参数type_info是可选的 ,如果未指定, the output type of the returned DataStream will be Types.PICKLED_BYTE_ARRAY().

Create using DataStream connectors #

你也可以使用add_source方法的数据流连接器来创建数据流

[root@master flink]# cat t109.py 
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")

deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(
    topics='test1',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': '127.0.0.1:9092'})

ds = env.add_source(kafka_consumer)
ds.print()
env.execute('kafka_consumer')

注意:它目前只支持使用FlinkKafkaConsumer 作为DataStream source connectors

使用add_source方法

注意: The DataStream 创建使用add_source 只能在流模式下执行

你也可以调用from_source方法,使用同一的数据流源连接器创建数据流:


[root@master flink]# cat t110.py 
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource

env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 13)
ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='seq_num_source',
    type_info=Types.LONG())
ds.print()
env.execute()
[root@master flink]# python3 t110.py 
1> 5
1> 6
1> 7
2> 8
2> 9
2> 10
1> 11
1> 12
1> 13
3> 1
3> 2
3> 3
3> 4

Create using Table & SQL connectors #

Table & SQL 连接器可以用于创建一个DataStream

你可以首选创建一个表使用Table & SQL connectors 然后将其转换为DataStream

[root@master flink]# cat t111.py 
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

t_env.execute_sql("""
        CREATE TABLE my_source (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'datagen',
          'number-of-rows' = '10'
        )
    """)

ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    Types.ROW([Types.INT(), Types.STRING()]))
ds.print()
env.execute()
[root@master flink]# python3 t111.py 
1> +I[-2067256075, 863b6a9eedaa9f768c40f5121849281cde26ce5429b61bfd5aa1f5652d2f9a3d3d676d10a2625108a3ecfeb14fa32c9cf040]
1> +I[101276961, 9da7dc03a6a02c2d11fe6882aca2d394e7e5efd59ba5a0ee1be04bf87dc98eda79d4953f373793928ccefe760393d4537df9]
1> +I[1564072475, 9e843321728762f984a14213d15a6d21ed377780abf3e687070db1f36d5ae6fb3088dee950370013d3bd78a4bf8f0eae3291]
4> +I[-1081557692, 342d88ef3562de78b96484e8f3fc8b18cd71658bbc70a7fd609e3c9c50896c8a6b9e63065f30cecc19075d8400a6c5db34de]
4> +I[-775958658, c4017cbd2f9644467c5d7dabedf8f56588127dfc057aca765c6bbed1f8c7da326d087259f89f6f348b214006aa0c8c1e2fad]
3> +I[1213087197, bc2e6ea4272ae2129f50b6b2ba246e1d126493728d6c485e84edc67885ffe701eb32ec2f72eac3a5fcceab311af9f0cf9af9]
3> +I[-1213361843, 42a8f275f61d93fea78145dec35c085b0e2f2f58f16391b78133c8a883efeda5ab8cb8871601a2cb073bb21c1b871920a15d]
2> +I[-366331131, 4bb8aefd2a04cbccb0d9aeb340bbec78091448467c3a36371c0d6daccfbc8ae2b5a44e6dd9c83315a29a6a13b94aa70032a9]
2> +I[-486375425, a4e43a7867570823bd72a8bcdf684ab57984ca5ea38ca73edd86163e1155c7ba6e583d6fd6880ba3d075d563f53d62f6785b]
2> +I[1329260098, 7d1b5577526b00fb4764f8127979aeab157314ce9c9c044745628feceb0a5c453c80b99a279734693943bebbd56e8b093245]
[root@master flink]# 


DataStream Transformations #


操作转换一个或者多个DataStream到一个新的DataStream

程序可以组合多个转换到一个复杂的dataflow topologies

Conversion between DataStream and Table #


Submit Job #


最后,你应该调用StreamExecutionEnvironment.execute 执行方法来提交作业

env.execute()
If you convert the DataStream to a Table and then write it to a Table API & SQL sink connector,

 it may happen that you need to submit the job using TableEnvironment.execute method.

t_env.execute()


http://www.niftyadmin.cn/n/343878.html

相关文章

如何使用python实现分类问题中以一定概率生成类标签?

如何使用python实现分类问题中以一定概率生成类标签&#xff1f; 请参考一下 https://gitee.com/chenglinyu99/generate_number_certain_probability 这也是我的仓库。

若依源码解析:用LoggingAspect进行日志处理

文章目录 摘要源代码LoggingAspect的作用Pointcut的作用在logback.xml配置文件中添加 <logger> 元素 摘要 本文将深入探讨若依框架中的LoggingAspect切面的作用和重要性。LoggingAspect是若依框架中用于实现日志记录功能的切面组件&#xff0c;它通过切点和切面的概念&a…

chatgpt赋能Python-pythonfloat设置精度

Python float设置精度 在Python中&#xff0c;float是一种表示小数的数据类型。但是在某些情况下&#xff0c;我们可能需要设置float的精度&#xff0c;以避免舍入误差造成的问题。在本文中&#xff0c;我们将介绍如何在Python中设置float的精度。 为什么需要设置float的精度…

项目中遇到的一些问题总结(七)

MySQL 幻读 幻读&#xff08;Phantom Read&#xff09;是指在同一事务中&#xff0c;针对一个表&#xff0c;多次执行某个查询时结果集不同&#xff0c;导致出现“幻行”的情况。通俗点说&#xff0c;就是一个事务在某个字段上执行了查询&#xff0c;得到一组数据&#xff0c;…

阿里云大学考试python初级-python初级

阿里云大学考试python初级题目及解析 1.Python中用来处理可视化视图的模块是&#xff08;&#xff09; A.numpy B.pandas C.matplotib D.scipy C numpy是数组矩阵运算库 pandas是数据分析库 matplotlib是可视化视图模块 scipy是机器学习分析库 2.range&#xff08;&#x…

chatgpt赋能Python-pythonf检验

Python的重要性与应用 Python是一种高级编程语言&#xff0c;因其简单易学和灵活性而备受欢迎。它已经成为数据分析、web开发、机器学习等许多领域的重要工具。在本篇文章中&#xff0c;我们将探讨Python在SEO中的作用。 Python对SEO的影响 SEO是搜索引擎优化的缩写&#xf…

人工智能之读懂CNN卷积神经网络

通过往期文章的分享,我们了解了神经网络的结构,一般分为输入层,隐藏层,输出层 TensorFlow神经网络 那什么是卷积神经网络那,这就要我们追溯一下人类识别图像的原理 人类的视觉原理如下:从原始信号摄入开始(瞳孔摄入像素 Pixels),接着做初步处理(大脑皮层某些细胞发现…

chatgpt赋能Python-pythonfrom

PythonFrom是什么&#xff1f; PythonFrom 是一种基于 Python 语言的开源数据采集与清洗框架&#xff0c;它提供了现代化的数据处理流程&#xff0c;非常适合于爬虫、数据挖掘和机器学习等应用场景。 特点 1. 简单易学 PythonFrom 采用了类似于 SQL 的语法结构&#xff0c;…