1. Concurrent prompt processing¶
Use concurrent prompt processing when an application needs to send multiple
independent prompts without waiting for each prompt to finish before starting
the next one. The select_ai module supports this pattern with both the
synchronous Profile API and the asynchronous AsyncProfile API.
Create a connection pool before running concurrent work. Use
select_ai.create_pool() for synchronous recipes and
select_ai.create_pool_async() for asynchronous recipes.
1.1. Recipe summary¶
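| Recipe | Script | When to use |
|---|---|---|
| Sync completion | sync_thread_pool.py | Use `ThreadPoolExecutor` with `as_completed()` to handle each result as soon as it finishes. |
| Sync input order | sync_ordered_results.py | Use `ThreadPoolExecutor.map()` when prompts should run concurrently but results must follow the input order. |
| Sync queue | sync_queue_workers.py | Use worker threads and a queue for producer-consumer workloads where prompts may arrive over time. |
| Async input order | async_gather.py | Use `asyncio.gather()` when results must follow the input order. |
| Async completion | async_as_completed.py | Use `asyncio.as_completed()` to forward each answer as soon as it is ready. |
| Async pipeline | async_pipeline.py | Use `AsyncProfile.run_pipeline()` to submit a batch of prompt/action pairs in one database round trip. |
| Async queue | async_queue_workers.py | Use async queue workers for long-running async services or background prompt processors. |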
1.2. Environment variables¶
The recipes use the same connection environment variables as the other samples:
export SELECT_AI_USER=<select_ai_db_user>
export SELECT_AI_PASSWORD=<select_ai_db_password>
export SELECT_AI_DB_CONNECT_STRING=<db_connect_string>
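Optional environment variables control pool sizing, worker counts, and profile names:
export SELECT_AI_POOL_MIN=1
export SELECT_AI_POOL_MAX=4
export SELECT_AI_POOL_INCREMENT=1
export SELECT_AI_MAX_WORKERS=4
export SELECT_AI_WORKER_COUNT=4
export SELECT_AI_PROFILE_NAME=oci_ai_profile
SELECT_AI_MAX_WORKERS sets the thread count for the two ThreadPoolExecutor recipes. SELECT_AI_WORKER_COUNT sets the number of workers in the two queue recipes. Both default to the value of SELECT_AI_POOL_MAX.
When SELECT_AI_PROFILE_NAME is unset, the sync recipes use oci_ai_profile and the async recipes use async_oci_ai_profile. If you export it, set it to the profile that matches the recipe you run. For example, use SELECT_AI_PROFILE_NAME=async_oci_ai_profile for the async recipes if that is the async profile name in your environment.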
1.3. sync_thread_pool.py¶
This recipe uses ThreadPoolExecutor and as_completed(). Results are
printed in the order they finish.
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import select_ai
# Database credentials; each is None when its variable is not exported.
user, password, dsn = (
    os.getenv(variable)
    for variable in (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
)

# Connection pool sizing, parsed as integers (int() raises ValueError on
# a non-numeric value).
pool_min, pool_max, pool_increment = (
    int(os.getenv(variable, fallback))
    for variable, fallback in (
        ("SELECT_AI_POOL_MIN", "1"),
        ("SELECT_AI_POOL_MAX", "4"),
        ("SELECT_AI_POOL_INCREMENT", "1"),
    )
)

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "oci_ai_profile")

# Executor thread count; falls back to the pool maximum.
max_workers = int(os.environ.get("SELECT_AI_MAX_WORKERS", f"{pool_max}"))

# Independent questions submitted to the executor.
prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]
def show_sql(prompt):
    """Return ``(prompt, sql)`` for a single prompt.

    Called on an executor thread. A fresh ``select_ai.Profile`` is built on
    every call rather than one instance being shared between threads. The
    prompt is returned alongside the SQL so the caller can label results
    that arrive out of submission order.
    """
    sql = select_ai.Profile(profile_name=profile_name).show_sql(prompt=prompt)
    return prompt, sql
# Open the connection pool before any executor thread calls show_sql().
select_ai.create_pool(
    user=user,
    password=password,
    dsn=dsn,
    min_size=pool_min,
    max_size=pool_max,
    increment=pool_increment,
)
try:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(show_sql, text) for text in prompts]
        # as_completed() yields each future as it finishes, so output order
        # follows completion time rather than the order of prompts.
        for finished in as_completed(futures):
            prompt, sql = finished.result()
            print(f"\nPrompt: {prompt}")
            print(sql)
finally:
    # Always release the pool, even if a prompt raised.
    select_ai.disconnect()
1.4. sync_ordered_results.py¶
This recipe uses ThreadPoolExecutor.map(). Prompts run concurrently, but
results are printed in the same order as the input list.
import os
from concurrent.futures import ThreadPoolExecutor
import select_ai
# Database credentials; each is None when its variable is not exported.
user, password, dsn = (
    os.getenv(variable)
    for variable in (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
)

# Connection pool sizing, parsed as integers (int() raises ValueError on
# a non-numeric value).
pool_min, pool_max, pool_increment = (
    int(os.getenv(variable, fallback))
    for variable, fallback in (
        ("SELECT_AI_POOL_MIN", "1"),
        ("SELECT_AI_POOL_MAX", "4"),
        ("SELECT_AI_POOL_INCREMENT", "1"),
    )
)

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "oci_ai_profile")

# Executor thread count; falls back to the pool maximum.
max_workers = int(os.environ.get("SELECT_AI_MAX_WORKERS", f"{pool_max}"))

# Independent questions; results are printed in this order.
prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]
def show_sql(prompt):
    """Return the SQL the configured profile generates for *prompt*.

    Called on an executor thread. A new ``select_ai.Profile`` is built per
    call instead of one instance being shared between threads.
    """
    ai_profile = select_ai.Profile(profile_name=profile_name)
    generated_sql = ai_profile.show_sql(prompt=prompt)
    return generated_sql
# Open the connection pool before any executor thread calls show_sql().
select_ai.create_pool(
    user=user,
    password=password,
    dsn=dsn,
    min_size=pool_min,
    max_size=pool_max,
    increment=pool_increment,
)
try:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Executor.map() yields results in input order, so zip() pairs each
        # result with the prompt that produced it.
        for prompt, sql in zip(prompts, executor.map(show_sql, prompts)):
            print(f"\nPrompt: {prompt}")
            print(sql)
finally:
    # Always release the pool, even if a prompt raised.
    select_ai.disconnect()
1.5. sync_queue_workers.py¶
This recipe uses worker threads and queue.Queue. It is useful when prompt
producers and prompt processors are separate parts of an application.
import os
from queue import Queue
from threading import Thread
import select_ai
# Database credentials; each is None when its variable is not exported.
user, password, dsn = (
    os.getenv(variable)
    for variable in (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
)

# Connection pool sizing, parsed as integers (int() raises ValueError on
# a non-numeric value).
pool_min, pool_max, pool_increment = (
    int(os.getenv(variable, fallback))
    for variable, fallback in (
        ("SELECT_AI_POOL_MIN", "1"),
        ("SELECT_AI_POOL_MAX", "4"),
        ("SELECT_AI_POOL_INCREMENT", "1"),
    )
)

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "oci_ai_profile")

# Number of worker threads; falls back to the pool maximum.
worker_count = int(os.environ.get("SELECT_AI_WORKER_COUNT", f"{pool_max}"))

# Prompts fed into the work queue.
prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]
def worker(queue, results):
    """Consume ``(index, prompt)`` items from *queue* until a ``None`` sentinel.

    Each processed item is stored as ``results[index] = (prompt, outcome)``.
    *outcome* is the generated SQL or, if generation raised, the exception
    object. Recording the failure instead of letting it escape keeps this
    thread alive so it still consumes its ``None`` sentinel. Otherwise the
    sentinel would stay in the queue unacknowledged and ``queue.join()``
    would block forever.

    Args:
        queue: ``queue.Queue`` holding ``(index, prompt)`` tuples followed by
            ``None`` sentinels. ``task_done()`` is called once for every
            item taken, including the sentinel.
        results: Pre-sized list that receives one entry per prompt index.
    """
    while True:
        item = queue.get()
        try:
            if item is None:
                return
            index, prompt = item
            try:
                profile = select_ai.Profile(profile_name=profile_name)
                outcome = profile.show_sql(prompt=prompt)
            except Exception as exc:  # worker boundary: record, don't die
                outcome = exc
            results[index] = (prompt, outcome)
        finally:
            queue.task_done()
# Open the connection pool before any worker thread calls show_sql().
select_ai.create_pool(
    user=user,
    password=password,
    dsn=dsn,
    min_size=pool_min,
    max_size=pool_max,
    increment=pool_increment,
)
try:
    queue = Queue()
    # One slot per prompt; workers write their result at the prompt's index.
    results = [None] * len(prompts)
    workers = []
    for _ in range(worker_count):
        workers.append(Thread(target=worker, args=(queue, results)))
    for thread in workers:
        thread.start()
    # enumerate() produces exactly the (index, prompt) tuples workers expect.
    for item in enumerate(prompts):
        queue.put(item)
    # One None sentinel per worker tells each thread to return.
    for _ in range(len(workers)):
        queue.put(None)
    queue.join()
    for thread in workers:
        thread.join()
    for prompt, sql in results:
        print(f"\nPrompt: {prompt}")
        print(sql)
finally:
    # Always release the pool, even if something above raised.
    select_ai.disconnect()
1.6. async_gather.py¶
This recipe uses asyncio.gather(). Prompts run concurrently, and results
are returned in the same order as the input task list.
import asyncio
import os
import select_ai
# Database credentials; each is None when its variable is not exported.
user, password, dsn = (
    os.getenv(variable)
    for variable in (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
)

# Connection pool sizing, parsed as integers (int() raises ValueError on
# a non-numeric value).
pool_min, pool_max, pool_increment = (
    int(os.getenv(variable, fallback))
    for variable, fallback in (
        ("SELECT_AI_POOL_MIN", "1"),
        ("SELECT_AI_POOL_MAX", "4"),
        ("SELECT_AI_POOL_INCREMENT", "1"),
    )
)

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")

# Independent questions; results are printed in this order.
prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]
async def show_sql(profile, prompt):
    """Await *profile*'s SQL generation for *prompt* and return the result."""
    generated = await profile.show_sql(prompt=prompt)
    return generated
async def main():
    """Generate SQL for all prompts concurrently and print them in input order."""
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )
    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)
        # gather() returns results in the order its awaitables were passed,
        # which is the order of prompts.
        results = await asyncio.gather(
            *(show_sql(profile, text) for text in prompts)
        )
        for prompt, sql in zip(prompts, results):
            print(f"\nPrompt: {prompt}")
            print(sql)
    finally:
        # Always release the async pool, even if a prompt raised.
        await select_ai.async_disconnect()


asyncio.run(main())
1.7. async_as_completed.py¶
This recipe uses asyncio.as_completed(). It is useful for command-line
tools or services that can forward each answer as soon as it is ready.
import asyncio
import os
import select_ai
# Database credentials; each is None when its variable is not exported.
user, password, dsn = (
    os.getenv(variable)
    for variable in (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
)

# Connection pool sizing, parsed as integers (int() raises ValueError on
# a non-numeric value).
pool_min, pool_max, pool_increment = (
    int(os.getenv(variable, fallback))
    for variable, fallback in (
        ("SELECT_AI_POOL_MIN", "1"),
        ("SELECT_AI_POOL_MAX", "4"),
        ("SELECT_AI_POOL_INCREMENT", "1"),
    )
)

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")

# Independent questions; answers are printed as they complete.
prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]
async def show_sql(profile, prompt):
    """Return ``(prompt, sql)`` so the caller knows which prompt finished.

    The caller consumes these with ``asyncio.as_completed()``, which yields
    results in completion order. The prompt therefore travels with its SQL.
    """
    return prompt, await profile.show_sql(prompt=prompt)
async def main():
    """Generate SQL for all prompts concurrently, printing each as it finishes."""
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )
    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)
        pending = [show_sql(profile, text) for text in prompts]
        # as_completed() yields awaitables in finish order, so the printed
        # order may differ from the order of prompts.
        for next_done in asyncio.as_completed(pending):
            prompt, sql = await next_done
            print(f"\nPrompt: {prompt}")
            print(sql)
    finally:
        # Always release the async pool, even if a prompt raised.
        await select_ai.async_disconnect()


asyncio.run(main())
1.8. async_pipeline.py¶
This recipe uses AsyncProfile.run_pipeline() to send multiple
prompt/action pairs in one database round trip. This is different from Python
task concurrency: the application submits a batch and receives the batch
results when the pipeline completes.
import asyncio
import os
import select_ai
# Database credentials; each is None when its variable is not exported.
user, password, dsn = (
    os.getenv(variable)
    for variable in (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
)

# Connection pool sizing, parsed as integers (int() raises ValueError on
# a non-numeric value).
pool_min, pool_max, pool_increment = (
    int(os.getenv(variable, fallback))
    for variable, fallback in (
        ("SELECT_AI_POOL_MIN", "1"),
        ("SELECT_AI_POOL_MAX", "4"),
        ("SELECT_AI_POOL_INCREMENT", "1"),
    )
)

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")

# Each entry pairs a prompt with the select_ai action to run for it.
prompt_specifications = [
    ("How many customers?", select_ai.Action.SHOWSQL),
    ("How many promotions?", select_ai.Action.RUNSQL),
    ("Explain how to count products.", select_ai.Action.EXPLAINSQL),
]
async def main():
    """Submit every prompt/action pair as one pipeline and print each result."""
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )
    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)
        # NOTE(review): continue_on_error=True presumably lets later entries
        # run when one fails; confirm against the select_ai documentation.
        results = await profile.run_pipeline(
            prompt_specifications, continue_on_error=True
        )
        # Results are paired positionally with the specifications submitted.
        for specification, result in zip(prompt_specifications, results):
            prompt, action = specification
            print(f"\nPrompt: {prompt}")
            print(f"Action: {action}")
            print(result)
    finally:
        # Always release the async pool, even if the pipeline raised.
        await select_ai.async_disconnect()


asyncio.run(main())
1.9. async_queue_workers.py¶
This recipe uses asyncio.Queue and async worker tasks. It is useful for
long-running async applications that receive prompts over time.
import asyncio
import os
import select_ai
# Database credentials; each is None when its variable is not exported.
user, password, dsn = (
    os.getenv(variable)
    for variable in (
        "SELECT_AI_USER",
        "SELECT_AI_PASSWORD",
        "SELECT_AI_DB_CONNECT_STRING",
    )
)

# Connection pool sizing, parsed as integers (int() raises ValueError on
# a non-numeric value).
pool_min, pool_max, pool_increment = (
    int(os.getenv(variable, fallback))
    for variable, fallback in (
        ("SELECT_AI_POOL_MIN", "1"),
        ("SELECT_AI_POOL_MAX", "4"),
        ("SELECT_AI_POOL_INCREMENT", "1"),
    )
)

profile_name = os.getenv("SELECT_AI_PROFILE_NAME", "async_oci_ai_profile")

# Number of async worker tasks; falls back to the pool maximum.
worker_count = int(os.environ.get("SELECT_AI_WORKER_COUNT", f"{pool_max}"))

# Prompts fed into the async work queue.
prompts = [
    "How many customers?",
    "How many products?",
    "How many promotions?",
    "List the top 5 customers by sales.",
]
async def worker(name, profile, queue, results):
    """Consume ``(index, prompt)`` items from *queue* until a ``None`` sentinel.

    Each processed item is stored as ``results[index] = (prompt, outcome)``.
    *outcome* is the SQL returned by ``profile.show_sql`` or, if that raised,
    the exception object. Recording the failure instead of letting it escape
    keeps this task running so it still consumes its ``None`` sentinel.
    Otherwise the sentinel would stay in the queue unacknowledged and
    ``await queue.join()`` would never return. Cancellation
    (``asyncio.CancelledError``) is not an ``Exception`` subclass and still
    propagates.

    Args:
        name: Identifier for this worker; not used by the body.
        profile: Object exposing an awaitable ``show_sql(prompt=...)``.
        queue: ``asyncio.Queue`` holding ``(index, prompt)`` tuples followed
            by ``None`` sentinels. ``task_done()`` is called once for every
            item taken, including the sentinel.
        results: Pre-sized list that receives one entry per prompt index.
    """
    while True:
        item = await queue.get()
        try:
            if item is None:
                return
            index, prompt = item
            try:
                outcome = await profile.show_sql(prompt=prompt)
            except Exception as exc:  # worker boundary: record, don't die
                outcome = exc
            results[index] = (prompt, outcome)
        finally:
            queue.task_done()
async def main():
    """Process all prompts with a fixed set of async worker tasks and print them."""
    select_ai.create_pool_async(
        user=user,
        password=password,
        dsn=dsn,
        min_size=pool_min,
        max_size=pool_max,
        increment=pool_increment,
    )
    try:
        profile = await select_ai.AsyncProfile(profile_name=profile_name)
        queue = asyncio.Queue()
        # One slot per prompt; workers write their result at the prompt's index.
        results = [None] * len(prompts)
        workers = []
        for worker_id in range(worker_count):
            workers.append(
                asyncio.create_task(worker(worker_id, profile, queue, results))
            )
        # enumerate() produces exactly the (index, prompt) tuples workers expect.
        for item in enumerate(prompts):
            await queue.put(item)
        # One None sentinel per worker tells each task to return.
        for _ in range(len(workers)):
            await queue.put(None)
        await queue.join()
        await asyncio.gather(*workers)
        for prompt, sql in results:
            print(f"\nPrompt: {prompt}")
            print(sql)
    finally:
        # Always release the async pool, even if something above raised.
        await select_ai.async_disconnect()


asyncio.run(main())
1.10. Pool sizing¶
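Pool size controls how many database connections the application can use at one time. In the thread and worker recipes, the worker count comes from SELECT_AI_MAX_WORKERS or SELECT_AI_WORKER_COUNT, and both default to SELECT_AI_POOL_MAX. Keep this count at or near the pool maximum. Workers beyond the number of pooled connections do not add database concurrency, because the extra workers typically wait for a free connection. Exceed the pool maximum only when the application intentionally wants that additional queued work.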
In multi-process deployments, each process creates its own pool. Total possible database connections are approximately:
processes * SELECT_AI_POOL_MAX
Choose pool sizes that leave capacity for other database clients and for the
AI provider calls made by DBMS_CLOUD_AI.