From 22ced650930f41a44806a18cdaf979f087111912 Mon Sep 17 00:00:00 2001 From: Ramine Agoune Date: Wed, 10 Dec 2025 12:07:11 +0100 Subject: [PATCH 1/2] feat(queries): Task fn replaces ForeachChannel This simplified API uses ~40% less memory and reduces ~55% the number of allocations. --- README.md | 49 +++-- benchmark/volt_test.go | 21 ++ query.go | 438 ++++++++++++++++++++++++++++++++++++++++- query_test.go | 350 ++++++++++++++++++++++++++++++++ 4 files changed, 834 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index 2d8d218..1cb285b 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,8 @@ for result := range query.Foreach(nil) { transformData(result.A) } ``` -The Foreach function receives a function to pre-filter the results, and returns an iterator. + +DEPRECATED: The Foreach function receives a function to pre-filter the results, and returns an iterator. For faster performances, you can use concurrency with the function ForeachChannel: ```go @@ -143,6 +144,21 @@ func runWorkers(workersNumber int, worker func(int)) { } ``` +The Task function replaces the previous ForeachChannel function: it gives a simpler API with smaller memory footprint, for similar performances. + +It is useful for executing code on all queried entities, by dispatching the work across a defined number of workers. +Internally, this function leverages the iterator typically obtained through the Foreach function. + +The number of workers should be considered based on the task's complexity: +dispatching has a minmal overhead that might outweigh the benefits if the entity count to worker count ratio is not appropriate. +```go +query := volt.CreateQuery2[transformComponent, meshComponent](world, volt.QueryConfiguration{OptionalComponents: []volt.OptionalComponent{meshComponentId}}) + +query.Task(4, nil, func(result volt.QueryResult2[transformComponent, meshComponent]) { + transformData(result.A) +}) +``` + Queries exist for 1 to 8 Components. You can also get the number of entities, without looping on each: @@ -255,21 +271,22 @@ goarch: amd64 pkg: benchmark cpu: AMD Ryzen 7 5800X 8-Core Processor -| Benchmark | Iterations | ns/op | B/op | Allocs/op | -|---------------------------------|------------|-----------|------------|-----------| -| BenchmarkCreateEntityArche-16 | 171 | 7138387 | 11096954 | 61 | -| BenchmarkIterateArche-16 | 2798 | 429744 | 354 | 4 | -| BenchmarkAddArche-16 | 253 | 4673362 | 122153 | 100000 | -| BenchmarkRemoveArche-16 | 247 | 4840772 | 100000 | 100000 | -| BenchmarkCreateEntityUECS-16 | 27 | 38852089 | 49119503 | 200146 | -| BenchmarkIterateUECS-16 | 4892 | 235333 | 128 | 3 | -| BenchmarkAddUECS-16 | 28 | 38982533 | 4721942 | 100005 | -| BenchmarkRemoveUECS-16 | 30 | 40290316 | 3336712 | 100000 | -| BenchmarkCreateEntityVolt-16 | 63 | 18836136 | 35181458 | 100101 | -| BenchmarkIterateVolt-16 | 3619 | 337764 | 256 | 8 | -| BenchmarkIterateConcurrentlyVolt-16 | 9164 | 121653 | 3324 | 91 | -| BenchmarkAddVolt-16 | 103 | 11379690 | 4313182 | 300000 | -| BenchmarkRemoveVolt-16 | 146 | 7647252 | 400001 | 100000 | +| Benchmark | Iterations | ns/op | B/op | Allocs/op | +|--------------------------------------------------|-------------|-----------|------------|-----------| +| BenchmarkCreateEntityArche-16 | 171 | 7138387 | 11096954 | 61 | +| BenchmarkIterateArche-16 | 2798 | 429744 | 354 | 4 | +| BenchmarkAddArche-16 | 253 | 4673362 | 122153 | 100000 | +| BenchmarkRemoveArche-16 | 247 | 4840772 | 100000 | 100000 | +| BenchmarkCreateEntityUECS-16 | 27 | 38852089 | 49119503 | 200146 | +| BenchmarkIterateUECS-16 | 4892 | 235333 | 128 | 3 | +| BenchmarkAddUECS-16 | 28 | 38982533 | 4721942 | 100005 | +| BenchmarkRemoveUECS-16 | 30 | 40290316 | 3336712 | 100000 | +| BenchmarkCreateEntityVolt-16 | 63 | 18836136 | 35181458 | 100101 | +| BenchmarkIterateVolt-16 | 3619 | 337764 | 256 | 8 | +| (DEPRECATED) BenchmarkIterateConcurrentlyVolt-16 | 9164 | 121653 | 3324 | 91 | +| BenchmarkTaskVolt-16 | 9859 | 119525 | 1847 | 38 | +| BenchmarkAddVolt-16 | 103 | 11379690 | 4313182 | 300000 | +| BenchmarkRemoveVolt-16 | 146 | 7647252 | 400001 | 100000 | These results show a few things: - Arche is the fastest tool for writes operations. In our game development though we would rather lean towards fastest read operations, because the games loops will read way more often than write. diff --git a/benchmark/volt_test.go b/benchmark/volt_test.go index 4effaf6..77c35ab 100644 --- a/benchmark/volt_test.go +++ b/benchmark/volt_test.go @@ -76,6 +76,27 @@ func BenchmarkIterateConcurrentlyVolt(b *testing.B) { b.ReportAllocs() } +func BenchmarkTaskVolt(b *testing.B) { + world := volt.CreateWorld(ENTITIES_COUNT) + volt.RegisterComponent[testTransform](world, &volt.ComponentConfig[testTransform]{}) + volt.RegisterComponent[testTag](world, &volt.ComponentConfig[testTag]{}) + + for i := 0; i < ENTITIES_COUNT; i++ { + id := world.CreateEntity() + volt.AddComponent[testTransform](world, id, testTransform{}) + volt.AddComponent[testTag](world, id, testTag{}) + } + + for b.Loop() { + query := volt.CreateQuery2[testTransform, testTag](world, volt.QueryConfiguration{}) + query.Task(WORKERS, nil, func(result volt.QueryResult2[testTransform, testTag]) { + transformData(result.A) + }) + } + + b.ReportAllocs() +} + func BenchmarkAddVolt(b *testing.B) { b.StopTimer() diff --git a/query.go b/query.go index eefe624..b05f49d 100644 --- a/query.go +++ b/query.go @@ -4,6 +4,7 @@ import ( "iter" "math" "slices" + "sync" ) // Optional ComponentId for Queries. @@ -117,10 +118,43 @@ func (query *Query1[A]) Foreach(filterFn func(QueryResult1[A]) bool) iter.Seq[Qu } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult1. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query1[A]) Task(workersCount int, filterFn func(QueryResult1[A]) bool, fn func(data QueryResult1[A])) { + storageA := getStorage[A](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult1[A] + + if sliceA != nil { + result.A = &sliceA[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult1 for all the entities with component A // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query1[A]) ForeachChannel(chunkSize int, filterFn func(QueryResult1[A]) bool) <-chan iter.Seq[QueryResult1[A]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -283,10 +317,48 @@ func (query *Query2[A, B]) Foreach(filterFn func(QueryResult2[A, B]) bool) iter. } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult2. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query2[A, B]) Task(workersCount int, filterFn func(QueryResult2[A, B]) bool, fn func(data QueryResult2[A, B])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult2[A, B] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult2 for all the entities with components A, B // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query2[A, B]) ForeachChannel(chunkSize int, filterFn func(QueryResult2[A, B]) bool) <-chan iter.Seq[QueryResult2[A, B]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -472,10 +544,53 @@ func (query *Query3[A, B, C]) Foreach(filterFn func(QueryResult3[A, B, C]) bool) } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult3. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query3[A, B, C]) Task(workersCount int, filterFn func(QueryResult3[A, B, C]) bool, fn func(data QueryResult3[A, B, C])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult3[A, B, C] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult3 for all the entities with components A, B, C // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query3[A, B, C]) ForeachChannel(chunkSize int, filterFn func(QueryResult3[A, B, C]) bool) <-chan iter.Seq[QueryResult3[A, B, C]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -679,10 +794,58 @@ func (query *Query4[A, B, C, D]) Foreach(filterFn func(QueryResult4[A, B, C, D]) } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult4. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query4[A, B, C, D]) Task(workersCount int, filterFn func(QueryResult4[A, B, C, D]) bool, fn func(data QueryResult4[A, B, C, D])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult4[A, B, C, D] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult4 for all the entities with components A, B, C, D // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query4[A, B, C, D]) ForeachChannel(chunkSize int, filterFn func(QueryResult4[A, B, C, D]) bool) <-chan iter.Seq[QueryResult4[A, B, C, D]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -903,10 +1066,63 @@ func (query *Query5[A, B, C, D, E]) Foreach(filterFn func(QueryResult5[A, B, C, } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult5. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query5[A, B, C, D, E]) Task(workersCount int, filterFn func(QueryResult5[A, B, C, D, E]) bool, fn func(data QueryResult5[A, B, C, D, E])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult5[A, B, C, D, E] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult5 for all the entities with components A, B, C, D, E // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query5[A, B, C, D, E]) ForeachChannel(chunkSize int, filterFn func(QueryResult5[A, B, C, D, E]) bool) <-chan iter.Seq[QueryResult5[A, B, C, D, E]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1145,10 +1361,68 @@ func (query *Query6[A, B, C, D, E, F]) Foreach(filterFn func(QueryResult6[A, B, } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult6. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query6[A, B, C, D, E, F]) Task(workersCount int, filterFn func(QueryResult6[A, B, C, D, E, F]) bool, fn func(data QueryResult6[A, B, C, D, E, F])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + storageF := getStorage[F](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + sliceF := storageF.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult6[A, B, C, D, E, F] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + if sliceF != nil { + result.F = &sliceF[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult6 for all the entities with components A, B, C, D, E, F // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query6[A, B, C, D, E, F]) ForeachChannel(chunkSize int, filterFn func(QueryResult6[A, B, C, D, E, F]) bool) <-chan iter.Seq[QueryResult6[A, B, C, D, E, F]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1405,10 +1679,73 @@ func (query *Query7[A, B, C, D, E, F, G]) Foreach(filterFn func(QueryResult7[A, } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult7. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query7[A, B, C, D, E, F, G]) Task(workersCount int, filterFn func(QueryResult7[A, B, C, D, E, F, G]) bool, fn func(data QueryResult7[A, B, C, D, E, F, G])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + storageF := getStorage[F](query.World) + storageG := getStorage[G](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + sliceF := storageF.archetypesComponentsEntities[archetype.Id] + sliceG := storageG.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult7[A, B, C, D, E, F, G] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + if sliceF != nil { + result.F = &sliceF[i] + } + if sliceG != nil { + result.G = &sliceG[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult7 for all the entities with components A, B, C, D, E, F, G // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query7[A, B, C, D, E, F, G]) ForeachChannel(chunkSize int, filterFn func(QueryResult7[A, B, C, D, E, F, G]) bool) <-chan iter.Seq[QueryResult7[A, B, C, D, E, F, G]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1682,10 +2019,78 @@ func (query *Query8[A, B, C, D, E, F, G, H]) Foreach(filterFn func(QueryResult8[ } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult8. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query8[A, B, C, D, E, F, G, H]) Task(workersCount int, filterFn func(QueryResult8[A, B, C, D, E, F, G, H]) bool, fn func(data QueryResult8[A, B, C, D, E, F, G, H])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + storageF := getStorage[F](query.World) + storageG := getStorage[G](query.World) + storageH := getStorage[H](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + sliceF := storageF.archetypesComponentsEntities[archetype.Id] + sliceG := storageG.archetypesComponentsEntities[archetype.Id] + sliceH := storageH.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult8[A, B, C, D, E, F, G, H] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + if sliceF != nil { + result.F = &sliceF[i] + } + if sliceG != nil { + result.G = &sliceG[i] + } + if sliceH != nil { + result.H = &sliceH[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult8 for all the entities with components A, B, C, D, E, F, G, H // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query8[A, B, C, D, E, F, G, H]) ForeachChannel(chunkSize int, filterFn func(QueryResult8[A, B, C, D, E, F, G, H]) bool) <-chan iter.Seq[QueryResult8[A, B, C, D, E, F, G, H]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1794,3 +2199,20 @@ func (query *Query8[A, B, C, D, E, F, G, H]) ForeachChannel(chunkSize int, filte return channel } + +func task[T any](workersCount int, data []T, fn func(i int, data T)) { + var wg sync.WaitGroup + dataSize := len(data) + chunkSize := (dataSize + workersCount - 1) / workersCount + + for workerID := 0; workerID < workersCount; workerID++ { + wg.Add(1) + go func(start, end int) { + defer wg.Done() + for i := start; i < end; i++ { + fn(i, data[i]) + } + }(workerID*chunkSize, min((workerID+1)*chunkSize, dataSize)) + } + wg.Wait() +} diff --git a/query_test.go b/query_test.go index 361973f..ac78c20 100644 --- a/query_test.go +++ b/query_test.go @@ -2,6 +2,7 @@ package volt import ( "slices" + "sync" "testing" ) @@ -99,6 +100,46 @@ func TestQuery1_Foreach(t *testing.T) { } } +func TestQuery1_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponent[testComponent1](world, entityId, testComponent1{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery1[testComponent1](world, QueryConfiguration{}) + var results []QueryResult1[testComponent1] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult1[testComponent1]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery1_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -234,6 +275,47 @@ func TestQuery2_Foreach(t *testing.T) { } } +func TestQuery2_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents2[testComponent1, testComponent2](world, entityId, testComponent1{}, testComponent2{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery2[testComponent1, testComponent2](world, QueryConfiguration{}) + var results []QueryResult2[testComponent1, testComponent2] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult2[testComponent1, testComponent2]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery2_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -375,6 +457,48 @@ func TestQuery3_Foreach(t *testing.T) { } } +func TestQuery3_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents3[testComponent1, testComponent2, testComponent3](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery3[testComponent1, testComponent2, testComponent3](world, QueryConfiguration{}) + var results []QueryResult3[testComponent1, testComponent2, testComponent3] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult3[testComponent1, testComponent2, testComponent3]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery3_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -520,6 +644,49 @@ func TestQuery4_Foreach(t *testing.T) { } } +func TestQuery4_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents4[testComponent1, testComponent2, testComponent3, testComponent4](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery4[testComponent1, testComponent2, testComponent3, testComponent4](world, QueryConfiguration{}) + var results []QueryResult4[testComponent1, testComponent2, testComponent3, testComponent4] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult4[testComponent1, testComponent2, testComponent3, testComponent4]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery4_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -670,6 +837,50 @@ func TestQuery5_Foreach(t *testing.T) { } } +func TestQuery5_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, &ComponentConfig[testComponent5]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5](world, QueryConfiguration{}) + var results []QueryResult5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery5_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -825,6 +1036,51 @@ func TestQuery6_Foreach(t *testing.T) { } } +func TestQuery6_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, &ComponentConfig[testComponent5]{}) + RegisterComponent[testComponent6](world, &ComponentConfig[testComponent6]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}, testComponent6{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6](world, QueryConfiguration{}) + var results []QueryResult6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery6_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -986,6 +1242,52 @@ func TestQuery7_Foreach(t *testing.T) { } } +func TestQuery7_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, &ComponentConfig[testComponent5]{}) + RegisterComponent[testComponent6](world, &ComponentConfig[testComponent6]{}) + RegisterComponent[testComponent7](world, &ComponentConfig[testComponent7]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}, testComponent6{}, testComponent7{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7](world, QueryConfiguration{}) + var results []QueryResult7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery7_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -1144,6 +1446,54 @@ func TestQuery8_Foreach(t *testing.T) { } } } + +func TestQuery8_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, &ComponentConfig[testComponent5]{}) + RegisterComponent[testComponent6](world, &ComponentConfig[testComponent6]{}) + RegisterComponent[testComponent7](world, &ComponentConfig[testComponent7]{}) + RegisterComponent[testComponent8](world, &ComponentConfig[testComponent8]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents8[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}, testComponent6{}, testComponent7{}, testComponent8{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery8[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8](world, QueryConfiguration{}) + var results []QueryResult8[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult8[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery8_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) From 1ddc95f37a9cd24ec524f8e71fcb0276e583a137 Mon Sep 17 00:00:00 2001 From: Ramine Agoune Date: Wed, 10 Dec 2025 12:16:26 +0100 Subject: [PATCH 2/2] chore: rename data to result in Query.Task --- query.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/query.go b/query.go index b05f49d..906aaca 100644 --- a/query.go +++ b/query.go @@ -124,7 +124,7 @@ func (query *Query1[A]) Foreach(filterFn func(QueryResult1[A]) bool) iter.Seq[Qu // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query1[A]) Task(workersCount int, filterFn func(QueryResult1[A]) bool, fn func(data QueryResult1[A])) { +func (query *Query1[A]) Task(workersCount int, filterFn func(QueryResult1[A]) bool, fn func(result QueryResult1[A])) { storageA := getStorage[A](query.World) archetypes := query.filter() @@ -323,7 +323,7 @@ func (query *Query2[A, B]) Foreach(filterFn func(QueryResult2[A, B]) bool) iter. // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query2[A, B]) Task(workersCount int, filterFn func(QueryResult2[A, B]) bool, fn func(data QueryResult2[A, B])) { +func (query *Query2[A, B]) Task(workersCount int, filterFn func(QueryResult2[A, B]) bool, fn func(result QueryResult2[A, B])) { storageA := getStorage[A](query.World) storageB := getStorage[B](query.World) @@ -550,7 +550,7 @@ func (query *Query3[A, B, C]) Foreach(filterFn func(QueryResult3[A, B, C]) bool) // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query3[A, B, C]) Task(workersCount int, filterFn func(QueryResult3[A, B, C]) bool, fn func(data QueryResult3[A, B, C])) { +func (query *Query3[A, B, C]) Task(workersCount int, filterFn func(QueryResult3[A, B, C]) bool, fn func(result QueryResult3[A, B, C])) { storageA := getStorage[A](query.World) storageB := getStorage[B](query.World) storageC := getStorage[C](query.World) @@ -800,7 +800,7 @@ func (query *Query4[A, B, C, D]) Foreach(filterFn func(QueryResult4[A, B, C, D]) // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query4[A, B, C, D]) Task(workersCount int, filterFn func(QueryResult4[A, B, C, D]) bool, fn func(data QueryResult4[A, B, C, D])) { +func (query *Query4[A, B, C, D]) Task(workersCount int, filterFn func(QueryResult4[A, B, C, D]) bool, fn func(result QueryResult4[A, B, C, D])) { storageA := getStorage[A](query.World) storageB := getStorage[B](query.World) storageC := getStorage[C](query.World) @@ -1072,7 +1072,7 @@ func (query *Query5[A, B, C, D, E]) Foreach(filterFn func(QueryResult5[A, B, C, // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query5[A, B, C, D, E]) Task(workersCount int, filterFn func(QueryResult5[A, B, C, D, E]) bool, fn func(data QueryResult5[A, B, C, D, E])) { +func (query *Query5[A, B, C, D, E]) Task(workersCount int, filterFn func(QueryResult5[A, B, C, D, E]) bool, fn func(result QueryResult5[A, B, C, D, E])) { storageA := getStorage[A](query.World) storageB := getStorage[B](query.World) storageC := getStorage[C](query.World) @@ -1367,7 +1367,7 @@ func (query *Query6[A, B, C, D, E, F]) Foreach(filterFn func(QueryResult6[A, B, // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query6[A, B, C, D, E, F]) Task(workersCount int, filterFn func(QueryResult6[A, B, C, D, E, F]) bool, fn func(data QueryResult6[A, B, C, D, E, F])) { +func (query *Query6[A, B, C, D, E, F]) Task(workersCount int, filterFn func(QueryResult6[A, B, C, D, E, F]) bool, fn func(result QueryResult6[A, B, C, D, E, F])) { storageA := getStorage[A](query.World) storageB := getStorage[B](query.World) storageC := getStorage[C](query.World) @@ -1685,7 +1685,7 @@ func (query *Query7[A, B, C, D, E, F, G]) Foreach(filterFn func(QueryResult7[A, // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query7[A, B, C, D, E, F, G]) Task(workersCount int, filterFn func(QueryResult7[A, B, C, D, E, F, G]) bool, fn func(data QueryResult7[A, B, C, D, E, F, G])) { +func (query *Query7[A, B, C, D, E, F, G]) Task(workersCount int, filterFn func(QueryResult7[A, B, C, D, E, F, G]) bool, fn func(result QueryResult7[A, B, C, D, E, F, G])) { storageA := getStorage[A](query.World) storageB := getStorage[B](query.World) storageC := getStorage[C](query.World) @@ -2025,7 +2025,7 @@ func (query *Query8[A, B, C, D, E, F, G, H]) Foreach(filterFn func(QueryResult8[ // // The workersCount parameter determines the number of parallel workers. // Data is automatically partitioned across workers for optimal performance. -func (query *Query8[A, B, C, D, E, F, G, H]) Task(workersCount int, filterFn func(QueryResult8[A, B, C, D, E, F, G, H]) bool, fn func(data QueryResult8[A, B, C, D, E, F, G, H])) { +func (query *Query8[A, B, C, D, E, F, G, H]) Task(workersCount int, filterFn func(QueryResult8[A, B, C, D, E, F, G, H]) bool, fn func(result QueryResult8[A, B, C, D, E, F, G, H])) { storageA := getStorage[A](query.World) storageB := getStorage[B](query.World) storageC := getStorage[C](query.World)