diff --git a/README.md b/README.md
index 2d8d218..1cb285b 100644
--- a/README.md
+++ b/README.md
@@ -112,7 +112,8 @@ for result := range query.Foreach(nil) {
     transformData(result.A)
 }
 ```
-The Foreach function receives a function to pre-filter the results, and returns an iterator.
+
+DEPRECATED: The Foreach function receives a function to pre-filter the results, and returns an iterator.
 
 For faster performances, you can use concurrency with the function ForeachChannel:
 ```go
@@ -143,6 +144,21 @@ func runWorkers(workersNumber int, worker func(int)) {
 }
 ```
 
+The Task function replaces the previous ForeachChannel function: it offers a simpler API and a smaller memory footprint, for similar performance.
+
+It is useful for executing code on all queried entities, dispatching the work across a defined number of workers.
+Internally, this function leverages the iterator typically obtained through the Foreach function.
+
+The number of workers should be chosen based on the task's complexity:
+dispatching has a small but real overhead that can outweigh the benefits if the entity count per worker is too low.
+```go
+query := volt.CreateQuery2[transformComponent, meshComponent](world, volt.QueryConfiguration{OptionalComponents: []volt.OptionalComponent{meshComponentId}})
+
+query.Task(4, nil, func(result volt.QueryResult2[transformComponent, meshComponent]) {
+    transformData(result.A)
+})
+```
+
 Queries exist for 1 to 8 Components.
 
 You can also get the number of entities, without looping on each:
@@ -255,21 +271,22 @@ goarch: amd64
 pkg: benchmark
 cpu: AMD Ryzen 7 5800X 8-Core Processor
 
-| Benchmark                       | Iterations | ns/op     | B/op       | Allocs/op |
-|---------------------------------|------------|-----------|------------|-----------|
-| BenchmarkCreateEntityArche-16   | 171        | 7138387   | 11096954   | 61        |
-| BenchmarkIterateArche-16        | 2798       | 429744    | 354        | 4         |
-| BenchmarkAddArche-16            | 253        | 4673362   | 122153     | 100000    |
-| BenchmarkRemoveArche-16         | 247        | 4840772   | 100000     | 100000    |
-| BenchmarkCreateEntityUECS-16    | 27         | 38852089  | 49119503   | 200146    |
-| BenchmarkIterateUECS-16         | 4892       | 235333    | 128        | 3         |
-| BenchmarkAddUECS-16             | 28         | 38982533  | 4721942    | 100005    |
-| BenchmarkRemoveUECS-16          | 30         | 40290316  | 3336712    | 100000    |
-| BenchmarkCreateEntityVolt-16    | 63         | 18836136  | 35181458   | 100101    |
-| BenchmarkIterateVolt-16         | 3619       | 337764    | 256        | 8         |
-| BenchmarkIterateConcurrentlyVolt-16 | 9164   | 121653    | 3324       | 91        |
-| BenchmarkAddVolt-16             | 103        | 11379690  | 4313182    | 300000    |
-| BenchmarkRemoveVolt-16          | 146        | 7647252   | 400001     | 100000    |
+| Benchmark                                        | Iterations  | ns/op     | B/op       | Allocs/op |
+|--------------------------------------------------|-------------|-----------|------------|-----------|
+| BenchmarkCreateEntityArche-16                    | 171         | 7138387   | 11096954   | 61        |
+| BenchmarkIterateArche-16                         | 2798        | 429744    | 354        | 4         |
+| BenchmarkAddArche-16                             | 253         | 4673362   | 122153     | 100000    |
+| BenchmarkRemoveArche-16                          | 247         | 4840772   | 100000     | 100000    |
+| BenchmarkCreateEntityUECS-16                     | 27          | 38852089  | 49119503   | 200146    |
+| BenchmarkIterateUECS-16                          | 4892        | 235333    | 128        | 3         |
+| BenchmarkAddUECS-16                              | 28          | 38982533  | 4721942    | 100005    |
+| BenchmarkRemoveUECS-16                           | 30          | 40290316  | 3336712    | 100000    |
+| BenchmarkCreateEntityVolt-16                     | 63          | 18836136  | 35181458   | 100101    |
+| BenchmarkIterateVolt-16                          | 3619        | 337764    | 256        | 8         |
+| (DEPRECATED) BenchmarkIterateConcurrentlyVolt-16 | 9164        | 121653    | 3324       | 91        |
+| BenchmarkTaskVolt-16                             | 9859        | 119525    | 1847       | 38        |
+| BenchmarkAddVolt-16                              | 103         | 11379690  | 4313182    | 300000    |
+|
BenchmarkRemoveVolt-16 | 146 | 7647252 | 400001 | 100000 | These results show a few things: - Arche is the fastest tool for writes operations. In our game development though we would rather lean towards fastest read operations, because the games loops will read way more often than write. diff --git a/benchmark/volt_test.go b/benchmark/volt_test.go index 4effaf6..77c35ab 100644 --- a/benchmark/volt_test.go +++ b/benchmark/volt_test.go @@ -76,6 +76,27 @@ func BenchmarkIterateConcurrentlyVolt(b *testing.B) { b.ReportAllocs() } +func BenchmarkTaskVolt(b *testing.B) { + world := volt.CreateWorld(ENTITIES_COUNT) + volt.RegisterComponent[testTransform](world, &volt.ComponentConfig[testTransform]{}) + volt.RegisterComponent[testTag](world, &volt.ComponentConfig[testTag]{}) + + for i := 0; i < ENTITIES_COUNT; i++ { + id := world.CreateEntity() + volt.AddComponent[testTransform](world, id, testTransform{}) + volt.AddComponent[testTag](world, id, testTag{}) + } + + for b.Loop() { + query := volt.CreateQuery2[testTransform, testTag](world, volt.QueryConfiguration{}) + query.Task(WORKERS, nil, func(result volt.QueryResult2[testTransform, testTag]) { + transformData(result.A) + }) + } + + b.ReportAllocs() +} + func BenchmarkAddVolt(b *testing.B) { b.StopTimer() diff --git a/query.go b/query.go index eefe624..906aaca 100644 --- a/query.go +++ b/query.go @@ -4,6 +4,7 @@ import ( "iter" "math" "slices" + "sync" ) // Optional ComponentId for Queries. @@ -117,10 +118,43 @@ func (query *Query1[A]) Foreach(filterFn func(QueryResult1[A]) bool) iter.Seq[Qu } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult1. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query1[A]) Task(workersCount int, filterFn func(QueryResult1[A]) bool, fn func(result QueryResult1[A])) { + storageA := getStorage[A](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult1[A] + + if sliceA != nil { + result.A = &sliceA[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult1 for all the entities with component A // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query1[A]) ForeachChannel(chunkSize int, filterFn func(QueryResult1[A]) bool) <-chan iter.Seq[QueryResult1[A]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -283,10 +317,48 @@ func (query *Query2[A, B]) Foreach(filterFn func(QueryResult2[A, B]) bool) iter. } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult2. +// If filterFn is provided and returns false for an entity, that entity is skipped. 
+// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query2[A, B]) Task(workersCount int, filterFn func(QueryResult2[A, B]) bool, fn func(result QueryResult2[A, B])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult2[A, B] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult2 for all the entities with components A, B // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query2[A, B]) ForeachChannel(chunkSize int, filterFn func(QueryResult2[A, B]) bool) <-chan iter.Seq[QueryResult2[A, B]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -472,10 +544,53 @@ func (query *Query3[A, B, C]) Foreach(filterFn func(QueryResult3[A, B, C]) bool) } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult3. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query3[A, B, C]) Task(workersCount int, filterFn func(QueryResult3[A, B, C]) bool, fn func(result QueryResult3[A, B, C])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult3[A, B, C] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult3 for all the entities with components A, B, C // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. 
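+//
+// A minimal migration sketch (the worker count of 4 and the callback body are
+// illustrative placeholders, not prescribed values):
+//
+//	query.Task(4, nil, func(result QueryResult3[A, B, C]) {
+//		// do here the per-entity work previously done inside the ForeachChannel loop
+//	})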
func (query *Query3[A, B, C]) ForeachChannel(chunkSize int, filterFn func(QueryResult3[A, B, C]) bool) <-chan iter.Seq[QueryResult3[A, B, C]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -679,10 +794,58 @@ func (query *Query4[A, B, C, D]) Foreach(filterFn func(QueryResult4[A, B, C, D]) } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult4. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. +func (query *Query4[A, B, C, D]) Task(workersCount int, filterFn func(QueryResult4[A, B, C, D]) bool, fn func(result QueryResult4[A, B, C, D])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult4[A, B, C, D] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult4 for all the entities with components A, B, C, D // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query4[A, B, C, D]) ForeachChannel(chunkSize int, filterFn func(QueryResult4[A, B, C, D]) bool) <-chan iter.Seq[QueryResult4[A, B, C, D]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -903,10 +1066,63 @@ func (query *Query5[A, B, C, D, E]) Foreach(filterFn func(QueryResult5[A, B, C, } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult5. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. 
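+//
+// Usage sketch (the worker count and the filter are illustrative; for an
+// optional component, a nil pointer in the result means it is absent):
+//
+//	query.Task(4, func(result QueryResult5[A, B, C, D, E]) bool {
+//		return result.E != nil // skip entities without the optional E component
+//	}, func(result QueryResult5[A, B, C, D, E]) {
+//		// per-entity work goes here
+//	})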
+func (query *Query5[A, B, C, D, E]) Task(workersCount int, filterFn func(QueryResult5[A, B, C, D, E]) bool, fn func(result QueryResult5[A, B, C, D, E])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult5[A, B, C, D, E] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult5 for all the entities with components A, B, C, D, E // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query5[A, B, C, D, E]) ForeachChannel(chunkSize int, filterFn func(QueryResult5[A, B, C, D, E]) bool) <-chan iter.Seq[QueryResult5[A, B, C, D, E]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1145,10 +1361,68 @@ func (query *Query6[A, B, C, D, E, F]) Foreach(filterFn func(QueryResult6[A, B, } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult6. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. 
+func (query *Query6[A, B, C, D, E, F]) Task(workersCount int, filterFn func(QueryResult6[A, B, C, D, E, F]) bool, fn func(result QueryResult6[A, B, C, D, E, F])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + storageF := getStorage[F](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + sliceF := storageF.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult6[A, B, C, D, E, F] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + if sliceF != nil { + result.F = &sliceF[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult6 for all the entities with components A, B, C, D, E, F // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query6[A, B, C, D, E, F]) ForeachChannel(chunkSize int, filterFn func(QueryResult6[A, B, C, D, E, F]) bool) <-chan iter.Seq[QueryResult6[A, B, C, D, E, F]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1405,10 +1679,73 @@ func (query *Query7[A, B, C, D, E, F, G]) Foreach(filterFn func(QueryResult7[A, } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult7. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. 
+func (query *Query7[A, B, C, D, E, F, G]) Task(workersCount int, filterFn func(QueryResult7[A, B, C, D, E, F, G]) bool, fn func(result QueryResult7[A, B, C, D, E, F, G])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + storageF := getStorage[F](query.World) + storageG := getStorage[G](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + sliceF := storageF.archetypesComponentsEntities[archetype.Id] + sliceG := storageG.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult7[A, B, C, D, E, F, G] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + if sliceF != nil { + result.F = &sliceF[i] + } + if sliceG != nil { + result.G = &sliceG[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult7 for all the entities with components A, B, C, D, E, F, G // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. func (query *Query7[A, B, C, D, E, F, G]) ForeachChannel(chunkSize int, filterFn func(QueryResult7[A, B, C, D, E, F, G]) bool) <-chan iter.Seq[QueryResult7[A, B, C, D, E, F, G]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1682,10 +2019,78 @@ func (query *Query8[A, B, C, D, E, F, G, H]) Foreach(filterFn func(QueryResult8[ } } +// Task executes fn in parallel across workersCount goroutines for all entities matching the query. +// Each entity's components are passed to fn through QueryResult8. +// If filterFn is provided and returns false for an entity, that entity is skipped. +// +// The workersCount parameter determines the number of parallel workers. +// Data is automatically partitioned across workers for optimal performance. 
+func (query *Query8[A, B, C, D, E, F, G, H]) Task(workersCount int, filterFn func(QueryResult8[A, B, C, D, E, F, G, H]) bool, fn func(result QueryResult8[A, B, C, D, E, F, G, H])) { + storageA := getStorage[A](query.World) + storageB := getStorage[B](query.World) + storageC := getStorage[C](query.World) + storageD := getStorage[D](query.World) + storageE := getStorage[E](query.World) + storageF := getStorage[F](query.World) + storageG := getStorage[G](query.World) + storageH := getStorage[H](query.World) + + archetypes := query.filter() + for _, archetype := range archetypes { + sliceA := storageA.archetypesComponentsEntities[archetype.Id] + sliceB := storageB.archetypesComponentsEntities[archetype.Id] + sliceC := storageC.archetypesComponentsEntities[archetype.Id] + sliceD := storageD.archetypesComponentsEntities[archetype.Id] + sliceE := storageE.archetypesComponentsEntities[archetype.Id] + sliceF := storageF.archetypesComponentsEntities[archetype.Id] + sliceG := storageG.archetypesComponentsEntities[archetype.Id] + sliceH := storageH.archetypesComponentsEntities[archetype.Id] + + task(workersCount, archetype.entities, func(i int, data EntityId) { + var result QueryResult8[A, B, C, D, E, F, G, H] + + if sliceA != nil { + result.A = &sliceA[i] + } + if sliceB != nil { + result.B = &sliceB[i] + } + if sliceC != nil { + result.C = &sliceC[i] + } + if sliceD != nil { + result.D = &sliceD[i] + } + if sliceE != nil { + result.E = &sliceE[i] + } + if sliceF != nil { + result.F = &sliceF[i] + } + if sliceG != nil { + result.G = &sliceG[i] + } + if sliceH != nil { + result.H = &sliceH[i] + } + result.EntityId = archetype.entities[i] + + if filterFn != nil && !filterFn(result) { + return + } + + fn(result) + }) + } +} + // ForeachChannel returns a channel of iterators of QueryResult8 for all the entities with components A, B, C, D, E, F, G, H // to which filterFn function returns true. -// // The parameter chunkSize defines the size of each iterators. +// +// Deprecated: ForeachChannel is deprecated and will be removed in a future version. +// Use Task(workersCount, filterFn, fn) instead, which offers better performance +// and a simpler API for parallel iteration. 
func (query *Query8[A, B, C, D, E, F, G, H]) ForeachChannel(chunkSize int, filterFn func(QueryResult8[A, B, C, D, E, F, G, H]) bool) <-chan iter.Seq[QueryResult8[A, B, C, D, E, F, G, H]] { if chunkSize == 0 { panic("chunk size must be greater than zero") @@ -1794,3 +2199,20 @@ func (query *Query8[A, B, C, D, E, F, G, H]) ForeachChannel(chunkSize int, filte return channel } + +func task[T any](workersCount int, data []T, fn func(i int, data T)) { + var wg sync.WaitGroup + dataSize := len(data) + chunkSize := (dataSize + workersCount - 1) / workersCount + + for workerID := 0; workerID < workersCount; workerID++ { + wg.Add(1) + go func(start, end int) { + defer wg.Done() + for i := start; i < end; i++ { + fn(i, data[i]) + } + }(workerID*chunkSize, min((workerID+1)*chunkSize, dataSize)) + } + wg.Wait() +} diff --git a/query_test.go b/query_test.go index 361973f..ac78c20 100644 --- a/query_test.go +++ b/query_test.go @@ -2,6 +2,7 @@ package volt import ( "slices" + "sync" "testing" ) @@ -99,6 +100,46 @@ func TestQuery1_Foreach(t *testing.T) { } } +func TestQuery1_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponent[testComponent1](world, entityId, testComponent1{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery1[testComponent1](world, QueryConfiguration{}) + var results []QueryResult1[testComponent1] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult1[testComponent1]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery1_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -234,6 +275,47 @@ func TestQuery2_Foreach(t *testing.T) { } } +func TestQuery2_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents2[testComponent1, testComponent2](world, entityId, testComponent1{}, testComponent2{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery2[testComponent1, testComponent2](world, QueryConfiguration{}) + var results []QueryResult2[testComponent1, testComponent2] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult2[testComponent1, testComponent2]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery2_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -375,6 +457,48 @@ func TestQuery3_Foreach(t *testing.T) { } } +func TestQuery3_Task(t 
*testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents3[testComponent1, testComponent2, testComponent3](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery3[testComponent1, testComponent2, testComponent3](world, QueryConfiguration{}) + var results []QueryResult3[testComponent1, testComponent2, testComponent3] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult3[testComponent1, testComponent2, testComponent3]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery3_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -520,6 +644,49 @@ func TestQuery4_Foreach(t *testing.T) { } } +func TestQuery4_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents4[testComponent1, testComponent2, testComponent3, testComponent4](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery4[testComponent1, testComponent2, testComponent3, testComponent4](world, QueryConfiguration{}) + var results []QueryResult4[testComponent1, testComponent2, testComponent3, testComponent4] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult4[testComponent1, testComponent2, testComponent3, testComponent4]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery4_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -670,6 +837,50 @@ func TestQuery5_Foreach(t *testing.T) { } } +func TestQuery5_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, 
&ComponentConfig[testComponent5]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5](world, QueryConfiguration{}) + var results []QueryResult5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult5[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery5_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -825,6 +1036,51 @@ func TestQuery6_Foreach(t *testing.T) { } } +func TestQuery6_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, &ComponentConfig[testComponent5]{}) + RegisterComponent[testComponent6](world, &ComponentConfig[testComponent6]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}, testComponent6{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6](world, QueryConfiguration{}) + var results []QueryResult6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult6[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery6_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -986,6 +1242,52 @@ func TestQuery7_Foreach(t *testing.T) { } } +func TestQuery7_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, 
&ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, &ComponentConfig[testComponent5]{}) + RegisterComponent[testComponent6](world, &ComponentConfig[testComponent6]{}) + RegisterComponent[testComponent7](world, &ComponentConfig[testComponent7]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}, testComponent6{}, testComponent7{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7](world, QueryConfiguration{}) + var results []QueryResult7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult7[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery7_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER) @@ -1144,6 +1446,54 @@ func TestQuery8_Foreach(t *testing.T) { } } } + +func TestQuery8_Task(t *testing.T) { + var entities []EntityId + world := CreateWorld(TEST_ENTITY_NUMBER) + RegisterComponent[testComponent1](world, &ComponentConfig[testComponent1]{}) + RegisterComponent[testComponent2](world, &ComponentConfig[testComponent2]{}) + RegisterComponent[testComponent3](world, &ComponentConfig[testComponent3]{}) + RegisterComponent[testComponent4](world, &ComponentConfig[testComponent4]{}) + RegisterComponent[testComponent5](world, &ComponentConfig[testComponent5]{}) + RegisterComponent[testComponent6](world, &ComponentConfig[testComponent6]{}) + RegisterComponent[testComponent7](world, &ComponentConfig[testComponent7]{}) + RegisterComponent[testComponent8](world, &ComponentConfig[testComponent8]{}) + + for i := 0; i < TEST_ENTITY_NUMBER; i++ { + entityId := world.CreateEntity() + entities = append(entities, entityId) + + err := AddComponents8[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8](world, entityId, testComponent1{}, testComponent2{}, testComponent3{}, testComponent4{}, testComponent5{}, testComponent6{}, testComponent7{}, testComponent8{}) + if err != nil { + t.Errorf("%s", err.Error()) + } + } + + query := CreateQuery8[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8](world, QueryConfiguration{}) + var results []QueryResult8[testComponent1, testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8] + var mu sync.Mutex + + query.Task(4, nil, func(result QueryResult8[testComponent1, 
testComponent2, testComponent3, testComponent4, testComponent5, testComponent6, testComponent7, testComponent8]) { + mu.Lock() + results = append(results, result) + mu.Unlock() + }) + + for _, entityId := range entities { + found := false + for _, result := range results { + if result.EntityId == entityId { + found = true + break + } + } + if !found { + t.Errorf("query should return EntityId %d in Task iterator", entityId) + break + } + } +} + func TestQuery8_ForeachChannel(t *testing.T) { var entities []EntityId world := CreateWorld(TEST_ENTITY_NUMBER)