Intro to the Python DataStream API
DataStream programs in Flink apply transformations on data streams (e.g. filtering, updating state, defining windows, aggregating).
The data streams are initially created from various sources (e.g. message queues, socket streams, files).
Results are returned via sinks, which may for example write the data to files or to standard output.
The Python DataStream API is a Python version of the DataStream API
which allows Python users to write Python DataStream API jobs.
Common Structure of Python DataStream API Programs #
from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.functions import RuntimeContext, MapFunction
from pyflink.datastream.state import ValueStateDescriptor


class MyMapFunction(MapFunction):

    def open(self, runtime_context: RuntimeContext):
        state_desc = ValueStateDescriptor('cnt', Types.PICKLED_BYTE_ARRAY())
        self.cnt_state = runtime_context.get_state(state_desc)

    def map(self, value):
        cnt = self.cnt_state.value()
        if cnt is None or cnt < 2:
            self.cnt_state.update(1 if cnt is None else cnt + 1)
            return value[0], value[1] + 1
        else:
            return value[0], value[1]


def state_access_demo():
    # 1. create a StreamExecutionEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()

    # 2. create source DataStream
    seq_num_source = NumberSequenceSource(1, 10000)
    ds = env.from_source(
        source=seq_num_source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name='seq_num_source',
        type_info=Types.LONG())

    # 3. define the execution logic
    ds = ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
           .key_by(lambda a: a[0]) \
           .map(MyMapFunction(), output_type=Types.TUPLE([Types.LONG(), Types.LONG()]))

    # 4. create sink and emit result to sink
    output_path = '/opt/output/'
    file_sink = FileSink \
        .for_row_format(output_path, Encoder.simple_string_encoder()) \
        .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
        .build()
    ds.sink_to(file_sink)

    # 5. execute the job
    env.execute('state_access_demo')


if __name__ == '__main__':
    state_access_demo()
Create a StreamExecutionEnvironment #
The StreamExecutionEnvironment is a central concept of the DataStream API program.
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
Create a DataStream #
The DataStream API gets its name from the special DataStream class
that is used to represent a collection of data.
A DataStream is similar to a regular Python collection,
but is fundamentally different in some key ways:
it is immutable, meaning that once it is created
you cannot add or remove elements.
You create an initial stream by adding a source,
and you can then derive new streams from it by applying transformations.
Create from a list object #
You can create a DataStream from a list object:
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
[root@master flink]# cat t108.py
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
ds.print()
env.execute()
[root@master flink]# python3 t108.py
4> +I[3, aaa|a]
2> +I[1, aaa|bb]
3> +I[2, bb|a]
[root@master flink]# python3 t108.py
3> +I[2, bb|a]
4> +I[3, aaa|a]
2> +I[1, aaa|bb]
The parameter type_info is optional; if not specified, the output type of the returned DataStream will be Types.PICKLED_BYTE_ARRAY().
Create using DataStream connectors #
You can also create a DataStream using DataStream connectors with the add_source method:
[root@master flink]# cat t109.py
from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
env = StreamExecutionEnvironment.get_execution_environment()
# the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
env.add_jars("file:///root/flink-sql-connector-kafka_2.11-1.14.4.jar")
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(
    topics='test1',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': '127.0.0.1:9092'})
ds = env.add_source(kafka_consumer)
ds.print()
env.execute('kafka_consumer')
Note: Currently only FlinkKafkaConsumer is supported as a DataStream source connector
with the add_source method.
Note: A DataStream created using add_source can only be executed in streaming mode.
You can also call the from_source method to create a DataStream using a unified DataStream source connector:
[root@master flink]# cat t110.py
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource
env = StreamExecutionEnvironment.get_execution_environment()
seq_num_source = NumberSequenceSource(1, 13)
ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='seq_num_source',
    type_info=Types.LONG())
ds.print()
env.execute()
[root@master flink]# python3 t110.py
1> 5
1> 6
1> 7
2> 8
2> 9
2> 10
1> 11
1> 12
1> 13
3> 1
3> 2
3> 3
3> 4
Create using Table & SQL connectors #
Table & SQL connectors can also be used to create a DataStream.
You first create a Table using a Table & SQL connector and then convert it into a DataStream:
[root@master flink]# cat t111.py
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
    CREATE TABLE my_source (
        a INT,
        b VARCHAR
    ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '10'
    )
""")

ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    Types.ROW([Types.INT(), Types.STRING()]))
ds.print()
env.execute()
[root@master flink]# python3 t111.py
1> +I[-2067256075, 863b6a9eedaa9f768c40f5121849281cde26ce5429b61bfd5aa1f5652d2f9a3d3d676d10a2625108a3ecfeb14fa32c9cf040]
1> +I[101276961, 9da7dc03a6a02c2d11fe6882aca2d394e7e5efd59ba5a0ee1be04bf87dc98eda79d4953f373793928ccefe760393d4537df9]
1> +I[1564072475, 9e843321728762f984a14213d15a6d21ed377780abf3e687070db1f36d5ae6fb3088dee950370013d3bd78a4bf8f0eae3291]
4> +I[-1081557692, 342d88ef3562de78b96484e8f3fc8b18cd71658bbc70a7fd609e3c9c50896c8a6b9e63065f30cecc19075d8400a6c5db34de]
4> +I[-775958658, c4017cbd2f9644467c5d7dabedf8f56588127dfc057aca765c6bbed1f8c7da326d087259f89f6f348b214006aa0c8c1e2fad]
3> +I[1213087197, bc2e6ea4272ae2129f50b6b2ba246e1d126493728d6c485e84edc67885ffe701eb32ec2f72eac3a5fcceab311af9f0cf9af9]
3> +I[-1213361843, 42a8f275f61d93fea78145dec35c085b0e2f2f58f16391b78133c8a883efeda5ab8cb8871601a2cb073bb21c1b871920a15d]
2> +I[-366331131, 4bb8aefd2a04cbccb0d9aeb340bbec78091448467c3a36371c0d6daccfbc8ae2b5a44e6dd9c83315a29a6a13b94aa70032a9]
2> +I[-486375425, a4e43a7867570823bd72a8bcdf684ab57984ca5ea38ca73edd86163e1155c7ba6e583d6fd6880ba3d075d563f53d62f6785b]
2> +I[1329260098, 7d1b5577526b00fb4764f8127979aeab157314ce9c9c044745628feceb0a5c453c80b99a279734693943bebbd56e8b093245]
DataStream Transformations #
Operators transform one or more DataStreams into a new DataStream.
Programs can combine multiple transformations into sophisticated dataflow topologies.
Conversion between DataStream and Table #
Submit Job #
Finally, you should call the StreamExecutionEnvironment.execute method to submit the job:
env.execute()
If you convert the DataStream to a Table and then write it to a Table API & SQL sink connector,
it may be necessary to submit the job using the TableEnvironment.execute method:
t_env.execute()