Azure持久功能:处理列表

2024年 2月 22日 49.3k 0

azure持久功能:处理列表

问题内容

我有一个用 python 编写的 azure 持久函数,带有一个协调器和两个活动函数

orchestrator 调用第一个活动函数,并作为回报接收一个列表变量(名称列表和此列表可以在每次执行函数时都是动态的)

下一步是为每个列表项调用第二个活动函数(顺序处理 - 由于第二个活动函数调用的 api 限制)

#dynamically gets generated by the first activity function
payload=[1,2,3,4]

tasks = [context.call_activity("secondfunction",ps) for ps in payload]
output = yield context.task_all(tasks)

登录后复制

我在扇出方法中使用的不是串行的,但我似乎无法找到我想要做的事情的替代方法。

此外,在 host.json 文件中,我尝试强制在给定时间只能运行一个活动函数,以避免并行处理

"extensions": {
"durableTask": {
"maxConcurrentActivityFunctions": 1,
"maxConcurrentOrchestratorFunctions": 1
}
}

登录后复制

还值得注意的是,我无法将整个列表传递给活动函数,就好像我执行活动函数将花费超过 5-10 分钟,这是 azure 函数的超时限制,因此尝试迭代列表编排功能

但结果不是连续的

非常感谢您的反馈

正确答案

您可以尝试使用以下两种方法来实现您的要求:-

方法 1:-

我的function_app.py:-

import azure.functions as func
import azure.durable_functions as df

myapp = df.dfapp(http_auth_level=func.authlevel.anonymous)

# http starter
@myapp.route(route="orchestrators/{functionname}")
@myapp.durable_client_input(client_name="client")
async def http_start(req: func.httprequest, client):
function_name = req.route_params.get('functionname')
instance_id = await client.start_new(function_name, none) # pass the functionname here
response = client.create_check_status_response(req, instance_id)
return response

# orchestrator
@myapp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
cities = ["seattle", "tokyo", "london"]

tasks = []
for city in cities:
tasks.append(context.call_activity("hello", city))

# wait for all tasks to complete
results = yield context.task_all(tasks)

return results

# activity
@myapp.activity_trigger(input_name="city")
def hello(city: str):
print(f"processing {city}...")
# your activity function logic goes here
result = f"hello {city}!"

return result

登录后复制

输出:-

函数 url:-

http://localhost:7071/api/orchestrators/hello_orchestrator

登录后复制

方法 2:-

function_app.py:-

import azure.functions as func
import azure.durable_functions as df

myApp = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS)

# HTTP Starter
@myApp.route(route="orchestrators/{functionName}")
@myApp.durable_client_input(client_name="client")
async def http_start(req: func.HttpRequest, client):
function_name = req.route_params.get('functionName')
instance_id = await client.start_new(function_name, None) # Pass the functionName here
response = client.create_check_status_response(req, instance_id)
return response

# Orchestrator
@myApp.orchestration_trigger(context_name="context")
def hello_orchestrator(context):
# Call the first activity to get a list of names
names_list = yield context.call_activity("get_names")

# Process each name sequentially using the second activity
results = []
for name in names_list:
result = yield context.call_activity("process_name", name)
results.append(result)

return results

# First Activity
@myApp.activity_trigger
def get_names():
# Your logic to retrieve a dynamic list of names goes here
# For demonstration purposes, returning a hardcoded list
return ["John", "Alice", "Bob"]

# Second Activity
@myApp.activity_trigger(input_name="name")
def process_name(name: str):
print(f"Processing {name}...")
# Your logic to process each name goes here
result = f"Hello {name}!"

return result

登录后复制

以上就是Azure持久功能:处理列表的详细内容,更多请关注每日运维网(www.mryunwei.com)其它相关文章!

相关文章

JavaScript2024新功能:Object.groupBy、正则表达式v标志
PHP trim 函数对多字节字符的使用和限制
新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
为React 19做准备:WordPress 6.6用户指南
如何删除WordPress中的所有评论

发布评论