Elasticsearch:运用 JINA 来实现多模态搜索的 RAG

张开发
2026/4/14 20:37:11 15 分钟阅读

分享文章

Elasticsearch:运用 JINA 来实现多模态搜索的 RAG
Jina Embeddings v4 是一个 38 亿参数的通用向量模型，用于多模态、多语言检索，支持单向量和多向量输出。那么我们该如何使用它对图片及文字进行搜索，并最终对搜索的结果做 RAG 呢？

## 下载源码

闲话少说，我们直接到地址 https://github.com/liu-xiao-guo/jina_multimodal_rag 下载源码。

```
git clone https://github.com/liu-xiao-guo/jina_multimodal_rag

$ pwd
/Users/liuxg/python/jina_multimodal_rag
$ tree -L 3
.
├── README.md
├── app.py
├── images
│   ├── bladerunner-city.jpg
│   ├── images (1).jpeg
│   ├── images (2).jpeg
│   ├── images (3).jpeg
│   ├── matrix-code.jpg
│   ├── starwars-lightsaber.jpg
│   └── tfa_poster_wide_header-1536x864-324397389357.0.0.1537961254.webp
├── pics
│   ├── pic1.png
│   ├── pic2.png
│   └── pic3.png
├── requirements.txt
└── texts
    ├── 1.txt
    ├── 10.txt
    ├── 2.txt
    ├── 3.txt
    ├── 4.txt
    ├── 5.txt
    ├── 6.txt
    ├── 7.txt
    ├── 8.txt
    └── 9.txt
```

如上所示，我们的代码在 app.py 里。我们可以把所有需要向量化的图片放入到 images 目录下，把所有需要向量化的文字放入到 texts 目录的文件中。

除了上面的文件之外，还有一个叫做 .env 的文件：

.env

```
ES_URL="Your ES_URL"
ES_API_KEY="Your ES_API_Key"
GEMINI_FLASH_API_KEY="Your Gemini Flash API Key"
```

我们需要根据自己的配置填入相应的设置。在今天的使用中，我们使用 https://openrouter.ai/ 来调用 Gemini 3 Flash multimodal LLM 来完成我们的 RAG。

## 代码设计

为了方便我们的代码设计，我们使用 streamlit 来设计界面：

app.py

```python
import os
import torch
import streamlit as st
from PIL import Image
from transformers import AutoModel
from elasticsearch import Elasticsearch
from dotenv import load_dotenv
import openai
import base64
from io import BytesIO

# Load environment variables from .env if exists
load_dotenv()

# -------------------------
# Config
# -------------------------
INDEX_NAME = "multimodal-index"
IMAGE_FOLDER = "./images"   # local image folder
TEXT_FOLDER = "./texts"     # local text folder

ES_URL = os.getenv("ES_URL", "https://localhost:9200")
ES_API_KEY = os.getenv("ES_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("GEMINI_FLASH_API_KEY")

# -------------------------
# Elasticsearch
# -------------------------
@st.cache_resource
def get_es():
    return Elasticsearch(ES_URL, verify_certs=False, api_key=ES_API_KEY)

es = get_es()  # Initialize Elasticsearch client

# -------------------------
# Model loading
# -------------------------
@st.cache_resource
def load_model():
    device = (
        "mps" if torch.backends.mps.is_available()
        else "cuda" if torch.cuda.is_available()
        else "cpu"
    )
    model = AutoModel.from_pretrained(
        "jinaai/jina-embeddings-v4",
        trust_remote_code=True,
        torch_dtype=torch.float32,
    ).to(device)
    model.eval()
    return model, device

model, device = load_model()

# -------------------------
# LLM Client loading (OpenRouter)
# -------------------------
@st.cache_resource
def load_llm_client():
    if not OPENROUTER_API_KEY:
        st.error("GEMINI_FLASH_API_KEY not found in environment variables.")
        return None
    client = openai.OpenAI(
        base_url="https://openrouter.ai/api/v1",
        api_key=OPENROUTER_API_KEY,
    )
    return client

llm_client = load_llm_client()
LLM_MODEL_NAME = "google/gemini-3-flash-preview"

# -------------------------
# Index setup
# -------------------------
def create_index():
    if es.indices.exists(index=INDEX_NAME):
        return
    mapping = {
        "mappings": {
            "properties": {
                "filename": {"type": "keyword"},
                "path": {"type": "keyword"},
                "caption": {"type": "text"},
                "vector_field": {
                    "type": "dense_vector",
                    "dims": 2048,
                    "index": True,
                    "similarity": "cosine"
                }
            }
        }
    }
    es.indices.create(index=INDEX_NAME, body=mapping)

create_index()

# -------------------------
# Embedding helpers
# -------------------------
def embed_image(pil_image):
    with torch.inference_mode():
        vec = model.encode_image(
            images=[pil_image],
            task="retrieval",
            return_numpy=True
        )
    return vec[0]

def embed_text(text):
    with torch.inference_mode():
        vec = model.encode_text(
            texts=[text],
            task="retrieval",
            prompt_name="query",
            return_numpy=True
        )
    return vec[0]

# -------------------------
# Batch ingestion for images
# -------------------------
def ingest_image_folder(folder):
    docs = []
    for fname in os.listdir(folder):
        if not fname.lower().endswith((".png", ".jpg", ".jpeg", ".webp")):
            continue
        path = os.path.join(folder, fname)
        image = Image.open(path).convert("RGB")
        vec = embed_image(image)
        docs.append({
            "_index": INDEX_NAME,
            "_source": {
                "filename": fname,
                "path": path,
                "caption": fname.replace("_", " "),
                "vector_field": vec.tolist(),
            }
        })
    if docs:
        from elasticsearch.helpers import bulk
        bulk(es, docs)

# -------------------------
# Batch ingestion for text files
# -------------------------
def ingest_text_folder(folder):
    docs = []
    for fname in os.listdir(folder):
        if not fname.lower().endswith(".txt"):
            continue
        path = os.path.join(folder, fname)
        with open(path, "r", encoding="utf-8") as f:
            text = f.read().strip()
        vec = embed_text(text)
        docs.append({
            "_index": INDEX_NAME,
            "_source": {
                "filename": fname,
                "path": path,
                "caption": text[:500],
                "vector_field": vec.tolist(),
            }
        })
    if docs:
        from elasticsearch.helpers import bulk
        bulk(es, docs)

# -------------------------
# KNN search only
# -------------------------
def knn_search(query, k=10):
    vec = embed_text(query)
    body = {
        "size": k,
        "query": {
            "knn": {
                "field": "vector_field",
                "query_vector": vec.tolist(),
                "k": k,
                "num_candidates": 50
            }
        }
    }
    res = es.search(index=INDEX_NAME, body=body)
    return res["hits"]["hits"]

# -------------------------
# Image to Base64 helper
# -------------------------
def pil_to_base64(image, format="jpeg"):
    buffered = BytesIO()
    image.save(buffered, format=format)
    img_str = base64.b64encode(buffered.getvalue()).decode()
    return f"data:image/{format};base64,{img_str}"

# -------------------------
# RAG Augmentation
# -------------------------
def generate_rag_response(user_query: str, k: int = 3):
    """Retrieves top K documents, creates a multimodal prompt,
    and generates a response from Gemini via OpenRouter."""
    st.write(f"Searching for top {k} relevant documents for RAG...")
    results = knn_search(user_query, k=k)
    if not results:
        st.warning("No relevant documents found for RAG.")
        return

    # Build the multimodal prompt for OpenAI-compatible API
    content_parts = []
    text_context = "Based on the following information:\n"
    for hit in results:
        src = hit["_source"]
        path = src.get("path", "")
        if path and os.path.exists(path) and path.lower().endswith((".png", ".jpg", ".jpeg", ".webp")):
            text_context += f"- Image: {src.get('filename', 'N/A')}\n"
            try:
                img = Image.open(path).convert("RGB")
                base64_image = pil_to_base64(img)
                content_parts.append({
                    "type": "image_url",
                    "image_url": {"url": base64_image}
                })
            except Exception as e:
                st.error(f"Could not load image {path}: {e}")
        else:
            text_context += f"- Text Content: {src.get('caption', 'N/A')}\n"

    text_context += f"\nAnswer the question: {user_query}"
    content_parts.insert(0, {"type": "text", "text": text_context})
    messages = [{"role": "user", "content": content_parts}]

    st.subheader("Gemini Flash Multimodal Prompt:")
    st.json(messages)

    if llm_client:
        with st.spinner("Gemini Flash is generating a response via OpenRouter..."):
            try:
                response = llm_client.chat.completions.create(
                    model=LLM_MODEL_NAME,
                    messages=messages,
                    max_tokens=1024,
                )
                st.markdown("**LLM Generated Response:**")
                st.markdown(response.choices[0].message.content)
            except Exception as e:
                st.error(f"Error generating response from OpenRouter: {e}")

# -------------------------
# Streamlit UI
# -------------------------
st.title("🖼️ Multimodal Image + Text KNN Search")

# Batch ingestion buttons
st.subheader("Ingest Data")
if st.button("Ingest image folder"):
    ingest_image_folder(IMAGE_FOLDER)
    st.success("Images ingested successfully")

if st.button("Ingest text folder"):
    ingest_text_folder(TEXT_FOLDER)
    st.success("Text files ingested successfully")

col1, col2 = st.columns([3, 1])
with col2:
    if st.button("⚠️ Delete & Re-ingest All"):
        with st.spinner("Deleting index and re-ingesting all data..."):
            if es.indices.exists(index=INDEX_NAME):
                es.indices.delete(index=INDEX_NAME)
                st.toast(f"Index {INDEX_NAME} deleted.")
            create_index()
            st.toast("Index created.")
            ingest_image_folder(IMAGE_FOLDER)
            st.toast("Images ingested.")
            ingest_text_folder(TEXT_FOLDER)
            st.toast("Texts ingested.")
            st.success("All data has been re-ingested successfully!")

# Search box for typing text queries
st.subheader("Search")
user_query = st.text_input("Type your search query here", key="search_query")
k_value = st.slider("Number of results to retrieve (K)", min_value=1, max_value=10, value=4)

if user_query:
    st.subheader(f"Retrieval Results (Top {k_value})")
    retrieval_results = knn_search(user_query, k=k_value)
    if retrieval_results:
        cols_per_row = 3
        for i in range(0, len(retrieval_results), cols_per_row):
            row = retrieval_results[i:i + cols_per_row]
            cols = st.columns(len(row), gap="medium")
            for col, hit in zip(cols, row):
                src = hit["_source"]
                path = src.get("path", "")
                if path and os.path.exists(path) and path.lower().endswith((".png", ".jpg", ".jpeg", ".webp")):
                    col.image(path, caption=f"{src.get('filename', '')}", width=200)
                else:
                    col.write(f"**[TEXT]**\n\n{src.get('caption', '')}")
                col.write(f"Score: {hit['_score']:.3f}")
    else:
        st.info("No relevant documents found in the index.")

    st.subheader("RAG System Output")
    st.write("---")
    generate_rag_response(user_query, k=k_value)
```

代码不是很长。

## 运行代码

我们需要在虚拟环境中使用如下的命令来安装所需要的库：

```
pip install -r requirements.txt
```

我们使用如下的命令来执行：

```
streamlit run app.py
```

首次运行时，我们可以直接点击 "Delete & Re-ingest All" 按钮来写入所有的 images 及 texts。当然，我们也可以分别使用 "Ingest image folder" 及 "Ingest text folder" 来完成文件的写入。值得注意的是：它们并不会删除之前的索引数据，而是重新写入 images 或 texts 目录里的文件。如果多次点击这个按钮，它会对该文件夹中的文件多次写入。

如下是搜索的结果（当我们搜索 "Star wars"）：

祝大家学习愉快！

更多文章