PySpark 中的日志记录#
引言#
pyspark.logger 模块为 PySpark 用户提供了结构化的客户端日志记录功能。
该模块包含一个 PySparkLogger
类,该类提供了多种方法,可以以结构化的 JSON 格式在不同级别记录消息
可以轻松配置该日志记录器,使其将日志写入控制台或指定文件。
自定义日志格式#
默认日志格式为 JSON,其中包含时间戳、日志级别、日志记录器名称以及日志消息和任何提供的附加上下文。
日志条目示例
{
"ts": "2024-06-28 19:53:48,563",
"level": "ERROR",
"logger": "DataFrameQueryContextLogger",
"msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/.../spark/python/test_error_context.py:17\n",
"context": {
"file": "/path/to/file.py",
"line": "17",
"fragment": "divide"
"errorClass": "DIVIDE_BY_ZERO"
},
"exception": {
"class": "Py4JJavaError",
"msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/path/to/file.py:17 ...",
"stacktrace": [
{
"class": null,
"method": "deco",
"file": ".../spark/python/pyspark/errors/exceptions/captured.py",
"line": "247"
}
]
},
}
设置#
要开始使用 PySpark 日志记录模块,需要从 pyspark.logger 导入 PySparkLogger
。
from pyspark.logger import PySparkLogger
用法#
创建日志记录器#
可以通过调用 PySparkLogger.getLogger()
创建日志记录器实例。默认情况下,它会创建一个名为“PySparkLogger”的日志记录器,其日志级别为 INFO。
logger = PySparkLogger.getLogger()
记录消息#
该日志记录器提供了三种主要的日志消息方法:PySparkLogger.info()
、PySparkLogger.warning()
和 PySparkLogger.error()
。
PySparkLogger.info:使用此方法记录信息性消息。
user = "test_user" action = "login" logger.info(f"User {user} performed {action}", user=user, action=action)
PySparkLogger.warning:使用此方法记录警告消息。
user = "test_user" action = "access" logger.warning("User {user} attempted an unauthorized {action}", user=user, action=action)
PySparkLogger.error:使用此方法记录错误消息。
user = "test_user" action = "update_profile" logger.error("An error occurred for user {user} during {action}", user=user, action=action)
记录到控制台#
from pyspark.logger import PySparkLogger
# Create a logger that logs to console
logger = PySparkLogger.getLogger("ConsoleLogger")
user = "test_user"
action = "test_action"
logger.warning(f"User {user} takes an {action}", user=user, action=action)
这会以以下 JSON 格式记录信息
{
"ts": "2024-06-28 19:44:19,030",
"level": "WARNING",
"logger": "ConsoleLogger",
"msg": "User test_user takes an test_action",
"context": {
"user": "test_user",
"action": "test_action"
},
}
记录到文件#
要将消息记录到文件,请使用 PySparkLogger.addHandler()
将标准 Python 日志模块中的 FileHandler 添加到您的日志记录器。
这种方法与标准的 Python 日志记录实践一致。
from pyspark.logger import PySparkLogger
import logging
# Create a logger that logs to a file
file_logger = PySparkLogger.getLogger("FileLogger")
handler = logging.FileHandler("application.log")
file_logger.addHandler(handler)
user = "test_user"
action = "test_action"
file_logger.warning(f"User {user} takes an {action}", user=user, action=action)
日志消息将以相同的 JSON 格式保存到 application.log 中。