Skip to content
Snippets Groups Projects
Commit 787f30a0 authored by lucas.landrecy's avatar lucas.landrecy
Browse files

bon code

parent 94879607
No related branches found
No related tags found
No related merge requests found
...@@ -4,17 +4,15 @@ WORKDIR /app ...@@ -4,17 +4,15 @@ WORKDIR /app
# Installer les dépendances # Installer les dépendances
COPY requirements.txt . COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt COPY clean.py .
COPY neo4j_import.py .
# Télécharger et extraire le fichier # Télécharger et extraire le fichier
RUN apt-get update && apt-get install -y wget tar && \ RUN apt-get update && apt-get install -y wget tar && \
wget -O dblp.tar.gz https://originalfileserver.aminer.cn/misc/dblp_v14.tar.gz && \ apt install libjansson-dev -y && \
mkdir /app/dblp_data && \ mkdir /app/dblp_data
tar -xzf dblp.tar.gz -C /app/dblp_data && \
rm dblp.tar.gz && \
apt-get remove -y wget && apt-get clean && rm -rf /var/lib/apt/lists/*
# Copier le script Python RUN pip install --no-cache-dir -r requirements.txt
COPY insert_script.py .
CMD ["python", "insert_script.py"] CMD sh -c "wget -O dblp.tar.gz https://originalfileserver.aminer.cn/misc/dblp_v14.tar.gz && tar -xzf dblp.tar.gz && python3 insert_script.py "
\ No newline at end of file
# cleaner.py
import re import re
import json
import ijson
import io
__all__ = ["nettoyer_gros_json"]
def pre_clean_json(input_path, output_path): def pre_clean_json(input_path, output_path):
""" """
Nettoie les annotations MongoDB type NumberInt(...) et les remplace par des entiers valides. Nettoie les annotations MongoDB type NumberInt(...) et les remplace par des entiers valides.
""" """
print(f"🧹 Nettoyage du fichier : {input_path}")
pattern = re.compile(r'NumberInt\((\d+)\)') pattern = re.compile(r'NumberInt\((\d+)\)')
with open(input_path, 'r', encoding='utf-8') as fin, open(output_path, 'w', encoding='utf-8') as fout: with open(input_path, 'r', encoding='utf-8') as fin, open(output_path, 'w', encoding='utf-8') as fout:
...@@ -13,12 +21,23 @@ def pre_clean_json(input_path, output_path): ...@@ -13,12 +21,23 @@ def pre_clean_json(input_path, output_path):
print(f"✅ Fichier nettoyé sauvegardé dans : {output_path}") print(f"✅ Fichier nettoyé sauvegardé dans : {output_path}")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser() def nettoyer_gros_json(input_path, output_path):
parser.add_argument("--infile", type=str, required=True, help="Fichier JSON d'entrée (brut)") cleaned_path = "cleaned_" + input_path
parser.add_argument("--outfile", type=str, default="clean_dblpv13.json", help="Fichier nettoyé de sortie")
# Ensure cleaned file exists or use input_path directly if not modifying
# Uncomment the line below if you have a real cleaning step before parsing
pre_clean_json(input_path, cleaned_path)
args = parser.parse_args() print("Begin clear")
pre_clean_json(args.infile, args.outfile) with open(cleaned_path, 'r', encoding='utf-8') as infile, open(output_path, 'w', encoding='utf-8') as outfile:
outfile.write('[\n')
parser = ijson.items(infile, 'item')
first = True
for obj in parser:
filtered = {k: obj[k] for k in ['_id', 'title', 'authors', 'references'] if k in obj}
if not first:
outfile.write(',\n')
json.dump(filtered, outfile, ensure_ascii=False)
first = False
outfile.write('\n]')
import os
import ijson
import argparse
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from py2neo import Graph
from tqdm import tqdm
# Neo4j connection settings, injected via environment variables
# (set by docker-compose); each is None if the variable is unset.
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
# Number of articles bundled into a single UNWIND insert query.
BATCH_SIZE = 500
# Size of the thread pool used to run batch inserts concurrently.
MAX_WORKERS = 2
def batch_insert_articles(graph, articles):
    """Insert one batch of article dicts into Neo4j with a single query.

    For each article the query MERGEs an ARTICLE node keyed by `_id` and
    sets its title; each author becomes an AUTHOR node (keyed by `_id`,
    falling back to the name) linked via AUTHORED, and each referenced id
    becomes a placeholder ARTICLE node linked via CITES.

    Args:
        graph: a connected py2neo ``Graph`` instance.
        articles: list of dicts with keys ``_id``, ``title``, ``authors``
            (list of dicts with ``name`` and optional ``_id``) and
            ``references`` (list of ids).
    """
    query = """
    UNWIND $articles AS article
    MERGE (a:ARTICLE { _id: article._id })
    SET a.title = article.title
    FOREACH (author IN article.authors |
        FOREACH (_ IN CASE WHEN author.name IS NOT NULL THEN [1] ELSE [] END |
            MERGE (p:AUTHOR { _id: coalesce(author._id, author.name) })
            SET p.name = author.name
            MERGE (p)-[:AUTHORED]->(a)
        )
    )
    FOREACH (ref_id IN article.references |
        MERGE (r:ARTICLE { _id: ref_id })
        MERGE (a)-[:CITES]->(r)
    )
    """
    graph.run(query, articles=articles)
def main(json_file, limit):
    """Stream articles from a JSON array file and bulk-insert them into Neo4j.

    Articles are read incrementally with ijson (no full-file load), grouped
    into batches of BATCH_SIZE, and each batch is handed to a small thread
    pool that runs ``batch_insert_articles``.

    Args:
        json_file: path to the cleaned JSON file (a top-level array).
        limit: maximum number of articles to insert; a falsy value means
            no limit.
    """
    start_time = time.time()
    print(f"⏱️ Début : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    # NOTE(review): the NEO4J_* env vars may be None if unset — Graph() will
    # then fail to connect; confirm the deployment always provides them.
    graph = Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    print(f"📄 Lecture optimisée de {json_file} (limite: {limit})")
    with open(json_file, 'r', encoding='utf-8') as f:
        # Streaming iterator: yields one dict per element of the top-level array.
        article_iter = ijson.items(f, 'item')
        total = 0
        futures = []

        def flush_batch(batch):
            # Submit a *copy* of the batch so the shared list can be reused,
            # then clear it to release memory.
            if batch:
                futures.append(executor.submit(batch_insert_articles, graph, list(batch)))
                batch.clear()

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            batch = []
            for article in tqdm(article_iter):
                if limit and total >= limit:
                    break
                batch.append(article)
                total += 1
                if len(batch) >= BATCH_SIZE:
                    flush_batch(batch)
                    # Crude back-pressure so the database is not flooded.
                    time.sleep(0.1)
            # envoyer les derniers articles
            flush_batch(batch)
            # attendre la fin de tous les threads
            for future in tqdm(as_completed(futures), total=len(futures), desc="💾 Finalisation des insertions"):
                future.result()
    end_time = time.time()
    elapsed_ms = int((end_time - start_time) * 1000)
    print(f"✅ Import terminé")
    print(f"⏱️ Fin : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"🕓 Durée totale : {elapsed_ms:,} ms")
    # Guard against ZeroDivisionError on very fast runs (e.g. empty input,
    # elapsed under one millisecond).
    if elapsed_ms > 0:
        print(f"⚡ Vitesse moyenne : {int(total / (elapsed_ms / 1000))} it/s")
# --- CLI ---
if __name__ == "__main__":
    cli = argparse.ArgumentParser()
    cli.add_argument("--file", type=str, default="clean_dblpv13.json", help="Chemin vers le fichier JSON nettoyé")
    cli.add_argument("--limit", type=int, default=1000000, help="Nombre maximum d'articles à charger")
    opts = cli.parse_args()
    main(opts.file, opts.limit)
import os
import ijson import ijson
import argparse import argparse
import time import time
from datetime import datetime from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed from py2neo import Graph
from neo4j import GraphDatabase
from tqdm import tqdm from tqdm import tqdm
NEO4J_URI = "bolt://localhost:7687" NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = "neo4j" NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = "12345678" NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
BATCH_SIZE = 10000 BATCH_SIZE = 500
MAX_WORKERS = 4
def batch_insert_articles(tx, articles): def batch_insert_articles(graph, articles):
query = """ query = """
UNWIND $articles AS article UNWIND $articles AS article
MERGE (a:ARTICLE { _id: article._id }) MERGE (a:ARTICLE { _id: article._id })
...@@ -31,45 +30,40 @@ def batch_insert_articles(tx, articles): ...@@ -31,45 +30,40 @@ def batch_insert_articles(tx, articles):
MERGE (a)-[:CITES]->(r) MERGE (a)-[:CITES]->(r)
) )
""" """
tx.run(query, articles=articles) graph.run(query, articles=articles)
def main(json_file, limit): def main(json_file, limit):
clear_file = "clear_"+json_file
nettoyer_gros_json(json_file, clear_file)
json_file = clear_file
start_time = time.time() start_time = time.time()
print(f"⏱️ Début : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f"⏱️ Début : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) graph = Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
print(f"📄 Lecture optimisée de {json_file} (limite: {limit})") print(f"📄 Lecture optimisée de {json_file} (limite: {limit})")
with open(json_file, 'r', encoding='utf-8') as f: with open(json_file, 'r', encoding='utf-8') as f:
article_iter = ijson.items(f, 'item') article_iter = ijson.items(f, 'item')
total = 0 total = 0
futures = []
def flush_batch(batch):
if batch:
futures.append(executor.submit(session.execute_write, batch_insert_articles, batch))
#print(f"📤 Batch de {len(batch)} articles envoyé")
batch.clear() # Vider explicitement le batch pour libérer de la mémoire
with driver.session() as session, ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
batch = [] batch = []
for article in tqdm(article_iter): for article in tqdm(article_iter):
if limit and total >= limit: if limit and total >= limit:
break break
batch.append(article) batch.append(article)
total += 1 total += 1
if len(batch) >= BATCH_SIZE: if len(batch) >= BATCH_SIZE:
flush_batch(batch) batch_insert_articles(graph, batch)
batch.clear()
# envoyer les derniers articles
flush_batch(batch)
# attendre la fin de tous les threads # insérer les derniers éléments
for future in tqdm(as_completed(futures), total=len(futures), desc="💾 Finalisation des insertions"): if batch:
future.result() batch_insert_articles(graph, batch)
driver.close()
end_time = time.time() end_time = time.time()
elapsed_ms = int((end_time - start_time) * 1000) elapsed_ms = int((end_time - start_time) * 1000)
...@@ -78,11 +72,18 @@ def main(json_file, limit): ...@@ -78,11 +72,18 @@ def main(json_file, limit):
print(f"🕓 Durée totale : {elapsed_ms:,} ms") print(f"🕓 Durée totale : {elapsed_ms:,} ms")
print(f"⚡ Vitesse moyenne : {int(total / (elapsed_ms / 1000))} it/s") print(f"⚡ Vitesse moyenne : {int(total / (elapsed_ms / 1000))} it/s")
# Wait for user input before closing
test = ""
while test != "exit":
test = input("Tapez 'exit' pour quitter ou appuyez sur Entrée pour continuer...")
# --- CLI --- # --- CLI ---
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--file", type=str, default="clean_dblpv13.json", help="Chemin vers le fichier JSON nettoyé") parser.add_argument("--file", type=str, default="dblpv13.json", help="Chemin vers le fichier JSON nettoyé")
parser.add_argument("--limit", type=int, default=1000000, help="Nombre maximum d'articles à charger") parser.add_argument("--limit", type=int, default=5000000, help="Nombre maximum d'articles à charger")
args = parser.parse_args() args = parser.parse_args()
main(args.file, args.limit) main(args.file, args.limit)
\ No newline at end of file
File moved
...@@ -9,11 +9,15 @@ services: ...@@ -9,11 +9,15 @@ services:
- "7474:7474" - "7474:7474"
- "7687:7687" - "7687:7687"
environment: environment:
- NEO4J_AUTH=neo4j/testtest - NEO4J_AUTH=neo4j/12345678
volumes: volumes:
- ${HOME}/neo4j/logs:/logs - ${HOME}/neo4j/logs:/logs
- ${HOME}/neo4j/data:/data - ${HOME}/neo4j/data:/data
- ${HOME}/neo4j/import:/var/lib/neo4j/import deploy:
resources:
limits:
memory: 3g
neo4j_client: neo4j_client:
build: build:
...@@ -25,4 +29,10 @@ services: ...@@ -25,4 +29,10 @@ services:
environment: environment:
- NEO4J_URI=bolt://neo4j_db:7687 - NEO4J_URI=bolt://neo4j_db:7687
- NEO4J_USER=neo4j - NEO4J_USER=neo4j
- NEO4J_PASSWORD=testtest - NEO4J_PASSWORD=12345678
volumes:
- ${HOME}/neo4j/import:${HOME}/import
deploy:
resources:
limits:
memory: 3g
...@@ -3,14 +3,6 @@ FROM neo4j:latest ...@@ -3,14 +3,6 @@ FROM neo4j:latest
USER root USER root
RUN apt-get update && \ RUN apt-get update
apt-get install -y python3 python3-pip && \
pip3 install neo4j tqdm ijson && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
USER neo4j USER neo4j
RUN cd ~ && \
git clone https://gitedu.hesge.ch/lucas.landrecy/advdaba_labo2.git
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment