Nous expliquons ici comment écrire des données Apache Spark dans ElasticSearch (ES) en utilisant Python. Nous allons écrire les données du journal Apache dans ES.
Ce sujet est compliqué, à cause de tous les mauvais exemples alambiqués sur Internet. Mais ici, nous facilitons les choses.
Un facteur de complication est que Spark fournit un support natif pour l’écriture sur ElasticSearch en Scala et Java mais pas en Python. Car vous devez télécharger ES-Hadoop, qui est écrit par ElasticSearch, disponible ici.
Vous mettez ensuite cela dans la portée et le mettez à la disposition de pyspark comme ceci:
pyspark --jars elasticsearch-hadoop-6.4.1.jar
Définissez PySpark pour utiliser Python 3 comme ceci:
export PYSPARK_PYTHON=/usr/bin/python3
La clé pour comprendre l’écriture dans ElasticSearch est que, bien qu’ES soit une base de données JSON, elle a une exigence. Les données doivent être dans ce format:
{ "id: { the rest of your json}}
Ci-dessous, nous montrons comment faire cette transformation.
En bas se trouve le code complet et il est en ligne ici. Ici, nous l’expliquons en sections:
(Cet article fait partie de notre guide ElasticSearch. Utilisez le menu de droite pour naviguer.)
Analyse des fichiers journaux Apache
Nous lisons un journal Apache dans un RDD Spark. Nous écrivons ensuite une fonction parse() pour lire chaque chaîne dans des groupes d’expressions régulières, choisir les champs que nous voulons et la transmettre en tant que dictionnaire:.
rdd = sc.textFile("/home/ubuntu/walker/apache_logs")regex='^(\S+) (\S+) (\S+) \+\s\d{4})\] "(\S+)\s?(\S+)?\s?(\S+)?" (\d{3}|-) (\d+|-)\s?"?(*)"?\s?"?(*)?"?$'
p=re.compile(regex)def parse(str): s=p.match(str) d = {} d=s.group(1) d=s.group(4) d=s.group(5) d=s.group(6) return d
En d’autres termes, lorsque nous lisons pour la première fois le fichier texte dans un RDD, cela ressemble à ceci:
Puis nous utilisons la fonction rdd map() pour passer chaque ligne dans la fonction parse() pour le produire.
Maintenant, cela ressemble à JSON, mais ce n’est pas encore JSON. Nous utiliserons json.les vidages, qui, selon la description technique de la documentation Python, « sérialiseront obj en tant que flux au format JSON. »
Nous ajoutons également un identifiant. Dans la configuration ES ci-dessous, nous indiquons à ES quel champ sera l’identifiant unique du document : « es.mapping.id « : « id_doc ».
Nous calculons d’abord un résumé SHA sur l’ensemble du document JSON pour créer cet ID en tant que numéro unique.
Les résultats sont renvoyés comme ceci. Vous pouvez voir que l’ID est un très long numéro SHA devant suivi par le JSON.
Maintenant, nous spécifions la configuration ElasticSearch. Les points importants à noter sont:
» es.resource »: ‘walker/apache’ | « walker » est l’index et « apache » est le type. Le tout ensemble est souvent appelé « l’index. » |
« es.mapping.id « : « doc_id » | Ici, nous indiquons à ES quel document utiliser comme ID de document, ce qui revient à dire le champ _id. |
Le reste des champs sont explicites.
Ensuite, nous utilisons la méthode saveAsNewAPIHadoopFile() pour enregistrer le RDD dans ES. Il n’y a rien d’étude là-bas car la syntaxe est toujours la même pour ES, il n’est donc pas nécessaire de comprendre toutes les pièces de cela.
Maintenant, nous pouvons interroger ES à partir de la ligne de commande et regarder un document:
Voici le code complet: