Commit 88949b8e authored by hugo.varenne

feat: Add part of Google

parent 6c872c7d

# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
import os
import argparse
from google.cloud import storage


def create_bucket(gcp_client, bucket_name):
    """Creates a new bucket."""
    bucket = gcp_client.create_bucket(bucket_name)
    return bucket


# Function to upload files to Cloud Storage
def write_files(gcp_client, directory, bucket_name):
    bucket = gcp_client.bucket(bucket_name)
    for filename in os.listdir(directory):
        if filename.endswith(".pdf"):  # Only upload PDF files
            file_path = os.path.join(directory, filename)
            with open(file_path, 'rb') as file:
                print(f"Uploading {filename} to bucket {bucket_name}...")
                blob = bucket.blob(filename)
                # Upload from the open file handle; uploading by bare filename
                # would only work when the script runs inside `directory`.
                blob.upload_from_file(file)
                print(f"{filename} uploaded successfully.")


def main(bucket_name, local_dir):
    gcp_client = storage.Client()
    create_bucket(gcp_client, bucket_name)
    write_files(gcp_client, local_dir, bucket_name)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Upload PDF files to a GCS bucket")
    parser.add_argument("--bucket_name", help="The name of the GCS bucket to which the files will be uploaded")
    parser.add_argument("--local_path", help="The local folder containing the PDF files")
    args = parser.parse_args()
    main(args.bucket_name, args.local_path)
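
# Example invocation (script, bucket, and folder names are illustrative):
#   python upload_gcs.py --bucket_name my-pdf-bucket --local_path ./pdfs
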
# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
## Source: https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless-sdk.html
import boto3
import botocore
import time
import argparse

client = boto3.client('opensearchserverless')
# service = 'aoss'


def createEncryptionPolicy(client, policy_name, collection_name):
    """Creates an encryption policy for the specified collection."""
    try:
        response = client.create_security_policy(
            description=f'Encryption policy for {collection_name}',
            name=policy_name,
            policy=f"""
                {{
                    "Rules": [
                        {{
                            "ResourceType": "collection",
                            "Resource": [
                                "collection/{collection_name}"
                            ]
                        }}
                    ],
                    "AWSOwnedKey": true
                }}
                """,
            type='encryption'
        )
        print('\nEncryption policy created:')
        print(response)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ConflictException':
            print('[ConflictException] The policy name or rules conflict with an existing policy.')
        else:
            raise error


def createNetworkPolicy(client, policy_name, collection_name):
    """Creates a network policy for the specified collection."""
    try:
        response = client.create_security_policy(
            description=f'Network policy for {collection_name}',
            name=policy_name,
            policy=f"""
                [{{
                    "Description": "Public access for {collection_name}",
                    "Rules": [
                        {{
                            "ResourceType": "dashboard",
                            "Resource": ["collection/{collection_name}"]
                        }},
                        {{
                            "ResourceType": "collection",
                            "Resource": ["collection/{collection_name}"]
                        }}
                    ],
                    "AllowFromPublic": true
                }}]
                """,
            type='network'
        )
        print('\nNetwork policy created:')
        print(response)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ConflictException':
            print('[ConflictException] A network policy with this name already exists.')
        else:
            raise error


def createAccessPolicy(client, policy_name, collection_name, IAM_USER):
    """Creates a data access policy for the specified collection."""
    try:
        # NOTE: the account ID in the principal ARN below is hard-coded to
        # the author's AWS account; replace it with your own account ID.
        policy_content = f"""
        [
            {{
                "Rules": [
                    {{
                        "Resource": ["collection/{collection_name}"],
                        "Permission": [
                            "aoss:CreateCollectionItems",
                            "aoss:DeleteCollectionItems",
                            "aoss:UpdateCollectionItems",
                            "aoss:DescribeCollectionItems"
                        ],
                        "ResourceType": "collection"
                    }},
                    {{
                        "Resource": ["index/{collection_name}/*"],
                        "Permission": [
                            "aoss:CreateIndex",
                            "aoss:DeleteIndex",
                            "aoss:UpdateIndex",
                            "aoss:DescribeIndex",
                            "aoss:ReadDocument",
                            "aoss:WriteDocument"
                        ],
                        "ResourceType": "index"
                    }}
                ],
                "Principal": ["arn:aws:iam::352909266144:user/{IAM_USER}"]
            }}
        ]
        """
        response = client.create_access_policy(
            description=f'Data access policy for {collection_name}',
            name=policy_name,
            policy=policy_content,
            type='data'
        )
        print('\nAccess policy created:')
        print(response)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ConflictException':
            print('[ConflictException] An access policy with this name already exists.')
        else:
            raise error


def waitForCollectionCreation(client, collection_name):
    """Waits for the collection to become active."""
    response = client.batch_get_collection(names=[collection_name])
    # Poll until the collection leaves the CREATING state; a single fixed
    # sleep is not guaranteed to be long enough.
    while (response['collectionDetails'][0]['status']) == 'CREATING':
        print('Creating collection...')
        time.sleep(30)
        response = client.batch_get_collection(names=[collection_name])
    print('\nCollection successfully created:')
    print(response["collectionDetails"])
    # Extract the collection endpoint from the response
    host = response['collectionDetails'][0]['collectionEndpoint']
    final_host = host.replace("https://", "")
    return final_host


def main(collection_name, IAM_USER):
    encryption_policy_name = f'{collection_name}-encryption-policy'
    network_policy_name = f'{collection_name}-network-policy'
    access_policy_name = f'{collection_name}-access-policy'
    createEncryptionPolicy(client, encryption_policy_name, collection_name)
    createNetworkPolicy(client, network_policy_name, collection_name)
    createAccessPolicy(client, access_policy_name, collection_name, IAM_USER)
    collection = client.create_collection(name=collection_name, type='VECTORSEARCH')
    ENDPOINT = waitForCollectionCreation(client, collection_name)
    print("Collection created successfully:", collection)
    print("Collection ENDPOINT:", ENDPOINT)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Create an OpenSearch Serverless vector collection")
    parser.add_argument("--collection_name", help="The name of the collection")
    parser.add_argument("--iam_user", help="The IAM user to grant data access")
    args = parser.parse_args()
    main(args.collection_name, args.iam_user)
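
# Example invocation (script and resource names are illustrative):
#   python create_collection.py --collection_name my-vectors --iam_user my-user
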
# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
import boto3

BUCKET_NAME = ''  # set to the name of the bucket to delete

S3_CLIENT = boto3.client('s3')
S3_RESOURCE = boto3.resource('s3')

## Delete bucket
# First, delete all objects in the bucket
bucket = S3_RESOURCE.Bucket(BUCKET_NAME)
print("Deleting all objects in Bucket\n")
bucket.objects.all().delete()

# Then delete the bucket itself
print("Deleting Bucket")
response = S3_CLIENT.delete_bucket(
    Bucket=BUCKET_NAME
)
print(response)
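
# Usage note (script name illustrative): set BUCKET_NAME above, then run
#   python cleanup_s3.py
# delete_bucket fails on a non-empty bucket, which is why all objects are
# removed first.
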
langchain
langchain-community
pypdf
opensearch-py
boto3
google-cloud-storage

# Creator: Abir Chebbi (abir.chebbi@hesge.ch)
import os
import argparse
import boto3
from langchain_community.document_loaders import PyPDFDirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.embeddings import BedrockEmbeddings
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from langchain_community.vectorstores import OpenSearchVectorSearch

## S3 client
s3_client = boto3.client('s3')

## Bedrock client
bedrock_client = boto3.client(service_name="bedrock-runtime")

## Configuration for AWS authentication and OpenSearch client
credentials = boto3.Session().get_credentials()
awsauth = AWSV4SignerAuth(credentials, 'us-east-1', 'aoss')


## Create index in OpenSearch
def create_index(client, index_name):
    indexBody = {
        "settings": {
            "index.knn": True
        },
        "mappings": {
            "properties": {
                "vector_field": {
                    "type": "knn_vector",
                    # 1536 is the output dimension of amazon.titan-embed-text-v1
                    "dimension": 1536,
                    "method": {
                        "engine": "faiss",
                        "name": "hnsw"
                    }
                }
            }
        }
    }
    try:
        create_response = client.indices.create(index_name, body=indexBody)
        print('\nCreating index:')
        print(create_response)
    except Exception as e:
        print(e)
        print("(Index likely already exists?)")


## Load docs from S3
def download_documents(bucket_name, local_dir):
    os.makedirs(local_dir, exist_ok=True)  # ensure the staging directory exists
    response = s3_client.list_objects_v2(Bucket=bucket_name)
    for item in response.get('Contents', []):
        key = item['Key']
        if key.endswith('.pdf'):
            local_filename = os.path.join(local_dir, key)
            s3_client.download_file(Bucket=bucket_name, Key=key, Filename=local_filename)


## Split pages/text into chunks
def split_text(docs, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    chunks = text_splitter.split_documents(docs)
    return chunks


## Generate embeddings
def generate_embeddings(bedrock_client, chunks):
    embeddings_model = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_client)
    chunks_list = [chunk.page_content for chunk in chunks]
    embeddings = embeddings_model.embed_documents(chunks_list)
    return embeddings


## Store generated embeddings into an OpenSearch index.
def store_embeddings(embeddings, texts, meta_data, host, awsauth, index_name):
    # from_embeddings takes the precomputed vectors, the raw texts, and an
    # Embeddings instance; metadata is passed via the `metadatas` keyword,
    # not positionally.
    embeddings_model = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_client)
    docsearch = OpenSearchVectorSearch.from_embeddings(
        embeddings,
        texts,
        embeddings_model,
        metadatas=meta_data,
        opensearch_url=f'https://{host}:443',
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        index_name=index_name,
        bulk_size=1000
    )
    return docsearch


## Alternative helper that both generates and stores embeddings (unused by main)
def generate_store_embeddings(bedrock_client, chunks, awsauth, index_name, host):
    embeddings_model = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_client)
    docsearch = OpenSearchVectorSearch.from_documents(
        chunks,
        embeddings_model,
        opensearch_url=f'https://{host}:443',
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        index_name=index_name,
        bulk_size=1000
    )
    return docsearch


## main
def main(bucket_name, endpoint, index_name, local_path):
    ## OpenSearch client
    OpenSearch_client = OpenSearch(
        hosts=[{'host': endpoint, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
    )
    download_documents(bucket_name, local_path)
    loader = PyPDFDirectoryLoader(local_path)
    docs = loader.load()
    print('Start chunking')
    chunks = split_text(docs, 1000, 100)
    print(chunks[1])
    create_index(OpenSearch_client, index_name)
    print('Start vectorising')
    embeddings = generate_embeddings(bedrock_client, chunks)
    print(embeddings[1])
    texts = [chunk.page_content for chunk in chunks]
    # Prepare metadata for each chunk (page numbers are 1-based)
    meta_data = [{'source': chunk.metadata['source'], 'page': chunk.metadata['page'] + 1} for chunk in chunks]
    print('Start storing')
    store_embeddings(embeddings, texts, meta_data, endpoint, awsauth, index_name)
    print('End storing')


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process PDF documents and store their embeddings.")
    parser.add_argument("--bucket_name", help="The S3 bucket name where documents are stored")
    parser.add_argument("--endpoint", help="The OpenSearch service endpoint")
    parser.add_argument("--index_name", help="The name of the OpenSearch index")
    parser.add_argument("--local_path", help="Local directory used to stage the downloaded PDFs")
    args = parser.parse_args()
    main(args.bucket_name, args.endpoint, args.index_name, args.local_path)
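
# Example invocation (script and resource names are illustrative; the
# endpoint is the value printed by the collection-creation script):
#   python ingest_docs.py --bucket_name my-pdf-bucket \
#       --endpoint abc123.us-east-1.aoss.amazonaws.com \
#       --index_name rag-index --local_path ./docs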