Preface
At work I occasionally need to convert a DataStream into a Table. I have written this conversion several times before, but without a summary I forget the details and end up digging through old code, which is tedious. This post summarizes the approaches so they are easy to look up the next time the need arises.
DataStream&lt;Tuple&gt; to Table

```java
DataStream<Tuple5<Integer, String, Double, Long, String>> dataStream = env.fromCollection(
        java.util.Arrays.asList(
                Tuple5.of(1, "手机", 5999.99, 1733400000000L, "2025-12-05"),
                Tuple5.of(2, "电脑", 8999.50, 1733400001000L, "2025-12-05"),
                Tuple5.of(3, "耳机", 399.00, 1733400002000L, "2025-12-05")
        )
);
// Field order must match the Tuple5 positions: id (1st), name (2nd), price (3rd), ts (4th), dt (5th)
Table table = tableEnv.fromDataStream(
        dataStream,
        "id, name, price, ts, dt" // field names given directly; types are matched automatically from the Tuple5 generics
);
table.execute().print();
table.printSchema();
```
Printing produces a bordered result table with the columns id, name, price, ts, and dt (the table output is elided here).
If no field names are specified, the default names are f0, f1, f2, f3, f4.
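The snippets in this post use `env` and `tableEnv` without showing their creation. A typical setup (an assumption on my part, not shown in the original post) looks like:

```java
// Assumed environment setup for all snippets in this post
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```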
DataStream&lt;Row&gt; to Table

```java
DataStream<Row> dataStream = env.fromCollection(
        java.util.Arrays.asList(
                Row.of(1, "手机", 5999.99, 1733400000000L, "2025-12-05"),
                Row.of(2, "电脑", 8999.50, 1733400001000L, "2025-12-05"),
                Row.of(3, "耳机", 399.00, 1733400002000L, "2025-12-05")
        )
);
Table table = tableEnv.fromDataStream(
        dataStream,
        "id, name, price, ts, dt"
);
table.execute().print();
```
The result is the same as in the DataStream&lt;Tuple&gt; case.

DataStream&lt;JSONArray&gt; to Table

DataStream&lt;Tuple&gt; and DataStream&lt;Row&gt; are both straightforward. DataStream&lt;JSONArray&gt; is a bit more involved: it first has to be converted into a DataStream&lt;Tuple&gt; or DataStream&lt;Row&gt;, and the complexity lies mainly in that conversion. This post mainly records how to convert a DataStream&lt;JSONArray&gt; into a Table, which is probably also the more common real-world need, for example turning a JSONArray returned by an API into a Table.
Generating test data

```java
private static JSONArray getJsonArray() {
    // ... (method body elided in the original post)
}
```
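The method body is elided above. A minimal hypothetical reconstruction, consistent with the JSON printed later in this post (fastjson's `JSONArray`/`JSONObject` assumed), might look like:

```java
// Hypothetical reconstruction of the elided getJsonArray(): builds the three
// records that appear in the printed output below (fastjson assumed)
private static JSONArray getJsonArray() {
    JSONArray jsonArray = new JSONArray();
    Object[][] rows = {
            {1, "手机", 5999.99, 1733400000000L, "2025-12-05"},
            {2, "电脑", 8999.50, 1733400001000L, "2025-12-05"},
            {3, "耳机", 399.00, 1733400002000L, "2025-12-05"},
    };
    for (Object[] r : rows) {
        JSONObject o = new JSONObject();
        o.put("id", r[0]);
        o.put("name", r[1]);
        o.put("price", r[2]);
        o.put("ts", r[3]);
        o.put("dt", r[4]);
        jsonArray.add(o);
    }
    return jsonArray;
}
```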
Constructing DataStream&lt;JSONArray&gt; and DataStream&lt;JSONObject&gt;

A DataStream&lt;JSONArray&gt; can be built directly from the JSONArray:

```java
JSONArray jsonArray = getJsonArray();
// ... (remaining construction code elided in the original post)
```
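The rest of the construction is elided above. A sketch of what it plausibly looks like, assuming a local environment and the `getJsonArray()` helper, including a derived DataStream&lt;JSONObject&gt;:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
JSONArray jsonArray = getJsonArray();

// A JSONArray is a single serializable element, so fromElements works directly
DataStream<JSONArray> jsonArrayDataStream = env.fromElements(jsonArray);
jsonArrayDataStream.print();

// Flatten the array into one JSONObject per record; the lambda's output type
// must be declared explicitly because of type erasure
DataStream<JSONObject> jsonObjectDataStream = jsonArrayDataStream
        .flatMap((JSONArray arr, Collector<JSONObject> out) -> {
            for (int i = 0; i < arr.size(); i++) {
                out.collect(arr.getJSONObject(i));
            }
        })
        .returns(Types.GENERIC(JSONObject.class));
jsonObjectDataStream.print();
// env.execute() would then trigger the prints
```

Here `getJsonArray()` is the helper from the previous section.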
Printed output:

```
[{"dt":"2025-12-05","price":5999.99,"name":"手机","id":1,"ts":1733400000000},{"dt":"2025-12-05","price":8999.5,"name":"电脑","id":2,"ts":1733400001000},{"dt":"2025-12-05","price":399.0,"name":"耳机","id":3,"ts":1733400002000}]
{"dt":"2025-12-05","price":5999.99,"name":"手机","id":1,"ts":1733400000000}
{"dt":"2025-12-05","price":8999.5,"name":"电脑","id":2,"ts":1733400001000}
{"dt":"2025-12-05","price":399.0,"name":"耳机","id":3,"ts":1733400002000}
```
Next, the DataStream&lt;JSONArray&gt; is converted to a Table.

First convert to DataStream&lt;Tuple&gt;

```java
JSONArray jsonArray = getJsonArray();
// ... (flatMap-to-Tuple5 code elided in the original post)
```
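The elided code presumably looked roughly like the following sketch: a flatMap lambda cast to FlatMapFunction, emitting one Tuple5 per JSON object (the data string and variable names here are my assumptions, chosen to match the snippets elsewhere in the post):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<JSONArray> jsonArrayDataStream = env.fromElements(
        JSONArray.parseArray("[{\"id\":1,\"name\":\"手机\",\"price\":5999.99,\"ts\":1733400000000,\"dt\":\"2025-12-05\"}]"));

// Lambda version: the Tuple5 and Collector generics are erased at compile time,
// so Flink's TypeExtractor cannot determine the output type
DataStream<Tuple5<Integer, String, Double, Long, String>> dataStream = jsonArrayDataStream
        .flatMap((FlatMapFunction<JSONArray, Tuple5<Integer, String, Double, Long, String>>) (arr, out) -> {
            for (int i = 0; i < arr.size(); i++) {
                JSONObject o = arr.getJSONObject(i);
                out.collect(Tuple5.of(o.getInteger("id"), o.getString("name"),
                        o.getDouble("price"), o.getLong("ts"), o.getString("dt")));
            }
        });
// Calling tableEnv.fromDataStream(dataStream, "id, name, price, ts, dt") now
// throws the exception shown below, when the Table API asks for the stream's output type
```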
But this throws an error:

```
The return type of function 'dataStreamTupleLambda2Table(JsonArrayToTable.java:104)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
	at org.apache.flink.api.dag.Transformation.getOutputType(Transformation.java:508)
	at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:191)
	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.asQueryOperation(AbstractStreamTableEnvironmentImpl.java:277)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:295)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:290)
	at com.dkl.flink.stream.JsonArrayToTable.dataStreamTupleLambda2Table(JsonArrayToTable.java:118)
	at com.dkl.flink.stream.JsonArrayToTable.main(JsonArrayToTable.java:25)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:371)
	at org.apache.flink.api.java.typeutils.TypeExtractionUtils.extractTypeFromLambda(TypeExtractionUtils.java:188)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:557)
	at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:174)
	at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:610)
	at com.dkl.flink.stream.JsonArrayToTable.dataStreamTupleLambda2Table(JsonArrayToTable.java:104)
	... 1 more
```
The exception carries two hints:
- The return type of function 'dataStreamTupleLambda2Table(JsonArrayToTable.java:104)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
- The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
Next, apply each of the two hints in turn and test them.

Using returns(...) on the result

```java
DataStream<Tuple5<Integer, String, Double, Long, String>> dataStream = jsonArrayDataStream
// ... (rest of the code elided in the original post)
```
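Only the first line survives above. A hedged sketch of the returns(...) variant, with the environment setup and data string being my assumptions:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<JSONArray> jsonArrayDataStream = env.fromElements(
        JSONArray.parseArray("[{\"id\":1,\"name\":\"手机\",\"price\":5999.99,\"ts\":1733400000000,\"dt\":\"2025-12-05\"}]"));

DataStream<Tuple5<Integer, String, Double, Long, String>> dataStream = jsonArrayDataStream
        .flatMap((FlatMapFunction<JSONArray, Tuple5<Integer, String, Double, Long, String>>) (arr, out) -> {
            for (int i = 0; i < arr.size(); i++) {
                JSONObject o = arr.getJSONObject(i);
                out.collect(Tuple5.of(o.getInteger("id"), o.getString("name"),
                        o.getDouble("price"), o.getLong("ts"), o.getString("dt")));
            }
        })
        // returns(...) supplies the type information that the lambda erases
        .returns(Types.TUPLE(Types.INT, Types.STRING, Types.DOUBLE, Types.LONG, Types.STRING));
Table table = tableEnv.fromDataStream(dataStream, "id, name, price, ts, dt");
```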
Anonymous inner class

```java
DataStream<Tuple5<Integer, String, Double, Long, String>> dataStream = jsonArrayDataStream
// ... (rest of the code elided in the original post)
```
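Again only the first line survives. A sketch of the anonymous-class variant (setup and data string assumed): the generics survive in the anonymous class's class file, so Flink's TypeExtractor can read them without further hints.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<JSONArray> jsonArrayDataStream = env.fromElements(
        JSONArray.parseArray("[{\"id\":1,\"name\":\"手机\",\"price\":5999.99,\"ts\":1733400000000,\"dt\":\"2025-12-05\"}]"));

DataStream<Tuple5<Integer, String, Double, Long, String>> dataStream = jsonArrayDataStream
        .flatMap(new FlatMapFunction<JSONArray, Tuple5<Integer, String, Double, Long, String>>() {
            @Override
            public void flatMap(JSONArray arr, Collector<Tuple5<Integer, String, Double, Long, String>> out) {
                for (int i = 0; i < arr.size(); i++) {
                    JSONObject o = arr.getJSONObject(i);
                    out.collect(Tuple5.of(o.getInteger("id"), o.getString("name"),
                            o.getDouble("price"), o.getLong("ts"), o.getString("dt")));
                }
            }
        });
Table table = tableEnv.fromDataStream(dataStream, "id, name, price, ts, dt");
```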
Both of these approaches work.
First convert to DataStream&lt;Row&gt;

This is similar to converting to DataStream&lt;Tuple&gt; first.
Using returns(...) on the result

```java
DataStream<Row> dataStream = jsonArrayDataStream
// ... (rest of the code elided in the original post)
```
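A sketch of the Row variant with returns(...) (setup and data string assumed): Types.ROW(...) yields a RowTypeInfo, the per-field type information that a Row stream cannot carry on its own.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<JSONArray> jsonArrayDataStream = env.fromElements(
        JSONArray.parseArray("[{\"id\":1,\"name\":\"手机\",\"price\":5999.99,\"ts\":1733400000000,\"dt\":\"2025-12-05\"}]"));

DataStream<Row> dataStream = jsonArrayDataStream
        .flatMap((FlatMapFunction<JSONArray, Row>) (arr, out) -> {
            for (int i = 0; i < arr.size(); i++) {
                JSONObject o = arr.getJSONObject(i);
                out.collect(Row.of(o.getInteger("id"), o.getString("name"),
                        o.getDouble("price"), o.getLong("ts"), o.getString("dt")));
            }
        })
        // Types.ROW(...) produces a RowTypeInfo with one entry per field
        .returns(Types.ROW(Types.INT, Types.STRING, Types.DOUBLE, Types.LONG, Types.STRING));
Table table = tableEnv.fromDataStream(dataStream, "id, name, price, ts, dt");
```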
Anonymous inner class

```java
DataStream<Row> dataStream = jsonArrayDataStream
// ... (rest of the code elided in the original post)
```
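A sketch of the anonymous-class variant for Row (setup and data string assumed). Unlike the Tuple5 case, the anonymous class here does not help: Row carries no per-field types, so the extracted type is only a generic Row type, and converting the stream to a Table then fails.

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<JSONArray> jsonArrayDataStream = env.fromElements(
        JSONArray.parseArray("[{\"id\":1,\"name\":\"手机\",\"price\":5999.99,\"ts\":1733400000000,\"dt\":\"2025-12-05\"}]"));

DataStream<Row> dataStream = jsonArrayDataStream
        .flatMap(new FlatMapFunction<JSONArray, Row>() {
            @Override
            public void flatMap(JSONArray arr, Collector<Row> out) {
                for (int i = 0; i < arr.size(); i++) {
                    JSONObject o = arr.getJSONObject(i);
                    out.collect(Row.of(o.getInteger("id"), o.getString("name"),
                            o.getDouble("price"), o.getLong("ts"), o.getString("dt")));
                }
            }
        });
// Row has no field type information, so the extracted type is GenericTypeInfo<Row>;
// calling tableEnv.fromDataStream(dataStream, ...) now throws the error shown below
```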
But this approach still throws an error:

```
Exception in thread "main" org.apache.flink.table.api.ValidationException: An input of GenericTypeInfo<Row> cannot be converted to Table. Please specify the type of the input with a RowTypeInfo.
	at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:287)
	at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:259)
	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lambda$asQueryOperation$2(AbstractStreamTableEnvironmentImpl.java:284)
	at java.util.Optional.map(Optional.java:215)
	at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.asQueryOperation(AbstractStreamTableEnvironmentImpl.java:281)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:295)
	at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:290)
```
GenericTypeInfo and RowTypeInfo

The message says that an input of GenericTypeInfo&lt;Row&gt; cannot be converted to a Table and that the input type must be specified with a RowTypeInfo. Besides specifying the RowTypeInfo via the returns method after flatMap / map, as mentioned earlier, it can also be passed directly as a parameter of flatMap / map:

```java
static RowTypeInfo rowTypeInfo = new RowTypeInfo(
        Types.INT,    // id type
        Types.STRING, // name type
        Types.DOUBLE, // price type
        Types.LONG,   // ts type
        Types.STRING  // dt type
);

DataStream<Row> dataStream = jsonArrayDataStream
        .flatMap((FlatMapFunction<JSONArray, Row>) (jsonArray1, out) -> {
            for (int i = 0; i < jsonArray1.size(); i++) {
                JSONObject object = jsonArray1.getJSONObject(i);
                out.collect(Row.of(
                        object.getInteger("id"),
                        object.getString("name"),
                        object.getDouble("price"),
                        object.getLong("ts"),
                        object.getString("dt")
                ));
            }
        }, rowTypeInfo);
```
In addition, RowTypeInfo supports setting field names, so you no longer need to specify field names when converting the DataStream to a Table:

```java
static TypeInformation<?>[] types = new TypeInformation[]{
        Types.INT,    // id field type
        Types.STRING, // name field type
        Types.DOUBLE, // price field type
        Types.LONG,   // ts field type
        Types.STRING  // dt field type
};
static String[] fieldNames = new String[]{"id", "name", "price", "ts", "dt"};
static RowTypeInfo rowTypeInfoWithFields = new RowTypeInfo(types, fieldNames);

DataStream<Row> dataStream = jsonArrayDataStream
        .flatMap((FlatMapFunction<JSONArray, Row>) (jsonArray1, out) -> {
            for (int i = 0; i < jsonArray1.size(); i++) {
                JSONObject object = jsonArray1.getJSONObject(i);
                out.collect(Row.of(
                        object.getInteger("id"),
                        object.getString("name"),
                        object.getDouble("price"),
                        object.getLong("ts"),
                        object.getString("dt")
                ));
            }
        }, rowTypeInfoWithFields);
Table table = tableEnv.fromDataStream(dataStream);
```
This is the approach I would ultimately recommend.
DataStream&lt;JSONObject&gt; to Table

All of the above converts via flatMap on the DataStream&lt;JSONArray&gt;; below is the corresponding code starting from a DataStream&lt;JSONObject&gt;:

```java
DataStream<Row> dataStream = jsonObjectDataStream
// ... (rest of the code elided in the original post)
```
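Only the first line survives above. A sketch of what the JSONObject version plausibly looks like, using map instead of flatMap (one output Row per input object) together with the named RowTypeInfo from the previous section; the setup and data string here are my assumptions:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
DataStream<JSONObject> jsonObjectDataStream = env.fromElements(
        JSONObject.parseObject("{\"id\":1,\"name\":\"手机\",\"price\":5999.99,\"ts\":1733400000000,\"dt\":\"2025-12-05\"}"));
RowTypeInfo rowTypeInfoWithFields = new RowTypeInfo(
        new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE, Types.LONG, Types.STRING},
        new String[]{"id", "name", "price", "ts", "dt"});

// One Row per JSONObject; the RowTypeInfo parameter supplies both field types and names
DataStream<Row> dataStream = jsonObjectDataStream
        .map((MapFunction<JSONObject, Row>) o -> Row.of(
                o.getInteger("id"), o.getString("name"), o.getDouble("price"),
                o.getLong("ts"), o.getString("dt")), rowTypeInfoWithFields);
Table table = tableEnv.fromDataStream(dataStream);
```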