加入收藏 | 设为首页 | 会员中心 | 我要投稿 PHP编程网 - 黄冈站长网 (http://www.0713zz.com/)- 数据应用、建站、人体识别、智能机器人、语音技术!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列 - SQL概览

发布时间:2018-11-17 17:27:07 所属栏目:教程 来源:孙金城
导读:副标题#e# 一、SQL简述 SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Q

自定义Apache Flink Stream Source需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource方法获取DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生WaterMark,也就是要实现DefinedRowtimeAttributes接口。

(1) Source Function定义

支持接收携带EventTime的数据集合,Either的数据结构,Right表示WaterMark和Left表示数据:

  1. class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  2. extends SourceFunction[T] { 
  3. override def run(ctx: SourceContext[T]): Unit = { 
  4. dataWithTimestampList.foreach { 
  5. case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
  6. case Right(w) => ctx.emitWatermark(new Watermark(w)) 
  7. override def cancel(): Unit = ???} 

(2) 定义 StreamTableSource

我们自定义的Source要携带我们测试的数据,以及对应WaterMark数据,具体如下:

  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes { 
  2.  
  3. val fieldNames = Array("accessTime", "region", "userId") 
  4. val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)) 
  5. val rowType = new RowTypeInfo( 
  6. Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], 
  7. fieldNames) 
  8.  
  9. // 页面访问表数据 rows with timestamps and watermarks 
  10. val data = Seq( 
  11. Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")), 
  12. Right(1510365660000L), 
  13. Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")), 
  14. Right(1510365660000L), 
  15. Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")), 
  16. Right(1510366200000L), 
  17. Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")), 
  18. Right(1510366260000L), 
  19. Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")), 
  20. Right(1510373400000L) 
  21.  
  22. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { 
  23. Collections.singletonList(new RowtimeAttributeDescriptor( 
  24. "accessTime", 
  25. new ExistingField("accessTime"), 
  26. PreserveWatermarks.INSTANCE)) 
  27.  
  28. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { 
  29. execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType) 
  30.  
  31. override def getReturnType: TypeInformation[Row] = rowType 
  32.  
  33. override def getTableSchema: TableSchema = schema 
  34.  

2. Sink 定义

(编辑:PHP编程网 - 黄冈站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读