22from pathlib import Path
33import logging
44import re
5- import shutil
6- import tempfile
75from collections import OrderedDict
86from typing import Optional
97from app .config import settings
108from app .models .chat import LessonChatRequest
11- from app .utils .blob_store import BlobStore
129from app .utils .prompt_template import PromptTemplate
13- from rag_wrapper import InMemRagOps
10+ from app . services . rag_adapters import BaseRagAdapter , RagAdapterFactory
1411from llama_index .llms .azure_openai import AzureOpenAI
1512from llama_index .embeddings .azure_openai import AzureOpenAIEmbedding
1613from llama_index .core .llms import ChatMessage
@@ -48,10 +45,10 @@ def __init__(self):
4845 )
4946
5047 # Initialize blob storage for retrieving index files
51- self . _blob_store = BlobStore ()
48+ # Note: BlobStore is now handled inside InMemRagOpsAdapter
5249
53- # Initialize LRU cache for RAGOps instances (max 32 items)
54- self ._rag_ops_cache : OrderedDict [str , InMemRagOps ] = OrderedDict ()
50+ # Initialize LRU cache for RAG adapter instances (max 32 items)
51+ self ._rag_adapter_cache : OrderedDict [str , BaseRagAdapter ] = OrderedDict ()
5552 self ._cache_size = 32
5653
5754 async def __call__ (
@@ -68,31 +65,11 @@ async def __call__(
6865 str: The chat response from the RAG system
6966 """
7067 try :
71- # Create local directory for storing downloaded index files
72- index_persist_dir = os .path .join (
73- tempfile .gettempdir (), request .index_path .replace ("/" , "_" )
74- )
75-
76- # Get or create cached RAGOps instance
77- rag_ops = self ._get_or_create_rag_ops (index_persist_dir )
78-
79- # Check if index exists locally, download if not
80- index_exists = await rag_ops .index_exists ()
81- if not index_exists :
82- logging .info (f"Downloading new RAG index at { index_persist_dir } ..." )
83-
84- # Download index files from blob storage
85- downloaded_file_paths = await self ._blob_store .download_blobs_to_folder (
86- prefix = request .index_path , target_folder = index_persist_dir
87- )
88-
89- if not downloaded_file_paths :
90- raise RuntimeError (
91- f"No files downloaded for index path: { request .index_path } "
92- )
68+ # Get or create cached RAG adapter instance
69+ rag_adapter = await self ._get_or_create_rag_adapter (request .index_path )
9370
94- file_paths_str = " \n " . join ( downloaded_file_paths )
95- logging . info ( f"Downloaded RAG index files: { file_paths_str } " )
71+ # Initiate the index (download files for InMem, no-op for Qdrant )
72+ await rag_adapter . initiate_index ( )
9673
9774 # Extract chapter details and build system message
9875 system_message = self ._prompt_template .get_prompt_with_variables (
@@ -111,27 +88,33 @@ async def __call__(
11188 ] + chat_messages [:- 1 ]
11289
11390 # Get response from RAG system using current message and chat history
114- return await rag_ops .chat_with_index (
91+ return await rag_adapter .chat_with_index (
11592 curr_message = chat_messages [- 1 ].content , chat_history = chat_history
11693 )
11794
11895 except Exception as e :
11996 logger .error (f"Error in lesson chat service: { e } " , exc_info = True )
12097 raise
12198
122- def _clear_index_files (self , index_folder ) -> None :
99+ def _clear_adapter_resources (self , adapter : BaseRagAdapter ) -> None :
123100 """
124- Clean up index files from the specified folder .
101+ Clean up resources used by the RAG adapter .
125102
126103 Args:
127- index_folder: Path to the folder containing index files to remove
104+ adapter: The RAG adapter instance to clean up
128105 """
129- if os .path .exists (index_folder ):
130- try :
131- shutil .rmtree (index_folder )
132- logging .info (f"Successfully cleared resources at { index_folder } " )
133- except Exception as e :
134- logging .error (f"Failed to clear resources at { index_folder } : { e } " )
106+ try :
107+ # Use asyncio to run the async cleanup method
108+ import asyncio
109+
110+ loop = asyncio .get_event_loop ()
111+ if loop .is_running ():
112+ # Create a task for cleanup if we're in an async context
113+ asyncio .create_task (adapter .cleanup ())
114+ else :
115+ loop .run_until_complete (adapter .cleanup ())
116+ except Exception as e :
117+ logging .error (f"Failed to clean up adapter resources: { e } " )
135118
136119 def _extract_details (self , chapter_id : str ):
137120 """
@@ -162,57 +145,63 @@ def _extract_details(self, chapter_id: str):
162145 else :
163146 raise ValueError (f"Invalid chapter_id format: { chapter_id } " )
164147
165- def _get_or_create_rag_ops (self , index_persist_dir : str ) -> InMemRagOps :
148+ async def _get_or_create_rag_adapter (self , index_path : str ) -> BaseRagAdapter :
166149 """
167- Get or create a RAGOps instance with LRU caching.
150+ Get or create a RAG adapter instance with LRU caching.
168151
169152 Args:
170- index_persist_dir: Directory path for the RAG index persistence
153+ index_path: Path to the RAG index
171154
172155 Returns:
173- InMemRagOps : Cached or newly created RAGOps instance
156+ BaseRagAdapter : Cached or newly created RAG adapter instance
174157 """
175- # Check if RAGOps instance exists in cache
176- if index_persist_dir in self ._rag_ops_cache :
158+ # Generate cache key using the factory method
159+ cache_key = index_path
160+
161+ # Check if adapter instance exists in cache
162+ if cache_key in self ._rag_adapter_cache :
177163 # Move to end (most recently used)
178- rag_ops = self ._rag_ops_cache .pop (index_persist_dir )
179- self ._rag_ops_cache [ index_persist_dir ] = rag_ops
180- logger .debug (f"Retrieved RAGOps from cache for: { index_persist_dir } " )
181- return rag_ops
182-
183- # Create new RAGOps instance
184- rag_ops = InMemRagOps (
185- persist_dir = index_persist_dir ,
164+ adapter = self ._rag_adapter_cache .pop (cache_key )
165+ self ._rag_adapter_cache [ cache_key ] = adapter
166+ logger .debug (f"Retrieved RAG adapter from cache for: { index_path } " )
167+ return adapter
168+
169+ # Create new adapter instance
170+ adapter = RagAdapterFactory . create_adapter (
171+ index_path = index_path ,
186172 completion_llm = self ._completion_llm ,
187- emb_llm = self ._embedding_llm ,
173+ embedding_llm = self ._embedding_llm ,
188174 )
189175
176+ # Initialize the adapter
177+ await adapter .initialize ()
178+
190179 # Add to cache and handle LRU eviction
191- self ._rag_ops_cache [ index_persist_dir ] = rag_ops
180+ self ._rag_adapter_cache [ cache_key ] = adapter
192181
193182 # Remove least recently used item if cache is full
194- if len (self ._rag_ops_cache ) > self ._cache_size :
195- oldest_key = next (iter (self ._rag_ops_cache ))
196- evicted_rag_ops = self ._rag_ops_cache .pop (oldest_key )
183+ if len (self ._rag_adapter_cache ) > self ._cache_size :
184+ oldest_key = next (iter (self ._rag_adapter_cache ))
185+ evicted_adapter = self ._rag_adapter_cache .pop (oldest_key )
197186
198- # Clean up index files for the evicted RAGOps instance
199- self ._clear_index_files ( evicted_rag_ops . persist_dir )
187+ # Clean up resources for the evicted adapter
188+ self ._clear_adapter_resources ( evicted_adapter )
200189
201190 logger .debug (
202- f"Evicted RAGOps from cache and cleared index files : { oldest_key } "
191+ f"Evicted RAG adapter from cache and cleared resources : { oldest_key } "
203192 )
204193
205- logger .debug (f"Created new RAGOps and cached for: { index_persist_dir } " )
206- return rag_ops
194+ logger .debug (f"Created new RAG adapter and cached for: { index_path } " )
195+ return adapter
207196
208197 def cleanup (self ) -> None :
209- """Clear the RAGOps cache and associated index files ."""
210- # Clean up index files for all cached RAGOps instances
211- for rag_ops in self ._rag_ops_cache .values ():
212- self ._clear_index_files ( rag_ops . persist_dir )
198+ """Clear the RAG adapter cache and associated resources ."""
199+ # Clean up resources for all cached adapters
200+ for adapter in self ._rag_adapter_cache .values ():
201+ self ._clear_adapter_resources ( adapter )
213202
214- self ._rag_ops_cache .clear ()
215- logger .info ("RAGOps cache cleared and index files cleaned up" )
203+ self ._rag_adapter_cache .clear ()
204+ logger .info ("RAG adapter cache cleared and resources cleaned up" )
216205
217206 def get_cache_info (self ) -> dict :
218207 """
@@ -222,9 +211,9 @@ def get_cache_info(self) -> dict:
222211 dict: Dictionary containing cache size and keys
223212 """
224213 return {
225- "cache_size" : len (self ._rag_ops_cache ),
214+ "cache_size" : len (self ._rag_adapter_cache ),
226215 "max_cache_size" : self ._cache_size ,
227- "cached_keys" : list (self ._rag_ops_cache .keys ()),
216+ "cached_keys" : list (self ._rag_adapter_cache .keys ()),
228217 }
229218
230219
0 commit comments