一.背景
这个和demo1类似,只是提供另外一种实现方式,类似kafka 利用streamTableSource 来做。
二.代码
@Data @ToString public class UserInfo implements Serializable { private Timestamp pTime; private String userId; private String itemId; public UserInfo() { } public UserInfo(String userId, String itemId) { this.userId = userId; this.itemId = itemId; this.pTime = new Timestamp(System.currentTimeMillis()); } }
public class UserTableSource implements StreamTableSource<UserInfo>, DefinedRowtimeAttributes { /** * 返回类型 * @return */ @Override public TypeInformation<UserInfo> getReturnType() { return TypeInformation.of(UserInfo.class); } @Override public TableSchema getTableSchema() { // 可以 这样定义 // TableSchema schema = new TableSchema( // new String[]{"pTime","userId","itemId"}, // new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.STRING}); return TableSchema.fromTypeInfo(getReturnType()); } @Override public String explainSource() { return "userSource"; } @Override public DataStream<UserInfo> getDataStream(StreamExecutionEnvironment execEnv) { UserDataSource source = new UserDataSource(); DataStream<UserInfo> userInfoDataStream = execEnv.addSource(source); return userInfoDataStream; } @Override public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() { RowtimeAttributeDescriptor descriptor = new RowtimeAttributeDescriptor("pTime", new ExistingField("pTime"), new AscendingTimestamps()); return Collections.singletonList(descriptor); } }
public class UserStreamTableApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // create a TableEnvironment StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); tEnv.registerTableSource("test", new UserTableSource()); Table result = tEnv.sqlQuery("SELECT userId,TUMBLE_END(pTime, INTERVAL '5' SECOND) as pTime,count(1) as cnt FROM test" + " GROUP BY TUMBLE(pTime, INTERVAL '5' SECOND),userId "); // deal with (Tuple2<Boolean, Row> value) -> out.collect(row) SingleOutputStreamOperator allClick = tEnv.toRetractStream(result, Row.class) .flatMap((Tuple2<Boolean, Row> value, Collector<Row> out) -> { out.collect(value.f1); }).returns(Row.class); // add sink or print allClick.print(); env.execute("test"); } }
相关推荐
赠送jar包:flink-table-planner-blink_2.11-1.12.7.jar; 赠送原API文档:flink-table-planner-blink_2.11-1.12.7-javadoc.jar; 赠送源代码:flink-table-planner-blink_2.11-1.12.7-sources.jar; 赠送Maven依赖...
赠送jar包:flink-table-planner-blink_2.11-1.13.2.jar; 赠送原API文档:flink-table-planner-blink_2.11-1.13.2-javadoc.jar; 赠送源代码:flink-table-planner-blink_2.11-1.13.2-sources.jar; 赠送Maven依赖...
赠送jar包:flink-table-api-java-bridge_2.11-1.12.7.jar; 赠送原API文档:flink-table-api-java-bridge_2.11-1.12.7-javadoc.jar; 赠送源代码:flink-table-api-java-bridge_2.11-1.12.7-sources.jar; 赠送...
赠送jar包:flink-table-planner_2.12-1.14.3.jar 赠送原API文档:flink-table-planner_2.12-1.14.3-javadoc.jar 赠送源代码:flink-table-planner_2.12-1.14.3-sources.jar 包含翻译后的API文档:flink-table-...
赠送jar包:flink-table-common-1.12.7.jar; 赠送原API文档:flink-table-common-1.12.7-javadoc.jar; 赠送源代码:flink-table-common-1.12.7-sources.jar; 赠送Maven依赖信息文件:flink-table-common-1.12.7....
赠送jar包:flink-table-runtime-blink_2.11-1.10.0.jar; 赠送原API文档:flink-table-runtime-blink_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-table-runtime-blink_2.11-1.10.0-sources.jar; 赠送Maven依赖...
赠送jar包:flink-table-api-java-bridge_2.11-1.13.2.jar; 赠送原API文档:flink-table-api-java-bridge_2.11-1.13.2-javadoc.jar; 赠送源代码:flink-table-api-java-bridge_2.11-1.13.2-sources.jar; 赠送...
赠送jar包:flink-table-planner-blink_2.11-1.13.2.jar; 赠送原API文档:flink-table-planner-blink_2.11-1.13.2-javadoc.jar; 赠送源代码:flink-table-planner-blink_2.11-1.13.2-sources.jar; 赠送Maven依赖...
赠送jar包:flink-table-planner-blink_2.11-1.10.0.jar; 赠送原API文档:flink-table-planner-blink_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-table-planner-blink_2.11-1.10.0-sources.jar; 赠送Maven依赖...
赠送jar包:flink-table-api-java-bridge_2.11-1.10.0.jar; 赠送原API文档:flink-table-api-java-bridge_2.11-1.10.0-javadoc.jar; 赠送源代码:flink-table-api-java-bridge_2.11-1.10.0-sources.jar; 赠送...
赠送jar包:flink-table-runtime-blink_2.11-1.13.2.jar; 赠送原API文档:flink-table-runtime-blink_2.11-1.13.2-javadoc.jar; 赠送源代码:flink-table-runtime-blink_2.11-1.13.2-sources.jar; 赠送Maven依赖...
赠送jar包:flink-table-api-java-1.14.3.jar 赠送原API文档:flink-table-api-java-1.14.3-javadoc.jar 赠送源代码:flink-table-api-java-1.14.3-sources.jar 包含翻译后的API文档:flink-table-api-java-...
赠送jar包:flink-table-api-java-bridge_2.12-1.14.3.jar; 赠送原API文档:flink-table-api-java-bridge_2.12-1.14.3-javadoc.jar; 赠送源代码:flink-table-api-java-bridge_2.12-1.14.3-sources.jar; 赠送...
赠送jar包:flink-table-api-java-bridge_2.12-1.14.3.jar 赠送原API文档:flink-table-api-java-bridge_2.12-1.14.3-javadoc.jar 赠送源代码:flink-table-api-java-bridge_2.12-1.14.3-sources.jar 包含翻译后...
赠送jar包:flink-table-common-1.13.2.jar; 赠送原API文档:flink-table-common-1.13.2-javadoc.jar; 赠送源代码:flink-table-common-1.13.2-sources.jar; 赠送Maven依赖信息文件:flink-table-common-1.13.2....
赠送jar包:flink-table-common-1.14.3.jar 赠送原API文档:flink-table-common-1.14.3-javadoc.jar 赠送源代码:flink-table-common-1.14.3-sources.jar 包含翻译后的API文档:flink-table-common-1.14.3-...
赠送jar包:flink-table-planner_2.12-1.14.3.jar; 赠送原API文档:flink-table-planner_2.12-1.14.3-javadoc.jar; 赠送源代码:flink-table-planner_2.12-1.14.3-sources.jar; 赠送Maven依赖信息文件:flink-...
赠送jar包:flink-table-api-java-bridge_2.11-1.13.2.jar; 赠送原API文档:flink-table-api-java-bridge_2.11-1.13.2-javadoc.jar; 赠送源代码:flink-table-api-java-bridge_2.11-1.13.2-sources.jar; 赠送...
赠送jar包:flink-table-common-1.13.2.jar; 赠送原API文档:flink-table-common-1.13.2-javadoc.jar; 赠送源代码:flink-table-common-1.13.2-sources.jar; 赠送Maven依赖信息文件:flink-table-common-1.13.2....
赠送jar包:flink-table-runtime_2.12-1.14.3.jar; 赠送原API文档:flink-table-runtime_2.12-1.14.3-javadoc.jar; 赠送源代码:flink-table-runtime_2.12-1.14.3-sources.jar; 赠送Maven依赖信息文件:flink-...