From f3066d6ea3b8af3f3db0ebd0e89ea31be818509b Mon Sep 17 00:00:00 2001 From: Bob Haddleton Date: Thu, 22 Jan 2026 20:56:55 -0600 Subject: [PATCH] Add support for async compose and operatoe functions. Signed-off-by: Bob Haddleton --- README.md | 7 +++++ function/fn.py | 11 ++++++-- tests/test_fn.py | 66 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 82 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ec6de9c..a5c30c1 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,13 @@ spec: For more complex operations, see the [operation examples](example/operation/). +## Async Functions + +`function-python` supports async functions for compositions and operations. Use `async def compose()` +and `async def operate()` to define async functions. `async` should be used if the script/function +is making network or other potentially blocking IO calls `async`. This allows +`function-python` to serve multiple requests concurrently without blocking. + ## Usage Notes `function-python` is best for very simple cases. If writing Python inline of diff --git a/function/fn.py b/function/fn.py index 9aebedc..d2b9689 100644 --- a/function/fn.py +++ b/function/fn.py @@ -1,6 +1,7 @@ """A Crossplane composition function.""" import importlib.util +import inspect import types import grpc @@ -44,10 +45,16 @@ async def RunFunction( response.fatal(rsp, msg) case (True, False): log.debug("running composition function") - script.compose(req, rsp) + if inspect.iscoroutinefunction(script.compose): + await script.compose(req, rsp) + else: + script.compose(req, rsp) case (False, True): log.debug("running operation function") - script.operate(req, rsp) + if inspect.iscoroutinefunction(script.operate): + await script.operate(req, rsp) + else: + script.operate(req, rsp) case (False, False): msg = "script must define a compose or operate function" log.debug(msg) diff --git a/tests/test_fn.py b/tests/test_fn.py index 63cfd85..aa9da1b 100644 --- a/tests/test_fn.py +++ b/tests/test_fn.py @@ -24,6 +24,21 @@ def compose(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse): }) """ +async_composition_script = """ +from crossplane.function.proto.v1 import run_function_pb2 as fnv1 + +async def compose(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse): + rsp.desired.resources["bucket"].resource.update({ + "apiVersion": "s3.aws.upbound.io/v1beta2", + "kind": "Bucket", + "spec": { + "forProvider": { + "region": "us-east-1" + } + }, + }) +""" + operation_script = """ from crossplane.function.proto.v1 import run_function_pb2 as fnv1 @@ -33,6 +48,15 @@ def operate(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse): rsp.output["message"] = "Operation completed successfully" """ +async_operation_script = """ +from crossplane.function.proto.v1 import run_function_pb2 as fnv1 + +async def operate(req: fnv1.RunFunctionRequest, rsp: fnv1.RunFunctionResponse): + # Set output for operation monitoring + rsp.output["result"] = "success" + rsp.output["message"] = "Operation completed successfully" +""" + both_functions_script = """ from crossplane.function.proto.v1 import run_function_pb2 as fnv1 @@ -91,6 +115,31 @@ class TestCase: context=structpb.Struct(), ), ), + TestCase( + reason="Function should run async composition scripts with await.", + req=fnv1.RunFunctionRequest( + input=resource.dict_to_struct({"script": async_composition_script}), + ), + want=fnv1.RunFunctionResponse( + meta=fnv1.ResponseMeta(ttl=durationpb.Duration(seconds=60)), + desired=fnv1.State( + resources={ + "bucket": fnv1.Resource( + resource=resource.dict_to_struct( + { + "apiVersion": "s3.aws.upbound.io/v1beta2", + "kind": "Bucket", + "spec": { + "forProvider": {"region": "us-east-1"} + }, + } + ) + ) + } + ), + context=structpb.Struct(), + ), + ), ] runner = fn.FunctionRunner() @@ -128,6 +177,23 @@ class TestCase: ), ), ), + TestCase( + reason="Function should run async operation scripts with await.", + req=fnv1.RunFunctionRequest( + input=resource.dict_to_struct({"script": async_operation_script}), + ), + want=fnv1.RunFunctionResponse( + meta=fnv1.ResponseMeta(ttl=durationpb.Duration(seconds=60)), + desired=fnv1.State(), + context=structpb.Struct(), + output=resource.dict_to_struct( + { + "result": "success", + "message": "Operation completed successfully", + } + ), + ), + ), ] runner = fn.FunctionRunner()