Chat

Prompt

Bases: BaseModel

Data model for user input in the chat API. Represents a user-provided prompt for querying the chat service.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `prompt` | `str \| None` | The user's input query. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
class Prompt(BaseModel):
    """
    Data model for user input in the chat API. Represents a user-provided prompt for querying the chat service.

    Attributes:
        prompt (str | None): The user's input query.
    """

    prompt: str | None = None
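
Clients send this model as the JSON request body. A minimal sketch of the expected payload (the example question is made up; only the `prompt` field name comes from the model above):

```python
import json

# Hypothetical request body matching the Prompt model.
payload = json.dumps({"prompt": "What were last month's total sales?"})

# prompt defaults to None, so an empty body is also accepted:
empty = json.loads("{}")
print(empty.get("prompt"))  # None
```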

answer_question(prompt, context, sql_query=None, sql_results=None) async

Generates an answer based on a given prompt and context using an asynchronous streaming response. It streams the model's response as it becomes available, yielding chunks of data to the client.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | The prompt for which the question needs to be answered. | required |
| `context` | `str` | The context related to the prompt for providing an accurate answer. | required |
| `sql_query` | `str`, optional | The SQL query generated by the language model. | `None` |
| `sql_results` | `list`, optional | The results of the SQL query execution. | `None` |

Yields:

| Type | Description |
| --- | --- |
| `str` | The streamed response from the model. |

Raises:

| Type | Description |
| --- | --- |
| `HTTPException` | If there are issues connecting to the API or parsing data from the response. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
async def answer_question(
    prompt: str,
    context: str,
    sql_query: Optional[str] = None,
    sql_results: Optional[list] = None,
):
    """
    Generates an answer based on a given prompt and context using an asynchronous streaming response.
    It streams the model's response as it becomes available, yielding chunks of data to the client.

    Parameters:
        prompt (str): The prompt for which the question needs to be answered.
        context (str): The context related to the prompt for providing an accurate answer.
        sql_query (str, optional): The SQL query generated by the language model.
        sql_results (list, optional): The results of the SQL query execution.

    Yields:
        str: The streamed response from the model.

    Raises:
        HTTPException: If there are issues with connecting to the API or parsing data from the response.
    """
    try:
        # If sql_query and sql_results are provided, yield them first.
        if sql_query and sql_results:
            initial_data = {"sql_query": sql_query, "sql_results": sql_results}
            # Yield the initial data as JSON.
            yield f"data: {json.dumps(initial_data)}\n\n"

        # Construct the initial system and user messages for the AI model.
        response = await async_client.chat.completions.create(
            model=model,
            messages=[
                {
                    "role": "system",
                    "content": """You are a helpful and friendly assistant.
                    You answer questions with context the user provides.
                    You ONLY answer using the context provided.
                    """,
                },
                {
                    "role": "user",
                    "content": f"Please answer the prompt with the following context:\n\n<prompt>{prompt}\n\n<context>{context}",
                },
            ],
            stream=True,
        )

        # Accumulate the streamed response incrementally.
        streamed_response = ""
        async for chunk in response:
            # Check if the response contains any content and append it to the streamed response.
            if len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                if delta.content:
                    streamed_response += delta.content
                    yield f"data: {streamed_response}\n\n"
    except Exception as e:
        # Log any errors encountered during response parsing and raise an HTTPException.
        print("Error with streaming response: " + str(e))
        raise HTTPException(503, "Error parsing data from LLM API")
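
Each yield above is a server-sent-events frame of the form `data: ...\n\n`. Note that after the optional JSON frame, every subsequent frame carries the full accumulated text so far (the generator re-yields `streamed_response` each time), so a client should replace its displayed text rather than append. A minimal sketch of frame parsing (the helper name is ours, not part of the module):

```python
import json

def parse_sse_frame(frame: str):
    """Parse one 'data: ...' frame as emitted by answer_question.

    Returns a dict for the optional first JSON frame (sql_query /
    sql_results); returns the raw text for ordinary content frames.
    A plain-text frame that happens to be valid JSON would also parse,
    so this is a sketch, not a robust protocol.
    """
    payload = frame.removeprefix("data: ").rstrip("\n")
    try:
        return json.loads(payload)
    except json.JSONDecodeError:
        return payload

first = parse_sse_frame('data: {"sql_query": "SELECT 1", "sql_results": [[1]]}\n\n')
text = parse_sse_frame("data: Hello, wor\n\n")
```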

background_chat(messages, return_sql_info=False) async

Handles the creation and processing of chat messages with the language model. It sends messages to the model, parses the response, and interacts with the database to fetch results.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `messages` | `list` | A list of dictionaries representing the conversation history. | required |
| `return_sql_info` | `bool`, optional | If `True`, returns the SQL query and results along with the chat response. | `False` |

Returns:

list or tuple:

- If `return_sql_info` is `False`, returns the query results.
- If `return_sql_info` is `True`, returns a tuple containing:
    - `sql_query` (str): The SQL query generated by the language model.
    - `sql_results` (list): The results of the SQL query execution.

Raises:

| Type | Description |
| --- | --- |
| `HTTPException` | If there is an issue creating the conversation. |
| `NoResultsException` | If no results are found in the database for the provided query. |
| `SQLQueryException` | If there is an issue with the SQL query execution. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
async def background_chat(messages: list, return_sql_info: bool = False):
    """
    Handles the creation and processing of chat messages with the language model.
    It sends messages to the model, parses the response, and interacts with the database to fetch results.

    Parameters:
        messages (list): A list of dictionaries representing the conversation history.
        return_sql_info (bool, optional): If True, returns the SQL query and results along with the chat response.
                                          Default is False.

    Returns:
        list or tuple:
            - If return_sql_info is False, returns the query results.
            - If return_sql_info is True, returns a tuple containing:
                - sql_query (str): The SQL query generated by the language model.
                - sql_results (list): The results of the SQL query execution.

    Raises:
        HTTPException: If there is an issue creating the conversation.
        NoResultsException: If no results are found in the database for the provided query.
        SQLQueryException: If there is an issue with the SQL query execution.
    """
    try:
        # Send the conversation history to the OpenAI client for response generation.
        response = client.chat.completions.create(model="gpt-4o", messages=messages)
    except Exception as e:
        # Log and raise an HTTPException if there is an error in API communication.
        print("Error in creating conversation:", str(e))
        raise HTTPException(503, error503)

    try:
        # Extract the response message, cleaning up formatting artifacts.
        sql_query = (
            response.choices[0]
            .message.content.replace("```sql", "")
            .replace("```", "")
            .strip()
        )
        print(sql_query)
        # Add the assistant's message back into the conversation history.
        messages.append({"role": "assistant", "content": sql_query})

        # Establish a connection to the database using a tunnel.
        tunnel, conn = await get_mssql_connection()

        try:
            # Execute the extracted SQL query against the database and fetch results.
            sql_results = await query_mssql_database(sql_query, conn, tunnel)
            # Raise a custom exception if no results are found.
            if sql_results is None or len(sql_results) == 0:
                raise NoResultsException(
                    "No results found. Please try to rephrase your query."
                )
            if return_sql_info:
                # Return the SQL query and SQL results.
                return sql_query, sql_results
            else:
                # Return only the query results.
                return sql_results
        except exc.SQLAlchemyError as e:
            # Handle SQL errors with a custom exception for better error reporting.
            raise SQLQueryException(str(e))
    except SQLQueryException as e:
        # Reraise SQL-specific exceptions for external handling.
        raise e
    except Exception as e:
        # Log and raise any other exceptions that occur during response processing.
        print("Response (Streaming) Error: " + str(e))
        raise e
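
The fence-stripping cleanup at the top of the `try` block is worth isolating. A standalone sketch (the helper name is ours) of the same `replace(...).strip()` chain applied to a model reply; the fence string is built programmatically only to keep this sketch itself fence-safe:

```python
FENCE = "`" * 3  # a literal markdown code fence: ```

def strip_sql_fences(reply: str) -> str:
    """Remove markdown code fences the model may wrap around its SQL,
    mirroring the cleanup in background_chat."""
    return reply.replace(FENCE + "sql", "").replace(FENCE, "").strip()

raw = FENCE + "sql\nSELECT TOP 5 * FROM orders\n" + FENCE
clean = strip_sql_fences(raw)
```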

chat(prompt, return_sql_info=Query(False, description='Set to True to return SQL query and results'), session=Depends(my_cxo_db_conn.get_db_sess)) async

Processes a user's prompt by querying a database and generating responses using embeddings and similarity calculations. It retries up to three times in case of unsuccessful queries, aiming to refine the results and provide better responses.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `Prompt` | The user's input query. | required |
| `return_sql_info` | `bool`, optional | If `True`, returns the SQL query and results along with the chat response. | `Query(False, description='Set to True to return SQL query and results')` |
| `session` | `AsyncSession` | Database session for executing queries. | `Depends(get_db_sess)` |

Returns:

| Type | Description |
| --- | --- |
| `StreamingResponse` | Streams the response to the client, including the SQL query and results if requested. |

Raises:

| Type | Description |
| --- | --- |
| `HTTPException` | If there is an issue processing the request. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
@chat_router.post("/", response_class=StreamingResponse)
async def chat(
    prompt: Prompt,
    return_sql_info: bool = Query(
        False, description="Set to True to return SQL query and results"
    ),
    session: AsyncSession = Depends(my_cxo_db_conn.get_db_sess),
):
    """
    Processes a user's prompt by querying a database and generating responses using embeddings and similarity calculations.
    It retries up to three times in case of unsuccessful queries, aiming to refine the results and provide better responses.

    Parameters:
        prompt (Prompt): The user's input query.
        return_sql_info (bool, optional): If True, returns the SQL query and results along with the chat response.
                                          Default is False.
        session: AsyncSession: Database session for executing queries.

    Returns:
        StreamingResponse:
            - Streams the response to the client, including the SQL query and results if requested.

    Raises:
        HTTPException: If there is an issue processing the request.
    """
    # Fetch synonym maps and merchant notes for enriching the prompt context.
    synonym_map = get_synonym_map()
    merchant_notes = get_merchant_notes()

    # Determine relevant tables based on the user's prompt.
    tables = await get_tables_to_query(prompt.prompt)
    tables_json = json.loads(tables)

    # Connect to the database and register the vector extension for similarity searches.
    await register_vector(session)

    # Create an embedding request for the relevant tables.
    tables_refs = ",".join(tables_json["table_names"])
    data = {
        "model": "nomic-embed-text",
        "input": "search_query: " + tables_refs,
    }

    # Send the embedding request to an external API for processing.
    response = requests.post("https://ollama.boxtalk.ai/api/embed", json=data)
    res_data = response.json()
    embeddings = res_data["embeddings"][0]

    # Query the database for similar documents using the embedding.
    stmt = text(
        "SELECT * FROM documents WHERE customer_id = 1 ORDER BY embedding <-> :embedding LIMIT 10"
    )
    result = await session.execute(stmt, {"embedding": embeddings})
    results = result.scalars().all()

    # Gather the schema data from the query results for use in generating the response.
    schema_data = [row["content"] for row in results]

    # Load templates for generating system and user messages for SQL generation.
    template = jinja_env.get_template("sql_gen_system_prompt.jinja")
    user_template = jinja_env.get_template("sql_gen_user_prompt.jinja")
    messages = [
        {"role": "system", "content": template.render()},
        {
            "role": "user",
            "content": user_template.render(
                schemas=schema_data,
                user_question=prompt.prompt,
                synonyms=synonym_map,
                user_notes=merchant_notes,
            ),
        },
    ]

    retries = 0  # Initialize retry counter for reattempting the query.
    sql_query = None  # Initialize sql_query.
    sql_results = None  # Initialize sql_results.

    while retries < 3:
        context = "Rephrase your question please."
        try:
            # Attempt to generate an answer with the current context.
            if return_sql_info:
                # Get sql_query and sql_results.
                sql_query, sql_results = await background_chat(
                    messages, return_sql_info
                )
                # Use sql_results as context for the answer_question function.
                context = f"SQL Query:\n{sql_query}\n\nSQL Results:\n{sql_results}"
            else:
                # Get the query results.
                sql_results = await background_chat(messages)
                # Use sql_results as context for the answer_question function.
                context = f"SQL Results:\n{sql_results}"
            break  # Exit the loop if the query is successful.
        except NoResultsException:
            # Handle cases where no results are found and prompt for a query rewrite.
            print("No results found. Please try to rephrase your query.")
            messages.append(
                {
                    "role": "user",
                    "content": "No results found. Please try to rewrite the SQL query in correct TSQL syntax.",
                }
            )
            retries += 1
            continue  # Retry with a modified message.
        except SQLQueryException as e:
            # Handle SQL query errors by instructing the model to fix the query.
            messages.append(
                {
                    "role": "user",
                    "content": f"Rewrite the query in proper TSQL syntax to fix the error below.\n\n Error: {e}",
                }
            )
            retries += 1
            continue  # Retry with a modified message.

    # Return the final answer as a streaming response.
    return StreamingResponse(
        answer_question(
            prompt.prompt,
            context,
            sql_query=sql_query if return_sql_info else None,
            sql_results=sql_results if return_sql_info else None,
        ),
        media_type="text/event-stream",
    )
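
The retry loop above can be sketched in isolation: on a failed attempt, a corrective user message is appended to the conversation and the model is asked again, up to three times. `run` below is a hypothetical stand-in for `background_chat`:

```python
class NoResultsException(Exception):
    """Raised when a query returns no rows (mirrors the module's exception)."""

def run_with_retries(run, messages, max_retries=3):
    # On failure, append a corrective instruction and retry, as chat() does.
    retries = 0
    while retries < max_retries:
        try:
            return run(messages)
        except NoResultsException:
            messages.append({
                "role": "user",
                "content": "No results found. Please try to rewrite the SQL query in correct TSQL syntax.",
            })
            retries += 1
    return None  # all retries exhausted

# Simulate a model that succeeds once a corrective message is present.
def fake_run(messages):
    if len(messages) < 2:
        raise NoResultsException()
    return [("row", 1)]

result = run_with_retries(fake_run, [{"role": "user", "content": "q"}])
```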

get_include_list()

Reads the 'include.json' file in STATIC_DIR and returns the list of tables to include. Assumes the JSON file contains a 'tables' key with a list of table names.

Returns:

| Type | Description |
| --- | --- |
| `list` | List of tables to include. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
def get_include_list():
    """
    Reads the 'include.json' file in STATIC_DIR and returns the list of tables to include.
    Assumes the JSON file contains a 'tables' key with a list of table names.

    Returns:
        list: List of tables to include.
    """
    with open(STATIC_DIR / "include.json") as f:
        data = json.load(f)
        return data["tables"]
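
The helper assumes a file shaped like the hypothetical example below. A sketch that writes and reads such a file (a temp directory stands in for STATIC_DIR):

```python
import json
import tempfile
from pathlib import Path

# Hypothetical include.json with the 'tables' key the helper expects.
sample = {"tables": ["orders", "order_items", "customers"]}

with tempfile.TemporaryDirectory() as d:
    path = Path(d) / "include.json"
    path.write_text(json.dumps(sample))
    # Same read pattern as get_include_list:
    with open(path) as f:
        tables = json.load(f)["tables"]
```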

get_merchant_notes()

Reads merchant notes from the 'merchant_notes.md' file in STATIC_DIR and returns its content. Assumes the file exists at the specified path.

Returns:

| Type | Description |
| --- | --- |
| `str` | The content of the merchant notes file. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
def get_merchant_notes():
    """
    Reads merchant notes from the 'merchant_notes.md' file in STATIC_DIR and returns its content.
    Assumes the file exists at the specified path.

    Returns:
        str: The content of the merchant notes file.
    """
    with open(STATIC_DIR / "merchant_notes.md") as f:
        return f.read()

get_synonym_map()

Reads the synonym map from the 'synonym_map.md' file in STATIC_DIR and returns its content as a string. Assumes the file exists at the specified path.

Returns:

| Type | Description |
| --- | --- |
| `str` | The content of the synonym map file. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
def get_synonym_map():
    """
    Reads the synonym map from the 'synonym_map.md' file in STATIC_DIR and returns its content as a string.
    Assumes the file exists at the specified path.

    Returns:
        str: The content of the synonym map file.
    """
    with open(STATIC_DIR / "synonym_map.md") as f:
        return f.read()

get_tables_to_query(prompt) async

Generates a list of tables relevant to a given prompt by rendering Jinja2 templates for system and user messages. It uses the synonym map, include list, and merchant notes to craft a detailed prompt for the language model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prompt` | `str` | The user's query to identify relevant tables. | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | JSON string containing table information. |

Raises:

| Type | Description |
| --- | --- |
| `HTTPException` | If there is an error communicating with the chat completion client or parsing its response. |

Source code in mycxo/boxtalk/routes/api/chat/chat.py
async def get_tables_to_query(prompt: str):
    """
    Generates a list of tables relevant to a given prompt by rendering Jinja2 templates for system and user messages.
    It uses the synonym map, include list, and merchant notes to craft a detailed prompt for the language model.

    Parameters:
        prompt (str): The user's query to identify relevant tables.

    Returns:
        str: JSON string containing table information.

    Raises:
        HTTPException: If there is an error in creating campaigns or processing the response from the chat completion client.
    """
    # Retrieve necessary information for generating the system and user messages.
    synonym_map = get_synonym_map()
    include_table_list = get_include_list()
    merchant_notes = get_merchant_notes()

    # Load and render the system template message.
    system_template = jinja_env.get_template("init_system_prompt.jinja")
    system_message = system_template.render()

    # Load and render the user template message, providing context for the user's query.
    user_template = jinja_env.get_template("init_user_prompt.jinja")
    user_message = user_template.render(
        {
            "table_names": include_table_list,
            "synonyms": synonym_map,
            "user_notes": merchant_notes,
            "question": prompt,
        }
    )

    # Construct the conversation history with the system and user messages.
    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_message},
    ]

    try:
        # Send the messages to the language model for table identification.
        response = client.chat.completions.create(model=model, messages=messages)
    except Exception as e:
        # Log and raise an error if there is an issue in the API communication.
        print("Error in creating campaigns:", str(e))
        raise HTTPException(503, error503)

    try:
        # Extract and clean up the JSON message from the response.
        message = (
            response.choices[0]
            .message.content.replace("```json", "")
            .replace("```", "")
            .strip()
        )
        print(message)
        return message
    except Exception as e:
        # Log and raise an error if there is an issue with response parsing.
        print("Error Response (Streaming) Error: " + str(e))
        raise HTTPException(503, error503)
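
The returned string is consumed by `chat()` via `json.loads(...)["table_names"]` and joined with commas for the embedding request. A minimal sketch with made-up table names showing that shape:

```python
import json

# Example of the JSON string get_tables_to_query is expected to return.
tables = '{"table_names": ["orders", "order_items"]}'

tables_json = json.loads(tables)
tables_refs = ",".join(tables_json["table_names"])  # as done in chat()
```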