Spark Streaming 自定义接收器

Spark Streaming 可以从任何任意数据源接收流式数据,而不仅仅是它内置支持的那些(即,Kafka、Kinesis、文件、sockets 等)。这需要开发人员实现一个 *receiver*,该 *receiver* 针对从相关数据源接收数据进行自定义。本指南将引导您完成实现自定义接收器并在 Spark Streaming 应用程序中使用它的过程。请注意,自定义接收器可以使用 Scala 或 Java 实现。

实现自定义接收器

这从实现一个 Receiver 开始 (Scala 文档, Java 文档)。自定义接收器必须通过实现两个方法来扩展这个抽象类

onStart()onStop() 都不应无限期地阻塞。通常,onStart() 会启动负责接收数据的线程,而 onStop() 会确保停止这些接收数据的线程。接收线程还可以使用 isStopped(),这是一个 Receiver 方法,以检查它们是否应该停止接收数据。

一旦接收到数据,就可以通过调用 store(data) 将数据存储在 Spark 内部,这是 Receiver 类提供的方法。 有许多 store() 的变体,允许您一次存储一个接收到的数据记录,或者作为对象/序列化字节的整个集合存储。 请注意,用于实现接收器的 store() 的变体会影响其可靠性和容错语义。 稍后将在后面更详细地讨论这一点。

接收线程中的任何异常都应被捕获并正确处理,以避免接收器静默失败。restart(<exception>) 将通过异步调用 onStop() 并延迟后调用 onStart() 来重新启动接收器。stop(<exception>) 将调用 onStop() 并终止接收器。 此外,reportError(<error>) 会向驱动程序报告错误消息(在日志和 UI 中可见),而不会停止/重新启动接收器。

以下是一个自定义接收器,它通过套接字接收文本流。它将文本流中以“\n”分隔的行视为记录,并将它们与 Spark 一起存储。如果接收线程在连接或接收时发生任何错误,则重新启动接收器以再次尝试连接。

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  @Override
  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread(this::receive).start();
  }

  @Override
  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

在 Spark Streaming 应用程序中使用自定义接收器

可以通过使用 streamingContext.receiverStream(<instance of custom receiver>) 在 Spark Streaming 应用程序中使用自定义接收器。 这将创建一个使用自定义接收器实例接收的数据的输入 DStream,如下所示

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...

完整的源代码位于示例 CustomReceiver.scala 中。

// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...

完整的源代码位于示例 JavaCustomReceiver.java 中。

接收器可靠性

正如在 Spark Streaming 编程指南 中简要讨论的那样,根据其可靠性和容错语义,有两种类型的接收器。

  1. 可靠的接收器 - 对于允许确认已发送数据的可靠的来源可靠的接收器会正确地向来源确认数据已被接收并可靠地存储在 Spark 中(即,已成功复制)。通常,实现此接收器需要仔细考虑源确认的语义。
  2. 不可靠的接收器 - 不可靠的接收器向源发送确认。这可以用于不支持确认的源,或者甚至用于可靠的源,当人们不想或不需要涉及确认的复杂性时。

要实现一个可靠的接收器,您必须使用 store(multiple-records) 来存储数据。 这种 store 的变体是一个阻塞调用,只有在给定的所有记录都已存储在 Spark 内部后才会返回。 如果接收器的配置存储级别使用复制(默认情况下启用),则此调用将在复制完成后返回。 因此,它可以确保数据被可靠地存储,并且接收器现在可以适当地确认源。 这可以确保当接收器在复制数据期间发生故障时不会丢失数据 - 缓冲的数据将不会被确认,因此稍后将被源重新发送。

一个不可靠的接收器不必实现任何这种逻辑。 它可以简单地从源接收记录并使用 store(single-record) 一次插入一个记录。 虽然它没有获得 store(multiple-records) 的可靠性保证,但它具有以下优点

下表总结了两种类型接收器的特征

接收器类型 特点
不可靠的接收器 易于实现。
系统负责块生成和速率控制。 没有容错保证,可能会在接收器故障时丢失数据。
可靠的接收器 强大的容错保证,可以确保零数据丢失。
块生成和速率控制由接收器实现处理。
实现复杂性取决于源的确认机制。