Spark Streaming 自定义接收器
Spark Streaming 可以从任何任意数据源接收流数据,而不仅仅是其内置支持的数据源(即,Kafka、Kinesis、文件、套接字等)。这要求开发人员实现一个自定义的接收器,用于从相关数据源接收数据。本指南将介绍实现自定义接收器并在 Spark Streaming 应用程序中使用它的过程。请注意,自定义接收器可以用 Scala 或 Java 实现。
实现自定义接收器
首先是实现一个 Receiver(Scala 文档, Java 文档)。自定义接收器必须通过实现两个方法来扩展此抽象类
onStart()
: 启动数据接收时需要做的事情。onStop()
: 停止数据接收时需要做的事情。
onStart()
和 onStop()
都不能无限期阻塞。通常,onStart()
会启动负责接收数据的线程,而 onStop()
会确保这些接收数据的线程停止。接收线程还可以使用 isStopped()
(一个 Receiver
方法)来检查它们是否应该停止接收数据。
一旦数据被接收,可以通过调用 store(data)
将数据存储到 Spark 中,这是 Receiver 类提供的一个方法。store()
有多种形式,允许一次存储一个接收到的数据记录,或存储整个对象集合/序列化字节。请注意,用于实现接收器的 store()
形式会影响其可靠性和容错语义。这将在后面详细讨论。
接收线程中的任何异常都应被捕获并妥善处理,以避免接收器静默失败。restart(<exception>)
将通过异步调用 onStop()
,然后延迟调用 onStart()
来重启接收器。stop(<exception>)
将调用 onStop()
并终止接收器。此外,reportError(<error>)
会向驱动程序报告错误消息(在日志和 UI 中可见),而不会停止/重启接收器。
以下是一个自定义接收器,它通过套接字接收文本流。它将文本流中以“\n”分隔的行视为记录并将其存储到 Spark 中。如果接收线程在连接或接收时出现任何错误,接收器将重新启动以再次尝试连接。
class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
val reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
public class JavaCustomReceiver extends Receiver<String> {
String host = null;
int port = -1;
public JavaCustomReceiver(String host_ , int port_) {
super(StorageLevel.MEMORY_AND_DISK_2());
host = host_;
port = port_;
}
@Override
public void onStart() {
// Start the thread that receives data over a connection
new Thread(this::receive).start();
}
@Override
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
private void receive() {
Socket socket = null;
String userInput = null;
try {
// connect to the server
socket = new Socket(host, port);
BufferedReader reader = new BufferedReader(
new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
// Until stopped or connection broken continue reading
while (!isStopped() && (userInput = reader.readLine()) != null) {
System.out.println("Received data '" + userInput + "'");
store(userInput);
}
reader.close();
socket.close();
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again");
} catch(ConnectException ce) {
// restart if could not connect to server
restart("Could not connect", ce);
} catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
}
在 Spark Streaming 应用程序中使用自定义接收器
通过使用 streamingContext.receiverStream(<instance of custom receiver>)
,可以在 Spark Streaming 应用程序中使用自定义接收器。这将使用自定义接收器实例接收到的数据创建一个输入 DStream,如下所示
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...
完整的源代码位于示例 CustomReceiver.scala 中。
// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...
完整的源代码位于示例 JavaCustomReceiver.java 中。
接收器可靠性
如Spark Streaming 编程指南中简要讨论的,根据其可靠性和容错语义,接收器分为两种。
- 可靠接收器 - 对于允许对发送数据进行确认的可靠源,*可靠接收器*会正确地向源确认数据已可靠地接收并存储在 Spark 中(即,成功复制)。通常,实现这种接收器需要仔细考虑源确认的语义。
- 不可靠接收器 - *不可靠接收器*不向源发送确认。这可用于不支持确认的源,或者即使是可靠源,当用户不想或不需要处理确认的复杂性时也可以使用。
要实现一个*可靠接收器*,您必须使用 store(multiple-records)
来存储数据。store
的这种形式是一个阻塞调用,只有在所有给定记录都存储在 Spark 中之后才会返回。如果接收器配置的存储级别使用复制(默认启用),则此调用将在复制完成后返回。因此,它确保数据可靠存储,并且接收器现在可以适当地确认源。这确保了当接收器在复制数据过程中失败时不会丢失数据——缓冲数据将不会被确认,因此稍后会由源重新发送。
*不可靠接收器*无需实现任何此类逻辑。它可以简单地从源接收记录,并使用 store(single-record)
一次插入一条。虽然它没有 store(multiple-records)
的可靠性保证,但它具有以下优点
- 系统负责将数据分块为适当大小的块(请参阅Spark Streaming 编程指南中的块间隔)。
- 如果已指定速率限制,系统会负责控制接收速率。
- 由于这两点,不可靠接收器比可靠接收器更易于实现。
下表总结了两种类型接收器的特性
接收器类型 | 特性 |
---|---|
不可靠接收器 | 易于实现。 系统负责块生成和速率控制。无容错保证,接收器失败时可能丢失数据。 |
可靠接收器 | 强大的容错保证,可确保零数据丢失。 块生成和速率控制由接收器实现处理。 实现复杂性取决于源的确认机制。 |