-
Notifications
You must be signed in to change notification settings - Fork 1.9k
feat(spark): add spark random functions #19908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
for reference the spark random functions are implemented here https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala |
Jefffrey
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haven't fully reviewed the PR yet, but this reminds me of
Is this something we'll need to be concerned about? In how seed is treated across record batches
| ColumnarValue::Scalar(ScalarValue::Int64(None)) => 0, | ||
| _ => { | ||
| return exec_err!( | ||
| "`{}` function expects an Int64 seed argument", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| "`{}` function expects an Int64 seed argument", | |
| "`{}` function expects a constant Int64 seed argument", |
oh yea, that will be an issue... I'm curious if the RecordBatch concept in Datafusion is a direct equivalent of a partition in Spark ? what i mean is can we expect the same determinism in record batches as partitions in spark ? If not, then we can use some internal state in the UDF to avoid the same seed across batches (AtomicU64 we would increment on every invocation ?) |
| } | ||
|
|
||
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { | ||
| let [seed] = take_function_args(self.name(), args.args)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if the UDF is called without any arguments ?!
The seed is optional - https://github.com/apache/datafusion/pull/19908/changes#diff-ba2db3d7d9cf4ab01c5a4186821e481786accc221e203b3a31450f8ca6d5c473R59
randr impl below is checking the args length: https://github.com/apache/datafusion/pull/19908/changes#diff-ba2db3d7d9cf4ab01c5a4186821e481786accc221e203b3a31450f8ca6d5c473R191-R194
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if there is no seed provided we default to the Datafusion random implementation with the simplify call
| } | ||
| _ => { | ||
| return exec_err!( | ||
| "`{}` function expects a positive Int32 length argument", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spark (v4.0.1) behaves a little bit different:
Length>0 (OK):
spark-sql (default)> SELECT randstr(3, 0) AS result;
ceV
Time taken: 0.913 seconds, Fetched 1 row(s)
Length=0 (OK, empty string):
spark-sql (default)> SELECT randstr(0, 0) AS result;
Time taken: 0.043 seconds, Fetched 1 row(s)
Length<0 (NOK):
spark-sql (default)> SELECT randstr(-1, 0) AS result;
[INVALID_PARAMETER_VALUE.LENGTH] The value of parameter(s) `length` in `randstr` is invalid: Expects `length` greater than or equal to 0, but got -1. SQLSTATE: 22023
org.apache.spark.SparkRuntimeException: [INVALID_PARAMETER_VALUE.LENGTH] The value of parameter(s) `length` in `randstr` is invalid: Expects `length` greater than or equal to 0, but got -1. SQLSTATE: 22023
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated for length = 0
| } | ||
|
|
||
| fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> { | ||
| internal_err!("`invoke_with_args` is not implemented for {}", self.name()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to always depend on simplify() ?
I think other udfs implement both invoke_with_args() and simplify(). This way it still works if PhysicalExprSimplifier is not used (disabled optimizer) or if the UDF is called directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes i think we can consider it safe, there are a bunch of places where only simplify is implemented https://github.com/search?q=repo%3Aapache%2Fdatafusion+ExprSimplifyResult+language%3ARust+path%3A%2F%5Edatafusion%5C%2Ffunctions%5C%2Fsrc%5C%2F%2F&type=code
|
|
||
| Ok(ExprSimplifyResult::Simplified( | ||
| min.clone() | ||
| .add((max.sub(min)).mul(rand_expr)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be possible that min is a volatile Expr ? E.g. random().
In that case the two usages above will evaluate to two different values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
min should be a literal, will update the logic to verify that
| SELECT random(0::integer); | ||
| ---- | ||
| 0.324575268031407 | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| query B | |
| SELECT random() > 0; | |
| ---- | |
| true |
to test random() without a seed
Update: Actually random() is used below, so it is tested. I guess it works due to the simplification but it would fail to read the seed when executed directly, i.e. without simplification.
Might need someone from comet or sail to chip in, they might be more familiar with how concepts map between DataFusion and Spark
This could be a good stop-gap solution in the meantime 🤔 |
Which issue does this PR close?
datafusion-sparkSpark Compatible Functions #15914Rationale for this change
Implement spark random functions:
What changes are included in this PR?
New spark random functions
Are these changes tested?
yes in slt
Are there any user-facing changes?
yes