Перейти к содержанию

Сервисы (Services)

В этом разделе представлена документация по ключевым сервисным слоям приложения.

app.services.ingestion.IngestionService

Source code in app/services/ingestion.py
class IngestionService:
    """Persists scraped data: products into the catalog and discovered
    category pages into the parsing-source backlog.

    Commits the session once per public call; repositories only stage changes.
    """

    def __init__(self, db: AsyncSession, redis=None):
        self.db = db
        self.catalog_repo = PostgresCatalogRepository(db)
        self.parsing_repo = ParsingRepository(db, redis=redis)

    async def ingest_products(self, products: List[ScrapedProduct], source_id: int):
        """Upsert scraped products for a source and update its run stats.

        Args:
            products: Scraped product records (may be empty).
            source_id: ParsingSource the products came from.

        Returns:
            Number of new items reported by the bulk upsert (0 for empty input).
        """
        if not products:
            return 0

        # Fetch source config for custom normalization
        source = await self.parsing_repo.get_source(source_id)
        strip_params = DEFAULT_STRIP_PARAMS
        if source and source.config:
            custom_strip = source.config.get("strip_params")
            if isinstance(custom_strip, list):
                strip_params = set(custom_strip)

        # 1. Handle Categories
        external_categories = list({p.category for p in products if p.category})
        if external_categories:
            await self.parsing_repo.get_or_create_category_maps(external_categories)

        # 2. Prepare Product data for Upsert.
        # gift_id is "<site_key>:<normalized url>", so normalization also
        # deduplicates the batch (same product reached via different URL params).
        product_dicts = []
        seen_gift_ids = set()

        for p in products:
            clean_url = normalize_url(p.product_url, strip_params=strip_params)
            gift_id = f"{p.site_key}:{clean_url}"

            if gift_id in seen_gift_ids:
                continue
            seen_gift_ids.add(gift_id)

            product_dicts.append({
                "gift_id": gift_id,
                "title": p.title,
                "description": p.description,
                "price": p.price,
                "currency": p.currency,
                "image_url": p.image_url,
                "product_url": clean_url,
                "merchant": p.merchant,
                "category": p.category,
                "raw": p.raw_data,
                "is_active": True
            })

        # 3. Bulk Upsert
        count = await self.catalog_repo.upsert_products(product_dicts)

        # 4. Update Source Stats
        await self.parsing_repo.update_source_stats(source_id, {
            "processed_items": len(products),
            "new_items": count
        })

        # 5. Log Run History
        await self.parsing_repo.log_parsing_run(
            source_id=source_id,
            status="completed",
            items_scraped=len(products),
            items_new=count
        )

        await self.db.commit()
        return count

    async def ingest_categories(self, categories: List[ScrapedCategory], activation_quota: int = 50):
        """Discovers new ParsingSources. Activates some, puts others in backlog."""
        if not categories:
            return 0

        # Count how many we already activated today
        activated_today = await self.parsing_repo.count_discovered_today()
        remaining_quota = max(0, activation_quota - activated_today)

        count = 0
        activated = 0  # newly activated sources in THIS call, counted against quota
        for cat in categories:
            clean_url = normalize_url(cat.url)

            # Check existence BEFORE consuming a quota slot: previously the quota
            # was keyed on the enumeration index, so already-known sources
            # (which are skipped) still burned activation slots.
            existing = await self.parsing_repo.get_source_by_url(clean_url)
            if existing:
                continue

            # If within quota, status is 'waiting', else 'discovered' (backlog)
            # hub (list) sources are activated by default if within quota
            is_within_quota = activated < remaining_quota
            status = "waiting" if is_within_quota else "discovered"
            is_active = is_within_quota

            source_data = {
                "url": clean_url,
                "site_key": cat.site_key,
                "type": "list",
                "strategy": "deep",
                "priority": 50,
                "refresh_interval_hours": 24,
                "is_active": is_active,
                "status": status,
                "config": {
                    "discovery_name": cat.name,
                    "parent_url": cat.parent_url,
                    # NOTE: datetime.utcnow() is deprecated in Python 3.12+;
                    # kept to preserve the stored timestamp format.
                    "discovered_at": datetime.utcnow().isoformat()
                }
            }
            try:
                await self.parsing_repo.upsert_source(source_data)
                count += 1
                if is_within_quota:
                    activated += 1  # only successful upserts consume quota
            except Exception as e:
                logger.error(f"Failed to upsert discovered source {clean_url}: {e}")

        await self.db.commit()
        return count

Functions

ingest_categories(categories, activation_quota=50) async

Discovers new ParsingSources. Activates some, puts others in backlog.

Source code in app/services/ingestion.py
async def ingest_categories(self, categories: List[ScrapedCategory], activation_quota: int = 50):
    """Discovers new ParsingSources. Activates some, puts others in backlog."""
    if not categories:
        return 0

    # Count how many we already activated today
    activated_today = await self.parsing_repo.count_discovered_today()
    remaining_quota = max(0, activation_quota - activated_today)

    count = 0
    activated = 0  # newly activated sources in THIS call, counted against quota
    for cat in categories:
        clean_url = normalize_url(cat.url)

        # Check existence BEFORE consuming a quota slot: previously the quota
        # was keyed on the enumeration index, so already-known sources
        # (which are skipped) still burned activation slots.
        existing = await self.parsing_repo.get_source_by_url(clean_url)
        if existing:
            continue

        # If within quota, status is 'waiting', else 'discovered' (backlog)
        # hub (list) sources are activated by default if within quota
        is_within_quota = activated < remaining_quota
        status = "waiting" if is_within_quota else "discovered"
        is_active = is_within_quota

        source_data = {
            "url": clean_url,
            "site_key": cat.site_key,
            "type": "list",
            "strategy": "deep",
            "priority": 50,
            "refresh_interval_hours": 24,
            "is_active": is_active,
            "status": status,
            "config": {
                "discovery_name": cat.name,
                "parent_url": cat.parent_url,
                # NOTE: datetime.utcnow() is deprecated in Python 3.12+;
                # kept to preserve the stored timestamp format.
                "discovered_at": datetime.utcnow().isoformat()
            }
        }
        try:
            await self.parsing_repo.upsert_source(source_data)
            count += 1
            if is_within_quota:
                activated += 1  # only successful upserts consume quota
        except Exception as e:
            logger.error(f"Failed to upsert discovered source {clean_url}: {e}")

    await self.db.commit()
    return count

app.services.intelligence.IntelligenceAPIClient

Hybrid Intelligence Client supporting both online (RunPod/API) and offline (DB queue) execution.

  • Online (priority='high'): Immediate execution via RunPod or Intelligence API
  • Offline (priority='low'): Task queued in database for external workers
Source code in app/services/intelligence.py
class IntelligenceAPIClient:
    """
    Hybrid Intelligence Client supporting both online (RunPod/API) and offline (DB queue) execution.

    - Online (priority='high'): Immediate execution via RunPod or Intelligence API
    - Offline (priority='low'): Task queued in database for external workers
    """
    def __init__(self):
        # All endpoints/credentials come from application settings; any of them
        # may be None, in which case the corresponding provider is skipped and
        # the call falls back (see _get_embeddings_online).
        settings = get_settings()
        self.intelligence_api_base = settings.intelligence_api_base
        self.intelligence_api_token = settings.intelligence_api_token
        self.runpod_api_key = settings.runpod_api_key
        self.runpod_endpoint_id = settings.runpod_endpoint_id
        self.together_api_key = settings.together_api_key
        # Default HTTP timeout (seconds) for Intelligence API calls; the
        # provider-specific embedding calls below use a longer 30s timeout.
        self.timeout = 10.0

    async def get_embeddings(
        self, 
        texts: List[str], 
        priority: str = "high",
        db: Optional[AsyncSession] = None
    ) -> Optional[List[List[float]]]:
        """
        Get embeddings for texts.

        Args:
            texts: List of texts to embed
            priority: 'high' for online execution, 'low' for offline queue
            db: Database session (required if priority='low')

        Returns:
            List of embedding vectors if priority='high', None if priority='low' (task queued)
        """
        if priority == "low":
            if db is None:
                raise ValueError("Database session required for offline task scheduling")
            await self._schedule_task(
                db=db,
                task_type="embedding",
                payload={"texts": texts, "model": logic_config.model_embedding}
            )
            return None

        # Online execution
        return await self._get_embeddings_online(texts)

    async def _get_embeddings_online(self, texts: List[str]) -> List[List[float]]:
        """Execute embeddings based on configured provider in logic_config.

        Fallback chain: the configured provider (runpod or together) is tried
        first; on missing credentials or any exception the call falls through
        to the Intelligence API default at the bottom.
        """
        provider = logic_config.llm.embedding_provider

        if provider == "runpod":
            if self.runpod_api_key and self.runpod_endpoint_id:
                try:
                    return await self._call_runpod_embeddings(texts)
                except Exception as e:
                    logger.warning(f"RunPod embeddings failed, falling back to Intelligence API: {e}")
            else:
                logger.warning("RunPod provider selected but credentials missing, falling back.")

        if provider == "together":
            if self.together_api_key:
                try:
                    return await self._call_together_embeddings(texts)
                except Exception as e:
                    logger.warning(f"Together AI embeddings failed, falling back to Intelligence API: {e}")
            else:
                logger.warning("Together provider selected but credentials missing, falling back.")

        # Default fallback to Intelligence API
        return await self._call_intelligence_api_embeddings(texts)

    async def _call_runpod_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Call RunPod serverless endpoint for embeddings.

        Uses the synchronous 'runsync' invocation, so the HTTP call blocks
        until the worker returns; hence the longer 30s timeout.
        Raises httpx.HTTPStatusError on non-2xx (caught by the caller's
        fallback logic).
        """
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                f"https://api.runpod.ai/v2/{self.runpod_endpoint_id}/runsync",
                json={
                    "input": {
                        "texts": texts,
                        "model": logic_config.model_embedding
                    }
                },
                headers={"Authorization": f"Bearer {self.runpod_api_key}"}
            )
            response.raise_for_status()
            data = response.json()
            # Worker is expected to return {"output": {"embeddings": [...]}};
            # a KeyError here propagates to the caller's fallback handler.
            return data["output"]["embeddings"]

    async def _call_together_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Call Together AI for embeddings."""
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                "https://api.together.xyz/v1/embeddings",
                json={
                    "input": texts,
                    "model": logic_config.model_embedding
                },
                headers={"Authorization": f"Bearer {self.together_api_key}"}
            )
            response.raise_for_status()
            data = response.json()
            # Together API returns format: {"data": [{"embedding": [...]}, ...]}
            return [item["embedding"] for item in data.get("data", [])]

    async def _call_intelligence_api_embeddings(self, texts: List[str]) -> List[List[float]]:
        """Call Intelligence API for embeddings.

        Never raises: on missing token or any error it returns zero vectors
        so callers always get a result of the right length.
        """
        if not self.intelligence_api_token:
            logger.warning("IntelligenceAPI token missing, using dummy embeddings.")
            # NOTE(review): dummy dimensionality is hard-coded to 1024 —
            # confirm this matches logic_config.model_embedding's output size.
            return [[0.0] * 1024 for _ in texts]

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.intelligence_api_base}/v1/embeddings",
                    json={"input": texts, "model": logic_config.model_embedding},
                    headers={"Authorization": f"Bearer {self.intelligence_api_token}"}
                )
                response.raise_for_status()
                data = response.json()
                return [item["embedding"] for item in data.get("data", [])]
        except Exception as e:
            logger.error(f"IntelligenceAPI embeddings call failed: {e}")
            # Same zero-vector fallback as the missing-token branch above.
            return [[0.0] * 1024 for _ in texts]

    async def rerank(
        self, 
        query: str, 
        documents: List[str],
        priority: str = "high",
        db: Optional[AsyncSession] = None
    ) -> Optional[List[float]]:
        """
        Rerank documents relative to a query.

        Args:
            query: Query text
            documents: List of documents to rerank
            priority: 'high' for online execution, 'low' for offline queue
            db: Database session (required if priority='low')

        Returns:
            List of relevance scores if priority='high', None if priority='low' (task queued)
        """
        if priority == "low":
            if db is None:
                raise ValueError("Database session required for offline task scheduling")
            await self._schedule_task(
                db=db,
                task_type="rerank",
                payload={"query": query, "documents": documents}
            )
            return None

        # Online execution
        return await self._rerank_online(query, documents)

    async def _rerank_online(self, query: str, documents: List[str]) -> List[float]:
        """Execute reranking via Intelligence API.

        Never raises: missing token or any failure yields all-zero scores,
        which effectively preserves the caller's original ordering.
        """
        if not self.intelligence_api_token:
            return [0.0] * len(documents)

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.post(
                    f"{self.intelligence_api_base}/v1/rerank",
                    json={"query": query, "documents": documents},
                    headers={"Authorization": f"Bearer {self.intelligence_api_token}"}
                )
                response.raise_for_status()
                return response.json().get("scores", [0.0] * len(documents))
        except Exception as e:
            logger.error(f"IntelligenceAPI rerank call failed: {e}")
            return [0.0] * len(documents)

    async def _schedule_task(self, db: AsyncSession, task_type: str, payload: dict) -> None:
        """Schedule a task in the database queue for offline processing.

        Commits immediately so the task is visible to external workers even
        if the caller's surrounding transaction is later rolled back.
        """
        task = ComputeTask(
            id=uuid.uuid4(),
            task_type=task_type,
            priority="low",
            status="pending",
            payload=payload
        )
        db.add(task)
        await db.commit()
        logger.info(f"Scheduled {task_type} task {task.id} for offline processing")

Functions

get_embeddings(texts, priority='high', db=None) async

Get embeddings for texts.

Parameters:

Name Type Description Default
texts List[str]

List of texts to embed

required
priority str

'high' for online execution, 'low' for offline queue

'high'
db Optional[AsyncSession]

Database session (required if priority='low')

None

Returns:

Type Description
Optional[List[List[float]]]

List of embedding vectors if priority='high', None if priority='low' (task queued)

Source code in app/services/intelligence.py
async def get_embeddings(
    self, 
    texts: List[str], 
    priority: str = "high",
    db: Optional[AsyncSession] = None
) -> Optional[List[List[float]]]:
    """Embed *texts* either immediately (online) or via the offline DB queue.

    Args:
        texts: List of texts to embed
        priority: 'high' for online execution, 'low' for offline queue
        db: Database session (required if priority='low')

    Returns:
        List of embedding vectors if priority='high', None if priority='low' (task queued)
    """
    # Any priority other than the explicit 'low' flag runs online right away.
    if priority != "low":
        return await self._get_embeddings_online(texts)

    # Offline path: persist a queue entry for external workers.
    if db is None:
        raise ValueError("Database session required for offline task scheduling")

    offline_payload = {"texts": texts, "model": logic_config.model_embedding}
    await self._schedule_task(db=db, task_type="embedding", payload=offline_payload)
    return None

rerank(query, documents, priority='high', db=None) async

Rerank documents relative to a query.

Parameters:

Name Type Description Default
query str

Query text

required
documents List[str]

List of documents to rerank

required
priority str

'high' for online execution, 'low' for offline queue

'high'
db Optional[AsyncSession]

Database session (required if priority='low')

None

Returns:

Type Description
Optional[List[float]]

List of relevance scores if priority='high', None if priority='low' (task queued)

Source code in app/services/intelligence.py
async def rerank(
    self, 
    query: str, 
    documents: List[str],
    priority: str = "high",
    db: Optional[AsyncSession] = None
) -> Optional[List[float]]:
    """Score *documents* against *query* online, or queue the job offline.

    Args:
        query: Query text
        documents: List of documents to rerank
        priority: 'high' for online execution, 'low' for offline queue
        db: Database session (required if priority='low')

    Returns:
        List of relevance scores if priority='high', None if priority='low' (task queued)
    """
    # Default / 'high' priority executes synchronously against the API.
    if priority != "low":
        return await self._rerank_online(query, documents)

    # Offline path: stage a DB-queue task for external workers.
    if db is None:
        raise ValueError("Database session required for offline task scheduling")

    task_payload = {"query": query, "documents": documents}
    await self._schedule_task(db=db, task_type="rerank", payload=task_payload)
    return None

app.services.recommendation.RecommendationService

Source code in app/services/recommendation.py
class RecommendationService:
    """Gift recommendation orchestration: vector retrieval, CPU ranking,
    LLM reranking and diversity constraints (Stages A-D), plus preview and
    deep-dive product lookups for hypothesis flows.

    NOTE: methods stage SearchLog / link rows via self.session.add() but do
    not commit — presumably the caller owns the transaction; verify.
    """
    def __init__(self, session: AsyncSession, embedding_service: EmbeddingService):
        self.session = session
        self.repo = PostgresCatalogRepository(session)
        self.embedding_service = embedding_service
        self.intelligence_client = get_intelligence_client()

    async def generate_recommendations(
        self, 
        request: RecommendationRequest,
        engine_version: str = "vector_v1"
    ) -> RecommendationResponse:
        """
        Main orchestration method for recommendation generation.
        Implements Stages A, B, C, D from the roadmap.
        """
        logger.info(f"Generating recommendations for request: {request}")

        # Stage A: Vector Retrieval
        candidates = await self._retrieve_candidates(request)

        # Stage B: CPU Ranker
        ranked_candidates = await self._rank_candidates(request, candidates)

        # Stage C: LLM-as-judge Rerank (Stub)
        final_candidates = await self._judge_rerank(request, ranked_candidates)

        # Stage D: Constraints Re-ranking (Diversity)
        final_gifts_data = self._apply_final_rank_and_diversity(request, final_candidates)

        # Convert catalog rows into transport DTOs for the response.
        gifts = [
            GiftDTO(
                id=g.gift_id,
                title=g.title,
                description=g.description,
                price=g.price,
                currency=g.currency,
                image_url=g.image_url,
                product_url=g.product_url,
                merchant=g.merchant,
                category=g.category
            ) for g in final_gifts_data
        ]
        # Top-ranked gift doubles as the featured pick.
        featured_gift = gifts[0] if gifts else None

        return RecommendationResponse(
            quiz_run_id="stub-id", # TODO: integrate with quiz_run repo
            engine_version=engine_version,
            featured_gift=featured_gift,
            gifts=gifts,
            debug={"status": "candidates_retrieved", "count": len(candidates)} if request.debug else None
        )

    def _build_query_text(self, request: RecommendationRequest) -> str:
        """Construct semantic search query from quiz answers.

        NOTE(review): the Age part is unconditional, unlike its siblings —
        a missing age renders as the literal text "Age: None"; confirm
        whether it should be guarded like the other fields.
        """
        parts = [
            f"Gift for {request.relationship or 'someone'}",
            f"Occasion: {request.occasion}" if request.occasion else "",
            f"Age: {request.recipient_age}",
            f"Gender: {request.recipient_gender}" if request.recipient_gender else "",
            f"Interests: {', '.join(request.interests)}" if request.interests else "",
            f"Description: {request.interests_description}" if request.interests_description else "",
            f"Vibe: {request.vibe}" if request.vibe else "",
        ]
        # Empty strings from the guards above are dropped before joining.
        return " ".join([p for p in parts if p]).strip()

    async def _retrieve_candidates(self, request: RecommendationRequest) -> list[Any]:
        """Stage A: Vector Retrieval (pgvector)"""
        query_text = self._build_query_text(request)
        logger.info(f"Retrieving candidates for query: '{query_text}'")

        # 1. Generate Query Embedding
        query_vector = await self.embedding_service.embed_batch_async([query_text])
        query_vector = query_vector[0]

        # 2. Search in Repo (Top 50 candidates for further ranking)
        candidates = await self.repo.search_similar_products(
            embedding=query_vector, 
            limit=50,
            is_active_only=True
        )
        return candidates

    async def _rank_candidates(self, request: RecommendationRequest, candidates: list[Any]) -> list[Any]:
        """Stage B: CPU Ranker (Logic from SoT Section 5)"""
        # Pass-through stub: ranking logic not implemented yet.
        return candidates

    async def _judge_rerank(self, request: RecommendationRequest, candidates: list[Any]) -> list[Any]:
        """Stage C: LLM-as-judge Rerank (Stub for now)"""
        # Pass-through stub: LLM judging not implemented yet.
        return candidates

    def _apply_final_rank_and_diversity(self, request: RecommendationRequest, candidates: list[Any]) -> list[Any]:
        """Stage D: Constraints Re-ranking & Diversity"""
        # Currently just truncates to the requested count; no diversity logic yet.
        return candidates[:request.top_n]

    async def find_preview_products(
        self, 
        search_queries: List[str], 
        hypothesis_title: str = "", 
        max_price: Optional[int] = None,
        session_id: Optional[str] = None,
        hypothesis_id: Optional[uuid.UUID] = None,
        track_title: Optional[str] = None,
        llm_model: Optional[str] = None,
        search_context: str = "preview",
        limit_per_query: Optional[int] = None
    ) -> List[GiftDTO]:
        """
        Advanced retrieval for previews:
        1. Flexible budget (max_price + margin from logic_config)
        2. Parallel multi-query vector search
        3. Global reranking and Interleaving
        """
        from app.core.logic_config import logic_config

        # Override config with parameter if provided
        final_limit_per_query = limit_per_query or logic_config.items_per_query
        target_queries = search_queries[:logic_config.max_queries_for_preview]

        # 1. Flexible Budget
        effective_max_price = None
        if max_price:
            effective_max_price = int(max_price * (1 + logic_config.budget_margin_fraction))

        # 2. Parallel Vector Search for all queries
        async def _fetch_for_query(query: str):
            # Returns (query, candidates) so results can be mapped back below.
            query_vectors = await self.embedding_service.embed_batch_async([query])
            if not query_vectors:
                return query, []

            candidates = await self.repo.search_similar_products(
                embedding=query_vectors[0],
                limit=logic_config.rerank_candidate_limit,
                is_active_only=True,
                max_price=effective_max_price
            )
            return query, candidates

        search_tasks = [_fetch_for_query(q) for q in target_queries]
        # return_exceptions=True so one failed query does not sink the batch.
        search_results_raw = await asyncio.gather(*search_tasks, return_exceptions=True)

        search_results = []
        for res in search_results_raw:
            if isinstance(res, Exception):
                logger.error(f"Search task failed in find_preview_products: {res}")
            else:
                search_results.append(res)

        # Map queries to their candidates and gather ALL unique candidates for reranking
        query_to_results = {}
        all_unique_candidates = {}

        # LOGGING: Track coverage for each query
        for res in search_results:
            if isinstance(res, tuple):
                query, candidates = res
                query_to_results[query] = candidates

                # Calculate simple metrics for logging
                # (top_sim/avg_sim stay 0.0 — similarity scores are not
                # exposed by the repo; see the comment below.)
                results_count = len(candidates)
                top_sim = 0.0
                avg_sim = 0.0
                top_id = None

                if candidates:
                    # In vector search similarity is not directly in Product model, 
                    # but we can log that it found results.
                    # If we had access to the raw similarity scores from the repo, we'd log them here.
                    top_id = candidates[0].gift_id

                # Create SearchLog entry
                try:
                    log_entry = SearchLog(
                        session_id=session_id,
                        hypothesis_id=hypothesis_id,
                        track_title=track_title,
                        hypothesis_title=hypothesis_title,
                        search_context=search_context,
                        llm_model=llm_model or logic_config.model_smart,
                        search_query=query,
                        results_count=results_count,
                        top_gift_id=top_id,
                        max_price=max_price,
                        engine_version="advanced_v1"
                    )
                    self.session.add(log_entry)
                except Exception as e:
                    logger.warning(f"Failed to log search result: {e}")

                # Pool all distinct candidates across queries for one rerank pass.
                for c in candidates:
                    if c.gift_id not in all_unique_candidates:
                        all_unique_candidates[c.gift_id] = c

        candidates_list = list(all_unique_candidates.values())
        if not candidates_list:
            return []

        # 3. Global Reranking
        doc_texts = [f"{c.title} {c.description or ''}" for c in candidates_list]
        query_context = hypothesis_title or " ".join(target_queries[:2])

        try:
            scores = await self.intelligence_client.rerank(query_context, doc_texts)
            id_to_score = {c.gift_id: score for c, score in zip(candidates_list, scores)}
        except Exception as e:
            logger.error(f"RecommendationService: Reranking failed, falling back to vector scores: {e}")
            # Proactive notification
            notifier = get_notification_service()
            await notifier.notify(
                topic="intelligence_error",
                message=f"Reranking failed for hypothesis: {hypothesis_title}",
                data={"error": str(e), "doc_count": len(doc_texts)}
            )
            # Fallback: assign scores based on original retrieval order to maintain some ranking
            id_to_score = {c.gift_id: (1.0 - (idx / len(candidates_list))) for idx, c in enumerate(candidates_list)}

        # 4. Filter, Sort and pick Top N per query
        per_query_final = []
        for query in target_queries:
            candidates = query_to_results.get(query, [])
            scored = sorted(candidates, key=lambda x: id_to_score.get(x.gift_id, 0.0), reverse=True)
            per_query_final.append(scored[:final_limit_per_query])

        # 5. Interleave with Deduplication
        # Round-robin across per-query top lists so each query contributes early.
        final_list = []
        seen_ids = set()

        for i in range(final_limit_per_query):
            for q_list in per_query_final:
                if i < len(q_list):
                    p = q_list[i]
                    if p.gift_id not in seen_ids:
                        seen_ids.add(p.gift_id)
                        final_list.append(GiftDTO(
                            id=p.gift_id,
                            title=p.title,
                            description=p.description,
                            price=float(p.price) if p.price else None,
                            currency=p.currency or "RUB",
                            image_url=p.image_url,
                            product_url=p.product_url,
                            merchant=p.merchant,
                            category=p.category
                        ))

        # 6. Create HypothesisProductLink entries for the final list
        if hypothesis_id:
            try:
                for idx, gift_dto in enumerate(final_list[:10]): # Log top 10 shown products
                    link = HypothesisProductLink(
                        hypothesis_id=hypothesis_id,
                        gift_id=gift_dto.id,
                        similarity_score=id_to_score.get(gift_dto.id, 0.0), # Use reranker score if available
                        rank_position=idx + 1,
                        was_shown=True
                    )
                    self.session.add(link)
            except Exception as e:
                logger.warning(f"Failed to link products to hypothesis in find_preview_products: {e}")

        return final_list

    async def get_deep_dive_products(
        self, 
        search_queries: List[str], 
        hypothesis_title: str,
        hypothesis_description: str,
        max_price: Optional[int] = None,
        session_id: Optional[str] = None,
        hypothesis_id: Optional[uuid.UUID] = None,
        track_title: Optional[str] = None,
        llm_model: Optional[str] = None,
        limit: int = 15
    ) -> List[GiftDTO]:
        """
        Stage E: Deep Dive - Multi-query expansion + Reranking.
        """
        logger.info(f"Deep dive for hypothesis: {hypothesis_title}")

        # 1. Multi-query search (Parallel)
        from app.core.logic_config import logic_config
        effective_max_price = None
        if max_price:
            effective_max_price = int(max_price * (1 + logic_config.budget_margin_fraction))

        async def _search_one(query: str):
            # One vector search per expanded query; empty embedding -> no results.
            query_vectors = await self.embedding_service.embed_batch_async([query])
            if not query_vectors:
                return []
            return await self.repo.search_similar_products(
                embedding=query_vectors[0],
                limit=15,
                is_active_only=True,
                max_price=effective_max_price
            )

        # Only the first 3 queries are expanded, in parallel.
        search_tasks = [_search_one(q) for q in search_queries[:3]]
        search_results_raw = await asyncio.gather(*search_tasks, return_exceptions=True)

        search_results = []
        for res in search_results_raw:
            if isinstance(res, Exception):
                logger.error(f"Search task failed in get_deep_dive_products: {res}")
            else:
                search_results.append(res)

        # Deduplicate candidates across queries by gift_id.
        all_candidates = {}
        for results in search_results:
            for c in results:
                if c.gift_id not in all_candidates:
                    all_candidates[c.gift_id] = c

        candidates_list = list(all_candidates.values())
        if not candidates_list:
            # Log zero results for all queries
            for q in search_queries[:3]:
                 self.session.add(SearchLog(
                     search_query=q,
                     results_count=0,
                     engine_version="deep_dive_v1"
                 ))
            return []

        # LOGGING: Log aggregate for deep dive
        # NOTE(review): results_count uses len(search_results[0]) for EVERY
        # query, and after exception filtering indexes may not line up with
        # search_queries — likely should be the per-query result count.
        for q in search_queries[:3]:
            self.session.add(SearchLog(
                session_id=session_id,
                hypothesis_id=hypothesis_id,
                track_title=track_title,
                hypothesis_title=hypothesis_title,
                search_context="deep_dive",
                llm_model=llm_model or logic_config.model_smart,
                search_query=q,
                results_count=len(search_results[0]) if len(search_results) > 0 else 0,
                max_price=max_price,
                engine_version="advanced_v1"
            ))

        # 2. Reranking
        doc_texts = [f"{c.title} {c.description or ''}" for c in candidates_list]
        query_context = f"{hypothesis_title} {hypothesis_description}"

        try:
            scores = await self.intelligence_client.rerank(query_context, doc_texts)
            id_to_score = {c.gift_id: score for c, score in zip(candidates_list, scores)}
        except Exception as e:
            logger.error(f"RecommendationService: Deep dive reranking failed, falling back to vector order: {e}")
            # Proactive notification
            notifier = get_notification_service()
            await notifier.notify(
                topic="intelligence_error",
                message=f"Deep dive reranking failed for: {hypothesis_title}",
                data={"error": str(e)}
            )
            # Fallback: score by original retrieval order (earlier = higher).
            id_to_score = {c.gift_id: (1.0 - (idx / len(candidates_list))) for idx, c in enumerate(candidates_list)}

        # Sort candidates list based on scores
        candidates_list.sort(key=lambda x: id_to_score.get(x.gift_id, 0.0), reverse=True)
        scored_products = candidates_list[:limit]

        # 3. Final DTO conversion
        results = []
        for p in scored_products:
            results.append(GiftDTO(
                id=p.gift_id,
                title=p.title,
                description=p.description,
                price=float(p.price) if p.price else None,
                currency=p.currency or "RUB",
                image_url=p.image_url,
                product_url=p.product_url,
                merchant=p.merchant,
                category=p.category
            ))

        return results

Functions

find_preview_products(search_queries, hypothesis_title='', max_price=None, session_id=None, hypothesis_id=None, track_title=None, llm_model=None, search_context='preview', limit_per_query=None) async

Advanced retrieval for previews: 1. Flexible budget (max_price + margin from logic_config) 2. Parallel multi-query vector search 3. Global reranking and Interleaving

Source code in app/services/recommendation.py
async def find_preview_products(
    self, 
    search_queries: List[str], 
    hypothesis_title: str = "", 
    max_price: Optional[int] = None,
    session_id: Optional[str] = None,
    hypothesis_id: Optional[uuid.UUID] = None,
    track_title: Optional[str] = None,
    llm_model: Optional[str] = None,
    search_context: str = "preview",
    limit_per_query: Optional[int] = None
) -> List[GiftDTO]:
    """
    Advanced retrieval for previews:
    1. Flexible budget (max_price + margin from logic_config)
    2. Parallel multi-query vector search
    3. Global reranking and Interleaving

    Args:
        search_queries: Candidate search strings; only the first
            ``logic_config.max_queries_for_preview`` are used.
        hypothesis_title: Human-readable hypothesis title, used as the
            rerank query context and for logging.
        max_price: Optional hard budget; a configurable margin is added on top.
        session_id / hypothesis_id / track_title / llm_model: Metadata
            recorded in ``SearchLog`` rows (best-effort, never fatal).
        search_context: Tag stored on each ``SearchLog`` entry.
        limit_per_query: Overrides ``logic_config.items_per_query`` when given.

    Returns:
        Interleaved, deduplicated list of GiftDTOs (round-robin across
        queries so every query contributes to the top of the list).
    """
    from app.core.logic_config import logic_config

    # Parameter wins over config default.
    final_limit_per_query = limit_per_query or logic_config.items_per_query
    target_queries = search_queries[:logic_config.max_queries_for_preview]

    # 1. Flexible Budget: stretch the cap by a configured fraction so
    # slightly-over-budget items are still retrievable.
    effective_max_price = None
    if max_price:
        effective_max_price = int(max_price * (1 + logic_config.budget_margin_fraction))

    # 2. Parallel Vector Search for all queries. Each task returns
    # (query, candidates) so results stay attributable to their query
    # even after gather().
    async def _fetch_for_query(query: str):
        query_vectors = await self.embedding_service.embed_batch_async([query])
        if not query_vectors:
            return query, []

        candidates = await self.repo.search_similar_products(
            embedding=query_vectors[0],
            limit=logic_config.rerank_candidate_limit,
            is_active_only=True,
            max_price=effective_max_price
        )
        return query, candidates

    search_tasks = [_fetch_for_query(q) for q in target_queries]
    search_results_raw = await asyncio.gather(*search_tasks, return_exceptions=True)

    # A failed query is logged and dropped; the remaining queries still
    # produce a preview.
    search_results = []
    for res in search_results_raw:
        if isinstance(res, Exception):
            logger.error(f"Search task failed in find_preview_products: {res}")
        else:
            search_results.append(res)

    # Map queries to their candidates and gather ALL unique candidates for reranking
    query_to_results = {}
    all_unique_candidates = {}

    # LOGGING: Track coverage for each query
    for res in search_results:
        if isinstance(res, tuple):
            query, candidates = res
            query_to_results[query] = candidates

            results_count = len(candidates)
            # NOTE: raw similarity scores are not exposed on the Product
            # model by search_similar_products, so only the top hit's id
            # and the result count are logged here.
            top_id = candidates[0].gift_id if candidates else None

            # Create SearchLog entry (best-effort: logging must never
            # break retrieval).
            try:
                log_entry = SearchLog(
                    session_id=session_id,
                    hypothesis_id=hypothesis_id,
                    track_title=track_title,
                    hypothesis_title=hypothesis_title,
                    search_context=search_context,
                    llm_model=llm_model or logic_config.model_smart,
                    search_query=query,
                    results_count=results_count,
                    top_gift_id=top_id,
                    max_price=max_price,
                    engine_version="advanced_v1"
                )
                self.session.add(log_entry)
            except Exception as e:
                logger.warning(f"Failed to log search result: {e}")

            # Deduplicate candidates across queries (first occurrence wins).
            for c in candidates:
                if c.gift_id not in all_unique_candidates:
                    all_unique_candidates[c.gift_id] = c

    candidates_list = list(all_unique_candidates.values())
    if not candidates_list:
        return []

    # 3. Global Reranking: one rerank call over the union of all
    # candidates, scored against the hypothesis (or the first queries
    # when no title is available).
    doc_texts = [f"{c.title} {c.description or ''}" for c in candidates_list]
    query_context = hypothesis_title or " ".join(target_queries[:2])

    try:
        scores = await self.intelligence_client.rerank(query_context, doc_texts)
        id_to_score = {c.gift_id: score for c, score in zip(candidates_list, scores)}
    except Exception as e:
        logger.error(f"RecommendationService: Reranking failed, falling back to vector scores: {e}")
        # Proactive notification
        notifier = get_notification_service()
        await notifier.notify(
            topic="intelligence_error",
            message=f"Reranking failed for hypothesis: {hypothesis_title}",
            data={"error": str(e), "doc_count": len(doc_texts)}
        )
        # Fallback: assign scores based on original retrieval order to maintain some ranking
        id_to_score = {c.gift_id: (1.0 - (idx / len(candidates_list))) for idx, c in enumerate(candidates_list)}

    # 4. Filter, Sort and pick Top N per query
    per_query_final = []
    for query in target_queries:
        candidates = query_to_results.get(query, [])
        scored = sorted(candidates, key=lambda x: id_to_score.get(x.gift_id, 0.0), reverse=True)
        per_query_final.append(scored[:final_limit_per_query])

    # 5. Interleave with Deduplication: round-robin rank i across all
    # queries so each query contributes to the head of the final list.
    final_list = []
    seen_ids = set()

    for i in range(final_limit_per_query):
        for q_list in per_query_final:
            if i < len(q_list):
                p = q_list[i]
                if p.gift_id not in seen_ids:
                    seen_ids.add(p.gift_id)
                    final_list.append(GiftDTO(
                        id=p.gift_id,
                        title=p.title,
                        description=p.description,
                        price=float(p.price) if p.price else None,
                        currency=p.currency or "RUB",
                        image_url=p.image_url,
                        product_url=p.product_url,
                        merchant=p.merchant,
                        category=p.category
                    ))

    # 6. Create HypothesisProductLink entries for the final list
    if hypothesis_id:
        try:
            for idx, gift_dto in enumerate(final_list[:10]): # Log top 10 shown products
                link = HypothesisProductLink(
                    hypothesis_id=hypothesis_id,
                    gift_id=gift_dto.id,
                    similarity_score=id_to_score.get(gift_dto.id, 0.0), # Use reranker score if available
                    rank_position=idx + 1,
                    was_shown=True
                )
                self.session.add(link)
        except Exception as e:
            logger.warning(f"Failed to link products to hypothesis in find_preview_products: {e}")

    return final_list

generate_recommendations(request, engine_version='vector_v1') async

Main orchestration method for recommendation generation. Implements Stages A, B, C, D from the roadmap.

Source code in app/services/recommendation.py
async def generate_recommendations(
    self, 
    request: RecommendationRequest,
    engine_version: str = "vector_v1"
) -> RecommendationResponse:
    """
    Main orchestration method for recommendation generation.
    Implements Stages A, B, C, D from the roadmap.
    """
    logger.info(f"Generating recommendations for request: {request}")

    # Pipeline: A) vector retrieval -> B) CPU ranking ->
    # C) LLM-as-judge rerank (stub) -> D) diversity constraints.
    candidates = await self._retrieve_candidates(request)
    ranked = await self._rank_candidates(request, candidates)
    judged = await self._judge_rerank(request, ranked)
    diversified = self._apply_final_rank_and_diversity(request, judged)

    # Convert internal candidates into transport DTOs.
    gifts = []
    for item in diversified:
        gifts.append(GiftDTO(
            id=item.gift_id,
            title=item.title,
            description=item.description,
            price=item.price,
            currency=item.currency,
            image_url=item.image_url,
            product_url=item.product_url,
            merchant=item.merchant,
            category=item.category
        ))

    # Debug payload is only attached when the caller asked for it.
    debug_info = None
    if request.debug:
        debug_info = {"status": "candidates_retrieved", "count": len(candidates)}

    return RecommendationResponse(
        quiz_run_id="stub-id", # TODO: integrate with quiz_run repo
        engine_version=engine_version,
        featured_gift=gifts[0] if gifts else None,
        gifts=gifts,
        debug=debug_info
    )

get_deep_dive_products(search_queries, hypothesis_title, hypothesis_description, max_price=None, session_id=None, hypothesis_id=None, track_title=None, llm_model=None, limit=15) async

Stage E: Deep Dive - Multi-query expansion + Reranking.

Source code in app/services/recommendation.py
async def get_deep_dive_products(
    self, 
    search_queries: List[str], 
    hypothesis_title: str,
    hypothesis_description: str,
    max_price: Optional[int] = None,
    session_id: Optional[str] = None,
    hypothesis_id: Optional[uuid.UUID] = None,
    track_title: Optional[str] = None,
    llm_model: Optional[str] = None,
    limit: int = 15
) -> List[GiftDTO]:
    """
    Stage E: Deep Dive - Multi-query expansion + Reranking.

    Args:
        search_queries: Candidate search strings; only the first 3 are used.
        hypothesis_title / hypothesis_description: Combined into the
            rerank query context; the title is also used for logging.
        max_price: Optional hard budget; a configured margin is added on top.
        session_id / hypothesis_id / track_title / llm_model: Metadata
            recorded in ``SearchLog`` rows.
        limit: Maximum number of products returned after reranking.

    Returns:
        Up to ``limit`` GiftDTOs sorted by reranker score (descending).
    """
    logger.info(f"Deep dive for hypothesis: {hypothesis_title}")

    from app.core.logic_config import logic_config

    # 1. Multi-query search (Parallel). Flexible budget: stretch the cap
    # by a configured fraction so slightly-over-budget items still appear.
    effective_max_price = None
    if max_price:
        effective_max_price = int(max_price * (1 + logic_config.budget_margin_fraction))

    target_queries = search_queries[:3]

    # Each task returns (query, candidates) so per-query result counts
    # survive asyncio.gather even when a sibling task raises.
    async def _search_one(query: str):
        query_vectors = await self.embedding_service.embed_batch_async([query])
        if not query_vectors:
            return query, []
        candidates = await self.repo.search_similar_products(
            embedding=query_vectors[0],
            limit=15,
            is_active_only=True,
            max_price=effective_max_price
        )
        return query, candidates

    search_results_raw = await asyncio.gather(
        *(_search_one(q) for q in target_queries), return_exceptions=True
    )

    # Failed queries are logged and excluded; they count as 0 results below.
    query_to_results = {}
    for res in search_results_raw:
        if isinstance(res, Exception):
            logger.error(f"Search task failed in get_deep_dive_products: {res}")
        else:
            query, candidates = res
            query_to_results[query] = candidates

    # Deduplicate candidates across queries (first occurrence wins).
    all_candidates = {}
    for results in query_to_results.values():
        for c in results:
            if c.gift_id not in all_candidates:
                all_candidates[c.gift_id] = c

    candidates_list = list(all_candidates.values())
    if not candidates_list:
        # Log zero results for all queries
        for q in target_queries:
            self.session.add(SearchLog(
                search_query=q,
                results_count=0,
                engine_version="deep_dive_v1"
            ))
        return []

    # LOGGING: per-query result counts.
    # FIX: previously every query logged len(search_results[0]) — the
    # first query's count — instead of its own.
    for q in target_queries:
        self.session.add(SearchLog(
            session_id=session_id,
            hypothesis_id=hypothesis_id,
            track_title=track_title,
            hypothesis_title=hypothesis_title,
            search_context="deep_dive",
            llm_model=llm_model or logic_config.model_smart,
            search_query=q,
            results_count=len(query_to_results.get(q, [])),
            max_price=max_price,
            engine_version="advanced_v1"
        ))

    # 2. Reranking against the full hypothesis context.
    doc_texts = [f"{c.title} {c.description or ''}" for c in candidates_list]
    query_context = f"{hypothesis_title} {hypothesis_description}"

    try:
        scores = await self.intelligence_client.rerank(query_context, doc_texts)
        id_to_score = {c.gift_id: score for c, score in zip(candidates_list, scores)}
    except Exception as e:
        logger.error(f"RecommendationService: Deep dive reranking failed, falling back to vector order: {e}")
        # Proactive notification
        notifier = get_notification_service()
        await notifier.notify(
            topic="intelligence_error",
            message=f"Deep dive reranking failed for: {hypothesis_title}",
            data={"error": str(e)}
        )
        # Fallback: monotonically decreasing scores preserve retrieval order.
        id_to_score = {c.gift_id: (1.0 - (idx / len(candidates_list))) for idx, c in enumerate(candidates_list)}

    # Sort candidates list based on scores
    candidates_list.sort(key=lambda x: id_to_score.get(x.gift_id, 0.0), reverse=True)
    scored_products = candidates_list[:limit]

    # 3. Final DTO conversion
    results = []
    for p in scored_products:
        results.append(GiftDTO(
            id=p.gift_id,
            title=p.title,
            description=p.description,
            price=float(p.price) if p.price else None,
            currency=p.currency or "RUB",
            image_url=p.image_url,
            product_url=p.product_url,
            merchant=p.merchant,
            category=p.category
        ))

    return results