0%

自定义数据源

自定义数据源

Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等,但是这些并不能满足我们的所有需求,所以spark streaming提供了自定义数据源的功能,只需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 自定义数据源
class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY){

var isStop : Boolean = false
// 启动时调用,作用是用来读取数据并将数据发送给spark
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
while(!isStop){
// 生成字符
val message = new Random().nextPrintableChar().toString

store(message)
Thread.sleep(500)
}

}
}.start()
}


// 停止
override def onStop(): Unit = {
isStop = true

}
}

在使用自定义数据源时,只需要去接收该数据源数据即可

1
2
3
4
5
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkStreaming")
// 第二个参数是采集周期
val streamingContext = new StreamingContext(sparkConf, Seconds(3))

val stream = streamingContext.receiverStream(new MyReceiver)

欢迎关注我的其它发布渠道