PySpark 中的日志记录#

引言#

pyspark.logger 模块为 PySpark 用户提供了结构化的客户端日志记录功能。

该模块包含一个 PySparkLogger 类,该类提供了多种方法,可以以结构化的 JSON 格式在不同级别记录消息

可以轻松配置该日志记录器,使其将日志写入控制台或指定文件。

自定义日志格式#

默认日志格式为 JSON,其中包含时间戳、日志级别、日志记录器名称以及日志消息和任何提供的附加上下文。

日志条目示例

{
  "ts": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "logger": "DataFrameQueryContextLogger",
  "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/.../spark/python/test_error_context.py:17\n",
  "context": {
    "file": "/path/to/file.py",
    "line": "17",
    "fragment": "divide"
    "errorClass": "DIVIDE_BY_ZERO"
  },
  "exception": {
    "class": "Py4JJavaError",
    "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/path/to/file.py:17 ...",
    "stacktrace": [
      {
        "class": null,
        "method": "deco",
        "file": ".../spark/python/pyspark/errors/exceptions/captured.py",
        "line": "247"
      }
    ]
  },
}

设置#

要开始使用 PySpark 日志记录模块,需要从 pyspark.logger 导入 PySparkLogger

from pyspark.logger import PySparkLogger

用法#

创建日志记录器#

可以通过调用 PySparkLogger.getLogger() 创建日志记录器实例。默认情况下,它会创建一个名为“PySparkLogger”的日志记录器,其日志级别为 INFO。

logger = PySparkLogger.getLogger()

记录消息#

该日志记录器提供了三种主要的日志消息方法:PySparkLogger.info()PySparkLogger.warning()PySparkLogger.error()

  • PySparkLogger.info:使用此方法记录信息性消息。

    user = "test_user"
    action = "login"
    logger.info(f"User {user} performed {action}", user=user, action=action)
    
  • PySparkLogger.warning:使用此方法记录警告消息。

    user = "test_user"
    action = "access"
    logger.warning("User {user} attempted an unauthorized {action}", user=user, action=action)
    
  • PySparkLogger.error:使用此方法记录错误消息。

    user = "test_user"
    action = "update_profile"
    logger.error("An error occurred for user {user} during {action}", user=user, action=action)
    

记录到控制台#

from pyspark.logger import PySparkLogger

# Create a logger that logs to console
logger = PySparkLogger.getLogger("ConsoleLogger")

user = "test_user"
action = "test_action"

logger.warning(f"User {user} takes an {action}", user=user, action=action)

这会以以下 JSON 格式记录信息

{
  "ts": "2024-06-28 19:44:19,030",
  "level": "WARNING",
  "logger": "ConsoleLogger",
  "msg": "User test_user takes an test_action",
  "context": {
    "user": "test_user",
    "action": "test_action"
  },
}

记录到文件#

要将消息记录到文件,请使用 PySparkLogger.addHandler() 将标准 Python 日志模块中的 FileHandler 添加到您的日志记录器。

这种方法与标准的 Python 日志记录实践一致。

from pyspark.logger import PySparkLogger
import logging

# Create a logger that logs to a file
file_logger = PySparkLogger.getLogger("FileLogger")
handler = logging.FileHandler("application.log")
file_logger.addHandler(handler)

user = "test_user"
action = "test_action"

file_logger.warning(f"User {user} takes an {action}", user=user, action=action)

日志消息将以相同的 JSON 格式保存到 application.log 中。