- Logstash 输入阶段
- Logstash - 收集日志
- Logstash - 支持的输入
- Logstash 解析和转换
- Logstash - 解析日志
- Logstash - 过滤器
- Logstash - 转换日志
- Logstash 输出阶段
- Logstash - 输出阶段
- Logstash - 支持的输出
- Logstash 高级主题
- Logstash - 插件
- Logstash - 监控 API
- Logstash - 安全性和监控
- Logstash 有用资源
- Logstash 快速指南
- Logstash - 有用资源
- Logstash - 讨论
Logstash 快速指南
Logstash - 简介
Logstash 是一个基于过滤器/管道模式的工具,用于收集、处理和生成日志或事件。它有助于集中和实时分析来自不同来源的日志和事件。
Logstash 使用 JRuby 编程语言编写,运行在 JVM 上,因此您可以在不同的平台上运行 Logstash。它从几乎所有类型的来源收集不同类型的数据,例如日志、数据包、事件、事务、时间戳数据等。数据源可以是社交数据、电子商务、新闻文章、CRM、游戏数据、网络趋势、金融数据、物联网、移动设备等。
Logstash 通用功能
Logstash 的通用功能如下:
Logstash 可以从不同的来源收集数据并发送到多个目标。
Logstash 可以处理所有类型的日志数据,例如 Apache 日志、Windows 事件日志、网络协议上的数据、标准输入的数据等等。
Logstash 还可以处理 http 请求和响应数据。
Logstash 提供各种过滤器,帮助用户通过解析和转换数据来找到更多有意义的信息。
Logstash 也可以用于处理物联网中的传感器数据。
Logstash 是开源的,并根据 Apache 许可证 2.0 版提供。
Logstash 关键概念
Logstash 的关键概念如下:
事件对象
它是 Logstash 中的主要对象,它封装了 Logstash 管道中的数据流。Logstash 使用此对象来存储输入数据并在过滤器阶段添加创建的额外字段。
Logstash 为开发人员提供了事件 API 来操作事件。在本教程中,此事件以各种名称引用,例如日志数据事件、日志事件、日志数据、输入日志数据、输出日志数据等。
管道
它包含 Logstash 中从输入到输出的数据流阶段。输入数据进入管道,并以事件的形式进行处理。然后以用户或最终系统期望的格式发送到输出目标。
输入
这是 Logstash 管道的第一个阶段,用于获取 Logstash 中的数据以进行进一步处理。Logstash 提供各种插件来从不同的平台获取数据。一些最常用的插件是 - File、Syslog、Redis 和 Beats。
过滤器
这是 Logstash 的中间阶段,事件的实际处理发生在此阶段。开发人员可以使用 Logstash 预定义的正则表达式模式来创建序列,以区分事件中的字段和可接受的输入事件的条件。
Logstash 提供各种插件来帮助开发人员将事件解析和转换为所需结构。一些最常用的过滤器插件是 - Grok、Mutate、Drop、Clone 和 Geoip。
输出
这是 Logstash 管道的最后一个阶段,输出事件可以格式化为目标系统所需的结构。最后,它使用插件将完全处理后的输出事件发送到目标。一些最常用的插件是 - Elasticsearch、File、Graphite、Statsd 等。
Logstash 优点
以下几点说明了 Logstash 的各种优点。
Logstash 提供正则表达式模式序列来识别和解析任何输入事件中的各种字段。
Logstash 支持各种 Web 服务器和数据源来提取日志数据。
Logstash 提供多个插件来将日志数据解析和转换为任何用户期望的格式。
Logstash 是集中的,这使得它易于处理和收集来自不同服务器的数据。
Logstash 支持许多数据库、网络协议和其他服务作为日志事件的目标源。
Logstash 使用 HTTP 协议,使用户能够升级 Elasticsearch 版本而无需同步升级 Logstash。
Logstash 缺点
以下几点说明了 Logstash 的各种缺点。
Logstash 使用 http,这会对日志数据的处理产生负面影响。
使用 Logstash 有时可能有点复杂,因为它需要对输入日志数据有很好的理解和分析。
过滤器插件不是通用的,因此用户可能需要找到正确的模式序列以避免解析错误。
在下一章中,我们将了解 ELK 堆栈是什么以及它如何帮助 Logstash。
Logstash - ELK 堆栈
ELK 代表 **Elasticsearch、Logstash** 和 **Kibana**。在 ELK 堆栈中,Logstash 从不同的输入源提取日志数据或其他事件。它处理事件,然后将其存储在 Elasticsearch 中。Kibana 是一个 Web 接口,它访问 Elasticsearch 中的日志数据并将其可视化。
Logstash 和 Elasticsearch
Logstash 提供输入和输出 Elasticsearch 插件,用于读取和写入日志事件到 Elasticsearch。Elasticsearch 作为输出目标也由 Elasticsearch 公司推荐,因为它与 Kibana 兼容。Logstash 通过 http 协议将数据发送到 Elasticsearch。
Elasticsearch 提供批量上传功能,这有助于将数据从不同的来源或 Logstash 实例上传到集中的 Elasticsearch 引擎。ELK 比其他 DevOps 解决方案具有以下优点:
ELK 堆栈更容易管理,并且可以扩展以处理 PB 级别的事件。
ELK 堆栈架构非常灵活,并且它提供了与 Hadoop 的集成。Hadoop 主要用于存档目的。Logstash 可以使用 Flume 直接连接到 Hadoop,Elasticsearch 提供了一个名为 **es-hadoop** 的连接器来连接 Hadoop。
ELK 的总拥有成本远低于其替代方案。
Logstash 和 Kibana
Kibana 不直接与 Logstash 交互,而是通过数据源(在 ELK 堆栈中是 Elasticsearch)。Logstash 从每个来源收集数据,Elasticsearch 以非常快的速度分析它,然后 Kibana 提供对该数据的可操作见解。
Kibana 是一个基于 Web 的可视化工具,它帮助开发人员和其他人员分析 Logstash 在 Elasticsearch 引擎中收集的大量事件的变化。这种可视化使得预测或查看输入源中错误或其他重要事件趋势的变化变得容易。
Logstash - 安装
要在系统上安装 Logstash,我们应该按照以下步骤操作:
**步骤 1** - 检查计算机中安装的 Java 版本;它应该是 Java 8,因为它与 Java 9 不兼容。您可以通过以下方式检查:
在 Windows 操作系统 (OS) 中(使用命令提示符):
> java -version
在 UNIX OS 中(使用终端):
$ echo $JAVA_HOME
**步骤 2** - 从以下地址下载 Logstash:
https://elastic.ac.cn/downloads/logstash.
对于 Windows OS,下载 ZIP 文件。
对于 UNIX OS,下载 TAR 文件。
对于 Debian OS,下载 DEB 文件。
对于 Red Hat 和其他 Linux 发行版,下载 RPN 文件。
APT 和 Yum 实用程序也可以用于在许多 Linux 发行版中安装 Logstash。
**步骤 3** - Logstash 的安装过程非常简单。让我们看看如何在不同的平台上安装 Logstash。
**注意** - 不要在安装文件夹中放置任何空格或冒号。
**Windows OS** - 解压 zip 包,Logstash 就安装好了。
**UNIX OS** - 将 tar 文件解压到任何位置,Logstash 就安装好了。
$tar –xvf logstash-5.0.2.tar.gz
对于 Linux OS 使用 APT 实用程序:
- 下载并安装公共签名密钥:
$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
- 保存存储库定义:
$ echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | sudo tee -a /etc/apt/sources.list.d/elastic-5.x.list
- 运行更新:
$ sudo apt-get update
- 现在您可以使用以下命令进行安装:
$ sudo apt-get install logstash
**对于 Debian Linux OS 使用 YUM 实用程序** -
- 下载并安装公共签名密钥:
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
在“/etc/yum.repos.d/”目录中带有 .repo 后缀的文件中添加以下文本。例如,**logstash.repo**
[logstash-5.x] name = Elastic repository for 5.x packages baseurl = https://artifacts.elastic.co/packages/5.x/yum gpgcheck = 1 gpgkey = https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled = 1 autorefresh = 1 type = rpm-md
- 您现在可以使用以下命令安装 Logstash:
$ sudo yum install logstash
**步骤 4** - 转到 Logstash 主目录。在 bin 文件夹中,在 Windows 的情况下运行 **elasticsearch.bat** 文件,或者您可以使用命令提示符和终端执行相同的操作。在 UNIX 中,运行 Logstash 文件。
我们需要指定输入源、输出源和可选过滤器。为了验证安装,您可以使用标准输入流 (stdin) 作为输入源和标准输出流 (stdout) 作为输出源来运行它,使用基本配置。您也可以使用 **–e** 选项在命令行中指定配置。
在 Windows 中:
> cd logstash-5.0.1/bin > Logstash -e 'input { stdin { } } output { stdout {} }'
在 Linux 中:
$ cd logstash-5.0.1/bin $ ./logstash -e 'input { stdin { } } output { stdout {} }'
**注意** - 在 Windows 的情况下,您可能会收到一个错误,指出未设置 JAVA_HOME。为此,请在环境变量中将其设置为“C:\Program Files\Java\jre1.8.0_111”或您安装 java 的位置。
**步骤 5** - Logstash Web 界面的默认端口为 9600 到 9700,在 **logstash-5.0.1\config\logstash.yml** 中定义为 **http.port**,它将选择给定范围内的第一个可用端口。
我们可以通过浏览 **https://127.0.0.1:9600** 来检查 Logstash 服务器是否已启动并正在运行,或者如果端口不同,则请检查命令提示符或终端。我们可以看到分配的端口为“成功启动 Logstash API 端点 {:port ⇒ 9600}”。它将返回一个 JSON 对象,其中包含以下关于已安装 Logstash 的信息:
{ "host":"manu-PC", "version":"5.0.1", "http_address":"127.0.0.1:9600", "build_date":"2016-11-11T22:28:04+00:00", "build_sha":"2d8d6263dd09417793f2a0c6d5ee702063b5fada", "build_snapshot":false }
Logstash - 内部架构
在本章中,我们将讨论 Logstash 的内部架构和不同组件。
Logstash 服务架构
Logstash 处理来自不同服务器和数据源的日志,它充当发件人。发件人用于收集日志,它们安装在每个输入源中。像 **Redis、Kafka** 或 **RabbitMQ** 这样的代理是缓冲区,用于保存数据供索引器使用,可能存在多个代理作为故障转移实例。
像 **Lucene** 这样的索引器用于索引日志以获得更好的搜索性能,然后将输出存储在 Elasticsearch 或其他输出目标中。输出存储中的数据可用于 Kibana 和其他可视化软件。
Logstash 内部架构
Logstash 管道包含三个组件:**输入、过滤器** 和 **输出**。输入部分负责指定和访问输入数据源,例如 **Apache Tomcat 服务器** 的日志文件夹。
解释 Logstash 管道的示例
Logstash 配置文件包含有关 Logstash 三个组件的详细信息。在这种情况下,我们创建一个名为 **Logstash.conf** 的文件。
以下配置捕获来自输入日志“inlog.log”的数据,并将其写入输出日志“outlog.log”,而无需任何过滤器。
Logstash.conf
Logstash 配置文件仅使用输入插件从 **inlog.log** 文件复制数据,并使用输出插件将日志数据刷新到 **outlog.log** 文件。
input { file { path => "C:/tpwork/logstash/bin/log/inlog.log" } } output { file { path => "C:/tpwork/logstash/bin/log/outlog.log" } }
运行 Logstash
Logstash 使用 **–f** 选项来指定配置文件。
C:\logstash\bin> logstash –f logstash.conf
inlog.log
以下代码块显示输入日志数据。
Hello tutorialspoint.com
outlog.log
Logstash 的输出包含 message 字段中的输入数据。Logstash还会向输出添加其他字段,例如时间戳、输入源路径、版本、主机和标签。
{ "path":"C:/tpwork/logstash/bin/log/inlog1.log", "@timestamp":"2016-12-13T02:28:38.763Z", "@version":"1", "host":"Dell-PC", "message":" Hello tutorialspoint.com", "tags":[] }
如您所见,Logstash 的输出包含比输入日志提供的数据更多内容。输出包含源路径、时间戳、版本、主机名和标签,这些用于表示额外的消息,例如错误。
我们可以使用过滤器来处理数据,使其符合我们的需求。在接下来的示例中,我们使用过滤器来获取数据,该过滤器将输出限制为仅包含类似 GET 或 POST 的动词,后跟一个**唯一资源标识符**的数据。
Logstash.conf
在此 Logstash 配置中,我们添加了一个名为**grok**的过滤器来过滤输入数据。与模式序列输入日志匹配的输入日志事件,只有在出现错误的情况下才会到达输出目标。Logstash 在不匹配 grok 过滤器模式序列的输出事件中添加名为“_grokparsefailure”的标签。
Logstash 提供了许多内置正则表达式模式,用于解析流行的服务器日志,例如 Apache。此处使用的模式需要一个类似 get、post 等的动词,后跟一个统一资源标识符。
input { file { path => "C:/tpwork/logstash/bin/log/inlog2.log" } } filter { grok { match => {"message" => "%{WORD:verb} %{URIPATHPARAM:uri}"} } } output { file { path => "C:/tpwork/logstash/bin/log/outlog2.log" } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
C:\logstash\bin> logstash –f Logstash.conf
inlog2.log
我们的输入文件包含两个由默认分隔符(即换行符)分隔的事件。第一个事件与 GROk 中指定的模式匹配,第二个事件不匹配。
GET /tutorialspoint/Logstash Input 1234
outlog2.log
我们可以看到,第二个输出事件包含“_grokparsefailure”标签,因为它与 grok 过滤器模式不匹配。用户还可以使用输出插件中的**“if”**条件来删除输出中这些不匹配的事件。
{ "path":"C:/tpwork/logstash/bin/log/inlog2.log", "@timestamp":"2016-12-13T02:47:10.352Z","@version":"1","host":"Dell-PC","verb":"GET", "message":"GET /tutorialspoint/logstash", "uri":"/tutorialspoint/logstash", "tags":[] } { "path":"C:/tpwork/logstash/bin/log/inlog2.log", "@timestamp":"2016-12-13T02:48:12.418Z", "@version":"1", "host":"Dell-PC", "message":"t 1234\r", "tags":["_grokparsefailure"] }
Logstash - 收集日志
使用 shipper 收集来自不同服务器或数据源的日志。shipper 是安装在服务器上的 Logstash 实例,它访问服务器日志并将日志发送到特定的输出位置。
它主要将输出发送到 Elasticsearch 进行存储。Logstash 从以下来源获取输入:
- STDIN
- Syslog
- 文件
- TCP/UDP
- Microsoft Windows 事件日志
- Websocket
- Zeromq
- 自定义扩展
使用 Apache Tomcat 7 服务器收集日志
在此示例中,我们使用文件输入插件收集安装在 Windows 上的 Apache Tomcat 7 服务器的日志,并将它们发送到另一个日志。
logstash.conf
在这里,Logstash 被配置为访问本地安装的 Apache Tomcat 7 的访问日志。文件插件的路径设置中使用正则表达式模式来从日志文件中获取数据。这包含其名称中的“access”,并且它添加了 apache 类型,这有助于区分集中式目标源中的 apache 事件和其他事件。最后,输出事件将显示在 output.log 中。
input { file { path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/*access*" type => "apache" } } output { file { path => "C:/tpwork/logstash/bin/log/output.log" } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
C:\logstash\bin> logstash –f Logstash.conf
Apache Tomcat 日志
访问 Apache Tomcat 服务器及其 Web 应用程序(**https://127.0.0.1:8080**)以生成日志。Logstash 实时读取日志中的更新数据,并将其存储在配置文件中指定的 output.log 中。
Apache Tomcat 根据日期生成新的访问日志文件,并将访问事件记录到该文件中。在我们的例子中,它是在 Apache Tomcat 的 **logs** 目录中的 localhost_access_log.2016-12-24.txt。
0:0:0:0:0:0:0:1 - - [ 25/Dec/2016:18:37:00 +0800] "GET / HTTP/1.1" 200 11418 0:0:0:0:0:0:0:1 - munish [ 25/Dec/2016:18:37:02 +0800] "GET /manager/html HTTP/1.1" 200 17472 0:0:0:0:0:0:0:1 - - [ 25/Dec/2016:18:37:08 +0800] "GET /docs/ HTTP/1.1" 200 19373 0:0:0:0:0:0:0:1 - - [ 25/Dec/2016:18:37:10 +0800] "GET /docs/introduction.html HTTP/1.1" 200 15399
output.log
您可以在输出事件中看到,添加了一个 type 字段,并且事件存在于 message 字段中。
{ "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ localhost_access_log.2016-12-25.txt", "@timestamp":"2016-12-25T10:37:00.363Z","@version":"1","host":"Dell-PC", "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] \"GET / HTTP/1.1\" 200 11418\r","type":"apache","tags":[] } { "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z", "@version":"1","host":"Dell-PC", "message":"0:0:0:0:0:0:0:1 - munish [25/Dec/2016:18:37:02 +0800] \"GET / manager/html HTTP/1.1\" 200 17472\r","type":"apache","tags":[] } { "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z", "@version":"1","host":"Dell-PC", "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:08 +0800] \"GET /docs/ HTTP/1.1\" 200 19373\r","type":"apache","tags":[] } { "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:20.436Z", "@version":"1","host":"Dell-PC", "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:10 +0800] \"GET /docs/ introduction.html HTTP/1.1\" 200 15399\r","type":"apache","tags":[] }
使用 STDIN 插件收集日志
在本节中,我们将讨论另一个使用**STDIN 插件**收集日志的示例。
logstash.conf
这是一个非常简单的示例,其中 Logstash 读取用户在标准输入中输入的事件。在我们的例子中,它是命令提示符,它将事件存储在 output.log 文件中。
input { stdin{} } output { file { path => "C:/tpwork/logstash/bin/log/output.log" } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
C:\logstash\bin> logstash –f Logstash.conf
在命令提示符中写入以下文本:
用户输入了以下两行。Logstash 通过分隔符设置分隔事件,其默认值为“\n”。用户可以通过更改文件插件中分隔符的值来更改它。
Tutorialspoint.com welcomes you Simply easy learning
output.log
以下代码块显示输出日志数据。
{ "@timestamp":"2016-12-25T11:41:16.518Z","@version":"1","host":"Dell-PC", "message":"tutrialspoint.com welcomes you\r","tags":[] } { "@timestamp":"2016-12-25T11:41:53.396Z","@version":"1","host":"Dell-PC", "message":"simply easy learning\r","tags":[] }
Logstash - 支持的输入
Logstash 支持来自不同来源的大量日志。它与下面解释的著名来源一起工作。
从指标收集日志
系统事件和其他时间活动记录在指标中。Logstash 可以访问系统指标中的日志并使用过滤器对其进行处理。这有助于以自定义方式向用户显示事件的实时馈送。指标根据指标过滤器的**flush_interval 设置**刷新,默认设置为 5 秒。
我们通过收集和分析通过 Logstash 运行的事件并在命令提示符上显示实时馈送来跟踪 Logstash 生成的测试指标。
logstash.conf
此配置包含一个由 Logstash 提供的生成器插件,用于测试指标,并将 type 设置设置为“generated”以进行解析。在过滤阶段,我们仅使用“if”语句处理具有 generated 类型的行。然后,metrics 插件计算 meter 设置中指定的字段。metrics 插件每隔 5 秒(在**flush_interval**中指定)刷新一次计数。
最后,使用**codec 插件**进行格式化,将过滤器事件输出到标准输出(如命令提示符)。Codec 插件使用[events][rate_1m]值在 1 分钟滑动窗口中输出每秒事件。
input { generator { type => "generated" } } filter { if [type] == "generated" { metrics { meter => "events" add_tag => "metric" } } } output { # only emit events with the 'metric' tag if "metric" in [tags] { stdout { codec => line { format => "rate: %{[events][rate_1m]}" } } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
>logsaths –f logstash.conf
stdout(命令提示符)
rate: 1308.4 rate: 1308.4 rate: 1368.654529135342 rate: 1416.4796003951449 rate: 1464.974293984808 rate: 1523.3119444107458 rate: 1564.1602979542715 rate: 1610.6496496890895 rate: 1645.2184750334154 rate: 1688.7768007612485 rate: 1714.652283095914 rate: 1752.5150680019278 rate: 1785.9432934744932 rate: 1806.912181962126 rate: 1836.0070454626025 rate: 1849.5669494173826 rate: 1871.3814756851832 rate: 1883.3443123790712 rate: 1906.4879113216743 rate: 1925.9420717997118 rate: 1934.166137658981 rate: 1954.3176526556897 rate: 1957.0107444542625
从 Web 服务器收集日志
Web 服务器会生成大量关于用户访问和错误的日志。Logstash 帮助使用输入插件从不同的服务器提取日志,并将它们存储在集中式位置。
我们正在从本地 Apache Tomcat 服务器的**stderr 日志**提取数据,并将其存储在 output.log 中。
logstash.conf
此 Logstash 配置文件指示 Logstash 读取 apache 错误日志并添加名为“apache-error”的标签。我们可以简单地使用文件输出插件将其发送到 output.log。
input { file { path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0 /logs/*stderr*" type => "apache-error" } } output { file { path => "C:/tpwork/logstash/bin/log/output.log" } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
>Logstash –f Logstash.conf
输入日志示例
这是 Apache Tomcat 中发生服务器事件时生成的示例**stderr 日志**。
C:\Program Files\Apache Software Foundation\Tomcat 7.0\logs\ tomcat7-stderr.2016-12-25.log
Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start INFO: Starting ProtocolHandler ["http-bio-9999"] Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start INFO: Starting ProtocolHandler ["ajp-bio-8009"] Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start INFO: Server startup in 823 ms
output.log
{ "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z", "@version":"1","host":"Dell-PC", "message":"Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start\r", "type":"apache-error","tags":[] } { "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z", "@version":"1","host":"Dell-PC", "message":"INFO: Starting ProtocolHandler [ \"ajp-bio-8009\"]\r","type":"apache-error","tags":[] } { "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z", "@version":"1","host":"Dell-PC", "message":"Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start\r", "type":"apache-error","tags":[] } { "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/ tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z", "@version":"1","host":"Dell-PC", "message":"INFO: Server startup in 823 ms\r","type":"apache-error","tags":[] }
从数据源收集日志
首先,让我们了解如何配置 MySQL 进行日志记录。在 MySQL 数据库服务器的 [mysqld] 下的**my.ini 文件**中添加以下几行。
在 Windows 中,它位于 MySQL 的安装目录内,位于:
C:\wamp\bin\mysql\mysql5.7.11
在 UNIX 中,您可以在 – /etc/mysql/my.cnf 中找到它
general_log_file = "C:/wamp/logs/queries.log" general_log = 1
logstash.conf
在此配置文件中,文件插件用于读取 MySQL 日志并将其写入 ouput.log。
input { file { path => "C:/wamp/logs/queries.log" } } output { file { path => "C:/tpwork/logstash/bin/log/output.log" } }
queries.log
这是由在 MySQL 数据库中执行的查询生成的日志。
2016-12-25T13:05:36.854619Z 2 Query select * from test1_users 2016-12-25T13:05:51.822475Z 2 Query select count(*) from users 2016-12-25T13:05:59.998942Z 2 Query select count(*) from test1_users
output.log
{ "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:37.905Z", "@version":"1","host":"Dell-PC", "message":"2016-12-25T13:05:36.854619Z 2 Query\tselect * from test1_users", "tags":[] } { "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:51.938Z", "@version":"1","host":"Dell-PC", "message":"2016-12-25T13:05:51.822475Z 2 Query\tselect count(*) from users", "tags":[] } { "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:06:00.950Z", "@version":"1","host":"Dell-PC", "message":"2016-12-25T13:05:59.998942Z 2 Query\tselect count(*) from test1_users", "tags":[] }
Logstash - 解析日志
Logstash 使用输入插件接收日志,然后使用过滤器插件来解析和转换数据。日志的解析和转换根据输出目标中存在的系统执行。Logstash 解析日志数据,仅转发所需的字段。之后,这些字段将转换为目标系统兼容且易于理解的形式。
如何解析日志?
日志的解析是通过使用**GROK**(图形化知识表示)模式来执行的,您可以在 Github 上找到它们:
https://github.com/elastic/logstash/tree/v1.4.2/patterns.
Logstash 将日志数据与指定的 GROK 模式或模式序列匹配,以解析日志,例如“%{COMBINEDAPACHELOG}”,这通常用于 apache 日志。
解析后的数据结构更清晰,更易于搜索和执行查询。Logstash 在输入日志中搜索指定的 GROK 模式,并提取日志中的匹配行。您可以使用 GROK 调试器来测试您的 GROK 模式。
GROK 模式的语法是 %{SYNTAX:SEMANTIC}。Logstash GROK 过滤器采用以下形式:
%{PATTERN:FieldName}
这里,PATTERN 代表 GROK 模式,fieldname 是字段的名称,它表示输出中解析的数据。
例如,使用在线 GROK 调试器 https://grokdebugger.com/
输入
日志中出现错误行的示例:
[Wed Dec 07 21:54:54.048805 2016] [:error] [pid 1234:tid 3456829102] [client 192.168.1.1:25007] JSP Notice: Undefined index: abc in /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11
GROK 模式序列
此 GROK 模式序列与包含时间戳、日志级别、进程 ID、事务 ID 和错误消息的日志事件匹配。
\[(%{DAY:day} %{MONTH:month} %{MONTHDAY} %{TIME} %{YEAR})\] \[.*:%{LOGLEVEL:loglevel}\] \[pid %{NUMBER:pid}:tid %{NUMBER:tid}\] \[client %{IP:clientip}:.*\] %{GREEDYDATA:errormsg}
输出
输出采用 JSON 格式。
{ "day": [ "Wed" ], "month": [ "Dec" ], "loglevel": [ "error" ], "pid": [ "1234" ], "tid": [ "3456829102" ], "clientip": [ "192.168.1.1" ], "errormsg": [ "JSP Notice: Undefined index: abc in /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11" ] }
Logstash - 过滤器
Logstash 在输入和输出之间的管道中间使用过滤器。Logstash 的过滤器测量操作和创建事件,例如**Apache-Access**。许多过滤器插件用于管理 Logstash 中的事件。这里,在**Logstash 聚合过滤器**的示例中,我们正在过滤数据库中每个 SQL 事务的持续时间并计算总时间。
安装聚合过滤器插件
使用 Logstash-plugin 实用程序安装聚合过滤器插件。对于 Windows,Logstash-plugin 是 Logstash **bin 文件夹**中的批处理文件。
>logstash-plugin install logstash-filter-aggregate
logstash.conf
在此配置中,您可以看到三个“if”语句,用于**初始化、递增**和**生成**事务的总持续时间,即**sql_duration**。聚合插件用于添加存在于输入日志的每个事件中的 sql_duration。
input { file { path => "C:/tpwork/logstash/bin/log/input.log" } } filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TRANSACTION_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TRANSACTION_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } } output { file { path => "C:/tpwork/logstash/bin/log/output.log" } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
>logstash –f logstash.conf
input.log
以下代码块显示输入日志数据。
INFO - 48566 - TRANSACTION_START - start INFO - 48566 - SQL - transaction1 - 320 INFO - 48566 - SQL - transaction1 - 200 INFO - 48566 - TRANSACTION_END - end
output.log
如配置文件中指定的,最后一个“if”语句中记录器为 – TRANSACTION_END,它打印总事务时间或 sql_duration。这在 output.log 中以黄色突出显示。
{ "path":"C:/tpwork/logstash/bin/log/input.log","@timestamp": "2016-12-22T19:04:37.214Z", "loglevel":"INFO","logger":"TRANSACTION_START","@version": "1","host":"wcnlab-PC", "message":"8566 - TRANSACTION_START - start\r","tags":[] } { "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-22T19:04:38.366Z","loglevel":"INFO","logger":"SQL", "@version":"1","host":"wcnlab-PC","label":"transaction1", "message":" INFO - 48566 - SQL - transaction1 - 320\r","taskid":"48566","tags":[] } { "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-22T19:04:38.373Z","loglevel":"INFO","logger":"SQL", "@version":"1","host":"wcnlab-PC","label":"transaction1", "message":" INFO - 48566 - SQL - transaction1 - 200\r","taskid":"48566","tags":[] } { "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-22T19:04:38.380Z","loglevel":"INFO","logger":"TRANSACTION_END", "@version":"1","host":"wcnlab-PC","label":"end", "message":" INFO - 48566 - TRANSACTION_END - end\r","taskid":"48566","tags":[] }
Logstash - 转换日志
Logstash 提供各种插件来转换解析的日志。这些插件可以**添加、删除**和**更新**日志中的字段,以便更好地理解和在输出系统中进行查询。
我们使用**Mutate 插件**在输入日志的每一行中添加一个字段名称 user。
安装 Mutate 过滤器插件
要安装 mutate 过滤器插件;我们可以使用以下命令。
>Logstash-plugin install Logstash-filter-mutate
logstash.conf
在此配置文件中,Mutate 插件在 Aggregate 插件之后添加,以添加新字段。
input { file { path => "C:/tpwork/logstash/bin/log/input.log" } } filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TRANSACTION_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TRANSACTION_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } mutate { add_field => {"user" => "tutorialspoint.com"} } } output { file { path => "C:/tpwork/logstash/bin/log/output.log" } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
>logstash –f logstash.conf
input.log
以下代码块显示输入日志数据。
INFO - 48566 - TRANSACTION_START - start INFO - 48566 - SQL - transaction1 - 320 INFO - 48566 - SQL - transaction1 - 200 INFO - 48566 - TRANSACTION_END - end
output.log
您可以看到输出事件中有一个名为“user”的新字段。
{ "path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-25T19:55:37.383Z", "@version":"1", "host":"wcnlab-PC", "message":"NFO - 48566 - TRANSACTION_START - start\r", "user":"tutorialspoint.com","tags":["_grokparsefailure"] } { "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-25T19:55:37.383Z","loglevel":"INFO","logger":"SQL", "@version":"1","host":"wcnlab-PC","label":"transaction1", "message":" INFO - 48566 - SQL - transaction1 - 320\r", "user":"tutorialspoint.com","taskid":"48566","tags":[] } { "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO", "logger":"SQL","@version":"1","host":"wcnlab-PC","label":"transaction1", "message":" INFO - 48566 - SQL - transaction1 - 200\r", "user":"tutorialspoint.com","taskid":"48566","tags":[] } { "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO", "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC","label":"end", "message":" INFO - 48566 - TRANSACTION_END - end\r", "user":"tutorialspoint.com","taskid":"48566","tags":[] }
Logstash - 输出阶段
输出是 Logstash 管道中的最后一个阶段,它将输入日志中的过滤器数据发送到指定的目的地。Logstash 提供多个输出插件,可以将过滤后的日志事件存储到各种不同的存储和搜索引擎中。
存储日志
Logstash 可以将过滤后的日志存储在**文件、Elasticsearch 引擎、stdout、AWS CloudWatch**等中。Logstash 还可以使用**TCP、UDP、Websocket**等网络协议将日志事件传输到远程存储系统。
在 ELK 堆栈中,用户使用 Elasticsearch 引擎来存储日志事件。在下面的示例中,我们将为本地 Elasticsearch 引擎生成日志事件。
安装 Elasticsearch 输出插件
我们可以使用以下命令安装 Elasticsearch 输出插件。
>logstash-plugin install Logstash-output-elasticsearch
logstash.conf
此配置文件包含一个 Elasticsearch 插件,该插件将输出事件存储在本地安装的 Elasticsearch 中。
input { file { path => "C:/tpwork/logstash/bin/log/input.log" } } filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TRANSACTION_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TRANSACTION_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } mutate { add_field => {"user" => "tutorialspoint.com"} } } output { elasticsearch { hosts => ["127.0.0.1:9200"] } }
Input.log
以下代码块显示输入日志数据。
INFO - 48566 - TRANSACTION_START - start INFO - 48566 - SQL - transaction1 - 320 INFO - 48566 - SQL - transaction1 - 200 INFO - 48566 - TRANSACTION_END - end
启动本地主机上的 Elasticsearch
要在本地主机上启动 Elasticsearch,您应该使用以下命令。
C:\elasticsearch\bin> elasticsearch
Elasticsearch 准备就绪后,您可以通过在浏览器中键入以下 URL 来检查它。
https://127.0.0.1:9200/
响应
以下代码块显示本地主机上 Elasticsearch 的响应。
{ "name" : "Doctor Dorcas", "cluster_name" : "elasticsearch", "version" : { "number" : "2.1.1", "build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71", "build_timestamp" : "2015-12-15T13:05:55Z", "build_snapshot" : false, "lucene_version" : "5.3.1" }, "tagline" : "You Know, for Search" }
**注意** - 有关 Elasticsearch 的更多信息,您可以点击以下链接。
https://tutorialspoint.com/elasticsearch/index.html
现在,使用上述 Logstash.conf 运行 Logstash
>Logstash –f Logstash.conf
在将上述文本粘贴到输出日志中之后,该文本将由 Logstash 存储在 Elasticsearch 中。您可以通过在浏览器中键入以下 URL 来检查存储的数据。
https://127.0.0.1:9200/logstash-2017.01.01/_search?pretty
响应
这是存储在索引 Logstash-2017.01.01 中的 JSON 格式数据。
{ "took" : 20, "timed_out" : false, "_shards" : { "total" : 5, "successful" : 5, "failed" : 0 }, "hits" : { "total" : 10, "max_score" : 1.0, "hits" : [ { "_index" : "logstash-2017.01.01", "_type" : "logs", "_id" : "AVlZ9vF8hshdrGm02KOs", "_score" : 1.0, "_source":{ "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2017-01-01T12:17:49.140Z","loglevel":"INFO", "logger":"SQL","@version":"1","host":"wcnlab-PC", "label":"transaction1", "message":" INFO - 48566 - SQL - transaction1 - 200\r", "user":"tutorialspoint.com","taskid":"48566","tags":[] } }, { "_index" : "logstash-2017.01.01", "_type" : "logs", "_id" : "AVlZ9vF8hshdrGm02KOt", "_score" : 1.0, "_source":{ "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2017-01-01T12:17:49.145Z","loglevel":"INFO", "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC", "label":"end", "message":" INFO - 48566 - TRANSACTION_END - end\r", "user":"tutorialspoint.com","taskid":"48566","tags":[] } } } }
Logstash - 支持的输出
Logstash 提供多个插件来支持各种数据存储或搜索引擎。日志的输出事件可以发送到输出文件、标准输出或 Elasticsearch 等搜索引擎。Logstash 中有三种受支持的输出类型,它们是:
- 标准输出
- 文件输出
- 空输出
现在让我们详细讨论每个输出。
标准输出 (stdout)
它用于将过滤后的日志事件作为数据流生成到命令行界面。这是一个将数据库事务的总持续时间生成到 stdout 的示例。
logstash.conf
此配置文件包含一个标准输出插件,用于将总 sql_duration 写入标准输出。
input { file { path => "C:/tpwork/logstash/bin/log/input.log" } } filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TRANSACTION_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TRANSACTION_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } } output { if [logger] == "TRANSACTION_END" { stdout { codec => line{format => "%{sql_duration}"} } } }
注意 − 如果尚未安装,请安装聚合过滤器。
>logstash-plugin install Logstash-filter-aggregate
运行 Logstash
我们可以使用以下命令运行 Logstash。
>logstash –f logsatsh.conf
Input.log
以下代码块显示输入日志数据。
INFO - 48566 - TRANSACTION_START - start INFO - 48566 - SQL - transaction1 - 320 INFO - 48566 - SQL - transaction1 - 200 INFO - 48566 - TRANSACTION_END – end
stdout(在 Windows 中为命令提示符,在 UNIX 中为终端)
这是总 sql_duration 320 + 200 = 520。
520
文件输出
Logstash 还可以将过滤器日志事件存储到输出文件中。我们将使用上述示例,将输出存储到文件中,而不是 STDOUT。
logstash.conf
此 Logstash 配置文件指示 Logstash 将总 sql_duration 存储到输出日志文件中。
input { file { path => "C:/tpwork/logstash/bin/log/input1.log" } } filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TRANSACTION_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TRANSACTION_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } } output { if [logger] == "TRANSACTION_END" { file { path => "C:/tpwork/logstash/bin/log/output.log" codec => line{format => "%{sql_duration}"} } } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
>logstash –f logsatsh.conf
input.log
以下代码块显示输入日志数据。
INFO - 48566 - TRANSACTION_START - start INFO - 48566 - SQL - transaction1 - 320 INFO - 48566 - SQL - transaction1 - 200 INFO - 48566 - TRANSACTION_END – end
output.log
以下代码块显示输出日志数据。
520
空输出
这是一个特殊的输出插件,用于分析输入和过滤器插件的性能。
Logstash - 插件
Logstash 为其管道的三个阶段(输入、过滤器和输出)提供各种插件。这些插件帮助用户从各种来源捕获日志,例如 Web 服务器、数据库、网络协议等。
捕获后,Logstash 可以根据用户的需求将数据解析并转换为有意义的信息。最后,Logstash 可以将这些有意义的信息发送或存储到各种目标来源,例如 Elasticsearch、AWS Cloudwatch 等。
输入插件
Logstash 中的输入插件帮助用户从各种来源提取和接收日志。使用输入插件的语法如下:
Input { Plugin name { Setting 1…… Setting 2…….. } }
您可以使用以下命令下载输入插件:
>Logstash-plugin install Logstash-input-<plugin name>
Logstash-plugin 实用程序位于 Logstash 安装目录的bin 文件夹中。下表列出了 Logstash 提供的输入插件。
序号 | 插件名称和说明 |
---|---|
1 | beats 获取来自 Elastic Beats 框架的日志数据或事件。 |
2 | cloudwatch 从 Amazon Web Services 提供的 API CloudWatch 中提取事件。 |
3 | couchdb_changes 使用此插件发送来自 couchdb 的 _changes URI 的事件。 |
4 | drupal_dblog 提取启用了 DBLog 的 Drupal 的 watchdog 日志数据。 |
5 | Elasticsearch 检索在 Elasticsearch 集群中执行的查询结果。 |
6 | eventlog 获取 Windows 事件日志中的事件。 |
7 | exec 将 shell 命令输出作为输入添加到 Logstash。 |
8 | file 从输入文件获取事件。当 Logstash 在本地安装并访问输入源日志时,这很有用。 |
9 | generator 用于测试目的,它创建随机事件。 |
10 | github 捕获来自 GitHub webhook 的事件。 |
11 | graphite 从 Graphite 监控工具获取指标数据。 |
12 | heartbeat 也用于测试,它产生类似心跳的事件 |
13 | http 通过两种网络协议收集日志事件,分别是 http 和 https。 |
14 | http_poller 用于将 HTTP API 输出解码为事件。 |
15 | jdbc 它将 JDBC 事务转换为 Logstash 中的事件。 |
16 | jmx 使用 JMX 从远程 Java 应用程序提取指标。 |
17 | log4j 通过 TCP 套接字捕获来自 Log4j 的 socketAppender 对象的事件。 |
18 | rss 将命令行工具的输出作为输入事件添加到 Logstash。 |
19 | tcp 通过 TCP 套接字捕获事件。 |
20 | 从 Twitter 流 API 收集事件。 |
21 | unix 通过 UNIX 套接字收集事件。 |
22 | websocket 通过 websocket 协议捕获事件。 |
23 | xmpp 通过 Jabber/xmpp 协议读取事件。 |
插件设置
所有插件都有其特定的设置,有助于指定插件中的重要字段,例如端口、路径等。我们将讨论一些输入插件的设置。
文件
此输入插件用于直接从输入源中存在的日志或文本文件提取事件。它的工作方式类似于 UNIX 中的 tail 命令,保存最后读取的游标,只读取输入文件中新追加的数据,但这可以通过使用 star_position 设置来更改。以下是此输入插件的设置。
设置名称 | 默认值 | 描述 |
---|---|---|
add_field | {} | 向输入事件追加新字段。 |
close_older | 3600 | 最后读取时间(以秒为单位)超过此插件中指定的的文件将被关闭。 |
codec | “plain” | 它用于在进入 Logstash 管道之前解码数据。 |
delimiter | “\n” | 它用于指定换行符。 |
discover_interval | 15 | 这是在指定路径中发现新文件之间的时间间隔(以秒为单位)。 |
enable_metric | true | 它用于启用或禁用指定插件的指标报告和收集。 |
exclude | 它用于指定应从输入插件中排除的文件名或模式。 | |
Id | 为该插件实例指定唯一标识。 | |
max_open_files | 它指定 Logstash 同时可以打开的最大输入文件数。 | |
path | 指定文件的路径,它可以包含文件名的模式。 | |
start_position | “end” | 如果需要,您可以将其更改为“beginning”,这样 Logstash 最初将从开头开始读取文件,而不仅仅是新的日志事件。 |
start_interval | 1 | 它指定 Logstash 检查修改后的文件的时间间隔(以秒为单位)。 |
tags | 添加任何其他信息,例如 Logstash 在任何日志事件无法符合指定的 grok 过滤器时,会在标签中添加“_grokparsefailure”。 | |
type | 这是一个特殊字段,您可以将其添加到输入事件中,它在过滤器和 Kibana 中很有用。 |
Elasticsearch
此特定插件用于读取 Elasticsearch 集群中的搜索查询结果。以下是此插件中使用的设置:
设置名称 | 默认值 | 描述 |
---|---|---|
add_field | {} | 与文件插件相同,它用于在输入事件中追加字段。 |
ca_file | 它用于指定 SSL 证书颁发机构文件的路径。 | |
codec | “plain” | 它用于在进入 Logstash 管道之前解码来自 Elasticsearch 的输入事件。 |
docinfo | “false” | 如果要从 Elasticsearch 引擎提取其他信息(如索引、类型和 ID),可以将其更改为 true。 |
docinfo_fields | ["_index", "_type", "_id"] | 您可以消除在 Logstash 输入中不需要的任何字段。 |
enable_metric | true | 它用于启用或禁用该插件实例的指标报告和收集。 |
hosts | 它用于指定所有 Elasticsearch 引擎的地址,这些引擎将作为该 Logstash 实例的输入源。语法为 host:port 或 IP:port。 | |
Id | 它用于为该特定输入插件实例提供唯一的标识号。 | |
index | "logstash-*" | 它用于指定 Logstash 将监控的索引名称或模式,以作为 Logstash 的输入。 |
password | 用于身份验证。 | |
query | "{ \"sort\": [ \"_doc\" ] }" | 执行的查询。 |
ssl | false | 启用或禁用安全套接字层。 |
tags | 向输入事件添加任何其他信息。 | |
type | 它用于对输入表单进行分类,以便在以后的阶段轻松搜索所有输入事件。 | |
user | 用于身份验证。 |
eventlog
此输入插件从 Windows 服务器的 win32 API 读取数据。以下是此插件的设置:
设置名称 | 默认值 | 描述 |
---|---|---|
add_field | {} | 与文件插件相同,它用于在输入事件中追加字段 |
codec | “plain” | 它用于在进入 Logstash 管道之前解码来自 Windows 的输入事件 |
logfile | ["Application", "Security", "System"] | 输入日志文件中所需的事件 |
interval | 1000 | 它以毫秒为单位,定义两次连续检查新事件日志之间的时间间隔 |
tags | 向输入事件添加任何其他信息 | |
type | 它用于将来自特定插件的输入表单分类为给定的类型,以便在以后的阶段轻松搜索所有输入事件 |
此输入插件用于从其流式 API 收集 Twitter 的 feed。下表描述了此插件的设置。
设置名称 | 默认值 | 描述 |
---|---|---|
add_field | {} | 与文件插件相同,它用于在输入事件中追加字段 |
codec | “plain” | 它用于在进入 Logstash 管道之前解码来自 Windows 的输入事件 |
consumer_key | 它包含 Twitter 应用的消费者密钥。更多信息,请访问 https://dev.twitter.com/apps/new | |
consumer_secret | 它包含 Twitter 应用的消费者密钥。更多信息,请访问 https://dev.twitter.com/apps/new | |
enable_metric | true | 它用于启用或禁用该插件实例的指标报告和收集 |
follows | 它指定用逗号分隔的用户 ID,Logstash 检查这些用户的 Twitter 状态。 更多信息,请访问 |
|
full_tweet | false | 如果希望 Logstash 读取 Twitter API 返回的完整对象,可以将其更改为 true |
id | 它用于为该特定输入插件实例提供唯一的标识号 | |
ignore_retweets | False | 可以将其设置为 true 以忽略输入 Twitter feed 中的转发 |
keywords | 这是一个关键字数组,需要在 Twitter 输入 feed 中跟踪 | |
language | 它定义了 Logstash 从输入 Twitter feed 中需要的推文的语言。这是一个标识符数组,它定义了 Twitter 中的特定语言 | |
locations | 根据指定的地理位置过滤输入 feed 中的推文。这是一个数组,包含地理位置的经度和纬度 | |
oauth_token | 这是一个必需字段,包含用户 oauth 令牌。更多信息,请访问以下链接 https://dev.twitter.com/apps | |
oauth_token_secret | 这是一个必需字段,包含用户 oauth 密钥。更多信息,请访问以下链接 https://dev.twitter.com/apps | |
tags | 向输入事件添加任何其他信息 | |
type | 它用于将来自特定插件的输入表单分类为给定的类型,以便在以后的阶段轻松搜索所有输入事件 |
TCP
TCP 用于通过 TCP 套接字获取事件;它可以从用户连接或服务器读取,这取决于 mode 设置。下表描述了此插件的设置:
设置名称 | 默认值 | 描述 |
---|---|---|
add_field | {} | 与文件插件相同,它用于在输入事件中追加字段 |
codec | “plain” | 它用于在进入 Logstash 管道之前解码来自 Windows 的输入事件 |
enable_metric | true | 它用于启用或禁用该插件实例的指标报告和收集 |
host | “0.0.0.0” | 客户端依赖的服务器操作系统的地址 |
id | 它包含 Twitter 应用的消费者密钥 | |
mode | “server” | 它用于指定输入源是服务器还是客户端。 |
port | 它定义端口号 | |
ssl_cert | 它用于指定 SSL 证书的路径 | |
ssl_enable | false | 启用或禁用 SSL |
ssl_key | 指定 SSL 密钥文件的路径 | |
tags | 向输入事件添加任何其他信息 | |
type | 它用于将来自特定插件的输入表单分类为给定的类型,以便在以后的阶段轻松搜索所有输入事件 |
Logstash – 输出插件
Logstash 支持各种输出源和不同的技术,例如数据库、文件、电子邮件、标准输出等。
使用输出插件的语法如下:
output { Plugin name { Setting 1…… Setting 2…….. } }
您可以使用以下命令下载输出插件:
>logstash-plugin install logstash-output-<plugin name>
Logstash-plugin 实用程序位于 Logstash 安装目录的 bin 文件夹中。下表描述了 Logstash 提供的输出插件。
序号 | 插件名称和说明 |
---|---|
1 | CloudWatch 此插件用于将聚合的指标数据发送到 Amazon Web Services 的 CloudWatch。 |
2 | csv 它用于以逗号分隔的方式写入输出事件。 |
3 | Elasticsearch 它用于将输出日志存储在 Elasticsearch 索引中。 |
4 | 它用于在生成输出时发送通知电子邮件。用户可以在电子邮件中添加有关输出的信息。 |
5 | exec 它用于运行与输出事件匹配的命令。 |
6 | ganglia 它将指标写入 Ganglia 的 gmond。 |
7 | gelf 它用于以 GELF 格式为 Graylog2 生成输出。 |
8 | google_bigquery 它将事件输出到 Google BigQuery。 |
9 | google_cloud_storage 它将输出事件存储到 Google Cloud Storage。 |
10 | graphite 它用于将输出事件存储到 Graphite。 |
11 | graphtastic 它用于在 Windows 上写入输出指标。 |
12 | hipchat 它用于将输出日志事件存储到 HipChat。 |
13 | http 它用于将输出日志事件发送到 http 或 https 终结点。 |
14 | influxdb 它用于将输出事件存储到 InfluxDB。 |
15 | irc 它用于将输出事件写入 irc。 |
16 | mongodb 它将输出数据存储在 MongoDB 中。 |
17 | nagios 它用于使用被动检查结果通知 Nagios。 |
18 | nagios_nsca 它用于通过 NSCA 协议使用被动检查结果通知 Nagios。 |
19 | opentsdb 它将 Logstash 输出事件存储到 OpenTSDB。 |
20 | pipe 它将输出事件流式传输到另一个程序的标准输入。 |
21 | rackspace 它用于将输出日志事件发送到 Rackspace Cloud 的队列服务。 |
22 | redis 它使用 rpush 命令将输出日志数据发送到 Redis 队列。 |
23 | riak 它用于将输出事件存储到 Riak 分布式键值对中。 |
24 | s3 它将输出日志数据存储到 Amazon Simple Storage Service。 |
25 | sns 它用于将输出事件发送到 Amazon 的 Simple Notification Service。 |
26 | solr_http 它在 Solr 中索引和存储输出日志数据。 |
27 | sps 它用于将事件发送到 AWS 的 Simple Queue Service。 |
28 | statsd 它用于将指标数据发送到 statsd 网络守护进程。 |
29 | stdout 它用于在 CLI(如命令提示符)的标准输出上显示输出事件。 |
30 | syslog 它用于将输出事件发送到 syslog 服务器。 |
31 | tcp 它用于将输出事件发送到 TCP 套接字。 |
32 | udp 它用于通过 UDP 推送输出事件。 |
33 | websocket 它用于通过 WebSocket 协议推送输出事件。 |
34 | xmpp 它用于通过 XMPP 协议推送输出事件。 |
所有插件都有其特定的设置,这有助于在插件中指定重要的字段,例如端口、路径等。我们将讨论一些输出插件的设置。
Elasticsearch
Elasticsearch 输出插件使 Logstash 能够将输出存储在 Elasticsearch 引擎的特定集群中。这是用户常用的选择之一,因为它包含在 ELK Stack 包中,因此为 DevOps 提供端到端解决方案。下表描述了此输出插件的设置。
设置名称 | 默认值 | 描述 |
---|---|---|
action | index | 它用于定义在 Elasticsearch 引擎中执行的操作。此设置的其他值包括 delete、create、update 等。 |
cacert | 它包含服务器证书验证的 .cer 或 .pem 文件的路径。 | |
codec | “plain” | 它用于在将输出日志数据发送到目标源之前对其进行编码。 |
doc_as_upset | false | 此设置用于更新操作。如果在输出插件中未指定文档 ID,则它会在 Elasticsearch 引擎中创建一个文档。 |
document_type | 它用于将相同类型的事件存储在相同的文档类型中。如果未指定,则使用事件类型。 | |
flush_size | 500 | 这用于提高 Elasticsearch 中批量上传的性能。 |
hosts | [“127.0.0.1”] | 它是输出日志数据的目标地址数组。 |
idle_flush_time | 1 | 它定义两次刷新之间的时间限制(秒),Logstash 在此设置中指定的时间限制后强制刷新。 |
index | "logstash-%{+YYYY.MM.dd}" | 它用于指定 Elasticsearch 引擎的索引。 |
manage_temlpate | true | 它用于在 Elasticsearch 中应用默认模板。 |
parent | nil | 它用于指定 Elasticsearch 中父文档的 ID。 |
password | 它用于对 Elasticsearch 中的安全集群进行身份验证请求。 | |
path | 它用于指定 Elasticsearch 的 HTTP 路径。 | |
pipeline | nil | 它用于设置用户希望为事件执行的摄取管道。 |
proxy | 它用于指定 HTTP 代理。 | |
retry_initial_interval | 2 | 它用于设置批量重试之间初始时间间隔(秒)。每次重试后都会翻倍,直到达到 retry_max_interval。 |
retry_max_interval | 64 | 它用于设置 retry_initial_interval 的最大时间间隔。 |
retry_on_conflict | 1 | 它是 Elasticsearch 更新文档的重试次数。 |
ssl | 启用或禁用对 Elasticsearch 的 SSL/TLS 安全连接。 | |
template | 它包含 Elasticsearch 中自定义模板的路径。 | |
template_name | "logstash" | 这用于命名 Elasticsearch 中的模板。 |
timeout | 60 | 它是对 Elasticsearch 的网络请求的超时时间。 |
upsert | “” | 它更新文档,如果文档 ID 不存在,则在 Elasticsearch 中创建一个新文档。 |
user | 它包含用于在安全的 Elasticsearch 集群中对 Logstash 请求进行身份验证的用户。 |
电子邮件输出插件用于在 Logstash 生成输出时通知用户。下表描述了此插件的设置。
设置名称 | 默认值 | 描述 |
---|---|---|
address | “localhost” | 它是邮件服务器的地址。 |
attachments | [] | 它包含附加文件的名称和位置。 |
body | “” | 它包含电子邮件正文,应为纯文本。 |
cc | 它包含以逗号分隔的电子邮件地址,用于电子邮件的抄送。 | |
codec | “plain” | 它用于在将输出日志数据发送到目标源之前对其进行编码。 |
contenttype | "text/html; charset = UTF-8" | 它用于设置电子邮件的内容类型。 |
debug | false | 它用于在调试模式下执行邮件中继。 |
domain | "localhost" | 它用于设置发送电子邮件消息的域名。 |
from | "[email protected]" | 它用于指定发件人的电子邮件地址。 |
htmlbody | “” | 它用于指定 html 格式的电子邮件正文。 |
password | 它用于对邮件服务器进行身份验证。 | |
port | 25 | 它用于定义与邮件服务器通信的端口。 |
replyto | 它用于指定电子邮件“回复”字段的电子邮件 ID。 | |
subject | “” | 它包含电子邮件的主题行。 |
use_tls | false | 启用或禁用与邮件服务器通信的 TSL。 |
username | 它包含用于对服务器进行身份验证的用户名。 | |
via | “smtp” | 它定义 Logstash 发送电子邮件的方法。 |
Http
此设置用于通过 http 将输出事件发送到目标。此插件具有以下设置:
设置名称 | 默认值 | 描述 |
---|---|---|
automatic_retries | 1 | 它用于设置 logstash 重试 http 请求的次数。 |
cacert | 它包含服务器证书验证的文件路径。 | |
codec | “plain” | 它用于在将输出日志数据发送到目标源之前对其进行编码。 |
content_type | 它指定对目标服务器的 http 请求的内容类型。 | |
cookies | true | 它用于启用或禁用 cookie。 |
format | "json" | 它用于设置 http 请求正文的格式。 |
headers | 它包含 http 标头的信息。 | |
http_method | “” | 它用于指定 logstash 在请求中使用的 http 方法,其值可以是“put”、“post”、“patch”、“delete”、“get”、“head”。 |
request_timeout | 60 | 它用于对邮件服务器进行身份验证。 |
url | 这是此插件必需的设置,用于指定 http 或 https 终结点。 |
stdout
stdout 输出插件用于将输出事件写入命令行界面的标准输出。在 Windows 中是命令提示符,在 UNIX 中是终端。此插件具有以下设置:
设置名称 | 默认值 | 描述 |
---|---|---|
codec | “plain” | 它用于在将输出日志数据发送到目标源之前对其进行编码。 |
workers | 1 | 它用于指定输出的工作程序数量。 |
statsd
它是一个网络守护程序,用于通过 UDP 将矩阵数据发送到目标后端服务。在 Windows 中是命令提示符,在 UNIX 中是终端。此插件具有以下设置:
设置名称 | 默认值 | 描述 |
---|---|---|
codec | “plain” | 它用于在将输出日志数据发送到目标源之前对其进行编码。 |
count | {} | 它用于定义要在指标中使用的计数。 |
decrement | [] | 它用于指定递减指标名称。 |
host | “localhost” | 它包含 statsd 服务器的地址。 |
increment | [] | 它用于指定递增指标名称。 |
port | 8125 | 它包含 statsd 服务器的端口。 |
sample_rate | 1 | 它用于指定指标的采样率。 |
sender | “%{host}” | 它指定发送者的名称。 |
set | {} | 它用于指定一个集合指标。 |
timing | {} | 它用于指定一个计时指标。 |
workers | 1 | 它用于指定输出的工作程序数量。 |
过滤器插件
Logstash 支持各种过滤器插件,用于将输入日志解析和转换为更结构化且易于查询的格式。
使用过滤器插件的语法如下:
filter { Plugin name { Setting 1…… Setting 2…….. } }
您可以使用以下命令下载过滤器插件:
>logstash-plugin install logstash-filter-<plugin name>
Logstash-plugin 实用程序位于 Logstash 安装目录的 bin 文件夹中。下表描述了 Logstash 提供的输出插件。
序号 | 插件名称和说明 |
---|---|
1 | aggregate 此插件收集或聚合来自相同类型各种事件的数据,并在最终事件中对其进行处理。 |
2 | alter 它允许用户更改日志事件的字段,而 mutate 过滤器无法处理这些字段。 |
3 | anonymize 它用于用一致的哈希值替换字段的值。 |
4 | cipher 它用于在将输出事件存储到目标源之前对其进行加密。 |
5 | clone 它用于在 Logstash 中创建输出事件的副本。 |
6 | collate 它根据时间或计数合并来自不同日志的事件。 |
7 | csv 此插件根据分隔符解析输入日志中的数据。 |
8 | date 它解析事件中字段中的日期,并将其设置为事件的时间戳。 |
9 | dissect 此插件帮助用户从非结构化数据中提取字段,并使 grok 过滤器能够正确地解析它们。 |
10 | drop 它用于删除相同类型或任何其他相似性的所有事件。 |
11 | elapsed 它用于计算开始事件和结束事件之间的时间。 |
12 | Elasticsearch 它用于将 Elasticsearch 中存在的先前日志事件的字段复制到 Logstash 中的当前日志事件。 |
13 | extractnumbers 它用于从日志事件中的字符串中提取数字。 |
14 | geoip 它在事件中添加一个字段,该字段包含日志事件中存在的 IP 地址的经纬度。 |
15 | grok 这是一个常用的过滤器插件,用于解析事件以获取字段。 |
16 | i18n 它删除日志事件中字段中的特殊字符。 |
17 | json 它用于在事件中或事件的特定字段中创建结构化的 JSON 对象。 |
18 | kv 此插件可用于解析日志数据中的键值对。 |
19 | metrics 它用于聚合指标,例如计算每个事件中的时间持续时间。 |
20 | multiline 它也是一个常用的过滤器插件,可以帮助用户将多行日志数据转换为单个事件。 |
21 | mutate 此插件用于重命名、删除、替换和修改事件中的字段。 |
22 | range 它用于根据预期范围检查事件中字段的数值以及字符串的长度是否在范围内。 |
23 | ruby 它用于运行任意的 Ruby 代码。 |
24 | sleep 这使得 Logstash 休眠指定的时间。 |
25 | split 它用于拆分事件的一个字段,并将所有拆分的值放在该事件的克隆中。 |
26 | xml 它用于通过解析日志中存在的 XML 数据来创建事件。 |
Codec 插件
Codec 插件可以是输入或输出插件的一部分。这些插件用于更改或格式化日志数据的表示方式。Logstash 提供多个 Codec 插件,如下所示:
序号 | 插件名称和说明 |
---|---|
1 | avro 此插件将 Logstash 事件编码序列化为 avro 数据,或将 avro 记录解码为 Logstash 事件。 |
2 | cloudfront 此插件读取来自 AWS cloudfront 的编码数据。 |
3 | cloudtrail 此插件用于读取来自 AWS cloudtrail 的数据。 |
4 | collectd 这读取通过 UDP 收集的称为 collected 的二进制协议的数据。 |
5 | compress_spooler 它用于将 Logstash 中的日志事件压缩成分批处理。 |
6 | dots 这用于通过为每个事件设置一个点到 stdout 来进行性能跟踪。 |
7 | es_bulk 这用于将 Elasticsearch 的批量数据转换为 Logstash 事件,包括 Elasticsearch 元数据。 |
8 | graphite 此 codec 将来自 graphite 的数据读取到事件中,并将事件更改为 graphite 格式的记录。 |
9 | gzip_lines 此插件用于处理 gzip 编码的数据。 |
10 | json 这用于将 JSON 数组中的单个元素转换为单个 Logstash 事件。 |
11 | json_lines 它用于处理带有换行符分隔符的 JSON 数据。 |
12 | line 此插件将以单行读取和写入事件,这意味着换行符后将会有一个新的事件。 |
13 | multiline 它用于将多行日志数据转换为单个事件。 |
14 | netflow 此插件用于将 netflow v5/v9 数据转换为 logstash 事件。 |
15 | nmap 它将 nmap 结果数据解析为 XML 格式。 |
16 | plain 这读取没有分隔符的文本。 |
17 | rubydebug 此插件将使用 Ruby awesome print 库写入输出 Logstash 事件。 |
构建您自己的插件
您还可以创建适合您需求的 Logstash 自定制插件。Logstash-plugin 实用程序用于创建自定制插件。在这里,我们将创建一个过滤器插件,它将在事件中添加一条自定制消息。
生成基本结构
用户可以使用 logstash-plugin 实用程序的 generate 选项生成必要的文 件,也可以在 GitHub 上找到。
>logstash-plugin generate --type filter --name myfilter --path c:/tpwork/logstash/lib
这里,**type** 选项用于指定插件是输入、输出还是过滤器。在此示例中,我们正在创建名为 **myfilter** 的过滤器插件。path 选项用于指定要创建插件目录的路径。执行上述命令后,您将看到创建了一个目录结构。
开发插件
您可以在插件目录的 **\lib\logstash\filters** 文件夹中找到插件的代码文件。文件扩展名为 **.rb**。
在我们的例子中,代码文件位于以下路径:
C:\tpwork\logstash\lib\logstash-filter-myfilter\lib\logstash\filters\myfilter.rb
我们将消息更改为:default ⇒ "Hi, You are learning this on tutorialspoint.com" 并保存文件。
安装插件
要安装此插件,需要修改 Logstash 的 Gemfile。您可以在 Logstash 的安装目录中找到此文件。在我们的例子中,它位于 **C:\tpwork\logstash**。使用任何文本编辑器编辑此文件,并在其中添加以下文本。
gem "logstash-filter-myfilter",:path => "C:/tpwork/logstash/lib/logstash-filter-myfilter"
在上述命令中,我们指定了插件的名称以及可以在哪里找到它以进行安装。然后,运行 Logstash-plugin 实用程序来安装此插件。
>logstash-plugin install --no-verify
测试
在这里,我们在之前的示例之一中添加了 **myfilter**:
logstash.conf
此 Logstash 配置文件在 grok 过滤器插件之后,在 filter 部分包含 myfilter。
input { file { path => "C:/tpwork/logstash/bin/log/input1.log" } } filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } myfilter{} } output { file { path => "C:/tpwork/logstash/bin/log/output1.log" codec => rubydebug } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
>logstash –f logsatsh.conf
input.log
以下代码块显示输入日志数据。
INFO - 48566 - TRANSACTION_START - start
output.log
以下代码块显示输出日志数据。
{ "path" => "C:/tpwork/logstash/bin/log/input.log", "@timestamp" => 2017-01-07T06:25:25.484Z, "loglevel" => "INFO", "logger" => "TRANSACTION_END", "@version" => "1", "host" => "Dell-PC", "label" => "end", "message" => "Hi, You are learning this on tutorialspoint.com", "taskid" => "48566", "tags" => [] }
在 Logstash 上发布
开发人员还可以通过将其上传到 github 并遵循 Elasticsearch 公司定义的标准化步骤,将其自定制插件发布到 Logstash。
有关发布的更多信息,请参阅以下 URL:
https://elastic.ac.cn/guide/en/logstash/current/contributing-to-logstash.html
Logstash - 监控 API
Logstash 提供 API 来监控其性能。这些监控 API 提取有关 Logstash 的运行时指标。
节点信息 API
此 API 用于获取有关 Logstash 节点的信息。它以 JSON 格式返回 OS、Logstash 管道和 JVM 的信息。
您可以使用以下 URL 向 Logstash 发送 **get** 请求来提取信息:
GET https://127.0.0.1:9600/_node?pretty
响应
以下是节点信息 API 的响应。
{ "host" : "Dell-PC", "version" : "5.0.1", "http_address" : "127.0.0.1:9600", "pipeline" : { "workers" : 4, "batch_size" : 125, "batch_delay" : 5, "config_reload_automatic" : false, "config_reload_interval" : 3 }, "os" : { "name" : "Windows 7", "arch" : "x86", "version" : "6.1", "available_processors" : 4 }, "jvm" : { "pid" : 312, "version" : "1.8.0_111", "vm_name" : "Java HotSpot(TM) Client VM", "vm_version" : "1.8.0_111", "vm_vendor" : "Oracle Corporation", "start_time_in_millis" : 1483770315412, "mem" : { "heap_init_in_bytes" : 16777216, "heap_max_in_bytes" : 1046937600, "non_heap_init_in_bytes" : 163840, "non_heap_max_in_bytes" : 0 }, "gc_collectors" : [ "ParNew", "ConcurrentMarkSweep" ] } }
您还可以通过在 URL 中添加其名称来获取管道、操作系统和 JVM 的特定信息。
GET https://127.0.0.1:9600/_node/os?pretty GET https://127.0.0.1:9600/_node/pipeline?pretty GET https://127.0.0.1:9600/_node/jvm?pretty
插件信息 API
此 API 用于获取有关 Logstash 中已安装插件的信息。您可以通过向下面提到的 URL 发送 get 请求来检索此信息:
GET https://127.0.0.1:9600/_node/plugins?pretty
响应
以下是插件信息 API 的响应。
{ "host" : "Dell-PC", "version" : "5.0.1", "http_address" : "127.0.0.1:9600", "total" : 95, "plugins" : [ { "name" : "logstash-codec-collectd", "version" : "3.0.2" }, { "name" : "logstash-codec-dots", "version" : "3.0.2" }, { "name" : "logstash-codec-edn", "version" : "3.0.2" }, { "name" : "logstash-codec-edn_lines", "version" : "3.0.2" }, ............ }
节点统计信息 API
此 API 用于以 JSON 对象提取 Logstash 的统计信息(内存、进程、JVM、管道)。您可以通过向下面提到的 URL 发送 get 请求来检索此信息:
GET https://127.0.0.1:9600/_node/stats/?pretty GET https://127.0.0.1:9600/_node/stats/process?pretty GET https://127.0.0.1:9600/_node/stats/jvm?pretty GET https://127.0.0.1:9600/_node/stats/pipeline?pretty
热线程 API
此 API 检索有关 Logstash 中热线程的信息。热线程是 Java 线程,其 CPU 使用率很高,并且运行时间长于正常执行时间。您可以通过向下面提到的 URL 发送 get 请求来检索此信息:
GET https://127.0.0.1:9600/_node/hot_threads?pretty
用户可以使用以下 URL 以更易于阅读的形式获取响应。
GET https://127.0.0.1:9600/_node/hot_threads?human = true
Logstash - 安全性和监控
在本章中,我们将讨论 Logstash 的安全性和监控方面。
监控
Logstash 是一个非常好的工具,用于监控生产环境中的服务器和服务。生产环境中的应用程序会生成不同类型的日志数据,例如访问日志、错误日志等。Logstash 可以使用过滤器插件计算或分析错误、访问或其他事件的数量。此分析和计数可用于监控不同的服务器及其服务。
Logstash 提供了诸如 **HTTP Poller** 之类的插件来监控网站状态监控。在这里,我们正在监控一个名为 **mysite** 的网站,该网站托管在本地 Apache Tomcat 服务器上。
logstash.conf
在此配置文件中,http_poller 插件用于在 interval 设置中指定的时间间隔后命中插件中指定的站点。最后,它将站点的状态写入标准输出。
input { http_poller { urls => { site => "https://127.0.0.1:8080/mysite" } request_timeout => 20 interval => 30 metadata_target => "http_poller_metadata" } } output { if [http_poller_metadata][code] == 200 { stdout { codec => line{format => "%{http_poller_metadata[response_message]}"} } } if [http_poller_metadata][code] != 200 { stdout { codec => line{format => "down"} } } }
运行 Logstash
我们可以使用以下命令运行 Logstash。
>logstash –f logstash.conf
stdout
如果站点正常运行,则输出将为:
Ok
如果我们使用 Tomcat 的 **Manager App** 停止站点,输出将更改为:
down
安全
Logstash 提供了大量功能,用于与外部系统进行安全通信并支持身份验证机制。所有 Logstash 插件都支持通过 HTTP 连接进行身份验证和加密。
使用 HTTP 协议的安全
在 Logstash 提供的各种插件(例如 Elasticsearch 插件)中,有用户和密码等设置用于身份验证。
elasticsearch { user => <username> password => <password> }
另一种身份验证是用于 Elasticsearch 的 **PKI(公钥基础设施)**。开发人员需要在 Elasticsearch 输出插件中定义两个设置才能启用 PKI 身份验证。
elasticsearch { keystore => <string_value> keystore_password => <password> }
在 HTTPS 协议中,开发人员可以使用授权机构的证书进行 SSL/TLS。
elasticsearch { ssl => true cacert => <path to .pem file> }
使用传输协议的安全
要将传输协议与 Elasticsearch 一起使用,用户需要将协议设置设置为传输。这避免了 JSON 对象的反序列化,并提高了效率。
基本身份验证与在 Elasticsearch 输出协议中的 http 协议中执行的身份验证相同。
elasticsearch { protocol => “transport” user => <username> password => <password> }
PKI 身份验证还需要将 SSL 设置为 true,并在 Elasticsearch 输出协议中使用其他设置:
elasticsearch { protocol => “transport” ssl => true keystore => <string_value> keystore_password => <password> }
最后,SSL 安全性比其他通信安全方法需要更多设置。
elasticsearch { ssl => true ssl => true keystore => <string_value> keystore_password => <password> truststore =>truststore_password => <password> }
Logstash 的其他安全优势
Logstash 可以帮助输入系统源防止拒绝服务攻击等攻击。监控日志并分析这些日志中的不同事件可以帮助系统管理员检查传入连接和错误的变化。这些分析可以帮助查看服务器上是否正在发生或即将发生攻击。
Elasticsearch 公司的其他产品,例如 **x-pack** 和 **filebeat**,提供了一些与 Logstash 安全通信的功能。