将 CSV 数据从 Logstash 加载到 Elasticsearch
我们将使用 CSV 数据通过 Logstash 将数据上传到 Elasticsearch。为了进行数据分析,我们可以从 kaggle.com 网站获取数据。Kaggle.com 网站上传了所有类型的数据,用户可以使用它进行数据分析。
我们从这里获取了 countries.csv 数据:
https ://www.kaggle.com/fernandol/countries-of-the-world 。您可以下载 csv 文件并使用它。
我们将要使用的 csv 文件具有以下详细信息。
文件名 - countrydata.csv
列 - “国家”、“地区”、“人口”、“地区”
您还可以创建一个虚拟 csv 文件并使用它。我们将使用 logstash 将这些数据从countriesdata.csv转储到 elasticsearch。
在终端中启动 elasticsearch 和 Kibana 并保持运行。我们必须为 logstash 创建配置文件,其中将包含有关 CSV 文件列的详细信息以及其他详细信息,如下面给出的 logstash-config 文件中所示 -
input {
file {
path => "C:/kibanaproject/countriesdata.csv"
start_position => "beginning"
sincedb_path => "NUL"
}
}
filter {
csv {
separator => ","
columns => ["Country","Region","Population","Area"]
}
mutate {convert => ["Population", "integer"]}
mutate {convert => ["Area", "integer"]}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
=> "countriesdata-%{+dd.MM.YYYY}"
}
stdout {codec => json_lines }
}
在配置文件中,我们创建了 3 个组件 -
输入
我们需要指定输入文件的路径,在我们的例子中是一个 csv 文件。将存储 csv 文件的路径提供给 path 字段。
筛选
将使用带有分隔符的 csv 组件,在我们的例子中是逗号,还有可用于我们的 csv 文件的列。由于 logstash 将所有数据视为 string ,如果我们希望将任何列用作 integer ,则必须使用 mutate 指定相同的浮点数,如上所示。
输出
对于输出,我们需要指定需要放置数据的位置。在这里,在我们的例子中,我们使用的是弹性搜索。需要提供给 elasticsearch 的数据是运行它的主机,我们将其称为 localhost。下一个字段是索引,我们将其命名为国家-currentdate。一旦数据在 Elasticsearch 中更新,我们必须在 Kibana 中使用相同的索引。
将上述配置文件保存为logstash_countries.config。请注意,我们需要在下一步中将此配置的路径提供给 logstash 命令。
要将数据从 csv 文件加载到 elasticsearch,我们需要启动 elasticsearch 服务器 -
现在,运行http://localhost:9200在浏览器中确认 elasticsearch 是否运行成功。
我们正在运行弹性搜索。现在转到安装logstash的路径并运行以下命令将数据上传到elasticsearch。
> logstash -f logstash_countries.conf
上面的屏幕显示了从 CSV 文件加载到 Elasticsearch 的数据。要知道我们是否在 Elasticsearch 中创建了索引,我们可以检查如下 -
我们可以看到如上所示创建的 countriesdata-28.12.2018 索引。
指数详情 - countries-28.12.2018 如下 -
请注意,当数据从 logstash 上传到 elasticsearch 时,会创建带有属性的映射详细信息。