Spark Streaming 自定义接收器
除了内置支持的数据源(即 Kafka、Kinesis、文件、套接字等)之外,Spark Streaming 还可以从任何任意数据源接收流数据。这要求开发人员实现一个*接收器*,该接收器可自定义以从相关数据源接收数据。本指南将逐步介绍如何实现自定义接收器并在 Spark Streaming 应用程序中使用它。请注意,自定义接收器可以用 Scala 或 Java 实现。
实现自定义接收器
首先要实现一个 Receiver(Scala 文档,Java 文档)。自定义接收器必须通过实现以下两种方法来扩展此抽象类
onStart()
:开始接收数据时要做的事情。
onStop()
:停止接收数据时要做的事情。
onStart()
和 onStop()
都不能无限期阻塞。通常,onStart()
会启动负责接收数据的线程,而 onStop()
会确保停止这些接收数据的线程。接收线程还可以使用 isStopped()
(一种 Receiver
方法)来检查它们是否应该停止接收数据。
收到数据后,可以通过调用 store(data)
(Receiver 类提供的一种方法)将数据存储在 Spark 中。 store()
有多种形式,允许一次存储一条接收到的数据记录,或存储整个对象/序列化字节集合。请注意,用于实现接收器的 store()
的形式会影响其可靠性和容错语义。这将在后面详细讨论。
应捕获并正确处理接收线程中的任何异常,以避免接收器静默失败。 restart(<exception>)
将通过异步调用 onStop()
然后在延迟后调用 onStart()
来重启接收器。 stop(<exception>)
将调用 onStop()
并终止接收器。此外,reportError(<error>)
会向驱动程序报告错误消息(在日志和 UI 中可见),而不会停止/重启接收器。
以下是一个自定义接收器,它通过套接字接收文本流。它将文本流中以“\n”分隔的行视为记录,并将它们存储在 Spark 中。如果接收线程在连接或接收时出现任何错误,则会重启接收器以再次尝试连接。
在 Spark Streaming 应用程序中使用自定义接收器
可以通过使用 streamingContext.receiverStream(<自定义接收器实例>)
在 Spark Streaming 应用程序中使用自定义接收器。这将使用自定义接收器实例接收的数据创建一个输入 DStream,如下所示
完整的源代码位于示例 CustomReceiver.scala 中。
完整的源代码位于示例 JavaCustomReceiver.java 中。
接收器可靠性
正如在Spark Streaming 编程指南中简要讨论的那样,根据接收器的可靠性和容错语义,接收器可以分为两种。
- 可靠接收器 - 对于允许确认已发送数据的*可靠源*,*可靠接收器* 会正确地向源确认已接收到数据并已可靠地存储在 Spark 中(即已成功复制)。通常,实现此接收器需要仔细考虑源确认的语义。
- 不可靠接收器 - *不可靠接收器* 不会向源发送确认。这可以用于不支持确认的源,甚至可以用于可靠源,当您不想或不需要深入了解确认的复杂性时。
要实现*可靠接收器*,您必须使用 store(多条记录)
来存储数据。这种形式的 store
是一个阻塞调用,只有在所有给定的记录都存储在 Spark 中后才会返回。如果接收器配置的存储级别使用复制(默认启用),则此调用将在复制完成后返回。因此,它可以确保数据可靠存储,并且接收器现在可以适当地确认源。这可以确保当接收器在复制数据过程中失败时不会丢失任何数据 - 缓冲的数据不会被确认,因此稍后将由源重新发送。
不可靠接收器 不必实现任何此类逻辑。它可以简单地从源接收记录,并使用 store(单条记录)
一次插入一条记录。虽然它没有 store(多条记录)
的可靠性保证,但它具有以下优点
下表总结了两种类型接收器的特性
接收器类型 |
特性 |
不可靠接收器 |
易于实现。 系统负责块生成和速率控制。没有容错保证,在接收器故障时可能会丢失数据。 |
可靠接收器 |
强大的容错保证,可以确保零数据丢失。 块生成和速率控制由接收器实现处理。 实现的复杂性取决于源的确认机制。 |
|
|