1. Concurrent prompt processing

Use concurrent prompt processing when an application needs to send multiple independent prompts without waiting for each prompt to finish before starting the next one. The select_ai module supports this pattern with both the synchronous Profile API and the asynchronous AsyncProfile API.

Create a connection pool before running concurrent work. Use select_ai.create_pool() for synchronous recipes and select_ai.create_pool_async() for asynchronous recipes.

1.1. Recipe summary

| Recipe | Script | When to use |
|---|---|---|
| Sync completion | sync_thread_pool.py | Use ThreadPoolExecutor when prompts are independent and results can be handled as soon as each prompt completes. |
| Sync input order | sync_ordered_results.py | Use ThreadPoolExecutor.map() when result order must match the input prompt order. |
| Sync queue | sync_queue_workers.py | Use worker threads and a queue for producer-consumer workloads where prompts may arrive over time. |
| Async input order | async_gather.py | Use asyncio.gather() when result order must match the input prompt order. |
| Async completion | async_as_completed.py | Use asyncio.as_completed() when each result should be processed as soon as it is available. |
| Async pipeline | async_pipeline.py | Use run_pipeline() when all prompt/action pairs are known up front and should be sent in a single database round trip. |
| Async queue | async_queue_workers.py | Use async queue workers for long-running async services or background prompt processors. |

1.2. Environment variables

The recipes use the same connection environment variables as the other samples:

export SELECT_AI_USER=<select_ai_db_user>
export SELECT_AI_PASSWORD=<select_ai_db_password>
export SELECT_AI_DB_CONNECT_STRING=<db_connect_string>

Optional environment variables control pool sizing and profile names:

export SELECT_AI_POOL_MIN=1
export SELECT_AI_POOL_MAX=4
export SELECT_AI_POOL_INCREMENT=1
export SELECT_AI_PROFILE_NAME=oci_ai_profile

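Use SELECT_AI_PROFILE_NAME=async_oci_ai_profile for the async recipes if that is the async profile name in your environment. When the variable is unset, the sync recipes fall back to oci_ai_profile and the async recipes fall back to async_oci_ai_profile.

The thread pool recipes also read SELECT_AI_MAX_WORKERS, and the queue worker recipes read SELECT_AI_WORKER_COUNT. Both default to the pool maximum.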
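Each recipe repeats the same environment lookups. As a minimal sketch, the lookups could be collected in one helper that applies the same defaults and fails early when a required variable is missing or the pool bounds are inconsistent. PoolSettings and load_settings are hypothetical names used for illustration; they are not part of select_ai, and the values in the example dictionary are placeholders.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class PoolSettings:
    """Hypothetical container for the values the recipes read from the environment."""

    user: str
    password: str
    dsn: str
    pool_min: int
    pool_max: int
    pool_increment: int
    profile_name: str


def load_settings(env=None, default_profile="oci_ai_profile"):
    """Read connection and pool settings using the same defaults as the recipes."""
    env = os.environ if env is None else env

    required = (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError(f"Missing environment variables: {', '.join(missing)}")

    pool_min = int(env.get("SELECT_AI_POOL_MIN", "1"))
    pool_max = int(env.get("SELECT_AI_POOL_MAX", "4"))
    if pool_min > pool_max:
        raise ValueError("SELECT_AI_POOL_MIN must not exceed SELECT_AI_POOL_MAX")

    return PoolSettings(
        user=env["SELECT_AI_USER"],
        password=env["SELECT_AI_PASSWORD"],
        dsn=env["SELECT_AI_DB_CONNECT_STRING"],
        pool_min=pool_min,
        pool_max=pool_max,
        pool_increment=int(env.get("SELECT_AI_POOL_INCREMENT", "1")),
        profile_name=env.get("SELECT_AI_PROFILE_NAME", default_profile),
    )


# Placeholder values so the sketch runs without a real environment.
example_env = {
    "SELECT_AI_USER": "demo_user",
    "SELECT_AI_PASSWORD": "demo_password",
    "SELECT_AI_DB_CONNECT_STRING": "demo_dsn",
}
settings = load_settings(example_env)
print(settings.pool_min, settings.pool_max, settings.profile_name)
```

An async script would call load_settings(default_profile="async_oci_ai_profile") to keep the fallback used by the async recipes.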

1.3. sync_thread_pool.py

This recipe uses ThreadPoolExecutor and as_completed(). Results are printed in the order they finish.

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import select_ai

user = os.getenv("SELECT_AI_USER")
password = os.getenv("SELECT_AI_PASSWORD")
dsn = os.getenv("SELECT_AI_DB_CONNECT_STRING")

pool_min = int(os.getenv("SELECT_AI_POOL_MIN", "1"))
pool_max = int(os.getenv("SELECT_AI_POOL_MAX", "4"))
pool_increment = int(os.getenv("SELECT_AI_POOL_INCREMENT", "1"))

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "oci_ai_profile")
max_workers = int(os.getenv("SELECT_AI_MAX_WORKERS", str(pool_max)))

prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]


def show_sql(prompt):
    profile = select_ai.Profile(profile_name=profile_name)
    return prompt, profile.show_sql(prompt=prompt)


select_ai.create_pool(
    user=user,
    password=password,
    dsn=dsn,
    min_size=pool_min,
    max_size=pool_max,
    increment=pool_increment,
)

try:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(show_sql, prompt) for prompt in prompts]

        for future in as_completed(futures):
            prompt, sql = future.result()
            print(f"\nPrompt: {prompt}")
            print(sql)
finally:
    select_ai.disconnect()
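In the recipe above, future.result() re-raises any exception from the worker, so one failed prompt ends the result loop. A possible variation, sketched here with a stand-in function instead of a database call, maps each future back to its prompt and records failures separately. fake_show_sql and run_all are hypothetical names, and the simulated failure is only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_show_sql(prompt):
    """Hypothetical stand-in for profile.show_sql(); fails for one prompt."""
    if "promotions" in prompt:
        raise RuntimeError("simulated failure")
    return f"SELECT COUNT(*) /* {prompt} */"


def run_all(prompts, max_workers=4):
    """Run every prompt and return (successes, failures), both keyed by prompt."""
    successes, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its prompt so failures can be reported.
        future_to_prompt = {
            executor.submit(fake_show_sql, prompt): prompt for prompt in prompts
        }
        for future in as_completed(future_to_prompt):
            prompt = future_to_prompt[future]
            try:
                successes[prompt] = future.result()
            except Exception as exc:
                # Keep going: one failed prompt should not hide the others.
                failures[prompt] = exc
    return successes, failures


successes, failures = run_all(
    ["How many customers?", "How many products?", "How many promotions?"]
)
for prompt, sql in successes.items():
    print(f"OK     {prompt}: {sql}")
for prompt, exc in failures.items():
    print(f"FAILED {prompt}: {exc}")
```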

1.4. sync_ordered_results.py

This recipe uses ThreadPoolExecutor.map(). Prompts run concurrently, but results are printed in the same order as the input list.

import os
from concurrent.futures import ThreadPoolExecutor

import select_ai

user = os.getenv("SELECT_AI_USER")
password = os.getenv("SELECT_AI_PASSWORD")
dsn = os.getenv("SELECT_AI_DB_CONNECT_STRING")

pool_min = int(os.getenv("SELECT_AI_POOL_MIN", "1"))
pool_max = int(os.getenv("SELECT_AI_POOL_MAX", "4"))
pool_increment = int(os.getenv("SELECT_AI_POOL_INCREMENT", "1"))

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "oci_ai_profile")
max_workers = int(os.getenv("SELECT_AI_MAX_WORKERS", str(pool_max)))

prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]


def show_sql(prompt):
    profile = select_ai.Profile(profile_name=profile_name)
    return profile.show_sql(prompt=prompt)


select_ai.create_pool(
    user=user,
    password=password,
    dsn=dsn,
    min_size=pool_min,
    max_size=pool_max,
    increment=pool_increment,
)

try:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = executor.map(show_sql, prompts)

        for prompt, sql in zip(prompts, results):
            print(f"\nPrompt: {prompt}")
            print(sql)
finally:
    select_ai.disconnect()
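The ordering guarantee comes from ThreadPoolExecutor.map() itself, not from the order in which calls finish. The sketch below shows this without a database: slow_echo is a hypothetical stand-in for show_sql(), and the delays are arbitrary values chosen so that the first item finishes last.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_echo(item):
    """Hypothetical stand-in for show_sql(): sleeps, then returns its label."""
    label, delay = item
    time.sleep(delay)
    return label


# The first item is the slowest, so it is the last one to finish.
items = [("first", 0.2), ("second", 0.05), ("third", 0.1)]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order regardless of completion order.
    ordered = list(executor.map(slow_echo, items))

print(ordered)
```

One consequence of this design is that a slow early prompt delays the printing of later results that are already finished. If a call raises, the exception is re-raised when that result is retrieved from the iterator.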

1.5. sync_queue_workers.py

This recipe uses worker threads and queue.Queue. It is useful when prompt producers and prompt processors are separate parts of an application.

import os
from queue import Queue
from threading import Thread

import select_ai

user = os.getenv("SELECT_AI_USER")
password = os.getenv("SELECT_AI_PASSWORD")
dsn = os.getenv("SELECT_AI_DB_CONNECT_STRING")

pool_min = int(os.getenv("SELECT_AI_POOL_MIN", "1"))
pool_max = int(os.getenv("SELECT_AI_POOL_MAX", "4"))
pool_increment = int(os.getenv("SELECT_AI_POOL_INCREMENT", "1"))

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "oci_ai_profile")
worker_count = int(os.getenv("SELECT_AI_WORKER_COUNT", str(pool_max)))

prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]


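def worker(queue, results):
    while True:
        item = queue.get()
        try:
            # None is the sentinel that tells this worker to stop.
            if item is None:
                return

            index, prompt = item
            try:
                profile = select_ai.Profile(profile_name=profile_name)
                sql = profile.show_sql(prompt=prompt)
            except Exception as exc:
                # Record the failure and keep the worker alive. If the
                # worker exited here, its sentinel would never be consumed
                # and queue.join() would wait forever.
                sql = f"Error: {exc}"
            results[index] = (prompt, sql)
        finally:
            queue.task_done()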


select_ai.create_pool(
    user=user,
    password=password,
    dsn=dsn,
    min_size=pool_min,
    max_size=pool_max,
    increment=pool_increment,
)

try:
    queue = Queue()
    results = [None] * len(prompts)

    workers = [
        Thread(target=worker, args=(queue, results))
        for _ in range(worker_count)
    ]
    for thread in workers:
        thread.start()

    for index, prompt in enumerate(prompts):
        queue.put((index, prompt))

    for _ in workers:
        queue.put(None)

    queue.join()
    for thread in workers:
        thread.join()

    for prompt, sql in results:
        print(f"\nPrompt: {prompt}")
        print(sql)
finally:
    select_ai.disconnect()

1.6. async_gather.py

This recipe uses asyncio.gather(). Prompts run concurrently, and results are returned in the same order as the input task list.

import asyncio
import os

import select_ai

user = os.getenv("SELECT_AI_USER")
password = os.getenv("SELECT_AI_PASSWORD")
dsn = os.getenv("SELECT_AI_DB_CONNECT_STRING")

pool_min = int(os.getenv("SELECT_AI_POOL_MIN", "1"))
pool_max = int(os.getenv("SELECT_AI_POOL_MAX", "4"))
pool_increment = int(os.getenv("SELECT_AI_POOL_INCREMENT", "1"))

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")

prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]


async def show_sql(profile, prompt):
    return await profile.show_sql(prompt=prompt)


async def main():
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )

    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)

        tasks = [show_sql(profile, prompt) for prompt in prompts]
        results = await asyncio.gather(*tasks)

        for prompt, sql in zip(prompts, results):
            print(f"\nPrompt: {prompt}")
            print(sql)
    finally:
        await select_ai.async_disconnect()


asyncio.run(main())
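By default, asyncio.gather() raises the first exception it encounters, so the other results are not returned to the caller. When partial results are useful, return_exceptions=True is one option. The sketch below uses a stand-in coroutine rather than AsyncProfile; fake_show_sql and run_all are hypothetical names, and the failure is simulated.

```python
import asyncio


async def fake_show_sql(prompt):
    """Hypothetical stand-in for AsyncProfile.show_sql(); fails for one prompt."""
    await asyncio.sleep(0.01)
    if "promotions" in prompt:
        raise RuntimeError("simulated failure")
    return f"SELECT COUNT(*) /* {prompt} */"


async def run_all(prompts):
    # return_exceptions=True places exception objects in the result list
    # instead of raising the first one out of gather().
    return await asyncio.gather(
        *(fake_show_sql(prompt) for prompt in prompts),
        return_exceptions=True,
    )


prompts = ["How many customers?", "How many products?", "How many promotions?"]
results = asyncio.run(run_all(prompts))

# Results still line up with the input order, so zip() pairs them correctly.
for prompt, result in zip(prompts, results):
    status = "FAILED" if isinstance(result, Exception) else "OK"
    print(f"{status:6} {prompt}: {result}")
```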

1.7. async_as_completed.py

This recipe uses asyncio.as_completed(). It is useful for command-line tools or services that can forward each answer as soon as it is ready.

import asyncio
import os

import select_ai

user = os.getenv("SELECT_AI_USER")
password = os.getenv("SELECT_AI_PASSWORD")
dsn = os.getenv("SELECT_AI_DB_CONNECT_STRING")

pool_min = int(os.getenv("SELECT_AI_POOL_MIN", "1"))
pool_max = int(os.getenv("SELECT_AI_POOL_MAX", "4"))
pool_increment = int(os.getenv("SELECT_AI_POOL_INCREMENT", "1"))

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")

prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]


async def show_sql(profile, prompt):
    sql = await profile.show_sql(prompt=prompt)
    return prompt, sql


async def main():
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )

    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)
        tasks = [show_sql(profile, prompt) for prompt in prompts]

        for task in asyncio.as_completed(tasks):
            prompt, sql = await task
            print(f"\nPrompt: {prompt}")
            print(sql)
    finally:
        await select_ai.async_disconnect()


asyncio.run(main())
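Because asyncio.as_completed() yields results in completion order, the caller cannot pair them with the input list by position. That is why show_sql() in this recipe returns the prompt together with the SQL. The sketch below illustrates the ordering with a stand-in coroutine; fake_show_sql and the delays are assumptions made for the example.

```python
import asyncio


async def fake_show_sql(prompt, delay):
    """Hypothetical stand-in for AsyncProfile.show_sql() with a fixed delay."""
    await asyncio.sleep(delay)
    return prompt


async def collect_in_completion_order(jobs):
    coroutines = [fake_show_sql(prompt, delay) for prompt, delay in jobs]
    finished = []
    # as_completed() yields awaitables in the order the work finishes.
    for next_done in asyncio.as_completed(coroutines):
        finished.append(await next_done)
    return finished


# Input order is slow, fast, medium; completion order follows the delays.
jobs = [("slow", 0.3), ("fast", 0.05), ("medium", 0.15)]
finished = asyncio.run(collect_in_completion_order(jobs))
print(finished)
```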

1.8. async_pipeline.py

This recipe uses AsyncProfile.run_pipeline() to send multiple prompt/action pairs in one database round trip. This is different from Python task concurrency: the application submits a batch and receives the batch results when the pipeline completes.

import asyncio
import os

import select_ai

user = os.getenv("SELECT_AI_USER")
password = os.getenv("SELECT_AI_PASSWORD")
dsn = os.getenv("SELECT_AI_DB_CONNECT_STRING")

pool_min = int(os.getenv("SELECT_AI_POOL_MIN", "1"))
pool_max = int(os.getenv("SELECT_AI_POOL_MAX", "4"))
pool_increment = int(os.getenv("SELECT_AI_POOL_INCREMENT", "1"))

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")

prompt_specifications = [
    ("How many customers?", select_ai.Action.SHOWSQL),
    ("How many promotions?", select_ai.Action.RUNSQL),
    ("Explain how to count products.", select_ai.Action.EXPLAINSQL),
]


async def main():
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )

    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)
        results = await profile.run_pipeline(
            prompt_specifications, continue_on_error=True
        )

        for (prompt, action), result in zip(prompt_specifications, results):
            print(f"\nPrompt: {prompt}")
            print(f"Action: {action}")
            print(result)
    finally:
        await select_ai.async_disconnect()


asyncio.run(main())

1.9. async_queue_workers.py

This recipe uses asyncio.Queue and async worker tasks. It is useful for long-running async applications that receive prompts over time.

import asyncio
import os

import select_ai

user = os.getenv("SELECT_AI_USER")
password = os.getenv("SELECT_AI_PASSWORD")
dsn = os.getenv("SELECT_AI_DB_CONNECT_STRING")

pool_min = int(os.getenv("SELECT_AI_POOL_MIN", "1"))
pool_max = int(os.getenv("SELECT_AI_POOL_MAX", "4"))
pool_increment = int(os.getenv("SELECT_AI_POOL_INCREMENT", "1"))

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")
worker_count = int(os.getenv("SELECT_AI_WORKER_COUNT", str(pool_max)))

prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]


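async def worker(name, profile, queue, results):
    while True:
        item = await queue.get()
        try:
            # None is the sentinel that tells this worker to stop.
            if item is None:
                return

            index, prompt = item
            try:
                sql = await profile.show_sql(prompt=prompt)
            except Exception as exc:
                # Record the failure and keep the worker alive. If the
                # worker exited here, its sentinel would never be consumed
                # and queue.join() would wait forever.
                sql = f"Error in worker {name}: {exc}"
            results[index] = (prompt, sql)
        finally:
            queue.task_done()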


async def main():
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )

    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)
        queue = asyncio.Queue()
        results = [None] * len(prompts)

        workers = [
            asyncio.create_task(worker(i, profile, queue, results))
            for i in range(worker_count)
        ]

        for index, prompt in enumerate(prompts):
            await queue.put((index, prompt))

        for _ in workers:
            await queue.put(None)

        await queue.join()
        await asyncio.gather(*workers)

        for prompt, sql in results:
            print(f"\nPrompt: {prompt}")
            print(sql)
    finally:
        await select_ai.async_disconnect()


asyncio.run(main())
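The recipe above enqueues every prompt before the workers start draining the queue. In a long-running service, prompts usually arrive over time, so a separate producer is a closer fit. The following is a sketch under stated assumptions: fake_show_sql stands in for the database call, the producer, worker, and run_service names are hypothetical, and the bounded queue size is an arbitrary choice that makes the producer wait when workers fall behind.

```python
import asyncio


async def fake_show_sql(prompt):
    """Hypothetical stand-in for AsyncProfile.show_sql()."""
    await asyncio.sleep(0.01)
    return f"-- SQL for: {prompt}"


async def producer(queue, prompts, worker_count):
    # Simulate prompts arriving over time rather than all at once.
    for prompt in prompts:
        await queue.put(prompt)
        await asyncio.sleep(0.005)
    # One sentinel per worker signals that no more prompts will arrive.
    for _ in range(worker_count):
        await queue.put(None)


async def worker(queue, results):
    while True:
        prompt = await queue.get()
        try:
            if prompt is None:
                return
            results[prompt] = await fake_show_sql(prompt)
        finally:
            queue.task_done()


async def run_service(prompts, worker_count=2):
    # A bounded queue makes put() wait when the workers fall behind.
    queue = asyncio.Queue(maxsize=worker_count * 2)
    results = {}
    workers = [
        asyncio.create_task(worker(queue, results)) for _ in range(worker_count)
    ]
    await producer(queue, prompts, worker_count)
    # Every worker returns after consuming its sentinel.
    await asyncio.gather(*workers)
    return results


prompts = ["How many customers?", "How many products?", "How many promotions?"]
results = asyncio.run(run_service(prompts))
for prompt, sql in results.items():
    print(f"{prompt} -> {sql}")
```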

1.10. Pool sizing

Pool size controls how many database connections the application can use at one time. For the thread and queue worker recipes, keep the worker count close to the pool maximum. Workers beyond that number cannot all hold a connection at once, so raise the count only when the application deliberately wants extra work queued behind the pool.
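The asyncio.gather() and asyncio.as_completed() recipes have no worker count; they start every prompt at once. One way to keep in-flight calls near the pool maximum is an asyncio.Semaphore. This is a sketch of that idea, not something the recipes do: bounded_gather is a hypothetical helper, fake_prompt_call stands in for a database call, and the limit of 4 mirrors the default SELECT_AI_POOL_MAX.

```python
import asyncio


async def bounded_gather(coro_factories, limit):
    """Run the coroutines with at most `limit` of them in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        # Each call waits here until one of the `limit` slots is free.
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(run_one(f) for f in coro_factories))


async def demo():
    in_flight = 0
    peak = 0

    async def fake_prompt_call():
        """Hypothetical stand-in for a database call; tracks concurrency."""
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)
        in_flight -= 1
        return "ok"

    results = await bounded_gather([fake_prompt_call] * 10, limit=4)
    return results, peak


results, peak = asyncio.run(demo())
print(f"{len(results)} calls completed, peak concurrency {peak}")
```

The helper takes callables rather than coroutine objects so that each coroutine is created only when a slot is free.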

In multi-process deployments, each process creates its own pool. Total possible database connections are approximately:

processes * SELECT_AI_POOL_MAX

Choose pool sizes that leave capacity for other database clients and for the AI provider calls made by DBMS_CLOUD_AI.
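The multi-process estimate above is simple arithmetic, and it can be worth checking against the database's connection limit before deployment. The numbers below are illustrative only; max_total_connections and largest_pool_max are hypothetical helper names.

```python
def max_total_connections(processes, pool_max):
    """Upper bound on connections when every process fills its own pool."""
    return processes * pool_max


def largest_pool_max(processes, connection_budget):
    """Largest per-process pool maximum that stays within a connection budget."""
    return connection_budget // processes


# Illustrative: 3 worker processes, each with the default pool maximum of 4.
print(max_total_connections(processes=3, pool_max=4))  # 12

# Illustrative: a budget of 20 connections shared by 3 processes.
print(largest_pool_max(processes=3, connection_budget=20))  # 6
```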