自定义数据源
Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等,但是这些并不能满足我们的所有需求,所以spark streaming提供了自定义数据源的功能,只需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY){
var isStop : Boolean = false override def onStart(): Unit = { new Thread("Socket Receiver") { override def run() { while(!isStop){ val message = new Random().nextPrintableChar().toString
store(message) Thread.sleep(500) }
} }.start() }
override def onStop(): Unit = { isStop = true
} }
|
在使用自定义数据源时,只需要去接收该数据源数据即可
1 2 3 4 5
| val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
val streamingContext = new StreamingContext(sparkConf, Seconds(3))
val stream = streamingContext.receiverStream(new MyReceiver)
|