RSS Feeds mit Logstash und Elasticsearch durchsuchen
In diesem Blogpost möchte ich aufzeigen, wie man RSS Feeds mit Logstash und Elasticsearch durchsuchen kann. Folgendes Logstash-Skript holt sich alle 2 Stunden Informationen aus der Logistikbranche.
input {
rss {
url => "http://www.dvz.de/rssallenews"
interval => 7200
tags => ["de", "rss", "dvz"]
}
rss {
url => "http://www.openpr.de/rss/kategorie/12/Logistik-Transport.xml"
interval => 7200
tags => ["de", "rss", "openpr"]
}
rss {
url => "http://www.pressebox.de/rss/flag/logistik"
interval => 7200
tags => ["de", "rss", "openpr"]
}
}
filter {
}
output {
elasticsearch {
action => "index"
hosts => "localhost"
workers => 1
}
stdout {}
}
Und folgendes Python-Skript ermöglicht das Dursuchen des RSS-Feeds. Die Elasticsearch-Query liefert keine Dubletten (da ja alle 2 Stunden der RSS-Feed indiziert wird) zurück. Dies wird mit Hilfe einer top_hits Aggregation erreicht.
from elasticsearch import Elasticsearch
import html2text
from terminaltables import AsciiTable
import datetime
# init Elasticsearch
es = Elasticsearch([
{'host': '0.0.0.0', 'port': 0}
])
def addRowTable(table, col1, col2, col3, col4):
table.append([str(col1), str(col2), str(col3), str(col4)])
def queryElastic(query):
# query Elasticsearch
results = es.search(index='logstash*', body={
"size":0,
"query":{
"query_string":{
"query": query,
"analyze_wildcard" : "true"
}
},
"aggs":{
"dedup":{
"terms":{
"field":"title.raw",
"size":0
},
"aggs":{
"dedup_docs":{
"top_hits":{
"size":1
}
}
}
}
}
})
# setting parameters for html2text
plainhtml = html2text.HTML2Text()
plainhtml.ignore_links = True
plainhtml.ignore_images = True
# setting ascii table cols
table_data = [['Datum', 'Titel', 'Meldung', 'Link']]
# sorting results by timestamp date
results = sorted(results['aggregations']['dedup']['buckets'], key=lambda k: k['dedup_docs']['hits']['hits'][0]['_source'].get('@timestamp', 0), reverse=True)
for result in results:
datum = datetime.datetime.strptime(result['dedup_docs']['hits']['hits'][0]['_source']['@timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ').strftime('%d.%m.%y')
titel = result['dedup_docs']['hits']['hits'][0]['_source']['title'].strip()
meldung = plainhtml.handle(result['dedup_docs']['hits']['hits'][0]['_source']['message'].strip().replace('\n', ''))
link = result['dedup_docs']['hits']['hits'][0]['_source']['link'].strip()
addRowTable(table_data, datum, titel, meldung, link)
# create ascii table
table = AsciiTable(table_data)
table.inner_row_border = True
#print table
print(table.table)
queryElastic(input("Enter RSS search string: "))
Und so sieht das Ganze in Aktion aus:
Natürlich kann die Abfrage auch in einer Web-Applikation bereitgestellt werden. Python wurde hier nur zur Veranschaulichung benutzt.