-
Notifications
You must be signed in to change notification settings - Fork 315
Expand file tree
/
Copy path23_agents_parallelization.py
More file actions
114 lines (90 loc) · 3.54 KB
/
23_agents_parallelization.py
File metadata and controls
114 lines (90 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
This example shows the parallelization pattern. We run the agent three times in parallel, and pick
the best result.
# Usage:
🤖: I'm a financial report research analyst. Enter a stock ticker on IDX to begin.
👧: ADRO
"""
import asyncio
from agents import Agent, Runner, ItemHelpers, function_tool, trace
from utils.api_client import retrieve_from_endpoint
@function_tool
def get_company_financials(ticker: str) -> str:
"""
Get company financials from Indonesia Exchange (IDX)
"""
url = f"https://api.sectors.app/v1/company/report/{ticker}/?sections=financials"
try:
return retrieve_from_endpoint(url)
except Exception as e:
print(f"Error occurred: {e}")
return None
@function_tool
def get_revenue_segments(ticker: str) -> str:
"""
Get revenue segments for a company from Indonesia Exchange (IDX)
"""
url = f"https://api.sectors.app/v1/company/get-segments/{ticker}/"
try:
return retrieve_from_endpoint(url)
except Exception as e:
print(f"Error occurred: {e}")
return None
@function_tool
def get_quarterly_financials(ticker: str) -> str:
"""
Get revenue segments for a company from Indonesia Exchange (IDX)
"""
url = f"https://api.sectors.app/v1/financials/quarterly/{ticker}/?report_date=2024-12-31&approx=true"
try:
return retrieve_from_endpoint(url)
except Exception as e:
print(f"Error occurred: {e}")
return None
company_financials_research_agent = Agent(
name="company_financials_research_agent",
instructions="Research the financials of a company based on the ticker provided.",
tools=[get_company_financials],
output_type=str
)
company_revenue_breakdown_agent = Agent(
name="company_revenue_breakdown_agent",
instructions="Research the revenue breakdown of a company based on the ticker provided.",
tools=[get_revenue_segments],
output_type=str
)
company_quarterly_financials_agent = Agent(
name="company_quarterly_financials_agent",
instructions="Research the quarterly financials of a company based on the ticker provided.",
tools=[get_quarterly_financials],
output_type=str
)
research_team_leader_aggregator = Agent(
name="research_team_leader_aggregator",
instructions="You are the team leader of a research team. You will aggregate the results from these agents and provide a consolidated answer that is relevant to the user.",
output_type=str
)
async def main():
input_prompt = input(f"🤖: I'm a financial report research analyst. Enter a stock ticker on IDX to begin. \n👧: ")
# Ensure the entire workflow is a single trace
with trace("Parallelization"):
# Run the agents in parallel
agent_res1, agent_res2, agent_res3 = await asyncio.gather(
Runner.run(company_financials_research_agent, input_prompt),
Runner.run(company_revenue_breakdown_agent, input_prompt),
Runner.run(company_quarterly_financials_agent, input_prompt)
)
outputs = [
ItemHelpers.text_message_outputs(agent_res1.new_items),
ItemHelpers.text_message_outputs(agent_res2.new_items),
ItemHelpers.text_message_outputs(agent_res3.new_items),
]
# Aggregate the results
aggregated_result = "\n\n".join(outputs)
summary = await Runner.run(
research_team_leader_aggregator,
aggregated_result
)
print(f"🤖: {summary.final_output}")
if __name__ == "__main__":
asyncio.run(main())